test(telegram): add dispatch and handler seams
This commit is contained in:
parent
b00f3f9b64
commit
1662f3f5da
@ -1,13 +1,12 @@
|
||||
import type { Message, ReactionTypeEmoji } from "@grammyjs/types";
|
||||
import { resolveAgentDir, resolveDefaultAgentId } from "openclaw/plugin-sdk/agent-runtime";
|
||||
import { resolveDefaultModelForAgent } from "openclaw/plugin-sdk/agent-runtime";
|
||||
import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime";
|
||||
import { resolveChannelConfigWrites } from "openclaw/plugin-sdk/channel-runtime";
|
||||
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
|
||||
import { writeConfigFile } from "openclaw/plugin-sdk/config-runtime";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
updateSessionStore,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
import type { DmPolicy } from "openclaw/plugin-sdk/config-runtime";
|
||||
@ -17,21 +16,22 @@ import type {
|
||||
TelegramTopicConfig,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
import { applyModelOverrideToSessionEntry } from "openclaw/plugin-sdk/config-runtime";
|
||||
import { readChannelAllowFromStore } from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import {
|
||||
buildPluginBindingResolvedText,
|
||||
parsePluginBindingApprovalCustomId,
|
||||
resolvePluginConversationBindingApproval,
|
||||
} from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { dispatchPluginInteractiveHandler } from "openclaw/plugin-sdk/plugin-runtime";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { buildCommandsPaginationKeyboard } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import {
|
||||
buildModelsProviderData,
|
||||
formatModelsAvailableHeader,
|
||||
} from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { resolveStoredModelOverride } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { listSkillCommandsForAgents } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { buildCommandsMessagePaginated } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
|
||||
import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing";
|
||||
@ -42,19 +42,22 @@ import {
|
||||
normalizeDmAllowFromWithStore,
|
||||
type NormalizedAllowFrom,
|
||||
} from "./bot-access.js";
|
||||
import {
|
||||
createTelegramInboundBufferRuntime,
|
||||
type TextFragmentEntry,
|
||||
} from "./bot-handlers.buffers.js";
|
||||
import { defaultTelegramBotDeps } from "./bot-deps.js";
|
||||
import {
|
||||
APPROVE_CALLBACK_DATA_RE,
|
||||
hasInboundMedia,
|
||||
hasReplyTargetMedia,
|
||||
isMediaSizeLimitError,
|
||||
isRecoverableMediaGroupError,
|
||||
resolveInboundMediaFileId,
|
||||
} from "./bot-handlers.media.js";
|
||||
import type { TelegramMediaRef } from "./bot-message-context.js";
|
||||
import { RegisterTelegramHandlerParams } from "./bot-native-commands.js";
|
||||
import { type TelegramUpdateKeyContext } from "./bot-updates.js";
|
||||
import {
|
||||
MEDIA_GROUP_TIMEOUT_MS,
|
||||
type MediaGroupEntry,
|
||||
type TelegramUpdateKeyContext,
|
||||
} from "./bot-updates.js";
|
||||
import { resolveMedia } from "./bot/delivery.js";
|
||||
import {
|
||||
getTelegramTextParts,
|
||||
@ -90,7 +93,6 @@ import {
|
||||
type ProviderInfo,
|
||||
} from "./model-buttons.js";
|
||||
import { buildInlineKeyboard } from "./send.js";
|
||||
import { wasSentByBot } from "./sent-message-cache.js";
|
||||
|
||||
export const registerTelegramHandlers = ({
|
||||
cfg,
|
||||
@ -108,42 +110,161 @@ export const registerTelegramHandlers = ({
|
||||
shouldSkipUpdate,
|
||||
processMessage,
|
||||
logger,
|
||||
telegramDeps = defaultTelegramBotDeps,
|
||||
}: RegisterTelegramHandlerParams) => {
|
||||
const loadStoreAllowFrom = async () =>
|
||||
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
|
||||
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
|
||||
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
|
||||
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
|
||||
typeof opts.testTimings?.textFragmentGapMs === "number" &&
|
||||
Number.isFinite(opts.testTimings.textFragmentGapMs)
|
||||
? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs))
|
||||
: DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS;
|
||||
const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1;
|
||||
const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12;
|
||||
const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000;
|
||||
const mediaGroupTimeoutMs =
|
||||
typeof opts.testTimings?.mediaGroupFlushMs === "number" &&
|
||||
Number.isFinite(opts.testTimings.mediaGroupFlushMs)
|
||||
? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs))
|
||||
: MEDIA_GROUP_TIMEOUT_MS;
|
||||
|
||||
const {
|
||||
buildSyntheticContext,
|
||||
buildSyntheticTextMessage,
|
||||
inboundDebouncer,
|
||||
mediaGroupBuffer,
|
||||
mediaGroupProcessing,
|
||||
setMediaGroupProcessing,
|
||||
mediaGroupTimeoutMs,
|
||||
processMediaGroup,
|
||||
textFragmentBuffer,
|
||||
textFragmentProcessing,
|
||||
setTextFragmentProcessing,
|
||||
scheduleTextFragmentFlush,
|
||||
flushTextFragments,
|
||||
resolveReplyMediaForMessage,
|
||||
resolveTelegramDebounceLane,
|
||||
TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS,
|
||||
TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP,
|
||||
TELEGRAM_TEXT_FRAGMENT_MAX_PARTS,
|
||||
TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS,
|
||||
TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS,
|
||||
} = createTelegramInboundBufferRuntime({
|
||||
accountId,
|
||||
bot,
|
||||
cfg,
|
||||
logger,
|
||||
mediaMaxBytes,
|
||||
opts,
|
||||
processMessage,
|
||||
loadStoreAllowFrom,
|
||||
runtime,
|
||||
telegramTransport,
|
||||
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
||||
let mediaGroupProcessing: Promise<void> = Promise.resolve();
|
||||
|
||||
type TextFragmentEntry = {
|
||||
key: string;
|
||||
messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
const textFragmentBuffer = new Map<string, TextFragmentEntry>();
|
||||
let textFragmentProcessing: Promise<void> = Promise.resolve();
|
||||
|
||||
const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" });
|
||||
const FORWARD_BURST_DEBOUNCE_MS = 80;
|
||||
type TelegramDebounceLane = "default" | "forward";
|
||||
type TelegramDebounceEntry = {
|
||||
ctx: TelegramContext;
|
||||
msg: Message;
|
||||
allMedia: TelegramMediaRef[];
|
||||
storeAllowFrom: string[];
|
||||
debounceKey: string | null;
|
||||
debounceLane: TelegramDebounceLane;
|
||||
botUsername?: string;
|
||||
};
|
||||
const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => {
|
||||
const forwardMeta = msg as {
|
||||
forward_origin?: unknown;
|
||||
forward_from?: unknown;
|
||||
forward_from_chat?: unknown;
|
||||
forward_sender_name?: unknown;
|
||||
forward_date?: unknown;
|
||||
};
|
||||
return (forwardMeta.forward_origin ??
|
||||
forwardMeta.forward_from ??
|
||||
forwardMeta.forward_from_chat ??
|
||||
forwardMeta.forward_sender_name ??
|
||||
forwardMeta.forward_date)
|
||||
? "forward"
|
||||
: "default";
|
||||
};
|
||||
const buildSyntheticTextMessage = (params: {
|
||||
base: Message;
|
||||
text: string;
|
||||
date?: number;
|
||||
from?: Message["from"];
|
||||
}): Message => ({
|
||||
...params.base,
|
||||
...(params.from ? { from: params.from } : {}),
|
||||
text: params.text,
|
||||
caption: undefined,
|
||||
caption_entities: undefined,
|
||||
entities: undefined,
|
||||
...(params.date != null ? { date: params.date } : {}),
|
||||
});
|
||||
const buildSyntheticContext = (
|
||||
ctx: Pick<TelegramContext, "me"> & { getFile?: unknown },
|
||||
message: Message,
|
||||
): TelegramContext => {
|
||||
const getFile =
|
||||
typeof ctx.getFile === "function"
|
||||
? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object)
|
||||
: async () => ({});
|
||||
return { message, me: ctx.me, getFile };
|
||||
};
|
||||
const inboundDebouncer = createInboundDebouncer<TelegramDebounceEntry>({
|
||||
debounceMs,
|
||||
resolveDebounceMs: (entry) =>
|
||||
entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs,
|
||||
buildKey: (entry) => entry.debounceKey,
|
||||
shouldDebounce: (entry) => {
|
||||
const text = entry.msg.text ?? entry.msg.caption ?? "";
|
||||
const hasDebounceableText = shouldDebounceTextInbound({
|
||||
text,
|
||||
cfg,
|
||||
commandOptions: { botUsername: entry.botUsername },
|
||||
});
|
||||
if (entry.debounceLane === "forward") {
|
||||
// Forwarded bursts often split text + media into adjacent updates.
|
||||
// Debounce media-only forward entries too so they can coalesce.
|
||||
return hasDebounceableText || entry.allMedia.length > 0;
|
||||
}
|
||||
if (!hasDebounceableText) {
|
||||
return false;
|
||||
}
|
||||
return entry.allMedia.length === 0;
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
if (!last) {
|
||||
return;
|
||||
}
|
||||
if (entries.length === 1) {
|
||||
const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg);
|
||||
await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia);
|
||||
return;
|
||||
}
|
||||
const combinedText = entries
|
||||
.map((entry) => entry.msg.text ?? entry.msg.caption ?? "")
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
const combinedMedia = entries.flatMap((entry) => entry.allMedia);
|
||||
if (!combinedText.trim() && combinedMedia.length === 0) {
|
||||
return;
|
||||
}
|
||||
const first = entries[0];
|
||||
const baseCtx = first.ctx;
|
||||
const syntheticMessage = buildSyntheticTextMessage({
|
||||
base: first.msg,
|
||||
text: combinedText,
|
||||
date: last.msg.date ?? first.msg.date,
|
||||
});
|
||||
const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined;
|
||||
const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage);
|
||||
const replyMedia = await resolveReplyMediaForMessage(baseCtx, syntheticMessage);
|
||||
await processMessage(
|
||||
syntheticCtx,
|
||||
combinedMedia,
|
||||
first.storeAllowFrom,
|
||||
messageIdOverride ? { messageIdOverride } : undefined,
|
||||
replyMedia,
|
||||
);
|
||||
},
|
||||
onError: (err, items) => {
|
||||
runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`));
|
||||
const chatId = items[0]?.msg.chat.id;
|
||||
if (chatId != null) {
|
||||
const threadId = items[0]?.msg.message_thread_id;
|
||||
void bot.api
|
||||
.sendMessage(
|
||||
chatId,
|
||||
"Something went wrong while processing your message. Please try again.",
|
||||
threadId != null ? { message_thread_id: threadId } : undefined,
|
||||
)
|
||||
.catch((sendErr) => {
|
||||
logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`);
|
||||
});
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
const resolveTelegramSessionState = (params: {
|
||||
@ -190,7 +311,9 @@ export const registerTelegramHandlers = ({
|
||||
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` })
|
||||
: null;
|
||||
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId: route.agentId });
|
||||
const storePath = telegramDeps.resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
});
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
|
||||
const storedOverride = resolveStoredModelOverride({
|
||||
@ -227,6 +350,139 @@ export const registerTelegramHandlers = ({
|
||||
};
|
||||
};
|
||||
|
||||
const processMediaGroup = async (entry: MediaGroupEntry) => {
|
||||
try {
|
||||
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
|
||||
|
||||
const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text);
|
||||
const primaryEntry = captionMsg ?? entry.messages[0];
|
||||
|
||||
const allMedia: TelegramMediaRef[] = [];
|
||||
for (const { ctx } of entry.messages) {
|
||||
let media;
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);
|
||||
} catch (mediaErr) {
|
||||
if (!isRecoverableMediaGroupError(mediaErr)) {
|
||||
throw mediaErr;
|
||||
}
|
||||
runtime.log?.(
|
||||
warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if (media) {
|
||||
allMedia.push({
|
||||
path: media.path,
|
||||
contentType: media.contentType,
|
||||
stickerMetadata: media.stickerMetadata,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const storeAllowFrom = await loadStoreAllowFrom();
|
||||
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
|
||||
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia);
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
|
||||
}
|
||||
};
|
||||
|
||||
const flushTextFragments = async (entry: TextFragmentEntry) => {
|
||||
try {
|
||||
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
|
||||
|
||||
const first = entry.messages[0];
|
||||
const last = entry.messages.at(-1);
|
||||
if (!first || !last) {
|
||||
return;
|
||||
}
|
||||
|
||||
const combinedText = entry.messages.map((m) => m.msg.text ?? "").join("");
|
||||
if (!combinedText.trim()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const syntheticMessage = buildSyntheticTextMessage({
|
||||
base: first.msg,
|
||||
text: combinedText,
|
||||
date: last.msg.date ?? first.msg.date,
|
||||
});
|
||||
|
||||
const storeAllowFrom = await loadStoreAllowFrom();
|
||||
const baseCtx = first.ctx;
|
||||
|
||||
await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, {
|
||||
messageIdOverride: String(last.msg.message_id),
|
||||
});
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`text fragment handler failed: ${String(err)}`));
|
||||
}
|
||||
};
|
||||
|
||||
const queueTextFragmentFlush = async (entry: TextFragmentEntry) => {
|
||||
textFragmentProcessing = textFragmentProcessing
|
||||
.then(async () => {
|
||||
await flushTextFragments(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await textFragmentProcessing;
|
||||
};
|
||||
|
||||
const runTextFragmentFlush = async (entry: TextFragmentEntry) => {
|
||||
textFragmentBuffer.delete(entry.key);
|
||||
await queueTextFragmentFlush(entry);
|
||||
};
|
||||
|
||||
const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => {
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = setTimeout(async () => {
|
||||
await runTextFragmentFlush(entry);
|
||||
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
|
||||
};
|
||||
|
||||
const loadStoreAllowFrom = async () =>
|
||||
telegramDeps.readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
|
||||
|
||||
const resolveReplyMediaForMessage = async (
|
||||
ctx: TelegramContext,
|
||||
msg: Message,
|
||||
): Promise<TelegramMediaRef[]> => {
|
||||
const replyMessage = msg.reply_to_message;
|
||||
if (!replyMessage || !hasInboundMedia(replyMessage)) {
|
||||
return [];
|
||||
}
|
||||
const replyFileId = resolveInboundMediaFileId(replyMessage);
|
||||
if (!replyFileId) {
|
||||
return [];
|
||||
}
|
||||
try {
|
||||
const media = await resolveMedia(
|
||||
{
|
||||
message: replyMessage,
|
||||
me: ctx.me,
|
||||
getFile: async () => await bot.api.getFile(replyFileId),
|
||||
},
|
||||
mediaMaxBytes,
|
||||
opts.token,
|
||||
telegramTransport,
|
||||
);
|
||||
if (!media) {
|
||||
return [];
|
||||
}
|
||||
return [
|
||||
{
|
||||
path: media.path,
|
||||
contentType: media.contentType,
|
||||
stickerMetadata: media.stickerMetadata,
|
||||
},
|
||||
];
|
||||
} catch (err) {
|
||||
logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed");
|
||||
return [];
|
||||
}
|
||||
};
|
||||
|
||||
const isAllowlistAuthorized = (
|
||||
allow: NormalizedAllowFrom,
|
||||
senderId: string,
|
||||
@ -502,7 +758,7 @@ export const registerTelegramHandlers = ({
|
||||
if (user?.is_bot) {
|
||||
return;
|
||||
}
|
||||
if (reactionMode === "own" && !wasSentByBot(chatId, messageId)) {
|
||||
if (reactionMode === "own" && !telegramDeps.wasSentByBot(chatId, messageId)) {
|
||||
return;
|
||||
}
|
||||
const eventAuthContext = await resolveTelegramEventAuthorizationContext({
|
||||
@ -577,7 +833,7 @@ export const registerTelegramHandlers = ({
|
||||
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
|
||||
// Fresh config for bindings lookup; other routing inputs are payload-derived.
|
||||
const route = resolveAgentRoute({
|
||||
cfg: loadConfig(),
|
||||
cfg: telegramDeps.loadConfig(),
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
peer: { kind: isGroup ? "group" : "direct", id: peerId },
|
||||
@ -589,7 +845,7 @@ export const registerTelegramHandlers = ({
|
||||
for (const r of addedReactions) {
|
||||
const emoji = r.emoji;
|
||||
const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`;
|
||||
enqueueSystemEvent(text, {
|
||||
telegramDeps.enqueueSystemEvent(text, {
|
||||
sessionKey,
|
||||
contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`,
|
||||
});
|
||||
@ -663,14 +919,12 @@ export const registerTelegramHandlers = ({
|
||||
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
|
||||
clearTimeout(existing.timer);
|
||||
textFragmentBuffer.delete(key);
|
||||
setTextFragmentProcessing(
|
||||
textFragmentProcessing()
|
||||
.then(async () => {
|
||||
await flushTextFragments(existing);
|
||||
})
|
||||
.catch(() => undefined),
|
||||
);
|
||||
await textFragmentProcessing();
|
||||
textFragmentProcessing = textFragmentProcessing
|
||||
.then(async () => {
|
||||
await flushTextFragments(existing);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await textFragmentProcessing;
|
||||
}
|
||||
|
||||
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
|
||||
@ -695,28 +949,24 @@ export const registerTelegramHandlers = ({
|
||||
existing.messages.push({ msg, ctx });
|
||||
existing.timer = setTimeout(async () => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
setMediaGroupProcessing(
|
||||
mediaGroupProcessing()
|
||||
.then(async () => {
|
||||
await processMediaGroup(existing);
|
||||
})
|
||||
.catch(() => undefined),
|
||||
);
|
||||
await mediaGroupProcessing();
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(existing);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
}, mediaGroupTimeoutMs);
|
||||
} else {
|
||||
const entry = {
|
||||
const entry: MediaGroupEntry = {
|
||||
messages: [{ msg, ctx }],
|
||||
timer: setTimeout(async () => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
setMediaGroupProcessing(
|
||||
mediaGroupProcessing()
|
||||
.then(async () => {
|
||||
await processMediaGroup(entry);
|
||||
})
|
||||
.catch(() => undefined),
|
||||
);
|
||||
await mediaGroupProcessing();
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
}, mediaGroupTimeoutMs),
|
||||
};
|
||||
mediaGroupBuffer.set(mediaGroupId, entry);
|
||||
@ -1051,7 +1301,7 @@ export const registerTelegramHandlers = ({
|
||||
}
|
||||
|
||||
const agentId = paginationMatch[2]?.trim() || resolveDefaultAgentId(cfg);
|
||||
const skillCommands = listSkillCommandsForAgents({
|
||||
const skillCommands = telegramDeps.listSkillCommandsForAgents({
|
||||
cfg,
|
||||
agentIds: [agentId],
|
||||
});
|
||||
@ -1208,7 +1458,7 @@ export const registerTelegramHandlers = ({
|
||||
// Directly set model override in session
|
||||
try {
|
||||
// Get session store path
|
||||
const storePath = resolveStorePath(cfg.session?.store, {
|
||||
const storePath = telegramDeps.resolveStorePath(cfg.session?.store, {
|
||||
agentId: sessionState.agentId,
|
||||
});
|
||||
|
||||
@ -1288,7 +1538,7 @@ export const registerTelegramHandlers = ({
|
||||
}
|
||||
|
||||
// Check if old chat ID has config and migrate it
|
||||
const currentConfig = loadConfig();
|
||||
const currentConfig = telegramDeps.loadConfig();
|
||||
const migration = migrateTelegramGroupConfig({
|
||||
cfg: currentConfig,
|
||||
accountId,
|
||||
|
||||
@ -52,6 +52,18 @@ import { editMessageTelegram } from "./send.js";
|
||||
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
|
||||
|
||||
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
|
||||
const DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME = {
|
||||
dispatchReplyWithBufferedBlockDispatcher,
|
||||
};
|
||||
let botMessageDispatchRuntimeForTest:
|
||||
| Partial<typeof DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME>
|
||||
| undefined;
|
||||
|
||||
export function setBotMessageDispatchRuntimeForTest(
|
||||
runtime?: Partial<typeof DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME>,
|
||||
): void {
|
||||
botMessageDispatchRuntimeForTest = runtime;
|
||||
}
|
||||
|
||||
/** Minimum chars before sending first streaming message (improves push notification UX) */
|
||||
const DRAFT_MIN_INITIAL_CHARS = 30;
|
||||
@ -149,6 +161,10 @@ export const dispatchTelegramMessage = async ({
|
||||
telegramCfg,
|
||||
opts,
|
||||
}: DispatchTelegramMessageParams) => {
|
||||
const botMessageDispatchRuntime = {
|
||||
...DEFAULT_BOT_MESSAGE_DISPATCH_RUNTIME,
|
||||
...botMessageDispatchRuntimeForTest,
|
||||
};
|
||||
const {
|
||||
ctxPayload,
|
||||
msg,
|
||||
@ -535,7 +551,7 @@ export const dispatchTelegramMessage = async ({
|
||||
|
||||
let dispatchError: unknown;
|
||||
try {
|
||||
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
|
||||
({ queuedFinal } = await botMessageDispatchRuntime.dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcherOptions: {
|
||||
|
||||
@ -77,6 +77,19 @@ import { resolveTelegramGroupPromptSettings } from "./group-config-helpers.js";
|
||||
import { buildInlineKeyboard } from "./send.js";
|
||||
|
||||
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
|
||||
const DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME = {
|
||||
dispatchReplyWithBufferedBlockDispatcher,
|
||||
listSkillCommandsForAgents,
|
||||
};
|
||||
let botNativeCommandsRuntimeForTest:
|
||||
| Partial<typeof DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME>
|
||||
| undefined;
|
||||
|
||||
export function setBotNativeCommandsRuntimeForTest(
|
||||
runtime?: Partial<typeof DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME>,
|
||||
): void {
|
||||
botNativeCommandsRuntimeForTest = runtime;
|
||||
}
|
||||
|
||||
type TelegramNativeCommandContext = Context & { match?: string };
|
||||
|
||||
@ -366,6 +379,10 @@ export const registerTelegramNativeCommands = ({
|
||||
shouldSkipUpdate,
|
||||
opts,
|
||||
}: RegisterTelegramNativeCommandsParams) => {
|
||||
const botNativeCommandsRuntime = {
|
||||
...DEFAULT_BOT_NATIVE_COMMANDS_RUNTIME,
|
||||
...botNativeCommandsRuntimeForTest,
|
||||
};
|
||||
const silentErrorReplies = telegramCfg.silentErrorReplies === true;
|
||||
const boundRoute =
|
||||
nativeEnabled && nativeSkillsEnabled
|
||||
@ -378,7 +395,10 @@ export const registerTelegramNativeCommands = ({
|
||||
}
|
||||
const skillCommands =
|
||||
nativeEnabled && nativeSkillsEnabled && boundRoute
|
||||
? listSkillCommandsForAgents({ cfg, agentIds: [boundRoute.agentId] })
|
||||
? botNativeCommandsRuntime.listSkillCommandsForAgents({
|
||||
cfg,
|
||||
agentIds: [boundRoute.agentId],
|
||||
})
|
||||
: [];
|
||||
const nativeCommands = nativeEnabled
|
||||
? listNativeCommandSpecsForConfig(cfg, {
|
||||
@ -756,7 +776,7 @@ export const registerTelegramNativeCommands = ({
|
||||
accountId: route.accountId,
|
||||
});
|
||||
|
||||
await dispatchReplyWithBufferedBlockDispatcher({
|
||||
await botNativeCommandsRuntime.dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcherOptions: {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user