test(telegram): stabilize inbound media harness

This commit is contained in:
Ayaan Zaidi 2026-03-18 13:21:54 +05:30
parent d9e776eb47
commit 0567f111ac
No known key found for this signature in database
5 changed files with 220 additions and 119 deletions

View File

@ -6,6 +6,7 @@ import {
createBotHandlerWithOptions,
mockTelegramFileDownload,
mockTelegramPngDownload,
watchTelegramFetch,
} from "./bot.media.test-utils.js";
describe("telegram inbound media", () => {
@ -39,8 +40,10 @@ describe("telegram inbound media", () => {
}) => {
expect(params.runtimeError).not.toHaveBeenCalled();
expect(params.fetchSpy).toHaveBeenCalledWith(
"https://api.telegram.org/file/bottok/photos/1.jpg",
expect.objectContaining({ redirect: "manual" }),
expect.objectContaining({
url: "https://api.telegram.org/file/bottok/photos/1.jpg",
filePathHint: "photos/1.jpg",
}),
);
expect(params.replySpy).toHaveBeenCalledTimes(1);
const payload = params.replySpy.mock.calls[0][0];
@ -51,7 +54,7 @@ describe("telegram inbound media", () => {
name: "skips when file_path is missing",
messageId: 2,
getFile: async () => ({}),
setupFetch: () => vi.spyOn(globalThis, "fetch"),
setupFetch: () => watchTelegramFetch(),
assert: (params: {
fetchSpy: ReturnType<typeof vi.spyOn>;
replySpy: ReturnType<typeof vi.fn>;
@ -71,6 +74,7 @@ describe("telegram inbound media", () => {
message: {
message_id: scenario.messageId,
chat: { id: 1234, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
photo: [{ file_id: "fid" }],
date: 1736380800, // 2025-01-09T00:00:00Z
},
@ -106,6 +110,7 @@ describe("telegram inbound media", () => {
message: {
message_id: 1001,
chat: { id: 1234, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
photo: [{ file_id: "fid" }],
date: 1736380800,
},
@ -245,6 +250,7 @@ describe("telegram media groups", () => {
messages: [
{
chat: { id: 42, type: "private" as const },
from: { id: 777, is_bot: false, first_name: "Ada" },
message_id: 1,
caption: "Here are my photos",
date: 1736380800,
@ -254,6 +260,7 @@ describe("telegram media groups", () => {
},
{
chat: { id: 42, type: "private" as const },
from: { id: 777, is_bot: false, first_name: "Ada" },
message_id: 2,
date: 1736380801,
media_group_id: "album123",
@ -272,6 +279,7 @@ describe("telegram media groups", () => {
messages: [
{
chat: { id: 42, type: "private" as const },
from: { id: 777, is_bot: false, first_name: "Ada" },
message_id: 11,
caption: "Album A",
date: 1736380800,
@ -281,6 +289,7 @@ describe("telegram media groups", () => {
},
{
chat: { id: 42, type: "private" as const },
from: { id: 777, is_bot: false, first_name: "Ada" },
message_id: 12,
caption: "Album B",
date: 1736380801,
@ -339,7 +348,6 @@ describe("telegram forwarded bursts", () => {
const runtimeError = vi.fn();
const { handler, replySpy } = await createBotHandlerWithOptions({ runtimeError });
const fetchSpy = mockTelegramPngDownload();
vi.useFakeTimers();
try {
await handler({
@ -368,8 +376,9 @@ describe("telegram forwarded bursts", () => {
getFile: async () => ({ file_path: "photos/fwd1.jpg" }),
});
await vi.runAllTimersAsync();
expect(replySpy).toHaveBeenCalledTimes(1);
await vi.waitFor(() => {
expect(replySpy).toHaveBeenCalledTimes(1);
});
expect(runtimeError).not.toHaveBeenCalled();
const payload = replySpy.mock.calls[0][0];
@ -377,7 +386,6 @@ describe("telegram forwarded bursts", () => {
expect(payload.MediaPaths).toHaveLength(1);
} finally {
fetchSpy.mockRestore();
vi.useRealTimers();
}
},
FORWARD_BURST_TEST_TIMEOUT_MS,

View File

@ -1,21 +1,55 @@
import path from "node:path";
import { MediaFetchError } from "openclaw/plugin-sdk/media-runtime";
import { resetInboundDedupe } from "openclaw/plugin-sdk/reply-runtime";
import { beforeEach, vi, type Mock } from "vitest";
import type { TelegramBotDeps } from "./bot-deps.js";
const EMPTY_REPLY_COUNTS = {
block: 0,
final: 0,
tool: 0,
} as const;
export const useSpy: Mock = vi.fn();
export const middlewareUseSpy: Mock = vi.fn();
export const onSpy: Mock = vi.fn();
export const stopSpy: Mock = vi.fn();
export const sendChatActionSpy: Mock = vi.fn();
export const undiciFetchSpy: Mock = vi.fn((input: RequestInfo | URL, init?: RequestInit) =>
globalThis.fetch(input, init),
);
function defaultUndiciFetch(input: RequestInfo | URL, init?: RequestInit) {
return globalThis.fetch(input, init);
}
export const undiciFetchSpy: Mock = vi.fn(defaultUndiciFetch);
export function resetUndiciFetchMock() {
undiciFetchSpy.mockReset();
undiciFetchSpy.mockImplementation(defaultUndiciFetch);
}
type FetchRemoteMediaFn = typeof import("openclaw/plugin-sdk/media-runtime").fetchRemoteMedia;
async function defaultFetchRemoteMedia(
params: Parameters<FetchRemoteMediaFn>[0],
): ReturnType<FetchRemoteMediaFn> {
if (!params.fetchImpl) {
throw new MediaFetchError("fetch_failed", `Missing fetchImpl for ${params.url}`);
}
const response = await params.fetchImpl(params.url, {
redirect: "manual",
});
if (!response.ok) {
throw new MediaFetchError(
"http_error",
`Failed to fetch media from ${params.url}: HTTP ${response.status} ${response.statusText}`,
);
}
const arrayBuffer = await response.arrayBuffer();
return {
buffer: Buffer.from(arrayBuffer),
contentType: response.headers.get("content-type") ?? undefined,
fileName: params.filePathHint ? path.basename(params.filePathHint) : undefined,
} as Awaited<ReturnType<FetchRemoteMediaFn>>;
}
export const fetchRemoteMediaSpy: Mock = vi.fn(defaultFetchRemoteMedia);
export function resetFetchRemoteMediaMock() {
fetchRemoteMediaSpy.mockReset();
fetchRemoteMediaSpy.mockImplementation(defaultFetchRemoteMedia);
}
async function defaultSaveMediaBuffer(buffer: Buffer, contentType?: string) {
return {
@ -63,11 +97,7 @@ const apiStub: ApiStub = {
setMyCommands: vi.fn(async () => undefined),
};
export const telegramBotRuntimeForTest: {
Bot: new (token: string) => unknown;
sequentialize: () => unknown;
apiThrottler: () => unknown;
} = {
export const telegramBotRuntimeForTest = {
Bot: class {
api = apiStub;
use = middlewareUseSpy;
@ -81,26 +111,46 @@ export const telegramBotRuntimeForTest: {
apiThrottler: () => throttlerSpy(),
};
const mediaHarnessReplySpy = vi.hoisted(() =>
vi.fn(async (_ctx, opts) => {
await opts?.onReplyStart?.();
return undefined;
}),
);
const mediaHarnessReplySpy = vi.hoisted(() => vi.fn(async () => undefined));
type DispatchReplyWithBufferedBlockDispatcherFn =
typeof import("openclaw/plugin-sdk/reply-runtime").dispatchReplyWithBufferedBlockDispatcher;
type DispatchReplyHarnessParams = Parameters<DispatchReplyWithBufferedBlockDispatcherFn>[0];
let actualDispatchReplyWithBufferedBlockDispatcherPromise:
| Promise<DispatchReplyWithBufferedBlockDispatcherFn>
| undefined;
async function getActualDispatchReplyWithBufferedBlockDispatcher() {
actualDispatchReplyWithBufferedBlockDispatcherPromise ??=
import("../../../src/auto-reply/reply/provider-dispatcher.js").then(
(module) =>
module.dispatchReplyWithBufferedBlockDispatcher as DispatchReplyWithBufferedBlockDispatcherFn,
);
return await actualDispatchReplyWithBufferedBlockDispatcherPromise;
}
async function dispatchReplyWithBufferedBlockDispatcherViaActual(
params: DispatchReplyHarnessParams,
) {
const actualDispatchReplyWithBufferedBlockDispatcher =
await getActualDispatchReplyWithBufferedBlockDispatcher();
return await actualDispatchReplyWithBufferedBlockDispatcher({
...params,
replyResolver: async (ctx, _cfg, opts) => {
await opts?.onReplyStart?.();
return await mediaHarnessReplySpy(ctx, opts);
},
});
}
const mediaHarnessDispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() =>
vi.fn(async (params) => {
await params.dispatcherOptions?.typingCallbacks?.start?.();
const reply = await mediaHarnessReplySpy(params.ctx, params.replyOptions);
const payloads = reply === undefined ? [] : Array.isArray(reply) ? reply : [reply];
for (const payload of payloads) {
await params.dispatcherOptions?.deliver?.(payload, { kind: "final" });
}
return { queuedFinal: false, counts: EMPTY_REPLY_COUNTS };
}),
vi.fn<DispatchReplyWithBufferedBlockDispatcherFn>(
dispatchReplyWithBufferedBlockDispatcherViaActual,
),
);
export const telegramBotDepsForTest: TelegramBotDeps = {
export const telegramBotDepsForTest = {
loadConfig: () => ({
channels: { telegram: { dmPolicy: "open" as const, allowFrom: ["*"] } },
channels: { telegram: { dmPolicy: "open", allowFrom: ["*"] } },
}),
resolveStorePath: vi.fn((storePath?: string) => storePath ?? "/tmp/telegram-media-sessions.json"),
readChannelAllowFromStore: vi.fn(async () => [] as string[]),
@ -113,6 +163,8 @@ export const telegramBotDepsForTest: TelegramBotDeps = {
beforeEach(() => {
resetInboundDedupe();
resetSaveMediaBufferMock();
resetUndiciFetchMock();
resetFetchRemoteMediaMock();
});
const throttlerSpy = vi.fn(() => "throttler");
@ -133,6 +185,12 @@ vi.doMock("openclaw/plugin-sdk/media-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/media-runtime")>();
const mockModule = Object.create(null) as Record<string, unknown>;
Object.defineProperties(mockModule, Object.getOwnPropertyDescriptors(actual));
Object.defineProperty(mockModule, "fetchRemoteMedia", {
configurable: true,
enumerable: true,
writable: true,
value: (...args: Parameters<typeof fetchRemoteMediaSpy>) => fetchRemoteMediaSpy(...args),
});
Object.defineProperty(mockModule, "saveMediaBuffer", {
configurable: true,
enumerable: true,
@ -149,24 +207,35 @@ vi.doMock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => {
loadConfig: () => ({
channels: { telegram: { dmPolicy: "open", allowFrom: ["*"] } },
}),
};
});
vi.doMock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/config-runtime")>();
return {
...actual,
updateLastRoute: vi.fn(async () => undefined),
};
});
vi.doMock("openclaw/plugin-sdk/conversation-runtime", () => ({
readChannelAllowFromStore: vi.fn(async () => [] as string[]),
upsertChannelPairingRequest: vi.fn(async () => ({
code: "PAIRCODE",
created: true,
})),
}));
vi.doMock("openclaw/plugin-sdk/agent-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/agent-runtime")>();
return {
...actual,
findModelInCatalog: vi.fn(() => undefined),
loadModelCatalog: vi.fn(async () => []),
modelSupportsVision: vi.fn(() => false),
resolveDefaultModelForAgent: vi.fn(() => ({
provider: "openai",
model: "gpt-test",
})),
};
});
vi.doMock("openclaw/plugin-sdk/conversation-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/conversation-runtime")>();
return {
...actual,
readChannelAllowFromStore: vi.fn(async () => [] as string[]),
upsertChannelPairingRequest: vi.fn(async () => ({
code: "PAIRCODE",
created: true,
})),
};
});
vi.doMock("openclaw/plugin-sdk/reply-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/reply-runtime")>();

View File

@ -7,6 +7,7 @@ import {
describeStickerImageSpy,
getCachedStickerSpy,
mockTelegramFileDownload,
watchTelegramFetch,
} from "./bot.media.test-utils.js";
describe("telegram stickers", () => {
@ -34,6 +35,7 @@ describe("telegram stickers", () => {
message: {
message_id: 100,
chat: { id: 1234, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
sticker: {
file_id: "sticker_file_id_123",
file_unique_id: "sticker_unique_123",
@ -53,8 +55,10 @@ describe("telegram stickers", () => {
expect(runtimeError).not.toHaveBeenCalled();
expect(fetchSpy).toHaveBeenCalledWith(
"https://api.telegram.org/file/bottok/stickers/sticker.webp",
expect.objectContaining({ redirect: "manual" }),
expect.objectContaining({
url: "https://api.telegram.org/file/bottok/stickers/sticker.webp",
filePathHint: "stickers/sticker.webp",
}),
);
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0][0];
@ -82,18 +86,16 @@ describe("telegram stickers", () => {
cachedAt: "2026-01-20T10:00:00.000Z",
});
const fetchSpy = vi.spyOn(globalThis, "fetch").mockResolvedValueOnce({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => "image/webp" },
arrayBuffer: async () => new Uint8Array([0x52, 0x49, 0x46, 0x46]).buffer,
} as unknown as Response);
const fetchSpy = mockTelegramFileDownload({
contentType: "image/webp",
bytes: new Uint8Array([0x52, 0x49, 0x46, 0x46]),
});
await handler({
message: {
message_id: 103,
chat: { id: 1234, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
sticker: {
file_id: "new_file_id",
file_unique_id: "sticker_unique_456",
@ -167,12 +169,13 @@ describe("telegram stickers", () => {
]) {
replySpy.mockClear();
runtimeError.mockClear();
const fetchSpy = vi.spyOn(globalThis, "fetch");
const fetchSpy = watchTelegramFetch();
await handler({
message: {
message_id: scenario.messageId,
chat: { id: 1234, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
sticker: scenario.sticker,
date: 1736380800,
},
@ -202,43 +205,44 @@ describe("telegram text fragments", () => {
"buffers near-limit text and processes sequential parts as one message",
async () => {
const { handler, replySpy } = await createBotHandlerWithOptions({});
vi.useFakeTimers();
try {
const part1 = "A".repeat(4050);
const part2 = "B".repeat(50);
const part1 = "A".repeat(4050);
const part2 = "B".repeat(50);
await handler({
message: {
chat: { id: 42, type: "private" },
message_id: 10,
date: 1736380800,
text: part1,
},
me: { username: "openclaw_bot" },
getFile: async () => ({}),
});
await handler({
message: {
chat: { id: 42, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
message_id: 10,
date: 1736380800,
text: part1,
},
me: { username: "openclaw_bot" },
getFile: async () => ({}),
});
await handler({
message: {
chat: { id: 42, type: "private" },
message_id: 11,
date: 1736380801,
text: part2,
},
me: { username: "openclaw_bot" },
getFile: async () => ({}),
});
await handler({
message: {
chat: { id: 42, type: "private" },
from: { id: 777, is_bot: false, first_name: "Ada" },
message_id: 11,
date: 1736380801,
text: part2,
},
me: { username: "openclaw_bot" },
getFile: async () => ({}),
});
expect(replySpy).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(TEXT_FRAGMENT_FLUSH_MS * 2);
expect(replySpy).toHaveBeenCalledTimes(1);
expect(replySpy).not.toHaveBeenCalled();
await vi.waitFor(
() => {
expect(replySpy).toHaveBeenCalledTimes(1);
},
{ timeout: TEXT_FRAGMENT_FLUSH_MS * 6, interval: 5 },
);
const payload = replySpy.mock.calls[0][0] as { RawBody?: string };
expect(payload.RawBody).toContain(part1.slice(0, 32));
expect(payload.RawBody).toContain(part2.slice(0, 32));
} finally {
vi.useRealTimers();
}
const payload = replySpy.mock.calls[0][0] as { RawBody?: string };
expect(payload.RawBody).toContain(part1.slice(0, 32));
expect(payload.RawBody).toContain(part2.slice(0, 32));
},
TEXT_FRAGMENT_TEST_TIMEOUT_MS,
);

View File

@ -22,6 +22,18 @@ let createTelegramBotRef: typeof import("./bot.js").createTelegramBot;
let replySpyRef: ReturnType<typeof vi.fn>;
let onSpyRef: Mock;
let sendChatActionSpyRef: Mock;
let fetchRemoteMediaSpyRef: Mock;
let resetFetchRemoteMediaMockRef: () => void;
type FetchMockHandle = Mock & { mockRestore: () => void };
function createFetchMockHandle(): FetchMockHandle {
return Object.assign(fetchRemoteMediaSpyRef, {
mockRestore: () => {
resetFetchRemoteMediaMockRef();
},
}) as FetchMockHandle;
}
export async function createBotHandler(): Promise<{
handler: (ctx: Record<string, unknown>) => Promise<void>;
@ -68,24 +80,26 @@ export async function createBotHandlerWithOptions(options: {
export function mockTelegramFileDownload(params: {
contentType: string;
bytes: Uint8Array;
}): ReturnType<typeof vi.spyOn> {
return vi.spyOn(globalThis, "fetch").mockResolvedValueOnce({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => params.contentType },
arrayBuffer: async () => params.bytes.buffer,
} as unknown as Response);
}): FetchMockHandle {
fetchRemoteMediaSpyRef.mockResolvedValueOnce({
buffer: Buffer.from(params.bytes),
contentType: params.contentType,
fileName: "mock-file",
});
return createFetchMockHandle();
}
export function mockTelegramPngDownload(): ReturnType<typeof vi.spyOn> {
return vi.spyOn(globalThis, "fetch").mockResolvedValue({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => "image/png" },
arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer,
} as unknown as Response);
export function mockTelegramPngDownload(): FetchMockHandle {
fetchRemoteMediaSpyRef.mockResolvedValue({
buffer: Buffer.from(new Uint8Array([0x89, 0x50, 0x4e, 0x47])),
contentType: "image/png",
fileName: "mock-file.png",
});
return createFetchMockHandle();
}
export function watchTelegramFetch(): FetchMockHandle {
return createFetchMockHandle();
}
beforeEach(() => {
@ -106,6 +120,8 @@ beforeAll(async () => {
const harness = await import("./bot.media.e2e-harness.js");
onSpyRef = harness.onSpy;
sendChatActionSpyRef = harness.sendChatActionSpy;
fetchRemoteMediaSpyRef = harness.fetchRemoteMediaSpy;
resetFetchRemoteMediaMockRef = harness.resetFetchRemoteMediaMock;
const botModule = await import("./bot.js");
botModule.setTelegramBotRuntimeForTest(
harness.telegramBotRuntimeForTest as unknown as Parameters<
@ -121,8 +137,12 @@ beforeAll(async () => {
replySpyRef = (replyModule as unknown as { __replySpy: ReturnType<typeof vi.fn> }).__replySpy;
}, TELEGRAM_BOT_IMPORT_TIMEOUT_MS);
vi.mock("./sticker-cache.js", () => ({
cacheSticker: (...args: unknown[]) => cacheStickerSpy(...args),
getCachedSticker: (...args: unknown[]) => getCachedStickerSpy(...args),
describeStickerImage: (...args: unknown[]) => describeStickerImageSpy(...args),
}));
vi.mock("./sticker-cache.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./sticker-cache.js")>();
return {
...actual,
cacheSticker: (...args: unknown[]) => cacheStickerSpy(...args),
getCachedSticker: (...args: unknown[]) => getCachedStickerSpy(...args),
describeStickerImage: (...args: unknown[]) => describeStickerImageSpy(...args),
};
});

View File

@ -88,8 +88,8 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
if (buffer.timeout) {
clearTimeout(buffer.timeout);
}
buffer.timeout = setTimeout(() => {
void flushBuffer(key, buffer);
buffer.timeout = setTimeout(async () => {
await flushBuffer(key, buffer);
}, buffer.debounceMs);
buffer.timeout.unref?.();
};