Memory: extract embedding sync planning

This commit is contained in:
Gustavo Madeira Santana 2026-03-15 20:16:29 +00:00
parent 61a1f3bd6b
commit ad4b037b28
No known key found for this signature in database
3 changed files with 365 additions and 81 deletions

View File

@ -0,0 +1,171 @@
import { describe, expect, it } from "vitest";
import {
buildEmbeddingIndexMeta,
metaSourcesDiffer,
normalizeEmbeddingMetaSources,
resolveEmbeddingSyncPlan,
shouldUseUnsafeEmbeddingReindex,
} from "./embedding-sync-planning.js";
describe("embedding-sync-planning", () => {
it("prefers targeted session refreshes over broader sync decisions", () => {
const plan = resolveEmbeddingSyncPlan({
hasTargetSessionFiles: true,
targetSessionFiles: new Set(["/tmp/session.jsonl"]),
sessionsEnabled: true,
dirty: true,
shouldSyncSessions: true,
useUnsafeReindex: false,
vectorReady: false,
meta: null,
provider: null,
providerKey: null,
configuredSources: ["sessions"],
configuredScopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
});
expect(plan).toEqual({
kind: "targeted-sessions",
targetSessionFiles: ["/tmp/session.jsonl"],
});
});
it("plans a full reindex when metadata drift is detected", () => {
const plan = resolveEmbeddingSyncPlan({
force: false,
hasTargetSessionFiles: false,
targetSessionFiles: null,
sessionsEnabled: true,
dirty: false,
shouldSyncSessions: false,
useUnsafeReindex: true,
vectorReady: true,
meta: {
model: "old-model",
provider: "openai",
providerKey: "key",
sources: ["memory"],
scopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
},
provider: {
id: "openai",
model: "new-model",
embedQuery: async () => [1],
embedBatch: async () => [[1]],
},
providerKey: "key",
configuredSources: ["memory"],
configuredScopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
});
expect(plan).toEqual({
kind: "full-reindex",
unsafe: true,
});
});
it("builds incremental sync plans from dirty/session state", () => {
const plan = resolveEmbeddingSyncPlan({
force: false,
hasTargetSessionFiles: false,
targetSessionFiles: null,
sessionsEnabled: true,
dirty: true,
shouldSyncSessions: true,
useUnsafeReindex: false,
vectorReady: false,
meta: {
model: "model",
provider: "openai",
providerKey: "key",
sources: ["memory", "sessions"],
scopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
vectorDims: 1536,
},
provider: {
id: "openai",
model: "model",
embedQuery: async () => [1],
embedBatch: async () => [[1]],
},
providerKey: "key",
configuredSources: ["memory", "sessions"],
configuredScopeHash: "scope",
chunkTokens: 200,
chunkOverlap: 20,
});
expect(plan).toEqual({
kind: "incremental",
shouldSyncMemory: true,
shouldSyncSessions: true,
targetSessionFiles: undefined,
});
});
it("builds embedding metadata with provider and vector dimensions", () => {
expect(
buildEmbeddingIndexMeta({
provider: {
id: "openai",
model: "text-embedding-3-small",
embedQuery: async () => [1],
embedBatch: async () => [[1]],
},
providerKey: "provider-key",
configuredSources: ["memory", "sessions"],
configuredScopeHash: "scope",
chunkTokens: 256,
chunkOverlap: 32,
vectorDims: 1536,
}),
).toEqual({
model: "text-embedding-3-small",
provider: "openai",
providerKey: "provider-key",
sources: ["memory", "sessions"],
scopeHash: "scope",
chunkTokens: 256,
chunkOverlap: 32,
vectorDims: 1536,
});
});
it("normalizes legacy meta sources and detects drift", () => {
expect(normalizeEmbeddingMetaSources(null)).toEqual(["memory"]);
expect(normalizeEmbeddingMetaSources({ sources: ["sessions", "memory", "sessions"] })).toEqual([
"memory",
"sessions",
]);
expect(
metaSourcesDiffer(
{
model: "model",
provider: "openai",
sources: ["memory"],
chunkTokens: 200,
chunkOverlap: 20,
},
["memory", "sessions"],
),
).toBe(true);
});
it("reads the unsafe test reindex gate from env vars", () => {
expect(
shouldUseUnsafeEmbeddingReindex({
OPENCLAW_TEST_FAST: "1",
OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX: "1",
} as NodeJS.ProcessEnv),
).toBe(true);
expect(shouldUseUnsafeEmbeddingReindex({} as NodeJS.ProcessEnv)).toBe(false);
});
});

View File

@ -0,0 +1,138 @@
import type { EmbeddingProvider } from "./embedding-runtime.js";
export type EmbeddingMemorySource = "memory" | "sessions";
export type EmbeddingIndexMeta = {
model: string;
provider: string;
providerKey?: string;
sources?: EmbeddingMemorySource[];
scopeHash?: string;
chunkTokens: number;
chunkOverlap: number;
vectorDims?: number;
};
export type EmbeddingSyncPlan =
| {
kind: "targeted-sessions";
targetSessionFiles: string[];
}
| {
kind: "full-reindex";
unsafe: boolean;
}
| {
kind: "incremental";
shouldSyncMemory: boolean;
shouldSyncSessions: boolean;
targetSessionFiles?: string[];
};
export function resolveEmbeddingSyncPlan(params: {
force?: boolean;
hasTargetSessionFiles: boolean;
targetSessionFiles: Set<string> | null;
sessionsEnabled: boolean;
dirty: boolean;
shouldSyncSessions: boolean;
useUnsafeReindex: boolean;
vectorReady: boolean;
meta: EmbeddingIndexMeta | null;
provider: EmbeddingProvider | null;
providerKey: string | null;
configuredSources: EmbeddingMemorySource[];
configuredScopeHash: string;
chunkTokens: number;
chunkOverlap: number;
}): EmbeddingSyncPlan {
if (params.hasTargetSessionFiles && params.targetSessionFiles && params.sessionsEnabled) {
return {
kind: "targeted-sessions",
targetSessionFiles: Array.from(params.targetSessionFiles),
};
}
const needsFullReindex =
(params.force && !params.hasTargetSessionFiles) ||
!params.meta ||
(params.provider && params.meta.model !== params.provider.model) ||
(params.provider && params.meta.provider !== params.provider.id) ||
params.meta?.providerKey !== params.providerKey ||
metaSourcesDiffer(params.meta, params.configuredSources) ||
params.meta?.scopeHash !== params.configuredScopeHash ||
params.meta?.chunkTokens !== params.chunkTokens ||
params.meta?.chunkOverlap !== params.chunkOverlap ||
(params.vectorReady && !params.meta?.vectorDims);
if (needsFullReindex) {
return {
kind: "full-reindex",
unsafe: params.useUnsafeReindex,
};
}
return {
kind: "incremental",
shouldSyncMemory: !params.hasTargetSessionFiles && (Boolean(params.force) || params.dirty),
shouldSyncSessions: params.shouldSyncSessions,
targetSessionFiles: params.targetSessionFiles
? Array.from(params.targetSessionFiles)
: undefined,
};
}
export function buildEmbeddingIndexMeta(params: {
provider: EmbeddingProvider | null;
providerKey: string | null;
configuredSources: EmbeddingMemorySource[];
configuredScopeHash: string;
chunkTokens: number;
chunkOverlap: number;
vectorDims?: number;
}): EmbeddingIndexMeta {
const meta: EmbeddingIndexMeta = {
model: params.provider?.model ?? "fts-only",
provider: params.provider?.id ?? "none",
providerKey: params.providerKey ?? undefined,
sources: params.configuredSources,
scopeHash: params.configuredScopeHash,
chunkTokens: params.chunkTokens,
chunkOverlap: params.chunkOverlap,
};
if (params.vectorDims) {
meta.vectorDims = params.vectorDims;
}
return meta;
}
export function shouldUseUnsafeEmbeddingReindex(env = process.env): boolean {
return env.OPENCLAW_TEST_FAST === "1" && env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1";
}
export function metaSourcesDiffer(
meta: EmbeddingIndexMeta | null,
configuredSources: EmbeddingMemorySource[],
): boolean {
const metaSources = normalizeEmbeddingMetaSources(meta);
if (metaSources.length !== configuredSources.length) {
return true;
}
return metaSources.some((source, index) => source !== configuredSources[index]);
}
export function normalizeEmbeddingMetaSources(
meta: Pick<EmbeddingIndexMeta, "sources"> | null,
): EmbeddingMemorySource[] {
if (!Array.isArray(meta?.sources)) {
return ["memory"];
}
const normalized = Array.from(
new Set(
meta.sources.filter(
(source): source is EmbeddingMemorySource => source === "memory" || source === "sessions",
),
),
).toSorted();
return normalized.length > 0 ? normalized : ["memory"];
}

View File

@ -20,6 +20,14 @@ import {
type OpenAiEmbeddingClient,
type VoyageEmbeddingClient,
} from "../extension-host/embedding-runtime.js";
import {
buildEmbeddingIndexMeta,
type EmbeddingIndexMeta,
metaSourcesDiffer as extensionHostMetaSourcesDiffer,
normalizeEmbeddingMetaSources,
resolveEmbeddingSyncPlan,
shouldUseUnsafeEmbeddingReindex,
} from "../extension-host/embedding-sync-planning.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { resolveUserPath } from "../utils.js";
@ -49,17 +57,6 @@ import { loadSqliteVecExtension } from "./sqlite-vec.js";
import { requireNodeSqlite } from "./sqlite.js";
import type { MemorySource, MemorySyncProgressUpdate } from "./types.js";
type MemoryIndexMeta = {
model: string;
provider: string;
providerKey?: string;
sources?: MemorySource[];
scopeHash?: string;
chunkTokens: number;
chunkOverlap: number;
vectorDims?: number;
};
type MemorySyncProgressState = {
completed: number;
total: number;
@ -950,25 +947,39 @@ export abstract class MemoryManagerSyncOps {
const configuredScopeHash = this.resolveConfiguredScopeHash();
const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles);
const hasTargetSessionFiles = targetSessionFiles !== null;
if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) {
const syncPlan = resolveEmbeddingSyncPlan({
force: params?.force,
hasTargetSessionFiles,
targetSessionFiles,
sessionsEnabled: this.sources.has("sessions"),
dirty: this.dirty,
shouldSyncSessions: this.shouldSyncSessions(params, false),
useUnsafeReindex: shouldUseUnsafeEmbeddingReindex(),
vectorReady,
meta,
provider: this.provider,
providerKey: this.providerKey,
configuredSources,
configuredScopeHash,
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
});
if (syncPlan.kind === "targeted-sessions") {
// Post-compaction refreshes should only update the explicit transcript files and
// leave broader reindex/dirty-work decisions to the regular sync path.
try {
await this.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: Array.from(targetSessionFiles),
targetSessionFiles: syncPlan.targetSessionFiles,
progress: progress ?? undefined,
});
this.clearSyncedSessionFiles(targetSessionFiles);
this.clearSyncedSessionFiles(new Set(syncPlan.targetSessionFiles));
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason));
if (activated) {
if (
process.env.OPENCLAW_TEST_FAST === "1" &&
process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1"
) {
if (shouldUseUnsafeEmbeddingReindex()) {
await this.runUnsafeReindex({
reason: params?.reason,
force: true,
@ -987,23 +998,9 @@ export abstract class MemoryManagerSyncOps {
}
return;
}
const needsFullReindex =
(params?.force && !hasTargetSessionFiles) ||
!meta ||
(this.provider && meta.model !== this.provider.model) ||
(this.provider && meta.provider !== this.provider.id) ||
meta.providerKey !== this.providerKey ||
this.metaSourcesDiffer(meta, configuredSources) ||
meta.scopeHash !== configuredScopeHash ||
meta.chunkTokens !== this.settings.chunking.tokens ||
meta.chunkOverlap !== this.settings.chunking.overlap ||
(vectorReady && !meta?.vectorDims);
try {
if (needsFullReindex) {
if (
process.env.OPENCLAW_TEST_FAST === "1" &&
process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1"
) {
if (syncPlan.kind === "full-reindex") {
if (syncPlan.unsafe) {
await this.runUnsafeReindex({
reason: params?.reason,
force: params?.force,
@ -1019,20 +1016,15 @@ export abstract class MemoryManagerSyncOps {
return;
}
const shouldSyncMemory =
this.sources.has("memory") &&
((!hasTargetSessionFiles && params?.force) || needsFullReindex || this.dirty);
const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex);
if (shouldSyncMemory) {
await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined });
if (syncPlan.shouldSyncMemory) {
await this.syncMemoryFiles({ needsFullReindex: false, progress: progress ?? undefined });
this.dirty = false;
}
if (shouldSyncSessions) {
if (syncPlan.shouldSyncSessions) {
await this.syncSessionFiles({
needsFullReindex,
targetSessionFiles: targetSessionFiles ? Array.from(targetSessionFiles) : undefined,
needsFullReindex: false,
targetSessionFiles: syncPlan.targetSessionFiles,
progress: progress ?? undefined,
});
this.sessionsDirty = false;
@ -1159,7 +1151,7 @@ export abstract class MemoryManagerSyncOps {
this.fts.loadError = undefined;
this.ensureSchema();
let nextMeta: MemoryIndexMeta | null = null;
let nextMeta: EmbeddingIndexMeta | null = null;
try {
this.seedEmbeddingCache(originalDb);
@ -1184,15 +1176,14 @@ export abstract class MemoryManagerSyncOps {
this.sessionsDirty = false;
}
nextMeta = {
model: this.provider?.model ?? "fts-only",
provider: this.provider?.id ?? "none",
providerKey: this.providerKey!,
sources: this.resolveConfiguredSourcesForMeta(),
scopeHash: this.resolveConfiguredScopeHash(),
nextMeta = buildEmbeddingIndexMeta({
provider: this.provider,
providerKey: this.providerKey,
configuredSources: this.resolveConfiguredSourcesForMeta(),
configuredScopeHash: this.resolveConfiguredScopeHash(),
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
};
});
if (!nextMeta) {
throw new Error("Failed to compute memory index metadata for reindexing.");
}
@ -1256,15 +1247,14 @@ export abstract class MemoryManagerSyncOps {
this.sessionsDirty = false;
}
const nextMeta: MemoryIndexMeta = {
model: this.provider?.model ?? "fts-only",
provider: this.provider?.id ?? "none",
providerKey: this.providerKey!,
sources: this.resolveConfiguredSourcesForMeta(),
scopeHash: this.resolveConfiguredScopeHash(),
const nextMeta = buildEmbeddingIndexMeta({
provider: this.provider,
providerKey: this.providerKey,
configuredSources: this.resolveConfiguredSourcesForMeta(),
configuredScopeHash: this.resolveConfiguredScopeHash(),
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
};
});
if (this.vector.available && this.vector.dims) {
nextMeta.vectorDims = this.vector.dims;
}
@ -1286,7 +1276,7 @@ export abstract class MemoryManagerSyncOps {
this.sessionsDirtyFiles.clear();
}
protected readMeta(): MemoryIndexMeta | null {
protected readMeta(): EmbeddingIndexMeta | null {
const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as
| { value: string }
| undefined;
@ -1295,7 +1285,7 @@ export abstract class MemoryManagerSyncOps {
return null;
}
try {
const parsed = JSON.parse(row.value) as MemoryIndexMeta;
const parsed = JSON.parse(row.value) as EmbeddingIndexMeta;
this.lastMetaSerialized = row.value;
return parsed;
} catch {
@ -1304,7 +1294,7 @@ export abstract class MemoryManagerSyncOps {
}
}
protected writeMeta(meta: MemoryIndexMeta) {
protected writeMeta(meta: EmbeddingIndexMeta) {
const value = JSON.stringify(meta);
if (this.lastMetaSerialized === value) {
return;
@ -1324,19 +1314,8 @@ export abstract class MemoryManagerSyncOps {
return normalized.length > 0 ? normalized : ["memory"];
}
private normalizeMetaSources(meta: MemoryIndexMeta): MemorySource[] {
if (!Array.isArray(meta.sources)) {
// Backward compatibility for older indexes that did not persist sources.
return ["memory"];
}
const normalized = Array.from(
new Set(
meta.sources.filter(
(source): source is MemorySource => source === "memory" || source === "sessions",
),
),
).toSorted();
return normalized.length > 0 ? normalized : ["memory"];
private normalizeMetaSources(meta: EmbeddingIndexMeta): MemorySource[] {
return normalizeEmbeddingMetaSources(meta);
}
private resolveConfiguredScopeHash(): string {
@ -1355,11 +1334,7 @@ export abstract class MemoryManagerSyncOps {
);
}
private metaSourcesDiffer(meta: MemoryIndexMeta, configuredSources: MemorySource[]): boolean {
const metaSources = this.normalizeMetaSources(meta);
if (metaSources.length !== configuredSources.length) {
return true;
}
return metaSources.some((source, index) => source !== configuredSources[index]);
private metaSourcesDiffer(meta: EmbeddingIndexMeta, configuredSources: MemorySource[]): boolean {
return extensionHostMetaSourcesDiffer(meta, configuredSources);
}
}