diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index 8f10e552406..014f2101de1 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -669,3 +669,263 @@ describe("draft stream initial message debounce", () => { }); }); }); + +describe("HTML parse error fallback", () => { + afterEach(() => { + __testing.resetTelegramDraftStreamForTests(); + }); + + const htmlRenderer = (text: string) => ({ text: `${text}`, parseMode: "HTML" as const }); + + function createRenderedStream( + api: ReturnType, + overrides: Omit, "api" | "chatId"> = {}, + ) { + return createDraftStream(api, { renderText: htmlRenderer, ...overrides }); + } + + it("retries editMessageText as plain text when HTML parse error occurs", async () => { + const api = createMockDraftApi(); + api.editMessageText + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockResolvedValueOnce(true); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledWith(123, "hello", { parse_mode: "HTML" }); + + stream.update("hello again"); + await stream.flush(); + + expect(api.editMessageText).toHaveBeenCalledTimes(2); + expect(api.editMessageText).toHaveBeenNthCalledWith(1, 123, 17, "hello again", { + parse_mode: "HTML", + }); + expect(api.editMessageText).toHaveBeenNthCalledWith(2, 123, 17, "hello again"); + expect(warn).toHaveBeenCalledWith( + "telegram stream preview edit: HTML parse error, retrying as plain text", + ); + }); + + it("propagates to outer handler when edit plain text retry also fails", async () => { + const api = createMockDraftApi(); + api.editMessageText + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockRejectedValueOnce(new Error("500: Internal Server Error")); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + + stream.update("hello again"); + await stream.flush(); + + expect(warn).toHaveBeenCalledWith( + expect.stringContaining("telegram stream preview failed: 500: Internal Server Error"), + ); + + // Stream is stopped — further updates are no-ops + stream.update("third"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(2); + }); + + it("retries first sendMessage as plain text when HTML parse error occurs", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockResolvedValueOnce({ message_id: 17 }); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "hello", { parse_mode: "HTML" }); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "hello", undefined); + expect(warn).toHaveBeenCalledWith( + "telegram stream preview send: HTML parse error, retrying as plain text", + ); + + // Stream continues with captured message id + stream.update("hello again"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledWith(123, 17, "hello again"); + }); + + it("retries sendMessageDraft as plain text when HTML parse error occurs", async () => { + const api = createMockDraftApi(); + api.sendMessageDraft + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockResolvedValueOnce(true); + const warn = vi.fn(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "draft", + renderText: htmlRenderer, + warn, + }); + + stream.update("hello"); + await stream.flush(); + + expect(api.sendMessageDraft).toHaveBeenCalledTimes(2); + expect(api.sendMessageDraft).toHaveBeenNthCalledWith( + 1, + 123, + expect.any(Number), + "hello", + { message_thread_id: 42, parse_mode: "HTML" }, + ); + expect(api.sendMessageDraft).toHaveBeenNthCalledWith(2, 123, expect.any(Number), "hello", { + message_thread_id: 42, + }); + expect(warn).toHaveBeenCalledWith( + "telegram stream draft preview: HTML parse error, retrying as plain text", + ); + }); + + it("treats MESSAGE_NOT_MODIFIED on edit as success and continues streaming", async () => { + const api = createMockDraftApi(); + api.editMessageText + .mockRejectedValueOnce(new Error("400: Bad Request: message is not modified")) + .mockResolvedValue(true); + const stream = createDraftStream(api); + + stream.update("hello"); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledTimes(1); + + stream.update("hello2"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(1); + + // Stream continues working after MESSAGE_NOT_MODIFIED + stream.update("hello3"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(2); + }); + + it("disables parse_mode for remaining updates in the same generation after a parse error", async () => { + const api = createMockDraftApi(); + api.editMessageText + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockResolvedValue(true); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledWith(123, "hello", { parse_mode: "HTML" }); + + // Edit triggers parse error → plain text retry + stream.update("hello again"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenNthCalledWith(2, 123, 17, "hello again"); + + // Subsequent update uses plain text directly (no parse_mode) + stream.update("third update"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(3); + expect(api.editMessageText.mock.calls[2]).toEqual([123, 17, "third update"]); + }); + + it("re-enables HTML parse mode after forceNewMessage resets the generation", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }); + api.editMessageText + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockResolvedValue(true); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledWith(123, "hello", { parse_mode: "HTML" }); + + // Edit triggers parse error → disables parse mode for gen 0 + stream.update("hello again"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenNthCalledWith(2, 123, 17, "hello again"); + + // Force new message → new generation, parse mode re-enabled + stream.forceNewMessage(); + stream.update("new message"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "new message", { + parse_mode: "HTML", + }); + }); + + it("does not stop stream when HTML parse error reaches outer handler", async () => { + const api = createMockDraftApi(); + api.editMessageText + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockRejectedValueOnce( + new Error("400: Bad Request: can't parse entities: unexpected end tag"), + ) + .mockResolvedValue(true); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + + // Both HTML edit and plain text retry fail with parse errors + stream.update("hello again"); + await stream.flush(); + + expect(warn).toHaveBeenCalledWith( + "telegram stream preview: HTML parse error escaped to outer handler (degrading to plain text)", + ); + + // Stream still works (not stopped) + stream.update("third update"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(3); + expect(api.editMessageText.mock.calls[2]).toEqual([123, 17, "third update"]); + }); + + it("stops stream on non-parse errors in outer handler", async () => { + const api = createMockDraftApi(); + api.editMessageText.mockRejectedValueOnce(new Error("429: Too Many Requests")); + const warn = vi.fn(); + const stream = createRenderedStream(api, { warn }); + + stream.update("hello"); + await stream.flush(); + + stream.update("hello again"); + await stream.flush(); + + expect(warn).toHaveBeenCalledWith( + expect.stringContaining("telegram stream preview failed: 429: Too Many Requests"), + ); + + // Stream is stopped — further updates are no-ops + stream.update("third"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(1); + }); +}); diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index ae943f169d3..127089d5449 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -1,9 +1,22 @@ import type { Bot } from "grammy"; import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle"; import { resolveGlobalSingleton } from "openclaw/plugin-sdk/text-runtime"; +import { formatErrorMessage } from "../../../src/infra/errors.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js"; +const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i; +const MESSAGE_NOT_MODIFIED_RE = + /400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i; + +function isTelegramHtmlParseError(err: unknown): boolean { + return PARSE_ERR_RE.test(formatErrorMessage(err)); +} + +function isMessageNotModifiedError(err: unknown): boolean { + return MESSAGE_NOT_MODIFIED_RE.test(formatErrorMessage(err)); +} + const TELEGRAM_STREAM_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 1000; const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647; @@ -148,10 +161,14 @@ export function createTelegramDraftStream(params: { let lastSentParseMode: "HTML" | undefined; let previewRevision = 0; let generation = 0; + /** Generation for which HTML parse_mode has been disabled due to parse errors. */ + let parseModeDisabledForGeneration: number | undefined; type PreviewSendParams = { renderedText: string; renderedParseMode: "HTML" | undefined; sendGeneration: number; + /** Original unrendered text for plain-text fallback on HTML parse errors. */ + plainText: string; }; const sendRenderedMessageWithThreadFallback = async (sendArgs: { renderedText: string; @@ -195,14 +212,36 @@ export function createTelegramDraftStream(params: { renderedText, renderedParseMode, sendGeneration, + plainText, }: PreviewSendParams): Promise => { + // Resolve effective parse mode: disabled for this generation after a prior parse error. + const effectiveParseMode = + parseModeDisabledForGeneration === sendGeneration ? undefined : renderedParseMode; + const effectiveText = effectiveParseMode ? renderedText : plainText; + if (typeof streamMessageId === "number") { - if (renderedParseMode) { - await params.api.editMessageText(chatId, streamMessageId, renderedText, { - parse_mode: renderedParseMode, - }); - } else { - await params.api.editMessageText(chatId, streamMessageId, renderedText); + try { + if (effectiveParseMode) { + await params.api.editMessageText(chatId, streamMessageId, effectiveText, { + parse_mode: effectiveParseMode, + }); + } else { + await params.api.editMessageText(chatId, streamMessageId, effectiveText); + } + } catch (err) { + if (isMessageNotModifiedError(err)) { + // Harmless noop — content identical to current message. + return true; + } + if (effectiveParseMode && isTelegramHtmlParseError(err)) { + // HTML rejected by Telegram — retry as plain text and disable + // parse_mode for the rest of this generation. + parseModeDisabledForGeneration = sendGeneration; + params.warn?.("telegram stream preview edit: HTML parse error, retrying as plain text"); + await params.api.editMessageText(chatId, streamMessageId, plainText); + return true; + } + throw err; } return true; } @@ -210,19 +249,40 @@ export function createTelegramDraftStream(params: { let sent: Awaited>["sent"]; try { ({ sent } = await sendRenderedMessageWithThreadFallback({ - renderedText, - renderedParseMode, + renderedText: effectiveText, + renderedParseMode: effectiveParseMode, fallbackWarnMessage: "telegram stream preview send failed with message_thread_id, retrying without thread", })); } catch (err) { - // Pre-connect failures (DNS, refused) and explicit Telegram rejections (4xx) - // guarantee the message was never delivered — clear the flag so - // sendMayHaveLanded() doesn't suppress fallback. - if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) { - messageSendAttempted = false; + if (effectiveParseMode && isTelegramHtmlParseError(err)) { + // HTML rejected on first send — retry as plain text. + parseModeDisabledForGeneration = sendGeneration; + params.warn?.("telegram stream preview send: HTML parse error, retrying as plain text"); + try { + ({ sent } = await sendRenderedMessageWithThreadFallback({ + renderedText: plainText, + renderedParseMode: undefined, + fallbackWarnMessage: + "telegram stream preview send (plain) failed with message_thread_id, retrying without thread", + })); + } catch (plainErr) { + // Plain text retry also failed — reset messageSendAttempted when the + // error guarantees the message was never delivered. + if (isSafeToRetrySendError(plainErr) || isTelegramClientRejection(plainErr)) { + messageSendAttempted = false; + } + throw plainErr; + } + } else { + // Pre-connect failures (DNS, refused) and explicit Telegram rejections (4xx) + // guarantee the message was never delivered — clear the flag so + // sendMayHaveLanded() doesn't suppress fallback. + if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) { + messageSendAttempted = false; + } + throw err; } - throw err; } const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { @@ -245,21 +305,35 @@ export function createTelegramDraftStream(params: { const sendDraftTransportPreview = async ({ renderedText, renderedParseMode, + sendGeneration, + plainText, }: PreviewSendParams): Promise => { + const effectiveParseMode = + parseModeDisabledForGeneration === sendGeneration ? undefined : renderedParseMode; + const effectiveText = effectiveParseMode ? renderedText : plainText; const draftId = streamDraftId ?? allocateTelegramDraftId(); streamDraftId = draftId; - const draftParams = { - ...(threadParams?.message_thread_id != null - ? { message_thread_id: threadParams.message_thread_id } - : {}), - ...(renderedParseMode ? { parse_mode: renderedParseMode } : {}), + const buildDraftParams = (parseMode: "HTML" | undefined) => { + const p: { message_thread_id?: number; parse_mode?: "HTML" } = {}; + if (threadParams?.message_thread_id != null) { + p.message_thread_id = threadParams.message_thread_id; + } + if (parseMode) { + p.parse_mode = parseMode; + } + return Object.keys(p).length > 0 ? p : undefined; }; - await resolvedDraftApi!( - chatId, - draftId, - renderedText, - Object.keys(draftParams).length > 0 ? draftParams : undefined, - ); + try { + await resolvedDraftApi!(chatId, draftId, effectiveText, buildDraftParams(effectiveParseMode)); + } catch (err) { + if (effectiveParseMode && isTelegramHtmlParseError(err)) { + parseModeDisabledForGeneration = sendGeneration; + params.warn?.("telegram stream draft preview: HTML parse error, retrying as plain text"); + await resolvedDraftApi!(chatId, draftId, plainText, buildDraftParams(undefined)); + } else { + throw err; + } + } return true; }; @@ -278,12 +352,17 @@ export function createTelegramDraftStream(params: { if (!renderedText) { return false; } - if (renderedText.length > maxChars) { - // Telegram text messages/edits cap at 4096 chars. + // Telegram text messages/edits cap at 4096 chars. + // Use the effective payload length: when HTML parse mode is disabled for + // this generation, the actual payload is the shorter plain text, not the + // expanded HTML renderedText. + const effectiveLength = + parseModeDisabledForGeneration === generation ? trimmed.length : renderedText.length; + if (effectiveLength > maxChars) { // Stop streaming once we exceed the cap to avoid repeated API failures. streamState.stopped = true; params.warn?.( - `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, + `telegram stream preview stopped (text length ${effectiveLength} > ${maxChars})`, ); return false; } @@ -299,8 +378,6 @@ export function createTelegramDraftStream(params: { } } - lastSentText = renderedText; - lastSentParseMode = renderedParseMode; try { let sent = false; if (previewTransport === "draft") { @@ -309,6 +386,7 @@ export function createTelegramDraftStream(params: { renderedText, renderedParseMode, sendGeneration, + plainText: trimmed, }); } catch (err) { if (!shouldFallbackFromDraftTransport(err)) { @@ -323,6 +401,7 @@ export function createTelegramDraftStream(params: { renderedText, renderedParseMode, sendGeneration, + plainText: trimmed, }); } } else { @@ -330,14 +409,31 @@ export function createTelegramDraftStream(params: { renderedText, renderedParseMode, sendGeneration, + plainText: trimmed, }); } - if (sent) { + if (sent && sendGeneration === generation) { previewRevision += 1; lastDeliveredText = trimmed; + // Reflect the actual text and parse mode that was delivered. When the + // transport fell back to plain text, these may differ from the original + // renderedText/renderedParseMode. + lastSentText = parseModeDisabledForGeneration === sendGeneration ? trimmed : renderedText; + lastSentParseMode = + parseModeDisabledForGeneration === sendGeneration ? undefined : renderedParseMode; } return sent; } catch (err) { + // HTML parse errors should not permanently kill the stream — the next + // update will arrive with more text that may produce valid HTML, and + // the transport functions already disable parse_mode for this generation. + if (isTelegramHtmlParseError(err)) { + parseModeDisabledForGeneration = sendGeneration; + params.warn?.( + `telegram stream preview: HTML parse error escaped to outer handler (degrading to plain text)`, + ); + return false; + } streamState.stopped = true; params.warn?.( `telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`, @@ -345,7 +441,6 @@ export function createTelegramDraftStream(params: { return false; } }; - const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ throttleMs, state: streamState, @@ -394,13 +489,21 @@ export function createTelegramDraftStream(params: { if (previewTransport === "message" && typeof streamMessageId === "number") { return streamMessageId; } - // For draft transport, use the rendered snapshot first so parse_mode stays - // aligned with the text being materialized. - const renderedText = lastSentText || lastDeliveredText; + // For draft transport, prefer the unrendered text when HTML parse mode has + // been disabled for the current generation — avoids re-sending the same + // malformed HTML that caused the parse error during streaming. + const htmlDisabled = parseModeDisabledForGeneration === generation; + const renderedText = htmlDisabled + ? lastDeliveredText || lastSentText + : lastSentText || lastDeliveredText; if (!renderedText) { return undefined; } - const renderedParseMode = lastSentText ? lastSentParseMode : undefined; + const renderedParseMode = htmlDisabled + ? undefined + : lastSentText + ? lastSentParseMode + : undefined; try { const { sent, usedThreadParams } = await sendRenderedMessageWithThreadFallback({ renderedText,