diff --git a/src/agents/anthropic-payload-log.ts b/src/agents/anthropic-payload-log.ts index 7f4463a6b48..03c2cbc1c1c 100644 --- a/src/agents/anthropic-payload-log.ts +++ b/src/agents/anthropic-payload-log.ts @@ -1,5 +1,4 @@ import crypto from "node:crypto"; -import fs from "node:fs/promises"; import path from "node:path"; import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core"; import type { Api, Model } from "@mariozechner/pi-ai"; @@ -8,6 +7,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveUserPath } from "../utils.js"; import { parseBooleanValue } from "../utils/boolean.js"; import { safeJsonStringify } from "../utils/safe-json.js"; +import { getQueuedFileWriter, type QueuedFileWriter } from "./queued-file-writer.js"; type PayloadLogStage = "request" | "usage"; @@ -32,10 +32,7 @@ type PayloadLogConfig = { filePath: string; }; -type PayloadLogWriter = { - filePath: string; - write: (line: string) => void; -}; +type PayloadLogWriter = QueuedFileWriter; const writers = new Map(); const log = createSubsystemLogger("agent/anthropic-payload"); @@ -50,27 +47,7 @@ function resolvePayloadLogConfig(env: NodeJS.ProcessEnv): PayloadLogConfig { } function getWriter(filePath: string): PayloadLogWriter { - const existing = writers.get(filePath); - if (existing) { - return existing; - } - - const dir = path.dirname(filePath); - const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined); - let queue = Promise.resolve(); - - const writer: PayloadLogWriter = { - filePath, - write: (line: string) => { - queue = queue - .then(() => ready) - .then(() => fs.appendFile(filePath, line, "utf8")) - .catch(() => undefined); - }, - }; - - writers.set(filePath, writer); - return writer; + return getQueuedFileWriter(writers, filePath); } function formatError(error: unknown): string | undefined { diff --git a/src/agents/cache-trace.ts b/src/agents/cache-trace.ts index f8ecd67c3db..0cc770dabe7 100644 --- a/src/agents/cache-trace.ts +++ b/src/agents/cache-trace.ts @@ -1,5 +1,4 @@ import crypto from "node:crypto"; -import fs from "node:fs/promises"; import path from "node:path"; import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core"; import type { OpenClawConfig } from "../config/config.js"; @@ -7,6 +6,7 @@ import { resolveStateDir } from "../config/paths.js"; import { resolveUserPath } from "../utils.js"; import { parseBooleanValue } from "../utils/boolean.js"; import { safeJsonStringify } from "../utils/safe-json.js"; +import { getQueuedFileWriter, type QueuedFileWriter } from "./queued-file-writer.js"; export type CacheTraceStage = | "session:loaded" @@ -70,10 +70,7 @@ type CacheTraceConfig = { includeSystem: boolean; }; -type CacheTraceWriter = { - filePath: string; - write: (line: string) => void; -}; +type CacheTraceWriter = QueuedFileWriter; const writers = new Map(); @@ -102,27 +99,7 @@ function resolveCacheTraceConfig(params: CacheTraceInit): CacheTraceConfig { } function getWriter(filePath: string): CacheTraceWriter { - const existing = writers.get(filePath); - if (existing) { - return existing; - } - - const dir = path.dirname(filePath); - const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined); - let queue = Promise.resolve(); - - const writer: CacheTraceWriter = { - filePath, - write: (line: string) => { - queue = queue - .then(() => ready) - .then(() => fs.appendFile(filePath, line, "utf8")) - .catch(() => undefined); - }, - }; - - writers.set(filePath, writer); - return writer; + return getQueuedFileWriter(writers, filePath); } function stableStringify(value: unknown): string { diff --git a/src/agents/queued-file-writer.ts b/src/agents/queued-file-writer.ts new file mode 100644 index 00000000000..906ebee6f82 --- /dev/null +++ b/src/agents/queued-file-writer.ts @@ -0,0 +1,34 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +export type QueuedFileWriter = { + filePath: string; + write: (line: string) => void; +}; + +export function getQueuedFileWriter( + writers: Map, + filePath: string, +): QueuedFileWriter { + const existing = writers.get(filePath); + if (existing) { + return existing; + } + + const dir = path.dirname(filePath); + const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined); + let queue = Promise.resolve(); + + const writer: QueuedFileWriter = { + filePath, + write: (line: string) => { + queue = queue + .then(() => ready) + .then(() => fs.appendFile(filePath, line, "utf8")) + .catch(() => undefined); + }, + }; + + writers.set(filePath, writer); + return writer; +}