From 9bfd3ca1958aae1513990acad1043a0dec52316b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 17 Feb 2026 00:10:32 +0000 Subject: [PATCH] refactor(memory): consolidate embeddings and batch helpers --- src/memory/batch-gemini.ts | 223 +++++++++++-------------- src/memory/batch-openai.ts | 178 +++++++++----------- src/memory/batch-runner.ts | 40 +++++ src/memory/batch-upload.ts | 37 ++++ src/memory/batch-voyage.ts | 211 +++++++++++------------ src/memory/embeddings-debug.ts | 13 ++ src/memory/embeddings-gemini.ts | 16 +- src/memory/embeddings-openai.ts | 47 ++---- src/memory/embeddings-remote-client.ts | 33 ++++ src/memory/embeddings-remote-fetch.ts | 21 +++ src/memory/embeddings-voyage.ts | 47 ++---- 11 files changed, 443 insertions(+), 423 deletions(-) create mode 100644 src/memory/batch-runner.ts create mode 100644 src/memory/batch-upload.ts create mode 100644 src/memory/embeddings-debug.ts create mode 100644 src/memory/embeddings-remote-client.ts create mode 100644 src/memory/embeddings-remote-fetch.ts diff --git a/src/memory/batch-gemini.ts b/src/memory/batch-gemini.ts index a0feef2672a..cb7df35f61e 100644 --- a/src/memory/batch-gemini.ts +++ b/src/memory/batch-gemini.ts @@ -1,8 +1,8 @@ import type { GeminiEmbeddingClient } from "./embeddings-gemini.js"; -import { isTruthyEnvValue } from "../infra/env.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; -import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js"; -import { hashText, runWithConcurrency } from "./internal.js"; +import { runEmbeddingBatchGroups } from "./batch-runner.js"; +import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js"; +import { debugEmbeddingsLog } from "./embeddings-debug.js"; +import { hashText } from "./internal.js"; export type GeminiBatchRequest = { custom_id: string; @@ -35,17 +35,6 @@ export type GeminiBatchOutputLine = { }; const GEMINI_BATCH_MAX_REQUESTS = 50000; -const debugEmbeddings = isTruthyEnvValue(process.env.OPENCLAW_DEBUG_MEMORY_EMBEDDINGS); -const log = createSubsystemLogger("memory/embeddings"); - -const debugLog = (message: string, meta?: Record) => { - if (!debugEmbeddings) { - return; - } - const suffix = meta ? ` ${JSON.stringify(meta)}` : ""; - log.raw(`${message}${suffix}`); -}; - function getGeminiUploadUrl(baseUrl: string): string { if (baseUrl.includes("/v1beta")) { return baseUrl.replace(/\/v1beta\/?$/, "/upload/v1beta"); @@ -99,7 +88,7 @@ async function submitGeminiBatch(params: { const uploadPayload = buildGeminiUploadBody({ jsonl, displayName }); const uploadUrl = `${getGeminiUploadUrl(baseUrl)}/files?uploadType=multipart`; - debugLog("memory embeddings: gemini batch upload", { + debugEmbeddingsLog("memory embeddings: gemini batch upload", { uploadUrl, baseUrl, requests: params.requests.length, @@ -132,7 +121,7 @@ async function submitGeminiBatch(params: { }; const batchEndpoint = `${baseUrl}/${params.gemini.modelPath}:asyncBatchEmbedContent`; - debugLog("memory embeddings: gemini batch create", { + debugEmbeddingsLog("memory embeddings: gemini batch create", { batchEndpoint, fileId, }); @@ -162,7 +151,7 @@ async function fetchGeminiBatchStatus(params: { ? params.batchName : `batches/${params.batchName}`; const statusUrl = `${baseUrl}/${name}`; - debugLog("memory embeddings: gemini batch status", { statusUrl }); + debugEmbeddingsLog("memory embeddings: gemini batch status", { statusUrl }); const res = await fetch(statusUrl, { headers: buildBatchHeaders(params.gemini, { json: true }), }); @@ -180,7 +169,7 @@ async function fetchGeminiFileContent(params: { const baseUrl = normalizeBatchBaseUrl(params.gemini); const file = params.fileId.startsWith("files/") ? params.fileId : `files/${params.fileId}`; const downloadUrl = `${baseUrl}/${file}:download`; - debugLog("memory embeddings: gemini batch download", { downloadUrl }); + debugEmbeddingsLog("memory embeddings: gemini batch download", { downloadUrl }); const res = await fetch(downloadUrl, { headers: buildBatchHeaders(params.gemini, { json: true }), }); @@ -257,110 +246,102 @@ export async function runGeminiEmbeddingBatches(params: { concurrency: number; debug?: (message: string, data?: Record) => void; }): Promise> { - if (params.requests.length === 0) { - return new Map(); - } - const groups = splitBatchRequests(params.requests, GEMINI_BATCH_MAX_REQUESTS); - const byCustomId = new Map(); - - const tasks = groups.map((group, groupIndex) => async () => { - const batchInfo = await submitGeminiBatch({ - gemini: params.gemini, - requests: group, - agentId: params.agentId, - }); - const batchName = batchInfo.name ?? ""; - if (!batchName) { - throw new Error("gemini batch create failed: missing batch name"); - } - - params.debug?.("memory embeddings: gemini batch created", { - batchName, - state: batchInfo.state, - group: groupIndex + 1, - groups: groups.length, - requests: group.length, - }); - - if ( - !params.wait && - batchInfo.state && - !["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state) - ) { - throw new Error( - `gemini batch ${batchName} submitted; enable remote.batch.wait to await completion`, - ); - } - - const completed = - batchInfo.state && ["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state) - ? { - outputFileId: - batchInfo.outputConfig?.file ?? - batchInfo.outputConfig?.fileId ?? - batchInfo.metadata?.output?.responsesFile ?? - "", - } - : await waitForGeminiBatch({ - gemini: params.gemini, - batchName, - wait: params.wait, - pollIntervalMs: params.pollIntervalMs, - timeoutMs: params.timeoutMs, - debug: params.debug, - initial: batchInfo, - }); - if (!completed.outputFileId) { - throw new Error(`gemini batch ${batchName} completed without output file`); - } - - const content = await fetchGeminiFileContent({ - gemini: params.gemini, - fileId: completed.outputFileId, - }); - const outputLines = parseGeminiBatchOutput(content); - const errors: string[] = []; - const remaining = new Set(group.map((request) => request.custom_id)); - - for (const line of outputLines) { - const customId = line.key ?? line.custom_id ?? line.request_id; - if (!customId) { - continue; - } - remaining.delete(customId); - if (line.error?.message) { - errors.push(`${customId}: ${line.error.message}`); - continue; - } - if (line.response?.error?.message) { - errors.push(`${customId}: ${line.response.error.message}`); - continue; - } - const embedding = line.embedding?.values ?? line.response?.embedding?.values ?? []; - if (embedding.length === 0) { - errors.push(`${customId}: empty embedding`); - continue; - } - byCustomId.set(customId, embedding); - } - - if (errors.length > 0) { - throw new Error(`gemini batch ${batchName} failed: ${errors.join("; ")}`); - } - if (remaining.size > 0) { - throw new Error(`gemini batch ${batchName} missing ${remaining.size} embedding responses`); - } - }); - - params.debug?.("memory embeddings: gemini batch submit", { - requests: params.requests.length, - groups: groups.length, + return await runEmbeddingBatchGroups({ + requests: params.requests, + maxRequests: GEMINI_BATCH_MAX_REQUESTS, wait: params.wait, - concurrency: params.concurrency, pollIntervalMs: params.pollIntervalMs, timeoutMs: params.timeoutMs, - }); + concurrency: params.concurrency, + debug: params.debug, + debugLabel: "memory embeddings: gemini batch submit", + runGroup: async ({ group, groupIndex, groups, byCustomId }) => { + const batchInfo = await submitGeminiBatch({ + gemini: params.gemini, + requests: group, + agentId: params.agentId, + }); + const batchName = batchInfo.name ?? ""; + if (!batchName) { + throw new Error("gemini batch create failed: missing batch name"); + } - await runWithConcurrency(tasks, params.concurrency); - return byCustomId; + params.debug?.("memory embeddings: gemini batch created", { + batchName, + state: batchInfo.state, + group: groupIndex + 1, + groups, + requests: group.length, + }); + + if ( + !params.wait && + batchInfo.state && + !["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state) + ) { + throw new Error( + `gemini batch ${batchName} submitted; enable remote.batch.wait to await completion`, + ); + } + + const completed = + batchInfo.state && ["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state) + ? { + outputFileId: + batchInfo.outputConfig?.file ?? + batchInfo.outputConfig?.fileId ?? + batchInfo.metadata?.output?.responsesFile ?? + "", + } + : await waitForGeminiBatch({ + gemini: params.gemini, + batchName, + wait: params.wait, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + debug: params.debug, + initial: batchInfo, + }); + if (!completed.outputFileId) { + throw new Error(`gemini batch ${batchName} completed without output file`); + } + + const content = await fetchGeminiFileContent({ + gemini: params.gemini, + fileId: completed.outputFileId, + }); + const outputLines = parseGeminiBatchOutput(content); + const errors: string[] = []; + const remaining = new Set(group.map((request) => request.custom_id)); + + for (const line of outputLines) { + const customId = line.key ?? line.custom_id ?? line.request_id; + if (!customId) { + continue; + } + remaining.delete(customId); + if (line.error?.message) { + errors.push(`${customId}: ${line.error.message}`); + continue; + } + if (line.response?.error?.message) { + errors.push(`${customId}: ${line.response.error.message}`); + continue; + } + const embedding = line.embedding?.values ?? line.response?.embedding?.values ?? []; + if (embedding.length === 0) { + errors.push(`${customId}: empty embedding`); + continue; + } + byCustomId.set(customId, embedding); + } + + if (errors.length > 0) { + throw new Error(`gemini batch ${batchName} failed: ${errors.join("; ")}`); + } + if (remaining.size > 0) { + throw new Error(`gemini batch ${batchName} missing ${remaining.size} embedding responses`); + } + }, + }); } diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts index d0c84f7fb08..80716b1b3c2 100644 --- a/src/memory/batch-openai.ts +++ b/src/memory/batch-openai.ts @@ -2,8 +2,9 @@ import type { OpenAiEmbeddingClient } from "./embeddings-openai.js"; import { extractBatchErrorMessage, formatUnavailableBatchError } from "./batch-error-utils.js"; import { postJsonWithRetry } from "./batch-http.js"; import { applyEmbeddingBatchOutputLine } from "./batch-output.js"; -import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js"; -import { hashText, runWithConcurrency } from "./internal.js"; +import { runEmbeddingBatchGroups } from "./batch-runner.js"; +import { uploadBatchJsonlFile } from "./batch-upload.js"; +import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js"; export type OpenAiBatchRequest = { custom_id: string; @@ -44,34 +45,17 @@ async function submitOpenAiBatch(params: { agentId: string; }): Promise { const baseUrl = normalizeBatchBaseUrl(params.openAi); - const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n"); - const form = new FormData(); - form.append("purpose", "batch"); - form.append( - "file", - new Blob([jsonl], { type: "application/jsonl" }), - `memory-embeddings.${hashText(String(Date.now()))}.jsonl`, - ); - - const fileRes = await fetch(`${baseUrl}/files`, { - method: "POST", - headers: buildBatchHeaders(params.openAi, { json: false }), - body: form, + const inputFileId = await uploadBatchJsonlFile({ + client: params.openAi, + requests: params.requests, + errorPrefix: "openai batch file upload failed", }); - if (!fileRes.ok) { - const text = await fileRes.text(); - throw new Error(`openai batch file upload failed: ${fileRes.status} ${text}`); - } - const filePayload = (await fileRes.json()) as { id?: string }; - if (!filePayload.id) { - throw new Error("openai batch file upload failed: missing file id"); - } return await postJsonWithRetry({ url: `${baseUrl}/batches`, headers: buildBatchHeaders(params.openAi, { json: true }), body: { - input_file_id: filePayload.id, + input_file_id: inputFileId, endpoint: OPENAI_BATCH_ENDPOINT, completion_window: OPENAI_BATCH_COMPLETION_WINDOW, metadata: { @@ -197,84 +181,78 @@ export async function runOpenAiEmbeddingBatches(params: { concurrency: number; debug?: (message: string, data?: Record) => void; }): Promise> { - if (params.requests.length === 0) { - return new Map(); - } - const groups = splitBatchRequests(params.requests, OPENAI_BATCH_MAX_REQUESTS); - const byCustomId = new Map(); - - const tasks = groups.map((group, groupIndex) => async () => { - const batchInfo = await submitOpenAiBatch({ - openAi: params.openAi, - requests: group, - agentId: params.agentId, - }); - if (!batchInfo.id) { - throw new Error("openai batch create failed: missing batch id"); - } - - params.debug?.("memory embeddings: openai batch created", { - batchId: batchInfo.id, - status: batchInfo.status, - group: groupIndex + 1, - groups: groups.length, - requests: group.length, - }); - - if (!params.wait && batchInfo.status !== "completed") { - throw new Error( - `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, - ); - } - - const completed = - batchInfo.status === "completed" - ? { - outputFileId: batchInfo.output_file_id ?? "", - errorFileId: batchInfo.error_file_id ?? undefined, - } - : await waitForOpenAiBatch({ - openAi: params.openAi, - batchId: batchInfo.id, - wait: params.wait, - pollIntervalMs: params.pollIntervalMs, - timeoutMs: params.timeoutMs, - debug: params.debug, - initial: batchInfo, - }); - if (!completed.outputFileId) { - throw new Error(`openai batch ${batchInfo.id} completed without output file`); - } - - const content = await fetchOpenAiFileContent({ - openAi: params.openAi, - fileId: completed.outputFileId, - }); - const outputLines = parseOpenAiBatchOutput(content); - const errors: string[] = []; - const remaining = new Set(group.map((request) => request.custom_id)); - - for (const line of outputLines) { - applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); - } - - if (errors.length > 0) { - throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`); - } - if (remaining.size > 0) { - throw new Error(`openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`); - } - }); - - params.debug?.("memory embeddings: openai batch submit", { - requests: params.requests.length, - groups: groups.length, + return await runEmbeddingBatchGroups({ + requests: params.requests, + maxRequests: OPENAI_BATCH_MAX_REQUESTS, wait: params.wait, - concurrency: params.concurrency, pollIntervalMs: params.pollIntervalMs, timeoutMs: params.timeoutMs, - }); + concurrency: params.concurrency, + debug: params.debug, + debugLabel: "memory embeddings: openai batch submit", + runGroup: async ({ group, groupIndex, groups, byCustomId }) => { + const batchInfo = await submitOpenAiBatch({ + openAi: params.openAi, + requests: group, + agentId: params.agentId, + }); + if (!batchInfo.id) { + throw new Error("openai batch create failed: missing batch id"); + } - await runWithConcurrency(tasks, params.concurrency); - return byCustomId; + params.debug?.("memory embeddings: openai batch created", { + batchId: batchInfo.id, + status: batchInfo.status, + group: groupIndex + 1, + groups, + requests: group.length, + }); + + if (!params.wait && batchInfo.status !== "completed") { + throw new Error( + `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, + ); + } + + const completed = + batchInfo.status === "completed" + ? { + outputFileId: batchInfo.output_file_id ?? "", + errorFileId: batchInfo.error_file_id ?? undefined, + } + : await waitForOpenAiBatch({ + openAi: params.openAi, + batchId: batchInfo.id, + wait: params.wait, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + debug: params.debug, + initial: batchInfo, + }); + if (!completed.outputFileId) { + throw new Error(`openai batch ${batchInfo.id} completed without output file`); + } + + const content = await fetchOpenAiFileContent({ + openAi: params.openAi, + fileId: completed.outputFileId, + }); + const outputLines = parseOpenAiBatchOutput(content); + const errors: string[] = []; + const remaining = new Set(group.map((request) => request.custom_id)); + + for (const line of outputLines) { + applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); + } + + if (errors.length > 0) { + throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`); + } + if (remaining.size > 0) { + throw new Error( + `openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`, + ); + } + }, + }); } diff --git a/src/memory/batch-runner.ts b/src/memory/batch-runner.ts new file mode 100644 index 00000000000..52045a3a268 --- /dev/null +++ b/src/memory/batch-runner.ts @@ -0,0 +1,40 @@ +import { splitBatchRequests } from "./batch-utils.js"; +import { runWithConcurrency } from "./internal.js"; + +export async function runEmbeddingBatchGroups(params: { + requests: TRequest[]; + maxRequests: number; + wait: boolean; + pollIntervalMs: number; + timeoutMs: number; + concurrency: number; + debugLabel: string; + debug?: (message: string, data?: Record) => void; + runGroup: (args: { + group: TRequest[]; + groupIndex: number; + groups: number; + byCustomId: Map; + }) => Promise; +}): Promise> { + if (params.requests.length === 0) { + return new Map(); + } + const groups = splitBatchRequests(params.requests, params.maxRequests); + const byCustomId = new Map(); + const tasks = groups.map((group, groupIndex) => async () => { + await params.runGroup({ group, groupIndex, groups: groups.length, byCustomId }); + }); + + params.debug?.(params.debugLabel, { + requests: params.requests.length, + groups: groups.length, + wait: params.wait, + concurrency: params.concurrency, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + }); + + await runWithConcurrency(tasks, params.concurrency); + return byCustomId; +} diff --git a/src/memory/batch-upload.ts b/src/memory/batch-upload.ts new file mode 100644 index 00000000000..94b8713050f --- /dev/null +++ b/src/memory/batch-upload.ts @@ -0,0 +1,37 @@ +import { + buildBatchHeaders, + normalizeBatchBaseUrl, + type BatchHttpClientConfig, +} from "./batch-utils.js"; +import { hashText } from "./internal.js"; + +export async function uploadBatchJsonlFile(params: { + client: BatchHttpClientConfig; + requests: unknown[]; + errorPrefix: string; +}): Promise { + const baseUrl = normalizeBatchBaseUrl(params.client); + const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n"); + const form = new FormData(); + form.append("purpose", "batch"); + form.append( + "file", + new Blob([jsonl], { type: "application/jsonl" }), + `memory-embeddings.${hashText(String(Date.now()))}.jsonl`, + ); + + const fileRes = await fetch(`${baseUrl}/files`, { + method: "POST", + headers: buildBatchHeaders(params.client, { json: false }), + body: form, + }); + if (!fileRes.ok) { + const text = await fileRes.text(); + throw new Error(`${params.errorPrefix}: ${fileRes.status} ${text}`); + } + const filePayload = (await fileRes.json()) as { id?: string }; + if (!filePayload.id) { + throw new Error(`${params.errorPrefix}: missing file id`); + } + return filePayload.id; +} diff --git a/src/memory/batch-voyage.ts b/src/memory/batch-voyage.ts index 1a9b21a6ad3..8919c6fa412 100644 --- a/src/memory/batch-voyage.ts +++ b/src/memory/batch-voyage.ts @@ -4,8 +4,9 @@ import type { VoyageEmbeddingClient } from "./embeddings-voyage.js"; import { extractBatchErrorMessage, formatUnavailableBatchError } from "./batch-error-utils.js"; import { postJsonWithRetry } from "./batch-http.js"; import { applyEmbeddingBatchOutputLine } from "./batch-output.js"; -import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js"; -import { hashText, runWithConcurrency } from "./internal.js"; +import { runEmbeddingBatchGroups } from "./batch-runner.js"; +import { uploadBatchJsonlFile } from "./batch-upload.js"; +import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js"; /** * Voyage Batch API Input Line format. @@ -47,36 +48,18 @@ async function submitVoyageBatch(params: { agentId: string; }): Promise { const baseUrl = normalizeBatchBaseUrl(params.client); - const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n"); - const form = new FormData(); - form.append("purpose", "batch"); - form.append( - "file", - new Blob([jsonl], { type: "application/jsonl" }), - `memory-embeddings.${hashText(String(Date.now()))}.jsonl`, - ); - - // 1. Upload file using Voyage Files API - const fileRes = await fetch(`${baseUrl}/files`, { - method: "POST", - headers: buildBatchHeaders(params.client, { json: false }), - body: form, + const inputFileId = await uploadBatchJsonlFile({ + client: params.client, + requests: params.requests, + errorPrefix: "voyage batch file upload failed", }); - if (!fileRes.ok) { - const text = await fileRes.text(); - throw new Error(`voyage batch file upload failed: ${fileRes.status} ${text}`); - } - const filePayload = (await fileRes.json()) as { id?: string }; - if (!filePayload.id) { - throw new Error("voyage batch file upload failed: missing file id"); - } // 2. Create batch job using Voyage Batches API return await postJsonWithRetry({ url: `${baseUrl}/batches`, headers: buildBatchHeaders(params.client, { json: true }), body: { - input_file_id: filePayload.id, + input_file_id: inputFileId, endpoint: VOYAGE_BATCH_ENDPOINT, completion_window: VOYAGE_BATCH_COMPLETION_WINDOW, request_params: { @@ -192,99 +175,95 @@ export async function runVoyageEmbeddingBatches(params: { concurrency: number; debug?: (message: string, data?: Record) => void; }): Promise> { - if (params.requests.length === 0) { - return new Map(); - } - const groups = splitBatchRequests(params.requests, VOYAGE_BATCH_MAX_REQUESTS); - const byCustomId = new Map(); - - const tasks = groups.map((group, groupIndex) => async () => { - const batchInfo = await submitVoyageBatch({ - client: params.client, - requests: group, - agentId: params.agentId, - }); - if (!batchInfo.id) { - throw new Error("voyage batch create failed: missing batch id"); - } - - params.debug?.("memory embeddings: voyage batch created", { - batchId: batchInfo.id, - status: batchInfo.status, - group: groupIndex + 1, - groups: groups.length, - requests: group.length, - }); - - if (!params.wait && batchInfo.status !== "completed") { - throw new Error( - `voyage batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, - ); - } - - const completed = - batchInfo.status === "completed" - ? { - outputFileId: batchInfo.output_file_id ?? "", - errorFileId: batchInfo.error_file_id ?? undefined, - } - : await waitForVoyageBatch({ - client: params.client, - batchId: batchInfo.id, - wait: params.wait, - pollIntervalMs: params.pollIntervalMs, - timeoutMs: params.timeoutMs, - debug: params.debug, - initial: batchInfo, - }); - if (!completed.outputFileId) { - throw new Error(`voyage batch ${batchInfo.id} completed without output file`); - } - - const baseUrl = normalizeBatchBaseUrl(params.client); - const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, { - headers: buildBatchHeaders(params.client, { json: true }), - }); - if (!contentRes.ok) { - const text = await contentRes.text(); - throw new Error(`voyage batch file content failed: ${contentRes.status} ${text}`); - } - - const errors: string[] = []; - const remaining = new Set(group.map((request) => request.custom_id)); - - if (contentRes.body) { - const reader = createInterface({ - input: Readable.fromWeb(contentRes.body as unknown as import("stream/web").ReadableStream), - terminal: false, - }); - - for await (const rawLine of reader) { - if (!rawLine.trim()) { - continue; - } - const line = JSON.parse(rawLine) as VoyageBatchOutputLine; - applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); - } - } - - if (errors.length > 0) { - throw new Error(`voyage batch ${batchInfo.id} failed: ${errors.join("; ")}`); - } - if (remaining.size > 0) { - throw new Error(`voyage batch ${batchInfo.id} missing ${remaining.size} embedding responses`); - } - }); - - params.debug?.("memory embeddings: voyage batch submit", { - requests: params.requests.length, - groups: groups.length, + return await runEmbeddingBatchGroups({ + requests: params.requests, + maxRequests: VOYAGE_BATCH_MAX_REQUESTS, wait: params.wait, - concurrency: params.concurrency, pollIntervalMs: params.pollIntervalMs, timeoutMs: params.timeoutMs, - }); + concurrency: params.concurrency, + debug: params.debug, + debugLabel: "memory embeddings: voyage batch submit", + runGroup: async ({ group, groupIndex, groups, byCustomId }) => { + const batchInfo = await submitVoyageBatch({ + client: params.client, + requests: group, + agentId: params.agentId, + }); + if (!batchInfo.id) { + throw new Error("voyage batch create failed: missing batch id"); + } - await runWithConcurrency(tasks, params.concurrency); - return byCustomId; + params.debug?.("memory embeddings: voyage batch created", { + batchId: batchInfo.id, + status: batchInfo.status, + group: groupIndex + 1, + groups, + requests: group.length, + }); + + if (!params.wait && batchInfo.status !== "completed") { + throw new Error( + `voyage batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, + ); + } + + const completed = + batchInfo.status === "completed" + ? { + outputFileId: batchInfo.output_file_id ?? "", + errorFileId: batchInfo.error_file_id ?? undefined, + } + : await waitForVoyageBatch({ + client: params.client, + batchId: batchInfo.id, + wait: params.wait, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + debug: params.debug, + initial: batchInfo, + }); + if (!completed.outputFileId) { + throw new Error(`voyage batch ${batchInfo.id} completed without output file`); + } + + const baseUrl = normalizeBatchBaseUrl(params.client); + const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, { + headers: buildBatchHeaders(params.client, { json: true }), + }); + if (!contentRes.ok) { + const text = await contentRes.text(); + throw new Error(`voyage batch file content failed: ${contentRes.status} ${text}`); + } + + const errors: string[] = []; + const remaining = new Set(group.map((request) => request.custom_id)); + + if (contentRes.body) { + const reader = createInterface({ + input: Readable.fromWeb( + contentRes.body as unknown as import("stream/web").ReadableStream, + ), + terminal: false, + }); + + for await (const rawLine of reader) { + if (!rawLine.trim()) { + continue; + } + const line = JSON.parse(rawLine) as VoyageBatchOutputLine; + applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); + } + } + + if (errors.length > 0) { + throw new Error(`voyage batch ${batchInfo.id} failed: ${errors.join("; ")}`); + } + if (remaining.size > 0) { + throw new Error( + `voyage batch ${batchInfo.id} missing ${remaining.size} embedding responses`, + ); + } + }, + }); } diff --git a/src/memory/embeddings-debug.ts b/src/memory/embeddings-debug.ts new file mode 100644 index 00000000000..951d88b6c09 --- /dev/null +++ b/src/memory/embeddings-debug.ts @@ -0,0 +1,13 @@ +import { isTruthyEnvValue } from "../infra/env.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; + +const debugEmbeddings = isTruthyEnvValue(process.env.OPENCLAW_DEBUG_MEMORY_EMBEDDINGS); +const log = createSubsystemLogger("memory/embeddings"); + +export function debugEmbeddingsLog(message: string, meta?: Record): void { + if (!debugEmbeddings) { + return; + } + const suffix = meta ? ` ${JSON.stringify(meta)}` : ""; + log.raw(`${message}${suffix}`); +} diff --git a/src/memory/embeddings-gemini.ts b/src/memory/embeddings-gemini.ts index 8c4f1a66fb4..29afb2d9fb3 100644 --- a/src/memory/embeddings-gemini.ts +++ b/src/memory/embeddings-gemini.ts @@ -1,8 +1,7 @@ import type { EmbeddingProvider, EmbeddingProviderOptions } from "./embeddings.js"; import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js"; -import { isTruthyEnvValue } from "../infra/env.js"; import { parseGeminiAuth } from "../infra/gemini-auth.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; +import { debugEmbeddingsLog } from "./embeddings-debug.js"; export type GeminiEmbeddingClient = { baseUrl: string; @@ -16,17 +15,6 @@ export const DEFAULT_GEMINI_EMBEDDING_MODEL = "gemini-embedding-001"; const GEMINI_MAX_INPUT_TOKENS: Record = { "text-embedding-004": 2048, }; -const debugEmbeddings = isTruthyEnvValue(process.env.OPENCLAW_DEBUG_MEMORY_EMBEDDINGS); -const log = createSubsystemLogger("memory/embeddings"); - -const debugLog = (message: string, meta?: Record) => { - if (!debugEmbeddings) { - return; - } - const suffix = meta ? ` ${JSON.stringify(meta)}` : ""; - log.raw(`${message}${suffix}`); -}; - function resolveRemoteApiKey(remoteApiKey?: string): string | undefined { const trimmed = remoteApiKey?.trim(); if (!trimmed) { @@ -158,7 +146,7 @@ export async function resolveGeminiEmbeddingClient( }; const model = normalizeGeminiModel(options.model); const modelPath = buildGeminiModelPath(model); - debugLog("memory embeddings: gemini client", { + debugEmbeddingsLog("memory embeddings: gemini client", { rawBaseUrl, baseUrl, model, diff --git a/src/memory/embeddings-openai.ts b/src/memory/embeddings-openai.ts index f4705fd6245..806d5f3358c 100644 --- a/src/memory/embeddings-openai.ts +++ b/src/memory/embeddings-openai.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider, EmbeddingProviderOptions } from "./embeddings.js"; -import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js"; +import { resolveRemoteEmbeddingBearerClient } from "./embeddings-remote-client.js"; +import { fetchRemoteEmbeddingVectors } from "./embeddings-remote-fetch.js"; export type OpenAiEmbeddingClient = { baseUrl: string; @@ -36,20 +37,12 @@ export async function createOpenAiEmbeddingProvider( if (input.length === 0) { return []; } - const res = await fetch(url, { - method: "POST", + return await fetchRemoteEmbeddingVectors({ + url, headers: client.headers, - body: JSON.stringify({ model: client.model, input }), + body: { model: client.model, input }, + errorPrefix: "openai embeddings failed", }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`openai embeddings failed: ${res.status} ${text}`); - } - const payload = (await res.json()) as { - data?: Array<{ embedding?: number[] }>; - }; - const data = payload.data ?? []; - return data.map((entry) => entry.embedding ?? []); }; return { @@ -70,29 +63,11 @@ export async function createOpenAiEmbeddingProvider( export async function resolveOpenAiEmbeddingClient( options: EmbeddingProviderOptions, ): Promise { - const remote = options.remote; - const remoteApiKey = remote?.apiKey?.trim(); - const remoteBaseUrl = remote?.baseUrl?.trim(); - - const apiKey = remoteApiKey - ? remoteApiKey - : requireApiKey( - await resolveApiKeyForProvider({ - provider: "openai", - cfg: options.config, - agentDir: options.agentDir, - }), - "openai", - ); - - const providerConfig = options.config.models?.providers?.openai; - const baseUrl = remoteBaseUrl || providerConfig?.baseUrl?.trim() || DEFAULT_OPENAI_BASE_URL; - const headerOverrides = Object.assign({}, providerConfig?.headers, remote?.headers); - const headers: Record = { - "Content-Type": "application/json", - Authorization: `Bearer ${apiKey}`, - ...headerOverrides, - }; + const { baseUrl, headers } = await resolveRemoteEmbeddingBearerClient({ + provider: "openai", + options, + defaultBaseUrl: DEFAULT_OPENAI_BASE_URL, + }); const model = normalizeOpenAiModel(options.model); return { baseUrl, headers, model }; } diff --git a/src/memory/embeddings-remote-client.ts b/src/memory/embeddings-remote-client.ts new file mode 100644 index 00000000000..cc1ce5d51ea --- /dev/null +++ b/src/memory/embeddings-remote-client.ts @@ -0,0 +1,33 @@ +import type { EmbeddingProviderOptions } from "./embeddings.js"; +import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js"; + +type RemoteEmbeddingProviderId = "openai" | "voyage"; + +export async function resolveRemoteEmbeddingBearerClient(params: { + provider: RemoteEmbeddingProviderId; + options: EmbeddingProviderOptions; + defaultBaseUrl: string; +}): Promise<{ baseUrl: string; headers: Record }> { + const remote = params.options.remote; + const remoteApiKey = remote?.apiKey?.trim(); + const remoteBaseUrl = remote?.baseUrl?.trim(); + const providerConfig = params.options.config.models?.providers?.[params.provider]; + const apiKey = remoteApiKey + ? remoteApiKey + : requireApiKey( + await resolveApiKeyForProvider({ + provider: params.provider, + cfg: params.options.config, + agentDir: params.options.agentDir, + }), + params.provider, + ); + const baseUrl = remoteBaseUrl || providerConfig?.baseUrl?.trim() || params.defaultBaseUrl; + const headerOverrides = Object.assign({}, providerConfig?.headers, remote?.headers); + const headers: Record = { + "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, + ...headerOverrides, + }; + return { baseUrl, headers }; +} diff --git a/src/memory/embeddings-remote-fetch.ts b/src/memory/embeddings-remote-fetch.ts new file mode 100644 index 00000000000..5fa77e3d087 --- /dev/null +++ b/src/memory/embeddings-remote-fetch.ts @@ -0,0 +1,21 @@ +export async function fetchRemoteEmbeddingVectors(params: { + url: string; + headers: Record; + body: unknown; + errorPrefix: string; +}): Promise { + const res = await fetch(params.url, { + method: "POST", + headers: params.headers, + body: JSON.stringify(params.body), + }); + if (!res.ok) { + const text = await res.text(); + throw new Error(`${params.errorPrefix}: ${res.status} ${text}`); + } + const payload = (await res.json()) as { + data?: Array<{ embedding?: number[] }>; + }; + const data = payload.data ?? []; + return data.map((entry) => entry.embedding ?? []); +} diff --git a/src/memory/embeddings-voyage.ts b/src/memory/embeddings-voyage.ts index 4e014a28fbd..1a685608f2d 100644 --- a/src/memory/embeddings-voyage.ts +++ b/src/memory/embeddings-voyage.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider, EmbeddingProviderOptions } from "./embeddings.js"; -import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js"; +import { resolveRemoteEmbeddingBearerClient } from "./embeddings-remote-client.js"; +import { fetchRemoteEmbeddingVectors } from "./embeddings-remote-fetch.js"; export type VoyageEmbeddingClient = { baseUrl: string; @@ -44,20 +45,12 @@ export async function createVoyageEmbeddingProvider( body.input_type = input_type; } - const res = await fetch(url, { - method: "POST", + return await fetchRemoteEmbeddingVectors({ + url, headers: client.headers, - body: JSON.stringify(body), + body, + errorPrefix: "voyage embeddings failed", }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`voyage embeddings failed: ${res.status} ${text}`); - } - const payload = (await res.json()) as { - data?: Array<{ embedding?: number[] }>; - }; - const data = payload.data ?? []; - return data.map((entry) => entry.embedding ?? []); }; return { @@ -78,29 +71,11 @@ export async function createVoyageEmbeddingProvider( export async function resolveVoyageEmbeddingClient( options: EmbeddingProviderOptions, ): Promise { - const remote = options.remote; - const remoteApiKey = remote?.apiKey?.trim(); - const remoteBaseUrl = remote?.baseUrl?.trim(); - - const apiKey = remoteApiKey - ? remoteApiKey - : requireApiKey( - await resolveApiKeyForProvider({ - provider: "voyage", - cfg: options.config, - agentDir: options.agentDir, - }), - "voyage", - ); - - const providerConfig = options.config.models?.providers?.voyage; - const baseUrl = remoteBaseUrl || providerConfig?.baseUrl?.trim() || DEFAULT_VOYAGE_BASE_URL; - const headerOverrides = Object.assign({}, providerConfig?.headers, remote?.headers); - const headers: Record = { - "Content-Type": "application/json", - Authorization: `Bearer ${apiKey}`, - ...headerOverrides, - }; + const { baseUrl, headers } = await resolveRemoteEmbeddingBearerClient({ + provider: "voyage", + options, + defaultBaseUrl: DEFAULT_VOYAGE_BASE_URL, + }); const model = normalizeVoyageModel(options.model); return { baseUrl, headers, model }; }