diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index 5db40b13a27..ce74a7bc2c5 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -1255,6 +1255,29 @@ async function agentCommandInternal( throw err; } + // Emit supplementary lifecycle "usage" event with accumulated token/cost + // data so external observers (dashboards, recorders) can track per-run + // resource consumption without needing RPC access to session transcripts. + const agentMeta = result.meta.agentMeta; + if (agentMeta?.usage) { + try { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: agentMeta.provider, + model: agentMeta.model, + usage: agentMeta.usage, + lastCallUsage: agentMeta.lastCallUsage, + durationMs: Date.now() - startedAt, + }, + }); + } catch { + // Non-fatal: usage reporting should not surface as a run error. + } + } + // Update token+model fields in the session store. if (sessionStore && sessionKey) { await updateSessionStoreAfterAgentRun({ diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index c25342e4a28..3b9faa9fe50 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -277,19 +277,44 @@ export async function runAgentTurnWithFallback(params: { }); lifecycleTerminalEmitted = true; + // Emit supplementary usage event with accumulated token/cost data. + // Wrapped in its own try/catch so a failure here cannot + // propagate to the outer catch and emit a spurious "error" event. + const agentMeta = result.meta?.agentMeta; + if (agentMeta?.usage) { + try { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: agentMeta.provider, + model: agentMeta.model, + usage: agentMeta.usage, + lastCallUsage: agentMeta.lastCallUsage, + durationMs: Date.now() - startedAt, + }, + }); + } catch { + // Non-fatal: usage reporting should not surface as a run error. + } + } + return result; } catch (err) { - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "error", - startedAt, - endedAt: Date.now(), - error: String(err), - }, - }); - lifecycleTerminalEmitted = true; + if (!lifecycleTerminalEmitted) { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt, + endedAt: Date.now(), + error: String(err), + }, + }); + lifecycleTerminalEmitted = true; + } throw err; } finally { // Defensive backstop: never let a CLI run complete without a terminal @@ -472,6 +497,30 @@ export async function runAgentTurnWithFallback(params: { bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, ); + // Emit supplementary usage event with accumulated token/cost data. + // The pi-embedded subscribe handler emits lifecycle start/end but does + // not have access to agentMeta; emit a separate usage event now that + // the run result is available. + const agentMeta = result.meta?.agentMeta; + if (agentMeta?.usage) { + try { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: agentMeta.provider, + model: agentMeta.model, + usage: agentMeta.usage, + lastCallUsage: agentMeta.lastCallUsage, + durationMs: result.meta?.durationMs, + }, + }); + } catch { + // Non-fatal: usage reporting should not surface as a run error. + } + } + const resultCompactionCount = Math.max( 0, result.meta?.agentMeta?.compactionCount ?? 0, diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 267326a7e20..12d454d364f 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -22,7 +22,7 @@ import { updateSessionStoreEntry, } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; -import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import type { TemplateContext } from "../templating.js"; import type { VerboseLevel } from "../thinking.js"; import type { GetReplyOptions } from "../types.js"; @@ -477,7 +477,7 @@ export async function runMemoryFlushIfNeeded(params: { .filter(Boolean) .join("\n\n"); try { - await runWithModelFallback({ + const flushFallbackResult = await runWithModelFallback({ ...resolveModelFallbackOptions(params.followupRun.run), runId: flushRunId, run: async (provider, model, runOptions) => { @@ -521,6 +521,26 @@ export async function runMemoryFlushIfNeeded(params: { return result; }, }); + // Emit supplementary usage event with accumulated token/cost data. + const flushAgentMeta = flushFallbackResult.result?.meta?.agentMeta; + if (flushAgentMeta?.usage) { + try { + emitAgentEvent({ + runId: flushRunId, + stream: "lifecycle", + data: { + phase: "usage", + provider: flushAgentMeta.provider, + model: flushAgentMeta.model, + usage: flushAgentMeta.usage, + lastCallUsage: flushAgentMeta.lastCallUsage, + durationMs: Date.now() - memoryFlushNowMs, + }, + }); + } catch { + // Non-fatal: usage reporting should not surface as a run error. + } + } let memoryFlushCompactionCount = activeSessionEntry?.compactionCount ?? (params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ?? diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..813ad29f82c 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -12,7 +12,7 @@ import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import type { SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; -import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import { defaultRuntime } from "../../runtime.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import { stripHeartbeatToken } from "../heartbeat.js"; @@ -238,6 +238,28 @@ export function createFollowupRunner(params: { bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, ); + + // Emit supplementary usage event with accumulated token/cost data. + const agentMeta = result.meta?.agentMeta; + if (agentMeta?.usage) { + try { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: agentMeta.provider, + model: agentMeta.model, + usage: agentMeta.usage, + lastCallUsage: agentMeta.lastCallUsage, + durationMs: result.meta?.durationMs, + }, + }); + } catch { + // Non-fatal: usage reporting should not surface as a run error. + } + } + const resultCompactionCount = Math.max( 0, result.meta?.agentMeta?.compactionCount ?? 0, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 1a122f56864..4736713432b 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -47,7 +47,7 @@ import { updateSessionStore, } from "../../config/sessions.js"; import type { AgentDefaultsConfig } from "../../config/types.js"; -import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import { logWarn } from "../../logger.js"; import { normalizeAgentId } from "../../routing/session-key.js"; import { @@ -734,6 +734,28 @@ export async function runCronIsolatedAgentTurn(params: { return withRunSession({ status: "error", error: "cron isolated run returned no result" }); } const finalRunResult = runResult; + + // Emit supplementary usage event with accumulated token/cost data. + const agentMeta = finalRunResult.meta?.agentMeta; + if (agentMeta?.usage) { + try { + emitAgentEvent({ + runId: cronSession.sessionEntry.sessionId, + stream: "lifecycle", + data: { + phase: "usage", + provider: agentMeta.provider, + model: agentMeta.model, + usage: agentMeta.usage, + lastCallUsage: agentMeta.lastCallUsage, + durationMs: Date.now() - runStartedAt, + }, + }); + } catch { + // Non-fatal: usage reporting should not surface as a run error. + } + } + const payloads = finalRunResult.payloads ?? []; // Update token+model fields in the session store. diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 7fda61b6c0c..01e877b769f 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -810,6 +810,13 @@ export function createAgentEventHandler({ clearAgentRunContext(evt.runId); agentRunSeq.delete(evt.runId); agentRunSeq.delete(clientRunId); + } else if (lifecyclePhase === "usage") { + // Usage events may arrive after the terminal end/error event because + // agentMeta is only available after runEmbeddedPiAgent returns. The seq + // tracking at the top of this handler re-creates agentRunSeq entries; + // clean them up immediately to prevent a permanent leak. + agentRunSeq.delete(evt.runId); + agentRunSeq.delete(clientRunId); } if ( diff --git a/src/infra/agent-events.lifecycle-usage.test.ts b/src/infra/agent-events.lifecycle-usage.test.ts new file mode 100644 index 00000000000..e7e488aadd1 --- /dev/null +++ b/src/infra/agent-events.lifecycle-usage.test.ts @@ -0,0 +1,147 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + emitAgentEvent, + onAgentEvent, + registerAgentRunContext, + type AgentEventPayload, +} from "./agent-events.js"; + +/** + * Validates the lifecycle "usage" event contract added for external observers + * (dashboards, recorders). The actual emission sites live in agent-command.ts, + * agent-runner-execution.ts, followup-runner.ts, cron/isolated-agent/run.ts, + * and agent-runner-memory.ts; this test verifies the shape and fields of the + * event as it flows through the agent-events bus. + */ +describe("lifecycle usage event", () => { + let events: AgentEventPayload[]; + let unsubscribe: () => boolean; + + beforeEach(() => { + events = []; + unsubscribe = onAgentEvent((evt) => events.push(evt)); + }); + + afterEach(() => unsubscribe()); + + it("emits phase=usage with correct token and model fields", () => { + const runId = `usage-test-${Date.now()}`; + registerAgentRunContext(runId, { sessionKey: "agent:main:main" }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { input: 100, output: 50, cacheRead: 20, cacheWrite: 10 }, + lastCallUsage: { input: 30, output: 15 }, + durationMs: 4500, + }, + }); + + expect(events).toHaveLength(1); + const evt = events[0]; + expect(evt.stream).toBe("lifecycle"); + expect(evt.sessionKey).toBe("agent:main:main"); + expect(evt.data).toMatchObject({ + phase: "usage", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { input: 100, output: 50, cacheRead: 20, cacheWrite: 10 }, + lastCallUsage: { input: 30, output: 15 }, + durationMs: 4500, + }); + expect(evt.seq).toBeGreaterThan(0); + expect(evt.ts).toBeGreaterThan(0); + }); + + it("is not emitted when agentMeta.usage is absent", () => { + // This mirrors the guard: `if (agentMeta?.usage) { emitAgentEvent(...) }` + const agentMeta: { usage?: unknown } = {}; + if (agentMeta?.usage) { + emitAgentEvent({ + runId: "no-usage", + stream: "lifecycle", + data: { phase: "usage" }, + }); + } + expect(events).toHaveLength(0); + }); + + it("does not throw when wrapped in defensive try/catch", () => { + // Simulates the non-fatal pattern used in all emission sites. + // Even if the event bus throws, it should not propagate. + const badListener = onAgentEvent(() => { + throw new Error("listener crash"); + }); + + expect(() => { + emitAgentEvent({ + runId: "defensive-test", + stream: "lifecycle", + data: { phase: "usage", usage: { input: 1 } }, + }); + }).not.toThrow(); + + badListener(); + }); + + it("usage event after terminal end does not corrupt tracking state", () => { + // Simulates the real flow: end fires first, then usage arrives. + // Downstream consumers (server-chat.ts) must handle this gracefully + // without leaking agentRunSeq entries. + const runId = `post-terminal-${Date.now()}`; + registerAgentRunContext(runId, { sessionKey: "agent:main:main" }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "end", endedAt: Date.now() }, + }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { input: 200, output: 100 }, + durationMs: 3000, + }, + }); + + expect(events).toHaveLength(2); + expect(events[0].data.phase).toBe("end"); + expect(events[1].data.phase).toBe("usage"); + // Both events share the same runId and sessionKey + expect(events[1].runId).toBe(runId); + expect(events[1].sessionKey).toBe("agent:main:main"); + }); + + it("durationMs reflects wall-clock time, not just last attempt", () => { + // Validates the contract: durationMs should be Date.now() - startedAt + // (full run including fallback retries), not result.meta.durationMs + // (single attempt only). + const startedAt = Date.now() - 5000; // Simulate 5s run + const runId = `duration-${Date.now()}`; + registerAgentRunContext(runId, { sessionKey: "agent:main:main" }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "usage", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { input: 50, output: 25 }, + durationMs: Date.now() - startedAt, + }, + }); + + const evt = events[0]; + expect(evt.data.durationMs).toBeGreaterThanOrEqual(5000); + }); +});