2026-01-15 15:07:19 -08:00
import { hasControlCommand } from "../../auto-reply/command-detection.js" ;
import { resolveInboundDebounceMs } from "../../auto-reply/inbound-debounce.js" ;
2026-02-01 10:03:47 +09:00
import { getReplyFromConfig } from "../../auto-reply/reply.js" ;
import { DEFAULT_GROUP_HISTORY_LIMIT } from "../../auto-reply/reply/history.js" ;
import { formatCliCommand } from "../../cli/command-format.js" ;
2026-01-14 01:08:15 +00:00
import { waitForever } from "../../cli/wait.js" ;
import { loadConfig } from "../../config/config.js" ;
2026-03-07 00:58:08 -06:00
import { createConnectedChannelStatusPatch } from "../../gateway/channel-status-patches.js" ;
2026-01-14 01:08:15 +00:00
import { logVerbose } from "../../globals.js" ;
2026-02-08 04:53:31 -08:00
import { formatDurationPrecise } from "../../infra/format-time/format-duration.ts" ;
2026-01-14 01:08:15 +00:00
import { enqueueSystemEvent } from "../../infra/system-events.js" ;
import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js" ;
import { getChildLogger } from "../../logging.js" ;
import { resolveAgentRoute } from "../../routing/resolve-route.js" ;
import { defaultRuntime , type RuntimeEnv } from "../../runtime.js" ;
2026-03-06 11:08:15 -05:00
import { resolveWhatsAppAccount , resolveWhatsAppMediaMaxBytes } from "../accounts.js" ;
2026-01-14 01:08:15 +00:00
import { setActiveWebListener } from "../active-listener.js" ;
import { monitorWebInbox } from "../inbound.js" ;
import {
computeBackoff ,
newConnectionId ,
resolveHeartbeatSeconds ,
resolveReconnectPolicy ,
sleepWithAbort ,
} from "../reconnect.js" ;
import { formatError , getWebAuthAgeMs , readWebSelfId } from "../session.js" ;
import { whatsappHeartbeatLog , whatsappLog } from "./loggers.js" ;
import { buildMentionConfig } from "./mentions.js" ;
import { createEchoTracker } from "./monitor/echo.js" ;
import { createWebOnMessageHandler } from "./monitor/on-message.js" ;
2026-02-18 01:34:35 +00:00
import type { WebChannelStatus , WebInboundMsg , WebMonitorTuning } from "./types.js" ;
2026-01-14 01:08:15 +00:00
import { isLikelyWhatsAppCryptoError } from "./util.js" ;
2026-02-24 19:28:47 +00:00
/**
 * Reports whether a WhatsApp Web close status means reconnecting is pointless.
 *
 * Only status 440 qualifies: WhatsApp uses it for a session conflict
 * ("Unknown Stream Errored (conflict)"), which persists until the operator
 * resolves the conflicting session.
 *
 * @param statusCode - Close status taken from the listener's close reason; may
 *   be any value (for example the string "unknown" when no status was present).
 * @returns `true` only when `statusCode` is the number 440.
 */
function isNonRetryableWebCloseStatus(statusCode: unknown): boolean {
  const sessionConflictStatus = 440;
  if (typeof statusCode !== "number") {
    return false;
  }
  return statusCode === sessionConflictStatus;
}
2026-01-14 01:08:15 +00:00
/**
 * Runs the WhatsApp Web inbound monitor for one account: connects a listener,
 * dispatches inbound messages to the auto-reply handler, and reconnects with
 * backoff when the connection closes.
 *
 * The loop ends when the abort signal fires, SIGINT was received, the session
 * reports it is logged out, the close status is non-retryable (440), the
 * reconnect policy's `maxAttempts` is reached, or the backoff sleep rejects.
 * Status snapshots are pushed to `tuning.statusSink` (when provided) on every
 * state change.
 *
 * @param verbose - Forwarded to the message handler and the listener factory.
 * @param listenerFactory - Creates the inbound listener; falls back to
 *   `monitorWebInbox` when `undefined`.
 * @param keepAlive - When `false`, connects once, closes the listener and
 *   returns without starting the heartbeat, the watchdog or the reconnect loop.
 * @param replyResolver - Reply resolver handed to the message handler; falls
 *   back to `getReplyFromConfig` when `undefined`.
 * @param runtime - Receives operator-facing error lines via `runtime.error`.
 * @param abortSignal - Aborting it stops the monitor and interrupts the
 *   default backoff sleep.
 * @param tuning - Optional overrides read here: `accountId`, `statusSink`,
 *   `heartbeatSeconds`, `reconnect`, `sleep`, `messageTimeoutMs`,
 *   `watchdogCheckMs`.
 * @returns Resolves once monitoring has stopped.
 */
export async function monitorWebChannel(
  verbose: boolean,
  listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
  keepAlive = true,
  replyResolver: typeof getReplyFromConfig | undefined = getReplyFromConfig,
  runtime: RuntimeEnv = defaultRuntime,
  abortSignal?: AbortSignal,
  tuning: WebMonitorTuning = {},
): Promise<void> {
  const runId = newConnectionId();
  const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
  const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
  const reconnectLogger = getChildLogger({ module: "web-reconnect", runId });
  const status: WebChannelStatus = {
    running: true,
    connected: false,
    reconnectAttempts: 0,
    lastConnectedAt: null,
    lastDisconnect: null,
    lastMessageAt: null,
    lastEventAt: null,
    lastError: null,
  };
  // Hand the sink a copy (including a copy of lastDisconnect) so later
  // mutations of `status` do not leak into snapshots the sink already holds.
  const emitStatus = () => {
    tuning.statusSink?.({
      ...status,
      lastDisconnect: status.lastDisconnect ? { ...status.lastDisconnect } : null,
    });
  };
  emitStatus();
  const baseCfg = loadConfig();
  const account = resolveWhatsAppAccount({
    cfg: baseCfg,
    accountId: tuning.accountId,
  });
  // Overlay the resolved account's settings onto the channel-level WhatsApp
  // config so everything downstream sees the account-specific values.
  const cfg = {
    ...baseCfg,
    channels: {
      ...baseCfg.channels,
      whatsapp: {
        ...baseCfg.channels?.whatsapp,
        ackReaction: account.ackReaction,
        messagePrefix: account.messagePrefix,
        allowFrom: account.allowFrom,
        groupAllowFrom: account.groupAllowFrom,
        groupPolicy: account.groupPolicy,
        textChunkLimit: account.textChunkLimit,
        chunkMode: account.chunkMode,
        mediaMaxMb: account.mediaMaxMb,
        blockStreaming: account.blockStreaming,
        groups: account.groups,
      },
    },
  } satisfies ReturnType<typeof loadConfig>;
  const maxMediaBytes = resolveWhatsAppMediaMaxBytes(account);
  const heartbeatSeconds = resolveHeartbeatSeconds(cfg, tuning.heartbeatSeconds);
  const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect);
  const baseMentionConfig = buildMentionConfig(cfg);
  // Most specific setting wins: account, then channel, then global group chat.
  const groupHistoryLimit =
    cfg.channels?.whatsapp?.accounts?.[tuning.accountId ?? ""]?.historyLimit ??
    cfg.channels?.whatsapp?.historyLimit ??
    cfg.messages?.groupChat?.historyLimit ??
    DEFAULT_GROUP_HISTORY_LIMIT;
  // These maps and the echo tracker are created once and shared by every
  // reconnect iteration below.
  const groupHistories = new Map<
    string,
    Array<{
      sender: string;
      body: string;
      timestamp?: number;
      id?: string;
      senderJid?: string;
    }>
  >();
  const groupMemberNames = new Map<string, Map<string, string>>();
  const echoTracker = createEchoTracker({ maxItems: 100, logVerbose });
  const sleep =
    tuning.sleep ??
    ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? abortSignal));
  const stopRequested = () => abortSignal?.aborted === true;
  const abortPromise =
    abortSignal &&
    new Promise<"aborted">((resolve) =>
      abortSignal.addEventListener("abort", () => resolve("aborted"), {
        once: true,
      }),
    );

  // Avoid noisy MaxListenersExceeded warnings in test environments where
  // multiple gateway instances may be constructed.
  const currentMaxListeners = process.getMaxListeners?.() ?? 10;
  if (process.setMaxListeners && currentMaxListeners < 50) {
    process.setMaxListeners(50);
  }

  // SIGINT only records the stop request; the flag is checked after the
  // current connection's close/abort race settles.
  let sigintStop = false;
  const handleSigint = () => {
    sigintStop = true;
  };
  process.once("SIGINT", handleSigint);

  let reconnectAttempts = 0;

  try {
    while (true) {
      if (stopRequested()) {
        break;
      }

      const connectionId = newConnectionId();
      const startedAt = Date.now();
      let heartbeat: NodeJS.Timeout | null = null;
      let watchdogTimer: NodeJS.Timeout | null = null;
      let lastMessageAt: number | null = null;
      let handledMessages = 0;
      let unregisterUnhandled: (() => void) | null = null;

      // Watchdog to detect stuck message processing (e.g., event emitter died).
      // Tuning overrides are test-oriented; production defaults remain unchanged.
      const MESSAGE_TIMEOUT_MS = tuning.messageTimeoutMs ?? 30 * 60 * 1000; // 30m default
      const WATCHDOG_CHECK_MS = tuning.watchdogCheckMs ?? 60 * 1000; // 1m default

      const backgroundTasks = new Set<Promise<unknown>>();
      const onMessage = createWebOnMessageHandler({
        cfg,
        verbose,
        connectionId,
        maxMediaBytes,
        groupHistoryLimit,
        groupHistories,
        groupMemberNames,
        echoTracker,
        backgroundTasks,
        replyResolver: replyResolver ?? getReplyFromConfig,
        replyLogger,
        baseMentionConfig,
        account,
      });

      const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" });
      // Media, location and reply messages, as well as control commands, are
      // never debounced.
      const shouldDebounce = (msg: WebInboundMsg) => {
        if (msg.mediaPath || msg.mediaType) {
          return false;
        }
        if (msg.location) {
          return false;
        }
        if (msg.replyToId || msg.replyToBody) {
          return false;
        }
        return !hasControlCommand(msg.body, cfg);
      };

      const listener = await (listenerFactory ?? monitorWebInbox)({
        verbose,
        accountId: account.accountId,
        authDir: account.authDir,
        mediaMaxMb: account.mediaMaxMb,
        sendReadReceipts: account.sendReadReceipts,
        debounceMs: inboundDebounceMs,
        shouldDebounce,
        onMessage: async (msg: WebInboundMsg) => {
          handledMessages += 1;
          lastMessageAt = Date.now();
          status.lastMessageAt = lastMessageAt;
          status.lastEventAt = lastMessageAt;
          emitStatus();
          await onMessage(msg);
        },
      });

      Object.assign(status, createConnectedChannelStatusPatch());
      status.lastError = null;
      emitStatus();

      // Surface a concise connection event for the next main-session turn/heartbeat.
      const { e164: selfE164 } = readWebSelfId(account.authDir);
      const connectRoute = resolveAgentRoute({
        cfg,
        channel: "whatsapp",
        accountId: account.accountId,
      });
      enqueueSystemEvent(`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, {
        sessionKey: connectRoute.sessionKey,
      });

      setActiveWebListener(account.accountId, listener);
      // Rejections that look like WhatsApp crypto errors are claimed here
      // (return true) and turned into a forced reconnect; others are declined.
      unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
        if (!isLikelyWhatsAppCryptoError(reason)) {
          return false;
        }
        const errorStr = formatError(reason);
        reconnectLogger.warn(
          { connectionId, error: errorStr },
          "web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect",
        );
        listener.signalClose?.({
          status: 499,
          isLoggedOut: false,
          error: reason,
        });
        return true;
      });

      // Tears down everything owned by this connection. It can run more than
      // once per connection (the watchdog and the main loop both call it).
      const closeListener = async () => {
        setActiveWebListener(account.accountId, null);
        if (unregisterUnhandled) {
          unregisterUnhandled();
          unregisterUnhandled = null;
        }
        if (heartbeat) {
          clearInterval(heartbeat);
        }
        if (watchdogTimer) {
          clearInterval(watchdogTimer);
        }
        // Let in-flight background work settle before closing the socket.
        if (backgroundTasks.size > 0) {
          await Promise.allSettled(backgroundTasks);
          backgroundTasks.clear();
        }
        try {
          await listener.close();
        } catch (err) {
          logVerbose(`Socket close failed: ${formatError(err)}`);
        }
      };

      if (keepAlive) {
        heartbeat = setInterval(() => {
          const authAgeMs = getWebAuthAgeMs(account.authDir);
          const minutesSinceLastMessage = lastMessageAt
            ? Math.floor((Date.now() - lastMessageAt) / 60000)
            : null;

          const logData = {
            connectionId,
            reconnectAttempts,
            messagesHandled: handledMessages,
            lastMessageAt,
            authAgeMs,
            uptimeMs: Date.now() - startedAt,
            ...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
              ? { minutesSinceLastMessage }
              : {}),
          };

          if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
            heartbeatLogger.warn(logData, "⚠️ web gateway heartbeat - no messages in 30+ minutes");
          } else {
            heartbeatLogger.info(logData, "web gateway heartbeat");
          }
        }, heartbeatSeconds * 1000);

        watchdogTimer = setInterval(() => {
          // The watchdog only arms after the first message on this connection.
          if (!lastMessageAt) {
            return;
          }
          const timeSinceLastMessage = Date.now() - lastMessageAt;
          if (timeSinceLastMessage <= MESSAGE_TIMEOUT_MS) {
            return;
          }
          const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000);
          heartbeatLogger.warn(
            {
              connectionId,
              minutesSinceLastMessage,
              lastMessageAt: new Date(lastMessageAt),
              messagesHandled: handledMessages,
            },
            "Message timeout detected - forcing reconnect",
          );
          whatsappHeartbeatLog.warn(
            `No messages received in ${minutesSinceLastMessage}m - restarting connection`,
          );
          void closeListener().catch((err) => {
            logVerbose(`Close listener failed: ${formatError(err)}`);
          });
          // NOTE(review): presumably this settles listener.onClose so the
          // close/abort race below resumes and schedules a retry — confirm.
          listener.signalClose?.({
            status: 499,
            isLoggedOut: false,
            error: "watchdog-timeout",
          });
        }, WATCHDOG_CHECK_MS);
      }

      whatsappLog.info("Listening for personal WhatsApp inbound messages.");
      if (process.stdout.isTTY || process.stderr.isTTY) {
        whatsappLog.raw("Ctrl+C to stop.");
      }

      if (!keepAlive) {
        // One-shot mode: the SIGINT handler is removed by the finally below.
        await closeListener();
        return;
      }

      // Wait for whichever comes first: the listener closing or an abort.
      const reason = await Promise.race([
        listener.onClose?.catch((err) => {
          reconnectLogger.error({ error: formatError(err) }, "listener.onClose rejected");
          return { status: 500, isLoggedOut: false, error: err };
        }) ?? waitForever(),
        abortPromise ?? waitForever(),
      ]);

      const uptimeMs = Date.now() - startedAt;
      if (uptimeMs > heartbeatSeconds * 1000) {
        reconnectAttempts = 0; // Healthy stretch; reset the backoff.
      }
      status.reconnectAttempts = reconnectAttempts;
      emitStatus();

      if (stopRequested() || sigintStop || reason === "aborted") {
        await closeListener();
        break;
      }

      // Falls back to the string "unknown" when the reason carries no status,
      // so statusCode is never nullish from here on.
      const statusCode =
        (typeof reason === "object" && reason && "status" in reason
          ? (reason as { status?: number }).status
          : undefined) ?? "unknown";
      const loggedOut =
        typeof reason === "object" &&
        reason &&
        "isLoggedOut" in reason &&
        (reason as { isLoggedOut?: boolean }).isLoggedOut;

      const errorStr = formatError(reason);
      status.connected = false;
      status.lastEventAt = Date.now();
      status.lastDisconnect = {
        at: status.lastEventAt,
        status: typeof statusCode === "number" ? statusCode : undefined,
        error: errorStr,
        loggedOut: Boolean(loggedOut),
      };
      status.lastError = errorStr;
      status.reconnectAttempts = reconnectAttempts;
      emitStatus();

      reconnectLogger.info(
        {
          connectionId,
          status: statusCode,
          loggedOut,
          reconnectAttempts,
          error: errorStr,
        },
        "web reconnect: connection closed",
      );

      enqueueSystemEvent(`WhatsApp gateway disconnected (status ${statusCode})`, {
        sessionKey: connectRoute.sessionKey,
      });

      if (loggedOut) {
        runtime.error(
          `WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`,
        );
        await closeListener();
        break;
      }

      if (isNonRetryableWebCloseStatus(statusCode)) {
        reconnectLogger.warn(
          {
            connectionId,
            status: statusCode,
            error: errorStr,
          },
          "web reconnect: non-retryable close status; stopping monitor",
        );
        runtime.error(
          `WhatsApp Web connection closed (status ${statusCode}: session conflict). Resolve conflicting WhatsApp Web sessions, then relink with \`${formatCliCommand("openclaw channels login --channel web")}\`. Stopping web monitoring.`,
        );
        await closeListener();
        break;
      }

      reconnectAttempts += 1;
      status.reconnectAttempts = reconnectAttempts;
      emitStatus();
      // A maxAttempts of 0 (or less) means unlimited retries.
      if (reconnectPolicy.maxAttempts > 0 && reconnectAttempts >= reconnectPolicy.maxAttempts) {
        reconnectLogger.warn(
          {
            connectionId,
            status: statusCode,
            reconnectAttempts,
            maxAttempts: reconnectPolicy.maxAttempts,
          },
          "web reconnect: max attempts reached; stopping monitor",
        );
        runtime.error(
          `WhatsApp Web reconnect: max attempts reached (${reconnectAttempts}/${reconnectPolicy.maxAttempts}). Stopping web monitoring.`,
        );
        await closeListener();
        break;
      }

      const delay = computeBackoff(reconnectPolicy, reconnectAttempts);
      reconnectLogger.info(
        {
          connectionId,
          status: statusCode,
          reconnectAttempts,
          maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
          delayMs: delay,
        },
        "web reconnect: scheduling retry",
      );
      runtime.error(
        `WhatsApp Web connection closed (status ${statusCode}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(delay)}… (${errorStr})`,
      );
      await closeListener();
      try {
        await sleep(delay, abortSignal);
      } catch {
        // A rejected sleep ends the monitor instead of retrying.
        break;
      }
    }

    status.running = false;
    status.connected = false;
    status.lastEventAt = Date.now();
    emitStatus();
  } finally {
    // Always drop the SIGINT handler, including when connecting or any other
    // step in the loop throws, so repeated runs do not accumulate listeners.
    process.removeListener("SIGINT", handleSigint);
  }
}