fix: bound restart debounce draining

This commit is contained in:
Joey Krug 2026-03-14 18:06:22 -04:00
parent 0584b8dbc0
commit 324c43931b
4 changed files with 107 additions and 12 deletions

View File

@@ -8,7 +8,7 @@ import { resolveGlobalMap } from "../shared/global-singleton.js";
* itself on creation and deregisters after the next global flush sweep.
*/
type DebouncerFlushHandle = {
flushAll: () => Promise<number>;
flushAll: (options?: { deadlineMs?: number }) => Promise<number>;
};
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
@@ -26,15 +26,19 @@ export function clearInboundDebouncerRegistry(): void {
* Returns the number of debounce buffers actually flushed so restart logic can
* skip followup draining when there was no buffered work.
*/
export async function flushAllInboundDebouncers(): Promise<number> {
export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise<number> {
const entries = [...INBOUND_DEBOUNCERS.entries()];
if (entries.length === 0) {
return 0;
}
const deadlineMs =
typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs)
? Date.now() + Math.max(0, Math.trunc(options.timeoutMs))
: undefined;
const flushedCounts = await Promise.all(
entries.map(async ([key, handle]) => {
try {
return await handle.flushAll();
return await handle.flushAll({ deadlineMs });
} finally {
INBOUND_DEBOUNCERS.delete(key);
}
@@ -172,15 +176,21 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
scheduleFlush(key, buffer);
};
const flushAll = async () => {
const flushAll = async (options?: { deadlineMs?: number }) => {
let flushedBufferCount = 0;
// Keep sweeping until no debounced keys remain. A flush callback can race
// with late in-flight ingress and create another buffered key before the
// global registry deregisters this debouncer during restart.
while (buffers.size > 0) {
if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) {
break;
}
const keys = [...buffers.keys()];
for (const key of keys) {
if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) {
return flushedBufferCount;
}
if (!buffers.has(key)) {
continue;
}

View File

@@ -554,6 +554,42 @@ describe("createInboundDebouncer flushAll", () => {
vi.useRealTimers();
});
it("stops sweeping when the global flush deadline is reached", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
let now = 0;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
let debouncer: ReturnType<typeof createInboundDebouncer<{ key: string; id: string }>>;
debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
if (items[0]?.id === "1") {
await debouncer.enqueue({ key: "b", id: "2" });
now = 20;
}
},
});
try {
await debouncer.enqueue({ key: "a", id: "1" });
const flushed = await debouncer.flushAll({ deadlineMs: 10 });
expect(flushed).toBe(1);
expect(calls).toEqual([["1"]]);
now = 0;
const flushedLater = await debouncer.flushAll({ deadlineMs: 10 });
expect(flushedLater).toBe(1);
expect(calls).toEqual([["1"], ["2"]]);
} finally {
nowSpy.mockRestore();
vi.useRealTimers();
}
});
});
describe("initSessionState BodyStripped", () => {

View File

@@ -50,7 +50,8 @@ const gatewayLog = {
};
vi.mock("../../auto-reply/inbound-debounce.js", () => ({
flushAllInboundDebouncers: () => flushAllInboundDebouncers(),
flushAllInboundDebouncers: (options?: { timeoutMs?: number }) =>
flushAllInboundDebouncers(options),
}));
vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({
@@ -371,6 +372,7 @@ describe("runGatewayLoop", () => {
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 });
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan(
flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
@@ -417,7 +419,7 @@ describe("runGatewayLoop", () => {
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toContain(95_000);
expect(forceExitCalls.some((delay) => delay > 95_000 && delay <= 100_000)).toBe(true);
expect(forceExitCalls).toContain(100_000);
sigterm();
await expect(exited).resolves.toBe(0);
@@ -445,12 +447,13 @@ describe("runGatewayLoop", () => {
await new Promise<void>((resolve) => setImmediate(resolve));
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 });
expect(waitForFollowupQueueDrain).not.toHaveBeenCalled();
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toEqual([95_000]);
expect(forceExitCalls).toEqual([95_000, 95_000]);
sigterm();
await expect(exited).resolves.toBe(0);
@@ -488,6 +491,42 @@ describe("runGatewayLoop", () => {
});
});
it("re-arms the restart watchdog after a slow debounce flush", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
let now = 1000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
flushAllInboundDebouncers.mockImplementationOnce(async () => {
now += 20_000;
return 0;
});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toEqual([95_000, 95_000]);
sigterm();
await expect(exited).resolves.toBe(0);
} finally {
nowSpy.mockRestore();
setTimeoutSpy.mockRestore();
}
});
});
it("releases the lock before exiting on spawned restart", async () => {
vi.clearAllMocks();

View File

@@ -100,6 +100,7 @@ export async function runGatewayLoop(params: {
const DRAIN_TIMEOUT_MS = 90_000;
const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000;
const INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS = 10_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
const request = (action: GatewayRunSignalAction, signal: string) => {
@@ -111,9 +112,8 @@ export async function runGatewayLoop(params: {
const isRestart = action === "restart";
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
const signalStartMs = Date.now();
const baseForceExitDeadlineMs =
signalStartMs + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0);
Date.now() + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0);
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const armForceExitTimer = (deadlineMs: number) => {
if (forceExitTimer) {
@@ -140,21 +140,31 @@ export async function runGatewayLoop(params: {
if (isRestart) {
// Reject new command-queue work before any awaited restart drain
// step so late arrivals fail explicitly instead of being stranded
// behind a one-shot debounce flush.
// behind a one-shot debounce flush. This does not block followup
// queue enqueues, so flushed inbound work can still drain normally.
markGatewayDraining();
// Flush inbound debounce buffers first. This pushes any messages
// waiting in per-channel debounce timers (e.g. the 2500ms collect
// window) into the followup queues immediately, preventing silent
// message loss when the server reinitializes.
const flushedBuffers = await flushAllInboundDebouncers();
const flushedBuffers = await flushAllInboundDebouncers({
timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS,
});
// Start the restart watchdog budget after the pre-shutdown debounce
// flush so slow flush handlers do not steal time from active drain.
armForceExitTimer(
Date.now() +
SHUTDOWN_TIMEOUT_MS +
DRAIN_TIMEOUT_MS +
(flushedBuffers > 0 ? FOLLOWUP_DRAIN_TIMEOUT_MS : 0),
);
if (flushedBuffers > 0) {
gatewayLog.info(
`flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`,
);
// Give the followup queue drain loops a short window to process
// the newly flushed items before the server is torn down.
armForceExitTimer(baseForceExitDeadlineMs + FOLLOWUP_DRAIN_TIMEOUT_MS);
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained after debounce flush");