openclaw/src/auto-reply/reply/block-reply-pipeline.ts
Mathias Nagler e8a162d3d8
fix(mattermost): prevent duplicate messages when block streaming + threading are active (#41362)
* fix(mattermost): prevent duplicate messages when block streaming + threading are active

Remove replyToId from createBlockReplyPayloadKey so identical content is
deduplicated regardless of threading target. Add explicit threading dock
to the Mattermost plugin with resolveReplyToMode reading from config
(default "all"), and add replyToMode to the Mattermost config schema.

Fixes #41219

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(mattermost): address PR review — per-account replyToMode and test clarity

Read replyToMode from the merged per-account config via
resolveMattermostAccount so account-level overrides are honored in
multi-account setups. Add replyToMode to MattermostAccountConfig type.
Rename misleading test to clarify it exercises shouldDropFinalPayloads
short-circuit, not payload key dedup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Replies: keep block-pipeline reply targets distinct

* Tests: cover block reply target-aware dedupe

* Update CHANGELOG.md

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-03-12 03:15:17 -04:00

262 lines
7.4 KiB
TypeScript

import { logVerbose } from "../../globals.js";
import type { ReplyPayload } from "../types.js";
import { createBlockReplyCoalescer } from "./block-reply-coalescer.js";
import type { BlockStreamingCoalescing } from "./block-streaming.js";
export type BlockReplyPipeline = {
enqueue: (payload: ReplyPayload) => void;
flush: (options?: { force?: boolean }) => Promise<void>;
stop: () => void;
hasBuffered: () => boolean;
didStream: () => boolean;
isAborted: () => boolean;
hasSentPayload: (payload: ReplyPayload) => boolean;
};
export type BlockReplyBuffer = {
shouldBuffer: (payload: ReplyPayload) => boolean;
onEnqueue?: (payload: ReplyPayload) => void;
finalize?: (payload: ReplyPayload) => ReplyPayload;
};
export function createAudioAsVoiceBuffer(params: {
isAudioPayload: (payload: ReplyPayload) => boolean;
}): BlockReplyBuffer {
let seenAudioAsVoice = false;
return {
onEnqueue: (payload) => {
if (payload.audioAsVoice) {
seenAudioAsVoice = true;
}
},
shouldBuffer: (payload) => params.isAudioPayload(payload),
finalize: (payload) => (seenAudioAsVoice ? { ...payload, audioAsVoice: true } : payload),
};
}
export function createBlockReplyPayloadKey(payload: ReplyPayload): string {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
return JSON.stringify({
text,
mediaList,
replyToId: payload.replyToId ?? null,
});
}
export function createBlockReplyContentKey(payload: ReplyPayload): string {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
// Content-only key used for final-payload suppression after block streaming.
// This intentionally ignores replyToId so a streamed threaded payload and the
// later final payload still collapse when they carry the same content.
return JSON.stringify({ text, mediaList });
}
const withTimeout = async <T>(
promise: Promise<T>,
timeoutMs: number,
timeoutError: Error,
): Promise<T> => {
if (!timeoutMs || timeoutMs <= 0) {
return promise;
}
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(timeoutError), timeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
};
export function createBlockReplyPipeline(params: {
onBlockReply: (
payload: ReplyPayload,
options?: { abortSignal?: AbortSignal; timeoutMs?: number },
) => Promise<void> | void;
timeoutMs: number;
coalescing?: BlockStreamingCoalescing;
buffer?: BlockReplyBuffer;
}): BlockReplyPipeline {
const { onBlockReply, timeoutMs, coalescing, buffer } = params;
const sentKeys = new Set<string>();
const sentContentKeys = new Set<string>();
const pendingKeys = new Set<string>();
const seenKeys = new Set<string>();
const bufferedKeys = new Set<string>();
const bufferedPayloadKeys = new Set<string>();
const bufferedPayloads: ReplyPayload[] = [];
let sendChain: Promise<void> = Promise.resolve();
let aborted = false;
let didStream = false;
let didLogTimeout = false;
const sendPayload = (payload: ReplyPayload, bypassSeenCheck: boolean = false) => {
if (aborted) {
return;
}
const payloadKey = createBlockReplyPayloadKey(payload);
const contentKey = createBlockReplyContentKey(payload);
if (!bypassSeenCheck) {
if (seenKeys.has(payloadKey)) {
return;
}
seenKeys.add(payloadKey);
}
if (sentKeys.has(payloadKey) || pendingKeys.has(payloadKey)) {
return;
}
pendingKeys.add(payloadKey);
const timeoutError = new Error(`block reply delivery timed out after ${timeoutMs}ms`);
const abortController = new AbortController();
sendChain = sendChain
.then(async () => {
if (aborted) {
return false;
}
await withTimeout(
Promise.resolve(
onBlockReply(payload, {
abortSignal: abortController.signal,
timeoutMs,
}),
),
timeoutMs,
timeoutError,
);
return true;
})
.then((didSend) => {
if (!didSend) {
return;
}
sentKeys.add(payloadKey);
sentContentKeys.add(contentKey);
didStream = true;
})
.catch((err) => {
if (err === timeoutError) {
abortController.abort();
aborted = true;
if (!didLogTimeout) {
didLogTimeout = true;
logVerbose(
`block reply delivery timed out after ${timeoutMs}ms; skipping remaining block replies to preserve ordering`,
);
}
return;
}
logVerbose(`block reply delivery failed: ${String(err)}`);
})
.finally(() => {
pendingKeys.delete(payloadKey);
});
};
const coalescer = coalescing
? createBlockReplyCoalescer({
config: coalescing,
shouldAbort: () => aborted,
onFlush: (payload) => {
bufferedKeys.clear();
sendPayload(payload, /* bypassSeenCheck */ true);
},
})
: null;
const bufferPayload = (payload: ReplyPayload) => {
buffer?.onEnqueue?.(payload);
if (!buffer?.shouldBuffer(payload)) {
return false;
}
const payloadKey = createBlockReplyPayloadKey(payload);
if (
seenKeys.has(payloadKey) ||
sentKeys.has(payloadKey) ||
pendingKeys.has(payloadKey) ||
bufferedPayloadKeys.has(payloadKey)
) {
return true;
}
seenKeys.add(payloadKey);
bufferedPayloadKeys.add(payloadKey);
bufferedPayloads.push(payload);
return true;
};
const flushBuffered = () => {
if (!bufferedPayloads.length) {
return;
}
for (const payload of bufferedPayloads) {
const finalPayload = buffer?.finalize?.(payload) ?? payload;
sendPayload(finalPayload, /* bypassSeenCheck */ true);
}
bufferedPayloads.length = 0;
bufferedPayloadKeys.clear();
};
const enqueue = (payload: ReplyPayload) => {
if (aborted) {
return;
}
if (bufferPayload(payload)) {
return;
}
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
if (hasMedia) {
void coalescer?.flush({ force: true });
sendPayload(payload, /* bypassSeenCheck */ false);
return;
}
if (coalescer) {
const payloadKey = createBlockReplyPayloadKey(payload);
if (seenKeys.has(payloadKey) || pendingKeys.has(payloadKey) || bufferedKeys.has(payloadKey)) {
return;
}
seenKeys.add(payloadKey);
bufferedKeys.add(payloadKey);
coalescer.enqueue(payload);
return;
}
sendPayload(payload, /* bypassSeenCheck */ false);
};
const flush = async (options?: { force?: boolean }) => {
await coalescer?.flush(options);
flushBuffered();
await sendChain;
};
const stop = () => {
coalescer?.stop();
};
return {
enqueue,
flush,
stop,
hasBuffered: () => Boolean(coalescer?.hasBuffered() || bufferedPayloads.length > 0),
didStream: () => didStream,
isAborted: () => aborted,
hasSentPayload: (payload) => {
const payloadKey = createBlockReplyContentKey(payload);
return sentContentKeys.has(payloadKey);
},
};
}