From 9b19b945d74b3742fba36bc63c7752280677b90f Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 15:16:48 -0400 Subject: [PATCH] fix: address queue drain review feedback --- .../feishu/src/monitor.reaction.test.ts | 3 +- src/auto-reply/inbound-debounce.ts | 22 +++++++--- src/auto-reply/inbound.test.ts | 32 +++++++++++++- src/auto-reply/reply/queue/drain-all.test.ts | 9 ++++ src/auto-reply/reply/queue/drain-all.ts | 10 ++--- src/cli/gateway-cli/run-loop.test.ts | 42 ++++++++++++++++++- src/cli/gateway-cli/run-loop.ts | 14 ++++--- 7 files changed, 111 insertions(+), 21 deletions(-) diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 5765577441f..06efee0661a 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -711,7 +711,8 @@ describe("Feishu inbound debounce regressions", () => { enqueueMock(item); params.onError?.(new Error("dispatch failed"), [item]); }, - flushKey: async () => {}, + flushKey: async (_key: string) => {}, + flushAll: async () => {}, }), resolveInboundDebounceMs, }, diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 96be2d94b1b..77eb234847a 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -5,9 +5,12 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; /** * Global registry of all active inbound debouncers so they can be flushed * collectively during gateway restart (SIGUSR1). Each debouncer registers - * itself on creation and deregisters when flushAll is called. + * itself on creation and deregisters after the next global flush sweep. */ -type DebouncerFlushHandle = { flushAll: () => Promise }; +type DebouncerFlushHandle = { + getPendingBufferCount: () => number; + flushAll: () => Promise; +}; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -21,24 +24,28 @@ export function clearInboundDebouncerRegistry(): void { /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. + * Returns the number of pending debounce buffers that existed when the flush + * started so restart logic can skip followup draining when there was no work. */ export async function flushAllInboundDebouncers(): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } - let flushedCount = 0; + const pendingBufferCount = entries.reduce( + (count, [, handle]) => count + handle.getPendingBufferCount(), + 0, + ); await Promise.all( entries.map(async ([key, handle]) => { try { await handle.flushAll(); - flushedCount += 1; } finally { INBOUND_DEBOUNCERS.delete(key); } }), ); - return flushedCount; + return pendingBufferCount; } const resolveMs = (value: unknown): number | undefined => { @@ -177,7 +184,10 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams // Register in global registry for SIGUSR1 flush. const registryKey = Symbol(); - INBOUND_DEBOUNCERS.set(registryKey, { flushAll }); + INBOUND_DEBOUNCERS.set(registryKey, { + getPendingBufferCount: () => buffers.size, + flushAll, + }); return { enqueue, flushKey, flushAll }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index c43fa28ce4b..0075d758bc6 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -372,7 +372,7 @@ describe("flushAllInboundDebouncers", () => { clearInboundDebouncerRegistry(); }); - it("flushes all registered debouncers immediately", async () => { + it("flushes all pending inbound debounce buffers immediately", async () => { vi.useFakeTimers(); const callsA: Array = []; const callsB: Array = []; @@ -409,6 +409,36 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("counts pending buffers instead of registered debouncers", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const activeDebouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + await activeDebouncer.enqueue({ key: "session-1", id: "msg-1" }); + await activeDebouncer.enqueue({ key: "session-2", id: "msg-2" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(calls).toHaveLength(2); + expect(calls).toContainEqual(["msg-1"]); + expect(calls).toContainEqual(["msg-2"]); + + vi.useRealTimers(); + }); + it("returns 0 when no debouncers are registered", async () => { const flushed = await flushAllInboundDebouncers(); expect(flushed).toBe(0); diff --git a/src/auto-reply/reply/queue/drain-all.test.ts b/src/auto-reply/reply/queue/drain-all.test.ts index ad166e07b0a..eff31238def 100644 --- a/src/auto-reply/reply/queue/drain-all.test.ts +++ b/src/auto-reply/reply/queue/drain-all.test.ts @@ -77,4 +77,13 @@ describe("waitForFollowupQueueDrain", () => { expect(result.drained).toBe(false); expect(result.remaining).toBeGreaterThanOrEqual(1); }); + + it("reports each draining queue in the timeout remaining count", async () => { + FOLLOWUP_QUEUES.set("queue-1", createMockQueue({ draining: true })); + FOLLOWUP_QUEUES.set("queue-2", createMockQueue({ draining: true })); + FOLLOWUP_QUEUES.set("queue-3", createMockQueue({ draining: true })); + + const result = await waitForFollowupQueueDrain(1); + expect(result).toEqual({ drained: false, remaining: 3 }); + }); }); diff --git a/src/auto-reply/reply/queue/drain-all.ts b/src/auto-reply/reply/queue/drain-all.ts index a0e4c2e4f5a..f681ed0f557 100644 --- a/src/auto-reply/reply/queue/drain-all.ts +++ b/src/auto-reply/reply/queue/drain-all.ts @@ -17,13 +17,9 @@ export async function waitForFollowupQueueDrain( const getPendingCount = (): number => { let total = 0; for (const queue of FOLLOWUP_QUEUES.values()) { - total += queue.items.length; - if (queue.draining) { - // Count draining queues as having at least 1 pending item so we keep - // waiting for the drain loop to finish even if items.length hits 0 - // momentarily between shifts. - total = Math.max(total, 1); - } + // Add 1 for the in-flight item owned by an active drain loop. + const queuePending = queue.items.length + (queue.draining ? 1 : 0); + total += queuePending; } return total; }; diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 49c1c4734d1..dd9f80db8e3 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -375,9 +375,17 @@ describe("runGatewayLoop", () => { expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( + waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(waitForFollowupQueueDrain.mock.invocationCallOrder[0]).toBeLessThan( + markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); // Verify logging - expect(gatewayLog.info).toHaveBeenCalledWith("flushed 2 inbound debouncer(s) before restart"); + expect(gatewayLog.info).toHaveBeenCalledWith( + "flushed 2 pending inbound debounce buffer(s) before restart", + ); expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush"); sigterm(); @@ -385,6 +393,38 @@ describe("runGatewayLoop", () => { }); }); + it("extends the restart force-exit timer to include followup queue drain time", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(1); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 100_000); + expect(forceExitCall).toBeDefined(); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + }); + it("skips followup queue drain when no debouncers had buffered messages", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 27332d540e3..85bf389cb1a 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -99,6 +99,7 @@ export async function runGatewayLoop(params: { }; const DRAIN_TIMEOUT_MS = 90_000; + const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; const SHUTDOWN_TIMEOUT_MS = 5_000; const request = (action: GatewayRunSignalAction, signal: string) => { @@ -111,7 +112,9 @@ export async function runGatewayLoop(params: { gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); // Allow extra time for draining active turns on restart. - const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; + const forceExitMs = isRestart + ? DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS + : SHUTDOWN_TIMEOUT_MS; const forceExitTimer = setTimeout(() => { gatewayLog.error("shutdown timed out; exiting without full cleanup"); // Exit non-zero on restart timeout so launchd/systemd treats it as a @@ -129,12 +132,13 @@ export async function runGatewayLoop(params: { // waiting in per-channel debounce timers (e.g. the 2500ms collect // window) into the followup queues immediately, preventing silent // message loss when the server reinitializes. - const flushedDebouncers = await flushAllInboundDebouncers(); - if (flushedDebouncers > 0) { - gatewayLog.info(`flushed ${flushedDebouncers} inbound debouncer(s) before restart`); + const flushedBuffers = await flushAllInboundDebouncers(); + if (flushedBuffers > 0) { + gatewayLog.info( + `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, + ); // Give the followup queue drain loops a short window to process // the newly flushed items before we mark the gateway as draining. - const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); if (followupResult.drained) { gatewayLog.info("followup queues drained after debounce flush");