From e671e1444a95c6488d77242214300632fee144b2 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 15 Mar 2026 20:38:27 +0000 Subject: [PATCH] Memory: extract embedding safe reindex --- .../embedding-safe-reindex.test.ts | 153 ++++++++++++ src/extension-host/embedding-safe-reindex.ts | 99 ++++++++ src/memory/manager-sync-ops.ts | 235 ++++++++---------- 3 files changed, 350 insertions(+), 137 deletions(-) create mode 100644 src/extension-host/embedding-safe-reindex.test.ts create mode 100644 src/extension-host/embedding-safe-reindex.ts diff --git a/src/extension-host/embedding-safe-reindex.test.ts b/src/extension-host/embedding-safe-reindex.test.ts new file mode 100644 index 00000000000..0656d5e8857 --- /dev/null +++ b/src/extension-host/embedding-safe-reindex.test.ts @@ -0,0 +1,153 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + moveExtensionHostIndexFiles, + removeExtensionHostIndexFiles, + runExtensionHostEmbeddingSafeReindex, + swapExtensionHostIndexFiles, +} from "./embedding-safe-reindex.js"; + +async function writeIndexFiles(basePath: string, value: string): Promise { + await fs.writeFile(basePath, `${value}-db`); + await fs.writeFile(`${basePath}-wal`, `${value}-wal`); + await fs.writeFile(`${basePath}-shm`, `${value}-shm`); +} + +async function readIndexFiles(basePath: string): Promise { + return await Promise.all([ + fs.readFile(basePath, "utf8"), + fs.readFile(`${basePath}-wal`, "utf8"), + fs.readFile(`${basePath}-shm`, "utf8"), + ]); +} + +describe("embedding-safe-reindex", () => { + const tempRoots: string[] = []; + + afterEach(async () => { + await Promise.all( + tempRoots.map(async (root) => await fs.rm(root, { recursive: true, force: true })), + ); + tempRoots.length = 0; + }); + + it("moves, swaps, and removes index sidecar files together", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-embed-safe-reindex-")); + tempRoots.push(root); + const sourcePath = path.join(root, "source.sqlite"); + const targetPath = path.join(root, "target.sqlite"); + + await writeIndexFiles(sourcePath, "source"); + await moveExtensionHostIndexFiles(sourcePath, targetPath); + await expect(readIndexFiles(targetPath)).resolves.toEqual([ + "source-db", + "source-wal", + "source-shm", + ]); + + await writeIndexFiles(sourcePath, "new-source"); + await swapExtensionHostIndexFiles(targetPath, sourcePath, "backup-id"); + await expect(readIndexFiles(targetPath)).resolves.toEqual([ + "new-source-db", + "new-source-wal", + "new-source-shm", + ]); + + await removeExtensionHostIndexFiles(targetPath); + await expect(fs.stat(targetPath)).rejects.toMatchObject({ code: "ENOENT" }); + }); + + it("runs the safe reindex flow, swaps files, and reopens the active database", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-embed-safe-reindex-")); + tempRoots.push(root); + const dbPath = path.join(root, "index.sqlite"); + await writeIndexFiles(dbPath, "active"); + + const closeDatabase = vi.fn(); + const captureOriginalState = vi.fn(() => ({ state: "original" })); + const restoreOriginalState = vi.fn(); + const prepareTempDb = vi.fn(); + const seedEmbeddingCache = vi.fn(); + const reopenAfterSwap = vi.fn(); + + const currentDb = { label: "current" }; + const openDatabaseAtPath = vi.fn((openedPath: string) => { + if (openedPath !== dbPath) { + void writeIndexFiles(openedPath, "temp"); + } + return { label: openedPath }; + }); + + const nextMeta = await runExtensionHostEmbeddingSafeReindex({ + dbPath, + currentDb, + openDatabaseAtPath, + closeDatabase, + captureOriginalState, + restoreOriginalState, + prepareTempDb, + seedEmbeddingCache, + runReindexBody: async () => ({ vectorDims: 1536 }), + reopenAfterSwap, + randomId: () => "temp-id", + }); + + expect(nextMeta).toEqual({ vectorDims: 1536 }); + expect(prepareTempDb).toHaveBeenCalledWith({ label: `${dbPath}.tmp-temp-id` }); + expect(seedEmbeddingCache).toHaveBeenCalledWith(currentDb); + expect(closeDatabase).toHaveBeenCalledTimes(2); + expect(reopenAfterSwap).toHaveBeenCalledWith(dbPath, { vectorDims: 1536 }); + expect(restoreOriginalState).not.toHaveBeenCalled(); + await expect(readIndexFiles(dbPath)).resolves.toEqual(["temp-db", "temp-wal", "temp-shm"]); + }); + + it("restores original state and removes temp files when reindex body fails", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-embed-safe-reindex-")); + tempRoots.push(root); + const dbPath = path.join(root, "index.sqlite"); + await writeIndexFiles(dbPath, "active"); + + const currentDb = { label: "current" }; + const restoreOriginalState = vi.fn(); + const closeDatabase = vi.fn(); + const openDatabaseAtPath = vi.fn((openedPath: string) => { + if (openedPath !== dbPath) { + void writeIndexFiles(openedPath, "temp"); + } + return { label: openedPath }; + }); + + await expect( + runExtensionHostEmbeddingSafeReindex({ + dbPath, + currentDb, + openDatabaseAtPath, + closeDatabase, + captureOriginalState: () => ({ state: "original" }), + restoreOriginalState, + prepareTempDb: vi.fn(), + seedEmbeddingCache: vi.fn(), + runReindexBody: async () => { + throw new Error("boom"); + }, + reopenAfterSwap: vi.fn(), + randomId: () => "temp-id", + }), + ).rejects.toThrow("boom"); + + expect(restoreOriginalState).toHaveBeenCalledWith({ + originalDb: currentDb, + originalState: { state: "original" }, + originalDbClosed: false, + dbPath, + }); + await expect(readIndexFiles(dbPath)).resolves.toEqual([ + "active-db", + "active-wal", + "active-shm", + ]); + await expect(fs.stat(`${dbPath}.tmp-temp-id`)).rejects.toMatchObject({ code: "ENOENT" }); + }); +}); diff --git a/src/extension-host/embedding-safe-reindex.ts b/src/extension-host/embedding-safe-reindex.ts new file mode 100644 index 00000000000..24c63098e03 --- /dev/null +++ b/src/extension-host/embedding-safe-reindex.ts @@ -0,0 +1,99 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; + +const INDEX_FILE_SUFFIXES = ["", "-wal", "-shm"]; + +export async function moveExtensionHostIndexFiles( + sourceBase: string, + targetBase: string, +): Promise { + for (const suffix of INDEX_FILE_SUFFIXES) { + const source = `${sourceBase}${suffix}`; + const target = `${targetBase}${suffix}`; + try { + await fs.rename(source, target); + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") { + throw err; + } + } + } +} + +export async function removeExtensionHostIndexFiles(basePath: string): Promise { + await Promise.all( + INDEX_FILE_SUFFIXES.map((suffix) => fs.rm(`${basePath}${suffix}`, { force: true })), + ); +} + +export async function swapExtensionHostIndexFiles( + targetPath: string, + tempPath: string, + backupId = randomUUID(), +): Promise { + const backupPath = `${targetPath}.backup-${backupId}`; + await moveExtensionHostIndexFiles(targetPath, backupPath); + try { + await moveExtensionHostIndexFiles(tempPath, targetPath); + } catch (err) { + await moveExtensionHostIndexFiles(backupPath, targetPath); + throw err; + } + await removeExtensionHostIndexFiles(backupPath); +} + +export async function runExtensionHostEmbeddingSafeReindex< + TDb, + TState, + TMeta extends { vectorDims?: number }, +>(params: { + dbPath: string; + currentDb: TDb; + openDatabaseAtPath: (dbPath: string) => TDb; + closeDatabase: (db: TDb) => void; + captureOriginalState: () => TState; + restoreOriginalState: (params: { + originalDb: TDb; + originalState: TState; + originalDbClosed: boolean; + dbPath: string; + }) => void; + prepareTempDb: (tempDb: TDb) => void; + seedEmbeddingCache: (sourceDb: TDb) => void; + runReindexBody: () => Promise; + reopenAfterSwap: (dbPath: string, nextMeta: TMeta) => void; + randomId?: () => string; +}): Promise { + const tempDbPath = `${params.dbPath}.tmp-${(params.randomId ?? randomUUID)()}`; + const tempDb = params.openDatabaseAtPath(tempDbPath); + const originalDb = params.currentDb; + const originalState = params.captureOriginalState(); + let originalDbClosed = false; + + params.prepareTempDb(tempDb); + + try { + params.seedEmbeddingCache(originalDb); + const nextMeta = await params.runReindexBody(); + + params.closeDatabase(tempDb); + params.closeDatabase(originalDb); + originalDbClosed = true; + + await swapExtensionHostIndexFiles(params.dbPath, tempDbPath); + params.reopenAfterSwap(params.dbPath, nextMeta); + return nextMeta; + } catch (err) { + try { + params.closeDatabase(tempDb); + } catch {} + await removeExtensionHostIndexFiles(tempDbPath); + params.restoreOriginalState({ + originalDb, + originalState, + originalDbClosed, + dbPath: params.dbPath, + }); + throw err; + } +} diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index 70ce34ddc62..32b018219f2 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -1,4 +1,3 @@ -import { randomUUID } from "node:crypto"; import fsSync from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; @@ -24,6 +23,7 @@ import { type OpenAiEmbeddingClient, type VoyageEmbeddingClient, } from "../extension-host/embedding-runtime.js"; +import { runExtensionHostEmbeddingSafeReindex } from "../extension-host/embedding-safe-reindex.js"; import { runExtensionHostEmbeddingSync } from "../extension-host/embedding-sync-execution.js"; import { buildEmbeddingIndexMeta, @@ -324,38 +324,6 @@ export abstract class MemoryManagerSyncOps { } } - private async swapIndexFiles(targetPath: string, tempPath: string): Promise { - const backupPath = `${targetPath}.backup-${randomUUID()}`; - await this.moveIndexFiles(targetPath, backupPath); - try { - await this.moveIndexFiles(tempPath, targetPath); - } catch (err) { - await this.moveIndexFiles(backupPath, targetPath); - throw err; - } - await this.removeIndexFiles(backupPath); - } - - private async moveIndexFiles(sourceBase: string, targetBase: string): Promise { - const suffixes = ["", "-wal", "-shm"]; - for (const suffix of suffixes) { - const source = `${sourceBase}${suffix}`; - const target = `${targetBase}${suffix}`; - try { - await fs.rename(source, target); - } catch (err) { - if ((err as NodeJS.ErrnoException).code !== "ENOENT") { - throw err; - } - } - } - } - - private async removeIndexFiles(basePath: string): Promise { - const suffixes = ["", "-wal", "-shm"]; - await Promise.all(suffixes.map((suffix) => fs.rm(`${basePath}${suffix}`, { force: true }))); - } - protected ensureSchema() { const result = ensureMemoryIndexSchema({ db: this.db, @@ -1061,110 +1029,103 @@ export abstract class MemoryManagerSyncOps { progress?: MemorySyncProgressState; }): Promise { const dbPath = resolveUserPath(this.settings.store.path); - const tempDbPath = `${dbPath}.tmp-${randomUUID()}`; - const tempDb = this.openDatabaseAtPath(tempDbPath); - - const originalDb = this.db; - let originalDbClosed = false; - const originalState = { - ftsAvailable: this.fts.available, - ftsError: this.fts.loadError, - vectorAvailable: this.vector.available, - vectorLoadError: this.vector.loadError, - vectorDims: this.vector.dims, - vectorReady: this.vectorReady, - }; - - const restoreOriginalState = () => { - if (originalDbClosed) { - this.db = this.openDatabaseAtPath(dbPath); - } else { - this.db = originalDb; - } - this.fts.available = originalState.ftsAvailable; - this.fts.loadError = originalState.ftsError; - this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; - this.vector.loadError = originalState.vectorLoadError; - this.vector.dims = originalState.vectorDims; - this.vectorReady = originalDbClosed ? null : originalState.vectorReady; - }; - - this.db = tempDb; - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; - this.vector.dims = undefined; - this.fts.available = false; - this.fts.loadError = undefined; - this.ensureSchema(); - - let nextMeta: EmbeddingIndexMeta | null = null; - - try { - this.seedEmbeddingCache(originalDb); - const shouldSyncMemory = this.sources.has("memory"); - const shouldSyncSessions = this.shouldSyncSessions( - { reason: params.reason, force: params.force }, - true, - ); - nextMeta = await runExtensionHostEmbeddingReindexBody({ - shouldSyncMemory, - shouldSyncSessions, - hasDirtySessionFiles: this.sessionsDirtyFiles.size > 0, - progress: params.progress, - syncMemoryFiles: async (syncParams) => { - await this.syncMemoryFiles(syncParams); - }, - syncSessionFiles: async (syncParams) => { - await this.syncSessionFiles(syncParams); - }, - setDirty: (value) => { - this.dirty = value; - }, - setSessionsDirty: (value) => { - this.sessionsDirty = value; - }, - clearAllSessionDirtyFiles: () => { - this.sessionsDirtyFiles.clear(); - }, - buildNextMeta: () => - buildEmbeddingIndexMeta({ - provider: this.provider, - providerKey: this.providerKey, - configuredSources: this.resolveConfiguredSourcesForMeta(), - configuredScopeHash: this.resolveConfiguredScopeHash(), - chunkTokens: this.settings.chunking.tokens, - chunkOverlap: this.settings.chunking.overlap, - }), - vectorDims: this.vector.available && this.vector.dims ? this.vector.dims : undefined, - writeMeta: (meta) => { - this.writeMeta(meta); - }, - pruneEmbeddingCacheIfNeeded: () => { - this.pruneEmbeddingCacheIfNeeded?.(); - }, - }); - - this.db.close(); - originalDb.close(); - originalDbClosed = true; - - await this.swapIndexFiles(dbPath, tempDbPath); - - this.db = this.openDatabaseAtPath(dbPath); - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; - this.ensureSchema(); - this.vector.dims = nextMeta?.vectorDims; - } catch (err) { - try { - this.db.close(); - } catch {} - await this.removeIndexFiles(tempDbPath); - restoreOriginalState(); - throw err; - } + await runExtensionHostEmbeddingSafeReindex({ + dbPath, + currentDb: this.db, + openDatabaseAtPath: (openedPath) => this.openDatabaseAtPath(openedPath), + closeDatabase: (db) => db.close(), + captureOriginalState: () => ({ + ftsAvailable: this.fts.available, + ftsError: this.fts.loadError, + vectorAvailable: this.vector.available, + vectorLoadError: this.vector.loadError, + vectorDims: this.vector.dims, + vectorReady: this.vectorReady, + }), + restoreOriginalState: ({ + originalDb, + originalState, + originalDbClosed, + dbPath: reopenPath, + }) => { + if (originalDbClosed) { + this.db = this.openDatabaseAtPath(reopenPath); + } else { + this.db = originalDb; + } + this.fts.available = originalState.ftsAvailable; + this.fts.loadError = originalState.ftsError; + this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; + this.vector.loadError = originalState.vectorLoadError; + this.vector.dims = originalState.vectorDims; + this.vectorReady = originalDbClosed ? null : originalState.vectorReady; + }, + prepareTempDb: (tempDb) => { + this.db = tempDb; + this.vectorReady = null; + this.vector.available = null; + this.vector.loadError = undefined; + this.vector.dims = undefined; + this.fts.available = false; + this.fts.loadError = undefined; + this.ensureSchema(); + }, + seedEmbeddingCache: (sourceDb) => { + this.seedEmbeddingCache(sourceDb); + }, + runReindexBody: async () => { + const shouldSyncMemory = this.sources.has("memory"); + const shouldSyncSessions = this.shouldSyncSessions( + { reason: params.reason, force: params.force }, + true, + ); + return await runExtensionHostEmbeddingReindexBody({ + shouldSyncMemory, + shouldSyncSessions, + hasDirtySessionFiles: this.sessionsDirtyFiles.size > 0, + progress: params.progress, + syncMemoryFiles: async (syncParams) => { + await this.syncMemoryFiles(syncParams); + }, + syncSessionFiles: async (syncParams) => { + await this.syncSessionFiles(syncParams); + }, + setDirty: (value) => { + this.dirty = value; + }, + setSessionsDirty: (value) => { + this.sessionsDirty = value; + }, + clearAllSessionDirtyFiles: () => { + this.sessionsDirtyFiles.clear(); + }, + buildNextMeta: () => + buildEmbeddingIndexMeta({ + provider: this.provider, + providerKey: this.providerKey, + configuredSources: this.resolveConfiguredSourcesForMeta(), + configuredScopeHash: this.resolveConfiguredScopeHash(), + chunkTokens: this.settings.chunking.tokens, + chunkOverlap: this.settings.chunking.overlap, + }), + vectorDims: this.vector.available && this.vector.dims ? this.vector.dims : undefined, + writeMeta: (meta) => { + this.writeMeta(meta); + }, + pruneEmbeddingCacheIfNeeded: () => { + this.pruneEmbeddingCacheIfNeeded?.(); + }, + }); + }, + reopenAfterSwap: (reopenPath, nextMeta) => { + this.db = this.openDatabaseAtPath(reopenPath); + this.vectorReady = null; + this.vector.available = null; + this.vector.loadError = undefined; + this.ensureSchema(); + this.vector.dims = nextMeta.vectorDims; + }, + }); } private async runUnsafeReindex(params: {