fix: address queue drain review feedback

This commit is contained in:
Joey Krug 2026-03-14 15:16:48 -04:00
parent 41b6372ec0
commit 9b19b945d7
7 changed files with 111 additions and 21 deletions

View File

@ -711,7 +711,8 @@ describe("Feishu inbound debounce regressions", () => {
enqueueMock(item);
params.onError?.(new Error("dispatch failed"), [item]);
},
flushKey: async () => {},
flushKey: async (_key: string) => {},
flushAll: async () => {},
}),
resolveInboundDebounceMs,
},

View File

@ -5,9 +5,12 @@ import { resolveGlobalMap } from "../shared/global-singleton.js";
/**
* Global registry of all active inbound debouncers so they can be flushed
* collectively during gateway restart (SIGUSR1). Each debouncer registers
* itself on creation and deregisters when flushAll is called.
* itself on creation and deregisters after the next global flush sweep.
*/
// Handle stored in the global inbound-debouncer registry for each debouncer.
// NOTE(review): the alias is declared twice in this view (a one-line form and
// a multi-line form); presumably only the multi-line form is live — confirm
// against the actual file.
type DebouncerFlushHandle = { flushAll: () => Promise<void> };
type DebouncerFlushHandle = {
// Reports how many debounce buffers the debouncer currently holds.
getPendingBufferCount: () => number;
// Flushes the debouncer's buffered items; resolves when the flush completes.
flushAll: () => Promise<void>;
};
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
@ -21,24 +24,28 @@ export function clearInboundDebouncerRegistry(): void {
/**
* Flush all registered inbound debouncers immediately. Called during SIGUSR1
* restart to push buffered messages into the session before reinitializing.
* Returns the number of pending debounce buffers that existed when the flush
* started so restart logic can skip followup draining when there was no work.
*/
export async function flushAllInboundDebouncers(): Promise<number> {
// Snapshot the registry up front: entries are deleted from it while the
// sweep below is running, so iterate over a copy.
const entries = [...INBOUND_DEBOUNCERS.entries()];
if (entries.length === 0) {
return 0;
}
let flushedCount = 0;
// Sum the pending buffers BEFORE any flush starts, so the total reflects the
// work that existed when the sweep began.
const pendingBufferCount = entries.reduce(
(count, [, handle]) => count + handle.getPendingBufferCount(),
0,
);
// Flush every registered debouncer concurrently. There is no catch here, so
// a rejection from any flushAll() rejects this function as well.
await Promise.all(
entries.map(async ([key, handle]) => {
try {
await handle.flushAll();
flushedCount += 1;
} finally {
// Deregister the debouncer whether or not its flush succeeded.
INBOUND_DEBOUNCERS.delete(key);
}
}),
);
// NOTE(review): two consecutive returns — as written only the first is
// reachable. This view appears to show both the earlier result (number of
// debouncers flushed) and the newer one (pending buffers at sweep start);
// presumably the pending-buffer count is the intended return — confirm.
return flushedCount;
return pendingBufferCount;
}
const resolveMs = (value: unknown): number | undefined => {
@ -177,7 +184,10 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
// Register in global registry for SIGUSR1 flush.
const registryKey = Symbol();
INBOUND_DEBOUNCERS.set(registryKey, { flushAll });
INBOUND_DEBOUNCERS.set(registryKey, {
getPendingBufferCount: () => buffers.size,
flushAll,
});
return { enqueue, flushKey, flushAll };
}

View File

@ -372,7 +372,7 @@ describe("flushAllInboundDebouncers", () => {
clearInboundDebouncerRegistry();
});
it("flushes all registered debouncers immediately", async () => {
it("flushes all pending inbound debounce buffers immediately", async () => {
vi.useFakeTimers();
const callsA: Array<string[]> = [];
const callsB: Array<string[]> = [];
@ -409,6 +409,36 @@ describe("flushAllInboundDebouncers", () => {
vi.useRealTimers();
});
it("counts pending buffers instead of registered debouncers", async () => {
  type InboundItem = { key: string; id: string };

  vi.useFakeTimers();
  const flushedBatches: string[][] = [];

  // Debouncer that receives items under two distinct keys.
  const busyDebouncer = createInboundDebouncer<InboundItem>({
    debounceMs: 5000,
    buildKey: ({ key }) => key,
    onFlush: async (batch) => {
      flushedBatches.push(batch.map(({ id }) => id));
    },
  });

  // Second registered debouncer that never receives an item, so it must not
  // contribute to the returned count.
  createInboundDebouncer<InboundItem>({
    debounceMs: 5000,
    buildKey: ({ key }) => key,
    onFlush: async () => {},
  });

  await busyDebouncer.enqueue({ key: "session-1", id: "msg-1" });
  await busyDebouncer.enqueue({ key: "session-2", id: "msg-2" });

  const pendingAtFlush = await flushAllInboundDebouncers();

  // Two debouncers are registered, but the result tracks the two buffers.
  expect(pendingAtFlush).toBe(2);
  expect(flushedBatches).toHaveLength(2);
  expect(flushedBatches).toContainEqual(["msg-1"]);
  expect(flushedBatches).toContainEqual(["msg-2"]);

  vi.useRealTimers();
});
it("returns 0 when no debouncers are registered", async () => {
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(0);

View File

@ -77,4 +77,13 @@ describe("waitForFollowupQueueDrain", () => {
expect(result.drained).toBe(false);
expect(result.remaining).toBeGreaterThanOrEqual(1);
});
it("reports each draining queue in the timeout remaining count", async () => {
  // Register three queues that are all flagged as draining.
  for (const queueKey of ["queue-1", "queue-2", "queue-3"]) {
    FOLLOWUP_QUEUES.set(queueKey, createMockQueue({ draining: true }));
  }

  const outcome = await waitForFollowupQueueDrain(1);

  // Every draining queue is expected to be reflected in `remaining`.
  expect(outcome).toEqual({ drained: false, remaining: 3 });
});
});

View File

@ -17,13 +17,9 @@ export async function waitForFollowupQueueDrain(
// Totals the work still outstanding across every followup queue.
// NOTE(review): this span contains two accounting schemes back to back — a
// Math.max-based floor and a per-queue "+1 while draining" sum. Read
// literally, each queue's items are added twice. This looks like the earlier
// and the newer implementation shown together; presumably only the per-queue
// sum is live — confirm against the actual file.
const getPendingCount = (): number => {
let total = 0;
for (const queue of FOLLOWUP_QUEUES.values()) {
total += queue.items.length;
if (queue.draining) {
// Count draining queues as having at least 1 pending item so we keep
// waiting for the drain loop to finish even if items.length hits 0
// momentarily between shifts.
total = Math.max(total, 1);
}
// Add 1 for the in-flight item owned by an active drain loop.
const queuePending = queue.items.length + (queue.draining ? 1 : 0);
total += queuePending;
}
return total;
};

View File

@ -375,9 +375,17 @@ describe("runGatewayLoop", () => {
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan(
waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(waitForFollowupQueueDrain.mock.invocationCallOrder[0]).toBeLessThan(
markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
// Verify logging
expect(gatewayLog.info).toHaveBeenCalledWith("flushed 2 inbound debouncer(s) before restart");
expect(gatewayLog.info).toHaveBeenCalledWith(
"flushed 2 pending inbound debounce buffer(s) before restart",
);
expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush");
sigterm();
@ -385,6 +393,38 @@ describe("runGatewayLoop", () => {
});
});
// Checks that a restart (SIGUSR1) arms a setTimeout with a 100_000 ms delay.
it("extends the restart force-exit timer to include followup queue drain time", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// Simulate one pending buffer being flushed and the followup queues
// draining cleanly.
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
remaining: 0,
});
// Spy on setTimeout so the delay passed to each timer can be inspected.
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Yield to the event loop twice — presumably enough for the SIGUSR1
// handler's async work to progress; revisit if this becomes flaky.
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
// 100_000 = 90_000 + 5_000 + 5_000; presumably the drain, followup-drain
// and shutdown timeouts of the gateway loop added together — verify
// against the constants in runGatewayLoop.
const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 100_000);
expect(forceExitCall).toBeDefined();
// Send SIGTERM to end the loop; it is expected to resolve with 0.
sigterm();
await expect(exited).resolves.toBe(0);
} finally {
// Always restore the real setTimeout, even if an assertion above throws.
setTimeoutSpy.mockRestore();
}
});
});
it("skips followup queue drain when no debouncers had buffered messages", async () => {
vi.clearAllMocks();

View File

@ -99,6 +99,7 @@ export async function runGatewayLoop(params: {
};
const DRAIN_TIMEOUT_MS = 90_000;
const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
const request = (action: GatewayRunSignalAction, signal: string) => {
@ -111,7 +112,9 @@ export async function runGatewayLoop(params: {
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
// Allow extra time for draining active turns on restart.
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
const forceExitMs = isRestart
? DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS
: SHUTDOWN_TIMEOUT_MS;
const forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
// Exit non-zero on restart timeout so launchd/systemd treats it as a
@ -129,12 +132,13 @@ export async function runGatewayLoop(params: {
// waiting in per-channel debounce timers (e.g. the 2500ms collect
// window) into the followup queues immediately, preventing silent
// message loss when the server reinitializes.
const flushedDebouncers = await flushAllInboundDebouncers();
if (flushedDebouncers > 0) {
gatewayLog.info(`flushed ${flushedDebouncers} inbound debouncer(s) before restart`);
const flushedBuffers = await flushAllInboundDebouncers();
if (flushedBuffers > 0) {
gatewayLog.info(
`flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`,
);
// Give the followup queue drain loops a short window to process
// the newly flushed items before we mark the gateway as draining.
const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000;
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained after debounce flush");