From 91ae4468f509964b09ab0dd3c43a78e3482157d0 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:22:46 +0800 Subject: [PATCH] fix(reply): scope cross-run dedupe to matching routing target --- src/auto-reply/reply/followup-runner.test.ts | 335 +++++++++++-------- src/auto-reply/reply/followup-runner.ts | 41 ++- 2 files changed, 224 insertions(+), 152 deletions(-) diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 067b2713cc9..add48e3a129 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -79,10 +79,6 @@ function mockCompactionRun(params: { ); } -function createAsyncReplySpy() { - return vi.fn(async () => {}); -} - describe("createFollowupRunner compaction", () => { it("adds verbose auto-compaction notice and tracks count", async () => { const storePath = path.join( @@ -321,97 +317,95 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); } - async function runMessagingCase(params: { - agentResult: Record; - queued?: FollowupRun; - runnerOverrides?: Partial<{ - sessionEntry: SessionEntry; - sessionStore: Record; - sessionKey: string; - storePath: string; - }>; - }) { - const onBlockReply = createAsyncReplySpy(); - runEmbeddedPiAgentMock.mockResolvedValueOnce({ - meta: {}, - ...params.agentResult, - }); - const runner = createMessagingDedupeRunner(onBlockReply, params.runnerOverrides); - await runner(params.queued ?? baseQueuedRun()); - return { onBlockReply }; - } - - function makeTextReplyDedupeResult(overrides?: Record) { - return { - payloads: [{ text: "hello world!" }], - messagingToolSentTexts: ["different message"], - ...overrides, - }; - } - it("drops payloads already sent via messaging tool", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ text: "hello world!" }], - messagingToolSentTexts: ["hello world!"], - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["hello world!"], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", }); expect(onBlockReply).not.toHaveBeenCalled(); }); it("delivers payloads when not duplicates", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: makeTextReplyDedupeResult(), + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + expect(onBlockReply).toHaveBeenCalledTimes(1); }); it("suppresses replies when a messaging tool sent via the same provider + target", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], - }, - queued: baseQueuedRun("slack"), + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun("slack")); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("suppresses replies when provider is synthetic but originating channel matches", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], - }, - queued: { - ...baseQueuedRun("heartbeat"), - originatingChannel: "telegram", - originatingTo: "268300329", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + } as FollowupRun); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("does not suppress replies for same target when account differs", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [ - { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, - ], - }, - queued: { - ...baseQueuedRun("heartbeat"), - originatingChannel: "telegram", - originatingTo: "268300329", - originatingAccountId: "personal", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [ + { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, + ], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + originatingAccountId: "personal", + } as FollowupRun); + expect(routeReplyMock).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", @@ -429,6 +423,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { updatedAt: Date.now(), lastMessagingToolSentAt: Date.now(), lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], }; const sessionStore: Record = { main: sessionEntry }; @@ -443,31 +438,69 @@ describe("createFollowupRunner messaging tool dedupe", () => { sessionKey: "main", }); - await runner(baseQueuedRun()); + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); expect(onBlockReply).not.toHaveBeenCalled(); }); - it("drops media URL from payload when messaging tool already sent it", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ mediaUrl: "/tmp/img.png" }], - messagingToolSentMediaUrls: ["/tmp/img.png"], - }, + it("does not use session-level text dedupe when recent target does not match", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "999" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner(baseQueuedRun("telegram")); + + expect(onBlockReply).toHaveBeenCalled(); + }); + + it("drops media URL from payload when messaging tool already sent it", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ mediaUrl: "/tmp/img.png" }], + messagingToolSentMediaUrls: ["/tmp/img.png"], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + // Media stripped → payload becomes non-renderable → not delivered. expect(onBlockReply).not.toHaveBeenCalled(); }); it("delivers media payload when not a duplicate", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ mediaUrl: "/tmp/img.png" }], - messagingToolSentMediaUrls: ["/tmp/other.png"], - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ mediaUrl: "/tmp/img.png" }], + messagingToolSentMediaUrls: ["/tmp/other.png"], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + expect(onBlockReply).toHaveBeenCalledTimes(1); }); @@ -481,28 +514,30 @@ describe("createFollowupRunner messaging tool dedupe", () => { const sessionStore: Record = { [sessionKey]: sessionEntry }; await saveSessionStore(storePath, sessionStore); - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], - meta: { - agentMeta: { - usage: { input: 1_000, output: 50 }, - lastCallUsage: { input: 400, output: 20 }, - model: "claude-opus-4-5", - provider: "anthropic", - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: { + agentMeta: { + usage: { input: 1_000, output: 50 }, + lastCallUsage: { input: 400, output: 20 }, + model: "claude-opus-4-5", + provider: "anthropic", }, }, - runnerOverrides: { - sessionEntry, - sessionStore, - sessionKey, - storePath, - }, - queued: baseQueuedRun("slack"), }); + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + + await runner(baseQueuedRun("slack")); + expect(onBlockReply).not.toHaveBeenCalled(); const store = loadSessionStore(storePath, { skipCache: true }); // totalTokens should reflect the last call usage snapshot, not the accumulated input. @@ -572,36 +607,46 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); it("does not fall back to dispatcher when cross-channel origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); routeReplyMock.mockResolvedValueOnce({ ok: false, error: "forced route failure", }); - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun("webchat"), - originatingChannel: "discord", - originatingTo: "channel:C1", - } as FollowupRun, - }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + } as FollowupRun); expect(routeReplyMock).toHaveBeenCalled(); expect(onBlockReply).not.toHaveBeenCalled(); }); it("falls back to dispatcher when same-channel origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); routeReplyMock.mockResolvedValueOnce({ ok: false, error: "outbound adapter unavailable", }); - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun(" Feishu "), - originatingChannel: "FEISHU", - originatingTo: "ou_abc123", - } as FollowupRun, - }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun(" Feishu "), + originatingChannel: "FEISHU", + originatingTo: "ou_abc123", + } as FollowupRun); expect(routeReplyMock).toHaveBeenCalled(); expect(onBlockReply).toHaveBeenCalledTimes(1); @@ -609,17 +654,22 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); it("routes followups with originating account/thread metadata", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun("webchat"), - originatingChannel: "discord", - originatingTo: "channel:C1", - originatingAccountId: "work", - originatingThreadId: "1739142736.000100", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + originatingAccountId: "work", + originatingThreadId: "1739142736.000100", + } as FollowupRun); + expect(routeReplyMock).toHaveBeenCalledWith( expect.objectContaining({ channel: "discord", @@ -633,37 +683,44 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); describe("createFollowupRunner typing cleanup", () => { - async function runTypingCase(agentResult: Record) { + it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => { const typing = createMockTypingController(); runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "NO_REPLY" }], meta: {}, - ...agentResult, }); const runner = createFollowupRunner({ - opts: { onBlockReply: createAsyncReplySpy() }, + opts: { onBlockReply: vi.fn(async () => {}) }, typing, typingMode: "instant", defaultModel: "anthropic/claude-opus-4-5", }); await runner(baseQueuedRun()); - return typing; - } - function expectTypingCleanup(typing: ReturnType) { expect(typing.markRunComplete).toHaveBeenCalled(); expect(typing.markDispatchIdle).toHaveBeenCalled(); - } - - it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => { - const typing = await runTypingCase({ payloads: [{ text: "NO_REPLY" }] }); - expectTypingCleanup(typing); }); it("calls both markRunComplete and markDispatchIdle on empty payloads", async () => { - const typing = await runTypingCase({ payloads: [] }); - expectTypingCleanup(typing); + const typing = createMockTypingController(); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing, + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner(baseQueuedRun()); + + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); it("calls both markRunComplete and markDispatchIdle on agent error", async () => { @@ -679,7 +736,8 @@ describe("createFollowupRunner typing cleanup", () => { await runner(baseQueuedRun()); - expectTypingCleanup(typing); + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); it("calls both markRunComplete and markDispatchIdle on successful delivery", async () => { @@ -700,7 +758,8 @@ describe("createFollowupRunner typing cleanup", () => { await runner(baseQueuedRun()); expect(onBlockReply).toHaveBeenCalled(); - expectTypingCleanup(typing); + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index e8ea958f978..a8f0222d090 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -318,20 +318,41 @@ export function createFollowupRunner(params: { replyToChannel, }); + const messageProvider = resolveOriginMessageProvider({ + originatingChannel: queued.originatingChannel, + provider: queued.run.messageProvider, + }); + const originatingTo = resolveOriginMessageTo({ + originatingTo: queued.originatingTo, + }); + const originAccountId = resolveOriginAccountId({ + originatingAccountId: queued.originatingAccountId, + accountId: queued.run.agentAccountId, + }); + const now = Date.now(); const recentWindowActive = typeof sessionEntry?.lastMessagingToolSentAt === "number" && now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; + const recentTargetMatch = + recentWindowActive && + shouldSuppressMessagingToolReplies({ + messageProvider, + messagingToolSentTargets: sessionEntry?.lastMessagingToolSentTargets, + originatingTo, + accountId: originAccountId, + }); + const sentTexts = [ ...(runResult.messagingToolSentTexts ?? []), - ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), + ...(recentTargetMatch ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), ]; const sentMediaUrls = [ ...(runResult.messagingToolSentMediaUrls ?? []), - ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), + ...(recentTargetMatch ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), ]; // Keep target-based suppression scoped to the current run only. - // Session-level dedupe state is used for text/media duplicate filtering. + // Session-level dedupe state is used for text/media duplicate filtering when target matches. const sentTargets = runResult.messagingToolSentTargets ?? []; const dedupedPayloads = filterMessagingToolDuplicates({ @@ -343,18 +364,10 @@ export function createFollowupRunner(params: { sentMediaUrls, }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ - messageProvider: resolveOriginMessageProvider({ - originatingChannel: queued.originatingChannel, - provider: queued.run.messageProvider, - }), + messageProvider, messagingToolSentTargets: sentTargets, - originatingTo: resolveOriginMessageTo({ - originatingTo: queued.originatingTo, - }), - accountId: resolveOriginAccountId({ - originatingAccountId: queued.originatingAccountId, - accountId: queued.run.agentAccountId, - }), + originatingTo, + accountId: originAccountId, }); const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;