diff --git a/apps/web/lib/subagent-runs.ts b/apps/web/lib/subagent-runs.ts index cc655765ab9..5c7f53623fe 100644 --- a/apps/web/lib/subagent-runs.ts +++ b/apps/web/lib/subagent-runs.ts @@ -4,17 +4,20 @@ * Mirrors the ActiveRunManager pattern: buffers SSE events, supports * subscriber fan-out, and tracks subagent metadata per parent web session. * - * Events are fed from the gateway WebSocket connection (gateway-events.ts). + * Events are fed from CLI NDJSON streams (parent run + subscribe continuations). */ +import type { ChildProcess } from "node:child_process"; +import { createInterface } from "node:readline"; import { existsSync, readFileSync, writeFileSync, mkdirSync, appendFileSync } from "node:fs"; import { join } from "node:path"; import { + type AgentEvent, + spawnAgentSubscribeProcess, extractToolResult, buildToolOutput, parseAgentErrorMessage, parseErrorBody, } from "./agent-runner"; -import { subscribeToSessionKey, type GatewayEvent } from "./gateway-events"; import { resolveOpenClawStateDir, resolveWebChatDir } from "./workspace"; // ── Types ── @@ -38,8 +41,10 @@ type SubagentRun = SubagentInfo & { subscribers: Set; /** Internal state for event-to-SSE transformation */ _state: TransformState; - _unsubGateway: (() => void) | null; + _subscribeProcess: ChildProcess | null; _cleanupTimer: ReturnType | null; + /** Last globalSeq seen from the gateway event stream for replay cursor. */ + lastGlobalSeq: number; }; type TransformState = { @@ -64,7 +69,7 @@ type SubagentRegistry = { /** Reverse index: parent web session ID → subagent session keys */ parentIndex: Map>; /** Pre-registration buffer: events that arrive before the subagent is registered */ - preRegBuffer: Map; + preRegBuffer: Map; }; function getRegistry(): SubagentRegistry { @@ -227,8 +232,9 @@ export function registerSubagent( eventBuffer: [], subscribers: new Set(), _state: createTransformState(), - _unsubGateway: null, + _subscribeProcess: null, _cleanupTimer: null, + lastGlobalSeq: 0, }; // Load persisted events from disk (fills the replay buffer) @@ -257,7 +263,7 @@ export function registerSubagent( endedAt: run.endedAt, }); - // NOTE: We do NOT subscribe to gateway WebSocket here. During live + // NOTE: We do NOT start subscribe child processes here. During live // streaming, events arrive via routeRawEvent() from the parent's NDJSON // stream. After the parent exits, activateGatewayFallback() subscribes. // For on-demand rehydration (page refresh), ensureSubagentStreamable() @@ -267,7 +273,7 @@ export function registerSubagent( const buf = reg.preRegBuffer.get(info.sessionKey); if (buf && buf.length > 0) { for (const evt of buf) { - handleGatewayEvent(run, evt); + handleAgentEvent(run, evt); } reg.preRegBuffer.delete(info.sessionKey); } @@ -276,14 +282,12 @@ export function registerSubagent( /** * Ensure a rehydrated subagent can receive live events. Called when a client * actually connects to the subagent's SSE stream after a page refresh. - * For still-running subagents, this activates the gateway WebSocket fallback. + * For still-running subagents, this activates the subscribe-child fallback. */ export function ensureSubagentStreamable(sessionKey: string): void { const run = getRegistry().runs.get(sessionKey); - if (!run || run.status !== "running" || run._unsubGateway) {return;} - run._unsubGateway = subscribeToSessionKey(sessionKey, (evt) => { - handleGatewayEvent(run, evt); - }); + if (!run || run.status !== "running" || run._subscribeProcess) {return;} + startSubagentSubscribeStream(run); } /** Get metadata for all subagents belonging to a parent web session. */ @@ -354,24 +358,45 @@ export function isSubagentRunning(sessionKey: string): boolean { } /** - * Activate gateway WebSocket subscriptions for all subagent runs that are + * Activate subscribe-child streams for all subagent runs that are * still in "running" status and don't already have a gateway subscription. * * Called when the parent agent's NDJSON stream ends (child process exits). * After that point the NDJSON routing is no longer available, so the - * gateway WS becomes the only event source for orphaned subagents. + * subscribe child streams become the only event source for orphaned subagents. */ export function activateGatewayFallback(): void { const reg = getRegistry(); for (const [key, run] of reg.runs) { - if (run.status === "running" && !run._unsubGateway) { - run._unsubGateway = subscribeToSessionKey(key, (evt) => { - handleGatewayEvent(run, evt); - }); + if (run.status === "running" && !run._subscribeProcess) { + startSubagentSubscribeStream(run); } } } +/** + * Check if any subagents spawned by this parent web session are still running. + * Cross-checks with the on-disk registry to reconcile subagents whose + * lifecycle "end" events were missed (e.g. during gateway WS handshake). + */ +export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean { + const reg = getRegistry(); + const keys = reg.parentIndex.get(parentWebSessionId); + if (!keys) {return false;} + let anyRunning = false; + for (const key of keys) { + const run = reg.runs.get(key); + if (run?.status !== "running") {continue;} + const diskStatus = readDiskStatus(key); + if (diskStatus !== "running") { + finalizeRun(run, diskStatus === "error" ? "error" : "completed"); + continue; + } + anyRunning = true; + } + return anyRunning; +} + /** Return session keys of all currently running subagents. */ export function getRunningSubagentKeys(): string[] { const keys: string[] = []; @@ -389,21 +414,23 @@ export function getRunningSubagentKeys(): string[] { * agent's CLI process already receives all gateway broadcasts, so we piggyback * on its NDJSON stream instead of maintaining a separate WebSocket connection. * - * Converts the flat NDJSON event shape to the nested GatewayEvent format that - * handleGatewayEvent expects. + * Routes a flat NDJSON event shape to the subagent transformer. */ export function routeRawEvent( sessionKey: string, - ev: { event: string; stream?: string; data?: Record }, + ev: { event: string; stream?: string; data?: Record; globalSeq?: number }, ): void { - const gwEvt: GatewayEvent = { + const sourceEvent: AgentEvent = { event: ev.event, - payload: { sessionKey, stream: ev.stream, data: ev.data }, + sessionKey, + stream: ev.stream, + data: ev.data, + globalSeq: ev.globalSeq, }; const run = getRegistry().runs.get(sessionKey); if (run) { - handleGatewayEvent(run, gwEvt); + handleAgentEvent(run, sourceEvent); return; } @@ -416,7 +443,7 @@ export function routeRawEvent( reg.preRegBuffer.set(sessionKey, buf); } if (buf.length < 10_000) { - buf.push(gwEvt); + buf.push(sourceEvent); } } @@ -484,14 +511,19 @@ function createTransformState(): TransformState { }; } -function handleGatewayEvent(run: SubagentRun, evt: GatewayEvent): void { - if (evt.event !== "agent" || !evt.payload) {return;} - - const payload = evt.payload; - const stream = typeof payload.stream === "string" ? payload.stream : undefined; +function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void { + if (evt.event !== "agent") {return;} + const gSeq = typeof (evt as Record).globalSeq === "number" + ? (evt as Record).globalSeq as number + : undefined; + if (gSeq !== undefined) { + if (gSeq <= run.lastGlobalSeq) {return;} + run.lastGlobalSeq = gSeq; + } + const stream = typeof evt.stream === "string" ? evt.stream : undefined; const data = - payload.data && typeof payload.data === "object" - ? (payload.data as Record) + evt.data && typeof evt.data === "object" + ? (evt.data) : undefined; if (!stream || !data) {return;} @@ -673,9 +705,7 @@ function finalizeRun(run: SubagentRun, status: "completed" | "error"): void { } run.subscribers.clear(); - // Unsubscribe from gateway events - run._unsubGateway?.(); - run._unsubGateway = null; + stopSubagentSubscribeStream(run); // Schedule cleanup after grace period run._cleanupTimer = setTimeout(() => { @@ -692,7 +722,7 @@ function cleanupRun(sessionKey: string): void { clearTimeout(run._cleanupTimer); run._cleanupTimer = null; } - run._unsubGateway?.(); + stopSubagentSubscribeStream(run); reg.runs.delete(sessionKey); // Clean up parent index @@ -704,3 +734,54 @@ function cleanupRun(sessionKey: string): void { } } } + +function startSubagentSubscribeStream(run: SubagentRun): void { + stopSubagentSubscribeStream(run); + const child = spawnAgentSubscribeProcess(run.sessionKey, run.lastGlobalSeq); + run._subscribeProcess = child; + const rl = createInterface({ input: child.stdout! }); + + rl.on("line", (line: string) => { + if (!line.trim()) {return;} + let ev: AgentEvent; + try { + ev = JSON.parse(line) as AgentEvent; + } catch { + return; + } + if (ev.sessionKey && ev.sessionKey !== run.sessionKey) { + return; + } + handleAgentEvent(run, ev); + }); + + child.on("close", () => { + if (run._subscribeProcess === child) { + run._subscribeProcess = null; + } + if (run.status !== "running") {return;} + setTimeout(() => { + if (run.status === "running" && !run._subscribeProcess) { + startSubagentSubscribeStream(run); + } + }, 300); + }); + + child.on("error", (err) => { + console.error("[subagent-runs] Subscribe child error:", err); + }); + + child.stderr?.on("data", (chunk: Buffer) => { + console.error("[subagent-runs subscribe stderr]", chunk.toString()); + }); +} + +function stopSubagentSubscribeStream(run: SubagentRun): void { + if (!run._subscribeProcess) {return;} + try { + run._subscribeProcess.kill("SIGTERM"); + } catch { + /* ignore */ + } + run._subscribeProcess = null; +}