From 3cc89eabdd02a4e766855d744aa6d2b13907a1f7 Mon Sep 17 00:00:00 2001 From: user <14040213+LivingGhost@users.noreply.github.com> Date: Fri, 20 Mar 2026 22:11:39 +0900 Subject: [PATCH 1/2] perf(gateway): add configurable sessions.list fallback concurrency Keep the existing serial sessions.list behavior by default while allowing transcript usage fallbacks to run with bounded parallelism when explicitly configured. --- src/config/config-misc.test.ts | 27 ++ src/config/schema.help.ts | 4 + src/config/schema.labels.ts | 2 + src/config/types.gateway.ts | 11 + src/config/zod-schema.ts | 6 + src/gateway/server-methods/sessions.ts | 6 +- src/gateway/session-utils.fs.test.ts | 25 ++ src/gateway/session-utils.fs.ts | 21 ++ src/gateway/session-utils.test.ts | 71 ++++ src/gateway/session-utils.ts | 449 +++++++++++++++++++------ 10 files changed, 513 insertions(+), 109 deletions(-) diff --git a/src/config/config-misc.test.ts b/src/config/config-misc.test.ts index 43dec5acfef..af19772ec9c 100644 --- a/src/config/config-misc.test.ts +++ b/src/config/config-misc.test.ts @@ -225,6 +225,33 @@ describe("gateway.tools config", () => { }); }); +describe("gateway.sessionsList.fallbackConcurrency", () => { + it("accepts positive integer concurrency", () => { + const res = validateConfigObject({ + gateway: { + sessionsList: { + fallbackConcurrency: 4, + }, + }, + }); + expect(res.ok).toBe(true); + }); + + it("rejects zero or negative values", () => { + const res = validateConfigObject({ + gateway: { + sessionsList: { + fallbackConcurrency: 0, + }, + }, + }); + expect(res.ok).toBe(false); + if (!res.ok) { + expect(res.issues[0]?.path).toBe("gateway.sessionsList.fallbackConcurrency"); + } + }); +}); + describe("gateway.channelHealthCheckMinutes", () => { it("accepts zero to disable monitor", () => { const res = validateConfigObject({ diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 233900305fa..20f407ff8dd 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -100,6 +100,10 @@ export const FIELD_HELP: Record = { "Explicit gateway-level tool allowlist when you want a narrow set of tools available at runtime. Use this for locked-down environments where tool scope must be tightly controlled.", "gateway.tools.deny": "Explicit gateway-level tool denylist to block risky tools even if lower-level policies allow them. Use deny rules for emergency response and defense-in-depth hardening.", + "gateway.sessionsList": + "Gateway sessions.list read-path tuning for transcript-derived metadata fallbacks. Leave defaults alone unless large session inventories or large transcripts are causing sessions.list timeouts.", + "gateway.sessionsList.fallbackConcurrency": + "Maximum number of transcript usage fallback reads allowed in parallel during gateway sessions.list. Default: 1, which preserves the current serial behavior; raise this only when host I/O and CPU headroom are available.", "gateway.channelHealthCheckMinutes": "Interval in minutes for automatic channel health probing and status updates. Use lower intervals for faster detection, or higher intervals to reduce periodic probe noise.", "gateway.channelStaleEventThresholdMinutes": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index e762e979c71..fec5ddaa511 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -83,6 +83,8 @@ export const FIELD_LABELS: Record = { "gateway.tools": "Gateway Tool Exposure Policy", "gateway.tools.allow": "Gateway Tool Allowlist", "gateway.tools.deny": "Gateway Tool Denylist", + "gateway.sessionsList": "Gateway Sessions List", + "gateway.sessionsList.fallbackConcurrency": "Gateway Sessions List Fallback Concurrency", "gateway.channelHealthCheckMinutes": "Gateway Channel Health Check Interval (min)", "gateway.channelStaleEventThresholdMinutes": "Gateway Channel Stale Event Threshold (min)", "gateway.channelMaxRestartsPerHour": "Gateway Channel Max Restarts Per Hour", diff --git a/src/config/types.gateway.ts b/src/config/types.gateway.ts index 385ece27aad..b1e090262ae 100644 --- a/src/config/types.gateway.ts +++ b/src/config/types.gateway.ts @@ -390,6 +390,15 @@ export type GatewayToolsConfig = { allow?: string[]; }; +export type GatewaySessionsListConfig = { + /** + * Maximum number of transcript usage fallbacks to hydrate in parallel while + * servicing gateway sessions.list. + * Default: 1 (preserves the current serial behavior). + */ + fallbackConcurrency?: number; +}; + export type GatewayConfig = { /** Single multiplexed port for Gateway WS + HTTP (default: 18789). */ port?: number; @@ -432,6 +441,8 @@ export type GatewayConfig = { allowRealIpFallback?: boolean; /** Tool access restrictions for HTTP /tools/invoke endpoint. */ tools?: GatewayToolsConfig; + /** sessions.list performance settings. */ + sessionsList?: GatewaySessionsListConfig; /** * Channel health monitor interval in minutes. * Periodically checks channel health and restarts unhealthy channels. diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index f8ad6bfcbc9..c24ea7d2e72 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -718,6 +718,12 @@ export const OpenClawSchema = z }) .strict() .optional(), + sessionsList: z + .object({ + fallbackConcurrency: z.number().int().min(1).optional(), + }) + .strict() + .optional(), channelHealthCheckMinutes: z.number().int().min(0).optional(), channelStaleEventThresholdMinutes: z.number().int().min(1).optional(), channelMaxRestartsPerHour: z.number().int().min(1).optional(), diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index d1c2efe155e..f3c31cfa058 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -49,7 +49,7 @@ import { import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js"; import { archiveFileOnDisk, - listSessionsFromStore, + listSessionsFromStoreAsync, loadCombinedSessionStoreForGateway, loadGatewaySessionRow, loadSessionEntry, @@ -471,14 +471,14 @@ async function handleSessionSend(params: { } } export const sessionsHandlers: GatewayRequestHandlers = { - "sessions.list": ({ params, respond }) => { + "sessions.list": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) { return; } const p = params; const cfg = loadConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); - const result = listSessionsFromStore({ + const result = await listSessionsFromStoreAsync({ cfg, storePath, store, diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index ca95b86aca1..b3ccaadf456 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -8,6 +8,7 @@ import { readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, readLatestSessionUsageFromTranscript, + readLatestSessionUsageFromTranscriptAsync, readSessionMessages, readSessionTitleFieldsFromTranscript, readSessionPreviewItemsFromTranscript, @@ -699,6 +700,30 @@ describe("readLatestSessionUsageFromTranscript", () => { }); }); + test("supports async usage fallback reads with matching results", async () => { + const sessionId = "usage-session-async"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { + message: { + role: "assistant", + provider: "openai", + model: "gpt-5.4", + usage: { + input: 900, + output: 200, + cacheRead: 100, + cost: { total: 0.0031 }, + }, + }, + }, + ]); + + await expect(readLatestSessionUsageFromTranscriptAsync(sessionId, storePath)).resolves.toEqual( + readLatestSessionUsageFromTranscript(sessionId, storePath), + ); + }); + test("aggregates assistant usage across the full transcript and keeps the latest context snapshot", () => { const sessionId = "usage-aggregate"; writeTranscript(tmpDir, sessionId, [ diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 6ad14349c42..7535074c142 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -730,6 +730,27 @@ export function readLatestSessionUsageFromTranscript( }); } +export async function readLatestSessionUsageFromTranscriptAsync( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, + agentId?: string, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId); + if (!filePath) { + return null; + } + try { + const chunk = await fs.promises.readFile(filePath, "utf-8"); + if (!chunk) { + return null; + } + return extractLatestUsageFromTranscriptChunk(chunk); + } catch { + return null; + } +} + const PREVIEW_READ_SIZES = [64 * 1024, 256 * 1024, 1024 * 1024]; const PREVIEW_MAX_LINES = 200; diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 7f26059b813..409e7dc43d8 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -17,6 +17,7 @@ import { deriveSessionTitle, listAgentsForGateway, listSessionsFromStore, + listSessionsFromStoreAsync, loadCombinedSessionStoreForGateway, loadSessionEntry, parseGroupKey, @@ -1164,6 +1165,76 @@ describe("listSessionsFromStore search", () => { fs.rmSync(tmpDir, { recursive: true, force: true }); } }); + + test("listSessionsFromStoreAsync hydrates transcript fallbacks when concurrency is enabled", async () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-async-")); + const storePath = path.join(tmpDir, "sessions.json"); + const cfg = { + gateway: { + sessionsList: { + fallbackConcurrency: 4, + }, + }, + session: { mainKey: "main" }, + agents: { + list: [{ id: "main", default: true }], + defaults: { + models: { + "anthropic/claude-sonnet-4-6": { params: { context1m: true } }, + }, + }, + }, + } as unknown as OpenClawConfig; + fs.writeFileSync( + path.join(tmpDir, "sess-main.jsonl"), + [ + JSON.stringify({ type: "session", version: 1, id: "sess-main" }), + JSON.stringify({ + message: { + role: "assistant", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { + input: 2_000, + output: 500, + cacheRead: 1_200, + cost: { total: 0.007725 }, + }, + }, + }), + ].join("\n"), + "utf-8", + ); + + try { + const result = await listSessionsFromStoreAsync({ + cfg, + storePath, + store: { + "agent:main:main": { + sessionId: "sess-main", + updatedAt: Date.now(), + modelProvider: "anthropic", + model: "claude-sonnet-4-6", + totalTokens: 0, + totalTokensFresh: false, + inputTokens: 0, + outputTokens: 0, + cacheRead: 0, + cacheWrite: 0, + } as SessionEntry, + }, + opts: {}, + }); + + expect(result.sessions[0]?.totalTokens).toBe(3_200); + expect(result.sessions[0]?.totalTokensFresh).toBe(true); + expect(result.sessions[0]?.contextTokens).toBe(1_048_576); + expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); }); describe("listSessionsFromStore subagent metadata", () => { diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 52c6f54b1ca..099d28767fd 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -47,11 +47,15 @@ import { resolveAvatarMime, } from "../shared/avatar-policy.js"; import { normalizeSessionDeliveryFields } from "../utils/delivery-context.js"; +import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js"; import { estimateUsageCost, resolveModelCostConfig } from "../utils/usage-format.js"; import { readLatestSessionUsageFromTranscript, + readLatestSessionUsageFromTranscriptAsync, readSessionTitleFieldsFromTranscript, + type SessionTranscriptUsageSnapshot, } from "./session-utils.fs.js"; +import type { SessionsListParams } from "./protocol/index.js"; import type { GatewayAgentRow, GatewaySessionRow, @@ -83,6 +87,29 @@ export type { } from "./session-utils.types.js"; const DERIVED_TITLE_MAX_LEN = 60; +const DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY = 1; +const MAX_SESSIONS_LIST_FALLBACK_CONCURRENCY = 16; + +type GatewayTranscriptUsageFallback = { + estimatedCostUsd?: number; + totalTokens?: number; + totalTokensFresh?: boolean; + contextTokens?: number; +}; + +type SessionsListNormalizedOptions = { + now: number; + includeGlobal: boolean; + includeUnknown: boolean; + includeDerivedTitles: boolean; + includeLastMessage: boolean; + spawnedBy: string; + label: string; + agentId: string; + search: string; + activeMinutes?: number; + limit?: number; +}; function tryResolveExistingPath(value: string): string | null { try { @@ -253,6 +280,119 @@ function resolveEstimatedSessionCostUsd(params: { return resolveNonNegativeNumber(estimated); } +function resolveSessionsListFallbackConcurrency(cfg: OpenClawConfig): number { + const raw = cfg.gateway?.sessionsList?.fallbackConcurrency; + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY; + } + return Math.max( + DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY, + Math.min(MAX_SESSIONS_LIST_FALLBACK_CONCURRENCY, Math.floor(raw)), + ); +} + +function normalizeSessionsListOptions( + opts: SessionsListParams, + now = Date.now(), +): SessionsListNormalizedOptions { + return { + now, + includeGlobal: opts.includeGlobal === true, + includeUnknown: opts.includeUnknown === true, + includeDerivedTitles: opts.includeDerivedTitles === true, + includeLastMessage: opts.includeLastMessage === true, + spawnedBy: typeof opts.spawnedBy === "string" ? opts.spawnedBy : "", + label: typeof opts.label === "string" ? opts.label.trim() : "", + agentId: typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : "", + search: typeof opts.search === "string" ? opts.search.trim().toLowerCase() : "", + activeMinutes: + typeof opts.activeMinutes === "number" && Number.isFinite(opts.activeMinutes) + ? Math.max(1, Math.floor(opts.activeMinutes)) + : undefined, + limit: + typeof opts.limit === "number" && Number.isFinite(opts.limit) + ? Math.max(1, Math.floor(opts.limit)) + : undefined, + }; +} + +function filterSessionListEntries( + store: Record, + opts: SessionsListNormalizedOptions, +): Array<[string, SessionEntry]> { + return Object.entries(store) + .filter(([key]) => { + if (isCronRunSessionKey(key)) { + return false; + } + if (!opts.includeGlobal && key === "global") { + return false; + } + if (!opts.includeUnknown && key === "unknown") { + return false; + } + if (opts.agentId) { + if (key === "global" || key === "unknown") { + return false; + } + const parsed = parseAgentSessionKey(key); + if (!parsed) { + return false; + } + return normalizeAgentId(parsed.agentId) === opts.agentId; + } + return true; + }) + .filter(([key, entry]) => { + if (!opts.spawnedBy) { + return true; + } + if (key === "unknown" || key === "global") { + return false; + } + return entry?.spawnedBy === opts.spawnedBy; + }) + .filter(([, entry]) => { + if (!opts.label) { + return true; + } + return entry?.label === opts.label; + }); +} + +function finalizeSessionListRows( + sessions: GatewaySessionRow[], + opts: SessionsListNormalizedOptions, +): GatewaySessionRow[] { + let filtered = sessions.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)); + + if (opts.search) { + filtered = filtered.filter((session) => { + const fields = [ + session.displayName, + session.label, + session.subject, + session.sessionId, + session.key, + ]; + return fields.some( + (field) => typeof field === "string" && field.toLowerCase().includes(opts.search), + ); + }); + } + + if (opts.activeMinutes !== undefined) { + const cutoff = opts.now - opts.activeMinutes * 60_000; + filtered = filtered.filter((session) => (session.updatedAt ?? 0) >= cutoff); + } + + if (opts.limit !== undefined) { + filtered = filtered.slice(0, opts.limit); + } + + return filtered; +} + function resolveChildSessionKeys( controllerSessionKey: string, store: Record, @@ -276,6 +416,72 @@ function resolveChildSessionKeys( return childSessions.length > 0 ? childSessions : undefined; } +function resolveSessionEntryModelIdentity(params: { + cfg: OpenClawConfig; + key: string; + entry?: SessionEntry; +}): { provider?: string; model: string } { + const sessionAgentId = normalizeAgentId( + parseAgentSessionKey(params.key)?.agentId ?? resolveDefaultAgentId(params.cfg), + ); + const resolved = resolveSessionModelIdentityRef(params.cfg, params.entry, sessionAgentId); + return { + provider: resolved.provider, + model: resolved.model ?? DEFAULT_MODEL, + }; +} + +function shouldResolveTranscriptUsageFallback(params: { + cfg: OpenClawConfig; + entry?: SessionEntry; + fallbackProvider?: string; + fallbackModel?: string; +}): boolean { + return ( + resolvePositiveNumber(resolveFreshSessionTotalTokens(params.entry)) === undefined || + resolvePositiveNumber(params.entry?.contextTokens) === undefined || + resolveEstimatedSessionCostUsd({ + cfg: params.cfg, + provider: params.fallbackProvider, + model: params.fallbackModel, + entry: params.entry, + }) === undefined + ); +} + +function buildTranscriptUsageFallbackFromSnapshot(params: { + cfg: OpenClawConfig; + snapshot: SessionTranscriptUsageSnapshot; + fallbackProvider?: string; + fallbackModel?: string; +}): GatewayTranscriptUsageFallback { + const modelProvider = params.snapshot.modelProvider ?? params.fallbackProvider; + const model = params.snapshot.model ?? params.fallbackModel; + const contextTokens = resolveContextTokensForModel({ + cfg: params.cfg, + provider: modelProvider, + model, + }); + const estimatedCostUsd = resolveEstimatedSessionCostUsd({ + cfg: params.cfg, + provider: modelProvider, + model, + explicitCostUsd: params.snapshot.costUsd, + entry: { + inputTokens: params.snapshot.inputTokens, + outputTokens: params.snapshot.outputTokens, + cacheRead: params.snapshot.cacheRead, + cacheWrite: params.snapshot.cacheWrite, + }, + }); + return { + totalTokens: resolvePositiveNumber(params.snapshot.totalTokens), + totalTokensFresh: params.snapshot.totalTokensFresh === true, + contextTokens: resolvePositiveNumber(contextTokens), + estimatedCostUsd, + }; +} + function resolveTranscriptUsageFallback(params: { cfg: OpenClawConfig; key: string; @@ -306,31 +512,45 @@ function resolveTranscriptUsageFallback(params: { if (!snapshot) { return null; } - const modelProvider = snapshot.modelProvider ?? params.fallbackProvider; - const model = snapshot.model ?? params.fallbackModel; - const contextTokens = resolveContextTokensForModel({ + return buildTranscriptUsageFallbackFromSnapshot({ cfg: params.cfg, - provider: modelProvider, - model, + snapshot, + fallbackProvider: params.fallbackProvider, + fallbackModel: params.fallbackModel, }); - const estimatedCostUsd = resolveEstimatedSessionCostUsd({ +} + +async function resolveTranscriptUsageFallbackAsync(params: { + cfg: OpenClawConfig; + key: string; + entry?: SessionEntry; + storePath: string; + fallbackProvider?: string; + fallbackModel?: string; +}): Promise { + const entry = params.entry; + if (!entry?.sessionId) { + return null; + } + const parsed = parseAgentSessionKey(params.key); + const agentId = parsed?.agentId + ? normalizeAgentId(parsed.agentId) + : resolveDefaultAgentId(params.cfg); + const snapshot = await readLatestSessionUsageFromTranscriptAsync( + entry.sessionId, + params.storePath, + entry.sessionFile, + agentId, + ); + if (!snapshot) { + return null; + } + return buildTranscriptUsageFallbackFromSnapshot({ cfg: params.cfg, - provider: modelProvider, - model, - explicitCostUsd: snapshot.costUsd, - entry: { - inputTokens: snapshot.inputTokens, - outputTokens: snapshot.outputTokens, - cacheRead: snapshot.cacheRead, - cacheWrite: snapshot.cacheWrite, - }, + snapshot, + fallbackProvider: params.fallbackProvider, + fallbackModel: params.fallbackModel, }); - return { - totalTokens: resolvePositiveNumber(snapshot.totalTokens), - totalTokensFresh: snapshot.totalTokensFresh === true, - contextTokens: resolvePositiveNumber(contextTokens), - estimatedCostUsd, - }; } export function loadSessionEntry(sessionKey: string) { @@ -1024,6 +1244,8 @@ export function buildGatewaySessionRow(params: { now?: number; includeDerivedTitles?: boolean; includeLastMessage?: boolean; + transcriptUsage?: GatewayTranscriptUsageFallback | null; + skipTranscriptUsageFallback?: boolean; }): GatewaySessionRow { const { cfg, storePath, store, key, entry } = params; const now = params.now ?? Date.now(); @@ -1066,15 +1288,14 @@ export function buildGatewaySessionRow(params: { ); const modelProvider = resolvedModel.provider; const model = resolvedModel.model ?? DEFAULT_MODEL; - const transcriptUsage = - resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined || - resolvePositiveNumber(entry?.contextTokens) === undefined || - resolveEstimatedSessionCostUsd({ - cfg, - provider: modelProvider, - model, - entry, - }) === undefined + const transcriptUsage = params.skipTranscriptUsageFallback + ? (params.transcriptUsage ?? null) + : shouldResolveTranscriptUsageFallback({ + cfg, + entry, + fallbackProvider: modelProvider, + fallbackModel: model, + }) ? resolveTranscriptUsageFallback({ cfg, key, @@ -1196,62 +1417,12 @@ export function listSessionsFromStore(params: { cfg: OpenClawConfig; storePath: string; store: Record; - opts: import("./protocol/index.js").SessionsListParams; + opts: SessionsListParams; }): SessionsListResult { const { cfg, storePath, store, opts } = params; - const now = Date.now(); + const normalizedOpts = normalizeSessionsListOptions(opts); - const includeGlobal = opts.includeGlobal === true; - const includeUnknown = opts.includeUnknown === true; - const includeDerivedTitles = opts.includeDerivedTitles === true; - const includeLastMessage = opts.includeLastMessage === true; - const spawnedBy = typeof opts.spawnedBy === "string" ? opts.spawnedBy : ""; - const label = typeof opts.label === "string" ? opts.label.trim() : ""; - const agentId = typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : ""; - const search = typeof opts.search === "string" ? opts.search.trim().toLowerCase() : ""; - const activeMinutes = - typeof opts.activeMinutes === "number" && Number.isFinite(opts.activeMinutes) - ? Math.max(1, Math.floor(opts.activeMinutes)) - : undefined; - - let sessions = Object.entries(store) - .filter(([key]) => { - if (isCronRunSessionKey(key)) { - return false; - } - if (!includeGlobal && key === "global") { - return false; - } - if (!includeUnknown && key === "unknown") { - return false; - } - if (agentId) { - if (key === "global" || key === "unknown") { - return false; - } - const parsed = parseAgentSessionKey(key); - if (!parsed) { - return false; - } - return normalizeAgentId(parsed.agentId) === agentId; - } - return true; - }) - .filter(([key, entry]) => { - if (!spawnedBy) { - return true; - } - if (key === "unknown" || key === "global") { - return false; - } - return entry?.spawnedBy === spawnedBy; - }) - .filter(([, entry]) => { - if (!label) { - return true; - } - return entry?.label === label; - }) + let sessions = filterSessionListEntries(store, normalizedOpts) .map(([key, entry]) => buildGatewaySessionRow({ cfg, @@ -1259,35 +1430,101 @@ export function listSessionsFromStore(params: { store, key, entry, - now, - includeDerivedTitles, - includeLastMessage, + now: normalizedOpts.now, + includeDerivedTitles: normalizedOpts.includeDerivedTitles, + includeLastMessage: normalizedOpts.includeLastMessage, }), - ) - .toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)); - - if (search) { - sessions = sessions.filter((s) => { - const fields = [s.displayName, s.label, s.subject, s.sessionId, s.key]; - return fields.some((f) => typeof f === "string" && f.toLowerCase().includes(search)); - }); - } - - if (activeMinutes !== undefined) { - const cutoff = now - activeMinutes * 60_000; - sessions = sessions.filter((s) => (s.updatedAt ?? 0) >= cutoff); - } - - if (typeof opts.limit === "number" && Number.isFinite(opts.limit)) { - const limit = Math.max(1, Math.floor(opts.limit)); - sessions = sessions.slice(0, limit); - } + ); + sessions = finalizeSessionListRows(sessions, normalizedOpts); return { - ts: now, + ts: normalizedOpts.now, path: storePath, count: sessions.length, defaults: getSessionDefaults(cfg), sessions, }; } + +export async function listSessionsFromStoreAsync(params: { + cfg: OpenClawConfig; + storePath: string; + store: Record; + opts: SessionsListParams; +}): Promise { + const concurrency = resolveSessionsListFallbackConcurrency(params.cfg); + if (concurrency <= DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY) { + return listSessionsFromStore(params); + } + + const normalizedOpts = normalizeSessionsListOptions(params.opts); + const filteredEntries = filterSessionListEntries(params.store, normalizedOpts); + const transcriptUsageByKey = new Map(); + const tasks = filteredEntries + .filter(([key, entry]) => { + const resolvedModel = resolveSessionEntryModelIdentity({ + cfg: params.cfg, + key, + entry, + }); + return shouldResolveTranscriptUsageFallback({ + cfg: params.cfg, + entry, + fallbackProvider: resolvedModel.provider, + fallbackModel: resolvedModel.model, + }); + }) + .map(([key, entry]) => async () => { + const resolvedModel = resolveSessionEntryModelIdentity({ + cfg: params.cfg, + key, + entry, + }); + const usage = await resolveTranscriptUsageFallbackAsync({ + cfg: params.cfg, + key, + entry, + storePath: params.storePath, + fallbackProvider: resolvedModel.provider, + fallbackModel: resolvedModel.model, + }); + return { key, usage }; + }); + + if (tasks.length > 0) { + const { results } = await runTasksWithConcurrency({ + tasks, + limit: concurrency, + errorMode: "continue", + }); + for (const result of results) { + if (result?.key) { + transcriptUsageByKey.set(result.key, result.usage ?? null); + } + } + } + + let sessions = filteredEntries.map(([key, entry]) => + buildGatewaySessionRow({ + cfg: params.cfg, + storePath: params.storePath, + store: params.store, + key, + entry, + now: normalizedOpts.now, + includeDerivedTitles: normalizedOpts.includeDerivedTitles, + includeLastMessage: normalizedOpts.includeLastMessage, + transcriptUsage: transcriptUsageByKey.get(key) ?? null, + skipTranscriptUsageFallback: transcriptUsageByKey.has(key), + }), + ); + sessions = finalizeSessionListRows(sessions, normalizedOpts); + + return { + ts: normalizedOpts.now, + path: params.storePath, + count: sessions.length, + defaults: getSessionDefaults(params.cfg), + sessions, + }; +} From 8b29ab5edbaba7ed44277630e93f3feeca5670ce Mon Sep 17 00:00:00 2001 From: user <14040213+LivingGhost@users.noreply.github.com> Date: Sat, 21 Mar 2026 10:56:55 +0900 Subject: [PATCH 2/2] fix(gateway): align concurrent sessions.list fallback metadata --- src/gateway/session-utils.test.ts | 83 ++++++++++++++++++++++++++++ src/gateway/session-utils.ts | 92 ++++++++++++++++--------------- 2 files changed, 131 insertions(+), 44 deletions(-) diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 409e7dc43d8..33ae6ca49c6 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -1166,6 +1166,89 @@ describe("listSessionsFromStore search", () => { } }); + test("listSessionsFromStoreAsync uses subagent run model for child session transcript fallback", async () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-async-subagent-")); + const storePath = path.join(tmpDir, "sessions.json"); + const now = Date.now(); + const cfg = { + gateway: { + sessionsList: { + fallbackConcurrency: 4, + }, + }, + session: { mainKey: "main" }, + agents: { + list: [{ id: "main", default: true }], + defaults: { + models: { + "anthropic/claude-sonnet-4-6": { params: { context1m: true } }, + }, + }, + }, + } as unknown as OpenClawConfig; + fs.writeFileSync( + path.join(tmpDir, "sess-child.jsonl"), + [ + JSON.stringify({ type: "session", version: 1, id: "sess-child" }), + JSON.stringify({ + message: { + role: "assistant", + usage: { + input: 2_000, + output: 500, + cacheRead: 1_200, + cost: { total: 0.007725 }, + }, + }, + }), + ].join("\n"), + "utf-8", + ); + + addSubagentRunForTests({ + runId: "run-child-async", + childSessionKey: "agent:main:subagent:child-async", + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "child task", + cleanup: "keep", + createdAt: now - 5_000, + startedAt: now - 4_000, + model: "anthropic/claude-sonnet-4-6", + }); + + try { + const result = await listSessionsFromStoreAsync({ + cfg, + storePath, + store: { + "agent:main:subagent:child-async": { + sessionId: "sess-child", + updatedAt: now, + spawnedBy: "agent:main:main", + totalTokens: 0, + totalTokensFresh: false, + } as SessionEntry, + }, + opts: {}, + }); + + expect(result.sessions[0]).toMatchObject({ + key: "agent:main:subagent:child-async", + status: "running", + modelProvider: "anthropic", + model: "claude-sonnet-4-6", + totalTokens: 3_200, + totalTokensFresh: true, + contextTokens: 1_048_576, + }); + expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + test("listSessionsFromStoreAsync hydrates transcript fallbacks when concurrency is enabled", async () => { const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-async-")); const storePath = path.join(tmpDir, "sessions.json"); diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 099d28767fd..25c50119c79 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -49,13 +49,13 @@ import { import { normalizeSessionDeliveryFields } from "../utils/delivery-context.js"; import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js"; import { estimateUsageCost, resolveModelCostConfig } from "../utils/usage-format.js"; +import type { SessionsListParams } from "./protocol/index.js"; import { readLatestSessionUsageFromTranscript, readLatestSessionUsageFromTranscriptAsync, readSessionTitleFieldsFromTranscript, type SessionTranscriptUsageSnapshot, } from "./session-utils.fs.js"; -import type { SessionsListParams } from "./protocol/index.js"; import type { GatewayAgentRow, GatewaySessionRow, @@ -424,7 +424,13 @@ function resolveSessionEntryModelIdentity(params: { const sessionAgentId = normalizeAgentId( parseAgentSessionKey(params.key)?.agentId ?? resolveDefaultAgentId(params.cfg), ); - const resolved = resolveSessionModelIdentityRef(params.cfg, params.entry, sessionAgentId); + const subagentRun = getSubagentRunByChildSessionKey(params.key); + const resolved = resolveSessionModelIdentityRef( + params.cfg, + params.entry, + sessionAgentId, + subagentRun?.model, + ); return { provider: resolved.provider, model: resolved.model ?? DEFAULT_MODEL, @@ -1422,19 +1428,18 @@ export function listSessionsFromStore(params: { const { cfg, storePath, store, opts } = params; const normalizedOpts = normalizeSessionsListOptions(opts); - let sessions = filterSessionListEntries(store, normalizedOpts) - .map(([key, entry]) => - buildGatewaySessionRow({ - cfg, - storePath, - store, - key, - entry, - now: normalizedOpts.now, - includeDerivedTitles: normalizedOpts.includeDerivedTitles, - includeLastMessage: normalizedOpts.includeLastMessage, - }), - ); + let sessions = filterSessionListEntries(store, normalizedOpts).map(([key, entry]) => + buildGatewaySessionRow({ + cfg, + storePath, + store, + key, + entry, + now: normalizedOpts.now, + includeDerivedTitles: normalizedOpts.includeDerivedTitles, + includeLastMessage: normalizedOpts.includeLastMessage, + }), + ); sessions = finalizeSessionListRows(sessions, normalizedOpts); return { @@ -1460,36 +1465,35 @@ export async function listSessionsFromStoreAsync(params: { const normalizedOpts = normalizeSessionsListOptions(params.opts); const filteredEntries = filterSessionListEntries(params.store, normalizedOpts); const transcriptUsageByKey = new Map(); - const tasks = filteredEntries - .filter(([key, entry]) => { - const resolvedModel = resolveSessionEntryModelIdentity({ - cfg: params.cfg, - key, - entry, - }); - return shouldResolveTranscriptUsageFallback({ - cfg: params.cfg, - entry, - fallbackProvider: resolvedModel.provider, - fallbackModel: resolvedModel.model, - }); - }) - .map(([key, entry]) => async () => { - const resolvedModel = resolveSessionEntryModelIdentity({ - cfg: params.cfg, - key, - entry, - }); - const usage = await resolveTranscriptUsageFallbackAsync({ - cfg: params.cfg, - key, - entry, - storePath: params.storePath, - fallbackProvider: resolvedModel.provider, - fallbackModel: resolvedModel.model, - }); - return { key, usage }; + const entriesNeedingFallback = filteredEntries.flatMap(([key, entry]) => { + const resolvedModel = resolveSessionEntryModelIdentity({ + cfg: params.cfg, + key, + entry, }); + if ( + !shouldResolveTranscriptUsageFallback({ + cfg: params.cfg, + entry, + fallbackProvider: resolvedModel.provider, + fallbackModel: resolvedModel.model, + }) + ) { + return []; + } + return [{ key, entry, resolvedModel }]; + }); + const tasks = entriesNeedingFallback.map(({ key, entry, resolvedModel }) => async () => { + const usage = await resolveTranscriptUsageFallbackAsync({ + cfg: params.cfg, + key, + entry, + storePath: params.storePath, + fallbackProvider: resolvedModel.provider, + fallbackModel: resolvedModel.model, + }); + return { key, usage }; + }); if (tasks.length > 0) { const { results } = await runTasksWithConcurrency({