feat: extract real token usage from messages and auto-restart gateway on web runtime updates

Add extractUsageFromMessages and normalizeOutputForPostHog to pull actual
token counts and cost from OpenClaw per-message usage metadata, and convert
Anthropic-style tool call blocks to OpenAI format for PostHog AI observability.

Integrate gateway daemon restart (stop → install → start) into both the
update and start web runtime commands so the gateway stays in sync with
runtime upgrades.
This commit is contained in:
kumarabhirup 2026-03-05 13:46:51 -08:00
parent 460bf6bf6e
commit 8755c509f8
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
3 changed files with 280 additions and 11 deletions

View File

@ -13,6 +13,34 @@ function getAnonymousId(): string {
}
}
/**
* Extract actual token counts and cost from OpenClaw's per-message usage metadata.
* Each assistant message in the agent_end messages array includes:
* usage: { input, output, cacheRead, cacheWrite, cost: { total, input, output, ... } }
*/
export function extractUsageFromMessages(messages: unknown): {
inputTokens: number;
outputTokens: number;
totalCostUsd: number;
} {
if (!Array.isArray(messages)) return { inputTokens: 0, outputTokens: 0, totalCostUsd: 0 };
let inputTokens = 0;
let outputTokens = 0;
let totalCostUsd = 0;
for (const msg of messages) {
if (!msg || typeof msg !== "object") continue;
const m = msg as Record<string, unknown>;
if (m.role !== "assistant") continue;
const usage = m.usage as Record<string, unknown> | undefined;
if (!usage) continue;
if (typeof usage.input === "number") inputTokens += usage.input;
if (typeof usage.output === "number") outputTokens += usage.output;
const cost = usage.cost as Record<string, unknown> | undefined;
if (cost && typeof cost.total === "number") totalCostUsd += cost.total;
}
return { inputTokens, outputTokens, totalCostUsd };
}
/**
* Extract tool call names from the messages array provided by agent_end.
* Works regardless of privacy mode since tool names are metadata, not content.
@ -50,6 +78,63 @@ export function extractToolNamesFromMessages(messages: unknown): string[] {
return [...new Set(names)];
}
/**
* Normalize OpenClaw's message format into OpenAI-compatible output choices
* so PostHog can extract tool calls for the Tools tab.
* Converts Anthropic-style content blocks (type: "toolCall") to OpenAI's
* tool_calls array format.
*/
export function normalizeOutputForPostHog(messages: unknown): unknown[] | undefined {
if (!Array.isArray(messages)) return undefined;
const choices: unknown[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") continue;
const m = msg as Record<string, unknown>;
if (m.role !== "assistant") continue;
const toolCalls: unknown[] = [];
let textContent = "";
if (Array.isArray(m.content)) {
for (const block of m.content as Array<Record<string, unknown>>) {
if (block.type === "text" && typeof block.text === "string") {
textContent += block.text;
}
if (block.type === "toolCall" && typeof block.name === "string") {
toolCalls.push({
id: block.id ?? block.toolCallId,
type: "function",
function: {
name: block.name,
arguments: typeof block.arguments === "string"
? block.arguments
: JSON.stringify(block.arguments ?? {}),
},
});
}
}
} else if (typeof m.content === "string") {
textContent = m.content;
}
if (Array.isArray(m.tool_calls)) {
for (const tc of m.tool_calls as Array<Record<string, unknown>>) {
toolCalls.push(tc);
}
}
const choice: Record<string, unknown> = {
role: "assistant",
content: textContent || null,
};
if (toolCalls.length > 0) {
choice.tool_calls = toolCalls;
}
choices.push(choice);
}
return choices.length > 0 ? choices : undefined;
}
/**
* Emit a `$ai_generation` event from the agent_end hook data.
*/
@ -93,18 +178,20 @@ export function emitGeneration(
const outputTokens = event.usage.outputTokens ?? event.usage.output_tokens;
if (inputTokens != null && inputTokens > 0) properties.$ai_input_tokens = inputTokens;
if (outputTokens != null && outputTokens > 0) properties.$ai_output_tokens = outputTokens;
}
if (event.cost) {
const cost = event.cost.totalUsd ?? event.cost.total_usd;
if (cost != null && cost > 0) {
properties.$ai_total_cost_usd = cost;
}
const cost = event.cost?.totalUsd ?? event.cost?.total_usd;
if (cost != null && cost > 0) properties.$ai_total_cost_usd = cost;
} else if (event.messages) {
const extracted = extractUsageFromMessages(event.messages);
if (extracted.inputTokens > 0) properties.$ai_input_tokens = extracted.inputTokens;
if (extracted.outputTokens > 0) properties.$ai_output_tokens = extracted.outputTokens;
if (extracted.totalCostUsd > 0) properties.$ai_total_cost_usd = extracted.totalCostUsd;
}
properties.$ai_input = sanitizeForCapture(trace.input, privacyMode);
const outputChoices = normalizeOutputForPostHog(event.messages);
properties.$ai_output_choices = sanitizeForCapture(
event.output ?? event.messages,
outputChoices ?? event.output ?? event.messages,
privacyMode,
);

View File

@ -64,6 +64,8 @@ export type UpdateWebRuntimeSummary = {
skippedForeignPids: number[];
ready: boolean;
reason: string;
gatewayRestarted: boolean;
gatewayError?: string;
};
export type StopWebRuntimeSummary = {
@ -80,6 +82,8 @@ export type StartWebRuntimeSummary = {
skippedForeignPids: number[];
started: boolean;
reason: string;
gatewayRestarted: boolean;
gatewayError?: string;
};
function parseOptionalPort(value: string | number | undefined): number | undefined {
@ -254,6 +258,59 @@ function readConfigGatewayPort(configPath: string): number | undefined {
}
}
async function restartGatewayDaemon(params: {
profile: string;
gatewayPort: number;
json: boolean;
}): Promise<{ restarted: boolean; error?: string }> {
let openclawCommand: string;
try {
openclawCommand = resolveOpenClawCommandOrThrow();
} catch {
return { restarted: false, error: "openclaw CLI not found on PATH" };
}
const s = !params.json ? spinner() : null;
s?.start("Stopping gateway daemon…");
await runOpenClawCommand({
openclawCommand,
args: ["--profile", params.profile, "gateway", "stop"],
timeoutMs: 90_000,
}).catch(() => ({ code: 1, stdout: "", stderr: "stop timed out" }));
s?.message("Installing gateway daemon…");
await runOpenClawCommand({
openclawCommand,
args: [
"--profile", params.profile,
"gateway", "install", "--force",
"--port", String(params.gatewayPort),
],
timeoutMs: 2 * 60_000,
}).catch(() => ({ code: 1, stdout: "", stderr: "install failed" }));
s?.message("Starting gateway daemon…");
const startResult = await runOpenClawCommand({
openclawCommand,
args: [
"--profile", params.profile,
"gateway", "start",
"--port", String(params.gatewayPort),
],
timeoutMs: 2 * 60_000,
}).catch(() => ({ code: 1, stdout: "", stderr: "start failed" }));
if (startResult.code !== 0) {
const detail = firstNonEmptyLine(startResult.stderr, startResult.stdout);
s?.stop(detail ? `Gateway restart failed: ${detail}` : "Gateway restart failed.");
return { restarted: false, error: detail ?? "gateway start failed" };
}
s?.stop("Gateway daemon restarted.");
return { restarted: true };
}
export async function updateWebRuntimeCommand(
opts: UpdateWebRuntimeOptions,
runtime: RuntimeEnv = defaultRuntime,
@ -298,6 +355,13 @@ export async function updateWebRuntimeCommand(
port: selectedPort,
includeLegacyStandalone: true,
});
const gatewayResult = await restartGatewayDaemon({
profile,
gatewayPort,
json: Boolean(opts.json),
});
const ensureResult = await ensureManagedWebRuntime({
stateDir,
packageRoot,
@ -319,6 +383,8 @@ export async function updateWebRuntimeCommand(
skippedForeignPids: stopResult.skippedForeignPids,
ready: ensureResult.ready,
reason: ensureResult.reason,
gatewayRestarted: gatewayResult.restarted,
gatewayError: gatewayResult.error,
};
if (!opts.json) {
@ -327,6 +393,10 @@ export async function updateWebRuntimeCommand(
runtime.log(`Profile: ${profile}`);
runtime.log(`Version: ${VERSION}`);
runtime.log(`Web port: ${selectedPort}`);
runtime.log(`Gateway: ${summary.gatewayRestarted ? "restarted" : "restart failed"}`);
if (summary.gatewayError) {
runtime.log(theme.warn(`Gateway error: ${summary.gatewayError}`));
}
runtime.log(`Stopped web processes: ${summary.stoppedPids.length}`);
if (summary.skippedForeignPids.length > 0) {
runtime.log(
@ -437,6 +507,12 @@ export async function startWebRuntimeCommand(
);
}
const gatewayResult = await restartGatewayDaemon({
profile,
gatewayPort,
json: Boolean(opts.json),
});
const startResult = startManagedWebRuntime({
stateDir,
port: selectedPort,
@ -461,6 +537,8 @@ export async function startWebRuntimeCommand(
skippedForeignPids: stopResult.skippedForeignPids,
started: probe.ok,
reason: probe.reason,
gatewayRestarted: gatewayResult.restarted,
gatewayError: gatewayResult.error,
};
if (opts.json) {
@ -472,6 +550,10 @@ export async function startWebRuntimeCommand(
runtime.log(theme.heading(`Dench web ${label}`));
runtime.log(`Profile: ${profile}`);
runtime.log(`Web port: ${selectedPort}`);
runtime.log(`Gateway: ${summary.gatewayRestarted ? "restarted" : "restart failed"}`);
if (summary.gatewayError) {
runtime.log(theme.warn(`Gateway error: ${summary.gatewayError}`));
}
runtime.log(`Restarted managed web runtime: ${summary.started ? "yes" : "no"}`);
if (!summary.started) {
runtime.log(theme.warn(summary.reason));

View File

@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { TraceContextManager } from "../../extensions/posthog-analytics/lib/trace-context.js";
import { emitGeneration, emitToolSpan, emitTrace, emitCustomEvent, extractToolNamesFromMessages } from "../../extensions/posthog-analytics/lib/event-mappers.js";
import { emitGeneration, emitToolSpan, emitTrace, emitCustomEvent, extractToolNamesFromMessages, extractUsageFromMessages, normalizeOutputForPostHog } from "../../extensions/posthog-analytics/lib/event-mappers.js";
function createMockPostHog() {
return {
@ -47,6 +47,90 @@ describe("extractToolNamesFromMessages", () => {
});
});
describe("extractUsageFromMessages", () => {
it("sums input/output tokens and cost across assistant messages", () => {
const messages = [
{ role: "user", content: "hello" },
{ role: "assistant", content: "hi", usage: { input: 10, output: 50, cost: { total: 0.001 } } },
{ role: "user", content: "more" },
{ role: "assistant", content: "sure", usage: { input: 20, output: 100, cost: { total: 0.002 } } },
];
const result = extractUsageFromMessages(messages);
expect(result.inputTokens).toBe(30);
expect(result.outputTokens).toBe(150);
expect(result.totalCostUsd).toBeCloseTo(0.003);
});
it("skips non-assistant messages (user, tool, system)", () => {
const messages = [
{ role: "user", content: "hello", usage: { input: 999, output: 999, cost: { total: 99 } } },
{ role: "tool", name: "exec", content: "result" },
];
const result = extractUsageFromMessages(messages);
expect(result.inputTokens).toBe(0);
expect(result.outputTokens).toBe(0);
expect(result.totalCostUsd).toBe(0);
});
it("handles assistant messages without usage field", () => {
const messages = [
{ role: "assistant", content: "hi" },
];
const result = extractUsageFromMessages(messages);
expect(result.inputTokens).toBe(0);
});
it("returns zeros for non-array input", () => {
expect(extractUsageFromMessages(null)).toEqual({ inputTokens: 0, outputTokens: 0, totalCostUsd: 0 });
});
});
describe("normalizeOutputForPostHog", () => {
it("converts Anthropic toolCall content blocks to OpenAI tool_calls format", () => {
const messages = [
{
role: "assistant",
content: [
{ type: "text", text: "Let me run that." },
{ type: "toolCall", name: "exec", id: "call_1", arguments: { command: "ls" } },
],
},
];
const result = normalizeOutputForPostHog(messages) as any[];
expect(result).toHaveLength(1);
expect(result[0].role).toBe("assistant");
expect(result[0].content).toBe("Let me run that.");
expect(result[0].tool_calls).toHaveLength(1);
expect(result[0].tool_calls[0].function.name).toBe("exec");
expect(result[0].tool_calls[0].type).toBe("function");
});
it("passes through OpenAI-format tool_calls unchanged", () => {
const messages = [
{
role: "assistant",
content: "OK",
tool_calls: [{ id: "c1", type: "function", function: { name: "search" } }],
},
];
const result = normalizeOutputForPostHog(messages) as any[];
expect(result[0].tool_calls[0].function.name).toBe("search");
});
it("returns undefined for non-array or empty input", () => {
expect(normalizeOutputForPostHog(null)).toBeUndefined();
expect(normalizeOutputForPostHog([])).toBeUndefined();
});
it("skips non-assistant messages", () => {
const messages = [
{ role: "user", content: "hello" },
{ role: "tool", name: "exec", content: "result" },
];
expect(normalizeOutputForPostHog(messages)).toBeUndefined();
});
});
describe("emitGeneration", () => {
let ph: ReturnType<typeof createMockPostHog>;
let traceCtx: TraceContextManager;
@ -105,9 +189,12 @@ describe("emitGeneration", () => {
expect(ph.capture.mock.calls[0][0].properties.$ai_total_cost_usd).toBeUndefined();
});
it("sets $ai_total_cost_usd when cost is positive", () => {
it("sets $ai_total_cost_usd when cost is positive (via event.usage path)", () => {
traceCtx.startTrace("s", "r");
emitGeneration(ph, traceCtx, "s", { cost: { totalUsd: 0.05 } }, true);
emitGeneration(ph, traceCtx, "s", {
usage: { inputTokens: 10, outputTokens: 20 },
cost: { totalUsd: 0.05 },
}, true);
expect(ph.capture.mock.calls[0][0].properties.$ai_total_cost_usd).toBe(0.05);
});
@ -127,6 +214,19 @@ describe("emitGeneration", () => {
expect(props).not.toHaveProperty("$ai_output_tokens");
});
it("extracts real token counts and cost from message usage metadata (OpenClaw fallback)", () => {
traceCtx.startTrace("s", "r");
const messages = [
{ role: "user", content: "What is 2 + 2?" },
{ role: "assistant", content: "The answer is 4.", usage: { input: 15, output: 8, cost: { total: 0.0005 } } },
];
emitGeneration(ph, traceCtx, "s", { messages }, true);
const props = ph.capture.mock.calls[0][0].properties;
expect(props.$ai_input_tokens).toBe(15);
expect(props.$ai_output_tokens).toBe(8);
expect(props.$ai_total_cost_usd).toBe(0.0005);
});
it("captures error details when generation fails", () => {
traceCtx.startTrace("s", "r");
emitGeneration(ph, traceCtx, "s", { error: { message: "Rate limit exceeded" } }, true);