diff --git a/apps/web/lib/subagent-runs.ts b/apps/web/lib/subagent-runs.ts index 424ce709f3a..9ba99c91482 100644 --- a/apps/web/lib/subagent-runs.ts +++ b/apps/web/lib/subagent-runs.ts @@ -6,7 +6,7 @@ * * Events are fed from the gateway WebSocket connection (gateway-events.ts). */ -import { existsSync, readFileSync } from "node:fs"; +import { existsSync, readFileSync, mkdirSync, appendFileSync } from "node:fs"; import { join } from "node:path"; import { extractToolResult, @@ -54,7 +54,7 @@ type TransformState = { // ── Constants ── -const CLEANUP_GRACE_MS = 60_000; +const CLEANUP_GRACE_MS = 24 * 60 * 60_000; // 24 hours — events are persisted to disk const GLOBAL_KEY = "__openclaw_subagentRuns" as const; // ── Singleton registry ── @@ -82,28 +82,89 @@ function getRegistry(): SubagentRegistry { return registry; } +// ── Event persistence ── + +function subagentEventsDir(): string { + return join(resolveOpenClawStateDir(), "web-chat", "subagent-events"); +} + +/** Filesystem-safe filename derived from a session key. */ +function safeFilename(sessionKey: string): string { + return sessionKey.replaceAll(":", "_") + ".jsonl"; +} + +function persistEvent(sessionKey: string, event: SseEvent): void { + try { + const dir = subagentEventsDir(); + mkdirSync(dir, { recursive: true }); + appendFileSync(join(dir, safeFilename(sessionKey)), JSON.stringify(event) + "\n"); + } catch { /* best-effort */ } +} + +function loadPersistedEvents(sessionKey: string): SseEvent[] { + const filePath = join(subagentEventsDir(), safeFilename(sessionKey)); + if (!existsSync(filePath)) {return [];} + + try { + const lines = readFileSync(filePath, "utf-8").split("\n"); + const events: SseEvent[] = []; + for (const line of lines) { + if (!line.trim()) {continue;} + try { events.push(JSON.parse(line) as SseEvent); } catch { /* skip */ } + } + return events; + } catch { return []; } +} + +/** Read the on-disk registry entry and derive the proper status. */ +function readDiskStatus(sessionKey: string): "running" | "completed" | "error" { + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) {return "running";} + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")); + const runs = raw?.runs; + if (!runs || typeof runs !== "object") {return "running";} + for (const entry of Object.values(runs)) { + if (entry.childSessionKey === sessionKey) { + if (typeof entry.endedAt !== "number") {return "running";} + const outcome = entry.outcome as { status?: string } | undefined; + if (outcome?.status === "error") {return "error";} + return "completed"; + } + } + } catch { /* ignore */ } + return "running"; +} + // ── Public API ── /** * Register a newly spawned subagent. Called when the parent agent's * `sessions_spawn` tool result is detected in active-runs.ts. + * + * When `fromDisk` is true, the run is being rehydrated after a refresh, + * so we load persisted events and set the correct status from the registry. */ export function registerSubagent( parentWebSessionId: string, info: { sessionKey: string; runId: string; task: string; label?: string }, + options?: { fromDisk?: boolean }, ): void { const reg = getRegistry(); // Avoid duplicate registration if (reg.runs.has(info.sessionKey)) {return;} + const fromDisk = options?.fromDisk ?? false; + const diskStatus = fromDisk ? readDiskStatus(info.sessionKey) : "running"; + const run: SubagentRun = { sessionKey: info.sessionKey, runId: info.runId, parentWebSessionId, task: info.task, label: info.label, - status: "running", + status: diskStatus, startedAt: Date.now(), eventBuffer: [], subscribers: new Set(), @@ -112,6 +173,11 @@ export function registerSubagent( _cleanupTimer: null, }; + // Load persisted events from disk (fills the replay buffer) + if (fromDisk) { + run.eventBuffer = loadPersistedEvents(info.sessionKey); + } + reg.runs.set(info.sessionKey, run); // Update parent index @@ -122,11 +188,13 @@ export function registerSubagent( } keys.add(info.sessionKey); - // The primary event source is the parent agent's NDJSON stream, routed - // via routeRawEvent(). We do NOT subscribe to gateway WebSocket here to - // avoid duplicate events (the parent CLI already receives all broadcasts). + // NOTE: We do NOT subscribe to gateway WebSocket here. During live + // streaming, events arrive via routeRawEvent() from the parent's NDJSON + // stream. After the parent exits, activateGatewayFallback() subscribes. + // For on-demand rehydration (page refresh), ensureSubagentStreamable() + // handles the subscription. - // Replay any pre-registration buffered events + // Replay any pre-registration buffered events (live sessions only) const buf = reg.preRegBuffer.get(info.sessionKey); if (buf && buf.length > 0) { for (const evt of buf) { @@ -136,6 +204,19 @@ export function registerSubagent( } } +/** + * Ensure a rehydrated subagent can receive live events. Called when a client + * actually connects to the subagent's SSE stream after a page refresh. + * For still-running subagents, this activates the gateway WebSocket fallback. + */ +export function ensureSubagentStreamable(sessionKey: string): void { + const run = getRegistry().runs.get(sessionKey); + if (!run || run.status !== "running" || run._unsubGateway) {return;} + run._unsubGateway = subscribeToSessionKey(sessionKey, (evt) => { + handleGatewayEvent(run, evt); + }); +} + /** Get metadata for all subagents belonging to a parent web session. */ export function getSubagentsForSession(parentWebSessionId: string): SubagentInfo[] { const reg = getRegistry(); @@ -296,7 +377,7 @@ export function ensureRegisteredFromDisk( runId: typeof entry.runId === "string" ? entry.runId : "", task: typeof entry.task === "string" ? entry.task : "", label: typeof entry.label === "string" ? entry.label : undefined, - }); + }, { fromDisk: true }); return true; } } @@ -336,6 +417,7 @@ function handleGatewayEvent(run: SubagentRun, evt: GatewayEvent): void { const emit = (event: SseEvent) => { run.eventBuffer.push(event); + persistEvent(run.sessionKey, event); for (const sub of run.subscribers) { try { sub(event); } catch { /* ignore */ } }