From 41b6372ec02efaa820901686cd041a54122dfba0 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 11:54:01 -0400 Subject: [PATCH 01/11] fix: drain inbound debounce buffer and followup queues before SIGUSR1 reload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When config.patch triggers a SIGUSR1 restart, two in-memory message buffers were silently wiped: 1. Per-channel inbound debounce buffers (closure-local Map + setTimeout) 2. Followup queues (global Map of pending session messages) This caused inbound messages received during the debounce window to be permanently lost on config-triggered gateway restarts. Fix: - Add a global registry of inbound debouncers so they can be flushed collectively during restart. Each createInboundDebouncer() call now auto-registers in a shared Symbol.for() map, with a new flushAll() method that immediately processes all buffered items. - Add flushAllInboundDebouncers() which iterates the global registry and forces all debounce timers to fire immediately. - Add waitForFollowupQueueDrain() which polls the FOLLOWUP_QUEUES map until all queues finish processing (or timeout). - Hook both into the SIGUSR1 restart flow in run-loop.ts: before markGatewayDraining(), flush all debouncers first (pushing buffered messages into the followup queues), then wait up to 5s for the followup drain loops to process them. The ordering is critical: flush debouncers → wait for followup drain → then mark draining. This ensures messages that were mid-debounce get delivered to sessions before the gateway reinitializes. Tests: - flushAllInboundDebouncers: flushes multiple registered debouncers, returns count, deregisters after flush - createInboundDebouncer.flushAll: flushes all keys in a single debouncer - waitForFollowupQueueDrain: immediate return when empty, waits for drain, returns not-drained on timeout, counts draining queues - run-loop: SIGUSR1 calls flush before markGatewayDraining, skips followup wait when no debouncers had buffered messages, logs warning on followup drain timeout --- src/auto-reply/inbound-debounce.ts | 59 ++++++++- src/auto-reply/inbound.test.ts | 122 +++++++++++++++++- src/auto-reply/reply/queue.ts | 5 +- src/auto-reply/reply/queue/drain-all.test.ts | 80 ++++++++++++ src/auto-reply/reply/queue/drain-all.ts | 48 +++++++ src/cli/gateway-cli/run-loop.test.ts | 125 ++++++++++++++++++- src/cli/gateway-cli/run-loop.ts | 22 ++++ 7 files changed, 448 insertions(+), 13 deletions(-) create mode 100644 src/auto-reply/reply/queue/drain-all.test.ts create mode 100644 src/auto-reply/reply/queue/drain-all.ts diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index debda7bc7b5..96be2d94b1b 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -1,5 +1,45 @@ import type { OpenClawConfig } from "../config/config.js"; import type { InboundDebounceByProvider } from "../config/types.messages.js"; +import { resolveGlobalMap } from "../shared/global-singleton.js"; + +/** + * Global registry of all active inbound debouncers so they can be flushed + * collectively during gateway restart (SIGUSR1). Each debouncer registers + * itself on creation and deregisters when flushAll is called. + */ +type DebouncerFlushHandle = { flushAll: () => Promise }; +const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); +const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); + +/** + * Clear the global debouncer registry. Intended for test cleanup only. + */ +export function clearInboundDebouncerRegistry(): void { + INBOUND_DEBOUNCERS.clear(); +} + +/** + * Flush all registered inbound debouncers immediately. Called during SIGUSR1 + * restart to push buffered messages into the session before reinitializing. + */ +export async function flushAllInboundDebouncers(): Promise { + const entries = [...INBOUND_DEBOUNCERS.entries()]; + if (entries.length === 0) { + return 0; + } + let flushedCount = 0; + await Promise.all( + entries.map(async ([key, handle]) => { + try { + await handle.flushAll(); + flushedCount += 1; + } finally { + INBOUND_DEBOUNCERS.delete(key); + } + }), + ); + return flushedCount; +} const resolveMs = (value: unknown): number | undefined => { if (typeof value !== "number" || !Number.isFinite(value)) { @@ -119,10 +159,25 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return; } - const buffer: DebounceBuffer = { items: [item], timeout: null, debounceMs }; + const buffer: DebounceBuffer = { + items: [item], + timeout: null, + debounceMs, + }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; - return { enqueue, flushKey }; + const flushAll = async () => { + const keys = [...buffers.keys()]; + for (const key of keys) { + await flushKey(key); + } + }; + + // Register in global registry for SIGUSR1 flush. + const registryKey = Symbol(); + INBOUND_DEBOUNCERS.set(registryKey, { flushAll }); + + return { enqueue, flushKey, flushAll }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 77ff61e814e..c43fa28ce4b 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -1,10 +1,14 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import type { GroupKeyResolution } from "../config/sessions.js"; -import { createInboundDebouncer } from "./inbound-debounce.js"; +import { + clearInboundDebouncerRegistry, + createInboundDebouncer, + flushAllInboundDebouncers, +} from "./inbound-debounce.js"; import { resolveGroupRequireMention } from "./reply/groups.js"; import { finalizeInboundContext } from "./reply/inbound-context.js"; import { @@ -308,7 +312,11 @@ describe("createInboundDebouncer", () => { vi.useFakeTimers(); const calls: Array = []; - const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({ + const debouncer = createInboundDebouncer<{ + key: string; + id: string; + debounce: boolean; + }>({ debounceMs: 50, buildKey: (item) => item.key, shouldDebounce: (item) => item.debounce, @@ -329,7 +337,11 @@ describe("createInboundDebouncer", () => { vi.useFakeTimers(); const calls: Array = []; - const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({ + const debouncer = createInboundDebouncer<{ + key: string; + id: string; + windowMs: number; + }>({ debounceMs: 0, buildKey: (item) => item.key, resolveDebounceMs: (item) => item.windowMs, @@ -349,6 +361,108 @@ describe("createInboundDebouncer", () => { }); }); +describe("flushAllInboundDebouncers", () => { + // Clear registry before each test to avoid leaking state from other tests + // that create debouncers. + beforeEach(() => { + clearInboundDebouncerRegistry(); + }); + + afterEach(() => { + clearInboundDebouncerRegistry(); + }); + + it("flushes all registered debouncers immediately", async () => { + vi.useFakeTimers(); + const callsA: Array = []; + const callsB: Array = []; + + const debouncerA = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + callsA.push(items.map((entry) => entry.id)); + }, + }); + + const debouncerB = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + callsB.push(items.map((entry) => entry.id)); + }, + }); + + await debouncerA.enqueue({ key: "session-1", id: "msg-1" }); + await debouncerA.enqueue({ key: "session-1", id: "msg-2" }); + await debouncerB.enqueue({ key: "session-2", id: "msg-3" }); + + // Nothing flushed yet (timers haven't fired) + expect(callsA).toEqual([]); + expect(callsB).toEqual([]); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(callsA).toEqual([["msg-1", "msg-2"]]); + expect(callsB).toEqual([["msg-3"]]); + + vi.useRealTimers(); + }); + + it("returns 0 when no debouncers are registered", async () => { + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(0); + }); + + it("deregisters debouncers from global registry after flush", async () => { + vi.useFakeTimers(); + + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // First flush deregisters + await flushAllInboundDebouncers(); + + // Second flush should find nothing + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(0); + + vi.useRealTimers(); + }); +}); + +describe("createInboundDebouncer flushAll", () => { + it("flushes all buffered keys", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "b", id: "2" }); + await debouncer.enqueue({ key: "a", id: "3" }); + + expect(calls).toEqual([]); + await debouncer.flushAll(); + + // Both keys flushed + expect(calls).toHaveLength(2); + expect(calls).toContainEqual(["1", "3"]); + expect(calls).toContainEqual(["2"]); + + vi.useRealTimers(); + }); +}); + describe("initSessionState BodyStripped", () => { it("prefers BodyForAgent over Body for group chats", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sender-meta-")); diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..e3bbfcd2761 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,7 +1,8 @@ -export { extractQueueDirective } from "./queue/directive.js"; -export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export { extractQueueDirective } from "./queue/directive.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; +export { waitForFollowupQueueDrain } from "./queue/drain-all.js"; export { enqueueFollowupRun, getFollowupQueueDepth, diff --git a/src/auto-reply/reply/queue/drain-all.test.ts b/src/auto-reply/reply/queue/drain-all.test.ts new file mode 100644 index 00000000000..ad166e07b0a --- /dev/null +++ b/src/auto-reply/reply/queue/drain-all.test.ts @@ -0,0 +1,80 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { waitForFollowupQueueDrain } from "./drain-all.js"; +import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js"; + +function createMockQueue(overrides: Partial = {}): FollowupQueueState { + return { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: "followup", + debounceMs: 1000, + cap: 20, + dropPolicy: "summarize", + droppedCount: 0, + summaryLines: [], + ...overrides, + }; +} + +afterEach(() => { + FOLLOWUP_QUEUES.clear(); +}); + +describe("waitForFollowupQueueDrain", () => { + it("returns drained immediately when no queues exist", async () => { + const result = await waitForFollowupQueueDrain(1000); + expect(result).toEqual({ drained: true, remaining: 0 }); + }); + + it("returns drained immediately when all queues are empty", async () => { + FOLLOWUP_QUEUES.set("test", createMockQueue()); + const result = await waitForFollowupQueueDrain(1000); + expect(result).toEqual({ drained: true, remaining: 0 }); + }); + + it("waits until queues are drained", async () => { + const queue = createMockQueue({ + items: [ + { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() }, + ] as FollowupQueueState["items"], + draining: true, + }); + FOLLOWUP_QUEUES.set("test", queue); + + // Simulate drain completing after 100ms + setTimeout(() => { + queue.items.length = 0; + queue.draining = false; + FOLLOWUP_QUEUES.delete("test"); + }, 100); + + const result = await waitForFollowupQueueDrain(5000); + expect(result.drained).toBe(true); + expect(result.remaining).toBe(0); + }); + + it("returns not drained on timeout", async () => { + const queue = createMockQueue({ + items: [ + { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() }, + ] as FollowupQueueState["items"], + draining: true, + }); + FOLLOWUP_QUEUES.set("test", queue); + + const result = await waitForFollowupQueueDrain(100); + expect(result.drained).toBe(false); + expect(result.remaining).toBeGreaterThan(0); + }); + + it("counts draining queues as having pending items even with empty items array", async () => { + const queue = createMockQueue({ draining: true }); + FOLLOWUP_QUEUES.set("test", queue); + + // Queue has no items but is still draining — should wait + const result = await waitForFollowupQueueDrain(100); + expect(result.drained).toBe(false); + expect(result.remaining).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/src/auto-reply/reply/queue/drain-all.ts b/src/auto-reply/reply/queue/drain-all.ts new file mode 100644 index 00000000000..a0e4c2e4f5a --- /dev/null +++ b/src/auto-reply/reply/queue/drain-all.ts @@ -0,0 +1,48 @@ +import { FOLLOWUP_QUEUES } from "./state.js"; + +/** + * Wait for all followup queues to finish draining, up to `timeoutMs`. + * Returns `{ drained: true }` if all queues are empty, or `{ drained: false }` + * if the timeout was reached with items still pending. + * + * Called during SIGUSR1 restart after flushing inbound debouncers, so the + * newly enqueued items have time to be processed before the server tears down. + */ +export async function waitForFollowupQueueDrain( + timeoutMs: number, +): Promise<{ drained: boolean; remaining: number }> { + const deadline = Date.now() + timeoutMs; + const POLL_INTERVAL_MS = 50; + + const getPendingCount = (): number => { + let total = 0; + for (const queue of FOLLOWUP_QUEUES.values()) { + total += queue.items.length; + if (queue.draining) { + // Count draining queues as having at least 1 pending item so we keep + // waiting for the drain loop to finish even if items.length hits 0 + // momentarily between shifts. + total = Math.max(total, 1); + } + } + return total; + }; + + let remaining = getPendingCount(); + if (remaining === 0) { + return { drained: true, remaining: 0 }; + } + + while (Date.now() < deadline) { + await new Promise((resolve) => { + const timer = setTimeout(resolve, Math.min(POLL_INTERVAL_MS, deadline - Date.now())); + timer.unref?.(); + }); + remaining = getPendingCount(); + if (remaining === 0) { + return { drained: true, remaining: 0 }; + } + } + + return { drained: false, remaining }; +} diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index ce8fbccbe93..49c1c4734d1 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -19,16 +19,29 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason? })); const getActiveTaskCount = vi.fn(() => 0); const markGatewayDraining = vi.fn(); -const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ + drained: true, +})); const resetAllLanes = vi.fn(); const restartGatewayProcessWithFreshPid = vi.fn< - () => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string } + () => { + mode: "spawned" | "supervised" | "disabled" | "failed"; + pid?: number; + detail?: string; + } >(() => ({ mode: "disabled" })); const abortEmbeddedPiRun = vi.fn( (_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false, ); const getActiveEmbeddedRunCount = vi.fn(() => 0); -const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ + drained: true, +})); +const flushAllInboundDebouncers = vi.fn(async () => 0); +const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ + drained: true, + remaining: 0, +})); const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart"; const gatewayLog = { info: vi.fn(), @@ -36,6 +49,14 @@ const gatewayLog = { error: vi.fn(), }; +vi.mock("../../auto-reply/inbound-debounce.js", () => ({ + flushAllInboundDebouncers: () => flushAllInboundDebouncers(), +})); + +vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({ + waitForFollowupQueueDrain: (timeoutMs: number) => waitForFollowupQueueDrain(timeoutMs), +})); + vi.mock("../../infra/gateway-lock.js", () => ({ acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts), })); @@ -268,10 +289,14 @@ describe("runGatewayLoop", () => { expect(start).toHaveBeenCalledTimes(2); await new Promise((resolve) => setImmediate(resolve)); - expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { + mode: "compacting", + }); expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000); - expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { + mode: "all", + }); expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); expect(closeFirst).toHaveBeenCalledWith({ @@ -325,6 +350,96 @@ describe("runGatewayLoop", () => { }); }); + it("flushes inbound debouncers and waits for followup queue drain before marking gateway draining on SIGUSR1", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + // Simulate debouncers having buffered messages + flushAllInboundDebouncers.mockResolvedValueOnce(2); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + // Let the restart complete + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + // Verify flush was called before markGatewayDraining + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + + // Verify logging + expect(gatewayLog.info).toHaveBeenCalledWith("flushed 2 inbound debouncer(s) before restart"); + expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush"); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("skips followup queue drain when no debouncers had buffered messages", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + // No debouncers had buffered messages + flushAllInboundDebouncers.mockResolvedValueOnce(0); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + // Should NOT wait for followup drain when nothing was flushed + expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); + // Should still mark draining + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("logs warning when followup queue drain times out", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(1); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: false, + remaining: 3, + }); + + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(gatewayLog.warn).toHaveBeenCalledWith( + "followup queue drain timeout; 3 item(s) still pending", + ); + + sigterm(); + await expect(exited).resolves.toBe(0); + }); + }); + it("releases the lock before exiting on spawned restart", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 23ec7dd584d..27332d540e3 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,6 +3,8 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; +import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; +import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js"; @@ -123,6 +125,26 @@ export async function runGatewayLoop(params: { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { + // Flush inbound debounce buffers first. This pushes any messages + // waiting in per-channel debounce timers (e.g. the 2500ms collect + // window) into the followup queues immediately, preventing silent + // message loss when the server reinitializes. + const flushedDebouncers = await flushAllInboundDebouncers(); + if (flushedDebouncers > 0) { + gatewayLog.info(`flushed ${flushedDebouncers} inbound debouncer(s) before restart`); + // Give the followup queue drain loops a short window to process + // the newly flushed items before we mark the gateway as draining. + const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; + const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); + if (followupResult.drained) { + gatewayLog.info("followup queues drained after debounce flush"); + } else { + gatewayLog.warn( + `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, + ); + } + } + // Reject new enqueues immediately during the drain window so // sessions get an explicit restart error instead of silent task loss. markGatewayDraining(); From 9b19b945d74b3742fba36bc63c7752280677b90f Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 15:16:48 -0400 Subject: [PATCH 02/11] fix: address queue drain review feedback --- .../feishu/src/monitor.reaction.test.ts | 3 +- src/auto-reply/inbound-debounce.ts | 22 +++++++--- src/auto-reply/inbound.test.ts | 32 +++++++++++++- src/auto-reply/reply/queue/drain-all.test.ts | 9 ++++ src/auto-reply/reply/queue/drain-all.ts | 10 ++--- src/cli/gateway-cli/run-loop.test.ts | 42 ++++++++++++++++++- src/cli/gateway-cli/run-loop.ts | 14 ++++--- 7 files changed, 111 insertions(+), 21 deletions(-) diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 5765577441f..06efee0661a 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -711,7 +711,8 @@ describe("Feishu inbound debounce regressions", () => { enqueueMock(item); params.onError?.(new Error("dispatch failed"), [item]); }, - flushKey: async () => {}, + flushKey: async (_key: string) => {}, + flushAll: async () => {}, }), resolveInboundDebounceMs, }, diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 96be2d94b1b..77eb234847a 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -5,9 +5,12 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; /** * Global registry of all active inbound debouncers so they can be flushed * collectively during gateway restart (SIGUSR1). Each debouncer registers - * itself on creation and deregisters when flushAll is called. + * itself on creation and deregisters after the next global flush sweep. */ -type DebouncerFlushHandle = { flushAll: () => Promise }; +type DebouncerFlushHandle = { + getPendingBufferCount: () => number; + flushAll: () => Promise; +}; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -21,24 +24,28 @@ export function clearInboundDebouncerRegistry(): void { /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. + * Returns the number of pending debounce buffers that existed when the flush + * started so restart logic can skip followup draining when there was no work. */ export async function flushAllInboundDebouncers(): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } - let flushedCount = 0; + const pendingBufferCount = entries.reduce( + (count, [, handle]) => count + handle.getPendingBufferCount(), + 0, + ); await Promise.all( entries.map(async ([key, handle]) => { try { await handle.flushAll(); - flushedCount += 1; } finally { INBOUND_DEBOUNCERS.delete(key); } }), ); - return flushedCount; + return pendingBufferCount; } const resolveMs = (value: unknown): number | undefined => { @@ -177,7 +184,10 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams // Register in global registry for SIGUSR1 flush. const registryKey = Symbol(); - INBOUND_DEBOUNCERS.set(registryKey, { flushAll }); + INBOUND_DEBOUNCERS.set(registryKey, { + getPendingBufferCount: () => buffers.size, + flushAll, + }); return { enqueue, flushKey, flushAll }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index c43fa28ce4b..0075d758bc6 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -372,7 +372,7 @@ describe("flushAllInboundDebouncers", () => { clearInboundDebouncerRegistry(); }); - it("flushes all registered debouncers immediately", async () => { + it("flushes all pending inbound debounce buffers immediately", async () => { vi.useFakeTimers(); const callsA: Array = []; const callsB: Array = []; @@ -409,6 +409,36 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("counts pending buffers instead of registered debouncers", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const activeDebouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + await activeDebouncer.enqueue({ key: "session-1", id: "msg-1" }); + await activeDebouncer.enqueue({ key: "session-2", id: "msg-2" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(calls).toHaveLength(2); + expect(calls).toContainEqual(["msg-1"]); + expect(calls).toContainEqual(["msg-2"]); + + vi.useRealTimers(); + }); + it("returns 0 when no debouncers are registered", async () => { const flushed = await flushAllInboundDebouncers(); expect(flushed).toBe(0); diff --git a/src/auto-reply/reply/queue/drain-all.test.ts b/src/auto-reply/reply/queue/drain-all.test.ts index ad166e07b0a..eff31238def 100644 --- a/src/auto-reply/reply/queue/drain-all.test.ts +++ b/src/auto-reply/reply/queue/drain-all.test.ts @@ -77,4 +77,13 @@ describe("waitForFollowupQueueDrain", () => { expect(result.drained).toBe(false); expect(result.remaining).toBeGreaterThanOrEqual(1); }); + + it("reports each draining queue in the timeout remaining count", async () => { + FOLLOWUP_QUEUES.set("queue-1", createMockQueue({ draining: true })); + FOLLOWUP_QUEUES.set("queue-2", createMockQueue({ draining: true })); + FOLLOWUP_QUEUES.set("queue-3", createMockQueue({ draining: true })); + + const result = await waitForFollowupQueueDrain(1); + expect(result).toEqual({ drained: false, remaining: 3 }); + }); }); diff --git a/src/auto-reply/reply/queue/drain-all.ts b/src/auto-reply/reply/queue/drain-all.ts index a0e4c2e4f5a..f681ed0f557 100644 --- a/src/auto-reply/reply/queue/drain-all.ts +++ b/src/auto-reply/reply/queue/drain-all.ts @@ -17,13 +17,9 @@ export async function waitForFollowupQueueDrain( const getPendingCount = (): number => { let total = 0; for (const queue of FOLLOWUP_QUEUES.values()) { - total += queue.items.length; - if (queue.draining) { - // Count draining queues as having at least 1 pending item so we keep - // waiting for the drain loop to finish even if items.length hits 0 - // momentarily between shifts. - total = Math.max(total, 1); - } + // Add 1 for the in-flight item owned by an active drain loop. + const queuePending = queue.items.length + (queue.draining ? 1 : 0); + total += queuePending; } return total; }; diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 49c1c4734d1..dd9f80db8e3 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -375,9 +375,17 @@ describe("runGatewayLoop", () => { expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( + waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(waitForFollowupQueueDrain.mock.invocationCallOrder[0]).toBeLessThan( + markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); // Verify logging - expect(gatewayLog.info).toHaveBeenCalledWith("flushed 2 inbound debouncer(s) before restart"); + expect(gatewayLog.info).toHaveBeenCalledWith( + "flushed 2 pending inbound debounce buffer(s) before restart", + ); expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush"); sigterm(); @@ -385,6 +393,38 @@ describe("runGatewayLoop", () => { }); }); + it("extends the restart force-exit timer to include followup queue drain time", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + flushAllInboundDebouncers.mockResolvedValueOnce(1); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 100_000); + expect(forceExitCall).toBeDefined(); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + }); + it("skips followup queue drain when no debouncers had buffered messages", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 27332d540e3..85bf389cb1a 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -99,6 +99,7 @@ export async function runGatewayLoop(params: { }; const DRAIN_TIMEOUT_MS = 90_000; + const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; const SHUTDOWN_TIMEOUT_MS = 5_000; const request = (action: GatewayRunSignalAction, signal: string) => { @@ -111,7 +112,9 @@ export async function runGatewayLoop(params: { gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); // Allow extra time for draining active turns on restart. - const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; + const forceExitMs = isRestart + ? DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS + : SHUTDOWN_TIMEOUT_MS; const forceExitTimer = setTimeout(() => { gatewayLog.error("shutdown timed out; exiting without full cleanup"); // Exit non-zero on restart timeout so launchd/systemd treats it as a @@ -129,12 +132,13 @@ export async function runGatewayLoop(params: { // waiting in per-channel debounce timers (e.g. the 2500ms collect // window) into the followup queues immediately, preventing silent // message loss when the server reinitializes. - const flushedDebouncers = await flushAllInboundDebouncers(); - if (flushedDebouncers > 0) { - gatewayLog.info(`flushed ${flushedDebouncers} inbound debouncer(s) before restart`); + const flushedBuffers = await flushAllInboundDebouncers(); + if (flushedBuffers > 0) { + gatewayLog.info( + `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, + ); // Give the followup queue drain loops a short window to process // the newly flushed items before we mark the gateway as draining. - const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); if (followupResult.drained) { gatewayLog.info("followup queues drained after debounce flush"); From 070fc040b2da34c1f4a74771645bf8941b77660a Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 15:37:08 -0400 Subject: [PATCH 03/11] fix: only add restart drain timeout when needed --- src/auto-reply/inbound-debounce.ts | 12 ++++++--- src/cli/gateway-cli/run-loop.test.ts | 39 ++++++++++++++++++---------- src/cli/gateway-cli/run-loop.ts | 19 ++++++++++---- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 77eb234847a..8c5ebe2a846 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -21,6 +21,13 @@ export function clearInboundDebouncerRegistry(): void { INBOUND_DEBOUNCERS.clear(); } +export function getPendingInboundDebounceBufferCount(): number { + return [...INBOUND_DEBOUNCERS.values()].reduce( + (count, handle) => count + handle.getPendingBufferCount(), + 0, + ); +} + /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. @@ -32,10 +39,7 @@ export async function flushAllInboundDebouncers(): Promise { if (entries.length === 0) { return 0; } - const pendingBufferCount = entries.reduce( - (count, [, handle]) => count + handle.getPendingBufferCount(), - 0, - ); + const pendingBufferCount = getPendingInboundDebounceBufferCount(); await Promise.all( entries.map(async ([key, handle]) => { try { diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index dd9f80db8e3..70c3e52e448 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -37,6 +37,7 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0); const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true, })); +const getPendingInboundDebounceBufferCount = vi.fn(() => 0); const flushAllInboundDebouncers = vi.fn(async () => 0); const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ drained: true, @@ -50,6 +51,7 @@ const gatewayLog = { }; vi.mock("../../auto-reply/inbound-debounce.js", () => ({ + getPendingInboundDebounceBufferCount: () => getPendingInboundDebounceBufferCount(), flushAllInboundDebouncers: () => flushAllInboundDebouncers(), })); @@ -355,6 +357,7 @@ describe("runGatewayLoop", () => { await withIsolatedSignals(async ({ captureSignal }) => { // Simulate debouncers having buffered messages + getPendingInboundDebounceBufferCount.mockReturnValueOnce(2); flushAllInboundDebouncers.mockResolvedValueOnce(2); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -397,6 +400,7 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { + getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -430,25 +434,33 @@ describe("runGatewayLoop", () => { await withIsolatedSignals(async ({ captureSignal }) => { // No debouncers had buffered messages + getPendingInboundDebounceBufferCount.mockReturnValueOnce(0); flushAllInboundDebouncers.mockResolvedValueOnce(0); - const { exited } = await createSignaledLoopHarness(); - const sigusr1 = captureSignal("SIGUSR1"); - const sigterm = captureSignal("SIGTERM"); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); - sigusr1(); + sigusr1(); - await new Promise((resolve) => setImmediate(resolve)); - await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); - expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); - // Should NOT wait for followup drain when nothing was flushed - expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); - // Should still mark draining - expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + // Should NOT wait for followup drain when nothing was flushed + expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); + // Should still mark draining + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 95_000); + expect(forceExitCall).toBeDefined(); - sigterm(); - await expect(exited).resolves.toBe(0); + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + setTimeoutSpy.mockRestore(); + } }); }); @@ -456,6 +468,7 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { + getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: false, diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 85bf389cb1a..c3885ae9322 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,7 +3,10 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; -import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; +import { + flushAllInboundDebouncers, + getPendingInboundDebounceBufferCount, +} from "../../auto-reply/inbound-debounce.js"; import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; @@ -111,10 +114,16 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - // Allow extra time for draining active turns on restart. - const forceExitMs = isRestart - ? DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS - : SHUTDOWN_TIMEOUT_MS; + // Allow extra time for followup drain only when restart will actually + // flush buffered inbound messages into followup queues. + const hasPendingInboundDebounceBuffers = getPendingInboundDebounceBufferCount() > 0; + let forceExitMs = SHUTDOWN_TIMEOUT_MS; + if (isRestart) { + forceExitMs += DRAIN_TIMEOUT_MS; + if (hasPendingInboundDebounceBuffers) { + forceExitMs += FOLLOWUP_DRAIN_TIMEOUT_MS; + } + } const forceExitTimer = setTimeout(() => { gatewayLog.error("shutdown timed out; exiting without full cleanup"); // Exit non-zero on restart timeout so launchd/systemd treats it as a From 23731ed09913f99a0e666d77d31d022f982d1fd3 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 16:39:24 -0400 Subject: [PATCH 04/11] Gateway: finish drain-queue restart review fixes --- .../feishu/src/monitor.reaction.test.ts | 2 +- src/auto-reply/inbound-debounce.ts | 41 ++++++------- src/auto-reply/inbound.test.ts | 28 +++++++++ src/cli/gateway-cli/run-loop.test.ts | 39 ++++++------- src/cli/gateway-cli/run-loop.ts | 57 ++++++++++--------- 5 files changed, 99 insertions(+), 68 deletions(-) diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 06efee0661a..b9fecb8050c 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -712,7 +712,7 @@ describe("Feishu inbound debounce regressions", () => { params.onError?.(new Error("dispatch failed"), [item]); }, flushKey: async (_key: string) => {}, - flushAll: async () => {}, + flushAll: async () => 0, }), resolveInboundDebounceMs, }, diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 8c5ebe2a846..c74cb2f5992 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -8,8 +8,7 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; * itself on creation and deregisters after the next global flush sweep. */ type DebouncerFlushHandle = { - getPendingBufferCount: () => number; - flushAll: () => Promise; + flushAll: () => Promise; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -21,35 +20,27 @@ export function clearInboundDebouncerRegistry(): void { INBOUND_DEBOUNCERS.clear(); } -export function getPendingInboundDebounceBufferCount(): number { - return [...INBOUND_DEBOUNCERS.values()].reduce( - (count, handle) => count + handle.getPendingBufferCount(), - 0, - ); -} - /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. - * Returns the number of pending debounce buffers that existed when the flush - * started so restart logic can skip followup draining when there was no work. + * Returns the number of debounce buffers actually flushed so restart logic can + * skip followup draining when there was no buffered work. */ export async function flushAllInboundDebouncers(): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } - const pendingBufferCount = getPendingInboundDebounceBufferCount(); - await Promise.all( + const flushedCounts = await Promise.all( entries.map(async ([key, handle]) => { try { - await handle.flushAll(); + return await handle.flushAll(); } finally { INBOUND_DEBOUNCERS.delete(key); } }), ); - return pendingBufferCount; + return flushedCounts.reduce((total, count) => total + count, 0); } const resolveMs = (value: unknown): number | undefined => { @@ -180,16 +171,28 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams }; const flushAll = async () => { - const keys = [...buffers.keys()]; - for (const key of keys) { - await flushKey(key); + let flushedBufferCount = 0; + + // Keep sweeping until no debounced keys remain. A flush callback can race + // with late in-flight ingress and create another buffered key before the + // global registry deregisters this debouncer during restart. + while (buffers.size > 0) { + const keys = [...buffers.keys()]; + for (const key of keys) { + if (!buffers.has(key)) { + continue; + } + await flushKey(key); + flushedBufferCount += 1; + } } + + return flushedBufferCount; }; // Register in global registry for SIGUSR1 flush. const registryKey = Symbol(); INBOUND_DEBOUNCERS.set(registryKey, { - getPendingBufferCount: () => buffers.size, flushAll, }); diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 0075d758bc6..54ea66ae1d8 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -439,6 +439,34 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("keeps flushing until no buffered keys remain", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let enqueuedDuringFlush = false; + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (!enqueuedDuringFlush) { + enqueuedDuringFlush = true; + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + } + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(2); + expect(calls).toEqual([["msg-1"], ["msg-2"]]); + await expect(flushAllInboundDebouncers()).resolves.toBe(0); + + vi.useRealTimers(); + }); + it("returns 0 when no debouncers are registered", async () => { const flushed = await flushAllInboundDebouncers(); expect(flushed).toBe(0); diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 70c3e52e448..76a4e3066b4 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -37,7 +37,6 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0); const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true, })); -const getPendingInboundDebounceBufferCount = vi.fn(() => 0); const flushAllInboundDebouncers = vi.fn(async () => 0); const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ drained: true, @@ -51,7 +50,6 @@ const gatewayLog = { }; vi.mock("../../auto-reply/inbound-debounce.js", () => ({ - getPendingInboundDebounceBufferCount: () => getPendingInboundDebounceBufferCount(), flushAllInboundDebouncers: () => flushAllInboundDebouncers(), })); @@ -352,12 +350,10 @@ describe("runGatewayLoop", () => { }); }); - it("flushes inbound debouncers and waits for followup queue drain before marking gateway draining on SIGUSR1", async () => { + it("marks gateway draining before flushing inbound debouncers on SIGUSR1", async () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - // Simulate debouncers having buffered messages - getPendingInboundDebounceBufferCount.mockReturnValueOnce(2); flushAllInboundDebouncers.mockResolvedValueOnce(2); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -370,22 +366,22 @@ describe("runGatewayLoop", () => { sigusr1(); - // Let the restart complete await new Promise((resolve) => setImmediate(resolve)); await new Promise((resolve) => setImmediate(resolve)); - // Verify flush was called before markGatewayDraining + expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); - expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( + flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( + waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); - expect(waitForFollowupQueueDrain.mock.invocationCallOrder[0]).toBeLessThan( - markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, - ); - // Verify logging expect(gatewayLog.info).toHaveBeenCalledWith( "flushed 2 pending inbound debounce buffer(s) before restart", ); @@ -400,7 +396,6 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: true, @@ -418,8 +413,11 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); await new Promise((resolve) => setImmediate(resolve)); - const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 100_000); - expect(forceExitCall).toBeDefined(); + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toContain(95_000); + expect(forceExitCalls.some((delay) => delay > 95_000 && delay <= 100_000)).toBe(true); sigterm(); await expect(exited).resolves.toBe(0); @@ -433,8 +431,6 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - // No debouncers had buffered messages - getPendingInboundDebounceBufferCount.mockReturnValueOnce(0); flushAllInboundDebouncers.mockResolvedValueOnce(0); const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); @@ -449,12 +445,12 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); - // Should NOT wait for followup drain when nothing was flushed expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); - // Should still mark draining expect(markGatewayDraining).toHaveBeenCalledTimes(1); - const forceExitCall = setTimeoutSpy.mock.calls.find((call) => call[1] === 95_000); - expect(forceExitCall).toBeDefined(); + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toEqual([95_000]); sigterm(); await expect(exited).resolves.toBe(0); @@ -468,7 +464,6 @@ describe("runGatewayLoop", () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { - getPendingInboundDebounceBufferCount.mockReturnValueOnce(1); flushAllInboundDebouncers.mockResolvedValueOnce(1); waitForFollowupQueueDrain.mockResolvedValueOnce({ drained: false, diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index c3885ae9322..338b85a9917 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -3,10 +3,7 @@ import { getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns, } from "../../agents/pi-embedded-runner/runs.js"; -import { - flushAllInboundDebouncers, - getPendingInboundDebounceBufferCount, -} from "../../auto-reply/inbound-debounce.js"; +import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js"; import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; @@ -114,29 +111,38 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - // Allow extra time for followup drain only when restart will actually - // flush buffered inbound messages into followup queues. - const hasPendingInboundDebounceBuffers = getPendingInboundDebounceBufferCount() > 0; - let forceExitMs = SHUTDOWN_TIMEOUT_MS; - if (isRestart) { - forceExitMs += DRAIN_TIMEOUT_MS; - if (hasPendingInboundDebounceBuffers) { - forceExitMs += FOLLOWUP_DRAIN_TIMEOUT_MS; + const signalStartMs = Date.now(); + const baseForceExitDeadlineMs = + signalStartMs + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); + let forceExitTimer: ReturnType | null = null; + const armForceExitTimer = (deadlineMs: number) => { + if (forceExitTimer) { + clearTimeout(forceExitTimer); } - } - const forceExitTimer = setTimeout(() => { - gatewayLog.error("shutdown timed out; exiting without full cleanup"); - // Exit non-zero on restart timeout so launchd/systemd treats it as a - // failure and triggers a clean process restart instead of assuming the - // shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822) - exitProcess(isRestart ? 1 : 0); - }, forceExitMs); + forceExitTimer = setTimeout( + () => { + gatewayLog.error("shutdown timed out; exiting without full cleanup"); + // Exit non-zero on restart timeout so launchd/systemd treats it as a + // failure and triggers a clean process restart instead of assuming the + // shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822) + exitProcess(isRestart ? 1 : 0); + }, + Math.max(0, deadlineMs - Date.now()), + ); + forceExitTimer.unref?.(); + }; + armForceExitTimer(baseForceExitDeadlineMs); void (async () => { try { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { + // Reject new command-queue work before any awaited restart drain + // step so late arrivals fail explicitly instead of being stranded + // behind a one-shot debounce flush. + markGatewayDraining(); + // Flush inbound debounce buffers first. This pushes any messages // waiting in per-channel debounce timers (e.g. the 2500ms collect // window) into the followup queues immediately, preventing silent @@ -147,7 +153,8 @@ export async function runGatewayLoop(params: { `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, ); // Give the followup queue drain loops a short window to process - // the newly flushed items before we mark the gateway as draining. + // the newly flushed items before the server is torn down. + armForceExitTimer(baseForceExitDeadlineMs + FOLLOWUP_DRAIN_TIMEOUT_MS); const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); if (followupResult.drained) { gatewayLog.info("followup queues drained after debounce flush"); @@ -157,10 +164,6 @@ export async function runGatewayLoop(params: { ); } } - - // Reject new enqueues immediately during the drain window so - // sessions get an explicit restart error instead of silent task loss. - markGatewayDraining(); const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); @@ -200,7 +203,9 @@ export async function runGatewayLoop(params: { } catch (err) { gatewayLog.error(`shutdown error: ${String(err)}`); } finally { - clearTimeout(forceExitTimer); + if (forceExitTimer) { + clearTimeout(forceExitTimer); + } server = null; if (isRestart) { await handleRestartAfterServerClose(); From 063e06e17b8f79f55eb6f43fd0f4e693ad77ca96 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 17:03:43 -0400 Subject: [PATCH 05/11] fix: only count debounce buffers with actual pending messages in flush count flushBuffer/flushKey now return whether messages were actually flushed, so flushAll only increments flushedBufferCount for non-empty buffers. Prevents idle registered debouncers from triggering unnecessary followup queue drain waits during SIGUSR1 restart. Also wraps per-key flush in try/catch so one onError throw cannot strand later buffered messages. --- .../bluebubbles/src/monitor-debounce.ts | 2 +- .../feishu/src/monitor.reaction.test.ts | 2 +- src/auto-reply/inbound-debounce.ts | 19 +++++++--- src/auto-reply/inbound.test.ts | 35 +++++++++++++++++++ 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts index 298be3e4921..79101bc3fb1 100644 --- a/extensions/bluebubbles/src/monitor-debounce.ts +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -13,7 +13,7 @@ type BlueBubblesDebounceEntry = { export type BlueBubblesDebouncer = { enqueue: (item: BlueBubblesDebounceEntry) => Promise; - flushKey: (key: string) => Promise; + flushKey: (key: string) => Promise; }; export type BlueBubblesDebounceRegistry = { diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index b9fecb8050c..9e607b17c84 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -711,7 +711,7 @@ describe("Feishu inbound debounce regressions", () => { enqueueMock(item); params.onError?.(new Error("dispatch failed"), [item]); }, - flushKey: async (_key: string) => {}, + flushKey: async (_key: string) => false, flushAll: async () => 0, }), resolveInboundDebounceMs, diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index c74cb2f5992..c5e6b53520c 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -102,6 +102,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return Math.max(0, Math.trunc(resolved)); }; + // Returns true when the buffer had pending messages that were flushed. const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { @@ -109,21 +110,22 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams buffer.timeout = null; } if (buffer.items.length === 0) { - return; + return false; } try { await params.onFlush(buffer.items); } catch (err) { params.onError?.(err, buffer.items); } + return true; }; const flushKey = async (key: string) => { const buffer = buffers.get(key); if (!buffer) { - return; + return false; } - await flushBuffer(key, buffer); + return flushBuffer(key, buffer); }; const scheduleFlush = (key: string, buffer: DebounceBuffer) => { @@ -182,8 +184,15 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams if (!buffers.has(key)) { continue; } - await flushKey(key); - flushedBufferCount += 1; + try { + const hadMessages = await flushKey(key); + if (hadMessages) { + flushedBufferCount += 1; + } + } catch { + // flushBuffer already routed the failure through onError; keep + // sweeping so one bad key cannot strand later buffered messages. + } } } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 54ea66ae1d8..47fde4c9920 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -519,6 +519,41 @@ describe("createInboundDebouncer flushAll", () => { vi.useRealTimers(); }); + + it("continues flushing later keys when onError throws", async () => { + vi.useFakeTimers(); + const calls: Array = []; + const errors: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id); + if (ids.includes("2")) { + throw new Error("dispatch failed"); + } + calls.push(ids); + }, + onError: (_err, items) => { + errors.push(items.map((entry) => entry.id)); + throw new Error("onError failed"); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "b", id: "2" }); + await debouncer.enqueue({ key: "c", id: "3" }); + + const flushed = await debouncer.flushAll(); + + expect(flushed).toBe(2); + expect(calls).toContainEqual(["1"]); + expect(calls).toContainEqual(["3"]); + expect(errors).toEqual([["2"]]); + + vi.useRealTimers(); + }); }); describe("initSessionState BodyStripped", () => { From 0584b8dbc04b94ee99477557351d53c4ae1c9ef3 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 17:14:38 -0400 Subject: [PATCH 06/11] test: fix debounce mock typings --- extensions/bluebubbles/src/monitor-debounce.ts | 1 + extensions/feishu/src/monitor.test-mocks.ts | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts index 79101bc3fb1..8eb1a0f112c 100644 --- a/extensions/bluebubbles/src/monitor-debounce.ts +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -14,6 +14,7 @@ type BlueBubblesDebounceEntry = { export type BlueBubblesDebouncer = { enqueue: (item: BlueBubblesDebounceEntry) => Promise; flushKey: (key: string) => Promise; + flushAll: () => Promise; }; export type BlueBubblesDebounceRegistry = { diff --git a/extensions/feishu/src/monitor.test-mocks.ts b/extensions/feishu/src/monitor.test-mocks.ts index 276d6375464..fd984106988 100644 --- a/extensions/feishu/src/monitor.test-mocks.ts +++ b/extensions/feishu/src/monitor.test-mocks.ts @@ -17,7 +17,8 @@ export function createFeishuRuntimeMockModule(): { resolveInboundDebounceMs: () => number; createInboundDebouncer: () => { enqueue: () => Promise; - flushKey: () => Promise; + flushKey: () => Promise; + flushAll: () => Promise; }; }; text: { @@ -33,7 +34,8 @@ export function createFeishuRuntimeMockModule(): { resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, - flushKey: async () => {}, + flushKey: async () => false, + flushAll: async () => 0, }), }, text: { From 324c43931b34f805133172d56ed4f7c9dc4a28d8 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 18:06:22 -0400 Subject: [PATCH 07/11] fix: bound restart debounce draining --- src/auto-reply/inbound-debounce.ts | 18 ++++++++--- src/auto-reply/inbound.test.ts | 36 ++++++++++++++++++++++ src/cli/gateway-cli/run-loop.test.ts | 45 ++++++++++++++++++++++++++-- src/cli/gateway-cli/run-loop.ts | 20 +++++++++---- 4 files changed, 107 insertions(+), 12 deletions(-) diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index c5e6b53520c..abbd3df9694 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -8,7 +8,7 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; * itself on creation and deregisters after the next global flush sweep. */ type DebouncerFlushHandle = { - flushAll: () => Promise; + flushAll: (options?: { deadlineMs?: number }) => Promise; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -26,15 +26,19 @@ export function clearInboundDebouncerRegistry(): void { * Returns the number of debounce buffers actually flushed so restart logic can * skip followup draining when there was no buffered work. */ -export async function flushAllInboundDebouncers(): Promise { +export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } + const deadlineMs = + typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs) + ? Date.now() + Math.max(0, Math.trunc(options.timeoutMs)) + : undefined; const flushedCounts = await Promise.all( entries.map(async ([key, handle]) => { try { - return await handle.flushAll(); + return await handle.flushAll({ deadlineMs }); } finally { INBOUND_DEBOUNCERS.delete(key); } @@ -172,15 +176,21 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams scheduleFlush(key, buffer); }; - const flushAll = async () => { + const flushAll = async (options?: { deadlineMs?: number }) => { let flushedBufferCount = 0; // Keep sweeping until no debounced keys remain. A flush callback can race // with late in-flight ingress and create another buffered key before the // global registry deregisters this debouncer during restart. while (buffers.size > 0) { + if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { + break; + } const keys = [...buffers.keys()]; for (const key of keys) { + if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { + return flushedBufferCount; + } if (!buffers.has(key)) { continue; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 47fde4c9920..7becad2cedf 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -554,6 +554,42 @@ describe("createInboundDebouncer flushAll", () => { vi.useRealTimers(); }); + + it("stops sweeping when the global flush deadline is reached", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let now = 0; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (items[0]?.id === "1") { + await debouncer.enqueue({ key: "b", id: "2" }); + now = 20; + } + }, + }); + + try { + await debouncer.enqueue({ key: "a", id: "1" }); + + const flushed = await debouncer.flushAll({ deadlineMs: 10 }); + expect(flushed).toBe(1); + expect(calls).toEqual([["1"]]); + + now = 0; + const flushedLater = await debouncer.flushAll({ deadlineMs: 10 }); + expect(flushedLater).toBe(1); + expect(calls).toEqual([["1"], ["2"]]); + } finally { + nowSpy.mockRestore(); + vi.useRealTimers(); + } + }); }); describe("initSessionState BodyStripped", () => { diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 76a4e3066b4..66aba21375d 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -50,7 +50,8 @@ const gatewayLog = { }; vi.mock("../../auto-reply/inbound-debounce.js", () => ({ - flushAllInboundDebouncers: () => flushAllInboundDebouncers(), + flushAllInboundDebouncers: (options?: { timeoutMs?: number }) => + flushAllInboundDebouncers(options), })); vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({ @@ -371,6 +372,7 @@ describe("runGatewayLoop", () => { expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, @@ -417,7 +419,7 @@ describe("runGatewayLoop", () => { .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); expect(forceExitCalls).toContain(95_000); - expect(forceExitCalls.some((delay) => delay > 95_000 && delay <= 100_000)).toBe(true); + expect(forceExitCalls).toContain(100_000); sigterm(); await expect(exited).resolves.toBe(0); @@ -445,12 +447,13 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); + expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); expect(markGatewayDraining).toHaveBeenCalledTimes(1); const forceExitCalls = setTimeoutSpy.mock.calls .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); - expect(forceExitCalls).toEqual([95_000]); + expect(forceExitCalls).toEqual([95_000, 95_000]); sigterm(); await expect(exited).resolves.toBe(0); @@ -488,6 +491,42 @@ describe("runGatewayLoop", () => { }); }); + it("re-arms the restart watchdog after a slow debounce flush", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async ({ captureSignal }) => { + let now = 1000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + flushAllInboundDebouncers.mockImplementationOnce(async () => { + now += 20_000; + return 0; + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + const { exited } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + const sigterm = captureSignal("SIGTERM"); + + sigusr1(); + + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + const forceExitCalls = setTimeoutSpy.mock.calls + .map((call) => call[1]) + .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); + expect(forceExitCalls).toEqual([95_000, 95_000]); + + sigterm(); + await expect(exited).resolves.toBe(0); + } finally { + nowSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + } + }); + }); + it("releases the lock before exiting on spawned restart", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 338b85a9917..6167bc56500 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -100,6 +100,7 @@ export async function runGatewayLoop(params: { const DRAIN_TIMEOUT_MS = 90_000; const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000; + const INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS = 10_000; const SHUTDOWN_TIMEOUT_MS = 5_000; const request = (action: GatewayRunSignalAction, signal: string) => { @@ -111,9 +112,8 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); - const signalStartMs = Date.now(); const baseForceExitDeadlineMs = - signalStartMs + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); + Date.now() + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0); let forceExitTimer: ReturnType | null = null; const armForceExitTimer = (deadlineMs: number) => { if (forceExitTimer) { @@ -140,21 +140,31 @@ export async function runGatewayLoop(params: { if (isRestart) { // Reject new command-queue work before any awaited restart drain // step so late arrivals fail explicitly instead of being stranded - // behind a one-shot debounce flush. + // behind a one-shot debounce flush. This does not block followup + // queue enqueues, so flushed inbound work can still drain normally. markGatewayDraining(); // Flush inbound debounce buffers first. This pushes any messages // waiting in per-channel debounce timers (e.g. the 2500ms collect // window) into the followup queues immediately, preventing silent // message loss when the server reinitializes. - const flushedBuffers = await flushAllInboundDebouncers(); + const flushedBuffers = await flushAllInboundDebouncers({ + timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS, + }); + // Start the restart watchdog budget after the pre-shutdown debounce + // flush so slow flush handlers do not steal time from active drain. + armForceExitTimer( + Date.now() + + SHUTDOWN_TIMEOUT_MS + + DRAIN_TIMEOUT_MS + + (flushedBuffers > 0 ? FOLLOWUP_DRAIN_TIMEOUT_MS : 0), + ); if (flushedBuffers > 0) { gatewayLog.info( `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, ); // Give the followup queue drain loops a short window to process // the newly flushed items before the server is torn down. - armForceExitTimer(baseForceExitDeadlineMs + FOLLOWUP_DRAIN_TIMEOUT_MS); const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); if (followupResult.drained) { gatewayLog.info("followup queues drained after debounce flush"); From 8f0b2d8ba555fea81d724411980239a73e4fa66f Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 18:51:46 -0400 Subject: [PATCH 08/11] test: fix run-loop debounce mock typing --- src/cli/gateway-cli/run-loop.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 66aba21375d..ded7aaff4d7 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -37,7 +37,7 @@ const getActiveEmbeddedRunCount = vi.fn(() => 0); const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true, })); -const flushAllInboundDebouncers = vi.fn(async () => 0); +const flushAllInboundDebouncers = vi.fn(async (_options?: { timeoutMs?: number }) => 0); const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({ drained: true, remaining: 0, From de53e272157d934dcb37c47f694f1c92c77a8d36 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 21:47:50 -0400 Subject: [PATCH 09/11] Auto-reply: harden inbound debounce restart flushing --- .../bluebubbles/src/monitor-debounce.ts | 2 + .../discord/src/monitor/message-handler.ts | 5 +- .../feishu/src/monitor.reaction.test.ts | 1 + extensions/feishu/src/monitor.test-mocks.ts | 2 + .../mattermost/src/mattermost/monitor.ts | 1 + .../src/monitor-handler.file-consent.test.ts | 3 + .../message-handler.authz.test.ts | 10 +- .../slack/src/monitor/message-handler.test.ts | 2 + .../slack/src/monitor/message-handler.ts | 14 ++- extensions/slack/src/monitor/provider.ts | 1 + extensions/whatsapp/src/inbound/monitor.ts | 1 + src/auto-reply/inbound-debounce.ts | 59 +++++++++--- src/auto-reply/inbound.test.ts | 91 +++++++++++++++++++ .../helpers/extensions/plugin-runtime-mock.ts | 2 + 14 files changed, 175 insertions(+), 19 deletions(-) diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts index 8eb1a0f112c..3896cbe4912 100644 --- a/extensions/bluebubbles/src/monitor-debounce.ts +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -15,6 +15,7 @@ export type BlueBubblesDebouncer = { enqueue: (item: BlueBubblesDebounceEntry) => Promise; flushKey: (key: string) => Promise; flushAll: () => Promise; + unregister: () => void; }; export type BlueBubblesDebounceRegistry = { @@ -200,6 +201,7 @@ export function createBlueBubblesDebounceRegistry(params: { return debouncer; }, removeDebouncer: (target) => { + targetDebouncers.get(target)?.unregister(); targetDebouncers.delete(target); }, }; diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index e17dcc906af..fdb41c677a8 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -180,7 +180,10 @@ export function createDiscordMessageHandler( } }; - handler.deactivate = inboundWorker.deactivate; + handler.deactivate = () => { + debouncer.unregister(); + inboundWorker.deactivate(); + }; return handler; } diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 9e607b17c84..4ac0a504cb5 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -713,6 +713,7 @@ describe("Feishu inbound debounce regressions", () => { }, flushKey: async (_key: string) => false, flushAll: async () => 0, + unregister: () => {}, }), resolveInboundDebounceMs, }, diff --git a/extensions/feishu/src/monitor.test-mocks.ts b/extensions/feishu/src/monitor.test-mocks.ts index fd984106988..de287c14277 100644 --- a/extensions/feishu/src/monitor.test-mocks.ts +++ b/extensions/feishu/src/monitor.test-mocks.ts @@ -19,6 +19,7 @@ export function createFeishuRuntimeMockModule(): { enqueue: () => Promise; flushKey: () => Promise; flushAll: () => Promise; + unregister: () => void; }; }; text: { @@ -36,6 +37,7 @@ export function createFeishuRuntimeMockModule(): { enqueue: async () => {}, flushKey: async () => false, flushAll: async () => 0, + unregister: () => {}, }), }, text: { diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 958a40de705..44fcfebe46a 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1686,6 +1686,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }); } finally { + debouncer.unregister(); unregisterInteractions?.(); } diff --git a/extensions/msteams/src/monitor-handler.file-consent.test.ts b/extensions/msteams/src/monitor-handler.file-consent.test.ts index 39b6ea1b1ff..93ad1634c4d 100644 --- a/extensions/msteams/src/monitor-handler.file-consent.test.ts +++ b/extensions/msteams/src/monitor-handler.file-consent.test.ts @@ -33,6 +33,9 @@ const runtimeStub: PluginRuntime = { resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, + flushKey: async () => false, + flushAll: async () => 0, + unregister: () => {}, }), }, }, diff --git a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts index 68295e9bb07..96b8d85a790 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts @@ -14,10 +14,18 @@ describe("msteams monitor handler authz", () => { resolveInboundDebounceMs: () => 0, createInboundDebouncer: (params: { onFlush: (entries: T[]) => Promise; - }): { enqueue: (entry: T) => Promise } => ({ + }): { + enqueue: (entry: T) => Promise; + flushKey: (_key: string) => Promise; + flushAll: () => Promise; + unregister: () => void; + } => ({ enqueue: async (entry: T) => { await params.onFlush([entry]); }, + flushKey: async (_key: string) => false, + flushAll: async () => 0, + unregister: () => {}, }), }, pairing: { diff --git a/extensions/slack/src/monitor/message-handler.test.ts b/extensions/slack/src/monitor/message-handler.test.ts index cfea959f4d0..8ccc70bcd13 100644 --- a/extensions/slack/src/monitor/message-handler.test.ts +++ b/extensions/slack/src/monitor/message-handler.test.ts @@ -12,6 +12,8 @@ vi.mock("../../../../src/auto-reply/inbound-debounce.js", () => ({ createInboundDebouncer: () => ({ enqueue: (entry: unknown) => enqueueMock(entry), flushKey: (key: string) => flushKeyMock(key), + flushAll: async () => 0, + unregister: () => {}, }), })); diff --git a/extensions/slack/src/monitor/message-handler.ts b/extensions/slack/src/monitor/message-handler.ts index fb700b78350..5f43ed9aa58 100644 --- a/extensions/slack/src/monitor/message-handler.ts +++ b/extensions/slack/src/monitor/message-handler.ts @@ -15,6 +15,10 @@ export type SlackMessageHandler = ( opts: { source: "message" | "app_mention"; wasMentioned?: boolean }, ) => Promise; +export type SlackMessageHandlerWithLifecycle = SlackMessageHandler & { + deactivate: () => void; +}; + const APP_MENTION_RETRY_TTL_MS = 60_000; function resolveSlackSenderId(message: SlackMessageEvent): string | null { @@ -92,7 +96,7 @@ export function createSlackMessageHandler(params: { account: ResolvedSlackAccount; /** Called on each inbound event to update liveness tracking. */ trackEvent?: () => void; -}): SlackMessageHandler { +}): SlackMessageHandlerWithLifecycle { const { ctx, account, trackEvent } = params; const { debounceMs, debouncer } = createChannelInboundDebouncer<{ message: SlackMessageEvent; @@ -206,7 +210,7 @@ export function createSlackMessageHandler(params: { return true; }; - return async (message, opts) => { + const handler: SlackMessageHandlerWithLifecycle = async (message, opts) => { if (opts.source === "message" && message.type !== "message") { return; } @@ -253,4 +257,10 @@ export function createSlackMessageHandler(params: { } await debouncer.enqueue({ message: resolvedMessage, opts }); }; + + handler.deactivate = () => { + debouncer.unregister(); + }; + + return handler; } diff --git a/extensions/slack/src/monitor/provider.ts b/extensions/slack/src/monitor/provider.ts index 1af83676e93..832babd37ba 100644 --- a/extensions/slack/src/monitor/provider.ts +++ b/extensions/slack/src/monitor/provider.ts @@ -567,6 +567,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } finally { opts.abortSignal?.removeEventListener("abort", stopOnAbort); unregisterHttpHandler?.(); + handleSlackMessage.deactivate(); await app.stop().catch(() => undefined); } } diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index b19e37feb69..38c8adb7306 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -461,6 +461,7 @@ export async function monitorWebInbox(options: { return { close: async () => { try { + debouncer.unregister(); const ev = sock.ev as unknown as { off?: (event: string, listener: (...args: unknown[]) => void) => void; removeListener?: (event: string, listener: (...args: unknown[]) => void) => void; diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index abbd3df9694..c5ab68a58d7 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -5,10 +5,17 @@ import { resolveGlobalMap } from "../shared/global-singleton.js"; /** * Global registry of all active inbound debouncers so they can be flushed * collectively during gateway restart (SIGUSR1). Each debouncer registers - * itself on creation and deregisters after the next global flush sweep. + * itself on creation and stays registered until a complete global flush + * drains it or the owner explicitly unregisters it during teardown. */ +type DebouncerFlushResult = { + flushedCount: number; + drained: boolean; +}; + type DebouncerFlushHandle = { - flushAll: (options?: { deadlineMs?: number }) => Promise; + flushAll: (options?: { deadlineMs?: number }) => Promise; + unregister: () => void; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -36,12 +43,12 @@ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number } ? Date.now() + Math.max(0, Math.trunc(options.timeoutMs)) : undefined; const flushedCounts = await Promise.all( - entries.map(async ([key, handle]) => { - try { - return await handle.flushAll({ deadlineMs }); - } finally { - INBOUND_DEBOUNCERS.delete(key); + entries.map(async ([_key, handle]) => { + const result = await handle.flushAll({ deadlineMs }); + if (result.drained) { + handle.unregister(); } + return result.flushedCount; }), ); return flushedCounts.reduce((total, count) => total + count, 0); @@ -106,7 +113,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return Math.max(0, Math.trunc(resolved)); }; - // Returns true when the buffer had pending messages that were flushed. + // Returns true when the buffer had pending messages that were delivered. const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { @@ -116,12 +123,14 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams if (buffer.items.length === 0) { return false; } + let delivered = false; try { await params.onFlush(buffer.items); + delivered = true; } catch (err) { params.onError?.(err, buffer.items); } - return true; + return delivered; }; const flushKey = async (key: string) => { @@ -176,7 +185,9 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams scheduleFlush(key, buffer); }; - const flushAll = async (options?: { deadlineMs?: number }) => { + const flushAllInternal = async (options?: { + deadlineMs?: number; + }): Promise => { let flushedBufferCount = 0; // Keep sweeping until no debounced keys remain. A flush callback can race @@ -184,12 +195,18 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams // global registry deregisters this debouncer during restart. while (buffers.size > 0) { if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { - break; + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; } const keys = [...buffers.keys()]; for (const key of keys) { if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) { - return flushedBufferCount; + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; } if (!buffers.has(key)) { continue; @@ -206,14 +223,26 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams } } - return flushedBufferCount; + return { + flushedCount: flushedBufferCount, + drained: buffers.size === 0, + }; + }; + + const flushAll = async (options?: { deadlineMs?: number }) => { + const result = await flushAllInternal(options); + return result.flushedCount; }; // Register in global registry for SIGUSR1 flush. const registryKey = Symbol(); + const unregister = () => { + INBOUND_DEBOUNCERS.delete(registryKey); + }; INBOUND_DEBOUNCERS.set(registryKey, { - flushAll, + flushAll: flushAllInternal, + unregister, }); - return { enqueue, flushKey, flushAll }; + return { enqueue, flushKey, flushAll, unregister }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 7becad2cedf..ef1880f4784 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -439,6 +439,37 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("counts only buffers that were delivered successfully", async () => { + vi.useFakeTimers(); + const calls: Array = []; + const errors: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id); + if (ids.includes("msg-1")) { + throw new Error("dispatch failed"); + } + calls.push(ids); + }, + onError: (_err, items) => { + errors.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(1); + expect(calls).toEqual([["msg-2"]]); + expect(errors).toEqual([["msg-1"]]); + + vi.useRealTimers(); + }); + it("keeps flushing until no buffered keys remain", async () => { vi.useFakeTimers(); const calls: Array = []; @@ -467,11 +498,71 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + it("keeps timed-out debouncers registered for a later global sweep", async () => { + vi.useFakeTimers(); + const calls: Array = []; + let now = 0; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + let debouncer: ReturnType>; + debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + if (items[0]?.id === "msg-1") { + await debouncer.enqueue({ key: "session-2", id: "msg-2" }); + now = 20; + } + }, + }); + + try { + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + const flushed = await flushAllInboundDebouncers({ timeoutMs: 10 }); + expect(flushed).toBe(1); + expect(calls).toEqual([["msg-1"]]); + + now = 0; + const flushedLater = await flushAllInboundDebouncers({ timeoutMs: 10 }); + expect(flushedLater).toBe(1); + expect(calls).toEqual([["msg-1"], ["msg-2"]]); + } finally { + nowSpy.mockRestore(); + vi.useRealTimers(); + } + }); + it("returns 0 when no debouncers are registered", async () => { const flushed = await flushAllInboundDebouncers(); expect(flushed).toBe(0); }); + it("lets callers unregister a debouncer from the global registry", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 5000, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + debouncer.unregister(); + + expect(await flushAllInboundDebouncers()).toBe(0); + expect(calls).toEqual([]); + + await debouncer.flushAll(); + expect(calls).toEqual([["msg-1"]]); + + vi.useRealTimers(); + }); + it("deregisters debouncers from global registry after flush", async () => { vi.useFakeTimers(); diff --git a/test/helpers/extensions/plugin-runtime-mock.ts b/test/helpers/extensions/plugin-runtime-mock.ts index c0b73a6e15d..1a3715c6446 100644 --- a/test/helpers/extensions/plugin-runtime-mock.ts +++ b/test/helpers/extensions/plugin-runtime-mock.ts @@ -275,6 +275,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial = await params.onFlush([item]); }, flushKey: vi.fn(), + flushAll: vi.fn(async () => 0), + unregister: vi.fn(), }), ) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"], resolveInboundDebounceMs: vi.fn( From 1830baed036532f0174889f2808f7c86f4a3b813 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 23:15:43 -0400 Subject: [PATCH 10/11] fix: unregister inbound debouncers on channel teardown Telegram, Feishu, and MSTeams channel monitors created inbound debouncers without calling unregister() during teardown. On reconnect a new debouncer was registered while the old one stayed in the global INBOUND_DEBOUNCERS map, accumulating stale entries that increased restart latency and memory. - Telegram: registerTelegramHandlers now returns unregisterDebouncer; called in bot.stop override - Feishu: registerEventHandlers now returns unregisterDebouncer; monitorSingleAccount wraps transport in try/finally - MSTeams: createMSTeamsMessageHandler returns { handleTeamsMessage, unregisterDebouncer }; threaded through registerMSTeamsHandlers and called in monitor shutdown Safety net: flushAllInboundDebouncers auto-evicts debouncers idle >5 min so orphaned entries from channels that forget unregister() are cleaned up. Co-Authored-By: Claude Opus 4.6 --- extensions/feishu/src/monitor.account.ts | 45 +++++++++++---- .../src/monitor-handler.file-consent.test.ts | 2 +- extensions/msteams/src/monitor-handler.ts | 15 +++-- .../message-handler.authz.test.ts | 8 +-- .../src/monitor-handler/message-handler.ts | 31 ++++++----- .../msteams/src/monitor.lifecycle.test.ts | 7 ++- extensions/msteams/src/monitor.ts | 49 +++++++++++------ .../telegram/src/bot-handlers.runtime.ts | 4 +- extensions/telegram/src/bot.ts | 5 +- src/auto-reply/inbound-debounce.ts | 23 ++++++-- src/auto-reply/inbound.test.ts | 55 +++++++++++++++++++ 11 files changed, 183 insertions(+), 61 deletions(-) diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index ff3a0ba9dc9..eebf69e5f45 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -1,15 +1,15 @@ -import * as crypto from "crypto"; -import * as Lark from "@larksuiteoapi/node-sdk"; -import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js"; +import * as crypto from "node:crypto"; +import type * as Lark from "@larksuiteoapi/node-sdk"; +import type { ClawdbotConfig, HistoryEntry, RuntimeEnv } from "../runtime-api.js"; import { resolveFeishuAccount } from "./accounts.js"; import { raceWithTimeoutAndAbort } from "./async.js"; import { + type FeishuBotAddedEvent, + type FeishuMessageEvent, handleFeishuMessage, parseFeishuMessageEvent, - type FeishuMessageEvent, - type FeishuBotAddedEvent, } from "./bot.js"; -import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; +import { type FeishuCardActionEvent, handleFeishuCardAction } from "./card-action.js"; import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js"; import { createEventDispatcher } from "./client.js"; import { @@ -254,7 +254,7 @@ function resolveFeishuDebounceMentions(params: { function registerEventHandlers( eventDispatcher: Lark.EventDispatcher, context: RegisterEventHandlersContext, -): void { +): { unregisterDebouncer: () => void } { const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; const core = getFeishuRuntime(); const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({ @@ -617,6 +617,8 @@ function registerEventHandlers( } }, }); + + return { unregisterDebouncer: inboundDebouncer.unregister }; } export type BotOpenIdSource = @@ -639,7 +641,10 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" }; const botIdentity = botOpenIdSource.kind === "prefetched" - ? { botOpenId: botOpenIdSource.botOpenId, botName: botOpenIdSource.botName } + ? { + botOpenId: botOpenIdSource.botOpenId, + botName: botOpenIdSource.botName, + } : await fetchBotIdentityForMonitor(account, { runtime, abortSignal }); const botOpenId = botIdentity.botOpenId; const botName = botIdentity.botName?.trim(); @@ -670,7 +675,7 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): const chatHistories = new Map(); threadBindingManager = createFeishuThreadBindingManager({ accountId, cfg }); - registerEventHandlers(eventDispatcher, { + const { unregisterDebouncer } = registerEventHandlers(eventDispatcher, { cfg, accountId, runtime, @@ -678,10 +683,26 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): fireAndForget: true, }); - if (connectionMode === "webhook") { - return await monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher }); + try { + if (connectionMode === "webhook") { + return await monitorWebhook({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, + }); + } + return await monitorWebSocket({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, + }); + } finally { + unregisterDebouncer(); } - return await monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher }); } finally { threadBindingManager?.stop(); } diff --git a/extensions/msteams/src/monitor-handler.file-consent.test.ts b/extensions/msteams/src/monitor-handler.file-consent.test.ts index 93ad1634c4d..ea06b5d19c7 100644 --- a/extensions/msteams/src/monitor-handler.file-consent.test.ts +++ b/extensions/msteams/src/monitor-handler.file-consent.test.ts @@ -141,7 +141,7 @@ function createConsentInvokeHarness(params: { contentType: "text/plain", conversationId: params.pendingConversationId ?? "19:victim@thread.v2", }); - const handler = registerMSTeamsHandlers(createActivityHandler(), createDeps()); + const { handler } = registerMSTeamsHandlers(createActivityHandler(), createDeps()); const { context, sendActivity } = createInvokeContext({ conversationId: params.invokeConversationId, uploadId, diff --git a/extensions/msteams/src/monitor-handler.ts b/extensions/msteams/src/monitor-handler.ts index 4cda545bd02..fec16302014 100644 --- a/extensions/msteams/src/monitor-handler.ts +++ b/extensions/msteams/src/monitor-handler.ts @@ -140,8 +140,8 @@ async function handleFileConsentInvoke( export function registerMSTeamsHandlers( handler: T, deps: MSTeamsMessageHandlerDeps, -): T { - const handleTeamsMessage = createMSTeamsMessageHandler(deps); +): { handler: T; unregisterDebouncer: () => void } { + const { handleTeamsMessage, unregisterDebouncer } = createMSTeamsMessageHandler(deps); // Wrap the original run method to intercept invokes const originalRun = handler.run; @@ -151,7 +151,10 @@ export function registerMSTeamsHandlers( // Handle file consent invokes before passing to normal flow if (ctx.activity?.type === "invoke" && ctx.activity?.name === "fileConsent/invoke") { // Send invoke response IMMEDIATELY to prevent Teams timeout - await ctx.sendActivity({ type: "invokeResponse", value: { status: 200 } }); + await ctx.sendActivity({ + type: "invokeResponse", + value: { status: 200 }, + }); try { await withRevokedProxyFallback({ @@ -164,7 +167,9 @@ export function registerMSTeamsHandlers( }, }); } catch (err) { - deps.log.debug?.("file consent handler error", { error: String(err) }); + deps.log.debug?.("file consent handler error", { + error: String(err), + }); } return; } @@ -192,5 +197,5 @@ export function registerMSTeamsHandlers( await next(); }); - return handler; + return { handler, unregisterDebouncer }; } diff --git a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts index 96b8d85a790..1bb0cbf6c0d 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts @@ -79,8 +79,8 @@ describe("msteams monitor handler authz", () => { }, } as OpenClawConfig); - const handler = createMSTeamsMessageHandler(deps); - await handler({ + const { handleTeamsMessage } = createMSTeamsMessageHandler(deps); + await handleTeamsMessage({ activity: { id: "msg-1", type: "message", @@ -130,8 +130,8 @@ describe("msteams monitor handler authz", () => { }, } as OpenClawConfig); - const handler = createMSTeamsMessageHandler(deps); - await handler({ + const { handleTeamsMessage } = createMSTeamsMessageHandler(deps); + await handleTeamsMessage({ activity: { id: "msg-1", type: "message", diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index fe6751b94c3..05987b234ee 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -1,24 +1,24 @@ import { DEFAULT_ACCOUNT_ID, + DEFAULT_GROUP_HISTORY_LIMIT, buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, createChannelPairingController, dispatchReplyFromConfigWithSettledDispatcher, - DEFAULT_GROUP_HISTORY_LIMIT, - logInboundDrop, evaluateSenderGroupAccessForPolicy, - resolveSenderScopedGroupPolicy, - recordPendingHistoryEntryIfEnabled, - resolveDualTextControlCommandGate, - resolveDefaultGroupPolicy, - isDangerousNameMatchingEnabled, - readStoreAllowFromForDmPolicy, - resolveMentionGating, - resolveInboundSessionEnvelopeContext, formatAllowlistMatchMeta, - resolveEffectiveAllowFromLists, - resolveDmGroupAccessWithLists, type HistoryEntry, + isDangerousNameMatchingEnabled, + logInboundDrop, + readStoreAllowFromForDmPolicy, + recordPendingHistoryEntryIfEnabled, + resolveDefaultGroupPolicy, + resolveDmGroupAccessWithLists, + resolveDualTextControlCommandGate, + resolveEffectiveAllowFromLists, + resolveInboundSessionEnvelopeContext, + resolveMentionGating, + resolveSenderScopedGroupPolicy, } from "../../runtime-api.js"; import { buildMSTeamsAttachmentPlaceholder, @@ -675,7 +675,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { }, }); - return async function handleTeamsMessage(context: MSTeamsTurnContext) { + const handleTeamsMessage = async (context: MSTeamsTurnContext) => { const activity = context.activity; const rawText = activity.text?.trim() ?? ""; const text = stripMSTeamsMentionTags(rawText); @@ -698,4 +698,9 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { implicitMention, }); }; + + return { + handleTeamsMessage, + unregisterDebouncer: inboundDebouncer.unregister, + }; } diff --git a/extensions/msteams/src/monitor.lifecycle.test.ts b/extensions/msteams/src/monitor.lifecycle.test.ts index 67302dc61dd..d0a902ca263 100644 --- a/extensions/msteams/src/monitor.lifecycle.test.ts +++ b/extensions/msteams/src/monitor.lifecycle.test.ts @@ -30,7 +30,9 @@ vi.mock("../runtime-api.js", () => ({ resolve(); return; } - params.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); + params.abortSignal?.addEventListener("abort", () => resolve(), { + once: true, + }); }); await params.onAbort?.(); }, @@ -80,7 +82,8 @@ vi.mock("express", () => { const registerMSTeamsHandlers = vi.hoisted(() => vi.fn(() => ({ - run: vi.fn(async () => {}), + handler: { run: vi.fn(async () => {}) }, + unregisterDebouncer: vi.fn(), })), ); const createMSTeamsAdapter = vi.hoisted(() => diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index f5c60064174..993705b2d30 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -12,7 +12,7 @@ import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; import type { MSTeamsConversationStore } from "./conversation-store.js"; import { formatUnknownError } from "./errors.js"; import type { MSTeamsAdapter } from "./messenger.js"; -import { registerMSTeamsHandlers, type MSTeamsActivityHandler } from "./monitor-handler.js"; +import { type MSTeamsActivityHandler, registerMSTeamsHandlers } from "./monitor-handler.js"; import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js"; import { resolveMSTeamsChannelAllowlist, @@ -136,12 +136,19 @@ export async function monitorMSTeamsProvider( .filter((entry) => entry && entry !== "*"); if (groupEntries.length > 0) { const { additions } = await resolveAllowlistUsers("msteams group users", groupEntries); - groupAllowFrom = mergeAllowlist({ existing: groupAllowFrom, additions }); + groupAllowFrom = mergeAllowlist({ + existing: groupAllowFrom, + additions, + }); } } if (teamsConfig && Object.keys(teamsConfig).length > 0) { - const entries: Array<{ input: string; teamKey: string; channelKey?: string }> = []; + const entries: Array<{ + input: string; + teamKey: string; + channelKey?: string; + }> = []; for (const [teamKey, teamCfg] of Object.entries(teamsConfig)) { if (teamKey === "*") { continue; @@ -190,7 +197,11 @@ export async function monitorMSTeamsProvider( ...sourceTeam.channels, ...existing.channels, }; - const mergedTeam = { ...sourceTeam, ...existing, channels: mergedChannels }; + const mergedTeam = { + ...sourceTeam, + ...existing, + channels: mergedChannels, + }; nextTeams[entry.teamId] = mergedTeam; if (source.channelKey && entry.channelId) { const sourceChannel = sourceTeam.channels?.[source.channelKey]; @@ -254,18 +265,21 @@ export async function monitorMSTeamsProvider( const tokenProvider = new MsalTokenProvider(authConfig); const adapter = createMSTeamsAdapter(authConfig, sdk); - const handler = registerMSTeamsHandlers(new ActivityHandler() as MSTeamsActivityHandler, { - cfg, - runtime, - appId, - adapter: adapter as unknown as MSTeamsAdapter, - tokenProvider, - textLimit, - mediaMaxBytes, - conversationStore, - pollStore, - log, - }); + const { handler, unregisterDebouncer } = registerMSTeamsHandlers( + new ActivityHandler() as MSTeamsActivityHandler, + { + cfg, + runtime, + appId, + adapter: adapter as unknown as MSTeamsAdapter, + tokenProvider, + textLimit, + mediaMaxBytes, + conversationStore, + pollStore, + log, + }, + ); // Create Express server const expressApp = express.default(); @@ -283,7 +297,7 @@ export async function monitorMSTeamsProvider( const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages"; const messageHandler = (req: Request, res: Response) => { void adapter - .process(req, res, (context: unknown) => handler.run!(context)) + .process(req, res, (context: unknown) => handler.run?.(context)) .catch((err: unknown) => { log.error("msteams webhook failed", { error: formatUnknownError(err) }); }); @@ -324,6 +338,7 @@ export async function monitorMSTeamsProvider( const shutdown = async () => { log.info("shutting down msteams provider"); + unregisterDebouncer(); return new Promise((resolve) => { httpServer.close((err) => { if (err) { diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 6df428d1273..5da805e0297 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -110,7 +110,7 @@ export const registerTelegramHandlers = ({ processMessage, logger, telegramDeps = defaultTelegramBotDeps, -}: RegisterTelegramHandlerParams) => { +}: RegisterTelegramHandlerParams): { unregisterDebouncer: () => void } => { const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500; const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = @@ -1746,4 +1746,6 @@ export const registerTelegramHandlers = ({ errorMessage: "channel_post handler failed", }); }); + + return { unregisterDebouncer: inboundDebouncer.unregister }; }; diff --git a/extensions/telegram/src/bot.ts b/extensions/telegram/src/bot.ts index 479560c8e38..b2887af581b 100644 --- a/extensions/telegram/src/bot.ts +++ b/extensions/telegram/src/bot.ts @@ -322,7 +322,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const MAX_RAW_UPDATE_ARRAY = 20; const stringifyUpdate = (update: unknown) => { const seen = new WeakSet(); - return JSON.stringify(update ?? null, (key, value) => { + return JSON.stringify(update ?? null, (_key, value) => { if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) { return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`; } @@ -529,7 +529,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { telegramDeps, }); - registerTelegramHandlers({ + const { unregisterDebouncer } = registerTelegramHandlers({ cfg, accountId: account.accountId, bot, @@ -550,6 +550,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const originalStop = bot.stop.bind(bot); bot.stop = ((...args: Parameters) => { + unregisterDebouncer(); threadBindingManager?.stop(); return originalStop(...args); }) as typeof bot.stop; diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index c5ab68a58d7..813aaade21a 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -16,6 +16,8 @@ type DebouncerFlushResult = { type DebouncerFlushHandle = { flushAll: (options?: { deadlineMs?: number }) => Promise; unregister: () => void; + /** Epoch ms of last enqueue or creation, whichever is more recent. */ + lastActivityMs: number; }; const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers"); const INBOUND_DEBOUNCERS = resolveGlobalMap(INBOUND_DEBOUNCERS_KEY); @@ -27,25 +29,35 @@ export function clearInboundDebouncerRegistry(): void { INBOUND_DEBOUNCERS.clear(); } +/** Debouncers idle longer than this are auto-removed during flush as a safety + * net against channels that forget to call unregister() on teardown. */ +const STALE_DEBOUNCER_MS = 5 * 60 * 1000; // 5 minutes + /** * Flush all registered inbound debouncers immediately. Called during SIGUSR1 * restart to push buffered messages into the session before reinitializing. * Returns the number of debounce buffers actually flushed so restart logic can * skip followup draining when there was no buffered work. + * + * Stale debouncers (no enqueue activity for >5 minutes) are auto-evicted as a + * safety net in case a channel monitor forgot to call unregister() on teardown. */ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise { const entries = [...INBOUND_DEBOUNCERS.entries()]; if (entries.length === 0) { return 0; } + const now = Date.now(); const deadlineMs = typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs) - ? Date.now() + Math.max(0, Math.trunc(options.timeoutMs)) + ? now + Math.max(0, Math.trunc(options.timeoutMs)) : undefined; const flushedCounts = await Promise.all( entries.map(async ([_key, handle]) => { const result = await handle.flushAll({ deadlineMs }); - if (result.drained) { + // Remove drained debouncers, and auto-evict stale entries whose + // owning channel never called unregister() (e.g. after reconnect). + if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) { handle.unregister(); } return result.flushedCount; @@ -152,6 +164,7 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams }; const enqueue = async (item: T) => { + handle.lastActivityMs = Date.now(); const key = params.buildKey(item); const debounceMs = resolveDebounceMs(item); const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); @@ -239,10 +252,12 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams const unregister = () => { INBOUND_DEBOUNCERS.delete(registryKey); }; - INBOUND_DEBOUNCERS.set(registryKey, { + const handle: DebouncerFlushHandle = { flushAll: flushAllInternal, unregister, - }); + lastActivityMs: Date.now(), + }; + INBOUND_DEBOUNCERS.set(registryKey, handle); return { enqueue, flushKey, flushAll, unregister }; } diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index ef1880f4784..a724036d007 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -581,6 +581,61 @@ describe("flushAllInboundDebouncers", () => { vi.useRealTimers(); }); + + it("auto-evicts stale debouncers idle >5 min even with a tight deadline", async () => { + vi.useFakeTimers(); + + // Use a debounceMs longer than the staleness window so the debounce + // timeout does NOT fire when we advance the clock. + createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10 * 60 * 1000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // Advance past the 5-minute staleness window (debounce timer still pending) + vi.advanceTimersByTime(5 * 60 * 1000 + 1); + + // Flush with a zero-ms timeout. The deadline fires immediately so the + // debouncer cannot actually drain, but the staleness guard evicts it. + await flushAllInboundDebouncers({ timeoutMs: 0 }); + + // Second flush should find nothing — stale entry was auto-evicted + const flushed2 = await flushAllInboundDebouncers(); + expect(flushed2).toBe(0); + + vi.useRealTimers(); + }); + + it("does not evict debouncers that have recent activity", async () => { + vi.useFakeTimers(); + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10 * 60 * 1000, + buildKey: (item) => item.key, + onFlush: async () => {}, + }); + + // Advance 4 minutes (below staleness threshold) + vi.advanceTimersByTime(4 * 60 * 1000); + + // Enqueue refreshes the activity timestamp + await debouncer.enqueue({ key: "session-1", id: "msg-1" }); + + // Advance another 4 minutes (8 total since creation, 4 since enqueue) + vi.advanceTimersByTime(4 * 60 * 1000); + + // Flush with zero timeout — debouncer can't drain, but it's NOT stale + // (only 4 min since last enqueue). It should remain registered. + await flushAllInboundDebouncers({ timeoutMs: 0 }); + + // Debouncer should still be in the registry + // Do a full flush to verify it's still there + const flushed = await flushAllInboundDebouncers(); + expect(flushed).toBe(1); + + vi.useRealTimers(); + }); }); describe("createInboundDebouncer flushAll", () => { From 46465c9f72c1da3a754dabb32a18e72fc2d730f8 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 21 Mar 2026 01:07:25 -0400 Subject: [PATCH 11/11] fix: address codex review comments on #46303 - Flush inbound debouncers BEFORE markGatewayDraining() so flushed messages can still enqueue into the command queue (CWE-672) - Reorder restart drain: flush debouncers -> active tasks -> followup queues (followups need active turns to finish before they can drain) - Always drain followup queues regardless of flushed debouncer count - Only deregister debouncer handles after all buffers confirmed drained; keep partially-flushed handles for subsequent sweeps - Wrap flushAll with deadline-based timeout (Promise.race) to prevent hung provider calls from blocking restart indefinitely - Unregister MSTeams debouncer on startup failure (EADDRINUSE etc) - Update test expectations for new drain ordering --- extensions/msteams/src/monitor.ts | 38 +++++++++++------- src/auto-reply/inbound-debounce.ts | 27 +++++++++++-- src/cli/gateway-cli/run-loop.test.ts | 25 ++++++++---- src/cli/gateway-cli/run-loop.ts | 58 ++++++++++++++-------------- 4 files changed, 94 insertions(+), 54 deletions(-) diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index 993705b2d30..d66ada1dcf9 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -315,21 +315,29 @@ export async function monitorMSTeamsProvider( }); // Start listening and fail fast if bind/listen fails. - const httpServer = expressApp.listen(port); - await new Promise((resolve, reject) => { - const onListening = () => { - httpServer.off("error", onError); - log.info(`msteams provider started on port ${port}`); - resolve(); - }; - const onError = (err: unknown) => { - httpServer.off("listening", onListening); - log.error("msteams server error", { error: String(err) }); - reject(err); - }; - httpServer.once("listening", onListening); - httpServer.once("error", onError); - }); + let httpServer: ReturnType; + try { + httpServer = expressApp.listen(port); + await new Promise((resolve, reject) => { + const onListening = () => { + httpServer.off("error", onError); + log.info(`msteams provider started on port ${port}`); + resolve(); + }; + const onError = (err: unknown) => { + httpServer.off("listening", onListening); + log.error("msteams server error", { error: String(err) }); + reject(err); + }; + httpServer.once("listening", onListening); + httpServer.once("error", onError); + }); + } catch (err) { + // Clean up the debouncer so it does not linger in the global registry + // when the provider fails to start (e.g. port already in use). + unregisterDebouncer(); + throw err; + } applyMSTeamsWebhookTimeouts(httpServer); httpServer.on("error", (err) => { diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 813aaade21a..34c0d463185 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -54,9 +54,30 @@ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number } : undefined; const flushedCounts = await Promise.all( entries.map(async ([_key, handle]) => { - const result = await handle.flushAll({ deadlineMs }); - // Remove drained debouncers, and auto-evict stale entries whose - // owning channel never called unregister() (e.g. after reconnect). + let result: DebouncerFlushResult; + try { + result = await (deadlineMs !== undefined + ? Promise.race([ + handle.flushAll({ deadlineMs }), + new Promise((resolve) => { + const timer = setTimeout( + () => resolve({ flushedCount: 0, drained: false }), + Math.max(0, deadlineMs - Date.now()), + ); + timer.unref?.(); + }), + ]) + : handle.flushAll({ deadlineMs })); + } catch { + // A hung or failing flushAll should not prevent other debouncers + // from being swept. Keep the handle registered for a future sweep. + return 0; + } + // Only deregister AFTER the handle confirms all its buffers are + // drained. If the deadline hit mid-sweep, keep partially-flushed + // handles registered so subsequent sweeps can finish the job. + // Also auto-evict stale entries whose owning channel never called + // unregister() (e.g. after reconnect). if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) { handle.unregister(); } diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index ded7aaff4d7..36db91ac0b5 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -351,7 +351,7 @@ describe("runGatewayLoop", () => { }); }); - it("marks gateway draining before flushing inbound debouncers on SIGUSR1", async () => { + it("flushes inbound debouncers before marking gateway draining on SIGUSR1", async () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { @@ -374,8 +374,9 @@ describe("runGatewayLoop", () => { expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); - expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( - flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + // Flush debouncers BEFORE marking draining so flushed messages can enqueue + expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( + markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, @@ -387,7 +388,7 @@ describe("runGatewayLoop", () => { expect(gatewayLog.info).toHaveBeenCalledWith( "flushed 2 pending inbound debounce buffer(s) before restart", ); - expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush"); + expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained before restart"); sigterm(); await expect(exited).resolves.toBe(0); @@ -429,11 +430,15 @@ describe("runGatewayLoop", () => { }); }); - it("skips followup queue drain when no debouncers had buffered messages", async () => { + it("always drains followup queue even when no debouncers had buffered messages", async () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { flushAllInboundDebouncers.mockResolvedValueOnce(0); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); try { @@ -448,12 +453,13 @@ describe("runGatewayLoop", () => { expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); - expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); + // Followup queue drain is always called regardless of flushedCount + expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); expect(markGatewayDraining).toHaveBeenCalledTimes(1); const forceExitCalls = setTimeoutSpy.mock.calls .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); - expect(forceExitCalls).toEqual([95_000, 95_000]); + expect(forceExitCalls).toEqual([95_000, 100_000]); sigterm(); await expect(exited).resolves.toBe(0); @@ -516,7 +522,10 @@ describe("runGatewayLoop", () => { const forceExitCalls = setTimeoutSpy.mock.calls .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); - expect(forceExitCalls).toEqual([95_000, 95_000]); + // First arm: 1000 + 5000 + 90000 = 96000, delay = 96000 - 1000 = 95000 + // Second arm (after 20s flush): 21000 + 5000 + 90000 + 5000 = 121000, + // delay = 121000 - 21000 = 100000 + expect(forceExitCalls).toEqual([95_000, 100_000]); sigterm(); await expect(exited).resolves.toBe(0); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 6167bc56500..08f7b39119b 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -138,42 +138,31 @@ export async function runGatewayLoop(params: { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { - // Reject new command-queue work before any awaited restart drain - // step so late arrivals fail explicitly instead of being stranded - // behind a one-shot debounce flush. This does not block followup - // queue enqueues, so flushed inbound work can still drain normally. - markGatewayDraining(); - - // Flush inbound debounce buffers first. This pushes any messages - // waiting in per-channel debounce timers (e.g. the 2500ms collect - // window) into the followup queues immediately, preventing silent - // message loss when the server reinitializes. + // Flush inbound debounce buffers BEFORE marking the gateway as + // draining so flushed messages can still enqueue into the command + // queue. This pushes any messages waiting in per-channel debounce + // timers (e.g. the 2500ms collect window) into the followup queues + // immediately, preventing silent message loss on reinit. const flushedBuffers = await flushAllInboundDebouncers({ timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS, }); - // Start the restart watchdog budget after the pre-shutdown debounce - // flush so slow flush handlers do not steal time from active drain. - armForceExitTimer( - Date.now() + - SHUTDOWN_TIMEOUT_MS + - DRAIN_TIMEOUT_MS + - (flushedBuffers > 0 ? FOLLOWUP_DRAIN_TIMEOUT_MS : 0), - ); if (flushedBuffers > 0) { gatewayLog.info( `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, ); - // Give the followup queue drain loops a short window to process - // the newly flushed items before the server is torn down. - const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); - if (followupResult.drained) { - gatewayLog.info("followup queues drained after debounce flush"); - } else { - gatewayLog.warn( - `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, - ); - } } + + // Now reject new command-queue work so late arrivals fail explicitly + // instead of being stranded. This does not block followup queue + // enqueues, so already-flushed inbound work can still drain normally. + markGatewayDraining(); + + // Start the restart watchdog budget after the pre-shutdown debounce + // flush so slow flush handlers do not steal time from active drain. + armForceExitTimer( + Date.now() + SHUTDOWN_TIMEOUT_MS + DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS, + ); + const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); @@ -204,6 +193,19 @@ export async function runGatewayLoop(params: { abortEmbeddedPiRun(undefined, { mode: "all" }); } } + + // Drain followup queues AFTER active tasks finish so tasks that + // produce followup work have a chance to enqueue before we wait. + // Always drain regardless of flushedCount — queued followups are + // not contingent on debouncers. + const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); + if (followupResult.drained) { + gatewayLog.info("followup queues drained before restart"); + } else { + gatewayLog.warn( + `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, + ); + } } await server?.close({