This commit refactors the handling of subagent sessions by removing the enrichSubagentMessages function and integrating its functionality into the GET request handler. It also introduces the enrichSubagentSessionFromTranscript function to enhance session data retrieval. Additionally, the persistSubscribeUserMessage function is updated to ensure user messages are saved to the session JSONL file, improving message persistence across page reloads. These changes enhance the clarity and efficiency of subagent session management.
2173 lines
65 KiB
TypeScript
2173 lines
65 KiB
TypeScript
/**
 * Server-side singleton that manages agent child processes independently of
 * HTTP connections. Buffers SSE events, fans out to subscribers, and
 * persists assistant messages incrementally to disk.
 *
 * This decouples agent lifecycles from request lifecycles so:
 * - Streams survive page reloads (process keeps running).
 * - Messages are written to persistent sessions as they arrive.
 * - New HTTP connections can re-attach to a running stream.
 */
|
|
import { createInterface } from "node:readline";
|
|
import { join } from "node:path";
|
|
import {
|
|
readFileSync,
|
|
writeFileSync,
|
|
existsSync,
|
|
mkdirSync,
|
|
} from "node:fs";
|
|
import { resolveWebChatDir, resolveOpenClawStateDir, resolveActiveAgentId } from "./workspace";
|
|
import {
|
|
type AgentProcessHandle,
|
|
type AgentEvent,
|
|
spawnAgentProcess,
|
|
spawnAgentSubscribeProcess,
|
|
spawnAgentStartForSession,
|
|
callGatewayRpc,
|
|
extractToolResult,
|
|
buildToolOutput,
|
|
parseAgentErrorMessage,
|
|
parseErrorBody,
|
|
parseErrorFromStderr,
|
|
} from "./agent-runner";
|
|
|
|
// ── Types ──
|
|
|
|
/**
 * One SSE event object in the AI SDK v6 data stream wire format.
 * Only the `type` discriminator is required; all other fields are untyped here.
 */
export type SseEvent = { type: string } & Record<string, unknown>;

/**
 * Callback registered against a run. It is invoked with each SSE event and
 * with `null` once the run completes.
 */
export type RunSubscriber = (event: null | SseEvent) => void;
|
|
|
|
/**
 * One piece of an assistant message under construction: plain text,
 * model reasoning, or a tool invocation with its optional outcome.
 */
type AccumulatedPart =
  | { type: "text"; text: string }
  | { type: "reasoning"; text: string }
  | {
      type: "tool-invocation";
      toolCallId: string;
      toolName: string;
      args: Record<string, unknown>;
      /** Set once the tool produced a result. */
      result?: Record<string, unknown>;
      /** Set when the tool reported an error. */
      errorText?: string;
    };

/** The assistant message assembled for a run. */
type AccumulatedMessage = {
  id: string;
  role: "assistant";
  /** Parts in arrival order, so reasoning, tools, and text stay interleaved. */
  parts: AccumulatedPart[];
};
|
|
|
|
/**
 * In-memory record of one agent run tracked by this module.
 *
 * Fields prefixed with `_` are internal bookkeeping (timers, retry state).
 */
export type ActiveRun = {
  /** Key under which the run is registered; subscribe-only runs use their gateway session key here. */
  sessionId: string;
  /** Handle of the process currently feeding this run. */
  childProcess: AgentProcessHandle;
  /** Every event emitted so far; replayed to subscribers that attach later. */
  eventBuffer: SseEvent[];
  /** Callbacks currently receiving live events. */
  subscribers: Set<RunSubscriber>;
  /** Assistant message being assembled from the stream. */
  accumulated: AccumulatedMessage;
  /** Lifecycle state; "running" and "waiting-for-subagents" both count as active. */
  status: "running" | "waiting-for-subagents" | "completed" | "error";
  /** `Date.now()` captured when the run record was created. */
  startedAt: number;
  /** Process exit code; `null` when the run is created. */
  exitCode: number | null;
  /** Controller aborted when the run is stopped. */
  abortController: AbortController;

  /** @internal debounced persistence timer */
  _persistTimer: ReturnType<typeof setTimeout> | null;
  /** @internal last time persistence was flushed */
  _lastPersistedAt: number;
  /** @internal last globalSeq seen from the gateway event stream */
  lastGlobalSeq: number;

  /** @internal subscribe child process for waiting-for-subagents continuation */
  _subscribeProcess?: AgentProcessHandle | null;
  /** @internal retry timer for subscribe stream restarts */
  _subscribeRetryTimer?: ReturnType<typeof setTimeout> | null;
  /** @internal consecutive subscribe restart attempts without a received event */
  _subscribeRetryAttempt?: number;

  /** Full gateway session key (used for subagent subscribe-only runs) */
  sessionKey?: string;
  /** Parent web session ID (for subagent runs) */
  parentSessionId?: string;
  /** Subagent task description */
  task?: string;
  /** Subagent label */
  label?: string;
  /** True for subscribe-only runs (subagents) that don't own the agent process */
  isSubscribeOnly?: boolean;

  /** Set when lifecycle/end is received; defers finalization until subscribe close */
  _lifecycleEnded?: boolean;
  /** Safety timer to finalize if subscribe process hangs after lifecycle/end */
  _finalizeTimer?: ReturnType<typeof setTimeout> | null;
  /** @internal short reconciliation window before waiting-run completion */
  _waitingFinalizeTimer?: ReturnType<typeof setTimeout> | null;
};
|
|
|
|
// ── Constants ──
|
|
|
|
// All values below are durations in milliseconds.

/** Persistence interval (2 s). NOTE(review): presumably the debounce window for incremental saves — confirm at the use site. */
const PERSIST_INTERVAL_MS = 2 * 1_000;
/** How long an aborted run stays registered before it is cleaned up (30 s). */
const CLEANUP_GRACE_MS = 30 * 1_000;
/** Cleanup grace for subscribe-only runs (24 h). */
const SUBSCRIBE_CLEANUP_GRACE_MS = 24 * 60 * 60 * 1_000;
/** Base delay for subscribe-stream retries. NOTE(review): backoff shape is defined at the use site. */
const SUBSCRIBE_RETRY_BASE_MS = 300;
/** Upper bound for the subscribe-stream retry delay (5 s). */
const SUBSCRIBE_RETRY_MAX_MS = 5 * 1_000;
/** Grace period after a lifecycle/end event on a subscribe stream. */
const SUBSCRIBE_LIFECYCLE_END_GRACE_MS = 750;
/** Reconciliation window before a waiting run is finalized (5 s). */
const WAITING_FINALIZE_RECONCILE_MS = 5 * 1_000;
|
|
|
|
const SILENT_REPLY_TOKEN = "NO_REPLY";

/**
 * Decide whether a finalized text part is a leaked silent-reply marker.
 *
 * The agent runner suppresses complete "NO_REPLY" tokens, but while streaming
 * the model may emit a partial prefix (e.g. "NO") before the full token is
 * assembled and caught. Two shapes are therefore treated as leaks:
 * - the full token, optionally followed by non-word characters;
 * - a proper prefix of the token that is at least two characters long.
 *
 * @param text Raw text of the part; surrounding whitespace is ignored.
 * @returns `true` when the text should be kept out of persisted/displayed messages.
 */
function isLeakedSilentReplyToken(text: string): boolean {
  const trimmed = text.trim();
  if (trimmed.length === 0) {
    return false;
  }

  const fullTokenPattern = new RegExp(`^${SILENT_REPLY_TOKEN}\\W*$`);
  const isFullToken = fullTokenPattern.test(trimmed);

  const isPartialPrefix =
    trimmed.length >= 2 &&
    trimmed.length < SILENT_REPLY_TOKEN.length &&
    SILENT_REPLY_TOKEN.startsWith(trimmed);

  return isFullToken || isPartialPrefix;
}
|
|
|
|
/**
 * Narrow an unknown value to a plain key/value record.
 *
 * @returns the same object reference when `value` is a non-null, non-array
 *   object; `null` for primitives, `null`/`undefined`, and arrays.
 */
function asRecord(value: unknown): Record<string, unknown> | null {
  const isObjectLike = typeof value === "object" && value !== null;
  return isObjectLike && !Array.isArray(value)
    ? (value as Record<string, unknown>)
    : null;
}
|
|
|
|
/**
 * Build a display label of the form `provider/model`.
 *
 * @param provider Provider name; ignored unless it is a non-blank string.
 * @param model Model name; must be a non-blank string.
 * @returns `null` when `model` is unusable; the trimmed model alone when no
 *   provider is given or the model already starts with `provider/`
 *   (case-insensitive); otherwise `provider/model`.
 */
function resolveModelLabel(provider: unknown, model: unknown): string | null {
  const modelName = typeof model === "string" ? model.trim() : "";
  if (modelName === "") {
    return null;
  }

  const providerName = typeof provider === "string" ? provider.trim() : "";
  if (providerName === "") {
    return modelName;
  }

  const qualifiedPrefix = `${providerName.toLowerCase()}/`;
  if (modelName.toLowerCase().startsWith(qualifiedPrefix)) {
    return modelName;
  }
  return `${providerName}/${modelName}`;
}
|
|
|
|
/**
 * Pull the assistant's text out of a chat event payload.
 *
 * Only payloads whose `state` is "final" and whose `message.role` is
 * "assistant" yield text. String content is returned verbatim; array content
 * contributes the `text` of every `text` / `output_text` part, concatenated
 * without a separator.
 *
 * @param data Chat event payload, possibly absent.
 * @returns The extracted text, or `""` when nothing applies.
 */
function extractAssistantTextFromChatPayload(
  data: Record<string, unknown> | undefined,
): string {
  if (!data || data.state !== "final") {
    return "";
  }

  const message = asRecord(data.message);
  if (message === null || message.role !== "assistant") {
    return "";
  }

  const { content } = message;
  if (typeof content === "string") {
    return content;
  }
  if (!Array.isArray(content)) {
    return "";
  }

  let text = "";
  for (const rawPart of content) {
    const part = asRecord(rawPart);
    if (part === null) {
      continue;
    }
    const isTextual = part.type === "text" || part.type === "output_text";
    if (isTextual && typeof part.text === "string") {
      text += part.text;
    }
  }
  return text;
}
|
|
// Resolved on every call rather than cached, so runtime profile switches are picked up.
function webChatDir(): string {
  const dir = resolveWebChatDir();
  return dir;
}
|
|
/** Path of `index.json` inside the current web-chat directory. */
function indexFile(): string {
  const dir = webChatDir();
  return join(dir, "index.json");
}
|
|
|
|
// ── Singleton registry ──
|
|
// The registry lives on globalThis so it survives Next.js HMR reloads in dev
// mode. Otherwise hot-reloading any server module would create a fresh Map,
// orphaning running child processes and dropping SSE streams mid-flight.

const GLOBAL_KEY = "__openclaw_activeRuns" as const;

/** Registry of runs, keyed by session id (or gateway session key for subscribe-only runs). */
const activeRuns: Map<string, ActiveRun> = (() => {
  const globalStore = globalThis as Record<string, unknown>;
  const existing = globalStore[GLOBAL_KEY] as Map<string, ActiveRun> | undefined;
  // Reuse the Map left by a previous module instance; create one on first load.
  const registry = existing ?? new Map<string, ActiveRun>();
  globalStore[GLOBAL_KEY] = registry;
  return registry;
})();
|
|
|
|
// ── Public API ──
|
|
|
|
/**
 * Look up the run registered for a session.
 *
 * @returns The run (active, or finished but not yet cleaned up), or
 *   `undefined` when none is registered.
 */
export function getActiveRun(sessionId: string): ActiveRun | undefined {
  const run = activeRuns.get(sessionId);
  return run;
}
|
|
|
|
/**
 * Tell whether a session has a run that is still in progress.
 * "running" and "waiting-for-subagents" both count; finished runs do not.
 */
export function hasActiveRun(sessionId: string): boolean {
  const status = activeRuns.get(sessionId)?.status;
  return status === "running" || status === "waiting-for-subagents";
}
|
|
|
|
/**
 * List the registry keys of every run that is still in progress
 * ("running" or "waiting-for-subagents"), in registry insertion order.
 */
export function getRunningSessionIds(): string[] {
  return Array.from(activeRuns)
    .filter(([, run]) => run.status === "running" || run.status === "waiting-for-subagents")
    .map(([sessionId]) => sessionId);
}
|
|
|
|
/**
 * Check if any subagent sessions are still running for a parent web session.
 *
 * Two sources are consulted, in order:
 * 1. The in-memory registry: a subscribe-only run whose `parentSessionId`
 *    matches and whose status is "running".
 * 2. The on-disk registry `<stateDir>/subagents/runs.json`: an entry whose
 *    `requesterSessionKey` ends with `:web:<parentWebSessionId>` and that has
 *    no numeric `endedAt`.
 *
 * The disk lookup is best-effort: a missing, unreadable, or malformed file
 * yields `false`. Individual malformed entries are skipped so that one bad
 * record cannot hide other entries that are still running.
 *
 * @param parentWebSessionId Web session id of the parent run.
 * @returns `true` when at least one matching subagent appears to be running.
 */
export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean {
  for (const run of activeRuns.values()) {
    if (run.isSubscribeOnly && run.parentSessionId === parentWebSessionId && run.status === "running") {
      return true;
    }
  }

  // Fallback: check the gateway disk registry.
  const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
  if (!existsSync(registryPath)) {
    return false;
  }

  try {
    const raw: unknown = JSON.parse(readFileSync(registryPath, "utf-8"));
    const runs = asRecord(asRecord(raw)?.runs);
    if (!runs) {
      return false;
    }

    const parentKeySuffix = `:web:${parentWebSessionId}`;
    return Object.values(runs).some((value) => {
      // The file is written by another process; validate each entry instead
      // of trusting its shape.
      const entry = asRecord(value);
      if (!entry) {
        return false;
      }
      const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
      return requester.endsWith(parentKeySuffix) && typeof entry.endedAt !== "number";
    });
  } catch {
    // Best effort: an unreadable or unparsable registry counts as "nothing running".
    return false;
  }
}
|
|
|
|
/**
 * Attach a subscriber to a run's SSE event stream.
 *
 * Unless `options.replay` is `false`, every buffered event is delivered to
 * `callback` synchronously before this function returns; live events follow.
 * If the run is no longer in progress, `callback` receives `null` right after
 * the replay and is never registered.
 *
 * @param sessionId Registry key of the run.
 * @param callback Receives events, then `null` on completion.
 * @param options `replay` defaults to `true`.
 * @returns An unsubscribe function, or `null` when no run is registered.
 */
export function subscribeToRun(
  sessionId: string,
  callback: RunSubscriber,
  options?: { replay?: boolean },
): (() => void) | null {
  const run = activeRuns.get(sessionId);
  if (run === undefined) {
    return null;
  }

  // Synchronous replay: no event-loop yield, so no live event can interleave.
  const shouldReplay = options?.replay ?? true;
  if (shouldReplay) {
    for (let i = 0; i < run.eventBuffer.length; i += 1) {
      callback(run.eventBuffer[i]);
    }
  }

  const stillInProgress = run.status === "running" || run.status === "waiting-for-subagents";
  if (!stillInProgress) {
    // Already finished: signal completion now; there is nothing to unsubscribe from.
    callback(null);
    return () => {};
  }

  run.subscribers.add(callback);
  const unsubscribe = (): void => {
    run.subscribers.delete(callback);
  };
  return unsubscribe;
}
|
|
|
|
/**
 * Bring a finished subscribe-only run back to "running" for a follow-up.
 *
 * Pending finalize timers and retry state are cleared, the previous subscribe
 * process is stopped, a fresh accumulated assistant message is started, and a
 * new child process is spawned and wired to the run.
 *
 * @param sessionKey Gateway session key under which the run is registered.
 * @param message Optional follow-up message. When present the child is
 *   started in "start" mode so the `agent` RPC streams ALL events (including
 *   tool events) on the same connection; in passive subscribe mode the
 *   gateway does not broadcast tool events.
 * @returns `false` when no subscribe-only run exists for the key, otherwise
 *   `true` (including when the run was already running and left untouched).
 */
export function reactivateSubscribeRun(sessionKey: string, message?: string): boolean {
  const run = activeRuns.get(sessionKey);
  if (run === undefined || !run.isSubscribeOnly) {
    return false;
  }
  if (run.status === "running") {
    return true;
  }

  run.status = "running";
  run._lifecycleEnded = false;
  if (run._finalizeTimer) {
    clearTimeout(run._finalizeTimer);
    run._finalizeTimer = null;
  }
  clearWaitingFinalizeTimer(run);
  resetSubscribeRetryState(run);
  stopSubscribeProcess(run);

  // Start a fresh assistant message for the follow-up turn.
  run.accumulated = {
    id: `assistant-${sessionKey}-${Date.now()}`,
    role: "assistant",
    parts: [],
  };

  let child: AgentProcessHandle;
  if (message) {
    child = spawnAgentStartForSession(message, sessionKey);
  } else {
    child = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
  }

  run._subscribeProcess = child;
  run.childProcess = child;
  wireSubscribeOnlyProcess(run, child, sessionKey);
  return true;
}
|
|
|
|
/**
 * Fire a follow-up message at a subagent session through the gateway's
 * `agent` RPC. The call is not awaited and its rejection is swallowed (best
 * effort); the subscribe stream is what picks up the agent's response events.
 *
 * @param sessionKey Gateway session key of the subagent.
 * @param message Follow-up text to deliver.
 * @returns `true` once the RPC has been dispatched, `false` if dispatching
 *   threw synchronously. The RPC outcome itself is not reflected.
 */
export function sendSubagentFollowUp(sessionKey: string, message: string): boolean {
  try {
    const rpcParams = {
      message,
      sessionKey,
      idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`,
      deliver: false,
      channel: "webchat",
      lane: "subagent",
      timeout: 0,
    };
    const pending = callGatewayRpc("agent", rpcParams, { timeoutMs: 10_000 });
    void pending.catch(() => {
      // Best effort.
    });
    return true;
  } catch {
    return false;
  }
}
|
|
|
|
/**
 * Record a user message against a subscribe-only (subagent) run.
 *
 * A `user-message` event is buffered and sent to current subscribers so
 * reconnecting clients see the message. The message is also written to
 * `<webChatDir>/<sessionKey>.jsonl` unless a line with the same id already
 * exists, so it survives page reloads. The disk write is best-effort.
 *
 * @param sessionKey Registry key of the run.
 * @param msg Message text and optional id; an id is generated when absent.
 * @returns `false` when no run is registered for the key, otherwise `true`.
 */
export function persistSubscribeUserMessage(
  sessionKey: string,
  msg: { id?: string; text: string },
): boolean {
  const run = activeRuns.get(sessionKey);
  if (run === undefined) {
    return false;
  }

  const msgId = msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`;
  const event: SseEvent = { type: "user-message", id: msgId, text: msg.text };

  run.eventBuffer.push(event);
  for (const notify of run.subscribers) {
    try {
      notify(event);
    } catch {
      /* ignore */
    }
  }

  // Mirror the message into the session JSONL so it is still there after a reload.
  try {
    ensureDir();
    const filePath = join(webChatDir(), `${sessionKey}.jsonl`);
    if (!existsSync(filePath)) {
      writeFileSync(filePath, "");
    }

    const existingLines = readFileSync(filePath, "utf-8")
      .split("\n")
      .filter((l) => l.trim());

    let alreadySaved = false;
    for (const existingLine of existingLines) {
      try {
        if (JSON.parse(existingLine).id === msgId) {
          alreadySaved = true;
          break;
        }
      } catch {
        // Unparsable line: cannot be this message.
      }
    }

    if (!alreadySaved) {
      const record = {
        id: msgId,
        role: "user",
        content: msg.text,
        parts: [{ type: "text", text: msg.text }],
        timestamp: new Date().toISOString(),
      };
      existingLines.push(JSON.stringify(record));
      writeFileSync(filePath, existingLines.join("\n") + "\n");
    }
  } catch {
    /* best effort */
  }

  schedulePersist(run);
  return true;
}
|
|
|
|
/**
 * Abort a run that is still in progress.
 *
 * @param sessionId Registry key of the run.
 * @returns `true` if a run was actually aborted; `false` when no run is
 *   registered or the run is neither "running" nor "waiting-for-subagents".
 */
export function abortRun(sessionId: string): boolean {
  const run = activeRuns.get(sessionId);
  if (run === undefined) {
    return false;
  }

  const previousStatus = run.status;
  if (previousStatus !== "running" && previousStatus !== "waiting-for-subagents") {
    return false;
  }
  // The child process is only signalled when the run was "running"; a run that
  // was waiting for subagents gets no SIGTERM/SIGKILL here.
  const wasRunning = previousStatus === "running";

  // Flip the status first so hasActiveRun() reports false right away and the
  // next user message isn't rejected with 409.
  run.status = "error";
  clearWaitingFinalizeTimer(run);

  // Tear down the waiting subscribe process, if there is one.
  stopSubscribeProcess(run);

  run.abortController.abort();
  if (wasRunning) {
    run.childProcess.kill("SIGTERM");
  }

  // Also tell the gateway directly, in case the CLI child's best-effort
  // onAbort doesn't complete in time.
  sendGatewayAbort(sessionId);

  // Save the partial response (without _streaming).
  flushPersistence(run);

  // Tell every subscriber the stream is over, then drop them.
  run.subscribers.forEach((subscriber) => {
    try {
      subscriber(null);
    } catch {
      /* ignore */
    }
  });
  run.subscribers.clear();

  // Grace-period cleanup; skipped if a newer run has replaced this one.
  setTimeout(() => {
    if (activeRuns.get(sessionId) !== run) {
      return;
    }
    cleanupRun(sessionId);
  }, CLEANUP_GRACE_MS);

  // Escalate to SIGKILL if the child has not closed 5 seconds after SIGTERM
  // (e.g. the CLI's best-effort chat.abort RPC hangs).
  if (wasRunning) {
    const forceKillTimer = setTimeout(() => {
      try {
        run.childProcess.kill("SIGKILL");
      } catch {
        /* already dead */
      }
    }, 5_000);
    run.childProcess.once("close", () => clearTimeout(forceKillTimer));
  }

  return true;
}
|
|
|
|
/**
 * Send a `chat.abort` RPC directly to the gateway daemon via a short-lived
 * CLI process. This is a belt-and-suspenders complement to the SIGTERM sent
 * to the child: even if the child's best-effort `onAbort` callback doesn't
 * reach the gateway in time, this separate process will.
 *
 * The gateway session key is resolved as follows:
 * - If the registered run carries its own `sessionKey` (subscribe-only
 *   subagent runs are registered under their full gateway key), that key is
 *   used verbatim. Wrapping it again would produce a key that is not the one
 *   the run was registered with.
 * - Otherwise the key is built as `agent:<activeAgentId>:web:<sessionId>`.
 *
 * Failures are swallowed: aborting is best-effort and must not break the
 * stop flow.
 *
 * @param sessionId Registry key of the run being aborted.
 */
function sendGatewayAbort(sessionId: string): void {
  try {
    const registeredKey = activeRuns.get(sessionId)?.sessionKey;
    const sessionKey = registeredKey
      ? registeredKey
      : `agent:${resolveActiveAgentId()}:web:${sessionId}`;
    void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch(
      () => {
        // Best effort; don't let abort failures break the stop flow.
      },
    );
  } catch {
    // Best-effort; don't let abort failures break the stop flow.
  }
}
|
|
|
|
/**
 * Spawn an agent process for a session and register the resulting run.
 *
 * A previous run for the same session that is no longer "running" is cleaned
 * up first (it may still be inside its grace period).
 *
 * @param params.sessionId Web session id; also the registry key.
 * @param params.message User message handed to the agent process.
 * @param params.agentSessionId Optional agent-side session id passed to the spawner.
 * @returns The newly registered run.
 * @throws Error if the session already has a run whose status is "running".
 */
export function startRun(params: {
  sessionId: string;
  message: string;
  agentSessionId?: string;
}): ActiveRun {
  const sessionId = params.sessionId;

  const previous = activeRuns.get(sessionId);
  if (previous !== undefined) {
    if (previous.status === "running") {
      throw new Error("Active run already exists for this session");
    }
    // Finished run still lingering in its grace period.
    cleanupRun(sessionId);
  }

  const abortController = new AbortController();
  const child = spawnAgentProcess(params.message, params.agentSessionId);

  const run: ActiveRun = {
    sessionId,
    childProcess: child,
    eventBuffer: [],
    subscribers: new Set(),
    accumulated: {
      id: `assistant-${sessionId}-${Date.now()}`,
      role: "assistant",
      parts: [],
    },
    status: "running",
    startedAt: Date.now(),
    exitCode: null,
    abortController,
    _persistTimer: null,
    _lastPersistedAt: 0,
    lastGlobalSeq: 0,
    _subscribeRetryTimer: null,
    _subscribeRetryAttempt: 0,
    _waitingFinalizeTimer: null,
  };
  activeRuns.set(sessionId, run);

  // Abort signal → SIGTERM on the child; the listener is dropped once the child closes.
  const { signal } = abortController;
  const killOnAbort = () => child.kill("SIGTERM");
  if (!signal.aborted) {
    signal.addEventListener("abort", killOnAbort, { once: true });
    child.on("close", () => signal.removeEventListener("abort", killOnAbort));
  } else {
    child.kill("SIGTERM");
  }

  wireChildProcess(run);
  return run;
}
|
|
|
|
/**
 * Register a subscribe-only run for a subagent session.
 *
 * The agent itself already runs in the gateway; this only subscribes to its
 * event stream so buffering, persistence, and reconnection work the same way
 * as for parent sessions. If a run is already registered under the session
 * key, it is returned unchanged and nothing is spawned.
 *
 * @param params.sessionKey Gateway session key; also the registry key.
 * @param params.parentSessionId Web session id of the parent run.
 * @param params.task Subagent task description.
 * @param params.label Optional subagent label.
 * @returns The existing or newly created run.
 */
export function startSubscribeRun(params: {
  sessionKey: string;
  parentSessionId: string;
  task: string;
  label?: string;
}): ActiveRun {
  const { sessionKey } = params;

  const alreadyRegistered = activeRuns.get(sessionKey);
  if (alreadyRegistered !== undefined) {
    return alreadyRegistered;
  }

  // Raise the verbose level BEFORE spawning the subscribe process so tool
  // events are generated from this point on. The subscribe process patches
  // too; doing it here gives a head start. Failures are ignored.
  const patchRequest = callGatewayRpc(
    "sessions.patch",
    { key: sessionKey, verboseLevel: "full", reasoningLevel: "on" },
    { timeoutMs: 4_000 },
  );
  void patchRequest.catch(() => {});

  const abortController = new AbortController();
  const subscribeChild = spawnAgentSubscribeProcess(sessionKey, 0);

  const run: ActiveRun = {
    sessionId: sessionKey,
    childProcess: subscribeChild,
    eventBuffer: [],
    subscribers: new Set(),
    accumulated: {
      id: `assistant-${sessionKey}-${Date.now()}`,
      role: "assistant",
      parts: [],
    },
    status: "running",
    startedAt: Date.now(),
    exitCode: null,
    abortController,
    _persistTimer: null,
    _lastPersistedAt: 0,
    lastGlobalSeq: 0,
    sessionKey,
    parentSessionId: params.parentSessionId,
    task: params.task,
    label: params.label,
    isSubscribeOnly: true,
    _lifecycleEnded: false,
    _finalizeTimer: null,
    _subscribeRetryTimer: null,
    _subscribeRetryAttempt: 0,
    _waitingFinalizeTimer: null,
  };

  activeRuns.set(sessionKey, run);
  wireSubscribeOnlyProcess(run, subscribeChild, sessionKey);
  return run;
}
|
|
|
|
/** A tool call recovered from a session transcript, with its result when one was recorded. */
type TranscriptToolPart = {
  type: "tool-invocation";
  /** Id linking the call to its tool result. */
  toolCallId: string;
  toolName: string;
  /** Arguments the tool was called with. */
  args: Record<string, unknown>;
  /** Present only when a matching tool result was found. */
  result?: Record<string, unknown>;
};
|
|
|
|
/**
 * Read the tool calls of the most recent assistant message that contains any,
 * from the on-disk transcript of a gateway session.
 *
 * The session key is mapped to a session id through
 * `<stateDir>/agents/<agentId>/sessions/sessions.json`; the transcript is
 * `<sessionId>.jsonl` in the same directory. Each `toolCall` part of the
 * latest tool-calling assistant message becomes a tool-invocation part. If a
 * `toolResult` message with the same `toolCallId` exists anywhere in the
 * transcript, its text (truncated to 500 characters) is attached as `result`.
 *
 * This is a best-effort read of files owned by another process. Any I/O or
 * parse failure yields `null` instead of throwing into the caller, and
 * records that do not have the expected shape are skipped.
 *
 * @param sessionKey Gateway session key to look up in `sessions.json`.
 * @returns The session id with its latest tool parts, or `null` when the
 *   session, the transcript, or any tool call cannot be found.
 */
function readLatestTranscriptToolParts(
  sessionKey: string,
): { sessionId: string; tools: TranscriptToolPart[] } | null {
  try {
    const stateDir = resolveOpenClawStateDir();
    const agentId = resolveActiveAgentId();
    const sessionsDir = join(stateDir, "agents", agentId, "sessions");

    const sessionsJsonPath = join(sessionsDir, "sessions.json");
    if (!existsSync(sessionsJsonPath)) { return null; }
    const sessions = asRecord(JSON.parse(readFileSync(sessionsJsonPath, "utf-8")));
    const sessionId = asRecord(sessions?.[sessionKey])?.sessionId;
    if (typeof sessionId !== "string" || !sessionId) { return null; }

    const transcriptPath = join(sessionsDir, `${sessionId}.jsonl`);
    if (!existsSync(transcriptPath)) { return null; }

    const toolResults = new Map<string, Record<string, unknown>>();
    let latestToolCalls: TranscriptToolPart[] = [];

    for (const line of readFileSync(transcriptPath, "utf-8").split("\n")) {
      if (!line.trim()) { continue; }

      let entry: Record<string, unknown> | null;
      try {
        entry = asRecord(JSON.parse(line));
      } catch {
        // A torn or partially written line must not abort the whole read.
        continue;
      }
      if (!entry || entry.type !== "message") { continue; }

      const msg = asRecord(entry.message);
      if (!msg) { continue; }
      const content = msg.content;

      if (msg.role === "toolResult" && typeof msg.toolCallId === "string") {
        let text = "";
        if (Array.isArray(content)) {
          const texts: string[] = [];
          for (const rawChunk of content) {
            const chunk = asRecord(rawChunk);
            if (chunk && chunk.type === "text" && typeof chunk.text === "string") {
              texts.push(chunk.text);
            }
          }
          text = texts.join("\n");
        } else if (typeof content === "string") {
          text = content;
        }
        toolResults.set(msg.toolCallId, { text: text.slice(0, 500) });
        continue;
      }

      if (msg.role !== "assistant" || !Array.isArray(content)) { continue; }

      const calls: TranscriptToolPart[] = [];
      for (const rawPart of content) {
        const part = asRecord(rawPart);
        if (!part) { continue; }
        if (part.type === "toolCall" && typeof part.id === "string" && typeof part.name === "string") {
          calls.push({
            type: "tool-invocation",
            toolCallId: part.id,
            toolName: part.name,
            // Only a plain object is accepted as arguments; anything else becomes {}.
            args: asRecord(part.arguments) ?? {},
          });
        }
      }
      // Later assistant messages with tool calls replace earlier ones.
      if (calls.length > 0) {
        latestToolCalls = calls;
      }
    }

    if (latestToolCalls.length === 0) {
      return null;
    }

    const tools = latestToolCalls.map((tool) => {
      const result = toolResults.get(tool.toolCallId);
      return result ? { ...tool, result } : tool;
    });

    return { sessionId, tools };
  } catch {
    // Best effort: unreadable or malformed session files mean "nothing to backfill".
    return null;
  }
}
|
|
|
|
/**
|
|
* Wire event processing for a subscribe-only run (subagent).
|
|
* Uses the same processParentEvent pipeline as parent runs,
|
|
* with deferred finalization on lifecycle/end.
|
|
*/
|
|
function wireSubscribeOnlyProcess(
|
|
run: ActiveRun,
|
|
child: AgentProcessHandle,
|
|
sessionKey: string,
|
|
): void {
|
|
let idCounter = 0;
|
|
const nextId = (prefix: string) =>
|
|
`${prefix}-${Date.now()}-${++idCounter}`;
|
|
|
|
let currentTextId = "";
|
|
let currentReasoningId = "";
|
|
let currentStatusReasoningLabel: string | null = null;
|
|
let textStarted = false;
|
|
let reasoningStarted = false;
|
|
let statusReasoningActive = false;
|
|
let agentErrorReported = false;
|
|
const liveStats = {
|
|
assistantChunks: 0,
|
|
toolStartCount: 0,
|
|
};
|
|
|
|
let accTextIdx = -1;
|
|
let accReasoningIdx = -1;
|
|
const accToolMap = new Map<string, number>();
|
|
|
|
const accAppendReasoning = (delta: string) => {
|
|
if (accReasoningIdx < 0) {
|
|
run.accumulated.parts.push({ type: "reasoning", text: delta });
|
|
accReasoningIdx = run.accumulated.parts.length - 1;
|
|
} else {
|
|
(run.accumulated.parts[accReasoningIdx] as { type: "reasoning"; text: string }).text += delta;
|
|
}
|
|
};
|
|
|
|
const accAppendText = (delta: string) => {
|
|
if (accTextIdx < 0) {
|
|
run.accumulated.parts.push({ type: "text", text: delta });
|
|
accTextIdx = run.accumulated.parts.length - 1;
|
|
} else {
|
|
(run.accumulated.parts[accTextIdx] as { type: "text"; text: string }).text += delta;
|
|
}
|
|
};
|
|
|
|
const emit = (event: SseEvent) => {
|
|
run.eventBuffer.push(event);
|
|
for (const sub of run.subscribers) {
|
|
try { sub(event); } catch { /* ignore */ }
|
|
}
|
|
schedulePersist(run);
|
|
};
|
|
|
|
const emitError = (message: string) => {
|
|
closeReasoning();
|
|
closeText();
|
|
const tid = nextId("text");
|
|
emit({ type: "text-start", id: tid });
|
|
emit({ type: "text-delta", id: tid, delta: `[error] ${message}` });
|
|
emit({ type: "text-end", id: tid });
|
|
accAppendText(`[error] ${message}`);
|
|
};
|
|
|
|
const closeReasoning = () => {
|
|
if (reasoningStarted) {
|
|
emit({ type: "reasoning-end", id: currentReasoningId });
|
|
reasoningStarted = false;
|
|
statusReasoningActive = false;
|
|
}
|
|
currentStatusReasoningLabel = null;
|
|
accReasoningIdx = -1;
|
|
};
|
|
|
|
const closeText = () => {
|
|
if (textStarted) {
|
|
const lastPart = run.accumulated.parts[accTextIdx];
|
|
if (lastPart?.type === "text" && isLeakedSilentReplyToken(lastPart.text)) {
|
|
run.accumulated.parts.splice(accTextIdx, 1);
|
|
}
|
|
emit({ type: "text-end", id: currentTextId });
|
|
textStarted = false;
|
|
}
|
|
accTextIdx = -1;
|
|
};
|
|
|
|
const openStatusReasoning = (label: string) => {
|
|
if (statusReasoningActive && currentStatusReasoningLabel === label) {
|
|
return;
|
|
}
|
|
closeReasoning();
|
|
closeText();
|
|
currentReasoningId = nextId("status");
|
|
emit({ type: "reasoning-start", id: currentReasoningId });
|
|
emit({ type: "reasoning-delta", id: currentReasoningId, delta: label });
|
|
reasoningStarted = true;
|
|
statusReasoningActive = true;
|
|
currentStatusReasoningLabel = label;
|
|
};
|
|
|
|
const maybeBackfillLiveToolsFromTranscript = (_reason: "assistant-chunk" | "lifecycle-end") => {
|
|
if (liveStats.toolStartCount > 0) {
|
|
return;
|
|
}
|
|
const bundle = readLatestTranscriptToolParts(sessionKey);
|
|
if (!bundle) {
|
|
return;
|
|
}
|
|
|
|
for (const tool of bundle.tools) {
|
|
const existingIdx = accToolMap.get(tool.toolCallId);
|
|
if (existingIdx === undefined) {
|
|
closeReasoning();
|
|
closeText();
|
|
emit({
|
|
type: "tool-input-start",
|
|
toolCallId: tool.toolCallId,
|
|
toolName: tool.toolName,
|
|
});
|
|
emit({
|
|
type: "tool-input-available",
|
|
toolCallId: tool.toolCallId,
|
|
toolName: tool.toolName,
|
|
input: tool.args ?? {},
|
|
});
|
|
run.accumulated.parts.push({
|
|
type: "tool-invocation",
|
|
toolCallId: tool.toolCallId,
|
|
toolName: tool.toolName,
|
|
args: tool.args ?? {},
|
|
});
|
|
accToolMap.set(tool.toolCallId, run.accumulated.parts.length - 1);
|
|
}
|
|
|
|
if (!tool.result) {
|
|
continue;
|
|
}
|
|
|
|
const idx = accToolMap.get(tool.toolCallId);
|
|
if (idx === undefined) {
|
|
continue;
|
|
}
|
|
const part = run.accumulated.parts[idx];
|
|
if (part.type !== "tool-invocation" || part.result) {
|
|
continue;
|
|
}
|
|
part.result = tool.result;
|
|
emit({
|
|
type: "tool-output-available",
|
|
toolCallId: tool.toolCallId,
|
|
output: tool.result,
|
|
});
|
|
}
|
|
};
|
|
|
|
const processEvent = (ev: AgentEvent) => {
|
|
const isLifecycleEndEvent =
|
|
ev.event === "agent" &&
|
|
ev.stream === "lifecycle" &&
|
|
ev.data?.phase === "end";
|
|
if (!isLifecycleEndEvent) {
|
|
if (run._finalizeTimer) {
|
|
clearTimeout(run._finalizeTimer);
|
|
run._finalizeTimer = null;
|
|
}
|
|
run._lifecycleEnded = false;
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "start") {
|
|
openStatusReasoning("Preparing response...");
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "thinking") {
|
|
const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
|
|
if (delta) {
|
|
if (statusReasoningActive) { closeReasoning(); }
|
|
if (!reasoningStarted) {
|
|
currentReasoningId = nextId("reasoning");
|
|
emit({ type: "reasoning-start", id: currentReasoningId });
|
|
reasoningStarted = true;
|
|
}
|
|
emit({ type: "reasoning-delta", id: currentReasoningId, delta });
|
|
accAppendReasoning(delta);
|
|
}
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "assistant") {
|
|
const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
|
|
const textFallback = !delta && typeof ev.data?.text === "string" ? ev.data.text : undefined;
|
|
const chunk = delta ?? textFallback;
|
|
if (chunk) {
|
|
liveStats.assistantChunks += 1;
|
|
if (
|
|
liveStats.toolStartCount === 0 &&
|
|
(liveStats.assistantChunks === 1 || liveStats.assistantChunks % 40 === 0)
|
|
) {
|
|
maybeBackfillLiveToolsFromTranscript("assistant-chunk");
|
|
}
|
|
closeReasoning();
|
|
if (!textStarted) {
|
|
currentTextId = nextId("text");
|
|
emit({ type: "text-start", id: currentTextId });
|
|
textStarted = true;
|
|
}
|
|
emit({ type: "text-delta", id: currentTextId, delta: chunk });
|
|
accAppendText(chunk);
|
|
}
|
|
const mediaUrls = ev.data?.mediaUrls;
|
|
if (Array.isArray(mediaUrls)) {
|
|
for (const url of mediaUrls) {
|
|
if (typeof url === "string" && url.trim()) {
|
|
closeReasoning();
|
|
if (!textStarted) {
|
|
currentTextId = nextId("text");
|
|
emit({ type: "text-start", id: currentTextId });
|
|
textStarted = true;
|
|
}
|
|
const md = `\n})\n`;
|
|
emit({ type: "text-delta", id: currentTextId, delta: md });
|
|
accAppendText(md);
|
|
}
|
|
}
|
|
}
|
|
if (typeof ev.data?.stopReason === "string" && ev.data.stopReason === "error" && typeof ev.data?.errorMessage === "string" && !agentErrorReported) {
|
|
agentErrorReported = true;
|
|
emitError(parseErrorBody(ev.data.errorMessage));
|
|
}
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "tool") {
|
|
const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
|
|
const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : "";
|
|
const toolName = typeof ev.data?.name === "string" ? ev.data.name : "";
|
|
if (phase === "start") {
|
|
liveStats.toolStartCount += 1;
|
|
closeReasoning();
|
|
closeText();
|
|
const args = ev.data?.args && typeof ev.data.args === "object" ? (ev.data.args as Record<string, unknown>) : {};
|
|
emit({ type: "tool-input-start", toolCallId, toolName });
|
|
emit({ type: "tool-input-available", toolCallId, toolName, input: args });
|
|
run.accumulated.parts.push({ type: "tool-invocation", toolCallId, toolName, args });
|
|
accToolMap.set(toolCallId, run.accumulated.parts.length - 1);
|
|
} else if (phase === "update") {
|
|
const partialResult = extractToolResult(ev.data?.partialResult);
|
|
if (partialResult) {
|
|
const output = buildToolOutput(partialResult);
|
|
emit({ type: "tool-output-partial", toolCallId, output });
|
|
}
|
|
} else if (phase === "result") {
|
|
const isError = ev.data?.isError === true;
|
|
const result = extractToolResult(ev.data?.result);
|
|
if (isError) {
|
|
const errorText = result?.text || (result?.details?.error as string | undefined) || "Tool execution failed";
|
|
emit({ type: "tool-output-error", toolCallId, errorText });
|
|
const idx = accToolMap.get(toolCallId);
|
|
if (idx !== undefined) {
|
|
const part = run.accumulated.parts[idx];
|
|
if (part.type === "tool-invocation") {
|
|
part.errorText = errorText;
|
|
}
|
|
}
|
|
} else {
|
|
const output = buildToolOutput(result);
|
|
emit({ type: "tool-output-available", toolCallId, output });
|
|
const idx = accToolMap.get(toolCallId);
|
|
if (idx !== undefined) {
|
|
const part = run.accumulated.parts[idx];
|
|
if (part.type === "tool-invocation") { part.result = output; }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "lifecycle" && (ev.data?.phase === "fallback" || ev.data?.phase === "fallback_cleared")) {
|
|
const data = ev.data;
|
|
const selected = resolveModelLabel(data?.selectedProvider, data?.selectedModel)
|
|
?? resolveModelLabel(data?.fromProvider, data?.fromModel);
|
|
const active = resolveModelLabel(data?.activeProvider, data?.activeModel)
|
|
?? resolveModelLabel(data?.toProvider, data?.toModel);
|
|
if (selected && active) {
|
|
const isClear = data?.phase === "fallback_cleared";
|
|
const reason = typeof data?.reasonSummary === "string" ? data.reasonSummary
|
|
: typeof data?.reason === "string" ? data.reason : undefined;
|
|
const label = isClear
|
|
? `Restored to ${selected}`
|
|
: `Switched to ${active}${reason ? ` (${reason})` : ""}`;
|
|
openStatusReasoning(label);
|
|
}
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "compaction") {
|
|
const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
|
|
if (phase === "start") { openStatusReasoning("Optimizing session context..."); }
|
|
else if (phase === "end") {
|
|
if (statusReasoningActive) {
|
|
if (ev.data?.willRetry === true) {
|
|
const retryDelta = "\nRetrying with compacted context...";
|
|
emit({ type: "reasoning-delta", id: currentReasoningId, delta: retryDelta });
|
|
accAppendReasoning(retryDelta);
|
|
} else { closeReasoning(); }
|
|
}
|
|
}
|
|
}
|
|
|
|
if (ev.event === "chat") {
|
|
const finalText = extractAssistantTextFromChatPayload(ev.data);
|
|
if (finalText) {
|
|
closeReasoning();
|
|
if (!textStarted) {
|
|
currentTextId = nextId("text");
|
|
emit({ type: "text-start", id: currentTextId });
|
|
textStarted = true;
|
|
}
|
|
emit({ type: "text-delta", id: currentTextId, delta: finalText });
|
|
accAppendText(finalText);
|
|
closeText();
|
|
}
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "end") {
|
|
maybeBackfillLiveToolsFromTranscript("lifecycle-end");
|
|
closeReasoning();
|
|
closeText();
|
|
run._lifecycleEnded = true;
|
|
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); }
|
|
run._finalizeTimer = setTimeout(() => {
|
|
run._finalizeTimer = null;
|
|
if (run.status === "running") { finalizeSubscribeRun(run); }
|
|
}, SUBSCRIBE_LIFECYCLE_END_GRACE_MS);
|
|
}
|
|
|
|
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "error" && !agentErrorReported) {
|
|
const msg = parseAgentErrorMessage(ev.data);
|
|
if (msg) { agentErrorReported = true; emitError(msg); }
|
|
finalizeSubscribeRun(run, "error");
|
|
}
|
|
|
|
if (ev.event === "error" && !agentErrorReported) {
|
|
const msg = parseAgentErrorMessage(ev.data ?? (ev as unknown as Record<string, unknown>));
|
|
if (msg) { agentErrorReported = true; emitError(msg); }
|
|
}
|
|
};
|
|
|
|
const rl = createInterface({ input: child.stdout! });
|
|
|
|
rl.on("line", (line: string) => {
|
|
if (!line.trim()) { return; }
|
|
let ev: AgentEvent;
|
|
try { ev = JSON.parse(line) as AgentEvent; } catch { return; }
|
|
if (ev.sessionKey && ev.sessionKey !== sessionKey) { return; }
|
|
const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
|
|
? (ev as Record<string, unknown>).globalSeq as number
|
|
: undefined;
|
|
if (gSeq !== undefined) {
|
|
if (gSeq <= run.lastGlobalSeq) { return; }
|
|
run.lastGlobalSeq = gSeq;
|
|
}
|
|
if ((run._subscribeRetryAttempt ?? 0) > 0) {
|
|
resetSubscribeRetryState(run);
|
|
}
|
|
processEvent(ev);
|
|
});
|
|
|
|
child.on("close", () => {
|
|
if (run._subscribeProcess !== child) {
|
|
return;
|
|
}
|
|
run._subscribeProcess = null;
|
|
if (run.status !== "running") { return; }
|
|
if (run._lifecycleEnded) {
|
|
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
|
|
finalizeSubscribeRun(run);
|
|
return;
|
|
}
|
|
scheduleSubscribeRestart(run, () => {
|
|
if (run.status === "running" && !run._subscribeProcess) {
|
|
const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
|
|
run._subscribeProcess = newChild;
|
|
run.childProcess = newChild;
|
|
wireSubscribeOnlyProcess(run, newChild, sessionKey);
|
|
}
|
|
});
|
|
});
|
|
|
|
child.on("error", (err) => {
|
|
console.error("[active-runs] Subscribe child error:", err);
|
|
});
|
|
|
|
child.stderr?.on("data", (chunk: Buffer) => {
|
|
console.error("[active-runs subscribe stderr]", chunk.toString());
|
|
});
|
|
|
|
run._subscribeProcess = child;
|
|
}
|
|
|
|
/**
 * Move a still-running subscribe-style run into its terminal state.
 *
 * Does nothing unless `run.status` is currently "running". Otherwise it
 * clears the pending timers, records the final status, flushes persistence,
 * signals completion (`null`) to every subscriber, stops the subscribe child
 * process, and schedules cleanup of the run after a grace period.
 *
 * @param run    The run to finalize.
 * @param status Terminal status to record; defaults to "completed".
 */
function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void {
  if (run.status !== "running") {
    return;
  }

  // Cancel every timer that could otherwise fire against a finished run.
  if (run._finalizeTimer) {
    clearTimeout(run._finalizeTimer);
    run._finalizeTimer = null;
  }
  clearWaitingFinalizeTimer(run);
  resetSubscribeRetryState(run);

  run.status = status;
  flushPersistence(run);

  // `null` is the end-of-stream marker; a throwing subscriber must not
  // prevent the remaining ones from being notified.
  run.subscribers.forEach((notify) => {
    try {
      notify(null);
    } catch {
      /* ignore */
    }
  });
  run.subscribers.clear();

  stopSubscribeProcess(run);

  const sawToolInvocation = run.accumulated.parts.some(
    (part) => part.type === "tool-invocation",
  );

  // Deferred enrichment: when no tool parts were captured live, wait 2s and
  // then backfill tool-invocation parts from the gateway transcript into the
  // web-chat JSONL.
  if (run.isSubscribeOnly && run.sessionKey && !sawToolInvocation) {
    const keyForEnrichment = run.sessionKey;
    setTimeout(() => {
      deferredTranscriptEnrich(keyForEnrichment);
    }, 2_000);
  }

  // Only clean up if this run is still the one registered for its session.
  const cleanupDelayMs = run.isSubscribeOnly ? SUBSCRIBE_CLEANUP_GRACE_MS : CLEANUP_GRACE_MS;
  setTimeout(() => {
    if (activeRuns.get(run.sessionId) === run) {
      cleanupRun(run.sessionId);
    }
  }, cleanupDelayMs);
}
|
|
|
|
/** Parse a JSONL file into records, skipping blank, malformed, and falsy lines. */
function readJsonlRecords(filePath: string): Array<Record<string, unknown>> {
  const records: Array<Record<string, unknown>> = [];
  for (const rawLine of readFileSync(filePath, "utf-8").split("\n")) {
    if (!rawLine.trim()) {
      continue;
    }
    try {
      const parsed: unknown = JSON.parse(rawLine);
      if (parsed) {
        records.push(parsed as Record<string, unknown>);
      }
    } catch {
      /* skip malformed line */
    }
  }
  return records;
}

/** Build the matching key for a list of parts: trimmed text parts joined by newlines, capped at 200 chars. */
function transcriptTextKey(parts: Array<Record<string, unknown>>): string {
  return parts
    .filter((p) => p.type === "text" && typeof p.text === "string")
    .map((p) => (p.text as string).trim())
    .filter(Boolean)
    .join("\n")
    .slice(0, 200);
}

/**
 * Scan transcript entries and group tool calls under the text key of the
 * assistant message that follows (or contains) them.
 *
 * Tool results are attached only after the whole transcript has been scanned,
 * so a `toolResult` entry that appears later than the assistant text it was
 * grouped under is still picked up.
 */
function indexTranscriptToolsByText(
  entries: Array<Record<string, unknown>>,
): Map<string, Array<Record<string, unknown>>> {
  const resultsByCallId = new Map<string, Record<string, unknown>>();
  const groupedCalls = new Map<string, Array<Record<string, unknown>>>();
  let pendingTools: Array<Record<string, unknown>> = [];

  for (const entry of entries) {
    if (entry.type !== "message") {
      continue;
    }
    const msg = entry.message as Record<string, unknown> | undefined;
    if (!msg) {
      continue;
    }
    const content = msg.content;

    // A new user turn discards tool calls that never got matched to text.
    if (msg.role === "user") {
      pendingTools = [];
    }

    if (msg.role === "toolResult" && typeof msg.toolCallId === "string") {
      const text = Array.isArray(content)
        ? (content as Array<Record<string, unknown>>)
            .filter((c) => c.type === "text" && typeof c.text === "string")
            .map((c) => c.text as string)
            .join("\n")
        : typeof content === "string"
          ? content
          : "";
      // Only a 500-character preview of the result is kept.
      resultsByCallId.set(msg.toolCallId, { text: text.slice(0, 500) });
    }

    if (msg.role !== "assistant" || !Array.isArray(content)) {
      continue;
    }
    const parts = content as Array<Record<string, unknown>>;

    for (const part of parts) {
      if (part.type === "toolCall" && typeof part.id === "string" && typeof part.name === "string") {
        pendingTools.push({
          type: "tool-invocation",
          toolCallId: part.id,
          toolName: part.name,
          args: (part.arguments as Record<string, unknown>) ?? {},
        });
      }
    }

    if (pendingTools.length === 0) {
      continue;
    }
    // Keys shorter than 10 characters are too ambiguous to match on; the
    // pending tools then carry over to the next assistant message.
    const key = transcriptTextKey(parts);
    if (key.length < 10) {
      continue;
    }
    groupedCalls.set(key, pendingTools);
    pendingTools = [];
  }

  // Second pass: every result in the transcript is known now.
  const toolsByText = new Map<string, Array<Record<string, unknown>>>();
  for (const [key, tools] of groupedCalls) {
    toolsByText.set(
      key,
      tools.map((tool) => {
        const result = resultsByCallId.get(tool.toolCallId as string);
        return result ? { ...tool, result } : tool;
      }),
    );
  }
  return toolsByText;
}

/**
 * Deferred enrichment: reads the gateway's session transcript and backfills
 * tool-invocation parts into the web-chat JSONL for a subagent session.
 * Matches tools to assistant messages by text content to avoid index-mapping issues.
 *
 * Best effort: a missing file at any step, or any thrown error, leaves the
 * web-chat file untouched. Assistant messages that already contain a
 * tool-invocation part are never modified, so repeated calls do not add
 * the same tools twice.
 *
 * @param sessionKey Key used to look up the session in `sessions.json` and
 *                   to name the web-chat JSONL file.
 */
function deferredTranscriptEnrich(sessionKey: string): void {
  try {
    const stateDir = resolveOpenClawStateDir();
    const agentId = resolveActiveAgentId();
    const sessionsDir = join(stateDir, "agents", agentId, "sessions");

    const sessionsJsonPath = join(sessionsDir, "sessions.json");
    if (!existsSync(sessionsJsonPath)) {
      return;
    }
    const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record<
      string,
      Record<string, unknown>
    >;
    const sessionData = sessions[sessionKey];
    const sessionId = typeof sessionData?.sessionId === "string" ? sessionData.sessionId : null;
    if (!sessionId) {
      return;
    }

    const transcriptPath = join(sessionsDir, `${sessionId}.jsonl`);
    if (!existsSync(transcriptPath)) {
      return;
    }

    // Build the text → tools map from the transcript.
    const toolsByText = indexTranscriptToolsByText(readJsonlRecords(transcriptPath));
    if (toolsByText.size === 0) {
      return;
    }

    // Read and enrich the web-chat JSONL.
    const fp = join(webChatDir(), `${sessionKey}.jsonl`);
    if (!existsSync(fp)) {
      return;
    }
    const messages = readJsonlRecords(fp);

    let changed = false;
    const enriched = messages.map((m) => {
      if (m.role !== "assistant") {
        return m;
      }
      // A present-but-malformed `parts` field is left alone rather than
      // aborting enrichment for the whole file.
      if (m.parts != null && !Array.isArray(m.parts)) {
        return m;
      }
      const parts = (m.parts as Array<Record<string, unknown>> | undefined) ?? [
        { type: "text", text: m.content },
      ];
      if (parts.some((p) => p.type === "tool-invocation")) {
        return m;
      }
      const msgText = transcriptTextKey(parts);
      if (msgText.length < 10) {
        return m;
      }
      const tools = toolsByText.get(msgText);
      if (!tools || tools.length === 0) {
        return m;
      }
      const textParts = parts.filter((p) => p.type === "text");
      const otherParts = parts.filter((p) => p.type !== "text");
      changed = true;
      // Tools are placed before the text they led up to.
      return { ...m, parts: [...otherParts, ...tools, ...textParts] };
    });

    if (changed) {
      writeFileSync(fp, enriched.map((m) => JSON.stringify(m)).join("\n") + "\n");
    }
  } catch {
    /* best effort */
  }
}
|
|
|
|
/**
 * Opportunistic on-read backfill for subagent sessions.
 *
 * Session keys that do not contain a ":subagent:" segment are ignored.
 * Safe to call repeatedly: the underlying enrichment leaves assistant
 * messages that already carry tool-invocation parts untouched.
 *
 * @param sessionKey Gateway session key of the session being read.
 */
export function enrichSubagentSessionFromTranscript(sessionKey: string): void {
  const isSubagentSession = sessionKey.includes(":subagent:");
  if (isSubagentSession) {
    deferredTranscriptEnrich(sessionKey);
  }
}
|
|
|
|
// ── Persistence helpers (called from route to persist user messages) ──
|
|
|
|
/**
 * Save a user message to the session JSONL (called once at run start).
 *
 * The session file is created when missing. A message whose `id` already
 * appears in the file is not written again (e.g. on retry); otherwise the
 * file is rewritten with the new line appended and the session index's
 * message count is incremented by one.
 *
 * @param sessionId Session whose `<sessionId>.jsonl` file receives the message.
 * @param msg       Message id, text content, and optional structured parts.
 */
export function persistUserMessage(
  sessionId: string,
  msg: { id: string; content: string; parts?: unknown[] },
): void {
  ensureDir();
  const filePath = join(webChatDir(), `${sessionId}.jsonl`);
  if (!existsSync(filePath)) {
    writeFileSync(filePath, "");
  }

  // Key order matters for the serialized line: id, role, content, [parts], timestamp.
  const record: Record<string, unknown> = {
    id: msg.id,
    role: "user",
    content: msg.content,
  };
  if (msg.parts) {
    record.parts = msg.parts;
  }
  record.timestamp = new Date().toISOString();
  const serialized = JSON.stringify(record);

  const storedLines = readFileSync(filePath, "utf-8")
    .split("\n")
    .filter((l) => l.trim());

  // Avoid duplicates (e.g. retry). Unparsable lines never count as a match.
  for (const stored of storedLines) {
    try {
      if (JSON.parse(stored).id === msg.id) {
        return;
      }
    } catch {
      /* not a match */
    }
  }

  writeFileSync(filePath, [...storedLines, serialized].join("\n") + "\n");
  updateIndex(sessionId, { incrementCount: 1 });
}
|
|
|
|
// ── Internals ──
|
|
|
|
/** Create the web-chat directory (including missing parents) if it does not exist yet. */
function ensureDir() {
  const dir = webChatDir();
  if (existsSync(dir)) {
    return;
  }
  mkdirSync(dir, { recursive: true });
}
|
|
|
|
/**
 * Update (or create) the session's entry in the sessions index file.
 *
 * A missing index file is created, and a session that is not yet indexed is
 * added at the front, so orphaned .jsonl files become visible in the sidebar.
 * The entry's `updatedAt` is always refreshed. Best effort: any error is
 * swallowed.
 *
 * @param sessionId Session whose index entry is updated.
 * @param opts      `incrementCount` is added to `messageCount` when truthy;
 *                  `title` replaces the entry's title when truthy (new
 *                  entries default to "New Chat").
 */
function updateIndex(
  sessionId: string,
  opts: { incrementCount?: number; title?: string },
) {
  try {
    const idxPath = indexFile();

    // No index file yet → start from an empty list; the entry is added below.
    const index: Array<Record<string, unknown>> = existsSync(idxPath)
      ? (JSON.parse(readFileSync(idxPath, "utf-8")) as Array<Record<string, unknown>>)
      : [];

    let entry = index.find((s) => s.id === sessionId);
    if (!entry) {
      const now = Date.now();
      entry = {
        id: sessionId,
        title: opts.title || "New Chat",
        createdAt: now,
        updatedAt: now,
        messageCount: 0,
      };
      index.unshift(entry);
    }

    entry.updatedAt = Date.now();
    if (opts.incrementCount) {
      entry.messageCount = ((entry.messageCount as number) || 0) + opts.incrementCount;
    }
    if (opts.title) {
      entry.title = opts.title;
    }

    writeFileSync(idxPath, JSON.stringify(index, null, 2));
  } catch {
    /* best-effort */
  }
}
|
|
|
|
// ── SSE event generation from child-process JSON lines ──
|
|
|
|
/**
 * Wire a parent agent child process into its run.
 *
 * Reads the child's stdout line by line as JSON events, converts them into
 * SSE events (buffered on the run and fanned out to subscribers), and mirrors
 * them into `run.accumulated.parts` for persistence. Also installs the
 * child's "close" / "error" / stderr handlers, which finalize the run or
 * switch it into the "waiting-for-subagents" state.
 *
 * @param run The active run whose `childProcess` should be wired.
 */
function wireChildProcess(run: ActiveRun): void {
  const child = run.childProcess;

  let idCounter = 0;
  const nextId = (prefix: string) => `${prefix}-${Date.now()}-${++idCounter}`;

  // ── Streaming state ──
  let currentTextId = "";
  let currentReasoningId = "";
  let currentStatusReasoningLabel: string | null = null;
  let textStarted = false;
  let reasoningStarted = false;
  let everSentText = false;
  let statusReasoningActive = false;
  let waitingStatusAnnounced = false;
  let agentErrorReported = false;
  const stderrChunks: string[] = [];

  // ── Ordered accumulation tracking (preserves interleaving for persistence) ──
  // Indices into run.accumulated.parts; -1 means "no open part of that kind".
  let accTextIdx = -1;
  let accReasoningIdx = -1;
  const accToolMap = new Map<string, number>();

  /** Append to the open reasoning part, opening a new one when none is open. */
  const accAppendReasoning = (delta: string) => {
    if (accReasoningIdx >= 0) {
      (run.accumulated.parts[accReasoningIdx] as { type: "reasoning"; text: string }).text += delta;
      return;
    }
    accReasoningIdx = run.accumulated.parts.push({ type: "reasoning", text: delta }) - 1;
  };

  /** Append to the open text part, opening a new one when none is open. */
  const accAppendText = (delta: string) => {
    if (accTextIdx >= 0) {
      (run.accumulated.parts[accTextIdx] as { type: "text"; text: string }).text += delta;
      return;
    }
    accTextIdx = run.accumulated.parts.push({ type: "text", text: delta }) - 1;
  };

  /** Emit an SSE event: push to buffer + notify all subscribers. */
  const emit = (event: SseEvent) => {
    run.eventBuffer.push(event);
    for (const sub of run.subscribers) {
      try {
        sub(event);
      } catch {
        /* ignore subscriber errors */
      }
    }
    schedulePersist(run);
  };

  /** End the open reasoning block (if any) and forget the accumulation slot. */
  const closeReasoning = () => {
    if (reasoningStarted) {
      emit({ type: "reasoning-end", id: currentReasoningId });
      reasoningStarted = false;
      statusReasoningActive = false;
    }
    currentStatusReasoningLabel = null;
    accReasoningIdx = -1;
  };

  /** End the open text block (if any) and forget the accumulation slot. */
  const closeText = () => {
    if (textStarted) {
      if (accTextIdx >= 0) {
        const part = run.accumulated.parts[accTextIdx] as { type: "text"; text: string };
        if (isLeakedSilentReplyToken(part.text)) {
          // Drop the part from persistence and shift the tool indices that
          // pointed past it.
          run.accumulated.parts.splice(accTextIdx, 1);
          for (const [callId, partIdx] of accToolMap) {
            if (partIdx > accTextIdx) {
              accToolMap.set(callId, partIdx - 1);
            }
          }
        }
      }
      emit({ type: "text-end", id: currentTextId });
      textStarted = false;
    }
    accTextIdx = -1;
  };

  /** Show a status line as a reasoning block; repeated identical labels are ignored. */
  const openStatusReasoning = (label: string) => {
    if (statusReasoningActive && currentStatusReasoningLabel === label) {
      return;
    }
    closeReasoning();
    closeText();
    currentReasoningId = nextId("status");
    emit({ type: "reasoning-start", id: currentReasoningId });
    emit({ type: "reasoning-delta", id: currentReasoningId, delta: label });
    reasoningStarted = true;
    statusReasoningActive = true;
    currentStatusReasoningLabel = label;
    accAppendReasoning(label);
  };

  /** Emit a complete start/delta/end text block under a fresh id and accumulate it. */
  const emitStandaloneText = (body: string) => {
    const tid = nextId("text");
    emit({ type: "text-start", id: tid });
    emit({ type: "text-delta", id: tid, delta: body });
    emit({ type: "text-end", id: tid });
    accAppendText(body);
  };

  const emitError = (message: string) => {
    closeReasoning();
    closeText();
    emitStandaloneText(`[error] ${message}`);
    accTextIdx = -1; // error text is self-contained
    everSentText = true;
  };

  /** Stream one chunk into the current text block, opening it first when needed. */
  const streamText = (chunk: string) => {
    closeReasoning();
    if (!textStarted) {
      currentTextId = nextId("text");
      emit({ type: "text-start", id: currentTextId });
      textStarted = true;
    }
    everSentText = true;
    emit({ type: "text-delta", id: currentTextId, delta: chunk });
    accAppendText(chunk);
  };

  const emitAssistantFinalText = (text: string) => {
    if (!text) {
      return;
    }
    streamText(text);
    closeText();
  };

  /** Read the numeric `globalSeq` cursor from an event, if it carries one. */
  const readGlobalSeq = (ev: AgentEvent): number | undefined => {
    const raw = (ev as Record<string, unknown>).globalSeq;
    return typeof raw === "number" ? raw : undefined;
  };

  /** Announce (once) that the run is waiting on subagents, then flush persistence. */
  const showWaitingStatus = () => {
    if (!waitingStatusAnnounced) {
      openStatusReasoning("Waiting for subagent results...");
      waitingStatusAnnounced = true;
    }
    flushPersistence(run);
  };

  /** Record the terminal status, flush, signal completion, and schedule cleanup. */
  const completeRun = (finalStatus: "completed" | "error") => {
    run.status = finalStatus;

    // Final persistence flush (removes _streaming flag).
    flushPersistence(run);

    // Signal completion to all subscribers.
    for (const sub of run.subscribers) {
      try {
        sub(null);
      } catch {
        /* ignore */
      }
    }
    run.subscribers.clear();

    // Clean up run state after a grace period so reconnections within that
    // window still get the buffered events. Guard: only clean up if we're
    // still the active run for this session.
    setTimeout(() => {
      if (activeRuns.get(run.sessionId) === run) {
        cleanupRun(run.sessionId);
      }
    }, CLEANUP_GRACE_MS);
  };

  // ── Parse stdout JSON lines ──

  const rl = createInterface({ input: child.stdout! });
  const parentSessionKey = `agent:${resolveActiveAgentId()}:web:${run.sessionId}`;

  // Prevent unhandled 'error' events on the readline interface.
  // When the child process fails to start (e.g. ENOENT — missing script)
  // the stdout pipe is destroyed and readline re-emits the error. Without
  // this handler Node.js throws "Unhandled 'error' event" which crashes
  // the API route instead of surfacing a clean message to the user.
  rl.on("error", () => {
    // Swallow — the child 'error' / 'close' handlers take care of
    // emitting user-visible diagnostics.
  });

  // ── Per-stream handlers for parent agent events ──

  /** Lifecycle: start / fallback / fallback_cleared / end / error. */
  const handleLifecycleEvent = (ev: AgentEvent) => {
    const phase = ev.data?.phase;

    if (phase === "start") {
      openStatusReasoning("Preparing response...");
      return;
    }

    if (phase === "fallback" || phase === "fallback_cleared") {
      const data = ev.data;
      const selected =
        resolveModelLabel(data?.selectedProvider, data?.selectedModel) ??
        resolveModelLabel(data?.fromProvider, data?.fromModel);
      const active =
        resolveModelLabel(data?.activeProvider, data?.activeModel) ??
        resolveModelLabel(data?.toProvider, data?.toModel);
      if (selected && active) {
        const isClear = data?.phase === "fallback_cleared";
        const reason =
          typeof data?.reasonSummary === "string"
            ? data.reasonSummary
            : typeof data?.reason === "string"
              ? data.reason
              : undefined;
        const label = isClear
          ? `Restored to ${selected}`
          : `Switched to ${active}${reason ? ` (${reason})` : ""}`;
        openStatusReasoning(label);
      }
      return;
    }

    if (phase === "end") {
      closeReasoning();
      closeText();
      return;
    }

    if (phase === "error" && !agentErrorReported) {
      const msg = parseAgentErrorMessage(ev.data);
      if (msg) {
        agentErrorReported = true;
        emitError(msg);
      }
    }
  };

  /** Thinking / reasoning deltas. */
  const handleThinkingEvent = (ev: AgentEvent) => {
    const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
    if (!delta) {
      return;
    }
    // Real reasoning replaces any status line that is currently shown.
    if (statusReasoningActive) {
      closeReasoning();
    }
    if (!reasoningStarted) {
      currentReasoningId = nextId("reasoning");
      emit({ type: "reasoning-start", id: currentReasoningId });
      reasoningStarted = true;
    }
    emit({ type: "reasoning-delta", id: currentReasoningId, delta });
    accAppendReasoning(delta);
  };

  /** Assistant text, media URLs, and inline stopReason=error. */
  const handleAssistantEvent = (ev: AgentEvent) => {
    const delta = typeof ev.data?.delta === "string" ? ev.data.delta : undefined;
    const textFallback =
      !delta && typeof ev.data?.text === "string" ? ev.data.text : undefined;
    const chunk = delta ?? textFallback;
    if (chunk) {
      streamText(chunk);
    }

    // Media URLs are rendered as markdown images inside the text stream.
    const mediaUrls = ev.data?.mediaUrls;
    if (Array.isArray(mediaUrls)) {
      for (const url of mediaUrls) {
        if (typeof url === "string" && url.trim()) {
          streamText(`\n![media](${url.trim()})\n`);
        }
      }
    }

    // Agent error inline (stopReason=error)
    if (
      typeof ev.data?.stopReason === "string" &&
      ev.data.stopReason === "error" &&
      typeof ev.data?.errorMessage === "string" &&
      !agentErrorReported
    ) {
      agentErrorReported = true;
      emitError(parseErrorBody(ev.data.errorMessage));
    }
  };

  /** Tool start / update / result, including subagent spawn follow-up. */
  const handleToolEvent = (ev: AgentEvent) => {
    const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
    const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : "";
    const toolName = typeof ev.data?.name === "string" ? ev.data.name : "";

    if (phase === "start") {
      closeReasoning();
      closeText();
      const args =
        ev.data?.args && typeof ev.data.args === "object"
          ? (ev.data.args as Record<string, unknown>)
          : {};
      emit({ type: "tool-input-start", toolCallId, toolName });
      emit({ type: "tool-input-available", toolCallId, toolName, input: args });
      const partIdx =
        run.accumulated.parts.push({ type: "tool-invocation", toolCallId, toolName, args }) - 1;
      accToolMap.set(toolCallId, partIdx);
      return;
    }

    if (phase === "update") {
      const partialResult = extractToolResult(ev.data?.partialResult);
      if (partialResult) {
        const output = buildToolOutput(partialResult);
        emit({ type: "tool-output-partial", toolCallId, output });
      }
      return;
    }

    if (phase !== "result") {
      return;
    }

    const isError = ev.data?.isError === true;
    const result = extractToolResult(ev.data?.result);

    if (isError) {
      const errorText =
        result?.text ||
        (result?.details?.error as string | undefined) ||
        "Tool execution failed";
      emit({ type: "tool-output-error", toolCallId, errorText });
      const idx = accToolMap.get(toolCallId);
      if (idx !== undefined) {
        const part = run.accumulated.parts[idx];
        if (part.type === "tool-invocation") {
          part.errorText = errorText;
        }
      }
    } else {
      const output = buildToolOutput(result);
      emit({ type: "tool-output-available", toolCallId, output });
      const idx = accToolMap.get(toolCallId);
      if (idx !== undefined) {
        const part = run.accumulated.parts[idx];
        if (part.type === "tool-invocation") {
          part.result = output;
        }
      }
    }

    // A successful sessions_spawn reports the child's session key; start a
    // subscribe run for it using the task/label from the recorded tool args.
    if (toolName === "sessions_spawn" && !isError) {
      const childSessionKey = result?.details?.childSessionKey as string | undefined;
      if (childSessionKey) {
        const spawnIdx = accToolMap.get(toolCallId);
        const spawnArgs =
          spawnIdx !== undefined
            ? (run.accumulated.parts[spawnIdx] as { args?: Record<string, unknown> })?.args
            : undefined;
        startSubscribeRun({
          sessionKey: childSessionKey,
          parentSessionId: run.sessionId,
          task: (spawnArgs?.task as string | undefined) ?? "Subagent task",
          label: spawnArgs?.label as string | undefined,
        });
      }
    }
  };

  /** Compaction start / end status. */
  const handleCompactionEvent = (ev: AgentEvent) => {
    const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
    if (phase === "start") {
      openStatusReasoning("Optimizing session context...");
      return;
    }
    if (phase === "end" && statusReasoningActive) {
      if (ev.data?.willRetry === true) {
        const retryDelta = "\nRetrying with compacted context...";
        emit({ type: "reasoning-delta", id: currentReasoningId, delta: retryDelta });
        accAppendReasoning(retryDelta);
      } else {
        closeReasoning();
      }
    }
  };

  // ── Reusable parent event processor ──
  // Handles lifecycle, thinking, assistant text, tool, compaction, and error
  // events for the parent agent. Used by both the CLI NDJSON stream and the
  // subscribe-only CLI fallback (waiting-for-subagents state).

  const processParentEvent = (ev: AgentEvent) => {
    if (ev.event === "agent") {
      switch (ev.stream) {
        case "lifecycle":
          handleLifecycleEvent(ev);
          break;
        case "thinking":
          handleThinkingEvent(ev);
          break;
        case "assistant":
          handleAssistantEvent(ev);
          break;
        case "tool":
          handleToolEvent(ev);
          break;
        case "compaction":
          handleCompactionEvent(ev);
          break;
        default:
          break;
      }
      return;
    }

    // Chat final events can include assistant turns from runs outside
    // the original parent process (e.g. subagent announce follow-ups).
    if (ev.event === "chat") {
      const text = extractAssistantTextFromChatPayload(ev.data);
      if (text) {
        emitAssistantFinalText(text);
      }
      return;
    }

    // Top-level error event
    if (ev.event === "error" && !agentErrorReported) {
      const msg = parseAgentErrorMessage(
        ev.data ?? (ev as unknown as Record<string, unknown>),
      );
      if (msg) {
        agentErrorReported = true;
        emitError(msg);
      }
    }
  };

  /** Re-check later whether a waiting run can be finalized. */
  const scheduleWaitingCompletionCheck = () => {
    clearWaitingFinalizeTimer(run);
    run._waitingFinalizeTimer = setTimeout(() => {
      run._waitingFinalizeTimer = null;
      if (run.status !== "waiting-for-subagents") {
        return;
      }
      if (hasRunningSubagentsForParent(run.sessionId)) {
        showWaitingStatus();
        return;
      }
      finalizeWaitingRun(run);
    }, WAITING_FINALIZE_RECONCILE_MS);
  };

  /** After a parent turn ends: keep waiting if subagents still run, else schedule the completion check. */
  const reconcileWaitingState = () => {
    if (hasRunningSubagentsForParent(run.sessionId)) {
      clearWaitingFinalizeTimer(run);
      showWaitingStatus();
    } else {
      scheduleWaitingCompletionCheck();
    }
  };

  /** Event handler used while the run is fed by the subscribe-only stream. */
  const processParentSubscribeEvent = (ev: AgentEvent) => {
    // Drop events at or before the cursor we have already processed.
    const gSeq = readGlobalSeq(ev);
    if (gSeq !== undefined) {
      if (gSeq <= run.lastGlobalSeq) {
        return;
      }
      run.lastGlobalSeq = gSeq;
    }

    // Any new parent event means waiting completion should be reconsidered
    // from this point forward, not from a prior end/final checkpoint.
    clearWaitingFinalizeTimer(run);

    processParentEvent(ev);

    if (ev.stream === "lifecycle" && ev.data?.phase === "end") {
      reconcileWaitingState();
    }

    if (ev.event === "chat") {
      const payload = ev.data;
      const state = typeof payload?.state === "string" ? payload.state : "";
      const message = asRecord(payload?.message);
      const role = typeof message?.role === "string" ? message.role : "";
      if (state === "final" && role === "assistant") {
        reconcileWaitingState();
      }
    }
  };

  rl.on("line", (line: string) => {
    if (!line.trim()) {
      return;
    }

    let ev: AgentEvent;
    try {
      ev = JSON.parse(line) as AgentEvent;
    } catch {
      return;
    }

    // Skip events from other sessions (e.g. subagent broadcasts that
    // the gateway delivers on the same WS connection).
    if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
      return;
    }

    // Track the global event cursor from the gateway for replay on handoff.
    const gSeq = readGlobalSeq(ev);
    if (gSeq !== undefined && gSeq > run.lastGlobalSeq) {
      run.lastGlobalSeq = gSeq;
    }

    processParentEvent(ev);
  });

  // ── Child process exit ──

  child.on("close", (code) => {
    // If already finalized (e.g. by abortRun), just record the exit code.
    if (run.status !== "running") {
      run.exitCode = code;
      return;
    }

    // Surface an error parsed from stderr when nothing was reported yet.
    if (!agentErrorReported && stderrChunks.length > 0) {
      const stderr = stderrChunks.join("").trim();
      const msg = parseErrorFromStderr(stderr);
      if (msg) {
        agentErrorReported = true;
        emitError(msg);
      }
    }

    closeReasoning();

    const exitedClean = code === 0 || code === null;

    if (everSentText) {
      closeText();
    } else {
      // Never leave the user with an empty response.
      emitStandaloneText(
        exitedClean
          ? "No response from agent."
          : `[error] Agent exited with code ${code}. Check server logs for details.`,
      );
    }

    run.exitCode = code;

    const hasRunningSubagents = hasRunningSubagentsForParent(run.sessionId);

    // If the CLI exited cleanly and subagents are still running,
    // keep the SSE stream open and wait for announcement-triggered
    // parent turns via subscribe-only CLI NDJSON.
    if (exitedClean && hasRunningSubagents) {
      run.status = "waiting-for-subagents";
      showWaitingStatus();
      startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent);
      return;
    }

    // Normal completion path.
    completeRun(exitedClean ? "completed" : "error");
  });

  child.on("error", (err) => {
    // If already finalized (e.g. by abortRun), skip.
    if (run.status !== "running") {
      return;
    }

    console.error("[active-runs] Child process error:", err);
    const message = err instanceof Error ? err.message : String(err);
    emitError(`Failed to start agent: ${message}`);
    completeRun("error");
  });

  // Keep stderr both for logging and for error extraction on close.
  child.stderr?.on("data", (chunk: Buffer) => {
    const text = chunk.toString();
    stderrChunks.push(text);
    console.error("[active-runs stderr]", text);
  });
}
|
|
|
|
/**
 * Spawn a subscribe-only child process for the parent session and forward
 * the NDJSON events it prints on stdout to `onEvent`.
 *
 * Any subscribe process already tracked on the run is stopped first. If the
 * new child closes while the run is still "waiting-for-subagents", a restart
 * is scheduled through `scheduleSubscribeRestart`; the restarted child is
 * spawned with the then-current `run.lastGlobalSeq` as its cursor.
 *
 * @param run - Run that owns the subscribe process.
 * @param parentSessionKey - Session key to subscribe to. Events that carry a
 *   different `sessionKey` are dropped.
 * @param onEvent - Called once for every accepted event.
 */
function startParentSubscribeStream(
  run: ActiveRun,
  parentSessionKey: string,
  onEvent: (ev: AgentEvent) => void,
): void {
  stopSubscribeProcess(run);
  const child = spawnAgentSubscribeProcess(parentSessionKey, run.lastGlobalSeq);
  run._subscribeProcess = child;
  const rl = createInterface({ input: child.stdout! });

  rl.on("line", (line: string) => {
    if (!line.trim()) {return;}
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      // Not JSON (e.g. stray log output on stdout): skip the line.
      return;
    }
    // JSON.parse also accepts bare `null`, numbers, strings and booleans.
    // Reading `.sessionKey` off `null` would throw inside this listener,
    // so anything that is not an object is dropped here.
    if (typeof parsed !== "object" || parsed === null) {
      return;
    }
    const ev = parsed as AgentEvent;
    if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
      return;
    }
    // A usable event arrived, so the stream is healthy again: reset backoff.
    if ((run._subscribeRetryAttempt ?? 0) > 0) {
      resetSubscribeRetryState(run);
    }
    onEvent(ev);
  });

  child.on("close", () => {
    // Ignore the close of a child that is no longer the tracked one
    // (it was replaced or stopped in the meantime).
    if (run._subscribeProcess !== child) {
      return;
    }
    run._subscribeProcess = null;
    if (run.status !== "waiting-for-subagents") {return;}
    // If still waiting, restart subscribe stream from the latest cursor.
    scheduleSubscribeRestart(run, () => {
      if (run.status === "waiting-for-subagents" && !run._subscribeProcess) {
        startParentSubscribeStream(run, parentSessionKey, onEvent);
      }
    });
  });

  child.on("error", (err) => {
    console.error("[active-runs] Parent subscribe child error:", err);
  });

  child.stderr?.on("data", (chunk: Buffer) => {
    console.error("[active-runs subscribe stderr]", chunk.toString());
  });
}
|
|
|
|
/**
 * Tear down the run's subscribe-only child process, if one is tracked.
 *
 * The subscribe-retry and waiting-finalize timers are cancelled in every
 * case, even when there is no process to kill.
 */
function stopSubscribeProcess(run: ActiveRun): void {
  clearSubscribeRetryTimer(run);
  clearWaitingFinalizeTimer(run);

  const tracked = run._subscribeProcess;
  if (tracked) {
    try {
      tracked.kill("SIGTERM");
    } catch {
      /* ignore */
    }
    run._subscribeProcess = null;
  }
}
|
|
|
|
// ── Finalize a waiting-for-subagents run ──
|
|
|
|
/**
 * Move a run from "waiting-for-subagents" to "completed".
 *
 * Invoked once the last subagent has finished and the parent's
 * announcement-triggered turn is done. Runs in any other status are left
 * untouched.
 */
function finalizeWaitingRun(run: ActiveRun): void {
  if (run.status === "waiting-for-subagents") {
    run.status = "completed";

    // Cancel pending timers and retry bookkeeping, then drop the subscribe child.
    clearWaitingFinalizeTimer(run);
    resetSubscribeRetryState(run);
    stopSubscribeProcess(run);

    flushPersistence(run);

    // A `null` event tells each subscriber that the run has completed.
    for (const notify of run.subscribers) {
      try {
        notify(null);
      } catch {
        /* ignore */
      }
    }
    run.subscribers.clear();

    // Delay cleanup for the grace period, and only remove the run if it is
    // still the one registered for this session.
    setTimeout(() => {
      const stillRegistered = activeRuns.get(run.sessionId) === run;
      if (stillRegistered) {
        cleanupRun(run.sessionId);
      }
    }, CLEANUP_GRACE_MS);
  }
}
|
|
|
|
/** Cancel the run's pending waiting-finalize timer, if one is set, and clear its handle. */
function clearWaitingFinalizeTimer(run: ActiveRun): void {
  if (run._waitingFinalizeTimer) {
    clearTimeout(run._waitingFinalizeTimer);
    run._waitingFinalizeTimer = null;
  }
}
|
|
|
|
/** Cancel the run's pending subscribe-retry timer, if one is set, and clear its handle. */
function clearSubscribeRetryTimer(run: ActiveRun): void {
  if (run._subscribeRetryTimer) {
    clearTimeout(run._subscribeRetryTimer);
    run._subscribeRetryTimer = null;
  }
}
|
|
|
|
/**
 * Reset subscribe-retry bookkeeping: cancel any pending retry timer and set
 * the attempt counter back to zero.
 */
function resetSubscribeRetryState(run: ActiveRun): void {
  clearSubscribeRetryTimer(run);
  run._subscribeRetryAttempt = 0;
}
|
|
|
|
/**
 * Arm a one-shot timer that calls `restart` after an exponentially growing
 * delay (`SUBSCRIBE_RETRY_BASE_MS * 2^attempt`, capped at
 * `SUBSCRIBE_RETRY_MAX_MS`).
 *
 * Does nothing when a retry timer is already pending. Each scheduled restart
 * increments `run._subscribeRetryAttempt`.
 *
 * @param run - Run whose retry state is updated.
 * @param restart - Callback invoked when the timer fires.
 */
function scheduleSubscribeRestart(run: ActiveRun, restart: () => void): void {
  // Never stack a second timer on top of a pending one.
  if (run._subscribeRetryTimer) {return;}

  const priorAttempts = run._subscribeRetryAttempt ?? 0;
  const uncappedMs = SUBSCRIBE_RETRY_BASE_MS * 2 ** priorAttempts;
  const waitMs = Math.min(SUBSCRIBE_RETRY_MAX_MS, uncappedMs);

  run._subscribeRetryAttempt = priorAttempts + 1;

  const onTimerFired = (): void => {
    // Clear the handle first so `restart` may schedule again if needed.
    run._subscribeRetryTimer = null;
    restart();
  };
  run._subscribeRetryTimer = setTimeout(onTimerFired, waitMs);
}
|
|
|
|
// ── Debounced persistence ──
|
|
|
|
/**
 * Schedule a debounced persistence flush for the run.
 *
 * Does nothing when a flush is already scheduled. Otherwise the flush runs
 * once whatever remains of `PERSIST_INTERVAL_MS` since the last flush has
 * elapsed (immediately if the interval has already passed).
 */
function schedulePersist(run: ActiveRun) {
  if (run._persistTimer) {return;}

  const sinceLastFlushMs = Date.now() - run._lastPersistedAt;
  const remainingMs = Math.max(0, PERSIST_INTERVAL_MS - sinceLastFlushMs);

  const runFlush = (): void => {
    run._persistTimer = null;
    flushPersistence(run);
  };
  run._persistTimer = setTimeout(runFlush, remainingMs);
}
|
|
|
|
/**
 * Write the run's accumulated assistant message to the session file now.
 *
 * Cancels any scheduled debounced flush and records the flush time. When no
 * parts have accumulated yet, nothing is written. The persisted message
 * carries `_streaming: true` while the run is "running" or
 * "waiting-for-subagents". Persistence failures are logged, not thrown.
 */
function flushPersistence(run: ActiveRun) {
  const pendingTimer = run._persistTimer;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    run._persistTimer = null;
  }
  run._lastPersistedAt = Date.now();

  const accumulatedParts = run.accumulated.parts;
  if (accumulatedParts.length === 0) {
    // Nothing to persist yet.
    return;
  }

  // Drop text parts that are leaked silent-reply fragments; keep everything else.
  const cleanParts = accumulatedParts.filter((part) => {
    if (part.type !== "text") {return true;}
    return !isLeakedSilentReplyToken((part as { text: string }).text);
  });

  // Plain-text rendering for the backwards-compatible `content` field,
  // used by readers that do not understand `parts`.
  const textParts = cleanParts.filter(
    (part): part is { type: "text"; text: string } => part.type === "text",
  );
  const text = textParts.map((part) => part.text).join("");

  const isStillStreaming =
    run.status === "running" || run.status === "waiting-for-subagents";

  const message: Record<string, unknown> = {
    id: run.accumulated.id,
    role: "assistant",
    content: text,
    parts: cleanParts,
    timestamp: new Date().toISOString(),
    ...(isStillStreaming ? { _streaming: true } : {}),
  };

  try {
    upsertMessage(run.sessionId, message);
  } catch (err) {
    console.error("[active-runs] Persistence error:", err);
  }
}
|
|
|
|
/**
 * Insert or replace one message in the session's JSONL file.
 *
 * Every existing line whose parsed `id` equals the message's `id` is replaced
 * with the new serialization; if none matches, the message is appended.
 * Lines that fail to parse are kept unchanged. For sessions whose id does not
 * contain ":subagent:", the session index is updated as well (with a count
 * increment only when the message was appended).
 *
 * @param sessionId - Session whose `<sessionId>.jsonl` file is rewritten.
 * @param message - Message record; its `id` field is the upsert key.
 */
function upsertMessage(
  sessionId: string,
  message: Record<string, unknown>,
) {
  ensureDir();
  const filePath = join(webChatDir(), `${sessionId}.jsonl`);
  if (!existsSync(filePath)) {writeFileSync(filePath, "");}

  const targetId = message.id as string;
  const existingLines = readFileSync(filePath, "utf-8")
    .split("\n")
    .filter((l) => l.trim());

  let replaced = false;
  const outputLines: string[] = [];
  for (const rawLine of existingLines) {
    let nextLine = rawLine;
    try {
      const record = JSON.parse(rawLine);
      if (record.id === targetId) {
        replaced = true;
        nextLine = JSON.stringify(message);
      }
    } catch {
      /* keep as-is */
    }
    outputLines.push(nextLine);
  }

  if (!replaced) {
    outputLines.push(JSON.stringify(message));
  }

  writeFileSync(filePath, outputLines.join("\n") + "\n");

  // Subagent sessions do not touch the session index.
  if (sessionId.includes(":subagent:")) {return;}

  if (replaced) {
    updateIndex(sessionId, {});
  } else {
    updateIndex(sessionId, { incrementCount: 1 });
  }
}
|
|
|
|
/**
 * Remove the run registered for `sessionId` from `activeRuns`, cancelling
 * its pending timers and stopping its subscribe process first.
 * Does nothing when no run is registered for the session.
 */
function cleanupRun(sessionId: string) {
  const run = activeRuns.get(sessionId);
  if (run) {
    if (run._persistTimer) {
      clearTimeout(run._persistTimer);
    }
    clearWaitingFinalizeTimer(run);
    stopSubscribeProcess(run);
    activeRuns.delete(sessionId);
  }
}
|