web: refactor subagent-runs to use direct gateway subscription with interactive messaging
This commit is contained in:
parent
3ee2528b75
commit
7d762b2b75
@ -6,13 +6,15 @@
|
||||
*
|
||||
* Events are fed from CLI NDJSON streams (parent run + subscribe continuations).
|
||||
*/
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { type ChildProcess, spawn } from "node:child_process";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { createInterface } from "node:readline";
|
||||
import { existsSync, readFileSync, writeFileSync, mkdirSync, appendFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import {
|
||||
type AgentEvent,
|
||||
spawnAgentSubscribeProcess,
|
||||
resolvePackageRoot,
|
||||
extractToolResult,
|
||||
buildToolOutput,
|
||||
parseAgentErrorMessage,
|
||||
@ -68,8 +70,6 @@ type SubagentRegistry = {
|
||||
runs: Map<string, SubagentRun>;
|
||||
/** Reverse index: parent web session ID → subagent session keys */
|
||||
parentIndex: Map<string, Set<string>>;
|
||||
/** Pre-registration buffer: events that arrive before the subagent is registered */
|
||||
preRegBuffer: Map<string, AgentEvent[]>;
|
||||
};
|
||||
|
||||
function getRegistry(): SubagentRegistry {
|
||||
@ -81,7 +81,6 @@ function getRegistry(): SubagentRegistry {
|
||||
const registry: SubagentRegistry = {
|
||||
runs: new Map(),
|
||||
parentIndex: new Map(),
|
||||
preRegBuffer: new Map(),
|
||||
};
|
||||
(globalThis as Record<string, unknown>)[GLOBAL_KEY] = registry;
|
||||
return registry;
|
||||
@ -263,19 +262,9 @@ export function registerSubagent(
|
||||
endedAt: run.endedAt,
|
||||
});
|
||||
|
||||
// NOTE: We do NOT start subscribe child processes here. During live
|
||||
// streaming, events arrive via routeRawEvent() from the parent's NDJSON
|
||||
// stream. After the parent exits, activateGatewayFallback() subscribes.
|
||||
// For on-demand rehydration (page refresh), ensureSubagentStreamable()
|
||||
// handles the subscription.
|
||||
|
||||
// Replay any pre-registration buffered events (live sessions only)
|
||||
const buf = reg.preRegBuffer.get(info.sessionKey);
|
||||
if (buf && buf.length > 0) {
|
||||
for (const evt of buf) {
|
||||
handleAgentEvent(run, evt);
|
||||
}
|
||||
reg.preRegBuffer.delete(info.sessionKey);
|
||||
// Subagents are first-class sessions; subscribe immediately.
|
||||
if (run.status === "running") {
|
||||
startSubagentSubscribeStream(run);
|
||||
}
|
||||
}
|
||||
|
||||
@ -365,20 +354,6 @@ export function isSubagentRunning(sessionKey: string): boolean {
|
||||
* After that point the NDJSON routing is no longer available, so the
|
||||
* subscribe child streams become the only event source for orphaned subagents.
|
||||
*/
|
||||
export function activateGatewayFallback(): void {
|
||||
const reg = getRegistry();
|
||||
for (const [_key, run] of reg.runs) {
|
||||
if (run.status === "running" && !run._subscribeProcess) {
|
||||
startSubagentSubscribeStream(run);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if any subagents spawned by this parent web session are still running.
|
||||
* Cross-checks with the on-disk registry to reconcile subagents whose
|
||||
* lifecycle "end" events were missed (e.g. during gateway WS handshake).
|
||||
*/
|
||||
export function hasRunningSubagentsForParent(parentWebSessionId: string): boolean {
|
||||
const reg = getRegistry();
|
||||
const keys = reg.parentIndex.get(parentWebSessionId);
|
||||
@ -408,42 +383,128 @@ export function getRunningSubagentKeys(): string[] {
|
||||
return keys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Route a raw NDJSON agent event (from the CLI child process stdout) to the
|
||||
* appropriate subagent run. This is the primary event source -- the parent
|
||||
* agent's CLI process already receives all gateway broadcasts, so we piggyback
|
||||
* on its NDJSON stream instead of maintaining a separate WebSocket connection.
|
||||
*
|
||||
* Routes a flat NDJSON event shape to the subagent transformer.
|
||||
*/
|
||||
export function routeRawEvent(
|
||||
export function persistUserMessage(
|
||||
sessionKey: string,
|
||||
ev: { event: string; stream?: string; data?: Record<string, unknown>; globalSeq?: number },
|
||||
): void {
|
||||
const sourceEvent: AgentEvent = {
|
||||
event: ev.event,
|
||||
sessionKey,
|
||||
stream: ev.stream,
|
||||
data: ev.data,
|
||||
globalSeq: ev.globalSeq,
|
||||
};
|
||||
|
||||
msg: { id?: string; text: string },
|
||||
): boolean {
|
||||
const run = getRegistry().runs.get(sessionKey);
|
||||
if (run) {
|
||||
handleAgentEvent(run, sourceEvent);
|
||||
return;
|
||||
if (!run) {return false;}
|
||||
const event: SseEvent = {
|
||||
type: "user-message",
|
||||
id: msg.id ?? `user-${Date.now()}-${Math.random().toString(36).slice(2)}`,
|
||||
text: msg.text,
|
||||
};
|
||||
run.eventBuffer.push(event);
|
||||
persistEvent(sessionKey, event);
|
||||
for (const sub of run.subscribers) {
|
||||
try { sub(event); } catch { /* ignore */ }
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Buffer events that arrive before the subagent is registered
|
||||
// (runs.json may not be written yet). These are replayed on registration.
|
||||
const reg = getRegistry();
|
||||
let buf = reg.preRegBuffer.get(sessionKey);
|
||||
if (!buf) {
|
||||
buf = [];
|
||||
reg.preRegBuffer.set(sessionKey, buf);
|
||||
export function reactivateSubagent(sessionKey: string): boolean {
|
||||
const run = getRegistry().runs.get(sessionKey);
|
||||
if (!run) {return false;}
|
||||
if (run._cleanupTimer) {
|
||||
clearTimeout(run._cleanupTimer);
|
||||
run._cleanupTimer = null;
|
||||
}
|
||||
if (buf.length < 10_000) {
|
||||
buf.push(sourceEvent);
|
||||
run.status = "running";
|
||||
run.endedAt = undefined;
|
||||
upsertSubagentIndex(run.sessionKey, {
|
||||
runId: run.runId,
|
||||
parentWebSessionId: run.parentWebSessionId,
|
||||
task: run.task,
|
||||
label: run.label,
|
||||
status: run.status,
|
||||
startedAt: run.startedAt,
|
||||
endedAt: run.endedAt,
|
||||
});
|
||||
startSubagentSubscribeStream(run);
|
||||
return true;
|
||||
}
|
||||
|
||||
function sendGatewayAbortForSubagent(sessionKey: string): void {
|
||||
try {
|
||||
const root = resolvePackageRoot();
|
||||
const devScript = join(root, "scripts", "run-node.mjs");
|
||||
const prodScript = join(root, "openclaw.mjs");
|
||||
const scriptPath = existsSync(devScript) ? devScript : prodScript;
|
||||
const child = spawn(
|
||||
"node",
|
||||
[
|
||||
scriptPath,
|
||||
"gateway",
|
||||
"call",
|
||||
"chat.abort",
|
||||
"--params",
|
||||
JSON.stringify({ sessionKey }),
|
||||
"--json",
|
||||
"--timeout",
|
||||
"4000",
|
||||
],
|
||||
{
|
||||
cwd: root,
|
||||
env: { ...process.env },
|
||||
stdio: "ignore",
|
||||
detached: true,
|
||||
},
|
||||
);
|
||||
child.unref();
|
||||
} catch {
|
||||
// best effort
|
||||
}
|
||||
}
|
||||
|
||||
export function abortSubagent(sessionKey: string): boolean {
|
||||
const run = getRegistry().runs.get(sessionKey);
|
||||
if (!run || run.status !== "running") {return false;}
|
||||
sendGatewayAbortForSubagent(sessionKey);
|
||||
finalizeRun(run, "error");
|
||||
return true;
|
||||
}
|
||||
|
||||
export function spawnSubagentMessage(sessionKey: string, message: string): boolean {
|
||||
try {
|
||||
const run = getRegistry().runs.get(sessionKey);
|
||||
if (!run) {return false;}
|
||||
const root = resolvePackageRoot();
|
||||
const devScript = join(root, "scripts", "run-node.mjs");
|
||||
const prodScript = join(root, "openclaw.mjs");
|
||||
const scriptPath = existsSync(devScript) ? devScript : prodScript;
|
||||
const idempotencyKey = randomUUID();
|
||||
const child = spawn(
|
||||
"node",
|
||||
[
|
||||
scriptPath,
|
||||
"gateway",
|
||||
"call",
|
||||
"agent",
|
||||
"--params",
|
||||
JSON.stringify({
|
||||
message,
|
||||
sessionKey,
|
||||
idempotencyKey,
|
||||
deliver: false,
|
||||
channel: "webchat",
|
||||
lane: "subagent",
|
||||
timeout: 0,
|
||||
}),
|
||||
"--json",
|
||||
"--timeout",
|
||||
"10000",
|
||||
],
|
||||
{
|
||||
cwd: root,
|
||||
env: { ...process.env },
|
||||
stdio: "ignore",
|
||||
detached: true,
|
||||
},
|
||||
);
|
||||
child.unref();
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user