Merge 58abaa0942ef7fc9107134133999b97f4520377f into 598f1826d8b2bc969aace2c6459824737667218c

This commit is contained in:
Vidos 2026-03-21 03:15:10 +00:00 committed by GitHub
commit 55fac1d32e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 2 deletions

View File

@ -527,6 +527,30 @@ describe("agent event handler", () => {
nowSpy?.mockRestore();
});
it("still emits a seq-gap error when the first observed event is not seq 1", () => {
const { broadcast, handler, nowSpy } = createHarness({ now: 3_000 });
handler({
runId: "run-missed-start",
seq: 3,
stream: "assistant",
ts: Date.now(),
data: { text: "late first chunk" },
});
const errorCalls = broadcast.mock.calls.filter(
([event, payload]) =>
event === "agent" && (payload as { stream?: string }).stream === "error",
);
expect(errorCalls).toHaveLength(1);
expect(errorCalls[0]?.[1]).toMatchObject({
runId: "run-missed-start",
stream: "error",
data: { reason: "seq gap", expected: 1, received: 3 },
});
nowSpy?.mockRestore();
});
it("flushes buffered chat delta before tool start events", () => {
let now = 12_000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);

View File

@ -207,6 +207,7 @@ export type ChatRunState = {
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
deltaLastBroadcastLen: Map<string, number>;
abortedRuns: Map<string, number>;
finalizedRuns: Map<string, number>;
clear: () => void;
};
@ -216,6 +217,7 @@ export function createChatRunState(): ChatRunState {
const deltaSentAt = new Map<string, number>();
const deltaLastBroadcastLen = new Map<string, number>();
const abortedRuns = new Map<string, number>();
const finalizedRuns = new Map<string, number>();
const clear = () => {
registry.clear();
@ -223,6 +225,7 @@ export function createChatRunState(): ChatRunState {
deltaSentAt.clear();
deltaLastBroadcastLen.clear();
abortedRuns.clear();
finalizedRuns.clear();
};
return {
@ -231,6 +234,7 @@ export function createChatRunState(): ChatRunState {
deltaSentAt,
deltaLastBroadcastLen,
abortedRuns,
finalizedRuns,
clear,
};
}
@ -696,6 +700,10 @@ export function createAgentEventHandler({
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients;
const last = agentRunSeq.get(evt.runId) ?? 0;
const isStalePostLifecycleEvent = last === 0 && chatRunState.finalizedRuns.has(evt.runId);
if (isStalePostLifecycleEvent) {
return;
}
const isToolEvent = evt.stream === "tool";
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
// Build tool payload: strip result/partialResult unless verbose=full
@ -808,6 +816,7 @@ export function createAgentEventHandler({
if (lifecyclePhase === "end" || lifecyclePhase === "error") {
toolEventRecipients.markFinal(evt.runId);
clearAgentRunContext(evt.runId);
chatRunState.finalizedRuns.set(evt.runId, Date.now());
agentRunSeq.delete(evt.runId);
agentRunSeq.delete(clientRunId);
}

View File

@ -23,7 +23,7 @@ function createMaintenanceTimerDeps() {
logHealth: { error: () => {} },
dedupe: new Map(),
chatAbortControllers: new Map(),
chatRunState: { abortedRuns: new Map() },
chatRunState: { abortedRuns: new Map(), finalizedRuns: new Map() },
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
removeChatRun: () => undefined,

View File

@ -28,7 +28,7 @@ export function startGatewayMaintenanceTimers(params: {
logHealth: { error: (msg: string) => void };
dedupe: Map<string, DedupeEntry>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunState: { abortedRuns: Map<string, number> };
chatRunState: { abortedRuns: Map<string, number>; finalizedRuns: Map<string, number> };
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
removeChatRun: (
@ -122,6 +122,7 @@ export function startGatewayMaintenanceTimers(params: {
}
const ABORTED_RUN_TTL_MS = 60 * 60_000;
const FINALIZED_RUN_TTL_MS = 5 * 60_000;
for (const [runId, abortedAt] of params.chatRunState.abortedRuns) {
if (now - abortedAt <= ABORTED_RUN_TTL_MS) {
continue;
@ -130,6 +131,12 @@ export function startGatewayMaintenanceTimers(params: {
params.chatRunBuffers.delete(runId);
params.chatDeltaSentAt.delete(runId);
}
for (const [runId, finalizedAt] of params.chatRunState.finalizedRuns) {
if (now - finalizedAt <= FINALIZED_RUN_TTL_MS) {
continue;
}
params.chatRunState.finalizedRuns.delete(runId);
}
}, 60_000);
if (typeof params.mediaCleanupTtlMs !== "number") {