From 41b6372ec02efaa820901686cd041a54122dfba0 Mon Sep 17 00:00:00 2001
From: Joey Krug
Date: Sat, 14 Mar 2026 11:54:01 -0400
Subject: [PATCH] fix: drain inbound debounce buffer and followup queues before SIGUSR1 reload
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When config.patch triggers a SIGUSR1 restart, two in-memory message
buffers were silently wiped:

1. Per-channel inbound debounce buffers (closure-local Map + setTimeout)
2. Followup queues (global Map of pending session messages)

This caused inbound messages received during the debounce window to be
permanently lost on config-triggered gateway restarts.

Fix:
- Add a global registry of inbound debouncers so they can be flushed
  collectively during restart. Each createInboundDebouncer() call now
  auto-registers in a shared Symbol.for() map, with a new flushAll()
  method that immediately processes all buffered items.
- Add flushAllInboundDebouncers() which iterates the global registry
  and forces all debounce timers to fire immediately.
- Add waitForFollowupQueueDrain() which polls the FOLLOWUP_QUEUES map
  until all queues finish processing (or timeout).
- Hook both into the SIGUSR1 restart flow in run-loop.ts: before
  markGatewayDraining(), flush all debouncers first (pushing buffered
  messages into the followup queues), then wait up to 5s for the
  followup drain loops to process them.

The ordering is critical: flush debouncers → wait for followup drain →
then mark draining. This ensures messages that were mid-debounce get
delivered to sessions before the gateway reinitializes.

Tests: - flushAllInboundDebouncers: flushes multiple registered debouncers, returns count, deregisters after flush - createInboundDebouncer.flushAll: flushes all keys in a single debouncer - waitForFollowupQueueDrain: immediate return when empty, waits for drain, returns not-drained on timeout, counts draining queues - run-loop: SIGUSR1 calls flush before markGatewayDraining, skips followup wait when no debouncers had buffered messages, logs warning on followup drain timeout --- src/auto-reply/inbound-debounce.ts | 59 ++++++++- src/auto-reply/inbound.test.ts | 122 +++++++++++++++++- src/auto-reply/reply/queue.ts | 5 +- src/auto-reply/reply/queue/drain-all.test.ts | 80 ++++++++++++ src/auto-reply/reply/queue/drain-all.ts | 48 +++++++ src/cli/gateway-cli/run-loop.test.ts | 125 ++++++++++++++++++- src/cli/gateway-cli/run-loop.ts | 22 ++++ 7 files changed, 448 insertions(+), 13 deletions(-) create mode 100644 src/auto-reply/reply/queue/drain-all.test.ts create mode 100644 src/auto-reply/reply/queue/drain-all.ts diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index debda7bc7b5..96be2d94b1b 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -1,5 +1,45 @@ import type { OpenClawConfig } from "../config/config.js"; import type { InboundDebounceByProvider } from "../config/types.messages.js"; +import { resolveGlobalMap } from "../shared/global-singleton.js"; + +/** + * Global registry of all active inbound debouncers so they can be flushed + * collectively during gateway restart (SIGUSR1). Each debouncer registers + * itself on creation and deregisters when flushAll is called. + */ +type DebouncerFlushHandle = { flushAll: () => Promise }; +const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); +const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); + +/** + * Clear the global debouncer registry. Intended for test cleanup only. 
+ */ +export function clearInboundDebouncerRegistry(): void { + INBOUND_DEBOUNCERS.clear(); +} + +/** + * Flush all registered inbound debouncers immediately. Called during SIGUSR1 + * restart to push buffered messages into the session before reinitializing. + */ +export async function flushAllInboundDebouncers(): Promise { + const entries = [...INBOUND_DEBOUNCERS.entries()]; + if (entries.length === 0) { + return 0; + } + let flushedCount = 0; + await Promise.all( + entries.map(async ([key, handle]) => { + try { + await handle.flushAll(); + flushedCount += 1; + } finally { + INBOUND_DEBOUNCERS.delete(key); + } + }), + ); + return flushedCount; +} const resolveMs = (value: unknown): number | undefined => { if (typeof value !== "number" || !Number.isFinite(value)) { @@ -119,10 +159,25 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return; } - const buffer: DebounceBuffer = { items: [item], timeout: null, debounceMs }; + const buffer: DebounceBuffer = { + items: [item], + timeout: null, + debounceMs, + }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; - return { enqueue, flushKey }; + const flushAll = async () => { + const keys = [...buffers.keys()]; + for (const key of keys) { + await flushKey(key); + } + }; + + // Register in global registry for SIGUSR1 flush. 
+ const registryKey = Symbol(); + INBOUND_DEBOUNCERS.set(registryKey, { flushAll }); + + return { enqueue, flushKey, flushAll }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 77ff61e814e..c43fa28ce4b 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -1,10 +1,14 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import type { GroupKeyResolution } from "../config/sessions.js"; -import { createInboundDebouncer } from "./inbound-debounce.js"; +import { + clearInboundDebouncerRegistry, + createInboundDebouncer, + flushAllInboundDebouncers, +} from "./inbound-debounce.js"; import { resolveGroupRequireMention } from "./reply/groups.js"; import { finalizeInboundContext } from "./reply/inbound-context.js"; import { @@ -308,7 +312,11 @@ describe("createInboundDebouncer", () => { vi.useFakeTimers(); const calls: Array = []; - const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({ + const debouncer = createInboundDebouncer<{ + key: string; + id: string; + debounce: boolean; + }>({ debounceMs: 50, buildKey: (item) => item.key, shouldDebounce: (item) => item.debounce, @@ -329,7 +337,11 @@ describe("createInboundDebouncer", () => { vi.useFakeTimers(); const calls: Array = []; - const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({ + const debouncer = createInboundDebouncer<{ + key: string; + id: string; + windowMs: number; + }>({ debounceMs: 0, buildKey: (item) => item.key, resolveDebounceMs: (item) => item.windowMs, @@ -349,6 +361,108 @@ describe("createInboundDebouncer", () => { }); }); +describe("flushAllInboundDebouncers", () => { + // Clear registry before each test to avoid leaking state from other tests + 
// that create debouncers. + beforeEach(() => { + clearInboundDebouncerRegistry(); + }); + + afterEach(() => { + clearInboundDebouncerRegistry(); + }); + + it("flushes all registered debouncers immediately", async () => { + vi.useFakeTimers(); + const callsA: Array = []; + const callsB: Array = []; + + const debouncerA = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + callsA.push(items.map((entry) => entry.id)); + }, + }); + + const debouncerB = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + callsB.push(items.map((entry) => entry.id)); + }, + }); + + await debouncerA.enqueue({ key: "session-1", id: "msg-1" }); + await debouncerA.enqueue({ key: "session-1", id: "msg-2" }); + await debouncerB.enqueue({ key: "session-2", id: "msg-3" }); + + // Nothing flushed yet (timers haven't fired) + expect(callsA).toEqual([]); + expect(callsB).toEqual([]); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(callsA).toEqual([["msg-1", "msg-2"]]); + expect(callsB).toEqual([["msg-3"]]); + + vi.useRealTimers(); + }); + + it("returns 0 when no debouncers are registered", async () => { + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(0); + }); + + it("deregisters debouncers from global registry after flush", async () => { + vi.useFakeTimers(); + + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // First flush deregisters + await flushAllInboundDebouncers(); + + // Second flush should find nothing + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(0); + + vi.useRealTimers(); + }); +}); + +describe("createInboundDebouncer flushAll", () => { + it("flushes all buffered keys", async () => { + vi.useFakeTimers(); + const calls: 
Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "b", id: "2" }); + await debouncer.enqueue({ key: "a", id: "3" }); + + expect(calls).toEqual([]); + await debouncer.flushAll(); + + // Both keys flushed + expect(calls).toHaveLength(2); + expect(calls).toContainEqual(["1", "3"]); + expect(calls).toContainEqual(["2"]); + + vi.useRealTimers(); + }); +}); + describe("initSessionState BodyStripped", () => { it("prefers BodyForAgent over Body for group chats", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sender-meta-")); diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..e3bbfcd2761 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,7 +1,8 @@ -export { extractQueueDirective } from "./queue/directive.js"; -export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export { extractQueueDirective } from "./queue/directive.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; +export { waitForFollowupQueueDrain } from "./queue/drain-all.js"; export { enqueueFollowupRun, getFollowupQueueDepth, diff --git a/src/auto-reply/reply/queue/drain-all.test.ts b/src/auto-reply/reply/queue/drain-all.test.ts new file mode 100644 index 00000000000..ad166e07b0a --- /dev/null +++ b/src/auto-reply/reply/queue/drain-all.test.ts @@ -0,0 +1,80 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { waitForFollowupQueueDrain } from "./drain-all.js"; +import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js"; + +function createMockQueue(overrides: Partial = {}): 
FollowupQueueState { + return { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: "followup", + debounceMs: 1000, + cap: 20, + dropPolicy: "summarize", + droppedCount: 0, + summaryLines: [], + ...overrides, + }; +} + +afterEach(() => { + FOLLOWUP_QUEUES.clear(); +}); + +describe("waitForFollowupQueueDrain", () => { + it("returns drained immediately when no queues exist", async () => { + const result = await waitForFollowupQueueDrain(1000); + expect(result).toEqual({ drained: true, remaining: 0 }); + }); + + it("returns drained immediately when all queues are empty", async () => { + FOLLOWUP_QUEUES.set("test", createMockQueue()); + const result = await waitForFollowupQueueDrain(1000); + expect(result).toEqual({ drained: true, remaining: 0 }); + }); + + it("waits until queues are drained", async () => { + const queue = createMockQueue({ + items: [ + { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() }, + ] as FollowupQueueState["items"], + draining: true, + }); + FOLLOWUP_QUEUES.set("test", queue); + + // Simulate drain completing after 100ms + setTimeout(() => { + queue.items.length = 0; + queue.draining = false; + FOLLOWUP_QUEUES.delete("test"); + }, 100); + + const result = await waitForFollowupQueueDrain(5000); + expect(result.drained).toBe(true); + expect(result.remaining).toBe(0); + }); + + it("returns not drained on timeout", async () => { + const queue = createMockQueue({ + items: [ + { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() }, + ] as FollowupQueueState["items"], + draining: true, + }); + FOLLOWUP_QUEUES.set("test", queue); + + const result = await waitForFollowupQueueDrain(100); + expect(result.drained).toBe(false); + expect(result.remaining).toBeGreaterThan(0); + }); + + it("counts draining queues as having pending items even with empty items array", async () => { + const queue = createMockQueue({ draining: true }); + FOLLOWUP_QUEUES.set("test", queue); + + // Queue has no items but is still draining — 
should wait + const result = await waitForFollowupQueueDrain(100); + expect(result.drained).toBe(false); + expect(result.remaining).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/src/auto-reply/reply/queue/drain-all.ts b/src/auto-reply/reply/queue/drain-all.ts new file mode 100644 index 00000000000..a0e4c2e4f5a --- /dev/null +++ b/src/auto-reply/reply/queue/drain-all.ts @@ -0,0 +1,48 @@ +import { FOLLOWUP_QUEUES } from "./state.js"; + +/** + * Wait for all followup queues to finish draining, up to `timeoutMs`. + * Returns `{ drained: true }` if all queues are empty, or `{ drained: false }` + * if the timeout was reached with items still pending. + * + * Called during SIGUSR1 restart after flushing inbound debouncers, so the + * newly enqueued items have time to be processed before the server tears down. + */ +export async function waitForFollowupQueueDrain( + timeoutMs: number, +): Promise<{ drained: boolean; remaining: number }> { + const deadline = Date.now() + timeoutMs; + const POLL_INTERVAL_MS = 50; + + const getPendingCount = (): number => { + let total = 0; + for (const queue of FOLLOWUP_QUEUES.values()) { + total += queue.items.length; + if (queue.draining) { + // Count draining queues as having at least 1 pending item so we keep + // waiting for the drain loop to finish even if items.length hits 0 + // momentarily between shifts. 
+ total = Math.max(total, 1); + } + } + return total; + }; + + let remaining = getPendingCount(); + if (remaining === 0) { + return { drained: true, remaining: 0 }; + } + + while (Date.now() < deadline) { + await new Promise((resolve) => { + const timer = setTimeout(resolve, Math.min(POLL_INTERVAL_MS, deadline - Date.now())); + timer.unref?.(); + }); + remaining = getPendingCount(); + if (remaining === 0) { + return { drained: true, remaining: 0 }; + } + } + + return { drained: false, remaining }; +} diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index ce8fbccbe93..49c1c4734d1 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -19,16 +19,29 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason? })); const getActiveTaskCount = vi.fn(() => 0); const markGatewayDraining = vi.fn(); -const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ + drained: true, +})); const resetAllLanes = vi.fn(); const restartGatewayProcessWithFreshPid = vi.fn< - () => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string } + () => { + mode: "spawned" | "supervised" | "disabled" | "failed"; + pid?: number; + detail?: string; + } >(() => ({ mode: "disabled" })); const abortEmbeddedPiRun = vi.fn( (_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false, ); const getActiveEmbeddedRunCount = vi.fn(() => 0); -const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ + drained: true, +})); +const flushAllInboundDebouncers = vi.fn(async () => 0); +const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ + drained: true, + remaining: 0, +})); const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart"; 
const gatewayLog = { info: vi.fn(), @@ -36,6 +49,14 @@ const gatewayLog = { error: vi.fn(), }; +vi.mock("../../auto-reply/inbound-debounce.js", () => ({ + flushAllInboundDebouncers: () => flushAllInboundDebouncers(), +})); + +vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({ + waitForFollowupQueueDrain: (timeoutMs: number) => waitForFollowupQueueDrain(timeoutMs), +})); + vi.mock("../../infra/gateway-lock.js", () => ({ acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts), })); @@ -268,10 +289,14 @@ describe("runGatewayLoop", () => { expect(start).toHaveBeenCalledTimes(2); await new Promise((resolve) => setImmediate(resolve)); - expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { + mode: "compacting", + }); expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000); - expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { + mode: "all", + }); expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); expect(closeFirst).toHaveBeenCalledWith({ @@ -325,6 +350,96 @@ describe("runGatewayLoop", () => { }); }); + it("flushes inbound debouncers and waits for followup queue drain before marking gateway draining on SIGUSR1", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + // Simulate debouncers having buffered messages + flushAllInboundDebouncers.mockResolvedValueOnce(2); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + // Let the restart complete + await new Promise((resolve) => 
setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + // Verify flush was called before markGatewayDraining + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + + // Verify logging + expect(gatewayLog.info).toHaveBeenCalledWith("flushed 2 inbound debouncer(s) before restart"); + expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush"); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("skips followup queue drain when no debouncers had buffered messages", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + // No debouncers had buffered messages + flushAllInboundDebouncers.mockResolvedValueOnce(0); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + // Should NOT wait for followup drain when nothing was flushed + expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); + // Should still mark draining + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("logs warning when followup queue drain times out", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(1); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: false, + remaining: 3, + }); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => 
setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(gatewayLog.warn).toHaveBeenCalledWith( + "followup queue drain timeout; 3 item(s) still pending", + ); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + it("releases the lock before exiting on spawned restart", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 23ec7dd584d..27332d540e3 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,6 +3,8 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; +import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; +import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js"; @@ -123,6 +125,26 @@ export async function runGatewayLoop(params: { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { + // Flush inbound debounce buffers first. This pushes any messages + // waiting in per-channel debounce timers (e.g. the 2500ms collect + // window) into the followup queues immediately, preventing silent + // message loss when the server reinitializes. + const flushedDebouncers = await flushAllInboundDebouncers(); + if (flushedDebouncers > 0) { + gatewayLog.info(`flushed ${flushedDebouncers} inbound debouncer(s) before restart`); + // Give the followup queue drain loops a short window to process + // the newly flushed items before we mark the gateway as draining. 
+ const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; + const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); + if (followupResult.drained) { + gatewayLog.info("followup queues drained after debounce flush"); + } else { + gatewayLog.warn( + `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, + ); + } + } + // Reject new enqueues immediately during the drain window so // sessions get an explicit restart error instead of silent task loss. markGatewayDraining();