From 58abaa0942ef7fc9107134133999b97f4520377f Mon Sep 17 00:00:00 2001 From: Chao Fan Date: Fri, 20 Mar 2026 21:55:45 +0800 Subject: [PATCH] fix(gateway): ignore stale post-lifecycle tails without hiding seq gaps --- src/gateway/server-chat.agent-events.test.ts | 24 ++++++++++++++++++++ src/gateway/server-chat.ts | 9 ++++++++ src/gateway/server-maintenance.test.ts | 2 +- src/gateway/server-maintenance.ts | 9 +++++++- 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index dd644955afc..a03c25bcb32 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -527,6 +527,30 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("still emits a seq-gap error when the first observed event is not seq 1", () => { + const { broadcast, handler, nowSpy } = createHarness({ now: 3_000 }); + + handler({ + runId: "run-missed-start", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "late first chunk" }, + }); + + const errorCalls = broadcast.mock.calls.filter( + ([event, payload]) => + event === "agent" && (payload as { stream?: string }).stream === "error", + ); + expect(errorCalls).toHaveLength(1); + expect(errorCalls[0]?.[1]).toMatchObject({ + runId: "run-missed-start", + stream: "error", + data: { reason: "seq gap", expected: 1, received: 3 }, + }); + nowSpy?.mockRestore(); + }); + it("flushes buffered chat delta before tool start events", () => { let now = 12_000; const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 7fda61b6c0c..dd72ceb32ac 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -207,6 +207,7 @@ export type ChatRunState = { /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ deltaLastBroadcastLen: Map; abortedRuns: Map; + finalizedRuns: Map; clear: () => void; }; @@ -216,6 +217,7 @@ export function createChatRunState(): ChatRunState { const deltaSentAt = new Map(); const deltaLastBroadcastLen = new Map(); const abortedRuns = new Map(); + const finalizedRuns = new Map(); const clear = () => { registry.clear(); @@ -223,6 +225,7 @@ export function createChatRunState(): ChatRunState { deltaSentAt.clear(); deltaLastBroadcastLen.clear(); abortedRuns.clear(); + finalizedRuns.clear(); }; return { @@ -231,6 +234,7 @@ export function createChatRunState(): ChatRunState { deltaSentAt, deltaLastBroadcastLen, abortedRuns, + finalizedRuns, clear, }; } @@ -696,6 +700,10 @@ export function createAgentEventHandler({ // Include sessionKey so Control UI can filter tool streams per session. const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients; const last = agentRunSeq.get(evt.runId) ?? 0; + const isStalePostLifecycleEvent = last === 0 && chatRunState.finalizedRuns.has(evt.runId); + if (isStalePostLifecycleEvent) { + return; + } const isToolEvent = evt.stream === "tool"; const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off"; // Build tool payload: strip result/partialResult unless verbose=full @@ -808,6 +816,7 @@ export function createAgentEventHandler({ if (lifecyclePhase === "end" || lifecyclePhase === "error") { toolEventRecipients.markFinal(evt.runId); clearAgentRunContext(evt.runId); + chatRunState.finalizedRuns.set(evt.runId, Date.now()); agentRunSeq.delete(evt.runId); agentRunSeq.delete(clientRunId); } diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index 045f73d802a..2a80fc3034d 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -23,7 +23,7 @@ function createMaintenanceTimerDeps() { logHealth: { error: () => {} }, dedupe: new Map(), chatAbortControllers: new Map(), - chatRunState: { abortedRuns: new Map() }, + chatRunState: { abortedRuns: new Map(), finalizedRuns: new Map() }, chatRunBuffers: new Map(), chatDeltaSentAt: new Map(), removeChatRun: () => undefined, diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 581e0d43ec3..0e86e9e9fa8 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -28,7 +28,7 @@ export function startGatewayMaintenanceTimers(params: { logHealth: { error: (msg: string) => void }; dedupe: Map; chatAbortControllers: Map; - chatRunState: { abortedRuns: Map }; + chatRunState: { abortedRuns: Map; finalizedRuns: Map }; chatRunBuffers: Map; chatDeltaSentAt: Map; removeChatRun: ( @@ -122,6 +122,7 @@ export function startGatewayMaintenanceTimers(params: { } const ABORTED_RUN_TTL_MS = 60 * 60_000; + const FINALIZED_RUN_TTL_MS = 5 * 60_000; for (const [runId, abortedAt] of params.chatRunState.abortedRuns) { if (now - abortedAt <= ABORTED_RUN_TTL_MS) { continue; @@ -130,6 +131,12 @@ export function startGatewayMaintenanceTimers(params: { params.chatRunBuffers.delete(runId); params.chatDeltaSentAt.delete(runId); } + for (const [runId, finalizedAt] of params.chatRunState.finalizedRuns) { + if (now - finalizedAt <= FINALIZED_RUN_TTL_MS) { + continue; + } + params.chatRunState.finalizedRuns.delete(runId); + } }, 60_000); if (typeof params.mediaCleanupTtlMs !== "number") {