From de53e272157d934dcb37c47f694f1c92c77a8d36 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 21:47:50 -0400 Subject: [PATCH] Auto-reply: harden inbound debounce restart flushing --- .../bluebubbles/src/monitor-debounce.ts | 2 + .../discord/src/monitor/message-handler.ts | 5 +- .../feishu/src/monitor.reaction.test.ts | 1 + extensions/feishu/src/monitor.test-mocks.ts | 2 + .../mattermost/src/mattermost/monitor.ts | 1 + .../src/monitor-handler.file-consent.test.ts | 3 + .../message-handler.authz.test.ts | 10 +- .../slack/src/monitor/message-handler.test.ts | 2 + .../slack/src/monitor/message-handler.ts | 14 ++- extensions/slack/src/monitor/provider.ts | 1 + extensions/whatsapp/src/inbound/monitor.ts | 1 + src/auto-reply/inbound-debounce.ts | 59 +++++++++--- src/auto-reply/inbound.test.ts | 91 +++++++++++++++++++ .../helpers/extensions/plugin-runtime-mock.ts | 2 + 14 files changed, 175 insertions(+), 19 deletions(-) diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts index 8eb1a0f112c..3896cbe4912 100644 --- a/extensions/bluebubbles/src/monitor-debounce.ts +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -15,6 +15,7 @@ export type BlueBubblesDebouncer = { enqueue: (item: BlueBubblesDebounceEntry) => Promise; flushKey: (key: string) => Promise; flushAll: () => Promise; + unregister: () => void; }; export type BlueBubblesDebounceRegistry = { @@ -200,6 +201,7 @@ export function createBlueBubblesDebounceRegistry(params: { return debouncer; }, removeDebouncer: (target) => { + targetDebouncers.get(target)?.unregister(); targetDebouncers.delete(target); }, }; diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index e17dcc906af..fdb41c677a8 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -180,7 +180,10 @@ export function createDiscordMessageHandler( } }; - handler.deactivate = inboundWorker.deactivate; + handler.deactivate = () => { + debouncer.unregister(); + inboundWorker.deactivate(); + }; return handler; } diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 9e607b17c84..4ac0a504cb5 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -713,6 +713,7 @@ describe("Feishu inbound debounce regressions", () => { }, flushKey: async (_key: string) => false, flushAll: async () => 0, + unregister: () => {}, }), resolveInboundDebounceMs, }, diff --git a/extensions/feishu/src/monitor.test-mocks.ts b/extensions/feishu/src/monitor.test-mocks.ts index fd984106988..de287c14277 100644 --- a/extensions/feishu/src/monitor.test-mocks.ts +++ b/extensions/feishu/src/monitor.test-mocks.ts @@ -19,6 +19,7 @@ export function createFeishuRuntimeMockModule(): { enqueue: () => Promise; flushKey: () => Promise; flushAll: () => Promise; + unregister: () => void; }; }; text: { @@ -36,6 +37,7 @@ export function createFeishuRuntimeMockModule(): { enqueue: async () => {}, flushKey: async () => false, flushAll: async () => 0, + unregister: () => {}, }), }, text: { diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 958a40de705..44fcfebe46a 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1686,6 +1686,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }); } finally { + debouncer.unregister(); unregisterInteractions?.(); } diff --git a/extensions/msteams/src/monitor-handler.file-consent.test.ts b/extensions/msteams/src/monitor-handler.file-consent.test.ts index 39b6ea1b1ff..93ad1634c4d 100644 --- a/extensions/msteams/src/monitor-handler.file-consent.test.ts +++ b/extensions/msteams/src/monitor-handler.file-consent.test.ts @@ -33,6 +33,9 @@ const runtimeStub: PluginRuntime = { resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, + flushKey: async () => false, + flushAll: async () => 0, + unregister: () => {}, }), }, }, diff --git a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts index 68295e9bb07..96b8d85a790 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts @@ -14,10 +14,18 @@ describe("msteams monitor handler authz", () => { resolveInboundDebounceMs: () => 0, createInboundDebouncer: (params: { onFlush: (entries: T[]) => Promise; - }): { enqueue: (entry: T) => Promise } => ({ + }): { + enqueue: (entry: T) => Promise; + flushKey: (_key: string) => Promise; + flushAll: () => Promise; + unregister: () => void; + } => ({ enqueue: async (entry: T) => { await params.onFlush([entry]); }, + flushKey: async (_key: string) => false, + flushAll: async () => 0, + unregister: () => {}, }), }, pairing: { diff --git a/extensions/slack/src/monitor/message-handler.test.ts b/extensions/slack/src/monitor/message-handler.test.ts index cfea959f4d0..8ccc70bcd13 100644 --- a/extensions/slack/src/monitor/message-handler.test.ts +++ b/extensions/slack/src/monitor/message-handler.test.ts @@ -12,6 +12,8 @@ vi.mock("../../../../src/auto-reply/inbound-debounce.js", () => ({ createInboundDebouncer: () => ({ enqueue: (entry: unknown) => enqueueMock(entry), flushKey: (key: string) => flushKeyMock(key), + flushAll: async () => 0, + unregister: () => {}, }), })); diff --git a/extensions/slack/src/monitor/message-handler.ts b/extensions/slack/src/monitor/message-handler.ts index fb700b78350..5f43ed9aa58 100644 --- a/extensions/slack/src/monitor/message-handler.ts +++ b/extensions/slack/src/monitor/message-handler.ts @@ -15,6 +15,10 @@ export type SlackMessageHandler = ( opts: { source: "message" | "app_mention"; wasMentioned?: boolean }, ) => Promise; +export type SlackMessageHandlerWithLifecycle = SlackMessageHandler & { + deactivate: () => void; +}; + const APP_MENTION_RETRY_TTL_MS = 60_000; function resolveSlackSenderId(message: SlackMessageEvent): string | null { @@ -92,7 +96,7 @@ export function createSlackMessageHandler(params: { account: ResolvedSlackAccount; /** Called on each inbound event to update liveness tracking. */ trackEvent?: () => void; -}): SlackMessageHandler { +}): SlackMessageHandlerWithLifecycle { const { ctx, account, trackEvent } = params; const { debounceMs, debouncer } = createChannelInboundDebouncer<{ message: SlackMessageEvent; @@ -206,7 +210,7 @@ export function createSlackMessageHandler(params: { return true; }; - return async (message, opts) => { + const handler: SlackMessageHandlerWithLifecycle = async (message, opts) => { if (opts.source === "message" && message.type !== "message") { return; } @@ -253,4 +257,10 @@ export function createSlackMessageHandler(params: { } await debouncer.enqueue({ message: resolvedMessage, opts }); }; + + handler.deactivate = () => { + debouncer.unregister(); + }; + + return handler; } diff --git a/extensions/slack/src/monitor/provider.ts b/extensions/slack/src/monitor/provider.ts index 1af83676e93..832babd37ba 100644 --- a/extensions/slack/src/monitor/provider.ts +++ b/extensions/slack/src/monitor/provider.ts @@ -567,6 +567,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } finally { opts.abortSignal?.removeEventListener("abort", stopOnAbort); unregisterHttpHandler?.(); + handleSlackMessage.deactivate(); await app.stop().catch(() => undefined); } } diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index b19e37feb69..38c8adb7306 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -461,6 +461,7 @@ export async function monitorWebInbox(options: { return { close: async () => { try { + debouncer.unregister(); const ev = sock.ev as unknown as { off?: (event: string, listener: (...args: unknown[]) => void) => void; removeListener?: (event: string, listener: (...args: unknown[]) => void) => void; diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index abbd3df9694..c5ab68a58d7 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -5,10 +5,17 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; /** * Global registry of all active inbound debouncers so they can be flushed * collectively during gateway restart (SIGUSR1). Each debouncer registers - * itself on creation and deregisters after the next global flush sweep. + * itself on creation and stays registered until a complete global flush + * drains it or the owner explicitly unregisters it during teardown. */ +type DebouncerFlushResult = { + flushedCount: number; + drained: boolean; +}; + type DebouncerFlushHandle = { - flushAll: (options?: { deadlineMs?: number }) => Promise; + flushAll: (options?: { deadlineMs?: number }) => Promise; + unregister: () => void; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -36,12 +43,12 @@ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number } ? Date.now() + Math.max(0, Math.trunc(options.timeoutMs)) : undefined; const flushedCounts = await Promise.all( - entries.map(async ([key, handle]) => { - try { - return await handle.flushAll({ deadlineMs }); - } finally { - INBOUND_DEBOUNCERS.delete(key); + entries.map(async ([_key, handle]) => { + const result = await handle.flushAll({ deadlineMs }); + if (result.drained) { + handle.unregister(); } + return result.flushedCount; }), ); return flushedCounts.reduce((total, count) => total + count, 0); @@ -106,7 +113,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return Math.max(0, Math.trunc(resolved)); }; - // Returns true when the buffer had pending messages that were flushed. + // Returns true when the buffer had pending messages that were delivered. const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { @@ -116,12 +123,14 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams if (buffer.items.length === 0) { return false; } + let delivered = false; try { await params.onFlush(buffer.items); + delivered = true; } catch (err) { params.onError?.(err, buffer.items); } - return true; + return delivered; }; const flushKey = async (key: string) => { @@ -176,7 +185,9 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams scheduleFlush(key, buffer); }; - const flushAll = async (options?: { deadlineMs?: number }) => { + const flushAllInternal = async (options?: { + deadlineMs?: number; + }): Promise => { let flushedBufferCount = 0; // Keep sweeping until no debounced keys remain. A flush callback can race @@ -184,12 +195,18 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams // global registry deregisters this debouncer during restart. while (buffers.size > 0) { if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { - break; + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; } const keys = [...buffers.keys()]; for (const key of keys) { if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { - return flushedBufferCount; + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; } if (!buffers.has(key)) { continue; @@ -206,14 +223,26 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams } } - return flushedBufferCount; + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; + }; + + const flushAll = async (options?: { deadlineMs?: number }) => { + const result = await flushAllInternal(options); + return result.flushedCount; }; // Register in global registry for SIGUSR1 flush. const registryKey = Symbol(); + const unregister = () => { + INBOUND_DEBOUNCERS.delete(registryKey); + }; INBOUND_DEBOUNCERS.set(registryKey, { - flushAll, + flushAll: flushAllInternal, + unregister, }); - return { enqueue, flushKey, flushAll }; + return { enqueue, flushKey, flushAll, unregister }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 7becad2cedf..ef1880f4784 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -439,6 +439,37 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("counts only buffers that were delivered successfully", async () => { + vi.useFakeTimers(); + const calls: Array = []; + const errors: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id); + if (ids.includes("msg-1")) { + throw new Error("dispatch failed"); + } + calls.push(ids); + }, + onError: (_err, items) => { + errors.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(1); + expect(calls).toEqual([["msg-2"]]); + expect(errors).toEqual([["msg-1"]]); + + vi.useRealTimers(); + }); + it("keeps flushing until no buffered keys remain", async () => { vi.useFakeTimers(); const calls: Array = []; @@ -467,11 +498,71 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("keeps timed-out debouncers registered for a later global sweep", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let now = 0; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (items[0]?.id === "msg-1") { + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + now = 20; + } + }, + }); + + try { + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + const flushed = await flushAllInboundDebouncers({ timeoutMs: 10 }); + expect(flushed).toBe(1); + expect(calls).toEqual([["msg-1"]]); + + now = 0; + const flushedLater = await flushAllInboundDebouncers({ timeoutMs: 10 }); + expect(flushedLater).toBe(1); + expect(calls).toEqual([["msg-1"], ["msg-2"]]); + } finally { + nowSpy.mockRestore(); + vi.useRealTimers(); + } + }); + it("returns 0 when no debouncers are registered", async () => { const flushed = await flushAllInboundDebouncers(); expect(flushed).toBe(0); }); + it("lets callers unregister a debouncer from the global registry", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + debouncer.unregister(); + + expect(await flushAllInboundDebouncers()).toBe(0); + expect(calls).toEqual([]); + + await debouncer.flushAll(); + expect(calls).toEqual([["msg-1"]]); + + vi.useRealTimers(); + }); + it("deregisters debouncers from global registry after flush", async () => { vi.useFakeTimers(); diff --git a/test/helpers/extensions/plugin-runtime-mock.ts b/test/helpers/extensions/plugin-runtime-mock.ts index c0b73a6e15d..1a3715c6446 100644 --- a/test/helpers/extensions/plugin-runtime-mock.ts +++ b/test/helpers/extensions/plugin-runtime-mock.ts @@ -275,6 +275,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial = await params.onFlush([item]); }, flushKey: vi.fn(), + flushAll: vi.fn(async () => 0), + unregister: vi.fn(), }), ) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"], resolveInboundDebounceMs: vi.fn(