Memory: extract embedding sync execution

This commit is contained in:
Gustavo Madeira Santana 2026-03-15 20:28:11 +00:00
parent 5f625ff1b3
commit 46b02e0c48
No known key found for this signature in database
3 changed files with 426 additions and 96 deletions

View File

@ -0,0 +1,234 @@
import { describe, expect, it, vi } from "vitest";
import { runExtensionHostEmbeddingSync } from "./embedding-sync-execution.js";
describe("embedding-sync-execution", () => {
it("prefers targeted session refreshes and clears only the targeted dirty files", async () => {
const syncSessionFiles = vi.fn(async () => {});
const clearSyncedSessionFiles = vi.fn();
await runExtensionHostEmbeddingSync({
reason: "post-compaction",
targetSessionFiles: new Set(["/tmp/a.jsonl"]),
vectorReady: false,
meta: null,
configuredSources: ["sessions"],
configuredScopeHash: "scope",
provider: null,
providerKey: null,
chunkTokens: 200,
chunkOverlap: 20,
sessionsEnabled: true,
dirty: true,
shouldSyncSessions: true,
useUnsafeReindex: false,
hasDirtySessionFiles: true,
syncMemoryFiles: vi.fn(async () => {}),
syncSessionFiles,
clearSyncedSessionFiles,
clearAllSessionDirtyFiles: vi.fn(),
setDirty: vi.fn(),
setSessionsDirty: vi.fn(),
shouldFallbackOnError: vi.fn(() => false),
activateFallbackProvider: vi.fn(async () => false),
runSafeReindex: vi.fn(async () => {}),
runUnsafeReindex: vi.fn(async () => {}),
});
expect(syncSessionFiles).toHaveBeenCalledWith({
needsFullReindex: false,
targetSessionFiles: ["/tmp/a.jsonl"],
progress: undefined,
});
expect(clearSyncedSessionFiles).toHaveBeenCalledWith(new Set(["/tmp/a.jsonl"]));
});
it("runs an unsafe reindex when fallback activates during a targeted refresh", async () => {
const runUnsafeReindex = vi.fn(async () => {});
await runExtensionHostEmbeddingSync({
reason: "post-compaction",
targetSessionFiles: new Set(["/tmp/a.jsonl"]),
vectorReady: false,
meta: null,
configuredSources: ["sessions"],
configuredScopeHash: "scope",
provider: null,
providerKey: null,
chunkTokens: 200,
chunkOverlap: 20,
sessionsEnabled: true,
dirty: false,
shouldSyncSessions: true,
useUnsafeReindex: true,
hasDirtySessionFiles: false,
syncMemoryFiles: vi.fn(async () => {}),
syncSessionFiles: vi.fn(async () => {
throw new Error("embedding backend failed");
}),
clearSyncedSessionFiles: vi.fn(),
clearAllSessionDirtyFiles: vi.fn(),
setDirty: vi.fn(),
setSessionsDirty: vi.fn(),
shouldFallbackOnError: vi.fn(() => true),
activateFallbackProvider: vi.fn(async () => true),
runSafeReindex: vi.fn(async () => {}),
runUnsafeReindex,
});
expect(runUnsafeReindex).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
progress: undefined,
});
});
it("runs a full safe reindex when planning detects metadata drift", async () => {
const runSafeReindex = vi.fn(async () => {});
await runExtensionHostEmbeddingSync({
reason: "test",
force: false,
targetSessionFiles: null,
vectorReady: true,
meta: {
model: "old-model",
provider: "openai",
providerKey: "key",
sources: ["memory"],
scopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
},
configuredSources: ["memory"],
configuredScopeHash: "scope",
provider: {
id: "openai",
model: "new-model",
embedQuery: async () => [1],
embedBatch: async () => [[1]],
},
providerKey: "key",
chunkTokens: 200,
chunkOverlap: 20,
sessionsEnabled: false,
dirty: false,
shouldSyncSessions: false,
useUnsafeReindex: false,
hasDirtySessionFiles: false,
syncMemoryFiles: vi.fn(async () => {}),
syncSessionFiles: vi.fn(async () => {}),
clearSyncedSessionFiles: vi.fn(),
clearAllSessionDirtyFiles: vi.fn(),
setDirty: vi.fn(),
setSessionsDirty: vi.fn(),
shouldFallbackOnError: vi.fn(() => false),
activateFallbackProvider: vi.fn(async () => false),
runSafeReindex,
runUnsafeReindex: vi.fn(async () => {}),
});
expect(runSafeReindex).toHaveBeenCalledWith({
reason: "test",
force: false,
progress: undefined,
});
});
it("clears dirty flags after incremental syncs and preserves pending session dirtiness otherwise", async () => {
const setDirty = vi.fn();
const setSessionsDirty = vi.fn();
const clearAllSessionDirtyFiles = vi.fn();
await runExtensionHostEmbeddingSync({
reason: "watch",
targetSessionFiles: null,
vectorReady: true,
meta: {
model: "model",
provider: "openai",
providerKey: "key",
sources: ["memory", "sessions"],
scopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
vectorDims: 1536,
},
configuredSources: ["memory", "sessions"],
configuredScopeHash: "scope",
provider: {
id: "openai",
model: "model",
embedQuery: async () => [1],
embedBatch: async () => [[1]],
},
providerKey: "key",
chunkTokens: 200,
chunkOverlap: 20,
sessionsEnabled: true,
dirty: true,
shouldSyncSessions: true,
useUnsafeReindex: false,
hasDirtySessionFiles: true,
syncMemoryFiles: vi.fn(async () => {}),
syncSessionFiles: vi.fn(async () => {}),
clearSyncedSessionFiles: vi.fn(),
clearAllSessionDirtyFiles,
setDirty,
setSessionsDirty,
shouldFallbackOnError: vi.fn(() => false),
activateFallbackProvider: vi.fn(async () => false),
runSafeReindex: vi.fn(async () => {}),
runUnsafeReindex: vi.fn(async () => {}),
});
expect(setDirty).toHaveBeenCalledWith(false);
expect(setSessionsDirty).toHaveBeenCalledWith(false);
expect(clearAllSessionDirtyFiles).toHaveBeenCalled();
setSessionsDirty.mockClear();
await runExtensionHostEmbeddingSync({
reason: "watch",
targetSessionFiles: null,
vectorReady: true,
meta: {
model: "model",
provider: "openai",
providerKey: "key",
sources: ["memory", "sessions"],
scopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
vectorDims: 1536,
},
configuredSources: ["memory", "sessions"],
configuredScopeHash: "scope",
provider: {
id: "openai",
model: "model",
embedQuery: async () => [1],
embedBatch: async () => [[1]],
},
providerKey: "key",
chunkTokens: 200,
chunkOverlap: 20,
sessionsEnabled: true,
dirty: false,
shouldSyncSessions: false,
useUnsafeReindex: false,
hasDirtySessionFiles: true,
syncMemoryFiles: vi.fn(async () => {}),
syncSessionFiles: vi.fn(async () => {}),
clearSyncedSessionFiles: vi.fn(),
clearAllSessionDirtyFiles: vi.fn(),
setDirty: vi.fn(),
setSessionsDirty,
shouldFallbackOnError: vi.fn(() => false),
activateFallbackProvider: vi.fn(async () => false),
runSafeReindex: vi.fn(async () => {}),
runUnsafeReindex: vi.fn(async () => {}),
});
expect(setSessionsDirty).toHaveBeenCalledWith(true);
});
});

View File

@ -0,0 +1,153 @@
import type { EmbeddingProvider } from "./embedding-runtime.js";
import {
type EmbeddingIndexMeta,
type EmbeddingMemorySource,
resolveEmbeddingSyncPlan,
} from "./embedding-sync-planning.js";
type EmbeddingSyncProgress = unknown;
type EmbeddingSyncMemoryFiles<TProgress = EmbeddingSyncProgress> = (params: {
needsFullReindex: boolean;
progress?: TProgress;
}) => Promise<void>;
type EmbeddingSyncSessionFiles<TProgress = EmbeddingSyncProgress> = (params: {
needsFullReindex: boolean;
targetSessionFiles?: string[];
progress?: TProgress;
}) => Promise<void>;
type EmbeddingReindex<TProgress = EmbeddingSyncProgress> = (params: {
reason?: string;
force?: boolean;
progress?: TProgress;
}) => Promise<void>;
export async function runExtensionHostEmbeddingSync<TProgress = EmbeddingSyncProgress>(params: {
reason?: string;
force?: boolean;
targetSessionFiles: Set<string> | null;
vectorReady: boolean;
meta: EmbeddingIndexMeta | null;
configuredSources: EmbeddingMemorySource[];
configuredScopeHash: string;
provider: EmbeddingProvider | null;
providerKey: string | null;
chunkTokens: number;
chunkOverlap: number;
sessionsEnabled: boolean;
dirty: boolean;
shouldSyncSessions: boolean;
useUnsafeReindex: boolean;
hasDirtySessionFiles: boolean;
progress?: TProgress;
syncMemoryFiles: EmbeddingSyncMemoryFiles<TProgress>;
syncSessionFiles: EmbeddingSyncSessionFiles<TProgress>;
clearSyncedSessionFiles: (targetSessionFiles?: Iterable<string> | null) => void;
clearAllSessionDirtyFiles: () => void;
setDirty: (value: boolean) => void;
setSessionsDirty: (value: boolean) => void;
shouldFallbackOnError: (message: string) => boolean;
activateFallbackProvider: (reason: string) => Promise<boolean>;
runSafeReindex: EmbeddingReindex<TProgress>;
runUnsafeReindex: EmbeddingReindex<TProgress>;
}): Promise<void> {
const hasTargetSessionFiles = params.targetSessionFiles !== null;
const syncPlan = resolveEmbeddingSyncPlan({
force: params.force,
hasTargetSessionFiles,
targetSessionFiles: params.targetSessionFiles,
sessionsEnabled: params.sessionsEnabled,
dirty: params.dirty,
shouldSyncSessions: params.shouldSyncSessions,
useUnsafeReindex: params.useUnsafeReindex,
vectorReady: params.vectorReady,
meta: params.meta,
provider: params.provider,
providerKey: params.providerKey,
configuredSources: params.configuredSources,
configuredScopeHash: params.configuredScopeHash,
chunkTokens: params.chunkTokens,
chunkOverlap: params.chunkOverlap,
});
if (syncPlan.kind === "targeted-sessions") {
try {
await params.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: syncPlan.targetSessionFiles,
progress: params.progress,
});
params.clearSyncedSessionFiles(new Set(syncPlan.targetSessionFiles));
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
params.shouldFallbackOnError(reason) && (await params.activateFallbackProvider(reason));
if (activated) {
const reindexParams = {
reason: params.reason,
force: true,
progress: params.progress,
};
if (params.useUnsafeReindex) {
await params.runUnsafeReindex(reindexParams);
} else {
await params.runSafeReindex(reindexParams);
}
return;
}
throw err;
}
return;
}
try {
if (syncPlan.kind === "full-reindex") {
const reindexParams = {
reason: params.reason,
force: params.force,
progress: params.progress,
};
if (syncPlan.unsafe) {
await params.runUnsafeReindex(reindexParams);
} else {
await params.runSafeReindex(reindexParams);
}
return;
}
if (syncPlan.shouldSyncMemory) {
await params.syncMemoryFiles({
needsFullReindex: false,
progress: params.progress,
});
params.setDirty(false);
}
if (syncPlan.shouldSyncSessions) {
await params.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: syncPlan.targetSessionFiles,
progress: params.progress,
});
params.setSessionsDirty(false);
params.clearAllSessionDirtyFiles();
} else {
params.setSessionsDirty(params.hasDirtySessionFiles);
}
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
params.shouldFallbackOnError(reason) && (await params.activateFallbackProvider(reason));
if (activated) {
await params.runSafeReindex({
reason: params.reason ?? "fallback",
force: true,
progress: params.progress,
});
return;
}
throw err;
}
}

View File

@ -20,12 +20,12 @@ import {
type OpenAiEmbeddingClient,
type VoyageEmbeddingClient,
} from "../extension-host/embedding-runtime.js";
import { runExtensionHostEmbeddingSync } from "../extension-host/embedding-sync-execution.js";
import {
buildEmbeddingIndexMeta,
type EmbeddingIndexMeta,
metaSourcesDiffer as extensionHostMetaSourcesDiffer,
normalizeEmbeddingMetaSources,
resolveEmbeddingSyncPlan,
shouldUseUnsafeEmbeddingReindex,
} from "../extension-host/embedding-sync-planning.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
@ -946,108 +946,51 @@ export abstract class MemoryManagerSyncOps {
const configuredSources = this.resolveConfiguredSourcesForMeta();
const configuredScopeHash = this.resolveConfiguredScopeHash();
const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles);
const hasTargetSessionFiles = targetSessionFiles !== null;
const syncPlan = resolveEmbeddingSyncPlan({
await runExtensionHostEmbeddingSync({
reason: params?.reason,
force: params?.force,
hasTargetSessionFiles,
targetSessionFiles,
vectorReady,
meta,
configuredSources,
configuredScopeHash,
provider: this.provider,
providerKey: this.providerKey,
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
sessionsEnabled: this.sources.has("sessions"),
dirty: this.dirty,
shouldSyncSessions: this.shouldSyncSessions(params, false),
useUnsafeReindex: shouldUseUnsafeEmbeddingReindex(),
vectorReady,
meta,
provider: this.provider,
providerKey: this.providerKey,
configuredSources,
configuredScopeHash,
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
});
if (syncPlan.kind === "targeted-sessions") {
// Post-compaction refreshes should only update the explicit transcript files and
// leave broader reindex/dirty-work decisions to the regular sync path.
try {
await this.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: syncPlan.targetSessionFiles,
progress: progress ?? undefined,
});
this.clearSyncedSessionFiles(new Set(syncPlan.targetSessionFiles));
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason));
if (activated) {
if (shouldUseUnsafeEmbeddingReindex()) {
await this.runUnsafeReindex({
reason: params?.reason,
force: true,
progress: progress ?? undefined,
});
} else {
await this.runSafeReindex({
reason: params?.reason,
force: true,
progress: progress ?? undefined,
});
}
return;
}
throw err;
}
return;
}
try {
if (syncPlan.kind === "full-reindex") {
if (syncPlan.unsafe) {
await this.runUnsafeReindex({
reason: params?.reason,
force: params?.force,
progress: progress ?? undefined,
});
} else {
await this.runSafeReindex({
reason: params?.reason,
force: params?.force,
progress: progress ?? undefined,
});
}
return;
}
if (syncPlan.shouldSyncMemory) {
await this.syncMemoryFiles({ needsFullReindex: false, progress: progress ?? undefined });
this.dirty = false;
}
if (syncPlan.shouldSyncSessions) {
await this.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: syncPlan.targetSessionFiles,
progress: progress ?? undefined,
});
this.sessionsDirty = false;
hasDirtySessionFiles: this.sessionsDirtyFiles.size > 0,
progress: progress ?? undefined,
syncMemoryFiles: async (syncParams) => {
await this.syncMemoryFiles(syncParams);
},
syncSessionFiles: async (syncParams) => {
await this.syncSessionFiles(syncParams);
},
clearSyncedSessionFiles: (sessionFiles) => {
this.clearSyncedSessionFiles(sessionFiles);
},
clearAllSessionDirtyFiles: () => {
this.sessionsDirtyFiles.clear();
} else if (this.sessionsDirtyFiles.size > 0) {
this.sessionsDirty = true;
} else {
this.sessionsDirty = false;
}
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason));
if (activated) {
await this.runSafeReindex({
reason: params?.reason ?? "fallback",
force: true,
progress: progress ?? undefined,
});
return;
}
throw err;
}
},
setDirty: (value) => {
this.dirty = value;
},
setSessionsDirty: (value) => {
this.sessionsDirty = value;
},
shouldFallbackOnError: (message) => this.shouldFallbackOnError(message),
activateFallbackProvider: async (reason) => await this.activateFallbackProvider(reason),
runSafeReindex: async (reindexParams) => {
await this.runSafeReindex(reindexParams);
},
runUnsafeReindex: async (reindexParams) => {
await this.runUnsafeReindex(reindexParams);
},
});
}
private shouldFallbackOnError(message: string): boolean {