Auto-reply: harden inbound debounce restart flushing

This commit is contained in:
Joey Krug 2026-03-14 21:47:50 -04:00
parent 8f0b2d8ba5
commit de53e27215
14 changed files with 175 additions and 19 deletions

View File

@ -15,6 +15,7 @@ export type BlueBubblesDebouncer = {
enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
flushKey: (key: string) => Promise<boolean>;
flushAll: () => Promise<number>;
unregister: () => void;
};
export type BlueBubblesDebounceRegistry = {
@ -200,6 +201,7 @@ export function createBlueBubblesDebounceRegistry(params: {
return debouncer;
},
removeDebouncer: (target) => {
targetDebouncers.get(target)?.unregister();
targetDebouncers.delete(target);
},
};

View File

@ -180,7 +180,10 @@ export function createDiscordMessageHandler(
}
};
handler.deactivate = inboundWorker.deactivate;
handler.deactivate = () => {
debouncer.unregister();
inboundWorker.deactivate();
};
return handler;
}

View File

@ -713,6 +713,7 @@ describe("Feishu inbound debounce regressions", () => {
},
flushKey: async (_key: string) => false,
flushAll: async () => 0,
unregister: () => {},
}),
resolveInboundDebounceMs,
},

View File

@ -19,6 +19,7 @@ export function createFeishuRuntimeMockModule(): {
enqueue: () => Promise<void>;
flushKey: () => Promise<boolean>;
flushAll: () => Promise<number>;
unregister: () => void;
};
};
text: {
@ -36,6 +37,7 @@ export function createFeishuRuntimeMockModule(): {
enqueue: async () => {},
flushKey: async () => false,
flushAll: async () => 0,
unregister: () => {},
}),
},
text: {

View File

@ -1686,6 +1686,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
},
});
} finally {
debouncer.unregister();
unregisterInteractions?.();
}

View File

@ -33,6 +33,9 @@ const runtimeStub: PluginRuntime = {
resolveInboundDebounceMs: () => 0,
createInboundDebouncer: () => ({
enqueue: async () => {},
flushKey: async () => false,
flushAll: async () => 0,
unregister: () => {},
}),
},
},

View File

@ -14,10 +14,18 @@ describe("msteams monitor handler authz", () => {
resolveInboundDebounceMs: () => 0,
createInboundDebouncer: <T>(params: {
onFlush: (entries: T[]) => Promise<void>;
}): { enqueue: (entry: T) => Promise<void> } => ({
}): {
enqueue: (entry: T) => Promise<void>;
flushKey: (_key: string) => Promise<boolean>;
flushAll: () => Promise<number>;
unregister: () => void;
} => ({
enqueue: async (entry: T) => {
await params.onFlush([entry]);
},
flushKey: async (_key: string) => false,
flushAll: async () => 0,
unregister: () => {},
}),
},
pairing: {

View File

@ -12,6 +12,8 @@ vi.mock("../../../../src/auto-reply/inbound-debounce.js", () => ({
createInboundDebouncer: () => ({
enqueue: (entry: unknown) => enqueueMock(entry),
flushKey: (key: string) => flushKeyMock(key),
flushAll: async () => 0,
unregister: () => {},
}),
}));

View File

@ -15,6 +15,10 @@ export type SlackMessageHandler = (
opts: { source: "message" | "app_mention"; wasMentioned?: boolean },
) => Promise<void>;
export type SlackMessageHandlerWithLifecycle = SlackMessageHandler & {
deactivate: () => void;
};
const APP_MENTION_RETRY_TTL_MS = 60_000;
function resolveSlackSenderId(message: SlackMessageEvent): string | null {
@ -92,7 +96,7 @@ export function createSlackMessageHandler(params: {
account: ResolvedSlackAccount;
/** Called on each inbound event to update liveness tracking. */
trackEvent?: () => void;
}): SlackMessageHandler {
}): SlackMessageHandlerWithLifecycle {
const { ctx, account, trackEvent } = params;
const { debounceMs, debouncer } = createChannelInboundDebouncer<{
message: SlackMessageEvent;
@ -206,7 +210,7 @@ export function createSlackMessageHandler(params: {
return true;
};
return async (message, opts) => {
const handler: SlackMessageHandlerWithLifecycle = async (message, opts) => {
if (opts.source === "message" && message.type !== "message") {
return;
}
@ -253,4 +257,10 @@ export function createSlackMessageHandler(params: {
}
await debouncer.enqueue({ message: resolvedMessage, opts });
};
handler.deactivate = () => {
debouncer.unregister();
};
return handler;
}

View File

@ -567,6 +567,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
unregisterHttpHandler?.();
handleSlackMessage.deactivate();
await app.stop().catch(() => undefined);
}
}

View File

@ -461,6 +461,7 @@ export async function monitorWebInbox(options: {
return {
close: async () => {
try {
debouncer.unregister();
const ev = sock.ev as unknown as {
off?: (event: string, listener: (...args: unknown[]) => void) => void;
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;

View File

@ -5,10 +5,17 @@ import { resolveGlobalMap } from "../shared/global-singleton.js";
/**
* Global registry of all active inbound debouncers so they can be flushed
* collectively during gateway restart (SIGUSR1). Each debouncer registers
* itself on creation and deregisters after the next global flush sweep.
* itself on creation and stays registered until a complete global flush
* drains it or the owner explicitly unregisters it during teardown.
*/
type DebouncerFlushResult = {
flushedCount: number;
drained: boolean;
};
type DebouncerFlushHandle = {
flushAll: (options?: { deadlineMs?: number }) => Promise<number>;
flushAll: (options?: { deadlineMs?: number }) => Promise<DebouncerFlushResult>;
unregister: () => void;
};
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
@ -36,12 +43,12 @@ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }
? Date.now() + Math.max(0, Math.trunc(options.timeoutMs))
: undefined;
const flushedCounts = await Promise.all(
entries.map(async ([key, handle]) => {
try {
return await handle.flushAll({ deadlineMs });
} finally {
INBOUND_DEBOUNCERS.delete(key);
entries.map(async ([_key, handle]) => {
const result = await handle.flushAll({ deadlineMs });
if (result.drained) {
handle.unregister();
}
return result.flushedCount;
}),
);
return flushedCounts.reduce((total, count) => total + count, 0);
@ -106,7 +113,7 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return Math.max(0, Math.trunc(resolved));
};
// Returns true when the buffer had pending messages that were flushed.
// Returns true when the buffer had pending messages that were delivered.
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
buffers.delete(key);
if (buffer.timeout) {
@ -116,12 +123,14 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
if (buffer.items.length === 0) {
return false;
}
let delivered = false;
try {
await params.onFlush(buffer.items);
delivered = true;
} catch (err) {
params.onError?.(err, buffer.items);
}
return true;
return delivered;
};
const flushKey = async (key: string) => {
@ -176,7 +185,9 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
scheduleFlush(key, buffer);
};
const flushAll = async (options?: { deadlineMs?: number }) => {
const flushAllInternal = async (options?: {
deadlineMs?: number;
}): Promise<DebouncerFlushResult> => {
let flushedBufferCount = 0;
// Keep sweeping until no debounced keys remain. A flush callback can race
@ -184,12 +195,18 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
// global registry deregisters this debouncer during restart.
while (buffers.size > 0) {
if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) {
break;
return {
flushedCount: flushedBufferCount,
drained: buffers.size === 0,
};
}
const keys = [...buffers.keys()];
for (const key of keys) {
if (options?.deadlineMs !== undefined && Date.now() >= options.deadlineMs) {
return flushedBufferCount;
return {
flushedCount: flushedBufferCount,
drained: buffers.size === 0,
};
}
if (!buffers.has(key)) {
continue;
@ -206,14 +223,26 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
}
}
return flushedBufferCount;
return {
flushedCount: flushedBufferCount,
drained: buffers.size === 0,
};
};
const flushAll = async (options?: { deadlineMs?: number }) => {
const result = await flushAllInternal(options);
return result.flushedCount;
};
// Register in global registry for SIGUSR1 flush.
const registryKey = Symbol();
const unregister = () => {
INBOUND_DEBOUNCERS.delete(registryKey);
};
INBOUND_DEBOUNCERS.set(registryKey, {
flushAll,
flushAll: flushAllInternal,
unregister,
});
return { enqueue, flushKey, flushAll };
return { enqueue, flushKey, flushAll, unregister };
}

View File

@ -439,6 +439,37 @@ describe("flushAllInboundDebouncers", () => {
vi.useRealTimers();
});
it("counts only buffers that were delivered successfully", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const errors: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
const ids = items.map((entry) => entry.id);
if (ids.includes("msg-1")) {
throw new Error("dispatch failed");
}
calls.push(ids);
},
onError: (_err, items) => {
errors.push(items.map((entry) => entry.id));
},
});
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
await debouncer.enqueue({ key: "session-2", id: "msg-2" });
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(1);
expect(calls).toEqual([["msg-2"]]);
expect(errors).toEqual([["msg-1"]]);
vi.useRealTimers();
});
it("keeps flushing until no buffered keys remain", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
@ -467,11 +498,71 @@ describe("flushAllInboundDebouncers", () => {
vi.useRealTimers();
});
it("keeps timed-out debouncers registered for a later global sweep", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
let now = 0;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
let debouncer: ReturnType<typeof createInboundDebouncer<{ key: string; id: string }>>;
debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
if (items[0]?.id === "msg-1") {
await debouncer.enqueue({ key: "session-2", id: "msg-2" });
now = 20;
}
},
});
try {
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
const flushed = await flushAllInboundDebouncers({ timeoutMs: 10 });
expect(flushed).toBe(1);
expect(calls).toEqual([["msg-1"]]);
now = 0;
const flushedLater = await flushAllInboundDebouncers({ timeoutMs: 10 });
expect(flushedLater).toBe(1);
expect(calls).toEqual([["msg-1"], ["msg-2"]]);
} finally {
nowSpy.mockRestore();
vi.useRealTimers();
}
});
it("returns 0 when no debouncers are registered", async () => {
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(0);
});
it("lets callers unregister a debouncer from the global registry", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
},
});
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
debouncer.unregister();
expect(await flushAllInboundDebouncers()).toBe(0);
expect(calls).toEqual([]);
await debouncer.flushAll();
expect(calls).toEqual([["msg-1"]]);
vi.useRealTimers();
});
it("deregisters debouncers from global registry after flush", async () => {
vi.useFakeTimers();

View File

@ -275,6 +275,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
await params.onFlush([item]);
},
flushKey: vi.fn(),
flushAll: vi.fn(async () => 0),
unregister: vi.fn(),
}),
) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"],
resolveInboundDebounceMs: vi.fn(