gateway: add session event log, subscription registry, and broadcast globalSeq

This commit is contained in:
kumarabhirup 2026-02-21 11:02:50 -08:00
parent d8afea2cbb
commit 0f5ddf7e8f
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
4 changed files with 170 additions and 6 deletions

View File

@ -194,6 +194,112 @@ export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
return { add, get, markFinal };
}
// ── Global event log with replay cursor ──
export type SessionEventLogEntry = {
globalSeq: number;
sessionKey: string;
appendedAt: number;
payload: Record<string, unknown>;
};
export type SessionEventLog = {
/** Append an event and return its assigned globalSeq. */
append: (sessionKey: string, payload: Record<string, unknown>) => number;
/** Return all events for a sessionKey where globalSeq > afterSeq. */
replayAfter: (sessionKey: string, afterSeq: number) => SessionEventLogEntry[];
/** Current global sequence value (latest assigned). */
currentSeq: () => number;
};
const DEFAULT_EVENT_LOG_MAX = 50_000;
const EVENT_LOG_TTL_MS = 30 * 60 * 1000;
export function createSessionEventLog(maxEntries = DEFAULT_EVENT_LOG_MAX): SessionEventLog {
let seq = 0;
const buffer: SessionEventLogEntry[] = [];
const prune = () => {
const cutoff = Date.now() - EVENT_LOG_TTL_MS;
while (buffer.length > maxEntries || (buffer.length > 0 && buffer[0].appendedAt < cutoff)) {
buffer.shift();
}
};
return {
append(sessionKey, payload) {
const entry: SessionEventLogEntry = {
globalSeq: ++seq,
sessionKey,
appendedAt: Date.now(),
payload,
};
buffer.push(entry);
prune();
return entry.globalSeq;
},
replayAfter(sessionKey, afterSeq) {
const result: SessionEventLogEntry[] = [];
for (let i = buffer.length - 1; i >= 0; i--) {
if (buffer[i].globalSeq <= afterSeq) {
break;
}
if (buffer[i].sessionKey === sessionKey) {
result.push(buffer[i]);
}
}
result.reverse();
return result;
},
currentSeq: () => seq,
};
}
// ── Session subscription registry ──
export type SessionSubscriptionRegistry = {
add: (sessionKey: string, connId: string) => void;
remove: (sessionKey: string, connId: string) => void;
removeConn: (connId: string) => void;
get: (sessionKey: string) => ReadonlySet<string> | undefined;
};
export function createSessionSubscriptionRegistry(): SessionSubscriptionRegistry {
const subs = new Map<string, Set<string>>();
return {
add(sessionKey, connId) {
let set = subs.get(sessionKey);
if (!set) {
set = new Set();
subs.set(sessionKey, set);
}
set.add(connId);
},
remove(sessionKey, connId) {
const set = subs.get(sessionKey);
if (!set) {
return;
}
set.delete(connId);
if (set.size === 0) {
subs.delete(sessionKey);
}
},
removeConn(connId) {
for (const [key, set] of subs) {
set.delete(connId);
if (set.size === 0) {
subs.delete(key);
}
}
},
get(sessionKey) {
return subs.get(sessionKey);
},
};
}
export type ChatEventBroadcast = (
event: string,
payload: unknown,
@ -216,6 +322,8 @@ export type AgentEventHandlerOptions = {
resolveSessionKeyForRun: (runId: string) => string | undefined;
clearAgentRunContext: (runId: string) => void;
toolEventRecipients: ToolEventRecipientRegistry;
sessionEventLog: SessionEventLog;
sessionSubscriptions: SessionSubscriptionRegistry;
};
export function createAgentEventHandler({
@ -227,6 +335,8 @@ export function createAgentEventHandler({
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients,
sessionEventLog,
sessionSubscriptions,
}: AgentEventHandlerOptions) {
const emitChatDelta = (sessionKey: string, clientRunId: string, seq: number, text: string) => {
if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) {
@ -358,17 +468,43 @@ export function createAgentEventHandler({
});
}
agentRunSeq.set(evt.runId, evt.seq);
// Assign a global cursor and log the event for replay.
const globalSeq = sessionKey
? sessionEventLog.append(
sessionKey,
(isToolEvent ? toolPayload : agentPayload) as Record<string, unknown>,
)
: undefined;
if (isToolEvent) {
// Always broadcast tool events to registered WS recipients with
// tool-events capability, regardless of verboseLevel. The verbose
// setting only controls whether tool details are sent as channel
// messages to messaging surfaces (Telegram, Discord, etc.).
const recipients = toolEventRecipients.get(evt.runId);
if (recipients && recipients.size > 0) {
broadcastToConnIds("agent", toolPayload, recipients);
const payload =
globalSeq !== undefined
? { ...(toolPayload as Record<string, unknown>), globalSeq }
: toolPayload;
broadcastToConnIds("agent", payload, recipients);
}
} else {
broadcast("agent", agentPayload);
const payload =
globalSeq !== undefined
? { ...(agentPayload as Record<string, unknown>), globalSeq }
: agentPayload;
broadcast("agent", payload);
}
// Route to session subscribers (replay-cursor protocol consumers).
if (sessionKey && globalSeq !== undefined) {
const sessionSubs = sessionSubscriptions.get(sessionKey);
if (sessionSubs && sessionSubs.size > 0) {
const outPayload = isToolEvent ? toolPayload : agentPayload;
broadcastToConnIds(
"agent",
{ ...(outPayload as Record<string, unknown>), globalSeq },
sessionSubs,
);
}
}
const lifecyclePhase =

View File

@ -59,6 +59,10 @@ export type GatewayRequestContext = {
sessionKey?: string,
) => { sessionKey: string; clientRunId: string } | undefined;
registerToolEventRecipient: (runId: string, connId: string) => void;
registerSessionSubscription: (sessionKey: string, connId: string) => void;
unregisterSessionSubscription: (sessionKey: string, connId: string) => void;
replaySessionEvents: (sessionKey: string, afterSeq: number, connId: string) => number;
currentGlobalSeq: () => number;
dedupe: Map<string, DedupeEntry>;
wizardSessions: Map<string, WizardSession>;
findRunningWizard: () => string | null;

View File

@ -24,6 +24,8 @@ import {
type ChatRunEntry,
createChatRunState,
createToolEventRecipientRegistry,
createSessionEventLog,
createSessionSubscriptionRegistry,
} from "./server-chat.js";
import { MAX_PAYLOAD_BYTES } from "./server-constants.js";
import { attachGatewayUpgradeHandler, createGatewayHttpServer } from "./server-http.js";
@ -181,6 +183,8 @@ export async function createGatewayRuntimeState(params: {
const removeChatRun = chatRunRegistry.remove;
const chatAbortControllers = new Map<string, ChatAbortControllerEntry>();
const toolEventRecipients = createToolEventRecipientRegistry();
const sessionEventLog = createSessionEventLog();
const sessionSubscriptions = createSessionSubscriptionRegistry();
return {
canvasHost,
@ -200,5 +204,7 @@ export async function createGatewayRuntimeState(params: {
removeChatRun,
chatAbortControllers,
toolEventRecipients,
sessionEventLog,
sessionSubscriptions,
};
}

View File

@ -371,6 +371,8 @@ export async function startGatewayServer(
removeChatRun,
chatAbortControllers,
toolEventRecipients,
sessionEventLog,
sessionSubscriptions,
} = await createGatewayRuntimeState({
cfg: cfgAtStart,
bindHost,
@ -508,6 +510,8 @@ export async function startGatewayServer(
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients,
sessionEventLog,
sessionSubscriptions,
}),
);
@ -598,6 +602,20 @@ export async function startGatewayServer(
addChatRun,
removeChatRun,
registerToolEventRecipient: toolEventRecipients.add,
registerSessionSubscription: sessionSubscriptions.add,
unregisterSessionSubscription: sessionSubscriptions.remove,
replaySessionEvents: (sessionKey: string, afterSeq: number, connId: string) => {
const events = sessionEventLog.replayAfter(sessionKey, afterSeq);
for (const entry of events) {
broadcastToConnIds(
"agent",
{ ...entry.payload, globalSeq: entry.globalSeq },
new Set([connId]),
);
}
return events.length;
},
currentGlobalSeq: () => sessionEventLog.currentSeq(),
dedupe,
wizardSessions,
findRunningWizard,