diff --git a/src/agents/usage-log.test.ts b/src/agents/usage-log.test.ts index 0922a328989..822e67fab49 100644 --- a/src/agents/usage-log.test.ts +++ b/src/agents/usage-log.test.ts @@ -3,7 +3,7 @@ import fs from "fs/promises"; import os from "os"; import path from "path"; import { describe, expect, it, beforeEach, afterEach } from "vitest"; -import { recordTokenUsage } from "./usage-log.js"; +import { recordTokenUsage, _testOnly_getWriteQueueSize } from "./usage-log.js"; describe("recordTokenUsage", () => { let tmpDir: string; @@ -245,6 +245,52 @@ describe("recordTokenUsage", () => { } }); + it("writeQueues entries are removed after each write settles — no unbounded Map growth", async () => { + // Regression: writeQueues entries were never deleted after a write settled, + // causing unbounded Map growth in long-lived processes that touch many + // ephemeral workspace directories (one unique path → one permanent entry). + // + // Verify the fix: after all writes to N distinct paths complete, the Map + // must be empty (no retained entries for settled paths). + const N = 10; + const dirs = await Promise.all( + Array.from({ length: N }, () => fs.mkdtemp(path.join(os.tmpdir(), "usage-queue-leak-"))), + ); + try { + await Promise.all( + dirs.map((dir) => + recordTokenUsage({ + workspaceDir: dir, + label: "llm_output", + usage: { input: 1, output: 1, total: 2 }, + }), + ), + ); + // All writes are awaited; their stored promises must have settled and + // been removed from the Map. + expect(_testOnly_getWriteQueueSize()).toBe(0); + } finally { + await Promise.all(dirs.map((d) => fs.rm(d, { recursive: true, force: true }))); + } + }); + + it("writeQueues entry is removed even when the write rejects", async () => { + // Plant a non-array token-usage.json to trigger a rejection in appendRecord. + // The Map entry must still be cleaned up — a failing write must not leak. + await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true }); + await fs.writeFile(path.join(tmpDir, "memory", "token-usage.json"), '"not-an-array"', "utf-8"); + + await expect( + recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 1, output: 1, total: 2 }, + }), + ).rejects.toThrow(); + + expect(_testOnly_getWriteQueueSize()).toBe(0); + }); + it("serialises concurrent writes — no record is lost", async () => { const N = 20; await Promise.all( diff --git a/src/agents/usage-log.ts b/src/agents/usage-log.ts index 9ebb3ff580b..d154133e24a 100644 --- a/src/agents/usage-log.ts +++ b/src/agents/usage-log.ts @@ -112,6 +112,11 @@ async function appendRecord(file: string, entry: TokenUsageRecord): Promise>(); +/** Exposed for testing only — returns the current number of live queue entries. */ +export function _testOnly_getWriteQueueSize(): number { + return writeQueues.size; +} + export async function recordTokenUsage(params: { workspaceDir: string; runId?: string; @@ -171,9 +176,19 @@ export async function recordTokenUsage(params: { const queued = writeQueues.get(file) ?? Promise.resolve(); const next = queued.then(() => appendRecord(file, entry)); - writeQueues.set( - file, - next.catch(() => {}), - ); + // Suppress rejection so the Map value never holds a rejected promise, which + // would cause unhandled-rejection noise for any future .then() chained on it. + const stored = next.catch(() => {}); + writeQueues.set(file, stored); + // Remove the entry once it settles. Check identity first: if another call + // already enqueued a new write for the same path, writeQueues now holds that + // newer promise and must not be deleted — the newer entry owns its own cleanup. + // This prevents unbounded Map growth in long-lived processes that write to + // many ephemeral workspace directories. + void stored.then(() => { + if (writeQueues.get(file) === stored) { + writeQueues.delete(file); + } + }); await next; }