fix: address codex review comments on #46303

- Flush inbound debouncers BEFORE markGatewayDraining() so flushed
  messages can still enqueue into the command queue (CWE-672)
- Reorder restart drain: flush debouncers -> active tasks -> followup
  queues (followups need active turns to finish before they can drain)
- Always drain followup queues regardless of flushed debouncer count
- Only deregister debouncer handles after all buffers confirmed drained;
  keep partially-flushed handles for subsequent sweeps
- Wrap flushAll with deadline-based timeout (Promise.race) to prevent
  hung provider calls from blocking restart indefinitely
- Unregister MSTeams debouncer on startup failure (EADDRINUSE etc.)
- Update test expectations for new drain ordering
This commit is contained in:
Joey Krug 2026-03-21 01:07:25 -04:00
parent 1830baed03
commit 46465c9f72
4 changed files with 94 additions and 54 deletions

View File

@@ -315,21 +315,29 @@ export async function monitorMSTeamsProvider(
});
// Start listening and fail fast if bind/listen fails.
const httpServer = expressApp.listen(port);
await new Promise<void>((resolve, reject) => {
const onListening = () => {
httpServer.off("error", onError);
log.info(`msteams provider started on port ${port}`);
resolve();
};
const onError = (err: unknown) => {
httpServer.off("listening", onListening);
log.error("msteams server error", { error: String(err) });
reject(err);
};
httpServer.once("listening", onListening);
httpServer.once("error", onError);
});
let httpServer: ReturnType<typeof expressApp.listen>;
try {
httpServer = expressApp.listen(port);
await new Promise<void>((resolve, reject) => {
const onListening = () => {
httpServer.off("error", onError);
log.info(`msteams provider started on port ${port}`);
resolve();
};
const onError = (err: unknown) => {
httpServer.off("listening", onListening);
log.error("msteams server error", { error: String(err) });
reject(err);
};
httpServer.once("listening", onListening);
httpServer.once("error", onError);
});
} catch (err) {
// Clean up the debouncer so it does not linger in the global registry
// when the provider fails to start (e.g. port already in use).
unregisterDebouncer();
throw err;
}
applyMSTeamsWebhookTimeouts(httpServer);
httpServer.on("error", (err) => {

View File

@@ -54,9 +54,30 @@ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number }
: undefined;
const flushedCounts = await Promise.all(
entries.map(async ([_key, handle]) => {
const result = await handle.flushAll({ deadlineMs });
// Remove drained debouncers, and auto-evict stale entries whose
// owning channel never called unregister() (e.g. after reconnect).
let result: DebouncerFlushResult;
try {
result = await (deadlineMs !== undefined
? Promise.race([
handle.flushAll({ deadlineMs }),
new Promise<DebouncerFlushResult>((resolve) => {
const timer = setTimeout(
() => resolve({ flushedCount: 0, drained: false }),
Math.max(0, deadlineMs - Date.now()),
);
timer.unref?.();
}),
])
: handle.flushAll({ deadlineMs }));
} catch {
// A hung or failing flushAll should not prevent other debouncers
// from being swept. Keep the handle registered for a future sweep.
return 0;
}
// Only deregister AFTER the handle confirms all its buffers are
// drained. If the deadline hit mid-sweep, keep partially-flushed
// handles registered so subsequent sweeps can finish the job.
// Also auto-evict stale entries whose owning channel never called
// unregister() (e.g. after reconnect).
if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) {
handle.unregister();
}

View File

@@ -351,7 +351,7 @@ describe("runGatewayLoop", () => {
});
});
it("marks gateway draining before flushing inbound debouncers on SIGUSR1", async () => {
it("flushes inbound debouncers before marking gateway draining on SIGUSR1", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
@@ -374,8 +374,9 @@ describe("runGatewayLoop", () => {
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 });
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan(
flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
// Flush debouncers BEFORE marking draining so flushed messages can enqueue
expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan(
markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan(
waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
@@ -387,7 +388,7 @@ describe("runGatewayLoop", () => {
expect(gatewayLog.info).toHaveBeenCalledWith(
"flushed 2 pending inbound debounce buffer(s) before restart",
);
expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush");
expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained before restart");
sigterm();
await expect(exited).resolves.toBe(0);
@@ -429,11 +430,15 @@ describe("runGatewayLoop", () => {
});
});
it("skips followup queue drain when no debouncers had buffered messages", async () => {
it("always drains followup queue even when no debouncers had buffered messages", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async ({ captureSignal }) => {
flushAllInboundDebouncers.mockResolvedValueOnce(0);
waitForFollowupQueueDrain.mockResolvedValueOnce({
drained: true,
remaining: 0,
});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
@@ -448,12 +453,13 @@ describe("runGatewayLoop", () => {
expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1);
expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 });
expect(waitForFollowupQueueDrain).not.toHaveBeenCalled();
// Followup queue drain is always called regardless of flushedCount
expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000);
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toEqual([95_000, 95_000]);
expect(forceExitCalls).toEqual([95_000, 100_000]);
sigterm();
await expect(exited).resolves.toBe(0);
@@ -516,7 +522,10 @@ describe("runGatewayLoop", () => {
const forceExitCalls = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === "number" && delay >= 95_000);
expect(forceExitCalls).toEqual([95_000, 95_000]);
// First arm: 1000 + 5000 + 90000 = 96000, delay = 96000 - 1000 = 95000
// Second arm (after 20s flush): 21000 + 5000 + 90000 + 5000 = 121000,
// delay = 121000 - 21000 = 100000
expect(forceExitCalls).toEqual([95_000, 100_000]);
sigterm();
await expect(exited).resolves.toBe(0);

View File

@@ -138,42 +138,31 @@ export async function runGatewayLoop(params: {
// On restart, wait for in-flight agent turns to finish before
// tearing down the server so buffered messages are delivered.
if (isRestart) {
// Reject new command-queue work before any awaited restart drain
// step so late arrivals fail explicitly instead of being stranded
// behind a one-shot debounce flush. This does not block followup
// queue enqueues, so flushed inbound work can still drain normally.
markGatewayDraining();
// Flush inbound debounce buffers first. This pushes any messages
// waiting in per-channel debounce timers (e.g. the 2500ms collect
// window) into the followup queues immediately, preventing silent
// message loss when the server reinitializes.
// Flush inbound debounce buffers BEFORE marking the gateway as
// draining so flushed messages can still enqueue into the command
// queue. This pushes any messages waiting in per-channel debounce
// timers (e.g. the 2500ms collect window) into the followup queues
// immediately, preventing silent message loss on reinit.
const flushedBuffers = await flushAllInboundDebouncers({
timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS,
});
// Start the restart watchdog budget after the pre-shutdown debounce
// flush so slow flush handlers do not steal time from active drain.
armForceExitTimer(
Date.now() +
SHUTDOWN_TIMEOUT_MS +
DRAIN_TIMEOUT_MS +
(flushedBuffers > 0 ? FOLLOWUP_DRAIN_TIMEOUT_MS : 0),
);
if (flushedBuffers > 0) {
gatewayLog.info(
`flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`,
);
// Give the followup queue drain loops a short window to process
// the newly flushed items before the server is torn down.
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained after debounce flush");
} else {
gatewayLog.warn(
`followup queue drain timeout; ${followupResult.remaining} item(s) still pending`,
);
}
}
// Now reject new command-queue work so late arrivals fail explicitly
// instead of being stranded. This does not block followup queue
// enqueues, so already-flushed inbound work can still drain normally.
markGatewayDraining();
// Start the restart watchdog budget after the pre-shutdown debounce
// flush so slow flush handlers do not steal time from active drain.
armForceExitTimer(
Date.now() + SHUTDOWN_TIMEOUT_MS + DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS,
);
const activeTasks = getActiveTaskCount();
const activeRuns = getActiveEmbeddedRunCount();
@@ -204,6 +193,19 @@ export async function runGatewayLoop(params: {
abortEmbeddedPiRun(undefined, { mode: "all" });
}
}
// Drain followup queues AFTER active tasks finish so tasks that
// produce followup work have a chance to enqueue before we wait.
// Always drain regardless of flushedCount — queued followups are
// not contingent on debouncers.
const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS);
if (followupResult.drained) {
gatewayLog.info("followup queues drained before restart");
} else {
gatewayLog.warn(
`followup queue drain timeout; ${followupResult.remaining} item(s) still pending`,
);
}
}
await server?.close({