From a05da767180f82b90389bd7cfa7087131e4ecc00 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Fri, 20 Mar 2026 12:13:24 -0700 Subject: [PATCH] Matrix: dedupe replayed inbound events on restart (#50922) Merged via squash. Prepared head SHA: 10d9770aa61d864686e4ba20fbcffb8a8dd68903 Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + .../matrix/monitor/handler.test-helpers.ts | 38 ++- .../matrix/src/matrix/monitor/handler.test.ts | 303 ++++++++++++++++++ .../matrix/src/matrix/monitor/handler.ts | 91 +++++- .../src/matrix/monitor/inbound-dedupe.test.ts | 146 +++++++++ .../src/matrix/monitor/inbound-dedupe.ts | 285 ++++++++++++++++ .../matrix/src/matrix/monitor/index.test.ts | 105 +++++- extensions/matrix/src/matrix/monitor/index.ts | 27 +- extensions/matrix/src/matrix/sdk.test.ts | 46 +++ extensions/matrix/src/matrix/sdk.ts | 14 +- .../matrix/src/matrix/sdk/decrypt-bridge.ts | 49 +++ 11 files changed, 1087 insertions(+), 18 deletions(-) create mode 100644 extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts create mode 100644 extensions/matrix/src/matrix/monitor/inbound-dedupe.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d857ac980ee..10abb592b24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -196,6 +196,7 @@ Docs: https://docs.openclaw.ai - Exec/env sandbox: block build-tool JVM injection (`MAVEN_OPTS`, `SBT_OPTS`, `GRADLE_OPTS`, `ANT_OPTS`), glibc tunable exploitation (`GLIBC_TUNABLES`), and .NET dependency resolution hijack (`DOTNET_ADDITIONAL_DEPS`) from the host exec environment, and restrict Gradle init script redirect (`GRADLE_USER_HOME`) as an override-only block so user-configured Gradle homes still propagate. (#49702) - Plugins/Matrix: add a new Matrix plugin backed by the official `matrix-js-sdk`. If you are upgrading from the previous public Matrix plugin, follow the migration guide: https://docs.openclaw.ai/install/migrating-matrix Thanks @gumadeiras. - Discord/commands: switch native command deployment to Carbon reconcile by default so Discord restarts stop churning slash commands through OpenClaw’s local deploy path. (#46597) Thanks @huntharo and @thewilloftheshadow. +- Plugins/Matrix: durably dedupe inbound room events across gateway restarts so previously handled Matrix messages are not replayed as new, while preserving clean-restart backlog delivery for unseen events. (#50922) thanks @gumadeiras ## 2026.3.13 diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 3aa13a735a0..585ce851b0a 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -52,16 +52,28 @@ type MatrixHandlerTestHarnessOptions = { resolveEnvelopeFormatOptions?: () => Record; formatAgentEnvelope?: ({ body }: { body: string }) => string; finalizeInboundContext?: (ctx: unknown) => unknown; - createReplyDispatcherWithTyping?: () => { + createReplyDispatcherWithTyping?: (params?: { + onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void; + }) => { dispatcher: Record; replyOptions: Record; markDispatchIdle: () => void; + markRunComplete: () => void; }; resolveHumanDelayConfig?: () => undefined; dispatchReplyFromConfig?: () => Promise<{ queuedFinal: boolean; counts: { final: number; block: number; tool: number }; }>; + withReplyDispatcher?: (params: { + dispatcher: { + markComplete?: () => void; + waitForIdle?: () => Promise; + }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => Promise; + inboundDeduper?: MatrixMonitorHandlerParams["inboundDeduper"]; shouldAckReaction?: () => boolean; enqueueSystemEvent?: (...args: unknown[]) => void; getRoomInfo?: MatrixMonitorHandlerParams["getRoomInfo"]; @@ -138,9 +150,32 @@ export function createMatrixHandlerTestHarness( dispatcher: {}, replyOptions: {}, markDispatchIdle: () => {}, + markRunComplete: () => {}, })), resolveHumanDelayConfig: options.resolveHumanDelayConfig ?? (() => undefined), dispatchReplyFromConfig, + withReplyDispatcher: + options.withReplyDispatcher ?? + (async (params: { + dispatcher: { + markComplete?: () => void; + waitForIdle?: () => Promise; + }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + const { dispatcher, run, onSettled } = params; + try { + return await run(); + } finally { + dispatcher.markComplete?.(); + try { + await dispatcher.waitForIdle?.(); + } finally { + await onSettled?.(); + } + } + }), }, reactions: { shouldAckReaction: options.shouldAckReaction ?? (() => false), @@ -179,6 +214,7 @@ export function createMatrixHandlerTestHarness( startupMs: options.startupMs ?? 0, startupGraceMs: options.startupGraceMs ?? 0, dropPreStartupMessages: options.dropPreStartupMessages ?? true, + inboundDeduper: options.inboundDeduper, directTracker: { isDirectMessage: async () => options.isDirectMessage ?? true, }, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 289623631fa..8e842e38baa 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -720,12 +720,36 @@ describe("matrix monitor handler pairing account scope", () => { dispatcher: {}, replyOptions: {}, markDispatchIdle: () => {}, + markRunComplete: () => {}, }), resolveHumanDelayConfig: () => undefined, dispatchReplyFromConfig: async () => ({ queuedFinal: true, counts: { final: 1, block: 0, tool: 0 }, }), + withReplyDispatcher: async ({ + dispatcher, + run, + onSettled, + }: { + dispatcher: { + markComplete?: () => void; + waitForIdle?: () => Promise; + }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + try { + return await run(); + } finally { + dispatcher.markComplete?.(); + try { + await dispatcher.waitForIdle?.(); + } finally { + await onSettled?.(); + } + } + }, }, reactions: { shouldAckReaction: () => false, @@ -989,3 +1013,282 @@ describe("matrix monitor handler pairing account scope", () => { expect(resolveAgentRoute).toHaveBeenCalledTimes(1); }); }); + +describe("matrix monitor handler durable inbound dedupe", () => { + it("skips replayed inbound events before session recording", async () => { + const inboundDeduper = { + claimEvent: vi.fn(() => false), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + }; + const { handler, recordInboundSession } = createMatrixHandlerTestHarness({ + inboundDeduper, + dispatchReplyFromConfig: vi.fn(async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + })), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$dup", + body: "hello", + }), + ); + + expect(inboundDeduper.claimEvent).toHaveBeenCalledWith({ + roomId: "!room:example.org", + eventId: "$dup", + }); + expect(recordInboundSession).not.toHaveBeenCalled(); + expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); + expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); + }); + + it("commits inbound events only after queued replies finish delivering", async () => { + const callOrder: string[] = []; + const inboundDeduper = { + claimEvent: vi.fn(() => { + callOrder.push("claim"); + return true; + }), + commitEvent: vi.fn(async () => { + callOrder.push("commit"); + }), + releaseEvent: vi.fn(() => { + callOrder.push("release"); + }), + }; + const recordInboundSession = vi.fn(async () => { + callOrder.push("record"); + }); + const dispatchReplyFromConfig = vi.fn(async () => { + callOrder.push("dispatch"); + return { + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }; + }); + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + recordInboundSession, + dispatchReplyFromConfig, + createReplyDispatcherWithTyping: () => ({ + dispatcher: { + markComplete: () => { + callOrder.push("mark-complete"); + }, + waitForIdle: async () => { + callOrder.push("wait-for-idle"); + }, + }, + replyOptions: {}, + markDispatchIdle: () => { + callOrder.push("dispatch-idle"); + }, + markRunComplete: () => { + callOrder.push("run-complete"); + }, + }), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$commit-order", + body: "hello", + }), + ); + + expect(callOrder).toEqual([ + "claim", + "record", + "dispatch", + "run-complete", + "mark-complete", + "wait-for-idle", + "dispatch-idle", + "commit", + ]); + expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); + }); + + it("releases a claimed event when reply dispatch fails before completion", async () => { + const inboundDeduper = { + claimEvent: vi.fn(() => true), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + }; + const runtime = { + error: vi.fn(), + }; + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + runtime: runtime as never, + recordInboundSession: vi.fn(async () => { + throw new Error("disk failed"); + }), + dispatchReplyFromConfig: vi.fn(async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + })), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$release-on-error", + body: "hello", + }), + ); + + expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); + expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + roomId: "!room:example.org", + eventId: "$release-on-error", + }); + expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("matrix handler failed")); + }); + + it("releases a claimed event when queued final delivery fails", async () => { + const inboundDeduper = { + claimEvent: vi.fn(() => true), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + }; + const runtime = { + error: vi.fn(), + }; + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + runtime: runtime as never, + dispatchReplyFromConfig: vi.fn(async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + })), + createReplyDispatcherWithTyping: (params) => ({ + dispatcher: { + markComplete: () => {}, + waitForIdle: async () => { + params?.onError?.(new Error("send failed"), { kind: "final" }); + }, + }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$release-on-final-delivery-error", + body: "hello", + }), + ); + + expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); + expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + roomId: "!room:example.org", + eventId: "$release-on-final-delivery-error", + }); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("matrix final reply failed"), + ); + }); + + it.each(["tool", "block"] as const)( + "releases a claimed event when queued %s delivery fails and no final reply exists", + async (kind) => { + const inboundDeduper = { + claimEvent: vi.fn(() => true), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + }; + const runtime = { + error: vi.fn(), + }; + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + runtime: runtime as never, + dispatchReplyFromConfig: vi.fn(async () => ({ + queuedFinal: false, + counts: { + final: 0, + block: kind === "block" ? 1 : 0, + tool: kind === "tool" ? 1 : 0, + }, + })), + createReplyDispatcherWithTyping: (params) => ({ + dispatcher: { + markComplete: () => {}, + waitForIdle: async () => { + params?.onError?.(new Error("send failed"), { kind }); + }, + }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: `$release-on-${kind}-delivery-error`, + body: "hello", + }), + ); + + expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); + expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + roomId: "!room:example.org", + eventId: `$release-on-${kind}-delivery-error`, + }); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining(`matrix ${kind} reply failed`), + ); + }, + ); + + it("commits a claimed event when dispatch completes without a final reply", async () => { + const callOrder: string[] = []; + const inboundDeduper = { + claimEvent: vi.fn(() => { + callOrder.push("claim"); + return true; + }), + commitEvent: vi.fn(async () => { + callOrder.push("commit"); + }), + releaseEvent: vi.fn(() => { + callOrder.push("release"); + }), + }; + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + recordInboundSession: vi.fn(async () => { + callOrder.push("record"); + }), + dispatchReplyFromConfig: vi.fn(async () => { + callOrder.push("dispatch"); + return { + queuedFinal: false, + counts: { final: 0, block: 0, tool: 0 }, + }; + }), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$no-final", + body: "hello", + }), + ); + + expect(callOrder).toEqual(["claim", "record", "dispatch", "commit"]); + expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index b7295009bcd..40c386e3820 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -30,6 +30,7 @@ import { } from "../send.js"; import { resolveMatrixMonitorAccessState } from "./access-state.js"; import { resolveMatrixAckReactionConfig } from "./ack-config.js"; +import type { MatrixInboundEventDeduper } from "./inbound-dedupe.js"; import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js"; import { downloadMatrixMedia } from "./media.js"; import { resolveMentions } from "./mentions.js"; @@ -72,6 +73,7 @@ export type MatrixMonitorHandlerParams = { startupMs: number; startupGraceMs: number; dropPreStartupMessages: boolean; + inboundDeduper?: Pick; directTracker: { isDirectMessage: (params: { roomId: string; @@ -163,6 +165,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam startupMs, startupGraceMs, dropPreStartupMessages, + inboundDeduper, directTracker, getRoomInfo, getMemberDisplayName, @@ -219,6 +222,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }; return async (roomId: string, event: MatrixRawEvent) => { + const eventId = typeof event.event_id === "string" ? event.event_id.trim() : ""; + let claimedInboundEvent = false; try { const eventType = event.type; if (eventType === EventType.RoomMessageEncrypted) { @@ -256,6 +261,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } const eventTs = event.origin_server_ts; const eventAge = event.unsigned?.age; + const commitInboundEventIfClaimed = async () => { + if (!claimedInboundEvent || !inboundDeduper || !eventId) { + return; + } + await inboundDeduper.commitEvent({ roomId, eventId }); + claimedInboundEvent = false; + }; if (dropPreStartupMessages) { if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) { return; @@ -293,6 +305,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam return; } } + if (eventId && inboundDeduper) { + claimedInboundEvent = inboundDeduper.claimEvent({ roomId, eventId }); + if (!claimedInboundEvent) { + logVerboseMessage(`matrix: skip duplicate inbound event room=${roomId} id=${eventId}`); + return; + } + } const isDirectMessage = await directTracker.isDirectMessage({ roomId, @@ -302,6 +321,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const isRoom = !isDirectMessage; if (isRoom && groupPolicy === "disabled") { + await commitInboundEventIfClaimed(); return; } @@ -332,20 +352,24 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam logVerboseMessage( `matrix: drop configured bot sender=${senderId} (allowBots=false${isDirectMessage ? "" : `, ${roomMatchMeta}`})`, ); + await commitInboundEventIfClaimed(); return; } if (isRoom && roomConfig && !roomConfigInfo?.allowed) { logVerboseMessage(`matrix: room disabled room=${roomId} (${roomMatchMeta})`); + await commitInboundEventIfClaimed(); return; } if (isRoom && groupPolicy === "allowlist") { if (!roomConfigInfo?.allowlistConfigured) { logVerboseMessage(`matrix: drop room message (no allowlist, ${roomMatchMeta})`); + await commitInboundEventIfClaimed(); return; } if (!roomConfig) { logVerboseMessage(`matrix: drop room message (not in allowlist, ${roomMatchMeta})`); + await commitInboundEventIfClaimed(); return; } } @@ -378,6 +402,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (isDirectMessage) { if (!dmEnabled || dmPolicy === "disabled") { + await commitInboundEventIfClaimed(); return; } if (dmPolicy !== "open") { @@ -414,19 +439,23 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam accountId, }, ); + await commitInboundEventIfClaimed(); } catch (err) { logVerboseMessage(`matrix pairing reply failed for ${senderId}: ${String(err)}`); + return; } } else { logVerboseMessage( `matrix pairing reminder suppressed sender=${senderId} (cooldown)`, ); + await commitInboundEventIfClaimed(); } } if (isReactionEvent || dmPolicy !== "pairing") { logVerboseMessage( `matrix: blocked ${isReactionEvent ? "reaction" : "dm"} sender ${senderId} (dmPolicy=${dmPolicy}, ${allowMatchMeta})`, ); + await commitInboundEventIfClaimed(); } return; } @@ -439,6 +468,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam roomUserMatch, )})`, ); + await commitInboundEventIfClaimed(); return; } if ( @@ -453,6 +483,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam groupAllowMatch, )})`, ); + await commitInboundEventIfClaimed(); return; } } @@ -475,6 +506,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam isDirectMessage, logVerboseMessage, }); + await commitInboundEventIfClaimed(); return; } @@ -491,6 +523,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam : undefined; const mediaUrl = contentUrl ?? contentFile?.url; if (!mentionPrecheckText && !mediaUrl && !isPollEvent) { + await commitInboundEventIfClaimed(); return; } @@ -509,6 +542,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam logVerboseMessage( `matrix: drop configured bot sender=${senderId} (allowBots=mentions, missing mention, ${roomMatchMeta})`, ); + await commitInboundEventIfClaimed(); return; } const allowTextCommands = core.channel.commands.shouldHandleTextCommands({ @@ -534,6 +568,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam reason: "control command (unauthorized)", target: senderId, }); + await commitInboundEventIfClaimed(); return; } const shouldRequireMention = isRoom @@ -556,6 +591,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const canDetectMention = mentionRegexes.length > 0 || hasExplicitMention; if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) { logger.info("skipping room message", { roomId, reason: "no-mention" }); + await commitInboundEventIfClaimed(); return; } @@ -631,6 +667,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam mediaDownloadFailed, }); if (!bodyText) { + await commitInboundEventIfClaimed(); return; } const senderName = await getSenderName(); @@ -799,6 +836,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam accountId: route.accountId, }); const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); + let finalReplyDeliveryFailed = false; + let nonFinalReplyDeliveryFailed = false; const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, agentId: route.agentId, @@ -827,7 +866,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); }, }); - const { dispatcher, replyOptions, markDispatchIdle } = + const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), @@ -847,32 +886,66 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); }, onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => { + if (info.kind === "final") { + finalReplyDeliveryFailed = true; + } else { + nonFinalReplyDeliveryFailed = true; + } runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`); }, onReplyStart: typingCallbacks.onReplyStart, onIdle: typingCallbacks.onIdle, }); - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, + const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({ dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: roomConfig?.skills, - onModelSelected, + onSettled: () => { + markDispatchIdle(); + }, + run: async () => { + try { + return await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: roomConfig?.skills, + onModelSelected, + }, + }); + } finally { + markRunComplete(); + } }, }); - markDispatchIdle(); + if (finalReplyDeliveryFailed) { + logVerboseMessage( + `matrix: final reply delivery failed room=${roomId} id=${messageId}; leaving event uncommitted`, + ); + return; + } + if (!queuedFinal && nonFinalReplyDeliveryFailed) { + logVerboseMessage( + `matrix: non-final reply delivery failed room=${roomId} id=${messageId}; leaving event uncommitted`, + ); + return; + } if (!queuedFinal) { + await commitInboundEventIfClaimed(); return; } const finalCount = counts.final; logVerboseMessage( `matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, ); + await commitInboundEventIfClaimed(); } catch (err) { runtime.error?.(`matrix handler failed: ${String(err)}`); + } finally { + if (claimedInboundEvent && inboundDeduper && eventId) { + inboundDeduper.releaseEvent({ roomId, eventId }); + } } }; } diff --git a/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts b/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts new file mode 100644 index 00000000000..e0ad423c1f1 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts @@ -0,0 +1,146 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js"; + +describe("Matrix inbound event dedupe", () => { + const tempDirs: string[] = []; + + afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); + for (const dir of tempDirs.splice(0)) { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + function createStoragePath(): string { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-inbound-dedupe-")); + tempDirs.push(dir); + return path.join(dir, "inbound-dedupe.json"); + } + + const auth = { + accountId: "ops", + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + deviceId: "DEVICE", + } as const; + + it("persists committed events across restarts", async () => { + const storagePath = createStoragePath(); + const first = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + }); + + expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$event-1" })).toBe(true); + await first.commitEvent({ + roomId: "!room:example.org", + eventId: "$event-1", + }); + await first.stop(); + + const second = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + }); + expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$event-1" })).toBe(false); + }); + + it("does not persist released pending claims", async () => { + const storagePath = createStoragePath(); + const first = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + }); + + expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$event-2" })).toBe(true); + first.releaseEvent({ roomId: "!room:example.org", eventId: "$event-2" }); + await first.stop(); + + const second = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + }); + expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$event-2" })).toBe(true); + }); + + it("prunes expired and overflowed entries on load", async () => { + const storagePath = createStoragePath(); + fs.writeFileSync( + storagePath, + JSON.stringify({ + version: 1, + entries: [ + { key: "!room:example.org|$old", ts: 10 }, + { key: "!room:example.org|$keep-1", ts: 90 }, + { key: "!room:example.org|$keep-2", ts: 95 }, + { key: "!room:example.org|$keep-3", ts: 100 }, + ], + }), + "utf8", + ); + + const deduper = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + ttlMs: 20, + maxEntries: 2, + nowMs: () => 100, + }); + + expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$old" })).toBe(true); + expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-1" })).toBe(true); + expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-2" })).toBe(false); + expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-3" })).toBe(false); + }); + + it("retains replayed backlog events based on processing time", async () => { + const storagePath = createStoragePath(); + let now = 100; + const first = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + ttlMs: 20, + nowMs: () => now, + }); + + expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(true); + await first.commitEvent({ + roomId: "!room:example.org", + eventId: "$backlog", + }); + await first.stop(); + + now = 110; + const second = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + ttlMs: 20, + nowMs: () => now, + }); + expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(false); + }); + + it("treats stop persistence failures as best-effort cleanup", async () => { + const blockingPath = createStoragePath(); + fs.writeFileSync(blockingPath, "blocking file", "utf8"); + const deduper = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath: path.join(blockingPath, "nested", "inbound-dedupe.json"), + }); + + expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$persist-fail" })).toBe( + true, + ); + await deduper.commitEvent({ + roomId: "!room:example.org", + eventId: "$persist-fail", + }); + + await expect(deduper.stop()).resolves.toBeUndefined(); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts b/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts new file mode 100644 index 00000000000..2e2b3b8461d --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts @@ -0,0 +1,285 @@ +import path from "node:path"; +import { readJsonFileWithFallback, writeJsonFileAtomically } from "../../runtime-api.js"; +import { resolveMatrixStoragePaths } from "../client/storage.js"; +import type { MatrixAuth } from "../client/types.js"; +import { LogService } from "../sdk/logger.js"; + +const INBOUND_DEDUPE_FILENAME = "inbound-dedupe.json"; +const STORE_VERSION = 1; +const DEFAULT_MAX_ENTRIES = 20_000; +const DEFAULT_TTL_MS = 30 * 24 * 60 * 60 * 1000; +const PERSIST_DEBOUNCE_MS = 250; + +type StoredMatrixInboundDedupeEntry = { + key: string; + ts: number; +}; + +type StoredMatrixInboundDedupeState = { + version: number; + entries: StoredMatrixInboundDedupeEntry[]; +}; + +export type MatrixInboundEventDeduper = { + claimEvent: (params: { roomId: string; eventId: string }) => boolean; + commitEvent: (params: { roomId: string; eventId: string }) => Promise; + releaseEvent: (params: { roomId: string; eventId: string }) => void; + flush: () => Promise; + stop: () => Promise; +}; + +function createAsyncLock() { + let lock: Promise = Promise.resolve(); + return async function withLock(fn: () => Promise): Promise { + const previous = lock; + let release: (() => void) | undefined; + lock = new Promise((resolve) => { + release = resolve; + }); + await previous; + try { + return await fn(); + } finally { + release?.(); + } + }; +} + +function normalizeEventPart(value: string): string { + return value.trim(); +} + +function buildEventKey(params: { roomId: string; eventId: string }): string { + const roomId = normalizeEventPart(params.roomId); + const eventId = normalizeEventPart(params.eventId); + return roomId && eventId ? `${roomId}|${eventId}` : ""; +} + +function resolveInboundDedupeStatePath(params: { + auth: MatrixAuth; + env?: NodeJS.ProcessEnv; + stateDir?: string; +}): string { + const storagePaths = resolveMatrixStoragePaths({ + homeserver: params.auth.homeserver, + userId: params.auth.userId, + accessToken: params.auth.accessToken, + accountId: params.auth.accountId, + deviceId: params.auth.deviceId, + env: params.env, + stateDir: params.stateDir, + }); + return path.join(storagePaths.rootDir, INBOUND_DEDUPE_FILENAME); +} + +function normalizeTimestamp(raw: unknown): number | null { + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return null; + } + return Math.max(0, Math.floor(raw)); +} + +function pruneSeenEvents(params: { + seen: Map; + ttlMs: number; + maxEntries: number; + nowMs: number; +}) { + const { seen, ttlMs, maxEntries, nowMs } = params; + if (ttlMs > 0) { + const cutoff = nowMs - ttlMs; + for (const [key, ts] of seen) { + if (ts < cutoff) { + seen.delete(key); + } + } + } + const max = Math.max(0, Math.floor(maxEntries)); + if (max <= 0) { + seen.clear(); + return; + } + while (seen.size > max) { + const oldestKey = seen.keys().next().value; + if (typeof oldestKey !== "string") { + break; + } + seen.delete(oldestKey); + } +} + +function toStoredState(params: { + seen: Map; + ttlMs: number; + maxEntries: number; + nowMs: number; +}): StoredMatrixInboundDedupeState { + pruneSeenEvents(params); + return { + version: STORE_VERSION, + entries: Array.from(params.seen.entries()).map(([key, ts]) => ({ key, ts })), + }; +} + +async function readStoredState( + storagePath: string, +): Promise { + const { value } = await readJsonFileWithFallback( + storagePath, + null, + ); + if (value?.version !== STORE_VERSION || !Array.isArray(value.entries)) { + return null; + } + return value; +} + +export async function createMatrixInboundEventDeduper(params: { + auth: MatrixAuth; + env?: NodeJS.ProcessEnv; + stateDir?: string; + storagePath?: string; + ttlMs?: number; + maxEntries?: number; + nowMs?: () => number; +}): Promise { + const nowMs = params.nowMs ?? (() => Date.now()); + const ttlMs = + typeof params.ttlMs === "number" && Number.isFinite(params.ttlMs) + ? Math.max(0, Math.floor(params.ttlMs)) + : DEFAULT_TTL_MS; + const maxEntries = + typeof params.maxEntries === "number" && Number.isFinite(params.maxEntries) + ? Math.max(0, Math.floor(params.maxEntries)) + : DEFAULT_MAX_ENTRIES; + const storagePath = + params.storagePath ?? + resolveInboundDedupeStatePath({ + auth: params.auth, + env: params.env, + stateDir: params.stateDir, + }); + + const seen = new Map(); + const pending = new Set(); + const persistLock = createAsyncLock(); + + try { + const stored = await readStoredState(storagePath); + for (const entry of stored?.entries ?? []) { + if (!entry || typeof entry.key !== "string") { + continue; + } + const key = entry.key.trim(); + const ts = normalizeTimestamp(entry.ts); + if (!key || ts === null) { + continue; + } + seen.set(key, ts); + } + pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() }); + } catch (err) { + LogService.warn("MatrixInboundDedupe", "Failed loading Matrix inbound dedupe store:", err); + } + + let dirty = false; + let persistTimer: NodeJS.Timeout | null = null; + let persistPromise: Promise | null = null; + + const persist = async () => { + dirty = false; + const payload = toStoredState({ + seen, + ttlMs, + maxEntries, + nowMs: nowMs(), + }); + try { + await persistLock(async () => { + await writeJsonFileAtomically(storagePath, payload); + }); + } catch (err) { + dirty = true; + throw err; + } + }; + + const flush = async (): Promise => { + if (persistTimer) { + clearTimeout(persistTimer); + persistTimer = null; + } + while (dirty || persistPromise) { + if (dirty && !persistPromise) { + persistPromise = persist().finally(() => { + persistPromise = null; + }); + } + await persistPromise; + } + }; + + const schedulePersist = () => { + dirty = true; + if (persistTimer) { + return; + } + persistTimer = setTimeout(() => { + persistTimer = null; + void flush().catch((err) => { + LogService.warn( + "MatrixInboundDedupe", + "Failed persisting Matrix inbound dedupe store:", + err, + ); + }); + }, PERSIST_DEBOUNCE_MS); + persistTimer.unref?.(); + }; + + return { + claimEvent: ({ roomId, eventId }) => { + const key = buildEventKey({ roomId, eventId }); + if (!key) { + return true; + } + pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() }); + if (seen.has(key) || pending.has(key)) { + return false; + } + pending.add(key); + return true; + }, + commitEvent: async ({ roomId, eventId }) => { + const key = buildEventKey({ roomId, eventId }); + if (!key) { + return; + } + pending.delete(key); + const ts = nowMs(); + seen.delete(key); + seen.set(key, ts); + pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() }); + schedulePersist(); + }, + releaseEvent: ({ roomId, eventId }) => { + const key = buildEventKey({ roomId, eventId }); + if (!key) { + return; + } + pending.delete(key); + }, + flush, + stop: async () => { + try { + await flush(); + } catch (err) { + LogService.warn( + "MatrixInboundDedupe", + "Failed to flush Matrix inbound dedupe store during stop():", + err, + ); + } + }, + }; +} diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index b7ddb8f9656..b9aa8e8b624 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -5,9 +5,18 @@ const hoisted = vi.hoisted(() => { const state = { startClientError: null as Error | null, }; + const inboundDeduper = { + claimEvent: vi.fn(() => true), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + flush: vi.fn(async () => undefined), + stop: vi.fn(async () => undefined), + }; const client = { id: "matrix-client", hasPersistedSyncState: vi.fn(() => false), + stopSyncWithoutPersist: vi.fn(), + drainPendingDecryptions: vi.fn(async () => undefined), }; const createMatrixRoomMessageHandler = vi.fn(() => vi.fn()); const resolveTextChunkLimit = vi.fn< @@ -26,7 +35,9 @@ const hoisted = vi.hoisted(() => { callOrder, client, createMatrixRoomMessageHandler, + inboundDeduper, logger, + registeredOnRoomMessage: null as null | ((roomId: string, event: unknown) => Promise), releaseSharedClientInstance, resolveTextChunkLimit, setActiveMatrixClient, @@ -181,15 +192,22 @@ vi.mock("./direct.js", () => ({ })); vi.mock("./events.js", () => ({ - registerMatrixMonitorEvents: vi.fn(() => { - hoisted.callOrder.push("register-events"); - }), + registerMatrixMonitorEvents: vi.fn( + (params: { onRoomMessage: (roomId: string, event: unknown) => Promise }) => { + hoisted.callOrder.push("register-events"); + hoisted.registeredOnRoomMessage = params.onRoomMessage; + }, + ), })); vi.mock("./handler.js", () => ({ createMatrixRoomMessageHandler: hoisted.createMatrixRoomMessageHandler, })); +vi.mock("./inbound-dedupe.js", () => ({ + createMatrixInboundEventDeduper: vi.fn(async () => hoisted.inboundDeduper), +})); + vi.mock("./legacy-crypto-restore.js", () => ({ maybeRestoreLegacyMatrixBackup: vi.fn(), })); @@ -214,9 +232,17 @@ describe("monitorMatrixProvider", () => { hoisted.state.startClientError = null; hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000); hoisted.releaseSharedClientInstance.mockReset().mockResolvedValue(true); + hoisted.registeredOnRoomMessage = null; hoisted.setActiveMatrixClient.mockReset(); hoisted.stopThreadBindingManager.mockReset(); hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false); + hoisted.client.stopSyncWithoutPersist.mockReset(); + hoisted.client.drainPendingDecryptions.mockReset().mockResolvedValue(undefined); + hoisted.inboundDeduper.claimEvent.mockReset().mockReturnValue(true); + hoisted.inboundDeduper.commitEvent.mockReset().mockResolvedValue(undefined); + hoisted.inboundDeduper.releaseEvent.mockReset(); + hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined); + hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined); hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn()); Object.values(hoisted.logger).forEach((mock) => mock.mockReset()); }); @@ -278,4 +304,77 @@ describe("monitorMatrixProvider", () => { }), ); }); + + it("stops sync, drains decryptions, then waits for in-flight handlers before persisting", async () => { + const { monitorMatrixProvider } = await import("./index.js"); + const abortController = new AbortController(); + let resolveHandler: (() => void) | null = null; + + hoisted.createMatrixRoomMessageHandler.mockReturnValue( + vi.fn(() => { + hoisted.callOrder.push("handler-start"); + return new Promise((resolve) => { + resolveHandler = () => { + hoisted.callOrder.push("handler-done"); + resolve(); + }; + }); + }), + ); + hoisted.client.stopSyncWithoutPersist.mockImplementation(() => { + hoisted.callOrder.push("pause-client"); + }); + hoisted.client.drainPendingDecryptions.mockImplementation(async () => { + hoisted.callOrder.push("drain-decrypts"); + }); + hoisted.stopThreadBindingManager.mockImplementation(() => { + hoisted.callOrder.push("stop-manager"); + }); + hoisted.releaseSharedClientInstance.mockImplementation(async () => { + hoisted.callOrder.push("release-client"); + return true; + }); + hoisted.inboundDeduper.stop.mockImplementation(async () => { + hoisted.callOrder.push("stop-deduper"); + }); + + const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal }); + await vi.waitFor(() => { + expect(hoisted.callOrder).toContain("start-client"); + }); + const onRoomMessage = hoisted.registeredOnRoomMessage; + if (!onRoomMessage) { + throw new Error("expected room message handler to be registered"); + } + + const roomMessagePromise = onRoomMessage("!room:example.org", { event_id: "$event" }); + abortController.abort(); + await vi.waitFor(() => { + expect(hoisted.callOrder).toContain("pause-client"); + }); + expect(hoisted.callOrder).not.toContain("stop-deduper"); + + if (resolveHandler === null) { + throw new Error("expected in-flight handler to be pending"); + } + (resolveHandler as () => void)(); + await roomMessagePromise; + await monitorPromise; + + expect(hoisted.callOrder.indexOf("pause-client")).toBeLessThan( + hoisted.callOrder.indexOf("drain-decrypts"), + ); + expect(hoisted.callOrder.indexOf("drain-decrypts")).toBeLessThan( + hoisted.callOrder.indexOf("handler-done"), + ); + expect(hoisted.callOrder.indexOf("handler-done")).toBeLessThan( + hoisted.callOrder.indexOf("stop-manager"), + ); + expect(hoisted.callOrder.indexOf("stop-manager")).toBeLessThan( + hoisted.callOrder.indexOf("stop-deduper"), + ); + expect(hoisted.callOrder.indexOf("stop-deduper")).toBeLessThan( + hoisted.callOrder.indexOf("release-client"), + ); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 62ea41b0169..71efc539424 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -25,6 +25,7 @@ import { resolveMatrixMonitorConfig } from "./config.js"; import { createDirectRoomTracker } from "./direct.js"; import { registerMatrixMonitorEvents } from "./events.js"; import { createMatrixRoomMessageHandler } from "./handler.js"; +import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js"; import { createMatrixRoomInfoResolver } from "./room-info.js"; import { runMatrixStartupMaintenance } from "./startup.js"; @@ -136,15 +137,29 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi setActiveMatrixClient(client, auth.accountId); let cleanedUp = false; let threadBindingManager: { accountId: string; stop: () => void } | null = null; + const inboundDeduper = await createMatrixInboundEventDeduper({ + auth, + env: process.env, + }); + const inFlightRoomMessages = new Set>(); + const waitForInFlightRoomMessages = async () => { + while (inFlightRoomMessages.size > 0) { + await Promise.allSettled(Array.from(inFlightRoomMessages)); + } + }; const cleanup = async () => { if (cleanedUp) { return; } cleanedUp = true; try { + client.stopSyncWithoutPersist(); + await client.drainPendingDecryptions("matrix monitor shutdown"); + await waitForInFlightRoomMessages(); threadBindingManager?.stop(); - } finally { + await inboundDeduper.stop(); await releaseSharedClientInstance(client, "persist"); + } finally { setActiveMatrixClient(null, auth.accountId); } }; @@ -219,11 +234,19 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi startupMs, startupGraceMs, dropPreStartupMessages, + inboundDeduper, directTracker, getRoomInfo, getMemberDisplayName, needsRoomAliasesForConfig, }); + const trackRoomMessage = (roomId: string, event: Parameters[1]) => { + const task = Promise.resolve(handleRoomMessage(roomId, event)).finally(() => { + inFlightRoomMessages.delete(task); + }); + inFlightRoomMessages.add(task); + return task; + }; try { threadBindingManager = await createMatrixThreadBindingManager({ @@ -249,7 +272,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi warnedCryptoMissingRooms, logger, formatNativeDependencyHint: core.system.formatNativeDependencyHint, - onRoomMessage: handleRoomMessage, + onRoomMessage: trackRoomMessage, }); // Register Matrix thread bindings before the client starts syncing so threaded diff --git a/extensions/matrix/src/matrix/sdk.test.ts b/extensions/matrix/src/matrix/sdk.test.ts index 8b7330294e6..dd84a7f6eb2 100644 --- a/extensions/matrix/src/matrix/sdk.test.ts +++ b/extensions/matrix/src/matrix/sdk.test.ts @@ -684,6 +684,52 @@ describe("MatrixClient event bridge", () => { expect(delivered).toEqual(["m.room.message"]); }); + it("can drain pending decrypt retries after sync stops", async () => { + vi.useFakeTimers(); + const client = new MatrixClient("https://matrix.example.org", "token"); + const delivered: string[] = []; + + client.on("room.message", (_roomId, event) => { + delivered.push(event.type); + }); + + const encrypted = new FakeMatrixEvent({ + roomId: "!room:example.org", + eventId: "$event", + sender: "@alice:example.org", + type: "m.room.encrypted", + ts: Date.now(), + content: {}, + decryptionFailure: true, + }); + const decrypted = new FakeMatrixEvent({ + roomId: "!room:example.org", + eventId: "$event", + sender: "@alice:example.org", + type: "m.room.message", + ts: Date.now(), + content: { + msgtype: "m.text", + body: "hello", + }, + }); + + matrixJsClient.decryptEventIfNeeded = vi.fn(async () => { + encrypted.emit("decrypted", decrypted); + }); + + await client.start(); + matrixJsClient.emit("event", encrypted); + encrypted.emit("decrypted", encrypted, new Error("missing room key")); + + client.stopSyncWithoutPersist(); + await client.drainPendingDecryptions("test shutdown"); + + expect(matrixJsClient.stopClient).toHaveBeenCalledTimes(1); + expect(matrixJsClient.decryptEventIfNeeded).toHaveBeenCalledTimes(1); + expect(delivered).toEqual(["m.room.message"]); + }); + it("retries failed decryptions immediately on crypto key update signals", async () => { vi.useFakeTimers(); const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, { diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index f394974106a..4fb0b53389c 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -365,11 +365,21 @@ export class MatrixClient { await this.startSyncSession({ bootstrapCrypto: false }); } - stop(): void { + stopSyncWithoutPersist(): void { if (this.idbPersistTimer) { clearInterval(this.idbPersistTimer); this.idbPersistTimer = null; } + this.client.stopClient(); + this.started = false; + } + + async drainPendingDecryptions(reason = "matrix client shutdown"): Promise { + await this.decryptBridge.drainPendingDecryptions(reason); + } + + stop(): void { + this.stopSyncWithoutPersist(); this.decryptBridge.stop(); // Final persist on shutdown this.syncStore?.markCleanShutdown(); @@ -380,8 +390,6 @@ export class MatrixClient { }).catch(noop), this.syncStore?.flush().catch(noop), ]).then(() => undefined); - this.client.stopClient(); - this.started = false; } async stopAndPersist(): Promise { diff --git a/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts b/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts index 1df9e8748bd..1ca35993e91 100644 --- a/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts +++ b/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts @@ -51,6 +51,8 @@ export class MatrixDecryptBridge { private readonly decryptedMessageDedupe = new Map(); private readonly decryptRetries = new Map(); private readonly failedDecryptionsNotified = new Set(); + private activeRetryRuns = 0; + private readonly retryIdleResolvers = new Set<() => void>(); private cryptoRetrySignalsBound = false; constructor( @@ -139,6 +141,22 @@ export class MatrixDecryptBridge { } } + async drainPendingDecryptions(reason: string): Promise { + for (let attempts = 0; attempts < MATRIX_DECRYPT_RETRY_MAX_ATTEMPTS; attempts += 1) { + if (this.decryptRetries.size === 0) { + return; + } + this.retryPendingNow(reason); + await this.waitForActiveRetryRunsToFinish(); + const hasPendingRetryTimers = Array.from(this.decryptRetries.values()).some( + (state) => state.timer || state.inFlight, + ); + if (!hasPendingRetryTimers) { + return; + } + } + } + private handleEncryptedEventDecrypted(params: { roomId: string; encryptedEvent: MatrixEvent; @@ -246,9 +264,12 @@ export class MatrixDecryptBridge { state.inFlight = true; state.timer = null; + this.activeRetryRuns += 1; const canDecrypt = typeof this.deps.client.decryptEventIfNeeded === "function"; if (!canDecrypt) { this.clearDecryptRetry(retryKey); + this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1); + this.resolveRetryIdleIfNeeded(); return; } @@ -260,8 +281,13 @@ export class MatrixDecryptBridge { // Retry with backoff until we hit the configured retry cap. } finally { state.inFlight = false; + this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1); + this.resolveRetryIdleIfNeeded(); } + if (this.decryptRetries.get(retryKey) !== state) { + return; + } if (isDecryptionFailure(state.event)) { this.scheduleDecryptRetry(state); return; @@ -304,4 +330,27 @@ export class MatrixDecryptBridge { this.decryptedMessageDedupe.delete(oldest); } } + + private async waitForActiveRetryRunsToFinish(): Promise { + if (this.activeRetryRuns === 0) { + return; + } + await new Promise((resolve) => { + this.retryIdleResolvers.add(resolve); + if (this.activeRetryRuns === 0) { + this.retryIdleResolvers.delete(resolve); + resolve(); + } + }); + } + + private resolveRetryIdleIfNeeded(): void { + if (this.activeRetryRuns !== 0) { + return; + } + for (const resolve of this.retryIdleResolvers) { + resolve(); + } + this.retryIdleResolvers.clear(); + } }