Merge c7c89062e2eafbc858f18aafe23ec31bde5e7f68 into 598f1826d8b2bc969aace2c6459824737667218c

This commit is contained in:
raystorm 2026-03-21 04:02:54 +00:00 committed by GitHub
commit 7a5a6c8e39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 2007 additions and 125 deletions

View File

@ -5,6 +5,7 @@ import {
type ChatAbortOps,
type ChatAbortControllerEntry,
} from "./chat-abort.js";
import { createChatRunState } from "./server-chat.js";
function createActiveEntry(sessionKey: string): ChatAbortControllerEntry {
const now = Date.now();
@ -21,21 +22,25 @@ function createOps(params: {
runId: string;
entry: ChatAbortControllerEntry;
buffer?: string;
chatRunState?: ReturnType<typeof createChatRunState>;
}): ChatAbortOps & {
broadcast: ReturnType<typeof vi.fn>;
nodeSendToSession: ReturnType<typeof vi.fn>;
removeChatRun: ReturnType<typeof vi.fn>;
} {
const { runId, entry, buffer } = params;
const { runId, entry, buffer, chatRunState = createChatRunState() } = params;
const broadcast = vi.fn();
const nodeSendToSession = vi.fn();
const removeChatRun = vi.fn();
if (buffer !== undefined) {
chatRunState.buffers.set(runId, buffer);
}
chatRunState.deltaSentAt.set(runId, Date.now());
return {
chatAbortControllers: new Map([[runId, entry]]),
chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []),
chatDeltaSentAt: new Map([[runId, Date.now()]]),
chatAbortedRuns: new Map(),
chatAbortedRuns: chatRunState.abortedRuns,
chatRunState,
removeChatRun,
agentRunSeq: new Map(),
broadcast,
@ -76,8 +81,8 @@ describe("abortChatRunById", () => {
expect(result).toEqual({ aborted: true });
expect(entry.controller.signal.aborted).toBe(true);
expect(ops.chatAbortControllers.has(runId)).toBe(false);
expect(ops.chatRunBuffers.has(runId)).toBe(false);
expect(ops.chatDeltaSentAt.has(runId)).toBe(false);
expect(ops.chatRunState.buffers.has(runId)).toBe(false);
expect(ops.chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey);
expect(ops.agentRunSeq.has(runId)).toBe(false);
expect(ops.agentRunSeq.has("client-run-1")).toBe(false);
@ -124,7 +129,7 @@ describe("abortChatRunById", () => {
// Simulate synchronous cleanup triggered by AbortController listeners.
entry.controller.signal.addEventListener("abort", () => {
ops.chatRunBuffers.delete(runId);
ops.chatRunState.buffers.delete(runId);
});
const result = abortChatRunById(ops, { runId, sessionKey });
@ -138,4 +143,28 @@ describe("abortChatRunById", () => {
}),
);
});
it("clears effective-run recovery state so a reused key starts fresh after abort teardown", () => {
const runId = "retry-run";
const sessionKey = "main";
const entry = createActiveEntry(sessionKey);
const chatRunState = createChatRunState();
chatRunState.buffers.set(runId, "Hello");
chatRunState.lastSeenEventSeq.set(runId, 3);
chatRunState.lastAcceptedSeq.set(runId, 1);
chatRunState.waitingForRecovery.add(runId);
chatRunState.deltaSentAt.set(runId, 10);
chatRunState.deltaLastBroadcastLen.set(runId, 5);
const ops = createOps({ runId, entry, buffer: "Hello", chatRunState });
const result = abortChatRunById(ops, { runId, sessionKey });
expect(result).toEqual({ aborted: true });
expect(chatRunState.buffers.has(runId)).toBe(false);
expect(chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
});
});

View File

@ -1,4 +1,5 @@
import { isAbortRequestText } from "../auto-reply/reply/abort.js";
import { clearEffectiveChatRunState, type EffectiveChatRunStateSlice } from "./server-chat.js";
export type ChatAbortControllerEntry = {
controller: AbortController;
@ -31,9 +32,8 @@ export function resolveChatRunExpiresAtMs(params: {
export type ChatAbortOps = {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
chatAbortedRuns: Map<string, number>;
chatRunState: EffectiveChatRunStateSlice;
removeChatRun: (
sessionId: string,
clientRunId: string,
@ -89,18 +89,18 @@ export function abortChatRunById(
return { aborted: false };
}
const bufferedText = ops.chatRunBuffers.get(runId);
const bufferedText = ops.chatRunState.buffers.get(runId);
const partialText = bufferedText && bufferedText.trim() ? bufferedText : undefined;
ops.chatAbortedRuns.set(runId, Date.now());
active.controller.abort();
ops.chatAbortControllers.delete(runId);
ops.chatRunBuffers.delete(runId);
ops.chatDeltaSentAt.delete(runId);
clearEffectiveChatRunState(ops.chatRunState, runId);
const removed = ops.removeChatRun(runId, runId, sessionKey);
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
ops.agentRunSeq.delete(runId);
if (removed?.clientRunId) {
ops.agentRunSeq.delete(removed.clientRunId);
clearEffectiveChatRunState(ops.chatRunState, removed.clientRunId);
}
return { aborted: true };
}

File diff suppressed because it is too large Load Diff

View File

@ -93,6 +93,51 @@ function isSilentReplyLeadFragment(text: string): boolean {
return SILENT_REPLY_TOKEN.startsWith(normalized);
}
/**
* Treat an event as a safe replacement only when `nextText` already carries the
* full visible assistant text for this step.
*
* Example:
* - ACP cumulative snapshot: previous=`Hello`, nextText=`Hello world`,
* nextDelta=` world` => safe full replacement.
* - Ambiguous delta-only producer: nextText=``, nextDelta=` world` => not safe.
*/
function isFullVisibleTextEvent(params: {
previousText: string;
nextText: string;
nextDelta: string;
}) {
const { previousText, nextText, nextDelta } = params;
if (!nextText) {
return false;
}
if (!nextDelta) {
return true;
}
if (!previousText) {
return nextText === nextDelta;
}
return nextText === previousText || nextText.startsWith(previousText);
}
function isDeltaOnlyAssistantEvent(params: { nextText: string; nextDelta: string }) {
return !params.nextText && Boolean(params.nextDelta);
}
function isCumulativeRecoverySnapshotFromEmptyBase(params: {
previousText: string;
nextText: string;
nextDelta: string;
}) {
const { previousText, nextText, nextDelta } = params;
return (
!previousText &&
Boolean(nextDelta) &&
nextText.length > nextDelta.length &&
nextText.endsWith(nextDelta)
);
}
function appendUniqueSuffix(base: string, suffix: string): string {
if (!suffix) {
return base;
@ -112,26 +157,101 @@ function appendUniqueSuffix(base: string, suffix: string): string {
return base + suffix;
}
function resolveMergedAssistantText(params: {
/**
* Run-global seq can only force assistant recovery when we skipped over an
* event we never observed. Already-seen tool/lifecycle events must not freeze
* the assistant buffer.
*/
function hasObservedEventSeqGap(params: {
hasNumericSeq: boolean;
previousSeenEventSeq: number;
nextSeq: number;
}) {
return (
params.hasNumericSeq &&
params.previousSeenEventSeq > 0 &&
params.nextSeq > params.previousSeenEventSeq + 1
);
}
function hasSeenNewerRunEvent(params: {
hasNumericSeq: boolean;
previousSeenEventSeq: number;
nextSeq: number;
}) {
return params.hasNumericSeq && params.previousSeenEventSeq > params.nextSeq;
}
/**
* Delta-only chunks are append-safe only when the run stayed observed in order.
* If we skipped an event, wait for a full replacement instead of appending.
*/
function canAppendDelta(params: {
hasNumericSeq: boolean;
isWaitingForRecovery: boolean;
hasObservedGap: boolean;
nextText: string;
nextDelta: string;
}) {
return (
params.hasNumericSeq &&
!params.isWaitingForRecovery &&
!params.hasObservedGap &&
isDeltaOnlyAssistantEvent({ nextText: params.nextText, nextDelta: params.nextDelta })
);
}
/**
* Recovery may resume only from an event shape that proves `nextText` is the
* complete visible assistant text. ACP-style cumulative `text` + `delta`
* snapshots qualify; delta-only chunks must keep waiting because we cannot
* prove whether they replay a suffix or skip hidden text.
*/
function canRecoverFromFullReplacement(params: {
previousText: string;
nextText: string;
nextDelta: string;
}) {
const { previousText, nextText, nextDelta } = params;
if (nextText && previousText) {
return isFullVisibleTextEvent(params) || isCumulativeRecoverySnapshotFromEmptyBase(params);
}
function resolveMergedAssistantText(params: {
previousText: string;
nextText: string;
nextDelta: string;
allowDeltaAppend: boolean;
allowFullReplacementShrink?: boolean;
allowEmptyBaseRecoveryReplacement?: boolean;
}) {
const {
previousText,
nextText,
nextDelta,
allowDeltaAppend,
allowFullReplacementShrink = false,
allowEmptyBaseRecoveryReplacement = false,
} = params;
if (isFullVisibleTextEvent({ previousText, nextText, nextDelta })) {
if (nextText === previousText) {
return previousText;
}
if (nextText.startsWith(previousText)) {
return nextText;
}
if (previousText.startsWith(nextText) && !nextDelta) {
if (!allowFullReplacementShrink && !nextDelta && previousText.startsWith(nextText)) {
return previousText;
}
}
if (nextDelta) {
return appendUniqueSuffix(previousText, nextDelta);
}
if (nextText) {
return nextText;
}
if (
allowEmptyBaseRecoveryReplacement &&
isCumulativeRecoverySnapshotFromEmptyBase({ previousText, nextText, nextDelta })
) {
return nextText;
}
if (allowDeltaAppend) {
return appendUniqueSuffix(previousText, nextDelta);
}
return previousText;
}
@ -203,38 +323,89 @@ export function createChatRunRegistry(): ChatRunRegistry {
export type ChatRunState = {
registry: ChatRunRegistry;
buffers: Map<string, string>;
/** Highest run-global seq observed for this effective run, across all streams. */
lastSeenEventSeq: Map<string, number>;
/** Highest assistant-visible seq accepted into the chat buffer. */
lastAcceptedSeq: Map<string, number>;
/** Seq gap latch: block delta-only assistant merges until a safe full replacement arrives. */
waitingForRecovery: Set<string>;
/** Last assistant text that was actually broadcast to streaming clients. */
deltaLastBroadcastText: Map<string, string>;
deltaSentAt: Map<string, number>;
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
deltaLastBroadcastLen: Map<string, number>;
/** Run keys that have already hit a terminal lifecycle event (end/error) and are finalized. */
finalizedEffectiveRunKeys: Set<string>;
abortedRuns: Map<string, number>;
clear: () => void;
};
export type EffectiveChatRunStateSlice = Pick<
ChatRunState,
| "buffers"
| "lastSeenEventSeq"
| "lastAcceptedSeq"
| "waitingForRecovery"
| "deltaLastBroadcastText"
| "deltaSentAt"
| "deltaLastBroadcastLen"
| "finalizedEffectiveRunKeys"
>;
export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry();
const buffers = new Map<string, string>();
const lastSeenEventSeq = new Map<string, number>();
const lastAcceptedSeq = new Map<string, number>();
const waitingForRecovery = new Set<string>();
const deltaLastBroadcastText = new Map<string, string>();
const deltaSentAt = new Map<string, number>();
const deltaLastBroadcastLen = new Map<string, number>();
const finalizedEffectiveRunKeys = new Set<string>();
const abortedRuns = new Map<string, number>();
const clear = () => {
registry.clear();
buffers.clear();
lastSeenEventSeq.clear();
lastAcceptedSeq.clear();
waitingForRecovery.clear();
deltaLastBroadcastText.clear();
deltaSentAt.clear();
deltaLastBroadcastLen.clear();
finalizedEffectiveRunKeys.clear();
abortedRuns.clear();
};
return {
registry,
buffers,
lastSeenEventSeq,
lastAcceptedSeq,
waitingForRecovery,
deltaLastBroadcastText,
deltaSentAt,
deltaLastBroadcastLen,
finalizedEffectiveRunKeys,
abortedRuns,
clear,
};
}
export function clearEffectiveChatRunState(
chatRunState: EffectiveChatRunStateSlice,
effectiveRunKey: string,
) {
chatRunState.buffers.delete(effectiveRunKey);
chatRunState.lastSeenEventSeq.delete(effectiveRunKey);
chatRunState.lastAcceptedSeq.delete(effectiveRunKey);
chatRunState.waitingForRecovery.delete(effectiveRunKey);
chatRunState.deltaLastBroadcastText.delete(effectiveRunKey);
chatRunState.deltaSentAt.delete(effectiveRunKey);
chatRunState.deltaLastBroadcastLen.delete(effectiveRunKey);
chatRunState.finalizedEffectiveRunKeys.delete(effectiveRunKey);
}
export type ToolEventRecipientRegistry = {
add: (runId: string, connId: string) => void;
get: (runId: string) => ReadonlySet<string> | undefined;
@ -452,6 +623,26 @@ export type AgentEventHandlerOptions = {
sessionEventSubscribers: SessionEventSubscriberRegistry;
};
type EmitChatDeltaParams = {
sessionKey: string;
effectiveRunKey: string;
sourceRunId: string;
seq: number;
text: string;
previousSeenEventSeq: number;
delta?: unknown;
};
type ResolveChatDeltaTextParams = Pick<
EmitChatDeltaParams,
"effectiveRunKey" | "seq" | "previousSeenEventSeq"
> & {
previousText: string;
cleanedText: string;
cleanedDelta: string;
hasNumericSeq: boolean;
};
export function createAgentEventHandler({
broadcast,
broadcastToConnIds,
@ -499,45 +690,169 @@ export function createAgentEventHandler({
};
};
const emitChatDelta = (
sessionKey: string,
clientRunId: string,
sourceRunId: string,
seq: number,
text: string,
delta?: unknown,
) => {
const cleanedText = stripInlineDirectiveTagsForDisplay(text).text;
const cleanedDelta =
typeof delta === "string" ? stripInlineDirectiveTagsForDisplay(delta).text : "";
const previousText = chatRunState.buffers.get(clientRunId) ?? "";
const resolveRecoveryChatText = ({
effectiveRunKey,
previousText,
cleanedText,
cleanedDelta,
}: ResolveChatDeltaTextParams) => {
if (
!canRecoverFromFullReplacement({
previousText,
nextText: cleanedText,
nextDelta: cleanedDelta,
})
) {
return undefined;
}
chatRunState.waitingForRecovery.delete(effectiveRunKey);
const replacementText = resolveMergedAssistantText({
previousText,
nextText: cleanedText,
nextDelta: cleanedDelta,
allowDeltaAppend: false,
allowFullReplacementShrink: true,
allowEmptyBaseRecoveryReplacement: true,
});
// Recovery can relock onto the stream without changing visible text.
// That should clear the recovery latch, but it must not advance accepted seq.
if (!replacementText || replacementText === previousText) {
return undefined;
}
return replacementText;
};
const resolveInOrderChatText = ({
effectiveRunKey,
seq,
previousSeenEventSeq,
previousText,
cleanedText,
cleanedDelta,
hasNumericSeq,
}: ResolveChatDeltaTextParams) => {
const hasObservedGap = hasObservedEventSeqGap({
hasNumericSeq,
previousSeenEventSeq,
nextSeq: seq,
});
if (hasObservedGap) {
if (
!canRecoverFromFullReplacement({
previousText,
nextText: cleanedText,
nextDelta: cleanedDelta,
})
) {
chatRunState.waitingForRecovery.add(effectiveRunKey);
return undefined;
}
chatRunState.waitingForRecovery.delete(effectiveRunKey);
}
const mergedText = resolveMergedAssistantText({
previousText,
nextText: cleanedText,
nextDelta: cleanedDelta,
allowDeltaAppend: canAppendDelta({
hasNumericSeq,
isWaitingForRecovery: false,
hasObservedGap,
nextText: cleanedText,
nextDelta: cleanedDelta,
}),
allowFullReplacementShrink: hasObservedGap,
allowEmptyBaseRecoveryReplacement: hasObservedGap,
});
if (!mergedText || mergedText === previousText) {
return undefined;
}
return mergedText;
};
const emitChatDelta = ({
sessionKey,
effectiveRunKey,
sourceRunId,
seq,
text,
previousSeenEventSeq,
delta,
}: EmitChatDeltaParams) => {
/**
* Effective-run merge invariants:
* - `lastSeenEventSeq` tracks the highest run-global seq observed on any stream for gap detection.
* - `lastAcceptedSeq` tracks the highest assistant seq merged into the visible buffer.
* - `waitingForRecovery` latches after a seq gap until a safe full-text replacement arrives.
* - `agentRunSeq` tracks the run-global seq we have observed for client-facing event ordering.
*
* Normal behavior merges in-order assistant events: delta-only chunks append, while full visible
* snapshots replace the buffer. Recovery behavior is stricter: after a gap, ignore delta-only
* chunks until a full visible snapshot re-establishes the complete assistant text.
*/
const cleanedText = stripInlineDirectiveTagsForDisplay(text).text;
const cleanedDelta =
typeof delta === "string" ? stripInlineDirectiveTagsForDisplay(delta).text : "";
const previousText = chatRunState.buffers.get(effectiveRunKey) ?? "";
const hasNumericSeq = Number.isFinite(seq);
const lastAcceptedSeq = chatRunState.lastAcceptedSeq.get(effectiveRunKey) ?? 0;
const isStaleOrReplay = hasNumericSeq && seq <= lastAcceptedSeq;
if (isStaleOrReplay) {
return;
}
const hasSeenNewerEvent = hasSeenNewerRunEvent({
hasNumericSeq,
previousSeenEventSeq,
nextSeq: seq,
});
if (hasSeenNewerEvent) {
chatRunState.waitingForRecovery.add(effectiveRunKey);
return;
}
const mergedText = chatRunState.waitingForRecovery.has(effectiveRunKey)
? resolveRecoveryChatText({
effectiveRunKey,
seq,
previousSeenEventSeq,
previousText,
cleanedText,
cleanedDelta,
hasNumericSeq,
})
: resolveInOrderChatText({
effectiveRunKey,
seq,
previousSeenEventSeq,
previousText,
cleanedText,
cleanedDelta,
hasNumericSeq,
});
if (!mergedText) {
return;
}
chatRunState.buffers.set(clientRunId, mergedText);
chatRunState.buffers.set(effectiveRunKey, mergedText);
if (hasNumericSeq) {
chatRunState.lastAcceptedSeq.set(effectiveRunKey, seq);
}
if (isSilentReplyText(mergedText, SILENT_REPLY_TOKEN)) {
return;
}
if (isSilentReplyLeadFragment(mergedText)) {
return;
}
if (shouldHideHeartbeatChatOutput(clientRunId, sourceRunId)) {
if (shouldHideHeartbeatChatOutput(effectiveRunKey, sourceRunId)) {
return;
}
const now = Date.now();
const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0;
const last = chatRunState.deltaSentAt.get(effectiveRunKey) ?? 0;
if (now - last < 150) {
return;
}
chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length);
chatRunState.deltaSentAt.set(effectiveRunKey, now);
chatRunState.deltaLastBroadcastText.set(effectiveRunKey, mergedText);
chatRunState.deltaLastBroadcastLen.set(effectiveRunKey, mergedText.length);
const payload = {
runId: clientRunId,
runId: effectiveRunKey,
sessionKey,
seq,
state: "delta" as const,
@ -551,12 +866,12 @@ export function createAgentEventHandler({
nodeSendToSession(sessionKey, "chat", payload);
};
const resolveBufferedChatTextState = (clientRunId: string, sourceRunId: string) => {
const resolveBufferedChatTextState = (effectiveRunKey: string, sourceRunId: string) => {
const bufferedText = stripInlineDirectiveTagsForDisplay(
chatRunState.buffers.get(clientRunId) ?? "",
chatRunState.buffers.get(effectiveRunKey) ?? "",
).text.trim();
const normalizedHeartbeatText = normalizeHeartbeatChatFinalText({
runId: clientRunId,
runId: effectiveRunKey,
sourceRunId,
text: bufferedText,
});
@ -568,14 +883,17 @@ export function createAgentEventHandler({
const flushBufferedChatDeltaIfNeeded = (
sessionKey: string,
clientRunId: string,
effectiveRunKey: string,
sourceRunId: string,
seq: number,
) => {
const { text, shouldSuppressSilent } = resolveBufferedChatTextState(clientRunId, sourceRunId);
const { text, shouldSuppressSilent } = resolveBufferedChatTextState(
effectiveRunKey,
sourceRunId,
);
const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text);
const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput(
clientRunId,
effectiveRunKey,
sourceRunId,
);
if (
@ -587,14 +905,14 @@ export function createAgentEventHandler({
return;
}
const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
if (text.length <= lastBroadcastLen) {
const lastBroadcastText = chatRunState.deltaLastBroadcastText.get(effectiveRunKey) ?? "";
if (text === lastBroadcastText) {
return;
}
const now = Date.now();
const flushPayload = {
runId: clientRunId,
runId: effectiveRunKey,
sessionKey,
seq,
state: "delta" as const,
@ -606,31 +924,33 @@ export function createAgentEventHandler({
};
broadcast("chat", flushPayload, { dropIfSlow: true });
nodeSendToSession(sessionKey, "chat", flushPayload);
chatRunState.deltaLastBroadcastLen.set(clientRunId, text.length);
chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastText.set(effectiveRunKey, text);
chatRunState.deltaLastBroadcastLen.set(effectiveRunKey, text.length);
chatRunState.deltaSentAt.set(effectiveRunKey, now);
};
const emitChatFinal = (
sessionKey: string,
clientRunId: string,
effectiveRunKey: string,
sourceRunId: string,
seq: number,
jobState: "done" | "error",
error?: unknown,
stopReason?: string,
) => {
const { text, shouldSuppressSilent } = resolveBufferedChatTextState(clientRunId, sourceRunId);
const { text, shouldSuppressSilent } = resolveBufferedChatTextState(
effectiveRunKey,
sourceRunId,
);
// Flush any throttled delta so streaming clients receive the complete text
// before the final event. The 150 ms throttle in emitChatDelta may have
// suppressed the most recent chunk, leaving the client with stale text.
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, sourceRunId, seq);
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
// Only flush if the buffered text differs from the last broadcast to avoid duplicates.
flushBufferedChatDeltaIfNeeded(sessionKey, effectiveRunKey, sourceRunId, seq);
clearEffectiveChatRunState(chatRunState, effectiveRunKey);
if (jobState === "done") {
const payload = {
runId: clientRunId,
runId: effectiveRunKey,
sessionKey,
seq,
state: "final" as const,
@ -649,7 +969,7 @@ export function createAgentEventHandler({
return;
}
const payload = {
runId: clientRunId,
runId: effectiveRunKey,
sessionKey,
seq,
state: "error" as const,
@ -688,14 +1008,39 @@ export function createAgentEventHandler({
const isControlUiVisible = getAgentRunContext(evt.runId)?.isControlUiVisible ?? true;
const sessionKey =
chatLink?.sessionKey ?? eventSessionKey ?? resolveSessionKeyForRun(evt.runId);
const clientRunId = chatLink?.clientRunId ?? evt.runId;
const eventRunId = chatLink?.clientRunId ?? evt.runId;
// `effectiveRunKey` is the client-visible run identity used for chat merge
// state. `evt.runId` remains the upstream source run id used for agent
// context lookups and tool recipient routing.
const effectiveRunKey = chatLink?.clientRunId ?? evt.runId;
const eventRunId = effectiveRunKey;
const eventForClients = chatLink ? { ...evt, runId: eventRunId } : evt;
const lifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
// Prevent post-terminal state resurrection: ignore events for finalized runs unless it's a new run start signal
if (chatRunState.finalizedEffectiveRunKeys.has(effectiveRunKey)) {
const isNewRunStart =
lifecyclePhase === "start" || (Number.isFinite(evt.seq) && evt.seq === 1);
if (!isNewRunStart) {
return;
}
chatRunState.finalizedEffectiveRunKeys.delete(effectiveRunKey);
}
const isAborted =
chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId);
chatRunState.abortedRuns.has(effectiveRunKey) || chatRunState.abortedRuns.has(evt.runId);
const previousSeenEventSeq = chatRunState.lastSeenEventSeq.get(effectiveRunKey) ?? 0;
const hasNumericSeq = Number.isFinite(evt.seq);
if (hasNumericSeq && evt.seq > previousSeenEventSeq) {
chatRunState.lastSeenEventSeq.set(effectiveRunKey, evt.seq);
}
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients;
const last = agentRunSeq.get(evt.runId) ?? 0;
const previousAgentRunSeq = agentRunSeq.get(effectiveRunKey);
const last =
typeof previousAgentRunSeq === "number" && Number.isFinite(previousAgentRunSeq)
? previousAgentRunSeq
: 0;
const isToolEvent = evt.stream === "tool";
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
// Build tool payload: strip result/partialResult unless verbose=full
@ -710,7 +1055,7 @@ export function createAgentEventHandler({
: { ...eventForClients, data };
})()
: agentPayload;
if (last > 0 && evt.seq !== last + 1) {
if (hasNumericSeq && last > 0 && evt.seq !== last + 1) {
broadcast("agent", {
runId: eventRunId,
stream: "error",
@ -723,13 +1068,15 @@ export function createAgentEventHandler({
},
});
}
agentRunSeq.set(evt.runId, evt.seq);
if (hasNumericSeq) {
agentRunSeq.set(effectiveRunKey, Math.max(last, evt.seq));
}
if (isToolEvent) {
const toolPhase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
// Flush pending assistant text before tool-start events so clients can
// render complete pre-tool text above tool cards (not truncated by delta throttle).
if (toolPhase === "start" && isControlUiVisible && sessionKey && !isAborted) {
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq);
flushBufferedChatDeltaIfNeeded(sessionKey, effectiveRunKey, evt.runId, evt.seq);
}
// Always broadcast tool events to registered WS recipients with
// tool-events capability, regardless of verboseLevel. The verbose
@ -754,9 +1101,6 @@ export function createAgentEventHandler({
broadcast("agent", agentPayload);
}
const lifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
if (isControlUiVisible && sessionKey) {
// Send tool events to node/channel subscribers only when verbose is enabled;
// WS clients already received the event above via broadcastToConnIds.
@ -764,7 +1108,15 @@ export function createAgentEventHandler({
nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload);
}
if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") {
emitChatDelta(sessionKey, clientRunId, evt.runId, evt.seq, evt.data.text, evt.data.delta);
emitChatDelta({
sessionKey,
effectiveRunKey,
sourceRunId: evt.runId,
seq: evt.seq,
text: evt.data.text,
previousSeenEventSeq,
delta: evt.data.delta,
});
} else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {
const evtStopReason =
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;
@ -795,12 +1147,13 @@ export function createAgentEventHandler({
);
}
} else if (isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {
chatRunState.abortedRuns.delete(clientRunId);
// Keep aborted-run cleanup explicit: abortedRuns may be keyed by both
// the source runId and the effective client-visible runId.
chatRunState.abortedRuns.delete(effectiveRunKey);
chatRunState.abortedRuns.delete(evt.runId);
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
clearEffectiveChatRunState(chatRunState, effectiveRunKey);
if (chatLink) {
chatRunState.registry.remove(evt.runId, clientRunId, sessionKey);
chatRunState.registry.remove(evt.runId, effectiveRunKey, sessionKey);
}
}
}
@ -809,7 +1162,9 @@ export function createAgentEventHandler({
toolEventRecipients.markFinal(evt.runId);
clearAgentRunContext(evt.runId);
agentRunSeq.delete(evt.runId);
agentRunSeq.delete(clientRunId);
agentRunSeq.delete(effectiveRunKey);
clearEffectiveChatRunState(chatRunState, effectiveRunKey);
chatRunState.finalizedEffectiveRunKeys.add(effectiveRunKey);
}
if (

View File

@ -1,7 +1,15 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { HealthSummary } from "../commands/health.js";
import {
createAgentEventHandler,
createChatRunState,
createSessionEventSubscriberRegistry,
createToolEventRecipientRegistry,
} from "./server-chat.js";
const cleanOldMediaMock = vi.fn(async () => {});
const { cleanOldMediaMock } = vi.hoisted(() => ({
cleanOldMediaMock: vi.fn(async () => {}),
}));
vi.mock("../media/store.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../media/store.js")>();
@ -11,9 +19,14 @@ vi.mock("../media/store.js", async (importOriginal) => {
};
});
vi.mock("./server/health-state.js", () => ({
setBroadcastHealthUpdate: () => {},
}));
const MEDIA_CLEANUP_TTL_MS = 24 * 60 * 60_000;
function createMaintenanceTimerDeps() {
const chatRunState = createChatRunState();
return {
broadcast: () => {},
nodeSendToAllSubscribed: () => {},
@ -23,15 +36,54 @@ function createMaintenanceTimerDeps() {
logHealth: { error: () => {} },
dedupe: new Map(),
chatAbortControllers: new Map(),
chatRunState: { abortedRuns: new Map() },
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
removeChatRun: () => undefined,
chatRunState,
removeChatRun: chatRunState.registry.remove,
agentRunSeq: new Map(),
nodeSendToSession: () => {},
};
}
function seedEffectiveRunState(
chatRunState: ReturnType<typeof createChatRunState>,
runId: string,
text = "Hello world",
) {
chatRunState.buffers.set(runId, text);
chatRunState.lastSeenEventSeq.set(runId, 3);
chatRunState.lastAcceptedSeq.set(runId, 2);
chatRunState.waitingForRecovery.add(runId);
chatRunState.deltaLastBroadcastText.set(runId, text);
chatRunState.deltaSentAt.set(runId, 100);
chatRunState.deltaLastBroadcastLen.set(runId, text.length);
}
function seedAgentRunSeqPastCap(agentRunSeq: Map<string, number>, oldestRunId: string) {
agentRunSeq.set(oldestRunId, 1);
for (let i = 0; i < 10_000; i++) {
agentRunSeq.set(`run-${String(i)}`, i + 2);
}
}
function seedAgentRunSeqOverCapWithOlderRuns(
agentRunSeq: Map<string, number>,
olderRunIds: string[],
extraRunCount = 1,
) {
let seq = 1;
for (const runId of olderRunIds) {
agentRunSeq.set(runId, seq);
seq += 1;
}
for (let i = 0; i < 10_000; i++) {
agentRunSeq.set(`run-${String(i)}`, seq);
seq += 1;
}
for (let i = 0; i < extraRunCount; i++) {
agentRunSeq.set(`overflow-${String(i)}`, seq);
seq += 1;
}
}
function stopMaintenanceTimers(timers: {
tickInterval: NodeJS.Timeout;
healthInterval: NodeJS.Timeout;
@ -123,4 +175,250 @@ describe("startGatewayMaintenanceTimers", () => {
stopMaintenanceTimers(timers);
});
it("clears timeout-aborted seq and recovery state before the same effective key is reused", async () => {
vi.useFakeTimers();
vi.setSystemTime(1_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const runId = "client-timeout-reuse";
const sessionKey = "session-timeout-reuse";
seedEffectiveRunState(deps.chatRunState, runId);
deps.agentRunSeq.set(runId, 3);
deps.chatRunState.registry.add(runId, { sessionKey, clientRunId: runId });
deps.chatAbortControllers.set(runId, {
controller: new AbortController(),
sessionId: sessionKey,
sessionKey,
startedAtMs: Date.now() - 5_000,
expiresAtMs: Date.now() - 1,
});
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
expect(deps.agentRunSeq.has(runId)).toBe(false);
deps.chatRunState.abortedRuns.delete(runId);
deps.chatRunState.registry.add(runId, { sessionKey, clientRunId: runId });
const broadcast = vi.fn();
const nodeSendToSession = vi.fn();
const handler = createAgentEventHandler({
broadcast,
broadcastToConnIds: vi.fn(),
nodeSendToSession,
agentRunSeq: deps.agentRunSeq,
chatRunState: deps.chatRunState,
resolveSessionKeyForRun: () => undefined,
clearAgentRunContext: vi.fn(),
toolEventRecipients: createToolEventRecipientRegistry(),
sessionEventSubscribers: createSessionEventSubscriberRegistry(),
});
handler({
runId,
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Fresh start" },
});
handler({
runId,
seq: 2,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
});
const chatCalls = broadcast.mock.calls.filter(([event]) => event === "chat");
expect(chatCalls).toHaveLength(2);
const finalPayload = chatCalls.at(-1)?.[1] as {
message?: { content?: Array<{ text?: string }> };
};
expect(finalPayload.message?.content?.[0]?.text).toBe("Fresh start");
stopMaintenanceTimers(timers);
});
it("prunes all per-run maps after aborted-run TTL expiry", async () => {
vi.useFakeTimers();
vi.setSystemTime(5_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const runId = "client-prune";
seedEffectiveRunState(deps.chatRunState, runId, "Stale text");
deps.chatRunState.abortedRuns.set(runId, Date.now() - 60 * 60_000 - 1);
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.chatRunState.abortedRuns.has(runId)).toBe(false);
expect(deps.chatRunState.buffers.has(runId)).toBe(false);
expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false);
expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
stopMaintenanceTimers(timers);
});
it("eviction clears stale effective-run state before a client-visible key is reused", async () => {
vi.useFakeTimers();
vi.setSystemTime(10_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const runId = "client-reused";
seedEffectiveRunState(deps.chatRunState, runId, "Stale reused text");
deps.chatRunState.abortedRuns.set(runId, Date.now() - 500);
seedAgentRunSeqPastCap(deps.agentRunSeq, runId);
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.agentRunSeq.has(runId)).toBe(false);
expect(deps.chatRunState.abortedRuns.has(runId)).toBe(true);
expect(deps.chatRunState.buffers.has(runId)).toBe(false);
expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false);
expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
stopMaintenanceTimers(timers);
});
it("bounds abandoned observed run state through the same agentRunSeq eviction path", async () => {
vi.useFakeTimers();
vi.setSystemTime(20_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const runId = "client-abandoned-observed";
seedEffectiveRunState(deps.chatRunState, runId, "Observed but never ended");
seedAgentRunSeqPastCap(deps.agentRunSeq, runId);
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.agentRunSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(deps.chatRunState.buffers.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false);
expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
stopMaintenanceTimers(timers);
});
it("preserves aborted markers during overflow eviction until a late terminal cleanup can consume them", async () => {
vi.useFakeTimers();
vi.setSystemTime(15_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const runId = "client-aborted-overflow";
seedEffectiveRunState(deps.chatRunState, runId, "Aborted text");
deps.chatRunState.abortedRuns.set(runId, Date.now() - 500);
seedAgentRunSeqPastCap(deps.agentRunSeq, runId);
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.agentRunSeq.has(runId)).toBe(false);
expect(deps.chatRunState.abortedRuns.has(runId)).toBe(true);
expect(deps.chatRunState.buffers.has(runId)).toBe(false);
expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false);
expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
stopMaintenanceTimers(timers);
});
it("skips active chat keys during agentRunSeq overflow eviction", async () => {
vi.useFakeTimers();
vi.setSystemTime(30_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const activeRunId = "client-active";
const inactiveRunId = "client-inactive-old";
seedEffectiveRunState(deps.chatRunState, activeRunId, "Active text");
seedEffectiveRunState(deps.chatRunState, inactiveRunId, "Inactive text");
deps.chatRunState.abortedRuns.set(inactiveRunId, Date.now() - 500);
deps.chatAbortControllers.set(activeRunId, {
controller: new AbortController(),
sessionId: "session-active",
sessionKey: "session-active",
startedAtMs: Date.now() - 1_000,
expiresAtMs: Date.now() + 60_000,
});
seedAgentRunSeqOverCapWithOlderRuns(deps.agentRunSeq, [activeRunId, inactiveRunId]);
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.agentRunSeq.has(activeRunId)).toBe(true);
expect(deps.chatRunState.buffers.get(activeRunId)).toBe("Active text");
expect(deps.chatRunState.lastSeenEventSeq.get(activeRunId)).toBe(3);
expect(deps.chatRunState.lastAcceptedSeq.get(activeRunId)).toBe(2);
expect(deps.chatRunState.waitingForRecovery.has(activeRunId)).toBe(true);
expect(deps.chatRunState.deltaLastBroadcastText.get(activeRunId)).toBe("Active text");
expect(deps.chatRunState.deltaLastBroadcastLen.get(activeRunId)).toBe("Active text".length);
expect(deps.agentRunSeq.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.abortedRuns.has(inactiveRunId)).toBe(true);
expect(deps.chatRunState.buffers.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.lastSeenEventSeq.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastText.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.deltaSentAt.has(inactiveRunId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(inactiveRunId)).toBe(false);
stopMaintenanceTimers(timers);
});
it("still evicts inactive overflow keys and clears their effective state", async () => {
vi.useFakeTimers();
vi.setSystemTime(40_000);
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const inactiveOldRunIds = ["client-inactive-1", "client-inactive-2"];
for (const runId of inactiveOldRunIds) {
seedEffectiveRunState(deps.chatRunState, runId, `State for ${runId}`);
}
seedAgentRunSeqOverCapWithOlderRuns(deps.agentRunSeq, inactiveOldRunIds, 2);
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
for (const runId of inactiveOldRunIds) {
expect(deps.agentRunSeq.has(runId)).toBe(false);
expect(deps.chatRunState.abortedRuns.has(runId)).toBe(false);
expect(deps.chatRunState.buffers.has(runId)).toBe(false);
expect(deps.chatRunState.lastSeenEventSeq.has(runId)).toBe(false);
expect(deps.chatRunState.lastAcceptedSeq.has(runId)).toBe(false);
expect(deps.chatRunState.waitingForRecovery.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false);
expect(deps.chatRunState.deltaSentAt.has(runId)).toBe(false);
expect(deps.chatRunState.deltaLastBroadcastLen.has(runId)).toBe(false);
}
stopMaintenanceTimers(timers);
});
});

View File

@ -1,7 +1,11 @@
import type { HealthSummary } from "../commands/health.js";
import { cleanOldMedia } from "../media/store.js";
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
import type { ChatRunEntry } from "./server-chat.js";
import {
clearEffectiveChatRunState,
type ChatRunEntry,
type EffectiveChatRunStateSlice,
} from "./server-chat.js";
import {
DEDUPE_MAX,
DEDUPE_TTL_MS,
@ -28,9 +32,7 @@ export function startGatewayMaintenanceTimers(params: {
logHealth: { error: (msg: string) => void };
dedupe: Map<string, DedupeEntry>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunState: { abortedRuns: Map<string, number> };
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
chatRunState: EffectiveChatRunStateSlice & { abortedRuns: Map<string, number> };
removeChatRun: (
sessionId: string,
clientRunId: string,
@ -94,7 +96,11 @@ export function startGatewayMaintenanceTimers(params: {
const excess = params.agentRunSeq.size - AGENT_RUN_SEQ_MAX;
let removed = 0;
for (const runId of params.agentRunSeq.keys()) {
if (params.chatAbortControllers.has(runId)) {
continue;
}
params.agentRunSeq.delete(runId);
clearEffectiveChatRunState(params.chatRunState, runId);
removed += 1;
if (removed >= excess) {
break;
@ -109,9 +115,8 @@ export function startGatewayMaintenanceTimers(params: {
abortChatRunById(
{
chatAbortControllers: params.chatAbortControllers,
chatRunBuffers: params.chatRunBuffers,
chatDeltaSentAt: params.chatDeltaSentAt,
chatAbortedRuns: params.chatRunState.abortedRuns,
chatRunState: params.chatRunState,
removeChatRun: params.removeChatRun,
agentRunSeq: params.agentRunSeq,
broadcast: params.broadcast,
@ -127,8 +132,7 @@ export function startGatewayMaintenanceTimers(params: {
continue;
}
params.chatRunState.abortedRuns.delete(runId);
params.chatRunBuffers.delete(runId);
params.chatDeltaSentAt.delete(runId);
clearEffectiveChatRunState(params.chatRunState, runId);
}
}, 60_000);

View File

@ -1,5 +1,6 @@
import { vi } from "vitest";
import type { Mock } from "vitest";
import { createChatRunState } from "../server-chat.js";
import type { GatewayRequestHandler, RespondFn } from "./types.js";
export function createActiveRun(
@ -26,6 +27,7 @@ export type ChatAbortTestContext = Record<string, unknown> & {
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
chatAbortedRuns: Map<string, number>;
chatRunState: ReturnType<typeof createChatRunState>;
removeChatRun: (...args: unknown[]) => { sessionKey: string; clientRunId: string } | undefined;
agentRunSeq: Map<string, number>;
broadcast: (...args: unknown[]) => void;
@ -38,11 +40,36 @@ export type ChatAbortRespondMock = Mock<RespondFn>;
export function createChatAbortContext(
overrides: Record<string, unknown> = {},
): ChatAbortTestContext {
const {
chatRunState: overrideChatRunState,
chatRunBuffers: overrideChatRunBuffers,
chatDeltaSentAt: overrideChatDeltaSentAt,
chatAbortedRuns: overrideChatAbortedRuns,
...rest
} = overrides as Record<string, unknown> & {
chatRunState?: ReturnType<typeof createChatRunState>;
chatRunBuffers?: Map<string, string>;
chatDeltaSentAt?: Map<string, number>;
chatAbortedRuns?: Map<string, number>;
};
const chatRunState = overrideChatRunState ?? createChatRunState();
const seedMap = <T>(target: Map<string, T>, source?: Map<string, T>) => {
if (!source || source === target) {
return;
}
target.clear();
for (const [key, value] of source) {
target.set(key, value);
}
};
seedMap(chatRunState.buffers, overrideChatRunBuffers);
seedMap(chatRunState.deltaSentAt, overrideChatDeltaSentAt);
seedMap(chatRunState.abortedRuns, overrideChatAbortedRuns);
return {
chatAbortControllers: new Map(),
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
chatAbortedRuns: new Map<string, number>(),
removeChatRun: vi
.fn()
.mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })),
@ -50,7 +77,11 @@ export function createChatAbortContext(
broadcast: vi.fn(),
nodeSendToSession: vi.fn(),
logGateway: { warn: vi.fn() },
...overrides,
...rest,
chatRunState,
chatRunBuffers: chatRunState.buffers,
chatDeltaSentAt: chatRunState.deltaSentAt,
chatAbortedRuns: chatRunState.abortedRuns,
};
}

View File

@ -766,9 +766,8 @@ function persistAbortedPartials(params: {
function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps {
return {
chatAbortControllers: context.chatAbortControllers,
chatRunBuffers: context.chatRunBuffers,
chatDeltaSentAt: context.chatDeltaSentAt,
chatAbortedRuns: context.chatAbortedRuns,
chatRunState: context.chatRunState,
removeChatRun: context.removeChatRun,
agentRunSeq: context.agentRunSeq,
broadcast: context.broadcast,

View File

@ -10,6 +10,7 @@ import type { NodeRegistry } from "../node-registry.js";
import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js";
import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast.js";
import type { ChannelRuntimeSnapshot } from "../server-channels.js";
import type { ChatRunState } from "../server-chat.js";
import type { DedupeEntry } from "../server-shared.js";
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
@ -61,6 +62,7 @@ export type GatewayRequestContext = {
chatAbortedRuns: Map<string, number>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
chatRunState: ChatRunState;
addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void;
removeChatRun: (
sessionId: string,

View File

@ -687,8 +687,6 @@ export async function startGatewayServer(
agentRunSeq,
dedupe,
chatRunState,
chatRunBuffers,
chatDeltaSentAt,
addChatRun,
removeChatRun,
chatAbortControllers,
@ -812,8 +810,6 @@ export async function startGatewayServer(
dedupe,
chatAbortControllers,
chatRunState,
chatRunBuffers,
chatDeltaSentAt,
removeChatRun,
agentRunSeq,
nodeSendToSession,
@ -1099,6 +1095,7 @@ export async function startGatewayServer(
chatAbortedRuns: chatRunState.abortedRuns,
chatRunBuffers: chatRunState.buffers,
chatDeltaSentAt: chatRunState.deltaSentAt,
chatRunState,
addChatRun,
removeChatRun,
subscribeSessionEvents: sessionEventSubscribers.subscribe,