diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 77eb234847a..8c5ebe2a846 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -21,6 +21,13 @@ export function clearInboundDebouncerRegistry(): void { INBOUND_DEBOUNCERS.clear(); } +export function getPendingInboundDebounceBufferCount(): number { + return [...INBOUND_DEBOUNCERS.values()].reduce( + (count, handle) => count + handle.getPendingBufferCount(), + 0, + ); +} + /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. @@ -32,10 +39,7 @@ export async function flushAllInboundDebouncers(): Promise { if (entries.length === 0) { return 0; } - const pendingBufferCount = entries.reduce( - (count, [, handle]) => count + handle.getPendingBufferCount(), - 0, - ); + const pendingBufferCount = getPendingInboundDebounceBufferCount(); await Promise.all( entries.map(async ([key, handle]) => { try { diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index dd9f80db8e3..70c3e52e448 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -37,6 +37,7 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0); const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true, })); +const getPendingInboundDebounceBufferCount = vi.fn(() => 0); const flushAllInboundDebouncers = vi.fn(async () => 0); const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ drained: true, @@ -50,6 +51,7 @@ const gatewayLog = { }; vi.mock("../../auto-reply/inbound-debounce.js", () => ({ + getPendingInboundDebounceBufferCount: () => getPendingInboundDebounceBufferCount(), flushAllInboundDebouncers: () => flushAllInboundDebouncers(), })); @@ -355,6 +357,7 @@ describe("runGatewayLoop", () => { await withIsolatedSignals(async ({ captureSignal }) => { // Simulate debouncers having buffered messages + getPendingInboundDebounceBufferCount.mockReturnValueOnce(2); flushAllInboundDebouncers.mockResolvedValueOnce(2); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -397,6 +400,7 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { + getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -430,25 +434,33 @@ describe("runGatewayLoop", () => { await withIsolatedSignals(async ({ captureSignal }) => { // No debouncers had buffered messages + getPendingInboundDebounceBufferCount.mockReturnValueOnce(0); flushAllInboundDebouncers.mockResolvedValueOnce(0); - const { exited } = await createSignaledLoopHarness(); - const sigusr1 = captureSignal("SIGUSR1"); - const sigterm = captureSignal("SIGTERM"); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); - sigusr1(); + sigusr1(); - await new Promise((resolve) => setImmediate(resolve)); - await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); - expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); - // Should NOT wait for followup drain when nothing was flushed - expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); - // Should still mark draining - expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + // Should NOT wait for followup drain when nothing was flushed + expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); + // Should still mark draining + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 95_000); + expect(forceExitCall).toBeDefined(); - sigterm(); - await expect(exited).resolves.toBe(0); + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + setTimeoutSpy.mockRestore(); + } }); }); @@ -456,6 +468,7 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { + getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: false, diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 85bf389cb1a..c3885ae9322 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,7 +3,10 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; -import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; +import { + flushAllInboundDebouncers, + getPendingInboundDebounceBufferCount, +} from "../../auto-reply/inbound-debounce.js"; import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; @@ -111,10 +114,16 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - // Allow extra time for draining active turns on restart. - const forceExitMs = isRestart - ? DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS - : SHUTDOWN_TIMEOUT_MS; + // Allow extra time for followup drain only when restart will actually + // flush buffered inbound messages into followup queues. + const hasPendingInboundDebounceBuffers = getPendingInboundDebounceBufferCount() > 0; + let forceExitMs = SHUTDOWN_TIMEOUT_MS; + if (isRestart) { + forceExitMs += DRAIN_TIMEOUT_MS; + if (hasPendingInboundDebounceBuffers) { + forceExitMs += FOLLOWUP_DRAIN_TIMEOUT_MS; + } + } const forceExitTimer = setTimeout(() => { gatewayLog.error("shutdown timed out; exiting without full cleanup"); // Exit non-zero on restart timeout so launchd/systemd treats it as a