diff --git a/src/gateway/chat-abort.test.ts b/src/gateway/chat-abort.test.ts index f3aff5ebfe5..5458ec5b84a 100644 --- a/src/gateway/chat-abort.test.ts +++ b/src/gateway/chat-abort.test.ts @@ -5,6 +5,7 @@ import { type ChatAbortOps, type ChatAbortControllerEntry, } from "./chat-abort.js"; +import { createChatRunState } from "./server-chat.js"; function createActiveEntry(sessionKey: string): ChatAbortControllerEntry { const now = Date.now(); @@ -21,21 +22,25 @@ function createOps(params: { runId: string; entry: ChatAbortControllerEntry; buffer?: string; + chatRunState?: ReturnType; }): ChatAbortOps & { broadcast: ReturnType; nodeSendToSession: ReturnType; removeChatRun: ReturnType; } { - const { runId, entry, buffer } = params; + const { runId, entry, buffer, chatRunState = createChatRunState() } = params; const broadcast = vi.fn(); const nodeSendToSession = vi.fn(); const removeChatRun = vi.fn(); + if (buffer !== undefined) { + chatRunState.buffers.set(runId, buffer); + } + chatRunState.deltaSentAt.set(runId, Date.now()); return { chatAbortControllers: new Map([[runId, entry]]), - chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []), - chatDeltaSentAt: new Map([[runId, Date.now()]]), - chatAbortedRuns: new Map(), + chatAbortedRuns: chatRunState.abortedRuns, + chatRunState, removeChatRun, agentRunSeq: new Map(), broadcast, @@ -76,8 +81,8 @@ describe("abortChatRunById", () => { expect(result).toEqual({ aborted: true }); expect(entry.controller.signal.aborted).toBe(true); expect(ops.chatAbortControllers.has(runId)).toBe(false); - expect(ops.chatRunBuffers.has(runId)).toBe(false); - expect(ops.chatDeltaSentAt.has(runId)).toBe(false); + expect(ops.chatRunState.buffers.has(runId)).toBe(false); + expect(ops.chatRunState.deltaSentAt.has(runId)).toBe(false); expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey); expect(ops.agentRunSeq.has(runId)).toBe(false); expect(ops.agentRunSeq.has("client-run-1")).toBe(false); @@ -124,7 +129,7 @@ describe("abortChatRunById", () => { // Simulate synchronous cleanup triggered by AbortController listeners. entry.controller.signal.addEventListener("abort", () => { - ops.chatRunBuffers.delete(runId); + ops.chatRunState.buffers.delete(runId); }); const result = abortChatRunById(ops, { runId, sessionKey }); @@ -138,4 +143,28 @@ describe("abortChatRunById", () => { }), ); }); + + it("clears effective-run recovery state so a reused key starts fresh after abort teardown", () => { + const runId = "retry-run"; + const sessionKey = "main"; + const entry = createActiveEntry(sessionKey); + const chatRunState = createChatRunState(); + chatRunState.buffers.set(runId, "Hello"); + chatRunState.lastSeenEventSeq.set(runId, 3); + chatRunState.lastAcceptedSeq.set(runId, 1); + chatRunState.waitingForRecovery.add(runId); + chatRunState.deltaSentAt.set(runId, 10); + chatRunState.deltaLastBroadcastLen.set(runId, 5); + const ops = createOps({ runId, entry, buffer: "Hello", chatRunState }); + + const result = abortChatRunById(ops, { runId, sessionKey }); + + expect(result).toEqual({ aborted: true }); + expect(chatRunState.buffers.has(runId)).toBe(false); + expect(chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(chatRunState.deltaSentAt.has(runId)).toBe(false); + expect(chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + }); }); diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index 4be479153f6..bd6d932160e 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -1,4 +1,5 @@ import { isAbortRequestText } from "../auto-reply/reply/abort.js"; +import { clearEffectiveChatRunState, type EffectiveChatRunStateSlice } from "./server-chat.js"; export type ChatAbortControllerEntry = { controller: AbortController; @@ -31,9 +32,8 @@ export function resolveChatRunExpiresAtMs(params: { export type ChatAbortOps = { chatAbortControllers: Map; - chatRunBuffers: Map; - chatDeltaSentAt: Map; chatAbortedRuns: Map; + chatRunState: EffectiveChatRunStateSlice; removeChatRun: ( sessionId: string, clientRunId: string, @@ -89,18 +89,18 @@ export function abortChatRunById( return { aborted: false }; } - const bufferedText = ops.chatRunBuffers.get(runId); + const bufferedText = ops.chatRunState.buffers.get(runId); const partialText = bufferedText && bufferedText.trim() ? bufferedText : undefined; ops.chatAbortedRuns.set(runId, Date.now()); active.controller.abort(); ops.chatAbortControllers.delete(runId); - ops.chatRunBuffers.delete(runId); - ops.chatDeltaSentAt.delete(runId); + clearEffectiveChatRunState(ops.chatRunState, runId); const removed = ops.removeChatRun(runId, runId, sessionKey); broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText }); ops.agentRunSeq.delete(runId); if (removed?.clientRunId) { ops.agentRunSeq.delete(removed.clientRunId); + clearEffectiveChatRunState(ops.chatRunState, removed.clientRunId); } return { aborted: true }; } diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index dd644955afc..c8aefcbf4a1 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -14,7 +14,9 @@ vi.mock("./session-lifecycle-state.js", async (importOriginal) => { }; }); +import { abortChatRunById } from "./chat-abort.js"; import { + clearEffectiveChatRunState, createAgentEventHandler, createChatRunState, createSessionEventSubscriberRegistry, @@ -136,6 +138,59 @@ describe("agent event handler", () => { }); } + function emitLifecycleStart( + handler: ReturnType["handler"], + runId: string, + seq = 1, + ) { + handler({ + runId, + seq, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "start" }, + }); + } + + function emitAssistantText(params: { + handler: ReturnType["handler"]; + runId: string; + seq: number; + text: string; + delta?: string; + }) { + params.handler({ + runId: params.runId, + seq: params.seq, + stream: "assistant", + ts: Date.now(), + data: { + text: params.text, + ...(params.delta === undefined ? {} : { delta: params.delta }), + }, + }); + } + + function emitToolStart(params: { + handler: ReturnType["handler"]; + runId: string; + seq: number; + name?: string; + toolCallId?: string; + }) { + params.handler({ + runId: params.runId, + seq: params.seq, + stream: "tool", + ts: Date.now(), + data: { + phase: "start", + name: params.name ?? "read", + toolCallId: params.toolCallId ?? `tool-${String(params.seq)}`, + }, + }); + } + function emitFallbackLifecycle(params: { handler: ReturnType["handler"]; runId: string; @@ -191,6 +246,30 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("ignores an initial assistant text + delta event when delta is not the full first chunk", () => { + const { broadcast, nodeSendToSession, chatRunState, handler, nowSpy } = createHarness({ + now: 1_000, + }); + chatRunState.registry.add("run-ambiguous-first", { + sessionKey: "session-ambiguous-first", + clientRunId: "client-ambiguous-first", + }); + + emitAssistantText({ + handler, + runId: "run-ambiguous-first", + seq: 1, + text: "Hello world", + delta: " world", + }); + + expect(chatRunState.buffers.has("client-ambiguous-first")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("client-ambiguous-first")).toBe(false); + expect(chatBroadcastCalls(broadcast)).toHaveLength(0); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(0); + nowSpy?.mockRestore(); + }); + it("strips inline directives from assistant chat events", () => { const { broadcast, nodeSendToSession, nowSpy } = emitRun1AssistantText( createHarness({ now: 1_000 }), @@ -243,16 +322,18 @@ describe("agent event handler", () => { }); chatRunState.registry.add("run-3", { sessionKey: "session-3", clientRunId: "client-3" }); + let seq = 1; for (const text of ["NO", "NO_", "NO_RE", "NO_REPLY"]) { handler({ runId: "run-3", - seq: 1, + seq, stream: "assistant", ts: Date.now(), data: { text }, }); + seq += 1; } - emitLifecycleEnd(handler, "run-3"); + emitLifecycleEnd(handler, "run-3", seq); const payload = expectSingleFinalChatPayload(broadcast) as { message?: unknown }; expect(payload.message).toBeUndefined(); @@ -303,7 +384,7 @@ describe("agent event handler", () => { now = 10_100; handler({ runId: "run-flush", - seq: 1, + seq: 2, stream: "assistant", ts: Date.now(), data: { text: "Hello world" }, @@ -327,7 +408,77 @@ describe("agent event handler", () => { nowSpy.mockRestore(); }); - it("preserves pre-tool assistant text when later segments stream as non-prefix snapshots", () => { + it("flushes a same-length corrective snapshot before tool start after throttle suppression", () => { + let now = 10_250; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { + broadcast, + broadcastToConnIds, + nodeSendToSession, + chatRunState, + toolEventRecipients, + handler, + } = createHarness({ + resolveSessionKeyForRun: () => "session-same-length-correction", + }); + + chatRunState.registry.add("run-same-length-correction", { + sessionKey: "session-same-length-correction", + clientRunId: "client-same-length-correction", + }); + registerAgentRunContext("run-same-length-correction", { + sessionKey: "session-same-length-correction", + verboseLevel: "off", + }); + toolEventRecipients.add("run-same-length-correction", "conn-1"); + + handler({ + runId: "run-same-length-correction", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello world" }, + }); + + now = 10_320; + handler({ + runId: "run-same-length-correction", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello there" }, + }); + + now = 10_500; + handler({ + runId: "run-same-length-correction", + seq: 3, + stream: "tool", + ts: Date.now(), + data: { + phase: "start", + name: "read", + toolCallId: "tool-same-length-correction", + }, + }); + + emitLifecycleEnd(handler, "run-same-length-correction", 4); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + const payloadTexts = chatCalls + .map( + ([, payload]) => + payload as { state?: string; message?: { content?: Array<{ text?: string }> } }, + ) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello world", "Hello there", "Hello there"]); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3); + expect(broadcastToConnIds).toHaveBeenCalled(); + nowSpy.mockRestore(); + }); + + it("drops ambiguous non-prefix assistant chunks instead of appending them", () => { let now = 10_500; const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); @@ -356,24 +507,18 @@ describe("agent event handler", () => { emitLifecycleEnd(handler, "run-segmented", 3); const chatCalls = chatBroadcastCalls(broadcast); - expect(chatCalls).toHaveLength(3); - const secondPayload = chatCalls[1]?.[1] as { + expect(chatCalls).toHaveLength(2); + const finalPayload = chatCalls[1]?.[1] as { state?: string; message?: { content?: Array<{ text?: string }> }; }; - const finalPayload = chatCalls[2]?.[1] as { - state?: string; - message?: { content?: Array<{ text?: string }> }; - }; - expect(secondPayload.state).toBe("delta"); - expect(secondPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call"); expect(finalPayload.state).toBe("final"); - expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call"); - expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3); + expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call"); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2); nowSpy.mockRestore(); }); - it("flushes merged segmented text before final when latest segment is throttled", () => { + it("does not flush ambiguous non-prefix assistant chunks before final", () => { let now = 10_800; const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); @@ -402,20 +547,14 @@ describe("agent event handler", () => { emitLifecycleEnd(handler, "run-segmented-flush", 3); const chatCalls = chatBroadcastCalls(broadcast); - expect(chatCalls).toHaveLength(3); - const flushPayload = chatCalls[1]?.[1] as { + expect(chatCalls).toHaveLength(2); + const finalPayload = chatCalls[1]?.[1] as { state?: string; message?: { content?: Array<{ text?: string }> }; }; - const finalPayload = chatCalls[2]?.[1] as { - state?: string; - message?: { content?: Array<{ text?: string }> }; - }; - expect(flushPayload.state).toBe("delta"); - expect(flushPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call"); expect(finalPayload.state).toBe("final"); - expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call"); - expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3); + expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call"); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2); nowSpy.mockRestore(); }); @@ -439,7 +578,7 @@ describe("agent event handler", () => { now = 11_200; handler({ runId: "run-no-dup-flush", - seq: 1, + seq: 2, stream: "assistant", ts: Date.now(), data: { text: "Hello world" }, @@ -458,6 +597,740 @@ describe("agent event handler", () => { nowSpy.mockRestore(); }); + it("ignores duplicate seq replay instead of regrowing the visible buffer", () => { + let now = 11_300; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-replay", { + sessionKey: "session-replay", + clientRunId: "client-replay", + }); + + emitAssistantText({ + handler, + runId: "run-replay", + seq: 1, + text: "Hello", + }); + + now = 11_500; + emitAssistantText({ + handler, + runId: "run-replay", + seq: 1, + text: "HelloHello", + }); + + emitLifecycleEnd(handler, "run-replay", 2); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(2); + const finalPayload = chatCalls[1]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Hello"); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2); + nowSpy.mockRestore(); + }); + + it("replaces with a non-prefix full snapshot instead of appending it", () => { + let now = 11_700; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-replace", { + sessionKey: "session-replace", + clientRunId: "client-replace", + }); + + emitAssistantText({ + handler, + runId: "run-replace", + seq: 1, + text: "Draft answer", + }); + + now = 11_900; + emitAssistantText({ + handler, + runId: "run-replace", + seq: 2, + text: "Final rewritten answer", + }); + + emitLifecycleEnd(handler, "run-replace", 3); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + const secondPayload = chatCalls[1]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + const finalPayload = chatCalls[2]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(secondPayload.message?.content?.[0]?.text).toBe("Final rewritten answer"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Final rewritten answer"); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3); + nowSpy.mockRestore(); + }); + + it("enters recovery on seq gap and ignores ordinary assistant deltas until a full replacement arrives", () => { + let now = 12_100; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-gap", { + sessionKey: "session-gap", + clientRunId: "client-gap", + }); + + emitAssistantText({ + handler, + runId: "run-gap", + seq: 1, + text: "Hello", + }); + expect(chatRunState.waitingForRecovery.has("client-gap")).toBe(false); + + now = 12_300; + emitAssistantText({ + handler, + runId: "run-gap", + seq: 3, + text: "", + delta: " world", + }); + expect(chatRunState.waitingForRecovery.has("client-gap")).toBe(true); + expect(chatRunState.buffers.get("client-gap")).toBe("Hello"); + + now = 12_500; + emitAssistantText({ + handler, + runId: "run-gap", + seq: 4, + text: "", + delta: "!", + }); + expect(chatRunState.buffers.get("client-gap")).toBe("Hello"); + + emitLifecycleEnd(handler, "run-gap", 5); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(2); + const finalPayload = chatCalls[1]?.[1] as { + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.message?.content?.[0]?.text).toBe("Hello"); + nowSpy.mockRestore(); + }); + + it("does not shrink the buffer on an ordinary shorter full snapshot merge", () => { + let now = 12_550; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-no-shrink", { + sessionKey: "session-no-shrink", + clientRunId: "client-no-shrink", + }); + + emitAssistantText({ + handler, + runId: "run-no-shrink", + seq: 1, + text: "Hello world", + }); + + now = 12_750; + emitAssistantText({ + handler, + runId: "run-no-shrink", + seq: 2, + text: "Hello", + }); + + expect(chatRunState.buffers.get("client-no-shrink")).toBe("Hello world"); + + emitLifecycleEnd(handler, "run-no-shrink", 3); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello world", "Hello world"]); + nowSpy.mockRestore(); + }); + + it("does not advance lastAcceptedSeq for ignored in-order snapshots, so same-seq replay can still recover", () => { + let now = 12_575; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-replay-after-ignore", { + sessionKey: "session-replay-after-ignore", + clientRunId: "client-replay-after-ignore", + }); + + emitAssistantText({ + handler, + runId: "run-replay-after-ignore", + seq: 1, + text: "Hello world", + }); + expect(chatRunState.lastAcceptedSeq.get("client-replay-after-ignore")).toBe(1); + + now = 12_775; + emitAssistantText({ + handler, + runId: "run-replay-after-ignore", + seq: 2, + text: "Hello", + }); + + expect(chatRunState.buffers.get("client-replay-after-ignore")).toBe("Hello world"); + expect(chatRunState.lastAcceptedSeq.get("client-replay-after-ignore")).toBe(1); + + now = 12_975; + emitAssistantText({ + handler, + runId: "run-replay-after-ignore", + seq: 2, + text: "Hello world!", + }); + + expect(chatRunState.buffers.get("client-replay-after-ignore")).toBe("Hello world!"); + expect(chatRunState.lastAcceptedSeq.get("client-replay-after-ignore")).toBe(2); + + emitLifecycleEnd(handler, "run-replay-after-ignore", 3); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello world", "Hello world!", "Hello world!"]); + nowSpy.mockRestore(); + }); + + it("replaces stale buffer with a shorter recognized full snapshot while recovering", () => { + let now = 12_800; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-recover-shorter", { + sessionKey: "session-recover-shorter", + clientRunId: "client-recover-shorter", + }); + + emitAssistantText({ + handler, + runId: "run-recover-shorter", + seq: 1, + text: "Hello world", + }); + + now = 13_000; + emitAssistantText({ + handler, + runId: "run-recover-shorter", + seq: 3, + text: "", + delta: "!", + }); + expect(chatRunState.waitingForRecovery.has("client-recover-shorter")).toBe(true); + + now = 13_200; + emitAssistantText({ + handler, + runId: "run-recover-shorter", + seq: 4, + text: "Hello", + }); + + expect(chatRunState.waitingForRecovery.has("client-recover-shorter")).toBe(false); + expect(chatRunState.buffers.get("client-recover-shorter")).toBe("Hello"); + + emitLifecycleEnd(handler, "run-recover-shorter", 5); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello world", "Hello", "Hello"]); + nowSpy.mockRestore(); + }); + + it("recovers from a missed first assistant chunk when the next ACP snapshot is cumulative", () => { + let now = 12_850; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-first-gap", { + sessionKey: "session-first-gap", + clientRunId: "client-first-gap", + }); + + emitLifecycleStart(handler, "run-first-gap", 1); + + now = 13_050; + emitAssistantText({ + handler, + runId: "run-first-gap", + seq: 3, + text: "Hello world", + delta: " world", + }); + + expect(chatRunState.waitingForRecovery.has("client-first-gap")).toBe(false); + expect(chatRunState.buffers.get("client-first-gap")).toBe("Hello world"); + + emitLifecycleEnd(handler, "run-first-gap", 4); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello world", "Hello world"]); + nowSpy.mockRestore(); + }); + + it("does not treat the first assistant text after lifecycle start as a gap", () => { + let now = 12_600; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-start-gap", { + sessionKey: "session-start-gap", + clientRunId: "client-start-gap", + }); + + emitLifecycleStart(handler, "run-start-gap", 1); + + now = 12_800; + emitAssistantText({ + handler, + runId: "run-start-gap", + seq: 2, + text: "Hello from start", + }); + + expect(chatRunState.waitingForRecovery.has("client-start-gap")).toBe(false); + expect(chatRunState.buffers.get("client-start-gap")).toBe("Hello from start"); + + emitLifecycleEnd(handler, "run-start-gap", 3); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(2); + const payloadTexts = chatCalls + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello from start", "Hello from start"]); + nowSpy.mockRestore(); + }); + + it("does not treat seen tool and lifecycle events between assistant updates as a chat gap", () => { + let now = 12_900; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-interleaved", { + sessionKey: "session-interleaved", + clientRunId: "client-interleaved", + }); + + emitAssistantText({ + handler, + runId: "run-interleaved", + seq: 1, + text: "Hello", + }); + + now = 13_100; + emitToolStart({ + handler, + runId: "run-interleaved", + seq: 2, + toolCallId: "tool-interleaved", + }); + + now = 13_300; + emitFallbackLifecycle({ + handler, + runId: "run-interleaved", + seq: 3, + }); + + now = 13_500; + emitAssistantText({ + handler, + runId: "run-interleaved", + seq: 4, + text: "", + delta: " world", + }); + + expect(chatRunState.waitingForRecovery.has("client-interleaved")).toBe(false); + expect(chatRunState.buffers.get("client-interleaved")).toBe("Hello world"); + + emitLifecycleEnd(handler, "run-interleaved", 5); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + const payloadTexts = chatCalls + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello", "Hello world", "Hello world"]); + nowSpy.mockRestore(); + }); + + it("drops assistant chunks older than highest seen seq and waits for a safe recovery snapshot", () => { + let now = 13_650; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-delayed-older", { + sessionKey: "session-delayed-older", + clientRunId: "client-delayed-older", + }); + + emitAssistantText({ + handler, + runId: "run-delayed-older", + seq: 1, + text: "Hello", + }); + + now = 13_850; + emitToolStart({ + handler, + runId: "run-delayed-older", + seq: 3, + toolCallId: "tool-delayed-older", + }); + + now = 14_050; + emitAssistantText({ + handler, + runId: "run-delayed-older", + seq: 2, + text: "Hello world", + }); + + expect(chatRunState.waitingForRecovery.has("client-delayed-older")).toBe(true); + expect(chatRunState.buffers.get("client-delayed-older")).toBe("Hello"); + expect(chatRunState.lastAcceptedSeq.get("client-delayed-older")).toBe(1); + + now = 14_250; + emitAssistantText({ + handler, + runId: "run-delayed-older", + seq: 4, + text: "Hello world!", + }); + + expect(chatRunState.waitingForRecovery.has("client-delayed-older")).toBe(false); + expect(chatRunState.buffers.get("client-delayed-older")).toBe("Hello world!"); + expect(chatRunState.lastAcceptedSeq.get("client-delayed-older")).toBe(4); + + emitLifecycleEnd(handler, "run-delayed-older", 5); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello", "Hello world!", "Hello world!"]); + nowSpy.mockRestore(); + }); + + it("recovers from a seq gap with a cumulative full text + delta replacement", () => { + let now = 12_700; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-recover", { + sessionKey: "session-recover", + clientRunId: "client-recover", + }); + + emitAssistantText({ + handler, + runId: "run-recover", + seq: 1, + text: "Hello", + }); + + now = 12_900; + emitAssistantText({ + handler, + runId: "run-recover", + seq: 3, + text: "", + delta: " world", + }); + expect(chatRunState.waitingForRecovery.has("client-recover")).toBe(true); + + now = 13_100; + emitAssistantText({ + handler, + runId: "run-recover", + seq: 4, + text: "Hello world", + delta: " world", + }); + expect(chatRunState.waitingForRecovery.has("client-recover")).toBe(false); + expect(chatRunState.lastAcceptedSeq.get("client-recover")).toBe(4); + + now = 13_300; + emitAssistantText({ + handler, + runId: "run-recover", + seq: 5, + text: "Hello world!", + }); + + emitLifecycleEnd(handler, "run-recover", 6); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(4); + const payloadTexts = chatCalls + .slice(0, 3) + .map( + ([, payload]) => (payload as { message?: { content?: Array<{ text?: string }> } }).message, + ) + .map((message) => message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello", "Hello world", "Hello world!"]); + const finalPayload = chatCalls[3]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Hello world!"); + nowSpy.mockRestore(); + }); + + it("does not advance lastAcceptedSeq for same-text recovery snapshots, so same-seq replay can still be accepted", () => { + let now = 13_320; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-recovery-replay", { + sessionKey: "session-recovery-replay", + clientRunId: "client-recovery-replay", + }); + + emitAssistantText({ + handler, + runId: "run-recovery-replay", + seq: 1, + text: "Hello", + }); + expect(chatRunState.lastAcceptedSeq.get("client-recovery-replay")).toBe(1); + + now = 13_520; + emitAssistantText({ + handler, + runId: "run-recovery-replay", + seq: 3, + text: "", + delta: " world", + }); + expect(chatRunState.waitingForRecovery.has("client-recovery-replay")).toBe(true); + + now = 13_720; + emitAssistantText({ + handler, + runId: "run-recovery-replay", + seq: 4, + text: "Hello", + }); + + expect(chatRunState.waitingForRecovery.has("client-recovery-replay")).toBe(false); + expect(chatRunState.buffers.get("client-recovery-replay")).toBe("Hello"); + expect(chatRunState.lastAcceptedSeq.get("client-recovery-replay")).toBe(1); + + now = 13_920; + emitAssistantText({ + handler, + runId: "run-recovery-replay", + seq: 4, + text: "Hello world", + delta: " world", + }); + + expect(chatRunState.buffers.get("client-recovery-replay")).toBe("Hello world"); + expect(chatRunState.lastAcceptedSeq.get("client-recovery-replay")).toBe(4); + + emitLifecycleEnd(handler, "run-recovery-replay", 5); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello", "Hello world", "Hello world"]); + nowSpy.mockRestore(); + }); + + it("accepts the first safe full snapshot immediately after a seq gap", () => { + let now = 13_350; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-gap-snapshot", { + sessionKey: "session-gap-snapshot", + clientRunId: "client-gap-snapshot", + }); + + emitAssistantText({ + handler, + runId: "run-gap-snapshot", + seq: 1, + text: "Hello", + }); + + now = 13_550; + emitAssistantText({ + handler, + runId: "run-gap-snapshot", + seq: 3, + text: "Hello world", + delta: " world", + }); + + expect(chatRunState.waitingForRecovery.has("client-gap-snapshot")).toBe(false); + expect(chatRunState.buffers.get("client-gap-snapshot")).toBe("Hello world"); + + emitLifecycleEnd(handler, "run-gap-snapshot", 4); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + const payloadTexts = chatCalls + .map( + ([, payload]) => (payload as { message?: { content?: Array<{ text?: string }> } }).message, + ) + .map((message) => message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello", "Hello world", "Hello world"]); + nowSpy.mockRestore(); + }); + + it("uses one effective run key when source runId and client runId differ", () => { + let now = 13_500; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, agentRunSeq, chatRunState, handler } = createHarness(); + chatRunState.registry.add("source-run", { + sessionKey: "session-effective-key", + clientRunId: "client-run", + }); + + emitAssistantText({ + handler, + runId: "source-run", + seq: 1, + text: "Hello", + }); + + expect(chatRunState.buffers.get("client-run")).toBe("Hello"); + expect(chatRunState.buffers.has("source-run")).toBe(false); + expect(chatRunState.lastAcceptedSeq.get("client-run")).toBe(1); + expect(agentRunSeq.get("client-run")).toBe(1); + + now = 13_700; + emitAssistantText({ + handler, + runId: "source-run", + seq: 1, + text: "Hello again", + }); + + emitLifecycleEnd(handler, "source-run", 2); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(2); + const finalPayload = chatCalls[1]?.[1] as { + runId?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.runId).toBe("client-run"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Hello"); + nowSpy.mockRestore(); + }); + + it("clears effective-run-key state on error so a reused client run starts fresh", () => { + let now = 13_900; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("source-error", { + sessionKey: "session-reuse", + clientRunId: "client-reuse", + }); + + emitAssistantText({ + handler, + runId: "source-error", + seq: 1, + text: "Hello", + }); + + now = 14_100; + emitAssistantText({ + handler, + runId: "source-error", + seq: 3, + text: "", + delta: " world", + }); + expect(chatRunState.waitingForRecovery.has("client-reuse")).toBe(true); + + handler({ + runId: "source-error", + seq: 4, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "error", error: "boom" }, + }); + + expect(chatRunState.buffers.has("client-reuse")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("client-reuse")).toBe(false); + expect(chatRunState.waitingForRecovery.has("client-reuse")).toBe(false); + expect(chatRunState.deltaSentAt.has("client-reuse")).toBe(false); + expect(chatRunState.deltaLastBroadcastLen.has("client-reuse")).toBe(false); + + chatRunState.registry.add("source-reuse", { + sessionKey: "session-reuse", + clientRunId: "client-reuse", + }); + now = 14_300; + emitAssistantText({ + handler, + runId: "source-reuse", + seq: 1, + text: "Fresh start", + }); + emitLifecycleEnd(handler, "source-reuse", 2); + + const chatCalls = chatBroadcastCalls(broadcast); + const finalPayload = chatCalls.at(-1)?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Fresh start"); + nowSpy.mockRestore(); + }); + + it("clears sessionless effective-run state on terminal lifecycle cleanup", () => { + const { agentRunSeq, chatRunState, handler } = createHarness({ + resolveSessionKeyForRun: () => undefined, + }); + + handler({ + runId: "run-no-session", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "invisible" }, + }); + expect(chatRunState.lastSeenEventSeq.get("run-no-session")).toBe(1); + expect(agentRunSeq.get("run-no-session")).toBe(1); + + handler({ + runId: "run-no-session", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }); + + expect(chatRunState.lastSeenEventSeq.has("run-no-session")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("run-no-session")).toBe(false); + expect(chatRunState.waitingForRecovery.has("run-no-session")).toBe(false); + expect(agentRunSeq.has("run-no-session")).toBe(false); + }); + it("cleans up agent run sequence tracking when lifecycle completes", () => { const { agentRunSeq, chatRunState, handler, nowSpy } = createHarness({ now: 2_500 }); chatRunState.registry.add("run-cleanup", { @@ -472,7 +1345,8 @@ describe("agent event handler", () => { ts: Date.now(), data: { text: "done" }, }); - expect(agentRunSeq.get("run-cleanup")).toBe(1); + expect(agentRunSeq.get("run-cleanup")).toBeUndefined(); + expect(agentRunSeq.get("client-cleanup")).toBe(1); handler({ runId: "run-cleanup", @@ -484,6 +1358,8 @@ describe("agent event handler", () => { expect(agentRunSeq.has("run-cleanup")).toBe(false); expect(agentRunSeq.has("client-cleanup")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("client-cleanup")).toBe(false); + expect(chatRunState.waitingForRecovery.has("client-cleanup")).toBe(false); nowSpy?.mockRestore(); }); @@ -527,6 +1403,156 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("ignores non-finite seq values for agent-run monotonic tracking", () => { + const { agentRunSeq, broadcast, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-non-finite", + }); + + emitFallbackLifecycle({ + handler, + runId: "run-non-finite", + seq: Number.NaN, + }); + + expect(agentRunSeq.has("run-non-finite")).toBe(false); + const gapErrorsAfterInvalid = broadcast.mock.calls.filter( + ([event, payload]) => + event === "agent" && + (payload as { stream?: string; data?: { reason?: string } }).stream === "error" && + (payload as { data?: { reason?: string } }).data?.reason === "seq gap", + ); + expect(gapErrorsAfterInvalid).toHaveLength(0); + + emitFallbackLifecycle({ + handler, + runId: "run-non-finite", + seq: 1, + }); + + expect(agentRunSeq.get("run-non-finite")).toBe(1); + }); + + it("trims overlap when appending allowed delta-only chunks", () => { + let now = 13_800; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-overlap", { + sessionKey: "session-overlap", + clientRunId: "client-overlap", + }); + + emitAssistantText({ + handler, + runId: "run-overlap", + seq: 1, + text: "Hello wor", + }); + + now = 14_000; + emitAssistantText({ + handler, + runId: "run-overlap", + seq: 2, + text: "", + delta: "world", + }); + + emitLifecycleEnd(handler, "run-overlap", 3); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + const finalPayload = chatCalls[2]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Hello world"); + nowSpy.mockRestore(); + }); + + it("clears recovery seq state on abort teardown before the next run reuses the key", () => { + let now = 14_200; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { agentRunSeq, broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("client-abort-reuse", { + sessionKey: "session-abort-reuse", + clientRunId: "client-abort-reuse", + }); + + emitAssistantText({ + handler, + runId: "client-abort-reuse", + seq: 1, + text: "Hello", + }); + + now = 14_400; + emitAssistantText({ + handler, + runId: "client-abort-reuse", + seq: 3, + text: "", + delta: " world", + }); + expect(chatRunState.waitingForRecovery.has("client-abort-reuse")).toBe(true); + + const entry = { + controller: new AbortController(), + sessionId: "session-abort-reuse", + sessionKey: "session-abort-reuse", + startedAtMs: now, + expiresAtMs: now + 30_000, + }; + const chatAbortControllers = new Map([["client-abort-reuse", entry]]); + agentRunSeq.set("client-abort-reuse", 3); + + const res = abortChatRunById( + { + chatAbortControllers, + chatAbortedRuns: chatRunState.abortedRuns, + chatRunState, + removeChatRun: chatRunState.registry.remove, + agentRunSeq, + broadcast, + nodeSendToSession, + }, + { + runId: "client-abort-reuse", + sessionKey: "session-abort-reuse", + }, + ); + + expect(res).toEqual({ aborted: true }); + expect(chatRunState.lastSeenEventSeq.has("client-abort-reuse")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("client-abort-reuse")).toBe(false); + expect(chatRunState.waitingForRecovery.has("client-abort-reuse")).toBe(false); + + // Simulate the old aborted run's terminal cleanup having already completed. + clearEffectiveChatRunState(chatRunState, "client-abort-reuse"); + chatRunState.abortedRuns.delete("client-abort-reuse"); + chatRunState.registry.add("client-abort-reuse", { + sessionKey: "session-abort-reuse", + clientRunId: "client-abort-reuse", + }); + + now = 14_600; + emitAssistantText({ + handler, + runId: "client-abort-reuse", + seq: 1, + text: "Fresh start", + }); + emitLifecycleEnd(handler, "client-abort-reuse", 2); + + const finalPayload = chatBroadcastCalls(broadcast).at(-1)?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Fresh start"); + nowSpy.mockRestore(); + }); + it("flushes buffered chat delta before tool start events", () => { let now = 12_000; const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); @@ -979,4 +2005,145 @@ describe("agent event handler", () => { "Disk usage crossed 95 percent on /data and needs cleanup now.", ); }); + + it("does not resurrect seq state when a stale assistant event arrives after terminal cleanup", () => { + const { broadcast, chatRunState, handler, nowSpy } = createHarness({ now: 2_700 }); + chatRunState.registry.add("run-post-final-stale", { + sessionKey: "session-post-final-stale", + clientRunId: "client-post-final-stale", + }); + + emitAssistantText({ + handler, + runId: "run-post-final-stale", + seq: 1, + text: "Hello", + }); + emitLifecycleEnd(handler, "run-post-final-stale", 2); + + expect(chatRunState.finalizedEffectiveRunKeys.has("client-post-final-stale")).toBe(true); + expect(chatRunState.lastSeenEventSeq.has("client-post-final-stale")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("client-post-final-stale")).toBe(false); + expect(chatRunState.waitingForRecovery.has("client-post-final-stale")).toBe(false); + expect(chatRunState.buffers.has("client-post-final-stale")).toBe(false); + + const chatCallsBeforeStale = chatBroadcastCalls(broadcast).length; + handler({ + runId: "run-post-final-stale", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "late tail" }, + }); + + expect(chatRunState.finalizedEffectiveRunKeys.has("client-post-final-stale")).toBe(true); + expect(chatRunState.lastSeenEventSeq.has("client-post-final-stale")).toBe(false); + expect(chatRunState.lastAcceptedSeq.has("client-post-final-stale")).toBe(false); + expect(chatRunState.waitingForRecovery.has("client-post-final-stale")).toBe(false); + expect(chatRunState.buffers.has("client-post-final-stale")).toBe(false); + expect(chatBroadcastCalls(broadcast)).toHaveLength(chatCallsBeforeStale); + nowSpy?.mockRestore(); + }); + + it("allows a finalized client-visible key to start fresh after a stale post-final tail", () => { + const { broadcast, chatRunState, handler, nowSpy } = createHarness({ now: 2_900 }); + chatRunState.registry.add("run-reuse-before-final", { + sessionKey: "session-reuse-after-final", + clientRunId: "client-reuse-after-final", + }); + + emitAssistantText({ + handler, + runId: "run-reuse-before-final", + seq: 1, + text: "Original", + }); + emitLifecycleEnd(handler, "run-reuse-before-final", 2); + + handler({ + runId: "run-reuse-before-final", + seq: 3, + stream: "tool", + ts: Date.now(), + data: { phase: "start", name: "late", toolCallId: "late-tool" }, + }); + + expect(chatRunState.finalizedEffectiveRunKeys.has("client-reuse-after-final")).toBe(true); + expect(chatRunState.lastSeenEventSeq.has("client-reuse-after-final")).toBe(false); + + chatRunState.registry.add("run-reuse-after-final", { + sessionKey: "session-reuse-after-final", + clientRunId: "client-reuse-after-final", + }); + + emitAssistantText({ + handler, + runId: "run-reuse-after-final", + seq: 1, + text: "Fresh start", + }); + + expect(chatRunState.finalizedEffectiveRunKeys.has("client-reuse-after-final")).toBe(false); + expect(chatRunState.lastSeenEventSeq.get("client-reuse-after-final")).toBe(1); + expect(chatRunState.lastAcceptedSeq.get("client-reuse-after-final")).toBe(1); + expect(chatRunState.waitingForRecovery.has("client-reuse-after-final")).toBe(false); + expect(chatRunState.buffers.get("client-reuse-after-final")).toBe("Fresh start"); + + emitLifecycleEnd(handler, "run-reuse-after-final", 2); + const finalPayload = chatBroadcastCalls(broadcast).at(-1)?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.message?.content?.[0]?.text).toBe("Fresh start"); + nowSpy?.mockRestore(); + }); + + it("does not enter recovery for a same-text full snapshot, so later in-order deltas still append", () => { + let now = 12_580; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-benign-noop", { + sessionKey: "session-benign-noop", + clientRunId: "client-benign-noop", + }); + + emitAssistantText({ + handler, + runId: "run-benign-noop", + seq: 1, + text: "Hello world", + }); + + now = 12_780; + emitAssistantText({ + handler, + runId: "run-benign-noop", + seq: 2, + text: "Hello world", + }); + + expect(chatRunState.waitingForRecovery.has("client-benign-noop")).toBe(false); + expect(chatRunState.buffers.get("client-benign-noop")).toBe("Hello world"); + + now = 12_980; + emitAssistantText({ + handler, + runId: "run-benign-noop", + seq: 3, + text: "", + delta: "!", + }); + + expect(chatRunState.buffers.get("client-benign-noop")).toBe("Hello world!"); + expect(chatRunState.waitingForRecovery.has("client-benign-noop")).toBe(false); + + emitLifecycleEnd(handler, "run-benign-noop", 4); + + const payloadTexts = chatBroadcastCalls(broadcast) + .map(([, payload]) => payload as { message?: { content?: Array<{ text?: string }> } }) + .map((payload) => payload.message?.content?.[0]?.text); + expect(payloadTexts).toEqual(["Hello world", "Hello world!", "Hello world!"]); + nowSpy.mockRestore(); + }); }); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 7fda61b6c0c..764c440cbce 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -93,6 +93,51 @@ function isSilentReplyLeadFragment(text: string): boolean { return SILENT_REPLY_TOKEN.startsWith(normalized); } +/** + * Treat an event as a safe replacement only when `nextText` already carries the + * full visible assistant text for this step. + * + * Example: + * - ACP cumulative snapshot: previous=`Hello`, nextText=`Hello world`, + * nextDelta=` world` => safe full replacement. + * - Ambiguous delta-only producer: nextText=``, nextDelta=` world` => not safe. + */ +function isFullVisibleTextEvent(params: { + previousText: string; + nextText: string; + nextDelta: string; +}) { + const { previousText, nextText, nextDelta } = params; + if (!nextText) { + return false; + } + if (!nextDelta) { + return true; + } + if (!previousText) { + return nextText === nextDelta; + } + return nextText === previousText || nextText.startsWith(previousText); +} + +function isDeltaOnlyAssistantEvent(params: { nextText: string; nextDelta: string }) { + return !params.nextText && Boolean(params.nextDelta); +} + +function isCumulativeRecoverySnapshotFromEmptyBase(params: { + previousText: string; + nextText: string; + nextDelta: string; +}) { + const { previousText, nextText, nextDelta } = params; + return ( + !previousText && + Boolean(nextDelta) && + nextText.length > nextDelta.length && + nextText.endsWith(nextDelta) + ); +} + function appendUniqueSuffix(base: string, suffix: string): string { if (!suffix) { return base; @@ -112,26 +157,101 @@ function appendUniqueSuffix(base: string, suffix: string): string { return base + suffix; } -function resolveMergedAssistantText(params: { +/** + * Run-global seq can only force assistant recovery when we skipped over an + * event we never observed. Already-seen tool/lifecycle events must not freeze + * the assistant buffer. + */ +function hasObservedEventSeqGap(params: { + hasNumericSeq: boolean; + previousSeenEventSeq: number; + nextSeq: number; +}) { + return ( + params.hasNumericSeq && + params.previousSeenEventSeq > 0 && + params.nextSeq > params.previousSeenEventSeq + 1 + ); +} + +function hasSeenNewerRunEvent(params: { + hasNumericSeq: boolean; + previousSeenEventSeq: number; + nextSeq: number; +}) { + return params.hasNumericSeq && params.previousSeenEventSeq > params.nextSeq; +} + +/** + * Delta-only chunks are append-safe only when the run stayed observed in order. + * If we skipped an event, wait for a full replacement instead of appending. + */ +function canAppendDelta(params: { + hasNumericSeq: boolean; + isWaitingForRecovery: boolean; + hasObservedGap: boolean; + nextText: string; + nextDelta: string; +}) { + return ( + params.hasNumericSeq && + !params.isWaitingForRecovery && + !params.hasObservedGap && + isDeltaOnlyAssistantEvent({ nextText: params.nextText, nextDelta: params.nextDelta }) + ); +} + +/** + * Recovery may resume only from an event shape that proves `nextText` is the + * complete visible assistant text. ACP-style cumulative `text` + `delta` + * snapshots qualify; delta-only chunks must keep waiting because we cannot + * prove whether they replay a suffix or skip hidden text. + */ +function canRecoverFromFullReplacement(params: { previousText: string; nextText: string; nextDelta: string; }) { - const { previousText, nextText, nextDelta } = params; - if (nextText && previousText) { + return isFullVisibleTextEvent(params) || isCumulativeRecoverySnapshotFromEmptyBase(params); +} + +function resolveMergedAssistantText(params: { + previousText: string; + nextText: string; + nextDelta: string; + allowDeltaAppend: boolean; + allowFullReplacementShrink?: boolean; + allowEmptyBaseRecoveryReplacement?: boolean; +}) { + const { + previousText, + nextText, + nextDelta, + allowDeltaAppend, + allowFullReplacementShrink = false, + allowEmptyBaseRecoveryReplacement = false, + } = params; + if (isFullVisibleTextEvent({ previousText, nextText, nextDelta })) { + if (nextText === previousText) { + return previousText; + } if (nextText.startsWith(previousText)) { return nextText; } - if (previousText.startsWith(nextText) && !nextDelta) { + if (!allowFullReplacementShrink && !nextDelta && previousText.startsWith(nextText)) { return previousText; } - } - if (nextDelta) { - return appendUniqueSuffix(previousText, nextDelta); - } - if (nextText) { return nextText; } + if ( + allowEmptyBaseRecoveryReplacement && + isCumulativeRecoverySnapshotFromEmptyBase({ previousText, nextText, nextDelta }) + ) { + return nextText; + } + if (allowDeltaAppend) { + return appendUniqueSuffix(previousText, nextDelta); + } return previousText; } @@ -203,38 +323,89 @@ export function createChatRunRegistry(): ChatRunRegistry { export type ChatRunState = { registry: ChatRunRegistry; buffers: Map; + /** Highest run-global seq observed for this effective run, across all streams. */ + lastSeenEventSeq: Map; + /** Highest assistant-visible seq accepted into the chat buffer. */ + lastAcceptedSeq: Map; + /** Seq gap latch: block delta-only assistant merges until a safe full replacement arrives. */ + waitingForRecovery: Set; + /** Last assistant text that was actually broadcast to streaming clients. */ + deltaLastBroadcastText: Map; deltaSentAt: Map; /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ deltaLastBroadcastLen: Map; + /** Run keys that have already hit a terminal lifecycle event (end/error) and are finalized. */ + finalizedEffectiveRunKeys: Set; abortedRuns: Map; clear: () => void; }; +export type EffectiveChatRunStateSlice = Pick< + ChatRunState, + | "buffers" + | "lastSeenEventSeq" + | "lastAcceptedSeq" + | "waitingForRecovery" + | "deltaLastBroadcastText" + | "deltaSentAt" + | "deltaLastBroadcastLen" + | "finalizedEffectiveRunKeys" +>; + export function createChatRunState(): ChatRunState { const registry = createChatRunRegistry(); const buffers = new Map(); + const lastSeenEventSeq = new Map(); + const lastAcceptedSeq = new Map(); + const waitingForRecovery = new Set(); + const deltaLastBroadcastText = new Map(); const deltaSentAt = new Map(); const deltaLastBroadcastLen = new Map(); + const finalizedEffectiveRunKeys = new Set(); const abortedRuns = new Map(); const clear = () => { registry.clear(); buffers.clear(); + lastSeenEventSeq.clear(); + lastAcceptedSeq.clear(); + waitingForRecovery.clear(); + deltaLastBroadcastText.clear(); deltaSentAt.clear(); deltaLastBroadcastLen.clear(); + finalizedEffectiveRunKeys.clear(); abortedRuns.clear(); }; return { registry, buffers, + lastSeenEventSeq, + lastAcceptedSeq, + waitingForRecovery, + deltaLastBroadcastText, deltaSentAt, deltaLastBroadcastLen, + finalizedEffectiveRunKeys, abortedRuns, clear, }; } +export function clearEffectiveChatRunState( + chatRunState: EffectiveChatRunStateSlice, + effectiveRunKey: string, +) { + chatRunState.buffers.delete(effectiveRunKey); + chatRunState.lastSeenEventSeq.delete(effectiveRunKey); + chatRunState.lastAcceptedSeq.delete(effectiveRunKey); + chatRunState.waitingForRecovery.delete(effectiveRunKey); + chatRunState.deltaLastBroadcastText.delete(effectiveRunKey); + chatRunState.deltaSentAt.delete(effectiveRunKey); + chatRunState.deltaLastBroadcastLen.delete(effectiveRunKey); + chatRunState.finalizedEffectiveRunKeys.delete(effectiveRunKey); +} + export type ToolEventRecipientRegistry = { add: (runId: string, connId: string) => void; get: (runId: string) => ReadonlySet | undefined; @@ -452,6 +623,26 @@ export type AgentEventHandlerOptions = { sessionEventSubscribers: SessionEventSubscriberRegistry; }; +type EmitChatDeltaParams = { + sessionKey: string; + effectiveRunKey: string; + sourceRunId: string; + seq: number; + text: string; + previousSeenEventSeq: number; + delta?: unknown; +}; + +type ResolveChatDeltaTextParams = Pick< + EmitChatDeltaParams, + "effectiveRunKey" | "seq" | "previousSeenEventSeq" +> & { + previousText: string; + cleanedText: string; + cleanedDelta: string; + hasNumericSeq: boolean; +}; + export function createAgentEventHandler({ broadcast, broadcastToConnIds, @@ -499,45 +690,169 @@ export function createAgentEventHandler({ }; }; - const emitChatDelta = ( - sessionKey: string, - clientRunId: string, - sourceRunId: string, - seq: number, - text: string, - delta?: unknown, - ) => { - const cleanedText = stripInlineDirectiveTagsForDisplay(text).text; - const cleanedDelta = - typeof delta === "string" ? stripInlineDirectiveTagsForDisplay(delta).text : ""; - const previousText = chatRunState.buffers.get(clientRunId) ?? ""; + const resolveRecoveryChatText = ({ + effectiveRunKey, + previousText, + cleanedText, + cleanedDelta, + }: ResolveChatDeltaTextParams) => { + if ( + !canRecoverFromFullReplacement({ + previousText, + nextText: cleanedText, + nextDelta: cleanedDelta, + }) + ) { + return undefined; + } + chatRunState.waitingForRecovery.delete(effectiveRunKey); + const replacementText = resolveMergedAssistantText({ + previousText, + nextText: cleanedText, + nextDelta: cleanedDelta, + allowDeltaAppend: false, + allowFullReplacementShrink: true, + allowEmptyBaseRecoveryReplacement: true, + }); + // Recovery can relock onto the stream without changing visible text. + // That should clear the recovery latch, but it must not advance accepted seq. + if (!replacementText || replacementText === previousText) { + return undefined; + } + return replacementText; + }; + + const resolveInOrderChatText = ({ + effectiveRunKey, + seq, + previousSeenEventSeq, + previousText, + cleanedText, + cleanedDelta, + hasNumericSeq, + }: ResolveChatDeltaTextParams) => { + const hasObservedGap = hasObservedEventSeqGap({ + hasNumericSeq, + previousSeenEventSeq, + nextSeq: seq, + }); + if (hasObservedGap) { + if ( + !canRecoverFromFullReplacement({ + previousText, + nextText: cleanedText, + nextDelta: cleanedDelta, + }) + ) { + chatRunState.waitingForRecovery.add(effectiveRunKey); + return undefined; + } + chatRunState.waitingForRecovery.delete(effectiveRunKey); + } const mergedText = resolveMergedAssistantText({ previousText, nextText: cleanedText, nextDelta: cleanedDelta, + allowDeltaAppend: canAppendDelta({ + hasNumericSeq, + isWaitingForRecovery: false, + hasObservedGap, + nextText: cleanedText, + nextDelta: cleanedDelta, + }), + allowFullReplacementShrink: hasObservedGap, + allowEmptyBaseRecoveryReplacement: hasObservedGap, }); + if (!mergedText || mergedText === previousText) { + return undefined; + } + return mergedText; + }; + + const emitChatDelta = ({ + sessionKey, + effectiveRunKey, + sourceRunId, + seq, + text, + previousSeenEventSeq, + delta, + }: EmitChatDeltaParams) => { + /** + * Effective-run merge invariants: + * - `lastSeenEventSeq` tracks the highest run-global seq observed on any stream for gap detection. + * - `lastAcceptedSeq` tracks the highest assistant seq merged into the visible buffer. + * - `waitingForRecovery` latches after a seq gap until a safe full-text replacement arrives. + * - `agentRunSeq` tracks the run-global seq we have observed for client-facing event ordering. + * + * Normal behavior merges in-order assistant events: delta-only chunks append, while full visible + * snapshots replace the buffer. Recovery behavior is stricter: after a gap, ignore delta-only + * chunks until a full visible snapshot re-establishes the complete assistant text. + */ + const cleanedText = stripInlineDirectiveTagsForDisplay(text).text; + const cleanedDelta = + typeof delta === "string" ? stripInlineDirectiveTagsForDisplay(delta).text : ""; + const previousText = chatRunState.buffers.get(effectiveRunKey) ?? ""; + const hasNumericSeq = Number.isFinite(seq); + const lastAcceptedSeq = chatRunState.lastAcceptedSeq.get(effectiveRunKey) ?? 0; + const isStaleOrReplay = hasNumericSeq && seq <= lastAcceptedSeq; + if (isStaleOrReplay) { + return; + } + const hasSeenNewerEvent = hasSeenNewerRunEvent({ + hasNumericSeq, + previousSeenEventSeq, + nextSeq: seq, + }); + if (hasSeenNewerEvent) { + chatRunState.waitingForRecovery.add(effectiveRunKey); + return; + } + const mergedText = chatRunState.waitingForRecovery.has(effectiveRunKey) + ? resolveRecoveryChatText({ + effectiveRunKey, + seq, + previousSeenEventSeq, + previousText, + cleanedText, + cleanedDelta, + hasNumericSeq, + }) + : resolveInOrderChatText({ + effectiveRunKey, + seq, + previousSeenEventSeq, + previousText, + cleanedText, + cleanedDelta, + hasNumericSeq, + }); if (!mergedText) { return; } - chatRunState.buffers.set(clientRunId, mergedText); + chatRunState.buffers.set(effectiveRunKey, mergedText); + if (hasNumericSeq) { + chatRunState.lastAcceptedSeq.set(effectiveRunKey, seq); + } if (isSilentReplyText(mergedText, SILENT_REPLY_TOKEN)) { return; } if (isSilentReplyLeadFragment(mergedText)) { return; } - if (shouldHideHeartbeatChatOutput(clientRunId, sourceRunId)) { + if (shouldHideHeartbeatChatOutput(effectiveRunKey, sourceRunId)) { return; } const now = Date.now(); - const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0; + const last = chatRunState.deltaSentAt.get(effectiveRunKey) ?? 0; if (now - last < 150) { return; } - chatRunState.deltaSentAt.set(clientRunId, now); - chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length); + chatRunState.deltaSentAt.set(effectiveRunKey, now); + chatRunState.deltaLastBroadcastText.set(effectiveRunKey, mergedText); + chatRunState.deltaLastBroadcastLen.set(effectiveRunKey, mergedText.length); const payload = { - runId: clientRunId, + runId: effectiveRunKey, sessionKey, seq, state: "delta" as const, @@ -551,12 +866,12 @@ export function createAgentEventHandler({ nodeSendToSession(sessionKey, "chat", payload); }; - const resolveBufferedChatTextState = (clientRunId: string, sourceRunId: string) => { + const resolveBufferedChatTextState = (effectiveRunKey: string, sourceRunId: string) => { const bufferedText = stripInlineDirectiveTagsForDisplay( - chatRunState.buffers.get(clientRunId) ?? "", + chatRunState.buffers.get(effectiveRunKey) ?? "", ).text.trim(); const normalizedHeartbeatText = normalizeHeartbeatChatFinalText({ - runId: clientRunId, + runId: effectiveRunKey, sourceRunId, text: bufferedText, }); @@ -568,14 +883,17 @@ export function createAgentEventHandler({ const flushBufferedChatDeltaIfNeeded = ( sessionKey: string, - clientRunId: string, + effectiveRunKey: string, sourceRunId: string, seq: number, ) => { - const { text, shouldSuppressSilent } = resolveBufferedChatTextState(clientRunId, sourceRunId); + const { text, shouldSuppressSilent } = resolveBufferedChatTextState( + effectiveRunKey, + sourceRunId, + ); const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text); const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput( - clientRunId, + effectiveRunKey, sourceRunId, ); if ( @@ -587,14 +905,14 @@ export function createAgentEventHandler({ return; } - const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0; - if (text.length <= lastBroadcastLen) { + const lastBroadcastText = chatRunState.deltaLastBroadcastText.get(effectiveRunKey) ?? ""; + if (text === lastBroadcastText) { return; } const now = Date.now(); const flushPayload = { - runId: clientRunId, + runId: effectiveRunKey, sessionKey, seq, state: "delta" as const, @@ -606,31 +924,33 @@ export function createAgentEventHandler({ }; broadcast("chat", flushPayload, { dropIfSlow: true }); nodeSendToSession(sessionKey, "chat", flushPayload); - chatRunState.deltaLastBroadcastLen.set(clientRunId, text.length); - chatRunState.deltaSentAt.set(clientRunId, now); + chatRunState.deltaLastBroadcastText.set(effectiveRunKey, text); + chatRunState.deltaLastBroadcastLen.set(effectiveRunKey, text.length); + chatRunState.deltaSentAt.set(effectiveRunKey, now); }; const emitChatFinal = ( sessionKey: string, - clientRunId: string, + effectiveRunKey: string, sourceRunId: string, seq: number, jobState: "done" | "error", error?: unknown, stopReason?: string, ) => { - const { text, shouldSuppressSilent } = resolveBufferedChatTextState(clientRunId, sourceRunId); + const { text, shouldSuppressSilent } = resolveBufferedChatTextState( + effectiveRunKey, + sourceRunId, + ); // Flush any throttled delta so streaming clients receive the complete text // before the final event. The 150 ms throttle in emitChatDelta may have // suppressed the most recent chunk, leaving the client with stale text. - // Only flush if the buffer has grown since the last broadcast to avoid duplicates. - flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, sourceRunId, seq); - chatRunState.deltaLastBroadcastLen.delete(clientRunId); - chatRunState.buffers.delete(clientRunId); - chatRunState.deltaSentAt.delete(clientRunId); + // Only flush if the buffered text differs from the last broadcast to avoid duplicates. + flushBufferedChatDeltaIfNeeded(sessionKey, effectiveRunKey, sourceRunId, seq); + clearEffectiveChatRunState(chatRunState, effectiveRunKey); if (jobState === "done") { const payload = { - runId: clientRunId, + runId: effectiveRunKey, sessionKey, seq, state: "final" as const, @@ -649,7 +969,7 @@ export function createAgentEventHandler({ return; } const payload = { - runId: clientRunId, + runId: effectiveRunKey, sessionKey, seq, state: "error" as const, @@ -688,14 +1008,39 @@ export function createAgentEventHandler({ const isControlUiVisible = getAgentRunContext(evt.runId)?.isControlUiVisible ?? true; const sessionKey = chatLink?.sessionKey ?? eventSessionKey ?? resolveSessionKeyForRun(evt.runId); - const clientRunId = chatLink?.clientRunId ?? evt.runId; - const eventRunId = chatLink?.clientRunId ?? evt.runId; + // `effectiveRunKey` is the client-visible run identity used for chat merge + // state. `evt.runId` remains the upstream source run id used for agent + // context lookups and tool recipient routing. + const effectiveRunKey = chatLink?.clientRunId ?? evt.runId; + const eventRunId = effectiveRunKey; const eventForClients = chatLink ? { ...evt, runId: eventRunId } : evt; + const lifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; + + // Prevent post-terminal state resurrection: ignore events for finalized runs unless it's a new run start signal + if (chatRunState.finalizedEffectiveRunKeys.has(effectiveRunKey)) { + const isNewRunStart = + lifecyclePhase === "start" || (Number.isFinite(evt.seq) && evt.seq === 1); + if (!isNewRunStart) { + return; + } + chatRunState.finalizedEffectiveRunKeys.delete(effectiveRunKey); + } + const isAborted = - chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId); + chatRunState.abortedRuns.has(effectiveRunKey) || chatRunState.abortedRuns.has(evt.runId); + const previousSeenEventSeq = chatRunState.lastSeenEventSeq.get(effectiveRunKey) ?? 0; + const hasNumericSeq = Number.isFinite(evt.seq); + if (hasNumericSeq && evt.seq > previousSeenEventSeq) { + chatRunState.lastSeenEventSeq.set(effectiveRunKey, evt.seq); + } // Include sessionKey so Control UI can filter tool streams per session. const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients; - const last = agentRunSeq.get(evt.runId) ?? 0; + const previousAgentRunSeq = agentRunSeq.get(effectiveRunKey); + const last = + typeof previousAgentRunSeq === "number" && Number.isFinite(previousAgentRunSeq) + ? previousAgentRunSeq + : 0; const isToolEvent = evt.stream === "tool"; const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off"; // Build tool payload: strip result/partialResult unless verbose=full @@ -710,7 +1055,7 @@ export function createAgentEventHandler({ : { ...eventForClients, data }; })() : agentPayload; - if (last > 0 && evt.seq !== last + 1) { + if (hasNumericSeq && last > 0 && evt.seq !== last + 1) { broadcast("agent", { runId: eventRunId, stream: "error", @@ -723,13 +1068,15 @@ export function createAgentEventHandler({ }, }); } - agentRunSeq.set(evt.runId, evt.seq); + if (hasNumericSeq) { + agentRunSeq.set(effectiveRunKey, Math.max(last, evt.seq)); + } if (isToolEvent) { const toolPhase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; // Flush pending assistant text before tool-start events so clients can // render complete pre-tool text above tool cards (not truncated by delta throttle). if (toolPhase === "start" && isControlUiVisible && sessionKey && !isAborted) { - flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq); + flushBufferedChatDeltaIfNeeded(sessionKey, effectiveRunKey, evt.runId, evt.seq); } // Always broadcast tool events to registered WS recipients with // tool-events capability, regardless of verboseLevel. The verbose @@ -754,9 +1101,6 @@ export function createAgentEventHandler({ broadcast("agent", agentPayload); } - const lifecyclePhase = - evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; - if (isControlUiVisible && sessionKey) { // Send tool events to node/channel subscribers only when verbose is enabled; // WS clients already received the event above via broadcastToConnIds. @@ -764,7 +1108,15 @@ export function createAgentEventHandler({ nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload); } if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") { - emitChatDelta(sessionKey, clientRunId, evt.runId, evt.seq, evt.data.text, evt.data.delta); + emitChatDelta({ + sessionKey, + effectiveRunKey, + sourceRunId: evt.runId, + seq: evt.seq, + text: evt.data.text, + previousSeenEventSeq, + delta: evt.data.delta, + }); } else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) { const evtStopReason = typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined; @@ -795,12 +1147,13 @@ export function createAgentEventHandler({ ); } } else if (isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) { - chatRunState.abortedRuns.delete(clientRunId); + // Keep aborted-run cleanup explicit: abortedRuns may be keyed by both + // the source runId and the effective client-visible runId. + chatRunState.abortedRuns.delete(effectiveRunKey); chatRunState.abortedRuns.delete(evt.runId); - chatRunState.buffers.delete(clientRunId); - chatRunState.deltaSentAt.delete(clientRunId); + clearEffectiveChatRunState(chatRunState, effectiveRunKey); if (chatLink) { - chatRunState.registry.remove(evt.runId, clientRunId, sessionKey); + chatRunState.registry.remove(evt.runId, effectiveRunKey, sessionKey); } } } @@ -809,7 +1162,9 @@ export function createAgentEventHandler({ toolEventRecipients.markFinal(evt.runId); clearAgentRunContext(evt.runId); agentRunSeq.delete(evt.runId); - agentRunSeq.delete(clientRunId); + agentRunSeq.delete(effectiveRunKey); + clearEffectiveChatRunState(chatRunState, effectiveRunKey); + chatRunState.finalizedEffectiveRunKeys.add(effectiveRunKey); } if ( diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index 045f73d802a..d4af4a5c1bd 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -1,7 +1,15 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { HealthSummary } from "../commands/health.js"; +import { + createAgentEventHandler, + createChatRunState, + createSessionEventSubscriberRegistry, + createToolEventRecipientRegistry, +} from "./server-chat.js"; -const cleanOldMediaMock = vi.fn(async () => {}); +const { cleanOldMediaMock } = vi.hoisted(() => ({ + cleanOldMediaMock: vi.fn(async () => {}), +})); vi.mock("../media/store.js", async (importOriginal) => { const actual = await importOriginal(); @@ -11,9 +19,14 @@ vi.mock("../media/store.js", async (importOriginal) => { }; }); +vi.mock("./server/health-state.js", () => ({ + setBroadcastHealthUpdate: () => {}, +})); + const MEDIA_CLEANUP_TTL_MS = 24 * 60 * 60_000; function createMaintenanceTimerDeps() { + const chatRunState = createChatRunState(); return { broadcast: () => {}, nodeSendToAllSubscribed: () => {}, @@ -23,15 +36,54 @@ function createMaintenanceTimerDeps() { logHealth: { error: () => {} }, dedupe: new Map(), chatAbortControllers: new Map(), - chatRunState: { abortedRuns: new Map() }, - chatRunBuffers: new Map(), - chatDeltaSentAt: new Map(), - removeChatRun: () => undefined, + chatRunState, + removeChatRun: chatRunState.registry.remove, agentRunSeq: new Map(), nodeSendToSession: () => {}, }; } +function seedEffectiveRunState( + chatRunState: ReturnType, + runId: string, + text = "Hello world", +) { + chatRunState.buffers.set(runId, text); + chatRunState.lastSeenEventSeq.set(runId, 3); + chatRunState.lastAcceptedSeq.set(runId, 2); + chatRunState.waitingForRecovery.add(runId); + chatRunState.deltaLastBroadcastText.set(runId, text); + chatRunState.deltaSentAt.set(runId, 100); + chatRunState.deltaLastBroadcastLen.set(runId, text.length); +} + +function seedAgentRunSeqPastCap(agentRunSeq: Map, oldestRunId: string) { + agentRunSeq.set(oldestRunId, 1); + for (let i = 0; i < 10_000; i++) { + agentRunSeq.set(`run-${String(i)}`, i + 2); + } +} + +function seedAgentRunSeqOverCapWithOlderRuns( + agentRunSeq: Map, + olderRunIds: string[], + extraRunCount = 1, +) { + let seq = 1; + for (const runId of olderRunIds) { + agentRunSeq.set(runId, seq); + seq += 1; + } + for (let i = 0; i < 10_000; i++) { + agentRunSeq.set(`run-${String(i)}`, seq); + seq += 1; + } + for (let i = 0; i < extraRunCount; i++) { + agentRunSeq.set(`overflow-${String(i)}`, seq); + seq += 1; + } +} + function stopMaintenanceTimers(timers: { tickInterval: NodeJS.Timeout; healthInterval: NodeJS.Timeout; @@ -123,4 +175,250 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + + it("clears timeout-aborted seq and recovery state before the same effective key is reused", async () => { + vi.useFakeTimers(); + vi.setSystemTime(1_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "client-timeout-reuse"; + const sessionKey = "session-timeout-reuse"; + seedEffectiveRunState(deps.chatRunState, runId); + deps.agentRunSeq.set(runId, 3); + deps.chatRunState.registry.add(runId, { sessionKey, clientRunId: runId }); + deps.chatAbortControllers.set(runId, { + controller: new AbortController(), + sessionId: sessionKey, + sessionKey, + startedAtMs: Date.now() - 5_000, + expiresAtMs: Date.now() - 1, + }); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + expect(deps.agentRunSeq.has(runId)).toBe(false); + + deps.chatRunState.abortedRuns.delete(runId); + deps.chatRunState.registry.add(runId, { sessionKey, clientRunId: runId }); + + const broadcast = vi.fn(); + const nodeSendToSession = vi.fn(); + const handler = createAgentEventHandler({ + broadcast, + broadcastToConnIds: vi.fn(), + nodeSendToSession, + agentRunSeq: deps.agentRunSeq, + chatRunState: deps.chatRunState, + resolveSessionKeyForRun: () => undefined, + clearAgentRunContext: vi.fn(), + toolEventRecipients: createToolEventRecipientRegistry(), + sessionEventSubscribers: createSessionEventSubscriberRegistry(), + }); + + handler({ + runId, + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Fresh start" }, + }); + handler({ + runId, + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }); + + const chatCalls = broadcast.mock.calls.filter(([event]) => event === "chat"); + expect(chatCalls).toHaveLength(2); + const finalPayload = chatCalls.at(-1)?.[1] as { + message?: { content?: Array<{ text?: string }> }; + }; + expect(finalPayload.message?.content?.[0]?.text).toBe("Fresh start"); + + stopMaintenanceTimers(timers); + }); + + it("prunes all per-run maps after aborted-run TTL expiry", async () => { + vi.useFakeTimers(); + vi.setSystemTime(5_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "client-prune"; + seedEffectiveRunState(deps.chatRunState, runId, "Stale text"); + deps.chatRunState.abortedRuns.set(runId, Date.now() - 60 * 60_000 - 1); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.chatRunState.abortedRuns.has(runId)).toBe(false); + expect(deps.chatRunState.buffers.has(runId)).toBe(false); + expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false); + expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + + it("eviction clears stale effective-run state before a client-visible key is reused", async () => { + vi.useFakeTimers(); + vi.setSystemTime(10_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "client-reused"; + + seedEffectiveRunState(deps.chatRunState, runId, "Stale reused text"); + deps.chatRunState.abortedRuns.set(runId, Date.now() - 500); + seedAgentRunSeqPastCap(deps.agentRunSeq, runId); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.agentRunSeq.has(runId)).toBe(false); + expect(deps.chatRunState.abortedRuns.has(runId)).toBe(true); + expect(deps.chatRunState.buffers.has(runId)).toBe(false); + expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false); + expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + + it("bounds abandoned observed run state through the same agentRunSeq eviction path", async () => { + vi.useFakeTimers(); + vi.setSystemTime(20_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "client-abandoned-observed"; + + seedEffectiveRunState(deps.chatRunState, runId, "Observed but never ended"); + seedAgentRunSeqPastCap(deps.agentRunSeq, runId); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.agentRunSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(deps.chatRunState.buffers.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false); + expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + + it("preserves aborted markers during overflow eviction until a late terminal cleanup can consume them", async () => { + vi.useFakeTimers(); + vi.setSystemTime(15_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "client-aborted-overflow"; + + seedEffectiveRunState(deps.chatRunState, runId, "Aborted text"); + deps.chatRunState.abortedRuns.set(runId, Date.now() - 500); + seedAgentRunSeqPastCap(deps.agentRunSeq, runId); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.agentRunSeq.has(runId)).toBe(false); + expect(deps.chatRunState.abortedRuns.has(runId)).toBe(true); + expect(deps.chatRunState.buffers.has(runId)).toBe(false); + expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false); + expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + + it("skips active chat keys during agentRunSeq overflow eviction", async () => { + vi.useFakeTimers(); + vi.setSystemTime(30_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const activeRunId = "client-active"; + const inactiveRunId = "client-inactive-old"; + + seedEffectiveRunState(deps.chatRunState, activeRunId, "Active text"); + seedEffectiveRunState(deps.chatRunState, inactiveRunId, "Inactive text"); + deps.chatRunState.abortedRuns.set(inactiveRunId, Date.now() - 500); + deps.chatAbortControllers.set(activeRunId, { + controller: new AbortController(), + sessionId: "session-active", + sessionKey: "session-active", + startedAtMs: Date.now() - 1_000, + expiresAtMs: Date.now() + 60_000, + }); + seedAgentRunSeqOverCapWithOlderRuns(deps.agentRunSeq, [activeRunId, inactiveRunId]); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.agentRunSeq.has(activeRunId)).toBe(true); + expect(deps.chatRunState.buffers.get(activeRunId)).toBe("Active text"); + expect(deps.chatRunState.lastSeenEventSeq.get(activeRunId)).toBe(3); + expect(deps.chatRunState.lastAcceptedSeq.get(activeRunId)).toBe(2); + expect(deps.chatRunState.waitingForRecovery.has(activeRunId)).toBe(true); + expect(deps.chatRunState.deltaLastBroadcastText.get(activeRunId)).toBe("Active text"); + expect(deps.chatRunState.deltaLastBroadcastLen.get(activeRunId)).toBe("Active text".length); + + expect(deps.agentRunSeq.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.abortedRuns.has(inactiveRunId)).toBe(true); + expect(deps.chatRunState.buffers.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.lastSeenEventSeq.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastText.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.deltaSentAt.has(inactiveRunId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(inactiveRunId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + + it("still evicts inactive overflow keys and clears their effective state", async () => { + vi.useFakeTimers(); + vi.setSystemTime(40_000); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const inactiveOldRunIds = ["client-inactive-1", "client-inactive-2"]; + + for (const runId of inactiveOldRunIds) { + seedEffectiveRunState(deps.chatRunState, runId, `State for ${runId}`); + } + seedAgentRunSeqOverCapWithOlderRuns(deps.agentRunSeq, inactiveOldRunIds, 2); + + const timers = startGatewayMaintenanceTimers(deps); + await vi.advanceTimersByTimeAsync(60_000); + + for (const runId of inactiveOldRunIds) { + expect(deps.agentRunSeq.has(runId)).toBe(false); + expect(deps.chatRunState.abortedRuns.has(runId)).toBe(false); + expect(deps.chatRunState.buffers.has(runId)).toBe(false); + expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false); + expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false); + expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false); + expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false); + } + + stopMaintenanceTimers(timers); + }); }); diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 581e0d43ec3..e265b018011 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -1,7 +1,11 @@ import type { HealthSummary } from "../commands/health.js"; import { cleanOldMedia } from "../media/store.js"; import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js"; -import type { ChatRunEntry } from "./server-chat.js"; +import { + clearEffectiveChatRunState, + type ChatRunEntry, + type EffectiveChatRunStateSlice, +} from "./server-chat.js"; import { DEDUPE_MAX, DEDUPE_TTL_MS, @@ -28,9 +32,7 @@ export function startGatewayMaintenanceTimers(params: { logHealth: { error: (msg: string) => void }; dedupe: Map; chatAbortControllers: Map; - chatRunState: { abortedRuns: Map }; - chatRunBuffers: Map; - chatDeltaSentAt: Map; + chatRunState: EffectiveChatRunStateSlice & { abortedRuns: Map }; removeChatRun: ( sessionId: string, clientRunId: string, @@ -94,7 +96,11 @@ export function startGatewayMaintenanceTimers(params: { const excess = params.agentRunSeq.size - AGENT_RUN_SEQ_MAX; let removed = 0; for (const runId of params.agentRunSeq.keys()) { + if (params.chatAbortControllers.has(runId)) { + continue; + } params.agentRunSeq.delete(runId); + clearEffectiveChatRunState(params.chatRunState, runId); removed += 1; if (removed >= excess) { break; @@ -109,9 +115,8 @@ export function startGatewayMaintenanceTimers(params: { abortChatRunById( { chatAbortControllers: params.chatAbortControllers, - chatRunBuffers: params.chatRunBuffers, - chatDeltaSentAt: params.chatDeltaSentAt, chatAbortedRuns: params.chatRunState.abortedRuns, + chatRunState: params.chatRunState, removeChatRun: params.removeChatRun, agentRunSeq: params.agentRunSeq, broadcast: params.broadcast, @@ -127,8 +132,7 @@ export function startGatewayMaintenanceTimers(params: { continue; } params.chatRunState.abortedRuns.delete(runId); - params.chatRunBuffers.delete(runId); - params.chatDeltaSentAt.delete(runId); + clearEffectiveChatRunState(params.chatRunState, runId); } }, 60_000); diff --git a/src/gateway/server-methods/chat.abort.test-helpers.ts b/src/gateway/server-methods/chat.abort.test-helpers.ts index fb6efebd8f5..acdf553ca95 100644 --- a/src/gateway/server-methods/chat.abort.test-helpers.ts +++ b/src/gateway/server-methods/chat.abort.test-helpers.ts @@ -1,5 +1,6 @@ import { vi } from "vitest"; import type { Mock } from "vitest"; +import { createChatRunState } from "../server-chat.js"; import type { GatewayRequestHandler, RespondFn } from "./types.js"; export function createActiveRun( @@ -26,6 +27,7 @@ export type ChatAbortTestContext = Record & { chatRunBuffers: Map; chatDeltaSentAt: Map; chatAbortedRuns: Map; + chatRunState: ReturnType; removeChatRun: (...args: unknown[]) => { sessionKey: string; clientRunId: string } | undefined; agentRunSeq: Map; broadcast: (...args: unknown[]) => void; @@ -38,11 +40,36 @@ export type ChatAbortRespondMock = Mock; export function createChatAbortContext( overrides: Record = {}, ): ChatAbortTestContext { + const { + chatRunState: overrideChatRunState, + chatRunBuffers: overrideChatRunBuffers, + chatDeltaSentAt: overrideChatDeltaSentAt, + chatAbortedRuns: overrideChatAbortedRuns, + ...rest + } = overrides as Record & { + chatRunState?: ReturnType; + chatRunBuffers?: Map; + chatDeltaSentAt?: Map; + chatAbortedRuns?: Map; + }; + const chatRunState = overrideChatRunState ?? createChatRunState(); + + const seedMap = (target: Map, source?: Map) => { + if (!source || source === target) { + return; + } + target.clear(); + for (const [key, value] of source) { + target.set(key, value); + } + }; + + seedMap(chatRunState.buffers, overrideChatRunBuffers); + seedMap(chatRunState.deltaSentAt, overrideChatDeltaSentAt); + seedMap(chatRunState.abortedRuns, overrideChatAbortedRuns); + return { chatAbortControllers: new Map(), - chatRunBuffers: new Map(), - chatDeltaSentAt: new Map(), - chatAbortedRuns: new Map(), removeChatRun: vi .fn() .mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })), @@ -50,7 +77,11 @@ export function createChatAbortContext( broadcast: vi.fn(), nodeSendToSession: vi.fn(), logGateway: { warn: vi.fn() }, - ...overrides, + ...rest, + chatRunState, + chatRunBuffers: chatRunState.buffers, + chatDeltaSentAt: chatRunState.deltaSentAt, + chatAbortedRuns: chatRunState.abortedRuns, }; } diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index d2533f0413b..cb5160ee863 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -766,9 +766,8 @@ function persistAbortedPartials(params: { function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps { return { chatAbortControllers: context.chatAbortControllers, - chatRunBuffers: context.chatRunBuffers, - chatDeltaSentAt: context.chatDeltaSentAt, chatAbortedRuns: context.chatAbortedRuns, + chatRunState: context.chatRunState, removeChatRun: context.removeChatRun, agentRunSeq: context.agentRunSeq, broadcast: context.broadcast, diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index 39a6f458a5f..0d102d1f630 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -10,6 +10,7 @@ import type { NodeRegistry } from "../node-registry.js"; import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js"; import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast.js"; import type { ChannelRuntimeSnapshot } from "../server-channels.js"; +import type { ChatRunState } from "../server-chat.js"; import type { DedupeEntry } from "../server-shared.js"; type SubsystemLogger = ReturnType; @@ -61,6 +62,7 @@ export type GatewayRequestContext = { chatAbortedRuns: Map; chatRunBuffers: Map; chatDeltaSentAt: Map; + chatRunState: ChatRunState; addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void; removeChatRun: ( sessionId: string, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 7a4c18b6593..ef29905c2b7 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -687,8 +687,6 @@ export async function startGatewayServer( agentRunSeq, dedupe, chatRunState, - chatRunBuffers, - chatDeltaSentAt, addChatRun, removeChatRun, chatAbortControllers, @@ -812,8 +810,6 @@ export async function startGatewayServer( dedupe, chatAbortControllers, chatRunState, - chatRunBuffers, - chatDeltaSentAt, removeChatRun, agentRunSeq, nodeSendToSession, @@ -1099,6 +1095,7 @@ export async function startGatewayServer( chatAbortedRuns: chatRunState.abortedRuns, chatRunBuffers: chatRunState.buffers, chatDeltaSentAt: chatRunState.deltaSentAt, + chatRunState, addChatRun, removeChatRun, subscribeSessionEvents: sessionEventSubscribers.subscribe,