From 0d219413a8d1486b4fd27221bc64dd3472168178 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Mon, 2 Mar 2026 18:33:17 -0800 Subject: [PATCH] feat(web): enhance active-runs with WS RPC, subscribe retry, and chat event support --- apps/web/lib/active-runs.ts | 330 ++++++++++++++++++++++++++++++------ 1 file changed, 282 insertions(+), 48 deletions(-) diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 34a9f415491..224e24e4bb8 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -8,7 +8,6 @@ * - Messages are written to persistent sessions as they arrive. * - New HTTP connections can re-attach to a running stream. */ -import { spawn } from "node:child_process"; import { createInterface } from "node:readline"; import { join } from "node:path"; import { @@ -23,6 +22,7 @@ import { type AgentEvent, spawnAgentProcess, spawnAgentSubscribeProcess, + callGatewayRpc, extractToolResult, buildToolOutput, parseAgentErrorMessage, @@ -75,6 +75,10 @@ export type ActiveRun = { lastGlobalSeq: number; /** @internal subscribe child process for waiting-for-subagents continuation */ _subscribeProcess?: AgentProcessHandle | null; + /** @internal retry timer for subscribe stream restarts */ + _subscribeRetryTimer?: ReturnType | null; + /** @internal consecutive subscribe restart attempts without a received event */ + _subscribeRetryAttempt?: number; /** Full gateway session key (used for subagent subscribe-only runs) */ sessionKey?: string; /** Parent web session ID (for subagent runs) */ @@ -89,6 +93,8 @@ export type ActiveRun = { _lifecycleEnded?: boolean; /** Safety timer to finalize if subscribe process hangs after lifecycle/end */ _finalizeTimer?: ReturnType | null; + /** @internal short reconciliation window before waiting-run completion */ + _waitingFinalizeTimer?: ReturnType | null; }; // ── Constants ── @@ -96,6 +102,10 @@ export type ActiveRun = { const PERSIST_INTERVAL_MS = 2_000; const CLEANUP_GRACE_MS = 30_000; const SUBSCRIBE_CLEANUP_GRACE_MS = 24 * 60 * 60_000; +const SUBSCRIBE_RETRY_BASE_MS = 300; +const SUBSCRIBE_RETRY_MAX_MS = 5_000; +const SUBSCRIBE_LIFECYCLE_END_GRACE_MS = 750; +const WAITING_FINALIZE_RECONCILE_MS = 5_000; const SILENT_REPLY_TOKEN = "NO_REPLY"; @@ -113,6 +123,48 @@ function isLeakedSilentReplyToken(text: string): boolean { if (SILENT_REPLY_TOKEN.startsWith(t) && t.length >= 2 && t.length < SILENT_REPLY_TOKEN.length) {return true;} return false; } + +function asRecord(value: unknown): Record | null { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + return value as Record; +} + +function extractAssistantTextFromChatPayload( + data: Record | undefined, +): string { + if (!data) { + return ""; + } + const state = typeof data.state === "string" ? data.state : ""; + if (state !== "final") { + return ""; + } + const message = asRecord(data.message); + if (!message || message.role !== "assistant") { + return ""; + } + const content = message.content; + if (typeof content === "string") { + return content; + } + if (!Array.isArray(content)) { + return ""; + } + const chunks: string[] = []; + for (const part of content) { + const rec = asRecord(part); + if (!rec) { + continue; + } + const type = typeof rec.type === "string" ? rec.type : ""; + if ((type === "text" || type === "output_text") && typeof rec.text === "string") { + chunks.push(rec.text); + } + } + return chunks.join(""); +} // Evaluated per-call so it tracks profile switches at runtime. function webChatDir(): string { return resolveWebChatDir(); } function indexFile(): string { return join(webChatDir(), "index.json"); } @@ -231,6 +283,8 @@ export function reactivateSubscribeRun(sessionKey: string): boolean { run.status = "running"; run._lifecycleEnded = false; if (run._finalizeTimer) {clearTimeout(run._finalizeTimer); run._finalizeTimer = null;} + clearWaitingFinalizeTimer(run); + resetSubscribeRetryState(run); run.accumulated = { id: `assistant-${sessionKey}-${Date.now()}`, @@ -251,21 +305,21 @@ export function reactivateSubscribeRun(sessionKey: string): boolean { */ export function sendSubagentFollowUp(sessionKey: string, message: string): boolean { try { - const child = spawn( - "openclaw", - [ - "gateway", "call", "agent", - "--params", JSON.stringify({ - message, sessionKey, - idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`, - deliver: false, channel: "webchat", lane: "subagent", timeout: 0, - }), - "--json", "--timeout", "10000", - ], - { env: { ...process.env }, stdio: "ignore", detached: true }, - ); - child.on("error", () => {}); - child.unref(); + void callGatewayRpc( + "agent", + { + message, + sessionKey, + idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`, + deliver: false, + channel: "webchat", + lane: "subagent", + timeout: 0, + }, + { timeoutMs: 10_000 }, + ).catch(() => { + // Best effort. + }); return true; } catch { return false; @@ -304,6 +358,7 @@ export function abortRun(sessionId: string): boolean { // false and the next user message isn't rejected with 409. const wasWaiting = run.status === "waiting-for-subagents"; run.status = "error"; + clearWaitingFinalizeTimer(run); // Clean up waiting subscribe process if present. stopSubscribeProcess(run); @@ -357,27 +412,11 @@ export function abortRun(sessionId: string): boolean { function sendGatewayAbort(sessionId: string): void { try { const sessionKey = `agent:main:web:${sessionId}`; - const child = spawn( - "openclaw", - [ - "gateway", - "call", - "chat.abort", - "--params", - JSON.stringify({ sessionKey }), - "--json", - "--timeout", - "4000", - ], - { - env: { ...process.env }, - stdio: "ignore", - detached: true, + void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch( + () => { + // Best effort; don't let abort failures break the stop flow. }, ); - child.on("error", () => {}); - // Let the abort process run independently — don't block on it. - child.unref(); } catch { // Best-effort; don't let abort failures break the stop flow. } @@ -421,6 +460,9 @@ export function startRun(params: { _persistTimer: null, _lastPersistedAt: 0, lastGlobalSeq: 0, + _subscribeRetryTimer: null, + _subscribeRetryAttempt: 0, + _waitingFinalizeTimer: null, }; activeRuns.set(sessionId, run); @@ -487,6 +529,9 @@ export function startSubscribeRun(params: { isSubscribeOnly: true, _lifecycleEnded: false, _finalizeTimer: null, + _subscribeRetryTimer: null, + _subscribeRetryAttempt: 0, + _waitingFinalizeTimer: null, }; activeRuns.set(sessionKey, run); @@ -510,6 +555,7 @@ function wireSubscribeOnlyProcess( let currentTextId = ""; let currentReasoningId = ""; + let currentStatusReasoningLabel: string | null = null; let textStarted = false; let reasoningStarted = false; let statusReasoningActive = false; @@ -561,6 +607,7 @@ function wireSubscribeOnlyProcess( reasoningStarted = false; statusReasoningActive = false; } + currentStatusReasoningLabel = null; accReasoningIdx = -1; }; @@ -577,6 +624,9 @@ function wireSubscribeOnlyProcess( }; const openStatusReasoning = (label: string) => { + if (statusReasoningActive && currentStatusReasoningLabel === label) { + return; + } closeReasoning(); closeText(); currentReasoningId = nextId("status"); @@ -584,9 +634,22 @@ function wireSubscribeOnlyProcess( emit({ type: "reasoning-delta", id: currentReasoningId, delta: label }); reasoningStarted = true; statusReasoningActive = true; + currentStatusReasoningLabel = label; }; const processEvent = (ev: AgentEvent) => { + const isLifecycleEndEvent = + ev.event === "agent" && + ev.stream === "lifecycle" && + ev.data?.phase === "end"; + if (!isLifecycleEndEvent) { + if (run._finalizeTimer) { + clearTimeout(run._finalizeTimer); + run._finalizeTimer = null; + } + run._lifecycleEnded = false; + } + if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "start") { openStatusReasoning("Preparing response..."); } @@ -619,6 +682,22 @@ function wireSubscribeOnlyProcess( emit({ type: "text-delta", id: currentTextId, delta: chunk }); accAppendText(chunk); } + const mediaUrls = ev.data?.mediaUrls; + if (Array.isArray(mediaUrls)) { + for (const url of mediaUrls) { + if (typeof url === "string" && url.trim()) { + closeReasoning(); + if (!textStarted) { + currentTextId = nextId("text"); + emit({ type: "text-start", id: currentTextId }); + textStarted = true; + } + const md = `\n![media](${url.trim()})\n`; + emit({ type: "text-delta", id: currentTextId, delta: md }); + accAppendText(md); + } + } + } if (typeof ev.data?.stopReason === "string" && ev.data.stopReason === "error" && typeof ev.data?.errorMessage === "string" && !agentErrorReported) { agentErrorReported = true; emitError(parseErrorBody(ev.data.errorMessage)); @@ -644,6 +723,13 @@ function wireSubscribeOnlyProcess( if (isError) { const errorText = result?.text || (result?.details?.error as string | undefined) || "Tool execution failed"; emit({ type: "tool-output-error", toolCallId, errorText }); + const idx = accToolMap.get(toolCallId); + if (idx !== undefined) { + const part = run.accumulated.parts[idx]; + if (part.type === "tool-invocation") { + part.errorText = errorText; + } + } } else { const output = buildToolOutput(result); emit({ type: "tool-output-available", toolCallId, output }); @@ -662,12 +748,29 @@ function wireSubscribeOnlyProcess( else if (phase === "end") { if (statusReasoningActive) { if (ev.data?.willRetry === true) { - emit({ type: "reasoning-delta", id: currentReasoningId, delta: "\nRetrying with compacted context..." }); + const retryDelta = "\nRetrying with compacted context..."; + emit({ type: "reasoning-delta", id: currentReasoningId, delta: retryDelta }); + accAppendReasoning(retryDelta); } else { closeReasoning(); } } } } + if (ev.event === "chat") { + const finalText = extractAssistantTextFromChatPayload(ev.data); + if (finalText) { + closeReasoning(); + if (!textStarted) { + currentTextId = nextId("text"); + emit({ type: "text-start", id: currentTextId }); + textStarted = true; + } + emit({ type: "text-delta", id: currentTextId, delta: finalText }); + accAppendText(finalText); + closeText(); + } + } + if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "end") { closeReasoning(); closeText(); @@ -676,7 +779,7 @@ function wireSubscribeOnlyProcess( run._finalizeTimer = setTimeout(() => { run._finalizeTimer = null; if (run.status === "running") { finalizeSubscribeRun(run); } - }, 5_000); + }, SUBSCRIBE_LIFECYCLE_END_GRACE_MS); } if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "error" && !agentErrorReported) { @@ -705,25 +808,31 @@ function wireSubscribeOnlyProcess( if (gSeq <= run.lastGlobalSeq) { return; } run.lastGlobalSeq = gSeq; } + if ((run._subscribeRetryAttempt ?? 0) > 0) { + resetSubscribeRetryState(run); + } processEvent(ev); }); child.on("close", () => { - if (run._subscribeProcess === child) { run._subscribeProcess = null; } + if (run._subscribeProcess !== child) { + return; + } + run._subscribeProcess = null; if (run.status !== "running") { return; } if (run._lifecycleEnded) { if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; } finalizeSubscribeRun(run); return; } - setTimeout(() => { + scheduleSubscribeRestart(run, () => { if (run.status === "running" && !run._subscribeProcess) { const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq); run._subscribeProcess = newChild; run.childProcess = newChild; wireSubscribeOnlyProcess(run, newChild, sessionKey); } - }, 300); + }); }); child.on("error", (err) => { @@ -740,6 +849,8 @@ function wireSubscribeOnlyProcess( function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void { if (run.status !== "running") { return; } if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; } + clearWaitingFinalizeTimer(run); + resetSubscribeRetryState(run); run.status = status; flushPersistence(run); @@ -860,10 +971,12 @@ function wireChildProcess(run: ActiveRun): void { let currentTextId = ""; let currentReasoningId = ""; + let currentStatusReasoningLabel: string | null = null; let textStarted = false; let reasoningStarted = false; let everSentText = false; let statusReasoningActive = false; + let waitingStatusAnnounced = false; let agentErrorReported = false; const stderrChunks: string[] = []; @@ -909,6 +1022,7 @@ function wireChildProcess(run: ActiveRun): void { reasoningStarted = false; statusReasoningActive = false; } + currentStatusReasoningLabel = null; accReasoningIdx = -1; }; @@ -930,6 +1044,9 @@ function wireChildProcess(run: ActiveRun): void { }; const openStatusReasoning = (label: string) => { + if (statusReasoningActive && currentStatusReasoningLabel === label) { + return; + } closeReasoning(); closeText(); currentReasoningId = nextId("status"); @@ -941,6 +1058,7 @@ function wireChildProcess(run: ActiveRun): void { }); reasoningStarted = true; statusReasoningActive = true; + currentStatusReasoningLabel = label; accAppendReasoning(label); }; @@ -956,6 +1074,22 @@ function wireChildProcess(run: ActiveRun): void { everSentText = true; }; + const emitAssistantFinalText = (text: string) => { + if (!text) { + return; + } + closeReasoning(); + if (!textStarted) { + currentTextId = nextId("text"); + emit({ type: "text-start", id: currentTextId }); + textStarted = true; + } + everSentText = true; + emit({ type: "text-delta", id: currentTextId, delta: text }); + accAppendText(text); + closeText(); + }; + // ── Parse stdout JSON lines ── const rl = createInterface({ input: child.stdout! }); @@ -1157,6 +1291,15 @@ function wireChildProcess(run: ActiveRun): void { } } + // Chat final events can include assistant turns from runs outside + // the original parent process (e.g. subagent announce follow-ups). + if (ev.event === "chat") { + const text = extractAssistantTextFromChatPayload(ev.data); + if (text) { + emitAssistantFinalText(text); + } + } + // Compaction if (ev.event === "agent" && ev.stream === "compaction") { const phase = @@ -1227,13 +1370,55 @@ function wireChildProcess(run: ActiveRun): void { if (gSeq <= run.lastGlobalSeq) {return;} run.lastGlobalSeq = gSeq; } + + const showWaitingStatus = () => { + if (!waitingStatusAnnounced) { + openStatusReasoning("Waiting for subagent results..."); + waitingStatusAnnounced = true; + } + flushPersistence(run); + }; + + const scheduleWaitingCompletionCheck = () => { + clearWaitingFinalizeTimer(run); + run._waitingFinalizeTimer = setTimeout(() => { + run._waitingFinalizeTimer = null; + if (run.status !== "waiting-for-subagents") { + return; + } + if (hasRunningSubagentsForParent(run.sessionId)) { + showWaitingStatus(); + return; + } + finalizeWaitingRun(run); + }, WAITING_FINALIZE_RECONCILE_MS); + }; + + // Any new parent event means waiting completion should be reconsidered + // from this point forward, not from a prior end/final checkpoint. + clearWaitingFinalizeTimer(run); + processParentEvent(ev); if (ev.stream === "lifecycle" && ev.data?.phase === "end") { if (hasRunningSubagentsForParent(run.sessionId)) { - openStatusReasoning("Waiting for subagent results..."); - flushPersistence(run); + clearWaitingFinalizeTimer(run); + showWaitingStatus(); } else { - finalizeWaitingRun(run); + scheduleWaitingCompletionCheck(); + } + } + if (ev.event === "chat") { + const payload = ev.data; + const state = typeof payload?.state === "string" ? payload.state : ""; + const message = asRecord(payload?.message); + const role = typeof message?.role === "string" ? message.role : ""; + if (state === "final" && role === "assistant") { + if (hasRunningSubagentsForParent(run.sessionId)) { + clearWaitingFinalizeTimer(run); + showWaitingStatus(); + } else { + scheduleWaitingCompletionCheck(); + } } } }; @@ -1315,7 +1500,10 @@ function wireChildProcess(run: ActiveRun): void { if (exitedClean && hasRunningSubagents) { run.status = "waiting-for-subagents"; - openStatusReasoning("Waiting for subagent results..."); + if (!waitingStatusAnnounced) { + openStatusReasoning("Waiting for subagent results..."); + waitingStatusAnnounced = true; + } flushPersistence(run); startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent); return; @@ -1399,20 +1587,24 @@ function startParentSubscribeStream( if (ev.sessionKey && ev.sessionKey !== parentSessionKey) { return; } + if ((run._subscribeRetryAttempt ?? 0) > 0) { + resetSubscribeRetryState(run); + } onEvent(ev); }); child.on("close", () => { - if (run._subscribeProcess === child) { - run._subscribeProcess = null; + if (run._subscribeProcess !== child) { + return; } + run._subscribeProcess = null; if (run.status !== "waiting-for-subagents") {return;} // If still waiting, restart subscribe stream from the latest cursor. - setTimeout(() => { + scheduleSubscribeRestart(run, () => { if (run.status === "waiting-for-subagents" && !run._subscribeProcess) { startParentSubscribeStream(run, parentSessionKey, onEvent); } - }, 300); + }); }); child.on("error", (err) => { @@ -1425,6 +1617,8 @@ function startParentSubscribeStream( } function stopSubscribeProcess(run: ActiveRun): void { + clearSubscribeRetryTimer(run); + clearWaitingFinalizeTimer(run); if (!run._subscribeProcess) {return;} try { run._subscribeProcess.kill("SIGTERM"); @@ -1445,6 +1639,8 @@ function finalizeWaitingRun(run: ActiveRun): void { if (run.status !== "waiting-for-subagents") {return;} run.status = "completed"; + clearWaitingFinalizeTimer(run); + resetSubscribeRetryState(run); stopSubscribeProcess(run); @@ -1462,6 +1658,43 @@ function finalizeWaitingRun(run: ActiveRun): void { }, CLEANUP_GRACE_MS); } +function clearWaitingFinalizeTimer(run: ActiveRun): void { + if (!run._waitingFinalizeTimer) { + return; + } + clearTimeout(run._waitingFinalizeTimer); + run._waitingFinalizeTimer = null; +} + +function clearSubscribeRetryTimer(run: ActiveRun): void { + if (!run._subscribeRetryTimer) { + return; + } + clearTimeout(run._subscribeRetryTimer); + run._subscribeRetryTimer = null; +} + +function resetSubscribeRetryState(run: ActiveRun): void { + run._subscribeRetryAttempt = 0; + clearSubscribeRetryTimer(run); +} + +function scheduleSubscribeRestart(run: ActiveRun, restart: () => void): void { + if (run._subscribeRetryTimer) { + return; + } + const attempt = run._subscribeRetryAttempt ?? 0; + const delay = Math.min( + SUBSCRIBE_RETRY_MAX_MS, + SUBSCRIBE_RETRY_BASE_MS * 2 ** attempt, + ); + run._subscribeRetryAttempt = attempt + 1; + run._subscribeRetryTimer = setTimeout(() => { + run._subscribeRetryTimer = null; + restart(); + }, delay); +} + // ── Debounced persistence ── function schedulePersist(run: ActiveRun) { @@ -1566,6 +1799,7 @@ function cleanupRun(sessionId: string) { const run = activeRuns.get(sessionId); if (!run) {return;} if (run._persistTimer) {clearTimeout(run._persistTimer);} + clearWaitingFinalizeTimer(run); stopSubscribeProcess(run); activeRuns.delete(sessionId); }