diff --git a/apps/web/lib/subagent-runs.ts b/apps/web/lib/subagent-runs.ts index abcd3e65d20..7f6b1851da0 100644 --- a/apps/web/lib/subagent-runs.ts +++ b/apps/web/lib/subagent-runs.ts @@ -6,13 +6,15 @@ * * Events are fed from CLI NDJSON streams (parent run + subscribe continuations). */ -import type { ChildProcess } from "node:child_process"; +import { type ChildProcess, spawn } from "node:child_process"; +import { randomUUID } from "node:crypto"; import { createInterface } from "node:readline"; import { existsSync, readFileSync, writeFileSync, mkdirSync, appendFileSync } from "node:fs"; import { join } from "node:path"; import { type AgentEvent, spawnAgentSubscribeProcess, + resolvePackageRoot, extractToolResult, buildToolOutput, parseAgentErrorMessage, @@ -68,8 +70,6 @@ type SubagentRegistry = { runs: Map; /** Reverse index: parent web session ID → subagent session keys */ parentIndex: Map>; - /** Pre-registration buffer: events that arrive before the subagent is registered */ - preRegBuffer: Map; }; function getRegistry(): SubagentRegistry { @@ -81,7 +81,6 @@ function getRegistry(): SubagentRegistry { const registry: SubagentRegistry = { runs: new Map(), parentIndex: new Map(), - preRegBuffer: new Map(), }; (globalThis as Record)[GLOBAL_KEY] = registry; return registry; @@ -263,19 +262,9 @@ export function registerSubagent( endedAt: run.endedAt, }); - // NOTE: We do NOT start subscribe child processes here. During live - // streaming, events arrive via routeRawEvent() from the parent's NDJSON - // stream. After the parent exits, activateGatewayFallback() subscribes. - // For on-demand rehydration (page refresh), ensureSubagentStreamable() - // handles the subscription. - - // Replay any pre-registration buffered events (live sessions only) - const buf = reg.preRegBuffer.get(info.sessionKey); - if (buf && buf.length > 0) { - for (const evt of buf) { - handleAgentEvent(run, evt); - } - reg.preRegBuffer.delete(info.sessionKey); + // Subagents are first-class sessions; subscribe immediately. + if (run.status === "running") { + startSubagentSubscribeStream(run); } } @@ -365,20 +354,6 @@ export function isSubagentRunning(sessionKey: string): boolean { * After that point the NDJSON routing is no longer available, so the * subscribe child streams become the only event source for orphaned subagents. */ -export function activateGatewayFallback(): void { - const reg = getRegistry(); - for (const [_key, run] of reg.runs) { - if (run.status === "running" && !run._subscribeProcess) { - startSubagentSubscribeStream(run); - } - } -} - -/** - * Check if any subagents spawned by this parent web session are still running. - * Cross-checks with the on-disk registry to reconcile subagents whose - * lifecycle "end" events were missed (e.g. during gateway WS handshake). - */ export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean { const reg = getRegistry(); const keys = reg.parentIndex.get(parentWebSessionId); @@ -408,42 +383,128 @@ export function getRunningSubagentKeys(): string[] { return keys; } -/** - * Route a raw NDJSON agent event (from the CLI child process stdout) to the - * appropriate subagent run. This is the primary event source -- the parent - * agent's CLI process already receives all gateway broadcasts, so we piggyback - * on its NDJSON stream instead of maintaining a separate WebSocket connection. - * - * Routes a flat NDJSON event shape to the subagent transformer. - */ -export function routeRawEvent( +export function persistUserMessage( sessionKey: string, - ev: { event: string; stream?: string; data?: Record; globalSeq?: number }, -): void { - const sourceEvent: AgentEvent = { - event: ev.event, - sessionKey, - stream: ev.stream, - data: ev.data, - globalSeq: ev.globalSeq, - }; - + msg: { id?: string; text: string }, +): boolean { const run = getRegistry().runs.get(sessionKey); - if (run) { - handleAgentEvent(run, sourceEvent); - return; + if (!run) {return false;} + const event: SseEvent = { + type: "user-message", + id: msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`, + text: msg.text, + }; + run.eventBuffer.push(event); + persistEvent(sessionKey, event); + for (const sub of run.subscribers) { + try { sub(event); } catch { /* ignore */ } } + return true; +} - // Buffer events that arrive before the subagent is registered - // (runs.json may not be written yet). These are replayed on registration. - const reg = getRegistry(); - let buf = reg.preRegBuffer.get(sessionKey); - if (!buf) { - buf = []; - reg.preRegBuffer.set(sessionKey, buf); +export function reactivateSubagent(sessionKey: string): boolean { + const run = getRegistry().runs.get(sessionKey); + if (!run) {return false;} + if (run._cleanupTimer) { + clearTimeout(run._cleanupTimer); + run._cleanupTimer = null; } - if (buf.length < 10_000) { - buf.push(sourceEvent); + run.status = "running"; + run.endedAt = undefined; + upsertSubagentIndex(run.sessionKey, { + runId: run.runId, + parentWebSessionId: run.parentWebSessionId, + task: run.task, + label: run.label, + status: run.status, + startedAt: run.startedAt, + endedAt: run.endedAt, + }); + startSubagentSubscribeStream(run); + return true; +} + +function sendGatewayAbortForSubagent(sessionKey: string): void { + try { + const root = resolvePackageRoot(); + const devScript = join(root, "scripts", "run-node.mjs"); + const prodScript = join(root, "openclaw.mjs"); + const scriptPath = existsSync(devScript) ? devScript : prodScript; + const child = spawn( + "node", + [ + scriptPath, + "gateway", + "call", + "chat.abort", + "--params", + JSON.stringify({ sessionKey }), + "--json", + "--timeout", + "4000", + ], + { + cwd: root, + env: { ...process.env }, + stdio: "ignore", + detached: true, + }, + ); + child.unref(); + } catch { + // best effort + } +} + +export function abortSubagent(sessionKey: string): boolean { + const run = getRegistry().runs.get(sessionKey); + if (!run || run.status !== "running") {return false;} + sendGatewayAbortForSubagent(sessionKey); + finalizeRun(run, "error"); + return true; +} + +export function spawnSubagentMessage(sessionKey: string, message: string): boolean { + try { + const run = getRegistry().runs.get(sessionKey); + if (!run) {return false;} + const root = resolvePackageRoot(); + const devScript = join(root, "scripts", "run-node.mjs"); + const prodScript = join(root, "openclaw.mjs"); + const scriptPath = existsSync(devScript) ? devScript : prodScript; + const idempotencyKey = randomUUID(); + const child = spawn( + "node", + [ + scriptPath, + "gateway", + "call", + "agent", + "--params", + JSON.stringify({ + message, + sessionKey, + idempotencyKey, + deliver: false, + channel: "webchat", + lane: "subagent", + timeout: 0, + }), + "--json", + "--timeout", + "10000", + ], + { + cwd: root, + env: { ...process.env }, + stdio: "ignore", + detached: true, + }, + ); + child.unref(); + return true; + } catch { + return false; } }