fix: unregister inbound debouncers on channel teardown

Telegram, Feishu, and MSTeams channel monitors created inbound debouncers
without calling unregister() during teardown. On reconnect, a new debouncer
was registered while the old one stayed in the global INBOUND_DEBOUNCERS
map, accumulating stale entries that increased restart latency and memory
usage.

- Telegram: registerTelegramHandlers now returns unregisterDebouncer;
  called in bot.stop override
- Feishu: registerEventHandlers now returns unregisterDebouncer;
  monitorSingleAccount wraps transport in try/finally
- MSTeams: createMSTeamsMessageHandler returns { handleTeamsMessage,
  unregisterDebouncer }; threaded through registerMSTeamsHandlers and
  called in monitor shutdown

Safety net: flushAllInboundDebouncers auto-evicts debouncers idle >5 min
so orphaned entries from channels that forget unregister() are cleaned up.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Joey Krug 2026-03-14 23:15:43 -04:00
parent de53e27215
commit 1830baed03
11 changed files with 183 additions and 61 deletions

View File

@ -1,15 +1,15 @@
import * as crypto from "crypto";
import * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js";
import * as crypto from "node:crypto";
import type * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, HistoryEntry, RuntimeEnv } from "../runtime-api.js";
import { resolveFeishuAccount } from "./accounts.js";
import { raceWithTimeoutAndAbort } from "./async.js";
import {
type FeishuBotAddedEvent,
type FeishuMessageEvent,
handleFeishuMessage,
parseFeishuMessageEvent,
type FeishuMessageEvent,
type FeishuBotAddedEvent,
} from "./bot.js";
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
import { type FeishuCardActionEvent, handleFeishuCardAction } from "./card-action.js";
import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js";
import { createEventDispatcher } from "./client.js";
import {
@ -254,7 +254,7 @@ function resolveFeishuDebounceMentions(params: {
function registerEventHandlers(
eventDispatcher: Lark.EventDispatcher,
context: RegisterEventHandlersContext,
): void {
): { unregisterDebouncer: () => void } {
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
const core = getFeishuRuntime();
const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({
@ -617,6 +617,8 @@ function registerEventHandlers(
}
},
});
return { unregisterDebouncer: inboundDebouncer.unregister };
}
export type BotOpenIdSource =
@ -639,7 +641,10 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" };
const botIdentity =
botOpenIdSource.kind === "prefetched"
? { botOpenId: botOpenIdSource.botOpenId, botName: botOpenIdSource.botName }
? {
botOpenId: botOpenIdSource.botOpenId,
botName: botOpenIdSource.botName,
}
: await fetchBotIdentityForMonitor(account, { runtime, abortSignal });
const botOpenId = botIdentity.botOpenId;
const botName = botIdentity.botName?.trim();
@ -670,7 +675,7 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
const chatHistories = new Map<string, HistoryEntry[]>();
threadBindingManager = createFeishuThreadBindingManager({ accountId, cfg });
registerEventHandlers(eventDispatcher, {
const { unregisterDebouncer } = registerEventHandlers(eventDispatcher, {
cfg,
accountId,
runtime,
@ -678,10 +683,26 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
fireAndForget: true,
});
if (connectionMode === "webhook") {
return await monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher });
try {
if (connectionMode === "webhook") {
return await monitorWebhook({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
});
}
return await monitorWebSocket({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
});
} finally {
unregisterDebouncer();
}
return await monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher });
} finally {
threadBindingManager?.stop();
}

View File

@ -141,7 +141,7 @@ function createConsentInvokeHarness(params: {
contentType: "text/plain",
conversationId: params.pendingConversationId ?? "19:victim@thread.v2",
});
const handler = registerMSTeamsHandlers(createActivityHandler(), createDeps());
const { handler } = registerMSTeamsHandlers(createActivityHandler(), createDeps());
const { context, sendActivity } = createInvokeContext({
conversationId: params.invokeConversationId,
uploadId,

View File

@ -140,8 +140,8 @@ async function handleFileConsentInvoke(
export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
handler: T,
deps: MSTeamsMessageHandlerDeps,
): T {
const handleTeamsMessage = createMSTeamsMessageHandler(deps);
): { handler: T; unregisterDebouncer: () => void } {
const { handleTeamsMessage, unregisterDebouncer } = createMSTeamsMessageHandler(deps);
// Wrap the original run method to intercept invokes
const originalRun = handler.run;
@ -151,7 +151,10 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
// Handle file consent invokes before passing to normal flow
if (ctx.activity?.type === "invoke" && ctx.activity?.name === "fileConsent/invoke") {
// Send invoke response IMMEDIATELY to prevent Teams timeout
await ctx.sendActivity({ type: "invokeResponse", value: { status: 200 } });
await ctx.sendActivity({
type: "invokeResponse",
value: { status: 200 },
});
try {
await withRevokedProxyFallback({
@ -164,7 +167,9 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
},
});
} catch (err) {
deps.log.debug?.("file consent handler error", { error: String(err) });
deps.log.debug?.("file consent handler error", {
error: String(err),
});
}
return;
}
@ -192,5 +197,5 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
await next();
});
return handler;
return { handler, unregisterDebouncer };
}

View File

@ -79,8 +79,8 @@ describe("msteams monitor handler authz", () => {
},
} as OpenClawConfig);
const handler = createMSTeamsMessageHandler(deps);
await handler({
const { handleTeamsMessage } = createMSTeamsMessageHandler(deps);
await handleTeamsMessage({
activity: {
id: "msg-1",
type: "message",
@ -130,8 +130,8 @@ describe("msteams monitor handler authz", () => {
},
} as OpenClawConfig);
const handler = createMSTeamsMessageHandler(deps);
await handler({
const { handleTeamsMessage } = createMSTeamsMessageHandler(deps);
await handleTeamsMessage({
activity: {
id: "msg-1",
type: "message",

View File

@ -1,24 +1,24 @@
import {
DEFAULT_ACCOUNT_ID,
DEFAULT_GROUP_HISTORY_LIMIT,
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
createChannelPairingController,
dispatchReplyFromConfigWithSettledDispatcher,
DEFAULT_GROUP_HISTORY_LIMIT,
logInboundDrop,
evaluateSenderGroupAccessForPolicy,
resolveSenderScopedGroupPolicy,
recordPendingHistoryEntryIfEnabled,
resolveDualTextControlCommandGate,
resolveDefaultGroupPolicy,
isDangerousNameMatchingEnabled,
readStoreAllowFromForDmPolicy,
resolveMentionGating,
resolveInboundSessionEnvelopeContext,
formatAllowlistMatchMeta,
resolveEffectiveAllowFromLists,
resolveDmGroupAccessWithLists,
type HistoryEntry,
isDangerousNameMatchingEnabled,
logInboundDrop,
readStoreAllowFromForDmPolicy,
recordPendingHistoryEntryIfEnabled,
resolveDefaultGroupPolicy,
resolveDmGroupAccessWithLists,
resolveDualTextControlCommandGate,
resolveEffectiveAllowFromLists,
resolveInboundSessionEnvelopeContext,
resolveMentionGating,
resolveSenderScopedGroupPolicy,
} from "../../runtime-api.js";
import {
buildMSTeamsAttachmentPlaceholder,
@ -675,7 +675,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
},
});
return async function handleTeamsMessage(context: MSTeamsTurnContext) {
const handleTeamsMessage = async (context: MSTeamsTurnContext) => {
const activity = context.activity;
const rawText = activity.text?.trim() ?? "";
const text = stripMSTeamsMentionTags(rawText);
@ -698,4 +698,9 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
implicitMention,
});
};
return {
handleTeamsMessage,
unregisterDebouncer: inboundDebouncer.unregister,
};
}

View File

@ -30,7 +30,9 @@ vi.mock("../runtime-api.js", () => ({
resolve();
return;
}
params.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
params.abortSignal?.addEventListener("abort", () => resolve(), {
once: true,
});
});
await params.onAbort?.();
},
@ -80,7 +82,8 @@ vi.mock("express", () => {
const registerMSTeamsHandlers = vi.hoisted(() =>
vi.fn(() => ({
run: vi.fn(async () => {}),
handler: { run: vi.fn(async () => {}) },
unregisterDebouncer: vi.fn(),
})),
);
const createMSTeamsAdapter = vi.hoisted(() =>

View File

@ -12,7 +12,7 @@ import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import type { MSTeamsConversationStore } from "./conversation-store.js";
import { formatUnknownError } from "./errors.js";
import type { MSTeamsAdapter } from "./messenger.js";
import { registerMSTeamsHandlers, type MSTeamsActivityHandler } from "./monitor-handler.js";
import { type MSTeamsActivityHandler, registerMSTeamsHandlers } from "./monitor-handler.js";
import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js";
import {
resolveMSTeamsChannelAllowlist,
@ -136,12 +136,19 @@ export async function monitorMSTeamsProvider(
.filter((entry) => entry && entry !== "*");
if (groupEntries.length > 0) {
const { additions } = await resolveAllowlistUsers("msteams group users", groupEntries);
groupAllowFrom = mergeAllowlist({ existing: groupAllowFrom, additions });
groupAllowFrom = mergeAllowlist({
existing: groupAllowFrom,
additions,
});
}
}
if (teamsConfig && Object.keys(teamsConfig).length > 0) {
const entries: Array<{ input: string; teamKey: string; channelKey?: string }> = [];
const entries: Array<{
input: string;
teamKey: string;
channelKey?: string;
}> = [];
for (const [teamKey, teamCfg] of Object.entries(teamsConfig)) {
if (teamKey === "*") {
continue;
@ -190,7 +197,11 @@ export async function monitorMSTeamsProvider(
...sourceTeam.channels,
...existing.channels,
};
const mergedTeam = { ...sourceTeam, ...existing, channels: mergedChannels };
const mergedTeam = {
...sourceTeam,
...existing,
channels: mergedChannels,
};
nextTeams[entry.teamId] = mergedTeam;
if (source.channelKey && entry.channelId) {
const sourceChannel = sourceTeam.channels?.[source.channelKey];
@ -254,18 +265,21 @@ export async function monitorMSTeamsProvider(
const tokenProvider = new MsalTokenProvider(authConfig);
const adapter = createMSTeamsAdapter(authConfig, sdk);
const handler = registerMSTeamsHandlers(new ActivityHandler() as MSTeamsActivityHandler, {
cfg,
runtime,
appId,
adapter: adapter as unknown as MSTeamsAdapter,
tokenProvider,
textLimit,
mediaMaxBytes,
conversationStore,
pollStore,
log,
});
const { handler, unregisterDebouncer } = registerMSTeamsHandlers(
new ActivityHandler() as MSTeamsActivityHandler,
{
cfg,
runtime,
appId,
adapter: adapter as unknown as MSTeamsAdapter,
tokenProvider,
textLimit,
mediaMaxBytes,
conversationStore,
pollStore,
log,
},
);
// Create Express server
const expressApp = express.default();
@ -283,7 +297,7 @@ export async function monitorMSTeamsProvider(
const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages";
const messageHandler = (req: Request, res: Response) => {
void adapter
.process(req, res, (context: unknown) => handler.run!(context))
.process(req, res, (context: unknown) => handler.run?.(context))
.catch((err: unknown) => {
log.error("msteams webhook failed", { error: formatUnknownError(err) });
});
@ -324,6 +338,7 @@ export async function monitorMSTeamsProvider(
const shutdown = async () => {
log.info("shutting down msteams provider");
unregisterDebouncer();
return new Promise<void>((resolve) => {
httpServer.close((err) => {
if (err) {

View File

@ -110,7 +110,7 @@ export const registerTelegramHandlers = ({
processMessage,
logger,
telegramDeps = defaultTelegramBotDeps,
}: RegisterTelegramHandlerParams) => {
}: RegisterTelegramHandlerParams): { unregisterDebouncer: () => void } => {
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
@ -1746,4 +1746,6 @@ export const registerTelegramHandlers = ({
errorMessage: "channel_post handler failed",
});
});
return { unregisterDebouncer: inboundDebouncer.unregister };
};

View File

@ -322,7 +322,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const MAX_RAW_UPDATE_ARRAY = 20;
const stringifyUpdate = (update: unknown) => {
const seen = new WeakSet();
return JSON.stringify(update ?? null, (key, value) => {
return JSON.stringify(update ?? null, (_key, value) => {
if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) {
return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`;
}
@ -529,7 +529,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
telegramDeps,
});
registerTelegramHandlers({
const { unregisterDebouncer } = registerTelegramHandlers({
cfg,
accountId: account.accountId,
bot,
@ -550,6 +550,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const originalStop = bot.stop.bind(bot);
bot.stop = ((...args: Parameters<typeof originalStop>) => {
unregisterDebouncer();
threadBindingManager?.stop();
return originalStop(...args);
}) as typeof bot.stop;

View File

@ -16,6 +16,8 @@ type DebouncerFlushResult = {
/** Registry entry stored in INBOUND_DEBOUNCERS for a single inbound debouncer. */
type DebouncerFlushHandle = {
/** Flushes this debouncer's buffers; the global flush passes an optional
 * epoch-ms `deadlineMs` derived from its timeout. */
flushAll: (options?: { deadlineMs?: number }) => Promise<DebouncerFlushResult>;
/** Deletes this debouncer's entry from the INBOUND_DEBOUNCERS registry. */
unregister: () => void;
/** Epoch ms of last enqueue or creation, whichever is more recent. */
lastActivityMs: number;
};
// Symbol.for() returns the same symbol for the same string everywhere in the
// runtime, so this key is stable across module instances.
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
// Registry of flush handles, one entry per debouncer, keyed by a symbol.
// NOTE(review): resolveGlobalMap is defined elsewhere — presumably it returns
// a process-wide Map shared under the key above; confirm.
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
@ -27,25 +29,35 @@ export function clearInboundDebouncerRegistry(): void {
INBOUND_DEBOUNCERS.clear();
}
/** Idle threshold for auto-eviction. During flushAllInboundDebouncers, a
* debouncer whose last activity is at least this many milliseconds old is
* unregistered even if it did not drain, as a safety net against channels
* that forget to call unregister() on teardown. */
const STALE_DEBOUNCER_MS = 5 * 60 * 1000; // 5 minutes
/**
* Flush all registered inbound debouncers immediately. Called during SIGUSR1
* restart to push buffered messages into the session before reinitializing.
* Returns the number of debounce buffers actually flushed so restart logic can
* skip followup draining when there was no buffered work.
*
* Stale debouncers (no enqueue activity for >5 minutes) are auto-evicted as a
* safety net in case a channel monitor forgot to call unregister() on teardown.
*/
export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise<number> {
const entries = [...INBOUND_DEBOUNCERS.entries()];
if (entries.length === 0) {
return 0;
}
const now = Date.now();
const deadlineMs =
typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs)
? Date.now() + Math.max(0, Math.trunc(options.timeoutMs))
? now + Math.max(0, Math.trunc(options.timeoutMs))
: undefined;
const flushedCounts = await Promise.all(
entries.map(async ([_key, handle]) => {
const result = await handle.flushAll({ deadlineMs });
if (result.drained) {
// Remove drained debouncers, and auto-evict stale entries whose
// owning channel never called unregister() (e.g. after reconnect).
if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) {
handle.unregister();
}
return result.flushedCount;
@ -152,6 +164,7 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
};
const enqueue = async (item: T) => {
handle.lastActivityMs = Date.now();
const key = params.buildKey(item);
const debounceMs = resolveDebounceMs(item);
const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true);
@ -239,10 +252,12 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
const unregister = () => {
INBOUND_DEBOUNCERS.delete(registryKey);
};
INBOUND_DEBOUNCERS.set(registryKey, {
const handle: DebouncerFlushHandle = {
flushAll: flushAllInternal,
unregister,
});
lastActivityMs: Date.now(),
};
INBOUND_DEBOUNCERS.set(registryKey, handle);
return { enqueue, flushKey, flushAll, unregister };
}

View File

@ -581,6 +581,61 @@ describe("flushAllInboundDebouncers", () => {
vi.useRealTimers();
});
it("auto-evicts stale debouncers idle >5 min even with a tight deadline", async () => {
  vi.useFakeTimers();

  // Use a debounceMs longer than the staleness window so the debounce
  // timeout does NOT fire when we advance the clock.
  const debouncer = createInboundDebouncer<{ key: string; id: string }>({
    debounceMs: 10 * 60 * 1000,
    buildKey: (item) => item.key,
    onFlush: async () => {},
  });

  // Buffer one message so that registration is observable: a debouncer that
  // is still registered would report this buffer on an unbounded flush,
  // while an evicted one contributes nothing. Without a buffered item the
  // final assertion would pass whether or not eviction happened.
  await debouncer.enqueue({ key: "session-1", id: "msg-1" });

  // Advance past the 5-minute staleness window (debounce timer still pending).
  vi.advanceTimersByTime(5 * 60 * 1000 + 1);

  // Flush with a zero-ms timeout. The deadline fires immediately so the
  // debouncer cannot actually drain, but the staleness guard evicts it.
  const flushedAtDeadline = await flushAllInboundDebouncers({ timeoutMs: 0 });
  expect(flushedAtDeadline).toBe(0);

  // Second flush should find nothing — the stale entry was auto-evicted, so
  // its still-buffered message is no longer reachable through the registry.
  const flushedAfterEviction = await flushAllInboundDebouncers();
  expect(flushedAfterEviction).toBe(0);

  vi.useRealTimers();
});
it("does not evict debouncers that have recent activity", async () => {
  vi.useFakeTimers();

  const fourMinutesMs = 4 * 60 * 1000;
  const { enqueue } = createInboundDebouncer<{ key: string; id: string }>({
    debounceMs: 10 * 60 * 1000,
    buildKey: ({ key }) => key,
    onFlush: async () => {},
  });

  // Stay under the staleness threshold, then enqueue to refresh the
  // debouncer's last-activity timestamp.
  vi.advanceTimersByTime(fourMinutesMs);
  await enqueue({ key: "session-1", id: "msg-1" });

  // Eight minutes since creation, but only four since the enqueue above.
  vi.advanceTimersByTime(fourMinutesMs);

  // A zero-timeout flush cannot drain the buffer; because the debouncer is
  // not stale, it must stay registered.
  await flushAllInboundDebouncers({ timeoutMs: 0 });

  // An unbounded flush now reaches the still-registered debouncer and
  // flushes its single buffered key.
  const flushedCount = await flushAllInboundDebouncers();
  expect(flushedCount).toBe(1);

  vi.useRealTimers();
});
});
describe("createInboundDebouncer flushAll", () => {