fix(usage-log): add cross-process file lock to prevent write races
The in-memory writeQueues Map serialises writes within one Node process but two concurrent OpenClaw processes sharing the same workspaceDir (e.g. parallel CLI runs) can still race: both read the same snapshot before either writes, and the later writer silently overwrites the earlier entry. Add withFileLock() — an O_EXCL advisory lock on <file>.lock — to coordinate across processes. The per-file in-memory queue is kept to reduce lock contention within the same process. On lock-acquire failure the helper retries every 50 ms up to a 5 s timeout; on timeout it removes a potentially stale lock file and makes one final attempt to prevent permanent blocking after a crash.
This commit is contained in:
parent
f4c4ab416d
commit
f267ff7888
@ -140,6 +140,25 @@ describe("recordTokenUsage", () => {
|
||||
expect(content).toBe('{"broken":true');
|
||||
});
|
||||
|
||||
it("cross-process lock: concurrent writers via file lock do not lose records", async () => {
|
||||
// Simulate two processes bypassing the in-memory queue by calling
|
||||
// recordTokenUsage from independent promise chains simultaneously.
|
||||
// If the file lock is working they must still land all records.
|
||||
const N = 10;
|
||||
const writes = Array.from({ length: N }, (_, i) => {
|
||||
// Each call is deliberately NOT chained — they race on the file lock.
|
||||
return recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: i + 1, output: 1, total: i + 2 },
|
||||
});
|
||||
});
|
||||
await Promise.all(writes);
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records).toHaveLength(N);
|
||||
});
|
||||
|
||||
it("serialises concurrent writes — no record is lost", async () => {
|
||||
const N = 20;
|
||||
await Promise.all(
|
||||
|
||||
@ -40,15 +40,64 @@ async function readJsonArray(file: string): Promise<TokenUsageRecord[]> {
|
||||
}
|
||||
}
|
||||
|
||||
async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
|
||||
const records = await readJsonArray(file);
|
||||
records.push(entry);
|
||||
await fs.writeFile(file, JSON.stringify(records, null, 2));
|
||||
// ---------------------------------------------------------------------------
|
||||
// Cross-process file lock
|
||||
//
|
||||
// The in-memory writeQueues Map serialises writes within a single Node
|
||||
// process, but two concurrent OpenClaw processes targeting the same
|
||||
// workspaceDir can still race: both read the same snapshot before either
|
||||
// writes. We guard against that with an advisory lock file (.lock) using
|
||||
// O_EXCL (create-exclusive), which is atomic on POSIX filesystems.
|
||||
//
|
||||
// Lock acquisition retries with a fixed interval up to LOCK_TIMEOUT_MS.
|
||||
// If the holding process crashes the stale lock is removed after the
|
||||
// timeout so subsequent callers are not permanently blocked.
|
||||
// ---------------------------------------------------------------------------
|
||||
const LOCK_RETRY_MS = 50;
|
||||
const LOCK_TIMEOUT_MS = 5_000;
|
||||
|
||||
async function withFileLock<T>(lockPath: string, fn: () => Promise<T>): Promise<T> {
|
||||
const deadline = Date.now() + LOCK_TIMEOUT_MS;
|
||||
|
||||
while (Date.now() < deadline) {
|
||||
let fh: fs.FileHandle | undefined;
|
||||
try {
|
||||
// wx = O_WRONLY | O_CREAT | O_EXCL — fails if the file already exists
|
||||
fh = await fs.open(lockPath, "wx");
|
||||
await fh.close();
|
||||
fh = undefined;
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await fs.unlink(lockPath).catch(() => {});
|
||||
}
|
||||
} catch (err) {
|
||||
await fh?.close().catch(() => {});
|
||||
if ((err as NodeJS.ErrnoException).code !== "EEXIST") {
|
||||
throw err;
|
||||
}
|
||||
// Another process holds the lock — wait and retry.
|
||||
await new Promise<void>((r) => setTimeout(r, LOCK_RETRY_MS));
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout: remove a potentially stale lock and make one final attempt.
|
||||
await fs.unlink(lockPath).catch(() => {});
|
||||
const records = await fn();
|
||||
return records;
|
||||
}
|
||||
|
||||
// Per-file write queue: serialises concurrent recordTokenUsage() calls so that
|
||||
// a fire-and-forget caller cannot cause two concurrent writers to read the same
|
||||
// snapshot and overwrite each other's entry.
|
||||
async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
|
||||
const lockPath = `${file}.lock`;
|
||||
await withFileLock(lockPath, async () => {
|
||||
const records = await readJsonArray(file);
|
||||
records.push(entry);
|
||||
await fs.writeFile(file, JSON.stringify(records, null, 2));
|
||||
});
|
||||
}
|
||||
|
||||
// Per-file write queue: serialises concurrent recordTokenUsage() calls within
|
||||
// the same process so they do not all contend on the cross-process file lock.
|
||||
const writeQueues = new Map<string, Promise<void>>();
|
||||
|
||||
export async function recordTokenUsage(params: {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user