Compare commits

...

5 Commits

Author SHA1 Message Date
Vincent Koc
58c9d91dfb Changelog: add Telegram compaction dedupe note 2026-03-12 03:52:36 -04:00
Vincent Koc
092578d09e Tests: cover Pi dedupe compaction replay edges 2026-03-12 03:51:55 -04:00
Vincent Koc
edd7c80cdc Pi dedupe: avoid caching undelivered replay text 2026-03-12 03:51:08 -04:00
eveiljuice
db7c093f07 fix: record cross-turn dedup synchronously before async send
Move recordDeliveredText() from the async post-delivery callback in
emitBlockReplySafely to the synchronous path in emitBlockChunk, before
the Telegram send. This closes the race window where context compaction
could trigger a new assistant turn while the delivery is still in-flight,
bypassing the dedup cache entirely.

Trade-off: if the send fails transiently, the text remains in the cache,
but the 1-hour TTL ensures it won't suppress the same content forever.
This matches the synchronous recording already done in pushAssistantText.

Also fixes the hash comment: Math.imul + >>> 0 produce a 32-bit hash,
not 53-bit.

Addresses review feedback from greptile-apps.
2026-03-12 03:44:51 -04:00
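The ordering change this commit describes can be sketched as follows. Names here are illustrative only — the real code paths are emitBlockChunk / emitBlockReplySafely calling recordDeliveredText — but the sketch shows why recording before the first await closes the race window:

```typescript
type Entry = { hash: string; timestamp: number };

const cache: Entry[] = [];

function normalize(text: string): string {
  return text.trim().toLowerCase();
}

function record(text: string): void {
  cache.push({ hash: normalize(text), timestamp: Date.now() });
}

function isRecorded(text: string): boolean {
  const hash = normalize(text);
  return cache.some((e) => e.hash === hash);
}

// Simulated slow Telegram send.
function send(_text: string): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, 50));
}

// Before the fix: the cache is updated only after the await completes, so a
// compaction-triggered turn that starts during the send sees no cache entry.
async function deliverRacy(text: string): Promise<void> {
  await send(text);
  record(text); // too late: the whole send duration is a race window
}

// After the fix: record synchronously before the first await, so any turn
// that starts while the send is still in flight already sees the entry.
async function deliverSafe(text: string): Promise<void> {
  if (isRecorded(text)) return; // duplicate suppressed
  record(text); // runs synchronously, before the async boundary
  await send(text);
}
```

Because code before the first `await` in an async function runs synchronously at the call site, `deliverSafe` guarantees the cache entry exists before control can yield to any other turn.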
eveiljuice
6bee86e618 fix(agents): add cross-turn dedup cache for embedded assistant text
shouldSkipAssistantText() only deduplicated within the same
assistantMessageIndex. After context compaction the index resets, so
when the model re-emits the same text from the compaction summary it
bypasses the dedup check and gets delivered as a new message.

Added recentDeliveredTexts — a bounded rolling cache (max 20 entries,
1h TTL) that persists across assistant message turns within a session.
Before delivering text, its normalized hash is compared against the
recently delivered hashes; matches are skipped.

Includes unit tests covering hash building, cross-turn duplicate
detection, TTL expiration, whitespace/casing normalization, cache
eviction and capacity limits.

Closes #37702
Related: #38434, #33308, #33453, #33592, #37697, #30316
2026-03-12 03:44:51 -04:00
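A minimal sketch of the cache behaviour this commit describes — the constants match the commit message (20 entries, 1h TTL), but the function names are illustrative; the real helpers are recordDeliveredText and isRecentlyDelivered in messaging-dedupe.ts:

```typescript
const MAX_ENTRIES = 20;
const TTL_MS = 60 * 60_000; // 1 hour

type Entry = { hash: string; timestamp: number };

// Trim, lowercase, and collapse whitespace so cosmetic differences
// don't defeat duplicate detection.
function normalize(text: string): string {
  return text.trim().toLowerCase().replace(/\s+/g, " ");
}

function remember(cache: Entry[], text: string, now = Date.now()): void {
  // Drop expired entries, append the new one, then trim oldest past capacity.
  const live = cache.filter((e) => now - e.timestamp < TTL_MS);
  cache.length = 0;
  cache.push(...live, { hash: normalize(text), timestamp: now });
  while (cache.length > MAX_ENTRIES) cache.shift();
}

function seenRecently(cache: Entry[], text: string, now = Date.now()): boolean {
  const hash = normalize(text);
  return cache.some((e) => e.hash === hash && now - e.timestamp < TTL_MS);
}
```

Because the cache survives assistant-turn boundaries (it lives on the session state, not the per-turn state), a post-compaction turn that re-emits the same text hits `seenRecently` even though its assistantMessageIndex has reset.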
8 changed files with 320 additions and 14 deletions

View File

@ -185,6 +185,7 @@ Docs: https://docs.openclaw.ai
- TUI/theme: detect light terminal backgrounds via `COLORFGBG` and pick a WCAG AA-compliant light palette, with `OPENCLAW_THEME=light|dark` override for terminals without auto-detection. (#38636) Thanks @ademczuk and @vincentkoc.
- Agents/openai-codex: normalize `gpt-5.4` fallback transport back to `openai-codex-responses` on `chatgpt.com/backend-api` when config drifts to the generic OpenAI responses endpoint. (#38736) Thanks @0xsline.
- Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) Thanks @yuweuii.
- Telegram/compaction replay dedupe: keep cross-turn duplicate suppression for real delivered reply text while avoiding directive-only chunks and non-block compaction retries that could drop or mis-thread replayed Telegram replies. (#39456) Thanks @eveiljuice.
- Tools/web search: restore Perplexity OpenRouter/Sonar compatibility for legacy `OPENROUTER_API_KEY`, `sk-or-...`, and explicit `perplexity.baseUrl` / `model` setups while keeping direct Perplexity keys on the native Search API path. (#39937) Thanks @obviyus.
- Agents/failover: detect Amazon Bedrock `Too many tokens per day` quota errors as rate limits across fallback, cron retry, and memory embeddings while keeping context-window `too many tokens per request` errors out of the rate-limit lane. (#39377) Thanks @gambletan.
- Mattermost replies: keep `root_id` pinned to the existing thread root when an agent replies inside a thread, while still using reply-target threading for top-level posts. (#27744) Thanks @hnykda.

View File

@ -57,7 +57,10 @@ export {
isMessagingToolDuplicate,
isMessagingToolDuplicateNormalized,
normalizeTextForComparison,
isRecentlyDelivered,
recordDeliveredText,
} from "./pi-embedded-helpers/messaging-dedupe.js";
export type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js";
export { pickFallbackThinkingLevel } from "./pi-embedded-helpers/thinking.js";

View File

@ -0,0 +1,110 @@
import { describe, expect, it } from "vitest";
import {
buildDeliveredTextHash,
isRecentlyDelivered,
normalizeTextForComparison,
recordDeliveredText,
type RecentDeliveredEntry,
} from "./messaging-dedupe.js";
describe("normalizeTextForComparison", () => {
it("lowercases and trims", () => {
expect(normalizeTextForComparison(" Hello World ")).toBe("hello world");
});
it("collapses whitespace", () => {
expect(normalizeTextForComparison("hello world")).toBe("hello world");
});
});
describe("cross-turn dedup", () => {
describe("buildDeliveredTextHash", () => {
it("returns the normalized text unchanged for strings of 200 chars or fewer", () => {
const hash = buildDeliveredTextHash("Hello World!");
expect(hash).toBe("hello world!");
});
it("includes length and full-text hash for strings over 200 chars", () => {
const long = "a".repeat(300);
const hash = buildDeliveredTextHash(long);
expect(hash).toContain("|300|");
expect(hash.startsWith("a".repeat(200))).toBe(true);
});
it("produces different hashes for texts with same prefix but different tails", () => {
const base = "x".repeat(200);
const textA = base + " ending alpha with more content here";
const textB = base + " ending beta with different content";
expect(buildDeliveredTextHash(textA)).not.toBe(buildDeliveredTextHash(textB));
});
it("returns the normalized text itself for very short input", () => {
expect(buildDeliveredTextHash("hi")).toBe("hi");
});
});
describe("isRecentlyDelivered", () => {
it("returns false for empty cache", () => {
expect(isRecentlyDelivered("Hello world test message", [])).toBe(false);
});
it("returns true when text was recently recorded", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("Hello world test message", cache, now);
expect(isRecentlyDelivered("Hello world test message", cache, now + 1000)).toBe(true);
});
it("returns false after TTL expires", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("Hello world test message", cache, now);
// 1 hour + 1ms later
expect(isRecentlyDelivered("Hello world test message", cache, now + 3_600_001)).toBe(false);
});
it("returns false for text shorter than MIN_DUPLICATE_TEXT_LENGTH", () => {
const cache: RecentDeliveredEntry[] = [];
recordDeliveredText("short", cache);
expect(isRecentlyDelivered("short", cache)).toBe(false);
});
it("detects duplicates with different whitespace/casing", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText(" Hello World Test Message ", cache, now);
expect(isRecentlyDelivered("hello world test message", cache, now)).toBe(true);
});
});
describe("recordDeliveredText", () => {
it("evicts expired entries on record", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("First message that is long enough", cache, now);
// Record second much later (> TTL)
recordDeliveredText("Second message that is long enough", cache, now + 3_700_000);
// First should be evicted
expect(cache.length).toBe(1);
expect(cache[0].hash).toContain("second");
});
it("caps at RECENT_DELIVERED_MAX entries", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
for (let i = 0; i < 25; i++) {
recordDeliveredText(`Unique message number ${i} with enough length`, cache, now + i);
}
expect(cache.length).toBe(20);
});
it("updates timestamp for duplicate hash instead of adding", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("Same message repeated in session", cache, now);
recordDeliveredText("Same message repeated in session", cache, now + 5000);
expect(cache.length).toBe(1);
expect(cache[0].timestamp).toBe(now + 5000);
});
});
});

View File

@ -1,5 +1,97 @@
const MIN_DUPLICATE_TEXT_LENGTH = 10;
/**
* Maximum number of recent delivered text hashes to retain for cross-turn
* deduplication. Keeps memory bounded while covering the typical window
* where context compaction may cause the model to re-emit a previous reply.
*/
const RECENT_DELIVERED_MAX = 20;
/**
* TTL for entries in the cross-turn dedup cache (1 hour).
* After this period the entry is evicted and the same text can be delivered
* again (which is desirable for intentionally repeated content).
*/
const RECENT_DELIVERED_TTL_MS = 60 * 60_000;
export type RecentDeliveredEntry = {
hash: string;
timestamp: number;
};
/**
 * Build a collision-resistant hash from the full normalized text of a
 * delivered assistant message. Uses a fast non-cryptographic approach:
 * the first 200 normalized chars (for quick prefix screening) combined
 * with the total length and a simple 32-bit numeric hash of the full
 * string. This avoids false positives when two responses share the same
 * opening paragraph but diverge later.
 */
export function buildDeliveredTextHash(text: string): string {
const normalized = normalizeTextForComparison(text);
if (normalized.length <= 200) {
return normalized;
}
// 32-bit FNV-1a hash (Math.imul and >>> 0 keep the arithmetic in 32-bit integer range).
let h = 0x811c9dc5;
for (let i = 0; i < normalized.length; i++) {
h ^= normalized.charCodeAt(i);
h = Math.imul(h, 0x01000193);
}
// Combine prefix + length + full-text hash for uniqueness.
return `${normalized.slice(0, 200)}|${normalized.length}|${(h >>> 0).toString(36)}`;
}
/**
* Check whether `text` was recently delivered (cross-turn).
*/
export function isRecentlyDelivered(
text: string,
recentDelivered: RecentDeliveredEntry[],
now?: number,
): boolean {
const hash = buildDeliveredTextHash(text);
if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) {
return false;
}
const currentTime = now ?? Date.now();
return recentDelivered.some(
(entry) => currentTime - entry.timestamp < RECENT_DELIVERED_TTL_MS && entry.hash === hash,
);
}
/**
* Record a delivered text in the rolling cache.
*/
export function recordDeliveredText(
text: string,
recentDelivered: RecentDeliveredEntry[],
now?: number,
): void {
const hash = buildDeliveredTextHash(text);
if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) {
return;
}
const currentTime = now ?? Date.now();
// Evict expired entries.
for (let i = recentDelivered.length - 1; i >= 0; i--) {
if (currentTime - recentDelivered[i].timestamp >= RECENT_DELIVERED_TTL_MS) {
recentDelivered.splice(i, 1);
}
}
// Avoid duplicate entries for the same hash.
const existing = recentDelivered.findIndex((e) => e.hash === hash);
if (existing >= 0) {
recentDelivered[existing].timestamp = currentTime;
return;
}
recentDelivered.push({ hash, timestamp: currentTime });
// Trim oldest if over capacity.
while (recentDelivered.length > RECENT_DELIVERED_MAX) {
recentDelivered.shift();
}
}
/**
* Normalize text for duplicate comparison.
* - Trims whitespace

View File

@ -4,6 +4,7 @@ import type { ReasoningLevel } from "../auto-reply/thinking.js";
import type { InlineCodeState } from "../markdown/code-spans.js";
import type { HookRunner } from "../plugins/hooks.js";
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js";
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
import type {
BlockReplyChunking,
@ -68,6 +69,7 @@ export type EmbeddedPiSubscribeState = {
compactionRetryPromise: Promise<void> | null;
unsubscribed: boolean;
recentDeliveredTexts: RecentDeliveredEntry[];
messagingToolSentTexts: string[];
messagingToolSentTextsNormalized: string[];
messagingToolSentTargets: MessagingToolSend[];

View File

@ -27,7 +27,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
return { emit, onBlockReply };
}
it("carries reply_to_current across tag-only block chunks", () => {
it("carries reply_to_current across tag-only block chunks", async () => {
const { emit, onBlockReply } = createBlockReplyHarness();
emit({ type: "message_start", message: { role: "assistant" } });
@ -39,6 +39,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
content: [{ type: "text", text: "[[reply_to_current]]\nHello" }],
} as AssistantMessage;
emit({ type: "message_end", message: assistantMessage });
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
const payload = onBlockReply.mock.calls[0]?.[0];
@ -47,7 +48,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
expect(payload?.replyToTag).toBe(true);
});
it("flushes trailing directive tails on stream end", () => {
it("flushes trailing directive tails on stream end", async () => {
const { emit, onBlockReply } = createBlockReplyHarness();
emit({ type: "message_start", message: { role: "assistant" } });
@ -59,6 +60,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
content: [{ type: "text", text: "Hello [[" }],
} as AssistantMessage;
emit({ type: "message_end", message: assistantMessage });
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(2);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello");
@ -88,4 +90,43 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
expect(call[0]?.text?.includes("[[reply_to")).toBe(false);
}
});
it("rebuilds reply_to_current after a compaction retry replays a directive chunk", async () => {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
subscribeEmbeddedPiSession({
session,
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
blockReplyChunking: {
minChars: 1,
maxChars: 50,
breakPreference: "newline",
},
});
emit({ type: "message_start", message: { role: "assistant" } });
emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]" });
emit({ type: "auto_compaction_end", willRetry: true });
emit({ type: "message_start", message: { role: "assistant" } });
emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]\nHello again" });
emitAssistantTextEnd({ emit });
emit({
type: "message_end",
message: {
role: "assistant",
content: [{ type: "text", text: "[[reply_to_current]]\nHello again" }],
} as AssistantMessage,
});
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
const payload = onBlockReply.mock.calls[0]?.[0];
expect(payload?.text).toBe("Hello again");
expect(payload?.replyToCurrent).toBe(true);
expect(payload?.replyToTag).toBe(true);
});
});

View File

@ -11,7 +11,7 @@ import { makeZeroUsageSnapshot } from "./usage.js";
type SessionEventHandler = (evt: unknown) => void;
describe("subscribeEmbeddedPiSession", () => {
it("splits long single-line fenced blocks with reopen/close", () => {
it("splits long single-line fenced blocks with reopen/close", async () => {
const onBlockReply = vi.fn();
const { emit } = createParagraphChunkedBlockReplyHarness({
onBlockReply,
@ -23,6 +23,7 @@ describe("subscribeEmbeddedPiSession", () => {
const text = `\`\`\`json\n${"x".repeat(120)}\n\`\`\``;
emitAssistantTextDeltaAndEnd({ emit, text });
await Promise.resolve();
expectFencedChunks(onBlockReply.mock.calls, "```json");
});
it("waits for auto-compaction retry and clears buffered text", async () => {
@ -152,4 +153,32 @@ describe("subscribeEmbeddedPiSession", () => {
const usage = (session.messages?.[0] as { usage?: unknown } | undefined)?.usage;
expect(usage).toEqual(makeZeroUsageSnapshot());
});
it("does not cache non-block assistant text across a willRetry compaction reset", () => {
const listeners: SessionEventHandler[] = [];
const session = {
subscribe: (listener: SessionEventHandler) => {
listeners.push(listener);
return () => {};
},
} as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"];
const subscription = subscribeEmbeddedPiSession({
session,
runId: "run-4",
});
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "Repeated completion text" }],
} as AssistantMessage;
for (const listener of listeners) {
listener({ type: "message_end", message: assistantMessage });
listener({ type: "auto_compaction_end", willRetry: true });
listener({ type: "message_end", message: assistantMessage });
}
expect(subscription.assistantTexts).toEqual(["Repeated completion text"]);
});
});

View File

@ -9,8 +9,11 @@ import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-span
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import {
isMessagingToolDuplicateNormalized,
isRecentlyDelivered,
normalizeTextForComparison,
recordDeliveredText,
} from "./pi-embedded-helpers.js";
import type { RecentDeliveredEntry } from "./pi-embedded-helpers.js";
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
import type {
EmbeddedPiSubscribeContext,
@ -70,6 +73,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
compactionRetryReject: undefined,
compactionRetryPromise: null,
unsubscribed: false,
recentDeliveredTexts: [] as RecentDeliveredEntry[],
messagingToolSentTexts: [],
messagingToolSentTextsNormalized: [],
messagingToolSentTargets: [],
@ -149,15 +153,21 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
};
const shouldSkipAssistantText = (text: string) => {
if (state.lastAssistantTextMessageIndex !== state.assistantMessageIndex) {
return false;
// Same-turn dedup (existing behaviour).
if (state.lastAssistantTextMessageIndex === state.assistantMessageIndex) {
const trimmed = text.trimEnd();
if (trimmed && trimmed === state.lastAssistantTextTrimmed) {
return true;
}
const normalized = normalizeTextForComparison(text);
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) {
return true;
}
}
const trimmed = text.trimEnd();
if (trimmed && trimmed === state.lastAssistantTextTrimmed) {
return true;
}
const normalized = normalizeTextForComparison(text);
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) {
// Cross-turn dedup: catch duplicates caused by context compaction replaying
// the same assistant text in a new turn. Uses a rolling hash cache with a
// 1-hour TTL so intentionally repeated content still goes through eventually.
if (isRecentlyDelivered(text, state.recentDeliveredTexts)) {
return true;
}
return false;
@ -172,6 +182,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
}
assistantTexts.push(text);
rememberAssistantText(text);
// Non-block assistant text may still be replayed after willRetry compaction
// before any channel delivery has happened, so only block-reply paths record
// cross-turn dedup state eagerly.
if (params.onBlockReply) {
recordDeliveredText(text, state.recentDeliveredTexts);
}
};
const finalizeAssistantTexts = (args: {
@ -191,6 +207,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
text,
);
rememberAssistantText(text);
if (params.onBlockReply) {
recordDeliveredText(text, state.recentDeliveredTexts);
}
} else {
pushAssistantText(text);
}
@ -501,10 +520,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
return;
}
state.lastBlockReplyText = chunk;
assistantTexts.push(chunk);
rememberAssistantText(chunk);
if (!params.onBlockReply) {
state.lastBlockReplyText = chunk;
assistantTexts.push(chunk);
rememberAssistantText(chunk);
return;
}
const splitResult = replyDirectiveAccumulator.consume(chunk);
@ -523,6 +542,15 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) {
return;
}
if (cleanedText) {
state.lastBlockReplyText = cleanedText;
assistantTexts.push(cleanedText);
rememberAssistantText(cleanedText);
// Record in cross-turn dedup cache synchronously before async delivery
// so compaction-started follow-up turns see already-emitted text, but
// only after reply directives have produced a real outbound payload.
recordDeliveredText(cleanedText, state.recentDeliveredTexts);
}
emitBlockReplySafely({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,