diff --git a/extensions/matrix/src/matrix/client/file-sync-store.test.ts b/extensions/matrix/src/matrix/client/file-sync-store.test.ts index 5bda781b5b2..632ec309210 100644 --- a/extensions/matrix/src/matrix/client/file-sync-store.test.ts +++ b/extensions/matrix/src/matrix/client/file-sync-store.test.ts @@ -91,6 +91,50 @@ describe("FileBackedMatrixSyncStore", () => { }, ]); expect(savedSync?.roomsData.join?.["!room:example.org"]).toBeTruthy(); + expect(secondStore.hasSavedSyncFromCleanShutdown()).toBe(false); + }); + + it("only treats sync state as restart-safe after a clean shutdown persist", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + + const firstStore = new FileBackedMatrixSyncStore(storagePath); + await firstStore.setSyncData(createSyncResponse("s123")); + await firstStore.flush(); + + const afterDirtyPersist = new FileBackedMatrixSyncStore(storagePath); + expect(afterDirtyPersist.hasSavedSync()).toBe(true); + expect(afterDirtyPersist.hasSavedSyncFromCleanShutdown()).toBe(false); + + firstStore.markCleanShutdown(); + await firstStore.flush(); + + const afterCleanShutdown = new FileBackedMatrixSyncStore(storagePath); + expect(afterCleanShutdown.hasSavedSync()).toBe(true); + expect(afterCleanShutdown.hasSavedSyncFromCleanShutdown()).toBe(true); + }); + + it("clears the clean-shutdown marker once fresh sync data arrives", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + + const firstStore = new FileBackedMatrixSyncStore(storagePath); + await firstStore.setSyncData(createSyncResponse("s123")); + firstStore.markCleanShutdown(); + await firstStore.flush(); + + const restartedStore = new FileBackedMatrixSyncStore(storagePath); + expect(restartedStore.hasSavedSyncFromCleanShutdown()).toBe(true); + + await restartedStore.setSyncData(createSyncResponse("s456")); + await restartedStore.flush(); + + const afterNewSync = new FileBackedMatrixSyncStore(storagePath); + expect(afterNewSync.hasSavedSync()).toBe(true); + expect(afterNewSync.hasSavedSyncFromCleanShutdown()).toBe(false); + await expect(afterNewSync.getSavedSyncToken()).resolves.toBe("s456"); }); it("coalesces background persistence until the debounce window elapses", async () => { diff --git a/extensions/matrix/src/matrix/client/file-sync-store.ts b/extensions/matrix/src/matrix/client/file-sync-store.ts index 411f4e0decd..cbb71e09727 100644 --- a/extensions/matrix/src/matrix/client/file-sync-store.ts +++ b/extensions/matrix/src/matrix/client/file-sync-store.ts @@ -17,6 +17,7 @@ type PersistedMatrixSyncStore = { version: number; savedSync: ISyncData | null; clientOptions?: IStoredClientOpts; + cleanShutdown?: boolean; }; function createAsyncLock() { @@ -76,6 +77,7 @@ function readPersistedStore(raw: string): PersistedMatrixSyncStore | null { version?: unknown; savedSync?: unknown; clientOptions?: unknown; + cleanShutdown?: unknown; }; const savedSync = toPersistedSyncData(parsed.savedSync); if (parsed.version === STORE_VERSION) { @@ -85,6 +87,7 @@ function readPersistedStore(raw: string): PersistedMatrixSyncStore | null { clientOptions: isRecord(parsed.clientOptions) ? (parsed.clientOptions as IStoredClientOpts) : undefined, + cleanShutdown: parsed.cleanShutdown === true, }; } @@ -93,6 +96,7 @@ function readPersistedStore(raw: string): PersistedMatrixSyncStore | null { return { version: STORE_VERSION, savedSync: toPersistedSyncData(parsed), + cleanShutdown: false, }; } catch { return null; @@ -119,6 +123,8 @@ export class FileBackedMatrixSyncStore extends MemoryStore { private savedSync: ISyncData | null = null; private savedClientOptions: IStoredClientOpts | undefined; private readonly hadSavedSyncOnLoad: boolean; + private readonly hadCleanShutdownOnLoad: boolean; + private cleanShutdown = false; private dirty = false; private persistTimer: NodeJS.Timeout | null = null; private persistPromise: Promise | null = null; @@ -128,11 +134,13 @@ export class FileBackedMatrixSyncStore extends MemoryStore { let restoredSavedSync: ISyncData | null = null; let restoredClientOptions: IStoredClientOpts | undefined; + let restoredCleanShutdown = false; try { const raw = readFileSync(this.storagePath, "utf8"); const persisted = readPersistedStore(raw); restoredSavedSync = persisted?.savedSync ?? null; restoredClientOptions = persisted?.clientOptions; + restoredCleanShutdown = persisted?.cleanShutdown === true; } catch { // Missing or unreadable sync cache should not block startup. } @@ -140,6 +148,8 @@ export class FileBackedMatrixSyncStore extends MemoryStore { this.savedSync = restoredSavedSync; this.savedClientOptions = restoredClientOptions; this.hadSavedSyncOnLoad = restoredSavedSync !== null; + this.hadCleanShutdownOnLoad = this.hadSavedSyncOnLoad && restoredCleanShutdown; + this.cleanShutdown = this.hadCleanShutdownOnLoad; if (this.savedSync) { this.accumulator.accumulate(syncDataToSyncResponse(this.savedSync), true); @@ -154,6 +164,10 @@ export class FileBackedMatrixSyncStore extends MemoryStore { return this.hadSavedSyncOnLoad; } + hasSavedSyncFromCleanShutdown(): boolean { + return this.hadCleanShutdownOnLoad; + } + override getSavedSync(): Promise { return Promise.resolve(this.savedSync ? cloneJson(this.savedSync) : null); } @@ -205,9 +219,15 @@ export class FileBackedMatrixSyncStore extends MemoryStore { await super.deleteAllData(); this.savedSync = null; this.savedClientOptions = undefined; + this.cleanShutdown = false; await fs.rm(this.storagePath, { force: true }).catch(() => undefined); } + markCleanShutdown(): void { + this.cleanShutdown = true; + this.dirty = true; + } + async flush(): Promise { if (this.persistTimer) { clearTimeout(this.persistTimer); @@ -224,6 +244,7 @@ export class FileBackedMatrixSyncStore extends MemoryStore { } private markDirtyAndSchedulePersist(): void { + this.cleanShutdown = false; this.dirty = true; if (this.persistTimer) { return; @@ -242,6 +263,7 @@ export class FileBackedMatrixSyncStore extends MemoryStore { const payload: PersistedMatrixSyncStore = { version: STORE_VERSION, savedSync: this.savedSync ? cloneJson(this.savedSync) : null, + cleanShutdown: this.cleanShutdown === true, ...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}), }; try { diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index 6d6779de445..34538ed5b80 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -17,17 +17,17 @@ const hoisted = vi.hoisted(() => { debug: vi.fn(), }; const stopThreadBindingManager = vi.fn(); - const stopSharedClientInstance = vi.fn(); + const releaseSharedClientInstance = vi.fn(async () => true); const setActiveMatrixClient = vi.fn(); return { callOrder, client, createMatrixRoomMessageHandler, logger, + releaseSharedClientInstance, resolveTextChunkLimit, setActiveMatrixClient, startClientError: null as Error | null, - stopSharedClientInstance, stopThreadBindingManager, }; }); @@ -127,7 +127,10 @@ vi.mock("../client.js", () => ({ hoisted.callOrder.push("start-client"); return hoisted.client; }), - stopSharedClientInstance: hoisted.stopSharedClientInstance, +})); + +vi.mock("../client/shared.js", () => ({ + releaseSharedClientInstance: hoisted.releaseSharedClientInstance, })); vi.mock("../config-update.js", () => ({ @@ -206,8 +209,8 @@ describe("monitorMatrixProvider", () => { hoisted.callOrder.length = 0; hoisted.startClientError = null; hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000); + hoisted.releaseSharedClientInstance.mockReset().mockResolvedValue(true); hoisted.setActiveMatrixClient.mockReset(); - hoisted.stopSharedClientInstance.mockReset(); hoisted.stopThreadBindingManager.mockReset(); hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false); hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn()); @@ -251,12 +254,13 @@ describe("monitorMatrixProvider", () => { await expect(monitorMatrixProvider()).rejects.toThrow("start failed"); expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1); - expect(hoisted.stopSharedClientInstance).toHaveBeenCalledTimes(1); + expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledTimes(1); + expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "persist"); expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default"); expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default"); }); - it("disables cold-start backlog dropping when sync state already exists", async () => { + it("disables cold-start backlog dropping only when sync state is cleanly persisted", async () => { hoisted.client.hasPersistedSyncState.mockReturnValue(true); const { monitorMatrixProvider } = await import("./index.js"); const abortController = new AbortController(); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index cb0b22734be..957d629440c 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -17,8 +17,8 @@ import { resolveMatrixAuth, resolveMatrixAuthContext, resolveSharedMatrixClient, - stopSharedClientInstance, } from "../client.js"; +import { releaseSharedClientInstance } from "../client/shared.js"; import { createMatrixThreadBindingManager } from "../thread-bindings.js"; import { registerMatrixAutoJoin } from "./auto-join.js"; import { resolveMatrixMonitorConfig } from "./config.js"; @@ -131,7 +131,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi setActiveMatrixClient(client, auth.accountId); let cleanedUp = false; let threadBindingManager: { accountId: string; stop: () => void } | null = null; - const cleanup = () => { + const cleanup = async () => { if (cleanedUp) { return; } @@ -139,7 +139,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi try { threadBindingManager?.stop(); } finally { - stopSharedClientInstance(client); + await releaseSharedClientInstance(client, "persist"); setActiveMatrixClient(null, auth.accountId); } }; @@ -273,19 +273,32 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi }); await new Promise((resolve) => { - const onAbort = () => { - logVerboseMessage("matrix: stopping client"); - cleanup(); - resolve(); + const stopAndResolve = async () => { + try { + logVerboseMessage("matrix: stopping client"); + await cleanup(); + } catch (err) { + logger.warn("matrix: failed during monitor shutdown cleanup", { + error: String(err), + }); + } finally { + resolve(); + } }; if (opts.abortSignal?.aborted) { - onAbort(); + void stopAndResolve(); return; } - opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); + opts.abortSignal?.addEventListener( + "abort", + () => { + void stopAndResolve(); + }, + { once: true }, + ); }); } catch (err) { - cleanup(); + await cleanup(); throw err; } } diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index b2084e5c210..5b56e07d5d8 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -350,7 +350,9 @@ export class MatrixClient { } hasPersistedSyncState(): boolean { - return this.syncStore?.hasSavedSync() === true; + // Only trust restart replay when the previous process completed a final + // sync-store persist. A stale cursor can make Matrix re-surface old events. + return this.syncStore?.hasSavedSyncFromCleanShutdown() === true; } private async ensureStartedForCryptoControlPlane(): Promise { @@ -367,6 +369,7 @@ export class MatrixClient { } this.decryptBridge.stop(); // Final persist on shutdown + this.syncStore?.markCleanShutdown(); this.stopPersistPromise = Promise.all([ persistIdbToDisk({ snapshotPath: this.idbSnapshotPath,