fix: only count debounce buffers with actual pending messages in flush count

flushBuffer/flushKey now return whether messages were actually flushed,
so flushAll only increments flushedBufferCount for non-empty buffers.
Prevents idle registered debouncers from triggering unnecessary followup
queue drain waits during SIGUSR1 restart. Also wraps per-key flush in
try/catch so one onError throw cannot strand later buffered messages.
This commit is contained in:
Joey Krug 2026-03-14 17:03:43 -04:00
parent 23731ed099
commit 063e06e17b
4 changed files with 51 additions and 7 deletions

View File

@ -13,7 +13,7 @@ type BlueBubblesDebounceEntry = {
export type BlueBubblesDebouncer = {
enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
flushKey: (key: string) => Promise<void>;
flushKey: (key: string) => Promise<boolean>;
};
export type BlueBubblesDebounceRegistry = {

View File

@ -711,7 +711,7 @@ describe("Feishu inbound debounce regressions", () => {
enqueueMock(item);
params.onError?.(new Error("dispatch failed"), [item]);
},
flushKey: async (_key: string) => {},
flushKey: async (_key: string) => false,
flushAll: async () => 0,
}),
resolveInboundDebounceMs,

View File

@ -102,6 +102,7 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return Math.max(0, Math.trunc(resolved));
};
// Returns true when the buffer had pending messages that were flushed.
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
buffers.delete(key);
if (buffer.timeout) {
@ -109,21 +110,22 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
buffer.timeout = null;
}
if (buffer.items.length === 0) {
return;
return false;
}
try {
await params.onFlush(buffer.items);
} catch (err) {
params.onError?.(err, buffer.items);
}
return true;
};
const flushKey = async (key: string) => {
const buffer = buffers.get(key);
if (!buffer) {
return;
return false;
}
await flushBuffer(key, buffer);
return flushBuffer(key, buffer);
};
const scheduleFlush = (key: string, buffer: DebounceBuffer<T>) => {
@ -182,8 +184,15 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
if (!buffers.has(key)) {
continue;
}
await flushKey(key);
flushedBufferCount += 1;
try {
const hadMessages = await flushKey(key);
if (hadMessages) {
flushedBufferCount += 1;
}
} catch {
// flushBuffer already routed the failure through onError; keep
// sweeping so one bad key cannot strand later buffered messages.
}
}
}

View File

@ -519,6 +519,41 @@ describe("createInboundDebouncer flushAll", () => {
vi.useRealTimers();
});
it("continues flushing later keys when onError throws", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const errors: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
const ids = items.map((entry) => entry.id);
if (ids.includes("2")) {
throw new Error("dispatch failed");
}
calls.push(ids);
},
onError: (_err, items) => {
errors.push(items.map((entry) => entry.id));
throw new Error("onError failed");
},
});
await debouncer.enqueue({ key: "a", id: "1" });
await debouncer.enqueue({ key: "b", id: "2" });
await debouncer.enqueue({ key: "c", id: "3" });
const flushed = await debouncer.flushAll();
expect(flushed).toBe(2);
expect(calls).toContainEqual(["1"]);
expect(calls).toContainEqual(["3"]);
expect(errors).toEqual([["2"]]);
vi.useRealTimers();
});
});
describe("initSessionState BodyStripped", () => {