Gateway: finish drain-queue restart review fixes

This commit is contained in:
Joey Krug 2026-03-14 16:39:24 -04:00
parent 070fc040b2
commit 23731ed099
5 changed files with 99 additions and 68 deletions

View File

@ -712,7 +712,7 @@ describe("Feishu inbound debounce regressions", () => {
params.onError?.(new Error("dispatch failed"), [item]);
},
flushKey: async (_key: string) => {},
flushAll: async () => {},
flushAll: async () => 0,
}),
resolveInboundDebounceMs,
},

View File

@ -8,8 +8,7 @@ import { resolveGlobalMap } from "../shared/global-singleton.js";
* itself on creation and deregisters after the next global flush sweep.
*/
type DebouncerFlushHandle = {
getPendingBufferCount: () => number;
flushAll: () => Promise<void>;
flushAll: () => Promise<number>;
};
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
@ -21,35 +20,27 @@ export function clearInboundDebouncerRegistry(): void {
INBOUND_DEBOUNCERS.clear();
}
export function getPendingInboundDebounceBufferCount(): number {
return [...INBOUND_DEBOUNCERS.values()].reduce(
(count, handle) => count + handle.getPendingBufferCount(),
0,
);
}
/**
* Flush all registered inbound debouncers immediately. Called during SIGUSR1
* restart to push buffered messages into the session before reinitializing.
* Returns the number of pending debounce buffers that existed when the flush
* started so restart logic can skip followup draining when there was no work.
* Returns the number of debounce buffers actually flushed so restart logic can
* skip followup draining when there was no buffered work.
*/
export async function flushAllInboundDebouncers(): Promise<number> {
const entries = [...INBOUND_DEBOUNCERS.entries()];
if (entries.length === 0) {
return 0;
}
const pendingBufferCount = getPendingInboundDebounceBufferCount();
await Promise.all(
const flushedCounts = await Promise.all(
entries.map(async ([key, handle]) => {
try {
await handle.flushAll();
return await handle.flushAll();
} finally {
INBOUND_DEBOUNCERS.delete(key);
}
}),
);
return pendingBufferCount;
return flushedCounts.reduce((total, count) => total + count, 0);
}
const resolveMs = (value: unknown): number | undefined => {
@ -180,16 +171,28 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
};
const flushAll = async () => {
const keys = [...buffers.keys()];
for (const key of keys) {
await flushKey(key);
let flushedBufferCount = 0;
// Keep sweeping until no debounced keys remain. A flush callback can race
// with late in-flight ingress and create another buffered key before the
// global registry deregisters this debouncer during restart.
while (buffers.size > 0) {
const keys = [...buffers.keys()];
for (const key of keys) {
if (!buffers.has(key)) {
continue;
}
await flushKey(key);
flushedBufferCount += 1;
}
}
return flushedBufferCount;
};
// Register in global registry for SIGUSR1 flush.
const registryKey = Symbol();
INBOUND_DEBOUNCERS.set(registryKey, {
getPendingBufferCount: () => buffers.size,
flushAll,
});

View File

@ -439,6 +439,34 @@ describe("flushAllInboundDebouncers", () => {
vi.useRealTimers();
});
it("keeps flushing until no buffered keys remain", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
let enqueuedDuringFlush = false;
let debouncer: ReturnType<typeof createInboundDebouncer<{ key: string; id: string }>>;
debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
if (!enqueuedDuringFlush) {
enqueuedDuringFlush = true;
await debouncer.enqueue({ key: "session-2", id: "msg-2" });
}
},
});
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(2);
expect(calls).toEqual([["msg-1"], ["msg-2"]]);
await expect(flushAllInboundDebouncers()).resolves.toBe(0);
vi.useRealTimers();
});
it("returns 0 when no debouncers are registered", async () => {
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(0);

View File

@ -37,7 +37,6 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0);
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({
drained: true,
}));
const getPendingInboundDebounceBufferCount = vi.fn(() => 0);
const flushAllInboundDebouncers = vi.fn(async () => 0);
const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({
drained: true,
@ -51,7 +50,6 @@ const gatewayLog = {
};
vi.mock("../../auto-reply/inbound-debounce.js", () => ({
getPendingInboundDebounceBufferCount: () => getPendingInboundDebounceBufferCount(),
flushAllInboundDebouncers: () => flushAllInboundDebouncers(),
}));
@ -352,12 +350,10 @@ describe("runGatewayLoop", () => {
});
});
it("flushes inbound debouncers and waits for followup queue drain before marking gateway draining on SIGUSR1", async () => {
it("marks gateway draining before flushing inbound debouncers on SIGUSR1", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// Simulate debouncers having buffered messages
getPendingInboundDebounceBufferCount.mockReturnValueOnce(2);
flushAllInboundDebouncers.mockResolvedValueOnce(2);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
@ -370,22 +366,22 @@ describe("runGatewayLoop", () => {
sigusr1();
// Let the restart complete
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
// Verify flush was called before markGatewayDraining
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan(
flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan(
waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan(
waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(waitForFollowupQueueDrain.mock.invocationCallOrder[0]).toBeLessThan(
markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
// Verify logging
expect(gatewayLog.info).toHaveBeenCalledWith(
"flushed 2 pending inbound debounce buffer(s) before restart",
);
@ -400,7 +396,6 @@ describe("runGatewayLoop", () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
getPendingInboundDebounceBufferCount.mockReturnValueOnce(1);
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
@ -418,8 +413,11 @@ describe("runGatewayLoop", () => {
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 100_000);
expect(forceExitCall).toBeDefined();
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toContain(95_000);
expect(forceExitCalls.some((delay) => delay > 95_000 && delay <= 100_000)).toBe(true);
sigterm();
await expect(exited).resolves.toBe(0);
@ -433,8 +431,6 @@ describe("runGatewayLoop", () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// No debouncers had buffered messages
getPendingInboundDebounceBufferCount.mockReturnValueOnce(0);
flushAllInboundDebouncers.mockResolvedValueOnce(0);
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
@ -449,12 +445,12 @@ describe("runGatewayLoop", () => {
await new Promise<void>((resolve) => setImmediate(resolve));
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
// Should NOT wait for followup drain when nothing was flushed
expect(waitForFollowupQueueDrain).not.toHaveBeenCalled();
// Should still mark draining
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 95_000);
expect(forceExitCall).toBeDefined();
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toEqual([95_000]);
sigterm();
await expect(exited).resolves.toBe(0);
@ -468,7 +464,6 @@ describe("runGatewayLoop", () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
getPendingInboundDebounceBufferCount.mockReturnValueOnce(1);
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: false,

View File

@ -3,10 +3,7 @@ import {
getActiveEmbeddedRunCount,
waitForActiveEmbeddedRuns,
} from "../../agents/pi-embedded-runner/runs.js";
import {
flushAllInboundDebouncers,
getPendingInboundDebounceBufferCount,
} from "../../auto-reply/inbound-debounce.js";
import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js";
import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js";
import type { startGatewayServer } from "../../gateway/server.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
@ -114,29 +111,38 @@ export async function runGatewayLoop(params: {
const isRestart = action === "restart";
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
// Allow extra time for followup drain only when restart will actually
// flush buffered inbound messages into followup queues.
const hasPendingInboundDebounceBuffers = getPendingInboundDebounceBufferCount() > 0;
let forceExitMs = SHUTDOWN_TIMEOUT_MS;
if (isRestart) {
forceExitMs += DRAIN_TIMEOUT_MS;
if (hasPendingInboundDebounceBuffers) {
forceExitMs += FOLLOWUP_DRAIN_TIMEOUT_MS;
const signalStartMs = Date.now();
const baseForceExitDeadlineMs =
signalStartMs + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0);
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const armForceExitTimer = (deadlineMs: number) => {
if (forceExitTimer) {
clearTimeout(forceExitTimer);
}
}
const forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
// Exit non-zero on restart timeout so launchd/systemd treats it as a
// failure and triggers a clean process restart instead of assuming the
// shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822)
exitProcess(isRestart ? 1 : 0);
}, forceExitMs);
forceExitTimer = setTimeout(
() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
// Exit non-zero on restart timeout so launchd/systemd treats it as a
// failure and triggers a clean process restart instead of assuming the
// shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822)
exitProcess(isRestart ? 1 : 0);
},
Math.max(0, deadlineMs - Date.now()),
);
forceExitTimer.unref?.();
};
armForceExitTimer(baseForceExitDeadlineMs);
void (async () => {
try {
// On restart, wait for in-flight agent turns to finish before
// tearing down the server so buffered messages are delivered.
if (isRestart) {
// Reject new command-queue work before any awaited restart drain
// step so late arrivals fail explicitly instead of being stranded
// behind a one-shot debounce flush.
markGatewayDraining();
// Flush inbound debounce buffers first. This pushes any messages
// waiting in per-channel debounce timers (e.g. the 2500ms collect
// window) into the followup queues immediately, preventing silent
@ -147,7 +153,8 @@ export async function runGatewayLoop(params: {
`flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`,
);
// Give the followup queue drain loops a short window to process
// the newly flushed items before we mark the gateway as draining.
// the newly flushed items before the server is torn down.
armForceExitTimer(baseForceExitDeadlineMs + FOLLOWUP_DRAIN_TIMEOUT_MS);
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained after debounce flush");
@ -157,10 +164,6 @@ export async function runGatewayLoop(params: {
);
}
}
// Reject new enqueues immediately during the drain window so
// sessions get an explicit restart error instead of silent task loss.
markGatewayDraining();
const activeTasks = getActiveTaskCount();
const activeRuns = getActiveEmbeddedRunCount();
@ -200,7 +203,9 @@ export async function runGatewayLoop(params: {
} catch (err) {
gatewayLog.error(`shutdown error: ${String(err)}`);
} finally {
clearTimeout(forceExitTimer);
if (forceExitTimer) {
clearTimeout(forceExitTimer);
}
server = null;
if (isRestart) {
await handleRestartAfterServerClose();