test(web): add active-runs tests with retry and lifecycle coverage
This commit is contained in:
parent
0d219413a8
commit
14feec10af
@ -5,6 +5,8 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
// Mock agent-runner so tests control its process spawners and gateway RPC
|
||||
vi.mock("./agent-runner", () => ({
|
||||
spawnAgentProcess: vi.fn(),
|
||||
spawnAgentSubscribeProcess: vi.fn(),
|
||||
callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })),
|
||||
extractToolResult: vi.fn((raw: unknown) => {
|
||||
if (!raw) {return undefined;}
|
||||
if (typeof raw === "string") {return { text: raw };}
|
||||
@ -92,6 +94,8 @@ describe("active-runs", () => {
|
||||
// Re-wire mocks after resetModules
|
||||
vi.mock("./agent-runner", () => ({
|
||||
spawnAgentProcess: vi.fn(),
|
||||
spawnAgentSubscribeProcess: vi.fn(),
|
||||
callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })),
|
||||
extractToolResult: vi.fn((raw: unknown) => {
|
||||
if (!raw) {return undefined;}
|
||||
if (typeof raw === "string") {return { text: raw };}
|
||||
@ -582,6 +586,7 @@ describe("active-runs", () => {
|
||||
describe("abortRun", () => {
|
||||
it("kills a running child process", async () => {
|
||||
const { child, startRun, abortRun } = await setup();
|
||||
const { callGatewayRpc } = await import("./agent-runner.js");
|
||||
|
||||
startRun({
|
||||
sessionId: "s-abort",
|
||||
@ -591,6 +596,11 @@ describe("active-runs", () => {
|
||||
|
||||
expect(abortRun("s-abort")).toBe(true);
|
||||
expect(child.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith(
|
||||
"chat.abort",
|
||||
{ sessionKey: "agent:main:web:s-abort" },
|
||||
{ timeoutMs: 4_000 },
|
||||
);
|
||||
});
|
||||
|
||||
it("returns false for non-running sessions", async () => {
|
||||
@ -664,6 +674,403 @@ describe("active-runs", () => {
|
||||
|
||||
expect(completed).toHaveLength(1);
|
||||
});
|
||||
|
||||
// Checks that a parent run parked in "waiting-for-subagents" can still be
// aborted: the parent's subscribe stream gets SIGTERM and the run ends in
// the "error" state.
it("aborts runs while waiting for subagents", async () => {
  const { startRun, startSubscribeRun, abortRun, getActiveRun } = await setup();
  const runner = await import("./agent-runner.js");
  const spawnRun = vi.mocked(runner.spawnAgentProcess);
  const spawnSubscribe = vi.mocked(runner.spawnAgentSubscribeProcess);
  spawnRun.mockReset();
  spawnSubscribe.mockReset();

  const asChild = (mock: ReturnType<typeof createMockChild>): ChildProcess =>
    mock as unknown as ChildProcess;

  const parentProcess = createMockChild();
  const subagentStream = createMockChild();
  const parentStream = createMockChild();

  // Subscribe spawns are handed out in call order: the subagent run first,
  // then the stream opened for the waiting parent.
  spawnRun.mockReturnValue(asChild(parentProcess));
  spawnSubscribe.mockReturnValueOnce(asChild(subagentStream));
  spawnSubscribe.mockReturnValueOnce(asChild(parentStream));

  const parentId = "parent-waiting-abort";

  startSubscribeRun({
    sessionKey: "sub:waiting:abort",
    parentSessionId: parentId,
    task: "child task",
  });
  startRun({
    sessionId: parentId,
    message: "run parent",
    agentSessionId: parentId,
  });

  // End stdout, yield one macrotask, then report a clean exit (code 0)
  // while the subagent run is still open.
  parentProcess.stdout.end();
  await new Promise((resolve) => setTimeout(resolve, 0));
  parentProcess._emit("close", 0);

  const statusBeforeAbort = getActiveRun(parentId)?.status;
  expect(statusBeforeAbort).toBe("waiting-for-subagents");

  const aborted = abortRun(parentId);
  expect(aborted).toBe(true);
  expect(parentStream.kill).toHaveBeenCalledWith("SIGTERM");

  const statusAfterAbort = getActiveRun(parentId)?.status;
  expect(statusAfterAbort).toBe("error");
});
|
||||
});
|
||||
|
||||
// Covers the follow-up path: a follow-up message for a subagent session is
// forwarded through the mocked gateway RPC with the expected payload.
describe("sendSubagentFollowUp", () => {
  it("sends follow-up over gateway RPC", async () => {
    const { sendSubagentFollowUp } = await setup();
    const runner = await import("./agent-runner.js");

    const accepted = sendSubagentFollowUp("session-1", "continue");
    expect(accepted).toBe(true);

    // Only the listed fields are pinned; any extra params are tolerated.
    const expectedParams = expect.objectContaining({
      sessionKey: "session-1",
      message: "continue",
      channel: "webchat",
      lane: "subagent",
      deliver: false,
      timeout: 0,
    });
    const expectedOptions = { timeoutMs: 10_000 };

    const rpc = vi.mocked(runner.callGatewayRpc);
    expect(rpc).toHaveBeenCalledWith("agent", expectedParams, expectedOptions);
  });
});
|
||||
|
||||
describe("subscribe stream restart stability", () => {
|
||||
// Drives a subscribe-only run through repeated stream crashes under fake
// timers. Observed respawn delays: 300ms, then 600ms, then 300ms again once
// the stream has delivered an event.
it("uses bounded exponential backoff for subscribe-only restarts and resets after first event", async () => {
  vi.useFakeTimers();
  try {
    const { startSubscribeRun, abortRun } = await setup();
    const runner = await import("./agent-runner.js");
    const spawnSubscribe = vi.mocked(runner.spawnAgentSubscribeProcess);
    spawnSubscribe.mockReset();

    type MockChild = ReturnType<typeof createMockChild>;

    // One mock stream per expected spawn, queued in order.
    const first = createMockChild();
    const second = createMockChild();
    const third = createMockChild();
    const fourth = createMockChild();
    for (const stream of [first, second, third, fourth]) {
      spawnSubscribe.mockReturnValueOnce(stream as unknown as ChildProcess);
    }

    // Crash `stream` with exit code 1, then assert that no respawn happens
    // until exactly `delayMs` of fake time has elapsed.
    const crashAndExpectRespawn = async (
      stream: MockChild,
      delayMs: number,
      spawnsSoFar: number,
    ): Promise<void> => {
      stream._emit("close", 1);
      await vi.advanceTimersByTimeAsync(delayMs - 1);
      expect(spawnSubscribe).toHaveBeenCalledTimes(spawnsSoFar);
      await vi.advanceTimersByTimeAsync(1);
      expect(spawnSubscribe).toHaveBeenCalledTimes(spawnsSoFar + 1);
    };

    const sessionKey = "sub:retry:one";
    startSubscribeRun({
      sessionKey,
      parentSessionId: "parent-retry",
      task: "retry task",
    });
    expect(spawnSubscribe).toHaveBeenCalledTimes(1);

    await crashAndExpectRespawn(first, 300, 1);
    await crashAndExpectRespawn(second, 600, 2);

    // An event arriving on the third stream is expected to reset the delay.
    third._writeLine({
      event: "agent",
      sessionKey,
      stream: "assistant",
      data: { delta: "recovered" },
      globalSeq: 1,
    });
    await vi.advanceTimersByTimeAsync(0);

    await crashAndExpectRespawn(third, 300, 3);

    const aborted = abortRun(sessionKey);
    expect(aborted).toBe(true);
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
// When the subscribe stream of a waiting parent crashes, the replacement
// stream must only be spawned after a 300ms delay, not immediately.
it("retries parent waiting streams with backoff instead of tight loops", async () => {
  vi.useFakeTimers();
  try {
    const { startRun, startSubscribeRun, getActiveRun, abortRun } = await setup();
    const runner = await import("./agent-runner.js");
    const spawnRun = vi.mocked(runner.spawnAgentProcess);
    const spawnSubscribe = vi.mocked(runner.spawnAgentSubscribeProcess);
    spawnRun.mockReset();
    spawnSubscribe.mockReset();

    const asChild = (mock: ReturnType<typeof createMockChild>): ChildProcess =>
      mock as unknown as ChildProcess;

    const parentProcess = createMockChild();
    const subagentStream = createMockChild();
    const parentStreamA = createMockChild();
    const parentStreamB = createMockChild();

    spawnRun.mockReturnValue(asChild(parentProcess));
    // Spawn order: subagent stream, parent's first stream, parent's retry.
    spawnSubscribe.mockReturnValueOnce(asChild(subagentStream));
    spawnSubscribe.mockReturnValueOnce(asChild(parentStreamA));
    spawnSubscribe.mockReturnValueOnce(asChild(parentStreamB));

    const parentId = "parent-retry-2";
    const subagentKey = "sub:parent:retry";

    startSubscribeRun({
      sessionKey: subagentKey,
      parentSessionId: parentId,
      task: "child task",
    });
    startRun({
      sessionId: parentId,
      message: "run parent",
      agentSessionId: parentId,
    });

    // Parent process exits cleanly while its subagent is still open.
    parentProcess.stdout.end();
    await vi.advanceTimersByTimeAsync(0);
    parentProcess._emit("close", 0);

    const parentStatus = getActiveRun(parentId)?.status;
    expect(parentStatus).toBe("waiting-for-subagents");
    expect(spawnSubscribe).toHaveBeenCalledTimes(2);

    // Crash the parent's stream: nothing at 299ms, respawn at 300ms.
    parentStreamA._emit("close", 1);
    await vi.advanceTimersByTimeAsync(299);
    expect(spawnSubscribe).toHaveBeenCalledTimes(2);
    await vi.advanceTimersByTimeAsync(1);
    expect(spawnSubscribe).toHaveBeenCalledTimes(3);

    // Abort both runs at the end of the test.
    const parentAborted = abortRun(parentId);
    expect(parentAborted).toBe(true);
    const subagentAborted = abortRun(subagentKey);
    expect(subagentAborted).toBe(true);
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
// Walks a waiting parent through two announce turns. Each final chat message
// must surface as a text-delta, a new lifecycle "start" must keep the run
// waiting, and the run must complete exactly once after the stream goes idle.
it("streams multiple announce turns while waiting and finalizes after idle reconciliation", async () => {
  vi.useFakeTimers();
  try {
    const { startRun, startSubscribeRun, subscribeToRun, getActiveRun } = await setup();
    const runner = await import("./agent-runner.js");
    const spawnRun = vi.mocked(runner.spawnAgentProcess);
    const spawnSubscribe = vi.mocked(runner.spawnAgentSubscribeProcess);
    spawnRun.mockReset();
    spawnSubscribe.mockReset();

    const asChild = (mock: ReturnType<typeof createMockChild>): ChildProcess =>
      mock as unknown as ChildProcess;

    const parentProcess = createMockChild();
    const subagentStream = createMockChild();
    const parentStream = createMockChild();

    spawnRun.mockReturnValue(asChild(parentProcess));
    spawnSubscribe.mockReturnValueOnce(asChild(subagentStream));
    spawnSubscribe.mockReturnValueOnce(asChild(parentStream));

    const parentId = "parent-announce";
    const parentKey = "agent:main:web:parent-announce";
    const subagentKey = "sub:announce:one";

    startSubscribeRun({
      sessionKey: subagentKey,
      parentSessionId: parentId,
      task: "child task",
    });
    startRun({
      sessionId: parentId,
      message: "run parent",
      agentSessionId: parentId,
    });

    // Collect streamed events; a falsy callback argument is counted as a
    // completion signal.
    const received: SseEvent[] = [];
    const completions: boolean[] = [];
    subscribeToRun(
      parentId,
      (event) => {
        if (!event) {
          completions.push(true);
          return;
        }
        received.push(event);
      },
      { replay: false },
    );

    // True when some collected text-delta carries `text`.
    const sawTextDelta = (text: string): boolean =>
      received.some(
        (e) =>
          e.type === "text-delta" &&
          typeof e.delta === "string" &&
          e.delta.includes(text),
      );

    // Writes a final assistant chat message onto the parent's stream.
    const writeFinalChat = (globalSeq: number, text: string): void => {
      parentStream._writeLine({
        event: "chat",
        sessionKey: parentKey,
        globalSeq,
        data: {
          state: "final",
          message: {
            role: "assistant",
            content: [{ type: "text", text }],
          },
        },
      });
    };

    // Parent process exits cleanly while the subagent is still open.
    parentProcess.stdout.end();
    await vi.advanceTimersByTimeAsync(0);
    parentProcess._emit("close", 0);
    expect(getActiveRun(parentId)?.status).toBe("waiting-for-subagents");

    // The subagent reports lifecycle end; 750ms later it must be completed.
    subagentStream._writeLine({
      event: "agent",
      sessionKey: subagentKey,
      stream: "lifecycle",
      data: { phase: "end" },
      globalSeq: 1,
    });
    await vi.advanceTimersByTimeAsync(0);
    await vi.advanceTimersByTimeAsync(750);
    expect(getActiveRun(subagentKey)?.status).toBe("completed");

    // First announce turn.
    const firstAnnouncement = "Subagent finished and reported back.";
    writeFinalChat(2, firstAnnouncement);
    await vi.advanceTimersByTimeAsync(0);
    expect(sawTextDelta(firstAnnouncement)).toBe(true);

    // A subsequent announce turn should keep the waiting run alive
    // by resetting the finalize reconciliation timer.
    parentStream._writeLine({
      event: "agent",
      sessionKey: parentKey,
      stream: "lifecycle",
      data: { phase: "start" },
      globalSeq: 3,
    });
    await vi.advanceTimersByTimeAsync(0);
    await vi.advanceTimersByTimeAsync(4_900);
    expect(completions).toHaveLength(0);
    expect(getActiveRun(parentId)?.status).toBe("waiting-for-subagents");

    // Second announce turn.
    const secondAnnouncement = "Another subagent result delivered.";
    writeFinalChat(4, secondAnnouncement);
    await vi.advanceTimersByTimeAsync(0);
    expect(sawTextDelta(secondAnnouncement)).toBe(true);

    // After 5s with no further events the run completes exactly once.
    await vi.advanceTimersByTimeAsync(5_000);
    expect(completions).toHaveLength(1);
    expect(getActiveRun(parentId)?.status).toBe("completed");
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
// The "Waiting for subagent results..." reasoning delta must be emitted once
// when the parent starts waiting, and not again for later lifecycle "end"
// events on the parent's stream.
it("does not spam duplicate waiting-status deltas while already waiting", async () => {
  vi.useFakeTimers();
  try {
    const { startRun, startSubscribeRun, subscribeToRun, abortRun } = await setup();
    const runner = await import("./agent-runner.js");
    const spawnRun = vi.mocked(runner.spawnAgentProcess);
    const spawnSubscribe = vi.mocked(runner.spawnAgentSubscribeProcess);
    spawnRun.mockReset();
    spawnSubscribe.mockReset();

    const asChild = (mock: ReturnType<typeof createMockChild>): ChildProcess =>
      mock as unknown as ChildProcess;

    const parentProcess = createMockChild();
    const subagentStream = createMockChild();
    const parentStream = createMockChild();

    spawnRun.mockReturnValue(asChild(parentProcess));
    spawnSubscribe.mockReturnValueOnce(asChild(subagentStream));
    spawnSubscribe.mockReturnValueOnce(asChild(parentStream));

    const parentId = "parent-waiting-dedupe";
    const subagentKey = "sub:waiting:dedupe";

    startSubscribeRun({
      sessionKey: subagentKey,
      parentSessionId: parentId,
      task: "child task",
    });
    startRun({
      sessionId: parentId,
      message: "run parent",
      agentSessionId: parentId,
    });

    // Record every truthy event delivered to the subscriber.
    const received: SseEvent[] = [];
    subscribeToRun(
      parentId,
      (event) => {
        if (!event) {
          return;
        }
        received.push(event);
      },
      { replay: false },
    );

    const waitingText = "Waiting for subagent results...";
    // Number of waiting-status reasoning deltas seen so far.
    const countWaitingDeltas = (): number =>
      received.filter(
        (e) => e.type === "reasoning-delta" && e.delta === waitingText,
      ).length;

    // Parent process exits cleanly while the subagent is still open.
    parentProcess.stdout.end();
    await vi.advanceTimersByTimeAsync(0);
    parentProcess._emit("close", 0);
    await vi.advanceTimersByTimeAsync(0);

    expect(countWaitingDeltas()).toBe(1);

    // Two further lifecycle "end" events must not add another delta.
    for (const globalSeq of [2, 3]) {
      parentStream._writeLine({
        event: "agent",
        sessionKey: "agent:main:web:parent-waiting-dedupe",
        stream: "lifecycle",
        data: { phase: "end" },
        globalSeq,
      });
    }
    await vi.advanceTimersByTimeAsync(0);

    expect(countWaitingDeltas()).toBe(1);

    // Abort both runs at the end of the test.
    const parentAborted = abortRun(parentId);
    expect(parentAborted).toBe(true);
    const subagentAborted = abortRun(subagentKey);
    expect(subagentAborted).toBe(true);
  } finally {
    vi.useRealTimers();
  }
});
|
||||
});
|
||||
|
||||
// ── duplicate run prevention ──────────────────────────────────────
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user