/**
 * Server-side singleton that manages agent child processes independently of
 * HTTP connections. It buffers SSE events, fans them out to subscribers, and
 * persists assistant messages incrementally to disk.
 *
 * This decouples agent lifecycles from request lifecycles so that:
 * - Streams survive page reloads (the process keeps running).
 * - Messages are written to persistent sessions as they arrive.
 * - New HTTP connections can re-attach to a running stream.
 */
|
|
import { createInterface } from "node:readline";
|
|
import { join } from "node:path";
|
|
import {
|
|
readFileSync,
|
|
writeFileSync,
|
|
existsSync,
|
|
mkdirSync,
|
|
} from "node:fs";
|
|
import { resolveWebChatDir, resolveOpenClawStateDir } from "./workspace";
|
|
import {
|
|
type AgentProcessHandle,
|
|
type AgentEvent,
|
|
spawnAgentProcess,
|
|
spawnAgentSubscribeProcess,
|
|
callGatewayRpc,
|
|
extractToolResult,
|
|
buildToolOutput,
|
|
parseAgentErrorMessage,
|
|
parseErrorBody,
|
|
parseErrorFromStderr,
|
|
} from "./agent-runner";
|
|
|
|
// ── Types ──
|
|
|
|
/**
 * One server-sent event in the AI SDK v6 data stream wire format: a mandatory
 * string `type` discriminator plus arbitrary additional fields.
 */
export type SseEvent = { type: string } & Record<string, unknown>;

/**
 * Callback registered by a stream consumer. It is invoked with each SSE event
 * and with `null` once the run has completed.
 */
export type RunSubscriber = (event: SseEvent | null) => void;
|
|
|
|
/**
 * One segment of the assistant message being assembled: streamed text,
 * streamed reasoning, or a tool call together with its eventual outcome.
 */
type AccumulatedPart =
  | { type: "text"; text: string }
  | { type: "reasoning"; text: string }
  | {
      type: "tool-invocation";
      toolCallId: string;
      toolName: string;
      args: Record<string, unknown>;
      /** Set when the tool finished successfully. */
      result?: Record<string, unknown>;
      /** Set when the tool reported an error. */
      errorText?: string;
    };

/** The assistant message accumulated for a run while events stream in. */
type AccumulatedMessage = {
  id: string;
  role: "assistant";
  /** Parts in arrival order, so reasoning, tool calls, and text stay interleaved. */
  parts: AccumulatedPart[];
};
|
|
|
|
/** In-memory bookkeeping for one agent run tracked by the run registry. */
export type ActiveRun = {
  /** Registry key. Subscribe-only runs store their full session key here. */
  sessionId: string;
  /** Handle of the child process currently feeding this run. */
  childProcess: AgentProcessHandle;
  /** Every SSE event emitted so far; replayed to late subscribers. */
  eventBuffer: SseEvent[];
  /** Live listeners that receive each new event. */
  subscribers: Set<RunSubscriber>;
  /** Assistant message assembled from the stream, used for persistence. */
  accumulated: AccumulatedMessage;
  status: "running" | "waiting-for-subagents" | "completed" | "error";
  /** `Date.now()` timestamp taken when the run was created. */
  startedAt: number;
  exitCode: number | null;
  abortController: AbortController;

  /** @internal debounced persistence timer */
  _persistTimer: ReturnType<typeof setTimeout> | null;
  /** @internal last time persistence was flushed */
  _lastPersistedAt: number;
  /** @internal last globalSeq seen from the gateway event stream */
  lastGlobalSeq: number;

  /** @internal subscribe child process for waiting-for-subagents continuation */
  _subscribeProcess?: AgentProcessHandle | null;
  /** @internal retry timer for subscribe stream restarts */
  _subscribeRetryTimer?: ReturnType<typeof setTimeout> | null;
  /** @internal consecutive subscribe restart attempts without a received event */
  _subscribeRetryAttempt?: number;

  /** Full gateway session key (used for subagent subscribe-only runs) */
  sessionKey?: string;
  /** Parent web session ID (for subagent runs) */
  parentSessionId?: string;
  /** Subagent task description */
  task?: string;
  /** Subagent label */
  label?: string;
  /** True for subscribe-only runs (subagents) that don't own the agent process */
  isSubscribeOnly?: boolean;

  /** Set when lifecycle/end is received; defers finalization until subscribe close */
  _lifecycleEnded?: boolean;
  /** Safety timer to finalize if subscribe process hangs after lifecycle/end */
  _finalizeTimer?: ReturnType<typeof setTimeout> | null;
  /** @internal short reconciliation window before waiting-run completion */
  _waitingFinalizeTimer?: ReturnType<typeof setTimeout> | null;
};
|
|
|
|
// ── Constants ──
|
|
|
|
// NOTE(review): presumably the throttle interval for debounced persistence — its use is outside this excerpt.
const PERSIST_INTERVAL_MS = 2_000;

/** How long a finished or aborted run stays in the registry before cleanup. */
const CLEANUP_GRACE_MS = 30_000;

/** Registry retention for finished subscribe-only (subagent) runs: 24 hours. */
const SUBSCRIBE_CLEANUP_GRACE_MS = 24 * 60 * 60_000;

// NOTE(review): presumably the backoff bounds for subscribe-stream restarts — confirm in scheduleSubscribeRestart.
const SUBSCRIBE_RETRY_BASE_MS = 300;
const SUBSCRIBE_RETRY_MAX_MS = 5_000;

/** Delay between lifecycle/end and forced finalization of a subscribe-only run. */
const SUBSCRIBE_LIFECYCLE_END_GRACE_MS = 750;

// NOTE(review): presumably the reconciliation window before a waiting run completes — its use is outside this excerpt.
const WAITING_FINALIZE_RECONCILE_MS = 5_000;
|
|
|
|
const SILENT_REPLY_TOKEN = "NO_REPLY";

/** Matches the full silent-reply token, optionally followed by non-word characters. */
const SILENT_REPLY_FULL_PATTERN = new RegExp(`^${SILENT_REPLY_TOKEN}\\W*$`);

/**
 * Decide whether a finalized text part is nothing but a leaked silent-reply
 * marker.
 *
 * The agent runner suppresses complete "NO_REPLY" tokens, but while streaming
 * the model can emit a partial prefix (e.g. "NO") before the whole token is
 * assembled and caught. Both the full token and such prefixes are detected so
 * they don't leak into persisted/displayed messages.
 *
 * @param text Text of the part; surrounding whitespace is ignored.
 * @returns `true` for the full token (plus trailing non-word characters) or
 *   for a proper prefix of the token that is at least two characters long.
 */
function isLeakedSilentReplyToken(text: string): boolean {
  const trimmed = text.trim();
  if (trimmed.length === 0) {
    return false;
  }
  if (SILENT_REPLY_FULL_PATTERN.test(trimmed)) {
    return true;
  }
  const isProperPrefixLength =
    trimmed.length >= 2 && trimmed.length < SILENT_REPLY_TOKEN.length;
  return isProperPrefixLength && SILENT_REPLY_TOKEN.startsWith(trimmed);
}
|
|
|
|
/**
 * View an unknown value as a string-keyed record.
 *
 * @returns The same object when `value` is a non-null, non-array object;
 *   otherwise `null`.
 */
function asRecord(value: unknown): Record<string, unknown> | null {
  const isObjectLike =
    typeof value === "object" && value !== null && !Array.isArray(value);
  return isObjectLike ? (value as Record<string, unknown>) : null;
}
|
|
|
|
/**
 * Extract the assistant's text from a chat event payload.
 *
 * Only payloads whose `state` is "final" and whose `message.role` is
 * "assistant" yield text. String content is returned as-is; array content is
 * reduced to the concatenated `text` of its "text" / "output_text" parts.
 *
 * @param data Payload of a chat event, if any.
 * @returns The extracted text, or an empty string when nothing qualifies.
 */
function extractAssistantTextFromChatPayload(
  data: Record<string, unknown> | undefined,
): string {
  if (!data || data.state !== "final") {
    return "";
  }

  const message = asRecord(data.message);
  if (message === null || message.role !== "assistant") {
    return "";
  }

  const { content } = message;
  if (typeof content === "string") {
    return content;
  }
  if (!Array.isArray(content)) {
    return "";
  }

  let text = "";
  for (const item of content) {
    const part = asRecord(item);
    if (part === null) {
      continue;
    }
    const isTextual = part.type === "text" || part.type === "output_text";
    if (isTextual && typeof part.text === "string") {
      text += part.text;
    }
  }
  return text;
}
|
|
// Resolved on every call rather than cached, so a profile switch at runtime
// is picked up immediately.
function webChatDir(): string {
  return resolveWebChatDir();
}

/** Path of the session index file inside the web chat directory. */
function indexFile(): string {
  const dir = webChatDir();
  return join(dir, "index.json");
}
|
|
|
|
// ── Singleton registry ──
|
|
// Store on globalThis so the Map survives Next.js HMR reloads in dev mode.
|
|
// Without this, hot-reloading any server module resets the Map, orphaning
|
|
// running child processes and dropping SSE streams mid-flight.
|
|
|
|
const GLOBAL_KEY = "__openclaw_activeRuns" as const;

/**
 * Registry of runs keyed by session. The Map is parked on `globalThis` under
 * GLOBAL_KEY and reused when already present, so re-evaluating this module
 * does not produce a fresh, empty registry.
 */
const activeRuns: Map<string, ActiveRun> = (() => {
  const store = globalThis as Record<string, unknown>;
  const existing = store[GLOBAL_KEY] as Map<string, ActiveRun> | undefined;
  const registry = existing ?? new Map<string, ActiveRun>();
  store[GLOBAL_KEY] = registry;
  return registry;
})();
|
|
|
|
// ── Public API ──
|
|
|
|
/**
 * Look up the run registered for a session. The result may be a run that is
 * still active or one that already finished but has not been cleaned up yet.
 *
 * @returns The run, or `undefined` when none is registered.
 */
export function getActiveRun(sessionId: string): ActiveRun | undefined {
  const run = activeRuns.get(sessionId);
  return run;
}
|
|
|
|
/**
 * Tell whether the session has a run that is still in progress, i.e. its
 * status is "running" or "waiting-for-subagents". Finished runs that are
 * merely still registered do not count.
 */
export function hasActiveRun(sessionId: string): boolean {
  const status = activeRuns.get(sessionId)?.status;
  return status === "running" || status === "waiting-for-subagents";
}
|
|
|
|
/**
 * List the registry keys of every run that is still in progress ("running"
 * or "waiting-for-subagents"), in registry insertion order.
 */
export function getRunningSessionIds(): string[] {
  return Array.from(activeRuns.entries())
    .filter(
      ([, run]) =>
        run.status === "running" || run.status === "waiting-for-subagents",
    )
    .map(([sessionId]) => sessionId);
}
|
|
|
|
/**
 * Check if any subagent sessions are still running for a parent web session.
 *
 * Two sources are consulted:
 * 1. The in-memory registry: a subscribe-only run whose `parentSessionId`
 *    matches and whose status is "running".
 * 2. As a fallback, the gateway's on-disk registry (`subagents/runs.json` in
 *    the state directory): an entry whose `requesterSessionKey` ends with
 *    `:web:<parentWebSessionId>` and that has no numeric `endedAt`.
 *
 * The disk lookup is best-effort: a missing or unparsable file yields `false`.
 * Malformed individual entries are skipped, so one bad record cannot hide a
 * running subagent listed after it.
 *
 * @param parentWebSessionId Web session ID of the parent run.
 * @returns `true` when at least one subagent appears to be running.
 */
export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean {
  for (const run of activeRuns.values()) {
    if (
      run.isSubscribeOnly &&
      run.parentSessionId === parentWebSessionId &&
      run.status === "running"
    ) {
      return true;
    }
  }

  // Fallback: check the gateway disk registry.
  const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
  if (!existsSync(registryPath)) {
    return false;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(readFileSync(registryPath, "utf-8"));
  } catch {
    // Unreadable or invalid registry: treat as "nothing running".
    return false;
  }

  const runs =
    parsed !== null && typeof parsed === "object"
      ? (parsed as { runs?: unknown }).runs
      : undefined;
  if (!runs || typeof runs !== "object") {
    return false;
  }

  const parentKeySuffix = `:web:${parentWebSessionId}`;
  for (const entry of Object.values(runs as Record<string, unknown>)) {
    // Skip null / non-object entries instead of aborting the whole scan.
    if (entry === null || typeof entry !== "object") {
      continue;
    }
    const { requesterSessionKey, endedAt } = entry as Record<string, unknown>;
    if (
      typeof requesterSessionKey !== "string" ||
      !requesterSessionKey.endsWith(parentKeySuffix)
    ) {
      continue;
    }
    if (typeof endedAt === "number") {
      continue;
    }
    return true;
  }
  return false;
}
|
|
|
|
/**
 * Attach a subscriber to a registered run.
 *
 * With `replay` enabled (the default) every buffered event is delivered to
 * the callback synchronously before anything else happens. If the run is no
 * longer in progress, the callback then receives `null` right away and is not
 * registered; otherwise it is added to the run's live subscribers.
 *
 * @param sessionId Registry key of the run.
 * @param callback Receives SSE events, and `null` on completion.
 * @param options `replay: false` skips delivery of buffered events.
 * @returns A function that removes the subscription, or `null` if no run is
 *   registered for `sessionId`.
 */
export function subscribeToRun(
  sessionId: string,
  callback: RunSubscriber,
  options?: { replay?: boolean },
): (() => void) | null {
  const run = activeRuns.get(sessionId);
  if (run === undefined) {
    return null;
  }

  const shouldReplay = options?.replay ?? true;
  if (shouldReplay) {
    // Synchronous replay: no event-loop yield, so no live event can interleave.
    for (let i = 0; i < run.eventBuffer.length; i++) {
      callback(run.eventBuffer[i]);
    }
  }

  const inProgress =
    run.status === "running" || run.status === "waiting-for-subagents";
  if (!inProgress) {
    // Already finished: signal completion immediately; nothing to unsubscribe.
    callback(null);
    return () => {};
  }

  run.subscribers.add(callback);
  return () => {
    run.subscribers.delete(callback);
  };
}
|
|
|
|
/**
 * Bring a finished subscribe-only run back to life for a follow-up message.
 *
 * The run is marked "running" again, pending finalize/retry timers are
 * cleared, a fresh accumulated assistant message is started, and a new
 * subscribe process is spawned from the last seen `globalSeq`.
 *
 * @param sessionKey Registry key (gateway session key) of the subagent run.
 * @returns `false` when no subscribe-only run is registered under the key;
 *   `true` when the run was already running or has been reactivated.
 */
export function reactivateSubscribeRun(sessionKey: string): boolean {
  const run = activeRuns.get(sessionKey);
  if (!run || !run.isSubscribeOnly) {
    return false;
  }
  if (run.status === "running") {
    return true;
  }

  run.status = "running";
  run._lifecycleEnded = false;
  if (run._finalizeTimer) {
    clearTimeout(run._finalizeTimer);
    run._finalizeTimer = null;
  }
  clearWaitingFinalizeTimer(run);
  resetSubscribeRetryState(run);

  // Start a new assistant message for the follow-up turn.
  const freshMessageId = `assistant-${sessionKey}-${Date.now()}`;
  run.accumulated = { id: freshMessageId, role: "assistant", parts: [] };

  const subscribeChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
  run._subscribeProcess = subscribeChild;
  run.childProcess = subscribeChild;
  wireSubscribeOnlyProcess(run, subscribeChild, sessionKey);
  return true;
}
|
|
|
|
/**
 * Fire a follow-up message at a subagent session through the gateway's
 * "agent" RPC. The call is not awaited and its rejection is ignored; the
 * agent's response is expected to arrive via the subscribe stream.
 *
 * @param sessionKey Gateway session key of the subagent.
 * @param message Follow-up text to deliver.
 * @returns `true` once the RPC has been dispatched, `false` if dispatching
 *   threw synchronously.
 */
export function sendSubagentFollowUp(sessionKey: string, message: string): boolean {
  const randomSuffix = Math.random().toString(36).slice(2);
  const rpcParams = {
    message,
    sessionKey,
    idempotencyKey: `follow-${Date.now()}-${randomSuffix}`,
    deliver: false,
    channel: "webchat",
    lane: "subagent",
    timeout: 0,
  };

  try {
    const pending = callGatewayRpc("agent", rpcParams, { timeoutMs: 10_000 });
    void pending.catch(() => {
      // Best effort.
    });
    return true;
  } catch {
    return false;
  }
}
|
|
|
|
/**
 * Record a user message on a registered run (used for subscribe-only /
 * subagent runs).
 *
 * A "user-message" event is appended to the run's buffer, so clients that
 * reconnect and replay see it, pushed to the current subscribers, and a
 * persistence pass is scheduled.
 *
 * @param sessionKey Registry key of the run.
 * @param msg Message text, plus an optional ID (one is generated if absent).
 * @returns `false` when no run is registered under `sessionKey`.
 */
export function persistSubscribeUserMessage(
  sessionKey: string,
  msg: { id?: string; text: string },
): boolean {
  const run = activeRuns.get(sessionKey);
  if (run === undefined) {
    return false;
  }

  const messageId =
    msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`;
  const userEvent: SseEvent = {
    type: "user-message",
    id: messageId,
    text: msg.text,
  };

  run.eventBuffer.push(userEvent);
  for (const subscriber of run.subscribers) {
    try {
      subscriber(userEvent);
    } catch {
      /* ignore */
    }
  }
  schedulePersist(run);
  return true;
}
|
|
|
|
/**
 * Abort an in-progress run.
 *
 * The run is flipped to "error" immediately, its subscribe process is
 * stopped, the child is signalled (SIGTERM, then SIGKILL after 5 s if it has
 * not closed), a `chat.abort` is sent to the gateway, the partial response is
 * flushed, subscribers are told the stream ended, and registry cleanup is
 * scheduled after the grace period.
 *
 * @param sessionId Registry key of the run.
 * @returns `true` if a run was actually aborted; `false` when there is no run
 *   or it is not in progress.
 */
export function abortRun(sessionId: string): boolean {
  const run = activeRuns.get(sessionId);
  if (run === undefined) {
    return false;
  }
  const wasWaiting = run.status === "waiting-for-subagents";
  if (run.status !== "running" && !wasWaiting) {
    return false;
  }

  // Mark the run as non-running right away so hasActiveRun() reports false
  // and the next user message isn't rejected with 409.
  run.status = "error";
  clearWaitingFinalizeTimer(run);

  // Tear down the waiting subscribe process, if there is one.
  stopSubscribeProcess(run);

  run.abortController.abort();
  if (!wasWaiting) {
    run.childProcess.kill("SIGTERM");
  }

  // Also tell the gateway directly, in case the CLI child's best-effort
  // onAbort doesn't complete in time.
  // NOTE(review): subscribe-only runs store the full session key in
  // `sessionId`, while sendGatewayAbort prefixes "agent:main:web:" — confirm
  // the resulting key is what the gateway expects for subagent runs.
  sendGatewayAbort(sessionId);

  // Save the partial response (without _streaming).
  flushPersistence(run);

  // Tell every subscriber that the stream is over.
  for (const subscriber of run.subscribers) {
    try {
      subscriber(null);
    } catch {
      /* ignore */
    }
  }
  run.subscribers.clear();

  // Grace-period cleanup, guarded so a newer run under the same key survives.
  setTimeout(() => {
    if (activeRuns.get(sessionId) === run) {
      cleanupRun(sessionId);
    }
  }, CLEANUP_GRACE_MS);

  // Fallback: if the child hasn't closed 5 seconds after SIGTERM (e.g. the
  // CLI's best-effort chat.abort RPC hangs), force-terminate it.
  if (!wasWaiting) {
    const forceKillTimer = setTimeout(() => {
      try {
        run.childProcess.kill("SIGKILL");
      } catch {
        /* already dead */
      }
    }, 5_000);
    run.childProcess.once("close", () => clearTimeout(forceKillTimer));
  }

  return true;
}
|
|
|
|
/**
 * Ask the gateway to abort the chat for a web session by issuing a
 * `chat.abort` RPC with the key `agent:main:web:<sessionId>`.
 *
 * This complements the SIGTERM sent to the child: even if the child's own
 * best-effort `onAbort` never reaches the gateway, this request can. Failures
 * (synchronous or asynchronous) are deliberately ignored so they cannot break
 * the stop flow.
 *
 * @param sessionId Web session ID used to build the gateway session key.
 */
function sendGatewayAbort(sessionId: string): void {
  const ignoreFailure = () => {
    // Best effort; don't let abort failures break the stop flow.
  };
  try {
    const sessionKey = `agent:main:web:${sessionId}`;
    const pending = callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 });
    void pending.catch(ignoreFailure);
  } catch {
    ignoreFailure();
  }
}
|
|
|
|
/**
 * Start a new agent run for the given session.
 *
 * Spawns the agent child process, registers the run, links the run's abort
 * signal to a SIGTERM of the child, and wires the child's output into the
 * SSE pipeline.
 *
 * @param params.sessionId Web session ID; also the registry key.
 * @param params.message User message passed to the agent process.
 * @param params.agentSessionId Optional agent session ID forwarded to the spawn call.
 * @returns The newly registered run.
 * @throws Error if a run with status "running" already exists for the session.
 */
export function startRun(params: {
  sessionId: string;
  message: string;
  agentSessionId?: string;
}): ActiveRun {
  const { sessionId, message, agentSessionId } = params;

  const existing = activeRuns.get(sessionId);
  // NOTE(review): only "running" blocks a new run here, whereas hasActiveRun()
  // also counts "waiting-for-subagents" — confirm replacing a waiting run is intended.
  if (existing?.status === "running") {
    throw new Error("Active run already exists for this session");
  }
  // Clean up a finished run that's still in the grace period.
  if (existing) {
    cleanupRun(sessionId);
  }

  const abortController = new AbortController();
  const child = spawnAgentProcess(message, agentSessionId);

  const now = Date.now();
  const run: ActiveRun = {
    sessionId,
    childProcess: child,
    eventBuffer: [],
    subscribers: new Set(),
    accumulated: {
      id: `assistant-${sessionId}-${now}`,
      role: "assistant",
      parts: [],
    },
    status: "running",
    startedAt: now,
    exitCode: null,
    abortController,
    _persistTimer: null,
    _lastPersistedAt: 0,
    lastGlobalSeq: 0,
    _subscribeRetryTimer: null,
    _subscribeRetryAttempt: 0,
    _waitingFinalizeTimer: null,
  };

  activeRuns.set(sessionId, run);

  // Wire abort signal → child process kill. The controller was created just
  // above and has not been exposed yet, so it cannot already be aborted; no
  // "already aborted" branch is needed.
  const onAbort = () => child.kill("SIGTERM");
  abortController.signal.addEventListener("abort", onAbort, { once: true });
  // Drop the listener once the child is gone so a late abort is a no-op.
  child.on("close", () =>
    abortController.signal.removeEventListener("abort", onAbort),
  );

  wireChildProcess(run);
  return run;
}
|
|
|
|
/**
 * Register a subscribe-only run for a subagent session.
 *
 * The agent itself already runs in the gateway; this only spawns a subscribe
 * process for its event stream (starting at sequence 0) so that buffering,
 * persistence, and reconnection work as they do for parent sessions. If a run
 * is already registered under the session key, that run is returned unchanged.
 *
 * @param params.sessionKey Gateway session key; also used as the registry key.
 * @param params.parentSessionId Web session ID of the parent run.
 * @param params.task Subagent task description.
 * @param params.label Optional subagent label.
 * @returns The existing or newly created run.
 */
export function startSubscribeRun(params: {
  sessionKey: string;
  parentSessionId: string;
  task: string;
  label?: string;
}): ActiveRun {
  const { sessionKey, parentSessionId, task, label } = params;

  const alreadyRegistered = activeRuns.get(sessionKey);
  if (alreadyRegistered !== undefined) {
    return alreadyRegistered;
  }

  const abortController = new AbortController();
  const subscribeChild = spawnAgentSubscribeProcess(sessionKey, 0);

  const run: ActiveRun = {
    sessionId: sessionKey,
    childProcess: subscribeChild,
    eventBuffer: [],
    subscribers: new Set(),
    accumulated: {
      id: `assistant-${sessionKey}-${Date.now()}`,
      role: "assistant",
      parts: [],
    },
    status: "running",
    startedAt: Date.now(),
    exitCode: null,
    abortController,
    _persistTimer: null,
    _lastPersistedAt: 0,
    lastGlobalSeq: 0,
    sessionKey,
    parentSessionId,
    task,
    label,
    isSubscribeOnly: true,
    _lifecycleEnded: false,
    _finalizeTimer: null,
    _subscribeRetryTimer: null,
    _subscribeRetryAttempt: 0,
    _waitingFinalizeTimer: null,
  };

  activeRuns.set(sessionKey, run);
  wireSubscribeOnlyProcess(run, subscribeChild, sessionKey);
  return run;
}
|
|
|
|
/**
 * Wire event processing for a subscribe-only run (subagent).
 *
 * Reads the subscribe child's stdout line by line, parses each line as a JSON
 * agent event, and translates it into SSE events (reasoning / text / tool
 * parts) that are buffered on the run, pushed to subscribers, and mirrored
 * into `run.accumulated` for persistence.
 *
 * Finalization is deferred: a lifecycle/end event only arms a short timer,
 * and any later event cancels it. When the child closes, the run is finalized
 * if lifecycle/end was seen; otherwise a restart of the subscribe process is
 * scheduled, resuming from `run.lastGlobalSeq`.
 *
 * @param run The subscribe-only run to feed.
 * @param child Subscribe process whose stdout carries the event stream.
 * @param sessionKey Gateway session key; events for other keys are dropped.
 */
function wireSubscribeOnlyProcess(
  run: ActiveRun,
  child: AgentProcessHandle,
  sessionKey: string,
): void {
  let idCounter = 0;
  const nextId = (prefix: string) => `${prefix}-${Date.now()}-${++idCounter}`;

  // ── Stream state for the part currently being emitted ──
  let currentTextId = "";
  let currentReasoningId = "";
  let currentStatusReasoningLabel: string | null = null;
  let textStarted = false;
  let reasoningStarted = false;
  let statusReasoningActive = false;
  let agentErrorReported = false;

  // ── Indices into run.accumulated.parts for the parts still being appended to ──
  let accTextIdx = -1;
  let accReasoningIdx = -1;
  const accToolMap = new Map<string, number>();

  const accAppendReasoning = (delta: string) => {
    if (accReasoningIdx < 0) {
      run.accumulated.parts.push({ type: "reasoning", text: delta });
      accReasoningIdx = run.accumulated.parts.length - 1;
    } else {
      (run.accumulated.parts[accReasoningIdx] as { type: "reasoning"; text: string }).text += delta;
    }
  };

  const accAppendText = (delta: string) => {
    if (accTextIdx < 0) {
      run.accumulated.parts.push({ type: "text", text: delta });
      accTextIdx = run.accumulated.parts.length - 1;
    } else {
      (run.accumulated.parts[accTextIdx] as { type: "text"; text: string }).text += delta;
    }
  };

  /** Buffer an SSE event, notify all subscribers, and schedule persistence. */
  const emit = (event: SseEvent) => {
    run.eventBuffer.push(event);
    for (const sub of run.subscribers) {
      try { sub(event); } catch { /* ignore */ }
    }
    schedulePersist(run);
  };

  const closeReasoning = () => {
    if (reasoningStarted) {
      emit({ type: "reasoning-end", id: currentReasoningId });
      reasoningStarted = false;
      statusReasoningActive = false;
    }
    currentStatusReasoningLabel = null;
    accReasoningIdx = -1;
  };

  const closeText = () => {
    if (textStarted) {
      // Drop a text part that is only a leaked silent-reply marker so it is
      // not persisted.
      const lastPart = run.accumulated.parts[accTextIdx];
      if (lastPart?.type === "text" && isLeakedSilentReplyToken(lastPart.text)) {
        run.accumulated.parts.splice(accTextIdx, 1);
      }
      emit({ type: "text-end", id: currentTextId });
      textStarted = false;
    }
    accTextIdx = -1;
  };

  /** Open a text part (emitting text-start) unless one is already open. */
  const ensureTextStarted = () => {
    if (!textStarted) {
      currentTextId = nextId("text");
      emit({ type: "text-start", id: currentTextId });
      textStarted = true;
    }
  };

  /** Emit an error as its own complete text part and record it for persistence. */
  const emitError = (message: string) => {
    closeReasoning();
    closeText();
    const tid = nextId("text");
    emit({ type: "text-start", id: tid });
    emit({ type: "text-delta", id: tid, delta: `[error] ${message}` });
    emit({ type: "text-end", id: tid });
    accAppendText(`[error] ${message}`);
  };

  /** Show a status label as a reasoning part; a repeat of the active label is a no-op. */
  const openStatusReasoning = (label: string) => {
    if (statusReasoningActive && currentStatusReasoningLabel === label) {
      return;
    }
    closeReasoning();
    closeText();
    currentReasoningId = nextId("status");
    emit({ type: "reasoning-start", id: currentReasoningId });
    emit({ type: "reasoning-delta", id: currentReasoningId, delta: label });
    reasoningStarted = true;
    statusReasoningActive = true;
    currentStatusReasoningLabel = label;
  };

  const processEvent = (ev: AgentEvent) => {
    const isLifecycleEndEvent =
      ev.event === "agent" &&
      ev.stream === "lifecycle" &&
      ev.data?.phase === "end";
    if (!isLifecycleEndEvent) {
      // Any other event means the run is still producing output: cancel a
      // pending finalize armed by an earlier lifecycle/end.
      if (run._finalizeTimer) {
        clearTimeout(run._finalizeTimer);
        run._finalizeTimer = null;
      }
      run._lifecycleEnded = false;
    }

    if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "start") {
      openStatusReasoning("Preparing response...");
    }

    if (ev.event === "agent" && ev.stream === "thinking") {
      const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
      if (delta) {
        // Real reasoning replaces a status placeholder.
        if (statusReasoningActive) { closeReasoning(); }
        if (!reasoningStarted) {
          currentReasoningId = nextId("reasoning");
          emit({ type: "reasoning-start", id: currentReasoningId });
          reasoningStarted = true;
        }
        emit({ type: "reasoning-delta", id: currentReasoningId, delta });
        accAppendReasoning(delta);
      }
    }

    if (ev.event === "agent" && ev.stream === "assistant") {
      const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
      const textFallback = !delta && typeof ev.data?.text === "string" ? ev.data.text : undefined;
      const chunk = delta ?? textFallback;
      if (chunk) {
        closeReasoning();
        ensureTextStarted();
        emit({ type: "text-delta", id: currentTextId, delta: chunk });
        accAppendText(chunk);
      }
      const mediaUrls = ev.data?.mediaUrls;
      if (Array.isArray(mediaUrls)) {
        for (const url of mediaUrls) {
          if (typeof url === "string" && url.trim()) {
            closeReasoning();
            ensureTextStarted();
            // Render each media URL as a markdown image so the URL actually
            // reaches the client and the persisted message.
            const md = `\n![media](${url.trim()})\n`;
            emit({ type: "text-delta", id: currentTextId, delta: md });
            accAppendText(md);
          }
        }
      }
      if (
        typeof ev.data?.stopReason === "string" &&
        ev.data.stopReason === "error" &&
        typeof ev.data?.errorMessage === "string" &&
        !agentErrorReported
      ) {
        agentErrorReported = true;
        emitError(parseErrorBody(ev.data.errorMessage));
      }
    }

    if (ev.event === "agent" && ev.stream === "tool") {
      const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
      const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : "";
      const toolName = typeof ev.data?.name === "string" ? ev.data.name : "";

      if (phase === "start") {
        closeReasoning();
        closeText();
        const args = ev.data?.args && typeof ev.data.args === "object" ? (ev.data.args as Record<string, unknown>) : {};
        emit({ type: "tool-input-start", toolCallId, toolName });
        emit({ type: "tool-input-available", toolCallId, toolName, input: args });
        run.accumulated.parts.push({ type: "tool-invocation", toolCallId, toolName, args });
        accToolMap.set(toolCallId, run.accumulated.parts.length - 1);
      } else if (phase === "result") {
        const isError = ev.data?.isError === true;
        const result = extractToolResult(ev.data?.result);
        if (isError) {
          const errorText = result?.text || (result?.details?.error as string | undefined) || "Tool execution failed";
          emit({ type: "tool-output-error", toolCallId, errorText });
          const idx = accToolMap.get(toolCallId);
          if (idx !== undefined) {
            const part = run.accumulated.parts[idx];
            if (part?.type === "tool-invocation") {
              part.errorText = errorText;
            }
          }
        } else {
          const output = buildToolOutput(result);
          emit({ type: "tool-output-available", toolCallId, output });
          const idx = accToolMap.get(toolCallId);
          if (idx !== undefined) {
            const part = run.accumulated.parts[idx];
            if (part?.type === "tool-invocation") { part.result = output; }
          }
        }
      }
    }

    if (ev.event === "agent" && ev.stream === "compaction") {
      const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
      if (phase === "start") {
        openStatusReasoning("Optimizing session context...");
      } else if (phase === "end") {
        if (statusReasoningActive) {
          if (ev.data?.willRetry === true) {
            const retryDelta = "\nRetrying with compacted context...";
            emit({ type: "reasoning-delta", id: currentReasoningId, delta: retryDelta });
            accAppendReasoning(retryDelta);
          } else {
            closeReasoning();
          }
        }
      }
    }

    if (ev.event === "chat") {
      // NOTE(review): the final chat text is appended even if assistant deltas
      // were already streamed for this turn — confirm the gateway never sends both.
      const finalText = extractAssistantTextFromChatPayload(ev.data);
      if (finalText) {
        closeReasoning();
        ensureTextStarted();
        emit({ type: "text-delta", id: currentTextId, delta: finalText });
        accAppendText(finalText);
        closeText();
      }
    }

    if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "end") {
      closeReasoning();
      closeText();
      run._lifecycleEnded = true;
      // Defer finalization: a later event cancels this timer (see top of
      // processEvent), and the child's close handler finalizes earlier.
      if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); }
      run._finalizeTimer = setTimeout(() => {
        run._finalizeTimer = null;
        if (run.status === "running") { finalizeSubscribeRun(run); }
      }, SUBSCRIBE_LIFECYCLE_END_GRACE_MS);
    }

    if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "error" && !agentErrorReported) {
      const msg = parseAgentErrorMessage(ev.data);
      if (msg) { agentErrorReported = true; emitError(msg); }
      finalizeSubscribeRun(run, "error");
    }

    if (ev.event === "error" && !agentErrorReported) {
      const msg = parseAgentErrorMessage(ev.data ?? (ev as unknown as Record<string, unknown>));
      if (msg) { agentErrorReported = true; emitError(msg); }
    }
  };

  const rl = createInterface({ input: child.stdout! });

  rl.on("line", (line: string) => {
    if (!line.trim()) { return; }
    let ev: AgentEvent;
    try { ev = JSON.parse(line) as AgentEvent; } catch { return; }
    // Ignore events that belong to a different session.
    if (ev.sessionKey && ev.sessionKey !== sessionKey) { return; }
    const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
      ? (ev as Record<string, unknown>).globalSeq as number
      : undefined;
    if (gSeq !== undefined) {
      // Skip events already processed (e.g. re-delivered after a restart).
      if (gSeq <= run.lastGlobalSeq) { return; }
      run.lastGlobalSeq = gSeq;
    }
    // An event arrived, so the restart backoff can start over.
    if ((run._subscribeRetryAttempt ?? 0) > 0) {
      resetSubscribeRetryState(run);
    }
    processEvent(ev);
  });

  child.on("close", () => {
    // A newer subscribe process has taken over; this close is stale.
    if (run._subscribeProcess !== child) {
      return;
    }
    run._subscribeProcess = null;
    if (run.status !== "running") { return; }
    if (run._lifecycleEnded) {
      if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
      finalizeSubscribeRun(run);
      return;
    }
    // Stream dropped before lifecycle/end: resubscribe from the last seen sequence.
    scheduleSubscribeRestart(run, () => {
      if (run.status === "running" && !run._subscribeProcess) {
        const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
        run._subscribeProcess = newChild;
        run.childProcess = newChild;
        wireSubscribeOnlyProcess(run, newChild, sessionKey);
      }
    });
  });

  child.on("error", (err) => {
    console.error("[active-runs] Subscribe child error:", err);
  });

  child.stderr?.on("data", (chunk: Buffer) => {
    console.error("[active-runs subscribe stderr]", chunk.toString());
  });

  run._subscribeProcess = child;
}
|
|
|
|
/**
 * Finish a run that is currently "running": clear its timers and retry
 * state, set the final status, flush persistence, tell subscribers the
 * stream ended, stop the subscribe process, and schedule registry cleanup.
 * Runs in any other status are left untouched.
 *
 * @param run The run to finalize.
 * @param status Final status to record; defaults to "completed".
 */
function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void {
  if (run.status !== "running") {
    return;
  }

  if (run._finalizeTimer) {
    clearTimeout(run._finalizeTimer);
    run._finalizeTimer = null;
  }
  clearWaitingFinalizeTimer(run);
  resetSubscribeRetryState(run);

  run.status = status;
  flushPersistence(run);

  for (const subscriber of run.subscribers) {
    try {
      subscriber(null);
    } catch {
      /* ignore */
    }
  }
  run.subscribers.clear();

  stopSubscribeProcess(run);

  // Subscribe-only runs are kept far longer than regular runs.
  const retentionMs = run.isSubscribeOnly ? SUBSCRIBE_CLEANUP_GRACE_MS : CLEANUP_GRACE_MS;
  const registryKey = run.sessionId;
  setTimeout(() => {
    // Only clean up if this run is still the one registered under the key.
    if (activeRuns.get(registryKey) === run) {
      cleanupRun(registryKey);
    }
  }, retentionMs);
}
|
|
|
|
// ── Persistence helpers (called from route to persist user messages) ──
|
|
|
|
/**
 * Append a user message to the session's JSONL file in the web chat
 * directory, creating the directory and file when needed.
 *
 * If a line with the same message `id` is already stored (e.g. on a retry),
 * nothing is written. Otherwise the file is rewritten with its non-blank
 * lines plus the new record, and the session index's message count is bumped.
 *
 * @param sessionId Session whose `<sessionId>.jsonl` file receives the message.
 * @param msg Message ID and content, with optional structured parts.
 */
export function persistUserMessage(
  sessionId: string,
  msg: { id: string; content: string; parts?: unknown[] },
): void {
  ensureDir();
  const filePath = join(webChatDir(), `${sessionId}.jsonl`);
  if (!existsSync(filePath)) {
    writeFileSync(filePath, "");
  }

  const record = JSON.stringify({
    id: msg.id,
    role: "user",
    content: msg.content,
    ...(msg.parts ? { parts: msg.parts } : {}),
    timestamp: new Date().toISOString(),
  });

  const storedLines = readFileSync(filePath, "utf-8")
    .split("\n")
    .filter((entry) => entry.trim());

  // Avoid duplicates (e.g. retry). Unparsable lines never count as a match.
  const isDuplicate = storedLines.some((entry) => {
    try {
      return JSON.parse(entry).id === msg.id;
    } catch {
      return false;
    }
  });
  if (isDuplicate) {
    return;
  }

  writeFileSync(filePath, [...storedLines, record].join("\n") + "\n");
  updateIndex(sessionId, { incrementCount: 1 });
}
|
|
|
|
// ── Internals ──
|
|
|
|
/** Create the web chat directory (including missing parents) if it does not exist yet. */
function ensureDir() {
  const dir = webChatDir();
  if (existsSync(dir)) {
    return;
  }
  mkdirSync(dir, { recursive: true });
}
|
|
|
|
/**
 * Update a session's entry in the index file (best-effort; any failure is
 * swallowed).
 *
 * A missing index file is treated as an empty index, and a session without an
 * entry gets one prepended (title "New Chat" unless `opts.title` is given), so
 * session files that were never indexed still show up. The entry's
 * `updatedAt` is refreshed, `messageCount` grows by `opts.incrementCount`, and
 * `title` is replaced when `opts.title` is non-empty.
 *
 * @param sessionId ID of the session entry to update or create.
 * @param opts Optional message-count increment and/or new title.
 */
function updateIndex(
  sessionId: string,
  opts: { incrementCount?: number; title?: string },
) {
  try {
    const idxPath = indexFile();
    const index: Array<Record<string, unknown>> = existsSync(idxPath)
      ? (JSON.parse(readFileSync(idxPath, "utf-8")) as Array<Record<string, unknown>>)
      : [];

    let entry = index.find((candidate) => candidate.id === sessionId);
    if (!entry) {
      // Not indexed yet (or no index at all): bootstrap an entry.
      entry = {
        id: sessionId,
        title: opts.title || "New Chat",
        createdAt: Date.now(),
        updatedAt: Date.now(),
        messageCount: 0,
      };
      index.unshift(entry);
    }

    entry.updatedAt = Date.now();
    if (opts.incrementCount) {
      const previousCount = (entry.messageCount as number) || 0;
      entry.messageCount = previousCount + opts.incrementCount;
    }
    if (opts.title) {
      entry.title = opts.title;
    }

    writeFileSync(idxPath, JSON.stringify(index, null, 2));
  } catch {
    /* best-effort */
  }
}
|
|
|
|
// ── SSE event generation from child-process JSON lines ──
|
|
|
|
/**
 * Connect a run's agent child process to the SSE pipeline.
 *
 * Reads newline-delimited JSON events from the child's stdout, translates
 * them into SSE events (reasoning / text / tool parts), pushes them into
 * `run.eventBuffer`, notifies `run.subscribers`, and mirrors the content into
 * `run.accumulated.parts` so the assistant message can be persisted
 * incrementally. Also installs the child's `close`, `error` and stderr
 * handlers, which finalize the run or hand it over to the subscribe-only
 * stream while subagents are still running.
 *
 * @param run - The active run whose `childProcess` should be wired up.
 */
function wireChildProcess(run: ActiveRun): void {
  const child = run.childProcess;

  let idCounter = 0;
  const nextId = (prefix: string) =>
    `${prefix}-${Date.now()}-${++idCounter}`;

  let currentTextId = "";
  let currentReasoningId = "";
  let currentStatusReasoningLabel: string | null = null;
  let textStarted = false;
  let reasoningStarted = false;
  let everSentText = false;
  let statusReasoningActive = false;
  let waitingStatusAnnounced = false;
  let agentErrorReported = false;
  const stderrChunks: string[] = [];

  // ── Ordered accumulation tracking (preserves interleaving for persistence) ──
  // Index of the currently open text / reasoning part in
  // `run.accumulated.parts`, or -1 when no such part is open.
  let accTextIdx = -1;
  let accReasoningIdx = -1;
  // toolCallId → index of its tool-invocation part.
  const accToolMap = new Map<string, number>();

  const accAppendReasoning = (delta: string) => {
    if (accReasoningIdx < 0) {
      run.accumulated.parts.push({ type: "reasoning", text: delta });
      accReasoningIdx = run.accumulated.parts.length - 1;
    } else {
      (run.accumulated.parts[accReasoningIdx] as { type: "reasoning"; text: string }).text += delta;
    }
  };

  const accAppendText = (delta: string) => {
    if (accTextIdx < 0) {
      run.accumulated.parts.push({ type: "text", text: delta });
      accTextIdx = run.accumulated.parts.length - 1;
    } else {
      (run.accumulated.parts[accTextIdx] as { type: "text"; text: string }).text += delta;
    }
  };

  /** Emit an SSE event: push to buffer + notify all subscribers. */
  const emit = (event: SseEvent) => {
    run.eventBuffer.push(event);
    for (const sub of run.subscribers) {
      try {
        sub(event);
      } catch {
        /* ignore subscriber errors */
      }
    }
    schedulePersist(run);
  };

  /** End the open reasoning part (if any) and reset reasoning tracking. */
  const closeReasoning = () => {
    if (reasoningStarted) {
      emit({ type: "reasoning-end", id: currentReasoningId });
      reasoningStarted = false;
      statusReasoningActive = false;
    }
    currentStatusReasoningLabel = null;
    accReasoningIdx = -1;
  };

  /**
   * End the open text part (if any). A text part that turns out to be a
   * leaked silent-reply token is removed from the accumulated parts, and the
   * tool indices behind it are shifted down to stay valid.
   */
  const closeText = () => {
    if (textStarted) {
      if (accTextIdx >= 0) {
        const part = run.accumulated.parts[accTextIdx] as { type: "text"; text: string };
        if (isLeakedSilentReplyToken(part.text)) {
          run.accumulated.parts.splice(accTextIdx, 1);
          for (const [k, v] of accToolMap) {
            if (v > accTextIdx) { accToolMap.set(k, v - 1); }
          }
        }
      }
      emit({ type: "text-end", id: currentTextId });
      textStarted = false;
    }
    accTextIdx = -1;
  };

  /**
   * Open a reasoning part that carries a status label. No-op when a status
   * part with the same label is already active.
   */
  const openStatusReasoning = (label: string) => {
    if (statusReasoningActive && currentStatusReasoningLabel === label) {
      return;
    }
    closeReasoning();
    closeText();
    currentReasoningId = nextId("status");
    emit({ type: "reasoning-start", id: currentReasoningId });
    emit({
      type: "reasoning-delta",
      id: currentReasoningId,
      delta: label,
    });
    reasoningStarted = true;
    statusReasoningActive = true;
    currentStatusReasoningLabel = label;
    accAppendReasoning(label);
  };

  /** Emit a self-contained `[error] …` text part. */
  const emitError = (message: string) => {
    closeReasoning();
    closeText();
    const tid = nextId("text");
    emit({ type: "text-start", id: tid });
    emit({ type: "text-delta", id: tid, delta: `[error] ${message}` });
    emit({ type: "text-end", id: tid });
    accAppendText(`[error] ${message}`);
    accTextIdx = -1; // error text is self-contained
    everSentText = true;
  };

  /** Emit a complete assistant text (from a chat payload) and close it. */
  const emitAssistantFinalText = (text: string) => {
    if (!text) {
      return;
    }
    closeReasoning();
    if (!textStarted) {
      currentTextId = nextId("text");
      emit({ type: "text-start", id: currentTextId });
      textStarted = true;
    }
    everSentText = true;
    emit({ type: "text-delta", id: currentTextId, delta: text });
    accAppendText(text);
    closeText();
  };

  // ── Parse stdout JSON lines ──

  const rl = createInterface({ input: child.stdout! });
  const parentSessionKey = `agent:main:web:${run.sessionId}`;
  // Prevent unhandled 'error' events on the readline interface.
  // When the child process fails to start (e.g. ENOENT — missing script)
  // the stdout pipe is destroyed and readline re-emits the error. Without
  // this handler Node.js throws "Unhandled 'error' event" which crashes
  // the API route instead of surfacing a clean message to the user.
  rl.on("error", () => {
    // Swallow — the child 'error' / 'close' handlers take care of
    // emitting user-visible diagnostics.
  });

  // ── Reusable parent event processor ──
  // Handles lifecycle, thinking, assistant text, tool, compaction, and error
  // events for the parent agent. Used by both the CLI NDJSON stream and the
  // subscribe-only CLI fallback (waiting-for-subagents state).

  const processParentEvent = (ev: AgentEvent) => {
    // Lifecycle start
    if (
      ev.event === "agent" &&
      ev.stream === "lifecycle" &&
      ev.data?.phase === "start"
    ) {
      openStatusReasoning("Preparing response...");
    }

    // Thinking / reasoning
    if (ev.event === "agent" && ev.stream === "thinking") {
      const delta =
        typeof ev.data?.delta === "string"
          ? ev.data.delta
          : undefined;
      if (delta) {
        // Real reasoning replaces any status placeholder that is showing.
        if (statusReasoningActive) { closeReasoning(); }
        if (!reasoningStarted) {
          currentReasoningId = nextId("reasoning");
          emit({
            type: "reasoning-start",
            id: currentReasoningId,
          });
          reasoningStarted = true;
        }
        emit({
          type: "reasoning-delta",
          id: currentReasoningId,
          delta,
        });
        accAppendReasoning(delta);
      }
    }

    // Assistant text
    if (ev.event === "agent" && ev.stream === "assistant") {
      const delta =
        typeof ev.data?.delta === "string"
          ? ev.data.delta
          : undefined;
      const textFallback =
        !delta && typeof ev.data?.text === "string"
          ? ev.data.text
          : undefined;
      const chunk = delta ?? textFallback;
      if (chunk) {
        closeReasoning();
        if (!textStarted) {
          currentTextId = nextId("text");
          emit({ type: "text-start", id: currentTextId });
          textStarted = true;
        }
        everSentText = true;
        emit({ type: "text-delta", id: currentTextId, delta: chunk });
        accAppendText(chunk);
      }
      // Media URLs
      const mediaUrls = ev.data?.mediaUrls;
      if (Array.isArray(mediaUrls)) {
        for (const url of mediaUrls) {
          if (typeof url === "string" && url.trim()) {
            closeReasoning();
            if (!textStarted) {
              currentTextId = nextId("text");
              emit({
                type: "text-start",
                id: currentTextId,
              });
              textStarted = true;
            }
            everSentText = true;
            const md = `\n![media](${url.trim()})\n`;
            emit({
              type: "text-delta",
              id: currentTextId,
              delta: md,
            });
            accAppendText(md);
          }
        }
      }
      // Agent error inline (stopReason=error)
      if (
        typeof ev.data?.stopReason === "string" &&
        ev.data.stopReason === "error" &&
        typeof ev.data?.errorMessage === "string" &&
        !agentErrorReported
      ) {
        agentErrorReported = true;
        emitError(parseErrorBody(ev.data.errorMessage));
      }
    }

    // Tool events
    if (ev.event === "agent" && ev.stream === "tool") {
      const phase =
        typeof ev.data?.phase === "string"
          ? ev.data.phase
          : undefined;
      const toolCallId =
        typeof ev.data?.toolCallId === "string"
          ? ev.data.toolCallId
          : "";
      const toolName =
        typeof ev.data?.name === "string" ? ev.data.name : "";

      if (phase === "start") {
        closeReasoning();
        closeText();
        const args =
          ev.data?.args && typeof ev.data.args === "object"
            ? (ev.data.args as Record<string, unknown>)
            : {};
        emit({ type: "tool-input-start", toolCallId, toolName });
        emit({
          type: "tool-input-available",
          toolCallId,
          toolName,
          input: args,
        });
        run.accumulated.parts.push({
          type: "tool-invocation",
          toolCallId,
          toolName,
          args,
        });
        accToolMap.set(toolCallId, run.accumulated.parts.length - 1);
      } else if (phase === "result") {
        const isError = ev.data?.isError === true;
        const result = extractToolResult(ev.data?.result);
        if (isError) {
          const errorText =
            result?.text ||
            (result?.details?.error as string | undefined) ||
            "Tool execution failed";
          emit({
            type: "tool-output-error",
            toolCallId,
            errorText,
          });
          const idx = accToolMap.get(toolCallId);
          if (idx !== undefined) {
            const part = run.accumulated.parts[idx];
            if (part.type === "tool-invocation") {
              part.errorText = errorText;
            }
          }
        } else {
          const output = buildToolOutput(result);
          emit({
            type: "tool-output-available",
            toolCallId,
            output,
          });
          const idx = accToolMap.get(toolCallId);
          if (idx !== undefined) {
            const part = run.accumulated.parts[idx];
            if (part.type === "tool-invocation") {
              part.result = output;
            }
          }
        }

        // A successful sessions_spawn result carries the child session key;
        // start following that subagent session.
        if (toolName === "sessions_spawn" && !isError) {
          const childSessionKey =
            result?.details?.childSessionKey as string | undefined;
          if (childSessionKey) {
            const spawnArgs = accToolMap.has(toolCallId)
              ? (run.accumulated.parts[accToolMap.get(toolCallId)!] as { args?: Record<string, unknown> })?.args
              : undefined;
            startSubscribeRun({
              sessionKey: childSessionKey,
              parentSessionId: run.sessionId,
              task: (spawnArgs?.task as string | undefined) ?? "Subagent task",
              label: spawnArgs?.label as string | undefined,
            });
          }
        }
      }
    }

    // Chat final events can include assistant turns from runs outside
    // the original parent process (e.g. subagent announce follow-ups).
    if (ev.event === "chat") {
      const text = extractAssistantTextFromChatPayload(ev.data);
      if (text) {
        emitAssistantFinalText(text);
      }
    }

    // Compaction
    if (ev.event === "agent" && ev.stream === "compaction") {
      const phase =
        typeof ev.data?.phase === "string"
          ? ev.data.phase
          : undefined;
      if (phase === "start") {
        openStatusReasoning("Optimizing session context...");
      } else if (phase === "end") {
        if (statusReasoningActive) {
          if (ev.data?.willRetry === true) {
            const retryDelta = "\nRetrying with compacted context...";
            emit({
              type: "reasoning-delta",
              id: currentReasoningId,
              delta: retryDelta,
            });
            accAppendReasoning(retryDelta);
          } else {
            closeReasoning();
          }
        }
      }
    }

    // Lifecycle end
    if (
      ev.event === "agent" &&
      ev.stream === "lifecycle" &&
      ev.data?.phase === "end"
    ) {
      closeReasoning();
      closeText();
    }

    // Lifecycle error
    if (
      ev.event === "agent" &&
      ev.stream === "lifecycle" &&
      ev.data?.phase === "error" &&
      !agentErrorReported
    ) {
      const msg = parseAgentErrorMessage(ev.data);
      if (msg) {
        agentErrorReported = true;
        emitError(msg);
      }
    }

    // Top-level error event
    if (ev.event === "error" && !agentErrorReported) {
      const msg = parseAgentErrorMessage(
        ev.data ??
          (ev as unknown as Record<string, unknown>),
      );
      if (msg) {
        agentErrorReported = true;
        emitError(msg);
      }
    }
  };

  // Event processor for the subscribe-only stream used while the run is in
  // the waiting-for-subagents state. Drops events at or below the last seen
  // global sequence number, then decides whether to keep waiting or to
  // schedule finalization.
  const processParentSubscribeEvent = (ev: AgentEvent) => {
    const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
      ? (ev as Record<string, unknown>).globalSeq as number
      : undefined;
    if (gSeq !== undefined) {
      if (gSeq <= run.lastGlobalSeq) { return; }
      run.lastGlobalSeq = gSeq;
    }

    const showWaitingStatus = () => {
      if (!waitingStatusAnnounced) {
        openStatusReasoning("Waiting for subagent results...");
        waitingStatusAnnounced = true;
      }
      flushPersistence(run);
    };

    const scheduleWaitingCompletionCheck = () => {
      clearWaitingFinalizeTimer(run);
      run._waitingFinalizeTimer = setTimeout(() => {
        run._waitingFinalizeTimer = null;
        if (run.status !== "waiting-for-subagents") {
          return;
        }
        if (hasRunningSubagentsForParent(run.sessionId)) {
          showWaitingStatus();
          return;
        }
        finalizeWaitingRun(run);
      }, WAITING_FINALIZE_RECONCILE_MS);
    };

    // Any new parent event means waiting completion should be reconsidered
    // from this point forward, not from a prior end/final checkpoint.
    clearWaitingFinalizeTimer(run);

    processParentEvent(ev);
    if (ev.stream === "lifecycle" && ev.data?.phase === "end") {
      if (hasRunningSubagentsForParent(run.sessionId)) {
        clearWaitingFinalizeTimer(run);
        showWaitingStatus();
      } else {
        scheduleWaitingCompletionCheck();
      }
    }
    if (ev.event === "chat") {
      const payload = ev.data;
      const state = typeof payload?.state === "string" ? payload.state : "";
      const message = asRecord(payload?.message);
      const role = typeof message?.role === "string" ? message.role : "";
      if (state === "final" && role === "assistant") {
        if (hasRunningSubagentsForParent(run.sessionId)) {
          clearWaitingFinalizeTimer(run);
          showWaitingStatus();
        } else {
          scheduleWaitingCompletionCheck();
        }
      }
    }
  };

  rl.on("line", (line: string) => {
    if (!line.trim()) { return; }

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return;
    }
    // Valid JSON that is not an object (`null`, a number, a string, ...)
    // cannot be an agent event. Dereferencing `null` below would throw
    // inside this listener, so such lines are dropped like unparseable ones.
    if (typeof parsed !== "object" || parsed === null) { return; }
    const ev = parsed as AgentEvent;

    // Skip events from other sessions (e.g. subagent broadcasts that
    // the gateway delivers on the same WS connection).
    if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
      return;
    }

    // Track the global event cursor from the gateway for replay on handoff.
    const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
      ? (ev as Record<string, unknown>).globalSeq as number
      : undefined;
    if (gSeq !== undefined && gSeq > run.lastGlobalSeq) {
      run.lastGlobalSeq = gSeq;
    }

    processParentEvent(ev);
  });

  // ── Child process exit ──

  child.on("close", (code) => {
    // If already finalized (e.g. by abortRun), just record the exit code.
    if (run.status !== "running") {
      run.exitCode = code;
      return;
    }

    // Surface an error parsed from stderr when no agent error was reported.
    if (!agentErrorReported && stderrChunks.length > 0) {
      const stderr = stderrChunks.join("").trim();
      const msg = parseErrorFromStderr(stderr);
      if (msg) {
        agentErrorReported = true;
        emitError(msg);
      }
    }

    closeReasoning();

    const exitedClean = code === 0 || code === null;

    // Guarantee the user sees some text even when the agent produced none.
    if (!everSentText && !exitedClean) {
      const tid = nextId("text");
      emit({ type: "text-start", id: tid });
      const errMsg = `[error] Agent exited with code ${code}. Check server logs for details.`;
      emit({ type: "text-delta", id: tid, delta: errMsg });
      emit({ type: "text-end", id: tid });
      accAppendText(errMsg);
    } else if (!everSentText && exitedClean) {
      const tid = nextId("text");
      emit({ type: "text-start", id: tid });
      const msg = "No response from agent.";
      emit({ type: "text-delta", id: tid, delta: msg });
      emit({ type: "text-end", id: tid });
      accAppendText(msg);
    } else {
      closeText();
    }

    run.exitCode = code;

    const hasRunningSubagents = hasRunningSubagentsForParent(run.sessionId);

    // If the CLI exited cleanly and subagents are still running,
    // keep the SSE stream open and wait for announcement-triggered
    // parent turns via subscribe-only CLI NDJSON.
    if (exitedClean && hasRunningSubagents) {
      run.status = "waiting-for-subagents";

      if (!waitingStatusAnnounced) {
        openStatusReasoning("Waiting for subagent results...");
        waitingStatusAnnounced = true;
      }
      flushPersistence(run);
      startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent);
      return;
    }

    // Normal completion path.
    run.status = exitedClean ? "completed" : "error";

    // Final persistence flush (removes _streaming flag).
    flushPersistence(run);

    // Signal completion to all subscribers.
    for (const sub of run.subscribers) {
      try {
        sub(null);
      } catch {
        /* ignore */
      }
    }
    run.subscribers.clear();

    // Clean up run state after a grace period so reconnections
    // within that window still get the buffered events.
    // Guard: only clean up if we're still the active run for this session.
    setTimeout(() => {
      if (activeRuns.get(run.sessionId) === run) {
        cleanupRun(run.sessionId);
      }
    }, CLEANUP_GRACE_MS);
  });

  child.on("error", (err) => {
    // If already finalized (e.g. by abortRun), skip.
    if (run.status !== "running") { return; }

    console.error("[active-runs] Child process error:", err);
    const message = err instanceof Error ? err.message : String(err);
    emitError(`Failed to start agent: ${message}`);
    run.status = "error";
    flushPersistence(run);
    for (const sub of run.subscribers) {
      try {
        sub(null);
      } catch {
        /* ignore */
      }
    }
    run.subscribers.clear();
    setTimeout(() => {
      if (activeRuns.get(run.sessionId) === run) {
        cleanupRun(run.sessionId);
      }
    }, CLEANUP_GRACE_MS);
  });

  // Collect stderr so the close handler can derive an error message from it.
  child.stderr?.on("data", (chunk: Buffer) => {
    const text = chunk.toString();
    stderrChunks.push(text);
    console.error("[active-runs stderr]", text);
  });
}
|
|
|
|
/**
 * (Re)start the subscribe-only child process that follows the parent session
 * while the run is waiting for subagents.
 *
 * Any previous subscribe process and pending retry/finalize timers are
 * stopped first. Events for `parentSessionKey` (or events without a session
 * key) are forwarded to `onEvent`. When the process closes while the run is
 * still in the "waiting-for-subagents" state, a restart is scheduled via
 * `scheduleSubscribeRestart`.
 *
 * @param run - The run that owns the subscribe process.
 * @param parentSessionKey - Session key whose events should be forwarded.
 * @param onEvent - Callback invoked for every accepted event.
 */
function startParentSubscribeStream(
  run: ActiveRun,
  parentSessionKey: string,
  onEvent: (ev: AgentEvent) => void,
): void {
  stopSubscribeProcess(run);
  const child = spawnAgentSubscribeProcess(parentSessionKey, run.lastGlobalSeq);
  run._subscribeProcess = child;
  const rl = createInterface({ input: child.stdout! });

  // Same protection the main agent stream has: readline re-emits errors from
  // a destroyed stdout pipe, and an 'error' event without a listener would
  // be thrown as an unhandled exception. The child's 'error' / 'close'
  // handlers below deal with the failure itself.
  rl.on("error", () => {
    /* swallow */
  });

  rl.on("line", (line: string) => {
    if (!line.trim()) { return; }

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return;
    }
    // Valid JSON that is not an object (e.g. `null`) is not an agent event;
    // dereferencing it below would throw inside this listener.
    if (typeof parsed !== "object" || parsed === null) { return; }
    const ev = parsed as AgentEvent;

    if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
      return;
    }
    // A delivered event proves the stream works again: reset the backoff.
    if ((run._subscribeRetryAttempt ?? 0) > 0) {
      resetSubscribeRetryState(run);
    }
    onEvent(ev);
  });

  child.on("close", () => {
    // Ignore a close from a process that has already been replaced/stopped.
    if (run._subscribeProcess !== child) {
      return;
    }
    run._subscribeProcess = null;
    if (run.status !== "waiting-for-subagents") { return; }
    // If still waiting, restart subscribe stream from the latest cursor.
    scheduleSubscribeRestart(run, () => {
      if (run.status === "waiting-for-subagents" && !run._subscribeProcess) {
        startParentSubscribeStream(run, parentSessionKey, onEvent);
      }
    });
  });

  child.on("error", (err) => {
    console.error("[active-runs] Parent subscribe child error:", err);
  });

  child.stderr?.on("data", (chunk: Buffer) => {
    console.error("[active-runs subscribe stderr]", chunk.toString());
  });
}
|
|
|
|
/**
 * Cancel the run's subscribe-retry and waiting-finalize timers, then send
 * SIGTERM to its subscribe process (if one is attached) and detach it.
 * Errors thrown by `kill` are ignored.
 */
function stopSubscribeProcess(run: ActiveRun): void {
  clearSubscribeRetryTimer(run);
  clearWaitingFinalizeTimer(run);

  const subscribeChild = run._subscribeProcess;
  if (subscribeChild) {
    try {
      subscribeChild.kill("SIGTERM");
    } catch {
      /* ignore */
    }
    run._subscribeProcess = null;
  }
}
|
|
|
|
// ── Finalize a waiting-for-subagents run ──
|
|
|
|
/**
 * Move a run out of the "waiting-for-subagents" state into "completed".
 *
 * Does nothing unless the run is currently waiting. Otherwise it clears the
 * waiting/retry timers, stops the subscribe process, flushes persistence,
 * signals completion (`null`) to every subscriber, and schedules cleanup of
 * the run after the grace period.
 */
function finalizeWaitingRun(run: ActiveRun): void {
  const isWaiting = run.status === "waiting-for-subagents";
  if (!isWaiting) {
    return;
  }

  run.status = "completed";
  clearWaitingFinalizeTimer(run);
  resetSubscribeRetryState(run);
  stopSubscribeProcess(run);
  flushPersistence(run);

  // Tell every subscriber the stream is over; a failing subscriber must not
  // prevent the others from being notified.
  for (const notify of run.subscribers) {
    try {
      notify(null);
    } catch {
      /* ignore */
    }
  }
  run.subscribers.clear();

  // Keep the run around for the grace period, and only remove it if it is
  // still the registered run for this session.
  setTimeout(() => {
    const stillRegistered = activeRuns.get(run.sessionId) === run;
    if (stillRegistered) {
      cleanupRun(run.sessionId);
    }
  }, CLEANUP_GRACE_MS);
}
|
|
|
|
/** Cancel the run's pending waiting-finalize timer, if one is set. */
function clearWaitingFinalizeTimer(run: ActiveRun): void {
  const pending = run._waitingFinalizeTimer;
  if (pending) {
    clearTimeout(pending);
    run._waitingFinalizeTimer = null;
  }
}
|
|
|
|
/** Cancel the run's pending subscribe-retry timer, if one is set. */
function clearSubscribeRetryTimer(run: ActiveRun): void {
  const pending = run._subscribeRetryTimer;
  if (pending) {
    clearTimeout(pending);
    run._subscribeRetryTimer = null;
  }
}
|
|
|
|
/** Reset the subscribe retry counter to zero and cancel any pending retry timer. */
function resetSubscribeRetryState(run: ActiveRun): void {
  run._subscribeRetryAttempt = 0;
  clearSubscribeRetryTimer(run);
}
|
|
|
|
/**
 * Schedule `restart` after an exponentially growing delay
 * (`SUBSCRIBE_RETRY_BASE_MS * 2^attempt`, capped at `SUBSCRIBE_RETRY_MAX_MS`).
 * Does nothing when a retry timer is already pending.
 *
 * @param run - Run whose retry counter and timer are updated.
 * @param restart - Callback invoked when the delay elapses.
 */
function scheduleSubscribeRestart(run: ActiveRun, restart: () => void): void {
  if (run._subscribeRetryTimer) {
    return;
  }

  const priorAttempts = run._subscribeRetryAttempt ?? 0;
  const uncappedDelay = SUBSCRIBE_RETRY_BASE_MS * 2 ** priorAttempts;
  const waitMs = Math.min(SUBSCRIBE_RETRY_MAX_MS, uncappedDelay);

  run._subscribeRetryAttempt = priorAttempts + 1;
  run._subscribeRetryTimer = setTimeout(() => {
    // Clear the handle first so the callback may schedule another retry.
    run._subscribeRetryTimer = null;
    restart();
  }, waitMs);
}
|
|
|
|
// ── Debounced persistence ──
|
|
|
|
/**
 * Arrange for the run to be persisted no sooner than `PERSIST_INTERVAL_MS`
 * after its last persistence. Does nothing when a persist timer is already
 * pending.
 */
function schedulePersist(run: ActiveRun) {
  if (run._persistTimer) {
    return;
  }

  const sinceLastPersist = Date.now() - run._lastPersistedAt;
  const remaining = PERSIST_INTERVAL_MS - sinceLastPersist;

  run._persistTimer = setTimeout(() => {
    run._persistTimer = null;
    flushPersistence(run);
  }, Math.max(0, remaining));
}
|
|
|
|
/**
 * Write the run's accumulated assistant message to the session file now.
 *
 * Cancels any pending persist timer and records the flush time. Nothing is
 * written while no parts have been accumulated. Text parts recognised as
 * leaked silent-reply tokens are left out. The message carries
 * `_streaming: true` while the run is still running or waiting for
 * subagents. Persistence failures are logged, not thrown.
 */
function flushPersistence(run: ActiveRun) {
  const pendingTimer = run._persistTimer;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    run._persistTimer = null;
  }
  run._lastPersistedAt = Date.now();

  const allParts = run.accumulated.parts;
  if (allParts.length === 0) {
    return; // Nothing to persist yet.
  }

  // Single pass: drop leaked silent-reply text fragments and build the
  // backwards-compatible `content` string (used when parts are not
  // available) from the text parts that remain.
  const cleanParts: typeof allParts = [];
  let text = "";
  for (const part of allParts) {
    if (part.type === "text") {
      if (isLeakedSilentReplyToken(part.text)) {
        continue;
      }
      text += part.text;
    }
    cleanParts.push(part);
  }

  const status = run.status;
  const isStillStreaming =
    status === "running" || status === "waiting-for-subagents";

  const message: Record<string, unknown> = {
    id: run.accumulated.id,
    role: "assistant",
    content: text,
    parts: cleanParts,
    timestamp: new Date().toISOString(),
    ...(isStillStreaming ? { _streaming: true } : {}),
  };

  try {
    upsertMessage(run.sessionId, message);
  } catch (err) {
    console.error("[active-runs] Persistence error:", err);
  }
}
|
|
|
|
/**
 * Insert or replace one message in the session's JSONL file.
 *
 * Every existing line whose parsed `id` equals the message's `id` is replaced
 * by the new serialization; when no line matches, the message is appended.
 * Lines that fail to parse are kept untouched. For sessions whose id does not
 * contain ":subagent:", the session index is updated as well (message count
 * incremented only when the message was appended).
 *
 * @param sessionId - Session whose `<sessionId>.jsonl` file is rewritten.
 * @param message - Message object; its `id` field identifies the line.
 */
function upsertMessage(
  sessionId: string,
  message: Record<string, unknown>,
) {
  ensureDir();
  const filePath = join(webChatDir(), `${sessionId}.jsonl`);
  if (!existsSync(filePath)) {
    writeFileSync(filePath, "");
  }

  const targetId = message.id as string;
  const existingLines = readFileSync(filePath, "utf-8")
    .split("\n")
    .filter((raw) => raw.trim());

  const rewritten: string[] = [];
  let replaced = false;
  for (const raw of existingLines) {
    let outLine = raw;
    try {
      if (JSON.parse(raw).id === targetId) {
        replaced = true;
        outLine = JSON.stringify(message);
      }
    } catch {
      /* keep as-is */
    }
    rewritten.push(outLine);
  }

  if (!replaced) {
    rewritten.push(JSON.stringify(message));
  }

  writeFileSync(filePath, rewritten.join("\n") + "\n");

  const isSubagentSession = sessionId.includes(":subagent:");
  if (!isSubagentSession) {
    updateIndex(sessionId, replaced ? {} : { incrementCount: 1 });
  }
}
|
|
|
|
/**
 * Remove the run registered for `sessionId` from `activeRuns`, cancelling its
 * persist and waiting-finalize timers and stopping its subscribe process
 * first. Does nothing when no run is registered for the session.
 */
function cleanupRun(sessionId: string) {
  const run = activeRuns.get(sessionId);
  if (run) {
    if (run._persistTimer) {
      clearTimeout(run._persistTimer);
    }
    clearWaitingFinalizeTimer(run);
    stopSubscribeProcess(run);
    activeRuns.delete(sessionId);
  }
}
|