Merge 8b29ab5edbaba7ed44277630e93f3feeca5670ce into 6b4c24c2e55b5b4013277bd799525086f6a0c40f
This commit is contained in:
commit
a38ea151f9
@ -225,6 +225,33 @@ describe("gateway.tools config", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("gateway.sessionsList.fallbackConcurrency", () => {
|
||||
it("accepts positive integer concurrency", () => {
|
||||
const res = validateConfigObject({
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 4,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects zero or negative values", () => {
|
||||
const res = validateConfigObject({
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 0,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(res.ok).toBe(false);
|
||||
if (!res.ok) {
|
||||
expect(res.issues[0]?.path).toBe("gateway.sessionsList.fallbackConcurrency");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("gateway.channelHealthCheckMinutes", () => {
|
||||
it("accepts zero to disable monitor", () => {
|
||||
const res = validateConfigObject({
|
||||
|
||||
@ -100,6 +100,10 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"Explicit gateway-level tool allowlist when you want a narrow set of tools available at runtime. Use this for locked-down environments where tool scope must be tightly controlled.",
|
||||
"gateway.tools.deny":
|
||||
"Explicit gateway-level tool denylist to block risky tools even if lower-level policies allow them. Use deny rules for emergency response and defense-in-depth hardening.",
|
||||
"gateway.sessionsList":
|
||||
"Gateway sessions.list read-path tuning for transcript-derived metadata fallbacks. Leave defaults alone unless large session inventories or large transcripts are causing sessions.list timeouts.",
|
||||
"gateway.sessionsList.fallbackConcurrency":
|
||||
"Maximum number of transcript usage fallback reads allowed in parallel during gateway sessions.list. Default: 1, which preserves the current serial behavior; raise this only when host I/O and CPU headroom are available.",
|
||||
"gateway.channelHealthCheckMinutes":
|
||||
"Interval in minutes for automatic channel health probing and status updates. Use lower intervals for faster detection, or higher intervals to reduce periodic probe noise.",
|
||||
"gateway.channelStaleEventThresholdMinutes":
|
||||
|
||||
@ -83,6 +83,8 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"gateway.tools": "Gateway Tool Exposure Policy",
|
||||
"gateway.tools.allow": "Gateway Tool Allowlist",
|
||||
"gateway.tools.deny": "Gateway Tool Denylist",
|
||||
"gateway.sessionsList": "Gateway Sessions List",
|
||||
"gateway.sessionsList.fallbackConcurrency": "Gateway Sessions List Fallback Concurrency",
|
||||
"gateway.channelHealthCheckMinutes": "Gateway Channel Health Check Interval (min)",
|
||||
"gateway.channelStaleEventThresholdMinutes": "Gateway Channel Stale Event Threshold (min)",
|
||||
"gateway.channelMaxRestartsPerHour": "Gateway Channel Max Restarts Per Hour",
|
||||
|
||||
@ -390,6 +390,15 @@ export type GatewayToolsConfig = {
|
||||
allow?: string[];
|
||||
};
|
||||
|
||||
export type GatewaySessionsListConfig = {
|
||||
/**
|
||||
* Maximum number of transcript usage fallbacks to hydrate in parallel while
|
||||
* servicing gateway sessions.list.
|
||||
* Default: 1 (preserves the current serial behavior).
|
||||
*/
|
||||
fallbackConcurrency?: number;
|
||||
};
|
||||
|
||||
export type GatewayConfig = {
|
||||
/** Single multiplexed port for Gateway WS + HTTP (default: 18789). */
|
||||
port?: number;
|
||||
@ -432,6 +441,8 @@ export type GatewayConfig = {
|
||||
allowRealIpFallback?: boolean;
|
||||
/** Tool access restrictions for HTTP /tools/invoke endpoint. */
|
||||
tools?: GatewayToolsConfig;
|
||||
/** sessions.list performance settings. */
|
||||
sessionsList?: GatewaySessionsListConfig;
|
||||
/**
|
||||
* Channel health monitor interval in minutes.
|
||||
* Periodically checks channel health and restarts unhealthy channels.
|
||||
|
||||
@ -718,6 +718,12 @@ export const OpenClawSchema = z
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
sessionsList: z
|
||||
.object({
|
||||
fallbackConcurrency: z.number().int().min(1).optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
channelHealthCheckMinutes: z.number().int().min(0).optional(),
|
||||
channelStaleEventThresholdMinutes: z.number().int().min(1).optional(),
|
||||
channelMaxRestartsPerHour: z.number().int().min(1).optional(),
|
||||
|
||||
@ -49,7 +49,7 @@ import {
|
||||
import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js";
|
||||
import {
|
||||
archiveFileOnDisk,
|
||||
listSessionsFromStore,
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
@ -471,14 +471,14 @@ async function handleSessionSend(params: {
|
||||
}
|
||||
}
|
||||
export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
"sessions.list": ({ params, respond }) => {
|
||||
"sessions.list": async ({ params, respond }) => {
|
||||
if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) {
|
||||
return;
|
||||
}
|
||||
const p = params;
|
||||
const cfg = loadConfig();
|
||||
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
|
||||
const result = listSessionsFromStore({
|
||||
const result = await listSessionsFromStoreAsync({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
|
||||
@ -8,6 +8,7 @@ import {
|
||||
readFirstUserMessageFromTranscript,
|
||||
readLastMessagePreviewFromTranscript,
|
||||
readLatestSessionUsageFromTranscript,
|
||||
readLatestSessionUsageFromTranscriptAsync,
|
||||
readSessionMessages,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
@ -699,6 +700,30 @@ describe("readLatestSessionUsageFromTranscript", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("supports async usage fallback reads with matching results", async () => {
|
||||
const sessionId = "usage-session-async";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{
|
||||
message: {
|
||||
role: "assistant",
|
||||
provider: "openai",
|
||||
model: "gpt-5.4",
|
||||
usage: {
|
||||
input: 900,
|
||||
output: 200,
|
||||
cacheRead: 100,
|
||||
cost: { total: 0.0031 },
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(readLatestSessionUsageFromTranscriptAsync(sessionId, storePath)).resolves.toEqual(
|
||||
readLatestSessionUsageFromTranscript(sessionId, storePath),
|
||||
);
|
||||
});
|
||||
|
||||
test("aggregates assistant usage across the full transcript and keeps the latest context snapshot", () => {
|
||||
const sessionId = "usage-aggregate";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
|
||||
@ -730,6 +730,27 @@ export function readLatestSessionUsageFromTranscript(
|
||||
});
|
||||
}
|
||||
|
||||
export async function readLatestSessionUsageFromTranscriptAsync(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
sessionFile?: string,
|
||||
agentId?: string,
|
||||
): Promise<SessionTranscriptUsageSnapshot | null> {
|
||||
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId);
|
||||
if (!filePath) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const chunk = await fs.promises.readFile(filePath, "utf-8");
|
||||
if (!chunk) {
|
||||
return null;
|
||||
}
|
||||
return extractLatestUsageFromTranscriptChunk(chunk);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const PREVIEW_READ_SIZES = [64 * 1024, 256 * 1024, 1024 * 1024];
|
||||
const PREVIEW_MAX_LINES = 200;
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ import {
|
||||
deriveSessionTitle,
|
||||
listAgentsForGateway,
|
||||
listSessionsFromStore,
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadSessionEntry,
|
||||
parseGroupKey,
|
||||
@ -1164,6 +1165,159 @@ describe("listSessionsFromStore search", () => {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("listSessionsFromStoreAsync uses subagent run model for child session transcript fallback", async () => {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-async-subagent-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const now = Date.now();
|
||||
const cfg = {
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 4,
|
||||
},
|
||||
},
|
||||
session: { mainKey: "main" },
|
||||
agents: {
|
||||
list: [{ id: "main", default: true }],
|
||||
defaults: {
|
||||
models: {
|
||||
"anthropic/claude-sonnet-4-6": { params: { context1m: true } },
|
||||
},
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig;
|
||||
fs.writeFileSync(
|
||||
path.join(tmpDir, "sess-child.jsonl"),
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: "sess-child" }),
|
||||
JSON.stringify({
|
||||
message: {
|
||||
role: "assistant",
|
||||
usage: {
|
||||
input: 2_000,
|
||||
output: 500,
|
||||
cacheRead: 1_200,
|
||||
cost: { total: 0.007725 },
|
||||
},
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
addSubagentRunForTests({
|
||||
runId: "run-child-async",
|
||||
childSessionKey: "agent:main:subagent:child-async",
|
||||
controllerSessionKey: "agent:main:main",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "child task",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 5_000,
|
||||
startedAt: now - 4_000,
|
||||
model: "anthropic/claude-sonnet-4-6",
|
||||
});
|
||||
|
||||
try {
|
||||
const result = await listSessionsFromStoreAsync({
|
||||
cfg,
|
||||
storePath,
|
||||
store: {
|
||||
"agent:main:subagent:child-async": {
|
||||
sessionId: "sess-child",
|
||||
updatedAt: now,
|
||||
spawnedBy: "agent:main:main",
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: false,
|
||||
} as SessionEntry,
|
||||
},
|
||||
opts: {},
|
||||
});
|
||||
|
||||
expect(result.sessions[0]).toMatchObject({
|
||||
key: "agent:main:subagent:child-async",
|
||||
status: "running",
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
totalTokens: 3_200,
|
||||
totalTokensFresh: true,
|
||||
contextTokens: 1_048_576,
|
||||
});
|
||||
expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8);
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("listSessionsFromStoreAsync hydrates transcript fallbacks when concurrency is enabled", async () => {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-async-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const cfg = {
|
||||
gateway: {
|
||||
sessionsList: {
|
||||
fallbackConcurrency: 4,
|
||||
},
|
||||
},
|
||||
session: { mainKey: "main" },
|
||||
agents: {
|
||||
list: [{ id: "main", default: true }],
|
||||
defaults: {
|
||||
models: {
|
||||
"anthropic/claude-sonnet-4-6": { params: { context1m: true } },
|
||||
},
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig;
|
||||
fs.writeFileSync(
|
||||
path.join(tmpDir, "sess-main.jsonl"),
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: "sess-main" }),
|
||||
JSON.stringify({
|
||||
message: {
|
||||
role: "assistant",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
usage: {
|
||||
input: 2_000,
|
||||
output: 500,
|
||||
cacheRead: 1_200,
|
||||
cost: { total: 0.007725 },
|
||||
},
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
try {
|
||||
const result = await listSessionsFromStoreAsync({
|
||||
cfg,
|
||||
storePath,
|
||||
store: {
|
||||
"agent:main:main": {
|
||||
sessionId: "sess-main",
|
||||
updatedAt: Date.now(),
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: false,
|
||||
inputTokens: 0,
|
||||
outputTokens: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
} as SessionEntry,
|
||||
},
|
||||
opts: {},
|
||||
});
|
||||
|
||||
expect(result.sessions[0]?.totalTokens).toBe(3_200);
|
||||
expect(result.sessions[0]?.totalTokensFresh).toBe(true);
|
||||
expect(result.sessions[0]?.contextTokens).toBe(1_048_576);
|
||||
expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8);
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("listSessionsFromStore subagent metadata", () => {
|
||||
|
||||
@ -47,10 +47,14 @@ import {
|
||||
resolveAvatarMime,
|
||||
} from "../shared/avatar-policy.js";
|
||||
import { normalizeSessionDeliveryFields } from "../utils/delivery-context.js";
|
||||
import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js";
|
||||
import { estimateUsageCost, resolveModelCostConfig } from "../utils/usage-format.js";
|
||||
import type { SessionsListParams } from "./protocol/index.js";
|
||||
import {
|
||||
readLatestSessionUsageFromTranscript,
|
||||
readLatestSessionUsageFromTranscriptAsync,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
type SessionTranscriptUsageSnapshot,
|
||||
} from "./session-utils.fs.js";
|
||||
import type {
|
||||
GatewayAgentRow,
|
||||
@ -83,6 +87,29 @@ export type {
|
||||
} from "./session-utils.types.js";
|
||||
|
||||
const DERIVED_TITLE_MAX_LEN = 60;
|
||||
const DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY = 1;
|
||||
const MAX_SESSIONS_LIST_FALLBACK_CONCURRENCY = 16;
|
||||
|
||||
type GatewayTranscriptUsageFallback = {
|
||||
estimatedCostUsd?: number;
|
||||
totalTokens?: number;
|
||||
totalTokensFresh?: boolean;
|
||||
contextTokens?: number;
|
||||
};
|
||||
|
||||
type SessionsListNormalizedOptions = {
|
||||
now: number;
|
||||
includeGlobal: boolean;
|
||||
includeUnknown: boolean;
|
||||
includeDerivedTitles: boolean;
|
||||
includeLastMessage: boolean;
|
||||
spawnedBy: string;
|
||||
label: string;
|
||||
agentId: string;
|
||||
search: string;
|
||||
activeMinutes?: number;
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
function tryResolveExistingPath(value: string): string | null {
|
||||
try {
|
||||
@ -253,6 +280,119 @@ function resolveEstimatedSessionCostUsd(params: {
|
||||
return resolveNonNegativeNumber(estimated);
|
||||
}
|
||||
|
||||
function resolveSessionsListFallbackConcurrency(cfg: OpenClawConfig): number {
|
||||
const raw = cfg.gateway?.sessionsList?.fallbackConcurrency;
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
return DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY;
|
||||
}
|
||||
return Math.max(
|
||||
DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY,
|
||||
Math.min(MAX_SESSIONS_LIST_FALLBACK_CONCURRENCY, Math.floor(raw)),
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeSessionsListOptions(
|
||||
opts: SessionsListParams,
|
||||
now = Date.now(),
|
||||
): SessionsListNormalizedOptions {
|
||||
return {
|
||||
now,
|
||||
includeGlobal: opts.includeGlobal === true,
|
||||
includeUnknown: opts.includeUnknown === true,
|
||||
includeDerivedTitles: opts.includeDerivedTitles === true,
|
||||
includeLastMessage: opts.includeLastMessage === true,
|
||||
spawnedBy: typeof opts.spawnedBy === "string" ? opts.spawnedBy : "",
|
||||
label: typeof opts.label === "string" ? opts.label.trim() : "",
|
||||
agentId: typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : "",
|
||||
search: typeof opts.search === "string" ? opts.search.trim().toLowerCase() : "",
|
||||
activeMinutes:
|
||||
typeof opts.activeMinutes === "number" && Number.isFinite(opts.activeMinutes)
|
||||
? Math.max(1, Math.floor(opts.activeMinutes))
|
||||
: undefined,
|
||||
limit:
|
||||
typeof opts.limit === "number" && Number.isFinite(opts.limit)
|
||||
? Math.max(1, Math.floor(opts.limit))
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function filterSessionListEntries(
|
||||
store: Record<string, SessionEntry>,
|
||||
opts: SessionsListNormalizedOptions,
|
||||
): Array<[string, SessionEntry]> {
|
||||
return Object.entries(store)
|
||||
.filter(([key]) => {
|
||||
if (isCronRunSessionKey(key)) {
|
||||
return false;
|
||||
}
|
||||
if (!opts.includeGlobal && key === "global") {
|
||||
return false;
|
||||
}
|
||||
if (!opts.includeUnknown && key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
if (opts.agentId) {
|
||||
if (key === "global" || key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(key);
|
||||
if (!parsed) {
|
||||
return false;
|
||||
}
|
||||
return normalizeAgentId(parsed.agentId) === opts.agentId;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.filter(([key, entry]) => {
|
||||
if (!opts.spawnedBy) {
|
||||
return true;
|
||||
}
|
||||
if (key === "unknown" || key === "global") {
|
||||
return false;
|
||||
}
|
||||
return entry?.spawnedBy === opts.spawnedBy;
|
||||
})
|
||||
.filter(([, entry]) => {
|
||||
if (!opts.label) {
|
||||
return true;
|
||||
}
|
||||
return entry?.label === opts.label;
|
||||
});
|
||||
}
|
||||
|
||||
function finalizeSessionListRows(
|
||||
sessions: GatewaySessionRow[],
|
||||
opts: SessionsListNormalizedOptions,
|
||||
): GatewaySessionRow[] {
|
||||
let filtered = sessions.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
|
||||
|
||||
if (opts.search) {
|
||||
filtered = filtered.filter((session) => {
|
||||
const fields = [
|
||||
session.displayName,
|
||||
session.label,
|
||||
session.subject,
|
||||
session.sessionId,
|
||||
session.key,
|
||||
];
|
||||
return fields.some(
|
||||
(field) => typeof field === "string" && field.toLowerCase().includes(opts.search),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
if (opts.activeMinutes !== undefined) {
|
||||
const cutoff = opts.now - opts.activeMinutes * 60_000;
|
||||
filtered = filtered.filter((session) => (session.updatedAt ?? 0) >= cutoff);
|
||||
}
|
||||
|
||||
if (opts.limit !== undefined) {
|
||||
filtered = filtered.slice(0, opts.limit);
|
||||
}
|
||||
|
||||
return filtered;
|
||||
}
|
||||
|
||||
function resolveChildSessionKeys(
|
||||
controllerSessionKey: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
@ -276,6 +416,78 @@ function resolveChildSessionKeys(
|
||||
return childSessions.length > 0 ? childSessions : undefined;
|
||||
}
|
||||
|
||||
function resolveSessionEntryModelIdentity(params: {
|
||||
cfg: OpenClawConfig;
|
||||
key: string;
|
||||
entry?: SessionEntry;
|
||||
}): { provider?: string; model: string } {
|
||||
const sessionAgentId = normalizeAgentId(
|
||||
parseAgentSessionKey(params.key)?.agentId ?? resolveDefaultAgentId(params.cfg),
|
||||
);
|
||||
const subagentRun = getSubagentRunByChildSessionKey(params.key);
|
||||
const resolved = resolveSessionModelIdentityRef(
|
||||
params.cfg,
|
||||
params.entry,
|
||||
sessionAgentId,
|
||||
subagentRun?.model,
|
||||
);
|
||||
return {
|
||||
provider: resolved.provider,
|
||||
model: resolved.model ?? DEFAULT_MODEL,
|
||||
};
|
||||
}
|
||||
|
||||
function shouldResolveTranscriptUsageFallback(params: {
|
||||
cfg: OpenClawConfig;
|
||||
entry?: SessionEntry;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): boolean {
|
||||
return (
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(params.entry)) === undefined ||
|
||||
resolvePositiveNumber(params.entry?.contextTokens) === undefined ||
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg: params.cfg,
|
||||
provider: params.fallbackProvider,
|
||||
model: params.fallbackModel,
|
||||
entry: params.entry,
|
||||
}) === undefined
|
||||
);
|
||||
}
|
||||
|
||||
function buildTranscriptUsageFallbackFromSnapshot(params: {
|
||||
cfg: OpenClawConfig;
|
||||
snapshot: SessionTranscriptUsageSnapshot;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): GatewayTranscriptUsageFallback {
|
||||
const modelProvider = params.snapshot.modelProvider ?? params.fallbackProvider;
|
||||
const model = params.snapshot.model ?? params.fallbackModel;
|
||||
const contextTokens = resolveContextTokensForModel({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
});
|
||||
const estimatedCostUsd = resolveEstimatedSessionCostUsd({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
explicitCostUsd: params.snapshot.costUsd,
|
||||
entry: {
|
||||
inputTokens: params.snapshot.inputTokens,
|
||||
outputTokens: params.snapshot.outputTokens,
|
||||
cacheRead: params.snapshot.cacheRead,
|
||||
cacheWrite: params.snapshot.cacheWrite,
|
||||
},
|
||||
});
|
||||
return {
|
||||
totalTokens: resolvePositiveNumber(params.snapshot.totalTokens),
|
||||
totalTokensFresh: params.snapshot.totalTokensFresh === true,
|
||||
contextTokens: resolvePositiveNumber(contextTokens),
|
||||
estimatedCostUsd,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveTranscriptUsageFallback(params: {
|
||||
cfg: OpenClawConfig;
|
||||
key: string;
|
||||
@ -306,31 +518,45 @@ function resolveTranscriptUsageFallback(params: {
|
||||
if (!snapshot) {
|
||||
return null;
|
||||
}
|
||||
const modelProvider = snapshot.modelProvider ?? params.fallbackProvider;
|
||||
const model = snapshot.model ?? params.fallbackModel;
|
||||
const contextTokens = resolveContextTokensForModel({
|
||||
return buildTranscriptUsageFallbackFromSnapshot({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
snapshot,
|
||||
fallbackProvider: params.fallbackProvider,
|
||||
fallbackModel: params.fallbackModel,
|
||||
});
|
||||
const estimatedCostUsd = resolveEstimatedSessionCostUsd({
|
||||
}
|
||||
|
||||
async function resolveTranscriptUsageFallbackAsync(params: {
|
||||
cfg: OpenClawConfig;
|
||||
key: string;
|
||||
entry?: SessionEntry;
|
||||
storePath: string;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): Promise<GatewayTranscriptUsageFallback | null> {
|
||||
const entry = params.entry;
|
||||
if (!entry?.sessionId) {
|
||||
return null;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(params.key);
|
||||
const agentId = parsed?.agentId
|
||||
? normalizeAgentId(parsed.agentId)
|
||||
: resolveDefaultAgentId(params.cfg);
|
||||
const snapshot = await readLatestSessionUsageFromTranscriptAsync(
|
||||
entry.sessionId,
|
||||
params.storePath,
|
||||
entry.sessionFile,
|
||||
agentId,
|
||||
);
|
||||
if (!snapshot) {
|
||||
return null;
|
||||
}
|
||||
return buildTranscriptUsageFallbackFromSnapshot({
|
||||
cfg: params.cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
explicitCostUsd: snapshot.costUsd,
|
||||
entry: {
|
||||
inputTokens: snapshot.inputTokens,
|
||||
outputTokens: snapshot.outputTokens,
|
||||
cacheRead: snapshot.cacheRead,
|
||||
cacheWrite: snapshot.cacheWrite,
|
||||
},
|
||||
snapshot,
|
||||
fallbackProvider: params.fallbackProvider,
|
||||
fallbackModel: params.fallbackModel,
|
||||
});
|
||||
return {
|
||||
totalTokens: resolvePositiveNumber(snapshot.totalTokens),
|
||||
totalTokensFresh: snapshot.totalTokensFresh === true,
|
||||
contextTokens: resolvePositiveNumber(contextTokens),
|
||||
estimatedCostUsd,
|
||||
};
|
||||
}
|
||||
|
||||
export function loadSessionEntry(sessionKey: string) {
|
||||
@ -1024,6 +1250,8 @@ export function buildGatewaySessionRow(params: {
|
||||
now?: number;
|
||||
includeDerivedTitles?: boolean;
|
||||
includeLastMessage?: boolean;
|
||||
transcriptUsage?: GatewayTranscriptUsageFallback | null;
|
||||
skipTranscriptUsageFallback?: boolean;
|
||||
}): GatewaySessionRow {
|
||||
const { cfg, storePath, store, key, entry } = params;
|
||||
const now = params.now ?? Date.now();
|
||||
@ -1066,15 +1294,14 @@ export function buildGatewaySessionRow(params: {
|
||||
);
|
||||
const modelProvider = resolvedModel.provider;
|
||||
const model = resolvedModel.model ?? DEFAULT_MODEL;
|
||||
const transcriptUsage =
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined ||
|
||||
resolvePositiveNumber(entry?.contextTokens) === undefined ||
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
entry,
|
||||
}) === undefined
|
||||
const transcriptUsage = params.skipTranscriptUsageFallback
|
||||
? (params.transcriptUsage ?? null)
|
||||
: shouldResolveTranscriptUsageFallback({
|
||||
cfg,
|
||||
entry,
|
||||
fallbackProvider: modelProvider,
|
||||
fallbackModel: model,
|
||||
})
|
||||
? resolveTranscriptUsageFallback({
|
||||
cfg,
|
||||
key,
|
||||
@ -1196,98 +1423,112 @@ export function listSessionsFromStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
opts: SessionsListParams;
|
||||
}): SessionsListResult {
|
||||
const { cfg, storePath, store, opts } = params;
|
||||
const now = Date.now();
|
||||
const normalizedOpts = normalizeSessionsListOptions(opts);
|
||||
|
||||
const includeGlobal = opts.includeGlobal === true;
|
||||
const includeUnknown = opts.includeUnknown === true;
|
||||
const includeDerivedTitles = opts.includeDerivedTitles === true;
|
||||
const includeLastMessage = opts.includeLastMessage === true;
|
||||
const spawnedBy = typeof opts.spawnedBy === "string" ? opts.spawnedBy : "";
|
||||
const label = typeof opts.label === "string" ? opts.label.trim() : "";
|
||||
const agentId = typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : "";
|
||||
const search = typeof opts.search === "string" ? opts.search.trim().toLowerCase() : "";
|
||||
const activeMinutes =
|
||||
typeof opts.activeMinutes === "number" && Number.isFinite(opts.activeMinutes)
|
||||
? Math.max(1, Math.floor(opts.activeMinutes))
|
||||
: undefined;
|
||||
|
||||
let sessions = Object.entries(store)
|
||||
.filter(([key]) => {
|
||||
if (isCronRunSessionKey(key)) {
|
||||
return false;
|
||||
}
|
||||
if (!includeGlobal && key === "global") {
|
||||
return false;
|
||||
}
|
||||
if (!includeUnknown && key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
if (agentId) {
|
||||
if (key === "global" || key === "unknown") {
|
||||
return false;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(key);
|
||||
if (!parsed) {
|
||||
return false;
|
||||
}
|
||||
return normalizeAgentId(parsed.agentId) === agentId;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.filter(([key, entry]) => {
|
||||
if (!spawnedBy) {
|
||||
return true;
|
||||
}
|
||||
if (key === "unknown" || key === "global") {
|
||||
return false;
|
||||
}
|
||||
return entry?.spawnedBy === spawnedBy;
|
||||
})
|
||||
.filter(([, entry]) => {
|
||||
if (!label) {
|
||||
return true;
|
||||
}
|
||||
return entry?.label === label;
|
||||
})
|
||||
.map(([key, entry]) =>
|
||||
buildGatewaySessionRow({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
key,
|
||||
entry,
|
||||
now,
|
||||
includeDerivedTitles,
|
||||
includeLastMessage,
|
||||
}),
|
||||
)
|
||||
.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
|
||||
|
||||
if (search) {
|
||||
sessions = sessions.filter((s) => {
|
||||
const fields = [s.displayName, s.label, s.subject, s.sessionId, s.key];
|
||||
return fields.some((f) => typeof f === "string" && f.toLowerCase().includes(search));
|
||||
});
|
||||
}
|
||||
|
||||
if (activeMinutes !== undefined) {
|
||||
const cutoff = now - activeMinutes * 60_000;
|
||||
sessions = sessions.filter((s) => (s.updatedAt ?? 0) >= cutoff);
|
||||
}
|
||||
|
||||
if (typeof opts.limit === "number" && Number.isFinite(opts.limit)) {
|
||||
const limit = Math.max(1, Math.floor(opts.limit));
|
||||
sessions = sessions.slice(0, limit);
|
||||
}
|
||||
let sessions = filterSessionListEntries(store, normalizedOpts).map(([key, entry]) =>
|
||||
buildGatewaySessionRow({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
key,
|
||||
entry,
|
||||
now: normalizedOpts.now,
|
||||
includeDerivedTitles: normalizedOpts.includeDerivedTitles,
|
||||
includeLastMessage: normalizedOpts.includeLastMessage,
|
||||
}),
|
||||
);
|
||||
sessions = finalizeSessionListRows(sessions, normalizedOpts);
|
||||
|
||||
return {
|
||||
ts: now,
|
||||
ts: normalizedOpts.now,
|
||||
path: storePath,
|
||||
count: sessions.length,
|
||||
defaults: getSessionDefaults(cfg),
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
export async function listSessionsFromStoreAsync(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
opts: SessionsListParams;
|
||||
}): Promise<SessionsListResult> {
|
||||
const concurrency = resolveSessionsListFallbackConcurrency(params.cfg);
|
||||
if (concurrency <= DEFAULT_SESSIONS_LIST_FALLBACK_CONCURRENCY) {
|
||||
return listSessionsFromStore(params);
|
||||
}
|
||||
|
||||
const normalizedOpts = normalizeSessionsListOptions(params.opts);
|
||||
const filteredEntries = filterSessionListEntries(params.store, normalizedOpts);
|
||||
const transcriptUsageByKey = new Map<string, GatewayTranscriptUsageFallback | null>();
|
||||
const entriesNeedingFallback = filteredEntries.flatMap(([key, entry]) => {
|
||||
const resolvedModel = resolveSessionEntryModelIdentity({
|
||||
cfg: params.cfg,
|
||||
key,
|
||||
entry,
|
||||
});
|
||||
if (
|
||||
!shouldResolveTranscriptUsageFallback({
|
||||
cfg: params.cfg,
|
||||
entry,
|
||||
fallbackProvider: resolvedModel.provider,
|
||||
fallbackModel: resolvedModel.model,
|
||||
})
|
||||
) {
|
||||
return [];
|
||||
}
|
||||
return [{ key, entry, resolvedModel }];
|
||||
});
|
||||
const tasks = entriesNeedingFallback.map(({ key, entry, resolvedModel }) => async () => {
|
||||
const usage = await resolveTranscriptUsageFallbackAsync({
|
||||
cfg: params.cfg,
|
||||
key,
|
||||
entry,
|
||||
storePath: params.storePath,
|
||||
fallbackProvider: resolvedModel.provider,
|
||||
fallbackModel: resolvedModel.model,
|
||||
});
|
||||
return { key, usage };
|
||||
});
|
||||
|
||||
if (tasks.length > 0) {
|
||||
const { results } = await runTasksWithConcurrency({
|
||||
tasks,
|
||||
limit: concurrency,
|
||||
errorMode: "continue",
|
||||
});
|
||||
for (const result of results) {
|
||||
if (result?.key) {
|
||||
transcriptUsageByKey.set(result.key, result.usage ?? null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let sessions = filteredEntries.map(([key, entry]) =>
|
||||
buildGatewaySessionRow({
|
||||
cfg: params.cfg,
|
||||
storePath: params.storePath,
|
||||
store: params.store,
|
||||
key,
|
||||
entry,
|
||||
now: normalizedOpts.now,
|
||||
includeDerivedTitles: normalizedOpts.includeDerivedTitles,
|
||||
includeLastMessage: normalizedOpts.includeLastMessage,
|
||||
transcriptUsage: transcriptUsageByKey.get(key) ?? null,
|
||||
skipTranscriptUsageFallback: transcriptUsageByKey.has(key),
|
||||
}),
|
||||
);
|
||||
sessions = finalizeSessionListRows(sessions, normalizedOpts);
|
||||
|
||||
return {
|
||||
ts: normalizedOpts.now,
|
||||
path: params.storePath,
|
||||
count: sessions.length,
|
||||
defaults: getSessionDefaults(params.cfg),
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user