diff --git a/apps/web/lib/subagent-runs.test.ts b/apps/web/lib/subagent-runs.test.ts index 1baf4912733..a65797f4659 100644 --- a/apps/web/lib/subagent-runs.test.ts +++ b/apps/web/lib/subagent-runs.test.ts @@ -38,14 +38,84 @@ vi.mock("node:os", () => ({ })); vi.mock("node:readline", () => ({ - createInterface: vi.fn(() => ({ + createInterface: vi.fn(() => { + const handlers: Record void)[]> = {}; + const iface = { + on: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + handlers[event] = handlers[event] || []; + handlers[event].push(cb); + return iface; + }), + close: vi.fn(), + _emit: (event: string, ...args: unknown[]) => { + for (const cb of handlers[event] || []) { + cb(...args); + } + }, + }; + return iface; + }), +})); + +vi.mock("./agent-runner", () => ({ + spawnAgentSubscribeProcess: vi.fn(() => ({ + stdout: { on: vi.fn() }, + stderr: { on: vi.fn() }, on: vi.fn(), - close: vi.fn(), + once: vi.fn(), + kill: vi.fn(), + pid: 12345, })), + callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })), + extractToolResult: vi.fn((raw: unknown) => { + if (!raw) {return undefined;} + if (typeof raw === "string") {return { text: raw };} + return { text: undefined, details: raw as Record }; + }), + buildToolOutput: vi.fn((result?: { text?: string }) => + result ? { text: result.text } : {}, + ), + parseAgentErrorMessage: vi.fn((data?: Record) => { + if (data?.error && typeof data.error === "string") {return data.error;} + if (data?.message && typeof data.message === "string") {return data.message;} + return undefined; + }), + parseErrorBody: vi.fn((raw: string) => raw), + parseErrorFromStderr: vi.fn((stderr: string) => { + if (!stderr) {return undefined;} + if (/error/i.test(stderr)) {return stderr.trim();} + return undefined; + }), })); import { appendFileSync } from "node:fs"; +function createMockSubscribeProcess() { + const handlers: Record void)[]> = {}; + const proc = { + stdout: { on: vi.fn() }, + stderr: { on: vi.fn() }, + on: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + handlers[event] = handlers[event] || []; + handlers[event].push(cb); + return proc; + }), + once: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + handlers[event] = handlers[event] || []; + handlers[event].push(cb); + return proc; + }), + kill: vi.fn(), + pid: 12345, + _emit(event: string, ...args: unknown[]) { + for (const cb of handlers[event] || []) { + cb(...args); + } + }, + }; + return proc; +} + // Shared global key used by subagent-runs.ts for its singleton registry const GLOBAL_KEY = "__openclaw_subagentRuns"; @@ -99,10 +169,53 @@ describe("subagent runs", () => { homedir: vi.fn(() => "/home/testuser"), })); vi.mock("node:readline", () => ({ - createInterface: vi.fn(() => ({ + createInterface: vi.fn(() => { + const handlers: Record void)[]> = {}; + const iface = { + on: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + handlers[event] = handlers[event] || []; + handlers[event].push(cb); + return iface; + }), + close: vi.fn(), + _emit: (event: string, ...args: unknown[]) => { + for (const cb of handlers[event] || []) { + cb(...args); + } + }, + }; + return iface; + }), + })); + vi.mock("./agent-runner", () => ({ + spawnAgentSubscribeProcess: vi.fn(() => ({ + stdout: { on: vi.fn() }, + stderr: { on: vi.fn() }, on: vi.fn(), - close: vi.fn(), + once: vi.fn(), + kill: vi.fn(), + pid: 12345, })), + callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })), + extractToolResult: vi.fn((raw: unknown) => { + if (!raw) {return undefined;} + if (typeof raw === "string") {return { text: raw };} + return { text: undefined, details: raw as Record }; + }), + buildToolOutput: vi.fn((result?: { text?: string }) => + result ? { text: result.text } : {}, + ), + parseAgentErrorMessage: vi.fn((data?: Record) => { + if (data?.error && typeof data.error === "string") {return data.error;} + if (data?.message && typeof data.message === "string") {return data.message;} + return undefined; + }), + parseErrorBody: vi.fn((raw: string) => raw), + parseErrorFromStderr: vi.fn((stderr: string) => { + if (!stderr) {return undefined;} + if (/error/i.test(stderr)) {return stderr.trim();} + return undefined; + }), })); }); @@ -192,6 +305,135 @@ describe("subagent runs", () => { }); }); + describe("subscribe restart stability", () => { + it("applies exponential backoff and resets after a recovered stream event", async () => { + vi.useFakeTimers(); + try { + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + const { createInterface } = await import("node:readline"); + const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess); + mockSubscribeSpawn.mockReset(); + + const first = createMockSubscribeProcess(); + const second = createMockSubscribeProcess(); + const third = createMockSubscribeProcess(); + const fourth = createMockSubscribeProcess(); + mockSubscribeSpawn + .mockReturnValueOnce(first as never) + .mockReturnValueOnce(second as never) + .mockReturnValueOnce(third as never) + .mockReturnValueOnce(fourth as never); + + const { registerSubagent } = await importSubagentRuns(); + registerSubagent("parent-stable", { + sessionKey: "sub:stable:c1", + runId: "run-stable", + task: "retry stream", + }); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(1); + + first._emit("close", 1); + await vi.advanceTimersByTimeAsync(299); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2); + + second._emit("close", 1); + await vi.advanceTimersByTimeAsync(599); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3); + + const createInterfaceMock = vi.mocked(createInterface); + const thirdInterface = createInterfaceMock.mock.results.at(-1) + ?.value as { _emit: (event: string, payload: string) => void }; + thirdInterface?._emit("line", JSON.stringify({ + event: "agent", + sessionKey: "sub:stable:c1", + stream: "assistant", + data: { delta: "recovered" }, + globalSeq: 1, + })); + await vi.advanceTimersByTimeAsync(0); + + third._emit("close", 1); + await vi.advanceTimersByTimeAsync(299); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(4); + } finally { + vi.useRealTimers(); + } + }); + }); + + describe("event parity safeguards", () => { + it("emits tool input/output events from subscribed lines", async () => { + const { createInterface } = await import("node:readline"); + const { registerSubagent, subscribeToSubagent } = await importSubagentRuns(); + + registerSubagent("parent-tools", { + sessionKey: "sub:tools:c1", + runId: "run-tools", + task: "tool parity", + }); + + const received: Array> = []; + subscribeToSubagent( + "sub:tools:c1", + (event) => { + if (event) { + received.push(event as Record); + } + }, + { replay: false }, + ); + + const createInterfaceMock = vi.mocked(createInterface); + const iface = createInterfaceMock.mock.results.at(-1) + ?.value as { _emit: (event: string, payload: string) => void }; + + iface._emit("line", JSON.stringify({ + event: "agent", + sessionKey: "sub:tools:c1", + stream: "tool", + data: { + phase: "start", + toolCallId: "tc-tools-1", + name: "bash", + args: { command: "echo hello" }, + }, + globalSeq: 1, + })); + iface._emit("line", JSON.stringify({ + event: "agent", + sessionKey: "sub:tools:c1", + stream: "tool", + data: { + phase: "result", + toolCallId: "tc-tools-1", + result: { text: "hello" }, + }, + globalSeq: 2, + })); + + expect( + received.some( + (e) => + e.type === "tool-input-start" && + e.toolCallId === "tc-tools-1", + ), + ).toBe(true); + expect( + received.some( + (e) => + e.type === "tool-output-available" && + e.toolCallId === "tc-tools-1", + ), + ).toBe(true); + }); + }); + // ─── getSubagentsForSession ─────────────────────────────────────── describe("getSubagentsForSession", () => { @@ -501,5 +743,50 @@ describe("subagent runs", () => { const { abortSubagent } = await importSubagentRuns(); expect(abortSubagent("unknown")).toBe(false); }); + + it("aborts known subagent via gateway RPC", async () => { + const { callGatewayRpc } = await import("./agent-runner.js"); + const { registerSubagent, abortSubagent } = await importSubagentRuns(); + + registerSubagent("parent-1", { + sessionKey: "sub:p:c1", + runId: "run-1", + task: "task", + }); + + expect(abortSubagent("sub:p:c1")).toBe(true); + expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith( + "chat.abort", + { sessionKey: "sub:p:c1" }, + { timeoutMs: 4_000 }, + ); + }); + }); + + describe("spawnSubagentMessage", () => { + it("sends follow-up messages via gateway RPC", async () => { + const { callGatewayRpc } = await import("./agent-runner.js"); + const { registerSubagent, spawnSubagentMessage } = await importSubagentRuns(); + + registerSubagent("parent-1", { + sessionKey: "sub:p:c1", + runId: "run-1", + task: "task", + }); + + expect(spawnSubagentMessage("sub:p:c1", "continue")).toBe(true); + expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith( + "agent", + expect.objectContaining({ + sessionKey: "sub:p:c1", + message: "continue", + channel: "webchat", + lane: "subagent", + deliver: false, + timeout: 0, + }), + { timeoutMs: 10_000 }, + ); + }); }); });