From d03e7ae8ed39c6126f0a9d5f59cebc44b2e97796 Mon Sep 17 00:00:00 2001
From: jiarung
Date: Fri, 13 Mar 2026 09:37:25 +0000
Subject: [PATCH] fix(usage-log): serialise concurrent writes with per-file
 promise queue

Fire-and-forget callers (attempt.ts) can trigger two concurrent
recordTokenUsage() calls for the same workspaceDir. The previous
read-modify-write pattern had no locking, so the last writer silently
overwrote the first, losing that run's entry.

Fix: keep a Map<string, Promise<void>> write queue so each write awaits
the previous one. The queue slot is replaced with a no-throw wrapper so
a failed write does not stall future writes.

Added a concurrent-write test (20 parallel calls) that asserts no
record is lost.
---
 src/agents/usage-log.test.ts | 21 +++++++++++++++++++++
 src/agents/usage-log.ts      | 21 ++++++++++++++++++---
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/src/agents/usage-log.test.ts b/src/agents/usage-log.test.ts
index 19420b5e3fd..ed2efd755df 100644
--- a/src/agents/usage-log.test.ts
+++ b/src/agents/usage-log.test.ts
@@ -119,4 +119,25 @@
     expect(records[0].inputTokens).toBe(100);
     expect(records[0].outputTokens).toBe(50);
   });
+
+  it("serialises concurrent writes — no record is lost", async () => {
+    const N = 20;
+    await Promise.all(
+      Array.from({ length: N }, (_, i) =>
+        recordTokenUsage({
+          workspaceDir: tmpDir,
+          label: "llm_output",
+          usage: { input: i + 1, output: 1, total: i + 2 },
+        }),
+      ),
+    );
+
+    const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
+    expect(records).toHaveLength(N);
+    // Every distinct tokensUsed value must appear exactly once
+    const totals = records
+      .map((r: { tokensUsed: number }) => r.tokensUsed)
+      .toSorted((a: number, b: number) => a - b);
+    expect(totals).toEqual(Array.from({ length: N }, (_, i) => i + 2));
+  });
 });
diff --git a/src/agents/usage-log.ts b/src/agents/usage-log.ts
index 469ee6915e8..a6e180b958d 100644
--- a/src/agents/usage-log.ts
+++ b/src/agents/usage-log.ts
@@ -33,6 +33,17 @@ async function readJsonArray(file: string): Promise<TokenUsageRecord[]> {
   }
 }
 
+async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
+  const records = await readJsonArray(file);
+  records.push(entry);
+  await fs.writeFile(file, JSON.stringify(records, null, 2));
+}
+
+// Per-file write queue: serialises concurrent recordTokenUsage() calls so that
+// a fire-and-forget caller cannot cause two concurrent writers to read the same
+// snapshot and overwrite each other's entry.
+const writeQueues = new Map<string, Promise<void>>();
+
 export async function recordTokenUsage(params: {
   workspaceDir: string;
   runId?: string;
@@ -83,7 +94,11 @@ export async function recordTokenUsage(params: {
     createdAt: new Date().toISOString(),
   };
 
-  const records = await readJsonArray(file);
-  records.push(entry);
-  await fs.writeFile(file, JSON.stringify(records, null, 2));
+  const queued = writeQueues.get(file) ?? Promise.resolve();
+  const next = queued.then(() => appendRecord(file, entry));
+  writeQueues.set(
+    file,
+    next.catch(() => {}),
+  );
+  await next;
 }