perf(gateway): add configurable sessions.list fallback concurrency
Keep the existing serial sessions.list behavior by default while allowing transcript usage fallbacks to run with bounded parallelism when explicitly configured.
This commit is contained in:
parent
6b4c24c2e5
commit
3cc89eabdd
@ -225,6 +225,33 @@ describe("gateway.tools config", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("gateway.sessionsList.fallbackConcurrency", () => {
|
||||
it("accepts positive integer concurrency", () => {
|
||||
const res = validateConfigObject({
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 4,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects zero or negative values", () => {
|
||||
const res = validateConfigObject({
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 0,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(res.ok).toBe(false);
|
||||
if (!res.ok) {
|
||||
expect(res.issues[0]?.path).toBe("gateway.sessionsList.fallbackConcurrency");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("gateway.channelHealthCheckMinutes", () => {
|
||||
it("accepts zero to disable monitor", () => {
|
||||
const res = validateConfigObject({
|
||||
|
||||
@ -100,6 +100,10 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"Explicit gateway-level tool allowlist when you want a narrow set of tools available at runtime. Use this for locked-down environments where tool scope must be tightly controlled.",
|
||||
"gateway.tools.deny":
|
||||
"Explicit gateway-level tool denylist to block risky tools even if lower-level policies allow them. Use deny rules for emergency response and defense-in-depth hardening.",
|
||||
"gateway.sessionsList":
|
||||
"Gateway sessions.list read-path tuning for transcript-derived metadata fallbacks. Leave defaults alone unless large session inventories or large transcripts are causing sessions.list timeouts.",
|
||||
"gateway.sessionsList.fallbackConcurrency":
|
||||
"Maximum number of transcript usage fallback reads allowed in parallel during gateway sessions.list. Default: 1, which preserves the current serial behavior; raise this only when host I/O and CPU headroom are available.",
|
||||
"gateway.channelHealthCheckMinutes":
|
||||
"Interval in minutes for automatic channel health probing and status updates. Use lower intervals for faster detection, or higher intervals to reduce periodic probe noise.",
|
||||
"gateway.channelStaleEventThresholdMinutes":
|
||||
|
||||
@ -83,6 +83,8 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"gateway.tools": "Gateway Tool Exposure Policy",
|
||||
"gateway.tools.allow": "Gateway Tool Allowlist",
|
||||
"gateway.tools.deny": "Gateway Tool Denylist",
|
||||
"gateway.sessionsList": "Gateway Sessions List",
|
||||
"gateway.sessionsList.fallbackConcurrency": "Gateway Sessions List Fallback Concurrency",
|
||||
"gateway.channelHealthCheckMinutes": "Gateway Channel Health Check Interval (min)",
|
||||
"gateway.channelStaleEventThresholdMinutes": "Gateway Channel Stale Event Threshold (min)",
|
||||
"gateway.channelMaxRestartsPerHour": "Gateway Channel Max Restarts Per Hour",
|
||||
|
||||
@ -390,6 +390,15 @@ export type GatewayToolsConfig = {
|
||||
allow?: string[];
|
||||
};
|
||||
|
||||
export type GatewaySessionsListConfig = {
|
||||
/**
|
||||
* Maximum number of transcript usage fallbacks to hydrate in parallel while
|
||||
* servicing gateway sessions.list.
|
||||
* Default: 1 (preserves the current serial behavior).
|
||||
*/
|
||||
fallbackConcurrency?: number;
|
||||
};
|
||||
|
||||
export type GatewayConfig = {
|
||||
/** Single multiplexed port for Gateway WS + HTTP (default: 18789). */
|
||||
port?: number;
|
||||
@ -432,6 +441,8 @@ export type GatewayConfig = {
|
||||
allowRealIpFallback?: boolean;
|
||||
/** Tool access restrictions for HTTP /tools/invoke endpoint. */
|
||||
tools?: GatewayToolsConfig;
|
||||
/** sessions.list performance settings. */
|
||||
sessionsList?: GatewaySessionsListConfig;
|
||||
/**
|
||||
* Channel health monitor interval in minutes.
|
||||
* Periodically checks channel health and restarts unhealthy channels.
|
||||
|
||||
@ -718,6 +718,12 @@ export const OpenClawSchema = z
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
sessionsList: z
|
||||
.object({
|
||||
fallbackConcurrency: z.number().int().min(1).optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
channelHealthCheckMinutes: z.number().int().min(0).optional(),
|
||||
channelStaleEventThresholdMinutes: z.number().int().min(1).optional(),
|
||||
channelMaxRestartsPerHour: z.number().int().min(1).optional(),
|
||||
|
||||
@ -49,7 +49,7 @@ import {
|
||||
import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js";
|
||||
import {
|
||||
archiveFileOnDisk,
|
||||
listSessionsFromStore,
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
@ -471,14 +471,14 @@ async function handleSessionSend(params: {
|
||||
}
|
||||
}
|
||||
export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
"sessions.list": ({ params, respond }) => {
|
||||
"sessions.list": async ({ params, respond }) => {
|
||||
if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) {
|
||||
return;
|
||||
}
|
||||
const p = params;
|
||||
const cfg = loadConfig();
|
||||
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
|
||||
const result = listSessionsFromStore({
|
||||
const result = await listSessionsFromStoreAsync({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
|
||||
@ -8,6 +8,7 @@ import {
|
||||
readFirstUserMessageFromTranscript,
|
||||
readLastMessagePreviewFromTranscript,
|
||||
readLatestSessionUsageFromTranscript,
|
||||
readLatestSessionUsageFromTranscriptAsync,
|
||||
readSessionMessages,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
@ -699,6 +700,30 @@ describe("readLatestSessionUsageFromTranscript", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("supports async usage fallback reads with matching results", async () => {
|
||||
const sessionId = "usage-session-async";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{
|
||||
message: {
|
||||
role: "assistant",
|
||||
provider: "openai",
|
||||
model: "gpt-5.4",
|
||||
usage: {
|
||||
input: 900,
|
||||
output: 200,
|
||||
cacheRead: 100,
|
||||
cost: { total: 0.0031 },
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(readLatestSessionUsageFromTranscriptAsync(sessionId, storePath)).resolves.toEqual(
|
||||
readLatestSessionUsageFromTranscript(sessionId, storePath),
|
||||
);
|
||||
});
|
||||
|
||||
test("aggregates assistant usage across the full transcript and keeps the latest context snapshot", () => {
|
||||
const sessionId = "usage-aggregate";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
|
||||
@ -730,6 +730,27 @@ export function readLatestSessionUsageFromTranscript(
|
||||
});
|
||||
}
|
||||
|
||||
export async function readLatestSessionUsageFromTranscriptAsync(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
sessionFile?: string,
|
||||
agentId?: string,
|
||||
): Promise<SessionTranscriptUsageSnapshot | null> {
|
||||
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId);
|
||||
if (!filePath) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const chunk = await fs.promises.readFile(filePath, "utf-8");
|
||||
if (!chunk) {
|
||||
return null;
|
||||
}
|
||||
return extractLatestUsageFromTranscriptChunk(chunk);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const PREVIEW_READ_SIZES = [64 * 1024, 256 * 1024, 1024 * 1024];
|
||||
const PREVIEW_MAX_LINES = 200;
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ import {
|
||||
deriveSessionTitle,
|
||||
listAgentsForGateway,
|
||||
listSessionsFromStore,
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadSessionEntry,
|
||||
parseGroupKey,
|
||||
@ -1164,6 +1165,76 @@ describe("listSessionsFromStore search", () => {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("listSessionsFromStoreAsync hydrates transcript fallbacks when concurrency is enabled", async () => {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-async-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const cfg = {
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 4,
|
||||
},
|
||||
},
|
||||
session: { mainKey: "main" },
|
||||
agents: {
|
||||
list: [{ id: "main", default: true }],
|
||||
defaults: {
|
||||
models: {
|
||||
"anthropic/claude-sonnet-4-6": { params: { context1m: true } },
|
||||
},
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig;
|
||||
fs.writeFileSync(
|
||||
path.join(tmpDir, "sess-main.jsonl"),
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: "sess-main" }),
|
||||
JSON.stringify({
|
||||
message: {
|
||||
role: "assistant",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
usage: {
|
||||
input: 2_000,
|
||||
output: 500,
|
||||
cacheRead: 1_200,
|
||||
cost: { total: 0.007725 },
|
||||
},
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
try {
|
||||
const result = await listSessionsFromStoreAsync({
|
||||
cfg,
|
||||
storePath,
|
||||
store: {
|
||||
"agent:main:main": {
|
||||
sessionId: "sess-main",
|
||||
updatedAt: Date.now(),
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: false,
|
||||
inputTokens: 0,
|
||||
outputTokens: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
} as SessionEntry,
|
||||
},
|
||||
opts: {},
|
||||
});
|
||||
|
||||
expect(result.sessions[0]?.totalTokens).toBe(3_200);
|
||||
expect(result.sessions[0]?.totalTokensFresh).toBe(true);
|
||||
expect(result.sessions[0]?.contextTokens).toBe(1_048_576);
|
||||
expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8);
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("listSessionsFromStore subagent metadata", () => {
|
||||
|
||||
@ -47,11 +47,15 @@ import {
|
||||
resolveAvatarMime,
|
||||
} from "../shared/avatar-policy.js";
|
||||
import { normalizeSessionDeliveryFields } from "../utils/delivery-context.js";
|
||||
import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js";
|
||||
import { estimateUsageCost, resolveModelCostConfig } from "../utils/usage-format.js";
|
||||
import {
|
||||
readLatestSessionUsageFromTranscript,
|
||||
readLatestSessionUsageFromTranscriptAsync,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
type SessionTranscriptUsageSnapshot,
|
||||
} from "./session-utils.fs.js";
|
||||
import type { SessionsListParams } from "./protocol/index.js";
|
||||
import type {
|
||||
GatewayAgentRow,
|
||||
GatewaySessionRow,
|
||||
@ -83,6 +87,29 @@ export type {
|
||||
} from "./session-utils.types.js";
|
||||
|
||||
const DERIVED_TITLE_MAX_LEN = 60;
|
||||
const DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY = 1;
|
||||
const MAX_SESSIONS_LIST_FALLBACK_CONCURRENCY = 16;
|
||||
|
||||
type GatewayTranscriptUsageFallback = {
|
||||
estimatedCostUsd?: number;
|
||||
totalTokens?: number;
|
||||
totalTokensFresh?: boolean;
|
||||
contextTokens?: number;
|
||||
};
|
||||
|
||||
type SessionsListNormalizedOptions = {
|
||||
now: number;
|
||||
includeGlobal: boolean;
|
||||
includeUnknown: boolean;
|
||||
includeDerivedTitles: boolean;
|
||||
includeLastMessage: boolean;
|
||||
spawnedBy: string;
|
||||
label: string;
|
||||
agentId: string;
|
||||
search: string;
|
||||
activeMinutes?: number;
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
function tryResolveExistingPath(value: string): string | null {
|
||||
try {
|
||||
@ -253,6 +280,119 @@ function resolveEstimatedSessionCostUsd(params: {
|
||||
return resolveNonNegativeNumber(estimated);
|
||||
}
|
||||
|
||||
function resolveSessionsListFallbackConcurrency(cfg: OpenClawConfig): number {
|
||||
const raw = cfg.gateway?.sessionsList?.fallbackConcurrency;
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
return DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY;
|
||||
}
|
||||
return Math.max(
|
||||
DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY,
|
||||
Math.min(MAX_SESSIONS_LIST_FALLBACK_CONCURRENCY, Math.floor(raw)),
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeSessionsListOptions(
|
||||
opts: SessionsListParams,
|
||||
now = Date.now(),
|
||||
): SessionsListNormalizedOptions {
|
||||
return {
|
||||
now,
|
||||
includeGlobal: opts.includeGlobal === true,
|
||||
includeUnknown: opts.includeUnknown === true,
|
||||
includeDerivedTitles: opts.includeDerivedTitles === true,
|
||||
includeLastMessage: opts.includeLastMessage === true,
|
||||
spawnedBy: typeof opts.spawnedBy === "string" ? opts.spawnedBy : "",
|
||||
label: typeof opts.label === "string" ? opts.label.trim() : "",
|
||||
agentId: typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : "",
|
||||
search: typeof opts.search === "string" ? opts.search.trim().toLowerCase() : "",
|
||||
activeMinutes:
|
||||
typeof opts.activeMinutes === "number" && Number.isFinite(opts.activeMinutes)
|
||||
? Math.max(1, Math.floor(opts.activeMinutes))
|
||||
: undefined,
|
||||
limit:
|
||||
typeof opts.limit === "number" && Number.isFinite(opts.limit)
|
||||
? Math.max(1, Math.floor(opts.limit))
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function filterSessionListEntries(
|
||||
store: Record<string, SessionEntry>,
|
||||
opts: SessionsListNormalizedOptions,
|
||||
): Array<[string, SessionEntry]> {
|
||||
return Object.entries(store)
|
||||
.filter(([key]) => {
|
||||
if (isCronRunSessionKey(key)) {
|
||||
return false;
|
||||
}
|
||||
if (!opts.includeGlobal && key === "global") {
|
||||
return false;
|
||||
}
|
||||
if (!opts.includeUnknown && key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
if (opts.agentId) {
|
||||
if (key === "global" || key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(key);
|
||||
if (!parsed) {
|
||||
return false;
|
||||
}
|
||||
return normalizeAgentId(parsed.agentId) === opts.agentId;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.filter(([key, entry]) => {
|
||||
if (!opts.spawnedBy) {
|
||||
return true;
|
||||
}
|
||||
if (key === "unknown" || key === "global") {
|
||||
return false;
|
||||
}
|
||||
return entry?.spawnedBy === opts.spawnedBy;
|
||||
})
|
||||
.filter(([, entry]) => {
|
||||
if (!opts.label) {
|
||||
return true;
|
||||
}
|
||||
return entry?.label === opts.label;
|
||||
});
|
||||
}
|
||||
|
||||
function finalizeSessionListRows(
|
||||
sessions: GatewaySessionRow[],
|
||||
opts: SessionsListNormalizedOptions,
|
||||
): GatewaySessionRow[] {
|
||||
let filtered = sessions.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
|
||||
|
||||
if (opts.search) {
|
||||
filtered = filtered.filter((session) => {
|
||||
const fields = [
|
||||
session.displayName,
|
||||
session.label,
|
||||
session.subject,
|
||||
session.sessionId,
|
||||
session.key,
|
||||
];
|
||||
return fields.some(
|
||||
(field) => typeof field === "string" && field.toLowerCase().includes(opts.search),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
if (opts.activeMinutes !== undefined) {
|
||||
const cutoff = opts.now - opts.activeMinutes * 60_000;
|
||||
filtered = filtered.filter((session) => (session.updatedAt ?? 0) >= cutoff);
|
||||
}
|
||||
|
||||
if (opts.limit !== undefined) {
|
||||
filtered = filtered.slice(0, opts.limit);
|
||||
}
|
||||
|
||||
return filtered;
|
||||
}
|
||||
|
||||
function resolveChildSessionKeys(
|
||||
controllerSessionKey: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
@ -276,6 +416,72 @@ function resolveChildSessionKeys(
|
||||
return childSessions.length > 0 ? childSessions : undefined;
|
||||
}
|
||||
|
||||
function resolveSessionEntryModelIdentity(params: {
|
||||
cfg: OpenClawConfig;
|
||||
key: string;
|
||||
entry?: SessionEntry;
|
||||
}): { provider?: string; model: string } {
|
||||
const sessionAgentId = normalizeAgentId(
|
||||
parseAgentSessionKey(params.key)?.agentId ?? resolveDefaultAgentId(params.cfg),
|
||||
);
|
||||
const resolved = resolveSessionModelIdentityRef(params.cfg, params.entry, sessionAgentId);
|
||||
return {
|
||||
provider: resolved.provider,
|
||||
model: resolved.model ?? DEFAULT_MODEL,
|
||||
};
|
||||
}
|
||||
|
||||
function shouldResolveTranscriptUsageFallback(params: {
|
||||
cfg: OpenClawConfig;
|
||||
entry?: SessionEntry;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): boolean {
|
||||
return (
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(params.entry)) === undefined ||
|
||||
resolvePositiveNumber(params.entry?.contextTokens) === undefined ||
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg: params.cfg,
|
||||
provider: params.fallbackProvider,
|
||||
model: params.fallbackModel,
|
||||
entry: params.entry,
|
||||
}) === undefined
|
||||
);
|
||||
}
|
||||
|
||||
function buildTranscriptUsageFallbackFromSnapshot(params: {
|
||||
cfg: OpenClawConfig;
|
||||
snapshot: SessionTranscriptUsageSnapshot;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): GatewayTranscriptUsageFallback {
|
||||
const modelProvider = params.snapshot.modelProvider ?? params.fallbackProvider;
|
||||
const model = params.snapshot.model ?? params.fallbackModel;
|
||||
const contextTokens = resolveContextTokensForModel({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
});
|
||||
const estimatedCostUsd = resolveEstimatedSessionCostUsd({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
explicitCostUsd: params.snapshot.costUsd,
|
||||
entry: {
|
||||
inputTokens: params.snapshot.inputTokens,
|
||||
outputTokens: params.snapshot.outputTokens,
|
||||
cacheRead: params.snapshot.cacheRead,
|
||||
cacheWrite: params.snapshot.cacheWrite,
|
||||
},
|
||||
});
|
||||
return {
|
||||
totalTokens: resolvePositiveNumber(params.snapshot.totalTokens),
|
||||
totalTokensFresh: params.snapshot.totalTokensFresh === true,
|
||||
contextTokens: resolvePositiveNumber(contextTokens),
|
||||
estimatedCostUsd,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveTranscriptUsageFallback(params: {
|
||||
cfg: OpenClawConfig;
|
||||
key: string;
|
||||
@ -306,31 +512,45 @@ function resolveTranscriptUsageFallback(params: {
|
||||
if (!snapshot) {
|
||||
return null;
|
||||
}
|
||||
const modelProvider = snapshot.modelProvider ?? params.fallbackProvider;
|
||||
const model = snapshot.model ?? params.fallbackModel;
|
||||
const contextTokens = resolveContextTokensForModel({
|
||||
return buildTranscriptUsageFallbackFromSnapshot({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
snapshot,
|
||||
fallbackProvider: params.fallbackProvider,
|
||||
fallbackModel: params.fallbackModel,
|
||||
});
|
||||
const estimatedCostUsd = resolveEstimatedSessionCostUsd({
|
||||
}
|
||||
|
||||
async function resolveTranscriptUsageFallbackAsync(params: {
|
||||
cfg: OpenClawConfig;
|
||||
key: string;
|
||||
entry?: SessionEntry;
|
||||
storePath: string;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): Promise<GatewayTranscriptUsageFallback | null> {
|
||||
const entry = params.entry;
|
||||
if (!entry?.sessionId) {
|
||||
return null;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(params.key);
|
||||
const agentId = parsed?.agentId
|
||||
? normalizeAgentId(parsed.agentId)
|
||||
: resolveDefaultAgentId(params.cfg);
|
||||
const snapshot = await readLatestSessionUsageFromTranscriptAsync(
|
||||
entry.sessionId,
|
||||
params.storePath,
|
||||
entry.sessionFile,
|
||||
agentId,
|
||||
);
|
||||
if (!snapshot) {
|
||||
return null;
|
||||
}
|
||||
return buildTranscriptUsageFallbackFromSnapshot({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
explicitCostUsd: snapshot.costUsd,
|
||||
entry: {
|
||||
inputTokens: snapshot.inputTokens,
|
||||
outputTokens: snapshot.outputTokens,
|
||||
cacheRead: snapshot.cacheRead,
|
||||
cacheWrite: snapshot.cacheWrite,
|
||||
},
|
||||
snapshot,
|
||||
fallbackProvider: params.fallbackProvider,
|
||||
fallbackModel: params.fallbackModel,
|
||||
});
|
||||
return {
|
||||
totalTokens: resolvePositiveNumber(snapshot.totalTokens),
|
||||
totalTokensFresh: snapshot.totalTokensFresh === true,
|
||||
contextTokens: resolvePositiveNumber(contextTokens),
|
||||
estimatedCostUsd,
|
||||
};
|
||||
}
|
||||
|
||||
export function loadSessionEntry(sessionKey: string) {
|
||||
@ -1024,6 +1244,8 @@ export function buildGatewaySessionRow(params: {
|
||||
now?: number;
|
||||
includeDerivedTitles?: boolean;
|
||||
includeLastMessage?: boolean;
|
||||
transcriptUsage?: GatewayTranscriptUsageFallback | null;
|
||||
skipTranscriptUsageFallback?: boolean;
|
||||
}): GatewaySessionRow {
|
||||
const { cfg, storePath, store, key, entry } = params;
|
||||
const now = params.now ?? Date.now();
|
||||
@ -1066,15 +1288,14 @@ export function buildGatewaySessionRow(params: {
|
||||
);
|
||||
const modelProvider = resolvedModel.provider;
|
||||
const model = resolvedModel.model ?? DEFAULT_MODEL;
|
||||
const transcriptUsage =
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined ||
|
||||
resolvePositiveNumber(entry?.contextTokens) === undefined ||
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
entry,
|
||||
}) === undefined
|
||||
const transcriptUsage = params.skipTranscriptUsageFallback
|
||||
? (params.transcriptUsage ?? null)
|
||||
: shouldResolveTranscriptUsageFallback({
|
||||
cfg,
|
||||
entry,
|
||||
fallbackProvider: modelProvider,
|
||||
fallbackModel: model,
|
||||
})
|
||||
? resolveTranscriptUsageFallback({
|
||||
cfg,
|
||||
key,
|
||||
@ -1196,62 +1417,12 @@ export function listSessionsFromStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
opts: SessionsListParams;
|
||||
}): SessionsListResult {
|
||||
const { cfg, storePath, store, opts } = params;
|
||||
const now = Date.now();
|
||||
const normalizedOpts = normalizeSessionsListOptions(opts);
|
||||
|
||||
const includeGlobal = opts.includeGlobal === true;
|
||||
const includeUnknown = opts.includeUnknown === true;
|
||||
const includeDerivedTitles = opts.includeDerivedTitles === true;
|
||||
const includeLastMessage = opts.includeLastMessage === true;
|
||||
const spawnedBy = typeof opts.spawnedBy === "string" ? opts.spawnedBy : "";
|
||||
const label = typeof opts.label === "string" ? opts.label.trim() : "";
|
||||
const agentId = typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : "";
|
||||
const search = typeof opts.search === "string" ? opts.search.trim().toLowerCase() : "";
|
||||
const activeMinutes =
|
||||
typeof opts.activeMinutes === "number" && Number.isFinite(opts.activeMinutes)
|
||||
? Math.max(1, Math.floor(opts.activeMinutes))
|
||||
: undefined;
|
||||
|
||||
let sessions = Object.entries(store)
|
||||
.filter(([key]) => {
|
||||
if (isCronRunSessionKey(key)) {
|
||||
return false;
|
||||
}
|
||||
if (!includeGlobal && key === "global") {
|
||||
return false;
|
||||
}
|
||||
if (!includeUnknown && key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
if (agentId) {
|
||||
if (key === "global" || key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(key);
|
||||
if (!parsed) {
|
||||
return false;
|
||||
}
|
||||
return normalizeAgentId(parsed.agentId) === agentId;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.filter(([key, entry]) => {
|
||||
if (!spawnedBy) {
|
||||
return true;
|
||||
}
|
||||
if (key === "unknown" || key === "global") {
|
||||
return false;
|
||||
}
|
||||
return entry?.spawnedBy === spawnedBy;
|
||||
})
|
||||
.filter(([, entry]) => {
|
||||
if (!label) {
|
||||
return true;
|
||||
}
|
||||
return entry?.label === label;
|
||||
})
|
||||
let sessions = filterSessionListEntries(store, normalizedOpts)
|
||||
.map(([key, entry]) =>
|
||||
buildGatewaySessionRow({
|
||||
cfg,
|
||||
@ -1259,35 +1430,101 @@ export function listSessionsFromStore(params: {
|
||||
store,
|
||||
key,
|
||||
entry,
|
||||
now,
|
||||
includeDerivedTitles,
|
||||
includeLastMessage,
|
||||
now: normalizedOpts.now,
|
||||
includeDerivedTitles: normalizedOpts.includeDerivedTitles,
|
||||
includeLastMessage: normalizedOpts.includeLastMessage,
|
||||
}),
|
||||
)
|
||||
.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
|
||||
|
||||
if (search) {
|
||||
sessions = sessions.filter((s) => {
|
||||
const fields = [s.displayName, s.label, s.subject, s.sessionId, s.key];
|
||||
return fields.some((f) => typeof f === "string" && f.toLowerCase().includes(search));
|
||||
});
|
||||
}
|
||||
|
||||
if (activeMinutes !== undefined) {
|
||||
const cutoff = now - activeMinutes * 60_000;
|
||||
sessions = sessions.filter((s) => (s.updatedAt ?? 0) >= cutoff);
|
||||
}
|
||||
|
||||
if (typeof opts.limit === "number" && Number.isFinite(opts.limit)) {
|
||||
const limit = Math.max(1, Math.floor(opts.limit));
|
||||
sessions = sessions.slice(0, limit);
|
||||
}
|
||||
);
|
||||
sessions = finalizeSessionListRows(sessions, normalizedOpts);
|
||||
|
||||
return {
|
||||
ts: now,
|
||||
ts: normalizedOpts.now,
|
||||
path: storePath,
|
||||
count: sessions.length,
|
||||
defaults: getSessionDefaults(cfg),
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
export async function listSessionsFromStoreAsync(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
opts: SessionsListParams;
|
||||
}): Promise<SessionsListResult> {
|
||||
const concurrency = resolveSessionsListFallbackConcurrency(params.cfg);
|
||||
if (concurrency <= DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY) {
|
||||
return listSessionsFromStore(params);
|
||||
}
|
||||
|
||||
const normalizedOpts = normalizeSessionsListOptions(params.opts);
|
||||
const filteredEntries = filterSessionListEntries(params.store, normalizedOpts);
|
||||
const transcriptUsageByKey = new Map<string, GatewayTranscriptUsageFallback | null>();
|
||||
const tasks = filteredEntries
|
||||
.filter(([key, entry]) => {
|
||||
const resolvedModel = resolveSessionEntryModelIdentity({
|
||||
cfg: params.cfg,
|
||||
key,
|
||||
entry,
|
||||
});
|
||||
return shouldResolveTranscriptUsageFallback({
|
||||
cfg: params.cfg,
|
||||
entry,
|
||||
fallbackProvider: resolvedModel.provider,
|
||||
fallbackModel: resolvedModel.model,
|
||||
});
|
||||
})
|
||||
.map(([key, entry]) => async () => {
|
||||
const resolvedModel = resolveSessionEntryModelIdentity({
|
||||
cfg: params.cfg,
|
||||
key,
|
||||
entry,
|
||||
});
|
||||
const usage = await resolveTranscriptUsageFallbackAsync({
|
||||
cfg: params.cfg,
|
||||
key,
|
||||
entry,
|
||||
storePath: params.storePath,
|
||||
fallbackProvider: resolvedModel.provider,
|
||||
fallbackModel: resolvedModel.model,
|
||||
});
|
||||
return { key, usage };
|
||||
});
|
||||
|
||||
if (tasks.length > 0) {
|
||||
const { results } = await runTasksWithConcurrency({
|
||||
tasks,
|
||||
limit: concurrency,
|
||||
errorMode: "continue",
|
||||
});
|
||||
for (const result of results) {
|
||||
if (result?.key) {
|
||||
transcriptUsageByKey.set(result.key, result.usage ?? null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let sessions = filteredEntries.map(([key, entry]) =>
|
||||
buildGatewaySessionRow({
|
||||
cfg: params.cfg,
|
||||
storePath: params.storePath,
|
||||
store: params.store,
|
||||
key,
|
||||
entry,
|
||||
now: normalizedOpts.now,
|
||||
includeDerivedTitles: normalizedOpts.includeDerivedTitles,
|
||||
includeLastMessage: normalizedOpts.includeLastMessage,
|
||||
transcriptUsage: transcriptUsageByKey.get(key) ?? null,
|
||||
skipTranscriptUsageFallback: transcriptUsageByKey.has(key),
|
||||
}),
|
||||
);
|
||||
sessions = finalizeSessionListRows(sessions, normalizedOpts);
|
||||
|
||||
return {
|
||||
ts: normalizedOpts.now,
|
||||
path: params.storePath,
|
||||
count: sessions.length,
|
||||
defaults: getSessionDefaults(params.cfg),
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user