From edcf3e9d32d3b71c9686d4678fc0802139690c69 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 18 Mar 2026 09:47:26 +0530 Subject: [PATCH] test(telegram): add dispatch and handler seams --- .../telegram/src/bot-handlers.runtime.ts | 404 ++++++++++++++---- .../telegram/src/bot-message-dispatch.ts | 18 +- .../telegram/src/bot-native-commands.ts | 24 +- 3 files changed, 366 insertions(+), 80 deletions(-) diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 70e60bd30ad..e3a9be85d18 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -1,13 +1,12 @@ import type { Message, ReactionTypeEmoji } from "@grammyjs/types"; import { resolveAgentDir, resolveDefaultAgentId } from "openclaw/plugin-sdk/agent-runtime"; import { resolveDefaultModelForAgent } from "openclaw/plugin-sdk/agent-runtime"; +import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime"; import { resolveChannelConfigWrites } from "openclaw/plugin-sdk/channel-runtime"; -import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import { writeConfigFile } from "openclaw/plugin-sdk/config-runtime"; import { loadSessionStore, resolveSessionStoreEntry, - resolveStorePath, updateSessionStore, } from "openclaw/plugin-sdk/config-runtime"; import type { DmPolicy } from "openclaw/plugin-sdk/config-runtime"; @@ -17,21 +16,22 @@ import type { TelegramTopicConfig, } from "openclaw/plugin-sdk/config-runtime"; import { applyModelOverrideToSessionEntry } from "openclaw/plugin-sdk/config-runtime"; -import { readChannelAllowFromStore } from "openclaw/plugin-sdk/conversation-runtime"; import { buildPluginBindingResolvedText, parsePluginBindingApprovalCustomId, resolvePluginConversationBindingApproval, } from "openclaw/plugin-sdk/conversation-runtime"; -import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime"; import { dispatchPluginInteractiveHandler } from "openclaw/plugin-sdk/plugin-runtime"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "openclaw/plugin-sdk/reply-runtime"; import { buildCommandsPaginationKeyboard } from "openclaw/plugin-sdk/reply-runtime"; import { buildModelsProviderData, formatModelsAvailableHeader, } from "openclaw/plugin-sdk/reply-runtime"; import { resolveStoredModelOverride } from "openclaw/plugin-sdk/reply-runtime"; -import { listSkillCommandsForAgents } from "openclaw/plugin-sdk/reply-runtime"; import { buildCommandsMessagePaginated } from "openclaw/plugin-sdk/reply-runtime"; import { resolveAgentRoute } from "openclaw/plugin-sdk/routing"; import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing"; @@ -42,19 +42,22 @@ import { normalizeDmAllowFromWithStore, type NormalizedAllowFrom, } from "./bot-access.js"; -import { - createTelegramInboundBufferRuntime, - type TextFragmentEntry, -} from "./bot-handlers.buffers.js"; +import { defaultTelegramBotDeps } from "./bot-deps.js"; import { APPROVE_CALLBACK_DATA_RE, hasInboundMedia, hasReplyTargetMedia, isMediaSizeLimitError, + isRecoverableMediaGroupError, + resolveInboundMediaFileId, } from "./bot-handlers.media.js"; import type { TelegramMediaRef } from "./bot-message-context.js"; import { RegisterTelegramHandlerParams } from "./bot-native-commands.js"; -import { type TelegramUpdateKeyContext } from "./bot-updates.js"; +import { + MEDIA_GROUP_TIMEOUT_MS, + type MediaGroupEntry, + type TelegramUpdateKeyContext, +} from "./bot-updates.js"; import { resolveMedia } from "./bot/delivery.js"; import { getTelegramTextParts, @@ -90,7 +93,6 @@ import { type ProviderInfo, } from "./model-buttons.js"; import { buildInlineKeyboard } from "./send.js"; -import { wasSentByBot } from "./sent-message-cache.js"; export const registerTelegramHandlers = ({ cfg, @@ -108,42 +110,161 @@ export const registerTelegramHandlers = ({ shouldSkipUpdate, processMessage, logger, + telegramDeps = defaultTelegramBotDeps, }: RegisterTelegramHandlerParams) => { - const loadStoreAllowFrom = async () => - readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); + const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500; + const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; + const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = + typeof opts.testTimings?.textFragmentGapMs === "number" && + Number.isFinite(opts.testTimings.textFragmentGapMs) + ? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs)) + : DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS; + const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1; + const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12; + const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000; + const mediaGroupTimeoutMs = + typeof opts.testTimings?.mediaGroupFlushMs === "number" && + Number.isFinite(opts.testTimings.mediaGroupFlushMs) + ? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs)) + : MEDIA_GROUP_TIMEOUT_MS; - const { - buildSyntheticContext, - buildSyntheticTextMessage, - inboundDebouncer, - mediaGroupBuffer, - mediaGroupProcessing, - setMediaGroupProcessing, - mediaGroupTimeoutMs, - processMediaGroup, - textFragmentBuffer, - textFragmentProcessing, - setTextFragmentProcessing, - scheduleTextFragmentFlush, - flushTextFragments, - resolveReplyMediaForMessage, - resolveTelegramDebounceLane, - TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS, - TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP, - TELEGRAM_TEXT_FRAGMENT_MAX_PARTS, - TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS, - TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS, - } = createTelegramInboundBufferRuntime({ - accountId, - bot, - cfg, - logger, - mediaMaxBytes, - opts, - processMessage, - loadStoreAllowFrom, - runtime, - telegramTransport, + const mediaGroupBuffer = new Map(); + let mediaGroupProcessing: Promise = Promise.resolve(); + + type TextFragmentEntry = { + key: string; + messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>; + timer: ReturnType; + }; + const textFragmentBuffer = new Map(); + let textFragmentProcessing: Promise = Promise.resolve(); + + const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); + const FORWARD_BURST_DEBOUNCE_MS = 80; + type TelegramDebounceLane = "default" | "forward"; + type TelegramDebounceEntry = { + ctx: TelegramContext; + msg: Message; + allMedia: TelegramMediaRef[]; + storeAllowFrom: string[]; + debounceKey: string | null; + debounceLane: TelegramDebounceLane; + botUsername?: string; + }; + const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => { + const forwardMeta = msg as { + forward_origin?: unknown; + forward_from?: unknown; + forward_from_chat?: unknown; + forward_sender_name?: unknown; + forward_date?: unknown; + }; + return (forwardMeta.forward_origin ?? + forwardMeta.forward_from ?? + forwardMeta.forward_from_chat ?? + forwardMeta.forward_sender_name ?? + forwardMeta.forward_date) + ? "forward" + : "default"; + }; + const buildSyntheticTextMessage = (params: { + base: Message; + text: string; + date?: number; + from?: Message["from"]; + }): Message => ({ + ...params.base, + ...(params.from ? { from: params.from } : {}), + text: params.text, + caption: undefined, + caption_entities: undefined, + entities: undefined, + ...(params.date != null ? { date: params.date } : {}), + }); + const buildSyntheticContext = ( + ctx: Pick & { getFile?: unknown }, + message: Message, + ): TelegramContext => { + const getFile = + typeof ctx.getFile === "function" + ? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object) + : async () => ({}); + return { message, me: ctx.me, getFile }; + }; + const inboundDebouncer = createInboundDebouncer({ + debounceMs, + resolveDebounceMs: (entry) => + entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs, + buildKey: (entry) => entry.debounceKey, + shouldDebounce: (entry) => { + const text = entry.msg.text ?? entry.msg.caption ?? ""; + const hasDebounceableText = shouldDebounceTextInbound({ + text, + cfg, + commandOptions: { botUsername: entry.botUsername }, + }); + if (entry.debounceLane === "forward") { + // Forwarded bursts often split text + media into adjacent updates. + // Debounce media-only forward entries too so they can coalesce. + return hasDebounceableText || entry.allMedia.length > 0; + } + if (!hasDebounceableText) { + return false; + } + return entry.allMedia.length === 0; + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) { + return; + } + if (entries.length === 1) { + const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg); + await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia); + return; + } + const combinedText = entries + .map((entry) => entry.msg.text ?? entry.msg.caption ?? "") + .filter(Boolean) + .join("\n"); + const combinedMedia = entries.flatMap((entry) => entry.allMedia); + if (!combinedText.trim() && combinedMedia.length === 0) { + return; + } + const first = entries[0]; + const baseCtx = first.ctx; + const syntheticMessage = buildSyntheticTextMessage({ + base: first.msg, + text: combinedText, + date: last.msg.date ?? first.msg.date, + }); + const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; + const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage); + const replyMedia = await resolveReplyMediaForMessage(baseCtx, syntheticMessage); + await processMessage( + syntheticCtx, + combinedMedia, + first.storeAllowFrom, + messageIdOverride ? { messageIdOverride } : undefined, + replyMedia, + ); + }, + onError: (err, items) => { + runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`)); + const chatId = items[0]?.msg.chat.id; + if (chatId != null) { + const threadId = items[0]?.msg.message_thread_id; + void bot.api + .sendMessage( + chatId, + "Something went wrong while processing your message. Please try again.", + threadId != null ? { message_thread_id: threadId } : undefined, + ) + .catch((sendErr) => { + logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`); + }); + } + }, }); const resolveTelegramSessionState = (params: { @@ -190,7 +311,9 @@ export const registerTelegramHandlers = ({ ? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` }) : null; const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; - const storePath = resolveStorePath(cfg.session?.store, { agentId: route.agentId }); + const storePath = telegramDeps.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); const store = loadSessionStore(storePath); const entry = resolveSessionStoreEntry({ store, sessionKey }).existing; const storedOverride = resolveStoredModelOverride({ @@ -227,6 +350,139 @@ export const registerTelegramHandlers = ({ }; }; + const processMediaGroup = async (entry: MediaGroupEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + + const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text); + const primaryEntry = captionMsg ?? entry.messages[0]; + + const allMedia: TelegramMediaRef[] = []; + for (const { ctx } of entry.messages) { + let media; + try { + media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport); + } catch (mediaErr) { + if (!isRecoverableMediaGroupError(mediaErr)) { + throw mediaErr; + } + runtime.log?.( + warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`), + ); + continue; + } + if (media) { + allMedia.push({ + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }); + } + } + + const storeAllowFrom = await loadStoreAllowFrom(); + const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg); + await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia); + } catch (err) { + runtime.error?.(danger(`media group handler failed: ${String(err)}`)); + } + }; + + const flushTextFragments = async (entry: TextFragmentEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + + const first = entry.messages[0]; + const last = entry.messages.at(-1); + if (!first || !last) { + return; + } + + const combinedText = entry.messages.map((m) => m.msg.text ?? "").join(""); + if (!combinedText.trim()) { + return; + } + + const syntheticMessage = buildSyntheticTextMessage({ + base: first.msg, + text: combinedText, + date: last.msg.date ?? first.msg.date, + }); + + const storeAllowFrom = await loadStoreAllowFrom(); + const baseCtx = first.ctx; + + await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, { + messageIdOverride: String(last.msg.message_id), + }); + } catch (err) { + runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); + } + }; + + const queueTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(entry); + }) + .catch(() => undefined); + await textFragmentProcessing; + }; + + const runTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentBuffer.delete(entry.key); + await queueTextFragmentFlush(entry); + }; + + const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { + clearTimeout(entry.timer); + entry.timer = setTimeout(async () => { + await runTextFragmentFlush(entry); + }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); + }; + + const loadStoreAllowFrom = async () => + telegramDeps.readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); + + const resolveReplyMediaForMessage = async ( + ctx: TelegramContext, + msg: Message, + ): Promise => { + const replyMessage = msg.reply_to_message; + if (!replyMessage || !hasInboundMedia(replyMessage)) { + return []; + } + const replyFileId = resolveInboundMediaFileId(replyMessage); + if (!replyFileId) { + return []; + } + try { + const media = await resolveMedia( + { + message: replyMessage, + me: ctx.me, + getFile: async () => await bot.api.getFile(replyFileId), + }, + mediaMaxBytes, + opts.token, + telegramTransport, + ); + if (!media) { + return []; + } + return [ + { + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }, + ]; + } catch (err) { + logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed"); + return []; + } + }; + const isAllowlistAuthorized = ( allow: NormalizedAllowFrom, senderId: string, @@ -502,7 +758,7 @@ export const registerTelegramHandlers = ({ if (user?.is_bot) { return; } - if (reactionMode === "own" && !wasSentByBot(chatId, messageId)) { + if (reactionMode === "own" && !telegramDeps.wasSentByBot(chatId, messageId)) { return; } const eventAuthContext = await resolveTelegramEventAuthorizationContext({ @@ -577,7 +833,7 @@ export const registerTelegramHandlers = ({ const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); // Fresh config for bindings lookup; other routing inputs are payload-derived. const route = resolveAgentRoute({ - cfg: loadConfig(), + cfg: telegramDeps.loadConfig(), channel: "telegram", accountId, peer: { kind: isGroup ? "group" : "direct", id: peerId }, @@ -589,7 +845,7 @@ export const registerTelegramHandlers = ({ for (const r of addedReactions) { const emoji = r.emoji; const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`; - enqueueSystemEvent(text, { + telegramDeps.enqueueSystemEvent(text, { sessionKey, contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`, }); @@ -663,14 +919,12 @@ export const registerTelegramHandlers = ({ // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. clearTimeout(existing.timer); textFragmentBuffer.delete(key); - setTextFragmentProcessing( - textFragmentProcessing() - .then(async () => { - await flushTextFragments(existing); - }) - .catch(() => undefined), - ); - await textFragmentProcessing(); + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(existing); + }) + .catch(() => undefined); + await textFragmentProcessing; } const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; @@ -695,28 +949,24 @@ export const registerTelegramHandlers = ({ existing.messages.push({ msg, ctx }); existing.timer = setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); - setMediaGroupProcessing( - mediaGroupProcessing() - .then(async () => { - await processMediaGroup(existing); - }) - .catch(() => undefined), - ); - await mediaGroupProcessing(); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(existing); + }) + .catch(() => undefined); + await mediaGroupProcessing; }, mediaGroupTimeoutMs); } else { - const entry = { + const entry: MediaGroupEntry = { messages: [{ msg, ctx }], timer: setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); - setMediaGroupProcessing( - mediaGroupProcessing() - .then(async () => { - await processMediaGroup(entry); - }) - .catch(() => undefined), - ); - await mediaGroupProcessing(); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined); + await mediaGroupProcessing; }, mediaGroupTimeoutMs), }; mediaGroupBuffer.set(mediaGroupId, entry); @@ -1051,7 +1301,7 @@ export const registerTelegramHandlers = ({ } const agentId = paginationMatch[2]?.trim() || resolveDefaultAgentId(cfg); - const skillCommands = listSkillCommandsForAgents({ + const skillCommands = telegramDeps.listSkillCommandsForAgents({ cfg, agentIds: [agentId], }); @@ -1208,7 +1458,7 @@ export const registerTelegramHandlers = ({ // Directly set model override in session try { // Get session store path - const storePath = resolveStorePath(cfg.session?.store, { + const storePath = telegramDeps.resolveStorePath(cfg.session?.store, { agentId: sessionState.agentId, }); @@ -1288,7 +1538,7 @@ export const registerTelegramHandlers = ({ } // Check if old chat ID has config and migrate it - const currentConfig = loadConfig(); + const currentConfig = telegramDeps.loadConfig(); const migration = migrateTelegramGroupConfig({ cfg: currentConfig, accountId, diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index a8a4c376b0b..3e9a3084fdb 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -52,6 +52,18 @@ import { editMessageTelegram } from "./send.js"; import { cacheSticker, describeStickerImage } from "./sticker-cache.js"; const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; +const DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME = { + dispatchReplyWithBufferedBlockDispatcher, +}; +let botMessageDispatchRuntimeForTest: + | Partial + | undefined; + +export function setBotMessageDispatchRuntimeForTest( + runtime?: Partial, +): void { + botMessageDispatchRuntimeForTest = runtime; +} /** Minimum chars before sending first streaming message (improves push notification UX) */ const DRAFT_MIN_INITIAL_CHARS = 30; @@ -149,6 +161,10 @@ export const dispatchTelegramMessage = async ({ telegramCfg, opts, }: DispatchTelegramMessageParams) => { + const botMessageDispatchRuntime = { + ...DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME, + ...botMessageDispatchRuntimeForTest, + }; const { ctxPayload, msg, @@ -535,7 +551,7 @@ export const dispatchTelegramMessage = async ({ let dispatchError: unknown; try { - ({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ + ({ queuedFinal } = await botMessageDispatchRuntime.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, dispatcherOptions: { diff --git a/extensions/telegram/src/bot-native-commands.ts b/extensions/telegram/src/bot-native-commands.ts index 0e513131133..edbe98f146f 100644 --- a/extensions/telegram/src/bot-native-commands.ts +++ b/extensions/telegram/src/bot-native-commands.ts @@ -77,6 +77,19 @@ import { resolveTelegramGroupPromptSettings } from "./group-config-helpers.js"; import { buildInlineKeyboard } from "./send.js"; const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; +const DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME = { + dispatchReplyWithBufferedBlockDispatcher, + listSkillCommandsForAgents, +}; +let botNativeCommandsRuntimeForTest: + | Partial + | undefined; + +export function setBotNativeCommandsRuntimeForTest( + runtime?: Partial, +): void { + botNativeCommandsRuntimeForTest = runtime; +} type TelegramNativeCommandContext = Context & { match?: string }; @@ -366,6 +379,10 @@ export const registerTelegramNativeCommands = ({ shouldSkipUpdate, opts, }: RegisterTelegramNativeCommandsParams) => { + const botNativeCommandsRuntime = { + ...DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME, + ...botNativeCommandsRuntimeForTest, + }; const silentErrorReplies = telegramCfg.silentErrorReplies === true; const boundRoute = nativeEnabled && nativeSkillsEnabled @@ -378,7 +395,10 @@ export const registerTelegramNativeCommands = ({ } const skillCommands = nativeEnabled && nativeSkillsEnabled && boundRoute - ? listSkillCommandsForAgents({ cfg, agentIds: [boundRoute.agentId] }) + ? botNativeCommandsRuntime.listSkillCommandsForAgents({ + cfg, + agentIds: [boundRoute.agentId], + }) : []; const nativeCommands = nativeEnabled ? listNativeCommandSpecsForConfig(cfg, { @@ -756,7 +776,7 @@ export const registerTelegramNativeCommands = ({ accountId: route.accountId, }); - await dispatchReplyWithBufferedBlockDispatcher({ + await botNativeCommandsRuntime.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, dispatcherOptions: {