2026-01-07 22:16:49 +01:00
import { type RunOptions , run } from "@grammyjs/runner" ;
2026-02-18 01:29:02 +00:00
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js" ;
2026-02-18 01:34:35 +00:00
import type { OpenClawConfig } from "../config/config.js" ;
2026-02-01 10:03:47 +09:00
import { loadConfig } from "../config/config.js" ;
2026-02-26 04:36:00 +01:00
import { waitForAbortSignal } from "../infra/abort-signal.js" ;
2026-01-08 12:04:21 +01:00
import { computeBackoff , sleepWithAbort } from "../infra/backoff.js" ;
2026-01-26 19:24:13 -05:00
import { formatErrorMessage } from "../infra/errors.js" ;
2026-02-08 04:53:31 -08:00
import { formatDurationPrecise } from "../infra/format-time/format-duration.ts" ;
2026-02-02 15:25:41 +00:00
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js" ;
2026-02-18 01:34:35 +00:00
import type { RuntimeEnv } from "../runtime.js" ;
2026-01-08 01:18:37 +01:00
import { resolveTelegramAccount } from "./accounts.js" ;
2026-01-15 17:20:17 +00:00
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js" ;
2026-02-22 17:48:08 +01:00
import { withTelegramApiErrorLogging } from "./api-logging.js" ;
2025-12-08 01:48:53 +01:00
import { createTelegramBot } from "./bot.js" ;
2026-01-26 19:24:13 -05:00
import { isRecoverableTelegramNetworkError } from "./network-errors.js" ;
2026-01-13 04:02:47 +00:00
import { makeProxyFetch } from "./proxy.js" ;
2026-01-14 14:31:43 +00:00
import { readTelegramUpdateOffset , writeTelegramUpdateOffset } from "./update-offset-store.js" ;
2025-12-07 22:46:02 +01:00
import { startTelegramWebhook } from "./webhook.js" ;
export type MonitorTelegramOpts = {
token? : string ;
2026-01-08 01:18:37 +01:00
accountId? : string ;
2026-01-30 03:15:10 +01:00
config? : OpenClawConfig ;
2025-12-07 22:46:02 +01:00
runtime? : RuntimeEnv ;
abortSignal? : AbortSignal ;
useWebhook? : boolean ;
webhookPath? : string ;
webhookPort? : number ;
webhookSecret? : string ;
2026-02-14 01:39:56 +10:00
webhookHost? : string ;
2025-12-07 22:46:02 +01:00
proxyFetch? : typeof fetch ;
webhookUrl? : string ;
} ;
2026-01-30 03:15:10 +01:00
export function createTelegramRunnerOptions ( cfg : OpenClawConfig ) : RunOptions < unknown > {
2026-01-07 22:16:49 +01:00
return {
sink : {
2026-01-20 10:37:43 +00:00
concurrency : resolveAgentMaxConcurrent ( cfg ) ,
2026-01-07 22:16:49 +01:00
} ,
runner : {
fetch : {
// Match grammY defaults
timeout : 30 ,
2026-01-15 17:20:17 +00:00
// Request reactions without dropping default update types.
allowed_updates : resolveTelegramAllowedUpdates ( ) ,
2026-01-07 22:16:49 +01:00
} ,
2026-01-09 15:35:22 +01:00
// Suppress grammY getUpdates stack traces; we log concise errors ourselves.
silent : true ,
2026-02-26 03:32:36 +01:00
// Keep grammY retrying for a long outage window. If polling still
// stops, the outer monitor loop restarts it with backoff.
2026-02-25 06:58:58 -05:00
maxRetryTime : 60 * 60 * 1000 ,
2026-01-26 15:25:27 -07:00
retryInterval : "exponential" ,
2026-01-07 22:16:49 +01:00
} ,
} ;
}
2026-01-08 12:04:21 +01:00
const TELEGRAM_POLL_RESTART_POLICY = {
initialMs : 2000 ,
maxMs : 30_000 ,
factor : 1.8 ,
jitter : 0.25 ,
} ;
2026-02-26 03:32:36 +01:00
type TelegramBot = ReturnType < typeof createTelegramBot > ;
2026-01-08 12:04:21 +01:00
const isGetUpdatesConflict = ( err : unknown ) = > {
2026-01-31 16:19:20 +09:00
if ( ! err || typeof err !== "object" ) {
return false ;
}
2026-01-08 12:04:21 +01:00
const typed = err as {
error_code? : number ;
errorCode? : number ;
description? : string ;
method? : string ;
message? : string ;
} ;
const errorCode = typed . error_code ? ? typed . errorCode ;
2026-01-31 16:19:20 +09:00
if ( errorCode !== 409 ) {
return false ;
}
2026-01-08 11:07:04 +00:00
const haystack = [ typed . method , typed . description , typed . message ]
2026-01-08 12:04:21 +01:00
. filter ( ( value ) : value is string = > typeof value === "string" )
. join ( " " )
. toLowerCase ( ) ;
return haystack . includes ( "getupdates" ) ;
} ;
2026-02-02 15:25:41 +00:00
/** Check if error is a Grammy HttpError (used to scope unhandled rejection handling) */
const isGrammyHttpError = ( err : unknown ) : boolean = > {
if ( ! err || typeof err !== "object" ) {
2026-01-31 16:19:20 +09:00
return false ;
}
2026-02-02 15:25:41 +00:00
return ( err as { name? : string } ) . name === "HttpError" ;
2026-01-28 11:33:51 +13:00
} ;
2025-12-07 22:46:02 +01:00
export async function monitorTelegramProvider ( opts : MonitorTelegramOpts = { } ) {
2026-02-02 15:25:41 +00:00
const log = opts . runtime ? . error ? ? console . error ;
2026-02-22 17:06:59 +01:00
let activeRunner : ReturnType < typeof run > | undefined ;
let forceRestarted = false ;
2025-12-07 22:46:02 +01:00
2026-02-02 15:25:41 +00:00
// Register handler for Grammy HttpError unhandled rejections.
// This catches network errors that escape the polling loop's try-catch
// (e.g., from setMyCommands during bot setup).
// We gate on isGrammyHttpError to avoid suppressing non-Telegram errors.
const unregisterHandler = registerUnhandledRejectionHandler ( ( err ) = > {
2026-02-22 17:06:59 +01:00
const isNetworkError = isRecoverableTelegramNetworkError ( err , { context : "polling" } ) ;
if ( isGrammyHttpError ( err ) && isNetworkError ) {
2026-02-02 15:25:41 +00:00
log ( ` [telegram] Suppressed network error: ${ formatErrorMessage ( err ) } ` ) ;
return true ; // handled - don't crash
2026-01-31 16:19:20 +09:00
}
2026-02-22 17:06:59 +01:00
// Network failures can surface outside the runner task promise and leave
// polling stuck; force-stop the active runner so the loop can recover.
if ( isNetworkError && activeRunner && activeRunner . isRunning ( ) ) {
forceRestarted = true ;
void activeRunner . stop ( ) . catch ( ( ) = > { } ) ;
log (
` [telegram] Restarting polling after unhandled network error: ${ formatErrorMessage ( err ) } ` ,
) ;
return true ; // handled
}
2026-02-02 15:25:41 +00:00
return false ;
} ) ;
try {
const cfg = opts . config ? ? loadConfig ( ) ;
const account = resolveTelegramAccount ( {
cfg ,
accountId : opts.accountId ,
} ) ;
const token = opts . token ? . trim ( ) || account . token ;
if ( ! token ) {
throw new Error (
` Telegram bot token missing for account " ${ account . accountId } " (set channels.telegram.accounts. ${ account . accountId } .botToken/tokenFile or TELEGRAM_BOT_TOKEN for default). ` ,
2026-01-12 21:52:13 -06:00
) ;
}
2026-02-02 15:25:41 +00:00
const proxyFetch =
opts . proxyFetch ? ? ( account . config . proxy ? makeProxyFetch ( account . config . proxy ) : undefined ) ;
2025-12-07 22:46:02 +01:00
2026-02-02 15:25:41 +00:00
let lastUpdateId = await readTelegramUpdateOffset ( {
2026-01-12 21:59:28 -05:00
accountId : account.accountId ,
2026-02-23 09:43:47 -05:00
botToken : token ,
2025-12-07 22:46:02 +01:00
} ) ;
2026-02-02 15:25:41 +00:00
const persistUpdateId = async ( updateId : number ) = > {
if ( lastUpdateId !== null && updateId <= lastUpdateId ) {
return ;
}
lastUpdateId = updateId ;
try {
await writeTelegramUpdateOffset ( {
accountId : account.accountId ,
updateId ,
2026-02-23 09:43:47 -05:00
botToken : token ,
2026-02-02 15:25:41 +00:00
} ) ;
} catch ( err ) {
( opts . runtime ? . error ? ? console . error ) (
` telegram: failed to persist update offset: ${ String ( err ) } ` ,
) ;
2026-01-08 12:04:21 +01:00
}
} ;
2026-02-02 15:25:41 +00:00
if ( opts . useWebhook ) {
await startTelegramWebhook ( {
token ,
accountId : account.accountId ,
config : cfg ,
path : opts.webhookPath ,
port : opts.webhookPort ,
2026-02-16 03:43:51 +01:00
secret : opts.webhookSecret ? ? account . config . webhookSecret ,
2026-02-14 01:39:56 +10:00
host : opts.webhookHost ? ? account . config . webhookHost ,
2026-02-02 15:25:41 +00:00
runtime : opts.runtime as RuntimeEnv ,
fetch : proxyFetch ,
abortSignal : opts.abortSignal ,
publicUrl : opts.webhookUrl ,
} ) ;
2026-02-26 04:36:00 +01:00
await waitForAbortSignal ( opts . abortSignal ) ;
2026-01-08 12:04:21 +01:00
return ;
2026-02-02 15:25:41 +00:00
}
// Use grammyjs/runner for concurrent update processing
let restartAttempts = 0 ;
2026-02-22 17:48:08 +01:00
let webhookCleared = false ;
2026-02-22 17:47:12 +01:00
const runnerOptions = createTelegramRunnerOptions ( cfg ) ;
2026-02-26 03:32:36 +01:00
const waitBeforeRestart = async ( buildLine : ( delay : string ) = > string ) : Promise < boolean > = > {
2026-02-22 21:18:53 +00:00
restartAttempts += 1 ;
const delayMs = computeBackoff ( TELEGRAM_POLL_RESTART_POLICY , restartAttempts ) ;
2026-02-26 03:32:36 +01:00
const delay = formatDurationPrecise ( delayMs ) ;
log ( buildLine ( delay ) ) ;
2026-02-22 21:18:53 +00:00
try {
await sleepWithAbort ( delayMs , opts . abortSignal ) ;
} catch ( sleepErr ) {
if ( opts . abortSignal ? . aborted ) {
return false ;
}
throw sleepErr ;
}
return true ;
} ;
2026-02-02 15:25:41 +00:00
2026-02-26 03:32:36 +01:00
const waitBeforeRetryOnRecoverableSetupError = async (
err : unknown ,
logPrefix : string ,
) : Promise < boolean > = > {
if ( opts . abortSignal ? . aborted ) {
return false ;
}
if ( ! isRecoverableTelegramNetworkError ( err , { context : "unknown" } ) ) {
throw err ;
}
return waitBeforeRestart (
( delay ) = > ` ${ logPrefix } : ${ formatErrorMessage ( err ) } ; retrying in ${ delay } . ` ,
) ;
} ;
const createPollingBot = async ( ) : Promise < TelegramBot | undefined > = > {
2026-02-22 17:47:12 +01:00
try {
2026-02-26 03:32:36 +01:00
return createTelegramBot ( {
2026-02-22 17:47:12 +01:00
token ,
runtime : opts.runtime ,
proxyFetch ,
config : cfg ,
accountId : account.accountId ,
updateOffset : {
lastUpdateId ,
onUpdateId : persistUpdateId ,
} ,
} ) ;
} catch ( err ) {
2026-02-22 21:18:53 +00:00
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError (
err ,
"Telegram setup network error" ,
2026-02-22 17:47:12 +01:00
) ;
2026-02-22 21:18:53 +00:00
if ( ! shouldRetry ) {
2026-02-26 03:32:36 +01:00
return undefined ;
2026-02-22 17:47:12 +01:00
}
2026-02-26 03:32:36 +01:00
return undefined ;
2026-02-22 17:47:12 +01:00
}
2026-02-26 03:32:36 +01:00
} ;
2026-02-22 17:47:12 +01:00
2026-02-26 03:32:36 +01:00
const ensureWebhookCleanup = async ( bot : TelegramBot ) : Promise < "ready" | "retry" | "exit" > = > {
if ( webhookCleared ) {
return "ready" ;
}
try {
await withTelegramApiErrorLogging ( {
operation : "deleteWebhook" ,
runtime : opts.runtime ,
fn : ( ) = > bot . api . deleteWebhook ( { drop_pending_updates : false } ) ,
} ) ;
webhookCleared = true ;
return "ready" ;
} catch ( err ) {
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError (
err ,
"Telegram webhook cleanup failed" ,
) ;
return shouldRetry ? "retry" : "exit" ;
2026-02-22 17:48:08 +01:00
}
2026-02-26 03:32:36 +01:00
} ;
2026-02-22 17:48:08 +01:00
2026-02-26 03:32:36 +01:00
const runPollingCycle = async ( bot : TelegramBot ) : Promise < "continue" | "exit" > = > {
2026-02-22 17:47:12 +01:00
const runner = run ( bot , runnerOptions ) ;
2026-02-22 17:06:59 +01:00
activeRunner = runner ;
2026-02-23 09:43:47 -05:00
let stopPromise : Promise < void > | undefined ;
const stopRunner = ( ) = > {
stopPromise ? ? = Promise . resolve ( runner . stop ( ) )
. then ( ( ) = > undefined )
. catch ( ( ) = > {
// Runner may already be stopped by abort/retry paths.
} ) ;
return stopPromise ;
} ;
2026-03-02 09:30:26 +08:00
const stopBot = ( ) = > {
return Promise . resolve ( bot . stop ( ) )
. then ( ( ) = > undefined )
. catch ( ( ) = > {
// Bot may already be stopped by runner stop/abort paths.
} ) ;
} ;
2026-02-02 15:25:41 +00:00
const stopOnAbort = ( ) = > {
if ( opts . abortSignal ? . aborted ) {
2026-02-23 09:43:47 -05:00
void stopRunner ( ) ;
2026-02-02 15:25:41 +00:00
}
} ;
opts . abortSignal ? . addEventListener ( "abort" , stopOnAbort , { once : true } ) ;
2026-01-08 12:04:21 +01:00
try {
2026-02-02 15:25:41 +00:00
// runner.task() returns a promise that resolves when the runner stops
await runner . task ( ) ;
2026-02-25 06:58:58 -05:00
if ( opts . abortSignal ? . aborted ) {
2026-02-26 03:32:36 +01:00
return "exit" ;
2026-02-22 17:06:59 +01:00
}
2026-02-25 06:58:58 -05:00
const reason = forceRestarted
? "unhandled network error"
: "runner stopped (maxRetryTime exceeded or graceful stop)" ;
forceRestarted = false ;
2026-02-26 03:32:36 +01:00
const shouldRestart = await waitBeforeRestart (
( delay ) = > ` Telegram polling runner stopped ( ${ reason } ); restarting in ${ delay } . ` ,
2026-02-22 17:06:59 +01:00
) ;
2026-02-26 03:32:36 +01:00
return shouldRestart ? "continue" : "exit" ;
2026-02-02 15:25:41 +00:00
} catch ( err ) {
2026-02-22 17:06:59 +01:00
forceRestarted = false ;
2026-01-31 16:19:20 +09:00
if ( opts . abortSignal ? . aborted ) {
2026-02-02 15:25:41 +00:00
throw err ;
}
const isConflict = isGetUpdatesConflict ( err ) ;
const isRecoverable = isRecoverableTelegramNetworkError ( err , { context : "polling" } ) ;
if ( ! isConflict && ! isRecoverable ) {
throw err ;
}
const reason = isConflict ? "getUpdates conflict" : "network error" ;
const errMsg = formatErrorMessage ( err ) ;
2026-02-26 03:32:36 +01:00
const shouldRestart = await waitBeforeRestart (
( delay ) = > ` Telegram ${ reason } : ${ errMsg } ; retrying in ${ delay } . ` ,
2026-02-02 15:25:41 +00:00
) ;
2026-02-26 03:32:36 +01:00
return shouldRestart ? "continue" : "exit" ;
2026-02-02 15:25:41 +00:00
} finally {
opts . abortSignal ? . removeEventListener ( "abort" , stopOnAbort ) ;
2026-02-23 09:43:47 -05:00
await stopRunner ( ) ;
2026-03-02 09:30:26 +08:00
await stopBot ( ) ;
2026-01-08 12:04:21 +01:00
}
2026-02-26 03:32:36 +01:00
} ;
while ( ! opts . abortSignal ? . aborted ) {
const bot = await createPollingBot ( ) ;
if ( ! bot ) {
continue ;
}
const cleanupState = await ensureWebhookCleanup ( bot ) ;
if ( cleanupState === "retry" ) {
continue ;
}
if ( cleanupState === "exit" ) {
return ;
}
const state = await runPollingCycle ( bot ) ;
if ( state === "exit" ) {
return ;
}
2026-01-07 05:34:37 +00:00
}
2026-02-02 15:25:41 +00:00
} finally {
unregisterHandler ( ) ;
2025-12-07 22:46:02 +01:00
}
}