From 23731ed09913f99a0e666d77d31d022f982d1fd3 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 16:39:24 -0400 Subject: [PATCH] Gateway: finish drain-queue restart review fixes --- .../feishu/src/monitor.reaction.test.ts | 2 +- src/auto-reply/inbound-debounce.ts | 41 ++++++------- src/auto-reply/inbound.test.ts | 28 +++++++++ src/cli/gateway-cli/run-loop.test.ts | 39 ++++++------- src/cli/gateway-cli/run-loop.ts | 57 ++++++++++--------- 5 files changed, 99 insertions(+), 68 deletions(-) diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 06efee0661a..b9fecb8050c 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -712,7 +712,7 @@ describe("Feishu inbound debounce regressions", () => { params.onError?.(new Error("dispatch failed"), [item]); }, flushKey: async (_key: string) => {}, - flushAll: async () => {}, + flushAll: async () => 0, }), resolveInboundDebounceMs, }, diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 8c5ebe2a846..c74cb2f5992 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -8,8 +8,7 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; * itself on creation and deregisters after the next global flush sweep. */ type DebouncerFlushHandle = { - getPendingBufferCount: () => number; - flushAll: () => Promise; + flushAll: () => Promise; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -21,35 +20,27 @@ export function clearInboundDebouncerRegistry(): void { INBOUND_DEBOUNCERS.clear(); } -export function getPendingInboundDebounceBufferCount(): number { - return [...INBOUND_DEBOUNCERS.values()].reduce( - (count, handle) => count + handle.getPendingBufferCount(), - 0, - ); -} - /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. - * Returns the number of pending debounce buffers that existed when the flush - * started so restart logic can skip followup draining when there was no work. + * Returns the number of debounce buffers actually flushed so restart logic can + * skip followup draining when there was no buffered work. */ export async function flushAllInboundDebouncers(): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } - const pendingBufferCount = getPendingInboundDebounceBufferCount(); - await Promise.all( + const flushedCounts = await Promise.all( entries.map(async ([key, handle]) => { try { - await handle.flushAll(); + return await handle.flushAll(); } finally { INBOUND_DEBOUNCERS.delete(key); } }), ); - return pendingBufferCount; + return flushedCounts.reduce((total, count) => total + count, 0); } const resolveMs = (value: unknown): number | undefined => { @@ -180,16 +171,28 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams }; const flushAll = async () => { - const keys = [...buffers.keys()]; - for (const key of keys) { - await flushKey(key); + let flushedBufferCount = 0; + + // Keep sweeping until no debounced keys remain. A flush callback can race + // with late in-flight ingress and create another buffered key before the + // global registry deregisters this debouncer during restart. + while (buffers.size > 0) { + const keys = [...buffers.keys()]; + for (const key of keys) { + if (!buffers.has(key)) { + continue; + } + await flushKey(key); + flushedBufferCount += 1; + } } + + return flushedBufferCount; }; // Register in global registry for SIGUSR1 flush. const registryKey = Symbol(); INBOUND_DEBOUNCERS.set(registryKey, { - getPendingBufferCount: () => buffers.size, flushAll, }); diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 0075d758bc6..54ea66ae1d8 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -439,6 +439,34 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("keeps flushing until no buffered keys remain", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let enqueuedDuringFlush = false; + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (!enqueuedDuringFlush) { + enqueuedDuringFlush = true; + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + } + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(calls).toEqual([["msg-1"], ["msg-2"]]); + await expect(flushAllInboundDebouncers()).resolves.toBe(0); + + vi.useRealTimers(); + }); + it("returns 0 when no debouncers are registered", async () => { const flushed = await flushAllInboundDebouncers(); expect(flushed).toBe(0); diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 70c3e52e448..76a4e3066b4 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -37,7 +37,6 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0); const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true, })); -const getPendingInboundDebounceBufferCount = vi.fn(() => 0); const flushAllInboundDebouncers = vi.fn(async () => 0); const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ drained: true, @@ -51,7 +50,6 @@ const gatewayLog = { }; vi.mock("../../auto-reply/inbound-debounce.js", () => ({ - getPendingInboundDebounceBufferCount: () => getPendingInboundDebounceBufferCount(), flushAllInboundDebouncers: () => flushAllInboundDebouncers(), })); @@ -352,12 +350,10 @@ describe("runGatewayLoop", () => { }); }); - it("flushes inbound debouncers and waits for followup queue drain before marking gateway draining on SIGUSR1", async () => { + it("marks gateway draining before flushing inbound debouncers on SIGUSR1", async () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - // Simulate debouncers having buffered messages - getPendingInboundDebounceBufferCount.mockReturnValueOnce(2); flushAllInboundDebouncers.mockResolvedValueOnce(2); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -370,22 +366,22 @@ describe("runGatewayLoop", () => { sigusr1(); - // Let the restart complete await new Promise((resolve) => setImmediate(resolve)); await new Promise((resolve) => setImmediate(resolve)); - // Verify flush was called before markGatewayDraining + expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); - expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( + flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( + waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); - expect(waitForFollowupQueueDrain.mock.invocationCallOrder[0]).toBeLessThan( - markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, - ); - // Verify logging expect(gatewayLog.info).toHaveBeenCalledWith( "flushed 2 pending inbound debounce buffer(s) before restart", ); @@ -400,7 +396,6 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -418,8 +413,11 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); await new Promise((resolve) => setImmediate(resolve)); - const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 100_000); - expect(forceExitCall).toBeDefined(); + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toContain(95_000); + expect(forceExitCalls.some((delay) => delay > 95_000 && delay <= 100_000)).toBe(true); sigterm(); await expect(exited).resolves.toBe(0); @@ -433,8 +431,6 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - // No debouncers had buffered messages - getPendingInboundDebounceBufferCount.mockReturnValueOnce(0); flushAllInboundDebouncers.mockResolvedValueOnce(0); const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); @@ -449,12 +445,12 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); - // Should NOT wait for followup drain when nothing was flushed expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); - // Should still mark draining expect(markGatewayDraining).toHaveBeenCalledTimes(1); - const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 95_000); - expect(forceExitCall).toBeDefined(); + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toEqual([95_000]); sigterm(); await expect(exited).resolves.toBe(0); @@ -468,7 +464,6 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: false, diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index c3885ae9322..338b85a9917 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,10 +3,7 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; -import { - flushAllInboundDebouncers, - getPendingInboundDebounceBufferCount, -} from "../../auto-reply/inbound-debounce.js"; +import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; @@ -114,29 +111,38 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - // Allow extra time for followup drain only when restart will actually - // flush buffered inbound messages into followup queues. - const hasPendingInboundDebounceBuffers = getPendingInboundDebounceBufferCount() > 0; - let forceExitMs = SHUTDOWN_TIMEOUT_MS; - if (isRestart) { - forceExitMs += DRAIN_TIMEOUT_MS; - if (hasPendingInboundDebounceBuffers) { - forceExitMs += FOLLOWUP_DRAIN_TIMEOUT_MS; + const signalStartMs = Date.now(); + const baseForceExitDeadlineMs = + signalStartMs + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); + let forceExitTimer: ReturnType | null = null; + const armForceExitTimer = (deadlineMs: number) => { + if (forceExitTimer) { + clearTimeout(forceExitTimer); } - } - const forceExitTimer = setTimeout(() => { - gatewayLog.error("shutdown timed out; exiting without full cleanup"); - // Exit non-zero on restart timeout so launchd/systemd treats it as a - // failure and triggers a clean process restart instead of assuming the - // shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822) - exitProcess(isRestart ? 1 : 0); - }, forceExitMs); + forceExitTimer = setTimeout( + () => { + gatewayLog.error("shutdown timed out; exiting without full cleanup"); + // Exit non-zero on restart timeout so launchd/systemd treats it as a + // failure and triggers a clean process restart instead of assuming the + // shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822) + exitProcess(isRestart ? 1 : 0); + }, + Math.max(0, deadlineMs - Date.now()), + ); + forceExitTimer.unref?.(); + }; + armForceExitTimer(baseForceExitDeadlineMs); void (async () => { try { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { + // Reject new command-queue work before any awaited restart drain + // step so late arrivals fail explicitly instead of being stranded + // behind a one-shot debounce flush. + markGatewayDraining(); + // Flush inbound debounce buffers first. This pushes any messages // waiting in per-channel debounce timers (e.g. the 2500ms collect // window) into the followup queues immediately, preventing silent @@ -147,7 +153,8 @@ export async function runGatewayLoop(params: { `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, ); // Give the followup queue drain loops a short window to process - // the newly flushed items before we mark the gateway as draining. + // the newly flushed items before the server is torn down. + armForceExitTimer(baseForceExitDeadlineMs + FOLLOWUP_DRAIN_TIMEOUT_MS); const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); if (followupResult.drained) { gatewayLog.info("followup queues drained after debounce flush"); @@ -157,10 +164,6 @@ export async function runGatewayLoop(params: { ); } } - - // Reject new enqueues immediately during the drain window so - // sessions get an explicit restart error instead of silent task loss. - markGatewayDraining(); const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); @@ -200,7 +203,9 @@ export async function runGatewayLoop(params: { } catch (err) { gatewayLog.error(`shutdown error: ${String(err)}`); } finally { - clearTimeout(forceExitTimer); + if (forceExitTimer) { + clearTimeout(forceExitTimer); + } server = null; if (isRestart) { await handleRestartAfterServerClose();