diff --git a/src/agents/usage-log.test.ts b/src/agents/usage-log.test.ts index 37ea1a92e54..d308bf7e34e 100644 --- a/src/agents/usage-log.test.ts +++ b/src/agents/usage-log.test.ts @@ -1,3 +1,4 @@ +import { spawn } from "child_process"; import fs from "fs/promises"; import os from "os"; import path from "path"; @@ -177,6 +178,39 @@ describe("recordTokenUsage", () => { expect(records).toHaveLength(N); }); + it("reclaims stale lock left by a crashed process", async () => { + // Spawn a subprocess that exits immediately, then use its (now-dead) PID + // to simulate a lock file left behind after an abnormal exit. + const deadPid = await new Promise((resolve, reject) => { + const child = spawn(process.execPath, ["-e", "process.exit(0)"]); + const pid = child.pid!; + child.on("exit", () => resolve(pid)); + child.on("error", reject); + }); + + const memoryDir = path.join(tmpDir, "memory"); + await fs.mkdir(memoryDir, { recursive: true }); + const lockPath = path.join(memoryDir, "token-usage.json.lock"); + await fs.writeFile(lockPath, String(deadPid)); + + // recordTokenUsage must detect the stale lock, reclaim it, and succeed. + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 100, output: 50, total: 150 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(1); + expect(records[0].tokensUsed).toBe(150); + // Lock file must be cleaned up by the winner. + const lockExists = await fs + .access(lockPath) + .then(() => true) + .catch(() => false); + expect(lockExists).toBe(false); + }); + it("serialises concurrent writes — no record is lost", async () => { const N = 20; await Promise.all( diff --git a/src/agents/usage-log.ts b/src/agents/usage-log.ts index 1738e0db4c1..c8a1cdb254c 100644 --- a/src/agents/usage-log.ts +++ b/src/agents/usage-log.ts @@ -60,21 +60,53 @@ async function readJsonArray(file: string): Promise { // writes. We guard against that with an advisory lock file (.lock) using // O_EXCL (create-exclusive), which is atomic on POSIX filesystems. // -// Lock acquisition retries with a fixed interval up to LOCK_TIMEOUT_MS. -// On timeout ERR_LOCK_TIMEOUT is thrown; the lock file is intentionally left -// untouched so that an active holder's mutual exclusion is never broken. +// The lock file stores the holder's PID so that waiters can detect a stale +// lock left by a crashed process. On each EEXIST the waiter reads the PID +// and calls kill(pid, 0): if the process no longer exists (ESRCH) the lock +// is stale and is reclaimed immediately via a fresh O_EXCL open, preserving +// mutual exclusion even when multiple waiters race for the steal. If the +// holder is alive the waiter backs off for LOCK_RETRY_MS and retries. +// After LOCK_TIMEOUT_MS without acquiring the lock ERR_LOCK_TIMEOUT is +// thrown; the lock file is left untouched to avoid breaking a live holder. // --------------------------------------------------------------------------- const LOCK_RETRY_MS = 50; const LOCK_TIMEOUT_MS = 5_000; +/** + * Returns true only when the lock at `lockPath` was written by a process + * that no longer exists (ESRCH). Any other outcome (process alive, EPERM, + * unreadable file, non-numeric content) is treated as "not stale" so we + * never break a legitimately held lock. + */ +async function isLockStale(lockPath: string): Promise { + try { + const raw = await fs.readFile(lockPath, "utf-8"); + const pid = parseInt(raw.trim(), 10); + if (isNaN(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); // signal 0 checks existence without delivering a signal + return false; // process is alive + } catch (e) { + return (e as NodeJS.ErrnoException).code === "ESRCH"; + } + } catch { + return false; // can't read lock — treat as live + } +} + async function withFileLock(lockPath: string, fn: () => Promise): Promise { const deadline = Date.now() + LOCK_TIMEOUT_MS; + const myPid = String(process.pid); while (Date.now() < deadline) { let fh: fs.FileHandle | undefined; try { - // wx = O_WRONLY | O_CREAT | O_EXCL — fails if the file already exists + // wx = O_WRONLY | O_CREAT | O_EXCL — fails if the file already exists. + // Write our PID so that a waiting process can verify we are still alive. fh = await fs.open(lockPath, "wx"); + await fh.writeFile(myPid); await fh.close(); fh = undefined; try { @@ -87,15 +119,22 @@ async function withFileLock(lockPath: string, fn: () => Promise): Promise< if ((err as NodeJS.ErrnoException).code !== "EEXIST") { throw err; } - // Another process holds the lock — wait and retry. + // The lock file exists. Check immediately whether the holder crashed; + // if so, unlink the stale lock and loop back to race on O_EXCL. + // Multiple concurrent waiters may all detect the stale lock and attempt + // the unlink — that is fine because only one O_EXCL open will succeed. + if (await isLockStale(lockPath)) { + await fs.unlink(lockPath).catch(() => {}); + continue; + } + // Holder is alive — back off and retry. await new Promise((r) => setTimeout(r, LOCK_RETRY_MS)); } } - // Timed out without acquiring the lock. We deliberately do NOT delete the - // lock file here: the holder may still be active (slow disk, large file), - // and removing a live lock would break mutual exclusion and allow concurrent - // read-modify-write cycles to overwrite each other's entries. + // Timed out without acquiring the lock. The lock file is intentionally left + // untouched: the holder may still be active (slow disk, large file), and + // removing a live lock would break mutual exclusion. throw Object.assign(new Error(`Could not acquire lock ${lockPath} within ${LOCK_TIMEOUT_MS}ms`), { code: "ERR_LOCK_TIMEOUT", });