From 5b27b0cecfabfff5a33595d23b0a7d5dbcc27345 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 7 Mar 2026 23:07:11 +0000 Subject: [PATCH] refactor(outbound,agents): extract shared payload and queue helpers --- src/agents/pi-embedded-runner/extra-params.ts | 63 +++++++++++++------ src/infra/outbound/delivery-queue.ts | 60 ++++++++++-------- 2 files changed, 78 insertions(+), 45 deletions(-) diff --git a/src/agents/pi-embedded-runner/extra-params.ts b/src/agents/pi-embedded-runner/extra-params.ts index cfd71760cb4..c5051d4d7b1 100644 --- a/src/agents/pi-embedded-runner/extra-params.ts +++ b/src/agents/pi-embedded-runner/extra-params.ts @@ -297,6 +297,42 @@ function shouldEnableOpenAIResponsesServerCompaction( return model.provider === "openai"; } +function shouldStripResponsesStore( + model: { api?: unknown; compat?: { supportsStore?: boolean } }, + forceStore: boolean, +): boolean { + if (forceStore) { + return false; + } + if (typeof model.api !== "string") { + return false; + } + return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false; +} + +function applyOpenAIResponsesPayloadOverrides(params: { + payloadObj: Record; + forceStore: boolean; + stripStore: boolean; + useServerCompaction: boolean; + compactThreshold: number; +}): void { + if (params.forceStore) { + params.payloadObj.store = true; + } + if (params.stripStore) { + delete params.payloadObj.store; + } + if (params.useServerCompaction && params.payloadObj.context_management === undefined) { + params.payloadObj.context_management = [ + { + type: "compaction", + compact_threshold: params.compactThreshold, + }, + ]; + } +} + function createOpenAIResponsesContextManagementWrapper( baseStreamFn: StreamFn | undefined, extraParams: Record | undefined, @@ -308,10 +344,7 @@ function createOpenAIResponsesContextManagementWrapper( // Strip `store` from the payload when the model declares supportsStore=false. // pi-ai upstream hardcodes `store: false` for Responses API; strict // OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it. - const stripStore = - !forceStore && - OPENAI_RESPONSES_APIS.has(String(model.api ?? "")) && - (model as { compat?: { supportsStore?: boolean } }).compat?.supportsStore === false; + const stripStore = shouldStripResponsesStore(model, forceStore); if (!forceStore && !useServerCompaction && !stripStore) { return underlying(model, context, options); } @@ -324,21 +357,13 @@ function createOpenAIResponsesContextManagementWrapper( ...options, onPayload: (payload) => { if (payload && typeof payload === "object") { - const payloadObj = payload as Record; - if (forceStore) { - payloadObj.store = true; - } - if (stripStore) { - delete payloadObj.store; - } - if (useServerCompaction && payloadObj.context_management === undefined) { - payloadObj.context_management = [ - { - type: "compaction", - compact_threshold: compactThreshold, - }, - ]; - } + applyOpenAIResponsesPayloadOverrides({ + payloadObj: payload as Record, + forceStore, + stripStore, + useServerCompaction, + compactThreshold, + }); } originalOnPayload?.(payload); }, diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 8a79cc732d5..1cbab613bc4 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -67,6 +67,34 @@ function resolveFailedDir(stateDir?: string): string { return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME); } +function resolveQueueEntryPaths( + id: string, + stateDir?: string, +): { + jsonPath: string; + deliveredPath: string; +} { + const queueDir = resolveQueueDir(stateDir); + return { + jsonPath: path.join(queueDir, `${id}.json`), + deliveredPath: path.join(queueDir, `${id}.delivered`), + }; +} + +function getErrnoCode(err: unknown): string | null { + return err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; +} + +async function unlinkBestEffort(filePath: string): Promise { + try { + await fs.promises.unlink(filePath); + } catch { + // Best-effort cleanup. + } +} + /** Ensure the queue directory (and failed/ subdirectory) exist. */ export async function ensureQueueDir(stateDir?: string): Promise { const queueDir = resolveQueueDir(stateDir); @@ -117,35 +145,22 @@ export async function enqueueDelivery( * by {@link loadPendingDeliveries} on the next startup without re-sending. */ export async function ackDelivery(id: string, stateDir?: string): Promise { - const queueDir = resolveQueueDir(stateDir); - const jsonPath = path.join(queueDir, `${id}.json`); - const deliveredPath = path.join(queueDir, `${id}.delivered`); + const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir); try { // Phase 1: atomic rename marks the delivery as complete. await fs.promises.rename(jsonPath, deliveredPath); } catch (err) { - const code = - err && typeof err === "object" && "code" in err - ? String((err as { code?: unknown }).code) - : null; + const code = getErrnoCode(err); if (code === "ENOENT") { // .json already gone — may have been renamed by a previous ack attempt. // Try to clean up a leftover .delivered marker if present. - try { - await fs.promises.unlink(deliveredPath); - } catch { - // marker already gone — no-op. - } + await unlinkBestEffort(deliveredPath); return; } throw err; } // Phase 2: remove the marker file. - try { - await fs.promises.unlink(deliveredPath); - } catch { - // Best-effort; loadPendingDeliveries will clean it up on next startup. - } + await unlinkBestEffort(deliveredPath); } /** Update a queue entry after a failed delivery attempt. */ @@ -171,10 +186,7 @@ export async function loadPendingDeliveries(stateDir?: string): Promise