test(telegram): add dispatch and handler seams

This commit is contained in:
Ayaan Zaidi 2026-03-18 09:47:26 +05:30
parent 6aaf0d0f24
commit edcf3e9d32
No known key found for this signature in database
3 changed files with 366 additions and 80 deletions

View File

@ -1,13 +1,12 @@
import type { Message, ReactionTypeEmoji } from "@grammyjs/types";
import { resolveAgentDir, resolveDefaultAgentId } from "openclaw/plugin-sdk/agent-runtime";
import { resolveDefaultModelForAgent } from "openclaw/plugin-sdk/agent-runtime";
import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime";
import { resolveChannelConfigWrites } from "openclaw/plugin-sdk/channel-runtime";
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import { writeConfigFile } from "openclaw/plugin-sdk/config-runtime";
import {
loadSessionStore,
resolveSessionStoreEntry,
resolveStorePath,
updateSessionStore,
} from "openclaw/plugin-sdk/config-runtime";
import type { DmPolicy } from "openclaw/plugin-sdk/config-runtime";
@ -17,21 +16,22 @@ import type {
TelegramTopicConfig,
} from "openclaw/plugin-sdk/config-runtime";
import { applyModelOverrideToSessionEntry } from "openclaw/plugin-sdk/config-runtime";
import { readChannelAllowFromStore } from "openclaw/plugin-sdk/conversation-runtime";
import {
buildPluginBindingResolvedText,
parsePluginBindingApprovalCustomId,
resolvePluginConversationBindingApproval,
} from "openclaw/plugin-sdk/conversation-runtime";
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
import { dispatchPluginInteractiveHandler } from "openclaw/plugin-sdk/plugin-runtime";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "openclaw/plugin-sdk/reply-runtime";
import { buildCommandsPaginationKeyboard } from "openclaw/plugin-sdk/reply-runtime";
import {
buildModelsProviderData,
formatModelsAvailableHeader,
} from "openclaw/plugin-sdk/reply-runtime";
import { resolveStoredModelOverride } from "openclaw/plugin-sdk/reply-runtime";
import { listSkillCommandsForAgents } from "openclaw/plugin-sdk/reply-runtime";
import { buildCommandsMessagePaginated } from "openclaw/plugin-sdk/reply-runtime";
import { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing";
@ -42,19 +42,22 @@ import {
normalizeDmAllowFromWithStore,
type NormalizedAllowFrom,
} from "./bot-access.js";
import {
createTelegramInboundBufferRuntime,
type TextFragmentEntry,
} from "./bot-handlers.buffers.js";
import { defaultTelegramBotDeps } from "./bot-deps.js";
import {
APPROVE_CALLBACK_DATA_RE,
hasInboundMedia,
hasReplyTargetMedia,
isMediaSizeLimitError,
isRecoverableMediaGroupError,
resolveInboundMediaFileId,
} from "./bot-handlers.media.js";
import type { TelegramMediaRef } from "./bot-message-context.js";
import { RegisterTelegramHandlerParams } from "./bot-native-commands.js";
import { type TelegramUpdateKeyContext } from "./bot-updates.js";
import {
MEDIA_GROUP_TIMEOUT_MS,
type MediaGroupEntry,
type TelegramUpdateKeyContext,
} from "./bot-updates.js";
import { resolveMedia } from "./bot/delivery.js";
import {
getTelegramTextParts,
@ -90,7 +93,6 @@ import {
type ProviderInfo,
} from "./model-buttons.js";
import { buildInlineKeyboard } from "./send.js";
import { wasSentByBot } from "./sent-message-cache.js";
export const registerTelegramHandlers = ({
cfg,
@ -108,42 +110,161 @@ export const registerTelegramHandlers = ({
shouldSkipUpdate,
processMessage,
logger,
telegramDeps = defaultTelegramBotDeps,
}: RegisterTelegramHandlerParams) => {
const loadStoreAllowFrom = async () =>
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
typeof opts.testTimings?.textFragmentGapMs === "number" &&
Number.isFinite(opts.testTimings.textFragmentGapMs)
? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs))
: DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS;
const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1;
const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12;
const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000;
const mediaGroupTimeoutMs =
typeof opts.testTimings?.mediaGroupFlushMs === "number" &&
Number.isFinite(opts.testTimings.mediaGroupFlushMs)
? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs))
: MEDIA_GROUP_TIMEOUT_MS;
const {
buildSyntheticContext,
buildSyntheticTextMessage,
inboundDebouncer,
mediaGroupBuffer,
mediaGroupProcessing,
setMediaGroupProcessing,
mediaGroupTimeoutMs,
processMediaGroup,
textFragmentBuffer,
textFragmentProcessing,
setTextFragmentProcessing,
scheduleTextFragmentFlush,
flushTextFragments,
resolveReplyMediaForMessage,
resolveTelegramDebounceLane,
TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS,
TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP,
TELEGRAM_TEXT_FRAGMENT_MAX_PARTS,
TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS,
TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS,
} = createTelegramInboundBufferRuntime({
accountId,
bot,
cfg,
logger,
mediaMaxBytes,
opts,
processMessage,
loadStoreAllowFrom,
runtime,
telegramTransport,
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
let mediaGroupProcessing: Promise<void> = Promise.resolve();
type TextFragmentEntry = {
key: string;
messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>;
timer: ReturnType<typeof setTimeout>;
};
const textFragmentBuffer = new Map<string, TextFragmentEntry>();
let textFragmentProcessing: Promise<void> = Promise.resolve();
const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" });
const FORWARD_BURST_DEBOUNCE_MS = 80;
type TelegramDebounceLane = "default" | "forward";
type TelegramDebounceEntry = {
ctx: TelegramContext;
msg: Message;
allMedia: TelegramMediaRef[];
storeAllowFrom: string[];
debounceKey: string | null;
debounceLane: TelegramDebounceLane;
botUsername?: string;
};
const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => {
const forwardMeta = msg as {
forward_origin?: unknown;
forward_from?: unknown;
forward_from_chat?: unknown;
forward_sender_name?: unknown;
forward_date?: unknown;
};
return (forwardMeta.forward_origin ??
forwardMeta.forward_from ??
forwardMeta.forward_from_chat ??
forwardMeta.forward_sender_name ??
forwardMeta.forward_date)
? "forward"
: "default";
};
const buildSyntheticTextMessage = (params: {
base: Message;
text: string;
date?: number;
from?: Message["from"];
}): Message => ({
...params.base,
...(params.from ? { from: params.from } : {}),
text: params.text,
caption: undefined,
caption_entities: undefined,
entities: undefined,
...(params.date != null ? { date: params.date } : {}),
});
const buildSyntheticContext = (
ctx: Pick<TelegramContext, "me"> & { getFile?: unknown },
message: Message,
): TelegramContext => {
const getFile =
typeof ctx.getFile === "function"
? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object)
: async () => ({});
return { message, me: ctx.me, getFile };
};
const inboundDebouncer = createInboundDebouncer<TelegramDebounceEntry>({
debounceMs,
resolveDebounceMs: (entry) =>
entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs,
buildKey: (entry) => entry.debounceKey,
shouldDebounce: (entry) => {
const text = entry.msg.text ?? entry.msg.caption ?? "";
const hasDebounceableText = shouldDebounceTextInbound({
text,
cfg,
commandOptions: { botUsername: entry.botUsername },
});
if (entry.debounceLane === "forward") {
// Forwarded bursts often split text + media into adjacent updates.
// Debounce media-only forward entries too so they can coalesce.
return hasDebounceableText || entry.allMedia.length > 0;
}
if (!hasDebounceableText) {
return false;
}
return entry.allMedia.length === 0;
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) {
return;
}
if (entries.length === 1) {
const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg);
await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia);
return;
}
const combinedText = entries
.map((entry) => entry.msg.text ?? entry.msg.caption ?? "")
.filter(Boolean)
.join("\n");
const combinedMedia = entries.flatMap((entry) => entry.allMedia);
if (!combinedText.trim() && combinedMedia.length === 0) {
return;
}
const first = entries[0];
const baseCtx = first.ctx;
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
date: last.msg.date ?? first.msg.date,
});
const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined;
const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage);
const replyMedia = await resolveReplyMediaForMessage(baseCtx, syntheticMessage);
await processMessage(
syntheticCtx,
combinedMedia,
first.storeAllowFrom,
messageIdOverride ? { messageIdOverride } : undefined,
replyMedia,
);
},
onError: (err, items) => {
runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`));
const chatId = items[0]?.msg.chat.id;
if (chatId != null) {
const threadId = items[0]?.msg.message_thread_id;
void bot.api
.sendMessage(
chatId,
"Something went wrong while processing your message. Please try again.",
threadId != null ? { message_thread_id: threadId } : undefined,
)
.catch((sendErr) => {
logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`);
});
}
},
});
const resolveTelegramSessionState = (params: {
@ -190,7 +311,9 @@ export const registerTelegramHandlers = ({
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const storePath = resolveStorePath(cfg.session?.store, { agentId: route.agentId });
const storePath = telegramDeps.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const store = loadSessionStore(storePath);
const entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
const storedOverride = resolveStoredModelOverride({
@ -227,6 +350,139 @@ export const registerTelegramHandlers = ({
};
};
const processMediaGroup = async (entry: MediaGroupEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text);
const primaryEntry = captionMsg ?? entry.messages[0];
const allMedia: TelegramMediaRef[] = [];
for (const { ctx } of entry.messages) {
let media;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);
} catch (mediaErr) {
if (!isRecoverableMediaGroupError(mediaErr)) {
throw mediaErr;
}
runtime.log?.(
warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`),
);
continue;
}
if (media) {
allMedia.push({
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
});
}
}
const storeAllowFrom = await loadStoreAllowFrom();
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia);
} catch (err) {
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
}
};
const flushTextFragments = async (entry: TextFragmentEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const first = entry.messages[0];
const last = entry.messages.at(-1);
if (!first || !last) {
return;
}
const combinedText = entry.messages.map((m) => m.msg.text ?? "").join("");
if (!combinedText.trim()) {
return;
}
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
date: last.msg.date ?? first.msg.date,
});
const storeAllowFrom = await loadStoreAllowFrom();
const baseCtx = first.ctx;
await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, {
messageIdOverride: String(last.msg.message_id),
});
} catch (err) {
runtime.error?.(danger(`text fragment handler failed: ${String(err)}`));
}
};
const queueTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(entry);
})
.catch(() => undefined);
await textFragmentProcessing;
};
const runTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentBuffer.delete(entry.key);
await queueTextFragmentFlush(entry);
};
const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => {
clearTimeout(entry.timer);
entry.timer = setTimeout(async () => {
await runTextFragmentFlush(entry);
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
};
const loadStoreAllowFrom = async () =>
telegramDeps.readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
const resolveReplyMediaForMessage = async (
ctx: TelegramContext,
msg: Message,
): Promise<TelegramMediaRef[]> => {
const replyMessage = msg.reply_to_message;
if (!replyMessage || !hasInboundMedia(replyMessage)) {
return [];
}
const replyFileId = resolveInboundMediaFileId(replyMessage);
if (!replyFileId) {
return [];
}
try {
const media = await resolveMedia(
{
message: replyMessage,
me: ctx.me,
getFile: async () => await bot.api.getFile(replyFileId),
},
mediaMaxBytes,
opts.token,
telegramTransport,
);
if (!media) {
return [];
}
return [
{
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
},
];
} catch (err) {
logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed");
return [];
}
};
const isAllowlistAuthorized = (
allow: NormalizedAllowFrom,
senderId: string,
@ -502,7 +758,7 @@ export const registerTelegramHandlers = ({
if (user?.is_bot) {
return;
}
if (reactionMode === "own" && !wasSentByBot(chatId, messageId)) {
if (reactionMode === "own" && !telegramDeps.wasSentByBot(chatId, messageId)) {
return;
}
const eventAuthContext = await resolveTelegramEventAuthorizationContext({
@ -577,7 +833,7 @@ export const registerTelegramHandlers = ({
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
// Fresh config for bindings lookup; other routing inputs are payload-derived.
const route = resolveAgentRoute({
cfg: loadConfig(),
cfg: telegramDeps.loadConfig(),
channel: "telegram",
accountId,
peer: { kind: isGroup ? "group" : "direct", id: peerId },
@ -589,7 +845,7 @@ export const registerTelegramHandlers = ({
for (const r of addedReactions) {
const emoji = r.emoji;
const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`;
enqueueSystemEvent(text, {
telegramDeps.enqueueSystemEvent(text, {
sessionKey,
contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`,
});
@ -663,14 +919,12 @@ export const registerTelegramHandlers = ({
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
clearTimeout(existing.timer);
textFragmentBuffer.delete(key);
setTextFragmentProcessing(
textFragmentProcessing()
.then(async () => {
await flushTextFragments(existing);
})
.catch(() => undefined),
);
await textFragmentProcessing();
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(existing);
})
.catch(() => undefined);
await textFragmentProcessing;
}
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
@ -695,28 +949,24 @@ export const registerTelegramHandlers = ({
existing.messages.push({ msg, ctx });
existing.timer = setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
setMediaGroupProcessing(
mediaGroupProcessing()
.then(async () => {
await processMediaGroup(existing);
})
.catch(() => undefined),
);
await mediaGroupProcessing();
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(existing);
})
.catch(() => undefined);
await mediaGroupProcessing;
}, mediaGroupTimeoutMs);
} else {
const entry = {
const entry: MediaGroupEntry = {
messages: [{ msg, ctx }],
timer: setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
setMediaGroupProcessing(
mediaGroupProcessing()
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined),
);
await mediaGroupProcessing();
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined);
await mediaGroupProcessing;
}, mediaGroupTimeoutMs),
};
mediaGroupBuffer.set(mediaGroupId, entry);
@ -1051,7 +1301,7 @@ export const registerTelegramHandlers = ({
}
const agentId = paginationMatch[2]?.trim() || resolveDefaultAgentId(cfg);
const skillCommands = listSkillCommandsForAgents({
const skillCommands = telegramDeps.listSkillCommandsForAgents({
cfg,
agentIds: [agentId],
});
@ -1208,7 +1458,7 @@ export const registerTelegramHandlers = ({
// Directly set model override in session
try {
// Get session store path
const storePath = resolveStorePath(cfg.session?.store, {
const storePath = telegramDeps.resolveStorePath(cfg.session?.store, {
agentId: sessionState.agentId,
});
@ -1288,7 +1538,7 @@ export const registerTelegramHandlers = ({
}
// Check if old chat ID has config and migrate it
const currentConfig = loadConfig();
const currentConfig = telegramDeps.loadConfig();
const migration = migrateTelegramGroupConfig({
cfg: currentConfig,
accountId,

View File

@ -52,6 +52,18 @@ import { editMessageTelegram } from "./send.js";
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
const DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME = {
dispatchReplyWithBufferedBlockDispatcher,
};
let botMessageDispatchRuntimeForTest:
| Partial<typeof DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME>
| undefined;
export function setBotMessageDispatchRuntimeForTest(
runtime?: Partial<typeof DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME>,
): void {
botMessageDispatchRuntimeForTest = runtime;
}
/** Minimum chars before sending first streaming message (improves push notification UX) */
const DRAFT_MIN_INITIAL_CHARS = 30;
@ -149,6 +161,10 @@ export const dispatchTelegramMessage = async ({
telegramCfg,
opts,
}: DispatchTelegramMessageParams) => {
const botMessageDispatchRuntime = {
...DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME,
...botMessageDispatchRuntimeForTest,
};
const {
ctxPayload,
msg,
@ -535,7 +551,7 @@ export const dispatchTelegramMessage = async ({
let dispatchError: unknown;
try {
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
({ queuedFinal } = await botMessageDispatchRuntime.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {

View File

@ -77,6 +77,19 @@ import { resolveTelegramGroupPromptSettings } from "./group-config-helpers.js";
import { buildInlineKeyboard } from "./send.js";
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
const DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME = {
dispatchReplyWithBufferedBlockDispatcher,
listSkillCommandsForAgents,
};
let botNativeCommandsRuntimeForTest:
| Partial<typeof DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME>
| undefined;
export function setBotNativeCommandsRuntimeForTest(
runtime?: Partial<typeof DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME>,
): void {
botNativeCommandsRuntimeForTest = runtime;
}
type TelegramNativeCommandContext = Context & { match?: string };
@ -366,6 +379,10 @@ export const registerTelegramNativeCommands = ({
shouldSkipUpdate,
opts,
}: RegisterTelegramNativeCommandsParams) => {
const botNativeCommandsRuntime = {
...DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME,
...botNativeCommandsRuntimeForTest,
};
const silentErrorReplies = telegramCfg.silentErrorReplies === true;
const boundRoute =
nativeEnabled && nativeSkillsEnabled
@ -378,7 +395,10 @@ export const registerTelegramNativeCommands = ({
}
const skillCommands =
nativeEnabled && nativeSkillsEnabled && boundRoute
? listSkillCommandsForAgents({ cfg, agentIds: [boundRoute.agentId] })
? botNativeCommandsRuntime.listSkillCommandsForAgents({
cfg,
agentIds: [boundRoute.agentId],
})
: [];
const nativeCommands = nativeEnabled
? listNativeCommandSpecsForConfig(cfg, {
@ -756,7 +776,7 @@ export const registerTelegramNativeCommands = ({
accountId: route.accountId,
});
await dispatchReplyWithBufferedBlockDispatcher({
await botNativeCommandsRuntime.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {