2026-01-07 22:16:49 +01:00
import { type RunOptions , run } from "@grammyjs/runner" ;
2026-02-18 01:29:02 +00:00
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js" ;
2026-02-18 01:34:35 +00:00
import type { OpenClawConfig } from "../config/config.js" ;
2026-02-01 10:03:47 +09:00
import { loadConfig } from "../config/config.js" ;
2026-01-08 12:04:21 +01:00
import { computeBackoff , sleepWithAbort } from "../infra/backoff.js" ;
2026-01-26 19:24:13 -05:00
import { formatErrorMessage } from "../infra/errors.js" ;
2026-02-08 04:53:31 -08:00
import { formatDurationPrecise } from "../infra/format-time/format-duration.ts" ;
2026-02-02 15:25:41 +00:00
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js" ;
2026-02-18 01:34:35 +00:00
import type { RuntimeEnv } from "../runtime.js" ;
2026-01-08 01:18:37 +01:00
import { resolveTelegramAccount } from "./accounts.js" ;
2026-01-15 17:20:17 +00:00
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js" ;
2025-12-08 01:48:53 +01:00
import { createTelegramBot } from "./bot.js" ;
2026-01-26 19:24:13 -05:00
import { isRecoverableTelegramNetworkError } from "./network-errors.js" ;
2026-01-13 04:02:47 +00:00
import { makeProxyFetch } from "./proxy.js" ;
2026-01-14 14:31:43 +00:00
import { readTelegramUpdateOffset , writeTelegramUpdateOffset } from "./update-offset-store.js" ;
2025-12-07 22:46:02 +01:00
import { startTelegramWebhook } from "./webhook.js" ;
/**
 * Options accepted by `monitorTelegramProvider`. Every field is optional.
 */
export type MonitorTelegramOpts = {
  /** Bot token; when non-empty after trimming it is used instead of the resolved account's token. */
  token?: string;
  /** Account identifier forwarded to `resolveTelegramAccount`. */
  accountId?: string;
  /** Configuration to use instead of the result of `loadConfig()`. */
  config?: OpenClawConfig;
  /** Runtime environment; its `error` function is used for logging, with `console.error` as the fallback. */
  runtime?: RuntimeEnv;
  /** Signal that stops the polling runner and is also forwarded to the webhook starter. */
  abortSignal?: AbortSignal;
  /** When truthy, a webhook is started instead of the long-polling runner. */
  useWebhook?: boolean;
  /** Forwarded to `startTelegramWebhook` as `path`. */
  webhookPath?: string;
  /** Forwarded to `startTelegramWebhook` as `port`. */
  webhookPort?: number;
  /** Forwarded to `startTelegramWebhook` as `secret`; falls back to the account's `webhookSecret`. */
  webhookSecret?: string;
  /** Forwarded to `startTelegramWebhook` as `host`; falls back to the account's `webhookHost`. */
  webhookHost?: string;
  /** Fetch implementation that takes precedence over one built from the account's `proxy` setting. */
  proxyFetch?: typeof fetch;
  /** Forwarded to `startTelegramWebhook` as `publicUrl`. */
  webhookUrl?: string;
};
2026-01-30 03:15:10 +01:00
/**
 * Builds the `@grammyjs/runner` options used when long-polling Telegram.
 *
 * @param cfg - Configuration from which the sink concurrency is resolved
 *   via `resolveAgentMaxConcurrent`.
 * @returns Runner options: sink concurrency plus the fetch and retry
 *   settings for the update source.
 */
export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions<unknown> {
  return {
    sink: {
      concurrency: resolveAgentMaxConcurrent(cfg),
    },
    runner: {
      fetch: {
        // Match grammY defaults
        timeout: 30,
        // Request reactions without dropping default update types.
        allowed_updates: resolveTelegramAllowedUpdates(),
      },
      // Suppress grammY getUpdates stack traces; we log concise errors ourselves.
      silent: true,
      // Retry transient failures for a limited window (5 minutes, in ms)
      // before surfacing errors.
      maxRetryTime: 5 * 60 * 1000,
      retryInterval: "exponential",
    },
  };
}
2026-01-08 12:04:21 +01:00
/**
 * Backoff settings handed to `computeBackoff` each time the polling runner
 * is restarted after a getUpdates conflict or a recoverable network error.
 * NOTE(review): the exact meaning of each field is defined by `computeBackoff`,
 * which is not visible here; the `Ms` suffix suggests milliseconds — confirm.
 */
const TELEGRAM_POLL_RESTART_POLICY = {
  initialMs: 2_000,
  maxMs: 30_000,
  factor: 1.8,
  jitter: 0.25,
};
/**
 * Reports whether `err` looks like a 409 conflict raised by a `getUpdates` call.
 *
 * The value must be a non-null object whose `error_code` (or, when that is
 * nullish, `errorCode`) is exactly 409, and whose `method`, `description` or
 * `message` string contains "getupdates" (case-insensitive).
 *
 * @param err - Any thrown value.
 * @returns `true` only when both the status code and the text match.
 */
const isGetUpdatesConflict = (err: unknown): boolean => {
  if (!err || typeof err !== "object") {
    return false;
  }
  const typed = err as {
    error_code?: number;
    errorCode?: number;
    description?: string;
    method?: string;
    message?: string;
  };
  const errorCode = typed.error_code ?? typed.errorCode;
  if (errorCode !== 409) {
    return false;
  }
  // Only string fields take part in the search; anything else is dropped.
  const haystack = [typed.method, typed.description, typed.message]
    .filter((value): value is string => typeof value === "string")
    .join(" ")
    .toLowerCase();
  return haystack.includes("getupdates");
};
2026-02-02 15:25:41 +00:00
/**
 * Check if error is a Grammy HttpError (used to scope unhandled rejection handling).
 *
 * The check is by name only: any non-null object whose `name` property equals
 * "HttpError" matches.
 *
 * @param err - Any thrown or rejected value.
 * @returns `true` when `err` is an object named "HttpError".
 */
const isGrammyHttpError = (err: unknown): boolean => {
  if (!err || typeof err !== "object") {
    return false;
  }
  return (err as { name?: string }).name === "HttpError";
};
2025-12-07 22:46:02 +01:00
/**
 * Runs the Telegram provider for one account, either by starting a webhook or
 * by long-polling through `@grammyjs/runner`.
 *
 * Webhook mode (`opts.useWebhook`): awaits `startTelegramWebhook` and returns.
 *
 * Polling mode: starts a runner and waits for it to finish. If the runner
 * fails with a getUpdates 409 conflict or a recoverable network error, the
 * failure is logged and a new runner is started after a backoff delay. The
 * function returns when the runner task resolves, or when the abort signal
 * fires during a backoff sleep.
 *
 * @param opts - See `MonitorTelegramOpts`; defaults to an empty object.
 * @throws Error when no bot token can be resolved.
 * @throws The runner's error when it is neither a getUpdates conflict nor a
 *   recoverable network error, or when the abort signal is already aborted at
 *   the time the runner fails.
 */
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}): Promise<void> {
  const log = opts.runtime?.error ?? console.error;

  // Register handler for Grammy HttpError unhandled rejections.
  // This catches network errors that escape the polling loop's try-catch
  // (e.g., from setMyCommands during bot setup).
  // We gate on isGrammyHttpError to avoid suppressing non-Telegram errors.
  const unregisterHandler = registerUnhandledRejectionHandler((err) => {
    if (isGrammyHttpError(err) && isRecoverableTelegramNetworkError(err, { context: "polling" })) {
      log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
      return true; // handled - don't crash
    }
    return false;
  });

  try {
    const cfg = opts.config ?? loadConfig();
    const account = resolveTelegramAccount({
      cfg,
      accountId: opts.accountId,
    });
    // `||` rather than `??`: an empty or whitespace-only opts.token must fall
    // back to the account token.
    const token = opts.token?.trim() || account.token;
    if (!token) {
      throw new Error(
        `Telegram bot token missing for account "${account.accountId}" (set channels.telegram.accounts.${account.accountId}.botToken/tokenFile or TELEGRAM_BOT_TOKEN for default).`,
      );
    }

    const proxyFetch =
      opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy) : undefined);

    let lastUpdateId = await readTelegramUpdateOffset({
      accountId: account.accountId,
    });
    // Persists an update id only when it advances past the last one seen.
    // A failed write is logged and otherwise ignored (best effort).
    const persistUpdateId = async (updateId: number) => {
      if (lastUpdateId !== null && updateId <= lastUpdateId) {
        return;
      }
      lastUpdateId = updateId;
      try {
        await writeTelegramUpdateOffset({
          accountId: account.accountId,
          updateId,
        });
      } catch (err) {
        log(`telegram: failed to persist update offset: ${String(err)}`);
      }
    };

    const bot = createTelegramBot({
      token,
      runtime: opts.runtime,
      proxyFetch,
      config: cfg,
      accountId: account.accountId,
      updateOffset: {
        lastUpdateId,
        onUpdateId: persistUpdateId,
      },
    });

    if (opts.useWebhook) {
      await startTelegramWebhook({
        token,
        accountId: account.accountId,
        config: cfg,
        path: opts.webhookPath,
        port: opts.webhookPort,
        secret: opts.webhookSecret ?? account.config.webhookSecret,
        host: opts.webhookHost ?? account.config.webhookHost,
        // NOTE(review): opts.runtime may be undefined here; the assertion only
        // silences the type checker. Confirm startTelegramWebhook tolerates a
        // missing runtime.
        runtime: opts.runtime as RuntimeEnv,
        fetch: proxyFetch,
        abortSignal: opts.abortSignal,
        publicUrl: opts.webhookUrl,
      });
      return;
    }

    // Use grammyjs/runner for concurrent update processing
    let restartAttempts = 0;
    while (!opts.abortSignal?.aborted) {
      const runner = run(bot, createTelegramRunnerOptions(cfg));
      const stopOnAbort = () => {
        if (opts.abortSignal?.aborted) {
          void runner.stop();
        }
      };
      opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
      try {
        // runner.task() returns a promise that resolves when the runner stops
        await runner.task();
        return;
      } catch (err) {
        if (opts.abortSignal?.aborted) {
          throw err;
        }
        const isConflict = isGetUpdatesConflict(err);
        const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
        if (!isConflict && !isRecoverable) {
          throw err;
        }
        // The attempt counter is never reset, so it keeps growing across
        // every restart of this call.
        restartAttempts += 1;
        const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
        const reason = isConflict ? "getUpdates conflict" : "network error";
        const errMsg = formatErrorMessage(err);
        log(`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationPrecise(delayMs)}.`);
        try {
          await sleepWithAbort(delayMs, opts.abortSignal);
        } catch (sleepErr) {
          // An abort during the backoff sleep ends monitoring quietly.
          if (opts.abortSignal?.aborted) {
            return;
          }
          throw sleepErr;
        }
      } finally {
        // Each loop iteration registers a fresh listener for its own runner;
        // drop it so listeners do not pile up across restarts.
        opts.abortSignal?.removeEventListener("abort", stopOnAbort);
      }
    }
  } finally {
    // Runs on every exit path, including the webhook-mode return.
    unregisterHandler();
  }
}