diff --git a/src/agents/usage-log.test.ts b/src/agents/usage-log.test.ts
index b61b3154703..e61c48af23d 100644
--- a/src/agents/usage-log.test.ts
+++ b/src/agents/usage-log.test.ts
@@ -140,6 +140,25 @@ describe("recordTokenUsage", () => {
     expect(content).toBe('{"broken":true');
   });
 
+  it("cross-process lock: concurrent writers via file lock do not lose records", async () => {
+    // NOTE(review): both "writers" run inside THIS test process, so the
+    // in-memory per-file write queue already serialises them; a genuine
+    // cross-process race needs child processes sharing one workspaceDir.
+    const N = 10;
+    const writes = Array.from({ length: N }, (_, i) => {
+      // Each call is deliberately NOT chained — the calls overlap in time.
+      return recordTokenUsage({
+        workspaceDir: tmpDir,
+        label: "llm_output",
+        usage: { input: i + 1, output: 1, total: i + 2 },
+      });
+    });
+    await Promise.all(writes);
+
+    const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
+    expect(records).toHaveLength(N);
+  });
+
   it("serialises concurrent writes — no record is lost", async () => {
     const N = 20;
     await Promise.all(
diff --git a/src/agents/usage-log.ts b/src/agents/usage-log.ts
index 992c3e945ea..22966a1cee8 100644
--- a/src/agents/usage-log.ts
+++ b/src/agents/usage-log.ts
@@ -40,15 +40,81 @@ async function readJsonArray(file: string): Promise<TokenUsageRecord[]> {
   }
 }
 
-async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
-  const records = await readJsonArray(file);
-  records.push(entry);
-  await fs.writeFile(file, JSON.stringify(records, null, 2));
+// ---------------------------------------------------------------------------
+// Cross-process file lock
+//
+// The in-memory writeQueues Map serialises writes within a single Node
+// process, but two concurrent OpenClaw processes targeting the same
+// workspaceDir can still race: both read the same snapshot before either
+// writes. We guard against that with an advisory lock file (.lock) using
+// O_EXCL (create-exclusive), which is atomic on POSIX filesystems.
+//
+// Lock acquisition retries with a fixed interval up to LOCK_TIMEOUT_MS.
+// If the holding process crashes the stale lock is purged after the
+// timeout so subsequent callers are not permanently blocked.
+// ---------------------------------------------------------------------------
+const LOCK_RETRY_MS = 50;
+const LOCK_TIMEOUT_MS = 5_000;
+
+async function withFileLock<T>(lockPath: string, fn: () => Promise<T>): Promise<T> {
+  const deadline = Date.now() + LOCK_TIMEOUT_MS;
+
+  while (Date.now() < deadline) {
+    let fh: fs.FileHandle | undefined;
+    try {
+      // wx = O_WRONLY | O_CREAT | O_EXCL — fails if the file already exists
+      fh = await fs.open(lockPath, "wx");
+      await fh.close();
+      fh = undefined;
+      try {
+        return await fn();
+      } finally {
+        await fs.unlink(lockPath).catch(() => {});
+      }
+    } catch (err) {
+      await fh?.close().catch(() => {});
+      if ((err as NodeJS.ErrnoException).code !== "EEXIST") {
+        throw err;
+      }
+      // Another process holds the lock — wait and retry.
+      await new Promise((r) => setTimeout(r, LOCK_RETRY_MS));
+    }
+  }
+
+  // Timeout: the holder has most likely crashed. Purge the stale lock and
+  // RE-ACQUIRE before running fn — running fn() unlocked here (as the
+  // previous version did) could interleave with a live writer, and blindly
+  // unlinking could remove a lock another process had just created.
+  await fs.unlink(lockPath).catch(() => {});
+  let reacquired = false;
+  try {
+    const fh = await fs.open(lockPath, "wx");
+    await fh.close();
+    reacquired = true;
+  } catch {
+    // Lost the post-purge race; proceed unlocked as a last resort so
+    // fire-and-forget callers never hang or throw on a wedged lock.
+  }
+  try {
+    return await fn();
+  } finally {
+    if (reacquired) {
+      await fs.unlink(lockPath).catch(() => {});
+    }
+  }
 }
 
-// Per-file write queue: serialises concurrent recordTokenUsage() calls so that
-// a fire-and-forget caller cannot cause two concurrent writers to read the same
-// snapshot and overwrite each other's entry.
+async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
+  const lockPath = `${file}.lock`;
+  // Read-modify-write under the advisory lock so two processes cannot both
+  // read the same snapshot and overwrite each other's appended entry.
+  await withFileLock(lockPath, async () => {
+    const records = await readJsonArray(file);
+    records.push(entry);
+    await fs.writeFile(file, JSON.stringify(records, null, 2));
+  });
+}
+
+// Per-file write queue: serialises concurrent recordTokenUsage() calls within
+// the same process so they do not all contend on the cross-process file lock.
 const writeQueues = new Map<string, Promise<void>>();
 
 export async function recordTokenUsage(params: {