Merge ac9fb10643991c750a02cb63f270f1974d67760f into 9fb78453e088cd7b553d7779faa0de5c83708e70

This commit is contained in:
Studio729 2026-03-21 05:26:55 +00:00 committed by GitHub
commit 3b2a160bb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 305 additions and 15 deletions

View File

@ -1255,6 +1255,29 @@ async function agentCommandInternal(
throw err;
}
// Emit supplementary lifecycle "usage" event with accumulated token/cost
// data so external observers (dashboards, recorders) can track per-run
// resource consumption without needing RPC access to session transcripts.
const agentMeta = result.meta.agentMeta;
if (agentMeta?.usage) {
try {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: agentMeta.provider,
model: agentMeta.model,
usage: agentMeta.usage,
lastCallUsage: agentMeta.lastCallUsage,
durationMs: Date.now() - startedAt,
},
});
} catch {
// Non-fatal: usage reporting should not surface as a run error.
}
}
// Update token+model fields in the session store.
if (sessionStore && sessionKey) {
await updateSessionStoreAfterAgentRun({

View File

@ -277,19 +277,44 @@ export async function runAgentTurnWithFallback(params: {
});
lifecycleTerminalEmitted = true;
// Emit supplementary usage event with accumulated token/cost data.
// Wrapped in its own try/catch so a failure here cannot
// propagate to the outer catch and emit a spurious "error" event.
const agentMeta = result.meta?.agentMeta;
if (agentMeta?.usage) {
try {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: agentMeta.provider,
model: agentMeta.model,
usage: agentMeta.usage,
lastCallUsage: agentMeta.lastCallUsage,
durationMs: Date.now() - startedAt,
},
});
} catch {
// Non-fatal: usage reporting should not surface as a run error.
}
}
return result;
} catch (err) {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt,
endedAt: Date.now(),
error: String(err),
},
});
lifecycleTerminalEmitted = true;
if (!lifecycleTerminalEmitted) {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt,
endedAt: Date.now(),
error: String(err),
},
});
lifecycleTerminalEmitted = true;
}
throw err;
} finally {
// Defensive backstop: never let a CLI run complete without a terminal
@ -472,6 +497,30 @@ export async function runAgentTurnWithFallback(params: {
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
// Emit supplementary usage event with accumulated token/cost data.
// The pi-embedded subscribe handler emits lifecycle start/end but does
// not have access to agentMeta; emit a separate usage event now that
// the run result is available.
const agentMeta = result.meta?.agentMeta;
if (agentMeta?.usage) {
try {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: agentMeta.provider,
model: agentMeta.model,
usage: agentMeta.usage,
lastCallUsage: agentMeta.lastCallUsage,
durationMs: result.meta?.durationMs,
},
});
} catch {
// Non-fatal: usage reporting should not surface as a run error.
}
}
const resultCompactionCount = Math.max(
0,
result.meta?.agentMeta?.compactionCount ?? 0,

View File

@ -22,7 +22,7 @@ import {
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
import type { TemplateContext } from "../templating.js";
import type { VerboseLevel } from "../thinking.js";
import type { GetReplyOptions } from "../types.js";
@ -477,7 +477,7 @@ export async function runMemoryFlushIfNeeded(params: {
.filter(Boolean)
.join("\n\n");
try {
await runWithModelFallback({
const flushFallbackResult = await runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run),
runId: flushRunId,
run: async (provider, model, runOptions) => {
@ -521,6 +521,26 @@ export async function runMemoryFlushIfNeeded(params: {
return result;
},
});
// Emit supplementary usage event with accumulated token/cost data.
const flushAgentMeta = flushFallbackResult.result?.meta?.agentMeta;
if (flushAgentMeta?.usage) {
try {
emitAgentEvent({
runId: flushRunId,
stream: "lifecycle",
data: {
phase: "usage",
provider: flushAgentMeta.provider,
model: flushAgentMeta.model,
usage: flushAgentMeta.usage,
lastCallUsage: flushAgentMeta.lastCallUsage,
durationMs: Date.now() - memoryFlushNowMs,
},
});
} catch {
// Non-fatal: usage reporting should not surface as a run error.
}
}
let memoryFlushCompactionCount =
activeSessionEntry?.compactionCount ??
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ??

View File

@ -12,7 +12,7 @@ import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import type { SessionEntry } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { isInternalMessageChannel } from "../../utils/message-channel.js";
import { stripHeartbeatToken } from "../heartbeat.js";
@ -238,6 +238,28 @@ export function createFollowupRunner(params: {
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
// Emit supplementary usage event with accumulated token/cost data.
const agentMeta = result.meta?.agentMeta;
if (agentMeta?.usage) {
try {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: agentMeta.provider,
model: agentMeta.model,
usage: agentMeta.usage,
lastCallUsage: agentMeta.lastCallUsage,
durationMs: result.meta?.durationMs,
},
});
} catch {
// Non-fatal: usage reporting should not surface as a run error.
}
}
const resultCompactionCount = Math.max(
0,
result.meta?.agentMeta?.compactionCount ?? 0,

View File

@ -47,7 +47,7 @@ import {
updateSessionStore,
} from "../../config/sessions.js";
import type { AgentDefaultsConfig } from "../../config/types.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
import { logWarn } from "../../logger.js";
import { normalizeAgentId } from "../../routing/session-key.js";
import {
@ -734,6 +734,28 @@ export async function runCronIsolatedAgentTurn(params: {
return withRunSession({ status: "error", error: "cron isolated run returned no result" });
}
const finalRunResult = runResult;
// Emit supplementary usage event with accumulated token/cost data.
const agentMeta = finalRunResult.meta?.agentMeta;
if (agentMeta?.usage) {
try {
emitAgentEvent({
runId: cronSession.sessionEntry.sessionId,
stream: "lifecycle",
data: {
phase: "usage",
provider: agentMeta.provider,
model: agentMeta.model,
usage: agentMeta.usage,
lastCallUsage: agentMeta.lastCallUsage,
durationMs: Date.now() - runStartedAt,
},
});
} catch {
// Non-fatal: usage reporting should not surface as a run error.
}
}
const payloads = finalRunResult.payloads ?? [];
// Update token+model fields in the session store.

View File

@ -810,6 +810,13 @@ export function createAgentEventHandler({
clearAgentRunContext(evt.runId);
agentRunSeq.delete(evt.runId);
agentRunSeq.delete(clientRunId);
} else if (lifecyclePhase === "usage") {
// Usage events may arrive after the terminal end/error event because
// agentMeta is only available after runEmbeddedPiAgent returns. The seq
// tracking at the top of this handler re-creates agentRunSeq entries;
// clean them up immediately to prevent a permanent leak.
agentRunSeq.delete(evt.runId);
agentRunSeq.delete(clientRunId);
}
if (

View File

@ -0,0 +1,147 @@
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
emitAgentEvent,
onAgentEvent,
registerAgentRunContext,
type AgentEventPayload,
} from "./agent-events.js";
/**
* Validates the lifecycle "usage" event contract added for external observers
* (dashboards, recorders). The actual emission sites live in agent-command.ts,
* agent-runner-execution.ts, followup-runner.ts, cron/isolated-agent/run.ts,
* and agent-runner-memory.ts; this test verifies the shape and fields of the
* event as it flows through the agent-events bus.
*/
describe("lifecycle usage event", () => {
let events: AgentEventPayload[];
let unsubscribe: () => boolean;
beforeEach(() => {
events = [];
unsubscribe = onAgentEvent((evt) => events.push(evt));
});
afterEach(() => unsubscribe());
it("emits phase=usage with correct token and model fields", () => {
const runId = `usage-test-${Date.now()}`;
registerAgentRunContext(runId, { sessionKey: "agent:main:main" });
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: "anthropic",
model: "claude-sonnet-4-6",
usage: { input: 100, output: 50, cacheRead: 20, cacheWrite: 10 },
lastCallUsage: { input: 30, output: 15 },
durationMs: 4500,
},
});
expect(events).toHaveLength(1);
const evt = events[0];
expect(evt.stream).toBe("lifecycle");
expect(evt.sessionKey).toBe("agent:main:main");
expect(evt.data).toMatchObject({
phase: "usage",
provider: "anthropic",
model: "claude-sonnet-4-6",
usage: { input: 100, output: 50, cacheRead: 20, cacheWrite: 10 },
lastCallUsage: { input: 30, output: 15 },
durationMs: 4500,
});
expect(evt.seq).toBeGreaterThan(0);
expect(evt.ts).toBeGreaterThan(0);
});
it("is not emitted when agentMeta.usage is absent", () => {
// This mirrors the guard: `if (agentMeta?.usage) { emitAgentEvent(...) }`
const agentMeta: { usage?: unknown } = {};
if (agentMeta?.usage) {
emitAgentEvent({
runId: "no-usage",
stream: "lifecycle",
data: { phase: "usage" },
});
}
expect(events).toHaveLength(0);
});
it("does not throw when wrapped in defensive try/catch", () => {
// Simulates the non-fatal pattern used in all emission sites.
// Even if the event bus throws, it should not propagate.
const badListener = onAgentEvent(() => {
throw new Error("listener crash");
});
expect(() => {
emitAgentEvent({
runId: "defensive-test",
stream: "lifecycle",
data: { phase: "usage", usage: { input: 1 } },
});
}).not.toThrow();
badListener();
});
it("usage event after terminal end does not corrupt tracking state", () => {
// Simulates the real flow: end fires first, then usage arrives.
// Downstream consumers (server-chat.ts) must handle this gracefully
// without leaking agentRunSeq entries.
const runId = `post-terminal-${Date.now()}`;
registerAgentRunContext(runId, { sessionKey: "agent:main:main" });
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "end", endedAt: Date.now() },
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: "anthropic",
model: "claude-sonnet-4-6",
usage: { input: 200, output: 100 },
durationMs: 3000,
},
});
expect(events).toHaveLength(2);
expect(events[0].data.phase).toBe("end");
expect(events[1].data.phase).toBe("usage");
// Both events share the same runId and sessionKey
expect(events[1].runId).toBe(runId);
expect(events[1].sessionKey).toBe("agent:main:main");
});
it("durationMs reflects wall-clock time, not just last attempt", () => {
// Validates the contract: durationMs should be Date.now() - startedAt
// (full run including fallback retries), not result.meta.durationMs
// (single attempt only).
const startedAt = Date.now() - 5000; // Simulate 5s run
const runId = `duration-${Date.now()}`;
registerAgentRunContext(runId, { sessionKey: "agent:main:main" });
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "usage",
provider: "anthropic",
model: "claude-sonnet-4-6",
usage: { input: 50, output: 25 },
durationMs: Date.now() - startedAt,
},
});
const evt = events[0];
expect(evt.data.durationMs).toBeGreaterThanOrEqual(5000);
});
});