diff --git a/apps/web/lib/active-runs.test.ts b/apps/web/lib/active-runs.test.ts index 41d435a0b42..647c6911c6f 100644 --- a/apps/web/lib/active-runs.test.ts +++ b/apps/web/lib/active-runs.test.ts @@ -5,6 +5,8 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; // Mock agent-runner to control spawnAgentProcess vi.mock("./agent-runner", () => ({ spawnAgentProcess: vi.fn(), + spawnAgentSubscribeProcess: vi.fn(), + callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })), extractToolResult: vi.fn((raw: unknown) => { if (!raw) {return undefined;} if (typeof raw === "string") {return { text: raw };} @@ -92,6 +94,8 @@ describe("active-runs", () => { // Re-wire mocks after resetModules vi.mock("./agent-runner", () => ({ spawnAgentProcess: vi.fn(), + spawnAgentSubscribeProcess: vi.fn(), + callGatewayRpc: vi.fn(() => Promise.resolve({ ok: true })), extractToolResult: vi.fn((raw: unknown) => { if (!raw) {return undefined;} if (typeof raw === "string") {return { text: raw };} @@ -582,6 +586,7 @@ describe("active-runs", () => { describe("abortRun", () => { it("kills a running child process", async () => { const { child, startRun, abortRun } = await setup(); + const { callGatewayRpc } = await import("./agent-runner.js"); startRun({ sessionId: "s-abort", @@ -591,6 +596,11 @@ describe("active-runs", () => { expect(abortRun("s-abort")).toBe(true); expect(child.kill).toHaveBeenCalledWith("SIGTERM"); + expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith( + "chat.abort", + { sessionKey: "agent:main:web:s-abort" }, + { timeoutMs: 4_000 }, + ); }); it("returns false for non-running sessions", async () => { @@ -664,6 +674,403 @@ describe("active-runs", () => { expect(completed).toHaveLength(1); }); + + it("aborts runs while waiting for subagents", async () => { + const { startRun, startSubscribeRun, abortRun, getActiveRun } = await setup(); + const { spawnAgentProcess, spawnAgentSubscribeProcess } = await import( + "./agent-runner.js" + ); + const mockRunSpawn = vi.mocked(spawnAgentProcess); + const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess); + mockRunSpawn.mockReset(); + mockSubscribeSpawn.mockReset(); + + const parentChild = createMockChild(); + const subagentStream = createMockChild(); + const parentSubscribe = createMockChild(); + + mockRunSpawn.mockReturnValue(parentChild as unknown as ChildProcess); + mockSubscribeSpawn + .mockReturnValueOnce(subagentStream as unknown as ChildProcess) + .mockReturnValueOnce(parentSubscribe as unknown as ChildProcess); + + startSubscribeRun({ + sessionKey: "sub:waiting:abort", + parentSessionId: "parent-waiting-abort", + task: "child task", + }); + startRun({ + sessionId: "parent-waiting-abort", + message: "run parent", + agentSessionId: "parent-waiting-abort", + }); + + parentChild.stdout.end(); + await new Promise((r) => setTimeout(r, 0)); + parentChild._emit("close", 0); + + expect(getActiveRun("parent-waiting-abort")?.status).toBe( + "waiting-for-subagents", + ); + expect(abortRun("parent-waiting-abort")).toBe(true); + expect(parentSubscribe.kill).toHaveBeenCalledWith("SIGTERM"); + expect(getActiveRun("parent-waiting-abort")?.status).toBe("error"); + }); + }); + + describe("sendSubagentFollowUp", () => { + it("sends follow-up over gateway RPC", async () => { + const { sendSubagentFollowUp } = await setup(); + const { callGatewayRpc } = await import("./agent-runner.js"); + + expect(sendSubagentFollowUp("session-1", "continue")).toBe(true); + expect(vi.mocked(callGatewayRpc)).toHaveBeenCalledWith( + "agent", + expect.objectContaining({ + sessionKey: "session-1", + message: "continue", + channel: "webchat", + lane: "subagent", + deliver: false, + timeout: 0, + }), + { timeoutMs: 10_000 }, + ); + }); + }); + + describe("subscribe stream restart stability", () => { + it("uses bounded exponential backoff for subscribe-only restarts and resets after first event", async () => { + vi.useFakeTimers(); + try { + const { startSubscribeRun, abortRun } = await setup(); + const { spawnAgentSubscribeProcess } = await import("./agent-runner.js"); + const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess); + mockSubscribeSpawn.mockReset(); + + const first = createMockChild(); + const second = createMockChild(); + const third = createMockChild(); + const fourth = createMockChild(); + mockSubscribeSpawn + .mockReturnValueOnce(first as unknown as ChildProcess) + .mockReturnValueOnce(second as unknown as ChildProcess) + .mockReturnValueOnce(third as unknown as ChildProcess) + .mockReturnValueOnce(fourth as unknown as ChildProcess); + + startSubscribeRun({ + sessionKey: "sub:retry:one", + parentSessionId: "parent-retry", + task: "retry task", + }); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(1); + + first._emit("close", 1); + await vi.advanceTimersByTimeAsync(299); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2); + + second._emit("close", 1); + await vi.advanceTimersByTimeAsync(599); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3); + + third._writeLine({ + event: "agent", + sessionKey: "sub:retry:one", + stream: "assistant", + data: { delta: "recovered" }, + globalSeq: 1, + }); + await vi.advanceTimersByTimeAsync(0); + + third._emit("close", 1); + await vi.advanceTimersByTimeAsync(299); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(4); + + expect(abortRun("sub:retry:one")).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + + it("retries parent waiting streams with backoff instead of tight loops", async () => { + vi.useFakeTimers(); + try { + const { startRun, startSubscribeRun, getActiveRun, abortRun } = + await setup(); + const { spawnAgentProcess, spawnAgentSubscribeProcess } = await import( + "./agent-runner.js" + ); + const mockRunSpawn = vi.mocked(spawnAgentProcess); + const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess); + mockRunSpawn.mockReset(); + mockSubscribeSpawn.mockReset(); + + const parentChild = createMockChild(); + const subagentStream = createMockChild(); + const parentSubscribeFirst = createMockChild(); + const parentSubscribeSecond = createMockChild(); + + mockRunSpawn.mockReturnValue(parentChild as unknown as ChildProcess); + mockSubscribeSpawn + .mockReturnValueOnce(subagentStream as unknown as ChildProcess) + .mockReturnValueOnce(parentSubscribeFirst as unknown as ChildProcess) + .mockReturnValueOnce(parentSubscribeSecond as unknown as ChildProcess); + + startSubscribeRun({ + sessionKey: "sub:parent:retry", + parentSessionId: "parent-retry-2", + task: "child task", + }); + startRun({ + sessionId: "parent-retry-2", + message: "run parent", + agentSessionId: "parent-retry-2", + }); + + parentChild.stdout.end(); + await vi.advanceTimersByTimeAsync(0); + parentChild._emit("close", 0); + + expect(getActiveRun("parent-retry-2")?.status).toBe( + "waiting-for-subagents", + ); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2); + + parentSubscribeFirst._emit("close", 1); + await vi.advanceTimersByTimeAsync(299); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(2); + await vi.advanceTimersByTimeAsync(1); + expect(mockSubscribeSpawn).toHaveBeenCalledTimes(3); + + expect(abortRun("parent-retry-2")).toBe(true); + expect(abortRun("sub:parent:retry")).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + + it("streams multiple announce turns while waiting and finalizes after idle reconciliation", async () => { + vi.useFakeTimers(); + try { + const { startRun, startSubscribeRun, subscribeToRun, getActiveRun } = + await setup(); + const { spawnAgentProcess, spawnAgentSubscribeProcess } = await import( + "./agent-runner.js" + ); + const mockRunSpawn = vi.mocked(spawnAgentProcess); + const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess); + mockRunSpawn.mockReset(); + mockSubscribeSpawn.mockReset(); + + const parentChild = createMockChild(); + const subagentStream = createMockChild(); + const parentSubscribe = createMockChild(); + + mockRunSpawn.mockReturnValue(parentChild as unknown as ChildProcess); + mockSubscribeSpawn + .mockReturnValueOnce(subagentStream as unknown as ChildProcess) + .mockReturnValueOnce(parentSubscribe as unknown as ChildProcess); + + startSubscribeRun({ + sessionKey: "sub:announce:one", + parentSessionId: "parent-announce", + task: "child task", + }); + startRun({ + sessionId: "parent-announce", + message: "run parent", + agentSessionId: "parent-announce", + }); + + const events: SseEvent[] = []; + const completed: boolean[] = []; + subscribeToRun( + "parent-announce", + (event) => { + if (event) { + events.push(event); + } else { + completed.push(true); + } + }, + { replay: false }, + ); + + parentChild.stdout.end(); + await vi.advanceTimersByTimeAsync(0); + parentChild._emit("close", 0); + expect(getActiveRun("parent-announce")?.status).toBe( + "waiting-for-subagents", + ); + + subagentStream._writeLine({ + event: "agent", + sessionKey: "sub:announce:one", + stream: "lifecycle", + data: { phase: "end" }, + globalSeq: 1, + }); + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(750); + expect(getActiveRun("sub:announce:one")?.status).toBe("completed"); + + parentSubscribe._writeLine({ + event: "chat", + sessionKey: "agent:main:web:parent-announce", + globalSeq: 2, + data: { + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "Subagent finished and reported back." }], + }, + }, + }); + await vi.advanceTimersByTimeAsync(0); + + expect( + events.some( + (e) => + e.type === "text-delta" && + typeof e.delta === "string" && + e.delta.includes("Subagent finished and reported back."), + ), + ).toBe(true); + + // A subsequent announce turn should keep the waiting run alive + // by resetting the finalize reconciliation timer. + parentSubscribe._writeLine({ + event: "agent", + sessionKey: "agent:main:web:parent-announce", + stream: "lifecycle", + data: { phase: "start" }, + globalSeq: 3, + }); + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(4_900); + expect(completed).toHaveLength(0); + expect(getActiveRun("parent-announce")?.status).toBe( + "waiting-for-subagents", + ); + + parentSubscribe._writeLine({ + event: "chat", + sessionKey: "agent:main:web:parent-announce", + globalSeq: 4, + data: { + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "Another subagent result delivered." }], + }, + }, + }); + await vi.advanceTimersByTimeAsync(0); + expect( + events.some( + (e) => + e.type === "text-delta" && + typeof e.delta === "string" && + e.delta.includes("Another subagent result delivered."), + ), + ).toBe(true); + + await vi.advanceTimersByTimeAsync(5_000); + expect(completed).toHaveLength(1); + expect(getActiveRun("parent-announce")?.status).toBe("completed"); + } finally { + vi.useRealTimers(); + } + }); + + it("does not spam duplicate waiting-status deltas while already waiting", async () => { + vi.useFakeTimers(); + try { + const { startRun, startSubscribeRun, subscribeToRun, abortRun } = + await setup(); + const { spawnAgentProcess, spawnAgentSubscribeProcess } = await import( + "./agent-runner.js" + ); + const mockRunSpawn = vi.mocked(spawnAgentProcess); + const mockSubscribeSpawn = vi.mocked(spawnAgentSubscribeProcess); + mockRunSpawn.mockReset(); + mockSubscribeSpawn.mockReset(); + + const parentChild = createMockChild(); + const subagentStream = createMockChild(); + const parentSubscribe = createMockChild(); + + mockRunSpawn.mockReturnValue(parentChild as unknown as ChildProcess); + mockSubscribeSpawn + .mockReturnValueOnce(subagentStream as unknown as ChildProcess) + .mockReturnValueOnce(parentSubscribe as unknown as ChildProcess); + + startSubscribeRun({ + sessionKey: "sub:waiting:dedupe", + parentSessionId: "parent-waiting-dedupe", + task: "child task", + }); + startRun({ + sessionId: "parent-waiting-dedupe", + message: "run parent", + agentSessionId: "parent-waiting-dedupe", + }); + + const events: SseEvent[] = []; + subscribeToRun( + "parent-waiting-dedupe", + (event) => { + if (event) { + events.push(event); + } + }, + { replay: false }, + ); + + parentChild.stdout.end(); + await vi.advanceTimersByTimeAsync(0); + parentChild._emit("close", 0); + await vi.advanceTimersByTimeAsync(0); + + const waitingText = "Waiting for subagent results..."; + const waitingCountAfterEnter = events.filter( + (e) => e.type === "reasoning-delta" && e.delta === waitingText, + ).length; + expect(waitingCountAfterEnter).toBe(1); + + parentSubscribe._writeLine({ + event: "agent", + sessionKey: "agent:main:web:parent-waiting-dedupe", + stream: "lifecycle", + data: { phase: "end" }, + globalSeq: 2, + }); + parentSubscribe._writeLine({ + event: "agent", + sessionKey: "agent:main:web:parent-waiting-dedupe", + stream: "lifecycle", + data: { phase: "end" }, + globalSeq: 3, + }); + await vi.advanceTimersByTimeAsync(0); + + const waitingCountFinal = events.filter( + (e) => e.type === "reasoning-delta" && e.delta === waitingText, + ).length; + expect(waitingCountFinal).toBe(1); + + expect(abortRun("parent-waiting-dedupe")).toBe(true); + expect(abortRun("sub:waiting:dedupe")).toBe(true); + } finally { + vi.useRealTimers(); + } + }); }); // ── duplicate run prevention ──────────────────────────────────────