feat(web): enhance active-runs with WS RPC, subscribe retry, and chat event support

This commit is contained in:
kumarabhirup 2026-03-02 18:33:17 -08:00
parent f6769d3e05
commit 0d219413a8
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -8,7 +8,6 @@
* - Messages are written to persistent sessions as they arrive.
* - New HTTP connections can re-attach to a running stream.
*/
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import { join } from "node:path";
import {
@ -23,6 +22,7 @@ import {
type AgentEvent,
spawnAgentProcess,
spawnAgentSubscribeProcess,
callGatewayRpc,
extractToolResult,
buildToolOutput,
parseAgentErrorMessage,
@ -75,6 +75,10 @@ export type ActiveRun = {
lastGlobalSeq: number;
/** @internal subscribe child process for waiting-for-subagents continuation */
_subscribeProcess?: AgentProcessHandle | null;
/** @internal retry timer for subscribe stream restarts */
_subscribeRetryTimer?: ReturnType<typeof setTimeout> | null;
/** @internal consecutive subscribe restart attempts without a received event */
_subscribeRetryAttempt?: number;
/** Full gateway session key (used for subagent subscribe-only runs) */
sessionKey?: string;
/** Parent web session ID (for subagent runs) */
@ -89,6 +93,8 @@ export type ActiveRun = {
_lifecycleEnded?: boolean;
/** Safety timer to finalize if subscribe process hangs after lifecycle/end */
_finalizeTimer?: ReturnType<typeof setTimeout> | null;
/** @internal short reconciliation window before waiting-run completion */
_waitingFinalizeTimer?: ReturnType<typeof setTimeout> | null;
};
// ── Constants ──
@ -96,6 +102,10 @@ export type ActiveRun = {
const PERSIST_INTERVAL_MS = 2_000;
const CLEANUP_GRACE_MS = 30_000;
const SUBSCRIBE_CLEANUP_GRACE_MS = 24 * 60 * 60_000;
const SUBSCRIBE_RETRY_BASE_MS = 300;
const SUBSCRIBE_RETRY_MAX_MS = 5_000;
const SUBSCRIBE_LIFECYCLE_END_GRACE_MS = 750;
const WAITING_FINALIZE_RECONCILE_MS = 5_000;
const SILENT_REPLY_TOKEN = "NO_REPLY";
@ -113,6 +123,48 @@ function isLeakedSilentReplyToken(text: string): boolean {
if (SILENT_REPLY_TOKEN.startsWith(t) && t.length >= 2 && t.length < SILENT_REPLY_TOKEN.length) {return true;}
return false;
}
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
function extractAssistantTextFromChatPayload(
data: Record<string, unknown> | undefined,
): string {
if (!data) {
return "";
}
const state = typeof data.state === "string" ? data.state : "";
if (state !== "final") {
return "";
}
const message = asRecord(data.message);
if (!message || message.role !== "assistant") {
return "";
}
const content = message.content;
if (typeof content === "string") {
return content;
}
if (!Array.isArray(content)) {
return "";
}
const chunks: string[] = [];
for (const part of content) {
const rec = asRecord(part);
if (!rec) {
continue;
}
const type = typeof rec.type === "string" ? rec.type : "";
if ((type === "text" || type === "output_text") && typeof rec.text === "string") {
chunks.push(rec.text);
}
}
return chunks.join("");
}
// Evaluated per-call so it tracks profile switches at runtime.
function webChatDir(): string { return resolveWebChatDir(); }
function indexFile(): string { return join(webChatDir(), "index.json"); }
@ -231,6 +283,8 @@ export function reactivateSubscribeRun(sessionKey: string): boolean {
run.status = "running";
run._lifecycleEnded = false;
if (run._finalizeTimer) {clearTimeout(run._finalizeTimer); run._finalizeTimer = null;}
clearWaitingFinalizeTimer(run);
resetSubscribeRetryState(run);
run.accumulated = {
id: `assistant-${sessionKey}-${Date.now()}`,
@ -251,21 +305,21 @@ export function reactivateSubscribeRun(sessionKey: string): boolean {
*/
export function sendSubagentFollowUp(sessionKey: string, message: string): boolean {
try {
const child = spawn(
"openclaw",
[
"gateway", "call", "agent",
"--params", JSON.stringify({
message, sessionKey,
idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`,
deliver: false, channel: "webchat", lane: "subagent", timeout: 0,
}),
"--json", "--timeout", "10000",
],
{ env: { ...process.env }, stdio: "ignore", detached: true },
);
child.on("error", () => {});
child.unref();
void callGatewayRpc(
"agent",
{
message,
sessionKey,
idempotencyKey: `follow-${Date.now()}-${Math.random().toString(36).slice(2)}`,
deliver: false,
channel: "webchat",
lane: "subagent",
timeout: 0,
},
{ timeoutMs: 10_000 },
).catch(() => {
// Best effort.
});
return true;
} catch {
return false;
@ -304,6 +358,7 @@ export function abortRun(sessionId: string): boolean {
// false and the next user message isn't rejected with 409.
const wasWaiting = run.status === "waiting-for-subagents";
run.status = "error";
clearWaitingFinalizeTimer(run);
// Clean up waiting subscribe process if present.
stopSubscribeProcess(run);
@ -357,27 +412,11 @@ export function abortRun(sessionId: string): boolean {
function sendGatewayAbort(sessionId: string): void {
try {
const sessionKey = `agent:main:web:${sessionId}`;
const child = spawn(
"openclaw",
[
"gateway",
"call",
"chat.abort",
"--params",
JSON.stringify({ sessionKey }),
"--json",
"--timeout",
"4000",
],
{
env: { ...process.env },
stdio: "ignore",
detached: true,
void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch(
() => {
// Best effort; don't let abort failures break the stop flow.
},
);
child.on("error", () => {});
// Let the abort process run independently — don't block on it.
child.unref();
} catch {
// Best-effort; don't let abort failures break the stop flow.
}
@ -421,6 +460,9 @@ export function startRun(params: {
_persistTimer: null,
_lastPersistedAt: 0,
lastGlobalSeq: 0,
_subscribeRetryTimer: null,
_subscribeRetryAttempt: 0,
_waitingFinalizeTimer: null,
};
activeRuns.set(sessionId, run);
@ -487,6 +529,9 @@ export function startSubscribeRun(params: {
isSubscribeOnly: true,
_lifecycleEnded: false,
_finalizeTimer: null,
_subscribeRetryTimer: null,
_subscribeRetryAttempt: 0,
_waitingFinalizeTimer: null,
};
activeRuns.set(sessionKey, run);
@ -510,6 +555,7 @@ function wireSubscribeOnlyProcess(
let currentTextId = "";
let currentReasoningId = "";
let currentStatusReasoningLabel: string | null = null;
let textStarted = false;
let reasoningStarted = false;
let statusReasoningActive = false;
@ -561,6 +607,7 @@ function wireSubscribeOnlyProcess(
reasoningStarted = false;
statusReasoningActive = false;
}
currentStatusReasoningLabel = null;
accReasoningIdx = -1;
};
@ -577,6 +624,9 @@ function wireSubscribeOnlyProcess(
};
const openStatusReasoning = (label: string) => {
if (statusReasoningActive && currentStatusReasoningLabel === label) {
return;
}
closeReasoning();
closeText();
currentReasoningId = nextId("status");
@ -584,9 +634,22 @@ function wireSubscribeOnlyProcess(
emit({ type: "reasoning-delta", id: currentReasoningId, delta: label });
reasoningStarted = true;
statusReasoningActive = true;
currentStatusReasoningLabel = label;
};
const processEvent = (ev: AgentEvent) => {
const isLifecycleEndEvent =
ev.event === "agent" &&
ev.stream === "lifecycle" &&
ev.data?.phase === "end";
if (!isLifecycleEndEvent) {
if (run._finalizeTimer) {
clearTimeout(run._finalizeTimer);
run._finalizeTimer = null;
}
run._lifecycleEnded = false;
}
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "start") {
openStatusReasoning("Preparing response...");
}
@ -619,6 +682,22 @@ function wireSubscribeOnlyProcess(
emit({ type: "text-delta", id: currentTextId, delta: chunk });
accAppendText(chunk);
}
const mediaUrls = ev.data?.mediaUrls;
if (Array.isArray(mediaUrls)) {
for (const url of mediaUrls) {
if (typeof url === "string" && url.trim()) {
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
emit({ type: "text-start", id: currentTextId });
textStarted = true;
}
const md = `\n![media](${url.trim()})\n`;
emit({ type: "text-delta", id: currentTextId, delta: md });
accAppendText(md);
}
}
}
if (typeof ev.data?.stopReason === "string" && ev.data.stopReason === "error" && typeof ev.data?.errorMessage === "string" && !agentErrorReported) {
agentErrorReported = true;
emitError(parseErrorBody(ev.data.errorMessage));
@ -644,6 +723,13 @@ function wireSubscribeOnlyProcess(
if (isError) {
const errorText = result?.text || (result?.details?.error as string | undefined) || "Tool execution failed";
emit({ type: "tool-output-error", toolCallId, errorText });
const idx = accToolMap.get(toolCallId);
if (idx !== undefined) {
const part = run.accumulated.parts[idx];
if (part.type === "tool-invocation") {
part.errorText = errorText;
}
}
} else {
const output = buildToolOutput(result);
emit({ type: "tool-output-available", toolCallId, output });
@ -662,12 +748,29 @@ function wireSubscribeOnlyProcess(
else if (phase === "end") {
if (statusReasoningActive) {
if (ev.data?.willRetry === true) {
emit({ type: "reasoning-delta", id: currentReasoningId, delta: "\nRetrying with compacted context..." });
const retryDelta = "\nRetrying with compacted context...";
emit({ type: "reasoning-delta", id: currentReasoningId, delta: retryDelta });
accAppendReasoning(retryDelta);
} else { closeReasoning(); }
}
}
}
if (ev.event === "chat") {
const finalText = extractAssistantTextFromChatPayload(ev.data);
if (finalText) {
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
emit({ type: "text-start", id: currentTextId });
textStarted = true;
}
emit({ type: "text-delta", id: currentTextId, delta: finalText });
accAppendText(finalText);
closeText();
}
}
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "end") {
closeReasoning();
closeText();
@ -676,7 +779,7 @@ function wireSubscribeOnlyProcess(
run._finalizeTimer = setTimeout(() => {
run._finalizeTimer = null;
if (run.status === "running") { finalizeSubscribeRun(run); }
}, 5_000);
}, SUBSCRIBE_LIFECYCLE_END_GRACE_MS);
}
if (ev.event === "agent" && ev.stream === "lifecycle" && ev.data?.phase === "error" && !agentErrorReported) {
@ -705,25 +808,31 @@ function wireSubscribeOnlyProcess(
if (gSeq <= run.lastGlobalSeq) { return; }
run.lastGlobalSeq = gSeq;
}
if ((run._subscribeRetryAttempt ?? 0) > 0) {
resetSubscribeRetryState(run);
}
processEvent(ev);
});
child.on("close", () => {
if (run._subscribeProcess === child) { run._subscribeProcess = null; }
if (run._subscribeProcess !== child) {
return;
}
run._subscribeProcess = null;
if (run.status !== "running") { return; }
if (run._lifecycleEnded) {
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
finalizeSubscribeRun(run);
return;
}
setTimeout(() => {
scheduleSubscribeRestart(run, () => {
if (run.status === "running" && !run._subscribeProcess) {
const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
run._subscribeProcess = newChild;
run.childProcess = newChild;
wireSubscribeOnlyProcess(run, newChild, sessionKey);
}
}, 300);
});
});
child.on("error", (err) => {
@ -740,6 +849,8 @@ function wireSubscribeOnlyProcess(
function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void {
if (run.status !== "running") { return; }
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
clearWaitingFinalizeTimer(run);
resetSubscribeRetryState(run);
run.status = status;
flushPersistence(run);
@ -860,10 +971,12 @@ function wireChildProcess(run: ActiveRun): void {
let currentTextId = "";
let currentReasoningId = "";
let currentStatusReasoningLabel: string | null = null;
let textStarted = false;
let reasoningStarted = false;
let everSentText = false;
let statusReasoningActive = false;
let waitingStatusAnnounced = false;
let agentErrorReported = false;
const stderrChunks: string[] = [];
@ -909,6 +1022,7 @@ function wireChildProcess(run: ActiveRun): void {
reasoningStarted = false;
statusReasoningActive = false;
}
currentStatusReasoningLabel = null;
accReasoningIdx = -1;
};
@ -930,6 +1044,9 @@ function wireChildProcess(run: ActiveRun): void {
};
const openStatusReasoning = (label: string) => {
if (statusReasoningActive && currentStatusReasoningLabel === label) {
return;
}
closeReasoning();
closeText();
currentReasoningId = nextId("status");
@ -941,6 +1058,7 @@ function wireChildProcess(run: ActiveRun): void {
});
reasoningStarted = true;
statusReasoningActive = true;
currentStatusReasoningLabel = label;
accAppendReasoning(label);
};
@ -956,6 +1074,22 @@ function wireChildProcess(run: ActiveRun): void {
everSentText = true;
};
const emitAssistantFinalText = (text: string) => {
if (!text) {
return;
}
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
emit({ type: "text-start", id: currentTextId });
textStarted = true;
}
everSentText = true;
emit({ type: "text-delta", id: currentTextId, delta: text });
accAppendText(text);
closeText();
};
// ── Parse stdout JSON lines ──
const rl = createInterface({ input: child.stdout! });
@ -1157,6 +1291,15 @@ function wireChildProcess(run: ActiveRun): void {
}
}
// Chat final events can include assistant turns from runs outside
// the original parent process (e.g. subagent announce follow-ups).
if (ev.event === "chat") {
const text = extractAssistantTextFromChatPayload(ev.data);
if (text) {
emitAssistantFinalText(text);
}
}
// Compaction
if (ev.event === "agent" && ev.stream === "compaction") {
const phase =
@ -1227,13 +1370,55 @@ function wireChildProcess(run: ActiveRun): void {
if (gSeq <= run.lastGlobalSeq) {return;}
run.lastGlobalSeq = gSeq;
}
const showWaitingStatus = () => {
if (!waitingStatusAnnounced) {
openStatusReasoning("Waiting for subagent results...");
waitingStatusAnnounced = true;
}
flushPersistence(run);
};
const scheduleWaitingCompletionCheck = () => {
clearWaitingFinalizeTimer(run);
run._waitingFinalizeTimer = setTimeout(() => {
run._waitingFinalizeTimer = null;
if (run.status !== "waiting-for-subagents") {
return;
}
if (hasRunningSubagentsForParent(run.sessionId)) {
showWaitingStatus();
return;
}
finalizeWaitingRun(run);
}, WAITING_FINALIZE_RECONCILE_MS);
};
// Any new parent event means waiting completion should be reconsidered
// from this point forward, not from a prior end/final checkpoint.
clearWaitingFinalizeTimer(run);
processParentEvent(ev);
if (ev.stream === "lifecycle" && ev.data?.phase === "end") {
if (hasRunningSubagentsForParent(run.sessionId)) {
openStatusReasoning("Waiting for subagent results...");
flushPersistence(run);
clearWaitingFinalizeTimer(run);
showWaitingStatus();
} else {
finalizeWaitingRun(run);
scheduleWaitingCompletionCheck();
}
}
if (ev.event === "chat") {
const payload = ev.data;
const state = typeof payload?.state === "string" ? payload.state : "";
const message = asRecord(payload?.message);
const role = typeof message?.role === "string" ? message.role : "";
if (state === "final" && role === "assistant") {
if (hasRunningSubagentsForParent(run.sessionId)) {
clearWaitingFinalizeTimer(run);
showWaitingStatus();
} else {
scheduleWaitingCompletionCheck();
}
}
}
};
@ -1315,7 +1500,10 @@ function wireChildProcess(run: ActiveRun): void {
if (exitedClean && hasRunningSubagents) {
run.status = "waiting-for-subagents";
openStatusReasoning("Waiting for subagent results...");
if (!waitingStatusAnnounced) {
openStatusReasoning("Waiting for subagent results...");
waitingStatusAnnounced = true;
}
flushPersistence(run);
startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent);
return;
@ -1399,20 +1587,24 @@ function startParentSubscribeStream(
if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
return;
}
if ((run._subscribeRetryAttempt ?? 0) > 0) {
resetSubscribeRetryState(run);
}
onEvent(ev);
});
child.on("close", () => {
if (run._subscribeProcess === child) {
run._subscribeProcess = null;
if (run._subscribeProcess !== child) {
return;
}
run._subscribeProcess = null;
if (run.status !== "waiting-for-subagents") {return;}
// If still waiting, restart subscribe stream from the latest cursor.
setTimeout(() => {
scheduleSubscribeRestart(run, () => {
if (run.status === "waiting-for-subagents" && !run._subscribeProcess) {
startParentSubscribeStream(run, parentSessionKey, onEvent);
}
}, 300);
});
});
child.on("error", (err) => {
@ -1425,6 +1617,8 @@ function startParentSubscribeStream(
}
function stopSubscribeProcess(run: ActiveRun): void {
clearSubscribeRetryTimer(run);
clearWaitingFinalizeTimer(run);
if (!run._subscribeProcess) {return;}
try {
run._subscribeProcess.kill("SIGTERM");
@ -1445,6 +1639,8 @@ function finalizeWaitingRun(run: ActiveRun): void {
if (run.status !== "waiting-for-subagents") {return;}
run.status = "completed";
clearWaitingFinalizeTimer(run);
resetSubscribeRetryState(run);
stopSubscribeProcess(run);
@ -1462,6 +1658,43 @@ function finalizeWaitingRun(run: ActiveRun): void {
}, CLEANUP_GRACE_MS);
}
function clearWaitingFinalizeTimer(run: ActiveRun): void {
if (!run._waitingFinalizeTimer) {
return;
}
clearTimeout(run._waitingFinalizeTimer);
run._waitingFinalizeTimer = null;
}
function clearSubscribeRetryTimer(run: ActiveRun): void {
if (!run._subscribeRetryTimer) {
return;
}
clearTimeout(run._subscribeRetryTimer);
run._subscribeRetryTimer = null;
}
function resetSubscribeRetryState(run: ActiveRun): void {
run._subscribeRetryAttempt = 0;
clearSubscribeRetryTimer(run);
}
function scheduleSubscribeRestart(run: ActiveRun, restart: () => void): void {
if (run._subscribeRetryTimer) {
return;
}
const attempt = run._subscribeRetryAttempt ?? 0;
const delay = Math.min(
SUBSCRIBE_RETRY_MAX_MS,
SUBSCRIBE_RETRY_BASE_MS * 2 ** attempt,
);
run._subscribeRetryAttempt = attempt + 1;
run._subscribeRetryTimer = setTimeout(() => {
run._subscribeRetryTimer = null;
restart();
}, delay);
}
// ── Debounced persistence ──
function schedulePersist(run: ActiveRun) {
@ -1566,6 +1799,7 @@ function cleanupRun(sessionId: string) {
const run = activeRuns.get(sessionId);
if (!run) {return;}
if (run._persistTimer) {clearTimeout(run._persistTimer);}
clearWaitingFinalizeTimer(run);
stopSubscribeProcess(run);
activeRuns.delete(sessionId);
}