Merge 46465c9f72c1da3a754dabb32a18e72fc2d730f8 into 9fb78453e088cd7b553d7779faa0de5c83708e70

This commit is contained in:
Joseph Krug 2026-03-21 05:07:35 +00:00 committed by GitHub
commit d2a2af7624
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 1150 additions and 108 deletions

View File

@ -13,7 +13,9 @@ type BlueBubblesDebounceEntry = {
export type BlueBubblesDebouncer = {
enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
flushKey: (key: string) => Promise<void>;
flushKey: (key: string) => Promise<boolean>;
flushAll: () => Promise<number>;
unregister: () => void;
};
export type BlueBubblesDebounceRegistry = {
@ -199,6 +201,7 @@ export function createBlueBubblesDebounceRegistry(params: {
return debouncer;
},
removeDebouncer: (target) => {
targetDebouncers.get(target)?.unregister();
targetDebouncers.delete(target);
},
};

View File

@ -180,7 +180,10 @@ export function createDiscordMessageHandler(
}
};
handler.deactivate = inboundWorker.deactivate;
handler.deactivate = () => {
debouncer.unregister();
inboundWorker.deactivate();
};
return handler;
}

View File

@ -1,15 +1,15 @@
import * as crypto from "crypto";
import * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js";
import * as crypto from "node:crypto";
import type * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, HistoryEntry, RuntimeEnv } from "../runtime-api.js";
import { resolveFeishuAccount } from "./accounts.js";
import { raceWithTimeoutAndAbort } from "./async.js";
import {
type FeishuBotAddedEvent,
type FeishuMessageEvent,
handleFeishuMessage,
parseFeishuMessageEvent,
type FeishuMessageEvent,
type FeishuBotAddedEvent,
} from "./bot.js";
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
import { type FeishuCardActionEvent, handleFeishuCardAction } from "./card-action.js";
import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js";
import { createEventDispatcher } from "./client.js";
import {
@ -254,7 +254,7 @@ function resolveFeishuDebounceMentions(params: {
function registerEventHandlers(
eventDispatcher: Lark.EventDispatcher,
context: RegisterEventHandlersContext,
): void {
): { unregisterDebouncer: () => void } {
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
const core = getFeishuRuntime();
const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({
@ -617,6 +617,8 @@ function registerEventHandlers(
}
},
});
return { unregisterDebouncer: inboundDebouncer.unregister };
}
export type BotOpenIdSource =
@ -639,7 +641,10 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" };
const botIdentity =
botOpenIdSource.kind === "prefetched"
? { botOpenId: botOpenIdSource.botOpenId, botName: botOpenIdSource.botName }
? {
botOpenId: botOpenIdSource.botOpenId,
botName: botOpenIdSource.botName,
}
: await fetchBotIdentityForMonitor(account, { runtime, abortSignal });
const botOpenId = botIdentity.botOpenId;
const botName = botIdentity.botName?.trim();
@ -670,7 +675,7 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
const chatHistories = new Map<string, HistoryEntry[]>();
threadBindingManager = createFeishuThreadBindingManager({ accountId, cfg });
registerEventHandlers(eventDispatcher, {
const { unregisterDebouncer } = registerEventHandlers(eventDispatcher, {
cfg,
accountId,
runtime,
@ -678,10 +683,26 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
fireAndForget: true,
});
if (connectionMode === "webhook") {
return await monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher });
try {
if (connectionMode === "webhook") {
return await monitorWebhook({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
});
}
return await monitorWebSocket({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
});
} finally {
unregisterDebouncer();
}
return await monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher });
} finally {
threadBindingManager?.stop();
}

View File

@ -711,7 +711,9 @@ describe("Feishu inbound debounce regressions", () => {
enqueueMock(item);
params.onError?.(new Error("dispatch failed"), [item]);
},
flushKey: async () => {},
flushKey: async (_key: string) => false,
flushAll: async () => 0,
unregister: () => {},
}),
resolveInboundDebounceMs,
},

View File

@ -17,7 +17,9 @@ export function createFeishuRuntimeMockModule(): {
resolveInboundDebounceMs: () => number;
createInboundDebouncer: () => {
enqueue: () => Promise<void>;
flushKey: () => Promise<void>;
flushKey: () => Promise<boolean>;
flushAll: () => Promise<number>;
unregister: () => void;
};
};
text: {
@ -33,7 +35,9 @@ export function createFeishuRuntimeMockModule(): {
resolveInboundDebounceMs: () => 0,
createInboundDebouncer: () => ({
enqueue: async () => {},
flushKey: async () => {},
flushKey: async () => false,
flushAll: async () => 0,
unregister: () => {},
}),
},
text: {

View File

@ -1686,6 +1686,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
},
});
} finally {
debouncer.unregister();
unregisterInteractions?.();
}

View File

@ -33,6 +33,9 @@ const runtimeStub: PluginRuntime = {
resolveInboundDebounceMs: () => 0,
createInboundDebouncer: () => ({
enqueue: async () => {},
flushKey: async () => false,
flushAll: async () => 0,
unregister: () => {},
}),
},
},
@ -138,7 +141,7 @@ function createConsentInvokeHarness(params: {
contentType: "text/plain",
conversationId: params.pendingConversationId ?? "19:victim@thread.v2",
});
const handler = registerMSTeamsHandlers(createActivityHandler(), createDeps());
const { handler } = registerMSTeamsHandlers(createActivityHandler(), createDeps());
const { context, sendActivity } = createInvokeContext({
conversationId: params.invokeConversationId,
uploadId,

View File

@ -140,8 +140,8 @@ async function handleFileConsentInvoke(
export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
handler: T,
deps: MSTeamsMessageHandlerDeps,
): T {
const handleTeamsMessage = createMSTeamsMessageHandler(deps);
): { handler: T; unregisterDebouncer: () => void } {
const { handleTeamsMessage, unregisterDebouncer } = createMSTeamsMessageHandler(deps);
// Wrap the original run method to intercept invokes
const originalRun = handler.run;
@ -151,7 +151,10 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
// Handle file consent invokes before passing to normal flow
if (ctx.activity?.type === "invoke" && ctx.activity?.name === "fileConsent/invoke") {
// Send invoke response IMMEDIATELY to prevent Teams timeout
await ctx.sendActivity({ type: "invokeResponse", value: { status: 200 } });
await ctx.sendActivity({
type: "invokeResponse",
value: { status: 200 },
});
try {
await withRevokedProxyFallback({
@ -164,7 +167,9 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
},
});
} catch (err) {
deps.log.debug?.("file consent handler error", { error: String(err) });
deps.log.debug?.("file consent handler error", {
error: String(err),
});
}
return;
}
@ -192,5 +197,5 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
await next();
});
return handler;
return { handler, unregisterDebouncer };
}

View File

@ -14,10 +14,18 @@ describe("msteams monitor handler authz", () => {
resolveInboundDebounceMs: () => 0,
createInboundDebouncer: <T>(params: {
onFlush: (entries: T[]) => Promise<void>;
}): { enqueue: (entry: T) => Promise<void> } => ({
}): {
enqueue: (entry: T) => Promise<void>;
flushKey: (_key: string) => Promise<boolean>;
flushAll: () => Promise<number>;
unregister: () => void;
} => ({
enqueue: async (entry: T) => {
await params.onFlush([entry]);
},
flushKey: async (_key: string) => false,
flushAll: async () => 0,
unregister: () => {},
}),
},
pairing: {
@ -71,8 +79,8 @@ describe("msteams monitor handler authz", () => {
},
} as OpenClawConfig);
const handler = createMSTeamsMessageHandler(deps);
await handler({
const { handleTeamsMessage } = createMSTeamsMessageHandler(deps);
await handleTeamsMessage({
activity: {
id: "msg-1",
type: "message",
@ -122,8 +130,8 @@ describe("msteams monitor handler authz", () => {
},
} as OpenClawConfig);
const handler = createMSTeamsMessageHandler(deps);
await handler({
const { handleTeamsMessage } = createMSTeamsMessageHandler(deps);
await handleTeamsMessage({
activity: {
id: "msg-1",
type: "message",

View File

@ -1,24 +1,24 @@
import {
DEFAULT_ACCOUNT_ID,
DEFAULT_GROUP_HISTORY_LIMIT,
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
createChannelPairingController,
dispatchReplyFromConfigWithSettledDispatcher,
DEFAULT_GROUP_HISTORY_LIMIT,
logInboundDrop,
evaluateSenderGroupAccessForPolicy,
resolveSenderScopedGroupPolicy,
recordPendingHistoryEntryIfEnabled,
resolveDualTextControlCommandGate,
resolveDefaultGroupPolicy,
isDangerousNameMatchingEnabled,
readStoreAllowFromForDmPolicy,
resolveMentionGating,
resolveInboundSessionEnvelopeContext,
formatAllowlistMatchMeta,
resolveEffectiveAllowFromLists,
resolveDmGroupAccessWithLists,
type HistoryEntry,
isDangerousNameMatchingEnabled,
logInboundDrop,
readStoreAllowFromForDmPolicy,
recordPendingHistoryEntryIfEnabled,
resolveDefaultGroupPolicy,
resolveDmGroupAccessWithLists,
resolveDualTextControlCommandGate,
resolveEffectiveAllowFromLists,
resolveInboundSessionEnvelopeContext,
resolveMentionGating,
resolveSenderScopedGroupPolicy,
} from "../../runtime-api.js";
import {
buildMSTeamsAttachmentPlaceholder,
@ -675,7 +675,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
},
});
return async function handleTeamsMessage(context: MSTeamsTurnContext) {
const handleTeamsMessage = async (context: MSTeamsTurnContext) => {
const activity = context.activity;
const rawText = activity.text?.trim() ?? "";
const text = stripMSTeamsMentionTags(rawText);
@ -698,4 +698,9 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
implicitMention,
});
};
return {
handleTeamsMessage,
unregisterDebouncer: inboundDebouncer.unregister,
};
}

View File

@ -30,7 +30,9 @@ vi.mock("../runtime-api.js", () => ({
resolve();
return;
}
params.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
params.abortSignal?.addEventListener("abort", () => resolve(), {
once: true,
});
});
await params.onAbort?.();
},
@ -80,7 +82,8 @@ vi.mock("express", () => {
const registerMSTeamsHandlers = vi.hoisted(() =>
vi.fn(() => ({
run: vi.fn(async () => {}),
handler: { run: vi.fn(async () => {}) },
unregisterDebouncer: vi.fn(),
})),
);
const createMSTeamsAdapter = vi.hoisted(() =>

View File

@ -12,7 +12,7 @@ import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import type { MSTeamsConversationStore } from "./conversation-store.js";
import { formatUnknownError } from "./errors.js";
import type { MSTeamsAdapter } from "./messenger.js";
import { registerMSTeamsHandlers, type MSTeamsActivityHandler } from "./monitor-handler.js";
import { type MSTeamsActivityHandler, registerMSTeamsHandlers } from "./monitor-handler.js";
import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js";
import {
resolveMSTeamsChannelAllowlist,
@ -136,12 +136,19 @@ export async function monitorMSTeamsProvider(
.filter((entry) => entry && entry !== "*");
if (groupEntries.length > 0) {
const { additions } = await resolveAllowlistUsers("msteams group users", groupEntries);
groupAllowFrom = mergeAllowlist({ existing: groupAllowFrom, additions });
groupAllowFrom = mergeAllowlist({
existing: groupAllowFrom,
additions,
});
}
}
if (teamsConfig && Object.keys(teamsConfig).length > 0) {
const entries: Array<{ input: string; teamKey: string; channelKey?: string }> = [];
const entries: Array<{
input: string;
teamKey: string;
channelKey?: string;
}> = [];
for (const [teamKey, teamCfg] of Object.entries(teamsConfig)) {
if (teamKey === "*") {
continue;
@ -190,7 +197,11 @@ export async function monitorMSTeamsProvider(
...sourceTeam.channels,
...existing.channels,
};
const mergedTeam = { ...sourceTeam, ...existing, channels: mergedChannels };
const mergedTeam = {
...sourceTeam,
...existing,
channels: mergedChannels,
};
nextTeams[entry.teamId] = mergedTeam;
if (source.channelKey && entry.channelId) {
const sourceChannel = sourceTeam.channels?.[source.channelKey];
@ -254,18 +265,21 @@ export async function monitorMSTeamsProvider(
const tokenProvider = new MsalTokenProvider(authConfig);
const adapter = createMSTeamsAdapter(authConfig, sdk);
const handler = registerMSTeamsHandlers(new ActivityHandler() as MSTeamsActivityHandler, {
cfg,
runtime,
appId,
adapter: adapter as unknown as MSTeamsAdapter,
tokenProvider,
textLimit,
mediaMaxBytes,
conversationStore,
pollStore,
log,
});
const { handler, unregisterDebouncer } = registerMSTeamsHandlers(
new ActivityHandler() as MSTeamsActivityHandler,
{
cfg,
runtime,
appId,
adapter: adapter as unknown as MSTeamsAdapter,
tokenProvider,
textLimit,
mediaMaxBytes,
conversationStore,
pollStore,
log,
},
);
// Create Express server
const expressApp = express.default();
@ -283,7 +297,7 @@ export async function monitorMSTeamsProvider(
const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages";
const messageHandler = (req: Request, res: Response) => {
void adapter
.process(req, res, (context: unknown) => handler.run!(context))
.process(req, res, (context: unknown) => handler.run?.(context))
.catch((err: unknown) => {
log.error("msteams webhook failed", { error: formatUnknownError(err) });
});
@ -301,21 +315,29 @@ export async function monitorMSTeamsProvider(
});
// Start listening and fail fast if bind/listen fails.
const httpServer = expressApp.listen(port);
await new Promise<void>((resolve, reject) => {
const onListening = () => {
httpServer.off("error", onError);
log.info(`msteams provider started on port ${port}`);
resolve();
};
const onError = (err: unknown) => {
httpServer.off("listening", onListening);
log.error("msteams server error", { error: String(err) });
reject(err);
};
httpServer.once("listening", onListening);
httpServer.once("error", onError);
});
let httpServer: ReturnType<typeof expressApp.listen>;
try {
httpServer = expressApp.listen(port);
await new Promise<void>((resolve, reject) => {
const onListening = () => {
httpServer.off("error", onError);
log.info(`msteams provider started on port ${port}`);
resolve();
};
const onError = (err: unknown) => {
httpServer.off("listening", onListening);
log.error("msteams server error", { error: String(err) });
reject(err);
};
httpServer.once("listening", onListening);
httpServer.once("error", onError);
});
} catch (err) {
// Clean up the debouncer so it does not linger in the global registry
// when the provider fails to start (e.g. port already in use).
unregisterDebouncer();
throw err;
}
applyMSTeamsWebhookTimeouts(httpServer);
httpServer.on("error", (err) => {
@ -324,6 +346,7 @@ export async function monitorMSTeamsProvider(
const shutdown = async () => {
log.info("shutting down msteams provider");
unregisterDebouncer();
return new Promise<void>((resolve) => {
httpServer.close((err) => {
if (err) {

View File

@ -12,6 +12,8 @@ vi.mock("../../../../src/auto-reply/inbound-debounce.js", () => ({
createInboundDebouncer: () => ({
enqueue: (entry: unknown) => enqueueMock(entry),
flushKey: (key: string) => flushKeyMock(key),
flushAll: async () => 0,
unregister: () => {},
}),
}));

View File

@ -15,6 +15,10 @@ export type SlackMessageHandler = (
opts: { source: "message" | "app_mention"; wasMentioned?: boolean },
) => Promise<void>;
/**
 * A Slack message handler that also carries a teardown hook.
 * `deactivate` is synchronous and returns nothing; owners call it when the
 * handler is no longer in use.
 */
export type SlackMessageHandlerWithLifecycle = SlackMessageHandler & {
deactivate: () => void;
};
const APP_MENTION_RETRY_TTL_MS = 60_000;
function resolveSlackSenderId(message: SlackMessageEvent): string | null {
@ -92,7 +96,7 @@ export function createSlackMessageHandler(params: {
account: ResolvedSlackAccount;
/** Called on each inbound event to update liveness tracking. */
trackEvent?: () => void;
}): SlackMessageHandler {
}): SlackMessageHandlerWithLifecycle {
const { ctx, account, trackEvent } = params;
const { debounceMs, debouncer } = createChannelInboundDebouncer<{
message: SlackMessageEvent;
@ -206,7 +210,7 @@ export function createSlackMessageHandler(params: {
return true;
};
return async (message, opts) => {
const handler: SlackMessageHandlerWithLifecycle = async (message, opts) => {
if (opts.source === "message" && message.type !== "message") {
return;
}
@ -253,4 +257,10 @@ export function createSlackMessageHandler(params: {
}
await debouncer.enqueue({ message: resolvedMessage, opts });
};
handler.deactivate = () => {
debouncer.unregister();
};
return handler;
}

View File

@ -567,6 +567,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
unregisterHttpHandler?.();
handleSlackMessage.deactivate();
await app.stop().catch(() => undefined);
}
}

View File

@ -110,7 +110,7 @@ export const registerTelegramHandlers = ({
processMessage,
logger,
telegramDeps = defaultTelegramBotDeps,
}: RegisterTelegramHandlerParams) => {
}: RegisterTelegramHandlerParams): { unregisterDebouncer: () => void } => {
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
@ -1759,4 +1759,6 @@ export const registerTelegramHandlers = ({
errorMessage: "channel_post handler failed",
});
});
return { unregisterDebouncer: inboundDebouncer.unregister };
};

View File

@ -324,7 +324,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const MAX_RAW_UPDATE_ARRAY = 20;
const stringifyUpdate = (update: unknown) => {
const seen = new WeakSet();
return JSON.stringify(update ?? null, (key, value) => {
return JSON.stringify(update ?? null, (_key, value) => {
if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) {
return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`;
}
@ -531,7 +531,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
telegramDeps,
});
registerTelegramHandlers({
const { unregisterDebouncer } = registerTelegramHandlers({
cfg,
accountId: account.accountId,
bot,
@ -552,6 +552,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const originalStop = bot.stop.bind(bot);
bot.stop = ((...args: Parameters<typeof originalStop>) => {
unregisterDebouncer();
threadBindingManager?.stop();
return originalStop(...args);
}) as typeof bot.stop;

View File

@ -461,6 +461,7 @@ export async function monitorWebInbox(options: {
return {
close: async () => {
try {
debouncer.unregister();
const ev = sock.ev as unknown as {
off?: (event: string, listener: (...args: unknown[]) => void) => void;
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;

View File

@ -1,5 +1,91 @@
import type { OpenClawConfig } from "../config/config.js";
import type { InboundDebounceByProvider } from "../config/types.messages.js";
import { resolveGlobalMap } from "../shared/global-singleton.js";
/**
* Global registry of all active inbound debouncers so they can be flushed
* collectively during gateway restart (SIGUSR1). Each debouncer registers
* itself on creation and stays registered until a complete global flush
* drains it or the owner explicitly unregisters it during teardown.
*/
/** Outcome of one sweep over a single debouncer's buffers. */
type DebouncerFlushResult = {
/** Number of buffers whose pending items were delivered during the sweep. */
flushedCount: number;
/** True when the debouncer had no buffered keys left when the sweep returned. */
drained: boolean;
};
/** What a debouncer exposes to the global registry. */
type DebouncerFlushHandle = {
/**
 * Sweep this debouncer's buffers. `deadlineMs` is an absolute epoch-ms cutoff
 * (it is compared against `Date.now()`), not a duration.
 */
flushAll: (options?: { deadlineMs?: number }) => Promise<DebouncerFlushResult>;
/** Remove this handle from the global registry. */
unregister: () => void;
/** Epoch ms of last enqueue or creation, whichever is more recent. */
lastActivityMs: number;
};
// Symbol.for() yields the same symbol for the same string everywhere in the
// process, so this key is stable across module instances.
// NOTE(review): resolveGlobalMap presumably returns one shared Map per key —
// confirm in ../shared/global-singleton.js.
const INBOUND_DEBOUNCERS_KEY = Symbol.for("openclaw.inboundDebouncers");
// Live debouncer handles, keyed by the private symbol each debouncer creates.
const INBOUND_DEBOUNCERS = resolveGlobalMap<symbol, DebouncerFlushHandle>(INBOUND_DEBOUNCERS_KEY);
/**
 * Clear the global debouncer registry. Intended for test cleanup only.
 *
 * Only the registry entries are dropped: nothing is flushed here, so a
 * debouncer created earlier is simply no longer reached by
 * flushAllInboundDebouncers().
 */
export function clearInboundDebouncerRegistry(): void {
INBOUND_DEBOUNCERS.clear();
}
/**
 * Idle threshold for the stale-handle safety net. During a global flush, a
 * handle whose last activity is at least this old is unregistered even when it
 * did not report itself drained, covering channels that forget to call
 * unregister() on teardown.
 */
const STALE_DEBOUNCER_MS = 5 * 60 * 1000; // 5 minutes
/**
 * Flush all registered inbound debouncers immediately. Called during SIGUSR1
 * restart to push buffered messages into the session before reinitializing.
 *
 * Stale debouncers (no enqueue activity for >=5 minutes) are auto-evicted as a
 * safety net in case a channel monitor forgot to call unregister() on teardown.
 *
 * @param options.timeoutMs Optional budget for the whole sweep, in milliseconds.
 *   Non-finite or non-numeric values mean "no deadline"; negative values are
 *   clamped to 0 and fractions are truncated.
 * @returns The number of debounce buffers actually flushed, so restart logic
 *   can skip followup draining when there was no buffered work. A handle that
 *   fails or is cut off by the timeout contributes 0.
 */
export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }): Promise<number> {
  // Snapshot the registry: handles unregister themselves while we iterate.
  const entries = [...INBOUND_DEBOUNCERS.entries()];
  if (entries.length === 0) {
    return 0;
  }
  const now = Date.now();
  const deadlineMs =
    typeof options?.timeoutMs === "number" && Number.isFinite(options.timeoutMs)
      ? now + Math.max(0, Math.trunc(options.timeoutMs))
      : undefined;
  const flushedCounts = await Promise.all(
    entries.map(async ([_key, handle]) => {
      let result: DebouncerFlushResult;
      let timer: ReturnType<typeof setTimeout> | undefined;
      try {
        if (deadlineMs === undefined) {
          result = await handle.flushAll({ deadlineMs });
        } else {
          const pending = handle.flushAll({ deadlineMs });
          // Handles check the deadline between keys on their own; this timer
          // only matters when a flush callback is stuck past the deadline. In
          // that case the handle is reported as not drained with 0 flushed.
          const timedOut = new Promise<DebouncerFlushResult>((resolve) => {
            const deadlineTimer = setTimeout(
              () => resolve({ flushedCount: 0, drained: false }),
              Math.max(0, deadlineMs - Date.now()),
            );
            deadlineTimer.unref?.();
            timer = deadlineTimer;
          });
          result = await Promise.race([pending, timedOut]);
        }
      } catch {
        // A failing flushAll should not prevent other debouncers from being
        // swept. Keep the handle registered for a future sweep.
        return 0;
      } finally {
        // Release the deadline timer as soon as the race settles so a sweep
        // does not leave one pending timer per handle behind.
        if (timer !== undefined) {
          clearTimeout(timer);
        }
      }
      // Only deregister AFTER the handle confirms all its buffers are
      // drained. If the deadline hit mid-sweep, keep partially-flushed
      // handles registered so subsequent sweeps can finish the job.
      // Also auto-evict stale entries whose owning channel never called
      // unregister() (e.g. after reconnect).
      if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) {
        handle.unregister();
      }
      return result.flushedCount;
    }),
  );
  return flushedCounts.reduce((total, count) => total + count, 0);
}
const resolveMs = (value: unknown): number | undefined => {
if (typeof value !== "number" || !Number.isFinite(value)) {
@ -60,6 +146,7 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return Math.max(0, Math.trunc(resolved));
};
// Returns true when the buffer had pending messages that were delivered.
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
buffers.delete(key);
if (buffer.timeout) {
@ -67,21 +154,24 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
buffer.timeout = null;
}
if (buffer.items.length === 0) {
return;
return false;
}
let delivered = false;
try {
await params.onFlush(buffer.items);
delivered = true;
} catch (err) {
params.onError?.(err, buffer.items);
}
return delivered;
};
const flushKey = async (key: string) => {
const buffer = buffers.get(key);
if (!buffer) {
return;
return false;
}
await flushBuffer(key, buffer);
return flushBuffer(key, buffer);
};
const scheduleFlush = (key: string, buffer: DebounceBuffer<T>) => {
@ -95,6 +185,7 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
};
const enqueue = async (item: T) => {
handle.lastActivityMs = Date.now();
const key = params.buildKey(item);
const debounceMs = resolveDebounceMs(item);
const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true);
@ -119,10 +210,75 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return;
}
const buffer: DebounceBuffer<T> = { items: [item], timeout: null, debounceMs };
const buffer: DebounceBuffer<T> = {
items: [item],
timeout: null,
debounceMs,
};
buffers.set(key, buffer);
scheduleFlush(key, buffer);
};
return { enqueue, flushKey };
/**
 * Sweep every buffered key of this debouncer, repeating until none remain or
 * the optional absolute deadline (epoch ms) has passed.
 *
 * A flush callback can race with late in-flight ingress and create another
 * buffered key before the global registry deregisters this debouncer during
 * restart, which is why a single pass over the keys is not enough.
 *
 * @returns how many buffers were delivered, and whether nothing is left.
 */
const flushAllInternal = async (options?: {
  deadlineMs?: number;
}): Promise<DebouncerFlushResult> => {
  const cutoff = options?.deadlineMs;
  const pastCutoff = (): boolean => cutoff !== undefined && Date.now() >= cutoff;
  const report = (flushedCount: number): DebouncerFlushResult => ({
    flushedCount,
    drained: buffers.size === 0,
  });

  let deliveredBuffers = 0;
  while (buffers.size > 0) {
    if (pastCutoff()) {
      return report(deliveredBuffers);
    }
    // Iterate over a snapshot; flushing mutates the live map.
    const pendingKeys = Array.from(buffers.keys());
    for (const pendingKey of pendingKeys) {
      if (pastCutoff()) {
        return report(deliveredBuffers);
      }
      if (buffers.has(pendingKey)) {
        try {
          if (await flushKey(pendingKey)) {
            deliveredBuffers += 1;
          }
        } catch {
          // flushBuffer already routed the failure through onError; keep
          // sweeping so one bad key cannot strand later buffered messages.
        }
      }
    }
  }
  return report(deliveredBuffers);
};
/** Public sweep: same work as flushAllInternal, but resolves to the delivered-buffer count only. */
const flushAll = async (options?: { deadlineMs?: number }) =>
  (await flushAllInternal(options)).flushedCount;
// Register in global registry for SIGUSR1 flush.
// A fresh, unshared symbol identifies this debouncer's registry entry.
const registryKey = Symbol();
// Detaches this debouncer from global sweeps; its own buffers are not touched.
const unregister = () => {
INBOUND_DEBOUNCERS.delete(registryKey);
};
// The registry receives the internal sweep (which reports `drained`) rather
// than the count-only public flushAll. enqueue() updates lastActivityMs on
// this same object, so the registry always sees the latest activity time.
const handle: DebouncerFlushHandle = {
flushAll: flushAllInternal,
unregister,
lastActivityMs: Date.now(),
};
INBOUND_DEBOUNCERS.set(registryKey, handle);
return { enqueue, flushKey, flushAll, unregister };
}

View File

@ -1,10 +1,14 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import type { GroupKeyResolution } from "../config/sessions.js";
import { createInboundDebouncer } from "./inbound-debounce.js";
import {
clearInboundDebouncerRegistry,
createInboundDebouncer,
flushAllInboundDebouncers,
} from "./inbound-debounce.js";
import { resolveGroupRequireMention } from "./reply/groups.js";
import { finalizeInboundContext } from "./reply/inbound-context.js";
import {
@ -308,7 +312,11 @@ describe("createInboundDebouncer", () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({
const debouncer = createInboundDebouncer<{
key: string;
id: string;
debounce: boolean;
}>({
debounceMs: 50,
buildKey: (item) => item.key,
shouldDebounce: (item) => item.debounce,
@ -329,7 +337,11 @@ describe("createInboundDebouncer", () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({
const debouncer = createInboundDebouncer<{
key: string;
id: string;
windowMs: number;
}>({
debounceMs: 0,
buildKey: (item) => item.key,
resolveDebounceMs: (item) => item.windowMs,
@ -349,6 +361,383 @@ describe("createInboundDebouncer", () => {
});
});
describe("flushAllInboundDebouncers", () => {
// Clear registry before each test to avoid leaking state from other tests
// that create debouncers.
beforeEach(() => {
  clearInboundDebouncerRegistry();
});
afterEach(() => {
  clearInboundDebouncerRegistry();
  // Tests in this suite restore real timers only on their success path; do it
  // here as well so a failed assertion cannot leave fake timers installed for
  // whichever test runs next.
  vi.useRealTimers();
});
it("flushes all pending inbound debounce buffers immediately", async () => {
vi.useFakeTimers();
const callsA: Array<string[]> = [];
const callsB: Array<string[]> = [];
const debouncerA = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
callsA.push(items.map((entry) => entry.id));
},
});
const debouncerB = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
callsB.push(items.map((entry) => entry.id));
},
});
await debouncerA.enqueue({ key: "session-1", id: "msg-1" });
await debouncerA.enqueue({ key: "session-1", id: "msg-2" });
await debouncerB.enqueue({ key: "session-2", id: "msg-3" });
// Nothing flushed yet (timers haven't fired)
expect(callsA).toEqual([]);
expect(callsB).toEqual([]);
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(2);
expect(callsA).toEqual([["msg-1", "msg-2"]]);
expect(callsB).toEqual([["msg-3"]]);
vi.useRealTimers();
});
it("counts pending buffers instead of registered debouncers", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const activeDebouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
},
});
createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async () => {},
});
await activeDebouncer.enqueue({ key: "session-1", id: "msg-1" });
await activeDebouncer.enqueue({ key: "session-2", id: "msg-2" });
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(2);
expect(calls).toHaveLength(2);
expect(calls).toContainEqual(["msg-1"]);
expect(calls).toContainEqual(["msg-2"]);
vi.useRealTimers();
});
it("counts only buffers that were delivered successfully", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
const errors: Array<string[]> = [];
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
const ids = items.map((entry) => entry.id);
if (ids.includes("msg-1")) {
throw new Error("dispatch failed");
}
calls.push(ids);
},
onError: (_err, items) => {
errors.push(items.map((entry) => entry.id));
},
});
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
await debouncer.enqueue({ key: "session-2", id: "msg-2" });
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(1);
expect(calls).toEqual([["msg-2"]]);
expect(errors).toEqual([["msg-1"]]);
vi.useRealTimers();
});
it("keeps flushing until no buffered keys remain", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
let enqueuedDuringFlush = false;
let debouncer: ReturnType<typeof createInboundDebouncer<{ key: string; id: string }>>;
debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
if (!enqueuedDuringFlush) {
enqueuedDuringFlush = true;
await debouncer.enqueue({ key: "session-2", id: "msg-2" });
}
},
});
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
const flushed = await flushAllInboundDebouncers();
expect(flushed).toBe(2);
expect(calls).toEqual([["msg-1"], ["msg-2"]]);
await expect(flushAllInboundDebouncers()).resolves.toBe(0);
vi.useRealTimers();
});
it("keeps timed-out debouncers registered for a later global sweep", async () => {
vi.useFakeTimers();
const calls: Array<string[]> = [];
let now = 0;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
let debouncer: ReturnType<typeof createInboundDebouncer<{ key: string; id: string }>>;
debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 5000,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items.map((entry) => entry.id));
if (items[0]?.id === "msg-1") {
await debouncer.enqueue({ key: "session-2", id: "msg-2" });
now = 20;
}
},
});
try {
await debouncer.enqueue({ key: "session-1", id: "msg-1" });
const flushed = await flushAllInboundDebouncers({ timeoutMs: 10 });
expect(flushed).toBe(1);
expect(calls).toEqual([["msg-1"]]);
now = 0;
const flushedLater = await flushAllInboundDebouncers({ timeoutMs: 10 });
expect(flushedLater).toBe(1);
expect(calls).toEqual([["msg-1"], ["msg-2"]]);
} finally {
nowSpy.mockRestore();
vi.useRealTimers();
}
});
it("returns 0 when no debouncers are registered", async () => {
  // No debouncer is created in this test, so the sweep must report zero flushed buffers.
  await expect(flushAllInboundDebouncers()).resolves.toBe(0);
});
it("lets callers unregister a debouncer from the global registry", async () => {
  vi.useFakeTimers();
  // Restore real timers in `finally` so a failing assertion cannot leak fake
  // timers into the tests that run afterwards.
  try {
    const calls: Array<string[]> = [];
    const debouncer = createInboundDebouncer<{ key: string; id: string }>({
      debounceMs: 5000,
      buildKey: (item) => item.key,
      onFlush: async (items) => {
        calls.push(items.map((entry) => entry.id));
      },
    });
    await debouncer.enqueue({ key: "session-1", id: "msg-1" });
    debouncer.unregister();
    // After unregistering, the global sweep no longer reaches this debouncer...
    expect(await flushAllInboundDebouncers()).toBe(0);
    expect(calls).toEqual([]);
    // ...but its buffered item is still there and can be flushed directly.
    await debouncer.flushAll();
    expect(calls).toEqual([["msg-1"]]);
  } finally {
    vi.useRealTimers();
  }
});
it("deregisters debouncers from global registry after flush", async () => {
  vi.useFakeTimers();
  // Restore real timers in `finally` so a failing assertion cannot leak fake
  // timers into the tests that run afterwards.
  try {
    // The debouncer is created only for its registration side effect; nothing
    // is ever enqueued on it.
    createInboundDebouncer<{ key: string; id: string }>({
      debounceMs: 5000,
      buildKey: (item) => item.key,
      onFlush: async () => {},
    });
    // First flush deregisters
    await flushAllInboundDebouncers();
    // Second flush should find nothing
    const flushed = await flushAllInboundDebouncers();
    expect(flushed).toBe(0);
  } finally {
    vi.useRealTimers();
  }
});
it("auto-evicts stale debouncers idle >5 min even with a tight deadline", async () => {
  vi.useFakeTimers();
  // Restore real timers in `finally` so a failing assertion cannot leak fake
  // timers into the tests that run afterwards.
  try {
    // Use a debounceMs longer than the staleness window so the debounce
    // timeout does NOT fire when we advance the clock.
    createInboundDebouncer<{ key: string; id: string }>({
      debounceMs: 10 * 60 * 1000,
      buildKey: (item) => item.key,
      onFlush: async () => {},
    });
    // Advance past the 5-minute staleness window (debounce timer still pending)
    vi.advanceTimersByTime(5 * 60 * 1000 + 1);
    // Flush with a zero-ms timeout. The deadline fires immediately so the
    // debouncer cannot actually drain, but the staleness guard evicts it.
    await flushAllInboundDebouncers({ timeoutMs: 0 });
    // Second flush should find nothing — stale entry was auto-evicted
    const flushed2 = await flushAllInboundDebouncers();
    expect(flushed2).toBe(0);
  } finally {
    vi.useRealTimers();
  }
});
it("does not evict debouncers that have recent activity", async () => {
  vi.useFakeTimers();
  // Restore real timers in `finally` so a failing assertion cannot leak fake
  // timers into the tests that run afterwards.
  try {
    // debounceMs is 10 minutes so the debounce timer itself never fires while
    // the clock is advanced below.
    const debouncer = createInboundDebouncer<{ key: string; id: string }>({
      debounceMs: 10 * 60 * 1000,
      buildKey: (item) => item.key,
      onFlush: async () => {},
    });
    // Advance 4 minutes (below staleness threshold)
    vi.advanceTimersByTime(4 * 60 * 1000);
    // Enqueue refreshes the activity timestamp
    await debouncer.enqueue({ key: "session-1", id: "msg-1" });
    // Advance another 4 minutes (8 total since creation, 4 since enqueue)
    vi.advanceTimersByTime(4 * 60 * 1000);
    // Flush with zero timeout — debouncer can't drain, but it's NOT stale
    // (only 4 min since last enqueue). It should remain registered.
    await flushAllInboundDebouncers({ timeoutMs: 0 });
    // Debouncer should still be in the registry
    // Do a full flush to verify it's still there
    const flushed = await flushAllInboundDebouncers();
    expect(flushed).toBe(1);
  } finally {
    vi.useRealTimers();
  }
});
});
// Per-debouncer `flushAll()` behaviour: flushing every buffered key, surviving
// handler failures, and honouring a caller-supplied deadline.
describe("createInboundDebouncer flushAll", () => {
  it("flushes all buffered keys", async () => {
    vi.useFakeTimers();
    // Restore real timers in `finally` so a failing assertion cannot leak
    // fake timers into the tests that run afterwards.
    try {
      const calls: Array<string[]> = [];
      const debouncer = createInboundDebouncer<{ key: string; id: string }>({
        debounceMs: 5000,
        buildKey: (item) => item.key,
        onFlush: async (items) => {
          calls.push(items.map((entry) => entry.id));
        },
      });
      await debouncer.enqueue({ key: "a", id: "1" });
      await debouncer.enqueue({ key: "b", id: "2" });
      await debouncer.enqueue({ key: "a", id: "3" });
      // Nothing is flushed while the items are merely buffered.
      expect(calls).toEqual([]);
      await debouncer.flushAll();
      // Both keys flushed; items sharing key "a" arrive in one batch.
      // Order between keys is not asserted.
      expect(calls).toHaveLength(2);
      expect(calls).toContainEqual(["1", "3"]);
      expect(calls).toContainEqual(["2"]);
    } finally {
      vi.useRealTimers();
    }
  });

  it("continues flushing later keys when onError throws", async () => {
    vi.useFakeTimers();
    // Restore real timers in `finally` so a failing assertion cannot leak
    // fake timers into the tests that run afterwards.
    try {
      const calls: Array<string[]> = [];
      const errors: Array<string[]> = [];
      const debouncer = createInboundDebouncer<{ key: string; id: string }>({
        debounceMs: 5000,
        buildKey: (item) => item.key,
        onFlush: async (items) => {
          const ids = items.map((entry) => entry.id);
          // Fail only the batch for key "b".
          if (ids.includes("2")) {
            throw new Error("dispatch failed");
          }
          calls.push(ids);
        },
        // The error handler itself throws, to prove a faulty onError does not
        // abort the rest of the sweep.
        onError: (_err, items) => {
          errors.push(items.map((entry) => entry.id));
          throw new Error("onError failed");
        },
      });
      await debouncer.enqueue({ key: "a", id: "1" });
      await debouncer.enqueue({ key: "b", id: "2" });
      await debouncer.enqueue({ key: "c", id: "3" });
      const flushed = await debouncer.flushAll();
      // The failed key is not counted as flushed.
      expect(flushed).toBe(2);
      expect(calls).toContainEqual(["1"]);
      expect(calls).toContainEqual(["3"]);
      expect(errors).toEqual([["2"]]);
    } finally {
      vi.useRealTimers();
    }
  });

  it("stops sweeping when the global flush deadline is reached", async () => {
    vi.useFakeTimers();
    const calls: Array<string[]> = [];
    // `Date.now()` is replaced with a manually controlled clock.
    let now = 0;
    const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
    // Declared before assignment so `onFlush` can reference the debouncer itself.
    let debouncer: ReturnType<typeof createInboundDebouncer<{ key: string; id: string }>>;
    debouncer = createInboundDebouncer<{ key: string; id: string }>({
      debounceMs: 5000,
      buildKey: (item) => item.key,
      onFlush: async (items) => {
        calls.push(items.map((entry) => entry.id));
        if (items[0]?.id === "1") {
          // Buffer a second key mid-flush and move the mocked clock past the
          // `deadlineMs: 10` handed to flushAll below.
          await debouncer.enqueue({ key: "b", id: "2" });
          now = 20;
        }
      },
    });
    try {
      await debouncer.enqueue({ key: "a", id: "1" });
      const flushed = await debouncer.flushAll({ deadlineMs: 10 });
      // Only the first key was flushed before the deadline was hit.
      expect(flushed).toBe(1);
      expect(calls).toEqual([["1"]]);
      // Rewind the clock so the second call is back inside its deadline.
      now = 0;
      const flushedLater = await debouncer.flushAll({ deadlineMs: 10 });
      expect(flushedLater).toBe(1);
      expect(calls).toEqual([["1"], ["2"]]);
    } finally {
      nowSpy.mockRestore();
      vi.useRealTimers();
    }
  });
});
describe("initSessionState BodyStripped", () => {
it("prefers BodyForAgent over Body for group chats", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sender-meta-"));

View File

@ -1,7 +1,8 @@
export { extractQueueDirective } from "./queue/directive.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export type { ClearSessionQueueResult } from "./queue/cleanup.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export { extractQueueDirective } from "./queue/directive.js";
export { scheduleFollowupDrain } from "./queue/drain.js";
export { waitForFollowupQueueDrain } from "./queue/drain-all.js";
export {
enqueueFollowupRun,
getFollowupQueueDepth,

View File

@ -0,0 +1,89 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { waitForFollowupQueueDrain } from "./drain-all.js";
import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js";
/**
 * Builds a followup-queue fixture for tests.
 *
 * Starts from an idle, empty queue and applies `overrides` on top, so a test
 * only has to spell out the fields it cares about. Fresh arrays are created on
 * every call, so fixtures never share mutable state.
 */
function createMockQueue(overrides: Partial<FollowupQueueState> = {}): FollowupQueueState {
  const defaults: FollowupQueueState = {
    items: [],
    draining: false,
    lastEnqueuedAt: 0,
    mode: "followup",
    debounceMs: 1000,
    cap: 20,
    dropPolicy: "summarize",
    droppedCount: 0,
    summaryLines: [],
  };
  // Later spread wins: caller-provided fields replace the defaults.
  return { ...defaults, ...overrides };
}
// Empty the shared FOLLOWUP_QUEUES map after every test so queues registered by
// one case are not visible to the next.
afterEach(function resetFollowupQueues() {
  FOLLOWUP_QUEUES.clear();
});
describe("waitForFollowupQueueDrain", () => {
  const DRAINED = { drained: true, remaining: 0 };

  // Fixture: a queue that is mid-drain and still holds one buffered item.
  const createBusyQueue = (): FollowupQueueState =>
    createMockQueue({
      items: [
        { prompt: "test", run: vi.fn() as unknown, enqueuedAt: Date.now() },
      ] as FollowupQueueState["items"],
      draining: true,
    });

  it("returns drained immediately when no queues exist", async () => {
    const outcome = await waitForFollowupQueueDrain(1000);
    expect(outcome).toEqual(DRAINED);
  });

  it("returns drained immediately when all queues are empty", async () => {
    FOLLOWUP_QUEUES.set("test", createMockQueue());
    const outcome = await waitForFollowupQueueDrain(1000);
    expect(outcome).toEqual(DRAINED);
  });

  it("waits until queues are drained", async () => {
    const busy = createBusyQueue();
    FOLLOWUP_QUEUES.set("test", busy);
    // Simulate drain completing after 100ms
    setTimeout(() => {
      busy.items.length = 0;
      busy.draining = false;
      FOLLOWUP_QUEUES.delete("test");
    }, 100);
    const { drained, remaining } = await waitForFollowupQueueDrain(5000);
    expect(drained).toBe(true);
    expect(remaining).toBe(0);
  });

  it("returns not drained on timeout", async () => {
    FOLLOWUP_QUEUES.set("test", createBusyQueue());
    const { drained, remaining } = await waitForFollowupQueueDrain(100);
    expect(drained).toBe(false);
    expect(remaining).toBeGreaterThan(0);
  });

  it("counts draining queues as having pending items even with empty items array", async () => {
    FOLLOWUP_QUEUES.set("test", createMockQueue({ draining: true }));
    // Queue has no items but is still draining — should wait
    const { drained, remaining } = await waitForFollowupQueueDrain(100);
    expect(drained).toBe(false);
    expect(remaining).toBeGreaterThanOrEqual(1);
  });

  it("reports each draining queue in the timeout remaining count", async () => {
    for (const id of ["queue-1", "queue-2", "queue-3"]) {
      FOLLOWUP_QUEUES.set(id, createMockQueue({ draining: true }));
    }
    const outcome = await waitForFollowupQueueDrain(1);
    expect(outcome).toEqual({ drained: false, remaining: 3 });
  });
});

View File

@ -0,0 +1,44 @@
import { FOLLOWUP_QUEUES } from "./state.js";
/**
 * Polls the followup queues until none of them has pending work, or until
 * `timeoutMs` has elapsed.
 *
 * "Pending" is the number of buffered items across all queues, plus one for
 * every queue whose `draining` flag is set (its drain loop owns an in-flight
 * item that is no longer in `items`).
 *
 * Called during SIGUSR1 restart after flushing inbound debouncers, so the
 * newly enqueued items have time to be processed before the server tears down.
 *
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @returns `{ drained: true, remaining: 0 }` once nothing is pending, otherwise
 *   `{ drained: false, remaining }` with the last observed pending count.
 */
export async function waitForFollowupQueueDrain(
  timeoutMs: number,
): Promise<{ drained: boolean; remaining: number }> {
  const stopAt = Date.now() + timeoutMs;
  const pollEveryMs = 50;

  const countPending = (): number =>
    Array.from(FOLLOWUP_QUEUES.values()).reduce(
      // Add 1 for the in-flight item owned by an active drain loop.
      (sum, queue) => sum + queue.items.length + (queue.draining ? 1 : 0),
      0,
    );

  const pause = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => {
      const handle = setTimeout(resolve, ms);
      // Do not let the poll timer alone keep the process alive.
      handle.unref?.();
    });

  let pending = countPending();
  while (pending !== 0 && Date.now() < stopAt) {
    // Never sleep past the deadline.
    await pause(Math.min(pollEveryMs, stopAt - Date.now()));
    pending = countPending();
  }

  return pending === 0
    ? { drained: true, remaining: 0 }
    : { drained: false, remaining: pending };
}

View File

@ -19,16 +19,29 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?
}));
const getActiveTaskCount = vi.fn(() => 0);
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({
drained: true,
}));
const resetAllLanes = vi.fn();
const restartGatewayProcessWithFreshPid = vi.fn<
() => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string }
() => {
mode: "spawned" | "supervised" | "disabled" | "failed";
pid?: number;
detail?: string;
}
>(() => ({ mode: "disabled" }));
const abortEmbeddedPiRun = vi.fn(
(_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false,
);
const getActiveEmbeddedRunCount = vi.fn(() => 0);
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({
drained: true,
}));
const flushAllInboundDebouncers = vi.fn(async (_options?: { timeoutMs?: number }) => 0);
const waitForFollowupQueueDrain = vi.fn(async (_timeoutMs: number) => ({
drained: true,
remaining: 0,
}));
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
const gatewayLog = {
info: vi.fn(),
@ -36,6 +49,15 @@ const gatewayLog = {
error: vi.fn(),
};
vi.mock("../../auto-reply/inbound-debounce.js", () => ({
flushAllInboundDebouncers: (options?: { timeoutMs?: number }) =>
flushAllInboundDebouncers(options),
}));
vi.mock("../../auto-reply/reply/queue/drain-all.js", () => ({
waitForFollowupQueueDrain: (timeoutMs: number) => waitForFollowupQueueDrain(timeoutMs),
}));
vi.mock("../../infra/gateway-lock.js", () => ({
acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts),
}));
@ -268,10 +290,14 @@ describe("runGatewayLoop", () => {
expect(start).toHaveBeenCalledTimes(2);
await new Promise<void>((resolve) => setImmediate(resolve));
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" });
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, {
mode: "compacting",
});
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000);
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, {
mode: "all",
});
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
expect(closeFirst).toHaveBeenCalledWith({
@ -325,6 +351,191 @@ describe("runGatewayLoop", () => {
});
});
// Verifies the SIGUSR1 restart ordering using mock invocation order:
// debounce flush -> markGatewayDraining -> followup queue drain.
it("flushes inbound debouncers before marking gateway draining on SIGUSR1", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// The flush reports two buffers and the followup drain reports success, so
// both info log lines asserted below are expected.
flushAllInboundDebouncers.mockResolvedValueOnce(2);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
remaining: 0,
});
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Yield to the event loop twice before asserting.
// NOTE(review): presumably two turns are what the async restart handler needs
// to reach the followup drain — confirm if this becomes flaky.
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 });
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
// Flush debouncers BEFORE marking draining so flushed messages can enqueue
expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan(
markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
// Draining is marked before the followup queues are waited on.
expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan(
waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
// And, transitively, the flush happens before the followup drain.
expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan(
waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(gatewayLog.info).toHaveBeenCalledWith(
"flushed 2 pending inbound debounce buffer(s) before restart",
);
expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained before restart");
// End the loop and check the exit code.
sigterm();
await expect(exited).resolves.toBe(0);
});
});
// Verifies that the restart watchdog is armed twice: once with the base budget
// (95s) and once more with the followup drain time added (100s).
it("extends the restart force-exit timer to include followup queue drain time", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
flushAllInboundDebouncers.mockResolvedValueOnce(1);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
remaining: 0,
});
// Spy on the global setTimeout so the delays passed to it can be inspected.
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Yield to the event loop twice before inspecting the recorded timers.
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
// Keep only numeric delays of at least 95s; shorter timers are unrelated
// to the force-exit watchdog.
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toContain(95_000);
expect(forceExitCalls).toContain(100_000);
sigterm();
await expect(exited).resolves.toBe(0);
} finally {
// Always restore the real setTimeout, even if an assertion failed.
setTimeoutSpy.mockRestore();
}
});
});
// Verifies that the followup queue drain and the extended watchdog still happen
// when the debounce flush reports zero flushed buffers.
it("always drains followup queue even when no debouncers had buffered messages", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// Nothing was buffered in any debouncer.
flushAllInboundDebouncers.mockResolvedValueOnce(0);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
remaining: 0,
});
// Spy on the global setTimeout so the delays passed to it can be inspected.
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Yield to the event loop twice before asserting.
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 });
// Followup queue drain is always called regardless of flushedCount
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
// Keep only numeric delays of at least 95s, i.e. the force-exit watchdog timers.
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
// Exactly two arms, in order: the base budget, then the extended one.
expect(forceExitCalls).toEqual([95_000, 100_000]);
sigterm();
await expect(exited).resolves.toBe(0);
} finally {
// Always restore the real setTimeout, even if an assertion failed.
setTimeoutSpy.mockRestore();
}
});
});
// Verifies that a followup drain result of `drained: false` is reported as a
// warning that includes the remaining count.
it("logs warning when followup queue drain times out", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
flushAllInboundDebouncers.mockResolvedValueOnce(1);
// Simulate the drain giving up with three entries still pending.
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: false,
remaining: 3,
});
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Yield to the event loop twice before asserting.
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(gatewayLog.warn).toHaveBeenCalledWith(
"followup queue drain timeout; 3 item(s) still pending",
);
// End the loop and check the exit code.
sigterm();
await expect(exited).resolves.toBe(0);
});
});
// Verifies that the second watchdog arm is computed from the clock value AFTER
// the debounce flush, so a slow flush does not shorten the drain budget.
it("re-arms the restart watchdog after a slow debounce flush", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
// `Date.now()` is replaced with a manually controlled clock starting at 1000.
let now = 1000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
// The mocked flush "takes" 20 seconds by advancing the mocked clock.
flushAllInboundDebouncers.mockImplementationOnce(async () => {
now += 20_000;
return 0;
});
// Spy on the global setTimeout so the delays passed to it can be inspected.
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
const { exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
const sigterm = captureSignal("SIGTERM");
sigusr1();
// Yield to the event loop twice before inspecting the recorded timers.
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
// Keep only numeric delays of at least 95s, i.e. the force-exit watchdog timers.
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
// First arm: 1000 + 5000 + 90000 = 96000, delay = 96000 - 1000 = 95000
// Second arm (after 20s flush): 21000 + 5000 + 90000 + 5000 = 121000,
// delay = 121000 - 21000 = 100000
expect(forceExitCalls).toEqual([95_000, 100_000]);
sigterm();
await expect(exited).resolves.toBe(0);
} finally {
// Always undo both spies, even if an assertion failed.
nowSpy.mockRestore();
setTimeoutSpy.mockRestore();
}
});
});
it("releases the lock before exiting on spawned restart", async () => {
vi.clearAllMocks();

View File

@ -3,6 +3,8 @@ import {
getActiveEmbeddedRunCount,
waitForActiveEmbeddedRuns,
} from "../../agents/pi-embedded-runner/runs.js";
import { flushAllInboundDebouncers } from "../../auto-reply/inbound-debounce.js";
import { waitForFollowupQueueDrain } from "../../auto-reply/reply/queue/drain-all.js";
import type { startGatewayServer } from "../../gateway/server.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js";
@ -97,6 +99,8 @@ export async function runGatewayLoop(params: {
};
const DRAIN_TIMEOUT_MS = 90_000;
const FOLLOWUP_DRAIN_TIMEOUT_MS = 5_000;
const INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS = 10_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
const request = (action: GatewayRunSignalAction, signal: string) => {
@ -108,24 +112,57 @@ export async function runGatewayLoop(params: {
const isRestart = action === "restart";
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
// Allow extra time for draining active turns on restart.
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
const forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
// Exit non-zero on restart timeout so launchd/systemd treats it as a
// failure and triggers a clean process restart instead of assuming the
// shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822)
exitProcess(isRestart ? 1 : 0);
}, forceExitMs);
const baseForceExitDeadlineMs =
Date.now() + SHUTDOWN_TIMEOUT_MS + (isRestart ? DRAIN_TIMEOUT_MS : 0);
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const armForceExitTimer = (deadlineMs: number) => {
if (forceExitTimer) {
clearTimeout(forceExitTimer);
}
forceExitTimer = setTimeout(
() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
// Exit non-zero on restart timeout so launchd/systemd treats it as a
// failure and triggers a clean process restart instead of assuming the
// shutdown was intentional. Stop-timeout stays at 0 (graceful). (#36822)
exitProcess(isRestart ? 1 : 0);
},
Math.max(0, deadlineMs - Date.now()),
);
forceExitTimer.unref?.();
};
armForceExitTimer(baseForceExitDeadlineMs);
void (async () => {
try {
// On restart, wait for in-flight agent turns to finish before
// tearing down the server so buffered messages are delivered.
if (isRestart) {
// Reject new enqueues immediately during the drain window so
// sessions get an explicit restart error instead of silent task loss.
// Flush inbound debounce buffers BEFORE marking the gateway as
// draining so flushed messages can still enqueue into the command
// queue. This pushes any messages waiting in per-channel debounce
// timers (e.g. the 2500ms collect window) into the followup queues
// immediately, preventing silent message loss on reinit.
const flushedBuffers = await flushAllInboundDebouncers({
timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS,
});
if (flushedBuffers > 0) {
gatewayLog.info(
`flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`,
);
}
// Now reject new command-queue work so late arrivals fail explicitly
// instead of being stranded. This does not block followup queue
// enqueues, so already-flushed inbound work can still drain normally.
markGatewayDraining();
// Start the restart watchdog budget after the pre-shutdown debounce
// flush so slow flush handlers do not steal time from active drain.
armForceExitTimer(
Date.now() + SHUTDOWN_TIMEOUT_MS + DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS,
);
const activeTasks = getActiveTaskCount();
const activeRuns = getActiveEmbeddedRunCount();
@ -156,6 +193,19 @@ export async function runGatewayLoop(params: {
abortEmbeddedPiRun(undefined, { mode: "all" });
}
}
// Drain followup queues AFTER active tasks finish so tasks that
// produce followup work have a chance to enqueue before we wait.
// Always drain regardless of how many debounce buffers were flushed —
// queued followups are not contingent on debouncers.
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained before restart");
} else {
gatewayLog.warn(
`followup queue drain timeout; ${followupResult.remaining} item(s) still pending`,
);
}
}
await server?.close({
@ -165,7 +215,9 @@ export async function runGatewayLoop(params: {
} catch (err) {
gatewayLog.error(`shutdown error: ${String(err)}`);
} finally {
clearTimeout(forceExitTimer);
if (forceExitTimer) {
clearTimeout(forceExitTimer);
}
server = null;
if (isRestart) {
await handleRestartAfterServerClose();

View File

@ -275,6 +275,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
await params.onFlush([item]);
},
flushKey: vi.fn(),
flushAll: vi.fn(async () => 0),
unregister: vi.fn(),
}),
) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"],
resolveInboundDebounceMs: vi.fn(