Memory: extract embedding safe reindex
This commit is contained in:
parent
f9bbbeadc6
commit
e671e1444a
153
src/extension-host/embedding-safe-reindex.test.ts
Normal file
153
src/extension-host/embedding-safe-reindex.test.ts
Normal file
@ -0,0 +1,153 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
moveExtensionHostIndexFiles,
|
||||
removeExtensionHostIndexFiles,
|
||||
runExtensionHostEmbeddingSafeReindex,
|
||||
swapExtensionHostIndexFiles,
|
||||
} from "./embedding-safe-reindex.js";
|
||||
|
||||
/**
 * Test fixture: creates the main index file at `basePath` plus its `-wal` and
 * `-shm` sidecars. Each file holds `<value>-db`, `<value>-wal` or `<value>-shm`
 * so assertions can tell which file set ended up at which path.
 */
async function writeIndexFiles(basePath: string, value: string): Promise<void> {
  const fixtures: Array<[string, string]> = [
    [basePath, `${value}-db`],
    [`${basePath}-wal`, `${value}-wal`],
    [`${basePath}-shm`, `${value}-shm`],
  ];
  // Written one at a time, in db -> wal -> shm order.
  for (const [filePath, contents] of fixtures) {
    await fs.writeFile(filePath, contents);
  }
}
|
||||
|
||||
/**
 * Test fixture: reads the main index file and its `-wal` / `-shm` sidecars as
 * UTF-8 and resolves with their contents in that order. Rejects if any of the
 * three files cannot be read.
 */
async function readIndexFiles(basePath: string): Promise<string[]> {
  const filePaths = ["", "-wal", "-shm"].map((suffix) => `${basePath}${suffix}`);
  const pendingReads = filePaths.map((filePath) => fs.readFile(filePath, "utf8"));
  return await Promise.all(pendingReads);
}
|
||||
|
||||
describe("embedding-safe-reindex", () => {
  // Scratch directories created by the tests; removed after each test.
  const tempRoots: string[] = [];

  /** Creates a scratch directory that `afterEach` cleans up. */
  async function createTempRoot(): Promise<string> {
    const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-embed-safe-reindex-"));
    tempRoots.push(root);
    return root;
  }

  /**
   * Builds an `openDatabaseAtPath` mock. Opening any path other than `dbPath`
   * starts writing "temp" fixture files at that path.
   *
   * The mock has to stay synchronous, so the write promise is recorded instead
   * of being dropped with `void`. Tests call `waitForTempFiles()` from
   * `runReindexBody` so the files are fully on disk before the code under test
   * swaps or deletes them, and no write can outlive the test and reject after
   * the scratch directory is gone.
   */
  function createDatabaseOpener(dbPath: string) {
    const pendingWrites: Promise<void>[] = [];
    const openDatabaseAtPath = vi.fn((openedPath: string) => {
      if (openedPath !== dbPath) {
        pendingWrites.push(writeIndexFiles(openedPath, "temp"));
      }
      return { label: openedPath };
    });
    const waitForTempFiles = async (): Promise<void> => {
      await Promise.all(pendingWrites);
    };
    return { openDatabaseAtPath, waitForTempFiles };
  }

  afterEach(async () => {
    await Promise.all(
      tempRoots.map(async (root) => await fs.rm(root, { recursive: true, force: true })),
    );
    tempRoots.length = 0;
  });

  it("moves, swaps, and removes index sidecar files together", async () => {
    const root = await createTempRoot();
    const sourcePath = path.join(root, "source.sqlite");
    const targetPath = path.join(root, "target.sqlite");

    await writeIndexFiles(sourcePath, "source");
    await moveExtensionHostIndexFiles(sourcePath, targetPath);
    await expect(readIndexFiles(targetPath)).resolves.toEqual([
      "source-db",
      "source-wal",
      "source-shm",
    ]);

    await writeIndexFiles(sourcePath, "new-source");
    await swapExtensionHostIndexFiles(targetPath, sourcePath, "backup-id");
    await expect(readIndexFiles(targetPath)).resolves.toEqual([
      "new-source-db",
      "new-source-wal",
      "new-source-shm",
    ]);

    await removeExtensionHostIndexFiles(targetPath);
    await expect(fs.stat(targetPath)).rejects.toMatchObject({ code: "ENOENT" });
  });

  it("runs the safe reindex flow, swaps files, and reopens the active database", async () => {
    const root = await createTempRoot();
    const dbPath = path.join(root, "index.sqlite");
    await writeIndexFiles(dbPath, "active");

    const closeDatabase = vi.fn();
    const captureOriginalState = vi.fn(() => ({ state: "original" }));
    const restoreOriginalState = vi.fn();
    const prepareTempDb = vi.fn();
    const seedEmbeddingCache = vi.fn();
    const reopenAfterSwap = vi.fn();

    const currentDb = { label: "current" };
    const { openDatabaseAtPath, waitForTempFiles } = createDatabaseOpener(dbPath);

    const nextMeta = await runExtensionHostEmbeddingSafeReindex({
      dbPath,
      currentDb,
      openDatabaseAtPath,
      closeDatabase,
      captureOriginalState,
      restoreOriginalState,
      prepareTempDb,
      seedEmbeddingCache,
      runReindexBody: async () => {
        // The temp index must be complete before the swap promotes it.
        await waitForTempFiles();
        return { vectorDims: 1536 };
      },
      reopenAfterSwap,
      randomId: () => "temp-id",
    });

    expect(nextMeta).toEqual({ vectorDims: 1536 });
    expect(prepareTempDb).toHaveBeenCalledWith({ label: `${dbPath}.tmp-temp-id` });
    expect(seedEmbeddingCache).toHaveBeenCalledWith(currentDb);
    expect(closeDatabase).toHaveBeenCalledTimes(2);
    expect(reopenAfterSwap).toHaveBeenCalledWith(dbPath, { vectorDims: 1536 });
    expect(restoreOriginalState).not.toHaveBeenCalled();
    await expect(readIndexFiles(dbPath)).resolves.toEqual(["temp-db", "temp-wal", "temp-shm"]);
  });

  it("restores original state and removes temp files when reindex body fails", async () => {
    const root = await createTempRoot();
    const dbPath = path.join(root, "index.sqlite");
    await writeIndexFiles(dbPath, "active");

    const currentDb = { label: "current" };
    const restoreOriginalState = vi.fn();
    const closeDatabase = vi.fn();
    const { openDatabaseAtPath, waitForTempFiles } = createDatabaseOpener(dbPath);

    await expect(
      runExtensionHostEmbeddingSafeReindex({
        dbPath,
        currentDb,
        openDatabaseAtPath,
        closeDatabase,
        captureOriginalState: () => ({ state: "original" }),
        restoreOriginalState,
        prepareTempDb: vi.fn(),
        seedEmbeddingCache: vi.fn(),
        runReindexBody: async () => {
          // Let the temp files land first so the cleanup really has files to delete.
          await waitForTempFiles();
          throw new Error("boom");
        },
        reopenAfterSwap: vi.fn(),
        randomId: () => "temp-id",
      }),
    ).rejects.toThrow("boom");

    expect(restoreOriginalState).toHaveBeenCalledWith({
      originalDb: currentDb,
      originalState: { state: "original" },
      originalDbClosed: false,
      dbPath,
    });
    await expect(readIndexFiles(dbPath)).resolves.toEqual([
      "active-db",
      "active-wal",
      "active-shm",
    ]);
    const tempDbPath = `${dbPath}.tmp-temp-id`;
    await expect(fs.stat(tempDbPath)).rejects.toMatchObject({ code: "ENOENT" });
    await expect(fs.stat(`${tempDbPath}-wal`)).rejects.toMatchObject({ code: "ENOENT" });
    await expect(fs.stat(`${tempDbPath}-shm`)).rejects.toMatchObject({ code: "ENOENT" });
  });
});
|
||||
99
src/extension-host/embedding-safe-reindex.ts
Normal file
99
src/extension-host/embedding-safe-reindex.ts
Normal file
@ -0,0 +1,99 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
|
||||
/** Suffixes appended to a base path to address the main index file and its `-wal` / `-shm` sidecars. */
const INDEX_FILE_SUFFIXES: readonly string[] = ["", "-wal", "-shm"];

/**
 * Renames the index file at `sourceBase`, and each sidecar, to the matching
 * path under `targetBase`.
 *
 * Files are renamed one at a time in suffix order. A rename that fails with
 * `ENOENT` is ignored; any other failure is rethrown immediately, leaving the
 * renames that already succeeded in place.
 *
 * @param sourceBase Path of the main index file to move.
 * @param targetBase Path the main index file should end up at.
 */
export async function moveExtensionHostIndexFiles(
  sourceBase: string,
  targetBase: string,
): Promise<void> {
  for (const suffix of INDEX_FILE_SUFFIXES) {
    const from = sourceBase + suffix;
    const to = targetBase + suffix;
    try {
      await fs.rename(from, to);
    } catch (err) {
      const isEnoent = (err as NodeJS.ErrnoException).code === "ENOENT";
      if (!isEnoent) {
        throw err;
      }
    }
  }
}
|
||||
|
||||
/**
 * Deletes the index file at `basePath` together with its sidecars.
 *
 * One `fs.rm` call per suffix is started up front and all of them are awaited
 * together. `force: true` means a file that is already absent does not cause a
 * rejection.
 *
 * @param basePath Path of the main index file.
 */
export async function removeExtensionHostIndexFiles(basePath: string): Promise<void> {
  const removals: Promise<void>[] = [];
  for (const suffix of INDEX_FILE_SUFFIXES) {
    removals.push(fs.rm(`${basePath}${suffix}`, { force: true }));
  }
  await Promise.all(removals);
}
|
||||
|
||||
/**
 * Replaces the index files at `targetPath` with the ones at `tempPath`.
 *
 * Steps:
 * 1. Move the current target files aside to `${targetPath}.backup-<backupId>`.
 * 2. Move the temp files into place.
 * 3. Delete the backup.
 *
 * If step 2 fails, anything that was already promoted to `targetPath` is
 * deleted before the backup is moved back. Otherwise a promoted `-wal`/`-shm`
 * could survive next to the restored database when the original had no such
 * sidecar. The original failure is then rethrown.
 *
 * If the rollback itself fails, the thrown error carries both messages and the
 * backup location; the first failure is available as `cause`.
 *
 * NOTE(review): when no file exists at `tempPath`, step 2 moves nothing, so
 * the target ends up empty once the backup is deleted. Confirm callers always
 * create the temp files first.
 *
 * @param targetPath Path of the live main index file.
 * @param tempPath Path of the freshly built main index file.
 * @param backupId Suffix used for the temporary backup; defaults to a random UUID.
 */
export async function swapExtensionHostIndexFiles(
  targetPath: string,
  tempPath: string,
  backupId = randomUUID(),
): Promise<void> {
  const backupPath = `${targetPath}.backup-${backupId}`;
  await moveExtensionHostIndexFiles(targetPath, backupPath);
  try {
    await moveExtensionHostIndexFiles(tempPath, targetPath);
  } catch (err) {
    try {
      // The originals are in the backup, so whatever sits at the target now
      // came from the temp set. Clear it so no stale sidecar outlives rollback.
      await removeExtensionHostIndexFiles(targetPath);
      await moveExtensionHostIndexFiles(backupPath, targetPath);
    } catch (rollbackErr) {
      const messageOf = (value: unknown): string =>
        value instanceof Error ? value.message : String(value);
      throw Object.assign(
        new Error(
          `${messageOf(err)} (rollback also failed: ${messageOf(rollbackErr)}; ` +
            `previous index files may remain at ${backupPath})`,
        ),
        { cause: err, rollbackError: rollbackErr },
      );
    }
    throw err;
  }
  await removeExtensionHostIndexFiles(backupPath);
}
|
||||
|
||||
/**
 * Rebuilds the index in a temporary database and swaps it over the live one
 * only after the rebuild succeeds.
 *
 * Flow:
 * 1. Open a temp database at `${dbPath}.tmp-<id>` and capture the caller's state.
 * 2. `prepareTempDb`, `seedEmbeddingCache(originalDb)`, then `runReindexBody`.
 * 3. Close the temp and original databases, swap the temp files over `dbPath`,
 *    and call `reopenAfterSwap(dbPath, nextMeta)`.
 *
 * On any failure after the temp database is opened and the state captured
 * (including a failure in `prepareTempDb`), the temp database is closed, the
 * temp files are deleted and `restoreOriginalState` is invoked before the
 * original error is rethrown. Closing and deleting are best effort: a failure
 * there never prevents the restore and never replaces the original error.
 *
 * @param params.dbPath Path of the live index database.
 * @param params.currentDb The currently open database handle.
 * @param params.openDatabaseAtPath Opens a database handle at the given path.
 * @param params.closeDatabase Closes a database handle.
 * @param params.captureOriginalState Snapshots caller state for a later restore.
 * @param params.restoreOriginalState Restores caller state after a failure;
 *   `originalDbClosed` tells it whether `originalDb` was already closed.
 * @param params.prepareTempDb Called with the temp handle before the rebuild.
 * @param params.seedEmbeddingCache Called with the original handle before the rebuild.
 * @param params.runReindexBody Performs the rebuild and resolves with the new metadata.
 * @param params.reopenAfterSwap Called after a successful swap.
 * @param params.randomId Optional id generator for the temp path; defaults to `randomUUID`.
 * @returns The metadata resolved by `runReindexBody`.
 * @throws Whatever error caused the reindex to fail.
 */
export async function runExtensionHostEmbeddingSafeReindex<
  TDb,
  TState,
  TMeta extends { vectorDims?: number },
>(params: {
  dbPath: string;
  currentDb: TDb;
  openDatabaseAtPath: (dbPath: string) => TDb;
  closeDatabase: (db: TDb) => void;
  captureOriginalState: () => TState;
  restoreOriginalState: (params: {
    originalDb: TDb;
    originalState: TState;
    originalDbClosed: boolean;
    dbPath: string;
  }) => void;
  prepareTempDb: (tempDb: TDb) => void;
  seedEmbeddingCache: (sourceDb: TDb) => void;
  runReindexBody: () => Promise<TMeta>;
  reopenAfterSwap: (dbPath: string, nextMeta: TMeta) => void;
  randomId?: () => string;
}): Promise<TMeta> {
  const tempDbPath = `${params.dbPath}.tmp-${(params.randomId ?? randomUUID)()}`;
  const tempDb = params.openDatabaseAtPath(tempDbPath);
  const originalDb = params.currentDb;
  const originalState = params.captureOriginalState();
  let originalDbClosed = false;

  try {
    // Inside the try so a failing preparation is rolled back like any other
    // failure, not left with an open temp database and unrestored state.
    params.prepareTempDb(tempDb);
    params.seedEmbeddingCache(originalDb);
    const nextMeta = await params.runReindexBody();

    params.closeDatabase(tempDb);
    params.closeDatabase(originalDb);
    originalDbClosed = true;

    await swapExtensionHostIndexFiles(params.dbPath, tempDbPath);
    params.reopenAfterSwap(params.dbPath, nextMeta);
    return nextMeta;
  } catch (err) {
    try {
      // Best effort: the temp handle may already be closed.
      params.closeDatabase(tempDb);
    } catch {}
    try {
      await removeExtensionHostIndexFiles(tempDbPath);
    } catch {
      // Best effort: leftover temp files are preferable to skipping the
      // restore below or hiding the error that caused the failure.
    }
    params.restoreOriginalState({
      originalDb,
      originalState,
      originalDbClosed,
      dbPath: params.dbPath,
    });
    throw err;
  }
}
|
||||
@ -1,4 +1,3 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fsSync from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
@ -24,6 +23,7 @@ import {
|
||||
type OpenAiEmbeddingClient,
|
||||
type VoyageEmbeddingClient,
|
||||
} from "../extension-host/embedding-runtime.js";
|
||||
import { runExtensionHostEmbeddingSafeReindex } from "../extension-host/embedding-safe-reindex.js";
|
||||
import { runExtensionHostEmbeddingSync } from "../extension-host/embedding-sync-execution.js";
|
||||
import {
|
||||
buildEmbeddingIndexMeta,
|
||||
@ -324,38 +324,6 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Replaces the index files at `targetPath` with those at `tempPath`.
 * The current files are first moved to a `.backup-<uuid>` path. If promoting
 * the temp files throws, the backup is moved back and the error is rethrown;
 * otherwise the backup is deleted.
 */
private async swapIndexFiles(targetPath: string, tempPath: string): Promise<void> {
  const backupBase = `${targetPath}.backup-${randomUUID()}`;
  await this.moveIndexFiles(targetPath, backupBase);
  try {
    await this.moveIndexFiles(tempPath, targetPath);
  } catch (promoteError) {
    // Promotion failed: put the previous files back before reporting it.
    await this.moveIndexFiles(backupBase, targetPath);
    throw promoteError;
  }
  await this.removeIndexFiles(backupBase);
}
|
||||
|
||||
/**
 * Renames the index file at `sourceBase` and its `-wal` / `-shm` sidecars to
 * the matching `targetBase` paths, one at a time. A rename failing with
 * `ENOENT` is ignored; any other error is rethrown.
 */
private async moveIndexFiles(sourceBase: string, targetBase: string): Promise<void> {
  for (const suffix of ["", "-wal", "-shm"]) {
    const from = sourceBase + suffix;
    const to = targetBase + suffix;
    try {
      await fs.rename(from, to);
    } catch (err) {
      const isEnoent = (err as NodeJS.ErrnoException).code === "ENOENT";
      if (!isEnoent) {
        throw err;
      }
    }
  }
}
|
||||
|
||||
/**
 * Deletes the index file at `basePath` and its `-wal` / `-shm` sidecars.
 * The removals run concurrently with `force: true`, so absent files are not an error.
 */
private async removeIndexFiles(basePath: string): Promise<void> {
  const removals: Promise<void>[] = [];
  for (const suffix of ["", "-wal", "-shm"]) {
    removals.push(fs.rm(`${basePath}${suffix}`, { force: true }));
  }
  await Promise.all(removals);
}
|
||||
|
||||
protected ensureSchema() {
|
||||
const result = ensureMemoryIndexSchema({
|
||||
db: this.db,
|
||||
@ -1061,110 +1029,103 @@ export abstract class MemoryManagerSyncOps {
|
||||
progress?: MemorySyncProgressState;
|
||||
}): Promise<void> {
|
||||
const dbPath = resolveUserPath(this.settings.store.path);
|
||||
const tempDbPath = `${dbPath}.tmp-${randomUUID()}`;
|
||||
const tempDb = this.openDatabaseAtPath(tempDbPath);
|
||||
|
||||
const originalDb = this.db;
|
||||
let originalDbClosed = false;
|
||||
const originalState = {
|
||||
ftsAvailable: this.fts.available,
|
||||
ftsError: this.fts.loadError,
|
||||
vectorAvailable: this.vector.available,
|
||||
vectorLoadError: this.vector.loadError,
|
||||
vectorDims: this.vector.dims,
|
||||
vectorReady: this.vectorReady,
|
||||
};
|
||||
|
||||
const restoreOriginalState = () => {
|
||||
if (originalDbClosed) {
|
||||
this.db = this.openDatabaseAtPath(dbPath);
|
||||
} else {
|
||||
this.db = originalDb;
|
||||
}
|
||||
this.fts.available = originalState.ftsAvailable;
|
||||
this.fts.loadError = originalState.ftsError;
|
||||
this.vector.available = originalDbClosed ? null : originalState.vectorAvailable;
|
||||
this.vector.loadError = originalState.vectorLoadError;
|
||||
this.vector.dims = originalState.vectorDims;
|
||||
this.vectorReady = originalDbClosed ? null : originalState.vectorReady;
|
||||
};
|
||||
|
||||
this.db = tempDb;
|
||||
this.vectorReady = null;
|
||||
this.vector.available = null;
|
||||
this.vector.loadError = undefined;
|
||||
this.vector.dims = undefined;
|
||||
this.fts.available = false;
|
||||
this.fts.loadError = undefined;
|
||||
this.ensureSchema();
|
||||
|
||||
let nextMeta: EmbeddingIndexMeta | null = null;
|
||||
|
||||
try {
|
||||
this.seedEmbeddingCache(originalDb);
|
||||
const shouldSyncMemory = this.sources.has("memory");
|
||||
const shouldSyncSessions = this.shouldSyncSessions(
|
||||
{ reason: params.reason, force: params.force },
|
||||
true,
|
||||
);
|
||||
nextMeta = await runExtensionHostEmbeddingReindexBody({
|
||||
shouldSyncMemory,
|
||||
shouldSyncSessions,
|
||||
hasDirtySessionFiles: this.sessionsDirtyFiles.size > 0,
|
||||
progress: params.progress,
|
||||
syncMemoryFiles: async (syncParams) => {
|
||||
await this.syncMemoryFiles(syncParams);
|
||||
},
|
||||
syncSessionFiles: async (syncParams) => {
|
||||
await this.syncSessionFiles(syncParams);
|
||||
},
|
||||
setDirty: (value) => {
|
||||
this.dirty = value;
|
||||
},
|
||||
setSessionsDirty: (value) => {
|
||||
this.sessionsDirty = value;
|
||||
},
|
||||
clearAllSessionDirtyFiles: () => {
|
||||
this.sessionsDirtyFiles.clear();
|
||||
},
|
||||
buildNextMeta: () =>
|
||||
buildEmbeddingIndexMeta({
|
||||
provider: this.provider,
|
||||
providerKey: this.providerKey,
|
||||
configuredSources: this.resolveConfiguredSourcesForMeta(),
|
||||
configuredScopeHash: this.resolveConfiguredScopeHash(),
|
||||
chunkTokens: this.settings.chunking.tokens,
|
||||
chunkOverlap: this.settings.chunking.overlap,
|
||||
}),
|
||||
vectorDims: this.vector.available && this.vector.dims ? this.vector.dims : undefined,
|
||||
writeMeta: (meta) => {
|
||||
this.writeMeta(meta);
|
||||
},
|
||||
pruneEmbeddingCacheIfNeeded: () => {
|
||||
this.pruneEmbeddingCacheIfNeeded?.();
|
||||
},
|
||||
});
|
||||
|
||||
this.db.close();
|
||||
originalDb.close();
|
||||
originalDbClosed = true;
|
||||
|
||||
await this.swapIndexFiles(dbPath, tempDbPath);
|
||||
|
||||
this.db = this.openDatabaseAtPath(dbPath);
|
||||
this.vectorReady = null;
|
||||
this.vector.available = null;
|
||||
this.vector.loadError = undefined;
|
||||
this.ensureSchema();
|
||||
this.vector.dims = nextMeta?.vectorDims;
|
||||
} catch (err) {
|
||||
try {
|
||||
this.db.close();
|
||||
} catch {}
|
||||
await this.removeIndexFiles(tempDbPath);
|
||||
restoreOriginalState();
|
||||
throw err;
|
||||
}
|
||||
await runExtensionHostEmbeddingSafeReindex({
|
||||
dbPath,
|
||||
currentDb: this.db,
|
||||
openDatabaseAtPath: (openedPath) => this.openDatabaseAtPath(openedPath),
|
||||
closeDatabase: (db) => db.close(),
|
||||
captureOriginalState: () => ({
|
||||
ftsAvailable: this.fts.available,
|
||||
ftsError: this.fts.loadError,
|
||||
vectorAvailable: this.vector.available,
|
||||
vectorLoadError: this.vector.loadError,
|
||||
vectorDims: this.vector.dims,
|
||||
vectorReady: this.vectorReady,
|
||||
}),
|
||||
restoreOriginalState: ({
|
||||
originalDb,
|
||||
originalState,
|
||||
originalDbClosed,
|
||||
dbPath: reopenPath,
|
||||
}) => {
|
||||
if (originalDbClosed) {
|
||||
this.db = this.openDatabaseAtPath(reopenPath);
|
||||
} else {
|
||||
this.db = originalDb;
|
||||
}
|
||||
this.fts.available = originalState.ftsAvailable;
|
||||
this.fts.loadError = originalState.ftsError;
|
||||
this.vector.available = originalDbClosed ? null : originalState.vectorAvailable;
|
||||
this.vector.loadError = originalState.vectorLoadError;
|
||||
this.vector.dims = originalState.vectorDims;
|
||||
this.vectorReady = originalDbClosed ? null : originalState.vectorReady;
|
||||
},
|
||||
prepareTempDb: (tempDb) => {
|
||||
this.db = tempDb;
|
||||
this.vectorReady = null;
|
||||
this.vector.available = null;
|
||||
this.vector.loadError = undefined;
|
||||
this.vector.dims = undefined;
|
||||
this.fts.available = false;
|
||||
this.fts.loadError = undefined;
|
||||
this.ensureSchema();
|
||||
},
|
||||
seedEmbeddingCache: (sourceDb) => {
|
||||
this.seedEmbeddingCache(sourceDb);
|
||||
},
|
||||
runReindexBody: async () => {
|
||||
const shouldSyncMemory = this.sources.has("memory");
|
||||
const shouldSyncSessions = this.shouldSyncSessions(
|
||||
{ reason: params.reason, force: params.force },
|
||||
true,
|
||||
);
|
||||
return await runExtensionHostEmbeddingReindexBody({
|
||||
shouldSyncMemory,
|
||||
shouldSyncSessions,
|
||||
hasDirtySessionFiles: this.sessionsDirtyFiles.size > 0,
|
||||
progress: params.progress,
|
||||
syncMemoryFiles: async (syncParams) => {
|
||||
await this.syncMemoryFiles(syncParams);
|
||||
},
|
||||
syncSessionFiles: async (syncParams) => {
|
||||
await this.syncSessionFiles(syncParams);
|
||||
},
|
||||
setDirty: (value) => {
|
||||
this.dirty = value;
|
||||
},
|
||||
setSessionsDirty: (value) => {
|
||||
this.sessionsDirty = value;
|
||||
},
|
||||
clearAllSessionDirtyFiles: () => {
|
||||
this.sessionsDirtyFiles.clear();
|
||||
},
|
||||
buildNextMeta: () =>
|
||||
buildEmbeddingIndexMeta({
|
||||
provider: this.provider,
|
||||
providerKey: this.providerKey,
|
||||
configuredSources: this.resolveConfiguredSourcesForMeta(),
|
||||
configuredScopeHash: this.resolveConfiguredScopeHash(),
|
||||
chunkTokens: this.settings.chunking.tokens,
|
||||
chunkOverlap: this.settings.chunking.overlap,
|
||||
}),
|
||||
vectorDims: this.vector.available && this.vector.dims ? this.vector.dims : undefined,
|
||||
writeMeta: (meta) => {
|
||||
this.writeMeta(meta);
|
||||
},
|
||||
pruneEmbeddingCacheIfNeeded: () => {
|
||||
this.pruneEmbeddingCacheIfNeeded?.();
|
||||
},
|
||||
});
|
||||
},
|
||||
reopenAfterSwap: (reopenPath, nextMeta) => {
|
||||
this.db = this.openDatabaseAtPath(reopenPath);
|
||||
this.vectorReady = null;
|
||||
this.vector.available = null;
|
||||
this.vector.loadError = undefined;
|
||||
this.ensureSchema();
|
||||
this.vector.dims = nextMeta.vectorDims;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private async runUnsafeReindex(params: {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user