diff --git a/docs/start/hubs.md b/docs/start/hubs.md index 7e530f769b5..f7653a5ac6b 100644 --- a/docs/start/hubs.md +++ b/docs/start/hubs.md @@ -166,7 +166,7 @@ Use these hubs to discover every page, including deep dives and reference docs t - [Plugins overview](/tools/plugin) - [Building extensions](/plugins/building-extensions) - [Plugin manifest](/plugins/manifest) -- [Agent tools](/plugins/agent-tools) +- [Plugin API overview](/tools/plugin#plugin-api-overview) - [Plugin bundles](/plugins/bundles) - [Community plugins](/plugins/community) - [Capability cookbook](/tools/capability-cookbook) diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md index 6e502c09c19..24023bf5c80 100644 --- a/docs/tools/lobster.md +++ b/docs/tools/lobster.md @@ -330,7 +330,7 @@ OpenProse pairs well with Lobster: use `/prose` to orchestrate multi-agent prep, ## Learn more - [Plugins](/tools/plugin) -- [Plugin tool authoring](/plugins/agent-tools) +- [Plugin API overview](/tools/plugin#plugin-api-overview) ## Case study: community workflows diff --git a/docs/tools/plugin.md b/docs/tools/plugin.md index 7f1ba0fade4..906a8843c2b 100644 --- a/docs/tools/plugin.md +++ b/docs/tools/plugin.md @@ -332,6 +332,6 @@ See [Plugin manifest](/plugins/manifest) for the manifest file format. - [Building extensions](/plugins/building-extensions) - [Plugin bundles](/plugins/bundles) - [Plugin manifest](/plugins/manifest) -- [Plugin agent tools](/plugins/agent-tools) +- [Plugin agent tools](/tools/plugin#plugin-api-overview) - [Capability Cookbook](/tools/capability-cookbook) - [Community plugins](/plugins/community) diff --git a/extensions/matrix/src/matrix/monitor/handler.media-failure.test.ts b/extensions/matrix/src/matrix/monitor/handler.media-failure.test.ts index 45c7484d3ca..361dc5cb158 100644 --- a/extensions/matrix/src/matrix/monitor/handler.media-failure.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.media-failure.test.ts @@ -54,6 +54,7 @@ function createHandlerHarness() { replyOptions: {}, markDispatchIdle: vi.fn(), }), + withReplyDispatcher: vi.fn(async (_dispatcher, fn) => await fn()), resolveHumanDelayConfig: vi.fn().mockReturnValue(undefined), dispatchReplyFromConfig: vi .fn() diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 1770446c8e8..170864de7eb 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -143,7 +143,7 @@ export const registerTelegramHandlers = ({ telegramCfg.documentBatchWindowMs ?? DEFAULT_DOCUMENT_BATCH_WINDOW_MS; type DocumentBatchEntry = { key: string; - messages: Array<{ msg: Message; ctx: TelegramContext }>; + messages: Array<{ msg: Message; ctx: TelegramContext; debounceLane: TelegramDebounceLane }>; timer: ReturnType; }; const documentBatchBuffer = new Map(); @@ -453,60 +453,88 @@ export const registerTelegramHandlers = ({ }; const processDocumentBatch = async (entry: DocumentBatchEntry) => { - try { - entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); - const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text); - const primaryEntry = captionMsg ?? entry.messages[0]; + const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text); + const primaryEntry = captionMsg ?? entry.messages[0]; + if (!primaryEntry) { + return; + } - const allMedia: TelegramMediaRef[] = []; + const storeAllowFrom = await loadStoreAllowFrom(); + const senderId = + primaryEntry.msg.from?.id != null + ? String(primaryEntry.msg.from.id) + : primaryEntry.msg.sender_chat?.id != null + ? `sender_chat:${primaryEntry.msg.sender_chat.id}` + : ""; + const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg); - for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) { - try { - const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport); - if (media) { - allMedia.push({ - path: media.path, - contentType: media.contentType, - stickerMetadata: media.stickerMetadata, - }); - } - } catch (mediaErr) { - const errMsg = String(mediaErr); - if (isMediaSizeLimitError(mediaErr)) { - const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); - await withTelegramApiErrorLogging({ - operation: "sendMessage", - runtime, - fn: () => - bot.api.sendMessage( - itemMsg.chat.id, - `⚠️ File too large. Maximum size is ${limitMb}MB.`, - { reply_to_message_id: itemMsg.message_id }, - ), - }).catch(() => {}); - logger.warn( - { chatId: itemMsg.chat.id, error: errMsg }, - "document batch: media exceeds size limit", - ); - } else { - runtime.error?.(warn(`document batch: skipping file that failed to fetch: ${errMsg}`)); - } + const allMedia: TelegramMediaRef[] = []; + for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) { + try { + const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport); + if (media) { + allMedia.push({ + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }); + continue; + } + } catch (mediaErr) { + const errMsg = String(mediaErr); + if (isMediaSizeLimitError(mediaErr)) { + const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); + await withTelegramApiErrorLogging({ + operation: "sendMessage", + runtime, + fn: () => + bot.api.sendMessage( + itemMsg.chat.id, + `⚠️ File too large. Maximum size is ${limitMb}MB.`, + { + reply_to_message_id: itemMsg.message_id, + }, + ), + }).catch(() => {}); + logger.warn( + { chatId: itemMsg.chat.id, error: errMsg }, + "document batch: media exceeds size limit", + ); + } else { + runtime.error?.( + warn(`document batch: preserving placeholder for file that failed to fetch: ${errMsg}`), + ); } } - if ( - allMedia.length === 0 && - !(primaryEntry.msg.text ?? primaryEntry.msg.caption ?? "").trim() - ) { - return; - } - - const storeAllowFrom = await loadStoreAllowFrom(); - await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); - } catch (err) { - runtime.error?.(danger(`document batch handler failed: ${String(err)}`)); + const placeholderMedia = await resolveReplyMediaForMessage(itemCtx, itemMsg); + await processMessage(itemCtx, [], storeAllowFrom, undefined, placeholderMedia); } + + const captionText = primaryEntry.msg.text ?? primaryEntry.msg.caption ?? ""; + if (!captionText.trim() && allMedia.length === 0) { + return; + } + + const conversationThreadId = primaryEntry.msg.message_thread_id; + const conversationKey = + conversationThreadId != null + ? `${primaryEntry.msg.chat.id}:topic:${conversationThreadId}` + : String(primaryEntry.msg.chat.id); + const debounceKey = senderId + ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}:${primaryEntry.debounceLane}` + : null; + await inboundDebouncer.enqueue({ + ctx: primaryEntry.ctx, + msg: primaryEntry.msg, + allMedia, + storeAllowFrom, + debounceKey, + debounceLane: primaryEntry.debounceLane, + botUsername: primaryEntry.ctx.me?.username, + }); }; const scheduleDocumentBatchFlush = (entry: DocumentBatchEntry) => { @@ -1058,24 +1086,29 @@ export const registerTelegramHandlers = ({ // Document batch handling — buffer individual document messages from the same sender. // Telegram sends documents without media_group_id, so we collect them in a time window. - // Skip batching for messages without msg.from (channel_post) to prevent unrelated posts merging. + const documentSenderId = + msg.from?.id != null + ? String(msg.from.id) + : msg.sender_chat?.id != null + ? `sender_chat:${msg.sender_chat.id}` + : null; if ( DOCUMENT_BATCH_WINDOW_MS > 0 && (msg as { document?: unknown }).document && !mediaGroupId && - msg.from?.id + documentSenderId ) { - const senderId = String(msg.from.id); - const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; + const debounceLane = resolveTelegramDebounceLane(msg); + const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${documentSenderId}:${debounceLane}`; const existing = documentBatchBuffer.get(docBatchKey); if (existing) { clearTimeout(existing.timer); - existing.messages.push({ msg, ctx }); + existing.messages.push({ msg, ctx, debounceLane }); scheduleDocumentBatchFlush(existing); } else { const entry: DocumentBatchEntry = { key: docBatchKey, - messages: [{ msg, ctx }], + messages: [{ msg, ctx, debounceLane }], timer: setTimeout(() => {}, DOCUMENT_BATCH_WINDOW_MS), }; documentBatchBuffer.set(docBatchKey, entry); @@ -1084,6 +1117,26 @@ export const registerTelegramHandlers = ({ return; } + if (!(msg as { document?: unknown }).document && documentSenderId) { + const forwardLane = resolveTelegramDebounceLane(msg); + for (const [key, pendingEntry] of documentBatchBuffer) { + if (!key.startsWith(`doc:${chatId}:${resolvedThreadId ?? "main"}:${documentSenderId}:`)) { + continue; + } + if (pendingEntry.messages.some((item) => item.debounceLane !== forwardLane)) { + continue; + } + documentBatchBuffer.delete(key); + clearTimeout(pendingEntry.timer); + documentBatchProcessing = documentBatchProcessing + .then(async () => { + await processDocumentBatch(pendingEntry); + }) + .catch(() => undefined); + } + await documentBatchProcessing; + } + let media: Awaited> = null; try { media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport); diff --git a/src/cli/plugin-registry.test.ts b/src/cli/plugin-registry.test.ts index 76c2eeebe57..629de0a740b 100644 --- a/src/cli/plugin-registry.test.ts +++ b/src/cli/plugin-registry.test.ts @@ -86,11 +86,36 @@ describe("ensurePluginRegistryLoaded", () => { expect(mocks.loadOpenClawPlugins).toHaveBeenCalledTimes(2); expect(mocks.loadOpenClawPlugins).toHaveBeenNthCalledWith( 1, - expect.objectContaining({ onlyPluginIds: ["telegram"], throwOnLoadError: true }), + expect.objectContaining({ + config: expect.objectContaining({ + channels: { telegram: { enabled: true } }, + plugins: { enabled: true }, + }), + workspaceDir: "/tmp/workspace", + logger: expect.objectContaining({ + info: expect.any(Function), + warn: expect.any(Function), + error: expect.any(Function), + debug: expect.any(Function), + }), + onlyPluginIds: ["telegram"], + throwOnLoadError: true, + }), ); expect(mocks.loadOpenClawPlugins).toHaveBeenNthCalledWith( 2, expect.objectContaining({ + config: expect.objectContaining({ + channels: { telegram: { enabled: true } }, + plugins: { enabled: true }, + }), + workspaceDir: "/tmp/workspace", + logger: expect.objectContaining({ + info: expect.any(Function), + warn: expect.any(Function), + error: expect.any(Function), + debug: expect.any(Function), + }), onlyPluginIds: ["telegram", "slack"], throwOnLoadError: true, }),