fix: only add restart drain timeout when needed

This commit is contained in:
Joey Krug 2026-03-14 15:37:08 -04:00
parent 9b19b945d7
commit 070fc040b2
3 changed files with 48 additions and 22 deletions

View File

@ -21,6 +21,13 @@ export function clearInboundDebouncerRegistry(): void {
INBOUND_DEBOUNCERS.clear();
}
export function getPendingInboundDebounceBufferCount(): number {
return [...INBOUND_DEBOUNCERS.values()].reduce(
(count, handle) => count + handle.getPendingBufferCount(),
0,
);
}
/**
* Flush all registered inbound debouncers immediately. Called during SIGUSR1
* restart to push buffered messages into the session before reinitializing.
@ -32,10 +39,7 @@ export async function flushAllInboundDebouncers(): Promise<number> {
if (entries.length === 0) {
return 0;
}
const pendingBufferCount = entries.reduce(
(count, [, handle]) => count + handle.getPendingBufferCount(),
0,
);
const pendingBufferCount = getPendingInboundDebounceBufferCount();
await Promise.all(
entries.map(async ([key, handle]) => {
try {

View File

@ -37,6 +37,7 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0);
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({
drained: true,
}));
const getPendingInboundDebounceBufferCount = vi.fn(() => 0);
const flushAllInboundDebouncers = vi.fn(async () => 0);
const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({
drained: true,
@ -50,6 +51,7 @@ const gatewayLog = {
};
vi.mock("../../auto-reply/inbound-debounce.js", () => ({
getPendingInboundDebounceBufferCount: () => getPendingInboundDebounceBufferCount(),
flushAllInboundDebouncers: () => flushAllInboundDebouncers(),
}));
@ -355,6 +357,7 @@ describe("runGatewayLoop", () => {
await withIsolatedSignals(async ({ captureSignal }) => {
// Simulate debouncers having buffered messages
getPendingInboundDebounceBufferCount.mockReturnValueOnce(2);
flushAllInboundDebouncers.mockResolvedValueOnce(2);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
@ -397,6 +400,7 @@ describe("runGatewayLoop", () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
getPendingInboundDebounceBufferCount.mockReturnValueOnce(1);
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
@ -430,25 +434,33 @@ describe("runGatewayLoop", () => {
await withIsolatedSignals(async ({ captureSignal }) => {
// No debouncers had buffered messages
getPendingInboundDebounceBufferCount.mockReturnValueOnce(0);
flushAllInboundDebouncers.mockResolvedValueOnce(0);
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
sigusr1();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
// Should NOT wait for followup drain when nothing was flushed
expect(waitForFollowupQueueDrain).not.toHaveBeenCalled();
// Should still mark draining
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
// Should NOT wait for followup drain when nothing was flushed
expect(waitForFollowupQueueDrain).not.toHaveBeenCalled();
// Should still mark draining
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 95_000);
expect(forceExitCall).toBeDefined();
sigterm();
await expect(exited).resolves.toBe(0);
sigterm();
await expect(exited).resolves.toBe(0);
} finally {
setTimeoutSpy.mockRestore();
}
});
});
@ -456,6 +468,7 @@ describe("runGatewayLoop", () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
getPendingInboundDebounceBufferCount.mockReturnValueOnce(1);
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: false,

View File

@ -3,7 +3,10 @@ import {
getActiveEmbeddedRunCount,
waitForActiveEmbeddedRuns,
} from "../../agents/pi-embedded-runner/runs.js";
import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js";
import {
flushAllInboundDebouncers,
getPendingInboundDebounceBufferCount,
} from "../../auto-reply/inbound-debounce.js";
import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js";
import type { startGatewayServer } from "../../gateway/server.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
@ -111,10 +114,16 @@ export async function runGatewayLoop(params: {
const isRestart = action === "restart";
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
// Allow extra time for draining active turns on restart.
const forceExitMs = isRestart
? DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS
: SHUTDOWN_TIMEOUT_MS;
// Allow extra time for followup drain only when restart will actually
// flush buffered inbound messages into followup queues.
const hasPendingInboundDebounceBuffers = getPendingInboundDebounceBufferCount() > 0;
let forceExitMs = SHUTDOWN_TIMEOUT_MS;
if (isRestart) {
forceExitMs += DRAIN_TIMEOUT_MS;
if (hasPendingInboundDebounceBuffers) {
forceExitMs += FOLLOWUP_DRAIN_TIMEOUT_MS;
}
}
const forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
// Exit non-zero on restart timeout so launchd/systemd treats it as a