test(web): add subagent-runs tests

This commit is contained in:
kumarabhirup 2026-03-02 18:33:38 -08:00
parent 2723352028
commit 910d0521e2
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -38,14 +38,84 @@ vi.mock("node:os", () => ({
}));
vi.mock("node:readline", () => ({
createInterface: vi.fn(() => ({
createInterface: vi.fn(() => {
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
const iface = {
on: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
handlers[event] = handlers[event] || [];
handlers[event].push(cb);
return iface;
}),
close: vi.fn(),
_emit: (event: string, ...args: unknown[]) => {
for (const cb of handlers[event] || []) {
cb(...args);
}
},
};
return iface;
}),
}));
vi.mock("./agent-runner", () => ({
spawnAgentSubscribeProcess: vi.fn(() => ({
stdout: { on: vi.fn() },
stderr: { on: vi.fn() },
on: vi.fn(),
close: vi.fn(),
once: vi.fn(),
kill: vi.fn(),
pid: 12345,
})),
callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })),
extractToolResult: vi.fn((raw: unknown) => {
if (!raw) {return undefined;}
if (typeof raw === "string") {return { text: raw };}
return { text: undefined, details: raw as Record<string, unknown> };
}),
buildToolOutput: vi.fn((result?: { text?: string }) =>
result ? { text: result.text } : {},
),
parseAgentErrorMessage: vi.fn((data?: Record<string, unknown>) => {
if (data?.error && typeof data.error === "string") {return data.error;}
if (data?.message && typeof data.message === "string") {return data.message;}
return undefined;
}),
parseErrorBody: vi.fn((raw: string) => raw),
parseErrorFromStderr: vi.fn((stderr: string) => {
if (!stderr) {return undefined;}
if (/error/i.test(stderr)) {return stderr.trim();}
return undefined;
}),
}));
import { appendFileSync } from "node:fs";
function createMockSubscribeProcess() {
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
const proc = {
stdout: { on: vi.fn() },
stderr: { on: vi.fn() },
on: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
handlers[event] = handlers[event] || [];
handlers[event].push(cb);
return proc;
}),
once: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
handlers[event] = handlers[event] || [];
handlers[event].push(cb);
return proc;
}),
kill: vi.fn(),
pid: 12345,
_emit(event: string, ...args: unknown[]) {
for (const cb of handlers[event] || []) {
cb(...args);
}
},
};
return proc;
}
// Shared global key used by subagent-runs.ts for its singleton registry
const GLOBAL_KEY = "__openclaw_subagentRuns";
@ -99,10 +169,53 @@ describe("subagent runs", () => {
homedir: vi.fn(() => "/home/testuser"),
}));
vi.mock("node:readline", () => ({
createInterface: vi.fn(() => ({
createInterface: vi.fn(() => {
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
const iface = {
on: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
handlers[event] = handlers[event] || [];
handlers[event].push(cb);
return iface;
}),
close: vi.fn(),
_emit: (event: string, ...args: unknown[]) => {
for (const cb of handlers[event] || []) {
cb(...args);
}
},
};
return iface;
}),
}));
vi.mock("./agent-runner", () => ({
spawnAgentSubscribeProcess: vi.fn(() => ({
stdout: { on: vi.fn() },
stderr: { on: vi.fn() },
on: vi.fn(),
close: vi.fn(),
once: vi.fn(),
kill: vi.fn(),
pid: 12345,
})),
callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })),
extractToolResult: vi.fn((raw: unknown) => {
if (!raw) {return undefined;}
if (typeof raw === "string") {return { text: raw };}
return { text: undefined, details: raw as Record<string, unknown> };
}),
buildToolOutput: vi.fn((result?: { text?: string }) =>
result ? { text: result.text } : {},
),
parseAgentErrorMessage: vi.fn((data?: Record<string, unknown>) => {
if (data?.error && typeof data.error === "string") {return data.error;}
if (data?.message && typeof data.message === "string") {return data.message;}
return undefined;
}),
parseErrorBody: vi.fn((raw: string) => raw),
parseErrorFromStderr: vi.fn((stderr: string) => {
if (!stderr) {return undefined;}
if (/error/i.test(stderr)) {return stderr.trim();}
return undefined;
}),
}));
});
@ -192,6 +305,135 @@ describe("subagent runs", () => {
});
});
describe("subscribe restart stability", () => {
it("applies exponential backoff and resets after a recovered stream event", async () => {
vi.useFakeTimers();
try {
const { spawnAgentSubscribeProcess } = await import("./agent-runner.js");
const { createInterface } = await import("node:readline");
const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess);
mockSubscribeSpawn.mockReset();
const first = createMockSubscribeProcess();
const second = createMockSubscribeProcess();
const third = createMockSubscribeProcess();
const fourth = createMockSubscribeProcess();
mockSubscribeSpawn
.mockReturnValueOnce(first as never)
.mockReturnValueOnce(second as never)
.mockReturnValueOnce(third as never)
.mockReturnValueOnce(fourth as never);
const { registerSubagent } = await importSubagentRuns();
registerSubagent("parent-stable", {
sessionKey: "sub:stable:c1",
runId: "run-stable",
task: "retry stream",
});
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(1);
first._emit("close", 1);
await vi.advanceTimersByTimeAsync(299);
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1);
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2);
second._emit("close", 1);
await vi.advanceTimersByTimeAsync(599);
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(1);
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3);
const createInterfaceMock = vi.mocked(createInterface);
const thirdInterface = createInterfaceMock.mock.results.at(-1)
?.value as { _emit: (event: string, payload: string) => void };
thirdInterface?._emit("line", JSON.stringify({
event: "agent",
sessionKey: "sub:stable:c1",
stream: "assistant",
data: { delta: "recovered" },
globalSeq: 1,
}));
await vi.advanceTimersByTimeAsync(0);
third._emit("close", 1);
await vi.advanceTimersByTimeAsync(299);
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3);
await vi.advanceTimersByTimeAsync(1);
expect(mockSubscribeSpawn).toHaveBeenCalledTimes(4);
} finally {
vi.useRealTimers();
}
});
});
describe("event parity safeguards", () => {
it("emits tool input/output events from subscribed lines", async () => {
const { createInterface } = await import("node:readline");
const { registerSubagent, subscribeToSubagent } = await importSubagentRuns();
registerSubagent("parent-tools", {
sessionKey: "sub:tools:c1",
runId: "run-tools",
task: "tool parity",
});
const received: Array<Record<string, unknown>> = [];
subscribeToSubagent(
"sub:tools:c1",
(event) => {
if (event) {
received.push(event as Record<string, unknown>);
}
},
{ replay: false },
);
const createInterfaceMock = vi.mocked(createInterface);
const iface = createInterfaceMock.mock.results.at(-1)
?.value as { _emit: (event: string, payload: string) => void };
iface._emit("line", JSON.stringify({
event: "agent",
sessionKey: "sub:tools:c1",
stream: "tool",
data: {
phase: "start",
toolCallId: "tc-tools-1",
name: "bash",
args: { command: "echo hello" },
},
globalSeq: 1,
}));
iface._emit("line", JSON.stringify({
event: "agent",
sessionKey: "sub:tools:c1",
stream: "tool",
data: {
phase: "result",
toolCallId: "tc-tools-1",
result: { text: "hello" },
},
globalSeq: 2,
}));
expect(
received.some(
(e) =>
e.type === "tool-input-start" &&
e.toolCallId === "tc-tools-1",
),
).toBe(true);
expect(
received.some(
(e) =>
e.type === "tool-output-available" &&
e.toolCallId === "tc-tools-1",
),
).toBe(true);
});
});
// ─── getSubagentsForSession ───────────────────────────────────────
describe("getSubagentsForSession", () => {
@ -501,5 +743,50 @@ describe("subagent runs", () => {
const { abortSubagent } = await importSubagentRuns();
expect(abortSubagent("unknown")).toBe(false);
});
it("aborts known subagent via gateway RPC", async () => {
const { callGatewayRpc } = await import("./agent-runner.js");
const { registerSubagent, abortSubagent } = await importSubagentRuns();
registerSubagent("parent-1", {
sessionKey: "sub:p:c1",
runId: "run-1",
task: "task",
});
expect(abortSubagent("sub:p:c1")).toBe(true);
expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith(
"chat.abort",
{ sessionKey: "sub:p:c1" },
{ timeoutMs: 4_000 },
);
});
});
describe("spawnSubagentMessage", () => {
it("sends follow-up messages via gateway RPC", async () => {
const { callGatewayRpc } = await import("./agent-runner.js");
const { registerSubagent, spawnSubagentMessage } = await importSubagentRuns();
registerSubagent("parent-1", {
sessionKey: "sub:p:c1",
runId: "run-1",
task: "task",
});
expect(spawnSubagentMessage("sub:p:c1", "continue")).toBe(true);
expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith(
"agent",
expect.objectContaining({
sessionKey: "sub:p:c1",
message: "continue",
channel: "webchat",
lane: "subagent",
deliver: false,
timeout: 0,
}),
{ timeoutMs: 10_000 },
);
});
});
});