From ad4b037b289e75723d9ec509bd5a019596eb912a Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 15 Mar 2026 20:16:29 +0000 Subject: [PATCH] Memory: extract embedding sync planning --- .../embedding-sync-planning.test.ts | 171 ++++++++++++++++++ src/extension-host/embedding-sync-planning.ts | 138 ++++++++++++++ src/memory/manager-sync-ops.ts | 137 ++++++-------- 3 files changed, 365 insertions(+), 81 deletions(-) create mode 100644 src/extension-host/embedding-sync-planning.test.ts create mode 100644 src/extension-host/embedding-sync-planning.ts diff --git a/src/extension-host/embedding-sync-planning.test.ts b/src/extension-host/embedding-sync-planning.test.ts new file mode 100644 index 00000000000..3c73ba0f54d --- /dev/null +++ b/src/extension-host/embedding-sync-planning.test.ts @@ -0,0 +1,171 @@ +import { describe, expect, it } from "vitest"; +import { + buildEmbeddingIndexMeta, + metaSourcesDiffer, + normalizeEmbeddingMetaSources, + resolveEmbeddingSyncPlan, + shouldUseUnsafeEmbeddingReindex, +} from "./embedding-sync-planning.js"; + +describe("embedding-sync-planning", () => { + it("prefers targeted session refreshes over broader sync decisions", () => { + const plan = resolveEmbeddingSyncPlan({ + hasTargetSessionFiles: true, + targetSessionFiles: new Set(["/tmp/session.jsonl"]), + sessionsEnabled: true, + dirty: true, + shouldSyncSessions: true, + useUnsafeReindex: false, + vectorReady: false, + meta: null, + provider: null, + providerKey: null, + configuredSources: ["sessions"], + configuredScopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + }); + + expect(plan).toEqual({ + kind: "targeted-sessions", + targetSessionFiles: ["/tmp/session.jsonl"], + }); + }); + + it("plans a full reindex when metadata drift is detected", () => { + const plan = resolveEmbeddingSyncPlan({ + force: false, + hasTargetSessionFiles: false, + targetSessionFiles: null, + sessionsEnabled: true, + dirty: false, + shouldSyncSessions: false, + useUnsafeReindex: true, + vectorReady: true, + meta: { + model: "old-model", + provider: "openai", + providerKey: "key", + sources: ["memory"], + scopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + }, + provider: { + id: "openai", + model: "new-model", + embedQuery: async () => [1], + embedBatch: async () => [[1]], + }, + providerKey: "key", + configuredSources: ["memory"], + configuredScopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + }); + + expect(plan).toEqual({ + kind: "full-reindex", + unsafe: true, + }); + }); + + it("builds incremental sync plans from dirty/session state", () => { + const plan = resolveEmbeddingSyncPlan({ + force: false, + hasTargetSessionFiles: false, + targetSessionFiles: null, + sessionsEnabled: true, + dirty: true, + shouldSyncSessions: true, + useUnsafeReindex: false, + vectorReady: false, + meta: { + model: "model", + provider: "openai", + providerKey: "key", + sources: ["memory", "sessions"], + scopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + vectorDims: 1536, + }, + provider: { + id: "openai", + model: "model", + embedQuery: async () => [1], + embedBatch: async () => [[1]], + }, + providerKey: "key", + configuredSources: ["memory", "sessions"], + configuredScopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + }); + + expect(plan).toEqual({ + kind: "incremental", + shouldSyncMemory: true, + shouldSyncSessions: true, + targetSessionFiles: undefined, + }); + }); + + it("builds embedding metadata with provider and vector dimensions", () => { + expect( + buildEmbeddingIndexMeta({ + provider: { + id: "openai", + model: "text-embedding-3-small", + embedQuery: async () => [1], + embedBatch: async () => [[1]], + }, + providerKey: "provider-key", + configuredSources: ["memory", "sessions"], + configuredScopeHash: "scope", + chunkTokens: 256, + chunkOverlap: 32, + vectorDims: 1536, + }), + ).toEqual({ + model: "text-embedding-3-small", + provider: "openai", + providerKey: "provider-key", + sources: ["memory", "sessions"], + scopeHash: "scope", + chunkTokens: 256, + chunkOverlap: 32, + vectorDims: 1536, + }); + }); + + it("normalizes legacy meta sources and detects drift", () => { + expect(normalizeEmbeddingMetaSources(null)).toEqual(["memory"]); + expect(normalizeEmbeddingMetaSources({ sources: ["sessions", "memory", "sessions"] })).toEqual([ + "memory", + "sessions", + ]); + expect( + metaSourcesDiffer( + { + model: "model", + provider: "openai", + sources: ["memory"], + chunkTokens: 200, + chunkOverlap: 20, + }, + ["memory", "sessions"], + ), + ).toBe(true); + }); + + it("reads the unsafe test reindex gate from env vars", () => { + expect( + shouldUseUnsafeEmbeddingReindex({ + OPENCLAW_TEST_FAST: "1", + OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX: "1", + } as NodeJS.ProcessEnv), + ).toBe(true); + expect(shouldUseUnsafeEmbeddingReindex({} as NodeJS.ProcessEnv)).toBe(false); + }); +}); diff --git a/src/extension-host/embedding-sync-planning.ts b/src/extension-host/embedding-sync-planning.ts new file mode 100644 index 00000000000..46f396599ad --- /dev/null +++ b/src/extension-host/embedding-sync-planning.ts @@ -0,0 +1,138 @@ +import type { EmbeddingProvider } from "./embedding-runtime.js"; + +export type EmbeddingMemorySource = "memory" | "sessions"; + +export type EmbeddingIndexMeta = { + model: string; + provider: string; + providerKey?: string; + sources?: EmbeddingMemorySource[]; + scopeHash?: string; + chunkTokens: number; + chunkOverlap: number; + vectorDims?: number; +}; + +export type EmbeddingSyncPlan = + | { + kind: "targeted-sessions"; + targetSessionFiles: string[]; + } + | { + kind: "full-reindex"; + unsafe: boolean; + } + | { + kind: "incremental"; + shouldSyncMemory: boolean; + shouldSyncSessions: boolean; + targetSessionFiles?: string[]; + }; + +export function resolveEmbeddingSyncPlan(params: { + force?: boolean; + hasTargetSessionFiles: boolean; + targetSessionFiles: Set | null; + sessionsEnabled: boolean; + dirty: boolean; + shouldSyncSessions: boolean; + useUnsafeReindex: boolean; + vectorReady: boolean; + meta: EmbeddingIndexMeta | null; + provider: EmbeddingProvider | null; + providerKey: string | null; + configuredSources: EmbeddingMemorySource[]; + configuredScopeHash: string; + chunkTokens: number; + chunkOverlap: number; +}): EmbeddingSyncPlan { + if (params.hasTargetSessionFiles && params.targetSessionFiles && params.sessionsEnabled) { + return { + kind: "targeted-sessions", + targetSessionFiles: Array.from(params.targetSessionFiles), + }; + } + + const needsFullReindex = + (params.force && !params.hasTargetSessionFiles) || + !params.meta || + (params.provider && params.meta.model !== params.provider.model) || + (params.provider && params.meta.provider !== params.provider.id) || + params.meta?.providerKey !== params.providerKey || + metaSourcesDiffer(params.meta, params.configuredSources) || + params.meta?.scopeHash !== params.configuredScopeHash || + params.meta?.chunkTokens !== params.chunkTokens || + params.meta?.chunkOverlap !== params.chunkOverlap || + (params.vectorReady && !params.meta?.vectorDims); + + if (needsFullReindex) { + return { + kind: "full-reindex", + unsafe: params.useUnsafeReindex, + }; + } + + return { + kind: "incremental", + shouldSyncMemory: !params.hasTargetSessionFiles && (Boolean(params.force) || params.dirty), + shouldSyncSessions: params.shouldSyncSessions, + targetSessionFiles: params.targetSessionFiles + ? Array.from(params.targetSessionFiles) + : undefined, + }; +} + +export function buildEmbeddingIndexMeta(params: { + provider: EmbeddingProvider | null; + providerKey: string | null; + configuredSources: EmbeddingMemorySource[]; + configuredScopeHash: string; + chunkTokens: number; + chunkOverlap: number; + vectorDims?: number; +}): EmbeddingIndexMeta { + const meta: EmbeddingIndexMeta = { + model: params.provider?.model ?? "fts-only", + provider: params.provider?.id ?? "none", + providerKey: params.providerKey ?? undefined, + sources: params.configuredSources, + scopeHash: params.configuredScopeHash, + chunkTokens: params.chunkTokens, + chunkOverlap: params.chunkOverlap, + }; + if (params.vectorDims) { + meta.vectorDims = params.vectorDims; + } + return meta; +} + +export function shouldUseUnsafeEmbeddingReindex(env = process.env): boolean { + return env.OPENCLAW_TEST_FAST === "1" && env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1"; +} + +export function metaSourcesDiffer( + meta: EmbeddingIndexMeta | null, + configuredSources: EmbeddingMemorySource[], +): boolean { + const metaSources = normalizeEmbeddingMetaSources(meta); + if (metaSources.length !== configuredSources.length) { + return true; + } + return metaSources.some((source, index) => source !== configuredSources[index]); +} + +export function normalizeEmbeddingMetaSources( + meta: Pick | null, +): EmbeddingMemorySource[] { + if (!Array.isArray(meta?.sources)) { + return ["memory"]; + } + const normalized = Array.from( + new Set( + meta.sources.filter( + (source): source is EmbeddingMemorySource => source === "memory" || source === "sessions", + ), + ), + ).toSorted(); + return normalized.length > 0 ? normalized : ["memory"]; +} diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index 97b8ae0469d..680cb2a4ab6 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -20,6 +20,14 @@ import { type OpenAiEmbeddingClient, type VoyageEmbeddingClient, } from "../extension-host/embedding-runtime.js"; +import { + buildEmbeddingIndexMeta, + type EmbeddingIndexMeta, + metaSourcesDiffer as extensionHostMetaSourcesDiffer, + normalizeEmbeddingMetaSources, + resolveEmbeddingSyncPlan, + shouldUseUnsafeEmbeddingReindex, +} from "../extension-host/embedding-sync-planning.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { resolveUserPath } from "../utils.js"; @@ -49,17 +57,6 @@ import { loadSqliteVecExtension } from "./sqlite-vec.js"; import { requireNodeSqlite } from "./sqlite.js"; import type { MemorySource, MemorySyncProgressUpdate } from "./types.js"; -type MemoryIndexMeta = { - model: string; - provider: string; - providerKey?: string; - sources?: MemorySource[]; - scopeHash?: string; - chunkTokens: number; - chunkOverlap: number; - vectorDims?: number; -}; - type MemorySyncProgressState = { completed: number; total: number; @@ -950,25 +947,39 @@ export abstract class MemoryManagerSyncOps { const configuredScopeHash = this.resolveConfiguredScopeHash(); const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles); const hasTargetSessionFiles = targetSessionFiles !== null; - if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) { + const syncPlan = resolveEmbeddingSyncPlan({ + force: params?.force, + hasTargetSessionFiles, + targetSessionFiles, + sessionsEnabled: this.sources.has("sessions"), + dirty: this.dirty, + shouldSyncSessions: this.shouldSyncSessions(params, false), + useUnsafeReindex: shouldUseUnsafeEmbeddingReindex(), + vectorReady, + meta, + provider: this.provider, + providerKey: this.providerKey, + configuredSources, + configuredScopeHash, + chunkTokens: this.settings.chunking.tokens, + chunkOverlap: this.settings.chunking.overlap, + }); + if (syncPlan.kind === "targeted-sessions") { // Post-compaction refreshes should only update the explicit transcript files and // leave broader reindex/dirty-work decisions to the regular sync path. try { await this.syncSessionFiles({ needsFullReindex: false, - targetSessionFiles: Array.from(targetSessionFiles), + targetSessionFiles: syncPlan.targetSessionFiles, progress: progress ?? undefined, }); - this.clearSyncedSessionFiles(targetSessionFiles); + this.clearSyncedSessionFiles(new Set(syncPlan.targetSessionFiles)); } catch (err) { const reason = err instanceof Error ? err.message : String(err); const activated = this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); if (activated) { - if ( - process.env.OPENCLAW_TEST_FAST === "1" && - process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1" - ) { + if (shouldUseUnsafeEmbeddingReindex()) { await this.runUnsafeReindex({ reason: params?.reason, force: true, @@ -987,23 +998,9 @@ export abstract class MemoryManagerSyncOps { } return; } - const needsFullReindex = - (params?.force && !hasTargetSessionFiles) || - !meta || - (this.provider && meta.model !== this.provider.model) || - (this.provider && meta.provider !== this.provider.id) || - meta.providerKey !== this.providerKey || - this.metaSourcesDiffer(meta, configuredSources) || - meta.scopeHash !== configuredScopeHash || - meta.chunkTokens !== this.settings.chunking.tokens || - meta.chunkOverlap !== this.settings.chunking.overlap || - (vectorReady && !meta?.vectorDims); try { - if (needsFullReindex) { - if ( - process.env.OPENCLAW_TEST_FAST === "1" && - process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1" - ) { + if (syncPlan.kind === "full-reindex") { + if (syncPlan.unsafe) { await this.runUnsafeReindex({ reason: params?.reason, force: params?.force, @@ -1019,20 +1016,15 @@ export abstract class MemoryManagerSyncOps { return; } - const shouldSyncMemory = - this.sources.has("memory") && - ((!hasTargetSessionFiles && params?.force) || needsFullReindex || this.dirty); - const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); - - if (shouldSyncMemory) { - await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined }); + if (syncPlan.shouldSyncMemory) { + await this.syncMemoryFiles({ needsFullReindex: false, progress: progress ?? undefined }); this.dirty = false; } - if (shouldSyncSessions) { + if (syncPlan.shouldSyncSessions) { await this.syncSessionFiles({ - needsFullReindex, - targetSessionFiles: targetSessionFiles ? Array.from(targetSessionFiles) : undefined, + needsFullReindex: false, + targetSessionFiles: syncPlan.targetSessionFiles, progress: progress ?? undefined, }); this.sessionsDirty = false; @@ -1159,7 +1151,7 @@ export abstract class MemoryManagerSyncOps { this.fts.loadError = undefined; this.ensureSchema(); - let nextMeta: MemoryIndexMeta | null = null; + let nextMeta: EmbeddingIndexMeta | null = null; try { this.seedEmbeddingCache(originalDb); @@ -1184,15 +1176,14 @@ export abstract class MemoryManagerSyncOps { this.sessionsDirty = false; } - nextMeta = { - model: this.provider?.model ?? "fts-only", - provider: this.provider?.id ?? "none", - providerKey: this.providerKey!, - sources: this.resolveConfiguredSourcesForMeta(), - scopeHash: this.resolveConfiguredScopeHash(), + nextMeta = buildEmbeddingIndexMeta({ + provider: this.provider, + providerKey: this.providerKey, + configuredSources: this.resolveConfiguredSourcesForMeta(), + configuredScopeHash: this.resolveConfiguredScopeHash(), chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, - }; + }); if (!nextMeta) { throw new Error("Failed to compute memory index metadata for reindexing."); } @@ -1256,15 +1247,14 @@ export abstract class MemoryManagerSyncOps { this.sessionsDirty = false; } - const nextMeta: MemoryIndexMeta = { - model: this.provider?.model ?? "fts-only", - provider: this.provider?.id ?? "none", - providerKey: this.providerKey!, - sources: this.resolveConfiguredSourcesForMeta(), - scopeHash: this.resolveConfiguredScopeHash(), + const nextMeta = buildEmbeddingIndexMeta({ + provider: this.provider, + providerKey: this.providerKey, + configuredSources: this.resolveConfiguredSourcesForMeta(), + configuredScopeHash: this.resolveConfiguredScopeHash(), chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, - }; + }); if (this.vector.available && this.vector.dims) { nextMeta.vectorDims = this.vector.dims; } @@ -1286,7 +1276,7 @@ export abstract class MemoryManagerSyncOps { this.sessionsDirtyFiles.clear(); } - protected readMeta(): MemoryIndexMeta | null { + protected readMeta(): EmbeddingIndexMeta | null { const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as | { value: string } | undefined; @@ -1295,7 +1285,7 @@ export abstract class MemoryManagerSyncOps { return null; } try { - const parsed = JSON.parse(row.value) as MemoryIndexMeta; + const parsed = JSON.parse(row.value) as EmbeddingIndexMeta; this.lastMetaSerialized = row.value; return parsed; } catch { @@ -1304,7 +1294,7 @@ export abstract class MemoryManagerSyncOps { } } - protected writeMeta(meta: MemoryIndexMeta) { + protected writeMeta(meta: EmbeddingIndexMeta) { const value = JSON.stringify(meta); if (this.lastMetaSerialized === value) { return; @@ -1324,19 +1314,8 @@ export abstract class MemoryManagerSyncOps { return normalized.length > 0 ? normalized : ["memory"]; } - private normalizeMetaSources(meta: MemoryIndexMeta): MemorySource[] { - if (!Array.isArray(meta.sources)) { - // Backward compatibility for older indexes that did not persist sources. - return ["memory"]; - } - const normalized = Array.from( - new Set( - meta.sources.filter( - (source): source is MemorySource => source === "memory" || source === "sessions", - ), - ), - ).toSorted(); - return normalized.length > 0 ? normalized : ["memory"]; + private normalizeMetaSources(meta: EmbeddingIndexMeta): MemorySource[] { + return normalizeEmbeddingMetaSources(meta); } private resolveConfiguredScopeHash(): string { @@ -1355,11 +1334,7 @@ export abstract class MemoryManagerSyncOps { ); } - private metaSourcesDiffer(meta: MemoryIndexMeta, configuredSources: MemorySource[]): boolean { - const metaSources = this.normalizeMetaSources(meta); - if (metaSources.length !== configuredSources.length) { - return true; - } - return metaSources.some((source, index) => source !== configuredSources[index]); + private metaSourcesDiffer(meta: EmbeddingIndexMeta, configuredSources: MemorySource[]): boolean { + return extensionHostMetaSourcesDiffer(meta, configuredSources); } }