diff --git a/CHANGELOG.md b/CHANGELOG.md index b1d0dfe4003..40b9027f7f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -103,6 +103,7 @@ Docs: https://docs.openclaw.ai - Gateway/HTTP tools invoke media compatibility: preserve raw media payload access for direct `/tools/invoke` clients by allowing media `nodes` invoke commands only in HTTP tool context, while keeping agent-context media invoke blocking to prevent base64 prompt bloat. (#34365) Thanks @obviyus. - Agents/Nodes media outputs: add dedicated `photos_latest` action handling, block media-returning `nodes invoke` commands, keep metadata-only `camera.list` invoke allowed, and normalize empty `photos_latest` results to a consistent response shape to prevent base64 context bloat. (#34332) Thanks @obviyus. - TUI/session-key canonicalization: normalize `openclaw tui --session` values to lowercase so uppercase session names no longer drop real-time streaming updates due to gateway/TUI key mismatches. (#33866, #34013) thanks @lynnzc. +- iMessage/echo loop hardening: strip leaked assistant-internal scaffolding from outbound iMessage replies, drop reflected assistant-content messages before they re-enter inbound processing, extend echo-cache text retention for delayed reflections, and suppress repeated loop traffic before it amplifies into queue overflow. (#33295) Thanks @joelnishanth. - Outbound/send config threading: pass resolved SecretRef config through outbound adapters and helper send paths so send flows do not reload unresolved runtime config. (#33987) Thanks @joshavant. - Sessions/subagent attachments: remove `attachments[].content.maxLength` from `sessions_spawn` schema to avoid llama.cpp GBNF repetition overflow, and preflight UTF-8 byte size before buffer allocation while keeping runtime file-size enforcement unchanged. (#33648) Thanks @anisoptera. - Runtime/tool-state stability: recover from dangling Anthropic `tool_use` after compaction, serialize long-running Discord handler runs without blocking new inbound events, and prevent stale busy snapshots from suppressing stuck-channel recovery. (from #33630, #33583) Thanks @kevinWangSheng and @theotarr. diff --git a/src/imessage/monitor/deliver.ts b/src/imessage/monitor/deliver.ts index 71825be8d0b..fc949d3cfc1 100644 --- a/src/imessage/monitor/deliver.ts +++ b/src/imessage/monitor/deliver.ts @@ -7,6 +7,7 @@ import type { RuntimeEnv } from "../../runtime.js"; import type { createIMessageRpcClient } from "../client.js"; import { sendMessageIMessage } from "../send.js"; import type { SentMessageCache } from "./echo-cache.js"; +import { sanitizeOutboundText } from "./sanitize-outbound.js"; export async function deliverReplies(params: { replies: ReplyPayload[]; @@ -30,7 +31,7 @@ export async function deliverReplies(params: { const chunkMode = resolveChunkMode(cfg, "imessage", accountId); for (const payload of replies) { const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - const rawText = payload.text ?? ""; + const rawText = sanitizeOutboundText(payload.text ?? ""); const text = convertMarkdownTables(rawText, tableMode); if (!text && mediaList.length === 0) { continue; diff --git a/src/imessage/monitor/echo-cache.ts b/src/imessage/monitor/echo-cache.ts index c68ff04b970..06f5ee847f5 100644 --- a/src/imessage/monitor/echo-cache.ts +++ b/src/imessage/monitor/echo-cache.ts @@ -8,7 +8,9 @@ export type SentMessageCache = { has: (scope: string, lookup: SentMessageLookup) => boolean; }; -const SENT_MESSAGE_TEXT_TTL_MS = 5000; +// Keep the text fallback short so repeated user replies like "ok" are not +// suppressed for long; delayed reflections should match the stronger message-id key. +const SENT_MESSAGE_TEXT_TTL_MS = 5_000; const SENT_MESSAGE_ID_TTL_MS = 60_000; function normalizeEchoTextKey(text: string | undefined): string | null { diff --git a/src/imessage/monitor/inbound-processing.ts b/src/imessage/monitor/inbound-processing.ts index 8a4979df965..d042f1f1a0f 100644 --- a/src/imessage/monitor/inbound-processing.ts +++ b/src/imessage/monitor/inbound-processing.ts @@ -30,6 +30,7 @@ import { isAllowedIMessageSender, normalizeIMessageHandle, } from "../targets.js"; +import { detectReflectedContent } from "./reflection-guard.js"; import type { MonitorIMessageOpts, IMessagePayload } from "./types.js"; type IMessageReplyContext = { @@ -214,7 +215,7 @@ export function resolveIMessageInboundDecision(params: { return { kind: "drop", reason: "empty body" }; } - // Echo detection: check if the received message matches a recently sent message (within 5 seconds). + // Echo detection: check if the received message matches a recently sent message. // Scope by conversation so same text in different chats is not conflated. const inboundMessageId = params.message.id != null ? String(params.message.id) : undefined; if (params.echoCache && (messageText || inboundMessageId)) { @@ -237,6 +238,17 @@ export function resolveIMessageInboundDecision(params: { } } + // Reflection guard: drop inbound messages that contain assistant-internal + // metadata markers. These indicate outbound content was reflected back as + // inbound, which causes recursive echo amplification. + const reflection = detectReflectedContent(messageText); + if (reflection.isReflection) { + params.logVerbose?.( + `imessage: dropping reflected assistant content (markers: ${reflection.matchedLabels.join(", ")})`, + ); + return { kind: "drop", reason: "reflected assistant content" }; + } + const replyContext = describeReplyContext(params.message); const createdAt = params.message.created_at ? Date.parse(params.message.created_at) : undefined; const historyKey = isGroup diff --git a/src/imessage/monitor/loop-rate-limiter.test.ts b/src/imessage/monitor/loop-rate-limiter.test.ts new file mode 100644 index 00000000000..d156ffc2c36 --- /dev/null +++ b/src/imessage/monitor/loop-rate-limiter.test.ts @@ -0,0 +1,50 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { createLoopRateLimiter } from "./loop-rate-limiter.js"; + +describe("createLoopRateLimiter", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("allows messages below the threshold", () => { + const limiter = createLoopRateLimiter({ windowMs: 10_000, maxHits: 3 }); + limiter.record("conv:1"); + limiter.record("conv:1"); + expect(limiter.isRateLimited("conv:1")).toBe(false); + }); + + it("rate limits at the threshold", () => { + const limiter = createLoopRateLimiter({ windowMs: 10_000, maxHits: 3 }); + limiter.record("conv:1"); + limiter.record("conv:1"); + limiter.record("conv:1"); + expect(limiter.isRateLimited("conv:1")).toBe(true); + }); + + it("does not cross-contaminate conversations", () => { + const limiter = createLoopRateLimiter({ windowMs: 10_000, maxHits: 2 }); + limiter.record("conv:1"); + limiter.record("conv:1"); + expect(limiter.isRateLimited("conv:1")).toBe(true); + expect(limiter.isRateLimited("conv:2")).toBe(false); + }); + + it("resets after the time window expires", () => { + const limiter = createLoopRateLimiter({ windowMs: 5_000, maxHits: 2 }); + limiter.record("conv:1"); + limiter.record("conv:1"); + expect(limiter.isRateLimited("conv:1")).toBe(true); + + vi.advanceTimersByTime(6_000); + expect(limiter.isRateLimited("conv:1")).toBe(false); + }); + + it("returns false for unknown conversations", () => { + const limiter = createLoopRateLimiter(); + expect(limiter.isRateLimited("unknown")).toBe(false); + }); +}); diff --git a/src/imessage/monitor/loop-rate-limiter.ts b/src/imessage/monitor/loop-rate-limiter.ts new file mode 100644 index 00000000000..56c234a1b14 --- /dev/null +++ b/src/imessage/monitor/loop-rate-limiter.ts @@ -0,0 +1,69 @@ +/** + * Per-conversation rate limiter that detects rapid-fire identical echo + * patterns and suppresses them before they amplify into queue overflow. + */ + +const DEFAULT_WINDOW_MS = 60_000; +const DEFAULT_MAX_HITS = 5; +const CLEANUP_INTERVAL_MS = 120_000; + +type ConversationWindow = { + timestamps: number[]; +}; + +export type LoopRateLimiter = { + /** Returns true if this conversation has exceeded the rate limit. */ + isRateLimited: (conversationKey: string) => boolean; + /** Record an inbound message for a conversation. */ + record: (conversationKey: string) => void; +}; + +export function createLoopRateLimiter(opts?: { + windowMs?: number; + maxHits?: number; +}): LoopRateLimiter { + const windowMs = opts?.windowMs ?? DEFAULT_WINDOW_MS; + const maxHits = opts?.maxHits ?? DEFAULT_MAX_HITS; + const conversations = new Map(); + let lastCleanup = Date.now(); + + function cleanup() { + const now = Date.now(); + if (now - lastCleanup < CLEANUP_INTERVAL_MS) { + return; + } + lastCleanup = now; + for (const [key, win] of conversations.entries()) { + const recent = win.timestamps.filter((ts) => now - ts <= windowMs); + if (recent.length === 0) { + conversations.delete(key); + } else { + win.timestamps = recent; + } + } + } + + return { + record(conversationKey: string) { + cleanup(); + let win = conversations.get(conversationKey); + if (!win) { + win = { timestamps: [] }; + conversations.set(conversationKey, win); + } + win.timestamps.push(Date.now()); + }, + + isRateLimited(conversationKey: string): boolean { + cleanup(); + const win = conversations.get(conversationKey); + if (!win) { + return false; + } + const now = Date.now(); + const recent = win.timestamps.filter((ts) => now - ts <= windowMs); + win.timestamps = recent; + return recent.length >= maxHits; + }, + }; +} diff --git a/src/imessage/monitor/monitor-provider.echo-cache.test.ts b/src/imessage/monitor/monitor-provider.echo-cache.test.ts index e67667c0228..4adeed4aafa 100644 --- a/src/imessage/monitor/monitor-provider.echo-cache.test.ts +++ b/src/imessage/monitor/monitor-provider.echo-cache.test.ts @@ -35,7 +35,8 @@ describe("iMessage sent-message echo cache", () => { const cache = createSentMessageCache(); cache.remember("acct:imessage:+1555", { text: "hello", messageId: "m-1" }); - vi.advanceTimersByTime(6000); + // Text fallback stays short to avoid suppressing legitimate repeated user text. + vi.advanceTimersByTime(6_000); expect(cache.has("acct:imessage:+1555", { text: "hello" })).toBe(false); expect(cache.has("acct:imessage:+1555", { messageId: "m-1" })).toBe(true); diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 2ca8d3015f1..ffc15a4df0a 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -50,6 +50,7 @@ import { buildIMessageInboundContext, resolveIMessageInboundDecision, } from "./inbound-processing.js"; +import { createLoopRateLimiter } from "./loop-rate-limiter.js"; import { parseIMessageNotification } from "./parse-notification.js"; import { normalizeAllowList, resolveRuntime } from "./runtime.js"; import type { IMessagePayload, MonitorIMessageOpts } from "./types.js"; @@ -98,6 +99,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P ); const groupHistories = new Map(); const sentMessageCache = createSentMessageCache(); + const loopRateLimiter = createLoopRateLimiter(); const textLimit = resolveTextChunkLimit(cfg, "imessage", accountInfo.accountId); const allowFrom = normalizeAllowList(opts.allowFrom ?? imessageCfg.allowFrom); const groupAllowFrom = normalizeAllowList( @@ -253,11 +255,34 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P logVerbose, }); + // Build conversation key for rate limiting (used by both drop and dispatch paths). + const chatId = message.chat_id ?? undefined; + const senderForKey = (message.sender ?? "").trim(); + const conversationKey = chatId != null ? `group:${chatId}` : `dm:${senderForKey}`; + const rateLimitKey = `${accountInfo.accountId}:${conversationKey}`; + if (decision.kind === "drop") { + // Record echo/reflection drops so the rate limiter can detect sustained loops. + // Only loop-related drop reasons feed the counter; policy/mention/empty drops + // are normal and should not escalate. + const isLoopDrop = + decision.reason === "echo" || + decision.reason === "reflected assistant content" || + decision.reason === "from me"; + if (isLoopDrop) { + loopRateLimiter.record(rateLimitKey); + } + return; + } + + // After repeated echo/reflection drops for a conversation, suppress all + // remaining messages as a safety net against amplification that slips + // through the primary guards. + if (decision.kind === "dispatch" && loopRateLimiter.isRateLimited(rateLimitKey)) { + logVerbose(`imessage: rate-limited conversation ${conversationKey} (echo loop detected)`); return; } - const chatId = message.chat_id ?? undefined; if (decision.kind === "pairing") { const sender = (message.sender ?? "").trim(); if (!sender) { diff --git a/src/imessage/monitor/reflection-guard.test.ts b/src/imessage/monitor/reflection-guard.test.ts new file mode 100644 index 00000000000..d7156b93da5 --- /dev/null +++ b/src/imessage/monitor/reflection-guard.test.ts @@ -0,0 +1,107 @@ +import { describe, expect, it } from "vitest"; +import { detectReflectedContent } from "./reflection-guard.js"; + +describe("detectReflectedContent", () => { + it("returns false for empty text", () => { + expect(detectReflectedContent("").isReflection).toBe(false); + }); + + it("returns false for normal user text", () => { + const result = detectReflectedContent("Hey, what's the weather today?"); + expect(result.isReflection).toBe(false); + expect(result.matchedLabels).toEqual([]); + }); + + it("detects +#+#+#+# separator pattern", () => { + const result = detectReflectedContent("NO_REPLY +#+#+#+#+#+assistant to=final"); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("internal-separator"); + }); + + it("detects assistant to=final marker", () => { + const result = detectReflectedContent("some text assistant to=final rest"); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("assistant-role-marker"); + }); + + it("detects tags", () => { + const result = detectReflectedContent("internal reasoning"); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("thinking-tag"); + }); + + it("detects tags", () => { + const result = detectReflectedContent("secret"); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("thinking-tag"); + }); + + it("detects tags", () => { + const result = detectReflectedContent("data"); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("relevant-memories-tag"); + }); + + it("detects tags", () => { + const result = detectReflectedContent("visible"); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("final-tag"); + }); + + it("returns multiple matched labels for combined markers", () => { + const text = "NO_REPLY +#+#+#+# step assistant to=final"; + const result = detectReflectedContent(text); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels.length).toBeGreaterThanOrEqual(3); + }); + + it("ignores reflection markers inside inline code", () => { + const result = detectReflectedContent( + "Please keep `debug trace` in the example output", + ); + expect(result.isReflection).toBe(false); + expect(result.matchedLabels).toEqual([]); + }); + + it("ignores reflection markers inside fenced code blocks", () => { + const result = detectReflectedContent( + [ + "User pasted a repro snippet:", + "```xml", + "cached", + "assistant to=final", + "```", + ].join("\n"), + ); + expect(result.isReflection).toBe(false); + expect(result.matchedLabels).toEqual([]); + }); + + it("still flags markers that appear outside code blocks", () => { + const result = detectReflectedContent( + ["```xml", "inside code", "```", "", "assistant to=final"].join("\n"), + ); + expect(result.isReflection).toBe(true); + expect(result.matchedLabels).toContain("assistant-role-marker"); + }); + + it("does not flag normal code discussion about thinking", () => { + const result = detectReflectedContent("I was thinking about your question"); + expect(result.isReflection).toBe(false); + }); + + it("flags '' as reflection when it forms a complete tag", () => { + const result = detectReflectedContent("Here is my "); + expect(result.isReflection).toBe(true); + }); + + it("does not flag partial tag without closing bracket", () => { + const result = detectReflectedContent("I sent a ' phrase without closing bracket", () => { + const result = detectReflectedContent("This is a ` to avoid false-positives on phrases like "". +const THINKING_TAG_RE = /<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\b[^<>]*>/i; +const RELEVANT_MEMORIES_TAG_RE = /<\s*\/?\s*relevant[-_]memories\b[^<>]*>/i; +// Require closing `>` to avoid false-positives on phrases like "". +const FINAL_TAG_RE = /<\s*\/?\s*final\b[^<>]*>/i; + +const REFLECTION_PATTERNS: Array<{ re: RegExp; label: string }> = [ + { re: INTERNAL_SEPARATOR_RE, label: "internal-separator" }, + { re: ASSISTANT_ROLE_MARKER_RE, label: "assistant-role-marker" }, + { re: THINKING_TAG_RE, label: "thinking-tag" }, + { re: RELEVANT_MEMORIES_TAG_RE, label: "relevant-memories-tag" }, + { re: FINAL_TAG_RE, label: "final-tag" }, +]; + +export type ReflectionDetection = { + isReflection: boolean; + matchedLabels: string[]; +}; + +function hasMatchOutsideCode(text: string, re: RegExp): boolean { + const codeRegions = findCodeRegions(text); + const globalRe = new RegExp(re.source, re.flags.includes("g") ? re.flags : `${re.flags}g`); + + for (const match of text.matchAll(globalRe)) { + const start = match.index ?? -1; + if (start >= 0 && !isInsideCode(start, codeRegions)) { + return true; + } + } + + return false; +} + +/** + * Check whether an inbound message appears to be a reflection of + * assistant-originated content. Returns matched pattern labels for telemetry. + */ +export function detectReflectedContent(text: string): ReflectionDetection { + if (!text) { + return { isReflection: false, matchedLabels: [] }; + } + + const matchedLabels: string[] = []; + for (const { re, label } of REFLECTION_PATTERNS) { + if (hasMatchOutsideCode(text, re)) { + matchedLabels.push(label); + } + } + + return { + isReflection: matchedLabels.length > 0, + matchedLabels, + }; +} diff --git a/src/imessage/monitor/sanitize-outbound.test.ts b/src/imessage/monitor/sanitize-outbound.test.ts new file mode 100644 index 00000000000..ad70b558731 --- /dev/null +++ b/src/imessage/monitor/sanitize-outbound.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it } from "vitest"; +import { sanitizeOutboundText } from "./sanitize-outbound.js"; + +describe("sanitizeOutboundText", () => { + it("returns empty string unchanged", () => { + expect(sanitizeOutboundText("")).toBe(""); + }); + + it("preserves normal user-facing text", () => { + const text = "Hello! How can I help you today?"; + expect(sanitizeOutboundText(text)).toBe(text); + }); + + it("strips tags and content", () => { + const text = "internal reasoningThe answer is 42."; + expect(sanitizeOutboundText(text)).toBe("The answer is 42."); + }); + + it("strips tags and content", () => { + const text = "secretVisible reply"; + expect(sanitizeOutboundText(text)).toBe("Visible reply"); + }); + + it("strips tags", () => { + const text = "Hello world"; + expect(sanitizeOutboundText(text)).toBe("Hello world"); + }); + + it("strips tags and content", () => { + const text = "memory dataVisible"; + expect(sanitizeOutboundText(text)).toBe("Visible"); + }); + + it("strips +#+#+#+# separator patterns", () => { + const text = "NO_REPLY +#+#+#+#+#+ more internal stuff"; + expect(sanitizeOutboundText(text)).not.toContain("+#+#"); + }); + + it("strips assistant to=final markers", () => { + const text = "Some text assistant to=final more text"; + const result = sanitizeOutboundText(text); + expect(result).not.toMatch(/assistant\s+to\s*=\s*final/i); + }); + + it("strips trailing role turn markers", () => { + const text = "Hello\nassistant:\nuser:"; + const result = sanitizeOutboundText(text); + expect(result).not.toMatch(/^assistant:$/m); + }); + + it("collapses excessive blank lines after stripping", () => { + const text = "Hello\n\n\n\n\nWorld"; + expect(sanitizeOutboundText(text)).toBe("Hello\n\nWorld"); + }); + + it("handles combined internal markers in one message", () => { + const text = "step 1NO_REPLY +#+#+#+# assistant to=final\n\nActual reply"; + const result = sanitizeOutboundText(text); + expect(result).not.toContain(""); + expect(result).not.toContain("+#+#"); + expect(result).not.toMatch(/assistant to=final/i); + expect(result).toContain("Actual reply"); + }); +}); diff --git a/src/imessage/monitor/sanitize-outbound.ts b/src/imessage/monitor/sanitize-outbound.ts new file mode 100644 index 00000000000..9fe1664e1eb --- /dev/null +++ b/src/imessage/monitor/sanitize-outbound.ts @@ -0,0 +1,31 @@ +import { stripAssistantInternalScaffolding } from "../../shared/text/assistant-visible-text.js"; + +/** + * Patterns that indicate assistant-internal metadata leaked into text. + * These must never reach a user-facing channel. + */ +const INTERNAL_SEPARATOR_RE = /(?:#\+){2,}#?/g; +const ASSISTANT_ROLE_MARKER_RE = /\bassistant\s+to\s*=\s*\w+/gi; +const ROLE_TURN_MARKER_RE = /\b(?:user|system|assistant)\s*:\s*$/gm; + +/** + * Strip all assistant-internal scaffolding from outbound text before delivery. + * Applies reasoning/thinking tag removal, memory tag removal, and + * model-specific internal separator stripping. + */ +export function sanitizeOutboundText(text: string): string { + if (!text) { + return text; + } + + let cleaned = stripAssistantInternalScaffolding(text); + + cleaned = cleaned.replace(INTERNAL_SEPARATOR_RE, ""); + cleaned = cleaned.replace(ASSISTANT_ROLE_MARKER_RE, ""); + cleaned = cleaned.replace(ROLE_TURN_MARKER_RE, ""); + + // Collapse excessive blank lines left after stripping. + cleaned = cleaned.replace(/\n{3,}/g, "\n\n").trim(); + + return cleaned; +}