diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index fbdad1be160..4cdae88142d 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -414,6 +414,42 @@ export async function runReplyAgent(params: { const payloadArray = runResult.payloads ?? []; + const sentTexts = runResult.messagingToolSentTexts ?? []; + const sentMediaUrls = runResult.messagingToolSentMediaUrls ?? []; + const sentTargets = runResult.messagingToolSentTargets ?? []; + if (activeSessionEntry) { + const now = Date.now(); + if (sentTexts.length || sentMediaUrls.length || sentTargets.length) { + activeSessionEntry.lastMessagingToolSentAt = now; + activeSessionEntry.lastMessagingToolSentTexts = sentTexts; + activeSessionEntry.lastMessagingToolSentMediaUrls = sentMediaUrls; + activeSessionEntry.lastMessagingToolSentTargets = sentTargets; + } else if ( + typeof activeSessionEntry.lastMessagingToolSentAt === "number" && + now - activeSessionEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS + ) { + delete activeSessionEntry.lastMessagingToolSentAt; + delete activeSessionEntry.lastMessagingToolSentTexts; + delete activeSessionEntry.lastMessagingToolSentMediaUrls; + delete activeSessionEntry.lastMessagingToolSentTargets; + } + if (sessionKey && activeSessionStore) { + activeSessionStore[sessionKey] = activeSessionEntry; + } + if (sessionKey && storePath) { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + lastMessagingToolSentAt: activeSessionEntry.lastMessagingToolSentAt, + lastMessagingToolSentTexts: activeSessionEntry.lastMessagingToolSentTexts, + lastMessagingToolSentMediaUrls: activeSessionEntry.lastMessagingToolSentMediaUrls, + lastMessagingToolSentTargets: activeSessionEntry.lastMessagingToolSentTargets, + }), + }); + } + } + if (blockReplyPipeline) { await blockReplyPipeline.flush({ force: true }); blockReplyPipeline.stop(); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..067b2713cc9 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -422,6 +422,32 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); + it("suppresses replies using recent session-level messaging-tool dedupe state", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner(baseQueuedRun()); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + it("drops media URL from payload when messaging tool already sent it", async () => { const { onBlockReply } = await runMessagingCase({ agentResult: { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..cf7384fd3bc 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -38,6 +38,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; +const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000; + export function createFollowupRunner(params: { opts?: GetReplyOptions; typing: TypingController; @@ -316,20 +318,37 @@ export function createFollowupRunner(params: { replyToChannel, }); + const now = Date.now(); + const recentWindowActive = + typeof sessionEntry?.lastMessagingToolSentAt === "number" && + now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; + const sentTexts = [ + ...(runResult.messagingToolSentTexts ?? []), + ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), + ]; + const sentMediaUrls = [ + ...(runResult.messagingToolSentMediaUrls ?? []), + ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), + ]; + const sentTargets = [ + ...(runResult.messagingToolSentTargets ?? []), + ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTargets ?? []) : []), + ]; + const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, - sentTexts: runResult.messagingToolSentTexts ?? [], + sentTexts, }); const mediaFilteredPayloads = filterMessagingToolMediaDuplicates({ payloads: dedupedPayloads, - sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [], + sentMediaUrls, }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ messageProvider: resolveOriginMessageProvider({ originatingChannel: queued.originatingChannel, provider: queued.run.messageProvider, }), - messagingToolSentTargets: runResult.messagingToolSentTargets, + messagingToolSentTargets: sentTargets, originatingTo: resolveOriginMessageTo({ originatingTo: queued.originatingTo, }), diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 6513fc81b37..64e43ecd2ff 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -73,6 +73,19 @@ export type SessionEntry = { lastHeartbeatText?: string; /** Timestamp (ms) when lastHeartbeatText was delivered. */ lastHeartbeatSentAt?: number; + /** Timestamp (ms) for the most recent message-tool send fingerprint (cross-run dedupe). */ + lastMessagingToolSentAt?: number; + /** Recently sent message-tool text payloads for short-window cross-run dedupe. */ + lastMessagingToolSentTexts?: string[]; + /** Recently sent message-tool media urls for short-window cross-run dedupe. */ + lastMessagingToolSentMediaUrls?: string[]; + /** Recently sent message-tool routing targets for short-window cross-run dedupe. */ + lastMessagingToolSentTargets?: Array<{ + tool?: string; + provider?: string; + to?: string; + accountId?: string; + }>; sessionId: string; updatedAt: number; sessionFile?: string;