diff --git a/extensions/telegram/src/format.ts b/extensions/telegram/src/format.ts index 4d14f179b2f..9134ed420e6 100644 --- a/extensions/telegram/src/format.ts +++ b/extensions/telegram/src/format.ts @@ -103,16 +103,23 @@ function escapeRegex(str: string): string { return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); } -const FILE_EXTENSIONS_PATTERN = Array.from(FILE_REF_EXTENSIONS_WITH_TLD).map(escapeRegex).join("|"); +const FILE_EXTENSIONS_PATTERN = + FILE_REF_EXTENSIONS_WITH_TLD && Symbol.iterator in FILE_REF_EXTENSIONS_WITH_TLD + ? Array.from(FILE_REF_EXTENSIONS_WITH_TLD).map(escapeRegex).join("|") + : ""; const AUTO_LINKED_ANCHOR_PATTERN = /]*>\1<\/a>/gi; -const FILE_REFERENCE_PATTERN = new RegExp( - `(^|[^a-zA-Z0-9_\\-/])([a-zA-Z0-9_.\\-./]+\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=$|[^a-zA-Z0-9_\\-/])`, - "gi", -); -const ORPHANED_TLD_PATTERN = new RegExp( - `([^a-zA-Z0-9]|^)([A-Za-z]\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=[^a-zA-Z0-9/]|$)`, - "g", -); +const FILE_REFERENCE_PATTERN = FILE_EXTENSIONS_PATTERN + ? new RegExp( + `(^|[^a-zA-Z0-9_\\-/])([a-zA-Z0-9_.\\-./]+\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=$|[^a-zA-Z0-9_\\-/])`, + "gi", + ) + : null; +const ORPHANED_TLD_PATTERN = FILE_EXTENSIONS_PATTERN + ? new RegExp( + `([^a-zA-Z0-9]|^)([A-Za-z]\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=[^a-zA-Z0-9/]|$)`, + "g", + ) + : null; const HTML_TAG_PATTERN = /(<\/?)([a-zA-Z][a-zA-Z0-9-]*)\b[^>]*?>/gi; function wrapStandaloneFileRef(match: string, prefix: string, filename: string): string { @@ -131,7 +138,14 @@ function wrapSegmentFileRefs( preDepth: number, anchorDepth: number, ): string { - if (!text || codeDepth > 0 || preDepth > 0 || anchorDepth > 0) { + if ( + !text || + !FILE_REFERENCE_PATTERN || + !ORPHANED_TLD_PATTERN || + codeDepth > 0 || + preDepth > 0 || + anchorDepth > 0 + ) { return text; } const wrappedStandalone = text.replace(FILE_REFERENCE_PATTERN, wrapStandaloneFileRef); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index fbdad1be160..0ac504154a3 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -58,6 +58,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; +const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000; + const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; export async function runReplyAgent(params: { @@ -292,6 +294,11 @@ export async function runReplyAgent(params: { fallbackNoticeSelectedModel: undefined, fallbackNoticeActiveModel: undefined, fallbackNoticeReason: undefined, + lastMessagingToolSessionId: undefined, + lastMessagingToolSentAt: undefined, + lastMessagingToolSentTexts: undefined, + lastMessagingToolSentMediaUrls: undefined, + lastMessagingToolSentTargets: undefined, }; const agentId = resolveAgentIdFromSessionKey(sessionKey); const nextSessionFile = resolveSessionTranscriptPath( @@ -414,6 +421,54 @@ export async function runReplyAgent(params: { const payloadArray = runResult.payloads ?? []; + const sentTexts = runResult.messagingToolSentTexts ?? []; + const sentMediaUrls = runResult.messagingToolSentMediaUrls ?? []; + const sentTargets = runResult.messagingToolSentTargets ?? []; + const sessionDedupeEntry = activeSessionEntry; + if (sessionDedupeEntry) { + const now = Date.now(); + if (sentTexts.length || sentMediaUrls.length || sentTargets.length) { + sessionDedupeEntry.lastMessagingToolSessionId = followupRun.run.sessionId; + sessionDedupeEntry.lastMessagingToolSentAt = now; + sessionDedupeEntry.lastMessagingToolSentTexts = sentTexts; + sessionDedupeEntry.lastMessagingToolSentMediaUrls = sentMediaUrls; + sessionDedupeEntry.lastMessagingToolSentTargets = sentTargets; + } else if ( + typeof sessionDedupeEntry.lastMessagingToolSentAt === "number" && + now - sessionDedupeEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS + ) { + delete sessionDedupeEntry.lastMessagingToolSessionId; + delete sessionDedupeEntry.lastMessagingToolSentAt; + delete sessionDedupeEntry.lastMessagingToolSentTexts; + delete sessionDedupeEntry.lastMessagingToolSentMediaUrls; + delete sessionDedupeEntry.lastMessagingToolSentTargets; + } + if (sessionKey && activeSessionStore) { + activeSessionStore[sessionKey] = sessionDedupeEntry; + } + if (sessionKey && storePath) { + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + lastMessagingToolSessionId: sessionDedupeEntry.lastMessagingToolSessionId, + lastMessagingToolSentAt: sessionDedupeEntry.lastMessagingToolSentAt, + lastMessagingToolSentTexts: sessionDedupeEntry.lastMessagingToolSentTexts, + lastMessagingToolSentMediaUrls: sessionDedupeEntry.lastMessagingToolSentMediaUrls, + lastMessagingToolSentTargets: sessionDedupeEntry.lastMessagingToolSentTargets, + }), + }); + } catch (error) { + console.warn( + "Failed to persist messaging-tool dedupe state for session", + sessionKey, + error, + ); + } + } + } + if (blockReplyPipeline) { await blockReplyPipeline.flush({ force: true }); blockReplyPipeline.stop(); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..ac9634568db 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -79,10 +79,6 @@ function mockCompactionRun(params: { ); } -function createAsyncReplySpy() { - return vi.fn(async () => {}); -} - describe("createFollowupRunner compaction", () => { it("adds verbose auto-compaction notice and tracks count", async () => { const storePath = path.join( @@ -321,97 +317,95 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); } - async function runMessagingCase(params: { - agentResult: Record; - queued?: FollowupRun; - runnerOverrides?: Partial<{ - sessionEntry: SessionEntry; - sessionStore: Record; - sessionKey: string; - storePath: string; - }>; - }) { - const onBlockReply = createAsyncReplySpy(); - runEmbeddedPiAgentMock.mockResolvedValueOnce({ - meta: {}, - ...params.agentResult, - }); - const runner = createMessagingDedupeRunner(onBlockReply, params.runnerOverrides); - await runner(params.queued ?? baseQueuedRun()); - return { onBlockReply }; - } - - function makeTextReplyDedupeResult(overrides?: Record) { - return { - payloads: [{ text: "hello world!" }], - messagingToolSentTexts: ["different message"], - ...overrides, - }; - } - it("drops payloads already sent via messaging tool", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ text: "hello world!" }], - messagingToolSentTexts: ["hello world!"], - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["hello world!"], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", }); expect(onBlockReply).not.toHaveBeenCalled(); }); it("delivers payloads when not duplicates", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: makeTextReplyDedupeResult(), + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + expect(onBlockReply).toHaveBeenCalledTimes(1); }); it("suppresses replies when a messaging tool sent via the same provider + target", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], - }, - queued: baseQueuedRun("slack"), + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun("slack")); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("suppresses replies when provider is synthetic but originating channel matches", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], - }, - queued: { - ...baseQueuedRun("heartbeat"), - originatingChannel: "telegram", - originatingTo: "268300329", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + } as FollowupRun); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("does not suppress replies for same target when account differs", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [ - { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, - ], - }, - queued: { - ...baseQueuedRun("heartbeat"), - originatingChannel: "telegram", - originatingTo: "268300329", - originatingAccountId: "personal", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [ + { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, + ], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + originatingAccountId: "personal", + } as FollowupRun); + expect(routeReplyMock).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", @@ -422,26 +416,224 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); - it("drops media URL from payload when messaging tool already sent it", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ mediaUrl: "/tmp/img.png" }], - messagingToolSentMediaUrls: ["/tmp/img.png"], - }, + it("suppresses replies using recent session-level messaging-tool dedupe state", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + + it("does not reuse session-level dedupe fingerprints for queued user turns", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + messageId: "user-msg-123", + originatingTo: "123", + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + }); + + it("does not use session-level dedupe from a previous session id", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "current-session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "old-session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + run: { ...baseQueuedRun("telegram").run, sessionId: "current-session" }, + }); + + expect(onBlockReply).toHaveBeenCalled(); + }); + + it("does not use session-level text dedupe when recent target does not match", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "999" }], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner(baseQueuedRun("telegram")); + + expect(onBlockReply).toHaveBeenCalled(); + }); + + it("does not reuse session-level text dedupe when prior run had multiple messaging targets", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [ + { tool: "message", provider: "telegram", to: "123" }, + { tool: "message", provider: "telegram", to: "999" }, + ], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); + + expect(onBlockReply).toHaveBeenCalled(); + }); + + it("reuses session-level text dedupe when prior run had repeated sends to one target", async () => { + const onBlockReply = vi.fn(async () => {}); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + lastMessagingToolSessionId: "session", + lastMessagingToolSentAt: Date.now(), + lastMessagingToolSentTexts: ["hello world!"], + lastMessagingToolSentTargets: [ + { tool: "message", provider: "telegram", to: "123" }, + { tool: "message", provider: "telegram", to: "123" }, + ], + }; + const sessionStore: Record = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey: "main", + }); + + await runner({ + ...baseQueuedRun("telegram"), + originatingTo: "123", + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + + it("drops media URL from payload when messaging tool already sent it", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ mediaUrl: "/tmp/img.png" }], + messagingToolSentMediaUrls: ["/tmp/img.png"], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + // Media stripped → payload becomes non-renderable → not delivered. expect(onBlockReply).not.toHaveBeenCalled(); }); it("delivers media payload when not a duplicate", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { - payloads: [{ mediaUrl: "/tmp/img.png" }], - messagingToolSentMediaUrls: ["/tmp/other.png"], - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ mediaUrl: "/tmp/img.png" }], + messagingToolSentMediaUrls: ["/tmp/other.png"], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner(baseQueuedRun()); + expect(onBlockReply).toHaveBeenCalledTimes(1); }); @@ -455,28 +647,30 @@ describe("createFollowupRunner messaging tool dedupe", () => { const sessionStore: Record = { [sessionKey]: sessionEntry }; await saveSessionStore(storePath, sessionStore); - const { onBlockReply } = await runMessagingCase({ - agentResult: { - ...makeTextReplyDedupeResult(), - messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], - meta: { - agentMeta: { - usage: { input: 1_000, output: 50 }, - lastCallUsage: { input: 400, output: 20 }, - model: "claude-opus-4-5", - provider: "anthropic", - }, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: { + agentMeta: { + usage: { input: 1_000, output: 50 }, + lastCallUsage: { input: 400, output: 20 }, + model: "claude-opus-4-5", + provider: "anthropic", }, }, - runnerOverrides: { - sessionEntry, - sessionStore, - sessionKey, - storePath, - }, - queued: baseQueuedRun("slack"), }); + const runner = createMessagingDedupeRunner(onBlockReply, { + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + + await runner(baseQueuedRun("slack")); + expect(onBlockReply).not.toHaveBeenCalled(); const store = loadSessionStore(storePath, { skipCache: true }); // totalTokens should reflect the last call usage snapshot, not the accumulated input. @@ -546,36 +740,46 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); it("does not fall back to dispatcher when cross-channel origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); routeReplyMock.mockResolvedValueOnce({ ok: false, error: "forced route failure", }); - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun("webchat"), - originatingChannel: "discord", - originatingTo: "channel:C1", - } as FollowupRun, - }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + } as FollowupRun); expect(routeReplyMock).toHaveBeenCalled(); expect(onBlockReply).not.toHaveBeenCalled(); }); it("falls back to dispatcher when same-channel origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); routeReplyMock.mockResolvedValueOnce({ ok: false, error: "outbound adapter unavailable", }); - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun(" Feishu "), - originatingChannel: "FEISHU", - originatingTo: "ou_abc123", - } as FollowupRun, - }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun(" Feishu "), + originatingChannel: "FEISHU", + originatingTo: "ou_abc123", + } as FollowupRun); expect(routeReplyMock).toHaveBeenCalled(); expect(onBlockReply).toHaveBeenCalledTimes(1); @@ -583,17 +787,22 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); it("routes followups with originating account/thread metadata", async () => { - const { onBlockReply } = await runMessagingCase({ - agentResult: { payloads: [{ text: "hello world!" }] }, - queued: { - ...baseQueuedRun("webchat"), - originatingChannel: "discord", - originatingTo: "channel:C1", - originatingAccountId: "work", - originatingThreadId: "1739142736.000100", - } as FollowupRun, + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, }); + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + originatingAccountId: "work", + originatingThreadId: "1739142736.000100", + } as FollowupRun); + expect(routeReplyMock).toHaveBeenCalledWith( expect.objectContaining({ channel: "discord", @@ -607,37 +816,44 @@ describe("createFollowupRunner messaging tool dedupe", () => { }); describe("createFollowupRunner typing cleanup", () => { - async function runTypingCase(agentResult: Record) { + it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => { const typing = createMockTypingController(); runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "NO_REPLY" }], meta: {}, - ...agentResult, }); const runner = createFollowupRunner({ - opts: { onBlockReply: createAsyncReplySpy() }, + opts: { onBlockReply: vi.fn(async () => {}) }, typing, typingMode: "instant", defaultModel: "anthropic/claude-opus-4-5", }); await runner(baseQueuedRun()); - return typing; - } - function expectTypingCleanup(typing: ReturnType) { expect(typing.markRunComplete).toHaveBeenCalled(); expect(typing.markDispatchIdle).toHaveBeenCalled(); - } - - it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => { - const typing = await runTypingCase({ payloads: [{ text: "NO_REPLY" }] }); - expectTypingCleanup(typing); }); it("calls both markRunComplete and markDispatchIdle on empty payloads", async () => { - const typing = await runTypingCase({ payloads: [] }); - expectTypingCleanup(typing); + const typing = createMockTypingController(); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing, + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner(baseQueuedRun()); + + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); it("calls both markRunComplete and markDispatchIdle on agent error", async () => { @@ -653,7 +869,8 @@ describe("createFollowupRunner typing cleanup", () => { await runner(baseQueuedRun()); - expectTypingCleanup(typing); + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); it("calls both markRunComplete and markDispatchIdle on successful delivery", async () => { @@ -674,7 +891,8 @@ describe("createFollowupRunner typing cleanup", () => { await runner(baseQueuedRun()); expect(onBlockReply).toHaveBeenCalled(); - expectTypingCleanup(typing); + expect(typing.markRunComplete).toHaveBeenCalled(); + expect(typing.markDispatchIdle).toHaveBeenCalled(); }); }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..234161d2498 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -38,6 +38,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; +const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000; + export function createFollowupRunner(params: { opts?: GetReplyOptions; typing: TypingController; @@ -316,27 +318,74 @@ export function createFollowupRunner(params: { replyToChannel, }); + const messageProvider = resolveOriginMessageProvider({ + originatingChannel: queued.originatingChannel, + provider: queued.run.messageProvider, + }); + const originatingTo = resolveOriginMessageTo({ + originatingTo: queued.originatingTo, + }); + const originAccountId = resolveOriginAccountId({ + originatingAccountId: queued.originatingAccountId, + accountId: queued.run.agentAccountId, + }); + + const now = Date.now(); + const recentWindowActive = + typeof sessionEntry?.lastMessagingToolSentAt === "number" && + sessionEntry?.lastMessagingToolSessionId === queued.run.sessionId && + now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS; + const previousSentTargets = sessionEntry?.lastMessagingToolSentTargets ?? []; + const recentTargetMatch = + recentWindowActive && + shouldSuppressMessagingToolReplies({ + messageProvider, + messagingToolSentTargets: previousSentTargets, + originatingTo, + accountId: originAccountId, + }); + const uniquePreviousTargets = new Set( + previousSentTargets.map((target) => + JSON.stringify({ + provider: target.provider, + to: target.to ?? "", + accountId: target.accountId ?? "", + }), + ), + ); + const isSystemGeneratedFollowup = !queued.messageId; + const canReuseSessionDedupeFingerprints = + isSystemGeneratedFollowup && recentTargetMatch && uniquePreviousTargets.size <= 1; + + const sentTexts = [ + ...(runResult.messagingToolSentTexts ?? []), + ...(canReuseSessionDedupeFingerprints + ? (sessionEntry?.lastMessagingToolSentTexts ?? []) + : []), + ]; + const sentMediaUrls = [ + ...(runResult.messagingToolSentMediaUrls ?? []), + ...(canReuseSessionDedupeFingerprints + ? (sessionEntry?.lastMessagingToolSentMediaUrls ?? []) + : []), + ]; + // Keep target-based suppression scoped to the current run only. + // Session-level dedupe state is used for text/media duplicate filtering when target matches. + const sentTargets = runResult.messagingToolSentTargets ?? []; + const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, - sentTexts: runResult.messagingToolSentTexts ?? [], + sentTexts, }); const mediaFilteredPayloads = filterMessagingToolMediaDuplicates({ payloads: dedupedPayloads, - sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [], + sentMediaUrls, }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ - messageProvider: resolveOriginMessageProvider({ - originatingChannel: queued.originatingChannel, - provider: queued.run.messageProvider, - }), - messagingToolSentTargets: runResult.messagingToolSentTargets, - originatingTo: resolveOriginMessageTo({ - originatingTo: queued.originatingTo, - }), - accountId: resolveOriginAccountId({ - originatingAccountId: queued.originatingAccountId, - accountId: queued.run.agentAccountId, - }), + messageProvider, + messagingToolSentTargets: sentTargets, + originatingTo, + accountId: originAccountId, }); const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts index f026f81399f..815ee30e158 100644 --- a/src/cli/daemon-cli/lifecycle.test.ts +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -35,16 +35,17 @@ const resolveGatewayPort = vi.fn(() => 18789); const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []); const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>(); const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", ")); -const probeGateway = vi.fn< - (opts: { - url: string; - auth?: { token?: string; password?: string }; - timeoutMs: number; - }) => Promise<{ - ok: boolean; - configSnapshot: unknown; - }> ->(); +const probeGateway = + vi.fn< + (opts: { + url: string; + auth?: { token?: string; password?: string }; + timeoutMs: number; + }) => Promise<{ + ok: boolean; + configSnapshot: unknown; + }> + >(); const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true); const loadConfig = vi.fn(() => ({})); diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 6513fc81b37..06ad3a53441 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -1,5 +1,6 @@ import crypto from "node:crypto"; import type { Skill } from "@mariozechner/pi-coding-agent"; +import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js"; import type { ChatType } from "../../channels/chat-type.js"; import type { ChannelId } from "../../channels/plugins/types.js"; import type { DeliveryContext } from "../../utils/delivery-context.js"; @@ -73,6 +74,16 @@ export type SessionEntry = { lastHeartbeatText?: string; /** Timestamp (ms) when lastHeartbeatText was delivered. */ lastHeartbeatSentAt?: number; + /** Timestamp (ms) for the most recent message-tool send fingerprint (cross-run dedupe). */ + lastMessagingToolSentAt?: number; + /** Session id that produced the most recent message-tool dedupe fingerprint. */ + lastMessagingToolSessionId?: string; + /** Recently sent message-tool text payloads for short-window cross-run dedupe. */ + lastMessagingToolSentTexts?: string[]; + /** Recently sent message-tool media urls for short-window cross-run dedupe. */ + lastMessagingToolSentMediaUrls?: string[]; + /** Recently sent message-tool routing targets for short-window cross-run dedupe. */ + lastMessagingToolSentTargets?: MessagingToolSend[]; sessionId: string; updatedAt: number; sessionFile?: string;