fix: drain inbound debounce buffer and followup queues before SIGUSR1 reload

When config.patch triggers a SIGUSR1 restart, two in-memory message
buffers were silently wiped:
1. Per-channel inbound debounce buffers (closure-local Map + setTimeout)
2. Followup queues (global Map of pending session messages)

This caused inbound messages received during the debounce window to be
permanently lost on config-triggered gateway restarts.

Fix:
- Add a global registry of inbound debouncers so they can be flushed
  collectively during restart. Each createInboundDebouncer() call now
  auto-registers in a shared Symbol.for() map, with a new flushAll()
  method that immediately processes all buffered items.
- Add flushAllInboundDebouncers() which iterates the global registry
  and forces all debounce timers to fire immediately.
- Add waitForFollowupQueueDrain() which polls the FOLLOWUP_QUEUES map
  until all queues finish processing (or timeout).
- Hook both into the SIGUSR1 restart flow in run-loop.ts: before
  markGatewayDraining(), flush all debouncers first (pushing buffered
  messages into the followup queues), then wait up to 5s for the
  followup drain loops to process them.

The ordering is critical: flush debouncers → wait for followup drain →
then mark draining. This ensures messages that were mid-debounce get
delivered to sessions before the gateway reinitializes.

Tests:
- flushAllInboundDebouncers: flushes multiple registered debouncers,
  returns count, deregisters after flush
- createInboundDebouncer.flushAll: flushes all keys in a single debouncer
- waitForFollowupQueueDrain: immediate return when empty, waits for
  drain, returns not-drained on timeout, counts draining queues
- run-loop: SIGUSR1 calls flush before markGatewayDraining, skips
  followup wait when no debouncers had buffered messages, logs warning
  on followup drain timeout
This commit is contained in:
Joey Krug 2026-03-14 11:54:01 -04:00
parent 0a842de354
commit 41b6372ec0
7 changed files with 448 additions and 13 deletions

View File

@ -1,5 +1,45 @@
import type { OpenClawConfig } from "../config/config.js";
import type { InboundDebounceByProvider } from "../config/types.messages.js";
import { resolveGlobalMap } from "../shared/global-singleton.js";
/**
* Global registry of all active inbound debouncers so they can be flushed
* collectively during gateway restart (SIGUSR1). Each debouncer registers
* itself on creation and deregisters when flushAll is called.
*/
type DebouncerFlushHandle = { flushAll: () => Promise<void> };
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
/**
* Clear the global debouncer registry. Intended for test cleanup only.
*/
export function clearInboundDebouncerRegistry(): void {
INBOUND_DEBOUNCERS.clear();
}
/**
* Flush all registered inbound debouncers immediately. Called during SIGUSR1
* restart to push buffered messages into the session before reinitializing.
*/
export async function flushAllInboundDebouncers(): Promise<number> {
const entries = [...INBOUND_DEBOUNCERS.entries()];
if (entries.length === 0) {
return 0;
}
let flushedCount = 0;
await Promise.all(
entries.map(async ([key, handle]) => {
try {
await handle.flushAll();
flushedCount += 1;
} finally {
INBOUND_DEBOUNCERS.delete(key);
}
}),
);
return flushedCount;
}
const resolveMs = (value: unknown): number | undefined => {
if (typeof value !== "number" || !Number.isFinite(value)) {
@ -119,10 +159,25 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return;
}
const buffer: DebounceBuffer<T> = { items: [item], timeout: null, debounceMs };
const buffer: DebounceBuffer<T> = {
items: [item],
timeout: null,
debounceMs,
};
buffers.set(key, buffer);
scheduleFlush(key, buffer);
};
return { enqueue, flushKey };
const flushAll = async () => {
const keys = [...buffers.keys()];
for (const key of keys) {
await flushKey(key);
}
};
// Register in global registry for SIGUSR1 flush.
const registryKey = Symbol();
INBOUND_DEBOUNCERS.set(registryKey, { flushAll });
return { enqueue, flushKey, flushAll };
}

View File

@ -1,10 +1,14 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import type { GroupKeyResolution } from "../config/sessions.js";
import { createInboundDebouncer } from "./inbound-debounce.js";
import {
clearInboundDebouncerRegistry,
createInboundDebouncer,
flushAllInboundDebouncers,
} from "./inbound-debounce.js";
import { resolveGroupRequireMention } from "./reply/groups.js";
import { finalizeInboundContext } from "./reply/inbound-context.js";
import {
@ -308,7 +312,11 @@ describe("createInboundDebouncer", () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({
const debouncer = createInboundDebouncer<{
key: string;
id: string;
debounce: boolean;
}>({
debounceMs: 50,
buildKey: (item) => item.key,
shouldDebounce: (item) => item.debounce,
@ -329,7 +337,11 @@ describe("createInboundDebouncer", () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({
const debouncer = createInboundDebouncer<{
key: string;
id: string;
windowMs: number;
}>({
debounceMs: 0,
buildKey: (item) => item.key,
resolveDebounceMs: (item) => item.windowMs,
@ -349,6 +361,108 @@ describe("createInboundDebouncer", () => {
});
});
describe("flushAllInboundDebouncers", () => {
// Clear registry before each test to avoid leaking state from other tests
// that create debouncers.
beforeEach(() => {
clearInboundDebouncerRegistry();
});
afterEach(() => {
clearInboundDebouncerRegistry();
});
it("flushes all registered debouncers immediately", async () => {
vi.useFakeTimers();
const callsA: Array<string[]> = [];
const callsB: Array<string[]> = [];
const debouncerA = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
callsA.push(items.map((entry) => entry.id));
},
});
const debouncerB = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
callsB.push(items.map((entry) => entry.id));
},
});
await debouncerA.enqueue({ key: "session-1", id: "msg-1" });
await debouncerA.enqueue({ key: "session-1", id: "msg-2" });
await debouncerB.enqueue({ key: "session-2", id: "msg-3" });
// Nothing flushed yet (timers haven't fired)
expect(callsA).toEqual([]);
expect(callsB).toEqual([]);
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(2);
expect(callsA).toEqual([["msg-1", "msg-2"]]);
expect(callsB).toEqual([["msg-3"]]);
vi.useRealTimers();
});
it("returns 0 when no debouncers are registered", async () => {
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(0);
});
it("deregisters debouncers from global registry after flush", async () => {
vi.useFakeTimers();
createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async () => {},
});
// First flush deregisters
await flushAllInboundDebouncers();
// Second flush should find nothing
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(0);
vi.useRealTimers();
});
});
describe("createInboundDebouncer flushAll", () => {
it("flushes all buffered keys", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
},
});
await debouncer.enqueue({ key: "a", id: "1" });
await debouncer.enqueue({ key: "b", id: "2" });
await debouncer.enqueue({ key: "a", id: "3" });
expect(calls).toEqual([]);
await debouncer.flushAll();
// Both keys flushed
expect(calls).toHaveLength(2);
expect(calls).toContainEqual(["1", "3"]);
expect(calls).toContainEqual(["2"]);
vi.useRealTimers();
});
});
describe("initSessionState BodyStripped", () => {
it("prefers BodyForAgent over Body for group chats", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sender-meta-"));

View File

@ -1,7 +1,8 @@
export { extractQueueDirective } from "./queue/directive.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export type { ClearSessionQueueResult } from "./queue/cleanup.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export { extractQueueDirective } from "./queue/directive.js";
export { scheduleFollowupDrain } from "./queue/drain.js";
export { waitForFollowupQueueDrain } from "./queue/drain-all.js";
export {
enqueueFollowupRun,
getFollowupQueueDepth,

View File

@ -0,0 +1,80 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { waitForFollowupQueueDrain } from "./drain-all.js";
import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js";
function createMockQueue(overrides: Partial<FollowupQueueState> = {}): FollowupQueueState {
return {
items: [],
draining: false,
lastEnqueuedAt: 0,
mode: "followup",
debounceMs: 1000,
cap: 20,
dropPolicy: "summarize",
droppedCount: 0,
summaryLines: [],
...overrides,
};
}
afterEach(() => {
FOLLOWUP_QUEUES.clear();
});
describe("waitForFollowupQueueDrain", () => {
it("returns drained immediately when no queues exist", async () => {
const result = await waitForFollowupQueueDrain(1000);
expect(result).toEqual({ drained: true, remaining: 0 });
});
it("returns drained immediately when all queues are empty", async () => {
FOLLOWUP_QUEUES.set("test", createMockQueue());
const result = await waitForFollowupQueueDrain(1000);
expect(result).toEqual({ drained: true, remaining: 0 });
});
it("waits until queues are drained", async () => {
const queue = createMockQueue({
items: [
{ prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() },
] as FollowupQueueState["items"],
draining: true,
});
FOLLOWUP_QUEUES.set("test", queue);
// Simulate drain completing after 100ms
setTimeout(() => {
queue.items.length = 0;
queue.draining = false;
FOLLOWUP_QUEUES.delete("test");
}, 100);
const result = await waitForFollowupQueueDrain(5000);
expect(result.drained).toBe(true);
expect(result.remaining).toBe(0);
});
it("returns not drained on timeout", async () => {
const queue = createMockQueue({
items: [
{ prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() },
] as FollowupQueueState["items"],
draining: true,
});
FOLLOWUP_QUEUES.set("test", queue);
const result = await waitForFollowupQueueDrain(100);
expect(result.drained).toBe(false);
expect(result.remaining).toBeGreaterThan(0);
});
it("counts draining queues as having pending items even with empty items array", async () => {
const queue = createMockQueue({ draining: true });
FOLLOWUP_QUEUES.set("test", queue);
// Queue has no items but is still draining — should wait
const result = await waitForFollowupQueueDrain(100);
expect(result.drained).toBe(false);
expect(result.remaining).toBeGreaterThanOrEqual(1);
});
});

View File

@ -0,0 +1,48 @@
import { FOLLOWUP_QUEUES } from "./state.js";
/**
* Wait for all followup queues to finish draining, up to `timeoutMs`.
* Returns `{ drained: true }` if all queues are empty, or `{ drained: false }`
* if the timeout was reached with items still pending.
*
* Called during SIGUSR1 restart after flushing inbound debouncers, so the
* newly enqueued items have time to be processed before the server tears down.
*/
export async function waitForFollowupQueueDrain(
timeoutMs: number,
): Promise<{ drained: boolean; remaining: number }> {
const deadline = Date.now() + timeoutMs;
const POLL_INTERVAL_MS = 50;
const getPendingCount = (): number => {
let total = 0;
for (const queue of FOLLOWUP_QUEUES.values()) {
total += queue.items.length;
if (queue.draining) {
// Count draining queues as having at least 1 pending item so we keep
// waiting for the drain loop to finish even if items.length hits 0
// momentarily between shifts.
total = Math.max(total, 1);
}
}
return total;
};
let remaining = getPendingCount();
if (remaining === 0) {
return { drained: true, remaining: 0 };
}
while (Date.now() < deadline) {
await new Promise<void>((resolve) => {
const timer = setTimeout(resolve, Math.min(POLL_INTERVAL_MS, deadline - Date.now()));
timer.unref?.();
});
remaining = getPendingCount();
if (remaining === 0) {
return { drained: true, remaining: 0 };
}
}
return { drained: false, remaining };
}

View File

@ -19,16 +19,29 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?
}));
const getActiveTaskCount = vi.fn(() => 0);
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({
drained: true,
}));
const resetAllLanes = vi.fn();
const restartGatewayProcessWithFreshPid = vi.fn<
() => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string }
() => {
mode: "spawned" | "supervised" | "disabled" | "failed";
pid?: number;
detail?: string;
}
>(() => ({ mode: "disabled" }));
const abortEmbeddedPiRun = vi.fn(
(_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false,
);
const getActiveEmbeddedRunCount = vi.fn(() => 0);
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({
drained: true,
}));
const flushAllInboundDebouncers = vi.fn(async () => 0);
const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({
drained: true,
remaining: 0,
}));
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
const gatewayLog = {
info: vi.fn(),
@ -36,6 +49,14 @@ const gatewayLog = {
error: vi.fn(),
};
vi.mock("../../auto-reply/inbound-debounce.js", () => ({
flushAllInboundDebouncers: () => flushAllInboundDebouncers(),
}));
vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({
waitForFollowupQueueDrain: (timeoutMs: number) => waitForFollowupQueueDrain(timeoutMs),
}));
vi.mock("../../infra/gateway-lock.js", () => ({
acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts),
}));
@ -268,10 +289,14 @@ describe("runGatewayLoop", () => {
expect(start).toHaveBeenCalledTimes(2);
await new Promise<void>((resolve) => setImmediate(resolve));
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" });
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, {
mode: "compacting",
});
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000);
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, {
mode: "all",
});
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
expect(closeFirst).toHaveBeenCalledWith({
@ -325,6 +350,96 @@ describe("runGatewayLoop", () => {
});
});
it("flushes inbound debouncers and waits for followup queue drain before marking gateway draining on SIGUSR1", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// Simulate debouncers having buffered messages
flushAllInboundDebouncers.mockResolvedValueOnce(2);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
remaining: 0,
});
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Let the restart complete
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
// Verify flush was called before markGatewayDraining
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
// Verify logging
expect(gatewayLog.info).toHaveBeenCalledWith("flushed 2 inbound debouncer(s) before restart");
expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush");
sigterm();
await expect(exited).resolves.toBe(0);
});
});
it("skips followup queue drain when no debouncers had buffered messages", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// No debouncers had buffered messages
flushAllInboundDebouncers.mockResolvedValueOnce(0);
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
// Should NOT wait for followup drain when nothing was flushed
expect(waitForFollowupQueueDrain).not.toHaveBeenCalled();
// Should still mark draining
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
sigterm();
await expect(exited).resolves.toBe(0);
});
});
it("logs warning when followup queue drain times out", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: false,
remaining: 3,
});
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(gatewayLog.warn).toHaveBeenCalledWith(
"followup queue drain timeout; 3 item(s) still pending",
);
sigterm();
await expect(exited).resolves.toBe(0);
});
});
it("releases the lock before exiting on spawned restart", async () => {
vi.clearAllMocks();

View File

@ -3,6 +3,8 @@ import {
getActiveEmbeddedRunCount,
waitForActiveEmbeddedRuns,
} from "../../agents/pi-embedded-runner/runs.js";
import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js";
import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js";
import type { startGatewayServer } from "../../gateway/server.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js";
@ -123,6 +125,26 @@ export async function runGatewayLoop(params: {
// On restart, wait for in-flight agent turns to finish before
// tearing down the server so buffered messages are delivered.
if (isRestart) {
// Flush inbound debounce buffers first. This pushes any messages
// waiting in per-channel debounce timers (e.g. the 2500ms collect
// window) into the followup queues immediately, preventing silent
// message loss when the server reinitializes.
const flushedDebouncers = await flushAllInboundDebouncers();
if (flushedDebouncers > 0) {
gatewayLog.info(`flushed ${flushedDebouncers} inbound debouncer(s) before restart`);
// Give the followup queue drain loops a short window to process
// the newly flushed items before we mark the gateway as draining.
const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000;
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained after debounce flush");
} else {
gatewayLog.warn(
`followup queue drain timeout; ${followupResult.remaining} item(s) still pending`,
);
}
}
// Reject new enqueues immediately during the drain window so
// sessions get an explicit restart error instead of silent task loss.
markGatewayDraining();