diff --git a/src/agents/pi-embedded-helpers/messaging-dedupe.ts b/src/agents/pi-embedded-helpers/messaging-dedupe.ts index d4f9eb973cd..bb4b4116b4a 100644 --- a/src/agents/pi-embedded-helpers/messaging-dedupe.ts +++ b/src/agents/pi-embedded-helpers/messaging-dedupe.ts @@ -23,7 +23,7 @@ export type RecentDeliveredEntry = { * Build a collision-resistant hash from the full normalised text of a * delivered assistant message. Uses a fast non-cryptographic approach: * the first 200 normalised chars (for quick prefix screening) combined - * with the total length and a simple 53-bit numeric hash of the full + * with the total length and a simple 32-bit numeric hash of the full * string. This avoids false positives when two responses share the same * opening paragraph but diverge later. */ @@ -32,7 +32,7 @@ export function buildDeliveredTextHash(text: string): string { if (normalized.length <= 200) { return normalized; } - // 53-bit FNV-1a-inspired hash (fits in a JS safe integer). + // 32-bit FNV-1a-inspired hash (Math.imul + >>> 0 operate on 32-bit integers). let h = 0x811c9dc5; for (let i = 0; i < normalized.length; i++) { h ^= normalized.charCodeAt(i); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 3b8aeaa2cb2..7d827385bbe 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -114,13 +114,6 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar } void Promise.resolve() .then(() => params.onBlockReply?.(payload)) - .then(() => { - // Record in cross-turn dedup cache only after successful delivery. - // Recording before send would suppress retries on transient failures. - if (opts?.sourceText) { - recordDeliveredText(opts.sourceText, state.recentDeliveredTexts); - } - }) .catch((err) => { log.warn(`block reply callback failed: ${String(err)}`); }); @@ -525,10 +518,14 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.lastBlockReplyText = chunk; assistantTexts.push(chunk); rememberAssistantText(chunk); + // Record in cross-turn dedup cache synchronously — before the async + // delivery — to close the race window where context compaction could + // trigger a new turn while the Telegram send is still in-flight. + // This matches the synchronous recording in pushAssistantText. + // Trade-off: if the send fails transiently the text stays in the cache, + // but the 1-hour TTL ensures it won't suppress the same text forever. + recordDeliveredText(chunk, state.recentDeliveredTexts); if (!params.onBlockReply) { - // No block reply callback — text is accumulated for final delivery. - // Record now since there's no async send that could fail. - recordDeliveredText(chunk, state.recentDeliveredTexts); return; } const splitResult = replyDirectiveAccumulator.consume(chunk);