From 4620040ceb1c8acf45f72efc5e06124bc8fbcbdd Mon Sep 17 00:00:00 2001 From: Simon Kim Date: Fri, 20 Mar 2026 12:48:53 +0900 Subject: [PATCH] feat(telegram): batch document attachments sent in quick succession into a single agent turn Telegram sends each document as a separate message (no media_group_id). This change introduces a configurable time window (documentBatchWindowMs, default 1500ms) that collects consecutive document messages from the same sender in the same chat/thread and dispatches them as a single processMessage call, mirroring the existing media-group batching behavior. --- .../telegram/src/bot-handlers.runtime.ts | 111 ++++++++++++++++++ src/config/types.telegram.ts | 6 + src/config/zod-schema.providers-core.ts | 1 + 3 files changed, 118 insertions(+) diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 00dc35041c9..1770446c8e8 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -136,6 +136,19 @@ export const registerTelegramHandlers = ({ const textFragmentBuffer = new Map(); let textFragmentProcessing: Promise = Promise.resolve(); + // Document batch buffer — collects document messages from the same sender + // within a short window and dispatches them as a single processMessage call. + const DEFAULT_DOCUMENT_BATCH_WINDOW_MS = 1500; + const DOCUMENT_BATCH_WINDOW_MS = + telegramCfg.documentBatchWindowMs ?? DEFAULT_DOCUMENT_BATCH_WINDOW_MS; + type DocumentBatchEntry = { + key: string; + messages: Array<{ msg: Message; ctx: TelegramContext }>; + timer: ReturnType; + }; + const documentBatchBuffer = new Map(); + let documentBatchProcessing: Promise = Promise.resolve(); + const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); const FORWARD_BURST_DEBOUNCE_MS = 80; type TelegramDebounceLane = "default" | "forward"; @@ -439,6 +452,76 @@ export const registerTelegramHandlers = ({ }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); }; + const processDocumentBatch = async (entry: DocumentBatchEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + + const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text); + const primaryEntry = captionMsg ?? entry.messages[0]; + + const allMedia: TelegramMediaRef[] = []; + + for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) { + try { + const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport); + if (media) { + allMedia.push({ + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }); + } + } catch (mediaErr) { + const errMsg = String(mediaErr); + if (isMediaSizeLimitError(mediaErr)) { + const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); + await withTelegramApiErrorLogging({ + operation: "sendMessage", + runtime, + fn: () => + bot.api.sendMessage( + itemMsg.chat.id, + `⚠️ File too large. Maximum size is ${limitMb}MB.`, + { reply_to_message_id: itemMsg.message_id }, + ), + }).catch(() => {}); + logger.warn( + { chatId: itemMsg.chat.id, error: errMsg }, + "document batch: media exceeds size limit", + ); + } else { + runtime.error?.(warn(`document batch: skipping file that failed to fetch: ${errMsg}`)); + } + } + } + + if ( + allMedia.length === 0 && + !(primaryEntry.msg.text ?? primaryEntry.msg.caption ?? "").trim() + ) { + return; + } + + const storeAllowFrom = await loadStoreAllowFrom(); + await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); + } catch (err) { + runtime.error?.(danger(`document batch handler failed: ${String(err)}`)); + } + }; + + const scheduleDocumentBatchFlush = (entry: DocumentBatchEntry) => { + clearTimeout(entry.timer); + entry.timer = setTimeout(async () => { + documentBatchBuffer.delete(entry.key); + documentBatchProcessing = documentBatchProcessing + .then(async () => { + await processDocumentBatch(entry); + }) + .catch(() => undefined); + await documentBatchProcessing; + }, DOCUMENT_BATCH_WINDOW_MS); + }; + const loadStoreAllowFrom = async () => telegramDeps.readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); @@ -973,6 +1056,34 @@ export const registerTelegramHandlers = ({ return; } + // Document batch handling — buffer individual document messages from the same sender. + // Telegram sends documents without media_group_id, so we collect them in a time window. + // Skip batching for messages without msg.from (channel_post) to prevent unrelated posts merging. + if ( + DOCUMENT_BATCH_WINDOW_MS > 0 && + (msg as { document?: unknown }).document && + !mediaGroupId && + msg.from?.id + ) { + const senderId = String(msg.from.id); + const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; + const existing = documentBatchBuffer.get(docBatchKey); + if (existing) { + clearTimeout(existing.timer); + existing.messages.push({ msg, ctx }); + scheduleDocumentBatchFlush(existing); + } else { + const entry: DocumentBatchEntry = { + key: docBatchKey, + messages: [{ msg, ctx }], + timer: setTimeout(() => {}, DOCUMENT_BATCH_WINDOW_MS), + }; + documentBatchBuffer.set(docBatchKey, entry); + scheduleDocumentBatchFlush(entry); + } + return; + } + let media: Awaited> = null; try { media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport); diff --git a/src/config/types.telegram.ts b/src/config/types.telegram.ts index 71ded650deb..6a2b996960c 100644 --- a/src/config/types.telegram.ts +++ b/src/config/types.telegram.ts @@ -160,6 +160,12 @@ export type TelegramAccountConfig = { /** @deprecated Legacy key; migrated automatically to `streaming`. */ streamMode?: "off" | "partial" | "block"; mediaMaxMb?: number; + /** + * Batch window (ms) for document files sent in quick succession by the same sender. + * Telegram sends documents as individual messages (no media_group_id), so this buffer + * collects them into a single agent turn. Set to 0 to disable. Default: 1500. + */ + documentBatchWindowMs?: number; /** Telegram API client timeout in seconds (grammY ApiClientOptions). */ timeoutSeconds?: number; /** Retry policy for outbound Telegram API calls. */ diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index e65030d8f38..34601abe672 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -202,6 +202,7 @@ export const TelegramAccountSchemaBase = z // Legacy key kept for automatic migration to `streaming`. streamMode: z.enum(["off", "partial", "block"]).optional(), mediaMaxMb: z.number().positive().optional(), + documentBatchWindowMs: z.number().int().min(0).optional(), timeoutSeconds: z.number().int().positive().optional(), retry: RetryConfigSchema, network: z