Web app: fix chat stream in standalone builds and add tests
- Fix spawnAgentProcess path resolution: walk up to find the package root instead of assuming it is 2 levels up from apps/web (which breaks in standalone builds, where cwd is deep inside .next/standalone/); use openclaw.mjs in production since scripts/run-node.mjs isn't shipped in the package
- Add missing readline error handlers in active-runs.ts and agent-runner.ts to prevent "Unhandled 'error' event" crashes when the child process fails to start
- Pass OPENCLAW_ROOT env var from gateway to standalone server so the web app can reliably find the CLI entry point
- Add 32 tests covering path resolution, SSE streaming, error handling, run lifecycle, replay, and abort

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
15b0b0bcc8
commit
23172896af
670
apps/web/lib/active-runs.test.ts
Normal file
670
apps/web/lib/active-runs.test.ts
Normal file
@ -0,0 +1,670 @@
|
||||
import { type ChildProcess } from "node:child_process";
|
||||
import { PassThrough } from "node:stream";
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
|
||||
// Mock agent-runner to control spawnAgentProcess
|
||||
vi.mock("./agent-runner", () => ({
|
||||
spawnAgentProcess: vi.fn(),
|
||||
extractToolResult: vi.fn((raw: unknown) => {
|
||||
if (!raw) {return undefined;}
|
||||
if (typeof raw === "string") {return { text: raw };}
|
||||
return { text: undefined, details: raw as Record<string, unknown> };
|
||||
}),
|
||||
buildToolOutput: vi.fn(
|
||||
(result?: { text?: string }) => (result ? { text: result.text } : {}),
|
||||
),
|
||||
parseAgentErrorMessage: vi.fn((data?: Record<string, unknown>) => {
|
||||
if (data?.error && typeof data.error === "string") {return data.error;}
|
||||
if (data?.message && typeof data.message === "string") {return data.message;}
|
||||
return undefined;
|
||||
}),
|
||||
parseErrorBody: vi.fn((raw: string) => raw),
|
||||
parseErrorFromStderr: vi.fn((stderr: string) => {
|
||||
if (!stderr) {return undefined;}
|
||||
if (/error/i.test(stderr)) {return stderr.trim();}
|
||||
return undefined;
|
||||
}),
|
||||
}));
|
||||
|
||||
// Mock fs operations used for persistence so tests don't hit disk
|
||||
vi.mock("node:fs", async (importOriginal) => {
|
||||
const original = await importOriginal<typeof import("node:fs")>();
|
||||
return {
|
||||
...original,
|
||||
existsSync: vi.fn(() => false),
|
||||
readFileSync: vi.fn(() => ""),
|
||||
writeFileSync: vi.fn(),
|
||||
mkdirSync: vi.fn(),
|
||||
};
|
||||
});
|
||||
|
||||
import type { SseEvent } from "./active-runs.js";
|
||||
|
||||
/**
 * Create a mock child process with real PassThrough streams for stdout and
 * stderr, so a readline interface attached to `stdout` actually receives the
 * data written through `_writeLine`.
 *
 * Only the members declared below are implemented (`exitCode`, `killed`,
 * `pid`, `stdout`, `stderr`, `on`, `once`, `kill`). The underscore-prefixed
 * members are test-only drivers and are not part of ChildProcess.
 *
 * @returns The mock child; cast it to `ChildProcess` before handing it to the
 *   code under test.
 */
function createMockChild() {
  type Listener = (...args: unknown[]) => void;

  const listeners: Record<string, Listener[]> = {};
  const stdoutStream = new PassThrough();
  const stderrStream = new PassThrough();

  /** Append `cb` to the listener list for `event`, creating the list if needed. */
  const addListener = (event: string, cb: Listener): void => {
    const list = listeners[event] ?? [];
    list.push(cb);
    listeners[event] = list;
  };

  /** Remove the first registration of `cb` for `event`, if there is one. */
  const removeListener = (event: string, cb: Listener): void => {
    const list = listeners[event];
    if (!list) {
      return;
    }
    const index = list.indexOf(cb);
    if (index !== -1) {
      list.splice(index, 1);
    }
  };

  const child = {
    exitCode: null as number | null,
    killed: false,
    pid: 12345,
    stdout: stdoutStream,
    stderr: stderrStream,
    on: vi.fn((event: string, cb: Listener) => {
      addListener(event, cb);
      return child;
    }),
    once: vi.fn((event: string, cb: Listener) => {
      // Mirror EventEmitter.once: unregister before invoking, so the
      // listener runs for at most one emit of this event.
      const wrapper: Listener = (...args) => {
        removeListener(event, wrapper);
        cb(...args);
      };
      addListener(event, wrapper);
      return child;
    }),
    kill: vi.fn(),
    /** Emit an event to all registered listeners. */
    _emit(event: string, ...args: unknown[]) {
      // Iterate over a snapshot: `once` wrappers remove themselves from the
      // live list while it is being walked.
      for (const cb of [...(listeners[event] ?? [])]) {
        cb(...args);
      }
    },
    /** Write a JSON line to stdout (simulating agent output). */
    _writeLine(jsonObj: Record<string, unknown>) {
      stdoutStream.write(JSON.stringify(jsonObj) + "\n");
    },
    /** Write raw text to stderr. */
    _writeStderr(text: string) {
      stderrStream.write(Buffer.from(text));
    },
  };

  return child;
}
|
||||
|
||||
describe("active-runs", () => {
|
||||
beforeEach(() => {
  // Clear the module cache so the dynamic imports performed by each test
  // re-evaluate the module under test.
  //
  // The module mocks are deliberately not re-declared here. Vitest hoists
  // every vi.mock() call to the top of the file regardless of where it is
  // written, and vi.resetModules() does not reset the mocks registry, so the
  // top-level vi.mock("./agent-runner") and vi.mock("node:fs") declarations
  // remain in effect for every test.
  vi.resetModules();
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
/**
 * Helper: create a mock child, make the mocked `spawnAgentProcess` return it,
 * then dynamically import the active-runs module.
 *
 * @returns The mock child together with every export of `./active-runs.js`.
 */
async function setup() {
  const agentRunner = await import("./agent-runner.js");
  const child = createMockChild();

  // Whatever the module under test spawns will be our controllable mock.
  const spawnMock = vi.mocked(agentRunner.spawnAgentProcess);
  spawnMock.mockReturnValue(child as unknown as ChildProcess);

  const activeRuns = await import("./active-runs.js");
  return { child, ...activeRuns };
}
|
||||
|
||||
// ── startRun + subscribeToRun ──────────────────────────────────────
|
||||
|
||||
describe("startRun + subscribeToRun", () => {
|
||||
it("creates a run and emits fallback text when process exits without output", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s1",
|
||||
message: "hello",
|
||||
agentSessionId: "s1",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s1",
|
||||
(event) => {
|
||||
if (event) {events.push(event);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
// Close stdout before emitting close, so readline finishes
|
||||
child.stdout.end();
|
||||
// Small delay to let readline drain
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
child._emit("close", 0);
|
||||
|
||||
// Should have emitted fallback "[error] No response from agent."
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "text-delta" &&
|
||||
typeof e.delta === "string" &&
|
||||
(e.delta).includes("No response"),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("streams assistant text events for agent assistant output", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-text",
|
||||
message: "say hi",
|
||||
agentSessionId: "s-text",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-text",
|
||||
(event) => {
|
||||
if (event) {events.push(event);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
// Emit an assistant text delta via stdout JSON
|
||||
child._writeLine({
|
||||
event: "agent",
|
||||
stream: "assistant",
|
||||
data: { delta: "Hello world!" },
|
||||
});
|
||||
|
||||
// Give readline a tick to process
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
// Should have text-start + text-delta
|
||||
expect(events.some((e) => e.type === "text-start")).toBe(true);
|
||||
expect(
|
||||
events.some(
|
||||
(e) => e.type === "text-delta" && e.delta === "Hello world!",
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
// Clean up
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
});
|
||||
|
||||
it("streams reasoning events for thinking output", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-think",
|
||||
message: "think about it",
|
||||
agentSessionId: "s-think",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-think",
|
||||
(event) => {
|
||||
if (event) {events.push(event);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
child._writeLine({
|
||||
event: "agent",
|
||||
stream: "thinking",
|
||||
data: { delta: "Let me think..." },
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(events.some((e) => e.type === "reasoning-start")).toBe(
|
||||
true,
|
||||
);
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "reasoning-delta" &&
|
||||
e.delta === "Let me think...",
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
});
|
||||
|
||||
it("streams tool-input-start and tool-input-available for tool calls", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-tool",
|
||||
message: "use a tool",
|
||||
agentSessionId: "s-tool",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-tool",
|
||||
(event) => {
|
||||
if (event) {events.push(event);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
child._writeLine({
|
||||
event: "agent",
|
||||
stream: "tool",
|
||||
data: {
|
||||
phase: "start",
|
||||
toolCallId: "tc-1",
|
||||
name: "search",
|
||||
args: { query: "test" },
|
||||
},
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "tool-input-start" &&
|
||||
e.toolCallId === "tc-1",
|
||||
),
|
||||
).toBe(true);
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "tool-input-available" &&
|
||||
e.toolCallId === "tc-1" &&
|
||||
e.toolName === "search",
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
});
|
||||
|
||||
it("emits error text for non-zero exit code", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-fail",
|
||||
message: "fail",
|
||||
agentSessionId: "s-fail",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-fail",
|
||||
(event) => {
|
||||
if (event) {events.push(event);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 1);
|
||||
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "text-delta" &&
|
||||
typeof e.delta === "string" &&
|
||||
(e.delta).includes("exited with code 1"),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("signals completion (null) to subscribers when run finishes", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const completed: boolean[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-complete",
|
||||
message: "hi",
|
||||
agentSessionId: "s-complete",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-complete",
|
||||
(event) => {
|
||||
if (event === null) {completed.push(true);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
|
||||
expect(completed).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
|
||||
// ── child process error handling ────────────────────────────────────
|
||||
|
||||
describe("child process error handling", () => {
|
||||
it("emits 'Failed to start agent' on spawn error (ENOENT)", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
const completions: boolean[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-enoent",
|
||||
message: "hello",
|
||||
agentSessionId: "s-enoent",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-enoent",
|
||||
(event) => {
|
||||
if (event) {
|
||||
events.push(event);
|
||||
} else {
|
||||
completions.push(true);
|
||||
}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
const err = new Error("spawn node ENOENT");
|
||||
(err as NodeJS.ErrnoException).code = "ENOENT";
|
||||
child._emit("error", err);
|
||||
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "text-delta" &&
|
||||
typeof e.delta === "string" &&
|
||||
(e.delta).includes("Failed to start agent"),
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
expect(completions).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("does not crash on readline error (the root cause of 'Unhandled error event')", async () => {
  const { child, startRun, getActiveRun } = await setup();

  startRun({
    sessionId: "s-rl-err",
    message: "hello",
    agentSessionId: "s-rl-err",
  });

  // Simulate what happens when a child process fails to start:
  // stdout stream is destroyed with an error, which readline re-emits.
  // Before the fix, this would throw "Unhandled 'error' event".
  // After the fix, the rl.on("error") handler swallows it.
  expect(() => {
    child.stdout.destroy(new Error("stream destroyed"));
  }).not.toThrow();

  // Give a tick for the error to propagate. NOTE(review): the 'error' event
  // is expected to be emitted on a later tick rather than synchronously
  // inside destroy(), so an unhandled error would surface during this wait
  // and not in the guard above.
  await new Promise((r) => setTimeout(r, 50));

  // The run should still be tracked (error handler on child takes care of cleanup)
  expect(getActiveRun("s-rl-err")).toBeDefined();
});
|
||||
});
|
||||
|
||||
// ── subscribeToRun replay ──────────────────────────────────────────
|
||||
|
||||
describe("subscribeToRun replay", () => {
|
||||
it("replays buffered events to new subscribers", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
startRun({
|
||||
sessionId: "s-replay",
|
||||
message: "hi",
|
||||
agentSessionId: "s-replay",
|
||||
});
|
||||
|
||||
// Generate some events
|
||||
child._writeLine({
|
||||
event: "agent",
|
||||
stream: "assistant",
|
||||
data: { delta: "Hello" },
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
|
||||
// New subscriber with replay=true
|
||||
const replayed: (SseEvent | null)[] = [];
|
||||
subscribeToRun(
|
||||
"s-replay",
|
||||
(event) => {
|
||||
replayed.push(event);
|
||||
},
|
||||
{ replay: true },
|
||||
);
|
||||
|
||||
// Should include the text events + null (completion)
|
||||
expect(replayed.length).toBeGreaterThan(0);
|
||||
expect(replayed[replayed.length - 1]).toBeNull();
|
||||
expect(
|
||||
replayed.some(
|
||||
(e) =>
|
||||
e !== null &&
|
||||
e.type === "text-delta" &&
|
||||
e.delta === "Hello",
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("returns null for unsubscribe when no run exists", async () => {
|
||||
const { subscribeToRun } = await setup();
|
||||
|
||||
const unsub = subscribeToRun(
|
||||
"nonexistent",
|
||||
() => {},
|
||||
{ replay: true },
|
||||
);
|
||||
|
||||
expect(unsub).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
// ── hasActiveRun / getActiveRun ────────────────────────────────────
|
||||
|
||||
describe("hasActiveRun / getActiveRun", () => {
|
||||
it("returns true for a running process", async () => {
|
||||
const { child: _child, startRun, hasActiveRun, getActiveRun } =
|
||||
await setup();
|
||||
|
||||
startRun({
|
||||
sessionId: "s-active",
|
||||
message: "hi",
|
||||
agentSessionId: "s-active",
|
||||
});
|
||||
|
||||
expect(hasActiveRun("s-active")).toBe(true);
|
||||
expect(getActiveRun("s-active")).toBeDefined();
|
||||
expect(getActiveRun("s-active")?.status).toBe("running");
|
||||
});
|
||||
|
||||
it("marks status as completed after clean exit", async () => {
|
||||
const { child, startRun, hasActiveRun, getActiveRun } =
|
||||
await setup();
|
||||
|
||||
startRun({
|
||||
sessionId: "s-done",
|
||||
message: "hi",
|
||||
agentSessionId: "s-done",
|
||||
});
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
|
||||
expect(hasActiveRun("s-done")).toBe(false);
|
||||
expect(getActiveRun("s-done")?.status).toBe("completed");
|
||||
});
|
||||
|
||||
it("marks status as error after non-zero exit", async () => {
|
||||
const { child, startRun, getActiveRun } = await setup();
|
||||
|
||||
startRun({
|
||||
sessionId: "s-err-exit",
|
||||
message: "hi",
|
||||
agentSessionId: "s-err-exit",
|
||||
});
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 1);
|
||||
|
||||
expect(getActiveRun("s-err-exit")?.status).toBe("error");
|
||||
});
|
||||
|
||||
it("returns false for unknown sessions", async () => {
|
||||
const { hasActiveRun, getActiveRun } = await setup();
|
||||
expect(hasActiveRun("nonexistent")).toBe(false);
|
||||
expect(getActiveRun("nonexistent")).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
// ── abortRun ──────────────────────────────────────────────────────
|
||||
|
||||
describe("abortRun", () => {
|
||||
it("kills a running child process", async () => {
|
||||
const { child, startRun, abortRun } = await setup();
|
||||
|
||||
startRun({
|
||||
sessionId: "s-abort",
|
||||
message: "hi",
|
||||
agentSessionId: "s-abort",
|
||||
});
|
||||
|
||||
expect(abortRun("s-abort")).toBe(true);
|
||||
expect(child.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
});
|
||||
|
||||
it("returns false for non-running sessions", async () => {
|
||||
const { abortRun } = await setup();
|
||||
expect(abortRun("nonexistent")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// ── duplicate run prevention ──────────────────────────────────────
|
||||
|
||||
describe("duplicate run prevention", () => {
|
||||
it("throws when starting a run for an already-active session", async () => {
|
||||
const { startRun } = await setup();
|
||||
|
||||
startRun({
|
||||
sessionId: "s-dup",
|
||||
message: "first",
|
||||
agentSessionId: "s-dup",
|
||||
});
|
||||
|
||||
expect(() =>
|
||||
startRun({
|
||||
sessionId: "s-dup",
|
||||
message: "second",
|
||||
agentSessionId: "s-dup",
|
||||
}),
|
||||
).toThrow("Active run already exists");
|
||||
});
|
||||
});
|
||||
|
||||
// ── lifecycle events ──────────────────────────────────────────────
|
||||
|
||||
describe("lifecycle events", () => {
|
||||
it("emits reasoning status on lifecycle start", async () => {
|
||||
const { child, startRun, subscribeToRun } = await setup();
|
||||
|
||||
const events: SseEvent[] = [];
|
||||
|
||||
startRun({
|
||||
sessionId: "s-lifecycle",
|
||||
message: "hi",
|
||||
agentSessionId: "s-lifecycle",
|
||||
});
|
||||
|
||||
subscribeToRun(
|
||||
"s-lifecycle",
|
||||
(event) => {
|
||||
if (event) {events.push(event);}
|
||||
},
|
||||
{ replay: false },
|
||||
);
|
||||
|
||||
child._writeLine({
|
||||
event: "agent",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "start" },
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(events.some((e) => e.type === "reasoning-start")).toBe(
|
||||
true,
|
||||
);
|
||||
expect(
|
||||
events.some(
|
||||
(e) =>
|
||||
e.type === "reasoning-delta" &&
|
||||
e.delta === "Preparing response...",
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
child.stdout.end();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
child._emit("close", 0);
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -350,6 +350,16 @@ function wireChildProcess(run: ActiveRun): void {
|
||||
|
||||
const rl = createInterface({ input: child.stdout! });
|
||||
|
||||
// Prevent unhandled 'error' events on the readline interface.
|
||||
// When the child process fails to start (e.g. ENOENT — missing script)
|
||||
// the stdout pipe is destroyed and readline re-emits the error. Without
|
||||
// this handler Node.js throws "Unhandled 'error' event" which crashes
|
||||
// the API route instead of surfacing a clean message to the user.
|
||||
rl.on("error", () => {
|
||||
// Swallow — the child 'error' / 'close' handlers take care of
|
||||
// emitting user-visible diagnostics.
|
||||
});
|
||||
|
||||
rl.on("line", (line: string) => {
|
||||
if (!line.trim()) {return;}
|
||||
|
||||
|
||||
311
apps/web/lib/agent-runner.test.ts
Normal file
311
apps/web/lib/agent-runner.test.ts
Normal file
@ -0,0 +1,311 @@
|
||||
import { spawn, type ChildProcess } from "node:child_process";
|
||||
import { join } from "node:path";
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
|
||||
vi.mock("node:child_process", () => ({ spawn: vi.fn() }));
|
||||
vi.mock("node:fs", () => ({ existsSync: vi.fn() }));
|
||||
|
||||
const spawnMock = vi.mocked(spawn);
|
||||
|
||||
/**
 * Minimal mock ChildProcess for testing.
 *
 * `stdout` and `stderr` are inert stubs whose members are bare `vi.fn()`s, so
 * no data flows through them. The created mock is also installed as the
 * return value of `spawnMock`.
 *
 * @returns The mock child, including a test-only `_emit` driver.
 */
function mockChildProcess() {
  type Listener = (...args: unknown[]) => void;

  const listeners: Record<string, Listener[]> = {};

  /** Append `cb` to the listener list for `event`, creating the list if needed. */
  const addListener = (event: string, cb: Listener): void => {
    const list = listeners[event] ?? [];
    list.push(cb);
    listeners[event] = list;
  };

  /** Remove the first registration of `cb` for `event`, if there is one. */
  const removeListener = (event: string, cb: Listener): void => {
    const list = listeners[event];
    if (!list) {
      return;
    }
    const index = list.indexOf(cb);
    if (index !== -1) {
      list.splice(index, 1);
    }
  };

  const child = {
    exitCode: null as number | null,
    killed: false,
    pid: 12345,
    stdout: {
      on: vi.fn(),
      // Act as a minimal readable for createInterface
      [Symbol.asyncIterator]: vi.fn(),
    },
    stderr: { on: vi.fn() },
    on: vi.fn((event: string, cb: Listener) => {
      addListener(event, cb);
      return child;
    }),
    once: vi.fn((event: string, cb: Listener) => {
      // Mirror EventEmitter.once: unregister before invoking, so the
      // listener runs for at most one emit of this event.
      const wrapper: Listener = (...args) => {
        removeListener(event, wrapper);
        cb(...args);
      };
      addListener(event, wrapper);
      return child;
    }),
    kill: vi.fn(),
    /** Emit an event to all registered listeners. */
    _emit(event: string, ...args: unknown[]) {
      // Iterate over a snapshot: `once` wrappers remove themselves from the
      // live list while it is being walked.
      for (const cb of [...(listeners[event] ?? [])]) {
        cb(...args);
      }
    },
  };

  spawnMock.mockReturnValue(child as unknown as ChildProcess);
  return child;
}
|
||||
|
||||
describe("agent-runner", () => {
|
||||
const originalEnv = { ...process.env };
|
||||
|
||||
beforeEach(() => {
  // Clear the module cache so each test's dynamic import of agent-runner
  // re-evaluates the module.
  vi.resetModules();
  vi.restoreAllMocks();
  // Work on a copy so per-test mutations of process.env never touch the
  // snapshot taken when the suite was defined.
  process.env = { ...originalEnv };
  // The module mocks are deliberately not re-declared here. Vitest hoists
  // every vi.mock() call to the top of the file regardless of where it is
  // written, and vi.resetModules() does not reset the mocks registry, so the
  // top-level vi.mock("node:child_process") and vi.mock("node:fs")
  // declarations remain in effect for every test.
});
|
||||
|
||||
afterEach(() => {
|
||||
process.env = originalEnv;
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
// ── resolvePackageRoot ──────────────────────────────────────────────
|
||||
|
||||
describe("resolvePackageRoot", () => {
|
||||
it("uses OPENCLAW_ROOT env var when set and valid", async () => {
|
||||
process.env.OPENCLAW_ROOT = "/opt/ironclaw";
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
vi.mocked(mockExists).mockImplementation(
|
||||
(p) => String(p) === "/opt/ironclaw",
|
||||
);
|
||||
|
||||
const { resolvePackageRoot } = await import("./agent-runner.js");
|
||||
expect(resolvePackageRoot()).toBe("/opt/ironclaw");
|
||||
});
|
||||
|
||||
it("ignores OPENCLAW_ROOT when the path does not exist", async () => {
|
||||
process.env.OPENCLAW_ROOT = "/nonexistent/path";
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
// OPENCLAW_ROOT doesn't exist, but we'll find openclaw.mjs by walking up
|
||||
vi.mocked(mockExists).mockImplementation((p) => {
|
||||
return String(p) === join("/pkg", "openclaw.mjs");
|
||||
});
|
||||
|
||||
vi.spyOn(process, "cwd").mockReturnValue("/pkg/apps/web");
|
||||
|
||||
const { resolvePackageRoot } = await import("./agent-runner.js");
|
||||
expect(resolvePackageRoot()).toBe("/pkg");
|
||||
});
|
||||
|
||||
it("finds package root via openclaw.mjs in production (standalone cwd)", async () => {
|
||||
delete process.env.OPENCLAW_ROOT;
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
vi.mocked(mockExists).mockImplementation((p) => {
|
||||
// Only openclaw.mjs exists at the real package root
|
||||
return String(p) === join("/pkg", "openclaw.mjs");
|
||||
});
|
||||
|
||||
// Standalone mode: cwd is deep inside .next/standalone
|
||||
vi.spyOn(process, "cwd").mockReturnValue(
|
||||
"/pkg/apps/web/.next/standalone/apps/web",
|
||||
);
|
||||
|
||||
const { resolvePackageRoot } = await import("./agent-runner.js");
|
||||
expect(resolvePackageRoot()).toBe("/pkg");
|
||||
});
|
||||
|
||||
it("finds package root via scripts/run-node.mjs in dev workspace", async () => {
|
||||
delete process.env.OPENCLAW_ROOT;
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
vi.mocked(mockExists).mockImplementation((p) => {
|
||||
return String(p) === join("/repo", "scripts", "run-node.mjs");
|
||||
});
|
||||
|
||||
vi.spyOn(process, "cwd").mockReturnValue("/repo/apps/web");
|
||||
|
||||
const { resolvePackageRoot } = await import("./agent-runner.js");
|
||||
expect(resolvePackageRoot()).toBe("/repo");
|
||||
});
|
||||
|
||||
it("falls back to legacy 2-levels-up heuristic", async () => {
|
||||
delete process.env.OPENCLAW_ROOT;
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
vi.mocked(mockExists).mockReturnValue(false); // nothing found
|
||||
|
||||
vi.spyOn(process, "cwd").mockReturnValue("/unknown/apps/web");
|
||||
|
||||
const { resolvePackageRoot } = await import("./agent-runner.js");
|
||||
expect(resolvePackageRoot()).toBe(
|
||||
join("/unknown/apps/web", "..", ".."),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// ── spawnAgentProcess ──────────────────────────────────────────────
|
||||
|
||||
describe("spawnAgentProcess", () => {
|
||||
it("uses scripts/run-node.mjs in dev when both scripts exist", async () => {
|
||||
delete process.env.OPENCLAW_ROOT;
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
const { spawn: mockSpawn } = await import("node:child_process");
|
||||
|
||||
vi.mocked(mockExists).mockImplementation((p) => {
|
||||
const s = String(p);
|
||||
// Package root found via scripts/run-node.mjs
|
||||
if (s === join("/repo", "scripts", "run-node.mjs")) {return true;}
|
||||
// openclaw.mjs also exists in dev
|
||||
if (s === join("/repo", "openclaw.mjs")) {return true;}
|
||||
return false;
|
||||
});
|
||||
|
||||
vi.spyOn(process, "cwd").mockReturnValue("/repo/apps/web");
|
||||
|
||||
const child = mockChildProcess();
|
||||
vi.mocked(mockSpawn).mockReturnValue(
|
||||
child as unknown as ChildProcess,
|
||||
);
|
||||
|
||||
const { spawnAgentProcess } = await import("./agent-runner.js");
|
||||
spawnAgentProcess("hello");
|
||||
|
||||
expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith(
|
||||
"node",
|
||||
expect.arrayContaining([
|
||||
join("/repo", "scripts", "run-node.mjs"),
|
||||
"agent",
|
||||
"--agent",
|
||||
"main",
|
||||
"--message",
|
||||
"hello",
|
||||
"--stream-json",
|
||||
]),
|
||||
expect.objectContaining({
|
||||
cwd: "/repo",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to openclaw.mjs in production (standalone)", async () => {
|
||||
process.env.OPENCLAW_ROOT = "/pkg";
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
const { spawn: mockSpawn } = await import("node:child_process");
|
||||
|
||||
vi.mocked(mockExists).mockImplementation((p) => {
|
||||
const s = String(p);
|
||||
if (s === "/pkg") {return true;} // OPENCLAW_ROOT valid
|
||||
if (s === join("/pkg", "openclaw.mjs")) {return true;} // prod script
|
||||
// scripts/run-node.mjs does NOT exist (production install)
|
||||
return false;
|
||||
});
|
||||
|
||||
const child = mockChildProcess();
|
||||
vi.mocked(mockSpawn).mockReturnValue(
|
||||
child as unknown as ChildProcess,
|
||||
);
|
||||
|
||||
const { spawnAgentProcess } = await import("./agent-runner.js");
|
||||
spawnAgentProcess("test message");
|
||||
|
||||
expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith(
|
||||
"node",
|
||||
expect.arrayContaining([
|
||||
join("/pkg", "openclaw.mjs"),
|
||||
"agent",
|
||||
"--agent",
|
||||
"main",
|
||||
"--message",
|
||||
"test message",
|
||||
"--stream-json",
|
||||
]),
|
||||
expect.objectContaining({
|
||||
cwd: "/pkg",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("includes session-key and lane args when agentSessionId is set", async () => {
|
||||
process.env.OPENCLAW_ROOT = "/pkg";
|
||||
|
||||
const { existsSync: mockExists } = await import("node:fs");
|
||||
const { spawn: mockSpawn } = await import("node:child_process");
|
||||
|
||||
vi.mocked(mockExists).mockImplementation((p) => {
|
||||
const s = String(p);
|
||||
return s === "/pkg" || s === join("/pkg", "openclaw.mjs");
|
||||
});
|
||||
|
||||
const child = mockChildProcess();
|
||||
vi.mocked(mockSpawn).mockReturnValue(
|
||||
child as unknown as ChildProcess,
|
||||
);
|
||||
|
||||
const { spawnAgentProcess } = await import("./agent-runner.js");
|
||||
spawnAgentProcess("msg", "session-123");
|
||||
|
||||
expect(vi.mocked(mockSpawn)).toHaveBeenCalledWith(
|
||||
"node",
|
||||
expect.arrayContaining([
|
||||
"--session-key",
|
||||
"agent:main:subagent:session-123",
|
||||
"--lane",
|
||||
"subagent",
|
||||
]),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// ── parseAgentErrorMessage ──────────────────────────────────────────
|
||||
|
||||
describe("parseAgentErrorMessage", () => {
|
||||
it("extracts message from error field", async () => {
|
||||
const { parseAgentErrorMessage } = await import(
|
||||
"./agent-runner.js"
|
||||
);
|
||||
expect(
|
||||
parseAgentErrorMessage({ error: "something went wrong" }),
|
||||
).toBe("something went wrong");
|
||||
});
|
||||
|
||||
it("extracts message from JSON error body", async () => {
|
||||
const { parseAgentErrorMessage } = await import(
|
||||
"./agent-runner.js"
|
||||
);
|
||||
const result = parseAgentErrorMessage({
|
||||
errorMessage:
|
||||
'402 {"error":{"message":"Insufficient funds"}}',
|
||||
});
|
||||
expect(result).toBe("Insufficient funds");
|
||||
});
|
||||
|
||||
it("returns undefined for empty data", async () => {
|
||||
const { parseAgentErrorMessage } = await import(
|
||||
"./agent-runner.js"
|
||||
);
|
||||
expect(parseAgentErrorMessage(undefined)).toBeUndefined();
|
||||
expect(parseAgentErrorMessage({})).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
// ── parseErrorFromStderr ───────────────────────────────────────────
|
||||
|
||||
// Covers parseErrorFromStderr against a JSON error line embedded in log
// output, a free-form line mentioning "error", and empty input.
describe("parseErrorFromStderr", () => {
  it("extracts JSON error message from stderr", async () => {
    const { parseErrorFromStderr } = await import("./agent-runner.js");
    // A non-JSON log line comes first; the JSON error sits on its own line.
    const stderr = `Some log line\n{"error":{"message":"Rate limit exceeded"}}\n`;
    expect(parseErrorFromStderr(stderr)).toBe("Rate limit exceeded");
  });

  it("extracts error line containing 'error' keyword", async () => {
    const { parseErrorFromStderr } = await import("./agent-runner.js");
    const stderr = "Module not found error: cannot resolve 'next'";
    // Only asserts that something is surfaced; the exact text is not pinned.
    expect(parseErrorFromStderr(stderr)).toBeTruthy();
  });

  it("returns undefined for empty stderr", async () => {
    const { parseErrorFromStderr } = await import("./agent-runner.js");
    expect(parseErrorFromStderr("")).toBeUndefined();
  });
});
|
||||
});
|
||||
@ -1,6 +1,7 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { createInterface } from "node:readline";
|
||||
import { join } from "node:path";
|
||||
import { existsSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
export type AgentEvent = {
|
||||
event: string;
|
||||
@ -108,20 +109,65 @@ export type RunAgentOptions = {
|
||||
sessionId?: string;
|
||||
};
|
||||
|
||||
/**
 * Resolve the ironclaw/openclaw package root directory.
 *
 * In a dev workspace the cwd is `<repo>/apps/web` and `scripts/run-node.mjs`
 * exists two levels up. In a production standalone build the cwd is
 * `<pkg>/apps/web/.next/standalone/apps/web/` — walking two levels up lands
 * inside the `.next` tree, not at the package root.
 *
 * Strategy:
 *  1. Honour the `OPENCLAW_ROOT` env var when it names an existing path.
 *  2. Walk upward from cwd (at most 20 directories) looking for
 *     `openclaw.mjs` (production) or `scripts/run-node.mjs` (dev).
 *  3. Fallback: original 2-levels-up heuristic, applied only when the last
 *     two segments of cwd are exactly `apps/web`.
 *
 * @returns The resolved package root; falls back to cwd when nothing matches.
 */
export function resolvePackageRoot(): string {
  // 1. Env var (fastest, most reliable in standalone mode).
  const envRoot = process.env.OPENCLAW_ROOT;
  if (envRoot && existsSync(envRoot)) {
    return envRoot;
  }

  // 2. Walk up from cwd, bounded so a pathological tree cannot loop forever.
  const cwd = process.cwd();
  const maxDepth = 20;
  let dir = cwd;
  for (let depth = 0; depth < maxDepth; depth++) {
    if (
      existsSync(join(dir, "openclaw.mjs")) ||
      existsSync(join(dir, "scripts", "run-node.mjs"))
    ) {
      return dir;
    }
    const parent = dirname(dir);
    // dirname() of the filesystem root returns the root itself.
    if (parent === dir) {
      break;
    }
    dir = parent;
  }

  // 3. Fallback: legacy heuristic. Compare whole path segments so that a
  //    directory such as `<x>/fooapps/web` is not mistaken for `<x>/apps/web`
  //    (a plain suffix check would match both).
  const twoUp = dirname(dirname(cwd));
  return join(twoUp, "apps", "web") === cwd ? twoUp : cwd;
}
|
||||
|
||||
/**
|
||||
* Spawn an agent child process and return the ChildProcess handle.
|
||||
* Shared between `runAgent` (legacy callback API) and the ActiveRunManager.
|
||||
*
|
||||
* In a dev workspace uses `scripts/run-node.mjs` (auto-rebuilds TypeScript).
|
||||
* In production / global-install uses `openclaw.mjs` directly (pre-built).
|
||||
*/
|
||||
export function spawnAgentProcess(
|
||||
message: string,
|
||||
agentSessionId?: string,
|
||||
): ReturnType<typeof spawn> {
|
||||
const cwd = process.cwd();
|
||||
const root = cwd.endsWith(join("apps", "web"))
|
||||
? join(cwd, "..", "..")
|
||||
: cwd;
|
||||
const root = resolvePackageRoot();
|
||||
|
||||
const scriptPath = join(root, "scripts", "run-node.mjs");
|
||||
// Dev: scripts/run-node.mjs (auto-rebuild). Prod: openclaw.mjs (pre-built).
|
||||
const devScript = join(root, "scripts", "run-node.mjs");
|
||||
const prodScript = join(root, "openclaw.mjs");
|
||||
const scriptPath = existsSync(devScript) ? devScript : prodScript;
|
||||
|
||||
const args = [
|
||||
scriptPath,
|
||||
@ -226,6 +272,11 @@ export async function runAgent(
|
||||
|
||||
const rl = createInterface({ input: child.stdout! });
|
||||
|
||||
// Prevent unhandled 'error' events when the child process fails
|
||||
// to start (e.g. ENOENT). The child's own 'error' handler below
|
||||
// surfaces the real error to the caller.
|
||||
rl.on("error", () => { /* handled by child error/close */ });
|
||||
|
||||
rl.on("line", (line: string) => {
|
||||
if (!line.trim()) {return;}
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "ironclaw",
|
||||
"version": "2026.2.10-1.8",
|
||||
"version": "2026.2.10-1.9",
|
||||
"description": "AI-powered CRM platform with multi-channel agent gateway, DuckDB workspace, and knowledge management",
|
||||
"keywords": [],
|
||||
"license": "MIT",
|
||||
|
||||
@ -224,13 +224,24 @@ export async function startWebAppIfEnabled(
|
||||
// Production: prefer standalone, fall back to legacy, then workspace build.
|
||||
const serverJs = resolveStandaloneServerJs(webAppDir);
|
||||
|
||||
// The web app's agent-runner needs to find openclaw.mjs or
|
||||
// scripts/run-node.mjs to spawn agent processes. In standalone mode
|
||||
// the server's cwd is deep inside .next/standalone/ so we pass the
|
||||
// actual package root via env so it doesn't have to guess.
|
||||
const packageRoot = path.resolve(webAppDir, "..", "..");
|
||||
|
||||
if (fs.existsSync(serverJs)) {
|
||||
// Standalone build found — just run it (npm global install or post-build).
|
||||
log.info(`starting web app (standalone) on port ${port}…`);
|
||||
child = spawn("node", [serverJs], {
|
||||
cwd: path.dirname(serverJs),
|
||||
stdio: "pipe",
|
||||
env: { ...process.env, PORT: String(port), HOSTNAME: "0.0.0.0" },
|
||||
env: {
|
||||
...process.env,
|
||||
PORT: String(port),
|
||||
HOSTNAME: "0.0.0.0",
|
||||
OPENCLAW_ROOT: packageRoot,
|
||||
},
|
||||
});
|
||||
} else if (hasLegacyNextBuild(webAppDir)) {
|
||||
// Legacy build — use `next start` (dev workspace that hasn't rebuilt).
|
||||
@ -253,7 +264,12 @@ export async function startWebAppIfEnabled(
|
||||
child = spawn("node", [serverJs], {
|
||||
cwd: path.dirname(serverJs),
|
||||
stdio: "pipe",
|
||||
env: { ...process.env, PORT: String(port), HOSTNAME: "0.0.0.0" },
|
||||
env: {
|
||||
...process.env,
|
||||
PORT: String(port),
|
||||
HOSTNAME: "0.0.0.0",
|
||||
OPENCLAW_ROOT: packageRoot,
|
||||
},
|
||||
});
|
||||
} else {
|
||||
log.info(`starting web app (production) on port ${port}…`);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user