From 46b02e0c48664d4310b63d88a066374fa251a5e6 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 15 Mar 2026 20:28:11 +0000 Subject: [PATCH] Memory: extract embedding sync execution --- .../embedding-sync-execution.test.ts | 234 ++++++++++++++++++ .../embedding-sync-execution.ts | 153 ++++++++++++ src/memory/manager-sync-ops.ts | 135 +++------- 3 files changed, 426 insertions(+), 96 deletions(-) create mode 100644 src/extension-host/embedding-sync-execution.test.ts create mode 100644 src/extension-host/embedding-sync-execution.ts diff --git a/src/extension-host/embedding-sync-execution.test.ts b/src/extension-host/embedding-sync-execution.test.ts new file mode 100644 index 00000000000..700e6c06b8b --- /dev/null +++ b/src/extension-host/embedding-sync-execution.test.ts @@ -0,0 +1,234 @@ +import { describe, expect, it, vi } from "vitest"; +import { runExtensionHostEmbeddingSync } from "./embedding-sync-execution.js"; + +describe("embedding-sync-execution", () => { + it("prefers targeted session refreshes and clears only the targeted dirty files", async () => { + const syncSessionFiles = vi.fn(async () => {}); + const clearSyncedSessionFiles = vi.fn(); + + await runExtensionHostEmbeddingSync({ + reason: "post-compaction", + targetSessionFiles: new Set(["/tmp/a.jsonl"]), + vectorReady: false, + meta: null, + configuredSources: ["sessions"], + configuredScopeHash: "scope", + provider: null, + providerKey: null, + chunkTokens: 200, + chunkOverlap: 20, + sessionsEnabled: true, + dirty: true, + shouldSyncSessions: true, + useUnsafeReindex: false, + hasDirtySessionFiles: true, + syncMemoryFiles: vi.fn(async () => {}), + syncSessionFiles, + clearSyncedSessionFiles, + clearAllSessionDirtyFiles: vi.fn(), + setDirty: vi.fn(), + setSessionsDirty: vi.fn(), + shouldFallbackOnError: vi.fn(() => false), + activateFallbackProvider: vi.fn(async () => false), + runSafeReindex: vi.fn(async () => {}), + runUnsafeReindex: vi.fn(async () => {}), + }); + + expect(syncSessionFiles).toHaveBeenCalledWith({ + needsFullReindex: false, + targetSessionFiles: ["/tmp/a.jsonl"], + progress: undefined, + }); + expect(clearSyncedSessionFiles).toHaveBeenCalledWith(new Set(["/tmp/a.jsonl"])); + }); + + it("runs an unsafe reindex when fallback activates during a targeted refresh", async () => { + const runUnsafeReindex = vi.fn(async () => {}); + + await runExtensionHostEmbeddingSync({ + reason: "post-compaction", + targetSessionFiles: new Set(["/tmp/a.jsonl"]), + vectorReady: false, + meta: null, + configuredSources: ["sessions"], + configuredScopeHash: "scope", + provider: null, + providerKey: null, + chunkTokens: 200, + chunkOverlap: 20, + sessionsEnabled: true, + dirty: false, + shouldSyncSessions: true, + useUnsafeReindex: true, + hasDirtySessionFiles: false, + syncMemoryFiles: vi.fn(async () => {}), + syncSessionFiles: vi.fn(async () => { + throw new Error("embedding backend failed"); + }), + clearSyncedSessionFiles: vi.fn(), + clearAllSessionDirtyFiles: vi.fn(), + setDirty: vi.fn(), + setSessionsDirty: vi.fn(), + shouldFallbackOnError: vi.fn(() => true), + activateFallbackProvider: vi.fn(async () => true), + runSafeReindex: vi.fn(async () => {}), + runUnsafeReindex, + }); + + expect(runUnsafeReindex).toHaveBeenCalledWith({ + reason: "post-compaction", + force: true, + progress: undefined, + }); + }); + + it("runs a full safe reindex when planning detects metadata drift", async () => { + const runSafeReindex = vi.fn(async () => {}); + + await runExtensionHostEmbeddingSync({ + reason: "test", + force: false, + targetSessionFiles: null, + vectorReady: true, + meta: { + model: "old-model", + provider: "openai", + providerKey: "key", + sources: ["memory"], + scopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + }, + configuredSources: ["memory"], + configuredScopeHash: "scope", + provider: { + id: "openai", + model: "new-model", + embedQuery: async () => [1], + embedBatch: async () => [[1]], + }, + providerKey: "key", + chunkTokens: 200, + chunkOverlap: 20, + sessionsEnabled: false, + dirty: false, + shouldSyncSessions: false, + useUnsafeReindex: false, + hasDirtySessionFiles: false, + syncMemoryFiles: vi.fn(async () => {}), + syncSessionFiles: vi.fn(async () => {}), + clearSyncedSessionFiles: vi.fn(), + clearAllSessionDirtyFiles: vi.fn(), + setDirty: vi.fn(), + setSessionsDirty: vi.fn(), + shouldFallbackOnError: vi.fn(() => false), + activateFallbackProvider: vi.fn(async () => false), + runSafeReindex, + runUnsafeReindex: vi.fn(async () => {}), + }); + + expect(runSafeReindex).toHaveBeenCalledWith({ + reason: "test", + force: false, + progress: undefined, + }); + }); + + it("clears dirty flags after incremental syncs and preserves pending session dirtiness otherwise", async () => { + const setDirty = vi.fn(); + const setSessionsDirty = vi.fn(); + const clearAllSessionDirtyFiles = vi.fn(); + + await runExtensionHostEmbeddingSync({ + reason: "watch", + targetSessionFiles: null, + vectorReady: true, + meta: { + model: "model", + provider: "openai", + providerKey: "key", + sources: ["memory", "sessions"], + scopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + vectorDims: 1536, + }, + configuredSources: ["memory", "sessions"], + configuredScopeHash: "scope", + provider: { + id: "openai", + model: "model", + embedQuery: async () => [1], + embedBatch: async () => [[1]], + }, + providerKey: "key", + chunkTokens: 200, + chunkOverlap: 20, + sessionsEnabled: true, + dirty: true, + shouldSyncSessions: true, + useUnsafeReindex: false, + hasDirtySessionFiles: true, + syncMemoryFiles: vi.fn(async () => {}), + syncSessionFiles: vi.fn(async () => {}), + clearSyncedSessionFiles: vi.fn(), + clearAllSessionDirtyFiles, + setDirty, + setSessionsDirty, + shouldFallbackOnError: vi.fn(() => false), + activateFallbackProvider: vi.fn(async () => false), + runSafeReindex: vi.fn(async () => {}), + runUnsafeReindex: vi.fn(async () => {}), + }); + + expect(setDirty).toHaveBeenCalledWith(false); + expect(setSessionsDirty).toHaveBeenCalledWith(false); + expect(clearAllSessionDirtyFiles).toHaveBeenCalled(); + + setSessionsDirty.mockClear(); + + await runExtensionHostEmbeddingSync({ + reason: "watch", + targetSessionFiles: null, + vectorReady: true, + meta: { + model: "model", + provider: "openai", + providerKey: "key", + sources: ["memory", "sessions"], + scopeHash: "scope", + chunkTokens: 200, + chunkOverlap: 20, + vectorDims: 1536, + }, + configuredSources: ["memory", "sessions"], + configuredScopeHash: "scope", + provider: { + id: "openai", + model: "model", + embedQuery: async () => [1], + embedBatch: async () => [[1]], + }, + providerKey: "key", + chunkTokens: 200, + chunkOverlap: 20, + sessionsEnabled: true, + dirty: false, + shouldSyncSessions: false, + useUnsafeReindex: false, + hasDirtySessionFiles: true, + syncMemoryFiles: vi.fn(async () => {}), + syncSessionFiles: vi.fn(async () => {}), + clearSyncedSessionFiles: vi.fn(), + clearAllSessionDirtyFiles: vi.fn(), + setDirty: vi.fn(), + setSessionsDirty, + shouldFallbackOnError: vi.fn(() => false), + activateFallbackProvider: vi.fn(async () => false), + runSafeReindex: vi.fn(async () => {}), + runUnsafeReindex: vi.fn(async () => {}), + }); + + expect(setSessionsDirty).toHaveBeenCalledWith(true); + }); +}); diff --git a/src/extension-host/embedding-sync-execution.ts b/src/extension-host/embedding-sync-execution.ts new file mode 100644 index 00000000000..e39547f95bc --- /dev/null +++ b/src/extension-host/embedding-sync-execution.ts @@ -0,0 +1,153 @@ +import type { EmbeddingProvider } from "./embedding-runtime.js"; +import { + type EmbeddingIndexMeta, + type EmbeddingMemorySource, + resolveEmbeddingSyncPlan, +} from "./embedding-sync-planning.js"; + +type EmbeddingSyncProgress = unknown; + +type EmbeddingSyncMemoryFiles = (params: { + needsFullReindex: boolean; + progress?: TProgress; +}) => Promise; + +type EmbeddingSyncSessionFiles = (params: { + needsFullReindex: boolean; + targetSessionFiles?: string[]; + progress?: TProgress; +}) => Promise; + +type EmbeddingReindex = (params: { + reason?: string; + force?: boolean; + progress?: TProgress; +}) => Promise; + +export async function runExtensionHostEmbeddingSync(params: { + reason?: string; + force?: boolean; + targetSessionFiles: Set | null; + vectorReady: boolean; + meta: EmbeddingIndexMeta | null; + configuredSources: EmbeddingMemorySource[]; + configuredScopeHash: string; + provider: EmbeddingProvider | null; + providerKey: string | null; + chunkTokens: number; + chunkOverlap: number; + sessionsEnabled: boolean; + dirty: boolean; + shouldSyncSessions: boolean; + useUnsafeReindex: boolean; + hasDirtySessionFiles: boolean; + progress?: TProgress; + syncMemoryFiles: EmbeddingSyncMemoryFiles; + syncSessionFiles: EmbeddingSyncSessionFiles; + clearSyncedSessionFiles: (targetSessionFiles?: Iterable | null) => void; + clearAllSessionDirtyFiles: () => void; + setDirty: (value: boolean) => void; + setSessionsDirty: (value: boolean) => void; + shouldFallbackOnError: (message: string) => boolean; + activateFallbackProvider: (reason: string) => Promise; + runSafeReindex: EmbeddingReindex; + runUnsafeReindex: EmbeddingReindex; +}): Promise { + const hasTargetSessionFiles = params.targetSessionFiles !== null; + const syncPlan = resolveEmbeddingSyncPlan({ + force: params.force, + hasTargetSessionFiles, + targetSessionFiles: params.targetSessionFiles, + sessionsEnabled: params.sessionsEnabled, + dirty: params.dirty, + shouldSyncSessions: params.shouldSyncSessions, + useUnsafeReindex: params.useUnsafeReindex, + vectorReady: params.vectorReady, + meta: params.meta, + provider: params.provider, + providerKey: params.providerKey, + configuredSources: params.configuredSources, + configuredScopeHash: params.configuredScopeHash, + chunkTokens: params.chunkTokens, + chunkOverlap: params.chunkOverlap, + }); + + if (syncPlan.kind === "targeted-sessions") { + try { + await params.syncSessionFiles({ + needsFullReindex: false, + targetSessionFiles: syncPlan.targetSessionFiles, + progress: params.progress, + }); + params.clearSyncedSessionFiles(new Set(syncPlan.targetSessionFiles)); + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + const activated = + params.shouldFallbackOnError(reason) && (await params.activateFallbackProvider(reason)); + if (activated) { + const reindexParams = { + reason: params.reason, + force: true, + progress: params.progress, + }; + if (params.useUnsafeReindex) { + await params.runUnsafeReindex(reindexParams); + } else { + await params.runSafeReindex(reindexParams); + } + return; + } + throw err; + } + return; + } + + try { + if (syncPlan.kind === "full-reindex") { + const reindexParams = { + reason: params.reason, + force: params.force, + progress: params.progress, + }; + if (syncPlan.unsafe) { + await params.runUnsafeReindex(reindexParams); + } else { + await params.runSafeReindex(reindexParams); + } + return; + } + + if (syncPlan.shouldSyncMemory) { + await params.syncMemoryFiles({ + needsFullReindex: false, + progress: params.progress, + }); + params.setDirty(false); + } + + if (syncPlan.shouldSyncSessions) { + await params.syncSessionFiles({ + needsFullReindex: false, + targetSessionFiles: syncPlan.targetSessionFiles, + progress: params.progress, + }); + params.setSessionsDirty(false); + params.clearAllSessionDirtyFiles(); + } else { + params.setSessionsDirty(params.hasDirtySessionFiles); + } + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + const activated = + params.shouldFallbackOnError(reason) && (await params.activateFallbackProvider(reason)); + if (activated) { + await params.runSafeReindex({ + reason: params.reason ?? "fallback", + force: true, + progress: params.progress, + }); + return; + } + throw err; + } +} diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index 680cb2a4ab6..fb3ee63e53b 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -20,12 +20,12 @@ import { type OpenAiEmbeddingClient, type VoyageEmbeddingClient, } from "../extension-host/embedding-runtime.js"; +import { runExtensionHostEmbeddingSync } from "../extension-host/embedding-sync-execution.js"; import { buildEmbeddingIndexMeta, type EmbeddingIndexMeta, metaSourcesDiffer as extensionHostMetaSourcesDiffer, normalizeEmbeddingMetaSources, - resolveEmbeddingSyncPlan, shouldUseUnsafeEmbeddingReindex, } from "../extension-host/embedding-sync-planning.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; @@ -946,108 +946,51 @@ export abstract class MemoryManagerSyncOps { const configuredSources = this.resolveConfiguredSourcesForMeta(); const configuredScopeHash = this.resolveConfiguredScopeHash(); const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles); - const hasTargetSessionFiles = targetSessionFiles !== null; - const syncPlan = resolveEmbeddingSyncPlan({ + await runExtensionHostEmbeddingSync({ + reason: params?.reason, force: params?.force, - hasTargetSessionFiles, targetSessionFiles, + vectorReady, + meta, + configuredSources, + configuredScopeHash, + provider: this.provider, + providerKey: this.providerKey, + chunkTokens: this.settings.chunking.tokens, + chunkOverlap: this.settings.chunking.overlap, sessionsEnabled: this.sources.has("sessions"), dirty: this.dirty, shouldSyncSessions: this.shouldSyncSessions(params, false), useUnsafeReindex: shouldUseUnsafeEmbeddingReindex(), - vectorReady, - meta, - provider: this.provider, - providerKey: this.providerKey, - configuredSources, - configuredScopeHash, - chunkTokens: this.settings.chunking.tokens, - chunkOverlap: this.settings.chunking.overlap, - }); - if (syncPlan.kind === "targeted-sessions") { - // Post-compaction refreshes should only update the explicit transcript files and - // leave broader reindex/dirty-work decisions to the regular sync path. - try { - await this.syncSessionFiles({ - needsFullReindex: false, - targetSessionFiles: syncPlan.targetSessionFiles, - progress: progress ?? undefined, - }); - this.clearSyncedSessionFiles(new Set(syncPlan.targetSessionFiles)); - } catch (err) { - const reason = err instanceof Error ? err.message : String(err); - const activated = - this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); - if (activated) { - if (shouldUseUnsafeEmbeddingReindex()) { - await this.runUnsafeReindex({ - reason: params?.reason, - force: true, - progress: progress ?? undefined, - }); - } else { - await this.runSafeReindex({ - reason: params?.reason, - force: true, - progress: progress ?? undefined, - }); - } - return; - } - throw err; - } - return; - } - try { - if (syncPlan.kind === "full-reindex") { - if (syncPlan.unsafe) { - await this.runUnsafeReindex({ - reason: params?.reason, - force: params?.force, - progress: progress ?? undefined, - }); - } else { - await this.runSafeReindex({ - reason: params?.reason, - force: params?.force, - progress: progress ?? undefined, - }); - } - return; - } - - if (syncPlan.shouldSyncMemory) { - await this.syncMemoryFiles({ needsFullReindex: false, progress: progress ?? undefined }); - this.dirty = false; - } - - if (syncPlan.shouldSyncSessions) { - await this.syncSessionFiles({ - needsFullReindex: false, - targetSessionFiles: syncPlan.targetSessionFiles, - progress: progress ?? undefined, - }); - this.sessionsDirty = false; + hasDirtySessionFiles: this.sessionsDirtyFiles.size > 0, + progress: progress ?? undefined, + syncMemoryFiles: async (syncParams) => { + await this.syncMemoryFiles(syncParams); + }, + syncSessionFiles: async (syncParams) => { + await this.syncSessionFiles(syncParams); + }, + clearSyncedSessionFiles: (sessionFiles) => { + this.clearSyncedSessionFiles(sessionFiles); + }, + clearAllSessionDirtyFiles: () => { this.sessionsDirtyFiles.clear(); - } else if (this.sessionsDirtyFiles.size > 0) { - this.sessionsDirty = true; - } else { - this.sessionsDirty = false; - } - } catch (err) { - const reason = err instanceof Error ? err.message : String(err); - const activated = - this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); - if (activated) { - await this.runSafeReindex({ - reason: params?.reason ?? "fallback", - force: true, - progress: progress ?? undefined, - }); - return; - } - throw err; - } + }, + setDirty: (value) => { + this.dirty = value; + }, + setSessionsDirty: (value) => { + this.sessionsDirty = value; + }, + shouldFallbackOnError: (message) => this.shouldFallbackOnError(message), + activateFallbackProvider: async (reason) => await this.activateFallbackProvider(reason), + runSafeReindex: async (reindexParams) => { + await this.runSafeReindex(reindexParams); + }, + runUnsafeReindex: async (reindexParams) => { + await this.runUnsafeReindex(reindexParams); + }, + }); } private shouldFallbackOnError(message: string): boolean {