From 063e06e17b8f79f55eb6f43fd0f4e693ad77ca96 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 17:03:43 -0400 Subject: [PATCH] fix: only count debounce buffers with actual pending messages in flush count flushBuffer/flushKey now return whether messages were actually flushed, so flushAll only increments flushedBufferCount for non-empty buffers. Prevents idle registered debouncers from triggering unnecessary followup queue drain waits during SIGUSR1 restart. Also wraps per-key flush in try/catch so one onError throw cannot strand later buffered messages. --- .../bluebubbles/src/monitor-debounce.ts | 2 +- .../feishu/src/monitor.reaction.test.ts | 2 +- src/auto-reply/inbound-debounce.ts | 19 +++++++--- src/auto-reply/inbound.test.ts | 35 +++++++++++++++++++ 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts index 298be3e4921..79101bc3fb1 100644 --- a/extensions/bluebubbles/src/monitor-debounce.ts +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -13,7 +13,7 @@ type BlueBubblesDebounceEntry = { export type BlueBubblesDebouncer = { enqueue: (item: BlueBubblesDebounceEntry) => Promise; - flushKey: (key: string) => Promise; + flushKey: (key: string) => Promise; }; export type BlueBubblesDebounceRegistry = { diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index b9fecb8050c..9e607b17c84 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -711,7 +711,7 @@ describe("Feishu inbound debounce regressions", () => { enqueueMock(item); params.onError?.(new Error("dispatch failed"), [item]); }, - flushKey: async (_key: string) => {}, + flushKey: async (_key: string) => false, flushAll: async () => 0, }), resolveInboundDebounceMs, diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index c74cb2f5992..c5e6b53520c 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -102,6 +102,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return Math.max(0, Math.trunc(resolved)); }; + // Returns true when the buffer had pending messages that were flushed. const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { @@ -109,21 +110,22 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams buffer.timeout = null; } if (buffer.items.length === 0) { - return; + return false; } try { await params.onFlush(buffer.items); } catch (err) { params.onError?.(err, buffer.items); } + return true; }; const flushKey = async (key: string) => { const buffer = buffers.get(key); if (!buffer) { - return; + return false; } - await flushBuffer(key, buffer); + return flushBuffer(key, buffer); }; const scheduleFlush = (key: string, buffer: DebounceBuffer) => { @@ -182,8 +184,15 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams if (!buffers.has(key)) { continue; } - await flushKey(key); - flushedBufferCount += 1; + try { + const hadMessages = await flushKey(key); + if (hadMessages) { + flushedBufferCount += 1; + } + } catch { + // flushBuffer already routed the failure through onError; keep + // sweeping so one bad key cannot strand later buffered messages. + } } } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 54ea66ae1d8..47fde4c9920 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -519,6 +519,41 @@ describe("createInboundDebouncer flushAll", () => { vi.useRealTimers(); }); + + it("continues flushing later keys when onError throws", async () => { + vi.useFakeTimers(); + const calls: Array = []; + const errors: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id); + if (ids.includes("2")) { + throw new Error("dispatch failed"); + } + calls.push(ids); + }, + onError: (_err, items) => { + errors.push(items.map((entry) => entry.id)); + throw new Error("onError failed"); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "b", id: "2" }); + await debouncer.enqueue({ key: "c", id: "3" }); + + const flushed = await debouncer.flushAll(); + + expect(flushed).toBe(2); + expect(calls).toContainEqual(["1"]); + expect(calls).toContainEqual(["3"]); + expect(errors).toEqual([["2"]]); + + vi.useRealTimers(); + }); }); describe("initSessionState BodyStripped", () => {