feat(telegram): batch document attachments sent in quick succession into a single agent turn

Telegram sends each document as a separate message (no media_group_id).
This change introduces a configurable time window (documentBatchWindowMs,
default 1500ms) that collects consecutive document messages from the same
sender in the same chat/thread and dispatches them as a single processMessage
call, mirroring the existing media-group batching behavior.
This commit is contained in:
Simon Kim 2026-03-20 12:48:53 +09:00
parent a73e517ae3
commit 4620040ceb
3 changed files with 118 additions and 0 deletions

View File

@ -136,6 +136,19 @@ export const registerTelegramHandlers = ({
const textFragmentBuffer = new Map<string, TextFragmentEntry>();
let textFragmentProcessing: Promise<void> = Promise.resolve();
// Document batch buffer — collects document messages from the same sender
// within a short window and dispatches them as a single processMessage call.
const DEFAULT_DOCUMENT_BATCH_WINDOW_MS = 1500;
const DOCUMENT_BATCH_WINDOW_MS =
telegramCfg.documentBatchWindowMs ?? DEFAULT_DOCUMENT_BATCH_WINDOW_MS;
type DocumentBatchEntry = {
key: string;
messages: Array<{ msg: Message; ctx: TelegramContext }>;
timer: ReturnType<typeof setTimeout>;
};
const documentBatchBuffer = new Map<string, DocumentBatchEntry>();
let documentBatchProcessing: Promise<void> = Promise.resolve();
const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" });
const FORWARD_BURST_DEBOUNCE_MS = 80;
type TelegramDebounceLane = "default" | "forward";
@ -439,6 +452,76 @@ export const registerTelegramHandlers = ({
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
};
const processDocumentBatch = async (entry: DocumentBatchEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text);
const primaryEntry = captionMsg ?? entry.messages[0];
const allMedia: TelegramMediaRef[] = [];
for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) {
try {
const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport);
if (media) {
allMedia.push({
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
});
}
} catch (mediaErr) {
const errMsg = String(mediaErr);
if (isMediaSizeLimitError(mediaErr)) {
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
await withTelegramApiErrorLogging({
operation: "sendMessage",
runtime,
fn: () =>
bot.api.sendMessage(
itemMsg.chat.id,
`⚠️ File too large. Maximum size is ${limitMb}MB.`,
{ reply_to_message_id: itemMsg.message_id },
),
}).catch(() => {});
logger.warn(
{ chatId: itemMsg.chat.id, error: errMsg },
"document batch: media exceeds size limit",
);
} else {
runtime.error?.(warn(`document batch: skipping file that failed to fetch: ${errMsg}`));
}
}
}
if (
allMedia.length === 0 &&
!(primaryEntry.msg.text ?? primaryEntry.msg.caption ?? "").trim()
) {
return;
}
const storeAllowFrom = await loadStoreAllowFrom();
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom);
} catch (err) {
runtime.error?.(danger(`document batch handler failed: ${String(err)}`));
}
};
const scheduleDocumentBatchFlush = (entry: DocumentBatchEntry) => {
clearTimeout(entry.timer);
entry.timer = setTimeout(async () => {
documentBatchBuffer.delete(entry.key);
documentBatchProcessing = documentBatchProcessing
.then(async () => {
await processDocumentBatch(entry);
})
.catch(() => undefined);
await documentBatchProcessing;
}, DOCUMENT_BATCH_WINDOW_MS);
};
const loadStoreAllowFrom = async () =>
telegramDeps.readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
@ -973,6 +1056,34 @@ export const registerTelegramHandlers = ({
return;
}
// Document batch handling — buffer individual document messages from the same sender.
// Telegram sends documents without media_group_id, so we collect them in a time window.
// Skip batching for messages without msg.from (channel_post) to prevent unrelated posts merging.
if (
DOCUMENT_BATCH_WINDOW_MS > 0 &&
(msg as { document?: unknown }).document &&
!mediaGroupId &&
msg.from?.id
) {
const senderId = String(msg.from.id);
const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`;
const existing = documentBatchBuffer.get(docBatchKey);
if (existing) {
clearTimeout(existing.timer);
existing.messages.push({ msg, ctx });
scheduleDocumentBatchFlush(existing);
} else {
const entry: DocumentBatchEntry = {
key: docBatchKey,
messages: [{ msg, ctx }],
timer: setTimeout(() => {}, DOCUMENT_BATCH_WINDOW_MS),
};
documentBatchBuffer.set(docBatchKey, entry);
scheduleDocumentBatchFlush(entry);
}
return;
}
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);

View File

@ -160,6 +160,12 @@ export type TelegramAccountConfig = {
/** @deprecated Legacy key; migrated automatically to `streaming`. */
streamMode?: "off" | "partial" | "block";
mediaMaxMb?: number;
/**
* Batch window (ms) for document files sent in quick succession by the same sender.
* Telegram sends documents as individual messages (no media_group_id), so this buffer
* collects them into a single agent turn. Set to 0 to disable. Default: 1500.
*/
documentBatchWindowMs?: number;
/** Telegram API client timeout in seconds (grammY ApiClientOptions). */
timeoutSeconds?: number;
/** Retry policy for outbound Telegram API calls. */

View File

@ -202,6 +202,7 @@ export const TelegramAccountSchemaBase = z
// Legacy key kept for automatic migration to `streaming`.
streamMode: z.enum(["off", "partial", "block"]).optional(),
mediaMaxMb: z.number().positive().optional(),
documentBatchWindowMs: z.number().int().min(0).optional(),
timeoutSeconds: z.number().int().positive().optional(),
retry: RetryConfigSchema,
network: z