feat(chat): add runs API, subagent registry, and cascade stop
Backend support for multi-session chat: /api/chat/runs endpoint for parent/subagent run status, subagent registry for reading run state, and cascade stop to abort child sessions when stopping a parent.
This commit is contained in:
parent
46fe15df81
commit
8838cc4e16
107
apps/web/app/api/chat/runs/route.test.ts
Normal file
107
apps/web/app/api/chat/runs/route.test.ts
Normal file
@ -0,0 +1,107 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("@/lib/active-runs", () => ({
|
||||
getActiveRun: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/subagent-registry", () => ({
|
||||
listSubagentsForRequesterSession: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/workspace", () => ({
|
||||
resolveActiveAgentId: vi.fn(() => "main"),
|
||||
}));
|
||||
|
||||
vi.mock("@/app/api/web-sessions/shared", () => ({
|
||||
readIndex: vi.fn(() => []),
|
||||
resolveSessionKey: vi.fn((sessionId: string, fallbackAgentId: string) => `agent:${fallbackAgentId}:web:${sessionId}`),
|
||||
}));
|
||||
|
||||
describe("GET /api/chat/runs", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetModules();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("returns active parent runs plus subagents mapped back to their parent web session", async () => {
|
||||
const { getActiveRun } = await import("@/lib/active-runs");
|
||||
const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry");
|
||||
const { readIndex } = await import("@/app/api/web-sessions/shared");
|
||||
|
||||
vi.mocked(readIndex).mockReturnValue([
|
||||
{ id: "parent-1", title: "Parent 1", createdAt: 1, updatedAt: 1, messageCount: 2 },
|
||||
{ id: "parent-2", title: "Parent 2", createdAt: 1, updatedAt: 1, messageCount: 3 },
|
||||
] as never);
|
||||
|
||||
vi.mocked(getActiveRun).mockImplementation(((sessionId: string) => {
|
||||
if (sessionId === "parent-1") {
|
||||
return { status: "running" };
|
||||
}
|
||||
if (sessionId === "parent-2") {
|
||||
return { status: "waiting-for-subagents" };
|
||||
}
|
||||
return undefined;
|
||||
}) as never);
|
||||
|
||||
vi.mocked(listSubagentsForRequesterSession).mockImplementation(((requesterSessionKey: string) => {
|
||||
if (requesterSessionKey === "agent:main:web:parent-1") {
|
||||
return [
|
||||
{
|
||||
runId: "run-1",
|
||||
childSessionKey: "agent:chat-slot-main-1:subagent:child-1",
|
||||
requesterSessionKey,
|
||||
task: "Collect facts",
|
||||
label: "Fact finding",
|
||||
status: "running",
|
||||
createdAt: 10,
|
||||
},
|
||||
];
|
||||
}
|
||||
if (requesterSessionKey === "agent:main:web:parent-2") {
|
||||
return [
|
||||
{
|
||||
runId: "run-2",
|
||||
childSessionKey: "agent:chat-slot-main-2:subagent:child-2",
|
||||
requesterSessionKey,
|
||||
task: "Summarize",
|
||||
status: "completed",
|
||||
createdAt: 20,
|
||||
endedAt: 30,
|
||||
},
|
||||
];
|
||||
}
|
||||
return [];
|
||||
}) as never);
|
||||
|
||||
const { GET } = await import("./route.js");
|
||||
const res = await GET();
|
||||
const json = await res.json();
|
||||
|
||||
expect(json.parentRuns).toEqual([
|
||||
{ sessionId: "parent-1", status: "running" },
|
||||
{ sessionId: "parent-2", status: "waiting-for-subagents" },
|
||||
]);
|
||||
expect(json.subagents).toEqual([
|
||||
{
|
||||
childSessionKey: "agent:chat-slot-main-1:subagent:child-1",
|
||||
parentSessionId: "parent-1",
|
||||
runId: "run-1",
|
||||
task: "Collect facts",
|
||||
label: "Fact finding",
|
||||
status: "running",
|
||||
startedAt: 10,
|
||||
endedAt: undefined,
|
||||
},
|
||||
{
|
||||
childSessionKey: "agent:chat-slot-main-2:subagent:child-2",
|
||||
parentSessionId: "parent-2",
|
||||
runId: "run-2",
|
||||
task: "Summarize",
|
||||
label: undefined,
|
||||
status: "completed",
|
||||
startedAt: 20,
|
||||
endedAt: 30,
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
47
apps/web/app/api/chat/runs/route.ts
Normal file
47
apps/web/app/api/chat/runs/route.ts
Normal file
@ -0,0 +1,47 @@
|
||||
import { getActiveRun } from "@/lib/active-runs";
|
||||
import { listSubagentsForRequesterSession } from "@/lib/subagent-registry";
|
||||
import { resolveActiveAgentId } from "@/lib/workspace";
|
||||
import { readIndex, resolveSessionKey } from "@/app/api/web-sessions/shared";
|
||||
|
||||
export const runtime = "nodejs";
|
||||
|
||||
export function GET() {
|
||||
const sessions = readIndex();
|
||||
const fallbackAgentId = resolveActiveAgentId();
|
||||
const parentSessionKeys = new Map(
|
||||
sessions.map((session) => [resolveSessionKey(session.id, fallbackAgentId), session.id]),
|
||||
);
|
||||
|
||||
const parentRuns = sessions
|
||||
.map((session) => {
|
||||
const run = getActiveRun(session.id);
|
||||
if (!run) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
sessionId: session.id,
|
||||
status: run.status,
|
||||
};
|
||||
})
|
||||
.filter((run): run is { sessionId: string; status: "running" | "waiting-for-subagents" | "completed" | "error" } => Boolean(run));
|
||||
|
||||
const subagents = [...parentSessionKeys.entries()]
|
||||
.flatMap(([requesterSessionKey, parentSessionId]) =>
|
||||
listSubagentsForRequesterSession(requesterSessionKey).map((entry) => ({
|
||||
childSessionKey: entry.childSessionKey,
|
||||
parentSessionId,
|
||||
runId: entry.runId,
|
||||
task: entry.task,
|
||||
label: entry.label || undefined,
|
||||
status: entry.status,
|
||||
startedAt: entry.createdAt,
|
||||
endedAt: entry.endedAt,
|
||||
})),
|
||||
)
|
||||
.toSorted((a, b) => (a.startedAt ?? 0) - (b.startedAt ?? 0));
|
||||
|
||||
return Response.json({
|
||||
parentRuns,
|
||||
subagents,
|
||||
});
|
||||
}
|
||||
113
apps/web/app/api/chat/stop/route.test.ts
Normal file
113
apps/web/app/api/chat/stop/route.test.ts
Normal file
@ -0,0 +1,113 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("@/lib/active-runs", () => ({
|
||||
abortRun: vi.fn(() => false),
|
||||
getActiveRun: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/subagent-registry", () => ({
|
||||
listSubagentsForRequesterSession: vi.fn(() => []),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/workspace", () => ({
|
||||
resolveActiveAgentId: vi.fn(() => "main"),
|
||||
}));
|
||||
|
||||
vi.mock("@/app/api/web-sessions/shared", () => ({
|
||||
resolveSessionKey: vi.fn((sessionId: string, fallbackAgentId: string) => `agent:${fallbackAgentId}:web:${sessionId}`),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/telemetry", () => ({
|
||||
trackServer: vi.fn(),
|
||||
}));
|
||||
|
||||
describe("POST /api/chat/stop", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetModules();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("stops a parent session and all active child subagents when cascadeChildren is enabled (prevents orphan background work)", async () => {
|
||||
const { abortRun, getActiveRun } = await import("@/lib/active-runs");
|
||||
const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry");
|
||||
|
||||
vi.mocked(getActiveRun).mockImplementation(((runKey: string) => {
|
||||
if (runKey === "parent-1") {
|
||||
return { status: "waiting-for-subagents" };
|
||||
}
|
||||
if (runKey === "agent:chat-slot-main-1:subagent:child-1") {
|
||||
return { status: "running" };
|
||||
}
|
||||
if (runKey === "agent:chat-slot-main-2:subagent:child-2") {
|
||||
return { status: "completed" };
|
||||
}
|
||||
return undefined;
|
||||
}) as never);
|
||||
|
||||
vi.mocked(listSubagentsForRequesterSession).mockReturnValue([
|
||||
{
|
||||
runId: "run-1",
|
||||
childSessionKey: "agent:chat-slot-main-1:subagent:child-1",
|
||||
requesterSessionKey: "agent:main:web:parent-1",
|
||||
task: "Collect facts",
|
||||
status: "running",
|
||||
},
|
||||
{
|
||||
runId: "run-2",
|
||||
childSessionKey: "agent:chat-slot-main-2:subagent:child-2",
|
||||
requesterSessionKey: "agent:main:web:parent-1",
|
||||
task: "Already done",
|
||||
status: "completed",
|
||||
},
|
||||
] as never);
|
||||
|
||||
vi.mocked(abortRun).mockReturnValue(true);
|
||||
|
||||
const { POST } = await import("./route.js");
|
||||
const req = new Request("http://localhost/api/chat/stop", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
sessionId: "parent-1",
|
||||
cascadeChildren: true,
|
||||
}),
|
||||
});
|
||||
|
||||
const res = await POST(req);
|
||||
const json = await res.json();
|
||||
|
||||
expect(abortRun).toHaveBeenCalledWith("parent-1");
|
||||
expect(abortRun).toHaveBeenCalledWith("agent:chat-slot-main-1:subagent:child-1");
|
||||
expect(abortRun).not.toHaveBeenCalledWith("agent:chat-slot-main-2:subagent:child-2");
|
||||
expect(json).toEqual({ aborted: true, abortedChildren: 1 });
|
||||
});
|
||||
|
||||
it("stops only the requested subagent session when sessionKey is provided", async () => {
|
||||
const { abortRun, getActiveRun } = await import("@/lib/active-runs");
|
||||
const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry");
|
||||
|
||||
vi.mocked(getActiveRun).mockImplementation(((runKey: string) => {
|
||||
if (runKey === "agent:chat-slot-main-1:subagent:child-1") {
|
||||
return { status: "running" };
|
||||
}
|
||||
return undefined;
|
||||
}) as never);
|
||||
vi.mocked(abortRun).mockReturnValue(true);
|
||||
|
||||
const { POST } = await import("./route.js");
|
||||
const req = new Request("http://localhost/api/chat/stop", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
sessionKey: "agent:chat-slot-main-1:subagent:child-1",
|
||||
}),
|
||||
});
|
||||
|
||||
const res = await POST(req);
|
||||
const json = await res.json();
|
||||
|
||||
expect(abortRun).toHaveBeenCalledWith("agent:chat-slot-main-1:subagent:child-1");
|
||||
expect(listSubagentsForRequesterSession).not.toHaveBeenCalled();
|
||||
expect(json).toEqual({ aborted: true, abortedChildren: 0 });
|
||||
});
|
||||
});
|
||||
@ -5,12 +5,15 @@
|
||||
* Works for both parent sessions (by sessionId) and subagent sessions (by sessionKey).
|
||||
*/
|
||||
import { abortRun, getActiveRun } from "@/lib/active-runs";
|
||||
import { listSubagentsForRequesterSession } from "@/lib/subagent-registry";
|
||||
import { trackServer } from "@/lib/telemetry";
|
||||
import { resolveActiveAgentId } from "@/lib/workspace";
|
||||
import { resolveSessionKey } from "@/app/api/web-sessions/shared";
|
||||
|
||||
export const runtime = "nodejs";
|
||||
|
||||
export async function POST(req: Request) {
|
||||
const body: { sessionId?: string; sessionKey?: string } = await req
|
||||
const body: { sessionId?: string; sessionKey?: string; cascadeChildren?: boolean } = await req
|
||||
.json()
|
||||
.catch(() => ({}));
|
||||
|
||||
@ -25,8 +28,22 @@ export async function POST(req: Request) {
|
||||
const canAbort =
|
||||
run?.status === "running" || run?.status === "waiting-for-subagents";
|
||||
const aborted = canAbort ? abortRun(runKey) : false;
|
||||
if (aborted) {
|
||||
let abortedChildren = 0;
|
||||
|
||||
if (!isSubagentSession && body.sessionId && body.cascadeChildren) {
|
||||
const fallbackAgentId = resolveActiveAgentId();
|
||||
const requesterSessionKey = resolveSessionKey(body.sessionId, fallbackAgentId);
|
||||
for (const subagent of listSubagentsForRequesterSession(requesterSessionKey)) {
|
||||
const childRun = getActiveRun(subagent.childSessionKey);
|
||||
const canAbortChild =
|
||||
childRun?.status === "running" || childRun?.status === "waiting-for-subagents";
|
||||
if (canAbortChild && abortRun(subagent.childSessionKey)) {
|
||||
abortedChildren += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (aborted || abortedChildren > 0) {
|
||||
trackServer("chat_stopped");
|
||||
}
|
||||
return Response.json({ aborted });
|
||||
return Response.json({ aborted, abortedChildren });
|
||||
}
|
||||
|
||||
@ -3,8 +3,10 @@ import { randomUUID } from "node:crypto";
|
||||
import { trackServer } from "@/lib/telemetry";
|
||||
import { type WebSessionMeta, ensureDir, readIndex, writeIndex } from "./shared";
|
||||
import {
|
||||
ensureManagedWorkspaceRouting,
|
||||
getActiveWorkspaceName,
|
||||
resolveActiveAgentId,
|
||||
resolveWorkspaceDirForName,
|
||||
resolveWorkspaceRoot,
|
||||
} from "@/lib/workspace";
|
||||
import { allocateChatAgent } from "@/lib/chat-agent-registry";
|
||||
@ -34,9 +36,10 @@ export async function POST(req: Request) {
|
||||
const id = randomUUID();
|
||||
const now = Date.now();
|
||||
|
||||
const workspaceName = getActiveWorkspaceName() ?? undefined;
|
||||
const workspaceName = getActiveWorkspaceName() ?? "default";
|
||||
const workspaceRoot = resolveWorkspaceRoot() ?? resolveWorkspaceDirForName(workspaceName);
|
||||
ensureManagedWorkspaceRouting(workspaceName, workspaceRoot, { markDefault: false });
|
||||
const workspaceAgentId = resolveActiveAgentId();
|
||||
const workspaceRoot = resolveWorkspaceRoot() ?? undefined;
|
||||
|
||||
// Assign a pool slot agent for concurrent chat support.
|
||||
// Falls back to the workspace agent if no slots are available.
|
||||
@ -59,7 +62,7 @@ export async function POST(req: Request) {
|
||||
updatedAt: now,
|
||||
messageCount: 0,
|
||||
...(body.filePath ? { filePath: body.filePath } : {}),
|
||||
workspaceName,
|
||||
workspaceName: workspaceName || undefined,
|
||||
workspaceRoot,
|
||||
workspaceAgentId,
|
||||
chatAgentId,
|
||||
|
||||
54
apps/web/lib/subagent-registry.ts
Normal file
54
apps/web/lib/subagent-registry.ts
Normal file
@ -0,0 +1,54 @@
|
||||
import { existsSync, readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { resolveOpenClawStateDir } from "./workspace";
|
||||
|
||||
export type SubagentRegistryEntry = {
|
||||
runId: string;
|
||||
childSessionKey: string;
|
||||
requesterSessionKey: string;
|
||||
task: string;
|
||||
label?: string;
|
||||
createdAt?: number;
|
||||
endedAt?: number;
|
||||
outcome?: { status: string; error?: string };
|
||||
};
|
||||
|
||||
export function readSubagentRegistry(): SubagentRegistryEntry[] {
|
||||
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
|
||||
if (!existsSync(registryPath)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
|
||||
runs?: Record<string, SubagentRegistryEntry>;
|
||||
};
|
||||
return Object.values(raw.runs ?? {});
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveSubagentStatus(
|
||||
entry: SubagentRegistryEntry,
|
||||
): "running" | "completed" | "error" {
|
||||
if (typeof entry.endedAt !== "number") {
|
||||
return "running";
|
||||
}
|
||||
if (entry.outcome?.status === "error") {
|
||||
return "error";
|
||||
}
|
||||
return "completed";
|
||||
}
|
||||
|
||||
export function listSubagentsForRequesterSession(
|
||||
requesterSessionKey: string,
|
||||
): Array<SubagentRegistryEntry & { status: "running" | "completed" | "error" }> {
|
||||
return readSubagentRegistry()
|
||||
.filter((entry) => entry.requesterSessionKey === requesterSessionKey)
|
||||
.map((entry) => ({
|
||||
...entry,
|
||||
status: resolveSubagentStatus(entry),
|
||||
}))
|
||||
.toSorted((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user