fix(usage-log): remove writeQueues entries after each write settles
Each unique token-usage.json path was inserted into writeQueues and never removed, even after the stored promise settled successfully or with an error. In long-lived processes that touch many workspaces (e.g. ephemeral per-run directories) this causes unbounded Map growth and retains completed promises indefinitely — a memory leak introduced by the write-queue serialisation logic. Fix: capture the stored (rejection-suppressed) promise before setting it in the Map. After it settles, delete the Map entry iff it still holds the same promise — a concurrent call that queued a new write for the same path will have replaced the entry and owns its own cleanup. Adds two regression tests via _testOnly_getWriteQueueSize(): - N distinct paths all cleaned up after writes settle (Map → empty) - rejected write also cleans up its entry (no leak on error path)
This commit is contained in:
parent
8c2a7b4181
commit
313acd4463
@ -3,7 +3,7 @@ import fs from "fs/promises";
|
||||
import os from "os";
|
||||
import path from "path";
|
||||
import { describe, expect, it, beforeEach, afterEach } from "vitest";
|
||||
import { recordTokenUsage } from "./usage-log.js";
|
||||
import { recordTokenUsage, _testOnly_getWriteQueueSize } from "./usage-log.js";
|
||||
|
||||
describe("recordTokenUsage", () => {
|
||||
let tmpDir: string;
|
||||
@ -245,6 +245,52 @@ describe("recordTokenUsage", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("writeQueues entries are removed after each write settles — no unbounded Map growth", async () => {
|
||||
// Regression: writeQueues entries were never deleted after a write settled,
|
||||
// causing unbounded Map growth in long-lived processes that touch many
|
||||
// ephemeral workspace directories (one unique path → one permanent entry).
|
||||
//
|
||||
// Verify the fix: after all writes to N distinct paths complete, the Map
|
||||
// must be empty (no retained entries for settled paths).
|
||||
const N = 10;
|
||||
const dirs = await Promise.all(
|
||||
Array.from({ length: N }, () => fs.mkdtemp(path.join(os.tmpdir(), "usage-queue-leak-"))),
|
||||
);
|
||||
try {
|
||||
await Promise.all(
|
||||
dirs.map((dir) =>
|
||||
recordTokenUsage({
|
||||
workspaceDir: dir,
|
||||
label: "llm_output",
|
||||
usage: { input: 1, output: 1, total: 2 },
|
||||
}),
|
||||
),
|
||||
);
|
||||
// All writes are awaited; their stored promises must have settled and
|
||||
// been removed from the Map.
|
||||
expect(_testOnly_getWriteQueueSize()).toBe(0);
|
||||
} finally {
|
||||
await Promise.all(dirs.map((d) => fs.rm(d, { recursive: true, force: true })));
|
||||
}
|
||||
});
|
||||
|
||||
it("writeQueues entry is removed even when the write rejects", async () => {
|
||||
// Plant a non-array token-usage.json to trigger a rejection in appendRecord.
|
||||
// The Map entry must still be cleaned up — a failing write must not leak.
|
||||
await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(path.join(tmpDir, "memory", "token-usage.json"), '"not-an-array"', "utf-8");
|
||||
|
||||
await expect(
|
||||
recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 1, output: 1, total: 2 },
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
|
||||
expect(_testOnly_getWriteQueueSize()).toBe(0);
|
||||
});
|
||||
|
||||
it("serialises concurrent writes — no record is lost", async () => {
|
||||
const N = 20;
|
||||
await Promise.all(
|
||||
|
||||
@ -112,6 +112,11 @@ async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void
|
||||
// the same process so they do not all contend on the cross-process file lock.
|
||||
const writeQueues = new Map<string, Promise<void>>();
|
||||
|
||||
/** Exposed for testing only — returns the current number of live queue entries. */
|
||||
export function _testOnly_getWriteQueueSize(): number {
|
||||
return writeQueues.size;
|
||||
}
|
||||
|
||||
export async function recordTokenUsage(params: {
|
||||
workspaceDir: string;
|
||||
runId?: string;
|
||||
@ -171,9 +176,19 @@ export async function recordTokenUsage(params: {
|
||||
|
||||
const queued = writeQueues.get(file) ?? Promise.resolve();
|
||||
const next = queued.then(() => appendRecord(file, entry));
|
||||
writeQueues.set(
|
||||
file,
|
||||
next.catch(() => {}),
|
||||
);
|
||||
// Suppress rejection so the Map value never holds a rejected promise, which
|
||||
// would cause unhandled-rejection noise for any future .then() chained on it.
|
||||
const stored = next.catch(() => {});
|
||||
writeQueues.set(file, stored);
|
||||
// Remove the entry once it settles. Check identity first: if another call
|
||||
// already enqueued a new write for the same path, writeQueues now holds that
|
||||
// newer promise and must not be deleted — the newer entry owns its own cleanup.
|
||||
// This prevents unbounded Map growth in long-lived processes that write to
|
||||
// many ephemeral workspace directories.
|
||||
void stored.then(() => {
|
||||
if (writeQueues.get(file) === stored) {
|
||||
writeQueues.delete(file);
|
||||
}
|
||||
});
|
||||
await next;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user