Harden dashboard session events and history APIs

This commit is contained in:
Tyler Yust 2026-03-12 05:16:11 -07:00
parent 66b7aea616
commit 2beb2afdd7
14 changed files with 415 additions and 58 deletions

View File

@ -69,6 +69,8 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
"sessions.get",
"sessions.preview",
"sessions.resolve",
"sessions.subscribe",
"sessions.unsubscribe",
"sessions.usage",
"sessions.usage.timeseries",
"sessions.usage.logs",

View File

@ -1,11 +1,14 @@
import {
ADMIN_SCOPE,
APPROVALS_SCOPE,
PAIRING_SCOPE,
READ_SCOPE,
WRITE_SCOPE,
} from "./method-scopes.js";
import { MAX_BUFFERED_BYTES } from "./server-constants.js";
import type { GatewayWsClient } from "./server/ws-types.js";
import { logWs, shouldLogWs, summarizeAgentEventForWsLog } from "./ws-log.js";
const ADMIN_SCOPE = "operator.admin";
const APPROVALS_SCOPE = "operator.approvals";
const PAIRING_SCOPE = "operator.pairing";
const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
"exec.approval.requested": [APPROVALS_SCOPE],
"exec.approval.resolved": [APPROVALS_SCOPE],
@ -13,6 +16,8 @@ const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
"device.pair.resolved": [PAIRING_SCOPE],
"node.pair.requested": [PAIRING_SCOPE],
"node.pair.resolved": [PAIRING_SCOPE],
"sessions.changed": [READ_SCOPE],
"session.message": [READ_SCOPE],
};
export type GatewayBroadcastStateVersion = {
@ -51,6 +56,9 @@ function hasEventScope(client: GatewayWsClient, event: string): boolean {
if (scopes.includes(ADMIN_SCOPE)) {
return true;
}
if (required.includes(READ_SCOPE)) {
return scopes.includes(READ_SCOPE) || scopes.includes(WRITE_SCOPE);
}
return required.some((scope) => scopes.includes(scope));
}

View File

@ -237,6 +237,13 @@ export type ToolEventRecipientRegistry = {
markFinal: (runId: string) => void;
};
export type SessionEventSubscriberRegistry = {
subscribe: (connId: string) => void;
unsubscribe: (connId: string) => void;
getAll: () => ReadonlySet<string>;
clear: () => void;
};
type ToolRecipientEntry = {
connIds: Set<string>;
updatedAt: number;
@ -246,6 +253,32 @@ type ToolRecipientEntry = {
const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000;
const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000;
export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry {
const connIds = new Set<string>();
const empty = new Set<string>();
return {
subscribe: (connId: string) => {
const normalized = connId.trim();
if (!normalized) {
return;
}
connIds.add(normalized);
},
unsubscribe: (connId: string) => {
const normalized = connId.trim();
if (!normalized) {
return;
}
connIds.delete(normalized);
},
getAll: () => (connIds.size > 0 ? connIds : empty),
clear: () => {
connIds.clear();
},
};
}
export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
const recipients = new Map<string, ToolRecipientEntry>();
@ -326,6 +359,7 @@ export type AgentEventHandlerOptions = {
resolveSessionKeyForRun: (runId: string) => string | undefined;
clearAgentRunContext: (runId: string) => void;
toolEventRecipients: ToolEventRecipientRegistry;
sessionEventSubscribers: SessionEventSubscriberRegistry;
};
export function createAgentEventHandler({
@ -337,7 +371,16 @@ export function createAgentEventHandler({
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients,
sessionEventSubscribers,
}: AgentEventHandlerOptions) {
const emitSessionEvent = (event: string, payload: unknown) => {
const connIds = sessionEventSubscribers.getAll();
if (connIds.size === 0) {
return;
}
broadcastToConnIds(event, payload, connIds, { dropIfSlow: true });
};
const emitChatDelta = (
sessionKey: string,
clientRunId: string,
@ -649,11 +692,12 @@ export function createAgentEventHandler({
sessionKey &&
(lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error")
) {
broadcast(
"sessions.changed",
{ sessionKey, phase: lifecyclePhase, runId: evt.runId, ts: evt.ts },
{ dropIfSlow: true },
);
emitSessionEvent("sessions.changed", {
sessionKey,
phase: lifecyclePhase,
runId: evt.runId,
ts: evt.ts,
});
}
};
}

View File

@ -75,15 +75,20 @@ function resolveGatewaySessionTargetFromKey(key: string) {
}
function emitSessionsChanged(
broadcast: GatewayRequestContext["broadcast"],
context: Pick<GatewayRequestContext, "broadcastToConnIds" | "getSessionEventSubscriberConnIds">,
payload: { sessionKey?: string; reason: string; compacted?: boolean },
) {
broadcast(
const connIds = context.getSessionEventSubscriberConnIds();
if (connIds.size === 0) {
return;
}
context.broadcastToConnIds(
"sessions.changed",
{
...payload,
ts: Date.now(),
},
connIds,
{ dropIfSlow: true },
);
}
@ -152,10 +157,18 @@ export const sessionsHandlers: GatewayRequestHandlers = {
});
respond(true, result, undefined);
},
"sessions.subscribe": ({ respond }) => {
respond(true, { subscribed: true }, undefined);
"sessions.subscribe": ({ client, context, respond }) => {
const connId = client?.connId?.trim();
if (connId) {
context.subscribeSessionEvents(connId);
}
respond(true, { subscribed: Boolean(connId) }, undefined);
},
"sessions.unsubscribe": ({ respond }) => {
"sessions.unsubscribe": ({ client, context, respond }) => {
const connId = client?.connId?.trim();
if (connId) {
context.unsubscribeSessionEvents(connId);
}
respond(true, { subscribed: false }, undefined);
},
"sessions.preview": ({ params, respond }) => {
@ -276,7 +289,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
},
};
respond(true, result, undefined);
emitSessionsChanged(context.broadcast, {
emitSessionsChanged(context, {
sessionKey: target.canonicalKey,
reason: "patch",
});
@ -302,7 +315,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
respond(true, { ok: true, key: result.key, entry: result.entry }, undefined);
emitSessionsChanged(context.broadcast, {
emitSessionsChanged(context, {
sessionKey: result.key,
reason,
});
@ -378,7 +391,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined);
if (deleted) {
emitSessionsChanged(context.broadcast, {
emitSessionsChanged(context, {
sessionKey: target.canonicalKey,
reason: "delete",
});
@ -507,7 +520,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
},
undefined,
);
emitSessionsChanged(context.broadcast, {
emitSessionsChanged(context, {
sessionKey: target.canonicalKey,
reason: "compact",
compacted: true,

View File

@ -63,6 +63,10 @@ export type GatewayRequestContext = {
clientRunId: string,
sessionKey?: string,
) => { sessionKey: string; clientRunId: string } | undefined;
subscribeSessionEvents: (connId: string) => void;
unsubscribeSessionEvents: (connId: string) => void;
unsubscribeAllSessionEvents: (connId: string) => void;
getSessionEventSubscriberConnIds: () => ReadonlySet<string>;
registerToolEventRecipient: (runId: string, connId: string) => void;
dedupe: Map<string, DedupeEntry>;
wizardSessions: Map<string, WizardSession>;

View File

@ -77,7 +77,7 @@ import { ExecApprovalManager } from "./exec-approval-manager.js";
import { NodeRegistry } from "./node-registry.js";
import type { startBrowserControlServerIfEnabled } from "./server-browser.js";
import { createChannelManager } from "./server-channels.js";
import { createAgentEventHandler } from "./server-chat.js";
import { createAgentEventHandler, createSessionEventSubscriberRegistry } from "./server-chat.js";
import { createGatewayCloseHandler } from "./server-close.js";
import { buildGatewayCronService } from "./server-cron.js";
import { startGatewayDiscovery } from "./server-discovery-runtime.js";
@ -633,6 +633,7 @@ export async function startGatewayServer(
const nodeRegistry = new NodeRegistry();
const nodePresenceTimers = new Map<string, ReturnType<typeof setInterval>>();
const nodeSubscriptions = createNodeSubscriptionManager();
const sessionEventSubscribers = createSessionEventSubscriberRegistry();
const nodeSendEvent = (opts: { nodeId: string; event: string; payloadJSON?: string | null }) => {
const payload = safeParseJson(opts.payloadJSON ?? null);
nodeRegistry.sendEvent(opts.nodeId, opts.event, payload);
@ -741,6 +742,7 @@ export async function startGatewayServer(
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients,
sessionEventSubscribers,
}),
);
@ -758,12 +760,17 @@ export async function startGatewayServer(
if (!sessionKey || update.message === undefined) {
return;
}
broadcast(
const connIds = sessionEventSubscribers.getAll();
if (connIds.size === 0) {
return;
}
broadcastToConnIds(
"session.message",
{
sessionKey,
message: update.message,
},
connIds,
{ dropIfSlow: true },
);
});
@ -873,6 +880,10 @@ export async function startGatewayServer(
chatDeltaSentAt: chatRunState.deltaSentAt,
addChatRun,
removeChatRun,
subscribeSessionEvents: sessionEventSubscribers.subscribe,
unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe,
unsubscribeAllSessionEvents: sessionEventSubscribers.unsubscribe,
getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll,
registerToolEventRecipient: toolEventRecipients.add,
dedupe,
wizardSessions,

View File

@ -242,8 +242,9 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
upsertPresence(client.presenceKey, { reason: "disconnect" });
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
}
const context = buildRequestContext();
context.unsubscribeAllSessionEvents(connId);
if (client?.connect?.role === "node") {
const context = buildRequestContext();
const nodeId = context.nodeRegistry.unregister(connId);
if (nodeId) {
removeRemoteNodeInfo(nodeId);

View File

@ -6,7 +6,11 @@ const TEST_GATEWAY_TOKEN = "test-gateway-token-1234567890";
let cfg: Record<string, unknown> = {};
const authMock = vi.fn(async () => ({ ok: true }));
const isLocalDirectRequestMock = vi.fn(() => true);
const loadSessionEntryMock = vi.fn();
const getSubagentRunByChildSessionKeyMock = vi.fn();
const resolveSubagentControllerMock = vi.fn();
const killControlledSubagentRunMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
vi.mock("../config/config.js", () => ({
@ -15,14 +19,22 @@ vi.mock("../config/config.js", () => ({
vi.mock("./auth.js", () => ({
authorizeHttpGatewayConnect: (...args: unknown[]) => authMock(...args),
isLocalDirectRequest: (...args: unknown[]) => isLocalDirectRequestMock(...args),
}));
vi.mock("./session-utils.js", () => ({
loadSessionEntry: (...args: unknown[]) => loadSessionEntryMock(...args),
}));
vi.mock("../agents/subagent-registry.js", () => ({
getSubagentRunByChildSessionKey: (...args: unknown[]) =>
getSubagentRunByChildSessionKeyMock(...args),
}));
vi.mock("../agents/subagent-control.js", () => ({
killControlledSubagentRun: (...args: unknown[]) => killControlledSubagentRunMock(...args),
killSubagentRunAdmin: (...args: unknown[]) => killSubagentRunAdminMock(...args),
resolveSubagentController: (...args: unknown[]) => resolveSubagentControllerMock(...args),
}));
const { handleSessionKillHttpRequest } = await import("./session-kill-http.js");
@ -66,15 +78,26 @@ beforeEach(() => {
cfg = {};
authMock.mockReset();
authMock.mockResolvedValue({ ok: true });
isLocalDirectRequestMock.mockReset();
isLocalDirectRequestMock.mockReturnValue(true);
loadSessionEntryMock.mockReset();
getSubagentRunByChildSessionKeyMock.mockReset();
resolveSubagentControllerMock.mockReset();
resolveSubagentControllerMock.mockReturnValue({ controllerSessionKey: "agent:main:main" });
killControlledSubagentRunMock.mockReset();
killSubagentRunAdminMock.mockReset();
});
async function post(pathname: string, token = TEST_GATEWAY_TOKEN) {
async function post(
pathname: string,
token = TEST_GATEWAY_TOKEN,
extraHeaders?: Record<string, string>,
) {
const headers: Record<string, string> = {};
if (token) {
headers.Authorization = `Bearer ${token}`;
}
Object.assign(headers, extraHeaders ?? {});
return fetch(`http://127.0.0.1:${port}${pathname}`, {
method: "POST",
headers,
@ -101,13 +124,14 @@ describe("POST /sessions/:sessionKey/kill", () => {
expect(killSubagentRunAdminMock).not.toHaveBeenCalled();
});
it("kills a matching session via the admin kill helper", async () => {
it("kills a matching session via the admin kill helper using the canonical key", async () => {
loadSessionEntryMock.mockReturnValue({
entry: { sessionId: "sess-worker", updatedAt: Date.now() },
canonicalKey: "agent:main:subagent:worker",
});
killSubagentRunAdminMock.mockResolvedValue({ found: true, killed: true });
const response = await post("/sessions/agent%3Amain%3Asubagent%3Aworker/kill");
const response = await post("/sessions/agent%3AMain%3ASubagent%3AWorker/kill");
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({ ok: true, killed: true });
expect(killSubagentRunAdminMock).toHaveBeenCalledWith({
@ -119,6 +143,7 @@ describe("POST /sessions/:sessionKey/kill", () => {
it("returns killed=false when the target exists but nothing was stopped", async () => {
loadSessionEntryMock.mockReturnValue({
entry: { sessionId: "sess-worker", updatedAt: Date.now() },
canonicalKey: "agent:main:subagent:worker",
});
killSubagentRunAdminMock.mockResolvedValue({ found: true, killed: false });
@ -126,4 +151,42 @@ describe("POST /sessions/:sessionKey/kill", () => {
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({ ok: true, killed: false });
});
it("rejects remote admin kills without requester ownership", async () => {
isLocalDirectRequestMock.mockReturnValue(false);
loadSessionEntryMock.mockReturnValue({
entry: { sessionId: "sess-worker", updatedAt: Date.now() },
canonicalKey: "agent:main:subagent:worker",
});
const response = await post("/sessions/agent%3Amain%3Asubagent%3Aworker/kill");
expect(response.status).toBe(403);
expect(killSubagentRunAdminMock).not.toHaveBeenCalled();
});
it("uses requester ownership checks when a requester session header is provided", async () => {
isLocalDirectRequestMock.mockReturnValue(false);
loadSessionEntryMock.mockReturnValue({
entry: { sessionId: "sess-worker", updatedAt: Date.now() },
canonicalKey: "agent:main:subagent:worker",
});
getSubagentRunByChildSessionKeyMock.mockReturnValue({
runId: "run-1",
childSessionKey: "agent:main:subagent:worker",
});
killControlledSubagentRunMock.mockResolvedValue({ status: "ok" });
const response = await post(
"/sessions/agent%3Amain%3Asubagent%3Aworker/kill",
TEST_GATEWAY_TOKEN,
{ "x-openclaw-requester-session-key": "agent:main:main" },
);
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({ ok: true, killed: true });
expect(resolveSubagentControllerMock).toHaveBeenCalledWith({
cfg,
agentSessionKey: "agent:main:main",
});
expect(getSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith("agent:main:subagent:worker");
});
});

View File

@ -1,15 +1,22 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import { killSubagentRunAdmin } from "../agents/subagent-control.js";
import { loadConfig } from "../config/config.js";
import { authorizeHttpGatewayConnect, type ResolvedGatewayAuth } from "./auth.js";
import {
sendGatewayAuthFailure,
sendJson,
sendMethodNotAllowed,
} from "./http-common.js";
killControlledSubagentRun,
killSubagentRunAdmin,
resolveSubagentController,
} from "../agents/subagent-control.js";
import { getSubagentRunByChildSessionKey } from "../agents/subagent-registry.js";
import { loadConfig } from "../config/config.js";
import type { AuthRateLimiter } from "./auth-rate-limit.js";
import {
authorizeHttpGatewayConnect,
isLocalDirectRequest,
type ResolvedGatewayAuth,
} from "./auth.js";
import { sendGatewayAuthFailure, sendJson, sendMethodNotAllowed } from "./http-common.js";
import { getBearerToken } from "./http-utils.js";
import { loadSessionEntry } from "./session-utils.js";
import type { AuthRateLimiter } from "./auth-rate-limit.js";
const REQUESTER_SESSION_KEY_HEADER = "x-openclaw-requester-session-key";
function resolveSessionKeyFromPath(pathname: string): string | null {
const match = pathname.match(/^\/sessions\/([^/]+)\/kill$/);
@ -60,7 +67,7 @@ export async function handleSessionKillHttpRequest(
return true;
}
const { entry } = loadSessionEntry(sessionKey);
const { entry, canonicalKey } = loadSessionEntry(sessionKey);
if (!entry) {
sendJson(res, 404, {
ok: false,
@ -72,14 +79,54 @@ export async function handleSessionKillHttpRequest(
return true;
}
const result = await killSubagentRunAdmin({
cfg,
sessionKey,
});
const trustedProxies = opts.trustedProxies ?? cfg.gateway?.trustedProxies;
const allowRealIpFallback = opts.allowRealIpFallback ?? cfg.gateway?.allowRealIpFallback;
const requesterSessionKey = req.headers[REQUESTER_SESSION_KEY_HEADER]?.toString().trim();
const allowLocalAdminKill = isLocalDirectRequest(req, trustedProxies, allowRealIpFallback);
if (!requesterSessionKey && !allowLocalAdminKill) {
sendJson(res, 403, {
ok: false,
error: {
type: "forbidden",
message: "Session kills require a local admin request or requester session ownership.",
},
});
return true;
}
let killed = false;
if (requesterSessionKey) {
const runEntry = getSubagentRunByChildSessionKey(canonicalKey);
if (runEntry) {
const result = await killControlledSubagentRun({
cfg,
controller: resolveSubagentController({ cfg, agentSessionKey: requesterSessionKey }),
entry: runEntry,
});
if (result.status === "forbidden") {
sendJson(res, 403, {
ok: false,
error: {
type: "forbidden",
message: result.error,
},
});
return true;
}
killed = result.status === "ok";
}
} else {
const result = await killSubagentRunAdmin({
cfg,
sessionKey: canonicalKey,
});
killed = result.killed;
}
sendJson(res, 200, {
ok: true,
killed: result.killed,
killed,
});
return true;
}

View File

@ -9,6 +9,7 @@ import {
createGatewaySuiteHarness,
installGatewayTestHooks,
onceMessage,
rpcReq,
writeSessionStore,
} from "./test-helpers.server.js";
@ -31,6 +32,71 @@ async function createSessionStoreFile(): Promise<string> {
}
describe("session.message websocket events", () => {
test("only sends transcript events to subscribed operator clients", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
storePath,
});
const harness = await createGatewaySuiteHarness();
try {
const subscribedWs = await harness.openWs();
const unsubscribedWs = await harness.openWs();
const nodeWs = await harness.openWs();
try {
await connectOk(subscribedWs, { scopes: ["operator.read"] });
await rpcReq(subscribedWs, "sessions.subscribe");
await connectOk(unsubscribedWs, { scopes: ["operator.read"] });
await connectOk(nodeWs, { role: "node", scopes: [] });
const subscribedEvent = onceMessage(
subscribedWs,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
const unsubscribedEvent = Promise.race([
onceMessage(
unsubscribedWs,
(message) => message.type === "event" && message.event === "session.message",
).then(() => "received"),
new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)),
]);
const nodeEvent = Promise.race([
onceMessage(
nodeWs,
(message) => message.type === "event" && message.event === "session.message",
).then(() => "received"),
new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)),
]);
const appended = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "subscribed only",
storePath,
});
expect(appended.ok).toBe(true);
await expect(subscribedEvent).resolves.toBeTruthy();
await expect(unsubscribedEvent).resolves.toBe("timeout");
await expect(nodeEvent).resolves.toBe("timeout");
} finally {
subscribedWs.close();
unsubscribedWs.close();
nodeWs.close();
}
} finally {
await harness.close();
}
});
test("broadcasts appended transcript messages with the session key", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({
@ -47,7 +113,8 @@ describe("session.message websocket events", () => {
try {
const ws = await harness.openWs();
try {
await connectOk(ws);
await connectOk(ws, { scopes: ["operator.read"] });
await rpcReq(ws, "sessions.subscribe");
const appendPromise = appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",

View File

@ -981,6 +981,45 @@ describe("listSessionsFromStore subagent metadata", () => {
expect(failed?.status).toBe("failed");
expect(failed?.runtimeMs).toBe(5_000);
});
test("maps timeout outcomes to timeout status and clamps negative runtime", () => {
const now = Date.now();
const store: Record<string, SessionEntry> = {
"agent:main:subagent:timeout": {
sessionId: "sess-timeout",
updatedAt: now,
spawnedBy: "agent:main:main",
} as SessionEntry,
};
addSubagentRunForTests({
runId: "run-timeout",
childSessionKey: "agent:main:subagent:timeout",
controllerSessionKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "timeout task",
cleanup: "keep",
createdAt: now - 10_000,
startedAt: now - 1_000,
endedAt: now - 2_000,
outcome: { status: "timeout" },
model: "openai/gpt-5.4",
});
const result = listSessionsFromStore({
cfg,
storePath: "/tmp/sessions.json",
store,
opts: {},
});
const timeout = result.sessions.find(
(session) => session.key === "agent:main:subagent:timeout",
);
expect(timeout?.status).toBe("timeout");
expect(timeout?.runtimeMs).toBe(0);
});
});
describe("loadCombinedSessionStoreForGateway includes disk-only agents (#32804)", () => {

View File

@ -187,7 +187,7 @@ function resolveSessionRunStatus(
endedAt?: number;
outcome?: { status?: string };
} | null,
): "running" | "done" | "failed" | "killed" | undefined {
): "running" | "done" | "failed" | "killed" | "timeout" | undefined {
if (!run) {
return undefined;
}
@ -201,6 +201,9 @@ function resolveSessionRunStatus(
if (status === "killed") {
return "killed";
}
if (status === "timeout") {
return "timeout";
}
return "done";
}

View File

@ -110,6 +110,58 @@ describe("session history HTTP endpoints", () => {
}
});
test("streams bounded history windows over SSE", async () => {
const { storePath } = await seedSession({ text: "first message" });
const second = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "second message",
storePath,
});
expect(second.ok).toBe(true);
const harness = await createGatewaySuiteHarness();
try {
const res = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
{
headers: {
...AUTH_HEADER,
Accept: "text/event-stream",
},
},
);
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect(
(historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> })
.messages?.[0]?.content?.[0]?.text,
).toBe("second message");
const appended = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "third message",
storePath,
});
expect(appended.ok).toBe(true);
const nextEvent = await readSseEvent(reader!, streamState);
expect(nextEvent.event).toBe("history");
expect(
(nextEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> })
.messages?.[0]?.content?.[0]?.text,
).toBe("third message");
await reader?.cancel();
} finally {
await harness.close();
}
});
test("streams session history updates over SSE", async () => {
const { storePath } = await seedSession({ text: "first message" });

View File

@ -20,6 +20,8 @@ import {
resolveSessionTranscriptCandidates,
} from "./session-utils.js";
const MAX_SESSION_HISTORY_LIMIT = 1000;
function resolveSessionHistoryPath(req: IncomingMessage): string | null {
const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`);
const match = url.pathname.match(/^\/sessions\/([^/]+)\/history$/);
@ -48,7 +50,7 @@ function resolveLimit(req: IncomingMessage): number | undefined {
if (!Number.isFinite(value) || value < 1) {
return 1;
}
return Math.max(1, value);
return Math.min(MAX_SESSION_HISTORY_LIMIT, Math.max(1, value));
}
function maybeLimitMessages(messages: unknown[], limit: number | undefined): unknown[] {
@ -168,29 +170,30 @@ export async function handleSessionHistoryHttpRequest(
if (!updatePath || !transcriptCandidates.has(updatePath)) {
return;
}
const nextMessages = maybeLimitMessages(
readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile),
limit,
);
if (nextMessages.length < sentMessages.length) {
sentMessages = nextMessages;
if (update.message !== undefined) {
if (limit === undefined) {
sentMessages = [...sentMessages, update.message];
sseWrite(res, "message", {
sessionKey: target.canonicalKey,
message: update.message,
});
return;
}
sentMessages = maybeLimitMessages([...sentMessages, update.message], limit);
sseWrite(res, "history", {
sessionKey: target.canonicalKey,
messages: sentMessages,
});
return;
}
if (nextMessages.length === sentMessages.length) {
return;
}
const appended = nextMessages.slice(sentMessages.length);
sentMessages = nextMessages;
for (const message of appended) {
sseWrite(res, "message", {
sessionKey: target.canonicalKey,
message,
});
}
sentMessages = maybeLimitMessages(
readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile),
limit,
);
sseWrite(res, "history", {
sessionKey: target.canonicalKey,
messages: sentMessages,
});
});
const cleanup = () => {