From d71a01d0ead60af4d881c0fa7931c46845ac41a4 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 11:13:54 +0800 Subject: [PATCH 01/15] fix(reply): prevent cross-run duplicate resend after message tool sends --- src/auto-reply/reply/agent-runner.ts | 36 ++++++++++++++++++++ src/auto-reply/reply/followup-runner.test.ts | 26 ++++++++++++++ src/auto-reply/reply/followup-runner.ts | 25 ++++++++++++-- src/config/sessions/types.ts | 13 +++++++ 4 files changed, 97 insertions(+), 3 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index fbdad1be160..4cdae88142d 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -414,6 +414,42 @@ export async function runReplyAgent(params: { const payloadArray = runResult.payloads ?? []; + const sentTexts = runResult.messagingToolSentTexts ?? []; + const sentMediaUrls = runResult.messagingToolSentMediaUrls ?? []; + const sentTargets = runResult.messagingToolSentTargets ?? []; + if (activeSessionEntry) { + const now = Date.now(); + if (sentTexts.length || sentMediaUrls.length || sentTargets.length) { + activeSessionEntry.lastMessagingToolSentAt = now; + activeSessionEntry.lastMessagingToolSentTexts = sentTexts; + activeSessionEntry.lastMessagingToolSentMediaUrls = sentMediaUrls; + activeSessionEntry.lastMessagingToolSentTargets = sentTargets; + } else if ( + typeof activeSessionEntry.lastMessagingToolSentAt === "number" && + now - activeSessionEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS + ) { + delete activeSessionEntry.lastMessagingToolSentAt; + delete activeSessionEntry.lastMessagingToolSentTexts; + delete activeSessionEntry.lastMessagingToolSentMediaUrls; + delete activeSessionEntry.lastMessagingToolSentTargets; + } + if (sessionKey && activeSessionStore) { + activeSessionStore[sessionKey] = activeSessionEntry; + } + if (sessionKey && storePath) { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + lastMessagingToolSentAt: activeSessionEntry.lastMessagingToolSentAt, + lastMessagingToolSentTexts: activeSessionEntry.lastMessagingToolSentTexts, + lastMessagingToolSentMediaUrls: activeSessionEntry.lastMessagingToolSentMediaUrls, + lastMessagingToolSentTargets: activeSessionEntry.lastMessagingToolSentTargets, + }), + }); + } + } + if (blockReplyPipeline) { await blockReplyPipeline.flush({ force: true }); blockReplyPipeline.stop(); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..067b2713cc9 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -422,6 +422,32 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); + it("suppresses replies using recent session-level messaging-tool dedupe state", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner(baseQueuedRun()); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + it("drops media URL from payload when messaging tool already sent it", async () => { const { onBlockReply } = await runMessagingCase({ agentResult: { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..cf7384fd3bc 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -38,6 +38,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; +const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000; + export function createFollowupRunner(params: { opts?: GetReplyOptions; typing: TypingController; @@ -316,20 +318,37 @@ export function createFollowupRunner(params: { replyToChannel, }); + const now = Date.now(); + const recentWindowActive = + typeof sessionEntry?.lastMessagingToolSentAt === "number" && + now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; + const sentTexts = [ + ...(runResult.messagingToolSentTexts ?? []), + ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), + ]; + const sentMediaUrls = [ + ...(runResult.messagingToolSentMediaUrls ?? []), + ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), + ]; + const sentTargets = [ + ...(runResult.messagingToolSentTargets ?? []), + ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTargets ?? []) : []), + ]; + const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, - sentTexts: runResult.messagingToolSentTexts ?? [], + sentTexts, }); const mediaFilteredPayloads = filterMessagingToolMediaDuplicates({ payloads: dedupedPayloads, - sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [], + sentMediaUrls, }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ messageProvider: resolveOriginMessageProvider({ originatingChannel: queued.originatingChannel, provider: queued.run.messageProvider, }), - messagingToolSentTargets: runResult.messagingToolSentTargets, + messagingToolSentTargets: sentTargets, originatingTo: resolveOriginMessageTo({ originatingTo: queued.originatingTo, }), diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 6513fc81b37..64e43ecd2ff 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -73,6 +73,19 @@ export type SessionEntry = { lastHeartbeatText?: string; /** Timestamp (ms) when lastHeartbeatText was delivered. */ lastHeartbeatSentAt?: number; + /** Timestamp (ms) for the most recent message-tool send fingerprint (cross-run dedupe). */ + lastMessagingToolSentAt?: number; + /** Recently sent message-tool text payloads for short-window cross-run dedupe. */ + lastMessagingToolSentTexts?: string[]; + /** Recently sent message-tool media urls for short-window cross-run dedupe. */ + lastMessagingToolSentMediaUrls?: string[]; + /** Recently sent message-tool routing targets for short-window cross-run dedupe. */ + lastMessagingToolSentTargets?: Array<{ + tool?: string; + provider?: string; + to?: string; + accountId?: string; + }>; sessionId: string; updatedAt: number; sessionFile?: string; From a1b5386693f3e112fc105cc98ea46fecff81cc6c Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 11:20:49 +0800 Subject: [PATCH 02/15] fix(reply): define dedupe window constant in agent-runner --- src/auto-reply/reply/agent-runner.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 4cdae88142d..3d89eabb193 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -58,6 +58,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; +const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000; + const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; export async function runReplyAgent(params: { From 15dd43f6bc46a9da58606c176cd4b5e9cbef70dd Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 11:26:41 +0800 Subject: [PATCH 03/15] fix(reply): address CI type errors for cross-run dedupe state --- src/auto-reply/reply/agent-runner.ts | 33 ++++++++++++++-------------- src/config/sessions/types.ts | 8 ++----- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 3d89eabb193..a7d3131107a 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -419,34 +419,35 @@ export async function runReplyAgent(params: { const sentTexts = runResult.messagingToolSentTexts ?? []; const sentMediaUrls = runResult.messagingToolSentMediaUrls ?? []; const sentTargets = runResult.messagingToolSentTargets ?? []; - if (activeSessionEntry) { + const sessionDedupeEntry = activeSessionEntry; + if (sessionDedupeEntry) { const now = Date.now(); if (sentTexts.length || sentMediaUrls.length || sentTargets.length) { - activeSessionEntry.lastMessagingToolSentAt = now; - activeSessionEntry.lastMessagingToolSentTexts = sentTexts; - activeSessionEntry.lastMessagingToolSentMediaUrls = sentMediaUrls; - activeSessionEntry.lastMessagingToolSentTargets = sentTargets; + sessionDedupeEntry.lastMessagingToolSentAt = now; + sessionDedupeEntry.lastMessagingToolSentTexts = sentTexts; + sessionDedupeEntry.lastMessagingToolSentMediaUrls = sentMediaUrls; + sessionDedupeEntry.lastMessagingToolSentTargets = sentTargets; } else if ( - typeof activeSessionEntry.lastMessagingToolSentAt === "number" && - now - activeSessionEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS + typeof sessionDedupeEntry.lastMessagingToolSentAt === "number" && + now - sessionDedupeEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS ) { - delete activeSessionEntry.lastMessagingToolSentAt; - delete activeSessionEntry.lastMessagingToolSentTexts; - delete activeSessionEntry.lastMessagingToolSentMediaUrls; - delete activeSessionEntry.lastMessagingToolSentTargets; + delete sessionDedupeEntry.lastMessagingToolSentAt; + delete sessionDedupeEntry.lastMessagingToolSentTexts; + delete sessionDedupeEntry.lastMessagingToolSentMediaUrls; + delete sessionDedupeEntry.lastMessagingToolSentTargets; } if (sessionKey && activeSessionStore) { - activeSessionStore[sessionKey] = activeSessionEntry; + activeSessionStore[sessionKey] = sessionDedupeEntry; } if (sessionKey && storePath) { await updateSessionStoreEntry({ storePath, sessionKey, update: async () => ({ - lastMessagingToolSentAt: activeSessionEntry.lastMessagingToolSentAt, - lastMessagingToolSentTexts: activeSessionEntry.lastMessagingToolSentTexts, - lastMessagingToolSentMediaUrls: activeSessionEntry.lastMessagingToolSentMediaUrls, - lastMessagingToolSentTargets: activeSessionEntry.lastMessagingToolSentTargets, + lastMessagingToolSentAt: sessionDedupeEntry.lastMessagingToolSentAt, + lastMessagingToolSentTexts: sessionDedupeEntry.lastMessagingToolSentTexts, + lastMessagingToolSentMediaUrls: sessionDedupeEntry.lastMessagingToolSentMediaUrls, + lastMessagingToolSentTargets: sessionDedupeEntry.lastMessagingToolSentTargets, }), }); } diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 64e43ecd2ff..e133b4ae300 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -1,5 +1,6 @@ import crypto from "node:crypto"; import type { Skill } from "@mariozechner/pi-coding-agent"; +import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js"; import type { ChatType } from "../../channels/chat-type.js"; import type { ChannelId } from "../../channels/plugins/types.js"; import type { DeliveryContext } from "../../utils/delivery-context.js"; @@ -80,12 +81,7 @@ export type SessionEntry = { /** Recently sent message-tool media urls for short-window cross-run dedupe. */ lastMessagingToolSentMediaUrls?: string[]; /** Recently sent message-tool routing targets for short-window cross-run dedupe. */ - lastMessagingToolSentTargets?: Array<{ - tool?: string; - provider?: string; - to?: string; - accountId?: string; - }>; + lastMessagingToolSentTargets?: MessagingToolSend[]; sessionId: string; updatedAt: number; sessionFile?: string; From a1e6f0f943cc1654030c3f21d7f30327cc9906cf Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 13:51:30 +0800 Subject: [PATCH 04/15] fix(reply): narrow cross-run suppression and harden dedupe-state persistence --- src/auto-reply/reply/agent-runner.ts | 28 ++++++++++++++++--------- src/auto-reply/reply/followup-runner.ts | 7 +++---- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index a7d3131107a..ecc08a8a7d5 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -440,16 +440,24 @@ export async function runReplyAgent(params: { activeSessionStore[sessionKey] = sessionDedupeEntry; } if (sessionKey && storePath) { - await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async () => ({ - lastMessagingToolSentAt: sessionDedupeEntry.lastMessagingToolSentAt, - lastMessagingToolSentTexts: sessionDedupeEntry.lastMessagingToolSentTexts, - lastMessagingToolSentMediaUrls: sessionDedupeEntry.lastMessagingToolSentMediaUrls, - lastMessagingToolSentTargets: sessionDedupeEntry.lastMessagingToolSentTargets, - }), - }); + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + lastMessagingToolSentAt: sessionDedupeEntry.lastMessagingToolSentAt, + lastMessagingToolSentTexts: sessionDedupeEntry.lastMessagingToolSentTexts, + lastMessagingToolSentMediaUrls: sessionDedupeEntry.lastMessagingToolSentMediaUrls, + lastMessagingToolSentTargets: sessionDedupeEntry.lastMessagingToolSentTargets, + }), + }); + } catch (error) { + logger.warning( + "Failed to persist messaging-tool dedupe state for session {}: {}", + sessionKey, + error, + ); + } } } diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index cf7384fd3bc..e8ea958f978 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -330,10 +330,9 @@ export function createFollowupRunner(params: { ...(runResult.messagingToolSentMediaUrls ?? []), ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), ]; - const sentTargets = [ - ...(runResult.messagingToolSentTargets ?? []), - ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTargets ?? []) : []), - ]; + // Keep target-based suppression scoped to the current run only. + // Session-level dedupe state is used for text/media duplicate filtering. + const sentTargets = runResult.messagingToolSentTargets ?? []; const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, From 3792269b5e194e8228a4fda8ff080b04bc3fc929 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 13:56:39 +0800 Subject: [PATCH 05/15] fix(reply): avoid undefined logger in dedupe-state persistence warning --- src/auto-reply/reply/agent-runner.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index ecc08a8a7d5..c80704f4cb9 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -452,8 +452,8 @@ export async function runReplyAgent(params: { }), }); } catch (error) { - logger.warning( - "Failed to persist messaging-tool dedupe state for session {}: {}", + console.warn( + "Failed to persist messaging-tool dedupe state for session", sessionKey, error, ); From 91ae4468f509964b09ab0dd3c43a78e3482157d0 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:22:46 +0800 Subject: [PATCH 06/15] fix(reply): scope cross-run dedupe to matching routing target --- src/auto-reply/reply/followup-runner.test.ts | 335 +++++++++++-------- src/auto-reply/reply/followup-runner.ts | 41 ++- 2 files changed, 224 insertions(+), 152 deletions(-) diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 067b2713cc9..add48e3a129 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -79,10 +79,6 @@ function mockCompactionRun(params: { ); } -function createAsyncReplySpy() { - return vi.fn(async () => {}); -} - describe("createFollowupRunner compaction", () => { it("adds verbose auto-compaction notice and tracks count", async () => { const storePath = path.join( @@ -321,97 +317,95 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); } - async function runMessagingCase(params: { - agentResult: Record; - queued?: FollowupRun; - runnerOverrides?: Partial<{ - sessionEntry: SessionEntry; - sessionStore: Record; - sessionKey: string; - storePath: string; - }>; - }) { - const onBlockReply = createAsyncReplySpy(); - runEmbeddedPiAgentMock.mockResolvedValueOnce({ - meta: {}, - ...params.agentResult, - }); - const runner = createMessagingDedupeRunner(onBlockReply, params.runnerOverrides); - await runner(params.queued ?? baseQueuedRun()); - return { onBlockReply }; - } - - function makeTextReplyDedupeResult(overrides?: Record) { - return { - payloads: [{ text: "hello world!" }], - messagingToolSentTexts: ["different message"], - ...overrides, - }; - } - it("drops payloads already sent via messaging tool", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ text: "hello world!" }], - messagingToolSentTexts: ["hello world!"], - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["hello world!"], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", }); expect(onBlockReply).not.toHaveBeenCalled(); }); it("delivers payloads when not duplicates", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: makeTextReplyDedupeResult(), + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + expect(onBlockReply).toHaveBeenCalledTimes(1); }); it("suppresses replies when a messaging tool sent via the same provider + target", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], - }, - queued: baseQueuedRun("slack"), + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun("slack")); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("suppresses replies when provider is synthetic but originating channel matches", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], - }, - queued: { - ...baseQueuedRun("heartbeat"), - originatingChannel: "telegram", - originatingTo: "268300329", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + } as FollowupRun); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("does not suppress replies for same target when account differs", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [ - { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, - ], - }, - queued: { - ...baseQueuedRun("heartbeat"), - originatingChannel: "telegram", - originatingTo: "268300329", - originatingAccountId: "personal", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [ + { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, + ], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + originatingAccountId: "personal", + } as FollowupRun); + expect(routeReplyMock).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", @@ -429,6 +423,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { updatedAt: Date.now(), lastMessagingToolSentAt: Date.now(), lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], }; const sessionStore: Record = { main: sessionEntry }; @@ -443,31 +438,69 @@ describe("createFollowupRunner messaging tool dedupe", () => { sessionKey: "main", }); - await runner(baseQueuedRun()); + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); expect(onBlockReply).not.toHaveBeenCalled(); }); - it("drops media URL from payload when messaging tool already sent it", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ mediaUrl: "/tmp/img.png" }], - messagingToolSentMediaUrls: ["/tmp/img.png"], - }, + it("does not use session-level text dedupe when recent target does not match", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "999" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner(baseQueuedRun("telegram")); + + expect(onBlockReply).toHaveBeenCalled(); + }); + + it("drops media URL from payload when messaging tool already sent it", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ mediaUrl: "/tmp/img.png" }], + messagingToolSentMediaUrls: ["/tmp/img.png"], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + // Media stripped → payload becomes non-renderable → not delivered. expect(onBlockReply).not.toHaveBeenCalled(); }); it("delivers media payload when not a duplicate", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ mediaUrl: "/tmp/img.png" }], - messagingToolSentMediaUrls: ["/tmp/other.png"], - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ mediaUrl: "/tmp/img.png" }], + messagingToolSentMediaUrls: ["/tmp/other.png"], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + expect(onBlockReply).toHaveBeenCalledTimes(1); }); @@ -481,28 +514,30 @@ describe("createFollowupRunner messaging tool dedupe", () => { const sessionStore: Record = { [sessionKey]: sessionEntry }; await saveSessionStore(storePath, sessionStore); - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], - meta: { - agentMeta: { - usage: { input: 1_000, output: 50 }, - lastCallUsage: { input: 400, output: 20 }, - model: "claude-opus-4-5", - provider: "anthropic", - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: { + agentMeta: { + usage: { input: 1_000, output: 50 }, + lastCallUsage: { input: 400, output: 20 }, + model: "claude-opus-4-5", + provider: "anthropic", }, }, - runnerOverrides: { - sessionEntry, - sessionStore, - sessionKey, - storePath, - }, - queued: baseQueuedRun("slack"), }); + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + + await runner(baseQueuedRun("slack")); + expect(onBlockReply).not.toHaveBeenCalled(); const store = loadSessionStore(storePath, { skipCache: true }); // totalTokens should reflect the last call usage snapshot, not the accumulated input. @@ -572,36 +607,46 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); it("does not fall back to dispatcher when cross-channel origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); routeReplyMock.mockResolvedValueOnce({ ok: false, error: "forced route failure", }); - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun("webchat"), - originatingChannel: "discord", - originatingTo: "channel:C1", - } as FollowupRun, - }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + } as FollowupRun); expect(routeReplyMock).toHaveBeenCalled(); expect(onBlockReply).not.toHaveBeenCalled(); }); it("falls back to dispatcher when same-channel origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); routeReplyMock.mockResolvedValueOnce({ ok: false, error: "outbound adapter unavailable", }); - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun(" Feishu "), - originatingChannel: "FEISHU", - originatingTo: "ou_abc123", - } as FollowupRun, - }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun(" Feishu "), + originatingChannel: "FEISHU", + originatingTo: "ou_abc123", + } as FollowupRun); expect(routeReplyMock).toHaveBeenCalled(); expect(onBlockReply).toHaveBeenCalledTimes(1); @@ -609,17 +654,22 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); it("routes followups with originating account/thread metadata", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun("webchat"), - originatingChannel: "discord", - originatingTo: "channel:C1", - originatingAccountId: "work", - originatingThreadId: "1739142736.000100", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + originatingAccountId: "work", + originatingThreadId: "1739142736.000100", + } as FollowupRun); + expect(routeReplyMock).toHaveBeenCalledWith( expect.objectContaining({ channel: "discord", @@ -633,37 +683,44 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); describe("createFollowupRunner typing cleanup", () => { - async function runTypingCase(agentResult: Record) { + it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => { const typing = createMockTypingController(); runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "NO_REPLY" }], meta: {}, - ...agentResult, }); const runner = createFollowupRunner({ - opts: { onBlockReply: createAsyncReplySpy() }, + opts: { onBlockReply: vi.fn(async () => {}) }, typing, typingMode: "instant", defaultModel: "anthropic/claude-opus-4-5", }); await runner(baseQueuedRun()); - return typing; - } - function expectTypingCleanup(typing: ReturnType) { expect(typing.markRunComplete).toHaveBeenCalled(); expect(typing.markDispatchIdle).toHaveBeenCalled(); - } - - it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => { - const typing = await runTypingCase({ payloads: [{ text: "NO_REPLY" }] }); - expectTypingCleanup(typing); }); it("calls both markRunComplete and markDispatchIdle on empty payloads", async () => { - const typing = await runTypingCase({ payloads: [] }); - expectTypingCleanup(typing); + const typing = createMockTypingController(); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing, + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner(baseQueuedRun()); + + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); it("calls both markRunComplete and markDispatchIdle on agent error", async () => { @@ -679,7 +736,8 @@ describe("createFollowupRunner typing cleanup", () => { await runner(baseQueuedRun()); - expectTypingCleanup(typing); + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); it("calls both markRunComplete and markDispatchIdle on successful delivery", async () => { @@ -700,7 +758,8 @@ describe("createFollowupRunner typing cleanup", () => { await runner(baseQueuedRun()); expect(onBlockReply).toHaveBeenCalled(); - expectTypingCleanup(typing); + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index e8ea958f978..a8f0222d090 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -318,20 +318,41 @@ export function createFollowupRunner(params: { replyToChannel, }); + const messageProvider = resolveOriginMessageProvider({ + originatingChannel: queued.originatingChannel, + provider: queued.run.messageProvider, + }); + const originatingTo = resolveOriginMessageTo({ + originatingTo: queued.originatingTo, + }); + const originAccountId = resolveOriginAccountId({ + originatingAccountId: queued.originatingAccountId, + accountId: queued.run.agentAccountId, + }); + const now = Date.now(); const recentWindowActive = typeof sessionEntry?.lastMessagingToolSentAt === "number" && now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; + const recentTargetMatch = + recentWindowActive && + shouldSuppressMessagingToolReplies({ + messageProvider, + messagingToolSentTargets: sessionEntry?.lastMessagingToolSentTargets, + originatingTo, + accountId: originAccountId, + }); + const sentTexts = [ ...(runResult.messagingToolSentTexts ?? []), - ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), + ...(recentTargetMatch ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), ]; const sentMediaUrls = [ ...(runResult.messagingToolSentMediaUrls ?? []), - ...(recentWindowActive ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), + ...(recentTargetMatch ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), ]; // Keep target-based suppression scoped to the current run only. - // Session-level dedupe state is used for text/media duplicate filtering. + // Session-level dedupe state is used for text/media duplicate filtering when target matches. const sentTargets = runResult.messagingToolSentTargets ?? []; const dedupedPayloads = filterMessagingToolDuplicates({ @@ -343,18 +364,10 @@ export function createFollowupRunner(params: { sentMediaUrls, }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ - messageProvider: resolveOriginMessageProvider({ - originatingChannel: queued.originatingChannel, - provider: queued.run.messageProvider, - }), + messageProvider, messagingToolSentTargets: sentTargets, - originatingTo: resolveOriginMessageTo({ - originatingTo: queued.originatingTo, - }), - accountId: resolveOriginAccountId({ - originatingAccountId: queued.originatingAccountId, - accountId: queued.run.agentAccountId, - }), + originatingTo, + accountId: originAccountId, }); const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; From b8d8e59b892cef9f8225da82e61973fce1aa72cf Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:44:30 +0800 Subject: [PATCH 07/15] fix(reply): isolate cross-run dedupe by session id and clear on reset --- src/auto-reply/reply/agent-runner.ts | 7 +++++ src/auto-reply/reply/followup-runner.test.ts | 33 ++++++++++++++++++++ src/auto-reply/reply/followup-runner.ts | 1 + src/config/sessions/types.ts | 2 ++ 4 files changed, 43 insertions(+) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index c80704f4cb9..36f0a173fa7 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -294,6 +294,11 @@ export async function runReplyAgent(params: { fallbackNoticeSelectedModel: undefined, fallbackNoticeActiveModel: undefined, fallbackNoticeReason: undefined, + lastMessagingToolSessionId: undefined, + lastMessagingToolSentAt: undefined, + lastMessagingToolSentTexts: undefined, + lastMessagingToolSentMediaUrls: undefined, + lastMessagingToolSentTargets: undefined, }; const agentId = resolveAgentIdFromSessionKey(sessionKey); const nextSessionFile = resolveSessionTranscriptPath( @@ -423,6 +428,7 @@ export async function runReplyAgent(params: { if (sessionDedupeEntry) { const now = Date.now(); if (sentTexts.length || sentMediaUrls.length || sentTargets.length) { + sessionDedupeEntry.lastMessagingToolSessionId = followupRun.run.sessionId; sessionDedupeEntry.lastMessagingToolSentAt = now; sessionDedupeEntry.lastMessagingToolSentTexts = sentTexts; sessionDedupeEntry.lastMessagingToolSentMediaUrls = sentMediaUrls; @@ -431,6 +437,7 @@ export async function runReplyAgent(params: { typeof sessionDedupeEntry.lastMessagingToolSentAt === "number" && now - sessionDedupeEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS ) { + delete sessionDedupeEntry.lastMessagingToolSessionId; delete sessionDedupeEntry.lastMessagingToolSentAt; delete sessionDedupeEntry.lastMessagingToolSentTexts; delete sessionDedupeEntry.lastMessagingToolSentMediaUrls; diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index add48e3a129..db4a086923c 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -421,6 +421,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now(), + lastMessagingToolSessionId: "session", lastMessagingToolSentAt: Date.now(), lastMessagingToolSentTexts: ["hello world!"], lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], @@ -446,6 +447,38 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); + it("does not use session-level dedupe from a previous session id", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "current-session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "old-session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + run: { ...baseQueuedRun("telegram").run, sessionId: "current-session" }, + }); + + expect(onBlockReply).toHaveBeenCalled(); + }); + it("does not use session-level text dedupe when recent target does not match", async () => { const onBlockReply = vi.fn(async () => {}); const sessionEntry: SessionEntry = { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index a8f0222d090..2793f588c94 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -333,6 +333,7 @@ export function createFollowupRunner(params: { const now = Date.now(); const recentWindowActive = typeof sessionEntry?.lastMessagingToolSentAt === "number" && + sessionEntry?.lastMessagingToolSessionId === queued.run.sessionId && now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; const recentTargetMatch = recentWindowActive && diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index e133b4ae300..06ad3a53441 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -76,6 +76,8 @@ export type SessionEntry = { lastHeartbeatSentAt?: number; /** Timestamp (ms) for the most recent message-tool send fingerprint (cross-run dedupe). */ lastMessagingToolSentAt?: number; + /** Session id that produced the most recent message-tool dedupe fingerprint. */ + lastMessagingToolSessionId?: string; /** Recently sent message-tool text payloads for short-window cross-run dedupe. */ lastMessagingToolSentTexts?: string[]; /** Recently sent message-tool media urls for short-window cross-run dedupe. */ From 0bef79bef39cc651a884b26091d07aa112045ff2 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:54:05 +0800 Subject: [PATCH 08/15] fix(reply): persist dedupe session id in session-store updates --- src/auto-reply/reply/agent-runner.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 36f0a173fa7..0ac504154a3 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -452,6 +452,7 @@ export async function runReplyAgent(params: { storePath, sessionKey, update: async () => ({ + lastMessagingToolSessionId: sessionDedupeEntry.lastMessagingToolSessionId, lastMessagingToolSentAt: sessionDedupeEntry.lastMessagingToolSentAt, lastMessagingToolSentTexts: sessionDedupeEntry.lastMessagingToolSentTexts, lastMessagingToolSentMediaUrls: sessionDedupeEntry.lastMessagingToolSentMediaUrls, From 5625e3b2c01e6f5be355da5a6b395321b5dde04a Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Tue, 3 Mar 2026 00:12:31 +0800 Subject: [PATCH 09/15] chore(ci): retrigger checks From 8ef9a77e6378b60880547a7ccc1955b8027903e8 Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Tue, 3 Mar 2026 00:16:34 +0800 Subject: [PATCH 10/15] fix(followup): scope session dedupe fingerprints to single-target runs --- src/auto-reply/reply/followup-runner.test.ts | 34 ++++++++++++++++++++ src/auto-reply/reply/followup-runner.ts | 11 +++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index db4a086923c..82f95ee5551 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -506,6 +506,40 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).toHaveBeenCalled(); }); + it("does not reuse session-level text dedupe when prior run had multiple messaging targets", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [ + { tool: "message", provider: "telegram", to: "123" }, + { tool: "message", provider: "telegram", to: "999" }, + ], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); + + expect(onBlockReply).toHaveBeenCalled(); + }); + it("drops media URL from payload when messaging tool already sent it", async () => { const onBlockReply = vi.fn(async () => {}); runEmbeddedPiAgentMock.mockResolvedValueOnce({ diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2793f588c94..9344c1e78ed 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -335,22 +335,27 @@ export function createFollowupRunner(params: { typeof sessionEntry?.lastMessagingToolSentAt === "number" && sessionEntry?.lastMessagingToolSessionId === queued.run.sessionId && now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; + const previousSentTargets = sessionEntry?.lastMessagingToolSentTargets ?? []; const recentTargetMatch = recentWindowActive && shouldSuppressMessagingToolReplies({ messageProvider, - messagingToolSentTargets: sessionEntry?.lastMessagingToolSentTargets, + messagingToolSentTargets: previousSentTargets, originatingTo, accountId: originAccountId, }); + const canReuseSessionDedupeFingerprints = + recentTargetMatch && previousSentTargets.length <= 1; const sentTexts = [ ...(runResult.messagingToolSentTexts ?? []), - ...(recentTargetMatch ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), + ...(canReuseSessionDedupeFingerprints ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), ]; const sentMediaUrls = [ ...(runResult.messagingToolSentMediaUrls ?? []), - ...(recentTargetMatch ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) : []), + ...(canReuseSessionDedupeFingerprints + ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) + : []), ]; // Keep target-based suppression scoped to the current run only. // Session-level dedupe state is used for text/media duplicate filtering when target matches. From 5c37d58fff62fff80771b865e22f5116c5373e20 Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Tue, 3 Mar 2026 00:20:54 +0800 Subject: [PATCH 11/15] style(followup): format dedupe scope changes --- src/auto-reply/reply/followup-runner.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 9344c1e78ed..021eadcf241 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -349,7 +349,9 @@ export function createFollowupRunner(params: { const sentTexts = [ ...(runResult.messagingToolSentTexts ?? []), - ...(canReuseSessionDedupeFingerprints ? (sessionEntry?.lastMessagingToolSentTexts ?? []) : []), + ...(canReuseSessionDedupeFingerprints + ? (sessionEntry?.lastMessagingToolSentTexts ?? []) + : []), ]; const sentMediaUrls = [ ...(runResult.messagingToolSentMediaUrls ?? []), From 868d766b9a47e0feac6bf0db97b0135d2bfda8f4 Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Tue, 3 Mar 2026 00:37:32 +0800 Subject: [PATCH 12/15] fix(ci): resolve tsgo regressions and target-scoped followup dedupe --- src/auto-reply/reply/followup-runner.test.ts | 34 ++++++++++++++++++++ src/auto-reply/reply/followup-runner.ts | 11 ++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 82f95ee5551..c0da2f757eb 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -540,6 +540,40 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).toHaveBeenCalled(); }); + it("reuses session-level text dedupe when prior run had repeated sends to one target", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [ + { tool: "message", provider: "telegram", to: "123" }, + { tool: "message", provider: "telegram", to: "123" }, + ], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + it("drops media URL from payload when messaging tool already sent it", async () => { const onBlockReply = vi.fn(async () => {}); runEmbeddedPiAgentMock.mockResolvedValueOnce({ diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 021eadcf241..7eceda2628f 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -344,8 +344,17 @@ export function createFollowupRunner(params: { originatingTo, accountId: originAccountId, }); + const uniquePreviousTargets = new Set( + previousSentTargets.map((target) => + JSON.stringify({ + provider: target.provider, + to: target.to ?? "", + accountId: target.accountId ?? "", + }), + ), + ); const canReuseSessionDedupeFingerprints = - recentTargetMatch && previousSentTargets.length <= 1; + recentTargetMatch && uniquePreviousTargets.size <= 1; const sentTexts = [ ...(runResult.messagingToolSentTexts ?? []), From 1faa358d00e9c006dd04cfbc92108c69462028e3 Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Tue, 3 Mar 2026 16:44:51 +0800 Subject: [PATCH 13/15] fix(followup): scope session dedupe reuse to system-generated runs --- src/auto-reply/reply/followup-runner.test.ts | 32 ++++++++++++++++++++ src/auto-reply/reply/followup-runner.ts | 3 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index c0da2f757eb..ac9634568db 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -447,6 +447,38 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); + it("does not reuse session-level dedupe fingerprints for queued user turns", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + messageId: "user-msg-123", + originatingTo: "123", + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + }); + it("does not use session-level dedupe from a previous session id", async () => { const onBlockReply = vi.fn(async () => {}); const sessionEntry: SessionEntry = { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 7eceda2628f..234161d2498 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -353,8 +353,9 @@ export function createFollowupRunner(params: { }), ), ); + const isSystemGeneratedFollowup = !queued.messageId; const canReuseSessionDedupeFingerprints = - recentTargetMatch && uniquePreviousTargets.size <= 1; + isSystemGeneratedFollowup && recentTargetMatch && uniquePreviousTargets.size <= 1; const sentTexts = [ ...(runResult.messagingToolSentTexts ?? []), From 382cf35d6a71854bf50b7a0c3657ff37ac228c6c Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Tue, 3 Mar 2026 18:21:48 +0800 Subject: [PATCH 14/15] chore(ci): retrigger windows shard timeout run --- src/cli/daemon-cli/lifecycle.test.ts | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts index f026f81399f..815ee30e158 100644 --- a/src/cli/daemon-cli/lifecycle.test.ts +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -35,16 +35,17 @@ const resolveGatewayPort = vi.fn(() => 18789); const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []); const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>(); const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", ")); -const probeGateway = vi.fn< - (opts: { - url: string; - auth?: { token?: string; password?: string }; - timeoutMs: number; - }) => Promise<{ - ok: boolean; - configSnapshot: unknown; - }> ->(); +const probeGateway = + vi.fn< + (opts: { + url: string; + auth?: { token?: string; password?: string }; + timeoutMs: number; + }) => Promise<{ + ok: boolean; + configSnapshot: unknown; + }> + >(); const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true); const loadConfig = vi.fn(() => ({})); From 84bf1d73ea29f7616eec13a41af9683cba25da6b Mon Sep 17 00:00:00 2001 From: KimGLee <05_bolster_inkling@icloud.com> Date: Sat, 21 Mar 2026 13:14:05 +0800 Subject: [PATCH 15/15] fix(telegram): guard file-ref extension set access for mocked runtimes --- extensions/telegram/src/format.ts | 34 ++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/extensions/telegram/src/format.ts b/extensions/telegram/src/format.ts index 4d14f179b2f..9134ed420e6 100644 --- a/extensions/telegram/src/format.ts +++ b/extensions/telegram/src/format.ts @@ -103,16 +103,23 @@ function escapeRegex(str: string): string { return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); } -const FILE_EXTENSIONS_PATTERN = Array.from(FILE_REF_EXTENSIONS_WITH_TLD).map(escapeRegex).join("|"); +const FILE_EXTENSIONS_PATTERN = + FILE_REF_EXTENSIONS_WITH_TLD && Symbol.iterator in FILE_REF_EXTENSIONS_WITH_TLD + ? Array.from(FILE_REF_EXTENSIONS_WITH_TLD).map(escapeRegex).join("|") + : ""; const AUTO_LINKED_ANCHOR_PATTERN = /]*>\1<\/a>/gi; -const FILE_REFERENCE_PATTERN = new RegExp( - `(^|[^a-zA-Z0-9_\\-/])([a-zA-Z0-9_.\\-./]+\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=$|[^a-zA-Z0-9_\\-/])`, - "gi", -); -const ORPHANED_TLD_PATTERN = new RegExp( - `([^a-zA-Z0-9]|^)([A-Za-z]\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=[^a-zA-Z0-9/]|$)`, - "g", -); +const FILE_REFERENCE_PATTERN = FILE_EXTENSIONS_PATTERN + ? new RegExp( + `(^|[^a-zA-Z0-9_\\-/])([a-zA-Z0-9_.\\-./]+\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=$|[^a-zA-Z0-9_\\-/])`, + "gi", + ) + : null; +const ORPHANED_TLD_PATTERN = FILE_EXTENSIONS_PATTERN + ? new RegExp( + `([^a-zA-Z0-9]|^)([A-Za-z]\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=[^a-zA-Z0-9/]|$)`, + "g", + ) + : null; const HTML_TAG_PATTERN = /(<\/?)([a-zA-Z][a-zA-Z0-9-]*)\b[^>]*?>/gi; function wrapStandaloneFileRef(match: string, prefix: string, filename: string): string { @@ -131,7 +138,14 @@ function wrapSegmentFileRefs( preDepth: number, anchorDepth: number, ): string { - if (!text || codeDepth > 0 || preDepth > 0 || anchorDepth > 0) { + if ( + !text || + !FILE_REFERENCE_PATTERN || + !ORPHANED_TLD_PATTERN || + codeDepth > 0 || + preDepth > 0 || + anchorDepth > 0 + ) { return text; } const wrappedStandalone = text.replace(FILE_REFERENCE_PATTERN, wrapStandaloneFileRef);