fix: address remaining PR #34942 review comments
This commit is contained in:
parent
f4aed46c16
commit
d25df41a47
@ -145,6 +145,8 @@ export const registerTelegramHandlers = ({
|
||||
key: string;
|
||||
messages: Array<{ msg: Message; ctx: TelegramContext; debounceLane: TelegramDebounceLane }>;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
resolvedThreadId?: number;
|
||||
dmThreadId?: number;
|
||||
};
|
||||
const documentBatchBuffer = new Map<string, DocumentBatchEntry>();
|
||||
let documentBatchProcessing: Promise<void> = Promise.resolve();
|
||||
@ -471,7 +473,9 @@ export const registerTelegramHandlers = ({
|
||||
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
|
||||
|
||||
const allMedia: TelegramMediaRef[] = [];
|
||||
for (const { ctx: itemCtx, msg: itemMsg } of entry.messages) {
|
||||
const succeededEntries = new Set<typeof primaryEntry>();
|
||||
for (const item of entry.messages) {
|
||||
const { ctx: itemCtx, msg: itemMsg } = item;
|
||||
try {
|
||||
const media = await resolveMedia(itemCtx, mediaMaxBytes, opts.token, telegramTransport);
|
||||
if (media) {
|
||||
@ -480,6 +484,7 @@ export const registerTelegramHandlers = ({
|
||||
contentType: media.contentType,
|
||||
stickerMetadata: media.stickerMetadata,
|
||||
});
|
||||
succeededEntries.add(item);
|
||||
continue;
|
||||
}
|
||||
} catch (mediaErr) {
|
||||
@ -502,6 +507,7 @@ export const registerTelegramHandlers = ({
|
||||
{ chatId: itemMsg.chat.id, error: errMsg },
|
||||
"document batch: media exceeds size limit",
|
||||
);
|
||||
continue;
|
||||
} else {
|
||||
runtime.error?.(
|
||||
warn(`document batch: preserving placeholder for file that failed to fetch: ${errMsg}`),
|
||||
@ -513,27 +519,36 @@ export const registerTelegramHandlers = ({
|
||||
await processMessage(itemCtx, [], storeAllowFrom, undefined, placeholderMedia);
|
||||
}
|
||||
|
||||
const captionText = primaryEntry.msg.text ?? primaryEntry.msg.caption ?? "";
|
||||
// Re-select primary if the original failed media resolution, to avoid
|
||||
// duplicating caption/instruction from a rejected oversize message.
|
||||
const effectivePrimary = succeededEntries.has(primaryEntry)
|
||||
? primaryEntry
|
||||
: (entry.messages.find((m) => succeededEntries.has(m) && (m.msg.caption || m.msg.text)) ??
|
||||
entry.messages.find((m) => succeededEntries.has(m)) ??
|
||||
primaryEntry);
|
||||
|
||||
const captionText = effectivePrimary.msg.text ?? effectivePrimary.msg.caption ?? "";
|
||||
if (!captionText.trim() && allMedia.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const conversationThreadId = primaryEntry.msg.message_thread_id;
|
||||
// Use resolved thread IDs consistent with the single-message path
|
||||
const conversationThreadId = entry.resolvedThreadId ?? entry.dmThreadId;
|
||||
const conversationKey =
|
||||
conversationThreadId != null
|
||||
? `${primaryEntry.msg.chat.id}:topic:${conversationThreadId}`
|
||||
: String(primaryEntry.msg.chat.id);
|
||||
? `${effectivePrimary.msg.chat.id}:topic:${conversationThreadId}`
|
||||
: String(effectivePrimary.msg.chat.id);
|
||||
const debounceKey = senderId
|
||||
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}:${primaryEntry.debounceLane}`
|
||||
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}:${effectivePrimary.debounceLane}`
|
||||
: null;
|
||||
await inboundDebouncer.enqueue({
|
||||
ctx: primaryEntry.ctx,
|
||||
msg: primaryEntry.msg,
|
||||
ctx: effectivePrimary.ctx,
|
||||
msg: effectivePrimary.msg,
|
||||
allMedia,
|
||||
storeAllowFrom,
|
||||
debounceKey,
|
||||
debounceLane: primaryEntry.debounceLane,
|
||||
botUsername: primaryEntry.ctx.me?.username,
|
||||
debounceLane: effectivePrimary.debounceLane,
|
||||
botUsername: effectivePrimary.ctx.me?.username,
|
||||
});
|
||||
};
|
||||
|
||||
@ -1092,13 +1107,15 @@ export const registerTelegramHandlers = ({
|
||||
: msg.sender_chat?.id != null
|
||||
? `sender_chat:${msg.sender_chat.id}`
|
||||
: null;
|
||||
const documentDebounceLane = resolveTelegramDebounceLane(msg);
|
||||
if (
|
||||
DOCUMENT_BATCH_WINDOW_MS > 0 &&
|
||||
(msg as { document?: unknown }).document &&
|
||||
!mediaGroupId &&
|
||||
documentSenderId
|
||||
documentSenderId &&
|
||||
documentDebounceLane !== "forward"
|
||||
) {
|
||||
const debounceLane = resolveTelegramDebounceLane(msg);
|
||||
const debounceLane = documentDebounceLane;
|
||||
const docBatchKey = `doc:${chatId}:${resolvedThreadId ?? "main"}:${documentSenderId}:${debounceLane}`;
|
||||
const existing = documentBatchBuffer.get(docBatchKey);
|
||||
if (existing) {
|
||||
@ -1110,6 +1127,8 @@ export const registerTelegramHandlers = ({
|
||||
key: docBatchKey,
|
||||
messages: [{ msg, ctx, debounceLane }],
|
||||
timer: setTimeout(() => {}, DOCUMENT_BATCH_WINDOW_MS),
|
||||
resolvedThreadId,
|
||||
dmThreadId,
|
||||
};
|
||||
documentBatchBuffer.set(docBatchKey, entry);
|
||||
scheduleDocumentBatchFlush(entry);
|
||||
|
||||
@ -38,6 +38,24 @@ function isIOSNode(node: NodeSummary): boolean {
|
||||
);
|
||||
}
|
||||
|
||||
function isLegacyMethodError(error: unknown): boolean {
|
||||
const msg =
|
||||
typeof error === "object" && error !== null && "message" in error
|
||||
? String((error as { message: unknown }).message).toLowerCase()
|
||||
: typeof error === "string"
|
||||
? error.toLowerCase()
|
||||
: "";
|
||||
if (!msg.includes("node.list")) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
msg.includes("unknown method") ||
|
||||
msg.includes("method not found") ||
|
||||
msg.includes("not implemented") ||
|
||||
msg.includes("unsupported")
|
||||
);
|
||||
}
|
||||
|
||||
async function loadNodes(cfg: OpenClawConfig): Promise<NodeSummary[]> {
|
||||
try {
|
||||
const res = await callGateway<{ nodes?: NodeSummary[] }>({
|
||||
@ -46,7 +64,10 @@ async function loadNodes(cfg: OpenClawConfig): Promise<NodeSummary[]> {
|
||||
config: cfg,
|
||||
});
|
||||
return Array.isArray(res.nodes) ? res.nodes : [];
|
||||
} catch {
|
||||
} catch (error) {
|
||||
if (!isLegacyMethodError(error)) {
|
||||
throw error;
|
||||
}
|
||||
const res = await callGateway<{ pending?: unknown[]; paired?: NodeSummary[] }>({
|
||||
method: "node.pair.list",
|
||||
params: {},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user