web: add waiting-for-subagents state and subscribe continuation in active-runs

This commit is contained in:
kumarabhirup 2026-02-21 11:04:26 -08:00
parent 6650287da2
commit 9d8c384517
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -21,6 +21,7 @@ import { resolveWebChatDir } from "./workspace";
import {
type AgentEvent,
spawnAgentProcess,
spawnAgentSubscribeProcess,
resolvePackageRoot,
extractToolResult,
buildToolOutput,
@ -33,6 +34,7 @@ import {
ensureRegisteredFromDisk,
hasActiveSubagent as hasSubagentRun,
activateGatewayFallback,
hasRunningSubagentsForParent,
} from "./subagent-runs";
// ── Types ──
@ -68,7 +70,7 @@ export type ActiveRun = {
eventBuffer: SseEvent[];
subscribers: Set<RunSubscriber>;
accumulated: AccumulatedMessage;
status: "running" | "completed" | "error";
status: "running" | "waiting-for-subagents" | "completed" | "error";
startedAt: number;
exitCode: number | null;
abortController: AbortController;
@ -76,6 +78,10 @@ export type ActiveRun = {
_persistTimer: ReturnType<typeof setTimeout> | null;
/** @internal last time persistence was flushed */
_lastPersistedAt: number;
/** @internal last globalSeq seen from the gateway event stream */
lastGlobalSeq: number;
/** @internal subscribe child process for waiting-for-subagents continuation */
_subscribeProcess?: ChildProcess | null;
};
// ── Constants ──
@ -109,14 +115,14 @@ export function getActiveRun(sessionId: string): ActiveRun | undefined {
/** Check whether a *running* (not just completed) run exists for a session. */
export function hasActiveRun(sessionId: string): boolean {
const run = activeRuns.get(sessionId);
return run !== undefined && run.status === "running";
return run !== undefined && (run.status === "running" || run.status === "waiting-for-subagents");
}
/** Return the session IDs of all currently running agent runs. */
export function getRunningSessionIds(): string[] {
const ids: string[] = [];
for (const [sessionId, run] of activeRuns) {
if (run.status === "running") {
if (run.status === "running" || run.status === "waiting-for-subagents") {
ids.push(sessionId);
}
}
@ -150,7 +156,7 @@ export function subscribeToRun(
}
// If the run already finished, signal completion immediately.
if (run.status !== "running") {
if (run.status !== "running" && run.status !== "waiting-for-subagents") {
callback(null);
return () => {};
}
@ -164,14 +170,20 @@ export function subscribeToRun(
/** Abort a running agent. Returns true if a run was actually aborted. */
export function abortRun(sessionId: string): boolean {
const run = activeRuns.get(sessionId);
if (!run || run.status !== "running") {return false;}
if (!run || (run.status !== "running" && run.status !== "waiting-for-subagents")) {return false;}
// Immediately mark the run as non-running so hasActiveRun() returns
// false and the next user message isn't rejected with 409.
const wasWaiting = run.status === "waiting-for-subagents";
run.status = "error";
// Clean up waiting subscribe process if present.
stopSubscribeProcess(run);
run.abortController.abort();
run.childProcess.kill("SIGTERM");
if (!wasWaiting) {
run.childProcess.kill("SIGTERM");
}
// Send chat.abort directly to the gateway so the agent run stops
// even if the CLI child's best-effort onAbort doesn't complete in time.
@ -196,12 +208,14 @@ export function abortRun(sessionId: string): boolean {
// Fallback: if the child doesn't exit within 5 seconds after
// SIGTERM (e.g. the CLI's best-effort chat.abort RPC hangs),
// send SIGKILL to force-terminate.
const killTimer = setTimeout(() => {
try {
run.childProcess.kill("SIGKILL");
} catch { /* already dead */ }
}, 5_000);
run.childProcess.once("close", () => clearTimeout(killTimer));
if (!wasWaiting) {
const killTimer = setTimeout(() => {
try {
run.childProcess.kill("SIGKILL");
} catch { /* already dead */ }
}, 5_000);
run.childProcess.once("close", () => clearTimeout(killTimer));
}
return true;
}
@ -284,6 +298,7 @@ export function startRun(params: {
abortController,
_persistTimer: null,
_lastPersistedAt: 0,
lastGlobalSeq: 0,
};
activeRuns.set(sessionId, run);
@ -512,39 +527,12 @@ function wireChildProcess(run: ActiveRun): void {
// emitting user-visible diagnostics.
});
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
let ev: AgentEvent;
try {
ev = JSON.parse(line) as AgentEvent;
} catch {
return;
}
// ── Route non-parent events to SubagentRunManager ──
// The CLI child process receives ALL gateway broadcasts, including
// events from subagent runs. Filter them out of the parent chat
// and route to the SubagentRunManager for separate streaming.
if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
const childKey = ev.sessionKey;
// Try to register the subagent if not yet known. Events
// arriving before runs.json is written get buffered inside
// routeRawEvent and replayed upon successful registration.
if (!hasSubagentRun(childKey) && !seenSubagentKeys.has(childKey)) {
if (ensureRegisteredFromDisk(childKey, run.sessionId)) {
seenSubagentKeys.add(childKey);
}
// Don't add to seenSubagentKeys on failure — retry on the next event
}
routeSubagentEvent(childKey, {
event: ev.event,
stream: ev.stream,
data: ev.data,
});
return;
}
// ── Reusable parent event processor ──
// Handles lifecycle, thinking, assistant text, tool, compaction, and error
// events for the parent agent. Used by both the CLI NDJSON stream and the
// subscribe-only CLI fallback (waiting-for-subagents state).
const processParentEvent = (ev: AgentEvent) => {
// Lifecycle start
if (
ev.event === "agent" &&
@ -660,7 +648,6 @@ function wireChildProcess(run: ActiveRun): void {
toolName,
input: args,
});
// Accumulate tool start in ordered parts
run.accumulated.parts.push({
type: "tool-invocation",
toolCallId,
@ -681,7 +668,6 @@ function wireChildProcess(run: ActiveRun): void {
toolCallId,
errorText,
});
// Update the accumulated tool part
const idx = accToolMap.get(toolCallId);
if (idx !== undefined) {
const part = run.accumulated.parts[idx];
@ -696,7 +682,6 @@ function wireChildProcess(run: ActiveRun): void {
toolCallId,
output,
});
// Update the accumulated tool part
const idx = accToolMap.get(toolCallId);
if (idx !== undefined) {
const part = run.accumulated.parts[idx];
@ -768,6 +753,68 @@ function wireChildProcess(run: ActiveRun): void {
emitError(msg);
}
}
};
const processParentSubscribeEvent = (ev: AgentEvent) => {
const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
? (ev as Record<string, unknown>).globalSeq as number
: undefined;
if (gSeq !== undefined) {
if (gSeq <= run.lastGlobalSeq) {return;}
run.lastGlobalSeq = gSeq;
}
processParentEvent(ev);
if (ev.stream === "lifecycle" && ev.data?.phase === "end") {
if (hasRunningSubagentsForParent(run.sessionId)) {
openStatusReasoning("Waiting for subagent results...");
flushPersistence(run);
} else {
finalizeWaitingRun(run);
}
}
};
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
let ev: AgentEvent;
try {
ev = JSON.parse(line) as AgentEvent;
} catch {
return;
}
// ── Route non-parent events to SubagentRunManager ──
// The CLI child process receives ALL gateway broadcasts, including
// events from subagent runs. Filter them out of the parent chat
// and route to the SubagentRunManager for separate streaming.
if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
const childKey = ev.sessionKey;
if (!hasSubagentRun(childKey) && !seenSubagentKeys.has(childKey)) {
if (ensureRegisteredFromDisk(childKey, run.sessionId)) {
seenSubagentKeys.add(childKey);
}
}
routeSubagentEvent(childKey, {
event: ev.event,
stream: ev.stream,
data: ev.data,
globalSeq: typeof (ev as Record<string, unknown>).globalSeq === "number"
? (ev as Record<string, unknown>).globalSeq as number
: undefined,
});
return;
}
// Track the global event cursor from the gateway for replay on handoff.
const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
? (ev as Record<string, unknown>).globalSeq as number
: undefined;
if (gSeq !== undefined && gSeq > run.lastGlobalSeq) {
run.lastGlobalSeq = gSeq;
}
processParentEvent(ev);
});
// ── Child process exit ──
@ -789,28 +836,48 @@ function wireChildProcess(run: ActiveRun): void {
}
closeReasoning();
if (!everSentText) {
const exitedClean = code === 0 || code === null;
if (!everSentText && !exitedClean) {
const tid = nextId("text");
emit({ type: "text-start", id: tid });
const errMsg =
code !== null && code !== 0
? `[error] Agent exited with code ${code}. Check server logs for details.`
: "[error] No response from agent.";
const errMsg = `[error] Agent exited with code ${code}. Check server logs for details.`;
emit({ type: "text-delta", id: tid, delta: errMsg });
emit({ type: "text-end", id: tid });
accAppendText(errMsg);
} else if (!everSentText && exitedClean) {
const tid = nextId("text");
emit({ type: "text-start", id: tid });
const msg = "No response from agent.";
emit({ type: "text-delta", id: tid, delta: msg });
emit({ type: "text-end", id: tid });
accAppendText(msg);
} else {
closeText();
}
run.status = code === 0 || code === null ? "completed" : "error";
run.exitCode = code;
// The parent's NDJSON stream has ended. Any subagents that are
// still running lose their event source (sessions_spawn is
// fire-and-forget). Switch to gateway WebSocket subscriptions
// so they keep streaming.
activateGatewayFallback();
const hasRunningSubagents = hasRunningSubagentsForParent(run.sessionId);
// If the CLI exited cleanly and subagents are still running,
// keep the SSE stream open and wait for announcement-triggered
// parent turns via subscribe-only CLI NDJSON.
if (exitedClean && hasRunningSubagents) {
// The parent's NDJSON stream has ended; running subagents lose their
// original event source, so switch each subagent to subscribe children.
activateGatewayFallback();
run.status = "waiting-for-subagents";
openStatusReasoning("Waiting for subagent results...");
flushPersistence(run);
startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent);
return;
}
// Normal completion path.
run.status = exitedClean ? "completed" : "error";
// Final persistence flush (removes _streaming flag).
flushPersistence(run);
@ -865,6 +932,90 @@ function wireChildProcess(run: ActiveRun): void {
});
}
function startParentSubscribeStream(
run: ActiveRun,
parentSessionKey: string,
onEvent: (ev: AgentEvent) => void,
): void {
stopSubscribeProcess(run);
const child = spawnAgentSubscribeProcess(parentSessionKey, run.lastGlobalSeq);
run._subscribeProcess = child;
const rl = createInterface({ input: child.stdout! });
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
let ev: AgentEvent;
try {
ev = JSON.parse(line) as AgentEvent;
} catch {
return;
}
if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
return;
}
onEvent(ev);
});
child.on("close", () => {
if (run._subscribeProcess === child) {
run._subscribeProcess = null;
}
if (run.status !== "waiting-for-subagents") {return;}
// If still waiting, restart subscribe stream from the latest cursor.
setTimeout(() => {
if (run.status === "waiting-for-subagents" && !run._subscribeProcess) {
startParentSubscribeStream(run, parentSessionKey, onEvent);
}
}, 300);
});
child.on("error", (err) => {
console.error("[active-runs] Parent subscribe child error:", err);
});
child.stderr?.on("data", (chunk: Buffer) => {
console.error("[active-runs subscribe stderr]", chunk.toString());
});
}
function stopSubscribeProcess(run: ActiveRun): void {
if (!run._subscribeProcess) {return;}
try {
run._subscribeProcess.kill("SIGTERM");
} catch {
/* ignore */
}
run._subscribeProcess = null;
}
// ── Finalize a waiting-for-subagents run ──
/**
* Transition a run from "waiting-for-subagents" to "completed".
* Called when the last subagent finishes and the parent's announcement-
* triggered turn completes.
*/
function finalizeWaitingRun(run: ActiveRun): void {
if (run.status !== "waiting-for-subagents") {return;}
run.status = "completed";
stopSubscribeProcess(run);
flushPersistence(run);
for (const sub of run.subscribers) {
try { sub(null); } catch { /* ignore */ }
}
run.subscribers.clear();
setTimeout(() => {
if (activeRuns.get(run.sessionId) === run) {
cleanupRun(run.sessionId);
}
}, CLEANUP_GRACE_MS);
}
// ── Debounced persistence ──
function schedulePersist(run: ActiveRun) {
@ -896,7 +1047,7 @@ function flushPersistence(run: ActiveRun) {
.map((p) => p.text)
.join("");
const isStillRunning = run.status === "running";
const isStillStreaming = run.status === "running" || run.status === "waiting-for-subagents";
const message: Record<string, unknown> = {
id: run.accumulated.id,
role: "assistant",
@ -904,7 +1055,7 @@ function flushPersistence(run: ActiveRun) {
parts, // Ordered parts — preserves interleaving of reasoning, tools, text
timestamp: new Date().toISOString(),
};
if (isStillRunning) {
if (isStillStreaming) {
message._streaming = true;
}
@ -959,5 +1110,6 @@ function cleanupRun(sessionId: string) {
const run = activeRuns.get(sessionId);
if (!run) {return;}
if (run._persistTimer) {clearTimeout(run._persistTimer);}
stopSubscribeProcess(run);
activeRuns.delete(sessionId);
}