web: persist subagent events to disk and support rehydration after refresh

This commit is contained in:
kumarabhirup 2026-02-19 21:52:10 -08:00
parent f5c6fa186f
commit a0cc5834f2
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -6,7 +6,7 @@
*
* Events are fed from the gateway WebSocket connection (gateway-events.ts).
*/
import { existsSync, readFileSync } from "node:fs";
import { existsSync, readFileSync, mkdirSync, appendFileSync } from "node:fs";
import { join } from "node:path";
import {
extractToolResult,
@ -54,7 +54,7 @@ type TransformState = {
// ── Constants ──
const CLEANUP_GRACE_MS = 60_000;
const CLEANUP_GRACE_MS = 24 * 60 * 60_000; // 24 hours — events are persisted to disk
const GLOBAL_KEY = "__openclaw_subagentRuns" as const;
// ── Singleton registry ──
@ -82,28 +82,89 @@ function getRegistry(): SubagentRegistry {
return registry;
}
// ── Event persistence ──
function subagentEventsDir(): string {
return join(resolveOpenClawStateDir(), "web-chat", "subagent-events");
}
/** Filesystem-safe filename derived from a session key. */
function safeFilename(sessionKey: string): string {
return sessionKey.replaceAll(":", "_") + ".jsonl";
}
function persistEvent(sessionKey: string, event: SseEvent): void {
try {
const dir = subagentEventsDir();
mkdirSync(dir, { recursive: true });
appendFileSync(join(dir, safeFilename(sessionKey)), JSON.stringify(event) + "\n");
} catch { /* best-effort */ }
}
function loadPersistedEvents(sessionKey: string): SseEvent[] {
const filePath = join(subagentEventsDir(), safeFilename(sessionKey));
if (!existsSync(filePath)) {return [];}
try {
const lines = readFileSync(filePath, "utf-8").split("\n");
const events: SseEvent[] = [];
for (const line of lines) {
if (!line.trim()) {continue;}
try { events.push(JSON.parse(line) as SseEvent); } catch { /* skip */ }
}
return events;
} catch { return []; }
}
/** Read the on-disk registry entry and derive the proper status. */
function readDiskStatus(sessionKey: string): "running" | "completed" | "error" {
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return "running";}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8"));
const runs = raw?.runs;
if (!runs || typeof runs !== "object") {return "running";}
for (const entry of Object.values(runs)) {
if (entry.childSessionKey === sessionKey) {
if (typeof entry.endedAt !== "number") {return "running";}
const outcome = entry.outcome as { status?: string } | undefined;
if (outcome?.status === "error") {return "error";}
return "completed";
}
}
} catch { /* ignore */ }
return "running";
}
// ── Public API ──
/**
* Register a newly spawned subagent. Called when the parent agent's
* `sessions_spawn` tool result is detected in active-runs.ts.
*
* When `fromDisk` is true, the run is being rehydrated after a refresh,
* so we load persisted events and set the correct status from the registry.
*/
export function registerSubagent(
parentWebSessionId: string,
info: { sessionKey: string; runId: string; task: string; label?: string },
options?: { fromDisk?: boolean },
): void {
const reg = getRegistry();
// Avoid duplicate registration
if (reg.runs.has(info.sessionKey)) {return;}
const fromDisk = options?.fromDisk ?? false;
const diskStatus = fromDisk ? readDiskStatus(info.sessionKey) : "running";
const run: SubagentRun = {
sessionKey: info.sessionKey,
runId: info.runId,
parentWebSessionId,
task: info.task,
label: info.label,
status: "running",
status: diskStatus,
startedAt: Date.now(),
eventBuffer: [],
subscribers: new Set(),
@ -112,6 +173,11 @@ export function registerSubagent(
_cleanupTimer: null,
};
// Load persisted events from disk (fills the replay buffer)
if (fromDisk) {
run.eventBuffer = loadPersistedEvents(info.sessionKey);
}
reg.runs.set(info.sessionKey, run);
// Update parent index
@ -122,11 +188,13 @@ export function registerSubagent(
}
keys.add(info.sessionKey);
// The primary event source is the parent agent's NDJSON stream, routed
// via routeRawEvent(). We do NOT subscribe to gateway WebSocket here to
// avoid duplicate events (the parent CLI already receives all broadcasts).
// NOTE: We do NOT subscribe to gateway WebSocket here. During live
// streaming, events arrive via routeRawEvent() from the parent's NDJSON
// stream. After the parent exits, activateGatewayFallback() subscribes.
// For on-demand rehydration (page refresh), ensureSubagentStreamable()
// handles the subscription.
// Replay any pre-registration buffered events
// Replay any pre-registration buffered events (live sessions only)
const buf = reg.preRegBuffer.get(info.sessionKey);
if (buf && buf.length > 0) {
for (const evt of buf) {
@ -136,6 +204,19 @@ export function registerSubagent(
}
}
/**
* Ensure a rehydrated subagent can receive live events. Called when a client
* actually connects to the subagent's SSE stream after a page refresh.
* For still-running subagents, this activates the gateway WebSocket fallback.
*/
export function ensureSubagentStreamable(sessionKey: string): void {
const run = getRegistry().runs.get(sessionKey);
if (!run || run.status !== "running" || run._unsubGateway) {return;}
run._unsubGateway = subscribeToSessionKey(sessionKey, (evt) => {
handleGatewayEvent(run, evt);
});
}
/** Get metadata for all subagents belonging to a parent web session. */
export function getSubagentsForSession(parentWebSessionId: string): SubagentInfo[] {
const reg = getRegistry();
@ -296,7 +377,7 @@ export function ensureRegisteredFromDisk(
runId: typeof entry.runId === "string" ? entry.runId : "",
task: typeof entry.task === "string" ? entry.task : "",
label: typeof entry.label === "string" ? entry.label : undefined,
});
}, { fromDisk: true });
return true;
}
}
@ -336,6 +417,7 @@ function handleGatewayEvent(run: SubagentRun, evt: GatewayEvent): void {
const emit = (event: SseEvent) => {
run.eventBuffer.push(event);
persistEvent(run.sessionKey, event);
for (const sub of run.subscribers) {
try { sub(event); } catch { /* ignore */ }
}