From 8755c509f8f46a256110e012d1b4d5fd4b983e64 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Thu, 5 Mar 2026 13:46:51 -0800 Subject: [PATCH] feat: extract real token usage from messages and auto-restart gateway on web runtime updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add extractUsageFromMessages and normalizeOutputForPostHog to pull actual token counts and cost from OpenClaw per-message usage metadata, and convert Anthropic-style tool call blocks to OpenAI format for PostHog AI observability. Integrate gateway daemon restart (stop → install → start) into both the update and start web runtime commands so the gateway stays in sync with runtime upgrades. --- .../posthog-analytics/lib/event-mappers.ts | 103 +++++++++++++++-- src/cli/web-runtime-command.ts | 82 ++++++++++++++ src/telemetry/event-mappers.test.ts | 106 +++++++++++++++++- 3 files changed, 280 insertions(+), 11 deletions(-) diff --git a/extensions/posthog-analytics/lib/event-mappers.ts b/extensions/posthog-analytics/lib/event-mappers.ts index 74baed58a1b..a902efdaa85 100644 --- a/extensions/posthog-analytics/lib/event-mappers.ts +++ b/extensions/posthog-analytics/lib/event-mappers.ts @@ -13,6 +13,34 @@ function getAnonymousId(): string { } } +/** + * Extract actual token counts and cost from OpenClaw's per-message usage metadata. + * Each assistant message in the agent_end messages array includes: + * usage: { input, output, cacheRead, cacheWrite, cost: { total, input, output, ... } } + */ +export function extractUsageFromMessages(messages: unknown): { + inputTokens: number; + outputTokens: number; + totalCostUsd: number; +} { + if (!Array.isArray(messages)) return { inputTokens: 0, outputTokens: 0, totalCostUsd: 0 }; + let inputTokens = 0; + let outputTokens = 0; + let totalCostUsd = 0; + for (const msg of messages) { + if (!msg || typeof msg !== "object") continue; + const m = msg as Record; + if (m.role !== "assistant") continue; + const usage = m.usage as Record | undefined; + if (!usage) continue; + if (typeof usage.input === "number") inputTokens += usage.input; + if (typeof usage.output === "number") outputTokens += usage.output; + const cost = usage.cost as Record | undefined; + if (cost && typeof cost.total === "number") totalCostUsd += cost.total; + } + return { inputTokens, outputTokens, totalCostUsd }; +} + /** * Extract tool call names from the messages array provided by agent_end. * Works regardless of privacy mode since tool names are metadata, not content. @@ -50,6 +78,63 @@ export function extractToolNamesFromMessages(messages: unknown): string[] { return [...new Set(names)]; } +/** + * Normalize OpenClaw's message format into OpenAI-compatible output choices + * so PostHog can extract tool calls for the Tools tab. + * Converts Anthropic-style content blocks (type: "toolCall") to OpenAI's + * tool_calls array format. + */ +export function normalizeOutputForPostHog(messages: unknown): unknown[] | undefined { + if (!Array.isArray(messages)) return undefined; + const choices: unknown[] = []; + for (const msg of messages) { + if (!msg || typeof msg !== "object") continue; + const m = msg as Record; + if (m.role !== "assistant") continue; + + const toolCalls: unknown[] = []; + let textContent = ""; + + if (Array.isArray(m.content)) { + for (const block of m.content as Array>) { + if (block.type === "text" && typeof block.text === "string") { + textContent += block.text; + } + if (block.type === "toolCall" && typeof block.name === "string") { + toolCalls.push({ + id: block.id ?? block.toolCallId, + type: "function", + function: { + name: block.name, + arguments: typeof block.arguments === "string" + ? block.arguments + : JSON.stringify(block.arguments ?? {}), + }, + }); + } + } + } else if (typeof m.content === "string") { + textContent = m.content; + } + + if (Array.isArray(m.tool_calls)) { + for (const tc of m.tool_calls as Array>) { + toolCalls.push(tc); + } + } + + const choice: Record = { + role: "assistant", + content: textContent || null, + }; + if (toolCalls.length > 0) { + choice.tool_calls = toolCalls; + } + choices.push(choice); + } + return choices.length > 0 ? choices : undefined; +} + /** * Emit a `$ai_generation` event from the agent_end hook data. */ @@ -93,18 +178,20 @@ export function emitGeneration( const outputTokens = event.usage.outputTokens ?? event.usage.output_tokens; if (inputTokens != null && inputTokens > 0) properties.$ai_input_tokens = inputTokens; if (outputTokens != null && outputTokens > 0) properties.$ai_output_tokens = outputTokens; - } - - if (event.cost) { - const cost = event.cost.totalUsd ?? event.cost.total_usd; - if (cost != null && cost > 0) { - properties.$ai_total_cost_usd = cost; - } + const cost = event.cost?.totalUsd ?? event.cost?.total_usd; + if (cost != null && cost > 0) properties.$ai_total_cost_usd = cost; + } else if (event.messages) { + const extracted = extractUsageFromMessages(event.messages); + if (extracted.inputTokens > 0) properties.$ai_input_tokens = extracted.inputTokens; + if (extracted.outputTokens > 0) properties.$ai_output_tokens = extracted.outputTokens; + if (extracted.totalCostUsd > 0) properties.$ai_total_cost_usd = extracted.totalCostUsd; } properties.$ai_input = sanitizeForCapture(trace.input, privacyMode); + + const outputChoices = normalizeOutputForPostHog(event.messages); properties.$ai_output_choices = sanitizeForCapture( - event.output ?? event.messages, + outputChoices ?? event.output ?? event.messages, privacyMode, ); diff --git a/src/cli/web-runtime-command.ts b/src/cli/web-runtime-command.ts index 0e669e4ae40..b4e30d0a98e 100644 --- a/src/cli/web-runtime-command.ts +++ b/src/cli/web-runtime-command.ts @@ -64,6 +64,8 @@ export type UpdateWebRuntimeSummary = { skippedForeignPids: number[]; ready: boolean; reason: string; + gatewayRestarted: boolean; + gatewayError?: string; }; export type StopWebRuntimeSummary = { @@ -80,6 +82,8 @@ export type StartWebRuntimeSummary = { skippedForeignPids: number[]; started: boolean; reason: string; + gatewayRestarted: boolean; + gatewayError?: string; }; function parseOptionalPort(value: string | number | undefined): number | undefined { @@ -254,6 +258,59 @@ function readConfigGatewayPort(configPath: string): number | undefined { } } +async function restartGatewayDaemon(params: { + profile: string; + gatewayPort: number; + json: boolean; +}): Promise<{ restarted: boolean; error?: string }> { + let openclawCommand: string; + try { + openclawCommand = resolveOpenClawCommandOrThrow(); + } catch { + return { restarted: false, error: "openclaw CLI not found on PATH" }; + } + + const s = !params.json ? spinner() : null; + s?.start("Stopping gateway daemon…"); + + await runOpenClawCommand({ + openclawCommand, + args: ["--profile", params.profile, "gateway", "stop"], + timeoutMs: 90_000, + }).catch(() => ({ code: 1, stdout: "", stderr: "stop timed out" })); + + s?.message("Installing gateway daemon…"); + await runOpenClawCommand({ + openclawCommand, + args: [ + "--profile", params.profile, + "gateway", "install", "--force", + "--port", String(params.gatewayPort), + ], + timeoutMs: 2 * 60_000, + }).catch(() => ({ code: 1, stdout: "", stderr: "install failed" })); + + s?.message("Starting gateway daemon…"); + const startResult = await runOpenClawCommand({ + openclawCommand, + args: [ + "--profile", params.profile, + "gateway", "start", + "--port", String(params.gatewayPort), + ], + timeoutMs: 2 * 60_000, + }).catch(() => ({ code: 1, stdout: "", stderr: "start failed" })); + + if (startResult.code !== 0) { + const detail = firstNonEmptyLine(startResult.stderr, startResult.stdout); + s?.stop(detail ? `Gateway restart failed: ${detail}` : "Gateway restart failed."); + return { restarted: false, error: detail ?? "gateway start failed" }; + } + + s?.stop("Gateway daemon restarted."); + return { restarted: true }; +} + export async function updateWebRuntimeCommand( opts: UpdateWebRuntimeOptions, runtime: RuntimeEnv = defaultRuntime, @@ -298,6 +355,13 @@ export async function updateWebRuntimeCommand( port: selectedPort, includeLegacyStandalone: true, }); + + const gatewayResult = await restartGatewayDaemon({ + profile, + gatewayPort, + json: Boolean(opts.json), + }); + const ensureResult = await ensureManagedWebRuntime({ stateDir, packageRoot, @@ -319,6 +383,8 @@ export async function updateWebRuntimeCommand( skippedForeignPids: stopResult.skippedForeignPids, ready: ensureResult.ready, reason: ensureResult.reason, + gatewayRestarted: gatewayResult.restarted, + gatewayError: gatewayResult.error, }; if (!opts.json) { @@ -327,6 +393,10 @@ export async function updateWebRuntimeCommand( runtime.log(`Profile: ${profile}`); runtime.log(`Version: ${VERSION}`); runtime.log(`Web port: ${selectedPort}`); + runtime.log(`Gateway: ${summary.gatewayRestarted ? "restarted" : "restart failed"}`); + if (summary.gatewayError) { + runtime.log(theme.warn(`Gateway error: ${summary.gatewayError}`)); + } runtime.log(`Stopped web processes: ${summary.stoppedPids.length}`); if (summary.skippedForeignPids.length > 0) { runtime.log( @@ -437,6 +507,12 @@ export async function startWebRuntimeCommand( ); } + const gatewayResult = await restartGatewayDaemon({ + profile, + gatewayPort, + json: Boolean(opts.json), + }); + const startResult = startManagedWebRuntime({ stateDir, port: selectedPort, @@ -461,6 +537,8 @@ export async function startWebRuntimeCommand( skippedForeignPids: stopResult.skippedForeignPids, started: probe.ok, reason: probe.reason, + gatewayRestarted: gatewayResult.restarted, + gatewayError: gatewayResult.error, }; if (opts.json) { @@ -472,6 +550,10 @@ export async function startWebRuntimeCommand( runtime.log(theme.heading(`Dench web ${label}`)); runtime.log(`Profile: ${profile}`); runtime.log(`Web port: ${selectedPort}`); + runtime.log(`Gateway: ${summary.gatewayRestarted ? "restarted" : "restart failed"}`); + if (summary.gatewayError) { + runtime.log(theme.warn(`Gateway error: ${summary.gatewayError}`)); + } runtime.log(`Restarted managed web runtime: ${summary.started ? "yes" : "no"}`); if (!summary.started) { runtime.log(theme.warn(summary.reason)); diff --git a/src/telemetry/event-mappers.test.ts b/src/telemetry/event-mappers.test.ts index e71bcf72c32..8e13d6927e6 100644 --- a/src/telemetry/event-mappers.test.ts +++ b/src/telemetry/event-mappers.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeEach, vi } from "vitest"; import { TraceContextManager } from "../../extensions/posthog-analytics/lib/trace-context.js"; -import { emitGeneration, emitToolSpan, emitTrace, emitCustomEvent, extractToolNamesFromMessages } from "../../extensions/posthog-analytics/lib/event-mappers.js"; +import { emitGeneration, emitToolSpan, emitTrace, emitCustomEvent, extractToolNamesFromMessages, extractUsageFromMessages, normalizeOutputForPostHog } from "../../extensions/posthog-analytics/lib/event-mappers.js"; function createMockPostHog() { return { @@ -47,6 +47,90 @@ describe("extractToolNamesFromMessages", () => { }); }); +describe("extractUsageFromMessages", () => { + it("sums input/output tokens and cost across assistant messages", () => { + const messages = [ + { role: "user", content: "hello" }, + { role: "assistant", content: "hi", usage: { input: 10, output: 50, cost: { total: 0.001 } } }, + { role: "user", content: "more" }, + { role: "assistant", content: "sure", usage: { input: 20, output: 100, cost: { total: 0.002 } } }, + ]; + const result = extractUsageFromMessages(messages); + expect(result.inputTokens).toBe(30); + expect(result.outputTokens).toBe(150); + expect(result.totalCostUsd).toBeCloseTo(0.003); + }); + + it("skips non-assistant messages (user, tool, system)", () => { + const messages = [ + { role: "user", content: "hello", usage: { input: 999, output: 999, cost: { total: 99 } } }, + { role: "tool", name: "exec", content: "result" }, + ]; + const result = extractUsageFromMessages(messages); + expect(result.inputTokens).toBe(0); + expect(result.outputTokens).toBe(0); + expect(result.totalCostUsd).toBe(0); + }); + + it("handles assistant messages without usage field", () => { + const messages = [ + { role: "assistant", content: "hi" }, + ]; + const result = extractUsageFromMessages(messages); + expect(result.inputTokens).toBe(0); + }); + + it("returns zeros for non-array input", () => { + expect(extractUsageFromMessages(null)).toEqual({ inputTokens: 0, outputTokens: 0, totalCostUsd: 0 }); + }); +}); + +describe("normalizeOutputForPostHog", () => { + it("converts Anthropic toolCall content blocks to OpenAI tool_calls format", () => { + const messages = [ + { + role: "assistant", + content: [ + { type: "text", text: "Let me run that." }, + { type: "toolCall", name: "exec", id: "call_1", arguments: { command: "ls" } }, + ], + }, + ]; + const result = normalizeOutputForPostHog(messages) as any[]; + expect(result).toHaveLength(1); + expect(result[0].role).toBe("assistant"); + expect(result[0].content).toBe("Let me run that."); + expect(result[0].tool_calls).toHaveLength(1); + expect(result[0].tool_calls[0].function.name).toBe("exec"); + expect(result[0].tool_calls[0].type).toBe("function"); + }); + + it("passes through OpenAI-format tool_calls unchanged", () => { + const messages = [ + { + role: "assistant", + content: "OK", + tool_calls: [{ id: "c1", type: "function", function: { name: "search" } }], + }, + ]; + const result = normalizeOutputForPostHog(messages) as any[]; + expect(result[0].tool_calls[0].function.name).toBe("search"); + }); + + it("returns undefined for non-array or empty input", () => { + expect(normalizeOutputForPostHog(null)).toBeUndefined(); + expect(normalizeOutputForPostHog([])).toBeUndefined(); + }); + + it("skips non-assistant messages", () => { + const messages = [ + { role: "user", content: "hello" }, + { role: "tool", name: "exec", content: "result" }, + ]; + expect(normalizeOutputForPostHog(messages)).toBeUndefined(); + }); +}); + describe("emitGeneration", () => { let ph: ReturnType; let traceCtx: TraceContextManager; @@ -105,9 +189,12 @@ describe("emitGeneration", () => { expect(ph.capture.mock.calls[0][0].properties.$ai_total_cost_usd).toBeUndefined(); }); - it("sets $ai_total_cost_usd when cost is positive", () => { + it("sets $ai_total_cost_usd when cost is positive (via event.usage path)", () => { traceCtx.startTrace("s", "r"); - emitGeneration(ph, traceCtx, "s", { cost: { totalUsd: 0.05 } }, true); + emitGeneration(ph, traceCtx, "s", { + usage: { inputTokens: 10, outputTokens: 20 }, + cost: { totalUsd: 0.05 }, + }, true); expect(ph.capture.mock.calls[0][0].properties.$ai_total_cost_usd).toBe(0.05); }); @@ -127,6 +214,19 @@ describe("emitGeneration", () => { expect(props).not.toHaveProperty("$ai_output_tokens"); }); + it("extracts real token counts and cost from message usage metadata (OpenClaw fallback)", () => { + traceCtx.startTrace("s", "r"); + const messages = [ + { role: "user", content: "What is 2 + 2?" }, + { role: "assistant", content: "The answer is 4.", usage: { input: 15, output: 8, cost: { total: 0.0005 } } }, + ]; + emitGeneration(ph, traceCtx, "s", { messages }, true); + const props = ph.capture.mock.calls[0][0].properties; + expect(props.$ai_input_tokens).toBe(15); + expect(props.$ai_output_tokens).toBe(8); + expect(props.$ai_total_cost_usd).toBe(0.0005); + }); + it("captures error details when generation fails", () => { traceCtx.startTrace("s", "r"); emitGeneration(ph, traceCtx, "s", { error: { message: "Rate limit exceeded" } }, true);