From 0f5ddf7e8f7eb71c289589b9464165b41f9fe875 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sat, 21 Feb 2026 11:02:50 -0800 Subject: [PATCH] gateway: add session event log, subscription registry, and broadcast globalSeq --- src/gateway/server-chat.ts | 148 ++++++++++++++++++++++++++-- src/gateway/server-methods/types.ts | 4 + src/gateway/server-runtime-state.ts | 6 ++ src/gateway/server.impl.ts | 18 ++++ 4 files changed, 170 insertions(+), 6 deletions(-) diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index eff7455953c..242f567e4c5 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -194,6 +194,112 @@ export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry { return { add, get, markFinal }; } +// ── Global event log with replay cursor ── + +export type SessionEventLogEntry = { + globalSeq: number; + sessionKey: string; + appendedAt: number; + payload: Record; +}; + +export type SessionEventLog = { + /** Append an event and return its assigned globalSeq. */ + append: (sessionKey: string, payload: Record) => number; + /** Return all events for a sessionKey where globalSeq > afterSeq. */ + replayAfter: (sessionKey: string, afterSeq: number) => SessionEventLogEntry[]; + /** Current global sequence value (latest assigned). */ + currentSeq: () => number; +}; + +const DEFAULT_EVENT_LOG_MAX = 50_000; +const EVENT_LOG_TTL_MS = 30 * 60 * 1000; + +export function createSessionEventLog(maxEntries = DEFAULT_EVENT_LOG_MAX): SessionEventLog { + let seq = 0; + const buffer: SessionEventLogEntry[] = []; + + const prune = () => { + const cutoff = Date.now() - EVENT_LOG_TTL_MS; + while (buffer.length > maxEntries || (buffer.length > 0 && buffer[0].appendedAt < cutoff)) { + buffer.shift(); + } + }; + + return { + append(sessionKey, payload) { + const entry: SessionEventLogEntry = { + globalSeq: ++seq, + sessionKey, + appendedAt: Date.now(), + payload, + }; + buffer.push(entry); + prune(); + return entry.globalSeq; + }, + replayAfter(sessionKey, afterSeq) { + const result: SessionEventLogEntry[] = []; + for (let i = buffer.length - 1; i >= 0; i--) { + if (buffer[i].globalSeq <= afterSeq) { + break; + } + if (buffer[i].sessionKey === sessionKey) { + result.push(buffer[i]); + } + } + result.reverse(); + return result; + }, + currentSeq: () => seq, + }; +} + +// ── Session subscription registry ── + +export type SessionSubscriptionRegistry = { + add: (sessionKey: string, connId: string) => void; + remove: (sessionKey: string, connId: string) => void; + removeConn: (connId: string) => void; + get: (sessionKey: string) => ReadonlySet | undefined; +}; + +export function createSessionSubscriptionRegistry(): SessionSubscriptionRegistry { + const subs = new Map>(); + + return { + add(sessionKey, connId) { + let set = subs.get(sessionKey); + if (!set) { + set = new Set(); + subs.set(sessionKey, set); + } + set.add(connId); + }, + remove(sessionKey, connId) { + const set = subs.get(sessionKey); + if (!set) { + return; + } + set.delete(connId); + if (set.size === 0) { + subs.delete(sessionKey); + } + }, + removeConn(connId) { + for (const [key, set] of subs) { + set.delete(connId); + if (set.size === 0) { + subs.delete(key); + } + } + }, + get(sessionKey) { + return subs.get(sessionKey); + }, + }; +} + export type ChatEventBroadcast = ( event: string, payload: unknown, @@ -216,6 +322,8 @@ export type AgentEventHandlerOptions = { resolveSessionKeyForRun: (runId: string) => string | undefined; clearAgentRunContext: (runId: string) => void; toolEventRecipients: ToolEventRecipientRegistry; + sessionEventLog: SessionEventLog; + sessionSubscriptions: SessionSubscriptionRegistry; }; export function createAgentEventHandler({ @@ -227,6 +335,8 @@ export function createAgentEventHandler({ resolveSessionKeyForRun, clearAgentRunContext, toolEventRecipients, + sessionEventLog, + sessionSubscriptions, }: AgentEventHandlerOptions) { const emitChatDelta = (sessionKey: string, clientRunId: string, seq: number, text: string) => { if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) { @@ -358,17 +468,43 @@ export function createAgentEventHandler({ }); } agentRunSeq.set(evt.runId, evt.seq); + + // Assign a global cursor and log the event for replay. + const globalSeq = sessionKey + ? sessionEventLog.append( + sessionKey, + (isToolEvent ? toolPayload : agentPayload) as Record, + ) + : undefined; + if (isToolEvent) { - // Always broadcast tool events to registered WS recipients with - // tool-events capability, regardless of verboseLevel. The verbose - // setting only controls whether tool details are sent as channel - // messages to messaging surfaces (Telegram, Discord, etc.). const recipients = toolEventRecipients.get(evt.runId); if (recipients && recipients.size > 0) { - broadcastToConnIds("agent", toolPayload, recipients); + const payload = + globalSeq !== undefined + ? { ...(toolPayload as Record), globalSeq } + : toolPayload; + broadcastToConnIds("agent", payload, recipients); } } else { - broadcast("agent", agentPayload); + const payload = + globalSeq !== undefined + ? { ...(agentPayload as Record), globalSeq } + : agentPayload; + broadcast("agent", payload); + } + + // Route to session subscribers (replay-cursor protocol consumers). + if (sessionKey && globalSeq !== undefined) { + const sessionSubs = sessionSubscriptions.get(sessionKey); + if (sessionSubs && sessionSubs.size > 0) { + const outPayload = isToolEvent ? toolPayload : agentPayload; + broadcastToConnIds( + "agent", + { ...(outPayload as Record), globalSeq }, + sessionSubs, + ); + } } const lifecyclePhase = diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index b0c70acd505..49454b3478c 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -59,6 +59,10 @@ export type GatewayRequestContext = { sessionKey?: string, ) => { sessionKey: string; clientRunId: string } | undefined; registerToolEventRecipient: (runId: string, connId: string) => void; + registerSessionSubscription: (sessionKey: string, connId: string) => void; + unregisterSessionSubscription: (sessionKey: string, connId: string) => void; + replaySessionEvents: (sessionKey: string, afterSeq: number, connId: string) => number; + currentGlobalSeq: () => number; dedupe: Map; wizardSessions: Map; findRunningWizard: () => string | null; diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index d0c8c01e17c..ba65f9055dd 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -24,6 +24,8 @@ import { type ChatRunEntry, createChatRunState, createToolEventRecipientRegistry, + createSessionEventLog, + createSessionSubscriptionRegistry, } from "./server-chat.js"; import { MAX_PAYLOAD_BYTES } from "./server-constants.js"; import { attachGatewayUpgradeHandler, createGatewayHttpServer } from "./server-http.js"; @@ -181,6 +183,8 @@ export async function createGatewayRuntimeState(params: { const removeChatRun = chatRunRegistry.remove; const chatAbortControllers = new Map(); const toolEventRecipients = createToolEventRecipientRegistry(); + const sessionEventLog = createSessionEventLog(); + const sessionSubscriptions = createSessionSubscriptionRegistry(); return { canvasHost, @@ -200,5 +204,7 @@ export async function createGatewayRuntimeState(params: { removeChatRun, chatAbortControllers, toolEventRecipients, + sessionEventLog, + sessionSubscriptions, }; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 162b4f148d4..88d15748eda 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -371,6 +371,8 @@ export async function startGatewayServer( removeChatRun, chatAbortControllers, toolEventRecipients, + sessionEventLog, + sessionSubscriptions, } = await createGatewayRuntimeState({ cfg: cfgAtStart, bindHost, @@ -508,6 +510,8 @@ export async function startGatewayServer( resolveSessionKeyForRun, clearAgentRunContext, toolEventRecipients, + sessionEventLog, + sessionSubscriptions, }), ); @@ -598,6 +602,20 @@ export async function startGatewayServer( addChatRun, removeChatRun, registerToolEventRecipient: toolEventRecipients.add, + registerSessionSubscription: sessionSubscriptions.add, + unregisterSessionSubscription: sessionSubscriptions.remove, + replaySessionEvents: (sessionKey: string, afterSeq: number, connId: string) => { + const events = sessionEventLog.replayAfter(sessionKey, afterSeq); + for (const entry of events) { + broadcastToConnIds( + "agent", + { ...entry.payload, globalSeq: entry.globalSeq }, + new Set([connId]), + ); + } + return events.length; + }, + currentGlobalSeq: () => sessionEventLog.currentSeq(), dedupe, wizardSessions, findRunningWizard,