913 lines
25 KiB
TypeScript
913 lines
25 KiB
TypeScript
/**
|
|
* Server-side manager for subagent runs spawned by the web chat agent.
|
|
*
|
|
* Mirrors the ActiveRunManager pattern: buffers SSE events, supports
|
|
* subscriber fan-out, and tracks subagent metadata per parent web session.
|
|
*
|
|
* Events are fed from CLI NDJSON streams (parent run + subscribe continuations).
|
|
*/
|
|
import { randomUUID } from "node:crypto";
|
|
import { createInterface } from "node:readline";
|
|
import { existsSync, readFileSync, writeFileSync, mkdirSync, appendFileSync } from "node:fs";
|
|
import { join } from "node:path";
|
|
import {
|
|
type AgentEvent,
|
|
type AgentProcessHandle,
|
|
spawnAgentSubscribeProcess,
|
|
callGatewayRpc,
|
|
extractToolResult,
|
|
buildToolOutput,
|
|
parseAgentErrorMessage,
|
|
parseErrorBody,
|
|
} from "./agent-runner";
|
|
import { resolveOpenClawStateDir, resolveWebChatDir } from "./workspace";
|
|
|
|
// ── Types ──
|
|
|
|
export type SseEvent = Record<string, unknown> & { type: string };
|
|
export type SubagentSubscriber = (event: SseEvent | null) => void;
|
|
|
|
export type SubagentInfo = {
|
|
sessionKey: string;
|
|
runId: string;
|
|
parentWebSessionId: string;
|
|
task: string;
|
|
label?: string;
|
|
status: "running" | "completed" | "error";
|
|
startedAt: number;
|
|
endedAt?: number;
|
|
};
|
|
|
|
type SubagentRun = SubagentInfo & {
|
|
eventBuffer: SseEvent[];
|
|
subscribers: Set<SubagentSubscriber>;
|
|
/** Internal state for event-to-SSE transformation */
|
|
_state: TransformState;
|
|
_subscribeProcess: AgentProcessHandle | null;
|
|
_cleanupTimer: ReturnType<typeof setTimeout> | null;
|
|
/** Set when lifecycle/end is received; actual finalization deferred to subscribe close. */
|
|
_lifecycleEnded: boolean;
|
|
/** Safety timer to finalize if subscribe process hangs after lifecycle/end. */
|
|
_finalizeTimer: ReturnType<typeof setTimeout> | null;
|
|
/** Last globalSeq seen from the gateway event stream for replay cursor. */
|
|
lastGlobalSeq: number;
|
|
/** Retry timer for subscribe stream restarts. */
|
|
_subscribeRetryTimer: ReturnType<typeof setTimeout> | null;
|
|
/** Consecutive subscribe restart attempts without a received event. */
|
|
_subscribeRetryAttempt: number;
|
|
};
|
|
|
|
type TransformState = {
|
|
idCounter: number;
|
|
currentTextId: string;
|
|
currentReasoningId: string;
|
|
textStarted: boolean;
|
|
reasoningStarted: boolean;
|
|
everSentText: boolean;
|
|
statusReasoningActive: boolean;
|
|
};
|
|
|
|
// ── Constants ──
|
|
|
|
const CLEANUP_GRACE_MS = 24 * 60 * 60_000; // 24 hours — events are persisted to disk
|
|
const SUBSCRIBE_RETRY_BASE_MS = 300;
|
|
const SUBSCRIBE_RETRY_MAX_MS = 5_000;
|
|
const GLOBAL_KEY = "__openclaw_subagentRuns" as const;
|
|
|
|
// ── Singleton registry ──
|
|
|
|
type SubagentRegistry = {
|
|
runs: Map<string, SubagentRun>;
|
|
/** Reverse index: parent web session ID → subagent session keys */
|
|
parentIndex: Map<string, Set<string>>;
|
|
};
|
|
|
|
function getRegistry(): SubagentRegistry {
|
|
const existing = (globalThis as Record<string, unknown>)[GLOBAL_KEY] as
|
|
| SubagentRegistry
|
|
| undefined;
|
|
if (existing) {return existing;}
|
|
|
|
const registry: SubagentRegistry = {
|
|
runs: new Map(),
|
|
parentIndex: new Map(),
|
|
};
|
|
(globalThis as Record<string, unknown>)[GLOBAL_KEY] = registry;
|
|
return registry;
|
|
}
|
|
|
|
// ── Event persistence ──
|
|
|
|
/** Profile-scoped directory for subagent event JSONL files. */
|
|
function subagentEventsDir(): string {
|
|
return join(resolveWebChatDir(), "subagent-events");
|
|
}
|
|
|
|
/** Pre-profile-scoping legacy path — used as a read fallback for migration. */
|
|
function legacySubagentEventsDir(): string {
|
|
return join(resolveOpenClawStateDir(), "web-chat", "subagent-events");
|
|
}
|
|
|
|
/** Filesystem-safe filename derived from a session key. */
|
|
function safeFilename(sessionKey: string): string {
|
|
return sessionKey.replaceAll(":", "_") + ".jsonl";
|
|
}
|
|
|
|
function persistEvent(sessionKey: string, event: SseEvent): void {
|
|
try {
|
|
const dir = subagentEventsDir();
|
|
mkdirSync(dir, { recursive: true });
|
|
appendFileSync(join(dir, safeFilename(sessionKey)), JSON.stringify(event) + "\n");
|
|
} catch { /* best-effort */ }
|
|
}
|
|
|
|
function loadPersistedEvents(sessionKey: string): SseEvent[] {
|
|
const fname = safeFilename(sessionKey);
|
|
|
|
// Try profile-scoped dir first, fall back to legacy shared dir.
|
|
let filePath = join(subagentEventsDir(), fname);
|
|
if (!existsSync(filePath)) {
|
|
const legacyPath = join(legacySubagentEventsDir(), fname);
|
|
if (existsSync(legacyPath)) {
|
|
filePath = legacyPath;
|
|
} else {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
try {
|
|
const lines = readFileSync(filePath, "utf-8").split("\n");
|
|
const events: SseEvent[] = [];
|
|
for (const line of lines) {
|
|
if (!line.trim()) {continue;}
|
|
try { events.push(JSON.parse(line) as SseEvent); } catch { /* skip */ }
|
|
}
|
|
return events;
|
|
} catch { return []; }
|
|
}
|
|
|
|
// ── Profile-scoped subagent index ──
|
|
|
|
type SubagentIndexEntry = {
|
|
runId: string;
|
|
parentWebSessionId: string;
|
|
task: string;
|
|
label?: string;
|
|
status: "running" | "completed" | "error";
|
|
startedAt: number;
|
|
endedAt?: number;
|
|
};
|
|
|
|
function subagentIndexPath(): string {
|
|
return join(resolveWebChatDir(), "subagent-index.json");
|
|
}
|
|
|
|
function loadSubagentIndex(): Record<string, SubagentIndexEntry> {
|
|
const p = subagentIndexPath();
|
|
if (!existsSync(p)) {return {};}
|
|
try {
|
|
return JSON.parse(readFileSync(p, "utf-8")) as Record<string, SubagentIndexEntry>;
|
|
} catch { return {}; }
|
|
}
|
|
|
|
function upsertSubagentIndex(sessionKey: string, entry: SubagentIndexEntry): void {
|
|
try {
|
|
const dir = resolveWebChatDir();
|
|
mkdirSync(dir, { recursive: true });
|
|
const index = loadSubagentIndex();
|
|
index[sessionKey] = entry;
|
|
writeFileSync(subagentIndexPath(), JSON.stringify(index, null, 2));
|
|
} catch { /* best-effort */ }
|
|
}
|
|
|
|
/** Read the on-disk registry entry and derive the proper status. */
|
|
function readDiskStatus(sessionKey: string): "running" | "completed" | "error" {
|
|
// Check profile-scoped index first.
|
|
const profileIndex = loadSubagentIndex();
|
|
const profileEntry = profileIndex[sessionKey];
|
|
if (profileEntry) {
|
|
return profileEntry.status;
|
|
}
|
|
|
|
// Fall back to the shared gateway registry.
|
|
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
|
|
if (!existsSync(registryPath)) {return "running";}
|
|
try {
|
|
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { runs?: Record<string, Record<string, unknown>> };
|
|
const runs = raw?.runs;
|
|
if (!runs) {return "running";}
|
|
for (const entry of Object.values(runs)) {
|
|
if (entry.childSessionKey === sessionKey) {
|
|
if (typeof entry.endedAt !== "number") {return "running";}
|
|
const outcome = entry.outcome as { status?: string } | undefined;
|
|
if (outcome?.status === "error") {return "error";}
|
|
return "completed";
|
|
}
|
|
}
|
|
} catch { /* ignore */ }
|
|
return "running";
|
|
}
|
|
|
|
// ── Public API ──
|
|
|
|
/**
|
|
* Register a newly spawned subagent. Called when the parent agent's
|
|
* `sessions_spawn` tool result is detected in active-runs.ts.
|
|
*
|
|
* When `fromDisk` is true, the run is being rehydrated after a refresh,
|
|
* so we load persisted events and set the correct status from the registry.
|
|
*/
|
|
export function registerSubagent(
|
|
parentWebSessionId: string,
|
|
info: { sessionKey: string; runId: string; task: string; label?: string },
|
|
options?: { fromDisk?: boolean },
|
|
): void {
|
|
const reg = getRegistry();
|
|
|
|
// Avoid duplicate registration
|
|
if (reg.runs.has(info.sessionKey)) {return;}
|
|
|
|
const fromDisk = options?.fromDisk ?? false;
|
|
const diskStatus = fromDisk ? readDiskStatus(info.sessionKey) : "running";
|
|
|
|
const run: SubagentRun = {
|
|
sessionKey: info.sessionKey,
|
|
runId: info.runId,
|
|
parentWebSessionId,
|
|
task: info.task,
|
|
label: info.label,
|
|
status: diskStatus,
|
|
startedAt: Date.now(),
|
|
eventBuffer: [],
|
|
subscribers: new Set(),
|
|
_state: createTransformState(),
|
|
_subscribeProcess: null,
|
|
_cleanupTimer: null,
|
|
_lifecycleEnded: false,
|
|
_finalizeTimer: null,
|
|
lastGlobalSeq: 0,
|
|
_subscribeRetryTimer: null,
|
|
_subscribeRetryAttempt: 0,
|
|
};
|
|
|
|
// Load persisted events from disk (fills the replay buffer)
|
|
if (fromDisk) {
|
|
run.eventBuffer = loadPersistedEvents(info.sessionKey);
|
|
}
|
|
|
|
reg.runs.set(info.sessionKey, run);
|
|
|
|
// Update parent index
|
|
let keys = reg.parentIndex.get(parentWebSessionId);
|
|
if (!keys) {
|
|
keys = new Set();
|
|
reg.parentIndex.set(parentWebSessionId, keys);
|
|
}
|
|
keys.add(info.sessionKey);
|
|
|
|
// Persist to the profile-scoped subagent index.
|
|
upsertSubagentIndex(info.sessionKey, {
|
|
runId: info.runId,
|
|
parentWebSessionId,
|
|
task: info.task,
|
|
label: info.label,
|
|
status: run.status,
|
|
startedAt: run.startedAt,
|
|
endedAt: run.endedAt,
|
|
});
|
|
|
|
// Subagents are first-class sessions; subscribe immediately.
|
|
if (run.status === "running") {
|
|
startSubagentSubscribeStream(run);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Ensure a rehydrated subagent can receive live events. Called when a client
|
|
* actually connects to the subagent's SSE stream after a page refresh.
|
|
* For still-running subagents, this activates the subscribe-child fallback.
|
|
*/
|
|
export function ensureSubagentStreamable(sessionKey: string): void {
|
|
const run = getRegistry().runs.get(sessionKey);
|
|
if (!run || run.status !== "running" || run._subscribeProcess) {return;}
|
|
startSubagentSubscribeStream(run);
|
|
}
|
|
|
|
/** Get metadata for all subagents belonging to a parent web session. */
|
|
export function getSubagentsForSession(parentWebSessionId: string): SubagentInfo[] {
|
|
const reg = getRegistry();
|
|
const keys = reg.parentIndex.get(parentWebSessionId);
|
|
if (!keys) {return [];}
|
|
|
|
const result: SubagentInfo[] = [];
|
|
for (const key of keys) {
|
|
const run = reg.runs.get(key);
|
|
if (run) {
|
|
result.push({
|
|
sessionKey: run.sessionKey,
|
|
runId: run.runId,
|
|
parentWebSessionId: run.parentWebSessionId,
|
|
task: run.task,
|
|
label: run.label,
|
|
status: run.status,
|
|
startedAt: run.startedAt,
|
|
endedAt: run.endedAt,
|
|
});
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Subscribe to a subagent's SSE events. Replays buffered events first
|
|
* (synchronously), then live events follow.
|
|
*/
|
|
export function subscribeToSubagent(
|
|
sessionKey: string,
|
|
callback: SubagentSubscriber,
|
|
options?: { replay?: boolean },
|
|
): (() => void) | null {
|
|
const reg = getRegistry();
|
|
const run = reg.runs.get(sessionKey);
|
|
if (!run) {return null;}
|
|
|
|
const replay = options?.replay ?? true;
|
|
if (replay) {
|
|
for (const event of run.eventBuffer) {
|
|
callback(event);
|
|
}
|
|
}
|
|
|
|
if (run.status !== "running") {
|
|
callback(null);
|
|
return () => {};
|
|
}
|
|
|
|
run.subscribers.add(callback);
|
|
return () => {
|
|
run.subscribers.delete(callback);
|
|
};
|
|
}
|
|
|
|
/** Check if a subagent run exists (running or completed with buffered data). */
|
|
export function hasActiveSubagent(sessionKey: string): boolean {
|
|
return getRegistry().runs.has(sessionKey);
|
|
}
|
|
|
|
/** Check if a subagent is currently running (not yet completed). */
|
|
export function isSubagentRunning(sessionKey: string): boolean {
|
|
const run = getRegistry().runs.get(sessionKey);
|
|
return run !== undefined && run.status === "running";
|
|
}
|
|
|
|
/**
|
|
* Activate subscribe-child streams for all subagent runs that are
|
|
* still in "running" status and don't already have a gateway subscription.
|
|
*
|
|
* Called when the parent agent's NDJSON stream ends (child process exits).
|
|
* After that point the NDJSON routing is no longer available, so the
|
|
* subscribe child streams become the only event source for orphaned subagents.
|
|
*/
|
|
export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean {
|
|
const reg = getRegistry();
|
|
const keys = reg.parentIndex.get(parentWebSessionId);
|
|
|
|
if (keys && keys.size > 0) {
|
|
let anyRunning = false;
|
|
for (const key of keys) {
|
|
const run = reg.runs.get(key);
|
|
if (run?.status !== "running") {continue;}
|
|
const diskStatus = readDiskStatus(key);
|
|
if (diskStatus !== "running") {
|
|
finalizeRun(run, diskStatus === "error" ? "error" : "completed");
|
|
continue;
|
|
}
|
|
anyRunning = true;
|
|
}
|
|
if (anyRunning) {return true;}
|
|
}
|
|
|
|
// Fallback: check the gateway disk registry for running subagents
|
|
// that may not have been registered in-memory yet.
|
|
return checkDiskRegistryForRunningSubagents(parentWebSessionId);
|
|
}
|
|
|
|
function checkDiskRegistryForRunningSubagents(parentWebSessionId: string): boolean {
|
|
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
|
|
if (!existsSync(registryPath)) {return false;}
|
|
try {
|
|
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
|
|
runs?: Record<string, Record<string, unknown>>;
|
|
};
|
|
const runs = raw?.runs;
|
|
if (!runs) {return false;}
|
|
const parentKeyPattern = `:web:${parentWebSessionId}`;
|
|
for (const entry of Object.values(runs)) {
|
|
const requester = typeof entry.requesterSessionKey === "string"
|
|
? entry.requesterSessionKey
|
|
: "";
|
|
if (!requester.endsWith(parentKeyPattern)) {continue;}
|
|
if (typeof entry.endedAt === "number") {continue;}
|
|
return true;
|
|
}
|
|
} catch {
|
|
// ignore read errors
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/** Return session keys of all currently running subagents. */
|
|
export function getRunningSubagentKeys(): string[] {
|
|
const keys: string[] = [];
|
|
for (const [key, run] of getRegistry().runs) {
|
|
if (run.status === "running") {
|
|
keys.push(key);
|
|
}
|
|
}
|
|
return keys;
|
|
}
|
|
|
|
export function persistUserMessage(
|
|
sessionKey: string,
|
|
msg: { id?: string; text: string },
|
|
): boolean {
|
|
const run = getRegistry().runs.get(sessionKey);
|
|
if (!run) {return false;}
|
|
const event: SseEvent = {
|
|
type: "user-message",
|
|
id: msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`,
|
|
text: msg.text,
|
|
};
|
|
run.eventBuffer.push(event);
|
|
persistEvent(sessionKey, event);
|
|
for (const sub of run.subscribers) {
|
|
try { sub(event); } catch { /* ignore */ }
|
|
}
|
|
return true;
|
|
}
|
|
|
|
export function reactivateSubagent(sessionKey: string): boolean {
|
|
const run = getRegistry().runs.get(sessionKey);
|
|
if (!run) {return false;}
|
|
if (run._cleanupTimer) {
|
|
clearTimeout(run._cleanupTimer);
|
|
run._cleanupTimer = null;
|
|
}
|
|
run.status = "running";
|
|
run.endedAt = undefined;
|
|
upsertSubagentIndex(run.sessionKey, {
|
|
runId: run.runId,
|
|
parentWebSessionId: run.parentWebSessionId,
|
|
task: run.task,
|
|
label: run.label,
|
|
status: run.status,
|
|
startedAt: run.startedAt,
|
|
endedAt: run.endedAt,
|
|
});
|
|
startSubagentSubscribeStream(run);
|
|
return true;
|
|
}
|
|
|
|
function sendGatewayAbortForSubagent(sessionKey: string): void {
|
|
try {
|
|
void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch(
|
|
() => {
|
|
// best effort
|
|
},
|
|
);
|
|
} catch {
|
|
// best effort
|
|
}
|
|
}
|
|
|
|
export function abortSubagent(sessionKey: string): boolean {
|
|
const run = getRegistry().runs.get(sessionKey);
|
|
if (!run || run.status !== "running") {return false;}
|
|
sendGatewayAbortForSubagent(sessionKey);
|
|
finalizeRun(run, "error");
|
|
return true;
|
|
}
|
|
|
|
export function spawnSubagentMessage(sessionKey: string, message: string): boolean {
|
|
try {
|
|
const run = getRegistry().runs.get(sessionKey);
|
|
if (!run) {return false;}
|
|
const idempotencyKey = randomUUID();
|
|
void callGatewayRpc(
|
|
"agent",
|
|
{
|
|
message,
|
|
sessionKey,
|
|
idempotencyKey,
|
|
deliver: false,
|
|
channel: "webchat",
|
|
lane: "subagent",
|
|
timeout: 0,
|
|
},
|
|
{ timeoutMs: 10_000 },
|
|
).catch(() => {
|
|
// best effort
|
|
});
|
|
return true;
|
|
} catch {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Lazily register a subagent by reading the on-disk registries.
|
|
* Checks the profile-scoped subagent-index.json first, then falls back
|
|
* to the shared gateway registry (~/.openclaw/subagents/runs.json).
|
|
* Returns true if the subagent was found and registered (or already registered).
|
|
*/
|
|
export function ensureRegisteredFromDisk(
|
|
sessionKey: string,
|
|
parentWebSessionId: string,
|
|
): boolean {
|
|
if (getRegistry().runs.has(sessionKey)) {return true;}
|
|
|
|
// 1. Check profile-scoped index.
|
|
const profileIndex = loadSubagentIndex();
|
|
const profileEntry = profileIndex[sessionKey];
|
|
if (profileEntry) {
|
|
registerSubagent(profileEntry.parentWebSessionId || parentWebSessionId, {
|
|
sessionKey,
|
|
runId: profileEntry.runId,
|
|
task: profileEntry.task,
|
|
label: profileEntry.label,
|
|
}, { fromDisk: true });
|
|
return true;
|
|
}
|
|
|
|
// 2. Fall back to the shared gateway registry.
|
|
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
|
|
if (!existsSync(registryPath)) {return false;}
|
|
|
|
try {
|
|
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { runs?: Record<string, Record<string, unknown>> };
|
|
const runs = raw?.runs;
|
|
if (!runs) {return false;}
|
|
|
|
for (const entry of Object.values(runs)) {
|
|
if (entry.childSessionKey === sessionKey) {
|
|
registerSubagent(parentWebSessionId, {
|
|
sessionKey,
|
|
runId: typeof entry.runId === "string" ? entry.runId : "",
|
|
task: typeof entry.task === "string" ? entry.task : "",
|
|
label: typeof entry.label === "string" ? entry.label : undefined,
|
|
}, { fromDisk: true });
|
|
return true;
|
|
}
|
|
}
|
|
} catch { /* ignore */ }
|
|
|
|
return false;
|
|
}
|
|
|
|
// ── Event transformation (gateway event → SSE events) ──
|
|
|
|
function createTransformState(): TransformState {
|
|
return {
|
|
idCounter: 0,
|
|
currentTextId: "",
|
|
currentReasoningId: "",
|
|
textStarted: false,
|
|
reasoningStarted: false,
|
|
everSentText: false,
|
|
statusReasoningActive: false,
|
|
};
|
|
}
|
|
|
|
function handleAgentEvent(run: SubagentRun, evt: AgentEvent): void {
|
|
if (evt.event !== "agent") {return;}
|
|
const gSeq = typeof (evt as Record<string, unknown>).globalSeq === "number"
|
|
? (evt as Record<string, unknown>).globalSeq as number
|
|
: undefined;
|
|
if (gSeq !== undefined) {
|
|
if (gSeq <= run.lastGlobalSeq) {return;}
|
|
run.lastGlobalSeq = gSeq;
|
|
}
|
|
const stream = typeof evt.stream === "string" ? evt.stream : undefined;
|
|
const data =
|
|
evt.data && typeof evt.data === "object"
|
|
? (evt.data)
|
|
: undefined;
|
|
|
|
if (!stream || !data) {return;}
|
|
|
|
const st = run._state;
|
|
const nextId = (prefix: string) => `${prefix}-${Date.now()}-${++st.idCounter}`;
|
|
|
|
const emit = (event: SseEvent) => {
|
|
run.eventBuffer.push(event);
|
|
persistEvent(run.sessionKey, event);
|
|
for (const sub of run.subscribers) {
|
|
try { sub(event); } catch { /* ignore */ }
|
|
}
|
|
};
|
|
|
|
const closeReasoning = () => {
|
|
if (st.reasoningStarted) {
|
|
emit({ type: "reasoning-end", id: st.currentReasoningId });
|
|
st.reasoningStarted = false;
|
|
st.statusReasoningActive = false;
|
|
}
|
|
};
|
|
|
|
const closeText = () => {
|
|
if (st.textStarted) {
|
|
emit({ type: "text-end", id: st.currentTextId });
|
|
st.textStarted = false;
|
|
}
|
|
};
|
|
|
|
const openStatusReasoning = (label: string) => {
|
|
closeReasoning();
|
|
closeText();
|
|
st.currentReasoningId = nextId("status");
|
|
emit({ type: "reasoning-start", id: st.currentReasoningId });
|
|
emit({ type: "reasoning-delta", id: st.currentReasoningId, delta: label });
|
|
st.reasoningStarted = true;
|
|
st.statusReasoningActive = true;
|
|
};
|
|
|
|
const emitError = (message: string) => {
|
|
closeReasoning();
|
|
closeText();
|
|
const tid = nextId("text");
|
|
emit({ type: "text-start", id: tid });
|
|
emit({ type: "text-delta", id: tid, delta: `[error] ${message}` });
|
|
emit({ type: "text-end", id: tid });
|
|
st.everSentText = true;
|
|
};
|
|
|
|
// Lifecycle start
|
|
if (stream === "lifecycle" && data.phase === "start") {
|
|
openStatusReasoning("Preparing response...");
|
|
}
|
|
|
|
// Thinking / reasoning
|
|
if (stream === "thinking") {
|
|
const delta = typeof data.delta === "string" ? data.delta : undefined;
|
|
if (delta) {
|
|
if (st.statusReasoningActive) {closeReasoning();}
|
|
if (!st.reasoningStarted) {
|
|
st.currentReasoningId = nextId("reasoning");
|
|
emit({ type: "reasoning-start", id: st.currentReasoningId });
|
|
st.reasoningStarted = true;
|
|
}
|
|
emit({ type: "reasoning-delta", id: st.currentReasoningId, delta });
|
|
}
|
|
}
|
|
|
|
// Assistant text
|
|
if (stream === "assistant") {
|
|
const delta = typeof data.delta === "string" ? data.delta : undefined;
|
|
const textFallback =
|
|
!delta && typeof data.text === "string" ? data.text : undefined;
|
|
const chunk = delta ?? textFallback;
|
|
if (chunk) {
|
|
closeReasoning();
|
|
if (!st.textStarted) {
|
|
st.currentTextId = nextId("text");
|
|
emit({ type: "text-start", id: st.currentTextId });
|
|
st.textStarted = true;
|
|
}
|
|
st.everSentText = true;
|
|
emit({ type: "text-delta", id: st.currentTextId, delta: chunk });
|
|
}
|
|
// Inline error
|
|
if (
|
|
typeof data.stopReason === "string" &&
|
|
data.stopReason === "error" &&
|
|
typeof data.errorMessage === "string"
|
|
) {
|
|
emitError(parseErrorBody(data.errorMessage));
|
|
}
|
|
}
|
|
|
|
// Tool events
|
|
if (stream === "tool") {
|
|
const phase = typeof data.phase === "string" ? data.phase : undefined;
|
|
const toolCallId = typeof data.toolCallId === "string" ? data.toolCallId : "";
|
|
const toolName = typeof data.name === "string" ? data.name : "";
|
|
|
|
if (phase === "start") {
|
|
closeReasoning();
|
|
closeText();
|
|
const args =
|
|
data.args && typeof data.args === "object"
|
|
? (data.args as Record<string, unknown>)
|
|
: {};
|
|
emit({ type: "tool-input-start", toolCallId, toolName });
|
|
emit({ type: "tool-input-available", toolCallId, toolName, input: args });
|
|
} else if (phase === "result") {
|
|
const isError = data.isError === true;
|
|
const result = extractToolResult(data.result);
|
|
if (isError) {
|
|
const errorText =
|
|
result?.text ||
|
|
(result?.details?.error as string | undefined) ||
|
|
"Tool execution failed";
|
|
emit({ type: "tool-output-error", toolCallId, errorText });
|
|
} else {
|
|
const output = buildToolOutput(result);
|
|
emit({ type: "tool-output-available", toolCallId, output });
|
|
}
|
|
}
|
|
}
|
|
|
|
// Compaction
|
|
if (stream === "compaction") {
|
|
const phase = typeof data.phase === "string" ? data.phase : undefined;
|
|
if (phase === "start") {
|
|
openStatusReasoning("Optimizing session context...");
|
|
} else if (phase === "end") {
|
|
if (st.statusReasoningActive) {
|
|
if (data.willRetry === true) {
|
|
emit({
|
|
type: "reasoning-delta",
|
|
id: st.currentReasoningId,
|
|
delta: "\nRetrying with compacted context...",
|
|
});
|
|
} else {
|
|
closeReasoning();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Lifecycle end → defer finalization until subscribe process closes
|
|
// so any remaining events in the readline buffer are still delivered.
|
|
if (stream === "lifecycle" && data.phase === "end") {
|
|
closeReasoning();
|
|
closeText();
|
|
run._lifecycleEnded = true;
|
|
if (run._finalizeTimer) {clearTimeout(run._finalizeTimer);}
|
|
run._finalizeTimer = setTimeout(() => {
|
|
run._finalizeTimer = null;
|
|
if (run.status === "running") {
|
|
finalizeRun(run, "completed");
|
|
}
|
|
}, 5_000);
|
|
}
|
|
|
|
// Lifecycle error
|
|
if (stream === "lifecycle" && data.phase === "error") {
|
|
const msg = parseAgentErrorMessage(data);
|
|
if (msg) {emitError(msg);}
|
|
finalizeRun(run, "error");
|
|
}
|
|
}
|
|
|
|
function finalizeRun(run: SubagentRun, status: "completed" | "error"): void {
|
|
if (run.status !== "running") {return;}
|
|
resetSubscribeRetryState(run);
|
|
|
|
if (run._finalizeTimer) {
|
|
clearTimeout(run._finalizeTimer);
|
|
run._finalizeTimer = null;
|
|
}
|
|
|
|
run.status = status;
|
|
run.endedAt = Date.now();
|
|
|
|
// Update the profile-scoped subagent index with final status.
|
|
upsertSubagentIndex(run.sessionKey, {
|
|
runId: run.runId,
|
|
parentWebSessionId: run.parentWebSessionId,
|
|
task: run.task,
|
|
label: run.label,
|
|
status: run.status,
|
|
startedAt: run.startedAt,
|
|
endedAt: run.endedAt,
|
|
});
|
|
|
|
// Signal completion to all subscribers
|
|
for (const sub of run.subscribers) {
|
|
try { sub(null); } catch { /* ignore */ }
|
|
}
|
|
run.subscribers.clear();
|
|
|
|
stopSubagentSubscribeStream(run);
|
|
|
|
// Schedule cleanup after grace period
|
|
run._cleanupTimer = setTimeout(() => {
|
|
cleanupRun(run.sessionKey);
|
|
}, CLEANUP_GRACE_MS);
|
|
}
|
|
|
|
function cleanupRun(sessionKey: string): void {
|
|
const reg = getRegistry();
|
|
const run = reg.runs.get(sessionKey);
|
|
if (!run) {return;}
|
|
|
|
if (run._cleanupTimer) {
|
|
clearTimeout(run._cleanupTimer);
|
|
run._cleanupTimer = null;
|
|
}
|
|
stopSubagentSubscribeStream(run);
|
|
reg.runs.delete(sessionKey);
|
|
|
|
// Clean up parent index
|
|
const keys = reg.parentIndex.get(run.parentWebSessionId);
|
|
if (keys) {
|
|
keys.delete(sessionKey);
|
|
if (keys.size === 0) {
|
|
reg.parentIndex.delete(run.parentWebSessionId);
|
|
}
|
|
}
|
|
}
|
|
|
|
function startSubagentSubscribeStream(run: SubagentRun): void {
|
|
stopSubagentSubscribeStream(run);
|
|
const child = spawnAgentSubscribeProcess(run.sessionKey, run.lastGlobalSeq);
|
|
run._subscribeProcess = child;
|
|
const rl = createInterface({ input: child.stdout! });
|
|
|
|
rl.on("line", (line: string) => {
|
|
if (!line.trim()) {return;}
|
|
let ev: AgentEvent;
|
|
try {
|
|
ev = JSON.parse(line) as AgentEvent;
|
|
} catch {
|
|
return;
|
|
}
|
|
if (ev.sessionKey && ev.sessionKey !== run.sessionKey) {
|
|
return;
|
|
}
|
|
if (run._subscribeRetryAttempt > 0) {
|
|
resetSubscribeRetryState(run);
|
|
}
|
|
handleAgentEvent(run, ev);
|
|
});
|
|
|
|
child.on("close", () => {
|
|
if (run._subscribeProcess !== child) {
|
|
return;
|
|
}
|
|
run._subscribeProcess = null;
|
|
if (run.status !== "running") {return;}
|
|
if (run._lifecycleEnded) {
|
|
if (run._finalizeTimer) {
|
|
clearTimeout(run._finalizeTimer);
|
|
run._finalizeTimer = null;
|
|
}
|
|
finalizeRun(run, "completed");
|
|
return;
|
|
}
|
|
scheduleSubscribeRestart(run, () => {
|
|
if (run.status === "running" && !run._subscribeProcess) {
|
|
startSubagentSubscribeStream(run);
|
|
}
|
|
});
|
|
});
|
|
|
|
child.on("error", (err) => {
|
|
console.error("[subagent-runs] Subscribe child error:", err);
|
|
});
|
|
|
|
child.stderr?.on("data", (chunk: Buffer) => {
|
|
console.error("[subagent-runs subscribe stderr]", chunk.toString());
|
|
});
|
|
}
|
|
|
|
function stopSubagentSubscribeStream(run: SubagentRun): void {
|
|
clearSubscribeRetryTimer(run);
|
|
if (!run._subscribeProcess) {return;}
|
|
try {
|
|
run._subscribeProcess.kill("SIGTERM");
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
run._subscribeProcess = null;
|
|
}
|
|
|
|
function clearSubscribeRetryTimer(run: SubagentRun): void {
|
|
if (!run._subscribeRetryTimer) {
|
|
return;
|
|
}
|
|
clearTimeout(run._subscribeRetryTimer);
|
|
run._subscribeRetryTimer = null;
|
|
}
|
|
|
|
function resetSubscribeRetryState(run: SubagentRun): void {
|
|
run._subscribeRetryAttempt = 0;
|
|
clearSubscribeRetryTimer(run);
|
|
}
|
|
|
|
function scheduleSubscribeRestart(run: SubagentRun, restart: () => void): void {
|
|
if (run._subscribeRetryTimer) {
|
|
return;
|
|
}
|
|
const delay = Math.min(
|
|
SUBSCRIBE_RETRY_MAX_MS,
|
|
SUBSCRIBE_RETRY_BASE_MS * 2 ** run._subscribeRetryAttempt,
|
|
);
|
|
run._subscribeRetryAttempt += 1;
|
|
run._subscribeRetryTimer = setTimeout(() => {
|
|
run._subscribeRetryTimer = null;
|
|
restart();
|
|
}, delay);
|
|
}
|