diff --git a/src/agents/usage-log.test.ts b/src/agents/usage-log.test.ts
index d308bf7e34e..e250bd63a3b 100644
--- a/src/agents/usage-log.test.ts
+++ b/src/agents/usage-log.test.ts
@@ -191,7 +191,11 @@ describe("recordTokenUsage", () => {
     const memoryDir = path.join(tmpDir, "memory");
     await fs.mkdir(memoryDir, { recursive: true });
     const lockPath = path.join(memoryDir, "token-usage.json.lock");
-    await fs.writeFile(lockPath, String(deadPid));
+    // withFileLock (plugin-sdk) stores {pid, createdAt} — match that format.
+    await fs.writeFile(
+      lockPath,
+      JSON.stringify({ pid: deadPid, createdAt: new Date().toISOString() }),
+    );
 
     // recordTokenUsage must detect the stale lock, reclaim it, and succeed.
     await recordTokenUsage({
diff --git a/src/agents/usage-log.ts b/src/agents/usage-log.ts
index 43bfaabeab1..4f04ddd0d01 100644
--- a/src/agents/usage-log.ts
+++ b/src/agents/usage-log.ts
@@ -1,6 +1,7 @@
 import { randomBytes } from "crypto";
 import fs from "fs/promises";
 import path from "path";
+import { type FileLockOptions, withFileLock } from "../infra/file-lock.js";
 
 export type TokenUsageRecord = {
   id: string;
@@ -55,94 +56,30 @@ async function readJsonArray(file: string): Promise<TokenUsageRecord[]> {
 // Cross-process file lock
 //
 // The in-memory writeQueues Map serialises writes within a single Node
-// process, but two concurrent OpenClaw processes targeting the same
-// workspaceDir can still race: both read the same snapshot before either
-// writes. We guard against that with an advisory lock file (.lock) using
-// O_EXCL (create-exclusive), which is atomic on POSIX filesystems.
+// process. Two concurrent OpenClaw processes targeting the same
+// workspaceDir can still race, so we use an advisory O_EXCL lock provided
+// by the shared withFileLock helper in plugin-sdk/file-lock.ts.
 //
-// The lock file stores the holder's PID so that waiters can detect a stale
-// lock left by a crashed process. On each EEXIST the waiter reads the PID
-// and calls kill(pid, 0): if the process no longer exists (ESRCH) the lock
-// is stale and is reclaimed immediately via a fresh O_EXCL open, preserving
-// mutual exclusion even when multiple waiters race for the steal. If the
-// holder is alive the waiter backs off for LOCK_RETRY_MS and retries.
-// After LOCK_TIMEOUT_MS without acquiring the lock ERR_LOCK_TIMEOUT is
-// thrown; the lock file is left untouched to avoid breaking a live holder.
+// That implementation:
+//   • stores {pid, createdAt} so waiters can detect a crashed holder
+//   • treats empty/unparseable lock content as stale (crash during open→write)
+//   • re-verifies the lock inode before removing it so a slow waiter's
+//     unlink cannot delete a fresh lock from another process
+//   • uses exponential backoff with jitter capped at stale ms
 // ---------------------------------------------------------------------------
-const LOCK_RETRY_MS = 50;
-const LOCK_TIMEOUT_MS = 5_000;
-
-/**
- * Returns true only when the lock at `lockPath` was written by a process
- * that no longer exists (ESRCH). Any other outcome (process alive, EPERM,
- * unreadable file, non-numeric content) is treated as "not stale" so we
- * never break a legitimately held lock.
- */
-async function isLockStale(lockPath: string): Promise<boolean> {
-  try {
-    const raw = await fs.readFile(lockPath, "utf-8");
-    const pid = parseInt(raw.trim(), 10);
-    if (isNaN(pid) || pid <= 0) {
-      return false;
-    }
-    try {
-      process.kill(pid, 0); // signal 0 checks existence without delivering a signal
-      return false; // process is alive
-    } catch (e) {
-      return (e as NodeJS.ErrnoException).code === "ESRCH";
-    }
-  } catch {
-    return false; // can't read lock — treat as live
-  }
-}
-
-async function withFileLock<T>(lockPath: string, fn: () => Promise<T>): Promise<T> {
-  const deadline = Date.now() + LOCK_TIMEOUT_MS;
-  const myPid = String(process.pid);
-
-  while (Date.now() < deadline) {
-    let fh: fs.FileHandle | undefined;
-    try {
-      // wx = O_WRONLY | O_CREAT | O_EXCL — fails if the file already exists.
-      // Write our PID so that a waiting process can verify we are still alive.
-      fh = await fs.open(lockPath, "wx");
-      await fh.writeFile(myPid);
-      await fh.close();
-      fh = undefined;
-      try {
-        return await fn();
-      } finally {
-        await fs.unlink(lockPath).catch(() => {});
-      }
-    } catch (err) {
-      await fh?.close().catch(() => {});
-      if ((err as NodeJS.ErrnoException).code !== "EEXIST") {
-        throw err;
-      }
-      // The lock file exists. Check immediately whether the holder crashed;
-      // if so, unlink the stale lock and loop back to race on O_EXCL.
-      // Multiple concurrent waiters may all detect the stale lock and attempt
-      // the unlink — that is fine because only one O_EXCL open will succeed.
-      if (await isLockStale(lockPath)) {
-        await fs.unlink(lockPath).catch(() => {});
-        continue;
-      }
-      // Holder is alive — back off and retry.
-      await new Promise((r) => setTimeout(r, LOCK_RETRY_MS));
-    }
-  }
-
-  // Timed out without acquiring the lock. The lock file is intentionally left
-  // untouched: the holder may still be active (slow disk, large file), and
-  // removing a live lock would break mutual exclusion.
-  throw Object.assign(new Error(`Could not acquire lock ${lockPath} within ${LOCK_TIMEOUT_MS}ms`), {
-    code: "ERR_LOCK_TIMEOUT",
-  });
-}
+const APPEND_LOCK_OPTIONS: FileLockOptions = {
+  // ~100 retries × 50 ms ≈ 5 s total — matches the previous LOCK_TIMEOUT_MS.
+  retries: {
+    retries: 100,
+    factor: 1,
+    minTimeout: 50,
+    maxTimeout: 50,
+  },
+  stale: 5_000,
+};
 
 async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
-  const lockPath = `${file}.lock`;
-  await withFileLock(lockPath, async () => {
+  await withFileLock(file, APPEND_LOCK_OPTIONS, async () => {
     const records = await readJsonArray(file);
     records.push(entry);
     // Write to a sibling temp file then atomically rename into place so that