fix(reply): prevent cross-run duplicate resend after message tool sends

This commit is contained in:
Kim 2026-02-27 11:13:54 +08:00 committed by KimGLee
parent 598f1826d8
commit d71a01d0ea
4 changed files with 97 additions and 3 deletions

View File

@ -414,6 +414,42 @@ export async function runReplyAgent(params: {
const payloadArray = runResult.payloads ?? [];
const sentTexts = runResult.messagingToolSentTexts ?? [];
const sentMediaUrls = runResult.messagingToolSentMediaUrls ?? [];
const sentTargets = runResult.messagingToolSentTargets ?? [];
if (activeSessionEntry) {
const now = Date.now();
if (sentTexts.length || sentMediaUrls.length || sentTargets.length) {
activeSessionEntry.lastMessagingToolSentAt = now;
activeSessionEntry.lastMessagingToolSentTexts = sentTexts;
activeSessionEntry.lastMessagingToolSentMediaUrls = sentMediaUrls;
activeSessionEntry.lastMessagingToolSentTargets = sentTargets;
} else if (
typeof activeSessionEntry.lastMessagingToolSentAt === "number" &&
now - activeSessionEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS
) {
delete activeSessionEntry.lastMessagingToolSentAt;
delete activeSessionEntry.lastMessagingToolSentTexts;
delete activeSessionEntry.lastMessagingToolSentMediaUrls;
delete activeSessionEntry.lastMessagingToolSentTargets;
}
if (sessionKey && activeSessionStore) {
activeSessionStore[sessionKey] = activeSessionEntry;
}
if (sessionKey && storePath) {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async () => ({
lastMessagingToolSentAt: activeSessionEntry.lastMessagingToolSentAt,
lastMessagingToolSentTexts: activeSessionEntry.lastMessagingToolSentTexts,
lastMessagingToolSentMediaUrls: activeSessionEntry.lastMessagingToolSentMediaUrls,
lastMessagingToolSentTargets: activeSessionEntry.lastMessagingToolSentTargets,
}),
});
}
}
if (blockReplyPipeline) {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();

View File

@ -422,6 +422,32 @@ describe("createFollowupRunner messaging tool dedupe", () => {
expect(onBlockReply).not.toHaveBeenCalled();
});
it("suppresses replies using recent session-level messaging-tool dedupe state", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner(baseQueuedRun());
expect(onBlockReply).not.toHaveBeenCalled();
});
it("drops media URL from payload when messaging tool already sent it", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {

View File

@ -38,6 +38,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000;
export function createFollowupRunner(params: {
opts?: GetReplyOptions;
typing: TypingController;
@ -316,20 +318,37 @@ export function createFollowupRunner(params: {
replyToChannel,
});
const now = Date.now();
const recentWindowActive =
typeof sessionEntry?.lastMessagingToolSentAt === "number" &&
now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS;
const sentTexts = [
...(runResult.messagingToolSentTexts ?? []),
...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []),
];
const sentMediaUrls = [
...(runResult.messagingToolSentMediaUrls ?? []),
...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []),
];
const sentTargets = [
...(runResult.messagingToolSentTargets ?? []),
...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTargets ?? []) : []),
];
const dedupedPayloads = filterMessagingToolDuplicates({
payloads: replyTaggedPayloads,
sentTexts: runResult.messagingToolSentTexts ?? [],
sentTexts,
});
const mediaFilteredPayloads = filterMessagingToolMediaDuplicates({
payloads: dedupedPayloads,
sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [],
sentMediaUrls,
});
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
messageProvider: resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
}),
messagingToolSentTargets: runResult.messagingToolSentTargets,
messagingToolSentTargets: sentTargets,
originatingTo: resolveOriginMessageTo({
originatingTo: queued.originatingTo,
}),

View File

@ -73,6 +73,19 @@ export type SessionEntry = {
lastHeartbeatText?: string;
/** Timestamp (ms) when lastHeartbeatText was delivered. */
lastHeartbeatSentAt?: number;
/** Timestamp (ms) for the most recent message-tool send fingerprint (cross-run dedupe). */
lastMessagingToolSentAt?: number;
/** Recently sent message-tool text payloads for short-window cross-run dedupe. */
lastMessagingToolSentTexts?: string[];
/** Recently sent message-tool media urls for short-window cross-run dedupe. */
lastMessagingToolSentMediaUrls?: string[];
/** Recently sent message-tool routing targets for short-window cross-run dedupe. */
lastMessagingToolSentTargets?: Array<{
tool?: string;
provider?: string;
to?: string;
accountId?: string;
}>;
sessionId: string;
updatedAt: number;
sessionFile?: string;