From 8023f4c701bce58182f42d013903800f1dc8db25 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Fri, 13 Mar 2026 10:11:43 +0530 Subject: [PATCH] fix(telegram): thread media transport policy into SSRF (#44639) * fix(telegram): preserve media download transport policy * refactor(telegram): thread media transport policy * fix(telegram): sync fallback media policy * fix: note telegram media transport fix (#44639) --- CHANGELOG.md | 1 + src/infra/net/fetch-guard.ts | 4 +- src/infra/net/ssrf.dispatcher.test.ts | 93 +++++++++- src/infra/net/ssrf.ts | 54 +++++- src/media/fetch.telegram-network.test.ts | 142 +++++++++++++++ src/media/fetch.ts | 5 +- src/telegram/bot-handlers.ts | 8 +- src/telegram/bot-native-commands.ts | 3 +- src/telegram/bot.ts | 16 +- .../bot/delivery.resolve-media-retry.test.ts | 16 +- src/telegram/bot/delivery.resolve-media.ts | 40 +++-- src/telegram/fetch.test.ts | 10 +- src/telegram/fetch.ts | 162 +++++++++++++----- 13 files changed, 469 insertions(+), 85 deletions(-) create mode 100644 src/media/fetch.telegram-network.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index a485dd36d0a..e6e85666b08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ Docs: https://docs.openclaw.ai - Memory/session sync: add mode-aware post-compaction session reindexing with `agents.defaults.compaction.postIndexSync` plus `agents.defaults.memorySearch.sync.sessions.postCompactionForce`, so compacted session memory can refresh immediately without forcing every deployment into synchronous reindexing. (#25561) thanks @rodrigouroz. - Telegram/model picker: make inline model button selections persist the chosen session model correctly, clear overrides when selecting the configured default, and include effective fallback models in `/models` button validation. (#40105) Thanks @avirweb. - Telegram/native command sync: suppress expected `BOT_COMMANDS_TOO_MUCH` retry error noise, add a final fallback summary log, and document the difference between command-menu overflow and real Telegram network failures. +- Telegram/media downloads: thread the same direct or proxy transport policy into SSRF-guarded file fetches so inbound attachments keep working when Telegram falls back between env-proxy and direct networking. (#44639) Thanks @obviyus. - Mattermost/reply media delivery: pass agent-scoped `mediaLocalRoots` through shared reply delivery so allowed local files upload correctly from button, slash-command, and model-picker replies. (#44021) Thanks @LyleLiu666. - Plugins/env-scoped roots: fix plugin discovery/load caches and provenance tracking so same-process `HOME`/`OPENCLAW_HOME` changes no longer reuse stale plugin state or misreport `~/...` plugins as untracked. (#44046) thanks @gumadeiras. - Gateway/session discovery: discover disk-only and retired ACP session stores under custom templated `session.store` roots so ACP reconciliation, session-id/session-label targeting, and run-id fallback keep working after restart. (#44176) thanks @gumadeiras. diff --git a/src/infra/net/fetch-guard.ts b/src/infra/net/fetch-guard.ts index faae38b013c..ed082e92fb9 100644 --- a/src/infra/net/fetch-guard.ts +++ b/src/infra/net/fetch-guard.ts @@ -7,6 +7,7 @@ import { createPinnedDispatcher, resolvePinnedHostnameWithPolicy, type LookupFn, + type PinnedDispatcherPolicy, SsrFBlockedError, type SsrFPolicy, } from "./ssrf.js"; @@ -29,6 +30,7 @@ export type GuardedFetchOptions = { signal?: AbortSignal; policy?: SsrFPolicy; lookupFn?: LookupFn; + dispatcherPolicy?: PinnedDispatcherPolicy; mode?: GuardedFetchMode; pinDns?: boolean; /** @deprecated use `mode: "trusted_env_proxy"` for trusted/operator-controlled URLs. */ @@ -196,7 +198,7 @@ export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise ({ +const { agentCtor, envHttpProxyAgentCtor, proxyAgentCtor } = vi.hoisted(() => ({ agentCtor: vi.fn(function MockAgent(this: { options: unknown }, options: unknown) { this.options = options; }), + envHttpProxyAgentCtor: vi.fn(function MockEnvHttpProxyAgent( + this: { options: unknown }, + options: unknown, + ) { + this.options = options; + }), + proxyAgentCtor: vi.fn(function MockProxyAgent(this: { options: unknown }, options: unknown) { + this.options = options; + }), })); vi.mock("undici", () => ({ Agent: agentCtor, + EnvHttpProxyAgent: envHttpProxyAgentCtor, + ProxyAgent: proxyAgentCtor, })); import { createPinnedDispatcher, type PinnedHostname } from "./ssrf.js"; @@ -34,4 +45,84 @@ describe("createPinnedDispatcher", () => { | undefined; expect(firstCallArg?.connect?.autoSelectFamily).toBeUndefined(); }); + + it("preserves caller transport hints while overriding lookup", () => { + const lookup = vi.fn() as unknown as PinnedHostname["lookup"]; + const previousLookup = vi.fn(); + const pinned: PinnedHostname = { + hostname: "api.telegram.org", + addresses: ["149.154.167.220"], + lookup, + }; + + createPinnedDispatcher(pinned, { + mode: "direct", + connect: { + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + lookup: previousLookup, + }, + }); + + expect(agentCtor).toHaveBeenCalledWith({ + connect: { + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + lookup, + }, + }); + }); + + it("keeps env proxy route while pinning the direct no-proxy path", () => { + const lookup = vi.fn() as unknown as PinnedHostname["lookup"]; + const pinned: PinnedHostname = { + hostname: "api.telegram.org", + addresses: ["149.154.167.220"], + lookup, + }; + + createPinnedDispatcher(pinned, { + mode: "env-proxy", + connect: { + autoSelectFamily: true, + }, + proxyTls: { + autoSelectFamily: true, + }, + }); + + expect(envHttpProxyAgentCtor).toHaveBeenCalledWith({ + connect: { + autoSelectFamily: true, + lookup, + }, + proxyTls: { + autoSelectFamily: true, + }, + }); + }); + + it("keeps explicit proxy routing intact", () => { + const lookup = vi.fn() as unknown as PinnedHostname["lookup"]; + const pinned: PinnedHostname = { + hostname: "api.telegram.org", + addresses: ["149.154.167.220"], + lookup, + }; + + createPinnedDispatcher(pinned, { + mode: "explicit-proxy", + proxyUrl: "http://127.0.0.1:7890", + proxyTls: { + autoSelectFamily: false, + }, + }); + + expect(proxyAgentCtor).toHaveBeenCalledWith({ + uri: "http://127.0.0.1:7890", + proxyTls: { + autoSelectFamily: false, + }, + }); + }); }); diff --git a/src/infra/net/ssrf.ts b/src/infra/net/ssrf.ts index 45fba10fd30..db70664a43f 100644 --- a/src/infra/net/ssrf.ts +++ b/src/infra/net/ssrf.ts @@ -1,6 +1,6 @@ import { lookup as dnsLookupCb, type LookupAddress } from "node:dns"; import { lookup as dnsLookup } from "node:dns/promises"; -import { Agent, type Dispatcher } from "undici"; +import { Agent, EnvHttpProxyAgent, ProxyAgent, type Dispatcher } from "undici"; import { extractEmbeddedIpv4FromIpv6, isBlockedSpecialUseIpv4Address, @@ -255,6 +255,22 @@ export type PinnedHostname = { lookup: typeof dnsLookupCb; }; +export type PinnedDispatcherPolicy = + | { + mode: "direct"; + connect?: Record; + } + | { + mode: "env-proxy"; + connect?: Record; + proxyTls?: Record; + } + | { + mode: "explicit-proxy"; + proxyUrl: string; + proxyTls?: Record; + }; + function dedupeAndPreferIpv4(results: readonly LookupAddress[]): string[] { const seen = new Set(); const ipv4: string[] = []; @@ -329,11 +345,37 @@ export async function resolvePinnedHostname( return await resolvePinnedHostnameWithPolicy(hostname, { lookupFn }); } -export function createPinnedDispatcher(pinned: PinnedHostname): Dispatcher { - return new Agent({ - connect: { - lookup: pinned.lookup, - }, +function withPinnedLookup( + lookup: PinnedHostname["lookup"], + connect?: Record, +): Record { + return connect ? { ...connect, lookup } : { lookup }; +} + +export function createPinnedDispatcher( + pinned: PinnedHostname, + policy?: PinnedDispatcherPolicy, +): Dispatcher { + if (!policy || policy.mode === "direct") { + return new Agent({ + connect: withPinnedLookup(pinned.lookup, policy?.connect), + }); + } + + if (policy.mode === "env-proxy") { + return new EnvHttpProxyAgent({ + connect: withPinnedLookup(pinned.lookup, policy.connect), + ...(policy.proxyTls ? { proxyTls: { ...policy.proxyTls } } : {}), + }); + } + + const proxyUrl = policy.proxyUrl.trim(); + if (!policy.proxyTls) { + return new ProxyAgent(proxyUrl); + } + return new ProxyAgent({ + uri: proxyUrl, + proxyTls: { ...policy.proxyTls }, }); } diff --git a/src/media/fetch.telegram-network.test.ts b/src/media/fetch.telegram-network.test.ts new file mode 100644 index 00000000000..c9989867f0b --- /dev/null +++ b/src/media/fetch.telegram-network.test.ts @@ -0,0 +1,142 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { resolveTelegramTransport } from "../telegram/fetch.js"; +import { fetchRemoteMedia } from "./fetch.js"; + +const undiciFetch = vi.hoisted(() => vi.fn()); +const AgentCtor = vi.hoisted(() => + vi.fn(function MockAgent( + this: { options?: Record }, + options?: Record, + ) { + this.options = options; + }), +); +const EnvHttpProxyAgentCtor = vi.hoisted(() => + vi.fn(function MockEnvHttpProxyAgent( + this: { options?: Record }, + options?: Record, + ) { + this.options = options; + }), +); +const ProxyAgentCtor = vi.hoisted(() => + vi.fn(function MockProxyAgent( + this: { options?: Record | string }, + options?: Record | string, + ) { + this.options = options; + }), +); + +vi.mock("undici", () => ({ + Agent: AgentCtor, + EnvHttpProxyAgent: EnvHttpProxyAgentCtor, + ProxyAgent: ProxyAgentCtor, + fetch: undiciFetch, +})); + +describe("fetchRemoteMedia telegram network policy", () => { + type LookupFn = NonNullable[0]["lookupFn"]>; + + afterEach(() => { + undiciFetch.mockReset(); + AgentCtor.mockClear(); + EnvHttpProxyAgentCtor.mockClear(); + ProxyAgentCtor.mockClear(); + vi.unstubAllEnvs(); + }); + + it("preserves Telegram resolver transport policy for file downloads", async () => { + const lookupFn = vi.fn(async () => [ + { address: "149.154.167.220", family: 4 }, + ]) as unknown as LookupFn; + undiciFetch.mockResolvedValueOnce( + new Response(new Uint8Array([0xff, 0xd8, 0xff, 0x00]), { + status: 200, + headers: { "content-type": "image/jpeg" }, + }), + ); + + const telegramTransport = resolveTelegramTransport(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "verbatim", + }, + }); + + await fetchRemoteMedia({ + url: "https://api.telegram.org/file/bottok/photos/1.jpg", + fetchImpl: telegramTransport.sourceFetch, + dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy, + lookupFn, + maxBytes: 1024, + ssrfPolicy: { + allowedHostnames: ["api.telegram.org"], + allowRfc2544BenchmarkRange: true, + }, + }); + + const init = undiciFetch.mock.calls[0]?.[1] as + | (RequestInit & { + dispatcher?: { + options?: { + connect?: Record; + }; + }; + }) + | undefined; + + expect(init?.dispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + lookup: expect.any(Function), + }), + ); + }); + + it("keeps explicit proxy routing for file downloads", async () => { + const { makeProxyFetch } = await import("../telegram/proxy.js"); + const lookupFn = vi.fn(async () => [ + { address: "149.154.167.220", family: 4 }, + ]) as unknown as LookupFn; + undiciFetch.mockResolvedValueOnce( + new Response(new Uint8Array([0x25, 0x50, 0x44, 0x46]), { + status: 200, + headers: { "content-type": "application/pdf" }, + }), + ); + + const telegramTransport = resolveTelegramTransport(makeProxyFetch("http://127.0.0.1:7890"), { + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + + await fetchRemoteMedia({ + url: "https://api.telegram.org/file/bottok/files/1.pdf", + fetchImpl: telegramTransport.sourceFetch, + dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy, + lookupFn, + maxBytes: 1024, + ssrfPolicy: { + allowedHostnames: ["api.telegram.org"], + allowRfc2544BenchmarkRange: true, + }, + }); + + const init = undiciFetch.mock.calls[0]?.[1] as + | (RequestInit & { + dispatcher?: { + options?: { + uri?: string; + }; + }; + }) + | undefined; + + expect(init?.dispatcher?.options?.uri).toBe("http://127.0.0.1:7890"); + expect(ProxyAgentCtor).toHaveBeenCalled(); + }); +}); diff --git a/src/media/fetch.ts b/src/media/fetch.ts index cdd62e4a044..40cd8b2414f 100644 --- a/src/media/fetch.ts +++ b/src/media/fetch.ts @@ -1,6 +1,6 @@ import path from "node:path"; import { fetchWithSsrFGuard, withStrictGuardedFetchMode } from "../infra/net/fetch-guard.js"; -import type { LookupFn, SsrFPolicy } from "../infra/net/ssrf.js"; +import type { LookupFn, PinnedDispatcherPolicy, SsrFPolicy } from "../infra/net/ssrf.js"; import { detectMime, extensionForMime } from "./mime.js"; import { readResponseWithLimit } from "./read-response-with-limit.js"; @@ -35,6 +35,7 @@ type FetchMediaOptions = { readIdleTimeoutMs?: number; ssrfPolicy?: SsrFPolicy; lookupFn?: LookupFn; + dispatcherPolicy?: PinnedDispatcherPolicy; }; function stripQuotes(value: string): string { @@ -92,6 +93,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise> = null; try { - media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramFetchImpl); + media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport); } catch (mediaErr) { if (isMediaSizeLimitError(mediaErr)) { if (sendOversizeWarning) { diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index 06148b17b33..2bcbebe63fa 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -65,6 +65,7 @@ import { import type { TelegramContext } from "./bot/types.js"; import { resolveTelegramConversationRoute } from "./conversation-route.js"; import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js"; +import type { TelegramTransport } from "./fetch.js"; import { evaluateTelegramGroupBaseAccess, evaluateTelegramGroupPolicyAccess, @@ -94,7 +95,7 @@ export type RegisterTelegramHandlerParams = { bot: Bot; mediaMaxBytes: number; opts: TelegramBotOptions; - telegramFetchImpl?: typeof fetch; + telegramTransport?: TelegramTransport; runtime: RuntimeEnv; telegramCfg: TelegramAccountConfig; allowFrom?: Array; diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index ddb26314f12..a1d60e61f71 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -38,7 +38,7 @@ import { type TelegramUpdateKeyContext, } from "./bot-updates.js"; import { buildTelegramGroupPeerId, resolveTelegramStreamMode } from "./bot/helpers.js"; -import { resolveTelegramFetch } from "./fetch.js"; +import { resolveTelegramTransport } from "./fetch.js"; import { tagTelegramNetworkError } from "./network-errors.js"; import { createTelegramSendChatActionHandler } from "./sendchataction-401-backoff.js"; import { getTelegramSequentialKey } from "./sequential-key.js"; @@ -132,19 +132,21 @@ export function createTelegramBot(opts: TelegramBotOptions) { : null; const telegramCfg = account.config; - const fetchImpl = resolveTelegramFetch(opts.proxyFetch, { + const telegramTransport = resolveTelegramTransport(opts.proxyFetch, { network: telegramCfg.network, - }) as unknown as ApiClientOptions["fetch"]; - const shouldProvideFetch = Boolean(fetchImpl); + }); + const shouldProvideFetch = Boolean(telegramTransport.fetch); // grammY's ApiClientOptions types still track `node-fetch` types; Node 22+ global fetch // (undici) is structurally compatible at runtime but not assignable in TS. - const fetchForClient = fetchImpl as unknown as NonNullable; + const fetchForClient = telegramTransport.fetch as unknown as NonNullable< + ApiClientOptions["fetch"] + >; // When a shutdown abort signal is provided, wrap fetch so every Telegram API request // (especially long-polling getUpdates) aborts immediately on shutdown. Without this, // the in-flight getUpdates hangs for up to 30s, and a new gateway instance starting // its own poll triggers a 409 Conflict from Telegram. - let finalFetch = shouldProvideFetch && fetchImpl ? fetchForClient : undefined; + let finalFetch = shouldProvideFetch ? fetchForClient : undefined; if (opts.fetchAbortSignal) { const baseFetch = finalFetch ?? (globalThis.fetch as unknown as NonNullable); @@ -493,7 +495,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { accountId: account.accountId, bot, opts, - telegramFetchImpl: fetchImpl as unknown as typeof fetch | undefined, + telegramTransport, runtime, mediaMaxBytes, telegramCfg, diff --git a/src/telegram/bot/delivery.resolve-media-retry.test.ts b/src/telegram/bot/delivery.resolve-media-retry.test.ts index df6124343fd..05d5c5f8b3e 100644 --- a/src/telegram/bot/delivery.resolve-media-retry.test.ts +++ b/src/telegram/bot/delivery.resolve-media-retry.test.ts @@ -6,9 +6,13 @@ import type { TelegramContext } from "./types.js"; const saveMediaBuffer = vi.fn(); const fetchRemoteMedia = vi.fn(); -vi.mock("../../media/store.js", () => ({ - saveMediaBuffer: (...args: unknown[]) => saveMediaBuffer(...args), -})); +vi.mock("../../media/store.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + saveMediaBuffer: (...args: unknown[]) => saveMediaBuffer(...args), + }; +}); vi.mock("../../media/fetch.js", () => ({ fetchRemoteMedia: (...args: unknown[]) => fetchRemoteMedia(...args), @@ -297,6 +301,7 @@ describe("resolveMedia getFile retry", () => { it("uses caller-provided fetch impl for file downloads", async () => { const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" }); const callerFetch = vi.fn() as unknown as typeof fetch; + const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch }; fetchRemoteMedia.mockResolvedValueOnce({ buffer: Buffer.from("pdf-data"), contentType: "application/pdf", @@ -311,7 +316,7 @@ describe("resolveMedia getFile retry", () => { makeCtx("document", getFile), MAX_MEDIA_BYTES, BOT_TOKEN, - callerFetch, + callerTransport, ); expect(result).not.toBeNull(); @@ -325,6 +330,7 @@ describe("resolveMedia getFile retry", () => { it("uses caller-provided fetch impl for sticker downloads", async () => { const getFile = vi.fn().mockResolvedValue({ file_path: "stickers/file_0.webp" }); const callerFetch = vi.fn() as unknown as typeof fetch; + const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch }; fetchRemoteMedia.mockResolvedValueOnce({ buffer: Buffer.from("sticker-data"), contentType: "image/webp", @@ -339,7 +345,7 @@ describe("resolveMedia getFile retry", () => { makeCtx("sticker", getFile), MAX_MEDIA_BYTES, BOT_TOKEN, - callerFetch, + callerTransport, ); expect(result).not.toBeNull(); diff --git a/src/telegram/bot/delivery.resolve-media.ts b/src/telegram/bot/delivery.resolve-media.ts index 9f560116a5d..9f34c3cecc2 100644 --- a/src/telegram/bot/delivery.resolve-media.ts +++ b/src/telegram/bot/delivery.resolve-media.ts @@ -4,6 +4,7 @@ import { formatErrorMessage } from "../../infra/errors.js"; import { retryAsync } from "../../infra/retry.js"; import { fetchRemoteMedia } from "../../media/fetch.js"; import { saveMediaBuffer } from "../../media/store.js"; +import type { TelegramTransport } from "../fetch.js"; import { cacheSticker, getCachedSticker } from "../sticker-cache.js"; import { resolveTelegramMediaPlaceholder } from "./helpers.js"; import type { StickerMetadata, TelegramContext } from "./types.js"; @@ -92,17 +93,23 @@ async function resolveTelegramFileWithRetry( } } -function resolveRequiredFetchImpl(fetchImpl?: typeof fetch): typeof fetch { - const resolved = fetchImpl ?? globalThis.fetch; - if (!resolved) { +function resolveRequiredTelegramTransport(transport?: TelegramTransport): TelegramTransport { + if (transport) { + return transport; + } + const resolvedFetch = globalThis.fetch; + if (!resolvedFetch) { throw new Error("fetch is not available; set channels.telegram.proxy in config"); } - return resolved; + return { + fetch: resolvedFetch, + sourceFetch: resolvedFetch, + }; } -function resolveOptionalFetchImpl(fetchImpl?: typeof fetch): typeof fetch | null { +function resolveOptionalTelegramTransport(transport?: TelegramTransport): TelegramTransport | null { try { - return resolveRequiredFetchImpl(fetchImpl); + return resolveRequiredTelegramTransport(transport); } catch { return null; } @@ -114,14 +121,15 @@ const TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS = 30_000; async function downloadAndSaveTelegramFile(params: { filePath: string; token: string; - fetchImpl: typeof fetch; + transport: TelegramTransport; maxBytes: number; telegramFileName?: string; }) { const url = `https://api.telegram.org/file/bot${params.token}/${params.filePath}`; const fetched = await fetchRemoteMedia({ url, - fetchImpl: params.fetchImpl, + fetchImpl: params.transport.sourceFetch, + dispatcherPolicy: params.transport.pinnedDispatcherPolicy, filePathHint: params.filePath, maxBytes: params.maxBytes, readIdleTimeoutMs: TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS, @@ -142,7 +150,7 @@ async function resolveStickerMedia(params: { ctx: TelegramContext; maxBytes: number; token: string; - fetchImpl?: typeof fetch; + transport?: TelegramTransport; }): Promise< | { path: string; @@ -153,7 +161,7 @@ async function resolveStickerMedia(params: { | null | undefined > { - const { msg, ctx, maxBytes, token, fetchImpl } = params; + const { msg, ctx, maxBytes, token, transport } = params; if (!msg.sticker) { return undefined; } @@ -173,15 +181,15 @@ async function resolveStickerMedia(params: { logVerbose("telegram: getFile returned no file_path for sticker"); return null; } - const resolvedFetchImpl = resolveOptionalFetchImpl(fetchImpl); - if (!resolvedFetchImpl) { + const resolvedTransport = resolveOptionalTelegramTransport(transport); + if (!resolvedTransport) { logVerbose("telegram: fetch not available for sticker download"); return null; } const saved = await downloadAndSaveTelegramFile({ filePath: file.file_path, token, - fetchImpl: resolvedFetchImpl, + transport: resolvedTransport, maxBytes, }); @@ -237,7 +245,7 @@ export async function resolveMedia( ctx: TelegramContext, maxBytes: number, token: string, - fetchImpl?: typeof fetch, + transport?: TelegramTransport, ): Promise<{ path: string; contentType?: string; @@ -250,7 +258,7 @@ export async function resolveMedia( ctx, maxBytes, token, - fetchImpl, + transport, }); if (stickerResolved !== undefined) { return stickerResolved; @@ -271,7 +279,7 @@ export async function resolveMedia( const saved = await downloadAndSaveTelegramFile({ filePath: file.file_path, token, - fetchImpl: resolveRequiredFetchImpl(fetchImpl), + transport: resolveRequiredTelegramTransport(transport), maxBytes, telegramFileName: resolveTelegramFileName(msg), }); diff --git a/src/telegram/fetch.test.ts b/src/telegram/fetch.test.ts index dc4c7a5145a..4d6658e0327 100644 --- a/src/telegram/fetch.test.ts +++ b/src/telegram/fetch.test.ts @@ -1,6 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { resolveFetch } from "../infra/fetch.js"; -import { resolveTelegramFetch } from "./fetch.js"; +import { resolveTelegramFetch, resolveTelegramTransport } from "./fetch.js"; const setDefaultResultOrder = vi.hoisted(() => vi.fn()); const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn()); @@ -313,12 +313,13 @@ describe("resolveTelegramFetch", () => { .mockResolvedValueOnce({ ok: true } as Response) .mockResolvedValueOnce({ ok: true } as Response); - const resolved = resolveTelegramFetchOrThrow(undefined, { + const transport = resolveTelegramTransport(undefined, { network: { autoSelectFamily: true, dnsResultOrder: "ipv4first", }, }); + const resolved = transport.fetch; await resolved("https://api.telegram.org/botx/sendMessage"); await resolved("https://api.telegram.org/botx/sendChatAction"); @@ -338,6 +339,11 @@ describe("resolveTelegramFetch", () => { autoSelectFamily: false, }), ); + expect(transport.pinnedDispatcherPolicy).toEqual( + expect.objectContaining({ + mode: "direct", + }), + ); }); it("arms sticky IPv4 fallback when env proxy init falls back to direct Agent", async () => { diff --git a/src/telegram/fetch.ts b/src/telegram/fetch.ts index a6b2cec4810..52484edde80 100644 --- a/src/telegram/fetch.ts +++ b/src/telegram/fetch.ts @@ -3,6 +3,7 @@ import { Agent, EnvHttpProxyAgent, ProxyAgent, fetch as undiciFetch } from "undi import type { TelegramNetworkConfig } from "../config/types.telegram.js"; import { resolveFetch } from "../infra/fetch.js"; import { hasEnvHttpProxyConfigured } from "../infra/net/proxy-env.js"; +import type { PinnedDispatcherPolicy } from "../infra/net/ssrf.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveTelegramAutoSelectFamilyDecision, @@ -181,13 +182,13 @@ function hasEnvHttpProxyForTelegramApi(env: NodeJS.ProcessEnv = process.env): bo return hasEnvHttpProxyConfigured("https", env); } -function createTelegramDispatcher(params: { +function resolveTelegramDispatcherPolicy(params: { autoSelectFamily: boolean | null; dnsResultOrder: TelegramDnsResultOrder | null; useEnvProxy: boolean; forceIpv4: boolean; proxyUrl?: string; -}): { dispatcher: TelegramDispatcher; mode: TelegramDispatcherMode } { +}): { policy: PinnedDispatcherPolicy; mode: TelegramDispatcherMode } { const connect = buildTelegramConnectOptions({ autoSelectFamily: params.autoSelectFamily, dnsResultOrder: params.dnsResultOrder, @@ -195,35 +196,77 @@ function createTelegramDispatcher(params: { }); const explicitProxyUrl = params.proxyUrl?.trim(); if (explicitProxyUrl) { - const proxyOptions = connect + return { + policy: connect + ? { + mode: "explicit-proxy", + proxyUrl: explicitProxyUrl, + proxyTls: { ...connect }, + } + : { + mode: "explicit-proxy", + proxyUrl: explicitProxyUrl, + }, + mode: "explicit-proxy", + }; + } + if (params.useEnvProxy) { + return { + policy: { + mode: "env-proxy", + ...(connect ? { connect: { ...connect }, proxyTls: { ...connect } } : {}), + }, + mode: "env-proxy", + }; + } + return { + policy: { + mode: "direct", + ...(connect ? { connect: { ...connect } } : {}), + }, + mode: "direct", + }; +} + +function createTelegramDispatcher(policy: PinnedDispatcherPolicy): { + dispatcher: TelegramDispatcher; + mode: TelegramDispatcherMode; + effectivePolicy: PinnedDispatcherPolicy; +} { + if (policy.mode === "explicit-proxy") { + const proxyOptions = policy.proxyTls ? ({ - uri: explicitProxyUrl, - proxyTls: connect, + uri: policy.proxyUrl, + proxyTls: { ...policy.proxyTls }, } satisfies ConstructorParameters[0]) - : explicitProxyUrl; + : policy.proxyUrl; try { return { dispatcher: new ProxyAgent(proxyOptions), mode: "explicit-proxy", + effectivePolicy: policy, }; } catch (err) { const reason = err instanceof Error ? err.message : String(err); throw new Error(`explicit proxy dispatcher init failed: ${reason}`, { cause: err }); } } - if (params.useEnvProxy) { - const proxyOptions = connect - ? ({ - connect, - // undici's EnvHttpProxyAgent passes `connect` only to the no-proxy Agent. - // Real proxied HTTPS traffic reads transport settings from ProxyAgent.proxyTls. - proxyTls: connect, - } satisfies ConstructorParameters[0]) - : undefined; + + if (policy.mode === "env-proxy") { + const proxyOptions = + policy.connect || policy.proxyTls + ? ({ + ...(policy.connect ? { connect: { ...policy.connect } } : {}), + // undici's EnvHttpProxyAgent passes `connect` only to the no-proxy Agent. + // Real proxied HTTPS traffic reads transport settings from ProxyAgent.proxyTls. + ...(policy.proxyTls ? { proxyTls: { ...policy.proxyTls } } : {}), + } satisfies ConstructorParameters[0]) + : undefined; try { return { dispatcher: new EnvHttpProxyAgent(proxyOptions), mode: "env-proxy", + effectivePolicy: policy, }; } catch (err) { log.warn( @@ -231,16 +274,34 @@ function createTelegramDispatcher(params: { err instanceof Error ? err.message : String(err) }`, ); + const directPolicy: PinnedDispatcherPolicy = { + mode: "direct", + ...(policy.connect ? { connect: { ...policy.connect } } : {}), + }; + return { + dispatcher: new Agent( + directPolicy.connect + ? ({ + connect: { ...directPolicy.connect }, + } satisfies ConstructorParameters[0]) + : undefined, + ), + mode: "direct", + effectivePolicy: directPolicy, + }; } } - const agentOptions = connect - ? ({ - connect, - } satisfies ConstructorParameters[0]) - : undefined; + return { - dispatcher: new Agent(agentOptions), + dispatcher: new Agent( + policy.connect + ? ({ + connect: { ...policy.connect }, + } satisfies ConstructorParameters[0]) + : undefined, + ), mode: "direct", + effectivePolicy: policy, }; } @@ -329,10 +390,16 @@ function shouldRetryWithIpv4Fallback(err: unknown): boolean { } // Prefer wrapped fetch when available to normalize AbortSignal across runtimes. -export function resolveTelegramFetch( +export type TelegramTransport = { + fetch: typeof fetch; + sourceFetch: typeof fetch; + pinnedDispatcherPolicy?: PinnedDispatcherPolicy; +}; + +export function resolveTelegramTransport( proxyFetch?: typeof fetch, options?: { network?: TelegramNetworkConfig }, -): typeof fetch { +): TelegramTransport { const autoSelectDecision = resolveTelegramAutoSelectFamilyDecision({ network: options?.network, }); @@ -351,51 +418,51 @@ export function resolveTelegramFetch( : proxyFetch ? resolveWrappedFetch(proxyFetch) : undiciSourceFetch; - + const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value); // Preserve fully caller-owned custom fetch implementations. - // OpenClaw proxy fetches are metadata-tagged and continue into resolver-scoped policy. if (proxyFetch && !explicitProxyUrl) { - return sourceFetch; + return { fetch: sourceFetch, sourceFetch }; } - const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value); const useEnvProxy = !explicitProxyUrl && hasEnvHttpProxyForTelegramApi(); - const defaultDispatcherResolution = createTelegramDispatcher({ + const defaultDispatcherResolution = resolveTelegramDispatcherPolicy({ autoSelectFamily: autoSelectDecision.value, dnsResultOrder, useEnvProxy, forceIpv4: false, proxyUrl: explicitProxyUrl, }); - const defaultDispatcher = defaultDispatcherResolution.dispatcher; + const defaultDispatcher = createTelegramDispatcher(defaultDispatcherResolution.policy); const shouldBypassEnvProxy = shouldBypassEnvProxyForTelegramApi(); const allowStickyIpv4Fallback = - defaultDispatcherResolution.mode === "direct" || - (defaultDispatcherResolution.mode === "env-proxy" && shouldBypassEnvProxy); - const stickyShouldUseEnvProxy = defaultDispatcherResolution.mode === "env-proxy"; + defaultDispatcher.mode === "direct" || + (defaultDispatcher.mode === "env-proxy" && shouldBypassEnvProxy); + const stickyShouldUseEnvProxy = defaultDispatcher.mode === "env-proxy"; let stickyIpv4FallbackEnabled = false; let stickyIpv4Dispatcher: TelegramDispatcher | null = null; const resolveStickyIpv4Dispatcher = () => { if (!stickyIpv4Dispatcher) { - stickyIpv4Dispatcher = createTelegramDispatcher({ - autoSelectFamily: false, - dnsResultOrder: "ipv4first", - useEnvProxy: stickyShouldUseEnvProxy, - forceIpv4: true, - proxyUrl: explicitProxyUrl, - }).dispatcher; + stickyIpv4Dispatcher = createTelegramDispatcher( + resolveTelegramDispatcherPolicy({ + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + useEnvProxy: stickyShouldUseEnvProxy, + forceIpv4: true, + proxyUrl: explicitProxyUrl, + }).policy, + ).dispatcher; } return stickyIpv4Dispatcher; }; - return (async (input: RequestInfo | URL, init?: RequestInit) => { + const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => { const callerProvidedDispatcher = Boolean( (init as RequestInitWithDispatcher | undefined)?.dispatcher, ); const initialInit = withDispatcherIfMissing( init, - stickyIpv4FallbackEnabled ? resolveStickyIpv4Dispatcher() : defaultDispatcher, + stickyIpv4FallbackEnabled ? resolveStickyIpv4Dispatcher() : defaultDispatcher.dispatcher, ); try { return await sourceFetch(input, initialInit); @@ -421,4 +488,17 @@ export function resolveTelegramFetch( throw err; } }) as typeof fetch; + + return { + fetch: resolvedFetch, + sourceFetch, + pinnedDispatcherPolicy: defaultDispatcher.effectivePolicy, + }; +} + +export function resolveTelegramFetch( + proxyFetch?: typeof fetch, + options?: { network?: TelegramNetworkConfig }, +): typeof fetch { + return resolveTelegramTransport(proxyFetch, options).fetch; }