diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index f4f57259212..6282c22365e 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -69,6 +69,8 @@ const METHOD_SCOPE_GROUPS: Record = { "sessions.get", "sessions.preview", "sessions.resolve", + "sessions.subscribe", + "sessions.unsubscribe", "sessions.usage", "sessions.usage.timeseries", "sessions.usage.logs", diff --git a/src/gateway/server-broadcast.ts b/src/gateway/server-broadcast.ts index f8ef2d69a74..416d72f3d4c 100644 --- a/src/gateway/server-broadcast.ts +++ b/src/gateway/server-broadcast.ts @@ -1,11 +1,14 @@ +import { + ADMIN_SCOPE, + APPROVALS_SCOPE, + PAIRING_SCOPE, + READ_SCOPE, + WRITE_SCOPE, +} from "./method-scopes.js"; import { MAX_BUFFERED_BYTES } from "./server-constants.js"; import type { GatewayWsClient } from "./server/ws-types.js"; import { logWs, shouldLogWs, summarizeAgentEventForWsLog } from "./ws-log.js"; -const ADMIN_SCOPE = "operator.admin"; -const APPROVALS_SCOPE = "operator.approvals"; -const PAIRING_SCOPE = "operator.pairing"; - const EVENT_SCOPE_GUARDS: Record = { "exec.approval.requested": [APPROVALS_SCOPE], "exec.approval.resolved": [APPROVALS_SCOPE], @@ -13,6 +16,8 @@ const EVENT_SCOPE_GUARDS: Record = { "device.pair.resolved": [PAIRING_SCOPE], "node.pair.requested": [PAIRING_SCOPE], "node.pair.resolved": [PAIRING_SCOPE], + "sessions.changed": [READ_SCOPE], + "session.message": [READ_SCOPE], }; export type GatewayBroadcastStateVersion = { @@ -51,6 +56,9 @@ function hasEventScope(client: GatewayWsClient, event: string): boolean { if (scopes.includes(ADMIN_SCOPE)) { return true; } + if (required.includes(READ_SCOPE)) { + return scopes.includes(READ_SCOPE) || scopes.includes(WRITE_SCOPE); + } return required.some((scope) => scopes.includes(scope)); } diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 536c650385d..46c61ebe37a 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -237,6 +237,13 @@ export type ToolEventRecipientRegistry = { markFinal: (runId: string) => void; }; +export type SessionEventSubscriberRegistry = { + subscribe: (connId: string) => void; + unsubscribe: (connId: string) => void; + getAll: () => ReadonlySet; + clear: () => void; +}; + type ToolRecipientEntry = { connIds: Set; updatedAt: number; @@ -246,6 +253,32 @@ type ToolRecipientEntry = { const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000; const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000; +export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry { + const connIds = new Set(); + const empty = new Set(); + + return { + subscribe: (connId: string) => { + const normalized = connId.trim(); + if (!normalized) { + return; + } + connIds.add(normalized); + }, + unsubscribe: (connId: string) => { + const normalized = connId.trim(); + if (!normalized) { + return; + } + connIds.delete(normalized); + }, + getAll: () => (connIds.size > 0 ? connIds : empty), + clear: () => { + connIds.clear(); + }, + }; +} + export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry { const recipients = new Map(); @@ -326,6 +359,7 @@ export type AgentEventHandlerOptions = { resolveSessionKeyForRun: (runId: string) => string | undefined; clearAgentRunContext: (runId: string) => void; toolEventRecipients: ToolEventRecipientRegistry; + sessionEventSubscribers: SessionEventSubscriberRegistry; }; export function createAgentEventHandler({ @@ -337,7 +371,16 @@ export function createAgentEventHandler({ resolveSessionKeyForRun, clearAgentRunContext, toolEventRecipients, + sessionEventSubscribers, }: AgentEventHandlerOptions) { + const emitSessionEvent = (event: string, payload: unknown) => { + const connIds = sessionEventSubscribers.getAll(); + if (connIds.size === 0) { + return; + } + broadcastToConnIds(event, payload, connIds, { dropIfSlow: true }); + }; + const emitChatDelta = ( sessionKey: string, clientRunId: string, @@ -649,11 +692,12 @@ export function createAgentEventHandler({ sessionKey && (lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error") ) { - broadcast( - "sessions.changed", - { sessionKey, phase: lifecyclePhase, runId: evt.runId, ts: evt.ts }, - { dropIfSlow: true }, - ); + emitSessionEvent("sessions.changed", { + sessionKey, + phase: lifecyclePhase, + runId: evt.runId, + ts: evt.ts, + }); } }; } diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 2365f106357..0f969b0de95 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -75,15 +75,20 @@ function resolveGatewaySessionTargetFromKey(key: string) { } function emitSessionsChanged( - broadcast: GatewayRequestContext["broadcast"], + context: Pick, payload: { sessionKey?: string; reason: string; compacted?: boolean }, ) { - broadcast( + const connIds = context.getSessionEventSubscriberConnIds(); + if (connIds.size === 0) { + return; + } + context.broadcastToConnIds( "sessions.changed", { ...payload, ts: Date.now(), }, + connIds, { dropIfSlow: true }, ); } @@ -152,10 +157,18 @@ export const sessionsHandlers: GatewayRequestHandlers = { }); respond(true, result, undefined); }, - "sessions.subscribe": ({ respond }) => { - respond(true, { subscribed: true }, undefined); + "sessions.subscribe": ({ client, context, respond }) => { + const connId = client?.connId?.trim(); + if (connId) { + context.subscribeSessionEvents(connId); + } + respond(true, { subscribed: Boolean(connId) }, undefined); }, - "sessions.unsubscribe": ({ respond }) => { + "sessions.unsubscribe": ({ client, context, respond }) => { + const connId = client?.connId?.trim(); + if (connId) { + context.unsubscribeSessionEvents(connId); + } respond(true, { subscribed: false }, undefined); }, "sessions.preview": ({ params, respond }) => { @@ -276,7 +289,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { }, }; respond(true, result, undefined); - emitSessionsChanged(context.broadcast, { + emitSessionsChanged(context, { sessionKey: target.canonicalKey, reason: "patch", }); @@ -302,7 +315,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { return; } respond(true, { ok: true, key: result.key, entry: result.entry }, undefined); - emitSessionsChanged(context.broadcast, { + emitSessionsChanged(context, { sessionKey: result.key, reason, }); @@ -378,7 +391,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined); if (deleted) { - emitSessionsChanged(context.broadcast, { + emitSessionsChanged(context, { sessionKey: target.canonicalKey, reason: "delete", }); @@ -507,7 +520,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { }, undefined, ); - emitSessionsChanged(context.broadcast, { + emitSessionsChanged(context, { sessionKey: target.canonicalKey, reason: "compact", compacted: true, diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index 4998a84c842..dfff3d02ed2 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -63,6 +63,10 @@ export type GatewayRequestContext = { clientRunId: string, sessionKey?: string, ) => { sessionKey: string; clientRunId: string } | undefined; + subscribeSessionEvents: (connId: string) => void; + unsubscribeSessionEvents: (connId: string) => void; + unsubscribeAllSessionEvents: (connId: string) => void; + getSessionEventSubscriberConnIds: () => ReadonlySet; registerToolEventRecipient: (runId: string, connId: string) => void; dedupe: Map; wizardSessions: Map; diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index a910d64e0f6..1c2ab1b958b 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -77,7 +77,7 @@ import { ExecApprovalManager } from "./exec-approval-manager.js"; import { NodeRegistry } from "./node-registry.js"; import type { startBrowserControlServerIfEnabled } from "./server-browser.js"; import { createChannelManager } from "./server-channels.js"; -import { createAgentEventHandler } from "./server-chat.js"; +import { createAgentEventHandler, createSessionEventSubscriberRegistry } from "./server-chat.js"; import { createGatewayCloseHandler } from "./server-close.js"; import { buildGatewayCronService } from "./server-cron.js"; import { startGatewayDiscovery } from "./server-discovery-runtime.js"; @@ -633,6 +633,7 @@ export async function startGatewayServer( const nodeRegistry = new NodeRegistry(); const nodePresenceTimers = new Map>(); const nodeSubscriptions = createNodeSubscriptionManager(); + const sessionEventSubscribers = createSessionEventSubscriberRegistry(); const nodeSendEvent = (opts: { nodeId: string; event: string; payloadJSON?: string | null }) => { const payload = safeParseJson(opts.payloadJSON ?? null); nodeRegistry.sendEvent(opts.nodeId, opts.event, payload); @@ -741,6 +742,7 @@ export async function startGatewayServer( resolveSessionKeyForRun, clearAgentRunContext, toolEventRecipients, + sessionEventSubscribers, }), ); @@ -758,12 +760,17 @@ export async function startGatewayServer( if (!sessionKey || update.message === undefined) { return; } - broadcast( + const connIds = sessionEventSubscribers.getAll(); + if (connIds.size === 0) { + return; + } + broadcastToConnIds( "session.message", { sessionKey, message: update.message, }, + connIds, { dropIfSlow: true }, ); }); @@ -873,6 +880,10 @@ export async function startGatewayServer( chatDeltaSentAt: chatRunState.deltaSentAt, addChatRun, removeChatRun, + subscribeSessionEvents: sessionEventSubscribers.subscribe, + unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe, + unsubscribeAllSessionEvents: sessionEventSubscribers.unsubscribe, + getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll, registerToolEventRecipient: toolEventRecipients.add, dedupe, wizardSessions, diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index 1a66cbdfe63..c71d27b8c11 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -242,8 +242,9 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti upsertPresence(client.presenceKey, { reason: "disconnect" }); broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion }); } + const context = buildRequestContext(); + context.unsubscribeAllSessionEvents(connId); if (client?.connect?.role === "node") { - const context = buildRequestContext(); const nodeId = context.nodeRegistry.unregister(connId); if (nodeId) { removeRemoteNodeInfo(nodeId); diff --git a/src/gateway/session-kill-http.test.ts b/src/gateway/session-kill-http.test.ts index c5cd532a0ce..7fbc47abc91 100644 --- a/src/gateway/session-kill-http.test.ts +++ b/src/gateway/session-kill-http.test.ts @@ -6,7 +6,11 @@ const TEST_GATEWAY_TOKEN = "test-gateway-token-1234567890"; let cfg: Record = {}; const authMock = vi.fn(async () => ({ ok: true })); +const isLocalDirectRequestMock = vi.fn(() => true); const loadSessionEntryMock = vi.fn(); +const getSubagentRunByChildSessionKeyMock = vi.fn(); +const resolveSubagentControllerMock = vi.fn(); +const killControlledSubagentRunMock = vi.fn(); const killSubagentRunAdminMock = vi.fn(); vi.mock("../config/config.js", () => ({ @@ -15,14 +19,22 @@ vi.mock("../config/config.js", () => ({ vi.mock("./auth.js", () => ({ authorizeHttpGatewayConnect: (...args: unknown[]) => authMock(...args), + isLocalDirectRequest: (...args: unknown[]) => isLocalDirectRequestMock(...args), })); vi.mock("./session-utils.js", () => ({ loadSessionEntry: (...args: unknown[]) => loadSessionEntryMock(...args), })); +vi.mock("../agents/subagent-registry.js", () => ({ + getSubagentRunByChildSessionKey: (...args: unknown[]) => + getSubagentRunByChildSessionKeyMock(...args), +})); + vi.mock("../agents/subagent-control.js", () => ({ + killControlledSubagentRun: (...args: unknown[]) => killControlledSubagentRunMock(...args), killSubagentRunAdmin: (...args: unknown[]) => killSubagentRunAdminMock(...args), + resolveSubagentController: (...args: unknown[]) => resolveSubagentControllerMock(...args), })); const { handleSessionKillHttpRequest } = await import("./session-kill-http.js"); @@ -66,15 +78,26 @@ beforeEach(() => { cfg = {}; authMock.mockReset(); authMock.mockResolvedValue({ ok: true }); + isLocalDirectRequestMock.mockReset(); + isLocalDirectRequestMock.mockReturnValue(true); loadSessionEntryMock.mockReset(); + getSubagentRunByChildSessionKeyMock.mockReset(); + resolveSubagentControllerMock.mockReset(); + resolveSubagentControllerMock.mockReturnValue({ controllerSessionKey: "agent:main:main" }); + killControlledSubagentRunMock.mockReset(); killSubagentRunAdminMock.mockReset(); }); -async function post(pathname: string, token = TEST_GATEWAY_TOKEN) { +async function post( + pathname: string, + token = TEST_GATEWAY_TOKEN, + extraHeaders?: Record, +) { const headers: Record = {}; if (token) { headers.Authorization = `Bearer ${token}`; } + Object.assign(headers, extraHeaders ?? {}); return fetch(`http://127.0.0.1:${port}${pathname}`, { method: "POST", headers, @@ -101,13 +124,14 @@ describe("POST /sessions/:sessionKey/kill", () => { expect(killSubagentRunAdminMock).not.toHaveBeenCalled(); }); - it("kills a matching session via the admin kill helper", async () => { + it("kills a matching session via the admin kill helper using the canonical key", async () => { loadSessionEntryMock.mockReturnValue({ entry: { sessionId: "sess-worker", updatedAt: Date.now() }, + canonicalKey: "agent:main:subagent:worker", }); killSubagentRunAdminMock.mockResolvedValue({ found: true, killed: true }); - const response = await post("/sessions/agent%3Amain%3Asubagent%3Aworker/kill"); + const response = await post("/sessions/agent%3AMain%3ASubagent%3AWorker/kill"); expect(response.status).toBe(200); await expect(response.json()).resolves.toEqual({ ok: true, killed: true }); expect(killSubagentRunAdminMock).toHaveBeenCalledWith({ @@ -119,6 +143,7 @@ describe("POST /sessions/:sessionKey/kill", () => { it("returns killed=false when the target exists but nothing was stopped", async () => { loadSessionEntryMock.mockReturnValue({ entry: { sessionId: "sess-worker", updatedAt: Date.now() }, + canonicalKey: "agent:main:subagent:worker", }); killSubagentRunAdminMock.mockResolvedValue({ found: true, killed: false }); @@ -126,4 +151,42 @@ describe("POST /sessions/:sessionKey/kill", () => { expect(response.status).toBe(200); await expect(response.json()).resolves.toEqual({ ok: true, killed: false }); }); + + it("rejects remote admin kills without requester ownership", async () => { + isLocalDirectRequestMock.mockReturnValue(false); + loadSessionEntryMock.mockReturnValue({ + entry: { sessionId: "sess-worker", updatedAt: Date.now() }, + canonicalKey: "agent:main:subagent:worker", + }); + + const response = await post("/sessions/agent%3Amain%3Asubagent%3Aworker/kill"); + expect(response.status).toBe(403); + expect(killSubagentRunAdminMock).not.toHaveBeenCalled(); + }); + + it("uses requester ownership checks when a requester session header is provided", async () => { + isLocalDirectRequestMock.mockReturnValue(false); + loadSessionEntryMock.mockReturnValue({ + entry: { sessionId: "sess-worker", updatedAt: Date.now() }, + canonicalKey: "agent:main:subagent:worker", + }); + getSubagentRunByChildSessionKeyMock.mockReturnValue({ + runId: "run-1", + childSessionKey: "agent:main:subagent:worker", + }); + killControlledSubagentRunMock.mockResolvedValue({ status: "ok" }); + + const response = await post( + "/sessions/agent%3Amain%3Asubagent%3Aworker/kill", + TEST_GATEWAY_TOKEN, + { "x-openclaw-requester-session-key": "agent:main:main" }, + ); + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ ok: true, killed: true }); + expect(resolveSubagentControllerMock).toHaveBeenCalledWith({ + cfg, + agentSessionKey: "agent:main:main", + }); + expect(getSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith("agent:main:subagent:worker"); + }); }); diff --git a/src/gateway/session-kill-http.ts b/src/gateway/session-kill-http.ts index a16e1fed52e..d0058037c6a 100644 --- a/src/gateway/session-kill-http.ts +++ b/src/gateway/session-kill-http.ts @@ -1,15 +1,22 @@ import type { IncomingMessage, ServerResponse } from "node:http"; -import { killSubagentRunAdmin } from "../agents/subagent-control.js"; -import { loadConfig } from "../config/config.js"; -import { authorizeHttpGatewayConnect, type ResolvedGatewayAuth } from "./auth.js"; import { - sendGatewayAuthFailure, - sendJson, - sendMethodNotAllowed, -} from "./http-common.js"; + killControlledSubagentRun, + killSubagentRunAdmin, + resolveSubagentController, +} from "../agents/subagent-control.js"; +import { getSubagentRunByChildSessionKey } from "../agents/subagent-registry.js"; +import { loadConfig } from "../config/config.js"; +import type { AuthRateLimiter } from "./auth-rate-limit.js"; +import { + authorizeHttpGatewayConnect, + isLocalDirectRequest, + type ResolvedGatewayAuth, +} from "./auth.js"; +import { sendGatewayAuthFailure, sendJson, sendMethodNotAllowed } from "./http-common.js"; import { getBearerToken } from "./http-utils.js"; import { loadSessionEntry } from "./session-utils.js"; -import type { AuthRateLimiter } from "./auth-rate-limit.js"; + +const REQUESTER_SESSION_KEY_HEADER = "x-openclaw-requester-session-key"; function resolveSessionKeyFromPath(pathname: string): string | null { const match = pathname.match(/^\/sessions\/([^/]+)\/kill$/); @@ -60,7 +67,7 @@ export async function handleSessionKillHttpRequest( return true; } - const { entry } = loadSessionEntry(sessionKey); + const { entry, canonicalKey } = loadSessionEntry(sessionKey); if (!entry) { sendJson(res, 404, { ok: false, @@ -72,14 +79,54 @@ export async function handleSessionKillHttpRequest( return true; } - const result = await killSubagentRunAdmin({ - cfg, - sessionKey, - }); + const trustedProxies = opts.trustedProxies ?? cfg.gateway?.trustedProxies; + const allowRealIpFallback = opts.allowRealIpFallback ?? cfg.gateway?.allowRealIpFallback; + const requesterSessionKey = req.headers[REQUESTER_SESSION_KEY_HEADER]?.toString().trim(); + const allowLocalAdminKill = isLocalDirectRequest(req, trustedProxies, allowRealIpFallback); + + if (!requesterSessionKey && !allowLocalAdminKill) { + sendJson(res, 403, { + ok: false, + error: { + type: "forbidden", + message: "Session kills require a local admin request or requester session ownership.", + }, + }); + return true; + } + + let killed = false; + if (requesterSessionKey) { + const runEntry = getSubagentRunByChildSessionKey(canonicalKey); + if (runEntry) { + const result = await killControlledSubagentRun({ + cfg, + controller: resolveSubagentController({ cfg, agentSessionKey: requesterSessionKey }), + entry: runEntry, + }); + if (result.status === "forbidden") { + sendJson(res, 403, { + ok: false, + error: { + type: "forbidden", + message: result.error, + }, + }); + return true; + } + killed = result.status === "ok"; + } + } else { + const result = await killSubagentRunAdmin({ + cfg, + sessionKey: canonicalKey, + }); + killed = result.killed; + } sendJson(res, 200, { ok: true, - killed: result.killed, + killed, }); return true; } diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index 9eef254d8ba..f1556c82c71 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -9,6 +9,7 @@ import { createGatewaySuiteHarness, installGatewayTestHooks, onceMessage, + rpcReq, writeSessionStore, } from "./test-helpers.server.js"; @@ -31,6 +32,71 @@ async function createSessionStoreFile(): Promise { } describe("session.message websocket events", () => { + test("only sends transcript events to subscribed operator clients", async () => { + const storePath = await createSessionStoreFile(); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + storePath, + }); + + const harness = await createGatewaySuiteHarness(); + try { + const subscribedWs = await harness.openWs(); + const unsubscribedWs = await harness.openWs(); + const nodeWs = await harness.openWs(); + try { + await connectOk(subscribedWs, { scopes: ["operator.read"] }); + await rpcReq(subscribedWs, "sessions.subscribe"); + await connectOk(unsubscribedWs, { scopes: ["operator.read"] }); + await connectOk(nodeWs, { role: "node", scopes: [] }); + + const subscribedEvent = onceMessage( + subscribedWs, + (message) => + message.type === "event" && + message.event === "session.message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ); + const unsubscribedEvent = Promise.race([ + onceMessage( + unsubscribedWs, + (message) => message.type === "event" && message.event === "session.message", + ).then(() => "received"), + new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)), + ]); + const nodeEvent = Promise.race([ + onceMessage( + nodeWs, + (message) => message.type === "event" && message.event === "session.message", + ).then(() => "received"), + new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)), + ]); + + const appended = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "subscribed only", + storePath, + }); + expect(appended.ok).toBe(true); + await expect(subscribedEvent).resolves.toBeTruthy(); + await expect(unsubscribedEvent).resolves.toBe("timeout"); + await expect(nodeEvent).resolves.toBe("timeout"); + } finally { + subscribedWs.close(); + unsubscribedWs.close(); + nodeWs.close(); + } + } finally { + await harness.close(); + } + }); + test("broadcasts appended transcript messages with the session key", async () => { const storePath = await createSessionStoreFile(); await writeSessionStore({ @@ -47,7 +113,8 @@ describe("session.message websocket events", () => { try { const ws = await harness.openWs(); try { - await connectOk(ws); + await connectOk(ws, { scopes: ["operator.read"] }); + await rpcReq(ws, "sessions.subscribe"); const appendPromise = appendAssistantMessageToSessionTranscript({ sessionKey: "agent:main:main", diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 725dccae2f2..6960e6de838 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -981,6 +981,45 @@ describe("listSessionsFromStore subagent metadata", () => { expect(failed?.status).toBe("failed"); expect(failed?.runtimeMs).toBe(5_000); }); + + test("maps timeout outcomes to timeout status and clamps negative runtime", () => { + const now = Date.now(); + const store: Record = { + "agent:main:subagent:timeout": { + sessionId: "sess-timeout", + updatedAt: now, + spawnedBy: "agent:main:main", + } as SessionEntry, + }; + + addSubagentRunForTests({ + runId: "run-timeout", + childSessionKey: "agent:main:subagent:timeout", + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "timeout task", + cleanup: "keep", + createdAt: now - 10_000, + startedAt: now - 1_000, + endedAt: now - 2_000, + outcome: { status: "timeout" }, + model: "openai/gpt-5.4", + }); + + const result = listSessionsFromStore({ + cfg, + storePath: "/tmp/sessions.json", + store, + opts: {}, + }); + + const timeout = result.sessions.find( + (session) => session.key === "agent:main:subagent:timeout", + ); + expect(timeout?.status).toBe("timeout"); + expect(timeout?.runtimeMs).toBe(0); + }); }); describe("loadCombinedSessionStoreForGateway includes disk-only agents (#32804)", () => { diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 62816a1a778..79237504197 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -187,7 +187,7 @@ function resolveSessionRunStatus( endedAt?: number; outcome?: { status?: string }; } | null, -): "running" | "done" | "failed" | "killed" | undefined { +): "running" | "done" | "failed" | "killed" | "timeout" | undefined { if (!run) { return undefined; } @@ -201,6 +201,9 @@ function resolveSessionRunStatus( if (status === "killed") { return "killed"; } + if (status === "timeout") { + return "timeout"; + } return "done"; } diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index c8cab95e328..b431bc7643f 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -110,6 +110,58 @@ describe("session history HTTP endpoints", () => { } }); + test("streams bounded history windows over SSE", async () => { + const { storePath } = await seedSession({ text: "first message" }); + const second = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "second message", + storePath, + }); + expect(second.ok).toBe(true); + + const harness = await createGatewaySuiteHarness(); + try { + const res = await fetch( + `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`, + { + headers: { + ...AUTH_HEADER, + Accept: "text/event-stream", + }, + }, + ); + + expect(res.status).toBe(200); + const reader = res.body?.getReader(); + expect(reader).toBeTruthy(); + const streamState = { buffer: "" }; + const historyEvent = await readSseEvent(reader!, streamState); + expect(historyEvent.event).toBe("history"); + expect( + (historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }) + .messages?.[0]?.content?.[0]?.text, + ).toBe("second message"); + + const appended = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "third message", + storePath, + }); + expect(appended.ok).toBe(true); + + const nextEvent = await readSseEvent(reader!, streamState); + expect(nextEvent.event).toBe("history"); + expect( + (nextEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }) + .messages?.[0]?.content?.[0]?.text, + ).toBe("third message"); + + await reader?.cancel(); + } finally { + await harness.close(); + } + }); + test("streams session history updates over SSE", async () => { const { storePath } = await seedSession({ text: "first message" }); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index 44e6494dbce..2a0831be525 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -20,6 +20,8 @@ import { resolveSessionTranscriptCandidates, } from "./session-utils.js"; +const MAX_SESSION_HISTORY_LIMIT = 1000; + function resolveSessionHistoryPath(req: IncomingMessage): string | null { const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); const match = url.pathname.match(/^\/sessions\/([^/]+)\/history$/); @@ -48,7 +50,7 @@ function resolveLimit(req: IncomingMessage): number | undefined { if (!Number.isFinite(value) || value < 1) { return 1; } - return Math.max(1, value); + return Math.min(MAX_SESSION_HISTORY_LIMIT, Math.max(1, value)); } function maybeLimitMessages(messages: unknown[], limit: number | undefined): unknown[] { @@ -168,29 +170,30 @@ export async function handleSessionHistoryHttpRequest( if (!updatePath || !transcriptCandidates.has(updatePath)) { return; } - const nextMessages = maybeLimitMessages( - readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile), - limit, - ); - if (nextMessages.length < sentMessages.length) { - sentMessages = nextMessages; + if (update.message !== undefined) { + if (limit === undefined) { + sentMessages = [...sentMessages, update.message]; + sseWrite(res, "message", { + sessionKey: target.canonicalKey, + message: update.message, + }); + return; + } + sentMessages = maybeLimitMessages([...sentMessages, update.message], limit); sseWrite(res, "history", { sessionKey: target.canonicalKey, messages: sentMessages, }); return; } - if (nextMessages.length === sentMessages.length) { - return; - } - const appended = nextMessages.slice(sentMessages.length); - sentMessages = nextMessages; - for (const message of appended) { - sseWrite(res, "message", { - sessionKey: target.canonicalKey, - message, - }); - } + sentMessages = maybeLimitMessages( + readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile), + limit, + ); + sseWrite(res, "history", { + sessionKey: target.canonicalKey, + messages: sentMessages, + }); }); const cleanup = () => {