Compare commits
5 Commits
main
...
fix/cross-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58c9d91dfb | ||
|
|
092578d09e | ||
|
|
edd7c80cdc | ||
|
|
db7c093f07 | ||
|
|
6bee86e618 |
@ -185,6 +185,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- TUI/theme: detect light terminal backgrounds via `COLORFGBG` and pick a WCAG AA-compliant light palette, with `OPENCLAW_THEME=light|dark` override for terminals without auto-detection. (#38636) Thanks @ademczuk and @vincentkoc.
|
- TUI/theme: detect light terminal backgrounds via `COLORFGBG` and pick a WCAG AA-compliant light palette, with `OPENCLAW_THEME=light|dark` override for terminals without auto-detection. (#38636) Thanks @ademczuk and @vincentkoc.
|
||||||
- Agents/openai-codex: normalize `gpt-5.4` fallback transport back to `openai-codex-responses` on `chatgpt.com/backend-api` when config drifts to the generic OpenAI responses endpoint. (#38736) Thanks @0xsline.
|
- Agents/openai-codex: normalize `gpt-5.4` fallback transport back to `openai-codex-responses` on `chatgpt.com/backend-api` when config drifts to the generic OpenAI responses endpoint. (#38736) Thanks @0xsline.
|
||||||
- Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) Thanks @yuweuii.
|
- Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) Thanks @yuweuii.
|
||||||
|
- Telegram/compaction replay dedupe: keep cross-turn duplicate suppression for real delivered reply text while avoiding directive-only chunks and non-block compaction retries that could drop or mis-thread replayed Telegram replies. (#39456) Thanks @eveiljuice.
|
||||||
- Tools/web search: restore Perplexity OpenRouter/Sonar compatibility for legacy `OPENROUTER_API_KEY`, `sk-or-...`, and explicit `perplexity.baseUrl` / `model` setups while keeping direct Perplexity keys on the native Search API path. (#39937) Thanks @obviyus.
|
- Tools/web search: restore Perplexity OpenRouter/Sonar compatibility for legacy `OPENROUTER_API_KEY`, `sk-or-...`, and explicit `perplexity.baseUrl` / `model` setups while keeping direct Perplexity keys on the native Search API path. (#39937) Thanks @obviyus.
|
||||||
- Agents/failover: detect Amazon Bedrock `Too many tokens per day` quota errors as rate limits across fallback, cron retry, and memory embeddings while keeping context-window `too many tokens per request` errors out of the rate-limit lane. (#39377) Thanks @gambletan.
|
- Agents/failover: detect Amazon Bedrock `Too many tokens per day` quota errors as rate limits across fallback, cron retry, and memory embeddings while keeping context-window `too many tokens per request` errors out of the rate-limit lane. (#39377) Thanks @gambletan.
|
||||||
- Mattermost replies: keep `root_id` pinned to the existing thread root when an agent replies inside a thread, while still using reply-target threading for top-level posts. (#27744) Thanks @hnykda.
|
- Mattermost replies: keep `root_id` pinned to the existing thread root when an agent replies inside a thread, while still using reply-target threading for top-level posts. (#27744) Thanks @hnykda.
|
||||||
|
|||||||
@ -57,7 +57,10 @@ export {
|
|||||||
isMessagingToolDuplicate,
|
isMessagingToolDuplicate,
|
||||||
isMessagingToolDuplicateNormalized,
|
isMessagingToolDuplicateNormalized,
|
||||||
normalizeTextForComparison,
|
normalizeTextForComparison,
|
||||||
|
isRecentlyDelivered,
|
||||||
|
recordDeliveredText,
|
||||||
} from "./pi-embedded-helpers/messaging-dedupe.js";
|
} from "./pi-embedded-helpers/messaging-dedupe.js";
|
||||||
|
export type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js";
|
||||||
|
|
||||||
export { pickFallbackThinkingLevel } from "./pi-embedded-helpers/thinking.js";
|
export { pickFallbackThinkingLevel } from "./pi-embedded-helpers/thinking.js";
|
||||||
|
|
||||||
|
|||||||
110
src/agents/pi-embedded-helpers/messaging-dedupe.test.ts
Normal file
110
src/agents/pi-embedded-helpers/messaging-dedupe.test.ts
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import {
|
||||||
|
buildDeliveredTextHash,
|
||||||
|
isRecentlyDelivered,
|
||||||
|
normalizeTextForComparison,
|
||||||
|
recordDeliveredText,
|
||||||
|
type RecentDeliveredEntry,
|
||||||
|
} from "./messaging-dedupe.js";
|
||||||
|
|
||||||
|
describe("normalizeTextForComparison", () => {
|
||||||
|
it("lowercases and trims", () => {
|
||||||
|
expect(normalizeTextForComparison(" Hello World ")).toBe("hello world");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("collapses whitespace", () => {
|
||||||
|
expect(normalizeTextForComparison("hello world")).toBe("hello world");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("cross-turn dedup", () => {
|
||||||
|
describe("buildDeliveredTextHash", () => {
|
||||||
|
it("returns normalized prefix up to 200 chars", () => {
|
||||||
|
const hash = buildDeliveredTextHash("Hello World!");
|
||||||
|
expect(hash).toBe("hello world!");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("includes length and full-text hash for strings over 200 chars", () => {
|
||||||
|
const long = "a".repeat(300);
|
||||||
|
const hash = buildDeliveredTextHash(long);
|
||||||
|
expect(hash).toContain("|300|");
|
||||||
|
expect(hash.startsWith("a".repeat(200))).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("produces different hashes for texts with same prefix but different tails", () => {
|
||||||
|
const base = "x".repeat(200);
|
||||||
|
const textA = base + " ending alpha with more content here";
|
||||||
|
const textB = base + " ending beta with different content";
|
||||||
|
expect(buildDeliveredTextHash(textA)).not.toBe(buildDeliveredTextHash(textB));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns empty for very short text", () => {
|
||||||
|
expect(buildDeliveredTextHash("hi")).toBe("hi");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("isRecentlyDelivered", () => {
|
||||||
|
it("returns false for empty cache", () => {
|
||||||
|
expect(isRecentlyDelivered("Hello world test message", [])).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns true when text was recently recorded", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
const now = Date.now();
|
||||||
|
recordDeliveredText("Hello world test message", cache, now);
|
||||||
|
expect(isRecentlyDelivered("Hello world test message", cache, now + 1000)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false after TTL expires", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
const now = Date.now();
|
||||||
|
recordDeliveredText("Hello world test message", cache, now);
|
||||||
|
// 1 hour + 1ms later
|
||||||
|
expect(isRecentlyDelivered("Hello world test message", cache, now + 3_600_001)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false for text shorter than MIN_DUPLICATE_TEXT_LENGTH", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
recordDeliveredText("short", cache);
|
||||||
|
expect(isRecentlyDelivered("short", cache)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("detects duplicates with different whitespace/casing", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
const now = Date.now();
|
||||||
|
recordDeliveredText(" Hello World Test Message ", cache, now);
|
||||||
|
expect(isRecentlyDelivered("hello world test message", cache, now)).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("recordDeliveredText", () => {
|
||||||
|
it("evicts expired entries on record", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
const now = Date.now();
|
||||||
|
recordDeliveredText("First message that is long enough", cache, now);
|
||||||
|
// Record second much later (> TTL)
|
||||||
|
recordDeliveredText("Second message that is long enough", cache, now + 3_700_000);
|
||||||
|
// First should be evicted
|
||||||
|
expect(cache.length).toBe(1);
|
||||||
|
expect(cache[0].hash).toContain("second");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("caps at RECENT_DELIVERED_MAX entries", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
const now = Date.now();
|
||||||
|
for (let i = 0; i < 25; i++) {
|
||||||
|
recordDeliveredText(`Unique message number ${i} with enough length`, cache, now + i);
|
||||||
|
}
|
||||||
|
expect(cache.length).toBe(20);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("updates timestamp for duplicate hash instead of adding", () => {
|
||||||
|
const cache: RecentDeliveredEntry[] = [];
|
||||||
|
const now = Date.now();
|
||||||
|
recordDeliveredText("Same message repeated in session", cache, now);
|
||||||
|
recordDeliveredText("Same message repeated in session", cache, now + 5000);
|
||||||
|
expect(cache.length).toBe(1);
|
||||||
|
expect(cache[0].timestamp).toBe(now + 5000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -1,5 +1,97 @@
|
|||||||
const MIN_DUPLICATE_TEXT_LENGTH = 10;
|
const MIN_DUPLICATE_TEXT_LENGTH = 10;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of recent delivered text hashes to retain for cross-turn
|
||||||
|
* deduplication. Keeps memory bounded while covering the typical window
|
||||||
|
* where context compaction may cause the model to re-emit a previous reply.
|
||||||
|
*/
|
||||||
|
const RECENT_DELIVERED_MAX = 20;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TTL for entries in the cross-turn dedup cache (1 hour).
|
||||||
|
* After this period the entry is evicted and the same text can be delivered
|
||||||
|
* again (which is desirable for intentionally repeated content).
|
||||||
|
*/
|
||||||
|
const RECENT_DELIVERED_TTL_MS = 60 * 60_000;
|
||||||
|
|
||||||
|
export type RecentDeliveredEntry = {
|
||||||
|
hash: string;
|
||||||
|
timestamp: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a collision-resistant hash from the full normalised text of a
|
||||||
|
* delivered assistant message. Uses a fast non-cryptographic approach:
|
||||||
|
* the first 200 normalised chars (for quick prefix screening) combined
|
||||||
|
* with the total length and a simple 32-bit numeric hash of the full
|
||||||
|
* string. This avoids false positives when two responses share the same
|
||||||
|
* opening paragraph but diverge later.
|
||||||
|
*/
|
||||||
|
export function buildDeliveredTextHash(text: string): string {
|
||||||
|
const normalized = normalizeTextForComparison(text);
|
||||||
|
if (normalized.length <= 200) {
|
||||||
|
return normalized;
|
||||||
|
}
|
||||||
|
// 32-bit FNV-1a-inspired hash (Math.imul + >>> 0 operate on 32-bit integers).
|
||||||
|
let h = 0x811c9dc5;
|
||||||
|
for (let i = 0; i < normalized.length; i++) {
|
||||||
|
h ^= normalized.charCodeAt(i);
|
||||||
|
h = Math.imul(h, 0x01000193);
|
||||||
|
}
|
||||||
|
// Combine prefix + length + full-text hash for uniqueness.
|
||||||
|
return `${normalized.slice(0, 200)}|${normalized.length}|${(h >>> 0).toString(36)}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether `text` was recently delivered (cross-turn).
|
||||||
|
*/
|
||||||
|
export function isRecentlyDelivered(
|
||||||
|
text: string,
|
||||||
|
recentDelivered: RecentDeliveredEntry[],
|
||||||
|
now?: number,
|
||||||
|
): boolean {
|
||||||
|
const hash = buildDeliveredTextHash(text);
|
||||||
|
if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const currentTime = now ?? Date.now();
|
||||||
|
return recentDelivered.some(
|
||||||
|
(entry) => currentTime - entry.timestamp < RECENT_DELIVERED_TTL_MS && entry.hash === hash,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a delivered text in the rolling cache.
|
||||||
|
*/
|
||||||
|
export function recordDeliveredText(
|
||||||
|
text: string,
|
||||||
|
recentDelivered: RecentDeliveredEntry[],
|
||||||
|
now?: number,
|
||||||
|
): void {
|
||||||
|
const hash = buildDeliveredTextHash(text);
|
||||||
|
if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const currentTime = now ?? Date.now();
|
||||||
|
// Evict expired entries.
|
||||||
|
for (let i = recentDelivered.length - 1; i >= 0; i--) {
|
||||||
|
if (currentTime - recentDelivered[i].timestamp >= RECENT_DELIVERED_TTL_MS) {
|
||||||
|
recentDelivered.splice(i, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Avoid duplicate entries for the same hash.
|
||||||
|
const existing = recentDelivered.findIndex((e) => e.hash === hash);
|
||||||
|
if (existing >= 0) {
|
||||||
|
recentDelivered[existing].timestamp = currentTime;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
recentDelivered.push({ hash, timestamp: currentTime });
|
||||||
|
// Trim oldest if over capacity.
|
||||||
|
while (recentDelivered.length > RECENT_DELIVERED_MAX) {
|
||||||
|
recentDelivered.shift();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Normalize text for duplicate comparison.
|
* Normalize text for duplicate comparison.
|
||||||
* - Trims whitespace
|
* - Trims whitespace
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import type { ReasoningLevel } from "../auto-reply/thinking.js";
|
|||||||
import type { InlineCodeState } from "../markdown/code-spans.js";
|
import type { InlineCodeState } from "../markdown/code-spans.js";
|
||||||
import type { HookRunner } from "../plugins/hooks.js";
|
import type { HookRunner } from "../plugins/hooks.js";
|
||||||
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
||||||
|
import type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js";
|
||||||
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
|
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
|
||||||
import type {
|
import type {
|
||||||
BlockReplyChunking,
|
BlockReplyChunking,
|
||||||
@ -68,6 +69,7 @@ export type EmbeddedPiSubscribeState = {
|
|||||||
compactionRetryPromise: Promise<void> | null;
|
compactionRetryPromise: Promise<void> | null;
|
||||||
unsubscribed: boolean;
|
unsubscribed: boolean;
|
||||||
|
|
||||||
|
recentDeliveredTexts: RecentDeliveredEntry[];
|
||||||
messagingToolSentTexts: string[];
|
messagingToolSentTexts: string[];
|
||||||
messagingToolSentTextsNormalized: string[];
|
messagingToolSentTextsNormalized: string[];
|
||||||
messagingToolSentTargets: MessagingToolSend[];
|
messagingToolSentTargets: MessagingToolSend[];
|
||||||
|
|||||||
@ -27,7 +27,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
|
|||||||
return { emit, onBlockReply };
|
return { emit, onBlockReply };
|
||||||
}
|
}
|
||||||
|
|
||||||
it("carries reply_to_current across tag-only block chunks", () => {
|
it("carries reply_to_current across tag-only block chunks", async () => {
|
||||||
const { emit, onBlockReply } = createBlockReplyHarness();
|
const { emit, onBlockReply } = createBlockReplyHarness();
|
||||||
|
|
||||||
emit({ type: "message_start", message: { role: "assistant" } });
|
emit({ type: "message_start", message: { role: "assistant" } });
|
||||||
@ -39,6 +39,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
|
|||||||
content: [{ type: "text", text: "[[reply_to_current]]\nHello" }],
|
content: [{ type: "text", text: "[[reply_to_current]]\nHello" }],
|
||||||
} as AssistantMessage;
|
} as AssistantMessage;
|
||||||
emit({ type: "message_end", message: assistantMessage });
|
emit({ type: "message_end", message: assistantMessage });
|
||||||
|
await Promise.resolve();
|
||||||
|
|
||||||
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||||
const payload = onBlockReply.mock.calls[0]?.[0];
|
const payload = onBlockReply.mock.calls[0]?.[0];
|
||||||
@ -47,7 +48,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
|
|||||||
expect(payload?.replyToTag).toBe(true);
|
expect(payload?.replyToTag).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("flushes trailing directive tails on stream end", () => {
|
it("flushes trailing directive tails on stream end", async () => {
|
||||||
const { emit, onBlockReply } = createBlockReplyHarness();
|
const { emit, onBlockReply } = createBlockReplyHarness();
|
||||||
|
|
||||||
emit({ type: "message_start", message: { role: "assistant" } });
|
emit({ type: "message_start", message: { role: "assistant" } });
|
||||||
@ -59,6 +60,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
|
|||||||
content: [{ type: "text", text: "Hello [[" }],
|
content: [{ type: "text", text: "Hello [[" }],
|
||||||
} as AssistantMessage;
|
} as AssistantMessage;
|
||||||
emit({ type: "message_end", message: assistantMessage });
|
emit({ type: "message_end", message: assistantMessage });
|
||||||
|
await Promise.resolve();
|
||||||
|
|
||||||
expect(onBlockReply).toHaveBeenCalledTimes(2);
|
expect(onBlockReply).toHaveBeenCalledTimes(2);
|
||||||
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello");
|
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello");
|
||||||
@ -88,4 +90,43 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
|
|||||||
expect(call[0]?.text?.includes("[[reply_to")).toBe(false);
|
expect(call[0]?.text?.includes("[[reply_to")).toBe(false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("rebuilds reply_to_current after a compaction retry replays a directive chunk", async () => {
|
||||||
|
const { session, emit } = createStubSessionHarness();
|
||||||
|
const onBlockReply = vi.fn();
|
||||||
|
|
||||||
|
subscribeEmbeddedPiSession({
|
||||||
|
session,
|
||||||
|
runId: "run",
|
||||||
|
onBlockReply,
|
||||||
|
blockReplyBreak: "text_end",
|
||||||
|
blockReplyChunking: {
|
||||||
|
minChars: 1,
|
||||||
|
maxChars: 50,
|
||||||
|
breakPreference: "newline",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
emit({ type: "message_start", message: { role: "assistant" } });
|
||||||
|
emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]" });
|
||||||
|
emit({ type: "auto_compaction_end", willRetry: true });
|
||||||
|
|
||||||
|
emit({ type: "message_start", message: { role: "assistant" } });
|
||||||
|
emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]\nHello again" });
|
||||||
|
emitAssistantTextEnd({ emit });
|
||||||
|
emit({
|
||||||
|
type: "message_end",
|
||||||
|
message: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "[[reply_to_current]]\nHello again" }],
|
||||||
|
} as AssistantMessage,
|
||||||
|
});
|
||||||
|
await Promise.resolve();
|
||||||
|
|
||||||
|
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||||
|
const payload = onBlockReply.mock.calls[0]?.[0];
|
||||||
|
expect(payload?.text).toBe("Hello again");
|
||||||
|
expect(payload?.replyToCurrent).toBe(true);
|
||||||
|
expect(payload?.replyToTag).toBe(true);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -11,7 +11,7 @@ import { makeZeroUsageSnapshot } from "./usage.js";
|
|||||||
type SessionEventHandler = (evt: unknown) => void;
|
type SessionEventHandler = (evt: unknown) => void;
|
||||||
|
|
||||||
describe("subscribeEmbeddedPiSession", () => {
|
describe("subscribeEmbeddedPiSession", () => {
|
||||||
it("splits long single-line fenced blocks with reopen/close", () => {
|
it("splits long single-line fenced blocks with reopen/close", async () => {
|
||||||
const onBlockReply = vi.fn();
|
const onBlockReply = vi.fn();
|
||||||
const { emit } = createParagraphChunkedBlockReplyHarness({
|
const { emit } = createParagraphChunkedBlockReplyHarness({
|
||||||
onBlockReply,
|
onBlockReply,
|
||||||
@ -23,6 +23,7 @@ describe("subscribeEmbeddedPiSession", () => {
|
|||||||
|
|
||||||
const text = `\`\`\`json\n${"x".repeat(120)}\n\`\`\``;
|
const text = `\`\`\`json\n${"x".repeat(120)}\n\`\`\``;
|
||||||
emitAssistantTextDeltaAndEnd({ emit, text });
|
emitAssistantTextDeltaAndEnd({ emit, text });
|
||||||
|
await Promise.resolve();
|
||||||
expectFencedChunks(onBlockReply.mock.calls, "```json");
|
expectFencedChunks(onBlockReply.mock.calls, "```json");
|
||||||
});
|
});
|
||||||
it("waits for auto-compaction retry and clears buffered text", async () => {
|
it("waits for auto-compaction retry and clears buffered text", async () => {
|
||||||
@ -152,4 +153,32 @@ describe("subscribeEmbeddedPiSession", () => {
|
|||||||
const usage = (session.messages?.[0] as { usage?: unknown } | undefined)?.usage;
|
const usage = (session.messages?.[0] as { usage?: unknown } | undefined)?.usage;
|
||||||
expect(usage).toEqual(makeZeroUsageSnapshot());
|
expect(usage).toEqual(makeZeroUsageSnapshot());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("does not cache non-block assistant text across a willRetry compaction reset", () => {
|
||||||
|
const listeners: SessionEventHandler[] = [];
|
||||||
|
const session = {
|
||||||
|
subscribe: (listener: SessionEventHandler) => {
|
||||||
|
listeners.push(listener);
|
||||||
|
return () => {};
|
||||||
|
},
|
||||||
|
} as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"];
|
||||||
|
|
||||||
|
const subscription = subscribeEmbeddedPiSession({
|
||||||
|
session,
|
||||||
|
runId: "run-4",
|
||||||
|
});
|
||||||
|
|
||||||
|
const assistantMessage = {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "Repeated completion text" }],
|
||||||
|
} as AssistantMessage;
|
||||||
|
|
||||||
|
for (const listener of listeners) {
|
||||||
|
listener({ type: "message_end", message: assistantMessage });
|
||||||
|
listener({ type: "auto_compaction_end", willRetry: true });
|
||||||
|
listener({ type: "message_end", message: assistantMessage });
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(subscription.assistantTexts).toEqual(["Repeated completion text"]);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -9,8 +9,11 @@ import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-span
|
|||||||
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
||||||
import {
|
import {
|
||||||
isMessagingToolDuplicateNormalized,
|
isMessagingToolDuplicateNormalized,
|
||||||
|
isRecentlyDelivered,
|
||||||
normalizeTextForComparison,
|
normalizeTextForComparison,
|
||||||
|
recordDeliveredText,
|
||||||
} from "./pi-embedded-helpers.js";
|
} from "./pi-embedded-helpers.js";
|
||||||
|
import type { RecentDeliveredEntry } from "./pi-embedded-helpers.js";
|
||||||
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
|
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
|
||||||
import type {
|
import type {
|
||||||
EmbeddedPiSubscribeContext,
|
EmbeddedPiSubscribeContext,
|
||||||
@ -70,6 +73,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
compactionRetryReject: undefined,
|
compactionRetryReject: undefined,
|
||||||
compactionRetryPromise: null,
|
compactionRetryPromise: null,
|
||||||
unsubscribed: false,
|
unsubscribed: false,
|
||||||
|
recentDeliveredTexts: [] as RecentDeliveredEntry[],
|
||||||
messagingToolSentTexts: [],
|
messagingToolSentTexts: [],
|
||||||
messagingToolSentTextsNormalized: [],
|
messagingToolSentTextsNormalized: [],
|
||||||
messagingToolSentTargets: [],
|
messagingToolSentTargets: [],
|
||||||
@ -149,15 +153,21 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
};
|
};
|
||||||
|
|
||||||
const shouldSkipAssistantText = (text: string) => {
|
const shouldSkipAssistantText = (text: string) => {
|
||||||
if (state.lastAssistantTextMessageIndex !== state.assistantMessageIndex) {
|
// Same-turn dedup (existing behaviour).
|
||||||
return false;
|
if (state.lastAssistantTextMessageIndex === state.assistantMessageIndex) {
|
||||||
|
const trimmed = text.trimEnd();
|
||||||
|
if (trimmed && trimmed === state.lastAssistantTextTrimmed) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const normalized = normalizeTextForComparison(text);
|
||||||
|
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
const trimmed = text.trimEnd();
|
// Cross-turn dedup: catch duplicates caused by context compaction replaying
|
||||||
if (trimmed && trimmed === state.lastAssistantTextTrimmed) {
|
// the same assistant text in a new turn. Uses a rolling hash cache with a
|
||||||
return true;
|
// 1-hour TTL so intentionally repeated content still goes through eventually.
|
||||||
}
|
if (isRecentlyDelivered(text, state.recentDeliveredTexts)) {
|
||||||
const normalized = normalizeTextForComparison(text);
|
|
||||||
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
@ -172,6 +182,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
}
|
}
|
||||||
assistantTexts.push(text);
|
assistantTexts.push(text);
|
||||||
rememberAssistantText(text);
|
rememberAssistantText(text);
|
||||||
|
// Non-block assistant text may still be replayed after willRetry compaction
|
||||||
|
// before any channel delivery has happened, so only block-reply paths record
|
||||||
|
// cross-turn dedup state eagerly.
|
||||||
|
if (params.onBlockReply) {
|
||||||
|
recordDeliveredText(text, state.recentDeliveredTexts);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const finalizeAssistantTexts = (args: {
|
const finalizeAssistantTexts = (args: {
|
||||||
@ -191,6 +207,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
text,
|
text,
|
||||||
);
|
);
|
||||||
rememberAssistantText(text);
|
rememberAssistantText(text);
|
||||||
|
if (params.onBlockReply) {
|
||||||
|
recordDeliveredText(text, state.recentDeliveredTexts);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pushAssistantText(text);
|
pushAssistantText(text);
|
||||||
}
|
}
|
||||||
@ -501,10 +520,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
state.lastBlockReplyText = chunk;
|
|
||||||
assistantTexts.push(chunk);
|
|
||||||
rememberAssistantText(chunk);
|
|
||||||
if (!params.onBlockReply) {
|
if (!params.onBlockReply) {
|
||||||
|
state.lastBlockReplyText = chunk;
|
||||||
|
assistantTexts.push(chunk);
|
||||||
|
rememberAssistantText(chunk);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const splitResult = replyDirectiveAccumulator.consume(chunk);
|
const splitResult = replyDirectiveAccumulator.consume(chunk);
|
||||||
@ -523,6 +542,15 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) {
|
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (cleanedText) {
|
||||||
|
state.lastBlockReplyText = cleanedText;
|
||||||
|
assistantTexts.push(cleanedText);
|
||||||
|
rememberAssistantText(cleanedText);
|
||||||
|
// Record in cross-turn dedup cache synchronously before async delivery
|
||||||
|
// so compaction-started follow-up turns see already-emitted text, but
|
||||||
|
// only after reply directives have produced a real outbound payload.
|
||||||
|
recordDeliveredText(cleanedText, state.recentDeliveredTexts);
|
||||||
|
}
|
||||||
emitBlockReplySafely({
|
emitBlockReplySafely({
|
||||||
text: cleanedText,
|
text: cleanedText,
|
||||||
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
|
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user