Merge b556486d22abefff23789403603f8c2ffa61a6bb into 598f1826d8b2bc969aace2c6459824737667218c
This commit is contained in:
commit
4250dfc876
@ -3,6 +3,16 @@
|
||||
set -euo pipefail
|
||||
|
||||
ROOT_DIR="$(git rev-parse --show-toplevel 2>/dev/null || pwd)"
|
||||
|
||||
# Resolve node when not in PATH (e.g. nvm environments where the shell
|
||||
# profile hasn't been sourced by the git hook). Picks the newest installed
|
||||
# nvm version using version-aware sort so that v22 is preferred over v18.
|
||||
_resolve_node_sh="$ROOT_DIR/scripts/pre-commit/resolve-node.sh"
|
||||
if [[ -f "$_resolve_node_sh" ]]; then
|
||||
# shellcheck source=scripts/pre-commit/resolve-node.sh
|
||||
source "$_resolve_node_sh"
|
||||
fi
|
||||
unset _resolve_node_sh
|
||||
RUN_NODE_TOOL="$ROOT_DIR/scripts/pre-commit/run-node-tool.sh"
|
||||
FILTER_FILES="$ROOT_DIR/scripts/pre-commit/filter-staged-files.mjs"
|
||||
|
||||
|
||||
29
scripts/pre-commit/resolve-node.sh
Normal file
29
scripts/pre-commit/resolve-node.sh
Normal file
@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env bash
|
||||
# Resolve the newest nvm-managed Node when it is not already in PATH.
|
||||
# Source this file; do not execute it directly.
|
||||
#
|
||||
# Cross-platform: avoids GNU-only `sort -V` (not supported by BSD sort on
|
||||
# macOS). Instead, each semver component is zero-padded to five digits so
|
||||
# that a plain lexicographic sort correctly selects the highest semantic
|
||||
# version (e.g. v22.x wins over v18.x).
|
||||
# Only act when `node` is not already resolvable; an existing node on PATH
# always wins.
if ! command -v node >/dev/null 2>&1; then
  # Honour a custom nvm install location: nvm uses $NVM_DIR and only falls
  # back to ~/.nvm. ${HOME:-} keeps callers running under `set -u` from
  # aborting when HOME is unset.
  _nvm_root="${NVM_DIR:-${HOME:-}/.nvm}"
  _nvm_node=$(
    for _p in "$_nvm_root/versions/node"/*/bin/node; do
      # An unmatched glob stays literal and fails this check, so an empty or
      # missing versions directory simply yields no candidates.
      [[ -x "$_p" ]] || continue
      # Directory name without the leading "v" (e.g. "v22.3.0" -> "22.3.0").
      _ver="${_p%/bin/node}"; _ver="${_ver##*/}"; _ver="${_ver#v}"
      IFS=. read -r _ma _mi _pa <<< "$_ver"
      # Strip any non-numeric suffix so prerelease tags (e.g. "0-rc.1",
      # "0-nightly") do not make printf '%05d' fail.
      # ${var%%pattern} removes the longest matching suffix, so
      # "0-rc.1" -> "0" and "14" -> "14".
      _ma="${_ma%%[^0-9]*}"
      _mi="${_mi%%[^0-9]*}"
      _pa="${_pa%%[^0-9]*}"
      # 10# forces base-10 so a zero-padded component such as "08" is not
      # rejected as an invalid octal literal.
      printf '%05d%05d%05d\t%s\n' \
        "$((10#${_ma:-0}))" "$((10#${_mi:-0}))" "$((10#${_pa:-0}))" "$_p"
    # LC_ALL=C makes the lexicographic comparison byte-wise and independent of
    # the user's locale; the last line is therefore the highest version.
    done | LC_ALL=C sort | tail -n 1 | cut -f2-
  )
  if [[ -x "$_nvm_node" ]]; then
    # Assign separately from `export` so a dirname failure is not masked.
    _nvm_bin="$(dirname "$_nvm_node")"
    export PATH="$_nvm_bin:$PATH"
  fi
  unset _nvm_root _nvm_node _nvm_bin _p _ver _ma _mi _pa
fi
|
||||
@ -3,6 +3,11 @@ set -euo pipefail
|
||||
|
||||
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
|
||||
|
||||
# Resolve node when not in PATH (e.g. nvm environments). Picks the newest
|
||||
# installed nvm version using version-aware sort.
|
||||
# shellcheck source=resolve-node.sh
|
||||
source "$ROOT_DIR/scripts/pre-commit/resolve-node.sh"
|
||||
|
||||
if [[ $# -lt 1 ]]; then
|
||||
echo "usage: run-node-tool.sh <tool> [args...]" >&2
|
||||
exit 2
|
||||
|
||||
@ -100,6 +100,7 @@ import { resolveEffectiveToolFsWorkspaceOnly } from "../../tool-fs-policy.js";
|
||||
import { normalizeToolName } from "../../tool-policy.js";
|
||||
import type { TranscriptPolicy } from "../../transcript-policy.js";
|
||||
import { resolveTranscriptPolicy } from "../../transcript-policy.js";
|
||||
import { recordTokenUsage } from "../../usage-log.js";
|
||||
import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js";
|
||||
import { isRunnerAbortError } from "../abort.js";
|
||||
import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js";
|
||||
@ -3156,6 +3157,19 @@ export async function runEmbeddedAttempt(
|
||||
});
|
||||
}
|
||||
|
||||
recordTokenUsage({
|
||||
workspaceDir: effectiveWorkspace,
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
label: "llm_output",
|
||||
usage: getUsageTotals(),
|
||||
}).catch((err) => {
|
||||
log.warn(`token usage log failed: ${String(err)}`);
|
||||
});
|
||||
|
||||
return {
|
||||
aborted,
|
||||
timedOut,
|
||||
|
||||
314
src/agents/usage-log.test.ts
Normal file
314
src/agents/usage-log.test.ts
Normal file
@ -0,0 +1,314 @@
|
||||
import { spawn } from "child_process";
|
||||
import fs from "fs/promises";
|
||||
import os from "os";
|
||||
import path from "path";
|
||||
import { describe, expect, it, beforeEach, afterEach } from "vitest";
|
||||
import { recordTokenUsage, _testOnly_getWriteQueueSize } from "./usage-log.js";
|
||||
|
||||
describe("recordTokenUsage", () => {
|
||||
let tmpDir: string;
|
||||
let usageFile: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "usage-log-test-"));
|
||||
usageFile = path.join(tmpDir, "memory", "token-usage.json");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("writes inputTokens and outputTokens when provided", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
model: "claude-sonnet-4-6",
|
||||
provider: "anthropic",
|
||||
usage: { input: 1000, output: 500, total: 1500 },
|
||||
});
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records).toHaveLength(1);
|
||||
expect(records[0].tokensUsed).toBe(1500);
|
||||
expect(records[0].inputTokens).toBe(1000);
|
||||
expect(records[0].outputTokens).toBe(500);
|
||||
expect(records[0].cacheReadTokens).toBeUndefined();
|
||||
expect(records[0].cacheWriteTokens).toBeUndefined();
|
||||
});
|
||||
|
||||
it("writes cacheReadTokens and cacheWriteTokens when provided", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 800, output: 200, cacheRead: 300, cacheWrite: 100, total: 1400 },
|
||||
});
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records[0].inputTokens).toBe(800);
|
||||
expect(records[0].outputTokens).toBe(200);
|
||||
expect(records[0].cacheReadTokens).toBe(300);
|
||||
expect(records[0].cacheWriteTokens).toBe(100);
|
||||
});
|
||||
|
||||
it("omits IO fields when usage only has total (legacy records)", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { total: 28402 },
|
||||
});
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records[0].tokensUsed).toBe(28402);
|
||||
expect(records[0].inputTokens).toBeUndefined();
|
||||
expect(records[0].outputTokens).toBeUndefined();
|
||||
});
|
||||
|
||||
it("skips writing when usage is undefined", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: undefined,
|
||||
});
|
||||
|
||||
const exists = await fs
|
||||
.access(usageFile)
|
||||
.then(() => true)
|
||||
.catch(() => false);
|
||||
expect(exists).toBe(false);
|
||||
});
|
||||
|
||||
it("skips writing when total is zero", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 0, output: 0 },
|
||||
});
|
||||
|
||||
const exists = await fs
|
||||
.access(usageFile)
|
||||
.then(() => true)
|
||||
.catch(() => false);
|
||||
expect(exists).toBe(false);
|
||||
});
|
||||
|
||||
it("appends multiple records to the same file", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 100, output: 50, total: 150 },
|
||||
});
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 200, output: 80, total: 280 },
|
||||
});
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records).toHaveLength(2);
|
||||
expect(records[0].inputTokens).toBe(100);
|
||||
expect(records[1].inputTokens).toBe(200);
|
||||
});
|
||||
|
||||
it("truncates fractional tokens", async () => {
|
||||
await recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 100.9, output: 50.1, total: 151 },
|
||||
});
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records[0].inputTokens).toBe(100);
|
||||
expect(records[0].outputTokens).toBe(50);
|
||||
});
|
||||
|
||||
it("does not overwrite a valid-but-non-array token-usage.json — rejects unexpected shape", async () => {
|
||||
// Simulate a manual edit or migration that left a valid JSON object
|
||||
await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(usageFile, '{"legacy": true, "records": []}', "utf-8");
|
||||
|
||||
await expect(
|
||||
recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 100, output: 50, total: 150 },
|
||||
}),
|
||||
).rejects.toThrow("not an array");
|
||||
|
||||
// File must be unchanged — the legacy data is preserved.
|
||||
const content = await fs.readFile(usageFile, "utf-8");
|
||||
expect(content).toBe('{"legacy": true, "records": []}');
|
||||
});
|
||||
|
||||
it("does not overwrite a malformed token-usage.json — preserves corrupted file", async () => {
|
||||
// Simulate an interrupted write that left partial JSON
|
||||
await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(usageFile, '{"broken":true', "utf-8");
|
||||
|
||||
// recordTokenUsage must reject (caller is responsible for handling, e.g.
|
||||
// attempt.ts uses .catch()) and must NOT overwrite the existing file.
|
||||
await expect(
|
||||
recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 100, output: 50, total: 150 },
|
||||
}),
|
||||
).rejects.toThrow(SyntaxError);
|
||||
|
||||
// File must still contain the original corrupted content, not a new array.
|
||||
const content = await fs.readFile(usageFile, "utf-8");
|
||||
expect(content).toBe('{"broken":true');
|
||||
});
|
||||
|
||||
it("cross-process lock: concurrent writers via file lock do not lose records", async () => {
  // Fires N recordTokenUsage calls without awaiting any of them individually
  // and checks that every record ends up in the file.
  //
  // NOTE(review): despite the title, all calls run inside this one process and
  // go through recordTokenUsage, so they still pass through its per-file
  // in-memory write queue before reaching the file lock. A true cross-process
  // race would need separate child processes — confirm whether that is wanted.
  const N = 10;
  const writes = Array.from({ length: N }, (_, i) => {
    // The promises are created back-to-back; none waits for the previous one.
    return recordTokenUsage({
      workspaceDir: tmpDir,
      label: "llm_output",
      usage: { input: i + 1, output: 1, total: i + 2 },
    });
  });
  await Promise.all(writes);

  const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
  expect(records).toHaveLength(N);
});
|
||||
|
||||
it("reclaims stale lock left by a crashed process", async () => {
  // Spawn a subprocess that exits immediately, then use its (now-dead) PID
  // to simulate a lock file left behind after an abnormal exit.
  const deadPid = await new Promise<number>((resolve, reject) => {
    const child = spawn(process.execPath, ["-e", "process.exit(0)"]);
    child.on("error", reject);
    child.on("exit", () => {
      // child.pid is undefined when the process could not be spawned. Fail
      // loudly instead of writing a lock file without a pid, which would make
      // the test exercise a different code path than the one it names.
      if (child.pid === undefined) {
        reject(new Error("spawned child has no pid"));
        return;
      }
      resolve(child.pid);
    });
  });

  const memoryDir = path.join(tmpDir, "memory");
  await fs.mkdir(memoryDir, { recursive: true });
  const lockPath = path.join(memoryDir, "token-usage.json.lock");
  // Lock payload shape expected by withFileLock: {pid, createdAt}. createdAt
  // is deliberately fresh so only the dead PID can mark the lock as stale.
  await fs.writeFile(
    lockPath,
    JSON.stringify({ pid: deadPid, createdAt: new Date().toISOString() }),
  );

  // recordTokenUsage must detect the stale lock, reclaim it, and succeed.
  await recordTokenUsage({
    workspaceDir: tmpDir,
    label: "llm_output",
    usage: { input: 100, output: 50, total: 150 },
  });

  const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
  expect(records).toHaveLength(1);
  expect(records[0].tokensUsed).toBe(150);
  // Lock file must be cleaned up by the winner.
  const lockExists = await fs
    .access(lockPath)
    .then(() => true)
    .catch(() => false);
  expect(lockExists).toBe(false);
});
|
||||
|
||||
it("different path spellings for the same workspace share one queue — no record is lost", async () => {
|
||||
// Symlink tmpDir → another name so the same physical directory has two
|
||||
// spellings. Without queue-key canonicalisation both spellings create
|
||||
// independent writeQueues entries; when one chain holds the file lock
|
||||
// (HELD_LOCKS set) the other re-entrantly joins it and both execute the
|
||||
// read-modify-write cycle concurrently, silently dropping entries.
|
||||
const symlinkDir = `${tmpDir}-symlink`;
|
||||
await fs.symlink(tmpDir, symlinkDir);
|
||||
try {
|
||||
// Mix canonical and symlink paths across concurrent writes.
|
||||
const N = 6;
|
||||
await Promise.all(
|
||||
Array.from({ length: N }, (_, i) =>
|
||||
recordTokenUsage({
|
||||
workspaceDir: i % 2 === 0 ? tmpDir : symlinkDir,
|
||||
label: "llm_output",
|
||||
usage: { input: i + 1, output: 1, total: i + 2 },
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// All N records must survive — none may be lost to a concurrent
|
||||
// read-modify-write collision.
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records).toHaveLength(N);
|
||||
} finally {
|
||||
await fs.unlink(symlinkDir).catch(() => {});
|
||||
}
|
||||
});
|
||||
|
||||
it("writeQueues entries are removed after each write settles — no unbounded Map growth", async () => {
|
||||
// Regression: writeQueues entries were never deleted after a write settled,
|
||||
// causing unbounded Map growth in long-lived processes that touch many
|
||||
// ephemeral workspace directories (one unique path → one permanent entry).
|
||||
//
|
||||
// Verify the fix: after all writes to N distinct paths complete, the Map
|
||||
// must be empty (no retained entries for settled paths).
|
||||
const N = 10;
|
||||
const dirs = await Promise.all(
|
||||
Array.from({ length: N }, () => fs.mkdtemp(path.join(os.tmpdir(), "usage-queue-leak-"))),
|
||||
);
|
||||
try {
|
||||
await Promise.all(
|
||||
dirs.map((dir) =>
|
||||
recordTokenUsage({
|
||||
workspaceDir: dir,
|
||||
label: "llm_output",
|
||||
usage: { input: 1, output: 1, total: 2 },
|
||||
}),
|
||||
),
|
||||
);
|
||||
// All writes are awaited; their stored promises must have settled and
|
||||
// been removed from the Map.
|
||||
expect(_testOnly_getWriteQueueSize()).toBe(0);
|
||||
} finally {
|
||||
await Promise.all(dirs.map((d) => fs.rm(d, { recursive: true, force: true })));
|
||||
}
|
||||
});
|
||||
|
||||
it("writeQueues entry is removed even when the write rejects", async () => {
|
||||
// Plant a non-array token-usage.json to trigger a rejection in appendRecord.
|
||||
// The Map entry must still be cleaned up — a failing write must not leak.
|
||||
await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(path.join(tmpDir, "memory", "token-usage.json"), '"not-an-array"', "utf-8");
|
||||
|
||||
await expect(
|
||||
recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: 1, output: 1, total: 2 },
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
|
||||
expect(_testOnly_getWriteQueueSize()).toBe(0);
|
||||
});
|
||||
|
||||
it("serialises concurrent writes — no record is lost", async () => {
|
||||
const N = 20;
|
||||
await Promise.all(
|
||||
Array.from({ length: N }, (_, i) =>
|
||||
recordTokenUsage({
|
||||
workspaceDir: tmpDir,
|
||||
label: "llm_output",
|
||||
usage: { input: i + 1, output: 1, total: i + 2 },
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const records = JSON.parse(await fs.readFile(usageFile, "utf-8"));
|
||||
expect(records).toHaveLength(N);
|
||||
// Every distinct tokensUsed value must appear exactly once
|
||||
const totals = records
|
||||
.map((r: { tokensUsed: number }) => r.tokensUsed)
|
||||
.toSorted((a: number, b: number) => a - b);
|
||||
expect(totals).toEqual(Array.from({ length: N }, (_, i) => i + 2));
|
||||
});
|
||||
});
|
||||
194
src/agents/usage-log.ts
Normal file
194
src/agents/usage-log.ts
Normal file
@ -0,0 +1,194 @@
|
||||
import { randomBytes } from "crypto";
|
||||
import fs from "fs/promises";
|
||||
import path from "path";
|
||||
import { type FileLockOptions, withFileLock } from "../infra/file-lock.js";
|
||||
|
||||
/**
 * One element of the JSON array stored in `<workspaceDir>/memory/token-usage.json`.
 */
export type TokenUsageRecord = {
  /** Unique id of the form `usage_<base36 timestamp>_<8 hex chars>` (see makeId). */
  id: string;
  /** Caller-supplied tag describing what was measured. */
  label: string;
  /** Total tokens for the event, truncated to an integer. */
  tokensUsed: number;
  /** NOTE(review): never set by recordTokenUsage in this file — presumably for other writers/readers; confirm. */
  tokenLimit?: number;
  /** Input tokens, truncated; omitted when the caller's value is absent or not > 0. */
  inputTokens?: number;
  /** Output tokens, truncated; omitted when the caller's value is absent or not > 0. */
  outputTokens?: number;
  /** Cache-read tokens, truncated; omitted when the caller's value is absent or not > 0. */
  cacheReadTokens?: number;
  /** Cache-write tokens, truncated; omitted when the caller's value is absent or not > 0. */
  cacheWriteTokens?: number;
  /** Model identifier passed by the caller, if any. */
  model?: string;
  /** Provider identifier passed by the caller, if any. */
  provider?: string;
  /** Run identifier passed by the caller, if any. */
  runId?: string;
  /** Session identifier passed by the caller, if any. */
  sessionId?: string;
  /** Session key passed by the caller, if any. */
  sessionKey?: string;
  /** ISO-8601 timestamp taken when the record was built. */
  createdAt: string;
};
|
||||
|
||||
/** Builds a record id: `usage_<base36 ms timestamp>_<4 random bytes as hex>`. */
function makeId(): string {
  const timestampPart = Date.now().toString(36);
  const randomPart = randomBytes(4).toString("hex");
  return ["usage", timestampPart, randomPart].join("_");
}
|
||||
|
||||
/**
 * Reads `file` and returns its content as an array of usage records.
 *
 * - A missing file (ENOENT) is treated as an empty log and yields `[]`.
 * - Any other read error, and malformed JSON (SyntaxError), propagate so the
 *   caller aborts instead of overwriting the existing file with only the new
 *   entry.
 * - Valid JSON that is not an array throws an Error whose `code` is
 *   `ERR_UNEXPECTED_TOKEN_LOG_SHAPE`, for the same reason.
 *
 * Array elements are not validated individually.
 *
 * @param file Path of the token-usage JSON file.
 * @returns The parsed array, or `[]` when the file does not exist yet.
 */
async function readJsonArray(file: string): Promise<TokenUsageRecord[]> {
  let raw: string;
  try {
    raw = await fs.readFile(file, "utf-8");
  } catch (err: unknown) {
    // File does not exist yet — start with an empty array.
    const isMissing =
      typeof err === "object" && err !== null && (err as { code?: unknown }).code === "ENOENT";
    if (isMissing) {
      return [];
    }
    throw err;
  }

  const parsed: unknown = JSON.parse(raw);
  if (!Array.isArray(parsed)) {
    // `typeof null` is "object"; report null explicitly so the message is not misleading.
    const shape = parsed === null ? "null" : typeof parsed;
    throw Object.assign(
      new Error(`token-usage.json contains valid JSON but is not an array (got ${shape})`),
      { code: "ERR_UNEXPECTED_TOKEN_LOG_SHAPE" },
    );
  }
  return parsed as TokenUsageRecord[];
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Cross-process file lock
|
||||
//
|
||||
// The in-memory writeQueues Map serialises writes within a single Node
|
||||
// process. Two concurrent OpenClaw processes targeting the same
|
||||
// workspaceDir can still race, so we use an advisory O_EXCL lock provided
|
||||
// by the shared withFileLock helper (imported here from ../infra/file-lock.js).
|
||||
//
|
||||
// That implementation:
|
||||
// • stores {pid, createdAt} so waiters can detect a crashed holder
|
||||
// • treats empty/unparseable lock content as stale (crash during open→write)
|
||||
// • re-verifies the lock inode before removing it so a slow waiter's
|
||||
// unlink cannot delete a fresh lock from another process
|
||||
// • uses exponential backoff with jitter capped at stale ms
|
||||
// ---------------------------------------------------------------------------
|
||||
// How long a lock may be held before waiters treat it as abandoned.
// appendRecord rewrites the whole token-usage.json on every call, so hold time
// grows with file size and disk speed. A generous window keeps a slow writer's
// lock from being reclaimed while it is still working, which would allow
// overlapping read-modify-write cycles that silently drop entries, while still
// recovering locks left by crashed processes within a reasonable time.
const APPEND_LOCK_STALE_MS = 30_000;

// Nominal delay between lock acquisition attempts (min === max, factor 1).
const APPEND_LOCK_RETRY_DELAY_MS = 200;

const APPEND_LOCK_OPTIONS: FileLockOptions = {
  retries: {
    // Derived so the nominal retry budget (attempts × delay) matches the stale
    // window: 30_000 / 200 = 150 attempts. Waiters then run out of retries
    // only if the holder keeps the lock longer than the stale window.
    retries: APPEND_LOCK_STALE_MS / APPEND_LOCK_RETRY_DELAY_MS,
    factor: 1,
    minTimeout: APPEND_LOCK_RETRY_DELAY_MS,
    maxTimeout: APPEND_LOCK_RETRY_DELAY_MS,
    // NOTE(review): how `randomize` perturbs the delay is decided by
    // withFileLock — confirm it does not change the intended ~30 s budget.
    randomize: true,
  },
  stale: APPEND_LOCK_STALE_MS,
};
|
||||
|
||||
/**
 * Appends `entry` to the JSON array stored at `file`, holding the file lock
 * for the whole read-modify-write cycle.
 *
 * The new content is written to a sibling temp file and then renamed over
 * `file`, so an interrupted write never leaves `file` truncated. Both paths
 * live in the same directory. If writing or renaming fails, the temp file is
 * removed on a best-effort basis and the original error is rethrown.
 *
 * @param file Path of the token-usage JSON file.
 * @param entry Record to append.
 */
async function appendRecord(file: string, entry: TokenUsageRecord): Promise<void> {
  const rewriteWithEntry = async (): Promise<void> => {
    const existing = await readJsonArray(file);
    const updated = [...existing, entry];
    const scratchPath = `${file}.tmp.${randomBytes(4).toString("hex")}`;
    try {
      const payload = JSON.stringify(updated, null, 2);
      await fs.writeFile(scratchPath, payload);
      await fs.rename(scratchPath, file);
    } catch (err) {
      // Best-effort cleanup; the temp file may not exist if the write never started.
      await fs.unlink(scratchPath).catch(() => {});
      throw err;
    }
  };

  await withFileLock(file, APPEND_LOCK_OPTIONS, rewriteWithEntry);
}
|
||||
|
||||
// Per-file write queue: serialises concurrent recordTokenUsage() calls within
// the same process so they do not all contend on the cross-process file lock.
// Key: the token-usage.json path built from the realpath-resolved memory dir.
// Value: the tail of that file's write chain, stored with rejections already
// swallowed so later writes can chain on it safely. Entries are deleted once
// their tail settles and no newer write has replaced it.
const writeQueues = new Map<string, Promise<void>>();

/**
 * Exposed for testing only — returns the current number of live queue entries.
 *
 * @returns The size of the in-process write-queue map.
 */
export function _testOnly_getWriteQueueSize(): number {
  return writeQueues.size;
}
|
||||
|
||||
/** Truncates a finite, strictly positive token count; returns undefined for anything else. */
function positiveTokenCount(value: number | undefined): number | undefined {
  return value != null && Number.isFinite(value) && value > 0 ? Math.trunc(value) : undefined;
}

/**
 * Appends one token-usage record to `<workspaceDir>/memory/token-usage.json`.
 *
 * Nothing is written (and the workspace is not touched) when `usage` is
 * missing or when the total is not a finite number greater than zero. The
 * total is `usage.total` when provided, otherwise the sum of the four
 * component counts. Component fields are stored only when finite and > 0.
 *
 * Writes to the same file from this process are serialised through an
 * in-memory queue; `appendRecord` additionally takes the cross-process lock.
 *
 * @param params Workspace location, optional run/session/model metadata, a
 *   label, and the usage counts to record.
 * @returns A promise that settles once this call's record has been written.
 * @throws Whatever `appendRecord` rejects with (e.g. an unreadable or
 *   malformed existing log). Callers that treat logging as best-effort should
 *   attach their own `.catch()`.
 */
export async function recordTokenUsage(params: {
  workspaceDir: string;
  runId?: string;
  sessionId?: string;
  sessionKey?: string;
  provider?: string;
  model?: string;
  label: string;
  usage?: {
    input?: number;
    output?: number;
    cacheRead?: number;
    cacheWrite?: number;
    total?: number;
  };
}): Promise<void> {
  const usage = params.usage;
  if (!usage) {
    return;
  }
  const total =
    usage.total ??
    (usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
  // Number.isFinite rejects NaN and ±Infinity; Infinity would otherwise pass a
  // `> 0` check and be serialised by JSON.stringify as `null`.
  if (!Number.isFinite(total) || total <= 0) {
    return;
  }

  const memoryDir = path.join(params.workspaceDir, "memory");
  await fs.mkdir(memoryDir, { recursive: true });
  // Canonicalize before keying writeQueues so that different path spellings
  // for the same physical directory (e.g. a symlink vs its target) share a
  // single in-process queue instead of running concurrent read-modify-write
  // cycles against the same file.
  const realMemoryDir = await fs.realpath(memoryDir).catch(() => memoryDir);
  const file = path.join(realMemoryDir, "token-usage.json");

  const inputTokens = positiveTokenCount(usage.input);
  const outputTokens = positiveTokenCount(usage.output);
  const cacheReadTokens = positiveTokenCount(usage.cacheRead);
  const cacheWriteTokens = positiveTokenCount(usage.cacheWrite);

  const entry: TokenUsageRecord = {
    id: makeId(),
    label: params.label,
    tokensUsed: Math.trunc(total),
    ...(inputTokens !== undefined && { inputTokens }),
    ...(outputTokens !== undefined && { outputTokens }),
    ...(cacheReadTokens !== undefined && { cacheReadTokens }),
    ...(cacheWriteTokens !== undefined && { cacheWriteTokens }),
    model: params.model,
    provider: params.provider,
    runId: params.runId,
    sessionId: params.sessionId,
    sessionKey: params.sessionKey,
    createdAt: new Date().toISOString(),
  };

  const queued = writeQueues.get(file) ?? Promise.resolve();
  const next = queued.then(() => appendRecord(file, entry));
  // Store a rejection-free tail so a failed write neither breaks the chain for
  // later writers nor surfaces as an unhandled rejection.
  const stored = next.catch(() => {});
  writeQueues.set(file, stored);
  // Drop the entry once it settles, but only if it is still the tail: a newer
  // write for the same path owns its own cleanup. This keeps the Map from
  // growing without bound in long-lived processes.
  void stored.then(() => {
    if (writeQueues.get(file) === stored) {
      writeQueues.delete(file);
    }
  });
  // Await the un-suppressed promise so this caller still sees its own failure.
  await next;
}
|
||||
281
src/plugin-sdk/file-lock.test.ts
Normal file
281
src/plugin-sdk/file-lock.test.ts
Normal file
@ -0,0 +1,281 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { getProcessStartTime } from "../shared/pid-alive.js";
|
||||
import { withFileLock } from "./file-lock.js";
|
||||
|
||||
/**
 * Back-dates a file so it appears to have been last touched `msAgo`
 * milliseconds before now. Both timestamps passed to `fs.utimes` (access and
 * modification time) receive the same value.
 */
async function ageFile(filePath: string, msAgo: number): Promise<void> {
  const targetEpochMs = Date.now() - msAgo;
  // Numeric arguments to fs.utimes are interpreted as seconds since the epoch.
  const targetEpochSeconds = targetEpochMs / 1000;
  await fs.utimes(filePath, targetEpochSeconds, targetEpochSeconds);
}
|
||||
|
||||
// Baseline options for most tests: a small retry budget (2 retries, 20–50 ms
// apart) so a lock that is never released times out quickly, and a 5 s stale
// window that tests step past by back-dating lock files with ageFile.
const LOCK_OPTIONS = {
  retries: {
    retries: 2,
    factor: 1,
    minTimeout: 20,
    maxTimeout: 50,
  },
  stale: 5_000,
};

// More retries for tests where one waiter must survive while the other holds
// the lock for a non-trivial duration. Same 5 s stale window as LOCK_OPTIONS.
const RETRY_LOCK_OPTIONS = {
  retries: {
    retries: 10,
    factor: 1,
    minTimeout: 10,
    maxTimeout: 30,
  },
  stale: 5_000,
};
|
||||
|
||||
describe("withFileLock", () => {
|
||||
let tmpDir: string;
|
||||
let targetFile: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "file-lock-test-"));
|
||||
targetFile = path.join(tmpDir, "data.json");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("acquires and releases the lock, allowing a second caller to proceed", async () => {
|
||||
const order: string[] = [];
|
||||
await withFileLock(targetFile, LOCK_OPTIONS, async () => {
|
||||
order.push("first-start");
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
order.push("first-end");
|
||||
});
|
||||
await withFileLock(targetFile, LOCK_OPTIONS, async () => {
|
||||
order.push("second");
|
||||
});
|
||||
expect(order).toEqual(["first-start", "first-end", "second"]);
|
||||
});
|
||||
|
||||
it("reclaims an empty lock file left by a crash between open and writeFile", async () => {
|
||||
// Simulate a crash in the open("wx")-to-writeFile window: the .lock file
|
||||
// exists but has empty (unparseable) content.
|
||||
const lockPath = `${targetFile}.lock`;
|
||||
await fs.mkdir(path.dirname(targetFile), { recursive: true });
|
||||
await fs.writeFile(lockPath, ""); // empty — no pid/createdAt written
|
||||
// An empty file with a young mtime is conservatively treated as a live
|
||||
// writer still in the open→writeFile window. Age it past staleMs so the
|
||||
// mtime-based fallback marks it as stale.
|
||||
await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000);
|
||||
|
||||
let ran = false;
|
||||
await expect(
|
||||
withFileLock(targetFile, LOCK_OPTIONS, async () => {
|
||||
ran = true;
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(ran).toBe(true);
|
||||
});
|
||||
|
||||
it("does not reclaim an empty lock file whose mtime is within staleMs (live writer window)", async () => {
|
||||
// A freshly-created empty lock belongs to a live writer still in the
|
||||
// open→writeFile window. isStaleLock must NOT treat it as stale.
|
||||
const lockPath = `${targetFile}.lock`;
|
||||
await fs.mkdir(path.dirname(targetFile), { recursive: true });
|
||||
await fs.writeFile(lockPath, ""); // young mtime: effectively "just opened"
|
||||
|
||||
// withFileLock should time out because the empty lock looks live.
|
||||
await expect(
|
||||
withFileLock(
|
||||
targetFile,
|
||||
{ ...LOCK_OPTIONS, retries: { ...LOCK_OPTIONS.retries, retries: 1 } },
|
||||
async () => {},
|
||||
),
|
||||
).rejects.toThrow("file lock timeout");
|
||||
});
|
||||
|
||||
it("reclaims a lock file containing partial/invalid JSON aged past staleMs", async () => {
|
||||
const lockPath = `${targetFile}.lock`;
|
||||
await fs.mkdir(path.dirname(targetFile), { recursive: true });
|
||||
await fs.writeFile(lockPath, '{"pid":'); // truncated JSON
|
||||
await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000);
|
||||
|
||||
let ran = false;
|
||||
await expect(
|
||||
withFileLock(targetFile, LOCK_OPTIONS, async () => {
|
||||
ran = true;
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(ran).toBe(true);
|
||||
});
|
||||
|
||||
it("reclaims a lock file whose pid field is not a number, aged past staleMs", async () => {
|
||||
const lockPath = `${targetFile}.lock`;
|
||||
await fs.mkdir(path.dirname(targetFile), { recursive: true });
|
||||
await fs.writeFile(
|
||||
lockPath,
|
||||
JSON.stringify({ pid: "not-a-number", createdAt: new Date().toISOString() }),
|
||||
);
|
||||
await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000);
|
||||
|
||||
let ran = false;
|
||||
await expect(
|
||||
withFileLock(targetFile, LOCK_OPTIONS, async () => {
|
||||
ran = true;
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(ran).toBe(true);
|
||||
});
|
||||
|
||||
it("reclaims a lock whose live PID has an old createdAt (PID-reuse via age check)", async () => {
|
||||
// Simulate PID reuse: the lock contains the test process's own (live) PID
|
||||
// but a createdAt older than staleMs, as if the original holder crashed and
|
||||
// the OS later assigned the same PID to an unrelated process. The age
|
||||
// check must reclaim the lock even though the PID is alive.
|
||||
const lockPath = `${targetFile}.lock`;
|
||||
await fs.mkdir(path.dirname(targetFile), { recursive: true });
|
||||
await fs.writeFile(
|
||||
lockPath,
|
||||
JSON.stringify({
|
||||
pid: process.pid,
|
||||
createdAt: new Date(Date.now() - (LOCK_OPTIONS.stale + 5_000)).toISOString(),
|
||||
}),
|
||||
);
|
||||
|
||||
let ran = false;
|
||||
await expect(
|
||||
withFileLock(targetFile, LOCK_OPTIONS, async () => {
|
||||
ran = true;
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(ran).toBe(true);
|
||||
});
|
||||
|
||||
// Linux-only: the lock payload can carry the holder's process start time.
// The planted lock uses this process's PID with a start time that cannot
// match, so the lock must be reclaimed immediately even though its createdAt
// is fresh and the age check alone would not fire.
it.skipIf(process.platform !== "linux")(
  "reclaims a lock whose PID was recycled by a different process (startTime mismatch)",
  async () => {
    const realStartTime = getProcessStartTime(process.pid);
    if (realStartTime === null) {
      // Start time unexpectedly unavailable on Linux; nothing to compare against.
      return;
    }

    const lockPath = `${targetFile}.lock`;
    await fs.mkdir(path.dirname(targetFile), { recursive: true });

    const recycledPayload = JSON.stringify({
      pid: process.pid,
      createdAt: new Date().toISOString(), // recent on purpose
      startTime: realStartTime + 999_999, // guaranteed mismatch
    });
    await fs.writeFile(lockPath, recycledPayload);

    let ran = false;
    const markRan = async () => {
      ran = true;
    };

    await expect(withFileLock(targetFile, LOCK_OPTIONS, markRan)).resolves.toBeUndefined();
    expect(ran).toBe(true);
  },
);
|
||||
|
||||
// Regression guard for retries=0 (a single attempt): a stale lock found on
// attempt 0 used to `continue`, which bumped the attempt counter past the loop
// bound and threw a timeout before open(O_EXCL) was ever retried on the
// now-clear path. An empty .lock file, as left by a crash between open("wx")
// and writeFile, reproduces that situation.
it("reclaims a stale empty lock file even when detected on the very last retry (retries=0)", async () => {
  const lockPath = `${targetFile}.lock`;
  await fs.mkdir(path.dirname(targetFile), { recursive: true });
  await fs.writeFile(lockPath, ""); // empty payload
  await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000); // push it past the stale threshold

  const singleAttemptOptions = {
    ...LOCK_OPTIONS,
    retries: { ...LOCK_OPTIONS.retries, retries: 0 },
  };

  let ran = false;
  const markRan = async () => {
    ran = true;
  };

  await expect(withFileLock(targetFile, singleAttemptOptions, markRan)).resolves.toBeUndefined();
  expect(ran).toBe(true);
});
|
||||
|
||||
// Safety bound: when removing a stale lock silently fails, the lock keeps
// looking stale on every iteration. The one-shot "free reclaim slot" must not
// be granted repeatedly; once it is used up, the retry budget has to run out
// and the call must reject with a timeout instead of looping forever.
//
// The failure is simulated by stubbing fs.rm to resolve without doing
// anything, so the planted .lock file is never removed.
it("times out (does not spin forever) when a stale lock cannot be removed", async () => {
  const lockPath = `${targetFile}.lock`;
  const plantedPayload = JSON.stringify({ pid: 0, createdAt: new Date(0).toISOString() });

  await fs.mkdir(path.dirname(targetFile), { recursive: true });
  await fs.writeFile(lockPath, plantedPayload);
  await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000);

  // No-op rm: reports success but leaves the lock file in place.
  const rmSpy = vi.spyOn(fs, "rm").mockResolvedValue(undefined);

  const boundedOptions = {
    ...LOCK_OPTIONS,
    retries: { ...LOCK_OPTIONS.retries, retries: 2 },
  };
  const noop = async () => {};

  try {
    await expect(withFileLock(targetFile, boundedOptions, noop)).rejects.toThrow(
      "file lock timeout",
    );
  } finally {
    // Restore the real rm first so the cleanup below actually deletes the file.
    rmSpy.mockRestore();
    await fs.rm(lockPath, { force: true }).catch(() => {});
  }
});
|
||||
|
||||
// Two waiters race to reclaim the same stale lock (pid 0, epoch timestamp).
// The slower waiter must not unlink the lock the faster one has just
// acquired; otherwise both callbacks would run at once and could corrupt each
// other's read-modify-write sequences.
it("two concurrent waiters on a stale lock never overlap inside fn()", async () => {
  const lockPath = `${targetFile}.lock`;
  await fs.mkdir(path.dirname(targetFile), { recursive: true });
  await fs.writeFile(lockPath, JSON.stringify({ pid: 0, createdAt: new Date(0).toISOString() }));

  let active = 0; // callbacks currently inside the critical section
  let peakActive = 0; // highest concurrency observed
  const finished: number[] = [];

  // RETRY_LOCK_OPTIONS gives the losing waiter enough retry budget to outlast
  // the winner's 20 ms hold without timing out.
  const holdLock = (id: number) =>
    withFileLock(targetFile, RETRY_LOCK_OPTIONS, async () => {
      active += 1;
      peakActive = Math.max(peakActive, active);
      await new Promise((resolve) => setTimeout(resolve, 20)); // keep the lock held briefly
      finished.push(id);
      active -= 1;
    });

  // Start both before awaiting either so they contend for the stale lock.
  await Promise.all([holdLock(1), holdLock(2)]);

  // Each callback ran exactly once, and never at the same time as the other.
  expect(finished.toSorted((a, b) => a - b)).toEqual([1, 2]);
  expect(peakActive).toBe(1);
});
|
||||
});
|
||||
@ -1,6 +1,6 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { isPidAlive } from "../shared/pid-alive.js";
|
||||
import { getProcessStartTime, isPidAlive } from "../shared/pid-alive.js";
|
||||
import { resolveProcessScopedMap } from "../shared/process-scoped-map.js";
|
||||
|
||||
export type FileLockOptions = {
|
||||
@ -17,6 +17,11 @@ export type FileLockOptions = {
|
||||
/** JSON document stored inside a `.lock` file to identify its holder. */
type LockFilePayload = {
  /** PID of the process that created the lock. */
  pid: number;
  /** ISO-8601 timestamp taken when the lock was created. */
  createdAt: string;
  /**
   * Holder's start time from /proc/{pid}/stat field 22 (clock ticks since
   * boot), captured when the lock was created. Only recorded on Linux; the
   * field is absent on other platforms. A live PID whose current start time
   * differs from this value belongs to a different process, i.e. the PID was
   * recycled.
   */
  startTime?: number;
};
|
||||
|
||||
type HeldLock = {
|
||||
@ -44,7 +49,11 @@ async function readLockPayload(lockPath: string): Promise<LockFilePayload | null
|
||||
if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { pid: parsed.pid, createdAt: parsed.createdAt };
|
||||
return {
|
||||
pid: parsed.pid,
|
||||
createdAt: parsed.createdAt,
|
||||
...(typeof parsed.startTime === "number" && { startTime: parsed.startTime }),
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@ -64,20 +73,49 @@ async function resolveNormalizedFilePath(filePath: string): Promise<string> {
|
||||
|
||||
/**
 * Decides whether the lock file at `lockPath` may be reclaimed.
 *
 * @param lockPath - Path of the `.lock` file to inspect.
 * @param staleMs - Age in milliseconds beyond which a lock counts as stale.
 * @returns `true` when the lock should be treated as abandoned, `false` when
 *   it may still belong to a live holder.
 */
async function isStaleLock(lockPath: string, staleMs: number): Promise<boolean> {
  const payload = await readLockPayload(lockPath);

  if (payload !== null) {
    // PID liveness alone is not enough, because the OS can hand a PID to a new
    // process after the original holder exits. Three checks run in order:
    //
    // 1. isPidAlive: fast path. If the PID is gone, the lock is stale.
    if (!isPidAlive(payload.pid)) {
      return true;
    }

    // 2. startTime (recorded on Linux only): if the process that currently
    //    owns the PID started at a different time than the one recorded in
    //    the lock, the PID was recycled and the lock can be reclaimed right
    //    away. A null current start time is inconclusive and falls through.
    if (payload.startTime !== undefined) {
      const currentStartTime = getProcessStartTime(payload.pid);
      if (currentStartTime !== null && currentStartTime !== payload.startTime) {
        return true;
      }
    }

    // 3. createdAt age: the lock file keeps the original holder's timestamp
    //    regardless of which process now owns the PID, so once that timestamp
    //    is older than staleMs (or cannot be parsed) the lock is reclaimed on
    //    any platform.
    const createdAt = Date.parse(payload.createdAt);
    if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) {
      return true;
    }
    return false;
  }

  // No usable payload: the lock file exists but is empty or unparseable. The
  // most likely cause is a crash between open("wx") (file created, still
  // empty) and the payload write. A live writer inside that same window looks
  // identical by content, so fall back to the file's mtime: a young file may
  // belong to a live writer, an old one is treated as orphaned. Declaring such
  // a file stale immediately would steal the lock from a live writer.
  try {
    const stat = await fs.stat(lockPath);
    return Date.now() - stat.mtimeMs > staleMs;
  } catch {
    // stat failed, e.g. the file vanished because another waiter already
    // removed it. Report stale so the caller retries the exclusive open.
    return true;
  }
}
|
||||
|
||||
@ -117,11 +155,27 @@ export async function acquireFileLock(
|
||||
}
|
||||
|
||||
const attempts = Math.max(1, options.retries.retries + 1);
|
||||
// One-shot budget for the post-reclaim free slot (see stale-reclaim comment
|
||||
// below). Consumed the first time we step attempt back; subsequent stale
|
||||
// detections on the last slot are allowed to exhaust the loop and time out,
|
||||
// preventing an endless spin when fs.rm silently fails (e.g. EACCES/EPERM).
|
||||
let reclaimSlotAvailable = true;
|
||||
for (let attempt = 0; attempt < attempts; attempt += 1) {
|
||||
try {
|
||||
const handle = await fs.open(lockPath, "wx");
|
||||
const startTime = getProcessStartTime(process.pid);
|
||||
await handle.writeFile(
|
||||
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2),
|
||||
JSON.stringify(
|
||||
{
|
||||
pid: process.pid,
|
||||
createdAt: new Date().toISOString(),
|
||||
// Omit startTime on non-Linux where it is null so the field is
|
||||
// absent from the JSON rather than present as null.
|
||||
...(startTime !== null && { startTime }),
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf8",
|
||||
);
|
||||
HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath });
|
||||
@ -134,10 +188,65 @@ export async function acquireFileLock(
|
||||
if (code !== "EEXIST") {
|
||||
throw err;
|
||||
}
|
||||
if (await isStaleLock(lockPath, options.stale)) {
|
||||
await fs.rm(lockPath, { force: true }).catch(() => undefined);
|
||||
|
||||
// Snapshot the inode of the existing lock file *before* checking
|
||||
// staleness. We compare it again just before unlinking; if the inode
|
||||
// has changed in the interim, another waiter already reclaimed the
|
||||
// stale file and created a fresh lock — deleting it would silently
|
||||
// break that holder's mutual exclusion guarantee.
|
||||
const staleIno = await fs
|
||||
.stat(lockPath)
|
||||
.then((s) => s.ino)
|
||||
.catch(() => -1);
|
||||
|
||||
// staleIno === -1 means the file vanished between open(EEXIST) and
|
||||
// stat — another process already removed it. Skip straight to the
|
||||
// next open(O_EXCL) attempt.
|
||||
const isStale = staleIno === -1 || (await isStaleLock(lockPath, options.stale));
|
||||
|
||||
if (isStale) {
|
||||
if (staleIno !== -1) {
|
||||
// Re-verify the path still maps to the same inode we deemed stale.
|
||||
// If it changed, a concurrent waiter beat us to the reclaim and has
|
||||
// already written its own fresh lock; leave that file alone.
|
||||
const currentIno = await fs
|
||||
.stat(lockPath)
|
||||
.then((s) => s.ino)
|
||||
.catch(() => -1);
|
||||
if (currentIno === staleIno) {
|
||||
await fs.rm(lockPath, { force: true }).catch(() => undefined);
|
||||
}
|
||||
}
|
||||
// Retry open(O_EXCL) regardless: either we removed the stale lock or
|
||||
// a concurrent waiter already handled it; either way, the path is now
|
||||
// either free or holds a fresh lock that isStaleLock will report as not stale.
|
||||
//
|
||||
// Guard: the for-loop's `attempt += 1` runs after every `continue`,
|
||||
// consuming a retry slot. If we are already on the last slot
|
||||
// (attempt === attempts - 1), that increment exits the loop and we
|
||||
// throw timeout without ever re-trying open(O_EXCL) on the now-clear
|
||||
// path. This is the crash-recovery bug: an empty/partial .lock file
|
||||
// (crash between open("wx") and writeFile) that becomes reclaimable
|
||||
// on the last iteration causes the record to be silently dropped.
|
||||
//
|
||||
// Fix: when on the last slot, step attempt back by one so the
|
||||
// upcoming += 1 nets to zero, guaranteeing at least one post-reclaim
|
||||
// open(O_EXCL) attempt. No extra sleep budget is consumed — we only
|
||||
// charge a retry for backoff sleeps, not reclaim-only work.
|
||||
//
|
||||
// Safety bound: reclaimSlotAvailable is consumed after the first use.
|
||||
// If fs.rm silently fails (EACCES/EPERM swallowed by .catch above),
|
||||
// subsequent iterations will still detect isStale=true; without the
|
||||
// bound, decrementing attempt on every last-slot iteration would spin
|
||||
// the loop forever. After the one-shot budget is gone, the loop is
|
||||
// allowed to exhaust normally and throw timeout.
|
||||
if (reclaimSlotAvailable && attempt >= attempts - 1) {
|
||||
reclaimSlotAvailable = false;
|
||||
attempt -= 1;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (attempt >= attempts - 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -11,7 +11,7 @@ export function safeRealpathSync(targetPath: string, cache?: Map<string, string>
|
||||
return cached;
|
||||
}
|
||||
try {
|
||||
const resolved = fs.realpathSync(targetPath);
|
||||
const resolved = fs.realpathSync.native(targetPath);
|
||||
cache?.set(targetPath, resolved);
|
||||
return resolved;
|
||||
} catch {
|
||||
|
||||
@ -9,6 +9,8 @@
|
||||
"test": "vitest run --config vitest.config.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@lit-labs/signals": "^0.2.0",
|
||||
"@lit/context": "^1.1.6",
|
||||
"@noble/ed25519": "3.0.1",
|
||||
"dompurify": "^3.3.3",
|
||||
"lit": "^3.3.2",
|
||||
|
||||
@ -91,6 +91,12 @@ function createChatHeaderState(
|
||||
sessionKey: "main",
|
||||
connected: true,
|
||||
sessionsHideCron: true,
|
||||
sessionsLoading: false,
|
||||
sessionsError: null,
|
||||
sessionsFilterActive: "0",
|
||||
sessionsFilterLimit: "0",
|
||||
sessionsIncludeGlobal: true,
|
||||
sessionsIncludeUnknown: true,
|
||||
sessionsResult: {
|
||||
ts: 0,
|
||||
path: "",
|
||||
@ -111,6 +117,7 @@ function createChatHeaderState(
|
||||
chatModelOverrides: {},
|
||||
chatModelCatalog: catalog,
|
||||
chatModelsLoading: false,
|
||||
chatSending: false,
|
||||
client: { request } as unknown as GatewayBrowserClient,
|
||||
settings: {
|
||||
gatewayUrl: "",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user