From 6f9e2b664c6a7d201d64a03c1d90876232d55730 Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Thu, 12 Mar 2026 14:31:03 -0700 Subject: [PATCH] fix: tighten dashboard session API metadata --- ...ols.subagents.sessions-spawn.model.test.ts | 7 +- src/agents/subagent-spawn.ts | 37 +- src/discord/monitor/gateway-plugin.ts | 28 +- src/discord/monitor/provider.proxy.test.ts | 61 ++- src/gateway/server-chat.agent-events.test.ts | 3 + src/gateway/server-chat.ts | 45 ++- src/gateway/server.impl.ts | 35 ++ src/gateway/session-message-events.test.ts | 109 ++++++ src/gateway/session-utils.fs.test.ts | 40 ++ src/gateway/session-utils.fs.ts | 67 +++- src/gateway/session-utils.test.ts | 89 ++++- src/gateway/session-utils.ts | 350 +++++++++++------- src/gateway/sessions-history-http.test.ts | 64 ++++ src/gateway/sessions-history-http.ts | 111 ++++-- 14 files changed, 814 insertions(+), 232 deletions(-) diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn.model.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn.model.test.ts index 042f479d5e4..1253957cd46 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn.model.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn.model.test.ts @@ -129,12 +129,11 @@ describe("openclaw-tools: subagents (sessions_spawn model + thinking)", () => { expect(patchIndex).toBeGreaterThan(-1); expect(agentIndex).toBeGreaterThan(-1); expect(patchIndex).toBeLessThan(agentIndex); - const patchCall = calls.find( - (call) => call.method === "sessions.patch" && (call.params as { model?: string })?.model, - ); - expect(patchCall?.params).toMatchObject({ + const patchCalls = calls.filter((call) => call.method === "sessions.patch"); + expect(patchCalls[0]?.params).toMatchObject({ key: expect.stringContaining("subagent:"), model: "claude-haiku-4-5", + spawnDepth: 1, }); }); diff --git a/src/agents/subagent-spawn.ts b/src/agents/subagent-spawn.ts index a4a6229c715..33b8ade03d0 100644 --- a/src/agents/subagent-spawn.ts +++ b/src/agents/subagent-spawn.ts @@ -438,42 +438,29 @@ export async function spawnSubagentDirect( } }; - const spawnDepthPatchError = await patchChildSession({ + const initialChildSessionPatch: Record = { spawnDepth: childDepth, subagentRole: childCapabilities.role === "main" ? null : childCapabilities.role, subagentControlScope: childCapabilities.controlScope, - }); - if (spawnDepthPatchError) { + }; + if (resolvedModel) { + initialChildSessionPatch.model = resolvedModel; + } + if (thinkingOverride !== undefined) { + initialChildSessionPatch.thinkingLevel = thinkingOverride === "off" ? null : thinkingOverride; + } + + const initialPatchError = await patchChildSession(initialChildSessionPatch); + if (initialPatchError) { return { status: "error", - error: spawnDepthPatchError, + error: initialPatchError, childSessionKey, }; } - if (resolvedModel) { - const modelPatchError = await patchChildSession({ model: resolvedModel }); - if (modelPatchError) { - return { - status: "error", - error: modelPatchError, - childSessionKey, - }; - } modelApplied = true; } - if (thinkingOverride !== undefined) { - const thinkingPatchError = await patchChildSession({ - thinkingLevel: thinkingOverride === "off" ? null : thinkingOverride, - }); - if (thinkingPatchError) { - return { - status: "error", - error: thinkingPatchError, - childSessionKey, - }; - } - } if (requestThreadBinding) { const bindResult = await ensureThreadBindingForSubagentSpawn({ hookRunner, diff --git a/src/discord/monitor/gateway-plugin.ts b/src/discord/monitor/gateway-plugin.ts index c86b6259c5e..b06db9514b2 100644 --- a/src/discord/monitor/gateway-plugin.ts +++ b/src/discord/monitor/gateway-plugin.ts @@ -63,11 +63,31 @@ export function createDiscordGatewayPlugin(params: { }, dispatcher: fetchAgent, } as Record); - this.gatewayInfo = (await response.json()) as APIGatewayBotInfo; + const bodyText = await response.text(); + if (!response.ok) { + const preview = bodyText.trim().slice(0, 160) || ``; + params.runtime.error?.( + danger( + `discord: failed to fetch gateway metadata through proxy, status=${response.status}, body=${JSON.stringify(preview)}`, + ), + ); + } else { + try { + this.gatewayInfo = JSON.parse(bodyText) as APIGatewayBotInfo; + } catch (error) { + const preview = bodyText.trim().slice(0, 160) || ""; + params.runtime.error?.( + danger( + `discord: invalid gateway metadata response through proxy, body=${JSON.stringify(preview)}, error=${error instanceof Error ? error.message : String(error)}`, + ), + ); + } + } } catch (error) { - throw new Error( - `Failed to get gateway information from Discord: ${error instanceof Error ? error.message : String(error)}`, - { cause: error }, + params.runtime.error?.( + danger( + `discord: failed to fetch gateway metadata through proxy: ${error instanceof Error ? error.message : String(error)}`, + ), ); } } diff --git a/src/discord/monitor/provider.proxy.test.ts b/src/discord/monitor/provider.proxy.test.ts index 4d43469e2e4..466dfda02a5 100644 --- a/src/discord/monitor/provider.proxy.test.ts +++ b/src/discord/monitor/provider.proxy.test.ts @@ -169,7 +169,9 @@ describe("createDiscordGatewayPlugin", () => { it("uses proxy fetch for gateway metadata lookup before registering", async () => { const runtime = createRuntime(); undiciFetchMock.mockResolvedValue({ - json: async () => ({ url: "wss://gateway.discord.gg" }), + ok: true, + status: 200, + text: async () => JSON.stringify({ url: "wss://gateway.discord.gg" }), } as Response); const plugin = createDiscordGatewayPlugin({ discordConfig: { proxy: "http://proxy.test:8080" }, @@ -193,5 +195,62 @@ describe("createDiscordGatewayPlugin", () => { }), ); expect(baseRegisterClientSpy).toHaveBeenCalledTimes(1); + expect(runtime.error).not.toHaveBeenCalled(); + }); + + it("logs and continues when Discord returns invalid JSON through the proxy", async () => { + const runtime = createRuntime(); + undiciFetchMock.mockResolvedValue({ + ok: true, + status: 200, + text: async () => "upstream c", + } as Response); + const plugin = createDiscordGatewayPlugin({ + discordConfig: { proxy: "http://proxy.test:8080" }, + runtime, + }); + + await expect( + ( + plugin as unknown as { + registerClient: (client: { options: { token: string } }) => Promise; + } + ).registerClient({ + options: { token: "token-123" }, + }), + ).resolves.toBeUndefined(); + + expect(baseRegisterClientSpy).toHaveBeenCalledTimes(1); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("invalid gateway metadata response through proxy"), + ); + }); + + it("logs non-200 proxy responses instead of throwing", async () => { + const runtime = createRuntime(); + undiciFetchMock.mockResolvedValue({ + ok: false, + status: 502, + text: async () => "upstream crash", + } as Response); + const plugin = createDiscordGatewayPlugin({ + discordConfig: { proxy: "http://proxy.test:8080" }, + runtime, + }); + + await expect( + ( + plugin as unknown as { + registerClient: (client: { options: { token: string } }) => Promise; + } + ).registerClient({ + options: { token: "token-123" }, + }), + ).resolves.toBeUndefined(); + + expect(baseRegisterClientSpy).toHaveBeenCalledTimes(1); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("failed to fetch gateway metadata through proxy, status=502"), + ); }); }); diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 6d705fc4a8c..47878ee5ce2 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -5,6 +5,7 @@ import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js"; import { createAgentEventHandler, createChatRunState, + createSessionEventSubscriberRegistry, createToolEventRecipientRegistry, } from "./server-chat.js"; @@ -47,6 +48,7 @@ describe("agent event handler", () => { const agentRunSeq = new Map(); const chatRunState = createChatRunState(); const toolEventRecipients = createToolEventRecipientRegistry(); + const sessionEventSubscribers = createSessionEventSubscriberRegistry(); const handler = createAgentEventHandler({ broadcast, @@ -57,6 +59,7 @@ describe("agent event handler", () => { resolveSessionKeyForRun: params?.resolveSessionKeyForRun ?? (() => undefined), clearAgentRunContext: vi.fn(), toolEventRecipients, + sessionEventSubscribers, }); return { diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 1b4a06b174b..a1a213bc8fd 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -5,7 +5,7 @@ import { loadConfig } from "../config/config.js"; import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js"; import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js"; import { stripInlineDirectiveTagsForDisplay } from "../utils/directive-tags.js"; -import { loadSessionEntry } from "./session-utils.js"; +import { loadGatewaySessionRow, loadSessionEntry } from "./session-utils.js"; import { formatForLog } from "./ws-log.js"; function resolveHeartbeatAckMaxChars(): number { @@ -459,12 +459,24 @@ export function createAgentEventHandler({ toolEventRecipients, sessionEventSubscribers, }: AgentEventHandlerOptions) { - const emitSessionEvent = (event: string, payload: unknown) => { - const connIds = sessionEventSubscribers.getAll(); - if (connIds.size === 0) { - return; + const buildSessionEventSnapshot = (sessionKey: string) => { + const row = loadGatewaySessionRow(sessionKey); + if (!row) { + return {}; } - broadcastToConnIds(event, payload, connIds, { dropIfSlow: true }); + return { + session: row, + totalTokens: row.totalTokens, + totalTokensFresh: row.totalTokensFresh, + contextTokens: row.contextTokens, + estimatedCostUsd: row.estimatedCostUsd, + modelProvider: row.modelProvider, + model: row.model, + status: row.status, + startedAt: row.startedAt, + endedAt: row.endedAt, + runtimeMs: row.runtimeMs, + }; }; const emitChatDelta = ( @@ -778,12 +790,21 @@ export function createAgentEventHandler({ sessionKey && (lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error") ) { - emitSessionEvent("sessions.changed", { - sessionKey, - phase: lifecyclePhase, - runId: evt.runId, - ts: evt.ts, - }); + const sessionEventConnIds = sessionEventSubscribers.getAll(); + if (sessionEventConnIds.size > 0) { + broadcastToConnIds( + "sessions.changed", + { + sessionKey, + phase: lifecyclePhase, + runId: evt.runId, + ts: evt.ts, + ...buildSessionEventSnapshot(sessionKey), + }, + sessionEventConnIds, + { dropIfSlow: true }, + ); + } } }; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index fed40bdbaba..98b8122a456 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -118,6 +118,7 @@ import { loadGatewayTlsRuntime } from "./server/tls.js"; import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js"; import { attachOpenClawTranscriptMeta, + loadGatewaySessionRow, loadSessionEntry, readSessionMessages, } from "./session-utils.js"; @@ -784,6 +785,22 @@ export async function startGatewayServer( const messageSeq = entry?.sessionId ? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length : undefined; + const sessionRow = loadGatewaySessionRow(sessionKey); + const sessionSnapshot = sessionRow + ? { + session: sessionRow, + totalTokens: sessionRow.totalTokens, + totalTokensFresh: sessionRow.totalTokensFresh, + contextTokens: sessionRow.contextTokens, + estimatedCostUsd: sessionRow.estimatedCostUsd, + modelProvider: sessionRow.modelProvider, + model: sessionRow.model, + status: sessionRow.status, + startedAt: sessionRow.startedAt, + endedAt: sessionRow.endedAt, + runtimeMs: sessionRow.runtimeMs, + } + : {}; const message = attachOpenClawTranscriptMeta(update.message, { ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), ...(typeof messageSeq === "number" ? { seq: messageSeq } : {}), @@ -795,10 +812,28 @@ export async function startGatewayServer( message, ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), ...(typeof messageSeq === "number" ? { messageSeq } : {}), + ...sessionSnapshot, }, connIds, { dropIfSlow: true }, ); + + const sessionEventConnIds = sessionEventSubscribers.getAll(); + if (sessionEventConnIds.size > 0) { + broadcastToConnIds( + "sessions.changed", + { + sessionKey, + phase: "message", + ts: Date.now(), + ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), + ...(typeof messageSeq === "number" ? { messageSeq } : {}), + ...sessionSnapshot, + }, + sessionEventConnIds, + { dropIfSlow: true }, + ); + } }); let heartbeatRunner: HeartbeatRunner = minimalTestGateway diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index 08469910890..238cb826c9e 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, test } from "vitest"; import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js"; +import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { testState } from "./test-helpers.mocks.js"; import { connectOk, @@ -155,6 +156,114 @@ describe("session.message websocket events", () => { } }); + test("includes live usage metadata on session.message and sessions.changed transcript events", async () => { + const storePath = await createSessionStoreFile(); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + modelProvider: "openai", + model: "gpt-5.4", + contextTokens: 123_456, + totalTokens: 0, + totalTokensFresh: false, + }, + }, + storePath, + }); + const transcriptPath = path.join(path.dirname(storePath), "sess-main.jsonl"); + const transcriptMessage = { + role: "assistant", + content: [{ type: "text", text: "usage snapshot" }], + provider: "openai", + model: "gpt-5.4", + usage: { + input: 2_000, + output: 400, + cacheRead: 300, + cacheWrite: 100, + cost: { total: 0.0042 }, + }, + timestamp: Date.now(), + }; + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ type: "session", version: 1, id: "sess-main" }), + JSON.stringify({ id: "msg-usage", message: transcriptMessage }), + ].join("\n"), + "utf-8", + ); + + const harness = await createGatewaySuiteHarness(); + try { + const ws = await harness.openWs(); + try { + await connectOk(ws, { scopes: ["operator.read"] }); + await rpcReq(ws, "sessions.subscribe"); + + const messageEventPromise = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ); + const changedEventPromise = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "sessions.changed" && + (message.payload as { phase?: string; sessionKey?: string } | undefined)?.phase === + "message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ); + + emitSessionTranscriptUpdate({ + sessionFile: transcriptPath, + sessionKey: "agent:main:main", + message: transcriptMessage, + messageId: "msg-usage", + }); + + const [messageEvent, changedEvent] = await Promise.all([ + messageEventPromise, + changedEventPromise, + ]); + expect(messageEvent.payload).toMatchObject({ + sessionKey: "agent:main:main", + messageId: "msg-usage", + messageSeq: 1, + totalTokens: 2_400, + totalTokensFresh: true, + contextTokens: 123_456, + estimatedCostUsd: 0.0042, + modelProvider: "openai", + model: "gpt-5.4", + }); + expect(changedEvent.payload).toMatchObject({ + sessionKey: "agent:main:main", + phase: "message", + messageId: "msg-usage", + messageSeq: 1, + totalTokens: 2_400, + totalTokensFresh: true, + contextTokens: 123_456, + estimatedCostUsd: 0.0042, + modelProvider: "openai", + model: "gpt-5.4", + }); + } finally { + ws.close(); + } + } finally { + await harness.close(); + } + }); + test("sessions.messages.subscribe only delivers transcript events for the requested session", async () => { const storePath = await createSessionStoreFile(); await writeSessionStore({ diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 608e1ec42ec..a6ffdebb0af 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -699,6 +699,46 @@ describe("readLatestSessionUsageFromTranscript", () => { }); }); + test("backfills missing model and cost fields from earlier assistant usage snapshots", () => { + const sessionId = "usage-aggregate"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { + message: { + role: "assistant", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { + input: 1_800, + output: 400, + cacheRead: 600, + cost: { total: 0.0055 }, + }, + }, + }, + { + message: { + role: "assistant", + usage: { + input: 2_400, + cacheRead: 900, + }, + }, + }, + ]); + + expect(readLatestSessionUsageFromTranscript(sessionId, storePath)).toEqual({ + modelProvider: "anthropic", + model: "claude-sonnet-4-6", + inputTokens: 2400, + outputTokens: 400, + cacheRead: 900, + totalTokens: 3300, + totalTokensFresh: true, + costUsd: 0.0055, + }); + }); + test("returns null when the transcript has no assistant usage snapshot", () => { const sessionId = "usage-empty"; writeTranscript(tmpDir, sessionId, [ diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index cd9ac817252..5ca974cbb1e 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -594,10 +594,17 @@ function readTailChunk(fd: number, size: number, maxBytes: number): string | nul return buf.toString("utf-8"); } +function resolvePositiveUsageNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined; +} + function extractLatestUsageFromTranscriptChunk( chunk: string, ): SessionTranscriptUsageSnapshot | null { const lines = chunk.split(/\r?\n/).filter((line) => line.trim().length > 0); + const snapshot: SessionTranscriptUsageSnapshot = {}; + let sawSnapshot = false; + for (let i = lines.length - 1; i >= 0; i -= 1) { const line = lines[i]; try { @@ -620,7 +627,7 @@ function extractLatestUsageFromTranscriptChunk( ? parsed.usage : undefined; const usage = normalizeUsage(usageRaw); - const totalTokens = deriveSessionTotalTokens({ usage }); + const totalTokens = resolvePositiveUsageNumber(deriveSessionTotalTokens({ usage })); const costUsd = extractTranscriptUsageCost(usageRaw); const modelProvider = typeof message.provider === "string" @@ -637,8 +644,8 @@ function extractLatestUsageFromTranscriptChunk( const isDeliveryMirror = modelProvider === "openclaw" && model === "delivery-mirror"; const hasMeaningfulUsage = hasNonzeroUsage(usage) || - (typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0) || - (typeof costUsd === "number" && Number.isFinite(costUsd) && costUsd > 0); + typeof totalTokens === "number" || + (typeof costUsd === "number" && Number.isFinite(costUsd)); const hasModelIdentity = Boolean(modelProvider || model); if (!hasMeaningfulUsage && !hasModelIdentity) { continue; @@ -646,23 +653,51 @@ function extractLatestUsageFromTranscriptChunk( if (isDeliveryMirror && !hasMeaningfulUsage) { continue; } - return { - ...(modelProvider ? { modelProvider } : {}), - ...(model ? { model } : {}), - ...(typeof usage?.input === "number" ? { inputTokens: usage.input } : {}), - ...(typeof usage?.output === "number" ? { outputTokens: usage.output } : {}), - ...(typeof usage?.cacheRead === "number" ? { cacheRead: usage.cacheRead } : {}), - ...(typeof usage?.cacheWrite === "number" ? { cacheWrite: usage.cacheWrite } : {}), - ...(typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0 - ? { totalTokens, totalTokensFresh: true } - : {}), - ...(typeof costUsd === "number" && Number.isFinite(costUsd) ? { costUsd } : {}), - }; + + sawSnapshot = true; + if (!snapshot.modelProvider && modelProvider) { + snapshot.modelProvider = modelProvider; + } + if (!snapshot.model && model) { + snapshot.model = model; + } + if (snapshot.inputTokens === undefined) { + snapshot.inputTokens = resolvePositiveUsageNumber(usage?.input); + } + if (snapshot.outputTokens === undefined) { + snapshot.outputTokens = resolvePositiveUsageNumber(usage?.output); + } + if (snapshot.cacheRead === undefined) { + snapshot.cacheRead = resolvePositiveUsageNumber(usage?.cacheRead); + } + if (snapshot.cacheWrite === undefined) { + snapshot.cacheWrite = resolvePositiveUsageNumber(usage?.cacheWrite); + } + if (snapshot.totalTokens === undefined && typeof totalTokens === "number") { + snapshot.totalTokens = totalTokens; + snapshot.totalTokensFresh = true; + } + if ( + snapshot.costUsd === undefined && + typeof costUsd === "number" && + Number.isFinite(costUsd) + ) { + snapshot.costUsd = costUsd; + } + + if ( + snapshot.modelProvider && + snapshot.model && + snapshot.totalTokens !== undefined && + snapshot.costUsd !== undefined + ) { + break; + } } catch { // skip malformed lines } } - return null; + return sawSnapshot ? snapshot : null; } export function readLatestSessionUsageFromTranscript( diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 66e21fea20e..19ac3b4b425 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -1,7 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { afterEach, describe, expect, test } from "vitest"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { addSubagentRunForTests, resetSubagentRegistryForTests, @@ -941,9 +941,96 @@ describe("listSessionsFromStore search", () => { fs.rmSync(tmpDir, { recursive: true, force: true }); } }); + + test("uses subagent run model immediately for child sessions while transcript usage fills live totals", () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-subagent-")); + const storePath = path.join(tmpDir, "sessions.json"); + const now = Date.now(); + const cfg = { + session: { mainKey: "main" }, + agents: { + list: [{ id: "main", default: true }], + defaults: { + models: { + "anthropic/claude-sonnet-4-6": { params: { context1m: true } }, + }, + }, + }, + } as unknown as OpenClawConfig; + fs.writeFileSync( + path.join(tmpDir, "sess-child.jsonl"), + [ + JSON.stringify({ type: "session", version: 1, id: "sess-child" }), + JSON.stringify({ + message: { + role: "assistant", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { + input: 2_000, + output: 500, + cacheRead: 1_200, + cost: { total: 0.007725 }, + }, + }, + }), + ].join("\n"), + "utf-8", + ); + + addSubagentRunForTests({ + runId: "run-child-live", + childSessionKey: "agent:main:subagent:child-live", + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "child task", + cleanup: "keep", + createdAt: now - 5_000, + startedAt: now - 4_000, + model: "anthropic/claude-sonnet-4-6", + }); + + try { + const result = listSessionsFromStore({ + cfg, + storePath, + store: { + "agent:main:subagent:child-live": { + sessionId: "sess-child", + updatedAt: now, + spawnedBy: "agent:main:main", + totalTokens: 0, + totalTokensFresh: false, + } as SessionEntry, + }, + opts: {}, + }); + + expect(result.sessions[0]).toMatchObject({ + key: "agent:main:subagent:child-live", + status: "running", + modelProvider: "anthropic", + model: "claude-sonnet-4-6", + totalTokens: 3_200, + totalTokensFresh: true, + contextTokens: 1_048_576, + }); + expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); }); describe("listSessionsFromStore subagent metadata", () => { + afterEach(() => { + resetSubagentRegistryForTests({ persist: false }); + }); + beforeEach(() => { + resetSubagentRegistryForTests({ persist: false }); + }); + const cfg = { session: { mainKey: "main" }, agents: { list: [{ id: "main", default: true }] }, diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 8aa5fd4152b..039e6c77291 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -297,6 +297,8 @@ function resolveTranscriptUsageFallback(params: { key: string; entry?: SessionEntry; storePath: string; + fallbackProvider?: string; + fallbackModel?: string; }): { estimatedCostUsd?: number; totalTokens?: number; @@ -320,15 +322,17 @@ function resolveTranscriptUsageFallback(params: { if (!snapshot) { return null; } + const modelProvider = snapshot.modelProvider ?? params.fallbackProvider; + const model = snapshot.model ?? params.fallbackModel; const contextTokens = resolveContextTokensForModel({ cfg: params.cfg, - provider: snapshot.modelProvider, - model: snapshot.model, + provider: modelProvider, + model, }); const estimatedCostUsd = resolveEstimatedSessionCostUsd({ cfg: params.cfg, - provider: snapshot.modelProvider, - model: snapshot.model, + provider: modelProvider, + model, explicitCostUsd: snapshot.costUsd, entry: { inputTokens: snapshot.inputTokens, @@ -959,6 +963,7 @@ export function resolveSessionModelIdentityRef( | SessionEntry | Pick, agentId?: string, + fallbackModelRef?: string, ): { provider?: string; model: string } { const runtimeModel = entry?.model?.trim(); const runtimeProvider = entry?.modelProvider?.trim(); @@ -982,10 +987,198 @@ export function resolveSessionModelIdentityRef( } return { model: runtimeModel }; } + const fallbackRef = fallbackModelRef?.trim(); + if (fallbackRef) { + const parsedFallback = parseModelRef(fallbackRef, DEFAULT_PROVIDER); + if (parsedFallback) { + return { provider: parsedFallback.provider, model: parsedFallback.model }; + } + const inferredProvider = inferUniqueProviderFromConfiguredModels({ + cfg, + model: fallbackRef, + }); + if (inferredProvider) { + return { provider: inferredProvider, model: fallbackRef }; + } + return { model: fallbackRef }; + } const resolved = resolveSessionModelRef(cfg, entry, agentId); return { provider: resolved.provider, model: resolved.model }; } +export function buildGatewaySessionRow(params: { + cfg: OpenClawConfig; + storePath: string; + store: Record; + key: string; + entry?: SessionEntry; + now?: number; + includeDerivedTitles?: boolean; + includeLastMessage?: boolean; +}): GatewaySessionRow { + const { cfg, storePath, store, key, entry } = params; + const now = params.now ?? Date.now(); + const updatedAt = entry?.updatedAt ?? null; + const parsed = parseGroupKey(key); + const channel = entry?.channel ?? parsed?.channel; + const subject = entry?.subject; + const groupChannel = entry?.groupChannel; + const space = entry?.space; + const id = parsed?.id; + const origin = entry?.origin; + const originLabel = origin?.label; + const displayName = + entry?.displayName ?? + (channel + ? buildGroupDisplayName({ + provider: channel, + subject, + groupChannel, + space, + id, + key, + }) + : undefined) ?? + entry?.label ?? + originLabel; + const deliveryFields = normalizeSessionDeliveryFields(entry); + const parsedAgent = parseAgentSessionKey(key); + const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg)); + const subagentRun = getSubagentRunByChildSessionKey(key); + const resolvedModel = resolveSessionModelIdentityRef( + cfg, + entry, + sessionAgentId, + subagentRun?.model, + ); + const modelProvider = resolvedModel.provider; + const model = resolvedModel.model ?? DEFAULT_MODEL; + const transcriptUsage = + resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined || + resolvePositiveNumber(entry?.contextTokens) === undefined || + resolveEstimatedSessionCostUsd({ + cfg, + provider: modelProvider, + model, + entry, + }) === undefined + ? resolveTranscriptUsageFallback({ + cfg, + key, + entry, + storePath, + fallbackProvider: modelProvider, + fallbackModel: model, + }) + : null; + const totalTokens = + resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) ?? + resolvePositiveNumber(transcriptUsage?.totalTokens); + const totalTokensFresh = + typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0 + ? true + : transcriptUsage?.totalTokensFresh === true; + const childSessions = resolveChildSessionKeys(key, store); + const estimatedCostUsd = + resolveEstimatedSessionCostUsd({ + cfg, + provider: modelProvider, + model, + entry, + }) ?? resolvePositiveNumber(transcriptUsage?.estimatedCostUsd); + const contextTokens = + resolvePositiveNumber(entry?.contextTokens) ?? + resolvePositiveNumber(transcriptUsage?.contextTokens) ?? + resolvePositiveNumber( + resolveContextTokensForModel({ + cfg, + provider: modelProvider, + model, + }), + ); + + let derivedTitle: string | undefined; + let lastMessagePreview: string | undefined; + if (entry?.sessionId && (params.includeDerivedTitles || params.includeLastMessage)) { + const fields = readSessionTitleFieldsFromTranscript( + entry.sessionId, + storePath, + entry.sessionFile, + sessionAgentId, + ); + if (params.includeDerivedTitles) { + derivedTitle = deriveSessionTitle(entry, fields.firstUserMessage); + } + if (params.includeLastMessage && fields.lastMessagePreview) { + lastMessagePreview = fields.lastMessagePreview; + } + } + + return { + key, + spawnedBy: entry?.spawnedBy, + kind: classifySessionKey(key, entry), + label: entry?.label, + displayName, + derivedTitle, + lastMessagePreview, + channel, + subject, + groupChannel, + space, + chatType: entry?.chatType, + origin, + updatedAt, + sessionId: entry?.sessionId, + systemSent: entry?.systemSent, + abortedLastRun: entry?.abortedLastRun, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + reasoningLevel: entry?.reasoningLevel, + elevatedLevel: entry?.elevatedLevel, + sendPolicy: entry?.sendPolicy, + inputTokens: entry?.inputTokens, + outputTokens: entry?.outputTokens, + totalTokens, + totalTokensFresh, + estimatedCostUsd, + status: resolveSessionRunStatus(subagentRun), + startedAt: subagentRun?.startedAt, + endedAt: subagentRun?.endedAt, + runtimeMs: resolveSessionRuntimeMs(subagentRun, now), + parentSessionKey: entry?.parentSessionKey, + childSessions, + responseUsage: entry?.responseUsage, + modelProvider, + model, + contextTokens, + deliveryContext: deliveryFields.deliveryContext, + lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel, + lastTo: deliveryFields.lastTo ?? entry?.lastTo, + lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId, + }; +} + +export function loadGatewaySessionRow( + sessionKey: string, + options?: { includeDerivedTitles?: boolean; includeLastMessage?: boolean; now?: number }, +): GatewaySessionRow | null { + const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey); + if (!entry) { + return null; + } + return buildGatewaySessionRow({ + cfg, + storePath, + store, + key: canonicalKey, + entry, + now: options?.now, + includeDerivedTitles: options?.includeDerivedTitles, + includeLastMessage: options?.includeLastMessage, + }); +} + export function listSessionsFromStore(params: { cfg: OpenClawConfig; storePath: string; @@ -1046,117 +1239,18 @@ export function listSessionsFromStore(params: { } return entry?.label === label; }) - .map(([key, entry]) => { - const updatedAt = entry?.updatedAt ?? null; - const parsed = parseGroupKey(key); - const channel = entry?.channel ?? parsed?.channel; - const subject = entry?.subject; - const groupChannel = entry?.groupChannel; - const space = entry?.space; - const id = parsed?.id; - const origin = entry?.origin; - const originLabel = origin?.label; - const displayName = - entry?.displayName ?? - (channel - ? buildGroupDisplayName({ - provider: channel, - subject, - groupChannel, - space, - id, - key, - }) - : undefined) ?? - entry?.label ?? - originLabel; - const deliveryFields = normalizeSessionDeliveryFields(entry); - const parsedAgent = parseAgentSessionKey(key); - const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg)); - const resolvedModel = resolveSessionModelIdentityRef(cfg, entry, sessionAgentId); - const modelProvider = resolvedModel.provider; - const model = resolvedModel.model ?? DEFAULT_MODEL; - const transcriptUsage = - resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined || - resolvePositiveNumber(entry?.contextTokens) === undefined || - resolveEstimatedSessionCostUsd({ - cfg, - provider: modelProvider, - model, - entry, - }) === undefined - ? resolveTranscriptUsageFallback({ cfg, key, entry, storePath }) - : null; - const totalTokens = - resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) ?? - resolvePositiveNumber(transcriptUsage?.totalTokens); - const totalTokensFresh = - typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0 - ? true - : transcriptUsage?.totalTokensFresh === true; - const subagentRun = getSubagentRunByChildSessionKey(key); - const childSessions = resolveChildSessionKeys(key, store); - const estimatedCostUsd = - resolveEstimatedSessionCostUsd({ - cfg, - provider: modelProvider, - model, - entry, - }) ?? resolvePositiveNumber(transcriptUsage?.estimatedCostUsd); - const contextTokens = - resolvePositiveNumber(entry?.contextTokens) ?? - resolvePositiveNumber(transcriptUsage?.contextTokens) ?? - resolvePositiveNumber( - resolveContextTokensForModel({ - cfg, - provider: modelProvider, - model, - }), - ); - return { + .map(([key, entry]) => + buildGatewaySessionRow({ + cfg, + storePath, + store, key, - spawnedBy: entry?.spawnedBy, entry, - kind: classifySessionKey(key, entry), - label: entry?.label, - displayName, - channel, - subject, - groupChannel, - space, - chatType: entry?.chatType, - origin, - updatedAt, - sessionId: entry?.sessionId, - systemSent: entry?.systemSent, - abortedLastRun: entry?.abortedLastRun, - thinkingLevel: entry?.thinkingLevel, - fastMode: entry?.fastMode, - verboseLevel: entry?.verboseLevel, - reasoningLevel: entry?.reasoningLevel, - elevatedLevel: entry?.elevatedLevel, - sendPolicy: entry?.sendPolicy, - inputTokens: entry?.inputTokens, - outputTokens: entry?.outputTokens, - totalTokens, - totalTokensFresh, - estimatedCostUsd, - status: resolveSessionRunStatus(subagentRun), - startedAt: subagentRun?.startedAt, - endedAt: subagentRun?.endedAt, - runtimeMs: resolveSessionRuntimeMs(subagentRun, now), - parentSessionKey: entry?.parentSessionKey, - childSessions, - responseUsage: entry?.responseUsage, - modelProvider, - model, - contextTokens, - deliveryContext: deliveryFields.deliveryContext, - lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel, - lastTo: deliveryFields.lastTo ?? entry?.lastTo, - lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId, - }; - }) + now, + includeDerivedTitles, + includeLastMessage, + }), + ) .toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)); if (search) { @@ -1176,37 +1270,11 @@ export function listSessionsFromStore(params: { sessions = sessions.slice(0, limit); } - const finalSessions: GatewaySessionRow[] = sessions.map((s) => { - const { entry, ...rest } = s; - let derivedTitle: string | undefined; - let lastMessagePreview: string | undefined; - if (entry?.sessionId) { - if (includeDerivedTitles || includeLastMessage) { - const parsed = parseAgentSessionKey(s.key); - const agentId = - parsed && parsed.agentId ? normalizeAgentId(parsed.agentId) : resolveDefaultAgentId(cfg); - const fields = readSessionTitleFieldsFromTranscript( - entry.sessionId, - storePath, - entry.sessionFile, - agentId, - ); - if (includeDerivedTitles) { - derivedTitle = deriveSessionTitle(entry, fields.firstUserMessage); - } - if (includeLastMessage && fields.lastMessagePreview) { - lastMessagePreview = fields.lastMessagePreview; - } - } - } - return { ...rest, derivedTitle, lastMessagePreview } satisfies GatewaySessionRow; - }); - return { ts: now, path: storePath, - count: finalSessions.length, + count: sessions.length, defaults: getSessionDefaults(cfg), - sessions: finalSessions, + sessions, }; } diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index dec57a75448..4f05516e25e 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -119,6 +119,70 @@ describe("session history HTTP endpoints", () => { } }); + test("supports cursor pagination over direct REST while preserving the messages field", async () => { + const { storePath } = await seedSession({ text: "first message" }); + const second = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "second message", + storePath, + }); + expect(second.ok).toBe(true); + const third = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "third message", + storePath, + }); + expect(third.ok).toBe(true); + + const harness = await createGatewaySuiteHarness(); + try { + const firstPage = await fetch( + `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=2`, + { + headers: AUTH_HEADER, + }, + ); + expect(firstPage.status).toBe(200); + const firstBody = (await firstPage.json()) as { + sessionKey?: string; + items?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>; + messages?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>; + nextCursor?: string; + hasMore?: boolean; + }; + expect(firstBody.sessionKey).toBe("agent:main:main"); + expect(firstBody.items?.map((message) => message.content?.[0]?.text)).toEqual([ + "second message", + "third message", + ]); + expect(firstBody.messages?.map((message) => message.__openclaw?.seq)).toEqual([2, 3]); + expect(firstBody.hasMore).toBe(true); + expect(firstBody.nextCursor).toBe("2"); + + const secondPage = await fetch( + `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=2&cursor=${encodeURIComponent(firstBody.nextCursor ?? "")}`, + { + headers: AUTH_HEADER, + }, + ); + expect(secondPage.status).toBe(200); + const secondBody = (await secondPage.json()) as { + items?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>; + messages?: Array<{ __openclaw?: { seq?: number } }>; + nextCursor?: string; + hasMore?: boolean; + }; + expect(secondBody.items?.map((message) => message.content?.[0]?.text)).toEqual([ + "first message", + ]); + expect(secondBody.messages?.map((message) => message.__openclaw?.seq)).toEqual([1]); + expect(secondBody.hasMore).toBe(false); + expect(secondBody.nextCursor).toBeUndefined(); + } finally { + await harness.close(); + } + }); + test("streams bounded history windows over SSE", async () => { const { storePath } = await seedSession({ text: "first message" }); const second = await appendAssistantMessageToSessionTranscript({ diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index a61d85c46ba..c7fed6d888c 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -41,9 +41,12 @@ function shouldStreamSse(req: IncomingMessage): boolean { return accept.includes("text/event-stream"); } +function getRequestUrl(req: IncomingMessage): URL { + return new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); +} + function resolveLimit(req: IncomingMessage): number | undefined { - const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); - const raw = url.searchParams.get("limit"); + const raw = getRequestUrl(req).searchParams.get("limit"); if (raw == null || raw.trim() === "") { return undefined; } @@ -54,11 +57,59 @@ function resolveLimit(req: IncomingMessage): number | undefined { return Math.min(MAX_SESSION_HISTORY_LIMIT, Math.max(1, value)); } -function maybeLimitMessages(messages: unknown[], limit: number | undefined): unknown[] { - if (limit === undefined || limit >= messages.length) { - return messages; +function resolveCursor(req: IncomingMessage): string | undefined { + const raw = getRequestUrl(req).searchParams.get("cursor"); + const trimmed = raw?.trim(); + return trimmed ? trimmed : undefined; +} + +type PaginatedSessionHistory = { + items: unknown[]; + messages: unknown[]; + nextCursor?: string; + hasMore: boolean; +}; + +function resolveCursorSeq(cursor: string | undefined): number | undefined { + if (!cursor) { + return undefined; } - return messages.slice(-limit); + const normalized = cursor.startsWith("seq:") ? cursor.slice(4) : cursor; + const value = Number.parseInt(normalized, 10); + return Number.isFinite(value) && value > 0 ? value : undefined; +} + +function resolveMessageSeq(message: unknown): number | undefined { + if (!message || typeof message !== "object" || Array.isArray(message)) { + return undefined; + } + const meta = (message as { __openclaw?: unknown }).__openclaw; + if (!meta || typeof meta !== "object" || Array.isArray(meta)) { + return undefined; + } + const seq = (meta as { seq?: unknown }).seq; + return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined; +} + +function paginateSessionMessages( + messages: unknown[], + limit: number | undefined, + cursor: string | undefined, +): PaginatedSessionHistory { + const cursorSeq = resolveCursorSeq(cursor); + const endExclusive = + typeof cursorSeq === "number" + ? Math.max(0, Math.min(messages.length, cursorSeq - 1)) + : messages.length; + const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0; + const items = messages.slice(start, endExclusive); + const firstSeq = resolveMessageSeq(items[0]); + return { + items, + messages: items, + hasMore: start > 0, + ...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}), + }; } function canonicalizePath(value: string | undefined): string | undefined { @@ -121,17 +172,19 @@ export async function handleSessionHistoryHttpRequest( const store = loadSessionStore(target.storePath); const entry = target.storeKeys.map((key) => store[key]).find(Boolean); const limit = resolveLimit(req); - const messages = entry?.sessionId - ? maybeLimitMessages( - readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile), - limit, - ) - : []; + const cursor = resolveCursor(req); + const history = paginateSessionMessages( + entry?.sessionId + ? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile) + : [], + limit, + cursor, + ); if (!shouldStreamSse(req)) { sendJson(res, 200, { sessionKey: target.canonicalKey, - messages, + ...history, }); return true; } @@ -149,12 +202,12 @@ export async function handleSessionHistoryHttpRequest( ) : new Set(); - let sentMessages = messages; + let sentHistory = history; setSseHeaders(res); res.write("retry: 1000\n\n"); sseWrite(res, "history", { sessionKey: target.canonicalKey, - messages: sentMessages, + ...sentHistory, }); const heartbeat = setInterval(() => { @@ -172,35 +225,37 @@ export async function handleSessionHistoryHttpRequest( return; } if (update.message !== undefined) { - const messageSeq = sentMessages.length + 1; + const previousSeq = resolveMessageSeq(sentHistory.items.at(-1)); const nextMessage = attachOpenClawTranscriptMeta(update.message, { ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), - seq: messageSeq, + seq: + typeof previousSeq === "number" + ? previousSeq + 1 + : readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile).length, }); - if (limit === undefined) { - sentMessages = [...sentMessages, nextMessage]; + if (limit === undefined && cursor === undefined) { + sentHistory = { + items: [...sentHistory.items, nextMessage], + messages: [...sentHistory.items, nextMessage], + hasMore: false, + }; sseWrite(res, "message", { sessionKey: target.canonicalKey, message: nextMessage, ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), - messageSeq, + messageSeq: resolveMessageSeq(nextMessage), }); return; } - sentMessages = maybeLimitMessages([...sentMessages, nextMessage], limit); - sseWrite(res, "history", { - sessionKey: target.canonicalKey, - messages: sentMessages, - }); - return; } - sentMessages = maybeLimitMessages( + sentHistory = paginateSessionMessages( readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile), limit, + cursor, ); sseWrite(res, "history", { sessionKey: target.canonicalKey, - messages: sentMessages, + ...sentHistory, }); });