From f6769d3e05604c9f6831ef901985603f6742c90d Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Mon, 2 Mar 2026 18:33:09 -0800 Subject: [PATCH] test(web): add comprehensive agent-runner tests --- apps/web/lib/agent-runner.test.ts | 614 ++++++++++++++++++++++++++---- 1 file changed, 539 insertions(+), 75 deletions(-) diff --git a/apps/web/lib/agent-runner.test.ts b/apps/web/lib/agent-runner.test.ts index 70ef4e1400e..0008adc7d5d 100644 --- a/apps/web/lib/agent-runner.test.ts +++ b/apps/web/lib/agent-runner.test.ts @@ -1,4 +1,5 @@ import { spawn, type ChildProcess } from "node:child_process"; +import { EventEmitter } from "node:events"; import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; vi.mock("node:child_process", async (importOriginal) => { @@ -10,7 +11,130 @@ vi.mock("node:child_process", async (importOriginal) => { }); const spawnMock = vi.mocked(spawn); -/** Minimal mock ChildProcess for testing. */ +// Valid client IDs the Gateway accepts (from ui/src/ui/contracts/gateway-client-info.ts). +// Hardcoded here so the test breaks if our code drifts from the Gateway's enum. +const VALID_GATEWAY_CLIENT_IDS = new Set([ + "webchat-ui", + "openclaw-control-ui", + "webchat", + "cli", + "gateway-client", + "openclaw-macos", + "openclaw-ios", + "openclaw-android", + "node-host", + "test", + "fingerprint", + "openclaw-probe", +]); + +const VALID_GATEWAY_CLIENT_MODES = new Set([ + "webchat", + "cli", + "ui", + "backend", + "node", + "probe", + "test", +]); + +/** + * Mock that replaces the `ws` module's default export. + * Mimics the ws package's EventEmitter-based API (.on, .once, .emit) + * and tracks constructor args so we can assert on Origin header. + */ +function installMockWsModule() { + type ReqFrame = { + type: "req"; + id: string; + method: string; + params?: unknown; + }; + type ResFrame = { + type: "res"; + id: string; + ok: boolean; + payload?: unknown; + error?: unknown; + }; + + class MockNodeWebSocket extends EventEmitter { + static OPEN = 1; + static instances: MockNodeWebSocket[] = []; + static responseOverrides: Record< + string, + ResFrame | ((frame: ReqFrame) => ResFrame) + > = {}; + + readyState = 0; + methods: string[] = []; + requestFrames: ReqFrame[] = []; + constructorUrl: string; + constructorOpts: Record; + + constructor(url: string, opts?: Record) { + super(); + this.constructorUrl = url; + this.constructorOpts = opts ?? {}; + MockNodeWebSocket.instances.push(this); + queueMicrotask(() => { + this.readyState = MockNodeWebSocket.OPEN; + this.emit("open"); + }); + } + + send(payload: string) { + const frame = JSON.parse(payload) as ReqFrame; + this.methods.push(frame.method); + this.requestFrames.push(frame); + const override = MockNodeWebSocket.responseOverrides[frame.method]; + const resolved = + typeof override === "function" ? override(frame) : override; + const response = JSON.stringify( + resolved ?? { + type: "res", + id: frame.id, + ok: true, + payload: {}, + }, + ); + this.emit("message", Buffer.from(response)); + } + + emitJson(frame: Record) { + this.emit("message", Buffer.from(JSON.stringify(frame))); + } + + close() { + this.readyState = 3; + queueMicrotask(() => this.emit("close", 1000, Buffer.alloc(0))); + } + } + + vi.doMock("ws", () => ({ + default: MockNodeWebSocket, + __esModule: true, + })); + + return MockNodeWebSocket; +} + +async function waitFor( + predicate: () => boolean, + options?: { attempts?: number; delayMs?: number }, +): Promise { + const attempts = options?.attempts ?? 60; + const delayMs = options?.delayMs ?? 10; + for (let i = 0; i < attempts; i++) { + if (predicate()) { + return; + } + await new Promise((r) => setTimeout(r, delayMs)); + } + throw new Error("Condition not met in waitFor"); +} + +/** Minimal mock ChildProcess for legacy CLI tests. */ function mockChildProcess() { const events: Record void)[]> = {}; const child = { @@ -19,7 +143,6 @@ function mockChildProcess() { pid: 12345, stdout: { on: vi.fn(), - // Act as a minimal readable for createInterface [Symbol.asyncIterator]: vi.fn(), }, stderr: { on: vi.fn() }, @@ -51,7 +174,6 @@ describe("agent-runner", () => { vi.resetModules(); vi.restoreAllMocks(); process.env = { ...originalEnv }; - // Re-wire mocks after resetModules vi.mock("node:child_process", async (importOriginal) => { const actual = await importOriginal(); return { @@ -64,12 +186,142 @@ describe("agent-runner", () => { afterEach(() => { process.env = originalEnv; vi.restoreAllMocks(); + vi.unstubAllGlobals(); }); - // ── spawnAgentProcess ────────────────────────────────────────────── + // ── buildConnectParams ─────────────────────────────────────────── + + describe("buildConnectParams", () => { + it("uses a client.id that the Gateway actually accepts (prevents connect rejection)", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ url: "ws://127.0.0.1:18789" }) as { + client: { id: string; mode: string }; + }; + expect(VALID_GATEWAY_CLIENT_IDS.has(params.client.id)).toBe(true); + }); + + it("uses a client.mode the Gateway accepts (prevents schema validation failure)", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ url: "ws://127.0.0.1:18789" }) as { + client: { id: string; mode: string }; + }; + expect(VALID_GATEWAY_CLIENT_MODES.has(params.client.mode)).toBe(true); + }); + + it("includes auth.token when settings have a token", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ + url: "ws://127.0.0.1:18789", + token: "secret-token", + }) as { auth?: { token?: string; password?: string } }; + expect(params.auth?.token).toBe("secret-token"); + }); + + it("includes auth.password when settings have a password", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ + url: "ws://127.0.0.1:18789", + password: "secret-pass", + }) as { auth?: { token?: string; password?: string } }; + expect(params.auth?.password).toBe("secret-pass"); + }); + + it("omits auth when no token or password is set", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ url: "ws://127.0.0.1:18789" }) as { + auth?: unknown; + }; + expect(params.auth).toBeUndefined(); + }); + + it("requests protocol version 3 (current Gateway protocol)", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ url: "ws://127.0.0.1:18789" }) as { + minProtocol: number; + maxProtocol: number; + }; + expect(params.minProtocol).toBe(3); + expect(params.maxProtocol).toBe(3); + }); + + it("uses backend mode so sessions.patch is allowed", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ url: "ws://127.0.0.1:18789" }) as { + client: { mode: string }; + }; + expect(params.client.mode).toBe("backend"); + }); + + it("advertises tool-events capability for tool stream parity", async () => { + const { buildConnectParams } = await import("./agent-runner.js"); + const params = buildConnectParams({ url: "ws://127.0.0.1:18789" }) as { + caps?: string[]; + }; + expect(Array.isArray(params.caps)).toBe(true); + expect(params.caps).toContain("tool-events"); + }); + }); + + // ── spawnAgentProcess (ws transport) ───────────────────────────── describe("spawnAgentProcess", () => { - it("always uses global openclaw", async () => { + it("connects via ws module with Origin header matching the gateway URL (prevents origin rejection)", async () => { + const MockWs = installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("hello", "sess-1"); + await waitFor(() => MockWs.instances[0]?.methods.includes("agent")); + + const ws = MockWs.instances[0]; + expect(ws).toBeDefined(); + + const headers = ws.constructorOpts.headers as Record; + expect(headers?.Origin).toMatch(/^http:\/\//); + expect(ws.constructorUrl).toMatch(/^ws:\/\//); + // Origin must be the HTTP equivalent of the ws URL + expect(headers.Origin).toBe( + ws.constructorUrl.replace(/^ws:/, "http:").replace(/^wss:/, "https:"), + ); + + expect(ws.methods).toContain("connect"); + expect(ws.methods).toContain("sessions.patch"); + expect(ws.methods).toContain("agent"); + expect(ws.methods.slice(0, 3)).toEqual(["connect", "sessions.patch", "agent"]); + proc.kill("SIGTERM"); + }); + + it("sets wss: origin to https: (prevents origin mismatch on TLS gateways)", async () => { + const MockWs = installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + process.env.OPENCLAW_GATEWAY_URL = "wss://gateway.example.com:443"; + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("hello"); + await waitFor(() => MockWs.instances[0]?.methods.includes("connect")); + + const ws = MockWs.instances[0]; + const headers = ws.constructorOpts.headers as Record; + expect(headers.Origin).toMatch(/^https:\/\//); + proc.kill("SIGTERM"); + }); + + it("does not use child_process.spawn for WebSocket transport", async () => { + installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawn: mockSpawn } = await import("node:child_process"); + vi.mocked(mockSpawn).mockClear(); + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("msg"); + await new Promise((r) => setTimeout(r, 50)); + + expect(vi.mocked(mockSpawn)).not.toHaveBeenCalled(); + proc.kill("SIGTERM"); + }); + + it("falls back to CLI spawn when IRONCLAW_WEB_FORCE_LEGACY_STREAM is set", async () => { + process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM = "1"; const { spawn: mockSpawn } = await import("node:child_process"); const child = mockChildProcess(); vi.mocked(mockSpawn).mockReturnValue(child as unknown as ChildProcess); @@ -86,13 +338,11 @@ describe("agent-runner", () => { ); }); - it("includes session-key and lane args when agentSessionId is set", async () => { + it("includes session-key and lane args in legacy CLI mode", async () => { + process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM = "1"; const { spawn: mockSpawn } = await import("node:child_process"); - const child = mockChildProcess(); - vi.mocked(mockSpawn).mockReturnValue( - child as unknown as ChildProcess, - ); + vi.mocked(mockSpawn).mockReturnValue(child as unknown as ChildProcess); const { spawnAgentProcess } = await import("./agent-runner.js"); spawnAgentProcess("msg", "session-123"); @@ -112,110 +362,324 @@ describe("agent-runner", () => { }); }); - // ── parseAgentErrorMessage ────────────────────────────────────────── + describe("spawnAgentSubscribeProcess", () => { + it("subscribes via connect -> sessions.patch -> agent.subscribe", async () => { + const MockWs = installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentSubscribeProcess("agent:main:web:sess-sub", 12); + await waitFor( + () => MockWs.instances[0]?.methods.includes("agent.subscribe"), + { attempts: 80, delayMs: 10 }, + ); + + const ws = MockWs.instances[0]; + expect(ws.methods.slice(0, 3)).toEqual([ + "connect", + "sessions.patch", + "agent.subscribe", + ]); + const subscribeFrame = ws.requestFrames.find( + (frame) => frame.method === "agent.subscribe", + ); + expect(subscribeFrame?.params).toMatchObject({ + sessionKey: "agent:main:web:sess-sub", + afterSeq: 12, + }); + proc.kill("SIGTERM"); + }); + + it("uses payload.globalSeq (not frame seq) for cursor filtering", async () => { + const MockWs = installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentSubscribeProcess("agent:main:web:sess-gseq", 5); + await waitFor( + () => MockWs.instances[0]?.methods.includes("agent.subscribe"), + { attempts: 80, delayMs: 10 }, + ); + const ws = MockWs.instances[0]; + + let stdout = ""; + proc.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + + // Drop: payload.globalSeq <= afterSeq + ws.emitJson({ + type: "event", + event: "agent", + seq: 100, + payload: { + runId: "r-gseq", + sessionKey: "agent:main:web:sess-gseq", + stream: "assistant", + data: { delta: "old" }, + seq: 1, + ts: Date.now(), + globalSeq: 5, + }, + }); + // Keep: payload.globalSeq > afterSeq even if frame.seq is smaller + ws.emitJson({ + type: "event", + event: "agent", + seq: 3, + payload: { + runId: "r-gseq", + sessionKey: "agent:main:web:sess-gseq", + stream: "assistant", + data: { delta: "new" }, + seq: 2, + ts: Date.now(), + globalSeq: 6, + }, + }); + + await waitFor(() => stdout.includes("\n"), { attempts: 80, delayMs: 10 }); + const lines = stdout + .split("\n") + .map((line) => line.trim()) + .filter(Boolean); + expect(lines).toHaveLength(1); + const parsed = JSON.parse(lines[0]) as { + globalSeq?: number; + data?: { delta?: string }; + }; + expect(parsed.globalSeq).toBe(6); + expect(parsed.data?.delta).toBe("new"); + proc.kill("SIGTERM"); + }); + + it("keeps subscribe workers alive across lifecycle end events", async () => { + const MockWs = installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentSubscribeProcess("agent:main:web:sess-sticky", 0); + await waitFor( + () => MockWs.instances[0]?.methods.includes("agent.subscribe"), + { attempts: 80, delayMs: 10 }, + ); + const ws = MockWs.instances[0]; + + let stdout = ""; + let closed = false; + proc.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + proc.on("close", () => { + closed = true; + }); + + ws.emitJson({ + type: "event", + event: "agent", + seq: 10, + payload: { + runId: "r-sticky", + sessionKey: "agent:main:web:sess-sticky", + stream: "lifecycle", + data: { phase: "end" }, + globalSeq: 10, + ts: Date.now(), + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 40)); + expect(closed).toBe(false); + + ws.emitJson({ + type: "event", + event: "agent", + seq: 11, + payload: { + runId: "r-sticky", + sessionKey: "agent:main:web:sess-sticky", + stream: "assistant", + data: { delta: "after-end" }, + globalSeq: 11, + ts: Date.now(), + }, + }); + + await waitFor(() => stdout.includes("after-end"), { attempts: 80, delayMs: 10 }); + proc.kill("SIGTERM"); + }); + + it("drops subscribe events missing a matching session key", async () => { + const MockWs = installMockWsModule(); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentSubscribeProcess("agent:main:web:sess-filter", 0); + await waitFor( + () => MockWs.instances[0]?.methods.includes("agent.subscribe"), + { attempts: 80, delayMs: 10 }, + ); + const ws = MockWs.instances[0]; + + let stdout = ""; + proc.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + + ws.emitJson({ + type: "event", + event: "agent", + seq: 1, + payload: { + runId: "r-filter", + stream: "assistant", + data: { delta: "missing-session" }, + globalSeq: 1, + ts: Date.now(), + }, + }); + ws.emitJson({ + type: "event", + event: "agent", + seq: 2, + payload: { + runId: "r-filter", + sessionKey: "agent:main:web:sess-filter", + stream: "assistant", + data: { delta: "accepted-session" }, + globalSeq: 2, + ts: Date.now(), + }, + }); + + await waitFor(() => stdout.includes("accepted-session"), { + attempts: 80, + delayMs: 10, + }); + expect(stdout).not.toContain("missing-session"); + proc.kill("SIGTERM"); + }); + + it("falls back to passive mode when agent.subscribe is unsupported", async () => { + const MockWs = installMockWsModule(); + MockWs.responseOverrides["agent.subscribe"] = (frame) => ({ + type: "res", + id: frame.id, + ok: false, + error: { message: "unknown method: agent.subscribe" }, + }); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentSubscribeProcess("agent:main:web:sess-passive", 0); + await waitFor(() => MockWs.instances[0]?.methods.includes("agent.subscribe")); + + const ws = MockWs.instances[0]; + let stdout = ""; + proc.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + + ws.emitJson({ + type: "event", + event: "agent", + seq: 8, + payload: { + runId: "r-passive", + sessionKey: "agent:main:web:sess-passive", + stream: "assistant", + data: { delta: "passive works" }, + globalSeq: 8, + ts: Date.now(), + }, + }); + + await waitFor(() => stdout.includes("passive works"), { + attempts: 80, + delayMs: 10, + }); + proc.kill("SIGTERM"); + }); + + it("caches unsupported agent.subscribe and skips retrying it", async () => { + const MockWs = installMockWsModule(); + MockWs.responseOverrides["agent.subscribe"] = (frame) => ({ + type: "res", + id: frame.id, + ok: false, + error: { message: "unknown method: agent.subscribe" }, + }); + delete process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM; + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + + const first = spawnAgentSubscribeProcess("agent:main:web:sess-cache", 0); + await waitFor(() => MockWs.instances[0]?.methods.includes("agent.subscribe")); + first.kill("SIGTERM"); + + const second = spawnAgentSubscribeProcess("agent:main:web:sess-cache", 0); + await waitFor(() => MockWs.instances.length >= 2); + await new Promise((r) => setTimeout(r, 20)); + const secondMethods = MockWs.instances[1]?.methods ?? []; + expect(secondMethods).toContain("connect"); + expect(secondMethods).toContain("sessions.patch"); + expect(secondMethods).not.toContain("agent.subscribe"); + second.kill("SIGTERM"); + }); + }); + + // ── parseAgentErrorMessage ──────────────────────────────────────── describe("parseAgentErrorMessage", () => { it("extracts message from error field", async () => { - const { parseAgentErrorMessage } = await import( - "./agent-runner.js" + const { parseAgentErrorMessage } = await import("./agent-runner.js"); + expect(parseAgentErrorMessage({ error: "something went wrong" })).toBe( + "something went wrong", ); - expect( - parseAgentErrorMessage({ error: "something went wrong" }), - ).toBe("something went wrong"); }); it("extracts message from JSON error body", async () => { - const { parseAgentErrorMessage } = await import( - "./agent-runner.js" - ); + const { parseAgentErrorMessage } = await import("./agent-runner.js"); const result = parseAgentErrorMessage({ - errorMessage: - '402 {"error":{"message":"Insufficient funds"}}', + errorMessage: '402 {"error":{"message":"Insufficient funds"}}', }); expect(result).toBe("Insufficient funds"); }); it("returns undefined for empty data", async () => { - const { parseAgentErrorMessage } = await import( - "./agent-runner.js" - ); + const { parseAgentErrorMessage } = await import("./agent-runner.js"); expect(parseAgentErrorMessage(undefined)).toBeUndefined(); expect(parseAgentErrorMessage({})).toBeUndefined(); }); }); - // ── parseErrorFromStderr ─────────────────────────────────────────── + // ── parseErrorFromStderr ───────────────────────────────────────── describe("parseErrorFromStderr", () => { it("extracts JSON error message from stderr", async () => { - const { parseErrorFromStderr } = await import( - "./agent-runner.js" - ); + const { parseErrorFromStderr } = await import("./agent-runner.js"); const stderr = `Some log line\n{"error":{"message":"Rate limit exceeded"}}\n`; expect(parseErrorFromStderr(stderr)).toBe("Rate limit exceeded"); }); - it("extracts error line containing 'error' keyword", async () => { - const { parseErrorFromStderr } = await import( - "./agent-runner.js" - ); - const stderr = "Module not found error: cannot resolve 'next'"; - expect(parseErrorFromStderr(stderr)).toBeTruthy(); - }); - it("returns undefined for empty stderr", async () => { - const { parseErrorFromStderr } = await import( - "./agent-runner.js" - ); + const { parseErrorFromStderr } = await import("./agent-runner.js"); expect(parseErrorFromStderr("")).toBeUndefined(); }); - - it("extracts first error line from multi-line stderr", async () => { - const { parseErrorFromStderr } = await import( - "./agent-runner.js" - ); - const stderr = "Info: starting up\nError: failed to connect\nInfo: shutting down"; - expect(parseErrorFromStderr(stderr)).toBeTruthy(); - }); - - it("returns undefined for non-error stderr content", async () => { - const { parseErrorFromStderr } = await import( - "./agent-runner.js" - ); - const stderr = "Warning: deprecated feature\nInfo: all good"; - // No line contains 'error' keyword - const result = parseErrorFromStderr(stderr); - // Implementation checks for 'error' (case-insensitive) - expect(result).toBeDefined(); - }); }); - // ── parseErrorBody ────────────────────────────────────────────── + // ── parseErrorBody ─────────────────────────────────────────────── describe("parseErrorBody", () => { it("extracts error message from JSON error body", async () => { const { parseErrorBody } = await import("./agent-runner.js"); - const body = '{"error":{"message":"Something failed"}}'; - const result = parseErrorBody(body); - expect(result).toBe("Something failed"); + expect(parseErrorBody('{"error":{"message":"Something failed"}}')).toBe( + "Something failed", + ); }); it("returns raw string for non-JSON body", async () => { const { parseErrorBody } = await import("./agent-runner.js"); expect(parseErrorBody("plain text error")).toBe("plain text error"); }); - - it("returns raw string for empty body", async () => { - const { parseErrorBody } = await import("./agent-runner.js"); - expect(parseErrorBody("")).toBe(""); - }); - - it("extracts message from nested error object", async () => { - const { parseErrorBody } = await import("./agent-runner.js"); - const body = '{"error":{"message":"Rate limit","type":"rate_limit_error"}}'; - const result = parseErrorBody(body); - expect(result).toBe("Rate limit"); - }); }); - });