From 324c43931b34f805133172d56ed4f7c9dc4a28d8 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 18:06:22 -0400 Subject: [PATCH] fix: bound restart debounce draining --- src/auto-reply/inbound-debounce.ts | 18 ++++++++--- src/auto-reply/inbound.test.ts | 36 ++++++++++++++++++++++ src/cli/gateway-cli/run-loop.test.ts | 45 ++++++++++++++++++++++++++-- src/cli/gateway-cli/run-loop.ts | 20 +++++++++---- 4 files changed, 107 insertions(+), 12 deletions(-) diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index c5e6b53520c..abbd3df9694 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -8,7 +8,7 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; * itself on creation and deregisters after the next global flush sweep. */ type DebouncerFlushHandle = { - flushAll: () => Promise; + flushAll: (options?: { deadlineMs?: number }) => Promise; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -26,15 +26,19 @@ export function clearInboundDebouncerRegistry(): void { * Returns the number of debounce buffers actually flushed so restart logic can * skip followup draining when there was no buffered work. */ -export async function flushAllInboundDebouncers(): Promise { +export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } + const deadlineMs = + typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs) + ? Date.now() + Math.max(0, Math.trunc(options.timeoutMs)) + : undefined; const flushedCounts = await Promise.all( entries.map(async ([key, handle]) => { try { - return await handle.flushAll(); + return await handle.flushAll({ deadlineMs }); } finally { INBOUND_DEBOUNCERS.delete(key); } @@ -172,15 +176,21 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams scheduleFlush(key, buffer); }; - const flushAll = async () => { + const flushAll = async (options?: { deadlineMs?: number }) => { let flushedBufferCount = 0; // Keep sweeping until no debounced keys remain. A flush callback can race // with late in-flight ingress and create another buffered key before the // global registry deregisters this debouncer during restart. while (buffers.size > 0) { + if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { + break; + } const keys = [...buffers.keys()]; for (const key of keys) { + if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { + return flushedBufferCount; + } if (!buffers.has(key)) { continue; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 47fde4c9920..7becad2cedf 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -554,6 +554,42 @@ describe("createInboundDebouncer flushAll", () => { vi.useRealTimers(); }); + + it("stops sweeping when the global flush deadline is reached", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let now = 0; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (items[0]?.id === "1") { + await debouncer.enqueue({ key: "b", id: "2" }); + now = 20; + } + }, + }); + + try { + await debouncer.enqueue({ key: "a", id: "1" }); + + const flushed = await debouncer.flushAll({ deadlineMs: 10 }); + expect(flushed).toBe(1); + expect(calls).toEqual([["1"]]); + + now = 0; + const flushedLater = await debouncer.flushAll({ deadlineMs: 10 }); + expect(flushedLater).toBe(1); + expect(calls).toEqual([["1"], ["2"]]); + } finally { + nowSpy.mockRestore(); + vi.useRealTimers(); + } + }); }); describe("initSessionState BodyStripped", () => { diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 76a4e3066b4..66aba21375d 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -50,7 +50,8 @@ const gatewayLog = { }; vi.mock("../../auto-reply/inbound-debounce.js", () => ({ - flushAllInboundDebouncers: () => flushAllInboundDebouncers(), + flushAllInboundDebouncers: (options?: { timeoutMs?: number }) => + flushAllInboundDebouncers(options), })); vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({ @@ -371,6 +372,7 @@ describe("runGatewayLoop", () => { expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, @@ -417,7 +419,7 @@ describe("runGatewayLoop", () => { .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); expect(forceExitCalls).toContain(95_000); - expect(forceExitCalls.some((delay) => delay > 95_000 && delay <= 100_000)).toBe(true); + expect(forceExitCalls).toContain(100_000); sigterm(); await expect(exited).resolves.toBe(0); @@ -445,12 +447,13 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); expect(markGatewayDraining).toHaveBeenCalledTimes(1); const forceExitCalls = setTimeoutSpy.mock.calls .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); - expect(forceExitCalls).toEqual([95_000]); + expect(forceExitCalls).toEqual([95_000, 95_000]); sigterm(); await expect(exited).resolves.toBe(0); @@ -488,6 +491,42 @@ describe("runGatewayLoop", () => { }); }); + it("re-arms the restart watchdog after a slow debounce flush", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + let now = 1000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + flushAllInboundDebouncers.mockImplementationOnce(async () => { + now += 20_000; + return 0; + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toEqual([95_000, 95_000]); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + nowSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + } + }); + }); + it("releases the lock before exiting on spawned restart", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 338b85a9917..6167bc56500 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -100,6 +100,7 @@ export async function runGatewayLoop(params: { const DRAIN_TIMEOUT_MS = 90_000; const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; + const INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS = 10_000; const SHUTDOWN_TIMEOUT_MS = 5_000; const request = (action: GatewayRunSignalAction, signal: string) => { @@ -111,9 +112,8 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - const signalStartMs = Date.now(); const baseForceExitDeadlineMs = - signalStartMs + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); + Date.now() + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); let forceExitTimer: ReturnType | null = null; const armForceExitTimer = (deadlineMs: number) => { if (forceExitTimer) { @@ -140,21 +140,31 @@ export async function runGatewayLoop(params: { if (isRestart) { // Reject new command-queue work before any awaited restart drain // step so late arrivals fail explicitly instead of being stranded - // behind a one-shot debounce flush. + // behind a one-shot debounce flush. This does not block followup + // queue enqueues, so flushed inbound work can still drain normally. markGatewayDraining(); // Flush inbound debounce buffers first. This pushes any messages // waiting in per-channel debounce timers (e.g. the 2500ms collect // window) into the followup queues immediately, preventing silent // message loss when the server reinitializes. - const flushedBuffers = await flushAllInboundDebouncers(); + const flushedBuffers = await flushAllInboundDebouncers({ + timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS, + }); + // Start the restart watchdog budget after the pre-shutdown debounce + // flush so slow flush handlers do not steal time from active drain. + armForceExitTimer( + Date.now() + + SHUTDOWN_TIMEOUT_MS + + DRAIN_TIMEOUT_MS + + (flushedBuffers > 0 ? FOLLOWUP_DRAIN_TIMEOUT_MS : 0), + ); if (flushedBuffers > 0) { gatewayLog.info( `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, ); // Give the followup queue drain loops a short window to process // the newly flushed items before the server is torn down. - armForceExitTimer(baseForceExitDeadlineMs + FOLLOWUP_DRAIN_TIMEOUT_MS); const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); if (followupResult.drained) { gatewayLog.info("followup queues drained after debounce flush");