diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index 7af4818542e..8dfeb9d6be5 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -1,35 +1,27 @@ import type { UIMessage } from "ai"; -import { existsSync, readFileSync } from "node:fs"; -import { join } from "node:path"; import { resolveAgentWorkspacePrefix } from "@/lib/workspace"; import { startRun, + startSubscribeRun, hasActiveRun, + getActiveRun, subscribeToRun, persistUserMessage, - type SseEvent as ParentSseEvent, + persistSubscribeUserMessage, + reactivateSubscribeRun, + sendSubagentFollowUp, + type SseEvent, } from "@/lib/active-runs"; -import { - hasActiveSubagent, - isSubagentRunning, - ensureRegisteredFromDisk, - subscribeToSubagent, - persistUserMessage as persistSubagentUserMessage, - reactivateSubagent, - spawnSubagentMessage, - type SseEvent as SubagentSseEvent, -} from "@/lib/subagent-runs"; +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; import { resolveOpenClawStateDir } from "@/lib/workspace"; -// Force Node.js runtime (required for child_process) export const runtime = "nodejs"; - -// Allow streaming responses up to 10 minutes export const maxDuration = 600; -function deriveSubagentParentSessionId(sessionKey: string): string { +function deriveSubagentInfo(sessionKey: string): { parentSessionId: string; task: string } | null { const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); - if (!existsSync(registryPath)) {return "";} + if (!existsSync(registryPath)) {return null;} try { const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { runs?: Record>; @@ -38,18 +30,14 @@ function deriveSubagentParentSessionId(sessionKey: string): string { if (entry.childSessionKey !== sessionKey) {continue;} const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; const match = requester.match(/^agent:[^:]+:web:(.+)$/); - return match?.[1] ?? ""; + const parentSessionId = match?.[1] ?? ""; + const task = typeof entry.task === "string" ? entry.task : ""; + return { parentSessionId, task }; } } catch { // ignore } - return ""; -} - -function ensureSubagentRegistered(sessionKey: string): boolean { - if (hasActiveSubagent(sessionKey)) {return true;} - const parentWebSessionId = deriveSubagentParentSessionId(sessionKey); - return ensureRegisteredFromDisk(sessionKey, parentWebSessionId); + return null; } export async function POST(req: Request) { @@ -59,7 +47,6 @@ export async function POST(req: Request) { sessionKey, }: { messages: UIMessage[]; sessionId?: string; sessionKey?: string } = await req.json(); - // Extract the latest user message text const lastUserMessage = messages.filter((m) => m.role === "user").pop(); const userText = lastUserMessage?.parts @@ -76,15 +63,16 @@ export async function POST(req: Request) { const isSubagentSession = typeof sessionKey === "string" && sessionKey.includes(":subagent:"); - // Reject if a run is already active for this session. if (!isSubagentSession && sessionId && hasActiveRun(sessionId)) { return new Response("Active run in progress", { status: 409 }); } - if (isSubagentSession && isSubagentRunning(sessionKey)) { - return new Response("Active subagent run in progress", { status: 409 }); + if (isSubagentSession && sessionKey) { + const existingRun = getActiveRun(sessionKey); + if (existingRun?.status === "running") { + return new Response("Active subagent run in progress", { status: 409 }); + } } - // Resolve workspace file paths to be agent-cwd-relative. let agentMessage = userText; const wsPrefix = resolveAgentWorkspacePrefix(); if (wsPrefix) { @@ -94,34 +82,35 @@ export async function POST(req: Request) { ); } - // Persist the user message server-side so it survives a page reload - // even if the client never gets a chance to save. + const runKey = isSubagentSession && sessionKey ? sessionKey : (sessionId as string); + if (isSubagentSession && sessionKey && lastUserMessage) { - if (!ensureSubagentRegistered(sessionKey)) { - return new Response("Subagent not found", { status: 404 }); + let run = getActiveRun(sessionKey); + if (!run) { + const info = deriveSubagentInfo(sessionKey); + if (!info) { + return new Response("Subagent not found", { status: 404 }); + } + run = startSubscribeRun({ + sessionKey, + parentSessionId: info.parentSessionId, + task: info.task, + }); } - persistSubagentUserMessage(sessionKey, { + persistSubscribeUserMessage(sessionKey, { id: lastUserMessage.id, text: userText, }); + reactivateSubscribeRun(sessionKey); + if (!sendSubagentFollowUp(sessionKey, agentMessage)) { + return new Response("Failed to send subagent message", { status: 500 }); + } } else if (sessionId && lastUserMessage) { persistUserMessage(sessionId, { id: lastUserMessage.id, content: userText, parts: lastUserMessage.parts as unknown[], }); - } - - // Start the agent run (decoupled from this HTTP connection). - // The child process will keep running even if this response is cancelled. - if (isSubagentSession && sessionKey) { - if (!reactivateSubagent(sessionKey)) { - return new Response("Subagent not found", { status: 404 }); - } - if (!spawnSubagentMessage(sessionKey, agentMessage)) { - return new Response("Failed to start subagent run", { status: 500 }); - } - } else if (sessionId) { try { startRun({ sessionId, @@ -136,78 +125,40 @@ export async function POST(req: Request) { } } - // Stream SSE events to the client using the AI SDK v6 wire format. const encoder = new TextEncoder(); let closed = false; let unsubscribe: (() => void) | null = null; const stream = new ReadableStream({ start(controller) { - if (!sessionId && !sessionKey) { - // No session — shouldn't happen but close gracefully. + if (!runKey) { controller.close(); return; } - unsubscribe = isSubagentSession && sessionKey - ? subscribeToSubagent( - sessionKey, - (event: SubagentSseEvent | null) => { - if (closed) {return;} - if (event === null) { - closed = true; - try { - controller.close(); - } catch { - /* already closed */ - } - return; - } - try { - const json = JSON.stringify(event); - controller.enqueue(encoder.encode(`data: ${json}\n\n`)); - } catch { - /* ignore enqueue errors on closed stream */ - } - }, - { replay: false }, - ) - : subscribeToRun( - sessionId as string, - (event: ParentSseEvent | null) => { + unsubscribe = subscribeToRun( + runKey, + (event: SseEvent | null) => { if (closed) {return;} if (event === null) { - // Run completed — close the SSE stream. closed = true; - try { - controller.close(); - } catch { - /* already closed */ - } + try { controller.close(); } catch { /* already closed */ } return; } try { const json = JSON.stringify(event); - controller.enqueue( - encoder.encode(`data: ${json}\n\n`), - ); - } catch { - /* ignore enqueue errors on closed stream */ - } + controller.enqueue(encoder.encode(`data: ${json}\n\n`)); + } catch { /* ignore */ } }, - // Don't replay — we just created the run, the buffer is empty. { replay: false }, ); if (!unsubscribe) { - // Race: run was cleaned up between startRun and subscribe. closed = true; controller.close(); } }, cancel() { - // Client disconnected — unsubscribe but keep the run alive. - // The ActiveRunManager continues buffering + persisting in the background. closed = true; unsubscribe?.(); }, diff --git a/apps/web/app/api/chat/stop/route.ts b/apps/web/app/api/chat/stop/route.ts index 02b1a66e488..5e87a42fde3 100644 --- a/apps/web/app/api/chat/stop/route.ts +++ b/apps/web/app/api/chat/stop/route.ts @@ -2,59 +2,25 @@ * POST /api/chat/stop * * Abort an active agent run. Called by the Stop button. - * The child process is sent SIGTERM and the run transitions to "error" state. + * Works for both parent sessions (by sessionId) and subagent sessions (by sessionKey). */ -import { abortRun } from "@/lib/active-runs"; -import { - abortSubagent, - hasActiveSubagent, - isSubagentRunning, - ensureRegisteredFromDisk, -} from "@/lib/subagent-runs"; -import { existsSync, readFileSync } from "node:fs"; -import { join } from "node:path"; -import { resolveOpenClawStateDir } from "@/lib/workspace"; +import { abortRun, getActiveRun } from "@/lib/active-runs"; export const runtime = "nodejs"; -function deriveSubagentParentSessionId(sessionKey: string): string { - const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); - if (!existsSync(registryPath)) {return "";} - try { - const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { - runs?: Record>; - }; - for (const entry of Object.values(raw.runs ?? {})) { - if (entry.childSessionKey !== sessionKey) {continue;} - const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; - const match = requester.match(/^agent:[^:]+:web:(.+)$/); - return match?.[1] ?? ""; - } - } catch { - // ignore - } - return ""; -} - export async function POST(req: Request) { const body: { sessionId?: string; sessionKey?: string } = await req .json() .catch(() => ({})); const isSubagentSession = typeof body.sessionKey === "string" && body.sessionKey.includes(":subagent:"); - if (isSubagentSession && body.sessionKey) { - if (!hasActiveSubagent(body.sessionKey)) { - const parentWebSessionId = deriveSubagentParentSessionId(body.sessionKey); - ensureRegisteredFromDisk(body.sessionKey, parentWebSessionId); - } - const aborted = isSubagentRunning(body.sessionKey) ? abortSubagent(body.sessionKey) : false; - return Response.json({ aborted }); - } + const runKey = isSubagentSession && body.sessionKey ? body.sessionKey : body.sessionId; - if (!body.sessionId) { + if (!runKey) { return new Response("sessionId or subagent sessionKey required", { status: 400 }); } - const aborted = abortRun(body.sessionId); + const run = getActiveRun(runKey); + const aborted = run?.status === "running" ? abortRun(runKey) : false; return Response.json({ aborted }); } diff --git a/apps/web/app/api/chat/stream/route.ts b/apps/web/app/api/chat/stream/route.ts index c1c16fd563e..e073c947314 100644 --- a/apps/web/app/api/chat/stream/route.ts +++ b/apps/web/app/api/chat/stream/route.ts @@ -1,25 +1,19 @@ /** - * GET /api/chat/stream?sessionId=xxx + * GET /api/chat/stream?sessionId=xxx (parent sessions) + * GET /api/chat/stream?sessionKey=xxx (subagent sessions) * * Reconnect to an active (or recently-completed) agent run. * Replays all buffered SSE events from the start of the run, then * streams live events until the run finishes. * - * Returns 404 if no run exists for the given session. + * Both parent and subagent sessions use the same ActiveRun system. */ import { getActiveRun, + startSubscribeRun, subscribeToRun, - type SseEvent as ParentSseEvent, + type SseEvent, } from "@/lib/active-runs"; -import { - subscribeToSubagent, - hasActiveSubagent, - isSubagentRunning, - ensureRegisteredFromDisk, - ensureSubagentStreamable, - type SseEvent as SubagentSseEvent, -} from "@/lib/subagent-runs"; import { existsSync, readFileSync } from "node:fs"; import { join } from "node:path"; import { resolveOpenClawStateDir } from "@/lib/workspace"; @@ -27,9 +21,9 @@ import { resolveOpenClawStateDir } from "@/lib/workspace"; export const runtime = "nodejs"; export const maxDuration = 600; -function deriveSubagentParentSessionId(sessionKey: string): string { +function deriveSubagentInfo(sessionKey: string): { parentSessionId: string; task: string } | null { const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); - if (!existsSync(registryPath)) {return "";} + if (!existsSync(registryPath)) {return null;} try { const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { runs?: Record>; @@ -38,12 +32,14 @@ function deriveSubagentParentSessionId(sessionKey: string): string { if (entry.childSessionKey !== sessionKey) {continue;} const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; const match = requester.match(/^agent:[^:]+:web:(.+)$/); - return match?.[1] ?? ""; + const parentSessionId = match?.[1] ?? ""; + const task = typeof entry.task === "string" ? entry.task : ""; + return { parentSessionId, task }; } } catch { // ignore } - return ""; + return null; } export async function GET(req: Request) { @@ -56,66 +52,21 @@ export async function GET(req: Request) { return new Response("sessionId or subagent sessionKey required", { status: 400 }); } - if (isSubagentSession && sessionKey) { - if (!hasActiveSubagent(sessionKey)) { - const parentWebSessionId = deriveSubagentParentSessionId(sessionKey); - const registered = ensureRegisteredFromDisk(sessionKey, parentWebSessionId); - if (!registered && !hasActiveSubagent(sessionKey)) { - return Response.json({ active: false }, { status: 404 }); - } + const runKey = isSubagentSession && sessionKey ? sessionKey : (sessionId as string); + + let run = getActiveRun(runKey); + + if (!run && isSubagentSession && sessionKey) { + const info = deriveSubagentInfo(sessionKey); + if (info) { + run = startSubscribeRun({ + sessionKey, + parentSessionId: info.parentSessionId, + task: info.task, + }); } - ensureSubagentStreamable(sessionKey); - const isActive = isSubagentRunning(sessionKey); - const encoder = new TextEncoder(); - let closed = false; - let unsubscribe: (() => void) | null = null; - - const stream = new ReadableStream({ - start(controller) { - unsubscribe = subscribeToSubagent( - sessionKey, - (event: SubagentSseEvent | null) => { - if (closed) {return;} - if (event === null) { - closed = true; - try { - controller.close(); - } catch { - /* already closed */ - } - return; - } - try { - const json = JSON.stringify(event); - controller.enqueue(encoder.encode(`data: ${json}\n\n`)); - } catch { - /* ignore enqueue errors on closed stream */ - } - }, - { replay: true }, - ); - - if (!unsubscribe) { - closed = true; - controller.close(); - } - }, - cancel() { - closed = true; - unsubscribe?.(); - }, - }); - - return new Response(stream, { - headers: { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache, no-transform", - Connection: "keep-alive", - "X-Run-Active": isActive ? "true" : "false", - }, - }); } - const run = getActiveRun(sessionId as string); + if (!run) { return Response.json({ active: false }, { status: 404 }); } @@ -127,7 +78,6 @@ export async function GET(req: Request) { const stream = new ReadableStream({ start(controller) { - // Keep idle SSE connections alive while waiting for subagent announcements. keepalive = setInterval(() => { if (closed) {return;} try { @@ -137,14 +87,11 @@ export async function GET(req: Request) { } }, 15_000); - // subscribeToRun with replay=true replays the full event buffer - // synchronously, then subscribes for live events. unsubscribe = subscribeToRun( - sessionId as string, - (event: ParentSseEvent | null) => { + runKey, + (event: SseEvent | null) => { if (closed) {return;} if (event === null) { - // Run completed — close the SSE stream. closed = true; if (keepalive) { clearInterval(keepalive); @@ -159,9 +106,7 @@ export async function GET(req: Request) { } try { const json = JSON.stringify(event); - controller.enqueue( - encoder.encode(`data: ${json}\n\n`), - ); + controller.enqueue(encoder.encode(`data: ${json}\n\n`)); } catch { /* ignore enqueue errors on closed stream */ } @@ -170,7 +115,6 @@ export async function GET(req: Request) { ); if (!unsubscribe) { - // Run was cleaned up between getActiveRun and subscribe. closed = true; if (keepalive) { clearInterval(keepalive); @@ -180,7 +124,6 @@ export async function GET(req: Request) { } }, cancel() { - // Client disconnected — unsubscribe only (don't kill the run). closed = true; if (keepalive) { clearInterval(keepalive); diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 55013010088..86077afa451 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -17,7 +17,7 @@ import { existsSync, mkdirSync, } from "node:fs"; -import { resolveWebChatDir } from "./workspace"; +import { resolveWebChatDir, resolveOpenClawStateDir } from "./workspace"; import { type AgentEvent, spawnAgentProcess, @@ -29,10 +29,6 @@ import { parseErrorBody, parseErrorFromStderr, } from "./agent-runner"; -import { - hasRunningSubagentsForParent, - registerSubagent, -} from "./subagent-runs"; // ── Types ── @@ -79,12 +75,27 @@ export type ActiveRun = { lastGlobalSeq: number; /** @internal subscribe child process for waiting-for-subagents continuation */ _subscribeProcess?: ChildProcess | null; + /** Full gateway session key (used for subagent subscribe-only runs) */ + sessionKey?: string; + /** Parent web session ID (for subagent runs) */ + parentSessionId?: string; + /** Subagent task description */ + task?: string; + /** Subagent label */ + label?: string; + /** True for subscribe-only runs (subagents) that don't own the agent process */ + isSubscribeOnly?: boolean; + /** Set when lifecycle/end is received; defers finalization until subscribe close */ + _lifecycleEnded?: boolean; + /** Safety timer to finalize if subscribe process hangs after lifecycle/end */ + _finalizeTimer?: ReturnType | null; }; // ── Constants ── const PERSIST_INTERVAL_MS = 2_000; const CLEANUP_GRACE_MS = 30_000; +const SUBSCRIBE_CLEANUP_GRACE_MS = 24 * 60 * 60_000; const SILENT_REPLY_TOKEN = "NO_REPLY"; @@ -143,6 +154,33 @@ export function getRunningSessionIds(): string[] { return ids; } +/** Check if any subagent sessions are still running for a parent web session. */ +export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean { + for (const [_key, run] of activeRuns) { + if (run.isSubscribeOnly && run.parentSessionId === parentWebSessionId && run.status === "running") { + return true; + } + } + // Fallback: check the gateway disk registry + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) {return false;} + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { + runs?: Record>; + }; + const runs = raw?.runs; + if (!runs) {return false;} + const parentKeyPattern = `:web:${parentWebSessionId}`; + for (const entry of Object.values(runs)) { + const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; + if (!requester.endsWith(parentKeyPattern)) {continue;} + if (typeof entry.endedAt === "number") {continue;} + return true; + } + } catch { /* ignore */ } + return false; +} + /** * Subscribe to an active run's SSE events. * @@ -181,6 +219,85 @@ export function subscribeToRun( }; } +/** + * Reactivate a completed subscribe-only run for a follow-up message. + * Resets status to "running" and restarts the subscribe stream. + */ +export function reactivateSubscribeRun(sessionKey: string): boolean { + const run = activeRuns.get(sessionKey); + if (!run?.isSubscribeOnly) {return false;} + if (run.status === "running") {return true;} + + run.status = "running"; + run._lifecycleEnded = false; + if (run._finalizeTimer) {clearTimeout(run._finalizeTimer); run._finalizeTimer = null;} + + run.accumulated = { + id: `assistant-${sessionKey}-${Date.now()}`, + role: "assistant", + parts: [], + }; + + const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq); + run._subscribeProcess = newChild; + run.childProcess = newChild; + wireSubscribeOnlyProcess(run, newChild, sessionKey); + return true; +} + +/** + * Send a follow-up message to a subagent session via gateway RPC. + * The subscribe stream picks up the agent's response events. + */ +export function sendSubagentFollowUp(sessionKey: string, message: string): boolean { + try { + const root = resolvePackageRoot(); + const devScript = join(root, "scripts", "run-node.mjs"); + const prodScript = join(root, "openclaw.mjs"); + const scriptPath = existsSync(devScript) ? devScript : prodScript; + const child = spawn( + "node", + [ + scriptPath, "gateway", "call", "agent", + "--params", JSON.stringify({ + message, sessionKey, + idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`, + deliver: false, channel: "webchat", lane: "subagent", timeout: 0, + }), + "--json", "--timeout", "10000", + ], + { cwd: root, env: { ...process.env }, stdio: "ignore", detached: true }, + ); + child.unref(); + return true; + } catch { + return false; + } +} + +/** + * Persist a user message for a subscribe-only (subagent) run. + * Emits a user-message event so reconnecting clients see the message. + */ +export function persistSubscribeUserMessage( + sessionKey: string, + msg: { id?: string; text: string }, +): boolean { + const run = activeRuns.get(sessionKey); + if (!run) {return false;} + const event: SseEvent = { + type: "user-message", + id: msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`, + text: msg.text, + }; + run.eventBuffer.push(event); + for (const sub of run.subscribers) { + try { sub(event); } catch { /* ignore */ } + } + schedulePersist(run); + return true; +} + /** Abort a running agent. Returns true if a run was actually aborted. */ export function abortRun(sessionId: string): boolean { const run = activeRuns.get(sessionId); @@ -334,6 +451,321 @@ export function startRun(params: { return run; } +/** + * Start a subscribe-only run for a subagent session. + * The agent is already running in the gateway; we just subscribe to its + * event stream so buffering, persistence, and reconnection work identically + * to parent sessions. + */ +export function startSubscribeRun(params: { + sessionKey: string; + parentSessionId: string; + task: string; + label?: string; +}): ActiveRun { + const { sessionKey, parentSessionId, task, label } = params; + + if (activeRuns.has(sessionKey)) { + return activeRuns.get(sessionKey)!; + } + + const abortController = new AbortController(); + const subscribeChild = spawnAgentSubscribeProcess(sessionKey, 0); + + const run: ActiveRun = { + sessionId: sessionKey, + childProcess: subscribeChild, + eventBuffer: [], + subscribers: new Set(), + accumulated: { + id: `assistant-${sessionKey}-${Date.now()}`, + role: "assistant", + parts: [], + }, + status: "running", + startedAt: Date.now(), + exitCode: null, + abortController, + _persistTimer: null, + _lastPersistedAt: 0, + lastGlobalSeq: 0, + sessionKey, + parentSessionId, + task, + label, + isSubscribeOnly: true, + _lifecycleEnded: false, + _finalizeTimer: null, + }; + + activeRuns.set(sessionKey, run); + wireSubscribeOnlyProcess(run, subscribeChild, sessionKey); + return run; +} + +/** + * Wire event processing for a subscribe-only run (subagent). + * Uses the same processParentEvent pipeline as parent runs, + * with deferred finalization on lifecycle/end. + */ +function wireSubscribeOnlyProcess( + run: ActiveRun, + child: ChildProcess, + sessionKey: string, +): void { + let idCounter = 0; + const nextId = (prefix: string) => + `${prefix}-${Date.now()}-${++idCounter}`; + + let currentTextId = ""; + let currentReasoningId = ""; + let textStarted = false; + let reasoningStarted = false; + let statusReasoningActive = false; + let agentErrorReported = false; + + let accTextIdx = -1; + let accReasoningIdx = -1; + const accToolMap = new Map(); + + const accAppendReasoning = (delta: string) => { + if (accReasoningIdx < 0) { + run.accumulated.parts.push({ type: "reasoning", text: delta }); + accReasoningIdx = run.accumulated.parts.length - 1; + } else { + (run.accumulated.parts[accReasoningIdx] as { type: "reasoning"; text: string }).text += delta; + } + }; + + const accAppendText = (delta: string) => { + if (accTextIdx < 0) { + run.accumulated.parts.push({ type: "text", text: delta }); + accTextIdx = run.accumulated.parts.length - 1; + } else { + (run.accumulated.parts[accTextIdx] as { type: "text"; text: string }).text += delta; + } + }; + + const emit = (event: SseEvent) => { + run.eventBuffer.push(event); + for (const sub of run.subscribers) { + try { sub(event); } catch { /* ignore */ } + } + schedulePersist(run); + }; + + const emitError = (message: string) => { + closeReasoning(); + closeText(); + const tid = nextId("text"); + emit({ type: "text-start", id: tid }); + emit({ type: "text-delta", id: tid, delta: `[error] ${message}` }); + emit({ type: "text-end", id: tid }); + accAppendText(`[error] ${message}`); + }; + + const closeReasoning = () => { + if (reasoningStarted) { + emit({ type: "reasoning-end", id: currentReasoningId }); + reasoningStarted = false; + statusReasoningActive = false; + } + accReasoningIdx = -1; + }; + + const closeText = () => { + if (textStarted) { + const lastPart = run.accumulated.parts[accTextIdx]; + if (lastPart?.type === "text" && isLeakedSilentReplyToken(lastPart.text)) { + run.accumulated.parts.splice(accTextIdx, 1); + } + emit({ type: "text-end", id: currentTextId }); + textStarted = false; + } + accTextIdx = -1; + }; + + const openStatusReasoning = (label: string) => { + closeReasoning(); + closeText(); + currentReasoningId = nextId("status"); + emit({ type: "reasoning-start", id: currentReasoningId }); + emit({ type: "reasoning-delta", id: currentReasoningId, delta: label }); + reasoningStarted = true; + statusReasoningActive = true; + }; + + const processEvent = (ev: AgentEvent) => { + if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "start") { + openStatusReasoning("Preparing response..."); + } + + if (ev.event === "agent" && ev.stream === "thinking") { + const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined; + if (delta) { + if (statusReasoningActive) { closeReasoning(); } + if (!reasoningStarted) { + currentReasoningId = nextId("reasoning"); + emit({ type: "reasoning-start", id: currentReasoningId }); + reasoningStarted = true; + } + emit({ type: "reasoning-delta", id: currentReasoningId, delta }); + accAppendReasoning(delta); + } + } + + if (ev.event === "agent" && ev.stream === "assistant") { + const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined; + const textFallback = !delta && typeof ev.data?.text === "string" ? ev.data.text : undefined; + const chunk = delta ?? textFallback; + if (chunk) { + closeReasoning(); + if (!textStarted) { + currentTextId = nextId("text"); + emit({ type: "text-start", id: currentTextId }); + textStarted = true; + } + emit({ type: "text-delta", id: currentTextId, delta: chunk }); + accAppendText(chunk); + } + if (typeof ev.data?.stopReason === "string" && ev.data.stopReason === "error" && typeof ev.data?.errorMessage === "string" && !agentErrorReported) { + agentErrorReported = true; + emitError(parseErrorBody(ev.data.errorMessage)); + } + } + + if (ev.event === "agent" && ev.stream === "tool") { + const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined; + const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : ""; + const toolName = typeof ev.data?.name === "string" ? ev.data.name : ""; + + if (phase === "start") { + closeReasoning(); + closeText(); + const args = ev.data?.args && typeof ev.data.args === "object" ? (ev.data.args as Record) : {}; + emit({ type: "tool-input-start", toolCallId, toolName }); + emit({ type: "tool-input-available", toolCallId, toolName, input: args }); + run.accumulated.parts.push({ type: "tool-invocation", toolCallId, toolName, args }); + accToolMap.set(toolCallId, run.accumulated.parts.length - 1); + } else if (phase === "result") { + const isError = ev.data?.isError === true; + const result = extractToolResult(ev.data?.result); + if (isError) { + const errorText = result?.text || (result?.details?.error as string | undefined) || "Tool execution failed"; + emit({ type: "tool-output-error", toolCallId, errorText }); + } else { + const output = buildToolOutput(result); + emit({ type: "tool-output-available", toolCallId, output }); + const idx = accToolMap.get(toolCallId); + if (idx !== undefined) { + const part = run.accumulated.parts[idx]; + if (part.type === "tool-invocation") { part.result = output; } + } + } + } + } + + if (ev.event === "agent" && ev.stream === "compaction") { + const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined; + if (phase === "start") { openStatusReasoning("Optimizing session context..."); } + else if (phase === "end") { + if (statusReasoningActive) { + if (ev.data?.willRetry === true) { + emit({ type: "reasoning-delta", id: currentReasoningId, delta: "\nRetrying with compacted context..." }); + } else { closeReasoning(); } + } + } + } + + if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "end") { + closeReasoning(); + closeText(); + run._lifecycleEnded = true; + if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); } + run._finalizeTimer = setTimeout(() => { + run._finalizeTimer = null; + if (run.status === "running") { finalizeSubscribeRun(run); } + }, 5_000); + } + + if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "error" && !agentErrorReported) { + const msg = parseAgentErrorMessage(ev.data); + if (msg) { agentErrorReported = true; emitError(msg); } + finalizeSubscribeRun(run, "error"); + } + + if (ev.event === "error" && !agentErrorReported) { + const msg = parseAgentErrorMessage(ev.data ?? (ev as unknown as Record)); + if (msg) { agentErrorReported = true; emitError(msg); } + } + }; + + const rl = createInterface({ input: child.stdout! }); + + rl.on("line", (line: string) => { + if (!line.trim()) { return; } + let ev: AgentEvent; + try { ev = JSON.parse(line) as AgentEvent; } catch { return; } + if (ev.sessionKey && ev.sessionKey !== sessionKey) { return; } + const gSeq = typeof (ev as Record).globalSeq === "number" + ? (ev as Record).globalSeq as number + : undefined; + if (gSeq !== undefined) { + if (gSeq <= run.lastGlobalSeq) { return; } + run.lastGlobalSeq = gSeq; + } + processEvent(ev); + }); + + child.on("close", () => { + if (run._subscribeProcess === child) { run._subscribeProcess = null; } + if (run.status !== "running") { return; } + if (run._lifecycleEnded) { + if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; } + finalizeSubscribeRun(run); + return; + } + setTimeout(() => { + if (run.status === "running" && !run._subscribeProcess) { + const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq); + run._subscribeProcess = newChild; + run.childProcess = newChild; + wireSubscribeOnlyProcess(run, newChild, sessionKey); + } + }, 300); + }); + + child.on("error", (err) => { + console.error("[active-runs] Subscribe child error:", err); + }); + + child.stderr?.on("data", (chunk: Buffer) => { + console.error("[active-runs subscribe stderr]", chunk.toString()); + }); + + run._subscribeProcess = child; +} + +function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void { + if (run.status !== "running") { return; } + if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; } + + run.status = status; + flushPersistence(run); + + for (const sub of run.subscribers) { + try { sub(null); } catch { /* ignore */ } + } + run.subscribers.clear(); + + stopSubscribeProcess(run); + + const grace = run.isSubscribeOnly ? SUBSCRIBE_CLEANUP_GRACE_MS : CLEANUP_GRACE_MS; + setTimeout(() => { + if (activeRuns.get(run.sessionId) === run) { cleanupRun(run.sessionId); } + }, grace); +} + // ── Persistence helpers (called from route to persist user messages) ── /** Save a user message to the session JSONL (called once at run start). */ @@ -719,26 +1151,15 @@ function wireChildProcess(run: ActiveRun): void { if (toolName === "sessions_spawn" && !isError) { const childSessionKey = result?.details?.childSessionKey as string | undefined; - const childRunId = - result?.details?.runId as string | undefined; - // task/label are in the tool input args, not the result - const spawnArgs = accToolMap.has(toolCallId) - ? run.accumulated.parts[accToolMap.get(toolCallId)!] - : undefined; - const spawnTask = - (spawnArgs as Record | undefined)?.args - ? ((spawnArgs as Record).args as Record)?.task as string | undefined + if (childSessionKey) { + const spawnArgs = accToolMap.has(toolCallId) + ? (run.accumulated.parts[accToolMap.get(toolCallId)!] as { args?: Record })?.args : undefined; - const spawnLabel = - (spawnArgs as Record | undefined)?.args - ? ((spawnArgs as Record).args as Record)?.label as string | undefined - : undefined; - if (childSessionKey && childRunId) { - registerSubagent(run.sessionId, { + startSubscribeRun({ sessionKey: childSessionKey, - runId: childRunId, - task: spawnTask ?? "Subagent task", - label: spawnLabel, + parentSessionId: run.sessionId, + task: (spawnArgs?.task as string | undefined) ?? "Subagent task", + label: spawnArgs?.label as string | undefined, }); } }