From 23172896afbb6fd827e9c6da34dc2b85ca63c58d Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Thu, 12 Feb 2026 20:26:46 -0800 Subject: [PATCH] Web app: fix chat stream in standalone builds and add tests - Fix spawnAgentProcess path resolution: walk up to find package root instead of assuming 2 levels up from apps/web (breaks in standalone where cwd is deep inside .next/standalone/); use openclaw.mjs in production since scripts/run-node.mjs isn't shipped in the package - Add missing readline error handlers in active-runs.ts and agent-runner.ts to prevent "Unhandled 'error' event" crashes when the child process fails to start - Pass OPENCLAW_ROOT env var from gateway to standalone server so the web app can reliably find the CLI entry point - Add 32 tests covering path resolution, SSE streaming, error handling, run lifecycle, replay, and abort Co-authored-by: Cursor --- apps/web/lib/active-runs.test.ts | 670 ++++++++++++++++++++++++++++++ apps/web/lib/active-runs.ts | 10 + apps/web/lib/agent-runner.test.ts | 311 ++++++++++++++ apps/web/lib/agent-runner.ts | 63 ++- package.json | 2 +- src/gateway/server-web-app.ts | 20 +- 6 files changed, 1067 insertions(+), 9 deletions(-) create mode 100644 apps/web/lib/active-runs.test.ts create mode 100644 apps/web/lib/agent-runner.test.ts diff --git a/apps/web/lib/active-runs.test.ts b/apps/web/lib/active-runs.test.ts new file mode 100644 index 00000000000..b160ae7c698 --- /dev/null +++ b/apps/web/lib/active-runs.test.ts @@ -0,0 +1,670 @@ +import { type ChildProcess } from "node:child_process"; +import { PassThrough } from "node:stream"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +// Mock agent-runner to control spawnAgentProcess +vi.mock("./agent-runner", () => ({ + spawnAgentProcess: vi.fn(), + extractToolResult: vi.fn((raw: unknown) => { + if (!raw) {return undefined;} + if (typeof raw === "string") {return { text: raw };} + return { text: undefined, details: raw as Record }; + }), + buildToolOutput: vi.fn( + (result?: { text?: string }) => (result ? { text: result.text } : {}), + ), + parseAgentErrorMessage: vi.fn((data?: Record) => { + if (data?.error && typeof data.error === "string") {return data.error;} + if (data?.message && typeof data.message === "string") {return data.message;} + return undefined; + }), + parseErrorBody: vi.fn((raw: string) => raw), + parseErrorFromStderr: vi.fn((stderr: string) => { + if (!stderr) {return undefined;} + if (/error/i.test(stderr)) {return stderr.trim();} + return undefined; + }), +})); + +// Mock fs operations used for persistence so tests don't hit disk +vi.mock("node:fs", async (importOriginal) => { + const original = await importOriginal(); + return { + ...original, + existsSync: vi.fn(() => false), + readFileSync: vi.fn(() => ""), + writeFileSync: vi.fn(), + mkdirSync: vi.fn(), + }; +}); + +import type { SseEvent } from "./active-runs.js"; + +/** + * Create a mock child process with a real PassThrough stream for stdout, + * so the readline interface inside wireChildProcess actually receives data. + */ +function createMockChild() { + const events: Record void)[]> = {}; + const stdoutStream = new PassThrough(); + const stderrStream = new PassThrough(); + + const child = { + exitCode: null as number | null, + killed: false, + pid: 12345, + stdout: stdoutStream, + stderr: stderrStream, + on: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + events[event] = events[event] || []; + events[event].push(cb); + return child; + }), + once: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + events[event] = events[event] || []; + events[event].push(cb); + return child; + }), + kill: vi.fn(), + /** Emit an event to all registered listeners. */ + _emit(event: string, ...args: unknown[]) { + for (const cb of events[event] || []) { + cb(...args); + } + }, + /** Write a JSON line to stdout (simulating agent output). */ + _writeLine(jsonObj: Record) { + stdoutStream.write(JSON.stringify(jsonObj) + "\n"); + }, + /** Write raw text to stderr. */ + _writeStderr(text: string) { + stderrStream.write(Buffer.from(text)); + }, + }; + + return child; +} + +describe("active-runs", () => { + beforeEach(() => { + vi.resetModules(); + + // Re-wire mocks after resetModules + vi.mock("./agent-runner", () => ({ + spawnAgentProcess: vi.fn(), + extractToolResult: vi.fn((raw: unknown) => { + if (!raw) {return undefined;} + if (typeof raw === "string") {return { text: raw };} + return { + text: undefined, + details: raw as Record, + }; + }), + buildToolOutput: vi.fn( + (result?: { text?: string }) => + result ? { text: result.text } : {}, + ), + parseAgentErrorMessage: vi.fn( + (data?: Record) => { + if (data?.error && typeof data.error === "string") + {return data.error;} + if (data?.message && typeof data.message === "string") + {return data.message;} + return undefined; + }, + ), + parseErrorBody: vi.fn((raw: string) => raw), + parseErrorFromStderr: vi.fn((stderr: string) => { + if (!stderr) {return undefined;} + if (/error/i.test(stderr)) {return stderr.trim();} + return undefined; + }), + })); + + vi.mock("node:fs", async (importOriginal) => { + const original = + await importOriginal(); + return { + ...original, + existsSync: vi.fn(() => false), + readFileSync: vi.fn(() => ""), + writeFileSync: vi.fn(), + mkdirSync: vi.fn(), + }; + }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + /** Helper: set up a mock child and import the active-runs module fresh. */ + async function setup() { + const child = createMockChild(); + + const { spawnAgentProcess } = await import("./agent-runner.js"); + vi.mocked(spawnAgentProcess).mockReturnValue( + child as unknown as ChildProcess, + ); + + const mod = await import("./active-runs.js"); + return { child, ...mod }; + } + + // ── startRun + subscribeToRun ────────────────────────────────────── + + describe("startRun + subscribeToRun", () => { + it("creates a run and emits fallback text when process exits without output", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s1", + message: "hello", + agentSessionId: "s1", + }); + + subscribeToRun( + "s1", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + // Close stdout before emitting close, so readline finishes + child.stdout.end(); + // Small delay to let readline drain + await new Promise((r) => setTimeout(r, 50)); + + child._emit("close", 0); + + // Should have emitted fallback "[error] No response from agent." + expect( + events.some( + (e) => + e.type === "text-delta" && + typeof e.delta === "string" && + (e.delta).includes("No response"), + ), + ).toBe(true); + }); + + it("streams assistant text events for agent assistant output", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s-text", + message: "say hi", + agentSessionId: "s-text", + }); + + subscribeToRun( + "s-text", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + // Emit an assistant text delta via stdout JSON + child._writeLine({ + event: "agent", + stream: "assistant", + data: { delta: "Hello world!" }, + }); + + // Give readline a tick to process + await new Promise((r) => setTimeout(r, 50)); + + // Should have text-start + text-delta + expect(events.some((e) => e.type === "text-start")).toBe(true); + expect( + events.some( + (e) => e.type === "text-delta" && e.delta === "Hello world!", + ), + ).toBe(true); + + // Clean up + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + }); + + it("streams reasoning events for thinking output", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s-think", + message: "think about it", + agentSessionId: "s-think", + }); + + subscribeToRun( + "s-think", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + child._writeLine({ + event: "agent", + stream: "thinking", + data: { delta: "Let me think..." }, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(events.some((e) => e.type === "reasoning-start")).toBe( + true, + ); + expect( + events.some( + (e) => + e.type === "reasoning-delta" && + e.delta === "Let me think...", + ), + ).toBe(true); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + }); + + it("streams tool-input-start and tool-input-available for tool calls", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s-tool", + message: "use a tool", + agentSessionId: "s-tool", + }); + + subscribeToRun( + "s-tool", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + child._writeLine({ + event: "agent", + stream: "tool", + data: { + phase: "start", + toolCallId: "tc-1", + name: "search", + args: { query: "test" }, + }, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect( + events.some( + (e) => + e.type === "tool-input-start" && + e.toolCallId === "tc-1", + ), + ).toBe(true); + expect( + events.some( + (e) => + e.type === "tool-input-available" && + e.toolCallId === "tc-1" && + e.toolName === "search", + ), + ).toBe(true); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + }); + + it("emits error text for non-zero exit code", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s-fail", + message: "fail", + agentSessionId: "s-fail", + }); + + subscribeToRun( + "s-fail", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 1); + + expect( + events.some( + (e) => + e.type === "text-delta" && + typeof e.delta === "string" && + (e.delta).includes("exited with code 1"), + ), + ).toBe(true); + }); + + it("signals completion (null) to subscribers when run finishes", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const completed: boolean[] = []; + + startRun({ + sessionId: "s-complete", + message: "hi", + agentSessionId: "s-complete", + }); + + subscribeToRun( + "s-complete", + (event) => { + if (event === null) {completed.push(true);} + }, + { replay: false }, + ); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + + expect(completed).toHaveLength(1); + }); + }); + + // ── child process error handling ──────────────────────────────────── + + describe("child process error handling", () => { + it("emits 'Failed to start agent' on spawn error (ENOENT)", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + const completions: boolean[] = []; + + startRun({ + sessionId: "s-enoent", + message: "hello", + agentSessionId: "s-enoent", + }); + + subscribeToRun( + "s-enoent", + (event) => { + if (event) { + events.push(event); + } else { + completions.push(true); + } + }, + { replay: false }, + ); + + const err = new Error("spawn node ENOENT"); + (err as NodeJS.ErrnoException).code = "ENOENT"; + child._emit("error", err); + + expect( + events.some( + (e) => + e.type === "text-delta" && + typeof e.delta === "string" && + (e.delta).includes("Failed to start agent"), + ), + ).toBe(true); + + expect(completions).toHaveLength(1); + }); + + it("does not crash on readline error (the root cause of 'Unhandled error event')", async () => { + const { child, startRun } = await setup(); + + startRun({ + sessionId: "s-rl-err", + message: "hello", + agentSessionId: "s-rl-err", + }); + + // Simulate what happens when a child process fails to start: + // stdout stream is destroyed with an error, which readline re-emits. + // Before the fix, this would throw "Unhandled 'error' event". + // After the fix, the rl.on("error") handler swallows it. + expect(() => { + child.stdout.destroy(new Error("stream destroyed")); + }).not.toThrow(); + + // Give a tick for the error to propagate + await new Promise((r) => setTimeout(r, 50)); + + // The run should still be tracked (error handler on child takes care of cleanup) + }); + }); + + // ── subscribeToRun replay ────────────────────────────────────────── + + describe("subscribeToRun replay", () => { + it("replays buffered events to new subscribers", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + startRun({ + sessionId: "s-replay", + message: "hi", + agentSessionId: "s-replay", + }); + + // Generate some events + child._writeLine({ + event: "agent", + stream: "assistant", + data: { delta: "Hello" }, + }); + + await new Promise((r) => setTimeout(r, 50)); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + + // New subscriber with replay=true + const replayed: (SseEvent | null)[] = []; + subscribeToRun( + "s-replay", + (event) => { + replayed.push(event); + }, + { replay: true }, + ); + + // Should include the text events + null (completion) + expect(replayed.length).toBeGreaterThan(0); + expect(replayed[replayed.length - 1]).toBeNull(); + expect( + replayed.some( + (e) => + e !== null && + e.type === "text-delta" && + e.delta === "Hello", + ), + ).toBe(true); + }); + + it("returns null for unsubscribe when no run exists", async () => { + const { subscribeToRun } = await setup(); + + const unsub = subscribeToRun( + "nonexistent", + () => {}, + { replay: true }, + ); + + expect(unsub).toBeNull(); + }); + }); + + // ── hasActiveRun / getActiveRun ──────────────────────────────────── + + describe("hasActiveRun / getActiveRun", () => { + it("returns true for a running process", async () => { + const { child: _child, startRun, hasActiveRun, getActiveRun } = + await setup(); + + startRun({ + sessionId: "s-active", + message: "hi", + agentSessionId: "s-active", + }); + + expect(hasActiveRun("s-active")).toBe(true); + expect(getActiveRun("s-active")).toBeDefined(); + expect(getActiveRun("s-active")?.status).toBe("running"); + }); + + it("marks status as completed after clean exit", async () => { + const { child, startRun, hasActiveRun, getActiveRun } = + await setup(); + + startRun({ + sessionId: "s-done", + message: "hi", + agentSessionId: "s-done", + }); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + + expect(hasActiveRun("s-done")).toBe(false); + expect(getActiveRun("s-done")?.status).toBe("completed"); + }); + + it("marks status as error after non-zero exit", async () => { + const { child, startRun, getActiveRun } = await setup(); + + startRun({ + sessionId: "s-err-exit", + message: "hi", + agentSessionId: "s-err-exit", + }); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 1); + + expect(getActiveRun("s-err-exit")?.status).toBe("error"); + }); + + it("returns false for unknown sessions", async () => { + const { hasActiveRun, getActiveRun } = await setup(); + expect(hasActiveRun("nonexistent")).toBe(false); + expect(getActiveRun("nonexistent")).toBeUndefined(); + }); + }); + + // ── abortRun ────────────────────────────────────────────────────── + + describe("abortRun", () => { + it("kills a running child process", async () => { + const { child, startRun, abortRun } = await setup(); + + startRun({ + sessionId: "s-abort", + message: "hi", + agentSessionId: "s-abort", + }); + + expect(abortRun("s-abort")).toBe(true); + expect(child.kill).toHaveBeenCalledWith("SIGTERM"); + }); + + it("returns false for non-running sessions", async () => { + const { abortRun } = await setup(); + expect(abortRun("nonexistent")).toBe(false); + }); + }); + + // ── duplicate run prevention ────────────────────────────────────── + + describe("duplicate run prevention", () => { + it("throws when starting a run for an already-active session", async () => { + const { startRun } = await setup(); + + startRun({ + sessionId: "s-dup", + message: "first", + agentSessionId: "s-dup", + }); + + expect(() => + startRun({ + sessionId: "s-dup", + message: "second", + agentSessionId: "s-dup", + }), + ).toThrow("Active run already exists"); + }); + }); + + // ── lifecycle events ────────────────────────────────────────────── + + describe("lifecycle events", () => { + it("emits reasoning status on lifecycle start", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s-lifecycle", + message: "hi", + agentSessionId: "s-lifecycle", + }); + + subscribeToRun( + "s-lifecycle", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + child._writeLine({ + event: "agent", + stream: "lifecycle", + data: { phase: "start" }, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(events.some((e) => e.type === "reasoning-start")).toBe( + true, + ); + expect( + events.some( + (e) => + e.type === "reasoning-delta" && + e.delta === "Preparing response...", + ), + ).toBe(true); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + }); + }); +}); diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index ee94a045ad9..d846bcdfd96 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -350,6 +350,16 @@ function wireChildProcess(run: ActiveRun): void { const rl = createInterface({ input: child.stdout! }); + // Prevent unhandled 'error' events on the readline interface. + // When the child process fails to start (e.g. ENOENT — missing script) + // the stdout pipe is destroyed and readline re-emits the error. Without + // this handler Node.js throws "Unhandled 'error' event" which crashes + // the API route instead of surfacing a clean message to the user. + rl.on("error", () => { + // Swallow — the child 'error' / 'close' handlers take care of + // emitting user-visible diagnostics. + }); + rl.on("line", (line: string) => { if (!line.trim()) {return;} diff --git a/apps/web/lib/agent-runner.test.ts b/apps/web/lib/agent-runner.test.ts new file mode 100644 index 00000000000..04c8c395c5e --- /dev/null +++ b/apps/web/lib/agent-runner.test.ts @@ -0,0 +1,311 @@ +import { spawn, type ChildProcess } from "node:child_process"; +import { join } from "node:path"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +vi.mock("node:child_process", () => ({ spawn: vi.fn() })); +vi.mock("node:fs", () => ({ existsSync: vi.fn() })); + +const spawnMock = vi.mocked(spawn); + +/** Minimal mock ChildProcess for testing. */ +function mockChildProcess() { + const events: Record void)[]> = {}; + const child = { + exitCode: null as number | null, + killed: false, + pid: 12345, + stdout: { + on: vi.fn(), + // Act as a minimal readable for createInterface + [Symbol.asyncIterator]: vi.fn(), + }, + stderr: { on: vi.fn() }, + on: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + events[event] = events[event] || []; + events[event].push(cb); + return child; + }), + once: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + events[event] = events[event] || []; + events[event].push(cb); + return child; + }), + kill: vi.fn(), + _emit(event: string, ...args: unknown[]) { + for (const cb of events[event] || []) { + cb(...args); + } + }, + }; + spawnMock.mockReturnValue(child as unknown as ChildProcess); + return child; +} + +describe("agent-runner", () => { + const originalEnv = { ...process.env }; + + beforeEach(() => { + vi.resetModules(); + vi.restoreAllMocks(); + process.env = { ...originalEnv }; + // Re-wire mocks after resetModules + vi.mock("node:child_process", () => ({ spawn: vi.fn() })); + vi.mock("node:fs", () => ({ existsSync: vi.fn() })); + }); + + afterEach(() => { + process.env = originalEnv; + vi.restoreAllMocks(); + }); + + // ── resolvePackageRoot ────────────────────────────────────────────── + + describe("resolvePackageRoot", () => { + it("uses OPENCLAW_ROOT env var when set and valid", async () => { + process.env.OPENCLAW_ROOT = "/opt/ironclaw"; + const { existsSync: mockExists } = await import("node:fs"); + vi.mocked(mockExists).mockImplementation( + (p) => String(p) === "/opt/ironclaw", + ); + + const { resolvePackageRoot } = await import("./agent-runner.js"); + expect(resolvePackageRoot()).toBe("/opt/ironclaw"); + }); + + it("ignores OPENCLAW_ROOT when the path does not exist", async () => { + process.env.OPENCLAW_ROOT = "/nonexistent/path"; + + const { existsSync: mockExists } = await import("node:fs"); + // OPENCLAW_ROOT doesn't exist, but we'll find openclaw.mjs by walking up + vi.mocked(mockExists).mockImplementation((p) => { + return String(p) === join("/pkg", "openclaw.mjs"); + }); + + vi.spyOn(process, "cwd").mockReturnValue("/pkg/apps/web"); + + const { resolvePackageRoot } = await import("./agent-runner.js"); + expect(resolvePackageRoot()).toBe("/pkg"); + }); + + it("finds package root via openclaw.mjs in production (standalone cwd)", async () => { + delete process.env.OPENCLAW_ROOT; + + const { existsSync: mockExists } = await import("node:fs"); + vi.mocked(mockExists).mockImplementation((p) => { + // Only openclaw.mjs exists at the real package root + return String(p) === join("/pkg", "openclaw.mjs"); + }); + + // Standalone mode: cwd is deep inside .next/standalone + vi.spyOn(process, "cwd").mockReturnValue( + "/pkg/apps/web/.next/standalone/apps/web", + ); + + const { resolvePackageRoot } = await import("./agent-runner.js"); + expect(resolvePackageRoot()).toBe("/pkg"); + }); + + it("finds package root via scripts/run-node.mjs in dev workspace", async () => { + delete process.env.OPENCLAW_ROOT; + + const { existsSync: mockExists } = await import("node:fs"); + vi.mocked(mockExists).mockImplementation((p) => { + return String(p) === join("/repo", "scripts", "run-node.mjs"); + }); + + vi.spyOn(process, "cwd").mockReturnValue("/repo/apps/web"); + + const { resolvePackageRoot } = await import("./agent-runner.js"); + expect(resolvePackageRoot()).toBe("/repo"); + }); + + it("falls back to legacy 2-levels-up heuristic", async () => { + delete process.env.OPENCLAW_ROOT; + + const { existsSync: mockExists } = await import("node:fs"); + vi.mocked(mockExists).mockReturnValue(false); // nothing found + + vi.spyOn(process, "cwd").mockReturnValue("/unknown/apps/web"); + + const { resolvePackageRoot } = await import("./agent-runner.js"); + expect(resolvePackageRoot()).toBe( + join("/unknown/apps/web", "..", ".."), + ); + }); + }); + + // ── spawnAgentProcess ────────────────────────────────────────────── + + describe("spawnAgentProcess", () => { + it("uses scripts/run-node.mjs in dev when both scripts exist", async () => { + delete process.env.OPENCLAW_ROOT; + + const { existsSync: mockExists } = await import("node:fs"); + const { spawn: mockSpawn } = await import("node:child_process"); + + vi.mocked(mockExists).mockImplementation((p) => { + const s = String(p); + // Package root found via scripts/run-node.mjs + if (s === join("/repo", "scripts", "run-node.mjs")) {return true;} + // openclaw.mjs also exists in dev + if (s === join("/repo", "openclaw.mjs")) {return true;} + return false; + }); + + vi.spyOn(process, "cwd").mockReturnValue("/repo/apps/web"); + + const child = mockChildProcess(); + vi.mocked(mockSpawn).mockReturnValue( + child as unknown as ChildProcess, + ); + + const { spawnAgentProcess } = await import("./agent-runner.js"); + spawnAgentProcess("hello"); + + expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith( + "node", + expect.arrayContaining([ + join("/repo", "scripts", "run-node.mjs"), + "agent", + "--agent", + "main", + "--message", + "hello", + "--stream-json", + ]), + expect.objectContaining({ + cwd: "/repo", + }), + ); + }); + + it("falls back to openclaw.mjs in production (standalone)", async () => { + process.env.OPENCLAW_ROOT = "/pkg"; + + const { existsSync: mockExists } = await import("node:fs"); + const { spawn: mockSpawn } = await import("node:child_process"); + + vi.mocked(mockExists).mockImplementation((p) => { + const s = String(p); + if (s === "/pkg") {return true;} // OPENCLAW_ROOT valid + if (s === join("/pkg", "openclaw.mjs")) {return true;} // prod script + // scripts/run-node.mjs does NOT exist (production install) + return false; + }); + + const child = mockChildProcess(); + vi.mocked(mockSpawn).mockReturnValue( + child as unknown as ChildProcess, + ); + + const { spawnAgentProcess } = await import("./agent-runner.js"); + spawnAgentProcess("test message"); + + expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith( + "node", + expect.arrayContaining([ + join("/pkg", "openclaw.mjs"), + "agent", + "--agent", + "main", + "--message", + "test message", + "--stream-json", + ]), + expect.objectContaining({ + cwd: "/pkg", + }), + ); + }); + + it("includes session-key and lane args when agentSessionId is set", async () => { + process.env.OPENCLAW_ROOT = "/pkg"; + + const { existsSync: mockExists } = await import("node:fs"); + const { spawn: mockSpawn } = await import("node:child_process"); + + vi.mocked(mockExists).mockImplementation((p) => { + const s = String(p); + return s === "/pkg" || s === join("/pkg", "openclaw.mjs"); + }); + + const child = mockChildProcess(); + vi.mocked(mockSpawn).mockReturnValue( + child as unknown as ChildProcess, + ); + + const { spawnAgentProcess } = await import("./agent-runner.js"); + spawnAgentProcess("msg", "session-123"); + + expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith( + "node", + expect.arrayContaining([ + "--session-key", + "agent:main:subagent:session-123", + "--lane", + "subagent", + ]), + expect.anything(), + ); + }); + }); + + // ── parseAgentErrorMessage ────────────────────────────────────────── + + describe("parseAgentErrorMessage", () => { + it("extracts message from error field", async () => { + const { parseAgentErrorMessage } = await import( + "./agent-runner.js" + ); + expect( + parseAgentErrorMessage({ error: "something went wrong" }), + ).toBe("something went wrong"); + }); + + it("extracts message from JSON error body", async () => { + const { parseAgentErrorMessage } = await import( + "./agent-runner.js" + ); + const result = parseAgentErrorMessage({ + errorMessage: + '402 {"error":{"message":"Insufficient funds"}}', + }); + expect(result).toBe("Insufficient funds"); + }); + + it("returns undefined for empty data", async () => { + const { parseAgentErrorMessage } = await import( + "./agent-runner.js" + ); + expect(parseAgentErrorMessage(undefined)).toBeUndefined(); + expect(parseAgentErrorMessage({})).toBeUndefined(); + }); + }); + + // ── parseErrorFromStderr ─────────────────────────────────────────── + + describe("parseErrorFromStderr", () => { + it("extracts JSON error message from stderr", async () => { + const { parseErrorFromStderr } = await import( + "./agent-runner.js" + ); + const stderr = `Some log line\n{"error":{"message":"Rate limit exceeded"}}\n`; + expect(parseErrorFromStderr(stderr)).toBe("Rate limit exceeded"); + }); + + it("extracts error line containing 'error' keyword", async () => { + const { parseErrorFromStderr } = await import( + "./agent-runner.js" + ); + const stderr = "Module not found error: cannot resolve 'next'"; + expect(parseErrorFromStderr(stderr)).toBeTruthy(); + }); + + it("returns undefined for empty stderr", async () => { + const { parseErrorFromStderr } = await import( + "./agent-runner.js" + ); + expect(parseErrorFromStderr("")).toBeUndefined(); + }); + }); +}); diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index ce4dfaa8293..df0513d7899 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -1,6 +1,7 @@ import { spawn } from "node:child_process"; import { createInterface } from "node:readline"; -import { join } from "node:path"; +import { existsSync } from "node:fs"; +import { dirname, join } from "node:path"; export type AgentEvent = { event: string; @@ -108,20 +109,65 @@ export type RunAgentOptions = { sessionId?: string; }; +/** + * Resolve the ironclaw/openclaw package root directory. + * + * In a dev workspace the cwd is `/apps/web` and `scripts/run-node.mjs` + * exists two levels up. In a production standalone build the cwd is + * `/apps/web/.next/standalone/apps/web/` — walking two levels up lands + * inside the `.next` tree, not at the package root. + * + * Strategy: + * 1. Honour `OPENCLAW_ROOT` env var (set by the gateway when spawning the + * standalone server — guaranteed correct). + * 2. Walk upward from cwd looking for `openclaw.mjs` (production) or + * `scripts/run-node.mjs` (dev). + * 3. Fallback: original 2-levels-up heuristic. + */ +export function resolvePackageRoot(): string { + // 1. Env var (fastest, most reliable in standalone mode). + if (process.env.OPENCLAW_ROOT && existsSync(process.env.OPENCLAW_ROOT)) { + return process.env.OPENCLAW_ROOT; + } + + // 2. Walk up from cwd. + let dir = process.cwd(); + for (let i = 0; i < 20; i++) { + if ( + existsSync(join(dir, "openclaw.mjs")) || + existsSync(join(dir, "scripts", "run-node.mjs")) + ) { + return dir; + } + const parent = dirname(dir); + if (parent === dir) {break;} + dir = parent; + } + + // 3. Fallback: legacy heuristic. + const cwd = process.cwd(); + return cwd.endsWith(join("apps", "web")) + ? join(cwd, "..", "..") + : cwd; +} + /** * Spawn an agent child process and return the ChildProcess handle. * Shared between `runAgent` (legacy callback API) and the ActiveRunManager. + * + * In a dev workspace uses `scripts/run-node.mjs` (auto-rebuilds TypeScript). + * In production / global-install uses `openclaw.mjs` directly (pre-built). */ export function spawnAgentProcess( message: string, agentSessionId?: string, ): ReturnType { - const cwd = process.cwd(); - const root = cwd.endsWith(join("apps", "web")) - ? join(cwd, "..", "..") - : cwd; + const root = resolvePackageRoot(); - const scriptPath = join(root, "scripts", "run-node.mjs"); + // Dev: scripts/run-node.mjs (auto-rebuild). Prod: openclaw.mjs (pre-built). + const devScript = join(root, "scripts", "run-node.mjs"); + const prodScript = join(root, "openclaw.mjs"); + const scriptPath = existsSync(devScript) ? devScript : prodScript; const args = [ scriptPath, @@ -226,6 +272,11 @@ export async function runAgent( const rl = createInterface({ input: child.stdout! }); + // Prevent unhandled 'error' events when the child process fails + // to start (e.g. ENOENT). The child's own 'error' handler below + // surfaces the real error to the caller. + rl.on("error", () => { /* handled by child error/close */ }); + rl.on("line", (line: string) => { if (!line.trim()) {return;} diff --git a/package.json b/package.json index f820cfafb94..46f3b9b1a51 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ironclaw", - "version": "2026.2.10-1.8", + "version": "2026.2.10-1.9", "description": "AI-powered CRM platform with multi-channel agent gateway, DuckDB workspace, and knowledge management", "keywords": [], "license": "MIT", diff --git a/src/gateway/server-web-app.ts b/src/gateway/server-web-app.ts index 4c4d1c38837..7751030cb82 100644 --- a/src/gateway/server-web-app.ts +++ b/src/gateway/server-web-app.ts @@ -224,13 +224,24 @@ export async function startWebAppIfEnabled( // Production: prefer standalone, fall back to legacy, then workspace build. const serverJs = resolveStandaloneServerJs(webAppDir); + // The web app's agent-runner needs to find openclaw.mjs or + // scripts/run-node.mjs to spawn agent processes. In standalone mode + // the server's cwd is deep inside .next/standalone/ so we pass the + // actual package root via env so it doesn't have to guess. + const packageRoot = path.resolve(webAppDir, "..", ".."); + if (fs.existsSync(serverJs)) { // Standalone build found — just run it (npm global install or post-build). log.info(`starting web app (standalone) on port ${port}…`); child = spawn("node", [serverJs], { cwd: path.dirname(serverJs), stdio: "pipe", - env: { ...process.env, PORT: String(port), HOSTNAME: "0.0.0.0" }, + env: { + ...process.env, + PORT: String(port), + HOSTNAME: "0.0.0.0", + OPENCLAW_ROOT: packageRoot, + }, }); } else if (hasLegacyNextBuild(webAppDir)) { // Legacy build — use `next start` (dev workspace that hasn't rebuilt). @@ -253,7 +264,12 @@ export async function startWebAppIfEnabled( child = spawn("node", [serverJs], { cwd: path.dirname(serverJs), stdio: "pipe", - env: { ...process.env, PORT: String(port), HOSTNAME: "0.0.0.0" }, + env: { + ...process.env, + PORT: String(port), + HOSTNAME: "0.0.0.0", + OPENCLAW_ROOT: packageRoot, + }, }); } else { log.info(`starting web app (production) on port ${port}…`);