From b29b2e39a331d9370c64cbb502adf296f7d7e430 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sun, 22 Feb 2026 00:06:31 -0800 Subject: [PATCH] fix(web): repair subagent streaming pipeline - Handle ev.data.text fallback when delta is absent in assistant events (both active-runs and subagent-runs) - Defer subagent finalizeRun until subscribe process closes so buffered events in readline are still delivered to SSE subscribers - Register subagents from sessions_spawn tool results in active-runs so hasRunningSubagentsForParent works without opening SubagentPanel first - Add disk registry fallback in hasRunningSubagentsForParent for cases where in-memory parentIndex has no entries - Fix pre-commit hook: tolerate oxfmt exit 2 when all files are ignored Co-authored-by: Cursor --- apps/web/lib/active-runs.ts | 31 ++++++++++-- apps/web/lib/subagent-runs.ts | 90 +++++++++++++++++++++++++++++------ git-hooks/pre-commit | 8 +++- 3 files changed, 110 insertions(+), 19 deletions(-) diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 5e8ef67468a..dd82d13667d 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -31,6 +31,7 @@ import { } from "./agent-runner"; import { hasRunningSubagentsForParent, + registerSubagent, } from "./subagent-runs"; // ── Types ── @@ -592,7 +593,12 @@ function wireChildProcess(run: ActiveRun): void { typeof ev.data?.delta === "string" ? ev.data.delta : undefined; - if (delta) { + const textFallback = + !delta && typeof ev.data?.text === "string" + ? ev.data.text + : undefined; + const chunk = delta ?? textFallback; + if (chunk) { closeReasoning(); if (!textStarted) { currentTextId = nextId("text"); @@ -600,8 +606,8 @@ function wireChildProcess(run: ActiveRun): void { textStarted = true; } everSentText = true; - emit({ type: "text-delta", id: currentTextId, delta }); - accAppendText(delta); + emit({ type: "text-delta", id: currentTextId, delta: chunk }); + accAppendText(chunk); } // Media URLs const mediaUrls = ev.data?.mediaUrls; @@ -709,6 +715,25 @@ function wireChildProcess(run: ActiveRun): void { } } } + + if (toolName === "sessions_spawn" && !isError) { + const childSessionKey = + result?.details?.sessionKey as string | undefined; + const childRunId = + result?.details?.runId as string | undefined; + const spawnTask = + result?.details?.task as string | undefined; + const spawnLabel = + result?.details?.label as string | undefined; + if (childSessionKey && childRunId) { + registerSubagent(run.sessionId, { + sessionKey: childSessionKey, + runId: childRunId, + task: spawnTask ?? "Subagent task", + label: spawnLabel, + }); + } + } } } diff --git a/apps/web/lib/subagent-runs.ts b/apps/web/lib/subagent-runs.ts index 7f6b1851da0..46006adda25 100644 --- a/apps/web/lib/subagent-runs.ts +++ b/apps/web/lib/subagent-runs.ts @@ -45,6 +45,10 @@ type SubagentRun = SubagentInfo & { _state: TransformState; _subscribeProcess: ChildProcess | null; _cleanupTimer: ReturnType | null; + /** Set when lifecycle/end is received; actual finalization deferred to subscribe close. */ + _lifecycleEnded: boolean; + /** Safety timer to finalize if subscribe process hangs after lifecycle/end. */ + _finalizeTimer: ReturnType | null; /** Last globalSeq seen from the gateway event stream for replay cursor. */ lastGlobalSeq: number; }; @@ -233,6 +237,8 @@ export function registerSubagent( _state: createTransformState(), _subscribeProcess: null, _cleanupTimer: null, + _lifecycleEnded: false, + _finalizeTimer: null, lastGlobalSeq: 0, }; @@ -357,19 +363,49 @@ export function isSubagentRunning(sessionKey: string): boolean { export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean { const reg = getRegistry(); const keys = reg.parentIndex.get(parentWebSessionId); - if (!keys) {return false;} - let anyRunning = false; - for (const key of keys) { - const run = reg.runs.get(key); - if (run?.status !== "running") {continue;} - const diskStatus = readDiskStatus(key); - if (diskStatus !== "running") { - finalizeRun(run, diskStatus === "error" ? "error" : "completed"); - continue; + + if (keys && keys.size > 0) { + let anyRunning = false; + for (const key of keys) { + const run = reg.runs.get(key); + if (run?.status !== "running") {continue;} + const diskStatus = readDiskStatus(key); + if (diskStatus !== "running") { + finalizeRun(run, diskStatus === "error" ? "error" : "completed"); + continue; + } + anyRunning = true; } - anyRunning = true; + if (anyRunning) {return true;} } - return anyRunning; + + // Fallback: check the gateway disk registry for running subagents + // that may not have been registered in-memory yet. + return checkDiskRegistryForRunningSubagents(parentWebSessionId); +} + +function checkDiskRegistryForRunningSubagents(parentWebSessionId: string): boolean { + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) {return false;} + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { + runs?: Record>; + }; + const runs = raw?.runs; + if (!runs) {return false;} + const parentKeyPattern = `:web:${parentWebSessionId}`; + for (const entry of Object.values(runs)) { + const requester = typeof entry.requesterSessionKey === "string" + ? entry.requesterSessionKey + : ""; + if (!requester.endsWith(parentKeyPattern)) {continue;} + if (typeof entry.endedAt === "number") {continue;} + return true; + } + } catch { + // ignore read errors + } + return false; } /** Return session keys of all currently running subagents. */ @@ -657,7 +693,10 @@ function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void { // Assistant text if (stream === "assistant") { const delta = typeof data.delta === "string" ? data.delta : undefined; - if (delta) { + const textFallback = + !delta && typeof data.text === "string" ? data.text : undefined; + const chunk = delta ?? textFallback; + if (chunk) { closeReasoning(); if (!st.textStarted) { st.currentTextId = nextId("text"); @@ -665,7 +704,7 @@ function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void { st.textStarted = true; } st.everSentText = true; - emit({ type: "text-delta", id: st.currentTextId, delta }); + emit({ type: "text-delta", id: st.currentTextId, delta: chunk }); } // Inline error if ( @@ -728,11 +767,19 @@ function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void { } } - // Lifecycle end → mark run completed + // Lifecycle end → defer finalization until subscribe process closes + // so any remaining events in the readline buffer are still delivered. if (stream === "lifecycle" && data.phase === "end") { closeReasoning(); closeText(); - finalizeRun(run, "completed"); + run._lifecycleEnded = true; + if (run._finalizeTimer) {clearTimeout(run._finalizeTimer);} + run._finalizeTimer = setTimeout(() => { + run._finalizeTimer = null; + if (run.status === "running") { + finalizeRun(run, "completed"); + } + }, 5_000); } // Lifecycle error @@ -746,6 +793,11 @@ function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void { function finalizeRun(run: SubagentRun, status: "completed" | "error"): void { if (run.status !== "running") {return;} + if (run._finalizeTimer) { + clearTimeout(run._finalizeTimer); + run._finalizeTimer = null; + } + run.status = status; run.endedAt = Date.now(); @@ -821,6 +873,14 @@ function startSubagentSubscribeStream(run: SubagentRun): void { run._subscribeProcess = null; } if (run.status !== "running") {return;} + if (run._lifecycleEnded) { + if (run._finalizeTimer) { + clearTimeout(run._finalizeTimer); + run._finalizeTimer = null; + } + finalizeRun(run, "completed"); + return; + } setTimeout(() => { if (run.status === "running" && !run._subscribeProcess) { startSubagentSubscribeStream(run); diff --git a/git-hooks/pre-commit b/git-hooks/pre-commit index 948f2087ada..ef535c6abc8 100755 --- a/git-hooks/pre-commit +++ b/git-hooks/pre-commit @@ -43,7 +43,13 @@ if [ "${#lint_files[@]}" -gt 0 ]; then fi if [ "${#format_files[@]}" -gt 0 ]; then - "$RUN_NODE_TOOL" oxfmt --write -- "${format_files[@]}" + "$RUN_NODE_TOOL" oxfmt --write -- "${format_files[@]}" || { + rc=$? + # oxfmt exits 2 when all target files are in ignorePatterns; treat as success. + if [ $rc -ne 2 ]; then + exit $rc + fi + } fi git add -- "${files[@]}"