refactor(web): unify subagent tracking with parent session system

Subagents now use the same ActiveRun infrastructure as parent sessions:
- startSubscribeRun() creates a subscribe-only ActiveRun when sessions_spawn
  tool results are detected, using the same event buffering, persistence,
  and SSE reconnection as parent runs
- Stream/stop/chat routes no longer branch on subagent vs parent; both
  use getActiveRun/subscribeToRun with the session key as map key
- hasRunningSubagentsForParent moved into active-runs.ts to check the
  unified activeRuns map (+ disk registry fallback)
- Deferred finalization on lifecycle/end with 5s safety timeout
- ev.data.text fallback for assistant events without delta field
- 24h cleanup grace for subscribe-only runs (vs 30s for parent)

Reverts the broken childSessionKey registration from 32cfcf14f.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
kumarabhirup 2026-02-22 00:35:07 -08:00
parent 32cfcf14fa
commit a046cf2349
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
4 changed files with 521 additions and 240 deletions

View File

@ -1,35 +1,27 @@
import type { UIMessage } from "ai";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveAgentWorkspacePrefix } from "@/lib/workspace";
import {
startRun,
startSubscribeRun,
hasActiveRun,
getActiveRun,
subscribeToRun,
persistUserMessage,
type SseEvent as ParentSseEvent,
persistSubscribeUserMessage,
reactivateSubscribeRun,
sendSubagentFollowUp,
type SseEvent,
} from "@/lib/active-runs";
import {
hasActiveSubagent,
isSubagentRunning,
ensureRegisteredFromDisk,
subscribeToSubagent,
persistUserMessage as persistSubagentUserMessage,
reactivateSubagent,
spawnSubagentMessage,
type SseEvent as SubagentSseEvent,
} from "@/lib/subagent-runs";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir } from "@/lib/workspace";
// Force Node.js runtime (required for child_process)
export const runtime = "nodejs";
// Allow streaming responses up to 10 minutes
export const maxDuration = 600;
function deriveSubagentParentSessionId(sessionKey: string): string {
function deriveSubagentInfo(sessionKey: string): { parentSessionId: string; task: string } | null {
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return "";}
if (!existsSync(registryPath)) {return null;}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
runs?: Record<string, Record<string, unknown>>;
@ -38,18 +30,14 @@ function deriveSubagentParentSessionId(sessionKey: string): string {
if (entry.childSessionKey !== sessionKey) {continue;}
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
const match = requester.match(/^agent:[^:]+:web:(.+)$/);
return match?.[1] ?? "";
const parentSessionId = match?.[1] ?? "";
const task = typeof entry.task === "string" ? entry.task : "";
return { parentSessionId, task };
}
} catch {
// ignore
}
return "";
}
function ensureSubagentRegistered(sessionKey: string): boolean {
if (hasActiveSubagent(sessionKey)) {return true;}
const parentWebSessionId = deriveSubagentParentSessionId(sessionKey);
return ensureRegisteredFromDisk(sessionKey, parentWebSessionId);
return null;
}
export async function POST(req: Request) {
@ -59,7 +47,6 @@ export async function POST(req: Request) {
sessionKey,
}: { messages: UIMessage[]; sessionId?: string; sessionKey?: string } = await req.json();
// Extract the latest user message text
const lastUserMessage = messages.filter((m) => m.role === "user").pop();
const userText =
lastUserMessage?.parts
@ -76,15 +63,16 @@ export async function POST(req: Request) {
const isSubagentSession = typeof sessionKey === "string" && sessionKey.includes(":subagent:");
// Reject if a run is already active for this session.
if (!isSubagentSession && sessionId && hasActiveRun(sessionId)) {
return new Response("Active run in progress", { status: 409 });
}
if (isSubagentSession && isSubagentRunning(sessionKey)) {
return new Response("Active subagent run in progress", { status: 409 });
if (isSubagentSession && sessionKey) {
const existingRun = getActiveRun(sessionKey);
if (existingRun?.status === "running") {
return new Response("Active subagent run in progress", { status: 409 });
}
}
// Resolve workspace file paths to be agent-cwd-relative.
let agentMessage = userText;
const wsPrefix = resolveAgentWorkspacePrefix();
if (wsPrefix) {
@ -94,34 +82,35 @@ export async function POST(req: Request) {
);
}
// Persist the user message server-side so it survives a page reload
// even if the client never gets a chance to save.
const runKey = isSubagentSession && sessionKey ? sessionKey : (sessionId as string);
if (isSubagentSession && sessionKey && lastUserMessage) {
if (!ensureSubagentRegistered(sessionKey)) {
return new Response("Subagent not found", { status: 404 });
let run = getActiveRun(sessionKey);
if (!run) {
const info = deriveSubagentInfo(sessionKey);
if (!info) {
return new Response("Subagent not found", { status: 404 });
}
run = startSubscribeRun({
sessionKey,
parentSessionId: info.parentSessionId,
task: info.task,
});
}
persistSubagentUserMessage(sessionKey, {
persistSubscribeUserMessage(sessionKey, {
id: lastUserMessage.id,
text: userText,
});
reactivateSubscribeRun(sessionKey);
if (!sendSubagentFollowUp(sessionKey, agentMessage)) {
return new Response("Failed to send subagent message", { status: 500 });
}
} else if (sessionId && lastUserMessage) {
persistUserMessage(sessionId, {
id: lastUserMessage.id,
content: userText,
parts: lastUserMessage.parts as unknown[],
});
}
// Start the agent run (decoupled from this HTTP connection).
// The child process will keep running even if this response is cancelled.
if (isSubagentSession && sessionKey) {
if (!reactivateSubagent(sessionKey)) {
return new Response("Subagent not found", { status: 404 });
}
if (!spawnSubagentMessage(sessionKey, agentMessage)) {
return new Response("Failed to start subagent run", { status: 500 });
}
} else if (sessionId) {
try {
startRun({
sessionId,
@ -136,78 +125,40 @@ export async function POST(req: Request) {
}
}
// Stream SSE events to the client using the AI SDK v6 wire format.
const encoder = new TextEncoder();
let closed = false;
let unsubscribe: (() => void) | null = null;
const stream = new ReadableStream({
start(controller) {
if (!sessionId && !sessionKey) {
// No session — shouldn't happen but close gracefully.
if (!runKey) {
controller.close();
return;
}
unsubscribe = isSubagentSession && sessionKey
? subscribeToSubagent(
sessionKey,
(event: SubagentSseEvent | null) => {
if (closed) {return;}
if (event === null) {
closed = true;
try {
controller.close();
} catch {
/* already closed */
}
return;
}
try {
const json = JSON.stringify(event);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
} catch {
/* ignore enqueue errors on closed stream */
}
},
{ replay: false },
)
: subscribeToRun(
sessionId as string,
(event: ParentSseEvent | null) => {
unsubscribe = subscribeToRun(
runKey,
(event: SseEvent | null) => {
if (closed) {return;}
if (event === null) {
// Run completed — close the SSE stream.
closed = true;
try {
controller.close();
} catch {
/* already closed */
}
try { controller.close(); } catch { /* already closed */ }
return;
}
try {
const json = JSON.stringify(event);
controller.enqueue(
encoder.encode(`data: ${json}\n\n`),
);
} catch {
/* ignore enqueue errors on closed stream */
}
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
} catch { /* ignore */ }
},
// Don't replay — we just created the run, the buffer is empty.
{ replay: false },
);
if (!unsubscribe) {
// Race: run was cleaned up between startRun and subscribe.
closed = true;
controller.close();
}
},
cancel() {
// Client disconnected — unsubscribe but keep the run alive.
// The ActiveRunManager continues buffering + persisting in the background.
closed = true;
unsubscribe?.();
},

View File

@ -2,59 +2,25 @@
* POST /api/chat/stop
*
* Abort an active agent run. Called by the Stop button.
* The child process is sent SIGTERM and the run transitions to "error" state.
* Works for both parent sessions (by sessionId) and subagent sessions (by sessionKey).
*/
import { abortRun } from "@/lib/active-runs";
import {
abortSubagent,
hasActiveSubagent,
isSubagentRunning,
ensureRegisteredFromDisk,
} from "@/lib/subagent-runs";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir } from "@/lib/workspace";
import { abortRun, getActiveRun } from "@/lib/active-runs";
export const runtime = "nodejs";
function deriveSubagentParentSessionId(sessionKey: string): string {
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return "";}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
runs?: Record<string, Record<string, unknown>>;
};
for (const entry of Object.values(raw.runs ?? {})) {
if (entry.childSessionKey !== sessionKey) {continue;}
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
const match = requester.match(/^agent:[^:]+:web:(.+)$/);
return match?.[1] ?? "";
}
} catch {
// ignore
}
return "";
}
export async function POST(req: Request) {
const body: { sessionId?: string; sessionKey?: string } = await req
.json()
.catch(() => ({}));
const isSubagentSession = typeof body.sessionKey === "string" && body.sessionKey.includes(":subagent:");
if (isSubagentSession && body.sessionKey) {
if (!hasActiveSubagent(body.sessionKey)) {
const parentWebSessionId = deriveSubagentParentSessionId(body.sessionKey);
ensureRegisteredFromDisk(body.sessionKey, parentWebSessionId);
}
const aborted = isSubagentRunning(body.sessionKey) ? abortSubagent(body.sessionKey) : false;
return Response.json({ aborted });
}
const runKey = isSubagentSession && body.sessionKey ? body.sessionKey : body.sessionId;
if (!body.sessionId) {
if (!runKey) {
return new Response("sessionId or subagent sessionKey required", { status: 400 });
}
const aborted = abortRun(body.sessionId);
const run = getActiveRun(runKey);
const aborted = run?.status === "running" ? abortRun(runKey) : false;
return Response.json({ aborted });
}

View File

@ -1,25 +1,19 @@
/**
* GET /api/chat/stream?sessionId=xxx
* GET /api/chat/stream?sessionId=xxx (parent sessions)
* GET /api/chat/stream?sessionKey=xxx (subagent sessions)
*
* Reconnect to an active (or recently-completed) agent run.
* Replays all buffered SSE events from the start of the run, then
* streams live events until the run finishes.
*
* Returns 404 if no run exists for the given session.
* Both parent and subagent sessions use the same ActiveRun system.
*/
import {
getActiveRun,
startSubscribeRun,
subscribeToRun,
type SseEvent as ParentSseEvent,
type SseEvent,
} from "@/lib/active-runs";
import {
subscribeToSubagent,
hasActiveSubagent,
isSubagentRunning,
ensureRegisteredFromDisk,
ensureSubagentStreamable,
type SseEvent as SubagentSseEvent,
} from "@/lib/subagent-runs";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir } from "@/lib/workspace";
@ -27,9 +21,9 @@ import { resolveOpenClawStateDir } from "@/lib/workspace";
export const runtime = "nodejs";
export const maxDuration = 600;
function deriveSubagentParentSessionId(sessionKey: string): string {
function deriveSubagentInfo(sessionKey: string): { parentSessionId: string; task: string } | null {
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return "";}
if (!existsSync(registryPath)) {return null;}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
runs?: Record<string, Record<string, unknown>>;
@ -38,12 +32,14 @@ function deriveSubagentParentSessionId(sessionKey: string): string {
if (entry.childSessionKey !== sessionKey) {continue;}
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
const match = requester.match(/^agent:[^:]+:web:(.+)$/);
return match?.[1] ?? "";
const parentSessionId = match?.[1] ?? "";
const task = typeof entry.task === "string" ? entry.task : "";
return { parentSessionId, task };
}
} catch {
// ignore
}
return "";
return null;
}
export async function GET(req: Request) {
@ -56,66 +52,21 @@ export async function GET(req: Request) {
return new Response("sessionId or subagent sessionKey required", { status: 400 });
}
if (isSubagentSession && sessionKey) {
if (!hasActiveSubagent(sessionKey)) {
const parentWebSessionId = deriveSubagentParentSessionId(sessionKey);
const registered = ensureRegisteredFromDisk(sessionKey, parentWebSessionId);
if (!registered && !hasActiveSubagent(sessionKey)) {
return Response.json({ active: false }, { status: 404 });
}
const runKey = isSubagentSession && sessionKey ? sessionKey : (sessionId as string);
let run = getActiveRun(runKey);
if (!run && isSubagentSession && sessionKey) {
const info = deriveSubagentInfo(sessionKey);
if (info) {
run = startSubscribeRun({
sessionKey,
parentSessionId: info.parentSessionId,
task: info.task,
});
}
ensureSubagentStreamable(sessionKey);
const isActive = isSubagentRunning(sessionKey);
const encoder = new TextEncoder();
let closed = false;
let unsubscribe: (() => void) | null = null;
const stream = new ReadableStream({
start(controller) {
unsubscribe = subscribeToSubagent(
sessionKey,
(event: SubagentSseEvent | null) => {
if (closed) {return;}
if (event === null) {
closed = true;
try {
controller.close();
} catch {
/* already closed */
}
return;
}
try {
const json = JSON.stringify(event);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
} catch {
/* ignore enqueue errors on closed stream */
}
},
{ replay: true },
);
if (!unsubscribe) {
closed = true;
controller.close();
}
},
cancel() {
closed = true;
unsubscribe?.();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Run-Active": isActive ? "true" : "false",
},
});
}
const run = getActiveRun(sessionId as string);
if (!run) {
return Response.json({ active: false }, { status: 404 });
}
@ -127,7 +78,6 @@ export async function GET(req: Request) {
const stream = new ReadableStream({
start(controller) {
// Keep idle SSE connections alive while waiting for subagent announcements.
keepalive = setInterval(() => {
if (closed) {return;}
try {
@ -137,14 +87,11 @@ export async function GET(req: Request) {
}
}, 15_000);
// subscribeToRun with replay=true replays the full event buffer
// synchronously, then subscribes for live events.
unsubscribe = subscribeToRun(
sessionId as string,
(event: ParentSseEvent | null) => {
runKey,
(event: SseEvent | null) => {
if (closed) {return;}
if (event === null) {
// Run completed — close the SSE stream.
closed = true;
if (keepalive) {
clearInterval(keepalive);
@ -159,9 +106,7 @@ export async function GET(req: Request) {
}
try {
const json = JSON.stringify(event);
controller.enqueue(
encoder.encode(`data: ${json}\n\n`),
);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
} catch {
/* ignore enqueue errors on closed stream */
}
@ -170,7 +115,6 @@ export async function GET(req: Request) {
);
if (!unsubscribe) {
// Run was cleaned up between getActiveRun and subscribe.
closed = true;
if (keepalive) {
clearInterval(keepalive);
@ -180,7 +124,6 @@ export async function GET(req: Request) {
}
},
cancel() {
// Client disconnected — unsubscribe only (don't kill the run).
closed = true;
if (keepalive) {
clearInterval(keepalive);

View File

@ -17,7 +17,7 @@ import {
existsSync,
mkdirSync,
} from "node:fs";
import { resolveWebChatDir } from "./workspace";
import { resolveWebChatDir, resolveOpenClawStateDir } from "./workspace";
import {
type AgentEvent,
spawnAgentProcess,
@ -29,10 +29,6 @@ import {
parseErrorBody,
parseErrorFromStderr,
} from "./agent-runner";
import {
hasRunningSubagentsForParent,
registerSubagent,
} from "./subagent-runs";
// ── Types ──
@ -79,12 +75,27 @@ export type ActiveRun = {
lastGlobalSeq: number;
/** @internal subscribe child process for waiting-for-subagents continuation */
_subscribeProcess?: ChildProcess | null;
/** Full gateway session key (used for subagent subscribe-only runs) */
sessionKey?: string;
/** Parent web session ID (for subagent runs) */
parentSessionId?: string;
/** Subagent task description */
task?: string;
/** Subagent label */
label?: string;
/** True for subscribe-only runs (subagents) that don't own the agent process */
isSubscribeOnly?: boolean;
/** Set when lifecycle/end is received; defers finalization until subscribe close */
_lifecycleEnded?: boolean;
/** Safety timer to finalize if subscribe process hangs after lifecycle/end */
_finalizeTimer?: ReturnType<typeof setTimeout> | null;
};
// ── Constants ──
const PERSIST_INTERVAL_MS = 2_000;
const CLEANUP_GRACE_MS = 30_000;
const SUBSCRIBE_CLEANUP_GRACE_MS = 24 * 60 * 60_000;
const SILENT_REPLY_TOKEN = "NO_REPLY";
@ -143,6 +154,33 @@ export function getRunningSessionIds(): string[] {
return ids;
}
/** Check if any subagent sessions are still running for a parent web session. */
export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean {
for (const [_key, run] of activeRuns) {
if (run.isSubscribeOnly && run.parentSessionId === parentWebSessionId && run.status === "running") {
return true;
}
}
// Fallback: check the gateway disk registry
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return false;}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
runs?: Record<string, Record<string, unknown>>;
};
const runs = raw?.runs;
if (!runs) {return false;}
const parentKeyPattern = `:web:${parentWebSessionId}`;
for (const entry of Object.values(runs)) {
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
if (!requester.endsWith(parentKeyPattern)) {continue;}
if (typeof entry.endedAt === "number") {continue;}
return true;
}
} catch { /* ignore */ }
return false;
}
/**
* Subscribe to an active run's SSE events.
*
@ -181,6 +219,85 @@ export function subscribeToRun(
};
}
/**
* Reactivate a completed subscribe-only run for a follow-up message.
* Resets status to "running" and restarts the subscribe stream.
*/
export function reactivateSubscribeRun(sessionKey: string): boolean {
const run = activeRuns.get(sessionKey);
if (!run?.isSubscribeOnly) {return false;}
if (run.status === "running") {return true;}
run.status = "running";
run._lifecycleEnded = false;
if (run._finalizeTimer) {clearTimeout(run._finalizeTimer); run._finalizeTimer = null;}
run.accumulated = {
id: `assistant-${sessionKey}-${Date.now()}`,
role: "assistant",
parts: [],
};
const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
run._subscribeProcess = newChild;
run.childProcess = newChild;
wireSubscribeOnlyProcess(run, newChild, sessionKey);
return true;
}
/**
* Send a follow-up message to a subagent session via gateway RPC.
* The subscribe stream picks up the agent's response events.
*/
export function sendSubagentFollowUp(sessionKey: string, message: string): boolean {
try {
const root = resolvePackageRoot();
const devScript = join(root, "scripts", "run-node.mjs");
const prodScript = join(root, "openclaw.mjs");
const scriptPath = existsSync(devScript) ? devScript : prodScript;
const child = spawn(
"node",
[
scriptPath, "gateway", "call", "agent",
"--params", JSON.stringify({
message, sessionKey,
idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`,
deliver: false, channel: "webchat", lane: "subagent", timeout: 0,
}),
"--json", "--timeout", "10000",
],
{ cwd: root, env: { ...process.env }, stdio: "ignore", detached: true },
);
child.unref();
return true;
} catch {
return false;
}
}
/**
* Persist a user message for a subscribe-only (subagent) run.
* Emits a user-message event so reconnecting clients see the message.
*/
export function persistSubscribeUserMessage(
sessionKey: string,
msg: { id?: string; text: string },
): boolean {
const run = activeRuns.get(sessionKey);
if (!run) {return false;}
const event: SseEvent = {
type: "user-message",
id: msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`,
text: msg.text,
};
run.eventBuffer.push(event);
for (const sub of run.subscribers) {
try { sub(event); } catch { /* ignore */ }
}
schedulePersist(run);
return true;
}
/** Abort a running agent. Returns true if a run was actually aborted. */
export function abortRun(sessionId: string): boolean {
const run = activeRuns.get(sessionId);
@ -334,6 +451,321 @@ export function startRun(params: {
return run;
}
/**
* Start a subscribe-only run for a subagent session.
* The agent is already running in the gateway; we just subscribe to its
* event stream so buffering, persistence, and reconnection work identically
* to parent sessions.
*/
export function startSubscribeRun(params: {
sessionKey: string;
parentSessionId: string;
task: string;
label?: string;
}): ActiveRun {
const { sessionKey, parentSessionId, task, label } = params;
if (activeRuns.has(sessionKey)) {
return activeRuns.get(sessionKey)!;
}
const abortController = new AbortController();
const subscribeChild = spawnAgentSubscribeProcess(sessionKey, 0);
const run: ActiveRun = {
sessionId: sessionKey,
childProcess: subscribeChild,
eventBuffer: [],
subscribers: new Set(),
accumulated: {
id: `assistant-${sessionKey}-${Date.now()}`,
role: "assistant",
parts: [],
},
status: "running",
startedAt: Date.now(),
exitCode: null,
abortController,
_persistTimer: null,
_lastPersistedAt: 0,
lastGlobalSeq: 0,
sessionKey,
parentSessionId,
task,
label,
isSubscribeOnly: true,
_lifecycleEnded: false,
_finalizeTimer: null,
};
activeRuns.set(sessionKey, run);
wireSubscribeOnlyProcess(run, subscribeChild, sessionKey);
return run;
}
/**
* Wire event processing for a subscribe-only run (subagent).
* Uses the same processParentEvent pipeline as parent runs,
* with deferred finalization on lifecycle/end.
*/
function wireSubscribeOnlyProcess(
run: ActiveRun,
child: ChildProcess,
sessionKey: string,
): void {
let idCounter = 0;
const nextId = (prefix: string) =>
`${prefix}-${Date.now()}-${++idCounter}`;
let currentTextId = "";
let currentReasoningId = "";
let textStarted = false;
let reasoningStarted = false;
let statusReasoningActive = false;
let agentErrorReported = false;
let accTextIdx = -1;
let accReasoningIdx = -1;
const accToolMap = new Map<string, number>();
const accAppendReasoning = (delta: string) => {
if (accReasoningIdx < 0) {
run.accumulated.parts.push({ type: "reasoning", text: delta });
accReasoningIdx = run.accumulated.parts.length - 1;
} else {
(run.accumulated.parts[accReasoningIdx] as { type: "reasoning"; text: string }).text += delta;
}
};
const accAppendText = (delta: string) => {
if (accTextIdx < 0) {
run.accumulated.parts.push({ type: "text", text: delta });
accTextIdx = run.accumulated.parts.length - 1;
} else {
(run.accumulated.parts[accTextIdx] as { type: "text"; text: string }).text += delta;
}
};
const emit = (event: SseEvent) => {
run.eventBuffer.push(event);
for (const sub of run.subscribers) {
try { sub(event); } catch { /* ignore */ }
}
schedulePersist(run);
};
const emitError = (message: string) => {
closeReasoning();
closeText();
const tid = nextId("text");
emit({ type: "text-start", id: tid });
emit({ type: "text-delta", id: tid, delta: `[error] ${message}` });
emit({ type: "text-end", id: tid });
accAppendText(`[error] ${message}`);
};
const closeReasoning = () => {
if (reasoningStarted) {
emit({ type: "reasoning-end", id: currentReasoningId });
reasoningStarted = false;
statusReasoningActive = false;
}
accReasoningIdx = -1;
};
const closeText = () => {
if (textStarted) {
const lastPart = run.accumulated.parts[accTextIdx];
if (lastPart?.type === "text" && isLeakedSilentReplyToken(lastPart.text)) {
run.accumulated.parts.splice(accTextIdx, 1);
}
emit({ type: "text-end", id: currentTextId });
textStarted = false;
}
accTextIdx = -1;
};
const openStatusReasoning = (label: string) => {
closeReasoning();
closeText();
currentReasoningId = nextId("status");
emit({ type: "reasoning-start", id: currentReasoningId });
emit({ type: "reasoning-delta", id: currentReasoningId, delta: label });
reasoningStarted = true;
statusReasoningActive = true;
};
const processEvent = (ev: AgentEvent) => {
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "start") {
openStatusReasoning("Preparing response...");
}
if (ev.event === "agent" && ev.stream === "thinking") {
const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
if (delta) {
if (statusReasoningActive) { closeReasoning(); }
if (!reasoningStarted) {
currentReasoningId = nextId("reasoning");
emit({ type: "reasoning-start", id: currentReasoningId });
reasoningStarted = true;
}
emit({ type: "reasoning-delta", id: currentReasoningId, delta });
accAppendReasoning(delta);
}
}
if (ev.event === "agent" && ev.stream === "assistant") {
const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
const textFallback = !delta && typeof ev.data?.text === "string" ? ev.data.text : undefined;
const chunk = delta ?? textFallback;
if (chunk) {
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
emit({ type: "text-start", id: currentTextId });
textStarted = true;
}
emit({ type: "text-delta", id: currentTextId, delta: chunk });
accAppendText(chunk);
}
if (typeof ev.data?.stopReason === "string" && ev.data.stopReason === "error" && typeof ev.data?.errorMessage === "string" && !agentErrorReported) {
agentErrorReported = true;
emitError(parseErrorBody(ev.data.errorMessage));
}
}
if (ev.event === "agent" && ev.stream === "tool") {
const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : "";
const toolName = typeof ev.data?.name === "string" ? ev.data.name : "";
if (phase === "start") {
closeReasoning();
closeText();
const args = ev.data?.args && typeof ev.data.args === "object" ? (ev.data.args as Record<string, unknown>) : {};
emit({ type: "tool-input-start", toolCallId, toolName });
emit({ type: "tool-input-available", toolCallId, toolName, input: args });
run.accumulated.parts.push({ type: "tool-invocation", toolCallId, toolName, args });
accToolMap.set(toolCallId, run.accumulated.parts.length - 1);
} else if (phase === "result") {
const isError = ev.data?.isError === true;
const result = extractToolResult(ev.data?.result);
if (isError) {
const errorText = result?.text || (result?.details?.error as string | undefined) || "Tool execution failed";
emit({ type: "tool-output-error", toolCallId, errorText });
} else {
const output = buildToolOutput(result);
emit({ type: "tool-output-available", toolCallId, output });
const idx = accToolMap.get(toolCallId);
if (idx !== undefined) {
const part = run.accumulated.parts[idx];
if (part.type === "tool-invocation") { part.result = output; }
}
}
}
}
if (ev.event === "agent" && ev.stream === "compaction") {
const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
if (phase === "start") { openStatusReasoning("Optimizing session context..."); }
else if (phase === "end") {
if (statusReasoningActive) {
if (ev.data?.willRetry === true) {
emit({ type: "reasoning-delta", id: currentReasoningId, delta: "\nRetrying with compacted context..." });
} else { closeReasoning(); }
}
}
}
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "end") {
closeReasoning();
closeText();
run._lifecycleEnded = true;
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); }
run._finalizeTimer = setTimeout(() => {
run._finalizeTimer = null;
if (run.status === "running") { finalizeSubscribeRun(run); }
}, 5_000);
}
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "error" && !agentErrorReported) {
const msg = parseAgentErrorMessage(ev.data);
if (msg) { agentErrorReported = true; emitError(msg); }
finalizeSubscribeRun(run, "error");
}
if (ev.event === "error" && !agentErrorReported) {
const msg = parseAgentErrorMessage(ev.data ?? (ev as unknown as Record<string, unknown>));
if (msg) { agentErrorReported = true; emitError(msg); }
}
};
const rl = createInterface({ input: child.stdout! });
rl.on("line", (line: string) => {
if (!line.trim()) { return; }
let ev: AgentEvent;
try { ev = JSON.parse(line) as AgentEvent; } catch { return; }
if (ev.sessionKey && ev.sessionKey !== sessionKey) { return; }
const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
? (ev as Record<string, unknown>).globalSeq as number
: undefined;
if (gSeq !== undefined) {
if (gSeq <= run.lastGlobalSeq) { return; }
run.lastGlobalSeq = gSeq;
}
processEvent(ev);
});
child.on("close", () => {
if (run._subscribeProcess === child) { run._subscribeProcess = null; }
if (run.status !== "running") { return; }
if (run._lifecycleEnded) {
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
finalizeSubscribeRun(run);
return;
}
setTimeout(() => {
if (run.status === "running" && !run._subscribeProcess) {
const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
run._subscribeProcess = newChild;
run.childProcess = newChild;
wireSubscribeOnlyProcess(run, newChild, sessionKey);
}
}, 300);
});
child.on("error", (err) => {
console.error("[active-runs] Subscribe child error:", err);
});
child.stderr?.on("data", (chunk: Buffer) => {
console.error("[active-runs subscribe stderr]", chunk.toString());
});
run._subscribeProcess = child;
}
function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void {
if (run.status !== "running") { return; }
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
run.status = status;
flushPersistence(run);
for (const sub of run.subscribers) {
try { sub(null); } catch { /* ignore */ }
}
run.subscribers.clear();
stopSubscribeProcess(run);
const grace = run.isSubscribeOnly ? SUBSCRIBE_CLEANUP_GRACE_MS : CLEANUP_GRACE_MS;
setTimeout(() => {
if (activeRuns.get(run.sessionId) === run) { cleanupRun(run.sessionId); }
}, grace);
}
// ── Persistence helpers (called from route to persist user messages) ──
/** Save a user message to the session JSONL (called once at run start). */
@ -719,26 +1151,15 @@ function wireChildProcess(run: ActiveRun): void {
if (toolName === "sessions_spawn" && !isError) {
const childSessionKey =
result?.details?.childSessionKey as string | undefined;
const childRunId =
result?.details?.runId as string | undefined;
// task/label are in the tool input args, not the result
const spawnArgs = accToolMap.has(toolCallId)
? run.accumulated.parts[accToolMap.get(toolCallId)!]
: undefined;
const spawnTask =
(spawnArgs as Record<string, unknown> | undefined)?.args
? ((spawnArgs as Record<string, unknown>).args as Record<string, unknown>)?.task as string | undefined
if (childSessionKey) {
const spawnArgs = accToolMap.has(toolCallId)
? (run.accumulated.parts[accToolMap.get(toolCallId)!] as { args?: Record<string, unknown> })?.args
: undefined;
const spawnLabel =
(spawnArgs as Record<string, unknown> | undefined)?.args
? ((spawnArgs as Record<string, unknown>).args as Record<string, unknown>)?.label as string | undefined
: undefined;
if (childSessionKey && childRunId) {
registerSubagent(run.sessionId, {
startSubscribeRun({
sessionKey: childSessionKey,
runId: childRunId,
task: spawnTask ?? "Subagent task",
label: spawnLabel,
parentSessionId: run.sessionId,
task: (spawnArgs?.task as string | undefined) ?? "Subagent task",
label: spawnArgs?.label as string | undefined,
});
}
}