web: replace gateway WS with subscribe-child streams in subagent-runs

This commit is contained in:
kumarabhirup 2026-02-21 11:04:07 -08:00
parent a6dab967a2
commit f83731d3b5
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -4,17 +4,20 @@
* Mirrors the ActiveRunManager pattern: buffers SSE events, supports
* subscriber fan-out, and tracks subagent metadata per parent web session.
*
* Events are fed from the gateway WebSocket connection (gateway-events.ts).
* Events are fed from CLI NDJSON streams (parent run + subscribe continuations).
*/
import type { ChildProcess } from "node:child_process";
import { createInterface } from "node:readline";
import { existsSync, readFileSync, writeFileSync, mkdirSync, appendFileSync } from "node:fs";
import { join } from "node:path";
import {
type AgentEvent,
spawnAgentSubscribeProcess,
extractToolResult,
buildToolOutput,
parseAgentErrorMessage,
parseErrorBody,
} from "./agent-runner";
import { subscribeToSessionKey, type GatewayEvent } from "./gateway-events";
import { resolveOpenClawStateDir, resolveWebChatDir } from "./workspace";
// ── Types ──
@ -38,8 +41,10 @@ type SubagentRun = SubagentInfo & {
subscribers: Set<SubagentSubscriber>;
/** Internal state for event-to-SSE transformation */
_state: TransformState;
_unsubGateway: (() => void) | null;
_subscribeProcess: ChildProcess | null;
_cleanupTimer: ReturnType<typeof setTimeout> | null;
/** Last globalSeq seen from the gateway event stream for replay cursor. */
lastGlobalSeq: number;
};
type TransformState = {
@ -64,7 +69,7 @@ type SubagentRegistry = {
/** Reverse index: parent web session ID → subagent session keys */
parentIndex: Map<string, Set<string>>;
/** Pre-registration buffer: events that arrive before the subagent is registered */
preRegBuffer: Map<string, GatewayEvent[]>;
preRegBuffer: Map<string, AgentEvent[]>;
};
function getRegistry(): SubagentRegistry {
@ -227,8 +232,9 @@ export function registerSubagent(
eventBuffer: [],
subscribers: new Set(),
_state: createTransformState(),
_unsubGateway: null,
_subscribeProcess: null,
_cleanupTimer: null,
lastGlobalSeq: 0,
};
// Load persisted events from disk (fills the replay buffer)
@ -257,7 +263,7 @@ export function registerSubagent(
endedAt: run.endedAt,
});
// NOTE: We do NOT subscribe to gateway WebSocket here. During live
// NOTE: We do NOT start subscribe child processes here. During live
// streaming, events arrive via routeRawEvent() from the parent's NDJSON
// stream. After the parent exits, activateGatewayFallback() subscribes.
// For on-demand rehydration (page refresh), ensureSubagentStreamable()
@ -267,7 +273,7 @@ export function registerSubagent(
const buf = reg.preRegBuffer.get(info.sessionKey);
if (buf && buf.length > 0) {
for (const evt of buf) {
handleGatewayEvent(run, evt);
handleAgentEvent(run, evt);
}
reg.preRegBuffer.delete(info.sessionKey);
}
@ -276,14 +282,12 @@ export function registerSubagent(
/**
* Ensure a rehydrated subagent can receive live events. Called when a client
* actually connects to the subagent's SSE stream after a page refresh.
* For still-running subagents, this activates the gateway WebSocket fallback.
* For still-running subagents, this activates the subscribe-child fallback.
*/
export function ensureSubagentStreamable(sessionKey: string): void {
const run = getRegistry().runs.get(sessionKey);
if (!run || run.status !== "running" || run._unsubGateway) {return;}
run._unsubGateway = subscribeToSessionKey(sessionKey, (evt) => {
handleGatewayEvent(run, evt);
});
if (!run || run.status !== "running" || run._subscribeProcess) {return;}
startSubagentSubscribeStream(run);
}
/** Get metadata for all subagents belonging to a parent web session. */
@ -354,24 +358,45 @@ export function isSubagentRunning(sessionKey: string): boolean {
}
/**
* Activate gateway WebSocket subscriptions for all subagent runs that are
* Activate subscribe-child streams for all subagent runs that are
* still in "running" status and don't already have a gateway subscription.
*
* Called when the parent agent's NDJSON stream ends (child process exits).
* After that point the NDJSON routing is no longer available, so the
* gateway WS becomes the only event source for orphaned subagents.
* subscribe child streams become the only event source for orphaned subagents.
*/
export function activateGatewayFallback(): void {
const reg = getRegistry();
for (const [key, run] of reg.runs) {
if (run.status === "running" && !run._unsubGateway) {
run._unsubGateway = subscribeToSessionKey(key, (evt) => {
handleGatewayEvent(run, evt);
});
if (run.status === "running" && !run._subscribeProcess) {
startSubagentSubscribeStream(run);
}
}
}
/**
* Check if any subagents spawned by this parent web session are still running.
* Cross-checks with the on-disk registry to reconcile subagents whose
* lifecycle "end" events were missed (e.g. during gateway WS handshake).
*/
export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean {
const reg = getRegistry();
const keys = reg.parentIndex.get(parentWebSessionId);
if (!keys) {return false;}
let anyRunning = false;
for (const key of keys) {
const run = reg.runs.get(key);
if (run?.status !== "running") {continue;}
const diskStatus = readDiskStatus(key);
if (diskStatus !== "running") {
finalizeRun(run, diskStatus === "error" ? "error" : "completed");
continue;
}
anyRunning = true;
}
return anyRunning;
}
/** Return session keys of all currently running subagents. */
export function getRunningSubagentKeys(): string[] {
const keys: string[] = [];
@ -389,21 +414,23 @@ export function getRunningSubagentKeys(): string[] {
* agent's CLI process already receives all gateway broadcasts, so we piggyback
* on its NDJSON stream instead of maintaining a separate WebSocket connection.
*
* Converts the flat NDJSON event shape to the nested GatewayEvent format that
* handleGatewayEvent expects.
* Routes a flat NDJSON event shape to the subagent transformer.
*/
export function routeRawEvent(
sessionKey: string,
ev: { event: string; stream?: string; data?: Record<string, unknown> },
ev: { event: string; stream?: string; data?: Record<string, unknown>; globalSeq?: number },
): void {
const gwEvt: GatewayEvent = {
const sourceEvent: AgentEvent = {
event: ev.event,
payload: { sessionKey, stream: ev.stream, data: ev.data },
sessionKey,
stream: ev.stream,
data: ev.data,
globalSeq: ev.globalSeq,
};
const run = getRegistry().runs.get(sessionKey);
if (run) {
handleGatewayEvent(run, gwEvt);
handleAgentEvent(run, sourceEvent);
return;
}
@ -416,7 +443,7 @@ export function routeRawEvent(
reg.preRegBuffer.set(sessionKey, buf);
}
if (buf.length < 10_000) {
buf.push(gwEvt);
buf.push(sourceEvent);
}
}
@ -484,14 +511,19 @@ function createTransformState(): TransformState {
};
}
function handleGatewayEvent(run: SubagentRun, evt: GatewayEvent): void {
if (evt.event !== "agent" || !evt.payload) {return;}
const payload = evt.payload;
const stream = typeof payload.stream === "string" ? payload.stream : undefined;
function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void {
if (evt.event !== "agent") {return;}
const gSeq = typeof (evt as Record<string, unknown>).globalSeq === "number"
? (evt as Record<string, unknown>).globalSeq as number
: undefined;
if (gSeq !== undefined) {
if (gSeq <= run.lastGlobalSeq) {return;}
run.lastGlobalSeq = gSeq;
}
const stream = typeof evt.stream === "string" ? evt.stream : undefined;
const data =
payload.data && typeof payload.data === "object"
? (payload.data as Record<string, unknown>)
evt.data && typeof evt.data === "object"
? (evt.data)
: undefined;
if (!stream || !data) {return;}
@ -673,9 +705,7 @@ function finalizeRun(run: SubagentRun, status: "completed" | "error"): void {
}
run.subscribers.clear();
// Unsubscribe from gateway events
run._unsubGateway?.();
run._unsubGateway = null;
stopSubagentSubscribeStream(run);
// Schedule cleanup after grace period
run._cleanupTimer = setTimeout(() => {
@ -692,7 +722,7 @@ function cleanupRun(sessionKey: string): void {
clearTimeout(run._cleanupTimer);
run._cleanupTimer = null;
}
run._unsubGateway?.();
stopSubagentSubscribeStream(run);
reg.runs.delete(sessionKey);
// Clean up parent index
@ -704,3 +734,54 @@ function cleanupRun(sessionKey: string): void {
}
}
}
function startSubagentSubscribeStream(run: SubagentRun): void {
stopSubagentSubscribeStream(run);
const child = spawnAgentSubscribeProcess(run.sessionKey, run.lastGlobalSeq);
run._subscribeProcess = child;
const rl = createInterface({ input: child.stdout! });
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
let ev: AgentEvent;
try {
ev = JSON.parse(line) as AgentEvent;
} catch {
return;
}
if (ev.sessionKey && ev.sessionKey !== run.sessionKey) {
return;
}
handleAgentEvent(run, ev);
});
child.on("close", () => {
if (run._subscribeProcess === child) {
run._subscribeProcess = null;
}
if (run.status !== "running") {return;}
setTimeout(() => {
if (run.status === "running" && !run._subscribeProcess) {
startSubagentSubscribeStream(run);
}
}, 300);
});
child.on("error", (err) => {
console.error("[subagent-runs] Subscribe child error:", err);
});
child.stderr?.on("data", (chunk: Buffer) => {
console.error("[subagent-runs subscribe stderr]", chunk.toString());
});
}
function stopSubagentSubscribeStream(run: SubagentRun): void {
if (!run._subscribeProcess) {return;}
try {
run._subscribeProcess.kill("SIGTERM");
} catch {
/* ignore */
}
run._subscribeProcess = null;
}