From 8838cc4e1667dd8451a7d36f5a00e6f375f4dbaf Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sun, 15 Mar 2026 00:30:43 -0700 Subject: [PATCH] feat(chat): add runs API, subagent registry, and cascade stop Backend support for multi-session chat: /api/chat/runs endpoint for parent/subagent run status, subagent registry for reading run state, and cascade stop to abort child sessions when stopping a parent. --- apps/web/app/api/chat/runs/route.test.ts | 107 +++++++++++++++++++++ apps/web/app/api/chat/runs/route.ts | 47 ++++++++++ apps/web/app/api/chat/stop/route.test.ts | 113 +++++++++++++++++++++++ apps/web/app/api/chat/stop/route.ts | 23 ++++- apps/web/app/api/web-sessions/route.ts | 9 +- apps/web/lib/subagent-registry.ts | 54 +++++++++++ 6 files changed, 347 insertions(+), 6 deletions(-) create mode 100644 apps/web/app/api/chat/runs/route.test.ts create mode 100644 apps/web/app/api/chat/runs/route.ts create mode 100644 apps/web/app/api/chat/stop/route.test.ts create mode 100644 apps/web/lib/subagent-registry.ts diff --git a/apps/web/app/api/chat/runs/route.test.ts b/apps/web/app/api/chat/runs/route.test.ts new file mode 100644 index 00000000000..da8523b8aa1 --- /dev/null +++ b/apps/web/app/api/chat/runs/route.test.ts @@ -0,0 +1,107 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("@/lib/active-runs", () => ({ + getActiveRun: vi.fn(), +})); + +vi.mock("@/lib/subagent-registry", () => ({ + listSubagentsForRequesterSession: vi.fn(), +})); + +vi.mock("@/lib/workspace", () => ({ + resolveActiveAgentId: vi.fn(() => "main"), +})); + +vi.mock("@/app/api/web-sessions/shared", () => ({ + readIndex: vi.fn(() => []), + resolveSessionKey: vi.fn((sessionId: string, fallbackAgentId: string) => `agent:${fallbackAgentId}:web:${sessionId}`), +})); + +describe("GET /api/chat/runs", () => { + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + }); + + it("returns active parent runs plus subagents mapped back to their parent web session", async () => { + const { getActiveRun } = await import("@/lib/active-runs"); + const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry"); + const { readIndex } = await import("@/app/api/web-sessions/shared"); + + vi.mocked(readIndex).mockReturnValue([ + { id: "parent-1", title: "Parent 1", createdAt: 1, updatedAt: 1, messageCount: 2 }, + { id: "parent-2", title: "Parent 2", createdAt: 1, updatedAt: 1, messageCount: 3 }, + ] as never); + + vi.mocked(getActiveRun).mockImplementation(((sessionId: string) => { + if (sessionId === "parent-1") { + return { status: "running" }; + } + if (sessionId === "parent-2") { + return { status: "waiting-for-subagents" }; + } + return undefined; + }) as never); + + vi.mocked(listSubagentsForRequesterSession).mockImplementation(((requesterSessionKey: string) => { + if (requesterSessionKey === "agent:main:web:parent-1") { + return [ + { + runId: "run-1", + childSessionKey: "agent:chat-slot-main-1:subagent:child-1", + requesterSessionKey, + task: "Collect facts", + label: "Fact finding", + status: "running", + createdAt: 10, + }, + ]; + } + if (requesterSessionKey === "agent:main:web:parent-2") { + return [ + { + runId: "run-2", + childSessionKey: "agent:chat-slot-main-2:subagent:child-2", + requesterSessionKey, + task: "Summarize", + status: "completed", + createdAt: 20, + endedAt: 30, + }, + ]; + } + return []; + }) as never); + + const { GET } = await import("./route.js"); + const res = await GET(); + const json = await res.json(); + + expect(json.parentRuns).toEqual([ + { sessionId: "parent-1", status: "running" }, + { sessionId: "parent-2", status: "waiting-for-subagents" }, + ]); + expect(json.subagents).toEqual([ + { + childSessionKey: "agent:chat-slot-main-1:subagent:child-1", + parentSessionId: "parent-1", + runId: "run-1", + task: "Collect facts", + label: "Fact finding", + status: "running", + startedAt: 10, + endedAt: undefined, + }, + { + childSessionKey: "agent:chat-slot-main-2:subagent:child-2", + parentSessionId: "parent-2", + runId: "run-2", + task: "Summarize", + label: undefined, + status: "completed", + startedAt: 20, + endedAt: 30, + }, + ]); + }); +}); diff --git a/apps/web/app/api/chat/runs/route.ts b/apps/web/app/api/chat/runs/route.ts new file mode 100644 index 00000000000..e285b28824d --- /dev/null +++ b/apps/web/app/api/chat/runs/route.ts @@ -0,0 +1,47 @@ +import { getActiveRun } from "@/lib/active-runs"; +import { listSubagentsForRequesterSession } from "@/lib/subagent-registry"; +import { resolveActiveAgentId } from "@/lib/workspace"; +import { readIndex, resolveSessionKey } from "@/app/api/web-sessions/shared"; + +export const runtime = "nodejs"; + +export function GET() { + const sessions = readIndex(); + const fallbackAgentId = resolveActiveAgentId(); + const parentSessionKeys = new Map( + sessions.map((session) => [resolveSessionKey(session.id, fallbackAgentId), session.id]), + ); + + const parentRuns = sessions + .map((session) => { + const run = getActiveRun(session.id); + if (!run) { + return null; + } + return { + sessionId: session.id, + status: run.status, + }; + }) + .filter((run): run is { sessionId: string; status: "running" | "waiting-for-subagents" | "completed" | "error" } => Boolean(run)); + + const subagents = [...parentSessionKeys.entries()] + .flatMap(([requesterSessionKey, parentSessionId]) => + listSubagentsForRequesterSession(requesterSessionKey).map((entry) => ({ + childSessionKey: entry.childSessionKey, + parentSessionId, + runId: entry.runId, + task: entry.task, + label: entry.label || undefined, + status: entry.status, + startedAt: entry.createdAt, + endedAt: entry.endedAt, + })), + ) + .toSorted((a, b) => (a.startedAt ?? 0) - (b.startedAt ?? 0)); + + return Response.json({ + parentRuns, + subagents, + }); +} diff --git a/apps/web/app/api/chat/stop/route.test.ts b/apps/web/app/api/chat/stop/route.test.ts new file mode 100644 index 00000000000..b1dbd85e564 --- /dev/null +++ b/apps/web/app/api/chat/stop/route.test.ts @@ -0,0 +1,113 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("@/lib/active-runs", () => ({ + abortRun: vi.fn(() => false), + getActiveRun: vi.fn(), +})); + +vi.mock("@/lib/subagent-registry", () => ({ + listSubagentsForRequesterSession: vi.fn(() => []), +})); + +vi.mock("@/lib/workspace", () => ({ + resolveActiveAgentId: vi.fn(() => "main"), +})); + +vi.mock("@/app/api/web-sessions/shared", () => ({ + resolveSessionKey: vi.fn((sessionId: string, fallbackAgentId: string) => `agent:${fallbackAgentId}:web:${sessionId}`), +})); + +vi.mock("@/lib/telemetry", () => ({ + trackServer: vi.fn(), +})); + +describe("POST /api/chat/stop", () => { + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + }); + + it("stops a parent session and all active child subagents when cascadeChildren is enabled (prevents orphan background work)", async () => { + const { abortRun, getActiveRun } = await import("@/lib/active-runs"); + const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry"); + + vi.mocked(getActiveRun).mockImplementation(((runKey: string) => { + if (runKey === "parent-1") { + return { status: "waiting-for-subagents" }; + } + if (runKey === "agent:chat-slot-main-1:subagent:child-1") { + return { status: "running" }; + } + if (runKey === "agent:chat-slot-main-2:subagent:child-2") { + return { status: "completed" }; + } + return undefined; + }) as never); + + vi.mocked(listSubagentsForRequesterSession).mockReturnValue([ + { + runId: "run-1", + childSessionKey: "agent:chat-slot-main-1:subagent:child-1", + requesterSessionKey: "agent:main:web:parent-1", + task: "Collect facts", + status: "running", + }, + { + runId: "run-2", + childSessionKey: "agent:chat-slot-main-2:subagent:child-2", + requesterSessionKey: "agent:main:web:parent-1", + task: "Already done", + status: "completed", + }, + ] as never); + + vi.mocked(abortRun).mockReturnValue(true); + + const { POST } = await import("./route.js"); + const req = new Request("http://localhost/api/chat/stop", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + sessionId: "parent-1", + cascadeChildren: true, + }), + }); + + const res = await POST(req); + const json = await res.json(); + + expect(abortRun).toHaveBeenCalledWith("parent-1"); + expect(abortRun).toHaveBeenCalledWith("agent:chat-slot-main-1:subagent:child-1"); + expect(abortRun).not.toHaveBeenCalledWith("agent:chat-slot-main-2:subagent:child-2"); + expect(json).toEqual({ aborted: true, abortedChildren: 1 }); + }); + + it("stops only the requested subagent session when sessionKey is provided", async () => { + const { abortRun, getActiveRun } = await import("@/lib/active-runs"); + const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry"); + + vi.mocked(getActiveRun).mockImplementation(((runKey: string) => { + if (runKey === "agent:chat-slot-main-1:subagent:child-1") { + return { status: "running" }; + } + return undefined; + }) as never); + vi.mocked(abortRun).mockReturnValue(true); + + const { POST } = await import("./route.js"); + const req = new Request("http://localhost/api/chat/stop", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + sessionKey: "agent:chat-slot-main-1:subagent:child-1", + }), + }); + + const res = await POST(req); + const json = await res.json(); + + expect(abortRun).toHaveBeenCalledWith("agent:chat-slot-main-1:subagent:child-1"); + expect(listSubagentsForRequesterSession).not.toHaveBeenCalled(); + expect(json).toEqual({ aborted: true, abortedChildren: 0 }); + }); +}); diff --git a/apps/web/app/api/chat/stop/route.ts b/apps/web/app/api/chat/stop/route.ts index 22ee1a6b905..c4886dca8aa 100644 --- a/apps/web/app/api/chat/stop/route.ts +++ b/apps/web/app/api/chat/stop/route.ts @@ -5,12 +5,15 @@ * Works for both parent sessions (by sessionId) and subagent sessions (by sessionKey). */ import { abortRun, getActiveRun } from "@/lib/active-runs"; +import { listSubagentsForRequesterSession } from "@/lib/subagent-registry"; import { trackServer } from "@/lib/telemetry"; +import { resolveActiveAgentId } from "@/lib/workspace"; +import { resolveSessionKey } from "@/app/api/web-sessions/shared"; export const runtime = "nodejs"; export async function POST(req: Request) { - const body: { sessionId?: string; sessionKey?: string } = await req + const body: { sessionId?: string; sessionKey?: string; cascadeChildren?: boolean } = await req .json() .catch(() => ({})); @@ -25,8 +28,22 @@ export async function POST(req: Request) { const canAbort = run?.status === "running" || run?.status === "waiting-for-subagents"; const aborted = canAbort ? abortRun(runKey) : false; - if (aborted) { + let abortedChildren = 0; + + if (!isSubagentSession && body.sessionId && body.cascadeChildren) { + const fallbackAgentId = resolveActiveAgentId(); + const requesterSessionKey = resolveSessionKey(body.sessionId, fallbackAgentId); + for (const subagent of listSubagentsForRequesterSession(requesterSessionKey)) { + const childRun = getActiveRun(subagent.childSessionKey); + const canAbortChild = + childRun?.status === "running" || childRun?.status === "waiting-for-subagents"; + if (canAbortChild && abortRun(subagent.childSessionKey)) { + abortedChildren += 1; + } + } + } + if (aborted || abortedChildren > 0) { trackServer("chat_stopped"); } - return Response.json({ aborted }); + return Response.json({ aborted, abortedChildren }); } diff --git a/apps/web/app/api/web-sessions/route.ts b/apps/web/app/api/web-sessions/route.ts index 772dea7df67..6373753baf5 100644 --- a/apps/web/app/api/web-sessions/route.ts +++ b/apps/web/app/api/web-sessions/route.ts @@ -3,8 +3,10 @@ import { randomUUID } from "node:crypto"; import { trackServer } from "@/lib/telemetry"; import { type WebSessionMeta, ensureDir, readIndex, writeIndex } from "./shared"; import { + ensureManagedWorkspaceRouting, getActiveWorkspaceName, resolveActiveAgentId, + resolveWorkspaceDirForName, resolveWorkspaceRoot, } from "@/lib/workspace"; import { allocateChatAgent } from "@/lib/chat-agent-registry"; @@ -34,9 +36,10 @@ export async function POST(req: Request) { const id = randomUUID(); const now = Date.now(); - const workspaceName = getActiveWorkspaceName() ?? undefined; + const workspaceName = getActiveWorkspaceName() ?? "default"; + const workspaceRoot = resolveWorkspaceRoot() ?? resolveWorkspaceDirForName(workspaceName); + ensureManagedWorkspaceRouting(workspaceName, workspaceRoot, { markDefault: false }); const workspaceAgentId = resolveActiveAgentId(); - const workspaceRoot = resolveWorkspaceRoot() ?? undefined; // Assign a pool slot agent for concurrent chat support. // Falls back to the workspace agent if no slots are available. @@ -59,7 +62,7 @@ export async function POST(req: Request) { updatedAt: now, messageCount: 0, ...(body.filePath ? { filePath: body.filePath } : {}), - workspaceName, + workspaceName: workspaceName || undefined, workspaceRoot, workspaceAgentId, chatAgentId, diff --git a/apps/web/lib/subagent-registry.ts b/apps/web/lib/subagent-registry.ts new file mode 100644 index 00000000000..eed8769d8b9 --- /dev/null +++ b/apps/web/lib/subagent-registry.ts @@ -0,0 +1,54 @@ +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; +import { resolveOpenClawStateDir } from "./workspace"; + +export type SubagentRegistryEntry = { + runId: string; + childSessionKey: string; + requesterSessionKey: string; + task: string; + label?: string; + createdAt?: number; + endedAt?: number; + outcome?: { status: string; error?: string }; +}; + +export function readSubagentRegistry(): SubagentRegistryEntry[] { + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) { + return []; + } + + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { + runs?: Record; + }; + return Object.values(raw.runs ?? {}); + } catch { + return []; + } +} + +export function resolveSubagentStatus( + entry: SubagentRegistryEntry, +): "running" | "completed" | "error" { + if (typeof entry.endedAt !== "number") { + return "running"; + } + if (entry.outcome?.status === "error") { + return "error"; + } + return "completed"; +} + +export function listSubagentsForRequesterSession( + requesterSessionKey: string, +): Array { + return readSubagentRegistry() + .filter((entry) => entry.requesterSessionKey === requesterSessionKey) + .map((entry) => ({ + ...entry, + status: resolveSubagentStatus(entry), + })) + .toSorted((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0)); +}