From edd7c80cdc28318fe9b8fd7369abee993d8a2521 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 12 Mar 2026 03:51:08 -0400 Subject: [PATCH] Pi dedupe: avoid caching undelivered replay text --- src/agents/pi-embedded-subscribe.ts | 54 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 7d827385bbe..53a5a1d99e9 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -107,7 +107,6 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator(); const emitBlockReplySafely = ( payload: Parameters>[0], - opts?: { sourceText?: string }, ) => { if (!params.onBlockReply) { return; @@ -183,8 +182,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar } assistantTexts.push(text); rememberAssistantText(text); - // Record in cross-turn cache so post-compaction replays are caught. - recordDeliveredText(text, state.recentDeliveredTexts); + // Non-block assistant text may still be replayed after willRetry compaction + // before any channel delivery has happened, so only block-reply paths record + // cross-turn dedup state eagerly. + if (params.onBlockReply) { + recordDeliveredText(text, state.recentDeliveredTexts); + } }; const finalizeAssistantTexts = (args: { @@ -204,7 +207,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar text, ); rememberAssistantText(text); - recordDeliveredText(text, state.recentDeliveredTexts); + if (params.onBlockReply) { + recordDeliveredText(text, state.recentDeliveredTexts); + } } else { pushAssistantText(text); } @@ -515,17 +520,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar return; } - state.lastBlockReplyText = chunk; - assistantTexts.push(chunk); - rememberAssistantText(chunk); - // Record in cross-turn dedup cache synchronously — before the async - // delivery — to close the race window where context compaction could - // trigger a new turn while the Telegram send is still in-flight. - // This matches the synchronous recording in pushAssistantText. - // Trade-off: if the send fails transiently the text stays in the cache, - // but the 1-hour TTL ensures it won't suppress the same text forever. - recordDeliveredText(chunk, state.recentDeliveredTexts); if (!params.onBlockReply) { + state.lastBlockReplyText = chunk; + assistantTexts.push(chunk); + rememberAssistantText(chunk); return; } const splitResult = replyDirectiveAccumulator.consume(chunk); @@ -544,17 +542,23 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) { return; } - emitBlockReplySafely( - { - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - audioAsVoice, - replyToId, - replyToTag, - replyToCurrent, - }, - { sourceText: chunk }, - ); + if (cleanedText) { + state.lastBlockReplyText = cleanedText; + assistantTexts.push(cleanedText); + rememberAssistantText(cleanedText); + // Record in cross-turn dedup cache synchronously before async delivery + // so compaction-started follow-up turns see already-emitted text, but + // only after reply directives have produced a real outbound payload. + recordDeliveredText(cleanedText, state.recentDeliveredTexts); + } + emitBlockReplySafely({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); }; const consumeReplyDirectives = (text: string, options?: { final?: boolean }) =>