diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts index 298be3e4921..3896cbe4912 100644 --- a/extensions/bluebubbles/src/monitor-debounce.ts +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -13,7 +13,9 @@ type BlueBubblesDebounceEntry = { export type BlueBubblesDebouncer = { enqueue: (item: BlueBubblesDebounceEntry) => Promise; - flushKey: (key: string) => Promise; + flushKey: (key: string) => Promise; + flushAll: () => Promise; + unregister: () => void; }; export type BlueBubblesDebounceRegistry = { @@ -199,6 +201,7 @@ export function createBlueBubblesDebounceRegistry(params: { return debouncer; }, removeDebouncer: (target) => { + targetDebouncers.get(target)?.unregister(); targetDebouncers.delete(target); }, }; diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index e17dcc906af..fdb41c677a8 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -180,7 +180,10 @@ export function createDiscordMessageHandler( } }; - handler.deactivate = inboundWorker.deactivate; + handler.deactivate = () => { + debouncer.unregister(); + inboundWorker.deactivate(); + }; return handler; } diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index ff3a0ba9dc9..eebf69e5f45 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -1,15 +1,15 @@ -import * as crypto from "crypto"; -import * as Lark from "@larksuiteoapi/node-sdk"; -import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js"; +import * as crypto from "node:crypto"; +import type * as Lark from "@larksuiteoapi/node-sdk"; +import type { ClawdbotConfig, HistoryEntry, RuntimeEnv } from "../runtime-api.js"; import { resolveFeishuAccount } from "./accounts.js"; import { raceWithTimeoutAndAbort } from "./async.js"; import { + type FeishuBotAddedEvent, + type FeishuMessageEvent, handleFeishuMessage, parseFeishuMessageEvent, - type FeishuMessageEvent, - type FeishuBotAddedEvent, } from "./bot.js"; -import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; +import { type FeishuCardActionEvent, handleFeishuCardAction } from "./card-action.js"; import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js"; import { createEventDispatcher } from "./client.js"; import { @@ -254,7 +254,7 @@ function resolveFeishuDebounceMentions(params: { function registerEventHandlers( eventDispatcher: Lark.EventDispatcher, context: RegisterEventHandlersContext, -): void { +): { unregisterDebouncer: () => void } { const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; const core = getFeishuRuntime(); const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({ @@ -617,6 +617,8 @@ function registerEventHandlers( } }, }); + + return { unregisterDebouncer: inboundDebouncer.unregister }; } export type BotOpenIdSource = @@ -639,7 +641,10 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" }; const botIdentity = botOpenIdSource.kind === "prefetched" - ? { botOpenId: botOpenIdSource.botOpenId, botName: botOpenIdSource.botName } + ? { + botOpenId: botOpenIdSource.botOpenId, + botName: botOpenIdSource.botName, + } : await fetchBotIdentityForMonitor(account, { runtime, abortSignal }); const botOpenId = botIdentity.botOpenId; const botName = botIdentity.botName?.trim(); @@ -670,7 +675,7 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): const chatHistories = new Map(); threadBindingManager = createFeishuThreadBindingManager({ accountId, cfg }); - registerEventHandlers(eventDispatcher, { + const { unregisterDebouncer } = registerEventHandlers(eventDispatcher, { cfg, accountId, runtime, @@ -678,10 +683,26 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): fireAndForget: true, }); - if (connectionMode === "webhook") { - return await monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher }); + try { + if (connectionMode === "webhook") { + return await monitorWebhook({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, + }); + } + return await monitorWebSocket({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, + }); + } finally { + unregisterDebouncer(); } - return await monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher }); } finally { threadBindingManager?.stop(); } diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 5765577441f..4ac0a504cb5 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -711,7 +711,9 @@ describe("Feishu inbound debounce regressions", () => { enqueueMock(item); params.onError?.(new Error("dispatch failed"), [item]); }, - flushKey: async () => {}, + flushKey: async (_key: string) => false, + flushAll: async () => 0, + unregister: () => {}, }), resolveInboundDebounceMs, }, diff --git a/extensions/feishu/src/monitor.test-mocks.ts b/extensions/feishu/src/monitor.test-mocks.ts index 276d6375464..de287c14277 100644 --- a/extensions/feishu/src/monitor.test-mocks.ts +++ b/extensions/feishu/src/monitor.test-mocks.ts @@ -17,7 +17,9 @@ export function createFeishuRuntimeMockModule(): { resolveInboundDebounceMs: () => number; createInboundDebouncer: () => { enqueue: () => Promise; - flushKey: () => Promise; + flushKey: () => Promise; + flushAll: () => Promise; + unregister: () => void; }; }; text: { @@ -33,7 +35,9 @@ export function createFeishuRuntimeMockModule(): { resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, - flushKey: async () => {}, + flushKey: async () => false, + flushAll: async () => 0, + unregister: () => {}, }), }, text: { diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 958a40de705..44fcfebe46a 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1686,6 +1686,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }); } finally { + debouncer.unregister(); unregisterInteractions?.(); } diff --git a/extensions/msteams/src/monitor-handler.file-consent.test.ts b/extensions/msteams/src/monitor-handler.file-consent.test.ts index 39b6ea1b1ff..ea06b5d19c7 100644 --- a/extensions/msteams/src/monitor-handler.file-consent.test.ts +++ b/extensions/msteams/src/monitor-handler.file-consent.test.ts @@ -33,6 +33,9 @@ const runtimeStub: PluginRuntime = { resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, + flushKey: async () => false, + flushAll: async () => 0, + unregister: () => {}, }), }, }, @@ -138,7 +141,7 @@ function createConsentInvokeHarness(params: { contentType: "text/plain", conversationId: params.pendingConversationId ?? "19:victim@thread.v2", }); - const handler = registerMSTeamsHandlers(createActivityHandler(), createDeps()); + const { handler } = registerMSTeamsHandlers(createActivityHandler(), createDeps()); const { context, sendActivity } = createInvokeContext({ conversationId: params.invokeConversationId, uploadId, diff --git a/extensions/msteams/src/monitor-handler.ts b/extensions/msteams/src/monitor-handler.ts index 4cda545bd02..fec16302014 100644 --- a/extensions/msteams/src/monitor-handler.ts +++ b/extensions/msteams/src/monitor-handler.ts @@ -140,8 +140,8 @@ async function handleFileConsentInvoke( export function registerMSTeamsHandlers( handler: T, deps: MSTeamsMessageHandlerDeps, -): T { - const handleTeamsMessage = createMSTeamsMessageHandler(deps); +): { handler: T; unregisterDebouncer: () => void } { + const { handleTeamsMessage, unregisterDebouncer } = createMSTeamsMessageHandler(deps); // Wrap the original run method to intercept invokes const originalRun = handler.run; @@ -151,7 +151,10 @@ export function registerMSTeamsHandlers( // Handle file consent invokes before passing to normal flow if (ctx.activity?.type === "invoke" && ctx.activity?.name === "fileConsent/invoke") { // Send invoke response IMMEDIATELY to prevent Teams timeout - await ctx.sendActivity({ type: "invokeResponse", value: { status: 200 } }); + await ctx.sendActivity({ + type: "invokeResponse", + value: { status: 200 }, + }); try { await withRevokedProxyFallback({ @@ -164,7 +167,9 @@ export function registerMSTeamsHandlers( }, }); } catch (err) { - deps.log.debug?.("file consent handler error", { error: String(err) }); + deps.log.debug?.("file consent handler error", { + error: String(err), + }); } return; } @@ -192,5 +197,5 @@ export function registerMSTeamsHandlers( await next(); }); - return handler; + return { handler, unregisterDebouncer }; } diff --git a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts index 68295e9bb07..1bb0cbf6c0d 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts @@ -14,10 +14,18 @@ describe("msteams monitor handler authz", () => { resolveInboundDebounceMs: () => 0, createInboundDebouncer: (params: { onFlush: (entries: T[]) => Promise; - }): { enqueue: (entry: T) => Promise } => ({ + }): { + enqueue: (entry: T) => Promise; + flushKey: (_key: string) => Promise; + flushAll: () => Promise; + unregister: () => void; + } => ({ enqueue: async (entry: T) => { await params.onFlush([entry]); }, + flushKey: async (_key: string) => false, + flushAll: async () => 0, + unregister: () => {}, }), }, pairing: { @@ -71,8 +79,8 @@ describe("msteams monitor handler authz", () => { }, } as OpenClawConfig); - const handler = createMSTeamsMessageHandler(deps); - await handler({ + const { handleTeamsMessage } = createMSTeamsMessageHandler(deps); + await handleTeamsMessage({ activity: { id: "msg-1", type: "message", @@ -122,8 +130,8 @@ describe("msteams monitor handler authz", () => { }, } as OpenClawConfig); - const handler = createMSTeamsMessageHandler(deps); - await handler({ + const { handleTeamsMessage } = createMSTeamsMessageHandler(deps); + await handleTeamsMessage({ activity: { id: "msg-1", type: "message", diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index fe6751b94c3..05987b234ee 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -1,24 +1,24 @@ import { DEFAULT_ACCOUNT_ID, + DEFAULT_GROUP_HISTORY_LIMIT, buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, createChannelPairingController, dispatchReplyFromConfigWithSettledDispatcher, - DEFAULT_GROUP_HISTORY_LIMIT, - logInboundDrop, evaluateSenderGroupAccessForPolicy, - resolveSenderScopedGroupPolicy, - recordPendingHistoryEntryIfEnabled, - resolveDualTextControlCommandGate, - resolveDefaultGroupPolicy, - isDangerousNameMatchingEnabled, - readStoreAllowFromForDmPolicy, - resolveMentionGating, - resolveInboundSessionEnvelopeContext, formatAllowlistMatchMeta, - resolveEffectiveAllowFromLists, - resolveDmGroupAccessWithLists, type HistoryEntry, + isDangerousNameMatchingEnabled, + logInboundDrop, + readStoreAllowFromForDmPolicy, + recordPendingHistoryEntryIfEnabled, + resolveDefaultGroupPolicy, + resolveDmGroupAccessWithLists, + resolveDualTextControlCommandGate, + resolveEffectiveAllowFromLists, + resolveInboundSessionEnvelopeContext, + resolveMentionGating, + resolveSenderScopedGroupPolicy, } from "../../runtime-api.js"; import { buildMSTeamsAttachmentPlaceholder, @@ -675,7 +675,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { }, }); - return async function handleTeamsMessage(context: MSTeamsTurnContext) { + const handleTeamsMessage = async (context: MSTeamsTurnContext) => { const activity = context.activity; const rawText = activity.text?.trim() ?? ""; const text = stripMSTeamsMentionTags(rawText); @@ -698,4 +698,9 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { implicitMention, }); }; + + return { + handleTeamsMessage, + unregisterDebouncer: inboundDebouncer.unregister, + }; } diff --git a/extensions/msteams/src/monitor.lifecycle.test.ts b/extensions/msteams/src/monitor.lifecycle.test.ts index 67302dc61dd..d0a902ca263 100644 --- a/extensions/msteams/src/monitor.lifecycle.test.ts +++ b/extensions/msteams/src/monitor.lifecycle.test.ts @@ -30,7 +30,9 @@ vi.mock("../runtime-api.js", () => ({ resolve(); return; } - params.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); + params.abortSignal?.addEventListener("abort", () => resolve(), { + once: true, + }); }); await params.onAbort?.(); }, @@ -80,7 +82,8 @@ vi.mock("express", () => { const registerMSTeamsHandlers = vi.hoisted(() => vi.fn(() => ({ - run: vi.fn(async () => {}), + handler: { run: vi.fn(async () => {}) }, + unregisterDebouncer: vi.fn(), })), ); const createMSTeamsAdapter = vi.hoisted(() => diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index f5c60064174..d66ada1dcf9 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -12,7 +12,7 @@ import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; import type { MSTeamsConversationStore } from "./conversation-store.js"; import { formatUnknownError } from "./errors.js"; import type { MSTeamsAdapter } from "./messenger.js"; -import { registerMSTeamsHandlers, type MSTeamsActivityHandler } from "./monitor-handler.js"; +import { type MSTeamsActivityHandler, registerMSTeamsHandlers } from "./monitor-handler.js"; import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js"; import { resolveMSTeamsChannelAllowlist, @@ -136,12 +136,19 @@ export async function monitorMSTeamsProvider( .filter((entry) => entry && entry !== "*"); if (groupEntries.length > 0) { const { additions } = await resolveAllowlistUsers("msteams group users", groupEntries); - groupAllowFrom = mergeAllowlist({ existing: groupAllowFrom, additions }); + groupAllowFrom = mergeAllowlist({ + existing: groupAllowFrom, + additions, + }); } } if (teamsConfig && Object.keys(teamsConfig).length > 0) { - const entries: Array<{ input: string; teamKey: string; channelKey?: string }> = []; + const entries: Array<{ + input: string; + teamKey: string; + channelKey?: string; + }> = []; for (const [teamKey, teamCfg] of Object.entries(teamsConfig)) { if (teamKey === "*") { continue; @@ -190,7 +197,11 @@ export async function monitorMSTeamsProvider( ...sourceTeam.channels, ...existing.channels, }; - const mergedTeam = { ...sourceTeam, ...existing, channels: mergedChannels }; + const mergedTeam = { + ...sourceTeam, + ...existing, + channels: mergedChannels, + }; nextTeams[entry.teamId] = mergedTeam; if (source.channelKey && entry.channelId) { const sourceChannel = sourceTeam.channels?.[source.channelKey]; @@ -254,18 +265,21 @@ export async function monitorMSTeamsProvider( const tokenProvider = new MsalTokenProvider(authConfig); const adapter = createMSTeamsAdapter(authConfig, sdk); - const handler = registerMSTeamsHandlers(new ActivityHandler() as MSTeamsActivityHandler, { - cfg, - runtime, - appId, - adapter: adapter as unknown as MSTeamsAdapter, - tokenProvider, - textLimit, - mediaMaxBytes, - conversationStore, - pollStore, - log, - }); + const { handler, unregisterDebouncer } = registerMSTeamsHandlers( + new ActivityHandler() as MSTeamsActivityHandler, + { + cfg, + runtime, + appId, + adapter: adapter as unknown as MSTeamsAdapter, + tokenProvider, + textLimit, + mediaMaxBytes, + conversationStore, + pollStore, + log, + }, + ); // Create Express server const expressApp = express.default(); @@ -283,7 +297,7 @@ export async function monitorMSTeamsProvider( const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages"; const messageHandler = (req: Request, res: Response) => { void adapter - .process(req, res, (context: unknown) => handler.run!(context)) + .process(req, res, (context: unknown) => handler.run?.(context)) .catch((err: unknown) => { log.error("msteams webhook failed", { error: formatUnknownError(err) }); }); @@ -301,21 +315,29 @@ export async function monitorMSTeamsProvider( }); // Start listening and fail fast if bind/listen fails. - const httpServer = expressApp.listen(port); - await new Promise((resolve, reject) => { - const onListening = () => { - httpServer.off("error", onError); - log.info(`msteams provider started on port ${port}`); - resolve(); - }; - const onError = (err: unknown) => { - httpServer.off("listening", onListening); - log.error("msteams server error", { error: String(err) }); - reject(err); - }; - httpServer.once("listening", onListening); - httpServer.once("error", onError); - }); + let httpServer: ReturnType; + try { + httpServer = expressApp.listen(port); + await new Promise((resolve, reject) => { + const onListening = () => { + httpServer.off("error", onError); + log.info(`msteams provider started on port ${port}`); + resolve(); + }; + const onError = (err: unknown) => { + httpServer.off("listening", onListening); + log.error("msteams server error", { error: String(err) }); + reject(err); + }; + httpServer.once("listening", onListening); + httpServer.once("error", onError); + }); + } catch (err) { + // Clean up the debouncer so it does not linger in the global registry + // when the provider fails to start (e.g. port already in use). + unregisterDebouncer(); + throw err; + } applyMSTeamsWebhookTimeouts(httpServer); httpServer.on("error", (err) => { @@ -324,6 +346,7 @@ export async function monitorMSTeamsProvider( const shutdown = async () => { log.info("shutting down msteams provider"); + unregisterDebouncer(); return new Promise((resolve) => { httpServer.close((err) => { if (err) { diff --git a/extensions/slack/src/monitor/message-handler.test.ts b/extensions/slack/src/monitor/message-handler.test.ts index cfea959f4d0..8ccc70bcd13 100644 --- a/extensions/slack/src/monitor/message-handler.test.ts +++ b/extensions/slack/src/monitor/message-handler.test.ts @@ -12,6 +12,8 @@ vi.mock("../../../../src/auto-reply/inbound-debounce.js", () => ({ createInboundDebouncer: () => ({ enqueue: (entry: unknown) => enqueueMock(entry), flushKey: (key: string) => flushKeyMock(key), + flushAll: async () => 0, + unregister: () => {}, }), })); diff --git a/extensions/slack/src/monitor/message-handler.ts b/extensions/slack/src/monitor/message-handler.ts index fb700b78350..5f43ed9aa58 100644 --- a/extensions/slack/src/monitor/message-handler.ts +++ b/extensions/slack/src/monitor/message-handler.ts @@ -15,6 +15,10 @@ export type SlackMessageHandler = ( opts: { source: "message" | "app_mention"; wasMentioned?: boolean }, ) => Promise; +export type SlackMessageHandlerWithLifecycle = SlackMessageHandler & { + deactivate: () => void; +}; + const APP_MENTION_RETRY_TTL_MS = 60_000; function resolveSlackSenderId(message: SlackMessageEvent): string | null { @@ -92,7 +96,7 @@ export function createSlackMessageHandler(params: { account: ResolvedSlackAccount; /** Called on each inbound event to update liveness tracking. */ trackEvent?: () => void; -}): SlackMessageHandler { +}): SlackMessageHandlerWithLifecycle { const { ctx, account, trackEvent } = params; const { debounceMs, debouncer } = createChannelInboundDebouncer<{ message: SlackMessageEvent; @@ -206,7 +210,7 @@ export function createSlackMessageHandler(params: { return true; }; - return async (message, opts) => { + const handler: SlackMessageHandlerWithLifecycle = async (message, opts) => { if (opts.source === "message" && message.type !== "message") { return; } @@ -253,4 +257,10 @@ export function createSlackMessageHandler(params: { } await debouncer.enqueue({ message: resolvedMessage, opts }); }; + + handler.deactivate = () => { + debouncer.unregister(); + }; + + return handler; } diff --git a/extensions/slack/src/monitor/provider.ts b/extensions/slack/src/monitor/provider.ts index 1af83676e93..832babd37ba 100644 --- a/extensions/slack/src/monitor/provider.ts +++ b/extensions/slack/src/monitor/provider.ts @@ -567,6 +567,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } finally { opts.abortSignal?.removeEventListener("abort", stopOnAbort); unregisterHttpHandler?.(); + handleSlackMessage.deactivate(); await app.stop().catch(() => undefined); } } diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 96726785db2..008f2ed74d3 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -110,7 +110,7 @@ export const registerTelegramHandlers = ({ processMessage, logger, telegramDeps = defaultTelegramBotDeps, -}: RegisterTelegramHandlerParams) => { +}: RegisterTelegramHandlerParams): { unregisterDebouncer: () => void } => { const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500; const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = @@ -1759,4 +1759,6 @@ export const registerTelegramHandlers = ({ errorMessage: "channel_post handler failed", }); }); + + return { unregisterDebouncer: inboundDebouncer.unregister }; }; diff --git a/extensions/telegram/src/bot.ts b/extensions/telegram/src/bot.ts index 11c394518c4..e198cc6e120 100644 --- a/extensions/telegram/src/bot.ts +++ b/extensions/telegram/src/bot.ts @@ -324,7 +324,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const MAX_RAW_UPDATE_ARRAY = 20; const stringifyUpdate = (update: unknown) => { const seen = new WeakSet(); - return JSON.stringify(update ?? null, (key, value) => { + return JSON.stringify(update ?? null, (_key, value) => { if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) { return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`; } @@ -531,7 +531,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { telegramDeps, }); - registerTelegramHandlers({ + const { unregisterDebouncer } = registerTelegramHandlers({ cfg, accountId: account.accountId, bot, @@ -552,6 +552,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const originalStop = bot.stop.bind(bot); bot.stop = ((...args: Parameters) => { + unregisterDebouncer(); threadBindingManager?.stop(); return originalStop(...args); }) as typeof bot.stop; diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index b19e37feb69..38c8adb7306 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -461,6 +461,7 @@ export async function monitorWebInbox(options: { return { close: async () => { try { + debouncer.unregister(); const ev = sock.ev as unknown as { off?: (event: string, listener: (...args: unknown[]) => void) => void; removeListener?: (event: string, listener: (...args: unknown[]) => void) => void; diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index debda7bc7b5..34c0d463185 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -1,5 +1,91 @@ import type { OpenClawConfig } from "../config/config.js"; import type { InboundDebounceByProvider } from "../config/types.messages.js"; +import { resolveGlobalMap } from "../shared/global-singleton.js"; + +/** + * Global registry of all active inbound debouncers so they can be flushed + * collectively during gateway restart (SIGUSR1). Each debouncer registers + * itself on creation and stays registered until a complete global flush + * drains it or the owner explicitly unregisters it during teardown. + */ +type DebouncerFlushResult = { + flushedCount: number; + drained: boolean; +}; + +type DebouncerFlushHandle = { + flushAll: (options?: { deadlineMs?: number }) => Promise; + unregister: () => void; + /** Epoch ms of last enqueue or creation, whichever is more recent. */ + lastActivityMs: number; +}; +const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); +const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); + +/** + * Clear the global debouncer registry. Intended for test cleanup only. + */ +export function clearInboundDebouncerRegistry(): void { + INBOUND_DEBOUNCERS.clear(); +} + +/** Debouncers idle longer than this are auto-removed during flush as a safety + * net against channels that forget to call unregister() on teardown. */ +const STALE_DEBOUNCER_MS = 5 * 60 * 1000; // 5 minutes + +/** + * Flush all registered inbound debouncers immediately. Called during SIGUSR1 + * restart to push buffered messages into the session before reinitializing. + * Returns the number of debounce buffers actually flushed so restart logic can + * skip followup draining when there was no buffered work. + * + * Stale debouncers (no enqueue activity for >5 minutes) are auto-evicted as a + * safety net in case a channel monitor forgot to call unregister() on teardown. + */ +export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise { + const entries = [...INBOUND_DEBOUNCERS.entries()]; + if (entries.length === 0) { + return 0; + } + const now = Date.now(); + const deadlineMs = + typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs) + ? now + Math.max(0, Math.trunc(options.timeoutMs)) + : undefined; + const flushedCounts = await Promise.all( + entries.map(async ([_key, handle]) => { + let result: DebouncerFlushResult; + try { + result = await (deadlineMs !== undefined + ? Promise.race([ + handle.flushAll({ deadlineMs }), + new Promise((resolve) => { + const timer = setTimeout( + () => resolve({ flushedCount: 0, drained: false }), + Math.max(0, deadlineMs - Date.now()), + ); + timer.unref?.(); + }), + ]) + : handle.flushAll({ deadlineMs })); + } catch { + // A hung or failing flushAll should not prevent other debouncers + // from being swept. Keep the handle registered for a future sweep. + return 0; + } + // Only deregister AFTER the handle confirms all its buffers are + // drained. If the deadline hit mid-sweep, keep partially-flushed + // handles registered so subsequent sweeps can finish the job. + // Also auto-evict stale entries whose owning channel never called + // unregister() (e.g. after reconnect). + if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) { + handle.unregister(); + } + return result.flushedCount; + }), + ); + return flushedCounts.reduce((total, count) => total + count, 0); +} const resolveMs = (value: unknown): number | undefined => { if (typeof value !== "number" || !Number.isFinite(value)) { @@ -60,6 +146,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return Math.max(0, Math.trunc(resolved)); }; + // Returns true when the buffer had pending messages that were delivered. const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { @@ -67,21 +154,24 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams buffer.timeout = null; } if (buffer.items.length === 0) { - return; + return false; } + let delivered = false; try { await params.onFlush(buffer.items); + delivered = true; } catch (err) { params.onError?.(err, buffer.items); } + return delivered; }; const flushKey = async (key: string) => { const buffer = buffers.get(key); if (!buffer) { - return; + return false; } - await flushBuffer(key, buffer); + return flushBuffer(key, buffer); }; const scheduleFlush = (key: string, buffer: DebounceBuffer) => { @@ -95,6 +185,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams }; const enqueue = async (item: T) => { + handle.lastActivityMs = Date.now(); const key = params.buildKey(item); const debounceMs = resolveDebounceMs(item); const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); @@ -119,10 +210,75 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return; } - const buffer: DebounceBuffer = { items: [item], timeout: null, debounceMs }; + const buffer: DebounceBuffer = { + items: [item], + timeout: null, + debounceMs, + }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; - return { enqueue, flushKey }; + const flushAllInternal = async (options?: { + deadlineMs?: number; + }): Promise => { + let flushedBufferCount = 0; + + // Keep sweeping until no debounced keys remain. A flush callback can race + // with late in-flight ingress and create another buffered key before the + // global registry deregisters this debouncer during restart. + while (buffers.size > 0) { + if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; + } + const keys = [...buffers.keys()]; + for (const key of keys) { + if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; + } + if (!buffers.has(key)) { + continue; + } + try { + const hadMessages = await flushKey(key); + if (hadMessages) { + flushedBufferCount += 1; + } + } catch { + // flushBuffer already routed the failure through onError; keep + // sweeping so one bad key cannot strand later buffered messages. + } + } + } + + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; + }; + + const flushAll = async (options?: { deadlineMs?: number }) => { + const result = await flushAllInternal(options); + return result.flushedCount; + }; + + // Register in global registry for SIGUSR1 flush. + const registryKey = Symbol(); + const unregister = () => { + INBOUND_DEBOUNCERS.delete(registryKey); + }; + const handle: DebouncerFlushHandle = { + flushAll: flushAllInternal, + unregister, + lastActivityMs: Date.now(), + }; + INBOUND_DEBOUNCERS.set(registryKey, handle); + + return { enqueue, flushKey, flushAll, unregister }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 77ff61e814e..a724036d007 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -1,10 +1,14 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import type { GroupKeyResolution } from "../config/sessions.js"; -import { createInboundDebouncer } from "./inbound-debounce.js"; +import { + clearInboundDebouncerRegistry, + createInboundDebouncer, + flushAllInboundDebouncers, +} from "./inbound-debounce.js"; import { resolveGroupRequireMention } from "./reply/groups.js"; import { finalizeInboundContext } from "./reply/inbound-context.js"; import { @@ -308,7 +312,11 @@ describe("createInboundDebouncer", () => { vi.useFakeTimers(); const calls: Array = []; - const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({ + const debouncer = createInboundDebouncer<{ + key: string; + id: string; + debounce: boolean; + }>({ debounceMs: 50, buildKey: (item) => item.key, shouldDebounce: (item) => item.debounce, @@ -329,7 +337,11 @@ describe("createInboundDebouncer", () => { vi.useFakeTimers(); const calls: Array = []; - const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({ + const debouncer = createInboundDebouncer<{ + key: string; + id: string; + windowMs: number; + }>({ debounceMs: 0, buildKey: (item) => item.key, resolveDebounceMs: (item) => item.windowMs, @@ -349,6 +361,383 @@ describe("createInboundDebouncer", () => { }); }); +describe("flushAllInboundDebouncers", () => { + // Clear registry before each test to avoid leaking state from other tests + // that create debouncers. + beforeEach(() => { + clearInboundDebouncerRegistry(); + }); + + afterEach(() => { + clearInboundDebouncerRegistry(); + }); + + it("flushes all pending inbound debounce buffers immediately", async () => { + vi.useFakeTimers(); + const callsA: Array = []; + const callsB: Array = []; + + const debouncerA = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + callsA.push(items.map((entry) => entry.id)); + }, + }); + + const debouncerB = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + callsB.push(items.map((entry) => entry.id)); + }, + }); + + await debouncerA.enqueue({ key: "session-1", id: "msg-1" }); + await debouncerA.enqueue({ key: "session-1", id: "msg-2" }); + await debouncerB.enqueue({ key: "session-2", id: "msg-3" }); + + // Nothing flushed yet (timers haven't fired) + expect(callsA).toEqual([]); + expect(callsB).toEqual([]); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(callsA).toEqual([["msg-1", "msg-2"]]); + expect(callsB).toEqual([["msg-3"]]); + + vi.useRealTimers(); + }); + + it("counts pending buffers instead of registered debouncers", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const activeDebouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + await activeDebouncer.enqueue({ key: "session-1", id: "msg-1" }); + await activeDebouncer.enqueue({ key: "session-2", id: "msg-2" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(calls).toHaveLength(2); + expect(calls).toContainEqual(["msg-1"]); + expect(calls).toContainEqual(["msg-2"]); + + vi.useRealTimers(); + }); + + it("counts only buffers that were delivered successfully", async () => { + vi.useFakeTimers(); + const calls: Array = []; + const errors: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id); + if (ids.includes("msg-1")) { + throw new Error("dispatch failed"); + } + calls.push(ids); + }, + onError: (_err, items) => { + errors.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(1); + expect(calls).toEqual([["msg-2"]]); + expect(errors).toEqual([["msg-1"]]); + + vi.useRealTimers(); + }); + + it("keeps flushing until no buffered keys remain", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let enqueuedDuringFlush = false; + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (!enqueuedDuringFlush) { + enqueuedDuringFlush = true; + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + } + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(calls).toEqual([["msg-1"], ["msg-2"]]); + await expect(flushAllInboundDebouncers()).resolves.toBe(0); + + vi.useRealTimers(); + }); + + it("keeps timed-out debouncers registered for a later global sweep", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let now = 0; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (items[0]?.id === "msg-1") { + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + now = 20; + } + }, + }); + + try { + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + const flushed = await flushAllInboundDebouncers({ timeoutMs: 10 }); + expect(flushed).toBe(1); + expect(calls).toEqual([["msg-1"]]); + + now = 0; + const flushedLater = await flushAllInboundDebouncers({ timeoutMs: 10 }); + expect(flushedLater).toBe(1); + expect(calls).toEqual([["msg-1"], ["msg-2"]]); + } finally { + nowSpy.mockRestore(); + vi.useRealTimers(); + } + }); + + it("returns 0 when no debouncers are registered", async () => { + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(0); + }); + + it("lets callers unregister a debouncer from the global registry", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + debouncer.unregister(); + + expect(await flushAllInboundDebouncers()).toBe(0); + expect(calls).toEqual([]); + + await debouncer.flushAll(); + expect(calls).toEqual([["msg-1"]]); + + vi.useRealTimers(); + }); + + it("deregisters debouncers from global registry after flush", async () => { + vi.useFakeTimers(); + + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // First flush deregisters + await flushAllInboundDebouncers(); + + // Second flush should find nothing + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(0); + + vi.useRealTimers(); + }); + + it("auto-evicts stale debouncers idle >5 min even with a tight deadline", async () => { + vi.useFakeTimers(); + + // Use a debounceMs longer than the staleness window so the debounce + // timeout does NOT fire when we advance the clock. + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10 * 60 * 1000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // Advance past the 5-minute staleness window (debounce timer still pending) + vi.advanceTimersByTime(5 * 60 * 1000 + 1); + + // Flush with a zero-ms timeout. The deadline fires immediately so the + // debouncer cannot actually drain, but the staleness guard evicts it. + await flushAllInboundDebouncers({ timeoutMs: 0 }); + + // Second flush should find nothing — stale entry was auto-evicted + const flushed2 = await flushAllInboundDebouncers(); + expect(flushed2).toBe(0); + + vi.useRealTimers(); + }); + + it("does not evict debouncers that have recent activity", async () => { + vi.useFakeTimers(); + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10 * 60 * 1000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // Advance 4 minutes (below staleness threshold) + vi.advanceTimersByTime(4 * 60 * 1000); + + // Enqueue refreshes the activity timestamp + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + // Advance another 4 minutes (8 total since creation, 4 since enqueue) + vi.advanceTimersByTime(4 * 60 * 1000); + + // Flush with zero timeout — debouncer can't drain, but it's NOT stale + // (only 4 min since last enqueue). It should remain registered. + await flushAllInboundDebouncers({ timeoutMs: 0 }); + + // Debouncer should still be in the registry + // Do a full flush to verify it's still there + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(1); + + vi.useRealTimers(); + }); +}); + +describe("createInboundDebouncer flushAll", () => { + it("flushes all buffered keys", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "b", id: "2" }); + await debouncer.enqueue({ key: "a", id: "3" }); + + expect(calls).toEqual([]); + await debouncer.flushAll(); + + // Both keys flushed + expect(calls).toHaveLength(2); + expect(calls).toContainEqual(["1", "3"]); + expect(calls).toContainEqual(["2"]); + + vi.useRealTimers(); + }); + + it("continues flushing later keys when onError throws", async () => { + vi.useFakeTimers(); + const calls: Array = []; + const errors: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id); + if (ids.includes("2")) { + throw new Error("dispatch failed"); + } + calls.push(ids); + }, + onError: (_err, items) => { + errors.push(items.map((entry) => entry.id)); + throw new Error("onError failed"); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "b", id: "2" }); + await debouncer.enqueue({ key: "c", id: "3" }); + + const flushed = await debouncer.flushAll(); + + expect(flushed).toBe(2); + expect(calls).toContainEqual(["1"]); + expect(calls).toContainEqual(["3"]); + expect(errors).toEqual([["2"]]); + + vi.useRealTimers(); + }); + + it("stops sweeping when the global flush deadline is reached", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let now = 0; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (items[0]?.id === "1") { + await debouncer.enqueue({ key: "b", id: "2" }); + now = 20; + } + }, + }); + + try { + await debouncer.enqueue({ key: "a", id: "1" }); + + const flushed = await debouncer.flushAll({ deadlineMs: 10 }); + expect(flushed).toBe(1); + expect(calls).toEqual([["1"]]); + + now = 0; + const flushedLater = await debouncer.flushAll({ deadlineMs: 10 }); + expect(flushedLater).toBe(1); + expect(calls).toEqual([["1"], ["2"]]); + } finally { + nowSpy.mockRestore(); + vi.useRealTimers(); + } + }); +}); + describe("initSessionState BodyStripped", () => { it("prefers BodyForAgent over Body for group chats", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sender-meta-")); diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..e3bbfcd2761 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,7 +1,8 @@ -export { extractQueueDirective } from "./queue/directive.js"; -export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export { extractQueueDirective } from "./queue/directive.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; +export { waitForFollowupQueueDrain } from "./queue/drain-all.js"; export { enqueueFollowupRun, getFollowupQueueDepth, diff --git a/src/auto-reply/reply/queue/drain-all.test.ts b/src/auto-reply/reply/queue/drain-all.test.ts new file mode 100644 index 00000000000..eff31238def --- /dev/null +++ b/src/auto-reply/reply/queue/drain-all.test.ts @@ -0,0 +1,89 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { waitForFollowupQueueDrain } from "./drain-all.js"; +import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js"; + +function createMockQueue(overrides: Partial = {}): FollowupQueueState { + return { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: "followup", + debounceMs: 1000, + cap: 20, + dropPolicy: "summarize", + droppedCount: 0, + summaryLines: [], + ...overrides, + }; +} + +afterEach(() => { + FOLLOWUP_QUEUES.clear(); +}); + +describe("waitForFollowupQueueDrain", () => { + it("returns drained immediately when no queues exist", async () => { + const result = await waitForFollowupQueueDrain(1000); + expect(result).toEqual({ drained: true, remaining: 0 }); + }); + + it("returns drained immediately when all queues are empty", async () => { + FOLLOWUP_QUEUES.set("test", createMockQueue()); + const result = await waitForFollowupQueueDrain(1000); + expect(result).toEqual({ drained: true, remaining: 0 }); + }); + + it("waits until queues are drained", async () => { + const queue = createMockQueue({ + items: [ + { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() }, + ] as FollowupQueueState["items"], + draining: true, + }); + FOLLOWUP_QUEUES.set("test", queue); + + // Simulate drain completing after 100ms + setTimeout(() => { + queue.items.length = 0; + queue.draining = false; + FOLLOWUP_QUEUES.delete("test"); + }, 100); + + const result = await waitForFollowupQueueDrain(5000); + expect(result.drained).toBe(true); + expect(result.remaining).toBe(0); + }); + + it("returns not drained on timeout", async () => { + const queue = createMockQueue({ + items: [ + { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() }, + ] as FollowupQueueState["items"], + draining: true, + }); + FOLLOWUP_QUEUES.set("test", queue); + + const result = await waitForFollowupQueueDrain(100); + expect(result.drained).toBe(false); + expect(result.remaining).toBeGreaterThan(0); + }); + + it("counts draining queues as having pending items even with empty items array", async () => { + const queue = createMockQueue({ draining: true }); + FOLLOWUP_QUEUES.set("test", queue); + + // Queue has no items but is still draining — should wait + const result = await waitForFollowupQueueDrain(100); + expect(result.drained).toBe(false); + expect(result.remaining).toBeGreaterThanOrEqual(1); + }); + + it("reports each draining queue in the timeout remaining count", async () => { + FOLLOWUP_QUEUES.set("queue-1", createMockQueue({ draining: true })); + FOLLOWUP_QUEUES.set("queue-2", createMockQueue({ draining: true })); + FOLLOWUP_QUEUES.set("queue-3", createMockQueue({ draining: true })); + + const result = await waitForFollowupQueueDrain(1); + expect(result).toEqual({ drained: false, remaining: 3 }); + }); +}); diff --git a/src/auto-reply/reply/queue/drain-all.ts b/src/auto-reply/reply/queue/drain-all.ts new file mode 100644 index 00000000000..f681ed0f557 --- /dev/null +++ b/src/auto-reply/reply/queue/drain-all.ts @@ -0,0 +1,44 @@ +import { FOLLOWUP_QUEUES } from "./state.js"; + +/** + * Wait for all followup queues to finish draining, up to `timeoutMs`. + * Returns `{ drained: true }` if all queues are empty, or `{ drained: false }` + * if the timeout was reached with items still pending. + * + * Called during SIGUSR1 restart after flushing inbound debouncers, so the + * newly enqueued items have time to be processed before the server tears down. + */ +export async function waitForFollowupQueueDrain( + timeoutMs: number, +): Promise<{ drained: boolean; remaining: number }> { + const deadline = Date.now() + timeoutMs; + const POLL_INTERVAL_MS = 50; + + const getPendingCount = (): number => { + let total = 0; + for (const queue of FOLLOWUP_QUEUES.values()) { + // Add 1 for the in-flight item owned by an active drain loop. + const queuePending = queue.items.length + (queue.draining ? 1 : 0); + total += queuePending; + } + return total; + }; + + let remaining = getPendingCount(); + if (remaining === 0) { + return { drained: true, remaining: 0 }; + } + + while (Date.now() < deadline) { + await new Promise((resolve) => { + const timer = setTimeout(resolve, Math.min(POLL_INTERVAL_MS, deadline - Date.now())); + timer.unref?.(); + }); + remaining = getPendingCount(); + if (remaining === 0) { + return { drained: true, remaining: 0 }; + } + } + + return { drained: false, remaining }; +} diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index ce8fbccbe93..36db91ac0b5 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -19,16 +19,29 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason? })); const getActiveTaskCount = vi.fn(() => 0); const markGatewayDraining = vi.fn(); -const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ + drained: true, +})); const resetAllLanes = vi.fn(); const restartGatewayProcessWithFreshPid = vi.fn< - () => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string } + () => { + mode: "spawned" | "supervised" | "disabled" | "failed"; + pid?: number; + detail?: string; + } >(() => ({ mode: "disabled" })); const abortEmbeddedPiRun = vi.fn( (_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false, ); const getActiveEmbeddedRunCount = vi.fn(() => 0); -const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ + drained: true, +})); +const flushAllInboundDebouncers = vi.fn(async (_options?: { timeoutMs?: number }) => 0); +const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ + drained: true, + remaining: 0, +})); const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart"; const gatewayLog = { info: vi.fn(), @@ -36,6 +49,15 @@ const gatewayLog = { error: vi.fn(), }; +vi.mock("../../auto-reply/inbound-debounce.js", () => ({ + flushAllInboundDebouncers: (options?: { timeoutMs?: number }) => + flushAllInboundDebouncers(options), +})); + +vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({ + waitForFollowupQueueDrain: (timeoutMs: number) => waitForFollowupQueueDrain(timeoutMs), +})); + vi.mock("../../infra/gateway-lock.js", () => ({ acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts), })); @@ -268,10 +290,14 @@ describe("runGatewayLoop", () => { expect(start).toHaveBeenCalledTimes(2); await new Promise((resolve) => setImmediate(resolve)); - expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { + mode: "compacting", + }); expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000); - expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { + mode: "all", + }); expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); expect(closeFirst).toHaveBeenCalledWith({ @@ -325,6 +351,191 @@ describe("runGatewayLoop", () => { }); }); + it("flushes inbound debouncers before marking gateway draining on SIGUSR1", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(2); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); + expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); + // Flush debouncers BEFORE marking draining so flushed messages can enqueue + expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( + markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( + waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( + waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + + expect(gatewayLog.info).toHaveBeenCalledWith( + "flushed 2 pending inbound debounce buffer(s) before restart", + ); + expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained before restart"); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("extends the restart force-exit timer to include followup queue drain time", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(1); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toContain(95_000); + expect(forceExitCalls).toContain(100_000); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + }); + + it("always drains followup queue even when no debouncers had buffered messages", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(0); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); + // Followup queue drain is always called regardless of flushedCount + expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toEqual([95_000, 100_000]); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + }); + + it("logs warning when followup queue drain times out", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(1); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: false, + remaining: 3, + }); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(gatewayLog.warn).toHaveBeenCalledWith( + "followup queue drain timeout; 3 item(s) still pending", + ); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("re-arms the restart watchdog after a slow debounce flush", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + let now = 1000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + flushAllInboundDebouncers.mockImplementationOnce(async () => { + now += 20_000; + return 0; + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + // First arm: 1000 + 5000 + 90000 = 96000, delay = 96000 - 1000 = 95000 + // Second arm (after 20s flush): 21000 + 5000 + 90000 + 5000 = 121000, + // delay = 121000 - 21000 = 100000 + expect(forceExitCalls).toEqual([95_000, 100_000]); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + nowSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + } + }); + }); + it("releases the lock before exiting on spawned restart", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 23ec7dd584d..08f7b39119b 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,6 +3,8 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; +import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; +import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js"; @@ -97,6 +99,8 @@ export async function runGatewayLoop(params: { }; const DRAIN_TIMEOUT_MS = 90_000; + const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; + const INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS = 10_000; const SHUTDOWN_TIMEOUT_MS = 5_000; const request = (action: GatewayRunSignalAction, signal: string) => { @@ -108,24 +112,57 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - // Allow extra time for draining active turns on restart. - const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; - const forceExitTimer = setTimeout(() => { - gatewayLog.error("shutdown timed out; exiting without full cleanup"); - // Exit non-zero on restart timeout so launchd/systemd treats it as a - // failure and triggers a clean process restart instead of assuming the - // shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822) - exitProcess(isRestart ? 1 : 0); - }, forceExitMs); + const baseForceExitDeadlineMs = + Date.now() + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); + let forceExitTimer: ReturnType | null = null; + const armForceExitTimer = (deadlineMs: number) => { + if (forceExitTimer) { + clearTimeout(forceExitTimer); + } + forceExitTimer = setTimeout( + () => { + gatewayLog.error("shutdown timed out; exiting without full cleanup"); + // Exit non-zero on restart timeout so launchd/systemd treats it as a + // failure and triggers a clean process restart instead of assuming the + // shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822) + exitProcess(isRestart ? 1 : 0); + }, + Math.max(0, deadlineMs - Date.now()), + ); + forceExitTimer.unref?.(); + }; + armForceExitTimer(baseForceExitDeadlineMs); void (async () => { try { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { - // Reject new enqueues immediately during the drain window so - // sessions get an explicit restart error instead of silent task loss. + // Flush inbound debounce buffers BEFORE marking the gateway as + // draining so flushed messages can still enqueue into the command + // queue. This pushes any messages waiting in per-channel debounce + // timers (e.g. the 2500ms collect window) into the followup queues + // immediately, preventing silent message loss on reinit. + const flushedBuffers = await flushAllInboundDebouncers({ + timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS, + }); + if (flushedBuffers > 0) { + gatewayLog.info( + `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, + ); + } + + // Now reject new command-queue work so late arrivals fail explicitly + // instead of being stranded. This does not block followup queue + // enqueues, so already-flushed inbound work can still drain normally. markGatewayDraining(); + + // Start the restart watchdog budget after the pre-shutdown debounce + // flush so slow flush handlers do not steal time from active drain. + armForceExitTimer( + Date.now() + SHUTDOWN_TIMEOUT_MS + DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS, + ); + const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); @@ -156,6 +193,19 @@ export async function runGatewayLoop(params: { abortEmbeddedPiRun(undefined, { mode: "all" }); } } + + // Drain followup queues AFTER active tasks finish so tasks that + // produce followup work have a chance to enqueue before we wait. + // Always drain regardless of flushedCount — queued followups are + // not contingent on debouncers. + const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); + if (followupResult.drained) { + gatewayLog.info("followup queues drained before restart"); + } else { + gatewayLog.warn( + `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, + ); + } } await server?.close({ @@ -165,7 +215,9 @@ export async function runGatewayLoop(params: { } catch (err) { gatewayLog.error(`shutdown error: ${String(err)}`); } finally { - clearTimeout(forceExitTimer); + if (forceExitTimer) { + clearTimeout(forceExitTimer); + } server = null; if (isRestart) { await handleRestartAfterServerClose(); diff --git a/test/helpers/extensions/plugin-runtime-mock.ts b/test/helpers/extensions/plugin-runtime-mock.ts index c0b73a6e15d..1a3715c6446 100644 --- a/test/helpers/extensions/plugin-runtime-mock.ts +++ b/test/helpers/extensions/plugin-runtime-mock.ts @@ -275,6 +275,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial = await params.onFlush([item]); }, flushKey: vi.fn(), + flushAll: vi.fn(async () => 0), + unregister: vi.fn(), }), ) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"], resolveInboundDebounceMs: vi.fn(