Pi dedupe: avoid caching undelivered replay text

This commit is contained in:
Vincent Koc 2026-03-12 03:51:08 -04:00
parent db7c093f07
commit edd7c80cdc

View File

@@ -107,7 +107,6 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const emitBlockReplySafely = (
payload: Parameters<NonNullable<SubscribeEmbeddedPiSessionParams["onBlockReply"]>>[0],
opts?: { sourceText?: string },
) => {
if (!params.onBlockReply) {
return;
@@ -183,8 +182,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
}
assistantTexts.push(text);
rememberAssistantText(text);
// Record in cross-turn cache so post-compaction replays are caught.
recordDeliveredText(text, state.recentDeliveredTexts);
// Non-block assistant text may still be replayed after willRetry compaction
// before any channel delivery has happened, so only block-reply paths record
// cross-turn dedup state eagerly.
if (params.onBlockReply) {
recordDeliveredText(text, state.recentDeliveredTexts);
}
};
const finalizeAssistantTexts = (args: {
@@ -204,7 +207,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
text,
);
rememberAssistantText(text);
recordDeliveredText(text, state.recentDeliveredTexts);
if (params.onBlockReply) {
recordDeliveredText(text, state.recentDeliveredTexts);
}
} else {
pushAssistantText(text);
}
@@ -515,17 +520,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
return;
}
state.lastBlockReplyText = chunk;
assistantTexts.push(chunk);
rememberAssistantText(chunk);
// Record in cross-turn dedup cache synchronously — before the async
// delivery — to close the race window where context compaction could
// trigger a new turn while the Telegram send is still in-flight.
// This matches the synchronous recording in pushAssistantText.
// Trade-off: if the send fails transiently the text stays in the cache,
// but the 1-hour TTL ensures it won't suppress the same text forever.
recordDeliveredText(chunk, state.recentDeliveredTexts);
if (!params.onBlockReply) {
state.lastBlockReplyText = chunk;
assistantTexts.push(chunk);
rememberAssistantText(chunk);
return;
}
const splitResult = replyDirectiveAccumulator.consume(chunk);
@@ -544,17 +542,23 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) {
return;
}
emitBlockReplySafely(
{
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
},
{ sourceText: chunk },
);
if (cleanedText) {
state.lastBlockReplyText = cleanedText;
assistantTexts.push(cleanedText);
rememberAssistantText(cleanedText);
// Record in cross-turn dedup cache synchronously before async delivery
// so compaction-started follow-up turns see already-emitted text, but
// only after reply directives have produced a real outbound payload.
recordDeliveredText(cleanedText, state.recentDeliveredTexts);
}
emitBlockReplySafely({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
});
};
const consumeReplyDirectives = (text: string, options?: { final?: boolean }) =>