fix(agent-runner): ensure tool events stream on first chat message

sessions.patch for verboseLevel=full was called before the agent RPC
created the session, so it silently failed on new chats. Tool events
were never emitted and the frontend only showed brief text responses.

Now patches both before (for existing sessions) and after the agent RPC
(for newly created sessions). Also adds SSE keepalive to the POST /api/chat
stream to prevent connection drops during long tool executions, and removes
the unused legacy CLI spawn codepath.
This commit is contained in:
kumarabhirup 2026-03-06 23:30:31 -08:00
parent eaef8df20b
commit 36e9cf9517
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
5 changed files with 256 additions and 459 deletions

View File

@ -134,6 +134,7 @@ export async function POST(req: Request) {
const encoder = new TextEncoder();
let closed = false;
let unsubscribe: (() => void) | null = null;
let keepalive: ReturnType<typeof setInterval> | null = null;
const stream = new ReadableStream({
start(controller) {
@ -142,12 +143,20 @@ export async function POST(req: Request) {
return;
}
keepalive = setInterval(() => {
if (closed) {return;}
try {
controller.enqueue(encoder.encode(": keepalive\n\n"));
} catch { /* ignore enqueue errors on closed stream */ }
}, 15_000);
unsubscribe = subscribeToRun(
runKey,
(event: SseEvent | null) => {
if (closed) {return;}
if (event === null) {
closed = true;
if (keepalive) { clearInterval(keepalive); keepalive = null; }
try { controller.close(); } catch { /* already closed */ }
return;
}
@ -164,11 +173,13 @@ export async function POST(req: Request) {
if (!unsubscribe) {
closed = true;
if (keepalive) { clearInterval(keepalive); keepalive = null; }
controller.close();
}
},
cancel() {
closed = true;
if (keepalive) { clearInterval(keepalive); keepalive = null; }
unsubscribe?.();
},
});

View File

@ -1098,16 +1098,238 @@ describe("active-runs", () => {
// ── multiple concurrent runs ─────────────────────────────────────
describe("multiple concurrent runs", () => {
let concurrentCounter = 0;
async function setupConcurrent() {
concurrentCounter += 1;
const prefix = `conc-${concurrentCounter}`;
const childA = createMockChild();
const childB = createMockChild();
const { spawnAgentProcess } = await import("./agent-runner.js");
vi.mocked(spawnAgentProcess)
.mockReturnValueOnce(childA as unknown as ChildProcess)
.mockReturnValueOnce(childB as unknown as ChildProcess);
const mod = await import("./active-runs.js");
return { childA, childB, prefix, ...mod };
}
it("tracks multiple sessions independently", async () => {
const { startRun, hasActiveRun, getActiveRun } = await setup();
const { childA, childB, prefix, startRun, abortRun, hasActiveRun, getActiveRun } =
await setupConcurrent();
startRun({ sessionId: "s-a", message: "first", agentSessionId: "s-a" });
startRun({ sessionId: "s-b", message: "second", agentSessionId: "s-b" });
const idA = `${prefix}-track-a`;
const idB = `${prefix}-track-b`;
expect(hasActiveRun("s-a")).toBe(true);
expect(hasActiveRun("s-b")).toBe(true);
expect(getActiveRun("s-a")?.status).toBe("running");
expect(getActiveRun("s-b")?.status).toBe("running");
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
expect(hasActiveRun(idA)).toBe(true);
expect(hasActiveRun(idB)).toBe(true);
expect(getActiveRun(idA)?.status).toBe("running");
expect(getActiveRun(idB)?.status).toBe("running");
abortRun(idA);
abortRun(idB);
});
it("delivers events to the correct session without cross-contamination", async () => {
const { childA, childB, prefix, startRun, abortRun, subscribeToRun } =
await setupConcurrent();
const idA = `${prefix}-iso-a`;
const idB = `${prefix}-iso-b`;
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
const eventsA: SseEvent[] = [];
const eventsB: SseEvent[] = [];
subscribeToRun(idA, (e) => { if (e) eventsA.push(e); }, { replay: false });
subscribeToRun(idB, (e) => { if (e) eventsB.push(e); }, { replay: false });
childA._writeLine({
event: "agent", stream: "assistant",
data: { delta: "Hello from A" },
});
childB._writeLine({
event: "agent", stream: "assistant",
data: { delta: "Hello from B" },
});
await new Promise((r) => setTimeout(r, 50));
expect(eventsA.some((e) => e.type === "text-delta" && e.delta === "Hello from A")).toBe(true);
expect(eventsA.some((e) => e.type === "text-delta" && e.delta === "Hello from B")).toBe(false);
expect(eventsB.some((e) => e.type === "text-delta" && e.delta === "Hello from B")).toBe(true);
expect(eventsB.some((e) => e.type === "text-delta" && e.delta === "Hello from A")).toBe(false);
childA.stdout.end();
childB.stdout.end();
await new Promise((r) => setTimeout(r, 50));
childA._emit("close", 0);
childB._emit("close", 0);
});
it("completing one session does not affect the other", async () => {
const { childA, childB, prefix, startRun, abortRun, hasActiveRun, getActiveRun } =
await setupConcurrent();
const idA = `${prefix}-comp-a`;
const idB = `${prefix}-comp-b`;
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
childA.stdout.end();
await new Promise((r) => setTimeout(r, 50));
childA._emit("close", 0);
expect(hasActiveRun(idA)).toBe(false);
expect(getActiveRun(idA)?.status).toBe("completed");
expect(hasActiveRun(idB)).toBe(true);
expect(getActiveRun(idB)?.status).toBe("running");
childB.stdout.end();
await new Promise((r) => setTimeout(r, 50));
childB._emit("close", 0);
});
it("aborting one session does not affect the other", async () => {
const { prefix, startRun, abortRun, hasActiveRun, getActiveRun } =
await setupConcurrent();
const idA = `${prefix}-abt-a`;
const idB = `${prefix}-abt-b`;
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
abortRun(idA);
expect(hasActiveRun(idA)).toBe(false);
expect(getActiveRun(idA)?.status).toBe("error");
expect(hasActiveRun(idB)).toBe(true);
expect(getActiveRun(idB)?.status).toBe("running");
abortRun(idB);
});
it("session B can still receive events after session A completes", async () => {
const { childA, childB, prefix, startRun, subscribeToRun, hasActiveRun } =
await setupConcurrent();
const idA = `${prefix}-cont-a`;
const idB = `${prefix}-cont-b`;
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
const eventsB: SseEvent[] = [];
subscribeToRun(idB, (e) => { if (e) eventsB.push(e); }, { replay: false });
childA.stdout.end();
await new Promise((r) => setTimeout(r, 50));
childA._emit("close", 0);
expect(hasActiveRun(idA)).toBe(false);
childB._writeLine({
event: "agent", stream: "assistant",
data: { delta: "Still running on B" },
});
await new Promise((r) => setTimeout(r, 50));
expect(eventsB.some(
(e) => e.type === "text-delta" && e.delta === "Still running on B",
)).toBe(true);
childB.stdout.end();
await new Promise((r) => setTimeout(r, 50));
childB._emit("close", 0);
});
it("both sessions can stream tools concurrently", async () => {
const { childA, childB, prefix, startRun, subscribeToRun } =
await setupConcurrent();
const idA = `${prefix}-tool-a`;
const idB = `${prefix}-tool-b`;
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
const eventsA: SseEvent[] = [];
const eventsB: SseEvent[] = [];
subscribeToRun(idA, (e) => { if (e) eventsA.push(e); }, { replay: false });
subscribeToRun(idB, (e) => { if (e) eventsB.push(e); }, { replay: false });
childA._writeLine({
event: "agent", stream: "tool",
data: { phase: "start", toolCallId: "tc-a-1", name: "search", args: { q: "query A" } },
});
childB._writeLine({
event: "agent", stream: "tool",
data: { phase: "start", toolCallId: "tc-b-1", name: "browser", args: { url: "example.com" } },
});
await new Promise((r) => setTimeout(r, 50));
expect(eventsA.some(
(e) => e.type === "tool-input-start" && e.toolCallId === "tc-a-1",
)).toBe(true);
expect(eventsA.some(
(e) => e.type === "tool-input-start" && e.toolCallId === "tc-b-1",
)).toBe(false);
expect(eventsB.some(
(e) => e.type === "tool-input-start" && e.toolCallId === "tc-b-1",
)).toBe(true);
expect(eventsB.some(
(e) => e.type === "tool-input-start" && e.toolCallId === "tc-a-1",
)).toBe(false);
childA.stdout.end();
childB.stdout.end();
await new Promise((r) => setTimeout(r, 50));
childA._emit("close", 0);
childB._emit("close", 0);
});
it("duplicate run is rejected per-session, not globally", async () => {
const { prefix, startRun, abortRun, hasActiveRun } =
await setupConcurrent();
const { spawnAgentProcess } = await import("./agent-runner.js");
const idA = `${prefix}-dup-a`;
const idB = `${prefix}-dup-b`;
const idC = `${prefix}-dup-c`;
startRun({ sessionId: idA, message: "first", agentSessionId: idA });
startRun({ sessionId: idB, message: "second", agentSessionId: idB });
expect(hasActiveRun(idA)).toBe(true);
expect(hasActiveRun(idB)).toBe(true);
const childC = createMockChild();
vi.mocked(spawnAgentProcess).mockReturnValueOnce(
childC as unknown as ChildProcess,
);
expect(() =>
startRun({ sessionId: idC, message: "third", agentSessionId: idC }),
).not.toThrow();
expect(hasActiveRun(idC)).toBe(true);
expect(() =>
startRun({ sessionId: idA, message: "dupe", agentSessionId: idA }),
).toThrow("Active run already exists");
abortRun(idA);
abortRun(idB);
abortRun(idC);
});
});

View File

@ -1,21 +1,10 @@
import { spawn, type ChildProcess } from "node:child_process";
import { EventEmitter } from "node:events";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
vi.mock("node:child_process", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:child_process")>();
return {
...actual,
spawn: vi.fn(),
};
});
vi.mock("./workspace", () => ({
resolveActiveAgentId: () => "main",
getEffectiveProfile: () => undefined,
resolveWorkspaceRoot: () => undefined,
resolveOpenClawStateDir: () => "/tmp/__agent_runner_test_state",
}));
const spawnMock = vi.mocked(spawn);
// Valid client IDs the Gateway accepts (from ui/src/ui/contracts/gateway-client-info.ts).
// Hardcoded here so the test breaks if our code drifts from the Gateway's enum.
@ -145,39 +134,6 @@ async function waitFor(
throw new Error("Condition not met in waitFor");
}
/** Minimal mock ChildProcess for legacy CLI tests. */
function mockChildProcess() {
const events: Record<string, ((...args: unknown[]) => void)[]> = {};
const child = {
exitCode: null as number | null,
killed: false,
pid: 12345,
stdout: {
on: vi.fn(),
[Symbol.asyncIterator]: vi.fn(),
},
stderr: { on: vi.fn() },
on: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
events[event] = events[event] || [];
events[event].push(cb);
return child;
}),
once: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
events[event] = events[event] || [];
events[event].push(cb);
return child;
}),
kill: vi.fn(),
_emit(event: string, ...args: unknown[]) {
for (const cb of events[event] || []) {
cb(...args);
}
},
};
spawnMock.mockReturnValue(child as unknown as ChildProcess);
return child;
}
describe("agent-runner", () => {
const originalEnv = { ...process.env };
@ -185,13 +141,6 @@ describe("agent-runner", () => {
vi.resetModules();
vi.restoreAllMocks();
process.env = { ...originalEnv };
vi.mock("node:child_process", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:child_process")>();
return {
...actual,
spawn: vi.fn(),
};
});
});
afterEach(() => {
@ -278,7 +227,6 @@ describe("agent-runner", () => {
describe("spawnAgentProcess", () => {
it("connects via ws module with Origin header matching the gateway URL (prevents origin rejection)", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("hello", "sess-1");
@ -304,7 +252,6 @@ describe("agent-runner", () => {
it("sets wss: origin to https: (prevents origin mismatch on TLS gateways)", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
process.env.OPENCLAW_GATEWAY_URL = "wss://gateway.example.com:443";
const { spawnAgentProcess } = await import("./agent-runner.js");
@ -319,7 +266,6 @@ describe("agent-runner", () => {
it("falls back to config gateway port when env port is stale", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
process.env.OPENCLAW_HOME = "/tmp/__ironclaw_agent_runner_test_no_config";
process.env.OPENCLAW_GATEWAY_PORT = "19001";
MockWs.failOpenForUrls.add("ws://127.0.0.1:19001/");
@ -347,66 +293,11 @@ describe("agent-runner", () => {
proc.kill("SIGTERM");
});
it("does not use child_process.spawn for WebSocket transport", async () => {
installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawn: mockSpawn } = await import("node:child_process");
vi.mocked(mockSpawn).mockClear();
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("msg");
await new Promise((r) => setTimeout(r, 50));
expect(vi.mocked(mockSpawn)).not.toHaveBeenCalled();
proc.kill("SIGTERM");
});
it("falls back to CLI spawn when DENCHCLAW_WEB_FORCE_LEGACY_STREAM is set", async () => {
process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM = "1";
const { spawn: mockSpawn } = await import("node:child_process");
const child = mockChildProcess();
vi.mocked(mockSpawn).mockReturnValue(child as unknown as ChildProcess);
const { spawnAgentProcess } = await import("./agent-runner.js");
spawnAgentProcess("hello");
expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith(
"openclaw",
expect.arrayContaining(["agent", "--agent", "main", "--message", "hello", "--stream-json"]),
expect.objectContaining({
stdio: ["ignore", "pipe", "pipe"],
}),
);
});
it("includes session-key and lane args in legacy CLI mode", async () => {
process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM = "1";
const { spawn: mockSpawn } = await import("node:child_process");
const child = mockChildProcess();
vi.mocked(mockSpawn).mockReturnValue(child as unknown as ChildProcess);
const { spawnAgentProcess } = await import("./agent-runner.js");
spawnAgentProcess("msg", "session-123");
expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith(
"openclaw",
expect.arrayContaining([
"--session-key",
"agent:main:web:session-123",
"--lane",
"web",
"--channel",
"webchat",
]),
expect.anything(),
);
});
});
describe("spawnAgentSubscribeProcess", () => {
it("subscribes via connect -> sessions.patch -> agent.subscribe", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const proc = spawnAgentSubscribeProcess("agent:main:web:sess-sub", 12);
@ -433,7 +324,6 @@ describe("agent-runner", () => {
it("uses payload.globalSeq (not frame seq) for cursor filtering", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const proc = spawnAgentSubscribeProcess("agent:main:web:sess-gseq", 5);
@ -496,7 +386,6 @@ describe("agent-runner", () => {
it("keeps subscribe workers alive across lifecycle end events", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const proc = spawnAgentSubscribeProcess("agent:main:web:sess-sticky", 0);
@ -552,7 +441,6 @@ describe("agent-runner", () => {
it("drops subscribe events missing a matching session key", async () => {
const MockWs = installMockWsModule();
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const proc = spawnAgentSubscribeProcess("agent:main:web:sess-filter", 0);
@ -609,7 +497,6 @@ describe("agent-runner", () => {
ok: false,
error: { message: "unknown method: agent.subscribe" },
});
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const proc = spawnAgentSubscribeProcess("agent:main:web:sess-passive", 0);
@ -650,7 +537,6 @@ describe("agent-runner", () => {
ok: false,
error: { message: "unknown method: agent.subscribe" },
});
delete process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM;
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const first = spawnAgentSubscribeProcess("agent:main:web:sess-cache", 0);

View File

@ -1,16 +1,12 @@
import { spawn } from "node:child_process";
import { randomUUID } from "node:crypto";
import { EventEmitter } from "node:events";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { createInterface } from "node:readline";
import { PassThrough } from "node:stream";
import NodeWebSocket from "ws";
import {
getEffectiveProfile,
resolveActiveAgentId,
resolveOpenClawStateDir,
resolveWorkspaceRoot,
} from "./workspace";
export type AgentEvent = {
@ -35,38 +31,6 @@ export type ToolResult = {
details?: Record<string, unknown>;
};
export type AgentCallback = {
onTextDelta: (delta: string) => void;
onThinkingDelta: (delta: string) => void;
onToolStart: (
toolCallId: string,
toolName: string,
args?: Record<string, unknown>,
) => void;
onToolEnd: (
toolCallId: string,
toolName: string,
isError: boolean,
result?: ToolResult,
) => void;
/** Called when the agent run is picked up and starts executing. */
onLifecycleStart?: () => void;
onLifecycleEnd: () => void;
/** Called when session auto-compaction begins. */
onCompactionStart?: () => void;
/** Called when session auto-compaction finishes. */
onCompactionEnd?: (willRetry: boolean) => void;
/** Called when a running tool emits a progress update. */
onToolUpdate?: (
toolCallId: string,
toolName: string,
) => void;
onError: (error: Error) => void;
onClose: (code: number | null) => void;
/** Called when the agent encounters an API or runtime error (402, rate limit, etc.) */
onAgentError?: (message: string) => void;
};
/**
* Extract text content from the agent's tool result object.
* The result has `content: Array<{ type: "text", text: string } | ...>` and
@ -115,11 +79,6 @@ export function extractToolResult(
return { text, details };
}
export type RunAgentOptions = {
/** When set, the agent runs in an isolated web chat session. */
sessionId?: string;
};
export type AgentProcessHandle = {
stdout: NodeJS.ReadableStream | null;
stderr: NodeJS.ReadableStream | null;
@ -630,11 +589,13 @@ class GatewayProcessHandle
throw new Error(frameErrorMessage(connectRes));
}
if (this.params.sessionKey) {
await this.ensureFullToolVerbose(this.params.sessionKey);
}
if (this.params.mode === "start") {
// Pre-patch verbose for existing sessions (best-effort; new
// sessions don't exist yet so this may fail — we retry below).
if (this.params.sessionKey) {
await this.ensureFullToolVerbose(this.params.sessionKey);
}
const sessionKey = this.params.sessionKey;
const startRes = await this.client.request("agent", {
message: this.params.message ?? "",
@ -652,11 +613,19 @@ class GatewayProcessHandle
const runId =
payload && typeof payload.runId === "string" ? payload.runId : null;
this.runId = runId;
// Retry verbose patch now that the agent RPC has created the
// session. This is the critical path for first-message-in-chat
// where the pre-patch above failed.
if (sessionKey) {
await this.ensureFullToolVerbose(sessionKey);
}
} else {
const sessionKey = this.params.sessionKey;
if (!sessionKey) {
throw new Error("Missing session key for subscribe mode");
}
await this.ensureFullToolVerbose(sessionKey);
if (cachedAgentSubscribeSupport !== "unsupported") {
const subscribeRes = await this.client.request("agent.subscribe", {
sessionKey,
@ -879,11 +848,6 @@ class GatewayProcessHandle
}
}
function shouldForceLegacyStream(): boolean {
const raw = process.env.DENCHCLAW_WEB_FORCE_LEGACY_STREAM?.trim().toLowerCase();
return raw === "1" || raw === "true" || raw === "yes";
}
export async function callGatewayRpc(
method: string,
params?: Record<string, unknown>,
@ -919,16 +883,12 @@ export async function callGatewayRpc(
}
/**
* Spawn an agent child process and return the ChildProcess handle.
* Shared between `runAgent` (legacy callback API) and the ActiveRunManager.
* Start an agent run via the Gateway WebSocket and return a process handle.
*/
export function spawnAgentProcess(
message: string,
agentSessionId?: string,
): AgentProcessHandle {
if (shouldForceLegacyStream()) {
return spawnLegacyAgentProcess(message, agentSessionId);
}
const agentId = resolveActiveAgentId();
const sessionKey = agentSessionId
? `agent:${agentId}:web:${agentSessionId}`
@ -941,44 +901,6 @@ export function spawnAgentProcess(
});
}
function spawnLegacyAgentProcess(
message: string,
agentSessionId?: string,
): ReturnType<typeof spawn> {
return spawnCliAgentProcess(message, agentSessionId);
}
function spawnCliAgentProcess(
message: string,
agentSessionId?: string,
): ReturnType<typeof spawn> {
const cliAgentId = resolveActiveAgentId();
const args = [
"agent",
"--agent",
cliAgentId,
"--message",
message,
"--stream-json",
];
if (agentSessionId) {
const sessionKey = `agent:${cliAgentId}:web:${agentSessionId}`;
args.push("--session-key", sessionKey, "--lane", "web", "--channel", "webchat");
}
const profile = getEffectiveProfile();
const workspace = resolveWorkspaceRoot();
return spawn("openclaw", args, {
env: {
...process.env,
...(profile ? { OPENCLAW_PROFILE: profile } : {}),
...(workspace ? { OPENCLAW_WORKSPACE: workspace } : {}),
},
stdio: ["ignore", "pipe", "pipe"],
});
}
/**
* Spawn a subscribe-only agent child process that tails a session key's events.
* Uses the same runtime/env wiring as spawnAgentProcess.
@ -987,9 +909,6 @@ export function spawnAgentSubscribeProcess(
sessionKey: string,
afterSeq = 0,
): AgentProcessHandle {
if (shouldForceLegacyStream()) {
return spawnLegacyAgentSubscribeProcess(sessionKey, afterSeq);
}
return new GatewayProcessHandle({
mode: "subscribe",
sessionKey,
@ -1015,31 +934,6 @@ export function spawnAgentStartForSession(
});
}
function spawnLegacyAgentSubscribeProcess(
sessionKey: string,
afterSeq = 0,
): ReturnType<typeof spawn> {
const args = [
"agent",
"--stream-json",
"--subscribe-session-key",
sessionKey,
"--after-seq",
String(Math.max(0, Number.isFinite(afterSeq) ? afterSeq : 0)),
];
const profile = getEffectiveProfile();
const workspace = resolveWorkspaceRoot();
return spawn("openclaw", args, {
env: {
...process.env,
...(profile ? { OPENCLAW_PROFILE: profile } : {}),
...(workspace ? { OPENCLAW_WORKSPACE: workspace } : {}),
},
stdio: ["ignore", "pipe", "pipe"],
});
}
/**
* Build a flat output object from the agent's tool result so the frontend
* can render tool output text, exit codes, etc.
@ -1074,222 +968,6 @@ export function buildToolOutput(
return out;
}
/**
* Spawn the openclaw agent and stream its output.
* Pass an AbortSignal to kill the child process when the caller cancels.
*
* When `options.sessionId` is set the child process gets `--session-id <id>`,
* which creates an isolated agent session that won't interfere with the main
* agent or other sidebar chats.
*/
export async function runAgent(
message: string,
signal: AbortSignal | undefined,
callback: AgentCallback,
options?: RunAgentOptions,
): Promise<void> {
return new Promise<void>((resolve) => {
const child = spawnAgentProcess(message, options?.sessionId);
// Kill the child process if the caller aborts (e.g. user hit stop).
if (signal) {
const onAbort = () => child.kill("SIGTERM");
if (signal.aborted) {
child.kill("SIGTERM");
} else {
signal.addEventListener("abort", onAbort, { once: true });
child.on("close", () =>
signal.removeEventListener("abort", onAbort),
);
}
}
// Collect stderr so we can surface errors to the UI
const stderrChunks: string[] = [];
let agentErrorReported = false;
const rl = createInterface({ input: child.stdout! });
// Prevent unhandled 'error' events when the child process fails
// to start (e.g. ENOENT). The child's own 'error' handler below
// surfaces the real error to the caller.
rl.on("error", () => { /* handled by child error/close */ });
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
let event: AgentEvent;
try {
event = JSON.parse(line) as AgentEvent;
} catch {
console.log("[agent-runner] Non-JSON line:", line);
return; // skip non-JSON lines
}
// Handle assistant text deltas
if (event.event === "agent" && event.stream === "assistant") {
const delta =
typeof event.data?.delta === "string"
? event.data.delta
: undefined;
if (delta) {
callback.onTextDelta(delta);
}
// Forward media URLs (images, files generated by the agent)
const mediaUrls = event.data?.mediaUrls;
if (Array.isArray(mediaUrls)) {
for (const url of mediaUrls) {
if (typeof url === "string" && url.trim()) {
callback.onTextDelta(`\n![media](${url.trim()})\n`);
}
}
}
}
// Handle thinking/reasoning deltas
if (event.event === "agent" && event.stream === "thinking") {
const delta =
typeof event.data?.delta === "string"
? event.data.delta
: undefined;
if (delta) {
callback.onThinkingDelta(delta);
}
}
// Handle tool execution events
if (event.event === "agent" && event.stream === "tool") {
const phase =
typeof event.data?.phase === "string"
? event.data.phase
: undefined;
const toolCallId =
typeof event.data?.toolCallId === "string"
? event.data.toolCallId
: "";
const toolName =
typeof event.data?.name === "string"
? event.data.name
: "";
if (phase === "start") {
const args =
event.data?.args &&
typeof event.data.args === "object"
? (event.data.args as Record<string, unknown>)
: undefined;
callback.onToolStart(toolCallId, toolName, args);
} else if (phase === "update") {
callback.onToolUpdate?.(toolCallId, toolName);
} else if (phase === "result") {
const isError = event.data?.isError === true;
const result = extractToolResult(event.data?.result);
callback.onToolEnd(toolCallId, toolName, isError, result);
}
}
// Handle lifecycle start
if (
event.event === "agent" &&
event.stream === "lifecycle" &&
event.data?.phase === "start"
) {
callback.onLifecycleStart?.();
}
// Handle lifecycle end
if (
event.event === "agent" &&
event.stream === "lifecycle" &&
event.data?.phase === "end"
) {
callback.onLifecycleEnd();
}
// Handle session compaction events
if (event.event === "agent" && event.stream === "compaction") {
const phase =
typeof event.data?.phase === "string"
? event.data.phase
: undefined;
if (phase === "start") {
callback.onCompactionStart?.();
} else if (phase === "end") {
const willRetry = event.data?.willRetry === true;
callback.onCompactionEnd?.(willRetry);
}
}
// ── Surface agent-level errors (API 402, rate limits, etc.) ──
// Lifecycle error phase
if (
event.event === "agent" &&
event.stream === "lifecycle" &&
event.data?.phase === "error"
) {
const msg = parseAgentErrorMessage(event.data);
if (msg && !agentErrorReported) {
agentErrorReported = true;
callback.onAgentError?.(msg);
}
}
// Top-level error events
if (event.event === "error") {
const msg = parseAgentErrorMessage(event.data ?? event);
if (msg && !agentErrorReported) {
agentErrorReported = true;
callback.onAgentError?.(msg);
}
}
// Messages with stopReason "error" (some agents inline errors this way)
if (
event.event === "agent" &&
event.stream === "assistant" &&
typeof event.data?.stopReason === "string" &&
event.data.stopReason === "error" &&
typeof event.data?.errorMessage === "string"
) {
if (!agentErrorReported) {
agentErrorReported = true;
callback.onAgentError?.(
parseErrorBody(event.data.errorMessage),
);
}
}
});
child.on("close", (code) => {
// If no error was reported yet, check stderr for useful info
if (!agentErrorReported && stderrChunks.length > 0) {
const stderr = stderrChunks.join("").trim();
const msg = parseErrorFromStderr(stderr);
if (msg) {
agentErrorReported = true;
callback.onAgentError?.(msg);
}
}
callback.onClose(code);
resolve();
});
child.on("error", (err) => {
const error = err instanceof Error ? err : new Error(String(err));
callback.onError(error);
resolve();
});
// Capture stderr for debugging + error surfacing
child.stderr?.on("data", (chunk: Buffer) => {
const text = chunk.toString();
stderrChunks.push(text);
console.error("[denchclaw stderr]", text);
});
});
}
// ── Error message extraction helpers ──
/**

File diff suppressed because one or more lines are too long