fix: address PR #34942 review follow-ups

This commit is contained in:
Simon Kim 2026-03-21 05:13:20 +09:00
parent ce244decd0
commit 164c83998b
6 changed files with 137 additions and 58 deletions

View File

@ -166,7 +166,7 @@ Use these hubs to discover every page, including deep dives and reference docs t
- [Plugins overview](/tools/plugin)
- [Building extensions](/plugins/building-extensions)
- [Plugin manifest](/plugins/manifest)
- [Agent tools](/plugins/agent-tools)
- [Plugin API overview](/tools/plugin#plugin-api-overview)
- [Plugin bundles](/plugins/bundles)
- [Community plugins](/plugins/community)
- [Capability cookbook](/tools/capability-cookbook)

View File

@ -330,7 +330,7 @@ OpenProse pairs well with Lobster: use `/prose` to orchestrate multi-agent prep,
## Learn more
- [Plugins](/tools/plugin)
- [Plugin tool authoring](/plugins/agent-tools)
- [Plugin API overview](/tools/plugin#plugin-api-overview)
## Case study: community workflows

View File

@ -332,6 +332,6 @@ See [Plugin manifest](/plugins/manifest) for the manifest file format.
- [Building extensions](/plugins/building-extensions)
- [Plugin bundles](/plugins/bundles)
- [Plugin manifest](/plugins/manifest)
- [Plugin agent tools](/plugins/agent-tools)
- [Plugin agent tools](/tools/plugin#plugin-api-overview)
- [Capability Cookbook](/tools/capability-cookbook)
- [Community plugins](/plugins/community)

View File

@ -54,6 +54,7 @@ function createHandlerHarness() {
replyOptions: {},
markDispatchIdle: vi.fn(),
}),
withReplyDispatcher: vi.fn(async (_dispatcher, fn) => await fn()),
resolveHumanDelayConfig: vi.fn().mockReturnValue(undefined),
dispatchReplyFromConfig: vi
.fn()

View File

@ -143,7 +143,7 @@ export const registerTelegramHandlers = ({
telegramCfg.documentBatchWindowMs ?? DEFAULT_DOCUMENT_BATCH_WINDOW_MS;
type DocumentBatchEntry = {
key: string;
messages: Array<{ msg: Message; ctx: TelegramContext }>;
messages: Array<{ msg: Message; ctx: TelegramContext; debounceLane: TelegramDebounceLane }>;
timer: ReturnType<typeof setTimeout>;
};
const documentBatchBuffer = new Map<string, DocumentBatchEntry>();
@ -453,60 +453,88 @@ export const registerTelegramHandlers = ({
};
const processDocumentBatch = async (entry: DocumentBatchEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text);
const primaryEntry = captionMsg ?? entry.messages[0];
const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text);
const primaryEntry = captionMsg ?? entry.messages[0];
if (!primaryEntry) {
return;
}
const allMedia: TelegramMediaRef[] = [];
const storeAllowFrom = await loadStoreAllowFrom();
const senderId =
primaryEntry.msg.from?.id != null
? String(primaryEntry.msg.from.id)
: primaryEntry.msg.sender_chat?.id != null
? `sender_chat:${primaryEntry.msg.sender_chat.id}`
: "";
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) {
try {
const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport);
if (media) {
allMedia.push({
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
});
}
} catch (mediaErr) {
const errMsg = String(mediaErr);
if (isMediaSizeLimitError(mediaErr)) {
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
await withTelegramApiErrorLogging({
operation: "sendMessage",
runtime,
fn: () =>
bot.api.sendMessage(
itemMsg.chat.id,
`⚠️ File too large. Maximum size is ${limitMb}MB.`,
{ reply_to_message_id: itemMsg.message_id },
),
}).catch(() => {});
logger.warn(
{ chatId: itemMsg.chat.id, error: errMsg },
"document batch: media exceeds size limit",
);
} else {
runtime.error?.(warn(`document batch: skipping file that failed to fetch: ${errMsg}`));
}
const allMedia: TelegramMediaRef[] = [];
for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) {
try {
const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport);
if (media) {
allMedia.push({
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
});
continue;
}
} catch (mediaErr) {
const errMsg = String(mediaErr);
if (isMediaSizeLimitError(mediaErr)) {
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
await withTelegramApiErrorLogging({
operation: "sendMessage",
runtime,
fn: () =>
bot.api.sendMessage(
itemMsg.chat.id,
`⚠️ File too large. Maximum size is ${limitMb}MB.`,
{
reply_to_message_id: itemMsg.message_id,
},
),
}).catch(() => {});
logger.warn(
{ chatId: itemMsg.chat.id, error: errMsg },
"document batch: media exceeds size limit",
);
} else {
runtime.error?.(
warn(`document batch: preserving placeholder for file that failed to fetch: ${errMsg}`),
);
}
}
if (
allMedia.length === 0 &&
!(primaryEntry.msg.text ?? primaryEntry.msg.caption ?? "").trim()
) {
return;
}
const storeAllowFrom = await loadStoreAllowFrom();
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom);
} catch (err) {
runtime.error?.(danger(`document batch handler failed: ${String(err)}`));
const placeholderMedia = await resolveReplyMediaForMessage(itemCtx, itemMsg);
await processMessage(itemCtx, [], storeAllowFrom, undefined, placeholderMedia);
}
const captionText = primaryEntry.msg.text ?? primaryEntry.msg.caption ?? "";
if (!captionText.trim() && allMedia.length === 0) {
return;
}
const conversationThreadId = primaryEntry.msg.message_thread_id;
const conversationKey =
conversationThreadId != null
? `${primaryEntry.msg.chat.id}:topic:${conversationThreadId}`
: String(primaryEntry.msg.chat.id);
const debounceKey = senderId
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}:${primaryEntry.debounceLane}`
: null;
await inboundDebouncer.enqueue({
ctx: primaryEntry.ctx,
msg: primaryEntry.msg,
allMedia,
storeAllowFrom,
debounceKey,
debounceLane: primaryEntry.debounceLane,
botUsername: primaryEntry.ctx.me?.username,
});
};
const scheduleDocumentBatchFlush = (entry: DocumentBatchEntry) => {
@ -1058,24 +1086,29 @@ export const registerTelegramHandlers = ({
// Document batch handling — buffer individual document messages from the same sender.
// Telegram sends documents without media_group_id, so we collect them in a time window.
// Skip batching for messages without msg.from (channel_post) to prevent unrelated posts merging.
const documentSenderId =
msg.from?.id != null
? String(msg.from.id)
: msg.sender_chat?.id != null
? `sender_chat:${msg.sender_chat.id}`
: null;
if (
DOCUMENT_BATCH_WINDOW_MS > 0 &&
(msg as { document?: unknown }).document &&
!mediaGroupId &&
msg.from?.id
documentSenderId
) {
const senderId = String(msg.from.id);
const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`;
const debounceLane = resolveTelegramDebounceLane(msg);
const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${documentSenderId}:${debounceLane}`;
const existing = documentBatchBuffer.get(docBatchKey);
if (existing) {
clearTimeout(existing.timer);
existing.messages.push({ msg, ctx });
existing.messages.push({ msg, ctx, debounceLane });
scheduleDocumentBatchFlush(existing);
} else {
const entry: DocumentBatchEntry = {
key: docBatchKey,
messages: [{ msg, ctx }],
messages: [{ msg, ctx, debounceLane }],
timer: setTimeout(() => {}, DOCUMENT_BATCH_WINDOW_MS),
};
documentBatchBuffer.set(docBatchKey, entry);
@ -1084,6 +1117,26 @@ export const registerTelegramHandlers = ({
return;
}
if (!(msg as { document?: unknown }).document && documentSenderId) {
const forwardLane = resolveTelegramDebounceLane(msg);
for (const [key, pendingEntry] of documentBatchBuffer) {
if (!key.startsWith(`doc:${chatId}:${resolvedThreadId ?? "main"}:${documentSenderId}:`)) {
continue;
}
if (pendingEntry.messages.some((item) => item.debounceLane !== forwardLane)) {
continue;
}
documentBatchBuffer.delete(key);
clearTimeout(pendingEntry.timer);
documentBatchProcessing = documentBatchProcessing
.then(async () => {
await processDocumentBatch(pendingEntry);
})
.catch(() => undefined);
}
await documentBatchProcessing;
}
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);

View File

@ -86,11 +86,36 @@ describe("ensurePluginRegistryLoaded", () => {
expect(mocks.loadOpenClawPlugins).toHaveBeenCalledTimes(2);
expect(mocks.loadOpenClawPlugins).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ onlyPluginIds: ["telegram"], throwOnLoadError: true }),
expect.objectContaining({
config: expect.objectContaining({
channels: { telegram: { enabled: true } },
plugins: { enabled: true },
}),
workspaceDir: "/tmp/workspace",
logger: expect.objectContaining({
info: expect.any(Function),
warn: expect.any(Function),
error: expect.any(Function),
debug: expect.any(Function),
}),
onlyPluginIds: ["telegram"],
throwOnLoadError: true,
}),
);
expect(mocks.loadOpenClawPlugins).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
config: expect.objectContaining({
channels: { telegram: { enabled: true } },
plugins: { enabled: true },
}),
workspaceDir: "/tmp/workspace",
logger: expect.objectContaining({
info: expect.any(Function),
warn: expect.any(Function),
error: expect.any(Function),
debug: expect.any(Function),
}),
onlyPluginIds: ["telegram", "slack"],
throwOnLoadError: true,
}),