fix(usage-log): serialise concurrent writes with per-file promise queue
Fire-and-forget callers (attempt.ts) can trigger two concurrent recordTokenUsage() calls for the same workspaceDir. The previous read-modify-write pattern had no locking, so the last writer silently overwrote the first, losing that run's entry. Fix: keep a Map<file, Promise<void>> write queue so each write awaits the previous one. The queue slot is replaced with a no-throw wrapper so a failed write does not stall future writes. Added a concurrent-write test (20 parallel calls) that asserts no record is lost.
This commit is contained in:
parent
83a566ce99
commit
d03e7ae8ed
@ -119,4 +119,25 @@ describe("recordTokenUsage", () => {
|
||||
expect(records[0].inputTokens).toBe(100);
|
||||
expect(records[0].outputTokens).toBe(50);
|
||||
});
|
||||
|
||||
it("serialises concurrent writes — no record is lost", async () => {
|
||||
const N = 20;
|
||||
await Promise.all(
|
||||
Array.from({ length: N }, (_, i) =>
|
||||
recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: i + 1, output: 1, total: i + 2 },
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records).toHaveLength(N);
|
||||
// Every distinct tokensUsed value must appear exactly once
|
||||
const totals = records
|
||||
.map((r: { tokensUsed: number }) => r.tokensUsed)
|
||||
.toSorted((a: number, b: number) => a - b);
|
||||
expect(totals).toEqual(Array.from({ length: N }, (_, i) => i + 2));
|
||||
});
|
||||
});
|
||||
|
||||
@ -33,6 +33,17 @@ async function readJsonArray(file: string): Promise<TokenUsageRecord[]> {
|
||||
}
|
||||
}
|
||||
|
||||
async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
|
||||
const records = await readJsonArray(file);
|
||||
records.push(entry);
|
||||
await fs.writeFile(file, JSON.stringify(records, null, 2));
|
||||
}
|
||||
|
||||
// Per-file write queue: serialises concurrent recordTokenUsage() calls so that
|
||||
// a fire-and-forget caller cannot cause two concurrent writers to read the same
|
||||
// snapshot and overwrite each other's entry.
|
||||
const writeQueues = new Map<string, Promise<void>>();
|
||||
|
||||
export async function recordTokenUsage(params: {
|
||||
workspaceDir: string;
|
||||
runId?: string;
|
||||
@ -83,7 +94,11 @@ export async function recordTokenUsage(params: {
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
const records = await readJsonArray(file);
|
||||
records.push(entry);
|
||||
await fs.writeFile(file, JSON.stringify(records, null, 2));
|
||||
const queued = writeQueues.get(file) ?? Promise.resolve();
|
||||
const next = queued.then(() => appendRecord(file, entry));
|
||||
writeQueues.set(
|
||||
file,
|
||||
next.catch(() => {}),
|
||||
);
|
||||
await next;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user