Matrix: dedupe replayed inbound events on restart (#50922)
Merged via squash. Prepared head SHA: 10d9770aa61d864686e4ba20fbcffb8a8dd68903 Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras
This commit is contained in:
parent
5408a3d1a4
commit
a05da76718
@ -196,6 +196,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Exec/env sandbox: block build-tool JVM injection (`MAVEN_OPTS`, `SBT_OPTS`, `GRADLE_OPTS`, `ANT_OPTS`), glibc tunable exploitation (`GLIBC_TUNABLES`), and .NET dependency resolution hijack (`DOTNET_ADDITIONAL_DEPS`) from the host exec environment, and restrict Gradle init script redirect (`GRADLE_USER_HOME`) as an override-only block so user-configured Gradle homes still propagate. (#49702)
|
||||
- Plugins/Matrix: add a new Matrix plugin backed by the official `matrix-js-sdk`. If you are upgrading from the previous public Matrix plugin, follow the migration guide: https://docs.openclaw.ai/install/migrating-matrix Thanks @gumadeiras.
|
||||
- Discord/commands: switch native command deployment to Carbon reconcile by default so Discord restarts stop churning slash commands through OpenClaw’s local deploy path. (#46597) Thanks @huntharo and @thewilloftheshadow.
|
||||
- Plugins/Matrix: durably dedupe inbound room events across gateway restarts so previously handled Matrix messages are not replayed as new, while preserving clean-restart backlog delivery for unseen events. (#50922) thanks @gumadeiras
|
||||
|
||||
## 2026.3.13
|
||||
|
||||
|
||||
@ -52,16 +52,28 @@ type MatrixHandlerTestHarnessOptions = {
|
||||
resolveEnvelopeFormatOptions?: () => Record<string, never>;
|
||||
formatAgentEnvelope?: ({ body }: { body: string }) => string;
|
||||
finalizeInboundContext?: (ctx: unknown) => unknown;
|
||||
createReplyDispatcherWithTyping?: () => {
|
||||
createReplyDispatcherWithTyping?: (params?: {
|
||||
onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void;
|
||||
}) => {
|
||||
dispatcher: Record<string, unknown>;
|
||||
replyOptions: Record<string, unknown>;
|
||||
markDispatchIdle: () => void;
|
||||
markRunComplete: () => void;
|
||||
};
|
||||
resolveHumanDelayConfig?: () => undefined;
|
||||
dispatchReplyFromConfig?: () => Promise<{
|
||||
queuedFinal: boolean;
|
||||
counts: { final: number; block: number; tool: number };
|
||||
}>;
|
||||
withReplyDispatcher?: <T>(params: {
|
||||
dispatcher: {
|
||||
markComplete?: () => void;
|
||||
waitForIdle?: () => Promise<void>;
|
||||
};
|
||||
run: () => Promise<T>;
|
||||
onSettled?: () => void | Promise<void>;
|
||||
}) => Promise<T>;
|
||||
inboundDeduper?: MatrixMonitorHandlerParams["inboundDeduper"];
|
||||
shouldAckReaction?: () => boolean;
|
||||
enqueueSystemEvent?: (...args: unknown[]) => void;
|
||||
getRoomInfo?: MatrixMonitorHandlerParams["getRoomInfo"];
|
||||
@ -138,9 +150,32 @@ export function createMatrixHandlerTestHarness(
|
||||
dispatcher: {},
|
||||
replyOptions: {},
|
||||
markDispatchIdle: () => {},
|
||||
markRunComplete: () => {},
|
||||
})),
|
||||
resolveHumanDelayConfig: options.resolveHumanDelayConfig ?? (() => undefined),
|
||||
dispatchReplyFromConfig,
|
||||
withReplyDispatcher:
|
||||
options.withReplyDispatcher ??
|
||||
(async <T>(params: {
|
||||
dispatcher: {
|
||||
markComplete?: () => void;
|
||||
waitForIdle?: () => Promise<void>;
|
||||
};
|
||||
run: () => Promise<T>;
|
||||
onSettled?: () => void | Promise<void>;
|
||||
}) => {
|
||||
const { dispatcher, run, onSettled } = params;
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
dispatcher.markComplete?.();
|
||||
try {
|
||||
await dispatcher.waitForIdle?.();
|
||||
} finally {
|
||||
await onSettled?.();
|
||||
}
|
||||
}
|
||||
}),
|
||||
},
|
||||
reactions: {
|
||||
shouldAckReaction: options.shouldAckReaction ?? (() => false),
|
||||
@ -179,6 +214,7 @@ export function createMatrixHandlerTestHarness(
|
||||
startupMs: options.startupMs ?? 0,
|
||||
startupGraceMs: options.startupGraceMs ?? 0,
|
||||
dropPreStartupMessages: options.dropPreStartupMessages ?? true,
|
||||
inboundDeduper: options.inboundDeduper,
|
||||
directTracker: {
|
||||
isDirectMessage: async () => options.isDirectMessage ?? true,
|
||||
},
|
||||
|
||||
@ -720,12 +720,36 @@ describe("matrix monitor handler pairing account scope", () => {
|
||||
dispatcher: {},
|
||||
replyOptions: {},
|
||||
markDispatchIdle: () => {},
|
||||
markRunComplete: () => {},
|
||||
}),
|
||||
resolveHumanDelayConfig: () => undefined,
|
||||
dispatchReplyFromConfig: async () => ({
|
||||
queuedFinal: true,
|
||||
counts: { final: 1, block: 0, tool: 0 },
|
||||
}),
|
||||
withReplyDispatcher: async <T>({
|
||||
dispatcher,
|
||||
run,
|
||||
onSettled,
|
||||
}: {
|
||||
dispatcher: {
|
||||
markComplete?: () => void;
|
||||
waitForIdle?: () => Promise<void>;
|
||||
};
|
||||
run: () => Promise<T>;
|
||||
onSettled?: () => void | Promise<void>;
|
||||
}) => {
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
dispatcher.markComplete?.();
|
||||
try {
|
||||
await dispatcher.waitForIdle?.();
|
||||
} finally {
|
||||
await onSettled?.();
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
reactions: {
|
||||
shouldAckReaction: () => false,
|
||||
@ -989,3 +1013,282 @@ describe("matrix monitor handler pairing account scope", () => {
|
||||
expect(resolveAgentRoute).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("matrix monitor handler durable inbound dedupe", () => {
|
||||
it("skips replayed inbound events before session recording", async () => {
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => false),
|
||||
commitEvent: vi.fn(async () => undefined),
|
||||
releaseEvent: vi.fn(),
|
||||
};
|
||||
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
|
||||
inboundDeduper,
|
||||
dispatchReplyFromConfig: vi.fn(async () => ({
|
||||
queuedFinal: true,
|
||||
counts: { final: 1, block: 0, tool: 0 },
|
||||
})),
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$dup",
|
||||
body: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(inboundDeduper.claimEvent).toHaveBeenCalledWith({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$dup",
|
||||
});
|
||||
expect(recordInboundSession).not.toHaveBeenCalled();
|
||||
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
|
||||
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("commits inbound events only after queued replies finish delivering", async () => {
|
||||
const callOrder: string[] = [];
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => {
|
||||
callOrder.push("claim");
|
||||
return true;
|
||||
}),
|
||||
commitEvent: vi.fn(async () => {
|
||||
callOrder.push("commit");
|
||||
}),
|
||||
releaseEvent: vi.fn(() => {
|
||||
callOrder.push("release");
|
||||
}),
|
||||
};
|
||||
const recordInboundSession = vi.fn(async () => {
|
||||
callOrder.push("record");
|
||||
});
|
||||
const dispatchReplyFromConfig = vi.fn(async () => {
|
||||
callOrder.push("dispatch");
|
||||
return {
|
||||
queuedFinal: true,
|
||||
counts: { final: 1, block: 0, tool: 0 },
|
||||
};
|
||||
});
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
inboundDeduper,
|
||||
recordInboundSession,
|
||||
dispatchReplyFromConfig,
|
||||
createReplyDispatcherWithTyping: () => ({
|
||||
dispatcher: {
|
||||
markComplete: () => {
|
||||
callOrder.push("mark-complete");
|
||||
},
|
||||
waitForIdle: async () => {
|
||||
callOrder.push("wait-for-idle");
|
||||
},
|
||||
},
|
||||
replyOptions: {},
|
||||
markDispatchIdle: () => {
|
||||
callOrder.push("dispatch-idle");
|
||||
},
|
||||
markRunComplete: () => {
|
||||
callOrder.push("run-complete");
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$commit-order",
|
||||
body: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(callOrder).toEqual([
|
||||
"claim",
|
||||
"record",
|
||||
"dispatch",
|
||||
"run-complete",
|
||||
"mark-complete",
|
||||
"wait-for-idle",
|
||||
"dispatch-idle",
|
||||
"commit",
|
||||
]);
|
||||
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("releases a claimed event when reply dispatch fails before completion", async () => {
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => true),
|
||||
commitEvent: vi.fn(async () => undefined),
|
||||
releaseEvent: vi.fn(),
|
||||
};
|
||||
const runtime = {
|
||||
error: vi.fn(),
|
||||
};
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
inboundDeduper,
|
||||
runtime: runtime as never,
|
||||
recordInboundSession: vi.fn(async () => {
|
||||
throw new Error("disk failed");
|
||||
}),
|
||||
dispatchReplyFromConfig: vi.fn(async () => ({
|
||||
queuedFinal: true,
|
||||
counts: { final: 1, block: 0, tool: 0 },
|
||||
})),
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$release-on-error",
|
||||
body: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
|
||||
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$release-on-error",
|
||||
});
|
||||
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("matrix handler failed"));
|
||||
});
|
||||
|
||||
it("releases a claimed event when queued final delivery fails", async () => {
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => true),
|
||||
commitEvent: vi.fn(async () => undefined),
|
||||
releaseEvent: vi.fn(),
|
||||
};
|
||||
const runtime = {
|
||||
error: vi.fn(),
|
||||
};
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
inboundDeduper,
|
||||
runtime: runtime as never,
|
||||
dispatchReplyFromConfig: vi.fn(async () => ({
|
||||
queuedFinal: true,
|
||||
counts: { final: 1, block: 0, tool: 0 },
|
||||
})),
|
||||
createReplyDispatcherWithTyping: (params) => ({
|
||||
dispatcher: {
|
||||
markComplete: () => {},
|
||||
waitForIdle: async () => {
|
||||
params?.onError?.(new Error("send failed"), { kind: "final" });
|
||||
},
|
||||
},
|
||||
replyOptions: {},
|
||||
markDispatchIdle: () => {},
|
||||
markRunComplete: () => {},
|
||||
}),
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$release-on-final-delivery-error",
|
||||
body: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
|
||||
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$release-on-final-delivery-error",
|
||||
});
|
||||
expect(runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("matrix final reply failed"),
|
||||
);
|
||||
});
|
||||
|
||||
it.each(["tool", "block"] as const)(
|
||||
"releases a claimed event when queued %s delivery fails and no final reply exists",
|
||||
async (kind) => {
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => true),
|
||||
commitEvent: vi.fn(async () => undefined),
|
||||
releaseEvent: vi.fn(),
|
||||
};
|
||||
const runtime = {
|
||||
error: vi.fn(),
|
||||
};
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
inboundDeduper,
|
||||
runtime: runtime as never,
|
||||
dispatchReplyFromConfig: vi.fn(async () => ({
|
||||
queuedFinal: false,
|
||||
counts: {
|
||||
final: 0,
|
||||
block: kind === "block" ? 1 : 0,
|
||||
tool: kind === "tool" ? 1 : 0,
|
||||
},
|
||||
})),
|
||||
createReplyDispatcherWithTyping: (params) => ({
|
||||
dispatcher: {
|
||||
markComplete: () => {},
|
||||
waitForIdle: async () => {
|
||||
params?.onError?.(new Error("send failed"), { kind });
|
||||
},
|
||||
},
|
||||
replyOptions: {},
|
||||
markDispatchIdle: () => {},
|
||||
markRunComplete: () => {},
|
||||
}),
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: `$release-on-${kind}-delivery-error`,
|
||||
body: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
|
||||
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
|
||||
roomId: "!room:example.org",
|
||||
eventId: `$release-on-${kind}-delivery-error`,
|
||||
});
|
||||
expect(runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining(`matrix ${kind} reply failed`),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it("commits a claimed event when dispatch completes without a final reply", async () => {
|
||||
const callOrder: string[] = [];
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => {
|
||||
callOrder.push("claim");
|
||||
return true;
|
||||
}),
|
||||
commitEvent: vi.fn(async () => {
|
||||
callOrder.push("commit");
|
||||
}),
|
||||
releaseEvent: vi.fn(() => {
|
||||
callOrder.push("release");
|
||||
}),
|
||||
};
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
inboundDeduper,
|
||||
recordInboundSession: vi.fn(async () => {
|
||||
callOrder.push("record");
|
||||
}),
|
||||
dispatchReplyFromConfig: vi.fn(async () => {
|
||||
callOrder.push("dispatch");
|
||||
return {
|
||||
queuedFinal: false,
|
||||
counts: { final: 0, block: 0, tool: 0 },
|
||||
};
|
||||
}),
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$no-final",
|
||||
body: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(callOrder).toEqual(["claim", "record", "dispatch", "commit"]);
|
||||
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@ -30,6 +30,7 @@ import {
|
||||
} from "../send.js";
|
||||
import { resolveMatrixMonitorAccessState } from "./access-state.js";
|
||||
import { resolveMatrixAckReactionConfig } from "./ack-config.js";
|
||||
import type { MatrixInboundEventDeduper } from "./inbound-dedupe.js";
|
||||
import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js";
|
||||
import { downloadMatrixMedia } from "./media.js";
|
||||
import { resolveMentions } from "./mentions.js";
|
||||
@ -72,6 +73,7 @@ export type MatrixMonitorHandlerParams = {
|
||||
startupMs: number;
|
||||
startupGraceMs: number;
|
||||
dropPreStartupMessages: boolean;
|
||||
inboundDeduper?: Pick<MatrixInboundEventDeduper, "claimEvent" | "commitEvent" | "releaseEvent">;
|
||||
directTracker: {
|
||||
isDirectMessage: (params: {
|
||||
roomId: string;
|
||||
@ -163,6 +165,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
dropPreStartupMessages,
|
||||
inboundDeduper,
|
||||
directTracker,
|
||||
getRoomInfo,
|
||||
getMemberDisplayName,
|
||||
@ -219,6 +222,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
};
|
||||
|
||||
return async (roomId: string, event: MatrixRawEvent) => {
|
||||
const eventId = typeof event.event_id === "string" ? event.event_id.trim() : "";
|
||||
let claimedInboundEvent = false;
|
||||
try {
|
||||
const eventType = event.type;
|
||||
if (eventType === EventType.RoomMessageEncrypted) {
|
||||
@ -256,6 +261,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
}
|
||||
const eventTs = event.origin_server_ts;
|
||||
const eventAge = event.unsigned?.age;
|
||||
const commitInboundEventIfClaimed = async () => {
|
||||
if (!claimedInboundEvent || !inboundDeduper || !eventId) {
|
||||
return;
|
||||
}
|
||||
await inboundDeduper.commitEvent({ roomId, eventId });
|
||||
claimedInboundEvent = false;
|
||||
};
|
||||
if (dropPreStartupMessages) {
|
||||
if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) {
|
||||
return;
|
||||
@ -293,6 +305,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (eventId && inboundDeduper) {
|
||||
claimedInboundEvent = inboundDeduper.claimEvent({ roomId, eventId });
|
||||
if (!claimedInboundEvent) {
|
||||
logVerboseMessage(`matrix: skip duplicate inbound event room=${roomId} id=${eventId}`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const isDirectMessage = await directTracker.isDirectMessage({
|
||||
roomId,
|
||||
@ -302,6 +321,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
const isRoom = !isDirectMessage;
|
||||
|
||||
if (isRoom && groupPolicy === "disabled") {
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -332,20 +352,24 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
logVerboseMessage(
|
||||
`matrix: drop configured bot sender=${senderId} (allowBots=false${isDirectMessage ? "" : `, ${roomMatchMeta}`})`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
|
||||
if (isRoom && roomConfig && !roomConfigInfo?.allowed) {
|
||||
logVerboseMessage(`matrix: room disabled room=${roomId} (${roomMatchMeta})`);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
if (isRoom && groupPolicy === "allowlist") {
|
||||
if (!roomConfigInfo?.allowlistConfigured) {
|
||||
logVerboseMessage(`matrix: drop room message (no allowlist, ${roomMatchMeta})`);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
if (!roomConfig) {
|
||||
logVerboseMessage(`matrix: drop room message (not in allowlist, ${roomMatchMeta})`);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -378,6 +402,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
|
||||
if (isDirectMessage) {
|
||||
if (!dmEnabled || dmPolicy === "disabled") {
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
if (dmPolicy !== "open") {
|
||||
@ -414,19 +439,23 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
accountId,
|
||||
},
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
} catch (err) {
|
||||
logVerboseMessage(`matrix pairing reply failed for ${senderId}: ${String(err)}`);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
logVerboseMessage(
|
||||
`matrix pairing reminder suppressed sender=${senderId} (cooldown)`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
}
|
||||
}
|
||||
if (isReactionEvent || dmPolicy !== "pairing") {
|
||||
logVerboseMessage(
|
||||
`matrix: blocked ${isReactionEvent ? "reaction" : "dm"} sender ${senderId} (dmPolicy=${dmPolicy}, ${allowMatchMeta})`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -439,6 +468,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
roomUserMatch,
|
||||
)})`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
if (
|
||||
@ -453,6 +483,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
groupAllowMatch,
|
||||
)})`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -475,6 +506,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
isDirectMessage,
|
||||
logVerboseMessage,
|
||||
});
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -491,6 +523,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
: undefined;
|
||||
const mediaUrl = contentUrl ?? contentFile?.url;
|
||||
if (!mentionPrecheckText && !mediaUrl && !isPollEvent) {
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -509,6 +542,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
logVerboseMessage(
|
||||
`matrix: drop configured bot sender=${senderId} (allowBots=mentions, missing mention, ${roomMatchMeta})`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
const allowTextCommands = core.channel.commands.shouldHandleTextCommands({
|
||||
@ -534,6 +568,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
reason: "control command (unauthorized)",
|
||||
target: senderId,
|
||||
});
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
const shouldRequireMention = isRoom
|
||||
@ -556,6 +591,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
const canDetectMention = mentionRegexes.length > 0 || hasExplicitMention;
|
||||
if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) {
|
||||
logger.info("skipping room message", { roomId, reason: "no-mention" });
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -631,6 +667,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
mediaDownloadFailed,
|
||||
});
|
||||
if (!bodyText) {
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
const senderName = await getSenderName();
|
||||
@ -799,6 +836,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
let finalReplyDeliveryFailed = false;
|
||||
let nonFinalReplyDeliveryFailed = false;
|
||||
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
|
||||
cfg,
|
||||
agentId: route.agentId,
|
||||
@ -827,7 +866,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
});
|
||||
},
|
||||
});
|
||||
const { dispatcher, replyOptions, markDispatchIdle } =
|
||||
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
|
||||
core.channel.reply.createReplyDispatcherWithTyping({
|
||||
...prefixOptions,
|
||||
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
|
||||
@ -847,13 +886,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
});
|
||||
},
|
||||
onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => {
|
||||
if (info.kind === "final") {
|
||||
finalReplyDeliveryFailed = true;
|
||||
} else {
|
||||
nonFinalReplyDeliveryFailed = true;
|
||||
}
|
||||
runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`);
|
||||
},
|
||||
onReplyStart: typingCallbacks.onReplyStart,
|
||||
onIdle: typingCallbacks.onIdle,
|
||||
});
|
||||
|
||||
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
|
||||
const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => {
|
||||
markDispatchIdle();
|
||||
},
|
||||
run: async () => {
|
||||
try {
|
||||
return await core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
@ -863,16 +914,38 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
onModelSelected,
|
||||
},
|
||||
});
|
||||
markDispatchIdle();
|
||||
} finally {
|
||||
markRunComplete();
|
||||
}
|
||||
},
|
||||
});
|
||||
if (finalReplyDeliveryFailed) {
|
||||
logVerboseMessage(
|
||||
`matrix: final reply delivery failed room=${roomId} id=${messageId}; leaving event uncommitted`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (!queuedFinal && nonFinalReplyDeliveryFailed) {
|
||||
logVerboseMessage(
|
||||
`matrix: non-final reply delivery failed room=${roomId} id=${messageId}; leaving event uncommitted`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (!queuedFinal) {
|
||||
await commitInboundEventIfClaimed();
|
||||
return;
|
||||
}
|
||||
const finalCount = counts.final;
|
||||
logVerboseMessage(
|
||||
`matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
|
||||
);
|
||||
await commitInboundEventIfClaimed();
|
||||
} catch (err) {
|
||||
runtime.error?.(`matrix handler failed: ${String(err)}`);
|
||||
} finally {
|
||||
if (claimedInboundEvent && inboundDeduper && eventId) {
|
||||
inboundDeduper.releaseEvent({ roomId, eventId });
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
146
extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts
Normal file
146
extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts
Normal file
@ -0,0 +1,146 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js";
|
||||
|
||||
describe("Matrix inbound event dedupe", () => {
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
vi.useRealTimers();
|
||||
for (const dir of tempDirs.splice(0)) {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
function createStoragePath(): string {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-inbound-dedupe-"));
|
||||
tempDirs.push(dir);
|
||||
return path.join(dir, "inbound-dedupe.json");
|
||||
}
|
||||
|
||||
const auth = {
|
||||
accountId: "ops",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "token",
|
||||
deviceId: "DEVICE",
|
||||
} as const;
|
||||
|
||||
it("persists committed events across restarts", async () => {
|
||||
const storagePath = createStoragePath();
|
||||
const first = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
});
|
||||
|
||||
expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$event-1" })).toBe(true);
|
||||
await first.commitEvent({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$event-1",
|
||||
});
|
||||
await first.stop();
|
||||
|
||||
const second = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
});
|
||||
expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$event-1" })).toBe(false);
|
||||
});
|
||||
|
||||
it("does not persist released pending claims", async () => {
|
||||
const storagePath = createStoragePath();
|
||||
const first = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
});
|
||||
|
||||
expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$event-2" })).toBe(true);
|
||||
first.releaseEvent({ roomId: "!room:example.org", eventId: "$event-2" });
|
||||
await first.stop();
|
||||
|
||||
const second = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
});
|
||||
expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$event-2" })).toBe(true);
|
||||
});
|
||||
|
||||
it("prunes expired and overflowed entries on load", async () => {
|
||||
const storagePath = createStoragePath();
|
||||
fs.writeFileSync(
|
||||
storagePath,
|
||||
JSON.stringify({
|
||||
version: 1,
|
||||
entries: [
|
||||
{ key: "!room:example.org|$old", ts: 10 },
|
||||
{ key: "!room:example.org|$keep-1", ts: 90 },
|
||||
{ key: "!room:example.org|$keep-2", ts: 95 },
|
||||
{ key: "!room:example.org|$keep-3", ts: 100 },
|
||||
],
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const deduper = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
ttlMs: 20,
|
||||
maxEntries: 2,
|
||||
nowMs: () => 100,
|
||||
});
|
||||
|
||||
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$old" })).toBe(true);
|
||||
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-1" })).toBe(true);
|
||||
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-2" })).toBe(false);
|
||||
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-3" })).toBe(false);
|
||||
});
|
||||
|
||||
it("retains replayed backlog events based on processing time", async () => {
|
||||
const storagePath = createStoragePath();
|
||||
let now = 100;
|
||||
const first = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
ttlMs: 20,
|
||||
nowMs: () => now,
|
||||
});
|
||||
|
||||
expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(true);
|
||||
await first.commitEvent({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$backlog",
|
||||
});
|
||||
await first.stop();
|
||||
|
||||
now = 110;
|
||||
const second = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath,
|
||||
ttlMs: 20,
|
||||
nowMs: () => now,
|
||||
});
|
||||
expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(false);
|
||||
});
|
||||
|
||||
it("treats stop persistence failures as best-effort cleanup", async () => {
|
||||
const blockingPath = createStoragePath();
|
||||
fs.writeFileSync(blockingPath, "blocking file", "utf8");
|
||||
const deduper = await createMatrixInboundEventDeduper({
|
||||
auth: auth as never,
|
||||
storagePath: path.join(blockingPath, "nested", "inbound-dedupe.json"),
|
||||
});
|
||||
|
||||
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$persist-fail" })).toBe(
|
||||
true,
|
||||
);
|
||||
await deduper.commitEvent({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$persist-fail",
|
||||
});
|
||||
|
||||
await expect(deduper.stop()).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
285
extensions/matrix/src/matrix/monitor/inbound-dedupe.ts
Normal file
285
extensions/matrix/src/matrix/monitor/inbound-dedupe.ts
Normal file
@ -0,0 +1,285 @@
|
||||
import path from "node:path";
|
||||
import { readJsonFileWithFallback, writeJsonFileAtomically } from "../../runtime-api.js";
|
||||
import { resolveMatrixStoragePaths } from "../client/storage.js";
|
||||
import type { MatrixAuth } from "../client/types.js";
|
||||
import { LogService } from "../sdk/logger.js";
|
||||
|
||||
const INBOUND_DEDUPE_FILENAME = "inbound-dedupe.json";
|
||||
const STORE_VERSION = 1;
|
||||
const DEFAULT_MAX_ENTRIES = 20_000;
|
||||
const DEFAULT_TTL_MS = 30 * 24 * 60 * 60 * 1000;
|
||||
const PERSIST_DEBOUNCE_MS = 250;
|
||||
|
||||
type StoredMatrixInboundDedupeEntry = {
|
||||
key: string;
|
||||
ts: number;
|
||||
};
|
||||
|
||||
type StoredMatrixInboundDedupeState = {
|
||||
version: number;
|
||||
entries: StoredMatrixInboundDedupeEntry[];
|
||||
};
|
||||
|
||||
export type MatrixInboundEventDeduper = {
|
||||
claimEvent: (params: { roomId: string; eventId: string }) => boolean;
|
||||
commitEvent: (params: { roomId: string; eventId: string }) => Promise<void>;
|
||||
releaseEvent: (params: { roomId: string; eventId: string }) => void;
|
||||
flush: () => Promise<void>;
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
|
||||
function createAsyncLock() {
|
||||
let lock: Promise<void> = Promise.resolve();
|
||||
return async function withLock<T>(fn: () => Promise<T>): Promise<T> {
|
||||
const previous = lock;
|
||||
let release: (() => void) | undefined;
|
||||
lock = new Promise<void>((resolve) => {
|
||||
release = resolve;
|
||||
});
|
||||
await previous;
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
release?.();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeEventPart(value: string): string {
|
||||
return value.trim();
|
||||
}
|
||||
|
||||
function buildEventKey(params: { roomId: string; eventId: string }): string {
|
||||
const roomId = normalizeEventPart(params.roomId);
|
||||
const eventId = normalizeEventPart(params.eventId);
|
||||
return roomId && eventId ? `${roomId}|${eventId}` : "";
|
||||
}
|
||||
|
||||
function resolveInboundDedupeStatePath(params: {
|
||||
auth: MatrixAuth;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
stateDir?: string;
|
||||
}): string {
|
||||
const storagePaths = resolveMatrixStoragePaths({
|
||||
homeserver: params.auth.homeserver,
|
||||
userId: params.auth.userId,
|
||||
accessToken: params.auth.accessToken,
|
||||
accountId: params.auth.accountId,
|
||||
deviceId: params.auth.deviceId,
|
||||
env: params.env,
|
||||
stateDir: params.stateDir,
|
||||
});
|
||||
return path.join(storagePaths.rootDir, INBOUND_DEDUPE_FILENAME);
|
||||
}
|
||||
|
||||
function normalizeTimestamp(raw: unknown): number | null {
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
return null;
|
||||
}
|
||||
return Math.max(0, Math.floor(raw));
|
||||
}
|
||||
|
||||
function pruneSeenEvents(params: {
|
||||
seen: Map<string, number>;
|
||||
ttlMs: number;
|
||||
maxEntries: number;
|
||||
nowMs: number;
|
||||
}) {
|
||||
const { seen, ttlMs, maxEntries, nowMs } = params;
|
||||
if (ttlMs > 0) {
|
||||
const cutoff = nowMs - ttlMs;
|
||||
for (const [key, ts] of seen) {
|
||||
if (ts < cutoff) {
|
||||
seen.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
const max = Math.max(0, Math.floor(maxEntries));
|
||||
if (max <= 0) {
|
||||
seen.clear();
|
||||
return;
|
||||
}
|
||||
while (seen.size > max) {
|
||||
const oldestKey = seen.keys().next().value;
|
||||
if (typeof oldestKey !== "string") {
|
||||
break;
|
||||
}
|
||||
seen.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
|
||||
function toStoredState(params: {
|
||||
seen: Map<string, number>;
|
||||
ttlMs: number;
|
||||
maxEntries: number;
|
||||
nowMs: number;
|
||||
}): StoredMatrixInboundDedupeState {
|
||||
pruneSeenEvents(params);
|
||||
return {
|
||||
version: STORE_VERSION,
|
||||
entries: Array.from(params.seen.entries()).map(([key, ts]) => ({ key, ts })),
|
||||
};
|
||||
}
|
||||
|
||||
async function readStoredState(
|
||||
storagePath: string,
|
||||
): Promise<StoredMatrixInboundDedupeState | null> {
|
||||
const { value } = await readJsonFileWithFallback<StoredMatrixInboundDedupeState | null>(
|
||||
storagePath,
|
||||
null,
|
||||
);
|
||||
if (value?.version !== STORE_VERSION || !Array.isArray(value.entries)) {
|
||||
return null;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
export async function createMatrixInboundEventDeduper(params: {
|
||||
auth: MatrixAuth;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
stateDir?: string;
|
||||
storagePath?: string;
|
||||
ttlMs?: number;
|
||||
maxEntries?: number;
|
||||
nowMs?: () => number;
|
||||
}): Promise<MatrixInboundEventDeduper> {
|
||||
const nowMs = params.nowMs ?? (() => Date.now());
|
||||
const ttlMs =
|
||||
typeof params.ttlMs === "number" && Number.isFinite(params.ttlMs)
|
||||
? Math.max(0, Math.floor(params.ttlMs))
|
||||
: DEFAULT_TTL_MS;
|
||||
const maxEntries =
|
||||
typeof params.maxEntries === "number" && Number.isFinite(params.maxEntries)
|
||||
? Math.max(0, Math.floor(params.maxEntries))
|
||||
: DEFAULT_MAX_ENTRIES;
|
||||
const storagePath =
|
||||
params.storagePath ??
|
||||
resolveInboundDedupeStatePath({
|
||||
auth: params.auth,
|
||||
env: params.env,
|
||||
stateDir: params.stateDir,
|
||||
});
|
||||
|
||||
const seen = new Map<string, number>();
|
||||
const pending = new Set<string>();
|
||||
const persistLock = createAsyncLock();
|
||||
|
||||
try {
|
||||
const stored = await readStoredState(storagePath);
|
||||
for (const entry of stored?.entries ?? []) {
|
||||
if (!entry || typeof entry.key !== "string") {
|
||||
continue;
|
||||
}
|
||||
const key = entry.key.trim();
|
||||
const ts = normalizeTimestamp(entry.ts);
|
||||
if (!key || ts === null) {
|
||||
continue;
|
||||
}
|
||||
seen.set(key, ts);
|
||||
}
|
||||
pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() });
|
||||
} catch (err) {
|
||||
LogService.warn("MatrixInboundDedupe", "Failed loading Matrix inbound dedupe store:", err);
|
||||
}
|
||||
|
||||
let dirty = false;
|
||||
let persistTimer: NodeJS.Timeout | null = null;
|
||||
let persistPromise: Promise<void> | null = null;
|
||||
|
||||
const persist = async () => {
|
||||
dirty = false;
|
||||
const payload = toStoredState({
|
||||
seen,
|
||||
ttlMs,
|
||||
maxEntries,
|
||||
nowMs: nowMs(),
|
||||
});
|
||||
try {
|
||||
await persistLock(async () => {
|
||||
await writeJsonFileAtomically(storagePath, payload);
|
||||
});
|
||||
} catch (err) {
|
||||
dirty = true;
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
|
||||
const flush = async (): Promise<void> => {
|
||||
if (persistTimer) {
|
||||
clearTimeout(persistTimer);
|
||||
persistTimer = null;
|
||||
}
|
||||
while (dirty || persistPromise) {
|
||||
if (dirty && !persistPromise) {
|
||||
persistPromise = persist().finally(() => {
|
||||
persistPromise = null;
|
||||
});
|
||||
}
|
||||
await persistPromise;
|
||||
}
|
||||
};
|
||||
|
||||
const schedulePersist = () => {
|
||||
dirty = true;
|
||||
if (persistTimer) {
|
||||
return;
|
||||
}
|
||||
persistTimer = setTimeout(() => {
|
||||
persistTimer = null;
|
||||
void flush().catch((err) => {
|
||||
LogService.warn(
|
||||
"MatrixInboundDedupe",
|
||||
"Failed persisting Matrix inbound dedupe store:",
|
||||
err,
|
||||
);
|
||||
});
|
||||
}, PERSIST_DEBOUNCE_MS);
|
||||
persistTimer.unref?.();
|
||||
};
|
||||
|
||||
return {
|
||||
claimEvent: ({ roomId, eventId }) => {
|
||||
const key = buildEventKey({ roomId, eventId });
|
||||
if (!key) {
|
||||
return true;
|
||||
}
|
||||
pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() });
|
||||
if (seen.has(key) || pending.has(key)) {
|
||||
return false;
|
||||
}
|
||||
pending.add(key);
|
||||
return true;
|
||||
},
|
||||
commitEvent: async ({ roomId, eventId }) => {
|
||||
const key = buildEventKey({ roomId, eventId });
|
||||
if (!key) {
|
||||
return;
|
||||
}
|
||||
pending.delete(key);
|
||||
const ts = nowMs();
|
||||
seen.delete(key);
|
||||
seen.set(key, ts);
|
||||
pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() });
|
||||
schedulePersist();
|
||||
},
|
||||
releaseEvent: ({ roomId, eventId }) => {
|
||||
const key = buildEventKey({ roomId, eventId });
|
||||
if (!key) {
|
||||
return;
|
||||
}
|
||||
pending.delete(key);
|
||||
},
|
||||
flush,
|
||||
stop: async () => {
|
||||
try {
|
||||
await flush();
|
||||
} catch (err) {
|
||||
LogService.warn(
|
||||
"MatrixInboundDedupe",
|
||||
"Failed to flush Matrix inbound dedupe store during stop():",
|
||||
err,
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
@ -5,9 +5,18 @@ const hoisted = vi.hoisted(() => {
|
||||
const state = {
|
||||
startClientError: null as Error | null,
|
||||
};
|
||||
const inboundDeduper = {
|
||||
claimEvent: vi.fn(() => true),
|
||||
commitEvent: vi.fn(async () => undefined),
|
||||
releaseEvent: vi.fn(),
|
||||
flush: vi.fn(async () => undefined),
|
||||
stop: vi.fn(async () => undefined),
|
||||
};
|
||||
const client = {
|
||||
id: "matrix-client",
|
||||
hasPersistedSyncState: vi.fn(() => false),
|
||||
stopSyncWithoutPersist: vi.fn(),
|
||||
drainPendingDecryptions: vi.fn(async () => undefined),
|
||||
};
|
||||
const createMatrixRoomMessageHandler = vi.fn(() => vi.fn());
|
||||
const resolveTextChunkLimit = vi.fn<
|
||||
@ -26,7 +35,9 @@ const hoisted = vi.hoisted(() => {
|
||||
callOrder,
|
||||
client,
|
||||
createMatrixRoomMessageHandler,
|
||||
inboundDeduper,
|
||||
logger,
|
||||
registeredOnRoomMessage: null as null | ((roomId: string, event: unknown) => Promise<void>),
|
||||
releaseSharedClientInstance,
|
||||
resolveTextChunkLimit,
|
||||
setActiveMatrixClient,
|
||||
@ -181,15 +192,22 @@ vi.mock("./direct.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./events.js", () => ({
|
||||
registerMatrixMonitorEvents: vi.fn(() => {
|
||||
registerMatrixMonitorEvents: vi.fn(
|
||||
(params: { onRoomMessage: (roomId: string, event: unknown) => Promise<void> }) => {
|
||||
hoisted.callOrder.push("register-events");
|
||||
}),
|
||||
hoisted.registeredOnRoomMessage = params.onRoomMessage;
|
||||
},
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock("./handler.js", () => ({
|
||||
createMatrixRoomMessageHandler: hoisted.createMatrixRoomMessageHandler,
|
||||
}));
|
||||
|
||||
vi.mock("./inbound-dedupe.js", () => ({
|
||||
createMatrixInboundEventDeduper: vi.fn(async () => hoisted.inboundDeduper),
|
||||
}));
|
||||
|
||||
vi.mock("./legacy-crypto-restore.js", () => ({
|
||||
maybeRestoreLegacyMatrixBackup: vi.fn(),
|
||||
}));
|
||||
@ -214,9 +232,17 @@ describe("monitorMatrixProvider", () => {
|
||||
hoisted.state.startClientError = null;
|
||||
hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000);
|
||||
hoisted.releaseSharedClientInstance.mockReset().mockResolvedValue(true);
|
||||
hoisted.registeredOnRoomMessage = null;
|
||||
hoisted.setActiveMatrixClient.mockReset();
|
||||
hoisted.stopThreadBindingManager.mockReset();
|
||||
hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false);
|
||||
hoisted.client.stopSyncWithoutPersist.mockReset();
|
||||
hoisted.client.drainPendingDecryptions.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.inboundDeduper.claimEvent.mockReset().mockReturnValue(true);
|
||||
hoisted.inboundDeduper.commitEvent.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.inboundDeduper.releaseEvent.mockReset();
|
||||
hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
|
||||
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
|
||||
});
|
||||
@ -278,4 +304,77 @@ describe("monitorMatrixProvider", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("stops sync, drains decryptions, then waits for in-flight handlers before persisting", async () => {
|
||||
const { monitorMatrixProvider } = await import("./index.js");
|
||||
const abortController = new AbortController();
|
||||
let resolveHandler: (() => void) | null = null;
|
||||
|
||||
hoisted.createMatrixRoomMessageHandler.mockReturnValue(
|
||||
vi.fn(() => {
|
||||
hoisted.callOrder.push("handler-start");
|
||||
return new Promise<void>((resolve) => {
|
||||
resolveHandler = () => {
|
||||
hoisted.callOrder.push("handler-done");
|
||||
resolve();
|
||||
};
|
||||
});
|
||||
}),
|
||||
);
|
||||
hoisted.client.stopSyncWithoutPersist.mockImplementation(() => {
|
||||
hoisted.callOrder.push("pause-client");
|
||||
});
|
||||
hoisted.client.drainPendingDecryptions.mockImplementation(async () => {
|
||||
hoisted.callOrder.push("drain-decrypts");
|
||||
});
|
||||
hoisted.stopThreadBindingManager.mockImplementation(() => {
|
||||
hoisted.callOrder.push("stop-manager");
|
||||
});
|
||||
hoisted.releaseSharedClientInstance.mockImplementation(async () => {
|
||||
hoisted.callOrder.push("release-client");
|
||||
return true;
|
||||
});
|
||||
hoisted.inboundDeduper.stop.mockImplementation(async () => {
|
||||
hoisted.callOrder.push("stop-deduper");
|
||||
});
|
||||
|
||||
const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal });
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.callOrder).toContain("start-client");
|
||||
});
|
||||
const onRoomMessage = hoisted.registeredOnRoomMessage;
|
||||
if (!onRoomMessage) {
|
||||
throw new Error("expected room message handler to be registered");
|
||||
}
|
||||
|
||||
const roomMessagePromise = onRoomMessage("!room:example.org", { event_id: "$event" });
|
||||
abortController.abort();
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.callOrder).toContain("pause-client");
|
||||
});
|
||||
expect(hoisted.callOrder).not.toContain("stop-deduper");
|
||||
|
||||
if (resolveHandler === null) {
|
||||
throw new Error("expected in-flight handler to be pending");
|
||||
}
|
||||
(resolveHandler as () => void)();
|
||||
await roomMessagePromise;
|
||||
await monitorPromise;
|
||||
|
||||
expect(hoisted.callOrder.indexOf("pause-client")).toBeLessThan(
|
||||
hoisted.callOrder.indexOf("drain-decrypts"),
|
||||
);
|
||||
expect(hoisted.callOrder.indexOf("drain-decrypts")).toBeLessThan(
|
||||
hoisted.callOrder.indexOf("handler-done"),
|
||||
);
|
||||
expect(hoisted.callOrder.indexOf("handler-done")).toBeLessThan(
|
||||
hoisted.callOrder.indexOf("stop-manager"),
|
||||
);
|
||||
expect(hoisted.callOrder.indexOf("stop-manager")).toBeLessThan(
|
||||
hoisted.callOrder.indexOf("stop-deduper"),
|
||||
);
|
||||
expect(hoisted.callOrder.indexOf("stop-deduper")).toBeLessThan(
|
||||
hoisted.callOrder.indexOf("release-client"),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@ -25,6 +25,7 @@ import { resolveMatrixMonitorConfig } from "./config.js";
|
||||
import { createDirectRoomTracker } from "./direct.js";
|
||||
import { registerMatrixMonitorEvents } from "./events.js";
|
||||
import { createMatrixRoomMessageHandler } from "./handler.js";
|
||||
import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js";
|
||||
import { createMatrixRoomInfoResolver } from "./room-info.js";
|
||||
import { runMatrixStartupMaintenance } from "./startup.js";
|
||||
|
||||
@ -136,15 +137,29 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
setActiveMatrixClient(client, auth.accountId);
|
||||
let cleanedUp = false;
|
||||
let threadBindingManager: { accountId: string; stop: () => void } | null = null;
|
||||
const inboundDeduper = await createMatrixInboundEventDeduper({
|
||||
auth,
|
||||
env: process.env,
|
||||
});
|
||||
const inFlightRoomMessages = new Set<Promise<void>>();
|
||||
const waitForInFlightRoomMessages = async () => {
|
||||
while (inFlightRoomMessages.size > 0) {
|
||||
await Promise.allSettled(Array.from(inFlightRoomMessages));
|
||||
}
|
||||
};
|
||||
const cleanup = async () => {
|
||||
if (cleanedUp) {
|
||||
return;
|
||||
}
|
||||
cleanedUp = true;
|
||||
try {
|
||||
client.stopSyncWithoutPersist();
|
||||
await client.drainPendingDecryptions("matrix monitor shutdown");
|
||||
await waitForInFlightRoomMessages();
|
||||
threadBindingManager?.stop();
|
||||
} finally {
|
||||
await inboundDeduper.stop();
|
||||
await releaseSharedClientInstance(client, "persist");
|
||||
} finally {
|
||||
setActiveMatrixClient(null, auth.accountId);
|
||||
}
|
||||
};
|
||||
@ -219,11 +234,19 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
dropPreStartupMessages,
|
||||
inboundDeduper,
|
||||
directTracker,
|
||||
getRoomInfo,
|
||||
getMemberDisplayName,
|
||||
needsRoomAliasesForConfig,
|
||||
});
|
||||
const trackRoomMessage = (roomId: string, event: Parameters<typeof handleRoomMessage>[1]) => {
|
||||
const task = Promise.resolve(handleRoomMessage(roomId, event)).finally(() => {
|
||||
inFlightRoomMessages.delete(task);
|
||||
});
|
||||
inFlightRoomMessages.add(task);
|
||||
return task;
|
||||
};
|
||||
|
||||
try {
|
||||
threadBindingManager = await createMatrixThreadBindingManager({
|
||||
@ -249,7 +272,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
warnedCryptoMissingRooms,
|
||||
logger,
|
||||
formatNativeDependencyHint: core.system.formatNativeDependencyHint,
|
||||
onRoomMessage: handleRoomMessage,
|
||||
onRoomMessage: trackRoomMessage,
|
||||
});
|
||||
|
||||
// Register Matrix thread bindings before the client starts syncing so threaded
|
||||
|
||||
@ -684,6 +684,52 @@ describe("MatrixClient event bridge", () => {
|
||||
expect(delivered).toEqual(["m.room.message"]);
|
||||
});
|
||||
|
||||
it("can drain pending decrypt retries after sync stops", async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||
const delivered: string[] = [];
|
||||
|
||||
client.on("room.message", (_roomId, event) => {
|
||||
delivered.push(event.type);
|
||||
});
|
||||
|
||||
const encrypted = new FakeMatrixEvent({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$event",
|
||||
sender: "@alice:example.org",
|
||||
type: "m.room.encrypted",
|
||||
ts: Date.now(),
|
||||
content: {},
|
||||
decryptionFailure: true,
|
||||
});
|
||||
const decrypted = new FakeMatrixEvent({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$event",
|
||||
sender: "@alice:example.org",
|
||||
type: "m.room.message",
|
||||
ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "hello",
|
||||
},
|
||||
});
|
||||
|
||||
matrixJsClient.decryptEventIfNeeded = vi.fn(async () => {
|
||||
encrypted.emit("decrypted", decrypted);
|
||||
});
|
||||
|
||||
await client.start();
|
||||
matrixJsClient.emit("event", encrypted);
|
||||
encrypted.emit("decrypted", encrypted, new Error("missing room key"));
|
||||
|
||||
client.stopSyncWithoutPersist();
|
||||
await client.drainPendingDecryptions("test shutdown");
|
||||
|
||||
expect(matrixJsClient.stopClient).toHaveBeenCalledTimes(1);
|
||||
expect(matrixJsClient.decryptEventIfNeeded).toHaveBeenCalledTimes(1);
|
||||
expect(delivered).toEqual(["m.room.message"]);
|
||||
});
|
||||
|
||||
it("retries failed decryptions immediately on crypto key update signals", async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, {
|
||||
|
||||
@ -365,11 +365,21 @@ export class MatrixClient {
|
||||
await this.startSyncSession({ bootstrapCrypto: false });
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
stopSyncWithoutPersist(): void {
|
||||
if (this.idbPersistTimer) {
|
||||
clearInterval(this.idbPersistTimer);
|
||||
this.idbPersistTimer = null;
|
||||
}
|
||||
this.client.stopClient();
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
async drainPendingDecryptions(reason = "matrix client shutdown"): Promise<void> {
|
||||
await this.decryptBridge.drainPendingDecryptions(reason);
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
this.stopSyncWithoutPersist();
|
||||
this.decryptBridge.stop();
|
||||
// Final persist on shutdown
|
||||
this.syncStore?.markCleanShutdown();
|
||||
@ -380,8 +390,6 @@ export class MatrixClient {
|
||||
}).catch(noop),
|
||||
this.syncStore?.flush().catch(noop),
|
||||
]).then(() => undefined);
|
||||
this.client.stopClient();
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
async stopAndPersist(): Promise<void> {
|
||||
|
||||
@ -51,6 +51,8 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
||||
private readonly decryptedMessageDedupe = new Map<string, number>();
|
||||
private readonly decryptRetries = new Map<string, MatrixDecryptRetryState>();
|
||||
private readonly failedDecryptionsNotified = new Set<string>();
|
||||
private activeRetryRuns = 0;
|
||||
private readonly retryIdleResolvers = new Set<() => void>();
|
||||
private cryptoRetrySignalsBound = false;
|
||||
|
||||
constructor(
|
||||
@ -139,6 +141,22 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
||||
}
|
||||
}
|
||||
|
||||
async drainPendingDecryptions(reason: string): Promise<void> {
|
||||
for (let attempts = 0; attempts < MATRIX_DECRYPT_RETRY_MAX_ATTEMPTS; attempts += 1) {
|
||||
if (this.decryptRetries.size === 0) {
|
||||
return;
|
||||
}
|
||||
this.retryPendingNow(reason);
|
||||
await this.waitForActiveRetryRunsToFinish();
|
||||
const hasPendingRetryTimers = Array.from(this.decryptRetries.values()).some(
|
||||
(state) => state.timer || state.inFlight,
|
||||
);
|
||||
if (!hasPendingRetryTimers) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleEncryptedEventDecrypted(params: {
|
||||
roomId: string;
|
||||
encryptedEvent: MatrixEvent;
|
||||
@ -246,9 +264,12 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
||||
|
||||
state.inFlight = true;
|
||||
state.timer = null;
|
||||
this.activeRetryRuns += 1;
|
||||
const canDecrypt = typeof this.deps.client.decryptEventIfNeeded === "function";
|
||||
if (!canDecrypt) {
|
||||
this.clearDecryptRetry(retryKey);
|
||||
this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1);
|
||||
this.resolveRetryIdleIfNeeded();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -260,8 +281,13 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
||||
// Retry with backoff until we hit the configured retry cap.
|
||||
} finally {
|
||||
state.inFlight = false;
|
||||
this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1);
|
||||
this.resolveRetryIdleIfNeeded();
|
||||
}
|
||||
|
||||
if (this.decryptRetries.get(retryKey) !== state) {
|
||||
return;
|
||||
}
|
||||
if (isDecryptionFailure(state.event)) {
|
||||
this.scheduleDecryptRetry(state);
|
||||
return;
|
||||
@ -304,4 +330,27 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
||||
this.decryptedMessageDedupe.delete(oldest);
|
||||
}
|
||||
}
|
||||
|
||||
private async waitForActiveRetryRunsToFinish(): Promise<void> {
|
||||
if (this.activeRetryRuns === 0) {
|
||||
return;
|
||||
}
|
||||
await new Promise<void>((resolve) => {
|
||||
this.retryIdleResolvers.add(resolve);
|
||||
if (this.activeRetryRuns === 0) {
|
||||
this.retryIdleResolvers.delete(resolve);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private resolveRetryIdleIfNeeded(): void {
|
||||
if (this.activeRetryRuns !== 0) {
|
||||
return;
|
||||
}
|
||||
for (const resolve of this.retryIdleResolvers) {
|
||||
resolve();
|
||||
}
|
||||
this.retryIdleResolvers.clear();
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user