diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index ff3a0ba9dc9..eebf69e5f45 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -1,15 +1,15 @@ -import * as crypto from "crypto"; -import * as Lark from "@larksuiteoapi/node-sdk"; -import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js"; +import * as crypto from "node:crypto"; +import type * as Lark from "@larksuiteoapi/node-sdk"; +import type { ClawdbotConfig, HistoryEntry, RuntimeEnv } from "../runtime-api.js"; import { resolveFeishuAccount } from "./accounts.js"; import { raceWithTimeoutAndAbort } from "./async.js"; import { + type FeishuBotAddedEvent, + type FeishuMessageEvent, handleFeishuMessage, parseFeishuMessageEvent, - type FeishuMessageEvent, - type FeishuBotAddedEvent, } from "./bot.js"; -import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; +import { type FeishuCardActionEvent, handleFeishuCardAction } from "./card-action.js"; import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js"; import { createEventDispatcher } from "./client.js"; import { @@ -254,7 +254,7 @@ function resolveFeishuDebounceMentions(params: { function registerEventHandlers( eventDispatcher: Lark.EventDispatcher, context: RegisterEventHandlersContext, -): void { +): { unregisterDebouncer: () => void } { const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; const core = getFeishuRuntime(); const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({ @@ -617,6 +617,8 @@ function registerEventHandlers( } }, }); + + return { unregisterDebouncer: inboundDebouncer.unregister }; } export type BotOpenIdSource = @@ -639,7 +641,10 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" }; const botIdentity = botOpenIdSource.kind === "prefetched" - ? { botOpenId: botOpenIdSource.botOpenId, botName: botOpenIdSource.botName } + ? { + botOpenId: botOpenIdSource.botOpenId, + botName: botOpenIdSource.botName, + } : await fetchBotIdentityForMonitor(account, { runtime, abortSignal }); const botOpenId = botIdentity.botOpenId; const botName = botIdentity.botName?.trim(); @@ -670,7 +675,7 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): const chatHistories = new Map(); threadBindingManager = createFeishuThreadBindingManager({ accountId, cfg }); - registerEventHandlers(eventDispatcher, { + const { unregisterDebouncer } = registerEventHandlers(eventDispatcher, { cfg, accountId, runtime, @@ -678,10 +683,26 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): fireAndForget: true, }); - if (connectionMode === "webhook") { - return await monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher }); + try { + if (connectionMode === "webhook") { + return await monitorWebhook({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, + }); + } + return await monitorWebSocket({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, + }); + } finally { + unregisterDebouncer(); } - return await monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher }); } finally { threadBindingManager?.stop(); } diff --git a/extensions/msteams/src/monitor-handler.file-consent.test.ts b/extensions/msteams/src/monitor-handler.file-consent.test.ts index 93ad1634c4d..ea06b5d19c7 100644 --- a/extensions/msteams/src/monitor-handler.file-consent.test.ts +++ b/extensions/msteams/src/monitor-handler.file-consent.test.ts @@ -141,7 +141,7 @@ function createConsentInvokeHarness(params: { contentType: "text/plain", conversationId: params.pendingConversationId ?? "19:victim@thread.v2", }); - const handler = registerMSTeamsHandlers(createActivityHandler(), createDeps()); + const { handler } = registerMSTeamsHandlers(createActivityHandler(), createDeps()); const { context, sendActivity } = createInvokeContext({ conversationId: params.invokeConversationId, uploadId, diff --git a/extensions/msteams/src/monitor-handler.ts b/extensions/msteams/src/monitor-handler.ts index 4cda545bd02..fec16302014 100644 --- a/extensions/msteams/src/monitor-handler.ts +++ b/extensions/msteams/src/monitor-handler.ts @@ -140,8 +140,8 @@ async function handleFileConsentInvoke( export function registerMSTeamsHandlers( handler: T, deps: MSTeamsMessageHandlerDeps, -): T { - const handleTeamsMessage = createMSTeamsMessageHandler(deps); +): { handler: T; unregisterDebouncer: () => void } { + const { handleTeamsMessage, unregisterDebouncer } = createMSTeamsMessageHandler(deps); // Wrap the original run method to intercept invokes const originalRun = handler.run; @@ -151,7 +151,10 @@ export function registerMSTeamsHandlers( // Handle file consent invokes before passing to normal flow if (ctx.activity?.type === "invoke" && ctx.activity?.name === "fileConsent/invoke") { // Send invoke response IMMEDIATELY to prevent Teams timeout - await ctx.sendActivity({ type: "invokeResponse", value: { status: 200 } }); + await ctx.sendActivity({ + type: "invokeResponse", + value: { status: 200 }, + }); try { await withRevokedProxyFallback({ @@ -164,7 +167,9 @@ export function registerMSTeamsHandlers( }, }); } catch (err) { - deps.log.debug?.("file consent handler error", { error: String(err) }); + deps.log.debug?.("file consent handler error", { + error: String(err), + }); } return; } @@ -192,5 +197,5 @@ export function registerMSTeamsHandlers( await next(); }); - return handler; + return { handler, unregisterDebouncer }; } diff --git a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts index 96b8d85a790..1bb0cbf6c0d 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts @@ -79,8 +79,8 @@ describe("msteams monitor handler authz", () => { }, } as OpenClawConfig); - const handler = createMSTeamsMessageHandler(deps); - await handler({ + const { handleTeamsMessage } = createMSTeamsMessageHandler(deps); + await handleTeamsMessage({ activity: { id: "msg-1", type: "message", @@ -130,8 +130,8 @@ describe("msteams monitor handler authz", () => { }, } as OpenClawConfig); - const handler = createMSTeamsMessageHandler(deps); - await handler({ + const { handleTeamsMessage } = createMSTeamsMessageHandler(deps); + await handleTeamsMessage({ activity: { id: "msg-1", type: "message", diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index fe6751b94c3..05987b234ee 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -1,24 +1,24 @@ import { DEFAULT_ACCOUNT_ID, + DEFAULT_GROUP_HISTORY_LIMIT, buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, createChannelPairingController, dispatchReplyFromConfigWithSettledDispatcher, - DEFAULT_GROUP_HISTORY_LIMIT, - logInboundDrop, evaluateSenderGroupAccessForPolicy, - resolveSenderScopedGroupPolicy, - recordPendingHistoryEntryIfEnabled, - resolveDualTextControlCommandGate, - resolveDefaultGroupPolicy, - isDangerousNameMatchingEnabled, - readStoreAllowFromForDmPolicy, - resolveMentionGating, - resolveInboundSessionEnvelopeContext, formatAllowlistMatchMeta, - resolveEffectiveAllowFromLists, - resolveDmGroupAccessWithLists, type HistoryEntry, + isDangerousNameMatchingEnabled, + logInboundDrop, + readStoreAllowFromForDmPolicy, + recordPendingHistoryEntryIfEnabled, + resolveDefaultGroupPolicy, + resolveDmGroupAccessWithLists, + resolveDualTextControlCommandGate, + resolveEffectiveAllowFromLists, + resolveInboundSessionEnvelopeContext, + resolveMentionGating, + resolveSenderScopedGroupPolicy, } from "../../runtime-api.js"; import { buildMSTeamsAttachmentPlaceholder, @@ -675,7 +675,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { }, }); - return async function handleTeamsMessage(context: MSTeamsTurnContext) { + const handleTeamsMessage = async (context: MSTeamsTurnContext) => { const activity = context.activity; const rawText = activity.text?.trim() ?? ""; const text = stripMSTeamsMentionTags(rawText); @@ -698,4 +698,9 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { implicitMention, }); }; + + return { + handleTeamsMessage, + unregisterDebouncer: inboundDebouncer.unregister, + }; } diff --git a/extensions/msteams/src/monitor.lifecycle.test.ts b/extensions/msteams/src/monitor.lifecycle.test.ts index 67302dc61dd..d0a902ca263 100644 --- a/extensions/msteams/src/monitor.lifecycle.test.ts +++ b/extensions/msteams/src/monitor.lifecycle.test.ts @@ -30,7 +30,9 @@ vi.mock("../runtime-api.js", () => ({ resolve(); return; } - params.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); + params.abortSignal?.addEventListener("abort", () => resolve(), { + once: true, + }); }); await params.onAbort?.(); }, @@ -80,7 +82,8 @@ vi.mock("express", () => { const registerMSTeamsHandlers = vi.hoisted(() => vi.fn(() => ({ - run: vi.fn(async () => {}), + handler: { run: vi.fn(async () => {}) }, + unregisterDebouncer: vi.fn(), })), ); const createMSTeamsAdapter = vi.hoisted(() => diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index f5c60064174..993705b2d30 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -12,7 +12,7 @@ import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; import type { MSTeamsConversationStore } from "./conversation-store.js"; import { formatUnknownError } from "./errors.js"; import type { MSTeamsAdapter } from "./messenger.js"; -import { registerMSTeamsHandlers, type MSTeamsActivityHandler } from "./monitor-handler.js"; +import { type MSTeamsActivityHandler, registerMSTeamsHandlers } from "./monitor-handler.js"; import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js"; import { resolveMSTeamsChannelAllowlist, @@ -136,12 +136,19 @@ export async function monitorMSTeamsProvider( .filter((entry) => entry && entry !== "*"); if (groupEntries.length > 0) { const { additions } = await resolveAllowlistUsers("msteams group users", groupEntries); - groupAllowFrom = mergeAllowlist({ existing: groupAllowFrom, additions }); + groupAllowFrom = mergeAllowlist({ + existing: groupAllowFrom, + additions, + }); } } if (teamsConfig && Object.keys(teamsConfig).length > 0) { - const entries: Array<{ input: string; teamKey: string; channelKey?: string }> = []; + const entries: Array<{ + input: string; + teamKey: string; + channelKey?: string; + }> = []; for (const [teamKey, teamCfg] of Object.entries(teamsConfig)) { if (teamKey === "*") { continue; @@ -190,7 +197,11 @@ export async function monitorMSTeamsProvider( ...sourceTeam.channels, ...existing.channels, }; - const mergedTeam = { ...sourceTeam, ...existing, channels: mergedChannels }; + const mergedTeam = { + ...sourceTeam, + ...existing, + channels: mergedChannels, + }; nextTeams[entry.teamId] = mergedTeam; if (source.channelKey && entry.channelId) { const sourceChannel = sourceTeam.channels?.[source.channelKey]; @@ -254,18 +265,21 @@ export async function monitorMSTeamsProvider( const tokenProvider = new MsalTokenProvider(authConfig); const adapter = createMSTeamsAdapter(authConfig, sdk); - const handler = registerMSTeamsHandlers(new ActivityHandler() as MSTeamsActivityHandler, { - cfg, - runtime, - appId, - adapter: adapter as unknown as MSTeamsAdapter, - tokenProvider, - textLimit, - mediaMaxBytes, - conversationStore, - pollStore, - log, - }); + const { handler, unregisterDebouncer } = registerMSTeamsHandlers( + new ActivityHandler() as MSTeamsActivityHandler, + { + cfg, + runtime, + appId, + adapter: adapter as unknown as MSTeamsAdapter, + tokenProvider, + textLimit, + mediaMaxBytes, + conversationStore, + pollStore, + log, + }, + ); // Create Express server const expressApp = express.default(); @@ -283,7 +297,7 @@ export async function monitorMSTeamsProvider( const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages"; const messageHandler = (req: Request, res: Response) => { void adapter - .process(req, res, (context: unknown) => handler.run!(context)) + .process(req, res, (context: unknown) => handler.run?.(context)) .catch((err: unknown) => { log.error("msteams webhook failed", { error: formatUnknownError(err) }); }); @@ -324,6 +338,7 @@ export async function monitorMSTeamsProvider( const shutdown = async () => { log.info("shutting down msteams provider"); + unregisterDebouncer(); return new Promise((resolve) => { httpServer.close((err) => { if (err) { diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 6df428d1273..5da805e0297 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -110,7 +110,7 @@ export const registerTelegramHandlers = ({ processMessage, logger, telegramDeps = defaultTelegramBotDeps, -}: RegisterTelegramHandlerParams) => { +}: RegisterTelegramHandlerParams): { unregisterDebouncer: () => void } => { const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500; const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = @@ -1746,4 +1746,6 @@ export const registerTelegramHandlers = ({ errorMessage: "channel_post handler failed", }); }); + + return { unregisterDebouncer: inboundDebouncer.unregister }; }; diff --git a/extensions/telegram/src/bot.ts b/extensions/telegram/src/bot.ts index 479560c8e38..b2887af581b 100644 --- a/extensions/telegram/src/bot.ts +++ b/extensions/telegram/src/bot.ts @@ -322,7 +322,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const MAX_RAW_UPDATE_ARRAY = 20; const stringifyUpdate = (update: unknown) => { const seen = new WeakSet(); - return JSON.stringify(update ?? null, (key, value) => { + return JSON.stringify(update ?? null, (_key, value) => { if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) { return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`; } @@ -529,7 +529,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { telegramDeps, }); - registerTelegramHandlers({ + const { unregisterDebouncer } = registerTelegramHandlers({ cfg, accountId: account.accountId, bot, @@ -550,6 +550,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const originalStop = bot.stop.bind(bot); bot.stop = ((...args: Parameters) => { + unregisterDebouncer(); threadBindingManager?.stop(); return originalStop(...args); }) as typeof bot.stop; diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index c5ab68a58d7..813aaade21a 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -16,6 +16,8 @@ type DebouncerFlushResult = { type DebouncerFlushHandle = { flushAll: (options?: { deadlineMs?: number }) => Promise; unregister: () => void; + /** Epoch ms of last enqueue or creation, whichever is more recent. */ + lastActivityMs: number; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -27,25 +29,35 @@ export function clearInboundDebouncerRegistry(): void { INBOUND_DEBOUNCERS.clear(); } +/** Debouncers idle longer than this are auto-removed during flush as a safety + * net against channels that forget to call unregister() on teardown. */ +const STALE_DEBOUNCER_MS = 5 * 60 * 1000; // 5 minutes + /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. * Returns the number of debounce buffers actually flushed so restart logic can * skip followup draining when there was no buffered work. + * + * Stale debouncers (no enqueue activity for >5 minutes) are auto-evicted as a + * safety net in case a channel monitor forgot to call unregister() on teardown. */ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } + const now = Date.now(); const deadlineMs = typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs) - ? Date.now() + Math.max(0, Math.trunc(options.timeoutMs)) + ? now + Math.max(0, Math.trunc(options.timeoutMs)) : undefined; const flushedCounts = await Promise.all( entries.map(async ([_key, handle]) => { const result = await handle.flushAll({ deadlineMs }); - if (result.drained) { + // Remove drained debouncers, and auto-evict stale entries whose + // owning channel never called unregister() (e.g. after reconnect). + if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) { handle.unregister(); } return result.flushedCount; @@ -152,6 +164,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams }; const enqueue = async (item: T) => { + handle.lastActivityMs = Date.now(); const key = params.buildKey(item); const debounceMs = resolveDebounceMs(item); const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); @@ -239,10 +252,12 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams const unregister = () => { INBOUND_DEBOUNCERS.delete(registryKey); }; - INBOUND_DEBOUNCERS.set(registryKey, { + const handle: DebouncerFlushHandle = { flushAll: flushAllInternal, unregister, - }); + lastActivityMs: Date.now(), + }; + INBOUND_DEBOUNCERS.set(registryKey, handle); return { enqueue, flushKey, flushAll, unregister }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index ef1880f4784..a724036d007 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -581,6 +581,61 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + + it("auto-evicts stale debouncers idle >5 min even with a tight deadline", async () => { + vi.useFakeTimers(); + + // Use a debounceMs longer than the staleness window so the debounce + // timeout does NOT fire when we advance the clock. + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10 * 60 * 1000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // Advance past the 5-minute staleness window (debounce timer still pending) + vi.advanceTimersByTime(5 * 60 * 1000 + 1); + + // Flush with a zero-ms timeout. The deadline fires immediately so the + // debouncer cannot actually drain, but the staleness guard evicts it. + await flushAllInboundDebouncers({ timeoutMs: 0 }); + + // Second flush should find nothing — stale entry was auto-evicted + const flushed2 = await flushAllInboundDebouncers(); + expect(flushed2).toBe(0); + + vi.useRealTimers(); + }); + + it("does not evict debouncers that have recent activity", async () => { + vi.useFakeTimers(); + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10 * 60 * 1000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // Advance 4 minutes (below staleness threshold) + vi.advanceTimersByTime(4 * 60 * 1000); + + // Enqueue refreshes the activity timestamp + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + // Advance another 4 minutes (8 total since creation, 4 since enqueue) + vi.advanceTimersByTime(4 * 60 * 1000); + + // Flush with zero timeout — debouncer can't drain, but it's NOT stale + // (only 4 min since last enqueue). It should remain registered. + await flushAllInboundDebouncers({ timeoutMs: 0 }); + + // Debouncer should still be in the registry + // Do a full flush to verify it's still there + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(1); + + vi.useRealTimers(); + }); }); describe("createInboundDebouncer flushAll", () => {