From 6c3e144166ab9e4e42ed96303ad7ef745316c2b8 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sat, 21 Feb 2026 11:03:04 -0800 Subject: [PATCH] gateway: add session event log and replay cursor tests --- src/gateway/server-chat.agent-events.test.ts | 102 +++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 56eb2464a73..4c41c2e5cff 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -4,6 +4,8 @@ import { createAgentEventHandler, createChatRunState, createToolEventRecipientRegistry, + createSessionEventLog, + createSessionSubscriptionRegistry, } from "./server-chat.js"; describe("agent event handler", () => { @@ -20,6 +22,9 @@ describe("agent event handler", () => { const chatRunState = createChatRunState(); const toolEventRecipients = createToolEventRecipientRegistry(); + const sessionEventLog = createSessionEventLog(); + const sessionSubscriptions = createSessionSubscriptionRegistry(); + const handler = createAgentEventHandler({ broadcast, broadcastToConnIds, @@ -29,6 +34,8 @@ describe("agent event handler", () => { resolveSessionKeyForRun: params?.resolveSessionKeyForRun ?? (() => undefined), clearAgentRunContext: vi.fn(), toolEventRecipients, + sessionEventLog, + sessionSubscriptions, }); return { @@ -39,6 +46,8 @@ describe("agent event handler", () => { agentRunSeq, chatRunState, toolEventRecipients, + sessionEventLog, + sessionSubscriptions, handler, }; } @@ -252,4 +261,97 @@ describe("agent event handler", () => { expect(payload.data?.result).toEqual(result); resetAgentRunContextForTest(); }); + + // ── Session event log + replay cursor tests ── + + it("assigns globalSeq to broadcast events and logs them", () => { + const { broadcast, sessionEventLog, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-log", + }); + + handler({ + runId: "run-log", + seq: 1, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "start" }, + }); + handler({ + runId: "run-log", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { delta: "hello" }, + }); + + expect(broadcast).toHaveBeenCalledTimes(2); + const firstPayload = broadcast.mock.calls[0]?.[1] as { globalSeq?: number }; + const secondPayload = broadcast.mock.calls[1]?.[1] as { globalSeq?: number }; + expect(typeof firstPayload.globalSeq).toBe("number"); + expect(typeof secondPayload.globalSeq).toBe("number"); + expect(secondPayload.globalSeq).toBeGreaterThan(firstPayload.globalSeq!); + expect(sessionEventLog.currentSeq()).toBe(2); + }); + + it("routes events to session subscribers and replays from cursor", () => { + const { broadcastToConnIds, sessionEventLog, sessionSubscriptions, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-sub", + }); + + // Emit two events before subscribing. + handler({ runId: "run-sub", seq: 1, stream: "lifecycle", ts: 1000, data: { phase: "start" } }); + handler({ runId: "run-sub", seq: 2, stream: "assistant", ts: 1001, data: { delta: "hi" } }); + + const seqAfterTwo = sessionEventLog.currentSeq(); + + // Subscribe with cursor 0 — should be able to replay both events. + const replayed = sessionEventLog.replayAfter("session-sub", 0); + expect(replayed.length).toBe(2); + expect(replayed[0].globalSeq).toBe(seqAfterTwo - 1); + expect(replayed[1].globalSeq).toBe(seqAfterTwo); + + // Subscribe after first event — should replay only the second. + const partial = sessionEventLog.replayAfter("session-sub", seqAfterTwo - 1); + expect(partial.length).toBe(1); + expect(partial[0].globalSeq).toBe(seqAfterTwo); + + // Register a session subscriber and emit a new event. + sessionSubscriptions.add("session-sub", "conn-1"); + broadcastToConnIds.mockClear(); + + handler({ runId: "run-sub", seq: 3, stream: "assistant", ts: 1002, data: { delta: " world" } }); + + // Session subscriber should receive the event via broadcastToConnIds (twice: + // once from the general tool/broadcast path, once from session subscriber routing). + const subCalls = broadcastToConnIds.mock.calls.filter((c) => { + const connIds = c[2] as ReadonlySet; + return connIds.has("conn-1"); + }); + expect(subCalls.length).toBeGreaterThanOrEqual(1); + const subPayload = subCalls[0]?.[1] as { globalSeq?: number }; + expect(typeof subPayload.globalSeq).toBe("number"); + }); + + it("replays nothing for unknown session key", () => { + const { sessionEventLog, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-x", + }); + + handler({ runId: "run-x", seq: 1, stream: "lifecycle", ts: 1000, data: { phase: "start" } }); + + const replayed = sessionEventLog.replayAfter("unknown-session", 0); + expect(replayed.length).toBe(0); + }); + + it("replays nothing when afterSeq >= current cursor", () => { + const { sessionEventLog, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-y", + }); + + handler({ runId: "run-y", seq: 1, stream: "lifecycle", ts: 1000, data: { phase: "start" } }); + const current = sessionEventLog.currentSeq(); + + const replayed = sessionEventLog.replayAfter("session-y", current); + expect(replayed.length).toBe(0); + }); });