2026-01-07 22:16:49 +01:00
import { type RunOptions , run } from "@grammyjs/runner" ;
import type { ClawdbotConfig } from "../config/config.js" ;
2025-12-07 22:46:02 +01:00
import { loadConfig } from "../config/config.js" ;
2026-01-20 10:37:43 +00:00
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js" ;
2026-01-08 12:04:21 +01:00
import { computeBackoff , sleepWithAbort } from "../infra/backoff.js" ;
import { formatDurationMs } from "../infra/format-duration.js" ;
2025-12-07 22:46:02 +01:00
import type { RuntimeEnv } from "../runtime.js" ;
2026-01-08 01:18:37 +01:00
import { resolveTelegramAccount } from "./accounts.js" ;
2026-01-15 17:20:17 +00:00
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js" ;
2025-12-08 01:48:53 +01:00
import { createTelegramBot } from "./bot.js" ;
2026-01-13 04:02:47 +00:00
import { makeProxyFetch } from "./proxy.js" ;
2026-01-14 14:31:43 +00:00
import { readTelegramUpdateOffset , writeTelegramUpdateOffset } from "./update-offset-store.js" ;
2025-12-07 22:46:02 +01:00
import { startTelegramWebhook } from "./webhook.js" ;
export type MonitorTelegramOpts = {
token? : string ;
2026-01-08 01:18:37 +01:00
accountId? : string ;
config? : ClawdbotConfig ;
2025-12-07 22:46:02 +01:00
runtime? : RuntimeEnv ;
abortSignal? : AbortSignal ;
useWebhook? : boolean ;
webhookPath? : string ;
webhookPort? : number ;
webhookSecret? : string ;
proxyFetch? : typeof fetch ;
webhookUrl? : string ;
} ;
2026-01-14 14:31:43 +00:00
export function createTelegramRunnerOptions ( cfg : ClawdbotConfig ) : RunOptions < unknown > {
2026-01-07 22:16:49 +01:00
return {
sink : {
2026-01-20 10:37:43 +00:00
concurrency : resolveAgentMaxConcurrent ( cfg ) ,
2026-01-07 22:16:49 +01:00
} ,
runner : {
fetch : {
// Match grammY defaults
timeout : 30 ,
2026-01-15 17:20:17 +00:00
// Request reactions without dropping default update types.
allowed_updates : resolveTelegramAllowedUpdates ( ) ,
2026-01-07 22:16:49 +01:00
} ,
2026-01-09 15:35:22 +01:00
// Suppress grammY getUpdates stack traces; we log concise errors ourselves.
silent : true ,
2026-01-07 22:16:49 +01:00
} ,
} ;
}
2026-01-08 12:04:21 +01:00
const TELEGRAM_POLL_RESTART_POLICY = {
initialMs : 2000 ,
maxMs : 30_000 ,
factor : 1.8 ,
jitter : 0.25 ,
} ;
const isGetUpdatesConflict = ( err : unknown ) = > {
if ( ! err || typeof err !== "object" ) return false ;
const typed = err as {
error_code? : number ;
errorCode? : number ;
description? : string ;
method? : string ;
message? : string ;
} ;
const errorCode = typed . error_code ? ? typed . errorCode ;
if ( errorCode !== 409 ) return false ;
2026-01-08 11:07:04 +00:00
const haystack = [ typed . method , typed . description , typed . message ]
2026-01-08 12:04:21 +01:00
. filter ( ( value ) : value is string = > typeof value === "string" )
. join ( " " )
. toLowerCase ( ) ;
return haystack . includes ( "getupdates" ) ;
} ;
2025-12-07 22:46:02 +01:00
export async function monitorTelegramProvider ( opts : MonitorTelegramOpts = { } ) {
2026-01-08 01:18:37 +01:00
const cfg = opts . config ? ? loadConfig ( ) ;
const account = resolveTelegramAccount ( {
cfg ,
accountId : opts.accountId ,
2026-01-01 21:22:59 +01:00
} ) ;
2026-01-08 01:18:37 +01:00
const token = opts . token ? . trim ( ) || account . token ;
2025-12-07 22:46:02 +01:00
if ( ! token ) {
2025-12-08 01:48:53 +01:00
throw new Error (
2026-01-13 06:16:43 +00:00
` Telegram bot token missing for account " ${ account . accountId } " (set channels.telegram.accounts. ${ account . accountId } .botToken/tokenFile or TELEGRAM_BOT_TOKEN for default). ` ,
2025-12-08 01:48:53 +01:00
) ;
2025-12-07 22:46:02 +01:00
}
const proxyFetch =
opts . proxyFetch ? ?
2026-01-14 14:31:43 +00:00
( account . config . proxy ? makeProxyFetch ( account . config . proxy as string ) : undefined ) ;
2025-12-07 22:46:02 +01:00
2026-01-12 21:52:13 -06:00
let lastUpdateId = await readTelegramUpdateOffset ( {
accountId : account.accountId ,
} ) ;
const persistUpdateId = async ( updateId : number ) = > {
if ( lastUpdateId !== null && updateId <= lastUpdateId ) return ;
lastUpdateId = updateId ;
try {
await writeTelegramUpdateOffset ( {
accountId : account.accountId ,
updateId ,
} ) ;
} catch ( err ) {
( opts . runtime ? . error ? ? console . error ) (
` telegram: failed to persist update offset: ${ String ( err ) } ` ,
) ;
}
} ;
2025-12-07 22:46:02 +01:00
const bot = createTelegramBot ( {
token ,
runtime : opts.runtime ,
proxyFetch ,
2026-01-07 22:16:49 +01:00
config : cfg ,
2026-01-08 01:18:37 +01:00
accountId : account.accountId ,
2026-01-12 21:52:13 -06:00
updateOffset : {
lastUpdateId ,
onUpdateId : persistUpdateId ,
} ,
2025-12-07 22:46:02 +01:00
} ) ;
if ( opts . useWebhook ) {
await startTelegramWebhook ( {
token ,
2026-01-12 21:59:28 -05:00
accountId : account.accountId ,
config : cfg ,
2025-12-07 22:46:02 +01:00
path : opts.webhookPath ,
port : opts.webhookPort ,
secret : opts.webhookSecret ,
runtime : opts.runtime as RuntimeEnv ,
fetch : proxyFetch ,
abortSignal : opts.abortSignal ,
publicUrl : opts.webhookUrl ,
} ) ;
return ;
}
2026-01-07 05:34:37 +00:00
// Use grammyjs/runner for concurrent update processing
2026-01-15 17:20:17 +00:00
const log = opts . runtime ? . log ? ? console . log ;
2026-01-08 12:04:21 +01:00
let restartAttempts = 0 ;
2026-01-07 05:34:37 +00:00
2026-01-08 12:04:21 +01:00
while ( ! opts . abortSignal ? . aborted ) {
const runner = run ( bot , createTelegramRunnerOptions ( cfg ) ) ;
const stopOnAbort = ( ) = > {
if ( opts . abortSignal ? . aborted ) {
void runner . stop ( ) ;
}
} ;
opts . abortSignal ? . addEventListener ( "abort" , stopOnAbort , { once : true } ) ;
try {
// runner.task() returns a promise that resolves when the runner stops
await runner . task ( ) ;
return ;
} catch ( err ) {
if ( opts . abortSignal ? . aborted ) {
throw err ;
}
if ( ! isGetUpdatesConflict ( err ) ) {
throw err ;
}
restartAttempts += 1 ;
2026-01-14 14:31:43 +00:00
const delayMs = computeBackoff ( TELEGRAM_POLL_RESTART_POLICY , restartAttempts ) ;
log ( ` Telegram getUpdates conflict; retrying in ${ formatDurationMs ( delayMs ) } . ` ) ;
2026-01-08 12:04:21 +01:00
try {
await sleepWithAbort ( delayMs , opts . abortSignal ) ;
} catch ( sleepErr ) {
if ( opts . abortSignal ? . aborted ) return ;
throw sleepErr ;
}
} finally {
opts . abortSignal ? . removeEventListener ( "abort" , stopOnAbort ) ;
2026-01-07 05:34:37 +00:00
}
2025-12-07 22:46:02 +01:00
}
}