diff --git a/git-hooks/pre-commit b/git-hooks/pre-commit index 11079bc9f22..9dd1b5af253 100755 --- a/git-hooks/pre-commit +++ b/git-hooks/pre-commit @@ -3,6 +3,16 @@ set -euo pipefail ROOT_DIR="$(git rev-parse --show-toplevel 2>/dev/null || pwd)" + +# Resolve node when not in PATH (e.g. nvm environments where the shell +# profile hasn't been sourced by the git hook). Picks the newest installed +# nvm version using version-aware sort so that v22 is preferred over v18. +_resolve_node_sh="$ROOT_DIR/scripts/pre-commit/resolve-node.sh" +if [[ -f "$_resolve_node_sh" ]]; then + # shellcheck source=scripts/pre-commit/resolve-node.sh + source "$_resolve_node_sh" +fi +unset _resolve_node_sh RUN_NODE_TOOL="$ROOT_DIR/scripts/pre-commit/run-node-tool.sh" FILTER_FILES="$ROOT_DIR/scripts/pre-commit/filter-staged-files.mjs" diff --git a/scripts/pre-commit/resolve-node.sh b/scripts/pre-commit/resolve-node.sh new file mode 100644 index 00000000000..60a77ed759b --- /dev/null +++ b/scripts/pre-commit/resolve-node.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Resolve the newest nvm-managed Node when it is not already in PATH. +# Source this file; do not execute it directly. +# +# Cross-platform: avoids GNU-only `sort -V` (not supported by BSD sort on +# macOS). Instead, each semver component is zero-padded to five digits so +# that a plain lexicographic sort correctly selects the highest semantic +# version (e.g. v22.x wins over v18.x). +if ! command -v node >/dev/null 2>&1; then + _nvm_node=$( + for _p in "$HOME/.nvm/versions/node"/*/bin/node; do + [[ -x "$_p" ]] || continue + _ver="${_p%/bin/node}"; _ver="${_ver##*/v}" + IFS=. read -r _ma _mi _pa <<< "$_ver" + # Strip any non-numeric suffix so prerelease tags (e.g. "0-rc.1", "0-nightly") + # do not make printf '%05d' fail under set -euo pipefail. + # ${var%%pattern} removes the longest suffix matching pattern, so + # "0-rc.1" → "0" and "14" → "14" (no-op when already numeric-only). + _ma="${_ma%%[^0-9]*}" + _mi="${_mi%%[^0-9]*}" + _pa="${_pa%%[^0-9]*}" + printf '%05d%05d%05d\t%s\n' "${_ma:-0}" "${_mi:-0}" "${_pa:-0}" "$_p" + done | sort | tail -1 | cut -f2- + ) + if [[ -x "$_nvm_node" ]]; then + export PATH="$(dirname "$_nvm_node"):$PATH" + fi + unset _nvm_node _p _ver _ma _mi _pa +fi diff --git a/scripts/pre-commit/run-node-tool.sh b/scripts/pre-commit/run-node-tool.sh index 34163075517..4fda9e4ba0b 100755 --- a/scripts/pre-commit/run-node-tool.sh +++ b/scripts/pre-commit/run-node-tool.sh @@ -3,6 +3,11 @@ set -euo pipefail ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +# Resolve node when not in PATH (e.g. nvm environments). Picks the newest +# installed nvm version using version-aware sort. +# shellcheck source=resolve-node.sh +source "$ROOT_DIR/scripts/pre-commit/resolve-node.sh" + if [[ $# -lt 1 ]]; then echo "usage: run-node-tool.sh [args...]" >&2 exit 2 diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index d785218f819..cd630417e53 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -100,6 +100,7 @@ import { resolveEffectiveToolFsWorkspaceOnly } from "../../tool-fs-policy.js"; import { normalizeToolName } from "../../tool-policy.js"; import type { TranscriptPolicy } from "../../transcript-policy.js"; import { resolveTranscriptPolicy } from "../../transcript-policy.js"; +import { recordTokenUsage } from "../../usage-log.js"; import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; import { isRunnerAbortError } from "../abort.js"; import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js"; @@ -3156,6 +3157,19 @@ export async function runEmbeddedAttempt( }); } + recordTokenUsage({ + workspaceDir: effectiveWorkspace, + runId: params.runId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + provider: params.provider, + model: params.modelId, + label: "llm_output", + usage: getUsageTotals(), + }).catch((err) => { + log.warn(`token usage log failed: ${String(err)}`); + }); + return { aborted, timedOut, diff --git a/src/agents/usage-log.test.ts b/src/agents/usage-log.test.ts new file mode 100644 index 00000000000..822e67fab49 --- /dev/null +++ b/src/agents/usage-log.test.ts @@ -0,0 +1,314 @@ +import { spawn } from "child_process"; +import fs from "fs/promises"; +import os from "os"; +import path from "path"; +import { describe, expect, it, beforeEach, afterEach } from "vitest"; +import { recordTokenUsage, _testOnly_getWriteQueueSize } from "./usage-log.js"; + +describe("recordTokenUsage", () => { + let tmpDir: string; + let usageFile: string; + + beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "usage-log-test-")); + usageFile = path.join(tmpDir, "memory", "token-usage.json"); + }); + + afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); + }); + + it("writes inputTokens and outputTokens when provided", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + model: "claude-sonnet-4-6", + provider: "anthropic", + usage: { input: 1000, output: 500, total: 1500 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(1); + expect(records[0].tokensUsed).toBe(1500); + expect(records[0].inputTokens).toBe(1000); + expect(records[0].outputTokens).toBe(500); + expect(records[0].cacheReadTokens).toBeUndefined(); + expect(records[0].cacheWriteTokens).toBeUndefined(); + }); + + it("writes cacheReadTokens and cacheWriteTokens when provided", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 800, output: 200, cacheRead: 300, cacheWrite: 100, total: 1400 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records[0].inputTokens).toBe(800); + expect(records[0].outputTokens).toBe(200); + expect(records[0].cacheReadTokens).toBe(300); + expect(records[0].cacheWriteTokens).toBe(100); + }); + + it("omits IO fields when usage only has total (legacy records)", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { total: 28402 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records[0].tokensUsed).toBe(28402); + expect(records[0].inputTokens).toBeUndefined(); + expect(records[0].outputTokens).toBeUndefined(); + }); + + it("skips writing when usage is undefined", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: undefined, + }); + + const exists = await fs + .access(usageFile) + .then(() => true) + .catch(() => false); + expect(exists).toBe(false); + }); + + it("skips writing when total is zero", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 0, output: 0 }, + }); + + const exists = await fs + .access(usageFile) + .then(() => true) + .catch(() => false); + expect(exists).toBe(false); + }); + + it("appends multiple records to the same file", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 100, output: 50, total: 150 }, + }); + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 200, output: 80, total: 280 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(2); + expect(records[0].inputTokens).toBe(100); + expect(records[1].inputTokens).toBe(200); + }); + + it("truncates fractional tokens", async () => { + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 100.9, output: 50.1, total: 151 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records[0].inputTokens).toBe(100); + expect(records[0].outputTokens).toBe(50); + }); + + it("does not overwrite a valid-but-non-array token-usage.json — rejects unexpected shape", async () => { + // Simulate a manual edit or migration that left a valid JSON object + await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true }); + await fs.writeFile(usageFile, '{"legacy": true, "records": []}', "utf-8"); + + await expect( + recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 100, output: 50, total: 150 }, + }), + ).rejects.toThrow("not an array"); + + // File must be unchanged — the legacy data is preserved. + const content = await fs.readFile(usageFile, "utf-8"); + expect(content).toBe('{"legacy": true, "records": []}'); + }); + + it("does not overwrite a malformed token-usage.json — preserves corrupted file", async () => { + // Simulate an interrupted write that left partial JSON + await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true }); + await fs.writeFile(usageFile, '{"broken":true', "utf-8"); + + // recordTokenUsage must reject (caller is responsible for handling, e.g. + // attempt.ts uses .catch()) and must NOT overwrite the existing file. + await expect( + recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 100, output: 50, total: 150 }, + }), + ).rejects.toThrow(SyntaxError); + + // File must still contain the original corrupted content, not a new array. + const content = await fs.readFile(usageFile, "utf-8"); + expect(content).toBe('{"broken":true'); + }); + + it("cross-process lock: concurrent writers via file lock do not lose records", async () => { + // Simulate two processes bypassing the in-memory queue by calling + // recordTokenUsage from independent promise chains simultaneously. + // If the file lock is working they must still land all records. + const N = 10; + const writes = Array.from({ length: N }, (_, i) => { + // Each call is deliberately NOT chained — they race on the file lock. + return recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: i + 1, output: 1, total: i + 2 }, + }); + }); + await Promise.all(writes); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(N); + }); + + it("reclaims stale lock left by a crashed process", async () => { + // Spawn a subprocess that exits immediately, then use its (now-dead) PID + // to simulate a lock file left behind after an abnormal exit. + const deadPid = await new Promise((resolve, reject) => { + const child = spawn(process.execPath, ["-e", "process.exit(0)"]); + const pid = child.pid!; + child.on("exit", () => resolve(pid)); + child.on("error", reject); + }); + + const memoryDir = path.join(tmpDir, "memory"); + await fs.mkdir(memoryDir, { recursive: true }); + const lockPath = path.join(memoryDir, "token-usage.json.lock"); + // withFileLock (plugin-sdk) stores {pid, createdAt} — match that format. + await fs.writeFile( + lockPath, + JSON.stringify({ pid: deadPid, createdAt: new Date().toISOString() }), + ); + + // recordTokenUsage must detect the stale lock, reclaim it, and succeed. + await recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 100, output: 50, total: 150 }, + }); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(1); + expect(records[0].tokensUsed).toBe(150); + // Lock file must be cleaned up by the winner. + const lockExists = await fs + .access(lockPath) + .then(() => true) + .catch(() => false); + expect(lockExists).toBe(false); + }); + + it("different path spellings for the same workspace share one queue — no record is lost", async () => { + // Symlink tmpDir → another name so the same physical directory has two + // spellings. Without queue-key canonicalisation both spellings create + // independent writeQueues entries; when one chain holds the file lock + // (HELD_LOCKS set) the other re-entrantly joins it and both execute the + // read-modify-write cycle concurrently, silently dropping entries. + const symlinkDir = `${tmpDir}-symlink`; + await fs.symlink(tmpDir, symlinkDir); + try { + // Mix canonical and symlink paths across concurrent writes. + const N = 6; + await Promise.all( + Array.from({ length: N }, (_, i) => + recordTokenUsage({ + workspaceDir: i % 2 === 0 ? tmpDir : symlinkDir, + label: "llm_output", + usage: { input: i + 1, output: 1, total: i + 2 }, + }), + ), + ); + + // All N records must survive — none may be lost to a concurrent + // read-modify-write collision. + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(N); + } finally { + await fs.unlink(symlinkDir).catch(() => {}); + } + }); + + it("writeQueues entries are removed after each write settles — no unbounded Map growth", async () => { + // Regression: writeQueues entries were never deleted after a write settled, + // causing unbounded Map growth in long-lived processes that touch many + // ephemeral workspace directories (one unique path → one permanent entry). + // + // Verify the fix: after all writes to N distinct paths complete, the Map + // must be empty (no retained entries for settled paths). + const N = 10; + const dirs = await Promise.all( + Array.from({ length: N }, () => fs.mkdtemp(path.join(os.tmpdir(), "usage-queue-leak-"))), + ); + try { + await Promise.all( + dirs.map((dir) => + recordTokenUsage({ + workspaceDir: dir, + label: "llm_output", + usage: { input: 1, output: 1, total: 2 }, + }), + ), + ); + // All writes are awaited; their stored promises must have settled and + // been removed from the Map. + expect(_testOnly_getWriteQueueSize()).toBe(0); + } finally { + await Promise.all(dirs.map((d) => fs.rm(d, { recursive: true, force: true }))); + } + }); + + it("writeQueues entry is removed even when the write rejects", async () => { + // Plant a non-array token-usage.json to trigger a rejection in appendRecord. + // The Map entry must still be cleaned up — a failing write must not leak. + await fs.mkdir(path.join(tmpDir, "memory"), { recursive: true }); + await fs.writeFile(path.join(tmpDir, "memory", "token-usage.json"), '"not-an-array"', "utf-8"); + + await expect( + recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: 1, output: 1, total: 2 }, + }), + ).rejects.toThrow(); + + expect(_testOnly_getWriteQueueSize()).toBe(0); + }); + + it("serialises concurrent writes — no record is lost", async () => { + const N = 20; + await Promise.all( + Array.from({ length: N }, (_, i) => + recordTokenUsage({ + workspaceDir: tmpDir, + label: "llm_output", + usage: { input: i + 1, output: 1, total: i + 2 }, + }), + ), + ); + + const records = JSON.parse(await fs.readFile(usageFile, "utf-8")); + expect(records).toHaveLength(N); + // Every distinct tokensUsed value must appear exactly once + const totals = records + .map((r: { tokensUsed: number }) => r.tokensUsed) + .toSorted((a: number, b: number) => a - b); + expect(totals).toEqual(Array.from({ length: N }, (_, i) => i + 2)); + }); +}); diff --git a/src/agents/usage-log.ts b/src/agents/usage-log.ts new file mode 100644 index 00000000000..d154133e24a --- /dev/null +++ b/src/agents/usage-log.ts @@ -0,0 +1,194 @@ +import { randomBytes } from "crypto"; +import fs from "fs/promises"; +import path from "path"; +import { type FileLockOptions, withFileLock } from "../infra/file-lock.js"; + +export type TokenUsageRecord = { + id: string; + label: string; + tokensUsed: number; + tokenLimit?: number; + inputTokens?: number; + outputTokens?: number; + cacheReadTokens?: number; + cacheWriteTokens?: number; + model?: string; + provider?: string; + runId?: string; + sessionId?: string; + sessionKey?: string; + createdAt: string; +}; + +function makeId() { + return `usage_${Date.now().toString(36)}_${randomBytes(4).toString("hex")}`; +} + +async function readJsonArray(file: string): Promise { + try { + const raw = await fs.readFile(file, "utf-8"); + const parsed = JSON.parse(raw); + if (!Array.isArray(parsed)) { + // Valid JSON but unexpected shape (object, number, string, …). + // Returning [] here would cause appendRecord to overwrite the file + // with only the new entry, silently deleting prior data. + throw Object.assign( + new Error( + `token-usage.json contains valid JSON but is not an array (got ${typeof parsed})`, + ), + { code: "ERR_UNEXPECTED_TOKEN_LOG_SHAPE" }, + ); + } + return parsed as TokenUsageRecord[]; + } catch (err) { + // File does not exist yet — start with an empty array. + if ((err as NodeJS.ErrnoException).code === "ENOENT") { + return []; + } + // Any other error (malformed JSON, permission denied, partial write, …) + // must propagate so appendRecord aborts and the existing file is not + // silently overwritten with only the new entry. + throw err; + } +} + +// --------------------------------------------------------------------------- +// Cross-process file lock +// +// The in-memory writeQueues Map serialises writes within a single Node +// process. Two concurrent OpenClaw processes targeting the same +// workspaceDir can still race, so we use an advisory O_EXCL lock provided +// by the shared withFileLock helper in plugin-sdk/file-lock.ts. +// +// That implementation: +// • stores {pid, createdAt} so waiters can detect a crashed holder +// • treats empty/unparseable lock content as stale (crash during open→write) +// • re-verifies the lock inode before removing it so a slow waiter's +// unlink cannot delete a fresh lock from another process +// • uses exponential backoff with jitter capped at stale ms +// --------------------------------------------------------------------------- +const APPEND_LOCK_OPTIONS: FileLockOptions = { + // appendRecord rewrites the full token-usage.json on every call, so hold + // time scales with file size and disk speed. 5 s was too tight: a slow + // write on a large history could exceed the stale window and allow a + // concurrent waiter to steal an active lock, causing overlapping + // read-modify-write cycles that silently drop entries. + // + // 30 s gives plenty of headroom for large files on slow disks while still + // reclaiming locks left by crashed processes within a reasonable window. + // The retry budget (~150 × 200 ms = 30 s) matches the stale window so + // waiters exhaust retries only if the holder holds longer than stale, + // i.e., is genuinely stuck or crashed. + retries: { + retries: 150, + factor: 1, + minTimeout: 200, + maxTimeout: 200, + randomize: true, + }, + stale: 30_000, +}; + +async function appendRecord(file: string, entry: TokenUsageRecord): Promise { + await withFileLock(file, APPEND_LOCK_OPTIONS, async () => { + const records = await readJsonArray(file); + records.push(entry); + // Write to a sibling temp file then atomically rename into place so that + // a crash or kill during the write never leaves token-usage.json truncated. + // rename(2) is atomic on POSIX when src and dst are on the same filesystem, + // which is guaranteed here because both paths share the same directory. + const tmp = `${file}.tmp.${randomBytes(4).toString("hex")}`; + try { + await fs.writeFile(tmp, JSON.stringify(records, null, 2)); + await fs.rename(tmp, file); + } catch (err) { + await fs.unlink(tmp).catch(() => {}); + throw err; + } + }); +} + +// Per-file write queue: serialises concurrent recordTokenUsage() calls within +// the same process so they do not all contend on the cross-process file lock. +const writeQueues = new Map>(); + +/** Exposed for testing only — returns the current number of live queue entries. */ +export function _testOnly_getWriteQueueSize(): number { + return writeQueues.size; +} + +export async function recordTokenUsage(params: { + workspaceDir: string; + runId?: string; + sessionId?: string; + sessionKey?: string; + provider?: string; + model?: string; + label: string; + usage?: { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + total?: number; + }; +}) { + const usage = params.usage; + if (!usage) { + return; + } + const total = + usage.total ?? + (usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); + if (!total || total <= 0) { + return; + } + + const memoryDir = path.join(params.workspaceDir, "memory"); + await fs.mkdir(memoryDir, { recursive: true }); + // Canonicalize before keying writeQueues so that different path spellings + // for the same physical directory (e.g. a symlink vs its target) share a + // single in-process queue. Without this, two spellings produce separate + // queue entries and both call appendRecord concurrently; when + // withFileLock's HELD_LOCKS map then resolves both to the same normalised + // path the second caller re-entrantly joins the first — allowing concurrent + // read-modify-write cycles that silently drop entries. + const realMemoryDir = await fs.realpath(memoryDir).catch(() => memoryDir); + const file = path.join(realMemoryDir, "token-usage.json"); + + const entry: TokenUsageRecord = { + id: makeId(), + label: params.label, + tokensUsed: Math.trunc(total), + ...(usage.input != null && usage.input > 0 && { inputTokens: Math.trunc(usage.input) }), + ...(usage.output != null && usage.output > 0 && { outputTokens: Math.trunc(usage.output) }), + ...(usage.cacheRead != null && + usage.cacheRead > 0 && { cacheReadTokens: Math.trunc(usage.cacheRead) }), + ...(usage.cacheWrite != null && + usage.cacheWrite > 0 && { cacheWriteTokens: Math.trunc(usage.cacheWrite) }), + model: params.model, + provider: params.provider, + runId: params.runId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + createdAt: new Date().toISOString(), + }; + + const queued = writeQueues.get(file) ?? Promise.resolve(); + const next = queued.then(() => appendRecord(file, entry)); + // Suppress rejection so the Map value never holds a rejected promise, which + // would cause unhandled-rejection noise for any future .then() chained on it. + const stored = next.catch(() => {}); + writeQueues.set(file, stored); + // Remove the entry once it settles. Check identity first: if another call + // already enqueued a new write for the same path, writeQueues now holds that + // newer promise and must not be deleted — the newer entry owns its own cleanup. + // This prevents unbounded Map growth in long-lived processes that write to + // many ephemeral workspace directories. + void stored.then(() => { + if (writeQueues.get(file) === stored) { + writeQueues.delete(file); + } + }); + await next; +} diff --git a/src/plugin-sdk/file-lock.test.ts b/src/plugin-sdk/file-lock.test.ts new file mode 100644 index 00000000000..474149d0323 --- /dev/null +++ b/src/plugin-sdk/file-lock.test.ts @@ -0,0 +1,281 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { getProcessStartTime } from "../shared/pid-alive.js"; +import { withFileLock } from "./file-lock.js"; + +/** Set a file's mtime to `msAgo` milliseconds in the past. */ +async function ageFile(filePath: string, msAgo: number): Promise { + const t = (Date.now() - msAgo) / 1000; // fs.utimes takes seconds + await fs.utimes(filePath, t, t); +} + +const LOCK_OPTIONS = { + retries: { + retries: 2, + factor: 1, + minTimeout: 20, + maxTimeout: 50, + }, + stale: 5_000, +}; + +// More retries for tests where one waiter must survive while the other holds +// the lock for a non-trivial duration. +const RETRY_LOCK_OPTIONS = { + retries: { + retries: 10, + factor: 1, + minTimeout: 10, + maxTimeout: 30, + }, + stale: 5_000, +}; + +describe("withFileLock", () => { + let tmpDir: string; + let targetFile: string; + + beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "file-lock-test-")); + targetFile = path.join(tmpDir, "data.json"); + }); + + afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); + }); + + it("acquires and releases the lock, allowing a second caller to proceed", async () => { + const order: string[] = []; + await withFileLock(targetFile, LOCK_OPTIONS, async () => { + order.push("first-start"); + await new Promise((r) => setTimeout(r, 10)); + order.push("first-end"); + }); + await withFileLock(targetFile, LOCK_OPTIONS, async () => { + order.push("second"); + }); + expect(order).toEqual(["first-start", "first-end", "second"]); + }); + + it("reclaims an empty lock file left by a crash between open and writeFile", async () => { + // Simulate a crash in the open("wx")-to-writeFile window: the .lock file + // exists but has empty (unparseable) content. + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile(lockPath, ""); // empty — no pid/createdAt written + // An empty file with a young mtime is conservatively treated as a live + // writer still in the open→writeFile window. Age it past staleMs so the + // mtime-based fallback marks it as stale. + await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000); + + let ran = false; + await expect( + withFileLock(targetFile, LOCK_OPTIONS, async () => { + ran = true; + }), + ).resolves.toBeUndefined(); + expect(ran).toBe(true); + }); + + it("does not reclaim an empty lock file whose mtime is within staleMs (live writer window)", async () => { + // A freshly-created empty lock belongs to a live writer still in the + // open→writeFile window. isStaleLock must NOT treat it as stale. + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile(lockPath, ""); // young mtime: effectively "just opened" + + // withFileLock should time out because the empty lock looks live. + await expect( + withFileLock( + targetFile, + { ...LOCK_OPTIONS, retries: { ...LOCK_OPTIONS.retries, retries: 1 } }, + async () => {}, + ), + ).rejects.toThrow("file lock timeout"); + }); + + it("reclaims a lock file containing partial/invalid JSON aged past staleMs", async () => { + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile(lockPath, '{"pid":'); // truncated JSON + await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000); + + let ran = false; + await expect( + withFileLock(targetFile, LOCK_OPTIONS, async () => { + ran = true; + }), + ).resolves.toBeUndefined(); + expect(ran).toBe(true); + }); + + it("reclaims a lock file whose pid field is not a number, aged past staleMs", async () => { + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile( + lockPath, + JSON.stringify({ pid: "not-a-number", createdAt: new Date().toISOString() }), + ); + await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000); + + let ran = false; + await expect( + withFileLock(targetFile, LOCK_OPTIONS, async () => { + ran = true; + }), + ).resolves.toBeUndefined(); + expect(ran).toBe(true); + }); + + it("reclaims a lock whose live PID has an old createdAt (PID-reuse via age check)", async () => { + // Simulate PID reuse: the lock contains the test process's own (live) PID + // but a createdAt older than staleMs, as if the original holder crashed and + // the OS later assigned the same PID to an unrelated process. The age + // check must reclaim the lock even though the PID is alive. + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: process.pid, + createdAt: new Date(Date.now() - (LOCK_OPTIONS.stale + 5_000)).toISOString(), + }), + ); + + let ran = false; + await expect( + withFileLock(targetFile, LOCK_OPTIONS, async () => { + ran = true; + }), + ).resolves.toBeUndefined(); + expect(ran).toBe(true); + }); + + it.skipIf(process.platform !== "linux")( + "reclaims a lock whose PID was recycled by a different process (startTime mismatch)", + async () => { + // On Linux, /proc/{pid}/stat provides a per-process start time that + // survives PID reuse. Write a lock with the current process's PID but a + // startTime that cannot match — isStaleLock should detect the mismatch + // and reclaim immediately, without waiting for createdAt to age out. + const actualStartTime = getProcessStartTime(process.pid); + if (actualStartTime === null) { + // getProcessStartTime returned null unexpectedly on Linux — skip. + return; + } + + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: process.pid, + createdAt: new Date().toISOString(), // recent — age check alone would not fire + startTime: actualStartTime + 999_999, // deliberately wrong + }), + ); + + let ran = false; + await expect( + withFileLock(targetFile, LOCK_OPTIONS, async () => { + ran = true; + }), + ).resolves.toBeUndefined(); + expect(ran).toBe(true); + }, + ); + + it("reclaims a stale empty lock file even when detected on the very last retry (retries=0)", async () => { + // Regression: when retries=0 (attempts=1), a stale lock detected on + // attempt 0 caused `continue` to increment attempt to 1, exiting the loop + // and throwing timeout without ever calling open(O_EXCL) on the now-clear + // path. This reproduces the token-usage.json.lock crash-recovery bug where + // an empty .lock file left by a crash is reclaimable only on the last slot. + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile(lockPath, ""); // empty — crash between open("wx") and writeFile + await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000); // age past stale threshold + + let ran = false; + await expect( + withFileLock( + targetFile, + { ...LOCK_OPTIONS, retries: { ...LOCK_OPTIONS.retries, retries: 0 } }, + async () => { + ran = true; + }, + ), + ).resolves.toBeUndefined(); + expect(ran).toBe(true); + }); + + it("times out (does not spin forever) when a stale lock cannot be removed", async () => { + // Safety bound: if fs.rm silently fails (EACCES/EPERM — swallowed by + // .catch), subsequent iterations keep seeing isStale=true. The one-shot + // reclaimSlotAvailable guard must prevent the attempt -= 1 trick from + // repeating indefinitely; after the free slot is consumed, the loop must + // exhaust its retry budget and throw timeout rather than hanging forever. + // + // We simulate the condition by mocking fs.rm as a no-op so the .lock + // file is never removed — every subsequent open(O_EXCL) still sees EEXIST. + // Without the reclaimSlotAvailable bound, each stale detection on the last + // slot would decrement attempt and loop back, spinning forever. + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile(lockPath, JSON.stringify({ pid: 0, createdAt: new Date(0).toISOString() })); + await ageFile(lockPath, LOCK_OPTIONS.stale + 1_000); + + // Spy on fs.rm: resolve successfully but do nothing, so the lock file + // persists and isStaleLock() keeps returning true on every iteration. + const rmSpy = vi.spyOn(fs, "rm").mockResolvedValue(undefined); + + try { + await expect( + withFileLock( + targetFile, + { ...LOCK_OPTIONS, retries: { ...LOCK_OPTIONS.retries, retries: 2 } }, + async () => {}, + ), + ).rejects.toThrow("file lock timeout"); + } finally { + rmSpy.mockRestore(); + await fs.rm(lockPath, { force: true }).catch(() => {}); + } + }); + + it("two concurrent waiters on a stale lock never overlap inside fn()", async () => { + // Plant a stale lock (dead PID, old timestamp) so both waiters will + // simultaneously enter the stale-reclaim branch. The inode guard must + // prevent the slower waiter's unlink from deleting the faster waiter's + // freshly-acquired lock, which would allow both fn() calls to run + // concurrently and corrupt each other's read-modify-write sequences. + const lockPath = `${targetFile}.lock`; + await fs.mkdir(path.dirname(targetFile), { recursive: true }); + await fs.writeFile(lockPath, JSON.stringify({ pid: 0, createdAt: new Date(0).toISOString() })); + + let inside = 0; // number of concurrent fn() executions + let maxInside = 0; + const results: number[] = []; + + const run = async (id: number) => { + // Use RETRY_LOCK_OPTIONS so the losing waiter has enough budget to + // outlast the winning waiter's 20 ms hold without timing out. + await withFileLock(targetFile, RETRY_LOCK_OPTIONS, async () => { + inside += 1; + maxInside = Math.max(maxInside, inside); + await new Promise((r) => setTimeout(r, 20)); // hold the lock briefly + results.push(id); + inside -= 1; + }); + }; + + // Launch both concurrently so they race on the stale lock. + await Promise.all([run(1), run(2)]); + + // Both callbacks must have run exactly once and never overlapped. + expect(results.toSorted((a, b) => a - b)).toEqual([1, 2]); + expect(maxInside).toBe(1); + }); +}); diff --git a/src/plugin-sdk/file-lock.ts b/src/plugin-sdk/file-lock.ts index 3870c38fc35..e0360602757 100644 --- a/src/plugin-sdk/file-lock.ts +++ b/src/plugin-sdk/file-lock.ts @@ -1,6 +1,6 @@ import fs from "node:fs/promises"; import path from "node:path"; -import { isPidAlive } from "../shared/pid-alive.js"; +import { getProcessStartTime, isPidAlive } from "../shared/pid-alive.js"; import { resolveProcessScopedMap } from "../shared/process-scoped-map.js"; export type FileLockOptions = { @@ -17,6 +17,11 @@ export type FileLockOptions = { type LockFilePayload = { pid: number; createdAt: string; + // /proc/{pid}/stat field 22 (clock ticks since boot), recorded at lock + // creation. Present only on Linux; omitted (undefined) on other platforms. + // Used to detect PID recycling: if the same PID is later alive but has a + // different startTime, it belongs to a different process. + startTime?: number; }; type HeldLock = { @@ -44,7 +49,11 @@ async function readLockPayload(lockPath: string): Promise { async function isStaleLock(lockPath: string, staleMs: number): Promise { const payload = await readLockPayload(lockPath); - if (payload?.pid && !isPidAlive(payload.pid)) { - return true; - } - if (payload?.createdAt) { + + if (payload !== null) { + // PID liveness alone is not enough: the OS can recycle a PID after the + // original holder exits. Three complementary checks handle this: + // + // 1. isPidAlive: fast path — if the PID is gone, the lock is stale. + // + // 2. startTime (Linux only): /proc/{pid}/stat field 22 records how long + // after boot the process started. If the current holder's startTime + // differs from the stored value, the PID was recycled by an unrelated + // process and the lock can be reclaimed immediately. + // + // 3. createdAt age: a reused PID inherits the old creation timestamp, so + // once it exceeds staleMs the lock is reclaimed on any platform. + if (!isPidAlive(payload.pid)) { + return true; + } + if (payload.startTime !== undefined) { + const currentStartTime = getProcessStartTime(payload.pid); + if (currentStartTime !== null && currentStartTime !== payload.startTime) { + return true; // PID was recycled by a different process + } + } const createdAt = Date.parse(payload.createdAt); if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) { return true; } + return false; } + + // payload is null: the lock file exists but its content is empty or + // unparseable. The most likely cause is a crash in the narrow window + // between open("wx") (file created, empty) and writeFile (payload written). + // A live writer still in that window is indistinguishable from a crashed + // one by content alone, so we fall back to the file's mtime: a young file + // (mtime < staleMs ago) may belong to a live writer; an old file was + // definitely orphaned. Treating null as immediately stale would steal the + // lock from a live writer and break mutual exclusion. try { const stat = await fs.stat(lockPath); return Date.now() - stat.mtimeMs > staleMs; } catch { - return true; + return true; // file vanished: another waiter already handled it } } @@ -117,11 +155,27 @@ export async function acquireFileLock( } const attempts = Math.max(1, options.retries.retries + 1); + // One-shot budget for the post-reclaim free slot (see stale-reclaim comment + // below). Consumed the first time we step attempt back; subsequent stale + // detections on the last slot are allowed to exhaust the loop and time out, + // preventing an endless spin when fs.rm silently fails (e.g. EACCES/EPERM). + let reclaimSlotAvailable = true; for (let attempt = 0; attempt < attempts; attempt += 1) { try { const handle = await fs.open(lockPath, "wx"); + const startTime = getProcessStartTime(process.pid); await handle.writeFile( - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), + JSON.stringify( + { + pid: process.pid, + createdAt: new Date().toISOString(), + // Omit startTime on non-Linux where it is null so the field is + // absent from the JSON rather than present as null. + ...(startTime !== null && { startTime }), + }, + null, + 2, + ), "utf8", ); HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath }); @@ -134,10 +188,65 @@ export async function acquireFileLock( if (code !== "EEXIST") { throw err; } - if (await isStaleLock(lockPath, options.stale)) { - await fs.rm(lockPath, { force: true }).catch(() => undefined); + + // Snapshot the inode of the existing lock file *before* checking + // staleness. We compare it again just before unlinking; if the inode + // has changed in the interim, another waiter already reclaimed the + // stale file and created a fresh lock — deleting it would silently + // break that holder's mutual exclusion guarantee. + const staleIno = await fs + .stat(lockPath) + .then((s) => s.ino) + .catch(() => -1); + + // staleIno === -1 means the file vanished between open(EEXIST) and + // stat — another process already removed it. Skip straight to the + // next open(O_EXCL) attempt. + const isStale = staleIno === -1 || (await isStaleLock(lockPath, options.stale)); + + if (isStale) { + if (staleIno !== -1) { + // Re-verify the path still maps to the same inode we deemed stale. + // If it changed, a concurrent waiter beat us to the reclaim and has + // already written its own fresh lock; leave that file alone. + const currentIno = await fs + .stat(lockPath) + .then((s) => s.ino) + .catch(() => -1); + if (currentIno === staleIno) { + await fs.rm(lockPath, { force: true }).catch(() => undefined); + } + } + // Retry open(O_EXCL) regardless: either we removed the stale lock or + // a concurrent waiter already handled it; either way, the path is now + // either free or holds a fresh lock that isStaleLock will reject. + // + // Guard: the for-loop's `attempt += 1` runs after every `continue`, + // consuming a retry slot. If we are already on the last slot + // (attempt === attempts - 1), that increment exits the loop and we + // throw timeout without ever re-trying open(O_EXCL) on the now-clear + // path. This is the crash-recovery bug: an empty/partial .lock file + // (crash between open("wx") and writeFile) that becomes reclaimable + // on the last iteration causes the record to be silently dropped. + // + // Fix: when on the last slot, step attempt back by one so the + // upcoming += 1 nets to zero, guaranteeing at least one post-reclaim + // open(O_EXCL) attempt. No extra sleep budget is consumed — we only + // charge a retry for backoff sleeps, not reclaim-only work. + // + // Safety bound: reclaimSlotAvailable is consumed after the first use. + // If fs.rm silently fails (EACCES/EPERM swallowed by .catch above), + // subsequent iterations will still detect isStale=true; without the + // bound, decrementing attempt on every last-slot iteration would spin + // the loop forever. After the one-shot budget is gone, the loop is + // allowed to exhaust normally and throw timeout. + if (reclaimSlotAvailable && attempt >= attempts - 1) { + reclaimSlotAvailable = false; + attempt -= 1; + } continue; } + if (attempt >= attempts - 1) { break; } diff --git a/src/plugins/path-safety.ts b/src/plugins/path-safety.ts index 7935312cbe4..691931faf2e 100644 --- a/src/plugins/path-safety.ts +++ b/src/plugins/path-safety.ts @@ -11,7 +11,7 @@ export function safeRealpathSync(targetPath: string, cache?: Map return cached; } try { - const resolved = fs.realpathSync(targetPath); + const resolved = fs.realpathSync.native(targetPath); cache?.set(targetPath, resolved); return resolved; } catch { diff --git a/ui/package.json b/ui/package.json index 5d514f671cd..47884c24437 100644 --- a/ui/package.json +++ b/ui/package.json @@ -9,6 +9,8 @@ "test": "vitest run --config vitest.config.ts" }, "dependencies": { + "@lit-labs/signals": "^0.2.0", + "@lit/context": "^1.1.6", "@noble/ed25519": "3.0.1", "dompurify": "^3.3.3", "lit": "^3.3.2", diff --git a/ui/src/ui/views/chat.test.ts b/ui/src/ui/views/chat.test.ts index 8e0e18dcba9..d5b4732ffaa 100644 --- a/ui/src/ui/views/chat.test.ts +++ b/ui/src/ui/views/chat.test.ts @@ -91,6 +91,12 @@ function createChatHeaderState( sessionKey: "main", connected: true, sessionsHideCron: true, + sessionsLoading: false, + sessionsError: null, + sessionsFilterActive: "0", + sessionsFilterLimit: "0", + sessionsIncludeGlobal: true, + sessionsIncludeUnknown: true, sessionsResult: { ts: 0, path: "", @@ -111,6 +117,7 @@ function createChatHeaderState( chatModelOverrides: {}, chatModelCatalog: catalog, chatModelsLoading: false, + chatSending: false, client: { request } as unknown as GatewayBrowserClient, settings: { gatewayUrl: "",