diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 53f21814492..70a88a449cb 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -57,7 +57,10 @@ export { isMessagingToolDuplicate, isMessagingToolDuplicateNormalized, normalizeTextForComparison, + isRecentlyDelivered, + recordDeliveredText, } from "./pi-embedded-helpers/messaging-dedupe.js"; +export type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js"; export { pickFallbackThinkingLevel } from "./pi-embedded-helpers/thinking.js"; diff --git a/src/agents/pi-embedded-helpers/messaging-dedupe.test.ts b/src/agents/pi-embedded-helpers/messaging-dedupe.test.ts new file mode 100644 index 00000000000..f1d238064a0 --- /dev/null +++ b/src/agents/pi-embedded-helpers/messaging-dedupe.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it } from "vitest"; +import { + buildDeliveredTextHash, + isRecentlyDelivered, + normalizeTextForComparison, + recordDeliveredText, + type RecentDeliveredEntry, +} from "./messaging-dedupe.js"; + +describe("normalizeTextForComparison", () => { + it("lowercases and trims", () => { + expect(normalizeTextForComparison(" Hello World ")).toBe("hello world"); + }); + + it("collapses whitespace", () => { + expect(normalizeTextForComparison("hello world")).toBe("hello world"); + }); +}); + +describe("cross-turn dedup", () => { + describe("buildDeliveredTextHash", () => { + it("returns normalized prefix up to 200 chars", () => { + const hash = buildDeliveredTextHash("Hello World!"); + expect(hash).toBe("hello world!"); + }); + + it("includes length and full-text hash for strings over 200 chars", () => { + const long = "a".repeat(300); + const hash = buildDeliveredTextHash(long); + expect(hash).toContain("|300|"); + expect(hash.startsWith("a".repeat(200))).toBe(true); + }); + + it("produces different hashes for texts with same prefix but different tails", () => { + const base = "x".repeat(200); + const 
textA = base + " ending alpha with more content here"; + const textB = base + " ending beta with different content"; + expect(buildDeliveredTextHash(textA)).not.toBe(buildDeliveredTextHash(textB)); + }); + + it("returns empty for very short text", () => { /* NOTE(review): title says "empty", but the assertion expects the normalised text itself ("hi") */ + expect(buildDeliveredTextHash("hi")).toBe("hi"); + }); + }); + + describe("isRecentlyDelivered", () => { + it("returns false for empty cache", () => { + expect(isRecentlyDelivered("Hello world test message", [])).toBe(false); + }); + + it("returns true when text was recently recorded", () => { + const cache: RecentDeliveredEntry[] = []; + const now = Date.now(); + recordDeliveredText("Hello world test message", cache, now); + expect(isRecentlyDelivered("Hello world test message", cache, now + 1000)).toBe(true); + }); + + it("returns false after TTL expires", () => { + const cache: RecentDeliveredEntry[] = []; + const now = Date.now(); + recordDeliveredText("Hello world test message", cache, now); + // 1 hour + 1ms later + expect(isRecentlyDelivered("Hello world test message", cache, now + 3_600_001)).toBe(false); + }); + + it("returns false for text shorter than MIN_DUPLICATE_TEXT_LENGTH", () => { + const cache: RecentDeliveredEntry[] = []; + recordDeliveredText("short", cache); + expect(isRecentlyDelivered("short", cache)).toBe(false); + }); + + it("detects duplicates with different whitespace/casing", () => { + const cache: RecentDeliveredEntry[] = []; + const now = Date.now(); + recordDeliveredText(" Hello World Test Message ", cache, now); + expect(isRecentlyDelivered("hello world test message", cache, now)).toBe(true); + }); + }); + + describe("recordDeliveredText", () => { + it("evicts expired entries on record", () => { + const cache: RecentDeliveredEntry[] = []; + const now = Date.now(); + recordDeliveredText("First message that is long enough", cache, now); + // Record second much later (> TTL) + recordDeliveredText("Second message that is long enough", cache, now + 3_700_000); + // First should be 
evicted + expect(cache.length).toBe(1); + expect(cache[0].hash).toContain("second"); + }); + + it("caps at RECENT_DELIVERED_MAX entries", () => { + const cache: RecentDeliveredEntry[] = []; + const now = Date.now(); + for (let i = 0; i < 25; i++) { + recordDeliveredText(`Unique message number ${i} with enough length`, cache, now + i); + } + expect(cache.length).toBe(20); + }); + + it("updates timestamp for duplicate hash instead of adding", () => { + const cache: RecentDeliveredEntry[] = []; + const now = Date.now(); + recordDeliveredText("Same message repeated in session", cache, now); + recordDeliveredText("Same message repeated in session", cache, now + 5000); + expect(cache.length).toBe(1); + expect(cache[0].timestamp).toBe(now + 5000); + }); + }); +}); diff --git a/src/agents/pi-embedded-helpers/messaging-dedupe.ts b/src/agents/pi-embedded-helpers/messaging-dedupe.ts index 71e1c3aa43c..d4f9eb973cd 100644 --- a/src/agents/pi-embedded-helpers/messaging-dedupe.ts +++ b/src/agents/pi-embedded-helpers/messaging-dedupe.ts @@ -1,5 +1,97 @@ const MIN_DUPLICATE_TEXT_LENGTH = 10; +/** + * Maximum number of recent delivered text hashes to retain for cross-turn + * deduplication. Keeps memory bounded while covering the typical window + * where context compaction may cause the model to re-emit a previous reply. + */ +const RECENT_DELIVERED_MAX = 20; + +/** + * TTL for entries in the cross-turn dedup cache (1 hour). + * After this period the entry is evicted and the same text can be delivered + * again (which is desirable for intentionally repeated content). + */ +const RECENT_DELIVERED_TTL_MS = 60 * 60_000; + +export type RecentDeliveredEntry = { + hash: string; + timestamp: number; +}; + +/** + * Build a collision-resistant hash from the full normalised text of a + * delivered assistant message. 
Uses a fast non-cryptographic approach: + * the first 200 normalised chars (for quick prefix screening) combined + * with the total length and a simple 32-bit numeric hash of the full + * string. This avoids false positives when two responses share the same + * opening paragraph but diverge later. Texts of at most 200 normalised chars are returned as-is. + */ +export function buildDeliveredTextHash(text: string): string { + const normalized = normalizeTextForComparison(text); + if (normalized.length <= 200) { + return normalized; + } + // 32-bit FNV-1a-style hash over UTF-16 code units (Math.imul and the final >>> 0 keep it an unsigned 32-bit value). + let h = 0x811c9dc5; + for (let i = 0; i < normalized.length; i++) { + h ^= normalized.charCodeAt(i); + h = Math.imul(h, 0x01000193); + } + // Combine prefix + length + full-text hash for uniqueness. + return `${normalized.slice(0, 200)}|${normalized.length}|${(h >>> 0).toString(36)}`; +} + +/** + * Check whether `text` was recently delivered (cross-turn). Returns false when the key is shorter than MIN_DUPLICATE_TEXT_LENGTH; otherwise true only if an entry younger than RECENT_DELIVERED_TTL_MS (relative to `now`, default Date.now()) has the same key. + */ +export function isRecentlyDelivered( + text: string, + recentDelivered: RecentDeliveredEntry[], + now?: number, +): boolean { + const hash = buildDeliveredTextHash(text); + if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) { + return false; + } + const currentTime = now ?? Date.now(); + return recentDelivered.some( + (entry) => currentTime - entry.timestamp < RECENT_DELIVERED_TTL_MS && entry.hash === hash, + ); +} + +/** + * Record a delivered text in the rolling cache. Mutates `recentDelivered` in place; keys shorter than MIN_DUPLICATE_TEXT_LENGTH are ignored. + */ +export function recordDeliveredText( + text: string, + recentDelivered: RecentDeliveredEntry[], + now?: number, +): void { + const hash = buildDeliveredTextHash(text); + if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) { + return; + } + const currentTime = now ?? Date.now(); + // Evict expired entries. + for (let i = recentDelivered.length - 1; i >= 0; i--) { + if (currentTime - recentDelivered[i].timestamp >= RECENT_DELIVERED_TTL_MS) { + recentDelivered.splice(i, 1); + } + } + // Avoid duplicate entries for the same hash. 
+ const existing = recentDelivered.findIndex((e) => e.hash === hash); + if (existing >= 0) { + recentDelivered[existing].timestamp = currentTime; + return; + } + recentDelivered.push({ hash, timestamp: currentTime }); + // Trim from the front if over capacity (insertion order; a refreshed entry keeps its original slot). + while (recentDelivered.length > RECENT_DELIVERED_MAX) { + recentDelivered.shift(); + } +} + /** * Normalize text for duplicate comparison. * - Trims whitespace diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 4436e6f6aa3..c2eef463104 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -4,6 +4,7 @@ import type { ReasoningLevel } from "../auto-reply/thinking.js"; import type { InlineCodeState } from "../markdown/code-spans.js"; import type { HookRunner } from "../plugins/hooks.js"; import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; +import type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js"; import type { MessagingToolSend } from "./pi-embedded-messaging.js"; import type { BlockReplyChunking, @@ -68,6 +69,7 @@ export type EmbeddedPiSubscribeState = { compactionRetryPromise: Promise | null; unsubscribed: boolean; + recentDeliveredTexts: RecentDeliveredEntry[]; /* rolling cross-turn dedup cache of delivered-text keys */ messagingToolSentTexts: string[]; messagingToolSentTextsNormalized: string[]; messagingToolSentTargets: MessagingToolSend[]; diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 83592372e80..3b8aeaa2cb2 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -9,8 +9,11 @@ import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-span import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; import { isMessagingToolDuplicateNormalized, + isRecentlyDelivered, normalizeTextForComparison, + recordDeliveredText, } from "./pi-embedded-helpers.js"; +import type { 
RecentDeliveredEntry } from "./pi-embedded-helpers.js"; import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js"; import type { EmbeddedPiSubscribeContext, @@ -70,6 +73,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar compactionRetryReject: undefined, compactionRetryPromise: null, unsubscribed: false, + recentDeliveredTexts: [] as RecentDeliveredEntry[], messagingToolSentTexts: [], messagingToolSentTextsNormalized: [], messagingToolSentTargets: [], @@ -103,12 +107,20 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator(); const emitBlockReplySafely = ( payload: Parameters>[0], + opts?: { sourceText?: string }, ) => { if (!params.onBlockReply) { return; } void Promise.resolve() .then(() => params.onBlockReply?.(payload)) + .then(() => { + // Record in cross-turn dedup cache only after successful delivery (a rejected onBlockReply skips this step). + // Recording before send would suppress retries on transient failures. + if (opts?.sourceText) { + recordDeliveredText(opts.sourceText, state.recentDeliveredTexts); + } + }) .catch((err) => { log.warn(`block reply callback failed: ${String(err)}`); }); @@ -149,15 +161,21 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar }; const shouldSkipAssistantText = (text: string) => { - if (state.lastAssistantTextMessageIndex !== state.assistantMessageIndex) { - return false; + // Same-turn dedup (existing behaviour). 
+ if (state.lastAssistantTextMessageIndex === state.assistantMessageIndex) { + const trimmed = text.trimEnd(); + if (trimmed && trimmed === state.lastAssistantTextTrimmed) { + return true; + } + const normalized = normalizeTextForComparison(text); + if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) { + return true; + } } - const trimmed = text.trimEnd(); - if (trimmed && trimmed === state.lastAssistantTextTrimmed) { - return true; - } - const normalized = normalizeTextForComparison(text); - if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) { + // Cross-turn dedup: catch duplicates caused by context compaction replaying + // the same assistant text in a new turn. Uses a rolling hash cache with a + // 1-hour TTL so intentionally repeated content still goes through eventually. + if (isRecentlyDelivered(text, state.recentDeliveredTexts)) { return true; } return false; @@ -172,6 +190,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar } assistantTexts.push(text); rememberAssistantText(text); + // Record in cross-turn cache so post-compaction replays are caught. NOTE(review): unlike emitBlockReplySafely, this records before any delivery is confirmed; confirm that is intended. + recordDeliveredText(text, state.recentDeliveredTexts); }; const finalizeAssistantTexts = (args: { @@ -191,6 +211,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar text, ); rememberAssistantText(text); + recordDeliveredText(text, state.recentDeliveredTexts); } else { pushAssistantText(text); } @@ -505,6 +526,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar assistantTexts.push(chunk); rememberAssistantText(chunk); if (!params.onBlockReply) { + // No block reply callback — text is accumulated for final delivery. + // Record now since there's no async send that could fail. 
+ recordDeliveredText(chunk, state.recentDeliveredTexts); return; } const splitResult = replyDirectiveAccumulator.consume(chunk); @@ -523,14 +547,17 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) { return; } - emitBlockReplySafely({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - audioAsVoice, - replyToId, - replyToTag, - replyToCurrent, - }); + emitBlockReplySafely( + { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }, + { sourceText: chunk }, /* dedup record is keyed on the raw chunk, not cleanedText */ + ); }; const consumeReplyDirectives = (text: string, options?: { final?: boolean }) =>