diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 58168455ca8..a3540addb08 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -21,6 +21,7 @@ import { resolveWebChatDir } from "./workspace"; import { type AgentEvent, spawnAgentProcess, + spawnAgentSubscribeProcess, resolvePackageRoot, extractToolResult, buildToolOutput, @@ -33,6 +34,7 @@ import { ensureRegisteredFromDisk, hasActiveSubagent as hasSubagentRun, activateGatewayFallback, + hasRunningSubagentsForParent, } from "./subagent-runs"; // ── Types ── @@ -68,7 +70,7 @@ export type ActiveRun = { eventBuffer: SseEvent[]; subscribers: Set; accumulated: AccumulatedMessage; - status: "running" | "completed" | "error"; + status: "running" | "waiting-for-subagents" | "completed" | "error"; startedAt: number; exitCode: number | null; abortController: AbortController; @@ -76,6 +78,10 @@ export type ActiveRun = { _persistTimer: ReturnType | null; /** @internal last time persistence was flushed */ _lastPersistedAt: number; + /** @internal last globalSeq seen from the gateway event stream */ + lastGlobalSeq: number; + /** @internal subscribe child process for waiting-for-subagents continuation */ + _subscribeProcess?: ChildProcess | null; }; // ── Constants ── @@ -109,14 +115,14 @@ export function getActiveRun(sessionId: string): ActiveRun | undefined { /** Check whether a *running* (not just completed) run exists for a session. */ export function hasActiveRun(sessionId: string): boolean { const run = activeRuns.get(sessionId); - return run !== undefined && run.status === "running"; + return run !== undefined && (run.status === "running" || run.status === "waiting-for-subagents"); } /** Return the session IDs of all currently running agent runs. */ export function getRunningSessionIds(): string[] { const ids: string[] = []; for (const [sessionId, run] of activeRuns) { - if (run.status === "running") { + if (run.status === "running" || run.status === "waiting-for-subagents") { ids.push(sessionId); } } @@ -150,7 +156,7 @@ export function subscribeToRun( } // If the run already finished, signal completion immediately. - if (run.status !== "running") { + if (run.status !== "running" && run.status !== "waiting-for-subagents") { callback(null); return () => {}; } @@ -164,14 +170,20 @@ export function subscribeToRun( /** Abort a running agent. Returns true if a run was actually aborted. */ export function abortRun(sessionId: string): boolean { const run = activeRuns.get(sessionId); - if (!run || run.status !== "running") {return false;} + if (!run || (run.status !== "running" && run.status !== "waiting-for-subagents")) {return false;} // Immediately mark the run as non-running so hasActiveRun() returns // false and the next user message isn't rejected with 409. + const wasWaiting = run.status === "waiting-for-subagents"; run.status = "error"; + // Clean up waiting subscribe process if present. + stopSubscribeProcess(run); + run.abortController.abort(); - run.childProcess.kill("SIGTERM"); + if (!wasWaiting) { + run.childProcess.kill("SIGTERM"); + } // Send chat.abort directly to the gateway so the agent run stops // even if the CLI child's best-effort onAbort doesn't complete in time. @@ -196,12 +208,14 @@ export function abortRun(sessionId: string): boolean { // Fallback: if the child doesn't exit within 5 seconds after // SIGTERM (e.g. the CLI's best-effort chat.abort RPC hangs), // send SIGKILL to force-terminate. - const killTimer = setTimeout(() => { - try { - run.childProcess.kill("SIGKILL"); - } catch { /* already dead */ } - }, 5_000); - run.childProcess.once("close", () => clearTimeout(killTimer)); + if (!wasWaiting) { + const killTimer = setTimeout(() => { + try { + run.childProcess.kill("SIGKILL"); + } catch { /* already dead */ } + }, 5_000); + run.childProcess.once("close", () => clearTimeout(killTimer)); + } return true; } @@ -284,6 +298,7 @@ export function startRun(params: { abortController, _persistTimer: null, _lastPersistedAt: 0, + lastGlobalSeq: 0, }; activeRuns.set(sessionId, run); @@ -512,39 +527,12 @@ function wireChildProcess(run: ActiveRun): void { // emitting user-visible diagnostics. }); - rl.on("line", (line: string) => { - if (!line.trim()) {return;} - - let ev: AgentEvent; - try { - ev = JSON.parse(line) as AgentEvent; - } catch { - return; - } - - // ── Route non-parent events to SubagentRunManager ── - // The CLI child process receives ALL gateway broadcasts, including - // events from subagent runs. Filter them out of the parent chat - // and route to the SubagentRunManager for separate streaming. - if (ev.sessionKey && ev.sessionKey !== parentSessionKey) { - const childKey = ev.sessionKey; - // Try to register the subagent if not yet known. Events - // arriving before runs.json is written get buffered inside - // routeRawEvent and replayed upon successful registration. - if (!hasSubagentRun(childKey) && !seenSubagentKeys.has(childKey)) { - if (ensureRegisteredFromDisk(childKey, run.sessionId)) { - seenSubagentKeys.add(childKey); - } - // Don't add to seenSubagentKeys on failure — retry on the next event - } - routeSubagentEvent(childKey, { - event: ev.event, - stream: ev.stream, - data: ev.data, - }); - return; - } + // ── Reusable parent event processor ── + // Handles lifecycle, thinking, assistant text, tool, compaction, and error + // events for the parent agent. Used by both the CLI NDJSON stream and the + // subscribe-only CLI fallback (waiting-for-subagents state). + const processParentEvent = (ev: AgentEvent) => { // Lifecycle start if ( ev.event === "agent" && @@ -660,7 +648,6 @@ function wireChildProcess(run: ActiveRun): void { toolName, input: args, }); - // Accumulate tool start in ordered parts run.accumulated.parts.push({ type: "tool-invocation", toolCallId, @@ -681,7 +668,6 @@ function wireChildProcess(run: ActiveRun): void { toolCallId, errorText, }); - // Update the accumulated tool part const idx = accToolMap.get(toolCallId); if (idx !== undefined) { const part = run.accumulated.parts[idx]; @@ -696,7 +682,6 @@ function wireChildProcess(run: ActiveRun): void { toolCallId, output, }); - // Update the accumulated tool part const idx = accToolMap.get(toolCallId); if (idx !== undefined) { const part = run.accumulated.parts[idx]; @@ -768,6 +753,68 @@ function wireChildProcess(run: ActiveRun): void { emitError(msg); } } + }; + + const processParentSubscribeEvent = (ev: AgentEvent) => { + const gSeq = typeof (ev as Record).globalSeq === "number" + ? (ev as Record).globalSeq as number + : undefined; + if (gSeq !== undefined) { + if (gSeq <= run.lastGlobalSeq) {return;} + run.lastGlobalSeq = gSeq; + } + processParentEvent(ev); + if (ev.stream === "lifecycle" && ev.data?.phase === "end") { + if (hasRunningSubagentsForParent(run.sessionId)) { + openStatusReasoning("Waiting for subagent results..."); + flushPersistence(run); + } else { + finalizeWaitingRun(run); + } + } + }; + + rl.on("line", (line: string) => { + if (!line.trim()) {return;} + + let ev: AgentEvent; + try { + ev = JSON.parse(line) as AgentEvent; + } catch { + return; + } + + // ── Route non-parent events to SubagentRunManager ── + // The CLI child process receives ALL gateway broadcasts, including + // events from subagent runs. Filter them out of the parent chat + // and route to the SubagentRunManager for separate streaming. + if (ev.sessionKey && ev.sessionKey !== parentSessionKey) { + const childKey = ev.sessionKey; + if (!hasSubagentRun(childKey) && !seenSubagentKeys.has(childKey)) { + if (ensureRegisteredFromDisk(childKey, run.sessionId)) { + seenSubagentKeys.add(childKey); + } + } + routeSubagentEvent(childKey, { + event: ev.event, + stream: ev.stream, + data: ev.data, + globalSeq: typeof (ev as Record).globalSeq === "number" + ? (ev as Record).globalSeq as number + : undefined, + }); + return; + } + + // Track the global event cursor from the gateway for replay on handoff. + const gSeq = typeof (ev as Record).globalSeq === "number" + ? (ev as Record).globalSeq as number + : undefined; + if (gSeq !== undefined && gSeq > run.lastGlobalSeq) { + run.lastGlobalSeq = gSeq; + } + + processParentEvent(ev); }); // ── Child process exit ── @@ -789,28 +836,48 @@ function wireChildProcess(run: ActiveRun): void { } closeReasoning(); - if (!everSentText) { + + const exitedClean = code === 0 || code === null; + + if (!everSentText && !exitedClean) { const tid = nextId("text"); emit({ type: "text-start", id: tid }); - const errMsg = - code !== null && code !== 0 - ? `[error] Agent exited with code ${code}. Check server logs for details.` - : "[error] No response from agent."; + const errMsg = `[error] Agent exited with code ${code}. Check server logs for details.`; emit({ type: "text-delta", id: tid, delta: errMsg }); emit({ type: "text-end", id: tid }); accAppendText(errMsg); + } else if (!everSentText && exitedClean) { + const tid = nextId("text"); + emit({ type: "text-start", id: tid }); + const msg = "No response from agent."; + emit({ type: "text-delta", id: tid, delta: msg }); + emit({ type: "text-end", id: tid }); + accAppendText(msg); } else { closeText(); } - run.status = code === 0 || code === null ? "completed" : "error"; run.exitCode = code; - // The parent's NDJSON stream has ended. Any subagents that are - // still running lose their event source (sessions_spawn is - // fire-and-forget). Switch to gateway WebSocket subscriptions - // so they keep streaming. - activateGatewayFallback(); + const hasRunningSubagents = hasRunningSubagentsForParent(run.sessionId); + + // If the CLI exited cleanly and subagents are still running, + // keep the SSE stream open and wait for announcement-triggered + // parent turns via subscribe-only CLI NDJSON. + if (exitedClean && hasRunningSubagents) { + // The parent's NDJSON stream has ended; running subagents lose their + // original event source, so switch each subagent to subscribe children. + activateGatewayFallback(); + run.status = "waiting-for-subagents"; + + openStatusReasoning("Waiting for subagent results..."); + flushPersistence(run); + startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent); + return; + } + + // Normal completion path. + run.status = exitedClean ? "completed" : "error"; // Final persistence flush (removes _streaming flag). flushPersistence(run); @@ -865,6 +932,90 @@ function wireChildProcess(run: ActiveRun): void { }); } +function startParentSubscribeStream( + run: ActiveRun, + parentSessionKey: string, + onEvent: (ev: AgentEvent) => void, +): void { + stopSubscribeProcess(run); + const child = spawnAgentSubscribeProcess(parentSessionKey, run.lastGlobalSeq); + run._subscribeProcess = child; + const rl = createInterface({ input: child.stdout! }); + + rl.on("line", (line: string) => { + if (!line.trim()) {return;} + let ev: AgentEvent; + try { + ev = JSON.parse(line) as AgentEvent; + } catch { + return; + } + if (ev.sessionKey && ev.sessionKey !== parentSessionKey) { + return; + } + onEvent(ev); + }); + + child.on("close", () => { + if (run._subscribeProcess === child) { + run._subscribeProcess = null; + } + if (run.status !== "waiting-for-subagents") {return;} + // If still waiting, restart subscribe stream from the latest cursor. + setTimeout(() => { + if (run.status === "waiting-for-subagents" && !run._subscribeProcess) { + startParentSubscribeStream(run, parentSessionKey, onEvent); + } + }, 300); + }); + + child.on("error", (err) => { + console.error("[active-runs] Parent subscribe child error:", err); + }); + + child.stderr?.on("data", (chunk: Buffer) => { + console.error("[active-runs subscribe stderr]", chunk.toString()); + }); +} + +function stopSubscribeProcess(run: ActiveRun): void { + if (!run._subscribeProcess) {return;} + try { + run._subscribeProcess.kill("SIGTERM"); + } catch { + /* ignore */ + } + run._subscribeProcess = null; +} + +// ── Finalize a waiting-for-subagents run ── + +/** + * Transition a run from "waiting-for-subagents" to "completed". + * Called when the last subagent finishes and the parent's announcement- + * triggered turn completes. + */ +function finalizeWaitingRun(run: ActiveRun): void { + if (run.status !== "waiting-for-subagents") {return;} + + run.status = "completed"; + + stopSubscribeProcess(run); + + flushPersistence(run); + + for (const sub of run.subscribers) { + try { sub(null); } catch { /* ignore */ } + } + run.subscribers.clear(); + + setTimeout(() => { + if (activeRuns.get(run.sessionId) === run) { + cleanupRun(run.sessionId); + } + }, CLEANUP_GRACE_MS); +} + // ── Debounced persistence ── function schedulePersist(run: ActiveRun) { @@ -896,7 +1047,7 @@ function flushPersistence(run: ActiveRun) { .map((p) => p.text) .join(""); - const isStillRunning = run.status === "running"; + const isStillStreaming = run.status === "running" || run.status === "waiting-for-subagents"; const message: Record = { id: run.accumulated.id, role: "assistant", @@ -904,7 +1055,7 @@ function flushPersistence(run: ActiveRun) { parts, // Ordered parts — preserves interleaving of reasoning, tools, text timestamp: new Date().toISOString(), }; - if (isStillRunning) { + if (isStillStreaming) { message._streaming = true; } @@ -959,5 +1110,6 @@ function cleanupRun(sessionId: string) { const run = activeRuns.get(sessionId); if (!run) {return;} if (run._persistTimer) {clearTimeout(run._persistTimer);} + stopSubscribeProcess(run); activeRuns.delete(sessionId); }