gateway: add session event log and replay cursor tests

This commit is contained in:
kumarabhirup 2026-02-21 11:03:04 -08:00
parent 581e73d1ce
commit 6c3e144166
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -4,6 +4,8 @@ import {
createAgentEventHandler,
createChatRunState,
createToolEventRecipientRegistry,
createSessionEventLog,
createSessionSubscriptionRegistry,
} from "./server-chat.js";
describe("agent event handler", () => {
@ -20,6 +22,9 @@ describe("agent event handler", () => {
const chatRunState = createChatRunState();
const toolEventRecipients = createToolEventRecipientRegistry();
const sessionEventLog = createSessionEventLog();
const sessionSubscriptions = createSessionSubscriptionRegistry();
const handler = createAgentEventHandler({
broadcast,
broadcastToConnIds,
@ -29,6 +34,8 @@ describe("agent event handler", () => {
resolveSessionKeyForRun: params?.resolveSessionKeyForRun ?? (() => undefined),
clearAgentRunContext: vi.fn(),
toolEventRecipients,
sessionEventLog,
sessionSubscriptions,
});
return {
@ -39,6 +46,8 @@ describe("agent event handler", () => {
agentRunSeq,
chatRunState,
toolEventRecipients,
sessionEventLog,
sessionSubscriptions,
handler,
};
}
@ -252,4 +261,97 @@ describe("agent event handler", () => {
expect(payload.data?.result).toEqual(result);
resetAgentRunContextForTest();
});
// ── Session event log + replay cursor tests ──
it("assigns globalSeq to broadcast events and logs them", () => {
const { broadcast, sessionEventLog, handler } = createHarness({
resolveSessionKeyForRun: () => "session-log",
});
handler({
runId: "run-log",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "start" },
});
handler({
runId: "run-log",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { delta: "hello" },
});
expect(broadcast).toHaveBeenCalledTimes(2);
const firstPayload = broadcast.mock.calls[0]?.[1] as { globalSeq?: number };
const secondPayload = broadcast.mock.calls[1]?.[1] as { globalSeq?: number };
expect(typeof firstPayload.globalSeq).toBe("number");
expect(typeof secondPayload.globalSeq).toBe("number");
expect(secondPayload.globalSeq).toBeGreaterThan(firstPayload.globalSeq!);
expect(sessionEventLog.currentSeq()).toBe(2);
});
it("routes events to session subscribers and replays from cursor", () => {
const { broadcastToConnIds, sessionEventLog, sessionSubscriptions, handler } = createHarness({
resolveSessionKeyForRun: () => "session-sub",
});
// Emit two events before subscribing.
handler({ runId: "run-sub", seq: 1, stream: "lifecycle", ts: 1000, data: { phase: "start" } });
handler({ runId: "run-sub", seq: 2, stream: "assistant", ts: 1001, data: { delta: "hi" } });
const seqAfterTwo = sessionEventLog.currentSeq();
// Subscribe with cursor 0 — should be able to replay both events.
const replayed = sessionEventLog.replayAfter("session-sub", 0);
expect(replayed.length).toBe(2);
expect(replayed[0].globalSeq).toBe(seqAfterTwo - 1);
expect(replayed[1].globalSeq).toBe(seqAfterTwo);
// Subscribe after first event — should replay only the second.
const partial = sessionEventLog.replayAfter("session-sub", seqAfterTwo - 1);
expect(partial.length).toBe(1);
expect(partial[0].globalSeq).toBe(seqAfterTwo);
// Register a session subscriber and emit a new event.
sessionSubscriptions.add("session-sub", "conn-1");
broadcastToConnIds.mockClear();
handler({ runId: "run-sub", seq: 3, stream: "assistant", ts: 1002, data: { delta: " world" } });
// Session subscriber should receive the event via broadcastToConnIds (twice:
// once from the general tool/broadcast path, once from session subscriber routing).
const subCalls = broadcastToConnIds.mock.calls.filter((c) => {
const connIds = c[2] as ReadonlySet<string>;
return connIds.has("conn-1");
});
expect(subCalls.length).toBeGreaterThanOrEqual(1);
const subPayload = subCalls[0]?.[1] as { globalSeq?: number };
expect(typeof subPayload.globalSeq).toBe("number");
});
it("replays nothing for unknown session key", () => {
const { sessionEventLog, handler } = createHarness({
resolveSessionKeyForRun: () => "session-x",
});
handler({ runId: "run-x", seq: 1, stream: "lifecycle", ts: 1000, data: { phase: "start" } });
const replayed = sessionEventLog.replayAfter("unknown-session", 0);
expect(replayed.length).toBe(0);
});
it("replays nothing when afterSeq >= current cursor", () => {
const { sessionEventLog, handler } = createHarness({
resolveSessionKeyForRun: () => "session-y",
});
handler({ runId: "run-y", seq: 1, stream: "lifecycle", ts: 1000, data: { phase: "start" } });
const current = sessionEventLog.currentSeq();
const replayed = sessionEventLog.replayAfter("session-y", current);
expect(replayed.length).toBe(0);
});
});