From 8fa221554e0d0fcd586c85fff5d2fff8742aad22 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Tue, 17 Mar 2026 12:35:26 -0700 Subject: [PATCH] feat(agent-runner): per-session lanes, lifecycle error recovery, and slash command support Use per-session gateway lanes (web:sessionId) so concurrent chat tabs stream independently. Add a 15s recovery window after lifecycle errors to accept continuation runIds. Route slash commands through chat.send RPC and forward chat events to the UI. --- apps/web/lib/agent-runner.test.ts | 167 ++++++++++++++++++++++++++++++ apps/web/lib/agent-runner.ts | 135 ++++++++++++++++++------ 2 files changed, 273 insertions(+), 29 deletions(-) diff --git a/apps/web/lib/agent-runner.test.ts b/apps/web/lib/agent-runner.test.ts index 26d03793787..b158a031afd 100644 --- a/apps/web/lib/agent-runner.test.ts +++ b/apps/web/lib/agent-runner.test.ts @@ -285,6 +285,173 @@ describe("agent-runner", () => { proc.kill("SIGTERM"); }); + it("keeps stream open across lifecycle error and accepts continuation runId", async () => { + const MockWs = installMockWsModule(); + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("hello", "sess-lifeerr"); + await waitFor(() => MockWs.instances[0]?.methods.includes("agent")); + const ws = MockWs.instances[0]; + + let stdout = ""; + let closed = false; + proc.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + proc.on("close", () => { + closed = true; + }); + + ws.emitJson({ + type: "event", + event: "agent", + seq: 1, + payload: { + runId: "r-initial", + sessionKey: "agent:main:web:sess-lifeerr", + stream: "lifecycle", + data: { phase: "error" }, + globalSeq: 1, + ts: Date.now(), + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 40)); + expect(closed).toBe(false); + + ws.emitJson({ + type: "event", + event: "agent", + seq: 2, + payload: { + runId: "r-continuation", + sessionKey: "agent:main:web:sess-lifeerr", + stream: 
"assistant", + data: { delta: "continued-output" }, + globalSeq: 2, + ts: Date.now(), + }, + }); + + await waitFor(() => stdout.includes("continued-output"), { + attempts: 80, + delayMs: 10, + }); + proc.kill("SIGTERM"); + }); + + it("uses chat.send RPC for slash commands instead of agent", async () => { + const MockWs = installMockWsModule(); + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("/status", "sess-cmd"); + await waitFor(() => MockWs.instances[0]?.methods.includes("chat.send")); + + const ws = MockWs.instances[0]; + expect(ws.methods).toContain("connect"); + expect(ws.methods).toContain("chat.send"); + expect(ws.methods).not.toContain("agent"); + + const chatSendFrame = ws.requestFrames.find( + (frame) => frame.method === "chat.send", + ); + const params = chatSendFrame?.params as Record; + expect(params.message).toBe("/status"); + expect(params.deliver).toBe(false); + expect(typeof params.idempotencyKey).toBe("string"); + expect((params.idempotencyKey as string).length).toBeGreaterThan(0); + proc.kill("SIGTERM"); + }); + + it("uses agent RPC for regular (non-slash) messages", async () => { + const MockWs = installMockWsModule(); + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("hello world", "sess-reg"); + await waitFor(() => MockWs.instances[0]?.methods.includes("agent")); + + const ws = MockWs.instances[0]; + expect(ws.methods).toContain("agent"); + expect(ws.methods).not.toContain("chat.send"); + proc.kill("SIGTERM"); + }); + + it("forwards chat final events to stdout for slash commands", async () => { + const MockWs = installMockWsModule(); + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("/status", "sess-chatfinal"); + await waitFor(() => MockWs.instances[0]?.methods.includes("chat.send")); + const ws = MockWs.instances[0]; + + let stdout = ""; + let closed = false; + proc.stdout?.on("data", 
(chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + proc.on("close", () => { + closed = true; + }); + + ws.emitJson({ + type: "event", + event: "chat", + seq: 1, + payload: { + state: "final", + message: { + role: "assistant", + content: "Status: all systems go", + }, + sessionKey: "agent:main:web:sess-chatfinal", + globalSeq: 1, + }, + }); + + await waitFor(() => stdout.includes("state"), { + attempts: 80, + delayMs: 10, + }); + const parsed = JSON.parse(stdout.trim().split("\n").pop()!) as Record; + expect(parsed.event).toBe("chat"); + expect((parsed.data as Record).state).toBe("final"); + + await waitFor(() => closed, { attempts: 80, delayMs: 10 }); + }); + + it("does not forward chat events for regular messages in start mode", async () => { + const MockWs = installMockWsModule(); + const { spawnAgentProcess } = await import("./agent-runner.js"); + + const proc = spawnAgentProcess("hello", "sess-nochat"); + await waitFor(() => MockWs.instances[0]?.methods.includes("agent")); + const ws = MockWs.instances[0]; + + let stdout = ""; + proc.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + + ws.emitJson({ + type: "event", + event: "chat", + seq: 1, + payload: { + state: "final", + message: { + role: "assistant", + content: "should be ignored", + }, + sessionKey: "agent:main:web:sess-nochat", + globalSeq: 1, + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 40)); + expect(stdout).not.toContain("should be ignored"); + proc.kill("SIGTERM"); + }); + }); describe("spawnAgentSubscribeProcess", () => { diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index f7f1e86ca26..10284eb6304 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -162,6 +162,7 @@ const REQUEST_TIMEOUT_MS = 12_000; const DEFAULT_GATEWAY_CLIENT_CAPS = ["tool-events"]; const SESSIONS_PATCH_RETRY_DELAY_MS = 150; const SESSIONS_PATCH_MAX_ATTEMPTS = 2; +const LIFECYCLE_ERROR_RECOVERY_MS = 
15_000; const ED25519_SPKI_PREFIX = Buffer.from("302a300506032b6570032100", "hex"); type AgentSubscribeSupport = "unknown" | "supported" | "unsupported"; @@ -721,6 +722,10 @@ class GatewayProcessHandle private closeScheduled = false; private requestedClose = false; private runId: string | null = null; + private lifecycleErrorCloseTimer: ReturnType | null = null; + private lifecycleErrorRecoveryUntil = 0; + private useChatSend = false; + private receivedAgentEvent = false; constructor(private readonly params: SpawnGatewayProcessParams) { super(); @@ -732,6 +737,7 @@ class GatewayProcessHandle return false; } this.requestedClose = true; + this.clearLifecycleErrorCloseTimer(); this.client?.close(); const closeSignal = typeof signal === "string" ? signal : null; this.finish(0, closeSignal); @@ -772,16 +778,28 @@ class GatewayProcessHandle throw new Error(frameErrorMessage(connectRes)); } - if (this.params.mode === "start") { - // Pre-patch verbose for existing sessions (best-effort; new - // sessions don't exist yet so this may fail — we retry below). - if (this.params.sessionKey) { - await this.ensureFullToolVerbose(this.params.sessionKey); - } + if (this.params.mode === "start") { + // Pre-patch verbose for existing sessions (best-effort; new + // sessions don't exist yet so this may fail — we retry below). + if (this.params.sessionKey) { + await this.ensureFullToolVerbose(this.params.sessionKey); + } - const sessionKey = this.params.sessionKey; - const startRes = await this.client.request("agent", { - message: this.params.message ?? "", + const sessionKey = this.params.sessionKey; + const msg = this.params.message ?? ""; + this.useChatSend = msg.startsWith("/"); + + let startRes: GatewayResFrame; + if (this.useChatSend) { + startRes = await this.client.request("chat.send", { + message: msg, + ...(sessionKey ? 
{ sessionKey } : {}), + idempotencyKey: randomUUID(), + deliver: false, + }); + } else { + startRes = await this.client.request("agent", { + message: msg, idempotencyKey: randomUUID(), ...(sessionKey ? { sessionKey } : {}), deliver: false, @@ -789,20 +807,21 @@ class GatewayProcessHandle lane: this.params.lane ?? "web", timeout: 0, }); - if (!startRes.ok) { - throw new Error(frameErrorMessage(startRes)); - } - const payload = asRecord(startRes.payload); - const runId = - payload && typeof payload.runId === "string" ? payload.runId : null; - this.runId = runId; + } + if (!startRes.ok) { + throw new Error(frameErrorMessage(startRes)); + } + const payload = asRecord(startRes.payload); + const runId = + payload && typeof payload.runId === "string" ? payload.runId : null; + this.runId = runId; - // Retry verbose patch now that the agent RPC has created the - // session. This is the critical path for first-message-in-chat - // where the pre-patch above failed. - if (sessionKey) { - await this.ensureFullToolVerbose(sessionKey); - } + // Retry verbose patch now that the RPC has created the + // session. This is the critical path for first-message-in-chat + // where the pre-patch above failed. + if (sessionKey) { + await this.ensureFullToolVerbose(sessionKey); + } } else { const sessionKey = this.params.sessionKey; if (!sessionKey) { @@ -931,6 +950,7 @@ class GatewayProcessHandle } if (frame.event === "agent") { + this.receivedAgentEvent = true; const payload = asRecord(frame.payload); if (!payload) { return; @@ -942,7 +962,16 @@ class GatewayProcessHandle } const runId = typeof payload.runId === "string" ? payload.runId : undefined; if (this.runId && runId && runId !== this.runId) { - return; + if (Date.now() <= this.lifecycleErrorRecoveryUntil) { + // The gateway can recover from lifecycle/error by creating + // a continuation run with a new runId under the same session. 
+ // During the recovery window, follow that new run so the UI + // doesn't miss trailing events before final termination. + this.runId = runId; + this.clearLifecycleErrorCloseTimer(); + } else { + return; + } } const payloadGlobalSeq = typeof payload.globalSeq === "number" ? payload.globalSeq : undefined; @@ -974,21 +1003,37 @@ class GatewayProcessHandle const stream = typeof payload.stream === "string" ? payload.stream : ""; const data = asRecord(payload.data); const phase = data && typeof data.phase === "string" ? data.phase : ""; + if (!(stream === "lifecycle" && phase === "error")) { + this.clearLifecycleErrorCloseTimer(); + } if ( this.params.mode === "start" && stream === "lifecycle" && - (phase === "end" || phase === "error") + phase === "end" ) { this.scheduleClose(); } + if ( + this.params.mode === "start" && + stream === "lifecycle" && + phase === "error" + ) { + this.armLifecycleErrorCloseTimer(); + } return; } if (frame.event === "chat") { - // Only forward chat frames in subscribe mode. In start mode - // the agent stream already delivers the full assistant text; - // forwarding chat finals would duplicate the output. - if (this.params.mode !== "subscribe") { + // Forward chat frames in subscribe mode unconditionally. + // In start mode, only forward when using chat.send for + // slash commands — the gateway returns command responses + // as chat events rather than agent events. Skip if we + // already received agent events (agent run started) to + // avoid duplicating the assistant text. + const forwardChat = + this.params.mode === "subscribe" || + (this.useChatSend && !this.receivedAgentEvent); + if (!forwardChat) { return; } const payload = asRecord(frame.payload) ?? {}; @@ -1011,6 +1056,15 @@ class GatewayProcessHandle ...(sessionKey ? 
{ sessionKey } : {}),
       };
       (this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
+
+      if (
+        this.useChatSend &&
+        this.params.mode === "start" &&
+        typeof payload.state === "string" &&
+        payload.state === "final"
+      ) {
+        this.scheduleClose();
+      }
       return;
     }
 
@@ -1036,11 +1090,47 @@ class GatewayProcessHandle
       };
       (this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
       if (this.params.mode === "start") {
-        this.scheduleClose();
+        this.armLifecycleErrorCloseTimer();
       }
     }
   }
 
+  /**
+   * Opens the post-error recovery window and schedules the deferred close.
+   *
+   * While the window is open, agent events carrying a different runId are
+   * followed as a continuation of this run instead of being dropped. If
+   * nothing cancels the timer within LIFECYCLE_ERROR_RECOVERY_MS, the
+   * stream is closed via scheduleClose().
+   */
+  private armLifecycleErrorCloseTimer(): void {
+    // Cancel any previous timer first: clearLifecycleErrorCloseTimer() also
+    // zeroes lifecycleErrorRecoveryUntil, so the new deadline has to be
+    // written after it or the recovery window would never be open.
+    this.clearLifecycleErrorCloseTimer();
+    this.lifecycleErrorRecoveryUntil = Date.now() + LIFECYCLE_ERROR_RECOVERY_MS;
+    this.lifecycleErrorCloseTimer = setTimeout(() => {
+      this.lifecycleErrorCloseTimer = null;
+      if (this.finished) {
+        return;
+      }
+      this.scheduleClose();
+    }, LIFECYCLE_ERROR_RECOVERY_MS);
+  }
+
+  /**
+   * Closes the recovery window and cancels the pending deferred close, if
+   * one is armed. Calling it with no timer armed only resets the deadline.
+   */
+  private clearLifecycleErrorCloseTimer(): void {
+    this.lifecycleErrorRecoveryUntil = 0;
+    if (!this.lifecycleErrorCloseTimer) {
+      return;
+    }
+    clearTimeout(this.lifecycleErrorCloseTimer);
+    this.lifecycleErrorCloseTimer = null;
+  }
+
   private scheduleClose(): void {
     if (this.closeScheduled || this.finished) {
       return;
@@ -1073,6 +1163,7 @@ class GatewayProcessHandle
       return;
     }
     this.finished = true;
+    this.clearLifecycleErrorCloseTimer();
     try {
       (this.stdout as PassThrough).end();
       (this.stderr as PassThrough).end();
@@ -1135,6 +1226,7 @@ export function spawnAgentProcess(
     message,
     sessionKey,
     afterSeq: 0,
+    lane: agentSessionId ? `web:${agentSessionId}` : "web",
   });
 }
 