fix: tighten dashboard session API metadata
This commit is contained in:
parent
545f015f3b
commit
6f9e2b664c
@ -129,12 +129,11 @@ describe("openclaw-tools: subagents (sessions_spawn model + thinking)", () => {
|
||||
expect(patchIndex).toBeGreaterThan(-1);
|
||||
expect(agentIndex).toBeGreaterThan(-1);
|
||||
expect(patchIndex).toBeLessThan(agentIndex);
|
||||
const patchCall = calls.find(
|
||||
(call) => call.method === "sessions.patch" && (call.params as { model?: string })?.model,
|
||||
);
|
||||
expect(patchCall?.params).toMatchObject({
|
||||
const patchCalls = calls.filter((call) => call.method === "sessions.patch");
|
||||
expect(patchCalls[0]?.params).toMatchObject({
|
||||
key: expect.stringContaining("subagent:"),
|
||||
model: "claude-haiku-4-5",
|
||||
spawnDepth: 1,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@ -438,42 +438,29 @@ export async function spawnSubagentDirect(
|
||||
}
|
||||
};
|
||||
|
||||
const spawnDepthPatchError = await patchChildSession({
|
||||
const initialChildSessionPatch: Record<string, unknown> = {
|
||||
spawnDepth: childDepth,
|
||||
subagentRole: childCapabilities.role === "main" ? null : childCapabilities.role,
|
||||
subagentControlScope: childCapabilities.controlScope,
|
||||
});
|
||||
if (spawnDepthPatchError) {
|
||||
};
|
||||
if (resolvedModel) {
|
||||
initialChildSessionPatch.model = resolvedModel;
|
||||
}
|
||||
if (thinkingOverride !== undefined) {
|
||||
initialChildSessionPatch.thinkingLevel = thinkingOverride === "off" ? null : thinkingOverride;
|
||||
}
|
||||
|
||||
const initialPatchError = await patchChildSession(initialChildSessionPatch);
|
||||
if (initialPatchError) {
|
||||
return {
|
||||
status: "error",
|
||||
error: spawnDepthPatchError,
|
||||
error: initialPatchError,
|
||||
childSessionKey,
|
||||
};
|
||||
}
|
||||
|
||||
if (resolvedModel) {
|
||||
const modelPatchError = await patchChildSession({ model: resolvedModel });
|
||||
if (modelPatchError) {
|
||||
return {
|
||||
status: "error",
|
||||
error: modelPatchError,
|
||||
childSessionKey,
|
||||
};
|
||||
}
|
||||
modelApplied = true;
|
||||
}
|
||||
if (thinkingOverride !== undefined) {
|
||||
const thinkingPatchError = await patchChildSession({
|
||||
thinkingLevel: thinkingOverride === "off" ? null : thinkingOverride,
|
||||
});
|
||||
if (thinkingPatchError) {
|
||||
return {
|
||||
status: "error",
|
||||
error: thinkingPatchError,
|
||||
childSessionKey,
|
||||
};
|
||||
}
|
||||
}
|
||||
if (requestThreadBinding) {
|
||||
const bindResult = await ensureThreadBindingForSubagentSpawn({
|
||||
hookRunner,
|
||||
|
||||
@ -63,11 +63,31 @@ export function createDiscordGatewayPlugin(params: {
|
||||
},
|
||||
dispatcher: fetchAgent,
|
||||
} as Record<string, unknown>);
|
||||
this.gatewayInfo = (await response.json()) as APIGatewayBotInfo;
|
||||
const bodyText = await response.text();
|
||||
if (!response.ok) {
|
||||
const preview = bodyText.trim().slice(0, 160) || `<http ${response.status}>`;
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord: failed to fetch gateway metadata through proxy, status=${response.status}, body=${JSON.stringify(preview)}`,
|
||||
),
|
||||
);
|
||||
} else {
|
||||
try {
|
||||
this.gatewayInfo = JSON.parse(bodyText) as APIGatewayBotInfo;
|
||||
} catch (error) {
|
||||
const preview = bodyText.trim().slice(0, 160) || "<empty>";
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord: invalid gateway metadata response through proxy, body=${JSON.stringify(preview)}, error=${error instanceof Error ? error.message : String(error)}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
throw new Error(
|
||||
`Failed to get gateway information from Discord: ${error instanceof Error ? error.message : String(error)}`,
|
||||
{ cause: error },
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord: failed to fetch gateway metadata through proxy: ${error instanceof Error ? error.message : String(error)}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -169,7 +169,9 @@ describe("createDiscordGatewayPlugin", () => {
|
||||
it("uses proxy fetch for gateway metadata lookup before registering", async () => {
|
||||
const runtime = createRuntime();
|
||||
undiciFetchMock.mockResolvedValue({
|
||||
json: async () => ({ url: "wss://gateway.discord.gg" }),
|
||||
ok: true,
|
||||
status: 200,
|
||||
text: async () => JSON.stringify({ url: "wss://gateway.discord.gg" }),
|
||||
} as Response);
|
||||
const plugin = createDiscordGatewayPlugin({
|
||||
discordConfig: { proxy: "http://proxy.test:8080" },
|
||||
@ -193,5 +195,62 @@ describe("createDiscordGatewayPlugin", () => {
|
||||
}),
|
||||
);
|
||||
expect(baseRegisterClientSpy).toHaveBeenCalledTimes(1);
|
||||
expect(runtime.error).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("logs and continues when Discord returns invalid JSON through the proxy", async () => {
|
||||
const runtime = createRuntime();
|
||||
undiciFetchMock.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
text: async () => "upstream c",
|
||||
} as Response);
|
||||
const plugin = createDiscordGatewayPlugin({
|
||||
discordConfig: { proxy: "http://proxy.test:8080" },
|
||||
runtime,
|
||||
});
|
||||
|
||||
await expect(
|
||||
(
|
||||
plugin as unknown as {
|
||||
registerClient: (client: { options: { token: string } }) => Promise<void>;
|
||||
}
|
||||
).registerClient({
|
||||
options: { token: "token-123" },
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(baseRegisterClientSpy).toHaveBeenCalledTimes(1);
|
||||
expect(runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("invalid gateway metadata response through proxy"),
|
||||
);
|
||||
});
|
||||
|
||||
it("logs non-200 proxy responses instead of throwing", async () => {
|
||||
const runtime = createRuntime();
|
||||
undiciFetchMock.mockResolvedValue({
|
||||
ok: false,
|
||||
status: 502,
|
||||
text: async () => "upstream crash",
|
||||
} as Response);
|
||||
const plugin = createDiscordGatewayPlugin({
|
||||
discordConfig: { proxy: "http://proxy.test:8080" },
|
||||
runtime,
|
||||
});
|
||||
|
||||
await expect(
|
||||
(
|
||||
plugin as unknown as {
|
||||
registerClient: (client: { options: { token: string } }) => Promise<void>;
|
||||
}
|
||||
).registerClient({
|
||||
options: { token: "token-123" },
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(baseRegisterClientSpy).toHaveBeenCalledTimes(1);
|
||||
expect(runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("failed to fetch gateway metadata through proxy, status=502"),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@ -5,6 +5,7 @@ import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js";
|
||||
import {
|
||||
createAgentEventHandler,
|
||||
createChatRunState,
|
||||
createSessionEventSubscriberRegistry,
|
||||
createToolEventRecipientRegistry,
|
||||
} from "./server-chat.js";
|
||||
|
||||
@ -47,6 +48,7 @@ describe("agent event handler", () => {
|
||||
const agentRunSeq = new Map<string, number>();
|
||||
const chatRunState = createChatRunState();
|
||||
const toolEventRecipients = createToolEventRecipientRegistry();
|
||||
const sessionEventSubscribers = createSessionEventSubscriberRegistry();
|
||||
|
||||
const handler = createAgentEventHandler({
|
||||
broadcast,
|
||||
@ -57,6 +59,7 @@ describe("agent event handler", () => {
|
||||
resolveSessionKeyForRun: params?.resolveSessionKeyForRun ?? (() => undefined),
|
||||
clearAgentRunContext: vi.fn(),
|
||||
toolEventRecipients,
|
||||
sessionEventSubscribers,
|
||||
});
|
||||
|
||||
return {
|
||||
|
||||
@ -5,7 +5,7 @@ import { loadConfig } from "../config/config.js";
|
||||
import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js";
|
||||
import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js";
|
||||
import { stripInlineDirectiveTagsForDisplay } from "../utils/directive-tags.js";
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
import { loadGatewaySessionRow, loadSessionEntry } from "./session-utils.js";
|
||||
import { formatForLog } from "./ws-log.js";
|
||||
|
||||
function resolveHeartbeatAckMaxChars(): number {
|
||||
@ -459,12 +459,24 @@ export function createAgentEventHandler({
|
||||
toolEventRecipients,
|
||||
sessionEventSubscribers,
|
||||
}: AgentEventHandlerOptions) {
|
||||
const emitSessionEvent = (event: string, payload: unknown) => {
|
||||
const connIds = sessionEventSubscribers.getAll();
|
||||
if (connIds.size === 0) {
|
||||
return;
|
||||
const buildSessionEventSnapshot = (sessionKey: string) => {
|
||||
const row = loadGatewaySessionRow(sessionKey);
|
||||
if (!row) {
|
||||
return {};
|
||||
}
|
||||
broadcastToConnIds(event, payload, connIds, { dropIfSlow: true });
|
||||
return {
|
||||
session: row,
|
||||
totalTokens: row.totalTokens,
|
||||
totalTokensFresh: row.totalTokensFresh,
|
||||
contextTokens: row.contextTokens,
|
||||
estimatedCostUsd: row.estimatedCostUsd,
|
||||
modelProvider: row.modelProvider,
|
||||
model: row.model,
|
||||
status: row.status,
|
||||
startedAt: row.startedAt,
|
||||
endedAt: row.endedAt,
|
||||
runtimeMs: row.runtimeMs,
|
||||
};
|
||||
};
|
||||
|
||||
const emitChatDelta = (
|
||||
@ -778,12 +790,21 @@ export function createAgentEventHandler({
|
||||
sessionKey &&
|
||||
(lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error")
|
||||
) {
|
||||
emitSessionEvent("sessions.changed", {
|
||||
sessionKey,
|
||||
phase: lifecyclePhase,
|
||||
runId: evt.runId,
|
||||
ts: evt.ts,
|
||||
});
|
||||
const sessionEventConnIds = sessionEventSubscribers.getAll();
|
||||
if (sessionEventConnIds.size > 0) {
|
||||
broadcastToConnIds(
|
||||
"sessions.changed",
|
||||
{
|
||||
sessionKey,
|
||||
phase: lifecyclePhase,
|
||||
runId: evt.runId,
|
||||
ts: evt.ts,
|
||||
...buildSessionEventSnapshot(sessionKey),
|
||||
},
|
||||
sessionEventConnIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -118,6 +118,7 @@ import { loadGatewayTlsRuntime } from "./server/tls.js";
|
||||
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
|
||||
import {
|
||||
attachOpenClawTranscriptMeta,
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
readSessionMessages,
|
||||
} from "./session-utils.js";
|
||||
@ -784,6 +785,22 @@ export async function startGatewayServer(
|
||||
const messageSeq = entry?.sessionId
|
||||
? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length
|
||||
: undefined;
|
||||
const sessionRow = loadGatewaySessionRow(sessionKey);
|
||||
const sessionSnapshot = sessionRow
|
||||
? {
|
||||
session: sessionRow,
|
||||
totalTokens: sessionRow.totalTokens,
|
||||
totalTokensFresh: sessionRow.totalTokensFresh,
|
||||
contextTokens: sessionRow.contextTokens,
|
||||
estimatedCostUsd: sessionRow.estimatedCostUsd,
|
||||
modelProvider: sessionRow.modelProvider,
|
||||
model: sessionRow.model,
|
||||
status: sessionRow.status,
|
||||
startedAt: sessionRow.startedAt,
|
||||
endedAt: sessionRow.endedAt,
|
||||
runtimeMs: sessionRow.runtimeMs,
|
||||
}
|
||||
: {};
|
||||
const message = attachOpenClawTranscriptMeta(update.message, {
|
||||
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
|
||||
...(typeof messageSeq === "number" ? { seq: messageSeq } : {}),
|
||||
@ -795,10 +812,28 @@ export async function startGatewayServer(
|
||||
message,
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
...(typeof messageSeq === "number" ? { messageSeq } : {}),
|
||||
...sessionSnapshot,
|
||||
},
|
||||
connIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
|
||||
const sessionEventConnIds = sessionEventSubscribers.getAll();
|
||||
if (sessionEventConnIds.size > 0) {
|
||||
broadcastToConnIds(
|
||||
"sessions.changed",
|
||||
{
|
||||
sessionKey,
|
||||
phase: "message",
|
||||
ts: Date.now(),
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
...(typeof messageSeq === "number" ? { messageSeq } : {}),
|
||||
...sessionSnapshot,
|
||||
},
|
||||
sessionEventConnIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
let heartbeatRunner: HeartbeatRunner = minimalTestGateway
|
||||
|
||||
@ -3,6 +3,7 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, test } from "vitest";
|
||||
import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js";
|
||||
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import { testState } from "./test-helpers.mocks.js";
|
||||
import {
|
||||
connectOk,
|
||||
@ -155,6 +156,114 @@ describe("session.message websocket events", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("includes live usage metadata on session.message and sessions.changed transcript events", async () => {
|
||||
const storePath = await createSessionStoreFile();
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
main: {
|
||||
sessionId: "sess-main",
|
||||
updatedAt: Date.now(),
|
||||
modelProvider: "openai",
|
||||
model: "gpt-5.4",
|
||||
contextTokens: 123_456,
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: false,
|
||||
},
|
||||
},
|
||||
storePath,
|
||||
});
|
||||
const transcriptPath = path.join(path.dirname(storePath), "sess-main.jsonl");
|
||||
const transcriptMessage = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "usage snapshot" }],
|
||||
provider: "openai",
|
||||
model: "gpt-5.4",
|
||||
usage: {
|
||||
input: 2_000,
|
||||
output: 400,
|
||||
cacheRead: 300,
|
||||
cacheWrite: 100,
|
||||
cost: { total: 0.0042 },
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: "sess-main" }),
|
||||
JSON.stringify({ id: "msg-usage", message: transcriptMessage }),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const harness = await createGatewaySuiteHarness();
|
||||
try {
|
||||
const ws = await harness.openWs();
|
||||
try {
|
||||
await connectOk(ws, { scopes: ["operator.read"] });
|
||||
await rpcReq(ws, "sessions.subscribe");
|
||||
|
||||
const messageEventPromise = onceMessage(
|
||||
ws,
|
||||
(message) =>
|
||||
message.type === "event" &&
|
||||
message.event === "session.message" &&
|
||||
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
|
||||
"agent:main:main",
|
||||
);
|
||||
const changedEventPromise = onceMessage(
|
||||
ws,
|
||||
(message) =>
|
||||
message.type === "event" &&
|
||||
message.event === "sessions.changed" &&
|
||||
(message.payload as { phase?: string; sessionKey?: string } | undefined)?.phase ===
|
||||
"message" &&
|
||||
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
|
||||
"agent:main:main",
|
||||
);
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: transcriptPath,
|
||||
sessionKey: "agent:main:main",
|
||||
message: transcriptMessage,
|
||||
messageId: "msg-usage",
|
||||
});
|
||||
|
||||
const [messageEvent, changedEvent] = await Promise.all([
|
||||
messageEventPromise,
|
||||
changedEventPromise,
|
||||
]);
|
||||
expect(messageEvent.payload).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
messageId: "msg-usage",
|
||||
messageSeq: 1,
|
||||
totalTokens: 2_400,
|
||||
totalTokensFresh: true,
|
||||
contextTokens: 123_456,
|
||||
estimatedCostUsd: 0.0042,
|
||||
modelProvider: "openai",
|
||||
model: "gpt-5.4",
|
||||
});
|
||||
expect(changedEvent.payload).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
phase: "message",
|
||||
messageId: "msg-usage",
|
||||
messageSeq: 1,
|
||||
totalTokens: 2_400,
|
||||
totalTokensFresh: true,
|
||||
contextTokens: 123_456,
|
||||
estimatedCostUsd: 0.0042,
|
||||
modelProvider: "openai",
|
||||
model: "gpt-5.4",
|
||||
});
|
||||
} finally {
|
||||
ws.close();
|
||||
}
|
||||
} finally {
|
||||
await harness.close();
|
||||
}
|
||||
});
|
||||
|
||||
test("sessions.messages.subscribe only delivers transcript events for the requested session", async () => {
|
||||
const storePath = await createSessionStoreFile();
|
||||
await writeSessionStore({
|
||||
|
||||
@ -699,6 +699,46 @@ describe("readLatestSessionUsageFromTranscript", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("backfills missing model and cost fields from earlier assistant usage snapshots", () => {
|
||||
const sessionId = "usage-aggregate";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{
|
||||
message: {
|
||||
role: "assistant",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
usage: {
|
||||
input: 1_800,
|
||||
output: 400,
|
||||
cacheRead: 600,
|
||||
cost: { total: 0.0055 },
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
message: {
|
||||
role: "assistant",
|
||||
usage: {
|
||||
input: 2_400,
|
||||
cacheRead: 900,
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
expect(readLatestSessionUsageFromTranscript(sessionId, storePath)).toEqual({
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
inputTokens: 2400,
|
||||
outputTokens: 400,
|
||||
cacheRead: 900,
|
||||
totalTokens: 3300,
|
||||
totalTokensFresh: true,
|
||||
costUsd: 0.0055,
|
||||
});
|
||||
});
|
||||
|
||||
test("returns null when the transcript has no assistant usage snapshot", () => {
|
||||
const sessionId = "usage-empty";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
|
||||
@ -594,10 +594,17 @@ function readTailChunk(fd: number, size: number, maxBytes: number): string | nul
|
||||
return buf.toString("utf-8");
|
||||
}
|
||||
|
||||
function resolvePositiveUsageNumber(value: unknown): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined;
|
||||
}
|
||||
|
||||
function extractLatestUsageFromTranscriptChunk(
|
||||
chunk: string,
|
||||
): SessionTranscriptUsageSnapshot | null {
|
||||
const lines = chunk.split(/\r?\n/).filter((line) => line.trim().length > 0);
|
||||
const snapshot: SessionTranscriptUsageSnapshot = {};
|
||||
let sawSnapshot = false;
|
||||
|
||||
for (let i = lines.length - 1; i >= 0; i -= 1) {
|
||||
const line = lines[i];
|
||||
try {
|
||||
@ -620,7 +627,7 @@ function extractLatestUsageFromTranscriptChunk(
|
||||
? parsed.usage
|
||||
: undefined;
|
||||
const usage = normalizeUsage(usageRaw);
|
||||
const totalTokens = deriveSessionTotalTokens({ usage });
|
||||
const totalTokens = resolvePositiveUsageNumber(deriveSessionTotalTokens({ usage }));
|
||||
const costUsd = extractTranscriptUsageCost(usageRaw);
|
||||
const modelProvider =
|
||||
typeof message.provider === "string"
|
||||
@ -637,8 +644,8 @@ function extractLatestUsageFromTranscriptChunk(
|
||||
const isDeliveryMirror = modelProvider === "openclaw" && model === "delivery-mirror";
|
||||
const hasMeaningfulUsage =
|
||||
hasNonzeroUsage(usage) ||
|
||||
(typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0) ||
|
||||
(typeof costUsd === "number" && Number.isFinite(costUsd) && costUsd > 0);
|
||||
typeof totalTokens === "number" ||
|
||||
(typeof costUsd === "number" && Number.isFinite(costUsd));
|
||||
const hasModelIdentity = Boolean(modelProvider || model);
|
||||
if (!hasMeaningfulUsage && !hasModelIdentity) {
|
||||
continue;
|
||||
@ -646,23 +653,51 @@ function extractLatestUsageFromTranscriptChunk(
|
||||
if (isDeliveryMirror && !hasMeaningfulUsage) {
|
||||
continue;
|
||||
}
|
||||
return {
|
||||
...(modelProvider ? { modelProvider } : {}),
|
||||
...(model ? { model } : {}),
|
||||
...(typeof usage?.input === "number" ? { inputTokens: usage.input } : {}),
|
||||
...(typeof usage?.output === "number" ? { outputTokens: usage.output } : {}),
|
||||
...(typeof usage?.cacheRead === "number" ? { cacheRead: usage.cacheRead } : {}),
|
||||
...(typeof usage?.cacheWrite === "number" ? { cacheWrite: usage.cacheWrite } : {}),
|
||||
...(typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0
|
||||
? { totalTokens, totalTokensFresh: true }
|
||||
: {}),
|
||||
...(typeof costUsd === "number" && Number.isFinite(costUsd) ? { costUsd } : {}),
|
||||
};
|
||||
|
||||
sawSnapshot = true;
|
||||
if (!snapshot.modelProvider && modelProvider) {
|
||||
snapshot.modelProvider = modelProvider;
|
||||
}
|
||||
if (!snapshot.model && model) {
|
||||
snapshot.model = model;
|
||||
}
|
||||
if (snapshot.inputTokens === undefined) {
|
||||
snapshot.inputTokens = resolvePositiveUsageNumber(usage?.input);
|
||||
}
|
||||
if (snapshot.outputTokens === undefined) {
|
||||
snapshot.outputTokens = resolvePositiveUsageNumber(usage?.output);
|
||||
}
|
||||
if (snapshot.cacheRead === undefined) {
|
||||
snapshot.cacheRead = resolvePositiveUsageNumber(usage?.cacheRead);
|
||||
}
|
||||
if (snapshot.cacheWrite === undefined) {
|
||||
snapshot.cacheWrite = resolvePositiveUsageNumber(usage?.cacheWrite);
|
||||
}
|
||||
if (snapshot.totalTokens === undefined && typeof totalTokens === "number") {
|
||||
snapshot.totalTokens = totalTokens;
|
||||
snapshot.totalTokensFresh = true;
|
||||
}
|
||||
if (
|
||||
snapshot.costUsd === undefined &&
|
||||
typeof costUsd === "number" &&
|
||||
Number.isFinite(costUsd)
|
||||
) {
|
||||
snapshot.costUsd = costUsd;
|
||||
}
|
||||
|
||||
if (
|
||||
snapshot.modelProvider &&
|
||||
snapshot.model &&
|
||||
snapshot.totalTokens !== undefined &&
|
||||
snapshot.costUsd !== undefined
|
||||
) {
|
||||
break;
|
||||
}
|
||||
} catch {
|
||||
// skip malformed lines
|
||||
}
|
||||
}
|
||||
return null;
|
||||
return sawSnapshot ? snapshot : null;
|
||||
}
|
||||
|
||||
export function readLatestSessionUsageFromTranscript(
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, test } from "vitest";
|
||||
import { afterEach, beforeEach, describe, expect, test } from "vitest";
|
||||
import {
|
||||
addSubagentRunForTests,
|
||||
resetSubagentRegistryForTests,
|
||||
@ -941,9 +941,96 @@ describe("listSessionsFromStore search", () => {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("uses subagent run model immediately for child sessions while transcript usage fills live totals", () => {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-subagent-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const now = Date.now();
|
||||
const cfg = {
|
||||
session: { mainKey: "main" },
|
||||
agents: {
|
||||
list: [{ id: "main", default: true }],
|
||||
defaults: {
|
||||
models: {
|
||||
"anthropic/claude-sonnet-4-6": { params: { context1m: true } },
|
||||
},
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig;
|
||||
fs.writeFileSync(
|
||||
path.join(tmpDir, "sess-child.jsonl"),
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: "sess-child" }),
|
||||
JSON.stringify({
|
||||
message: {
|
||||
role: "assistant",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
usage: {
|
||||
input: 2_000,
|
||||
output: 500,
|
||||
cacheRead: 1_200,
|
||||
cost: { total: 0.007725 },
|
||||
},
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
addSubagentRunForTests({
|
||||
runId: "run-child-live",
|
||||
childSessionKey: "agent:main:subagent:child-live",
|
||||
controllerSessionKey: "agent:main:main",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "child task",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 5_000,
|
||||
startedAt: now - 4_000,
|
||||
model: "anthropic/claude-sonnet-4-6",
|
||||
});
|
||||
|
||||
try {
|
||||
const result = listSessionsFromStore({
|
||||
cfg,
|
||||
storePath,
|
||||
store: {
|
||||
"agent:main:subagent:child-live": {
|
||||
sessionId: "sess-child",
|
||||
updatedAt: now,
|
||||
spawnedBy: "agent:main:main",
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: false,
|
||||
} as SessionEntry,
|
||||
},
|
||||
opts: {},
|
||||
});
|
||||
|
||||
expect(result.sessions[0]).toMatchObject({
|
||||
key: "agent:main:subagent:child-live",
|
||||
status: "running",
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
totalTokens: 3_200,
|
||||
totalTokensFresh: true,
|
||||
contextTokens: 1_048_576,
|
||||
});
|
||||
expect(result.sessions[0]?.estimatedCostUsd).toBeCloseTo(0.007725, 8);
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("listSessionsFromStore subagent metadata", () => {
|
||||
afterEach(() => {
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
});
|
||||
beforeEach(() => {
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
});
|
||||
|
||||
const cfg = {
|
||||
session: { mainKey: "main" },
|
||||
agents: { list: [{ id: "main", default: true }] },
|
||||
|
||||
@ -297,6 +297,8 @@ function resolveTranscriptUsageFallback(params: {
|
||||
key: string;
|
||||
entry?: SessionEntry;
|
||||
storePath: string;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
}): {
|
||||
estimatedCostUsd?: number;
|
||||
totalTokens?: number;
|
||||
@ -320,15 +322,17 @@ function resolveTranscriptUsageFallback(params: {
|
||||
if (!snapshot) {
|
||||
return null;
|
||||
}
|
||||
const modelProvider = snapshot.modelProvider ?? params.fallbackProvider;
|
||||
const model = snapshot.model ?? params.fallbackModel;
|
||||
const contextTokens = resolveContextTokensForModel({
|
||||
cfg: params.cfg,
|
||||
provider: snapshot.modelProvider,
|
||||
model: snapshot.model,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
});
|
||||
const estimatedCostUsd = resolveEstimatedSessionCostUsd({
|
||||
cfg: params.cfg,
|
||||
provider: snapshot.modelProvider,
|
||||
model: snapshot.model,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
explicitCostUsd: snapshot.costUsd,
|
||||
entry: {
|
||||
inputTokens: snapshot.inputTokens,
|
||||
@ -959,6 +963,7 @@ export function resolveSessionModelIdentityRef(
|
||||
| SessionEntry
|
||||
| Pick<SessionEntry, "model" | "modelProvider" | "modelOverride" | "providerOverride">,
|
||||
agentId?: string,
|
||||
fallbackModelRef?: string,
|
||||
): { provider?: string; model: string } {
|
||||
const runtimeModel = entry?.model?.trim();
|
||||
const runtimeProvider = entry?.modelProvider?.trim();
|
||||
@ -982,10 +987,198 @@ export function resolveSessionModelIdentityRef(
|
||||
}
|
||||
return { model: runtimeModel };
|
||||
}
|
||||
const fallbackRef = fallbackModelRef?.trim();
|
||||
if (fallbackRef) {
|
||||
const parsedFallback = parseModelRef(fallbackRef, DEFAULT_PROVIDER);
|
||||
if (parsedFallback) {
|
||||
return { provider: parsedFallback.provider, model: parsedFallback.model };
|
||||
}
|
||||
const inferredProvider = inferUniqueProviderFromConfiguredModels({
|
||||
cfg,
|
||||
model: fallbackRef,
|
||||
});
|
||||
if (inferredProvider) {
|
||||
return { provider: inferredProvider, model: fallbackRef };
|
||||
}
|
||||
return { model: fallbackRef };
|
||||
}
|
||||
const resolved = resolveSessionModelRef(cfg, entry, agentId);
|
||||
return { provider: resolved.provider, model: resolved.model };
|
||||
}
|
||||
|
||||
export function buildGatewaySessionRow(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
key: string;
|
||||
entry?: SessionEntry;
|
||||
now?: number;
|
||||
includeDerivedTitles?: boolean;
|
||||
includeLastMessage?: boolean;
|
||||
}): GatewaySessionRow {
|
||||
const { cfg, storePath, store, key, entry } = params;
|
||||
const now = params.now ?? Date.now();
|
||||
const updatedAt = entry?.updatedAt ?? null;
|
||||
const parsed = parseGroupKey(key);
|
||||
const channel = entry?.channel ?? parsed?.channel;
|
||||
const subject = entry?.subject;
|
||||
const groupChannel = entry?.groupChannel;
|
||||
const space = entry?.space;
|
||||
const id = parsed?.id;
|
||||
const origin = entry?.origin;
|
||||
const originLabel = origin?.label;
|
||||
const displayName =
|
||||
entry?.displayName ??
|
||||
(channel
|
||||
? buildGroupDisplayName({
|
||||
provider: channel,
|
||||
subject,
|
||||
groupChannel,
|
||||
space,
|
||||
id,
|
||||
key,
|
||||
})
|
||||
: undefined) ??
|
||||
entry?.label ??
|
||||
originLabel;
|
||||
const deliveryFields = normalizeSessionDeliveryFields(entry);
|
||||
const parsedAgent = parseAgentSessionKey(key);
|
||||
const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg));
|
||||
const subagentRun = getSubagentRunByChildSessionKey(key);
|
||||
const resolvedModel = resolveSessionModelIdentityRef(
|
||||
cfg,
|
||||
entry,
|
||||
sessionAgentId,
|
||||
subagentRun?.model,
|
||||
);
|
||||
const modelProvider = resolvedModel.provider;
|
||||
const model = resolvedModel.model ?? DEFAULT_MODEL;
|
||||
const transcriptUsage =
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined ||
|
||||
resolvePositiveNumber(entry?.contextTokens) === undefined ||
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
entry,
|
||||
}) === undefined
|
||||
? resolveTranscriptUsageFallback({
|
||||
cfg,
|
||||
key,
|
||||
entry,
|
||||
storePath,
|
||||
fallbackProvider: modelProvider,
|
||||
fallbackModel: model,
|
||||
})
|
||||
: null;
|
||||
const totalTokens =
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) ??
|
||||
resolvePositiveNumber(transcriptUsage?.totalTokens);
|
||||
const totalTokensFresh =
|
||||
typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0
|
||||
? true
|
||||
: transcriptUsage?.totalTokensFresh === true;
|
||||
const childSessions = resolveChildSessionKeys(key, store);
|
||||
const estimatedCostUsd =
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
entry,
|
||||
}) ?? resolvePositiveNumber(transcriptUsage?.estimatedCostUsd);
|
||||
const contextTokens =
|
||||
resolvePositiveNumber(entry?.contextTokens) ??
|
||||
resolvePositiveNumber(transcriptUsage?.contextTokens) ??
|
||||
resolvePositiveNumber(
|
||||
resolveContextTokensForModel({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
}),
|
||||
);
|
||||
|
||||
let derivedTitle: string | undefined;
|
||||
let lastMessagePreview: string | undefined;
|
||||
if (entry?.sessionId && (params.includeDerivedTitles || params.includeLastMessage)) {
|
||||
const fields = readSessionTitleFieldsFromTranscript(
|
||||
entry.sessionId,
|
||||
storePath,
|
||||
entry.sessionFile,
|
||||
sessionAgentId,
|
||||
);
|
||||
if (params.includeDerivedTitles) {
|
||||
derivedTitle = deriveSessionTitle(entry, fields.firstUserMessage);
|
||||
}
|
||||
if (params.includeLastMessage && fields.lastMessagePreview) {
|
||||
lastMessagePreview = fields.lastMessagePreview;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
key,
|
||||
spawnedBy: entry?.spawnedBy,
|
||||
kind: classifySessionKey(key, entry),
|
||||
label: entry?.label,
|
||||
displayName,
|
||||
derivedTitle,
|
||||
lastMessagePreview,
|
||||
channel,
|
||||
subject,
|
||||
groupChannel,
|
||||
space,
|
||||
chatType: entry?.chatType,
|
||||
origin,
|
||||
updatedAt,
|
||||
sessionId: entry?.sessionId,
|
||||
systemSent: entry?.systemSent,
|
||||
abortedLastRun: entry?.abortedLastRun,
|
||||
thinkingLevel: entry?.thinkingLevel,
|
||||
verboseLevel: entry?.verboseLevel,
|
||||
reasoningLevel: entry?.reasoningLevel,
|
||||
elevatedLevel: entry?.elevatedLevel,
|
||||
sendPolicy: entry?.sendPolicy,
|
||||
inputTokens: entry?.inputTokens,
|
||||
outputTokens: entry?.outputTokens,
|
||||
totalTokens,
|
||||
totalTokensFresh,
|
||||
estimatedCostUsd,
|
||||
status: resolveSessionRunStatus(subagentRun),
|
||||
startedAt: subagentRun?.startedAt,
|
||||
endedAt: subagentRun?.endedAt,
|
||||
runtimeMs: resolveSessionRuntimeMs(subagentRun, now),
|
||||
parentSessionKey: entry?.parentSessionKey,
|
||||
childSessions,
|
||||
responseUsage: entry?.responseUsage,
|
||||
modelProvider,
|
||||
model,
|
||||
contextTokens,
|
||||
deliveryContext: deliveryFields.deliveryContext,
|
||||
lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel,
|
||||
lastTo: deliveryFields.lastTo ?? entry?.lastTo,
|
||||
lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId,
|
||||
};
|
||||
}
|
||||
|
||||
export function loadGatewaySessionRow(
|
||||
sessionKey: string,
|
||||
options?: { includeDerivedTitles?: boolean; includeLastMessage?: boolean; now?: number },
|
||||
): GatewaySessionRow | null {
|
||||
const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey);
|
||||
if (!entry) {
|
||||
return null;
|
||||
}
|
||||
return buildGatewaySessionRow({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
key: canonicalKey,
|
||||
entry,
|
||||
now: options?.now,
|
||||
includeDerivedTitles: options?.includeDerivedTitles,
|
||||
includeLastMessage: options?.includeLastMessage,
|
||||
});
|
||||
}
|
||||
|
||||
export function listSessionsFromStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
@ -1046,117 +1239,18 @@ export function listSessionsFromStore(params: {
|
||||
}
|
||||
return entry?.label === label;
|
||||
})
|
||||
.map(([key, entry]) => {
|
||||
const updatedAt = entry?.updatedAt ?? null;
|
||||
const parsed = parseGroupKey(key);
|
||||
const channel = entry?.channel ?? parsed?.channel;
|
||||
const subject = entry?.subject;
|
||||
const groupChannel = entry?.groupChannel;
|
||||
const space = entry?.space;
|
||||
const id = parsed?.id;
|
||||
const origin = entry?.origin;
|
||||
const originLabel = origin?.label;
|
||||
const displayName =
|
||||
entry?.displayName ??
|
||||
(channel
|
||||
? buildGroupDisplayName({
|
||||
provider: channel,
|
||||
subject,
|
||||
groupChannel,
|
||||
space,
|
||||
id,
|
||||
key,
|
||||
})
|
||||
: undefined) ??
|
||||
entry?.label ??
|
||||
originLabel;
|
||||
const deliveryFields = normalizeSessionDeliveryFields(entry);
|
||||
const parsedAgent = parseAgentSessionKey(key);
|
||||
const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg));
|
||||
const resolvedModel = resolveSessionModelIdentityRef(cfg, entry, sessionAgentId);
|
||||
const modelProvider = resolvedModel.provider;
|
||||
const model = resolvedModel.model ?? DEFAULT_MODEL;
|
||||
const transcriptUsage =
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) === undefined ||
|
||||
resolvePositiveNumber(entry?.contextTokens) === undefined ||
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
entry,
|
||||
}) === undefined
|
||||
? resolveTranscriptUsageFallback({ cfg, key, entry, storePath })
|
||||
: null;
|
||||
const totalTokens =
|
||||
resolvePositiveNumber(resolveFreshSessionTotalTokens(entry)) ??
|
||||
resolvePositiveNumber(transcriptUsage?.totalTokens);
|
||||
const totalTokensFresh =
|
||||
typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0
|
||||
? true
|
||||
: transcriptUsage?.totalTokensFresh === true;
|
||||
const subagentRun = getSubagentRunByChildSessionKey(key);
|
||||
const childSessions = resolveChildSessionKeys(key, store);
|
||||
const estimatedCostUsd =
|
||||
resolveEstimatedSessionCostUsd({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
entry,
|
||||
}) ?? resolvePositiveNumber(transcriptUsage?.estimatedCostUsd);
|
||||
const contextTokens =
|
||||
resolvePositiveNumber(entry?.contextTokens) ??
|
||||
resolvePositiveNumber(transcriptUsage?.contextTokens) ??
|
||||
resolvePositiveNumber(
|
||||
resolveContextTokensForModel({
|
||||
cfg,
|
||||
provider: modelProvider,
|
||||
model,
|
||||
}),
|
||||
);
|
||||
return {
|
||||
.map(([key, entry]) =>
|
||||
buildGatewaySessionRow({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
key,
|
||||
spawnedBy: entry?.spawnedBy,
|
||||
entry,
|
||||
kind: classifySessionKey(key, entry),
|
||||
label: entry?.label,
|
||||
displayName,
|
||||
channel,
|
||||
subject,
|
||||
groupChannel,
|
||||
space,
|
||||
chatType: entry?.chatType,
|
||||
origin,
|
||||
updatedAt,
|
||||
sessionId: entry?.sessionId,
|
||||
systemSent: entry?.systemSent,
|
||||
abortedLastRun: entry?.abortedLastRun,
|
||||
thinkingLevel: entry?.thinkingLevel,
|
||||
fastMode: entry?.fastMode,
|
||||
verboseLevel: entry?.verboseLevel,
|
||||
reasoningLevel: entry?.reasoningLevel,
|
||||
elevatedLevel: entry?.elevatedLevel,
|
||||
sendPolicy: entry?.sendPolicy,
|
||||
inputTokens: entry?.inputTokens,
|
||||
outputTokens: entry?.outputTokens,
|
||||
totalTokens,
|
||||
totalTokensFresh,
|
||||
estimatedCostUsd,
|
||||
status: resolveSessionRunStatus(subagentRun),
|
||||
startedAt: subagentRun?.startedAt,
|
||||
endedAt: subagentRun?.endedAt,
|
||||
runtimeMs: resolveSessionRuntimeMs(subagentRun, now),
|
||||
parentSessionKey: entry?.parentSessionKey,
|
||||
childSessions,
|
||||
responseUsage: entry?.responseUsage,
|
||||
modelProvider,
|
||||
model,
|
||||
contextTokens,
|
||||
deliveryContext: deliveryFields.deliveryContext,
|
||||
lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel,
|
||||
lastTo: deliveryFields.lastTo ?? entry?.lastTo,
|
||||
lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId,
|
||||
};
|
||||
})
|
||||
now,
|
||||
includeDerivedTitles,
|
||||
includeLastMessage,
|
||||
}),
|
||||
)
|
||||
.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
|
||||
|
||||
if (search) {
|
||||
@ -1176,37 +1270,11 @@ export function listSessionsFromStore(params: {
|
||||
sessions = sessions.slice(0, limit);
|
||||
}
|
||||
|
||||
const finalSessions: GatewaySessionRow[] = sessions.map((s) => {
|
||||
const { entry, ...rest } = s;
|
||||
let derivedTitle: string | undefined;
|
||||
let lastMessagePreview: string | undefined;
|
||||
if (entry?.sessionId) {
|
||||
if (includeDerivedTitles || includeLastMessage) {
|
||||
const parsed = parseAgentSessionKey(s.key);
|
||||
const agentId =
|
||||
parsed && parsed.agentId ? normalizeAgentId(parsed.agentId) : resolveDefaultAgentId(cfg);
|
||||
const fields = readSessionTitleFieldsFromTranscript(
|
||||
entry.sessionId,
|
||||
storePath,
|
||||
entry.sessionFile,
|
||||
agentId,
|
||||
);
|
||||
if (includeDerivedTitles) {
|
||||
derivedTitle = deriveSessionTitle(entry, fields.firstUserMessage);
|
||||
}
|
||||
if (includeLastMessage && fields.lastMessagePreview) {
|
||||
lastMessagePreview = fields.lastMessagePreview;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { ...rest, derivedTitle, lastMessagePreview } satisfies GatewaySessionRow;
|
||||
});
|
||||
|
||||
return {
|
||||
ts: now,
|
||||
path: storePath,
|
||||
count: finalSessions.length,
|
||||
count: sessions.length,
|
||||
defaults: getSessionDefaults(cfg),
|
||||
sessions: finalSessions,
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
@ -119,6 +119,70 @@ describe("session history HTTP endpoints", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("supports cursor pagination over direct REST while preserving the messages field", async () => {
|
||||
const { storePath } = await seedSession({ text: "first message" });
|
||||
const second = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey: "agent:main:main",
|
||||
text: "second message",
|
||||
storePath,
|
||||
});
|
||||
expect(second.ok).toBe(true);
|
||||
const third = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey: "agent:main:main",
|
||||
text: "third message",
|
||||
storePath,
|
||||
});
|
||||
expect(third.ok).toBe(true);
|
||||
|
||||
const harness = await createGatewaySuiteHarness();
|
||||
try {
|
||||
const firstPage = await fetch(
|
||||
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=2`,
|
||||
{
|
||||
headers: AUTH_HEADER,
|
||||
},
|
||||
);
|
||||
expect(firstPage.status).toBe(200);
|
||||
const firstBody = (await firstPage.json()) as {
|
||||
sessionKey?: string;
|
||||
items?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>;
|
||||
messages?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>;
|
||||
nextCursor?: string;
|
||||
hasMore?: boolean;
|
||||
};
|
||||
expect(firstBody.sessionKey).toBe("agent:main:main");
|
||||
expect(firstBody.items?.map((message) => message.content?.[0]?.text)).toEqual([
|
||||
"second message",
|
||||
"third message",
|
||||
]);
|
||||
expect(firstBody.messages?.map((message) => message.__openclaw?.seq)).toEqual([2, 3]);
|
||||
expect(firstBody.hasMore).toBe(true);
|
||||
expect(firstBody.nextCursor).toBe("2");
|
||||
|
||||
const secondPage = await fetch(
|
||||
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=2&cursor=${encodeURIComponent(firstBody.nextCursor ?? "")}`,
|
||||
{
|
||||
headers: AUTH_HEADER,
|
||||
},
|
||||
);
|
||||
expect(secondPage.status).toBe(200);
|
||||
const secondBody = (await secondPage.json()) as {
|
||||
items?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>;
|
||||
messages?: Array<{ __openclaw?: { seq?: number } }>;
|
||||
nextCursor?: string;
|
||||
hasMore?: boolean;
|
||||
};
|
||||
expect(secondBody.items?.map((message) => message.content?.[0]?.text)).toEqual([
|
||||
"first message",
|
||||
]);
|
||||
expect(secondBody.messages?.map((message) => message.__openclaw?.seq)).toEqual([1]);
|
||||
expect(secondBody.hasMore).toBe(false);
|
||||
expect(secondBody.nextCursor).toBeUndefined();
|
||||
} finally {
|
||||
await harness.close();
|
||||
}
|
||||
});
|
||||
|
||||
test("streams bounded history windows over SSE", async () => {
|
||||
const { storePath } = await seedSession({ text: "first message" });
|
||||
const second = await appendAssistantMessageToSessionTranscript({
|
||||
|
||||
@ -41,9 +41,12 @@ function shouldStreamSse(req: IncomingMessage): boolean {
|
||||
return accept.includes("text/event-stream");
|
||||
}
|
||||
|
||||
function getRequestUrl(req: IncomingMessage): URL {
|
||||
return new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`);
|
||||
}
|
||||
|
||||
function resolveLimit(req: IncomingMessage): number | undefined {
|
||||
const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`);
|
||||
const raw = url.searchParams.get("limit");
|
||||
const raw = getRequestUrl(req).searchParams.get("limit");
|
||||
if (raw == null || raw.trim() === "") {
|
||||
return undefined;
|
||||
}
|
||||
@ -54,11 +57,59 @@ function resolveLimit(req: IncomingMessage): number | undefined {
|
||||
return Math.min(MAX_SESSION_HISTORY_LIMIT, Math.max(1, value));
|
||||
}
|
||||
|
||||
function maybeLimitMessages(messages: unknown[], limit: number | undefined): unknown[] {
|
||||
if (limit === undefined || limit >= messages.length) {
|
||||
return messages;
|
||||
function resolveCursor(req: IncomingMessage): string | undefined {
|
||||
const raw = getRequestUrl(req).searchParams.get("cursor");
|
||||
const trimmed = raw?.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
type PaginatedSessionHistory = {
|
||||
items: unknown[];
|
||||
messages: unknown[];
|
||||
nextCursor?: string;
|
||||
hasMore: boolean;
|
||||
};
|
||||
|
||||
function resolveCursorSeq(cursor: string | undefined): number | undefined {
|
||||
if (!cursor) {
|
||||
return undefined;
|
||||
}
|
||||
return messages.slice(-limit);
|
||||
const normalized = cursor.startsWith("seq:") ? cursor.slice(4) : cursor;
|
||||
const value = Number.parseInt(normalized, 10);
|
||||
return Number.isFinite(value) && value > 0 ? value : undefined;
|
||||
}
|
||||
|
||||
function resolveMessageSeq(message: unknown): number | undefined {
|
||||
if (!message || typeof message !== "object" || Array.isArray(message)) {
|
||||
return undefined;
|
||||
}
|
||||
const meta = (message as { __openclaw?: unknown }).__openclaw;
|
||||
if (!meta || typeof meta !== "object" || Array.isArray(meta)) {
|
||||
return undefined;
|
||||
}
|
||||
const seq = (meta as { seq?: unknown }).seq;
|
||||
return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined;
|
||||
}
|
||||
|
||||
function paginateSessionMessages(
|
||||
messages: unknown[],
|
||||
limit: number | undefined,
|
||||
cursor: string | undefined,
|
||||
): PaginatedSessionHistory {
|
||||
const cursorSeq = resolveCursorSeq(cursor);
|
||||
const endExclusive =
|
||||
typeof cursorSeq === "number"
|
||||
? Math.max(0, Math.min(messages.length, cursorSeq - 1))
|
||||
: messages.length;
|
||||
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
|
||||
const items = messages.slice(start, endExclusive);
|
||||
const firstSeq = resolveMessageSeq(items[0]);
|
||||
return {
|
||||
items,
|
||||
messages: items,
|
||||
hasMore: start > 0,
|
||||
...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function canonicalizePath(value: string | undefined): string | undefined {
|
||||
@ -121,17 +172,19 @@ export async function handleSessionHistoryHttpRequest(
|
||||
const store = loadSessionStore(target.storePath);
|
||||
const entry = target.storeKeys.map((key) => store[key]).find(Boolean);
|
||||
const limit = resolveLimit(req);
|
||||
const messages = entry?.sessionId
|
||||
? maybeLimitMessages(
|
||||
readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile),
|
||||
limit,
|
||||
)
|
||||
: [];
|
||||
const cursor = resolveCursor(req);
|
||||
const history = paginateSessionMessages(
|
||||
entry?.sessionId
|
||||
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
|
||||
: [],
|
||||
limit,
|
||||
cursor,
|
||||
);
|
||||
|
||||
if (!shouldStreamSse(req)) {
|
||||
sendJson(res, 200, {
|
||||
sessionKey: target.canonicalKey,
|
||||
messages,
|
||||
...history,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
@ -149,12 +202,12 @@ export async function handleSessionHistoryHttpRequest(
|
||||
)
|
||||
: new Set<string>();
|
||||
|
||||
let sentMessages = messages;
|
||||
let sentHistory = history;
|
||||
setSseHeaders(res);
|
||||
res.write("retry: 1000\n\n");
|
||||
sseWrite(res, "history", {
|
||||
sessionKey: target.canonicalKey,
|
||||
messages: sentMessages,
|
||||
...sentHistory,
|
||||
});
|
||||
|
||||
const heartbeat = setInterval(() => {
|
||||
@ -172,35 +225,37 @@ export async function handleSessionHistoryHttpRequest(
|
||||
return;
|
||||
}
|
||||
if (update.message !== undefined) {
|
||||
const messageSeq = sentMessages.length + 1;
|
||||
const previousSeq = resolveMessageSeq(sentHistory.items.at(-1));
|
||||
const nextMessage = attachOpenClawTranscriptMeta(update.message, {
|
||||
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
|
||||
seq: messageSeq,
|
||||
seq:
|
||||
typeof previousSeq === "number"
|
||||
? previousSeq + 1
|
||||
: readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile).length,
|
||||
});
|
||||
if (limit === undefined) {
|
||||
sentMessages = [...sentMessages, nextMessage];
|
||||
if (limit === undefined && cursor === undefined) {
|
||||
sentHistory = {
|
||||
items: [...sentHistory.items, nextMessage],
|
||||
messages: [...sentHistory.items, nextMessage],
|
||||
hasMore: false,
|
||||
};
|
||||
sseWrite(res, "message", {
|
||||
sessionKey: target.canonicalKey,
|
||||
message: nextMessage,
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
messageSeq,
|
||||
messageSeq: resolveMessageSeq(nextMessage),
|
||||
});
|
||||
return;
|
||||
}
|
||||
sentMessages = maybeLimitMessages([...sentMessages, nextMessage], limit);
|
||||
sseWrite(res, "history", {
|
||||
sessionKey: target.canonicalKey,
|
||||
messages: sentMessages,
|
||||
});
|
||||
return;
|
||||
}
|
||||
sentMessages = maybeLimitMessages(
|
||||
sentHistory = paginateSessionMessages(
|
||||
readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile),
|
||||
limit,
|
||||
cursor,
|
||||
);
|
||||
sseWrite(res, "history", {
|
||||
sessionKey: target.canonicalKey,
|
||||
messages: sentMessages,
|
||||
...sentHistory,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user