diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index 993705b2d30..d66ada1dcf9 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -315,21 +315,29 @@ export async function monitorMSTeamsProvider( }); // Start listening and fail fast if bind/listen fails. - const httpServer = expressApp.listen(port); - await new Promise<void>((resolve, reject) => { - const onListening = () => { - httpServer.off("error", onError); - log.info(`msteams provider started on port ${port}`); - resolve(); - }; - const onError = (err: unknown) => { - httpServer.off("listening", onListening); - log.error("msteams server error", { error: String(err) }); - reject(err); - }; - httpServer.once("listening", onListening); - httpServer.once("error", onError); - }); + let httpServer: ReturnType<typeof expressApp.listen>; + try { + httpServer = expressApp.listen(port); + await new Promise<void>((resolve, reject) => { + const onListening = () => { + httpServer.off("error", onError); + log.info(`msteams provider started on port ${port}`); + resolve(); + }; + const onError = (err: unknown) => { + httpServer.off("listening", onListening); + log.error("msteams server error", { error: String(err) }); + reject(err); + }; + httpServer.once("listening", onListening); + httpServer.once("error", onError); + }); + } catch (err) { + // Clean up the debouncer so it does not linger in the global registry + // when the provider fails to start (e.g. port already in use).
+ unregisterDebouncer(); + throw err; + } applyMSTeamsWebhookTimeouts(httpServer); httpServer.on("error", (err) => { diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 813aaade21a..34c0d463185 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -54,9 +54,30 @@ export async function flushAllInboundDebouncers(options?: { timeoutMs?: number } : undefined; const flushedCounts = await Promise.all( entries.map(async ([_key, handle]) => { - const result = await handle.flushAll({ deadlineMs }); - // Remove drained debouncers, and auto-evict stale entries whose - // owning channel never called unregister() (e.g. after reconnect). + let result: DebouncerFlushResult; + try { + result = await (deadlineMs !== undefined + ? Promise.race([ + handle.flushAll({ deadlineMs }), + new Promise<DebouncerFlushResult>((resolve) => { + const timer = setTimeout( + () => resolve({ flushedCount: 0, drained: false }), + Math.max(0, deadlineMs - Date.now()), + ); + timer.unref?.(); + }), + ]) + : handle.flushAll({ deadlineMs })); + } catch { + // A hung or failing flushAll should not prevent other debouncers + // from being swept. Keep the handle registered for a future sweep. + return 0; + } + // Only deregister AFTER the handle confirms all its buffers are + // drained. If the deadline hit mid-sweep, keep partially-flushed + // handles registered so subsequent sweeps can finish the job. + // Also auto-evict stale entries whose owning channel never called + // unregister() (e.g. after reconnect).
if (result.drained || now - handle.lastActivityMs >= STALE_DEBOUNCER_MS) { handle.unregister(); } diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index ded7aaff4d7..36db91ac0b5 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -351,7 +351,7 @@ describe("runGatewayLoop", () => { }); }); - it("marks gateway draining before flushing inbound debouncers on SIGUSR1", async () => { + it("flushes inbound debouncers before marking gateway draining on SIGUSR1", async () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { @@ -374,8 +374,9 @@ describe("runGatewayLoop", () => { expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); - expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( - flushAllInboundDebouncers.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + // Flush debouncers BEFORE marking draining so flushed messages can enqueue + expect(flushAllInboundDebouncers.mock.invocationCallOrder[0]).toBeLessThan( + markGatewayDraining.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); expect(markGatewayDraining.mock.invocationCallOrder[0]).toBeLessThan( waitForFollowupQueueDrain.mock.invocationCallOrder[0] ?? 
Number.POSITIVE_INFINITY, @@ -387,7 +388,7 @@ describe("runGatewayLoop", () => { expect(gatewayLog.info).toHaveBeenCalledWith( "flushed 2 pending inbound debounce buffer(s) before restart", ); - expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained after debounce flush"); + expect(gatewayLog.info).toHaveBeenCalledWith("followup queues drained before restart"); sigterm(); await expect(exited).resolves.toBe(0); @@ -429,11 +430,15 @@ describe("runGatewayLoop", () => { }); }); - it("skips followup queue drain when no debouncers had buffered messages", async () => { + it("always drains followup queue even when no debouncers had buffered messages", async () => { vi.clearAllMocks(); await withIsolatedSignals(async ({ captureSignal }) => { flushAllInboundDebouncers.mockResolvedValueOnce(0); + waitForFollowupQueueDrain.mockResolvedValueOnce({ + drained: true, + remaining: 0, + }); const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); try { @@ -448,12 +453,13 @@ describe("runGatewayLoop", () => { expect(flushAllInboundDebouncers).toHaveBeenCalledTimes(1); expect(flushAllInboundDebouncers).toHaveBeenCalledWith({ timeoutMs: 10_000 }); - expect(waitForFollowupQueueDrain).not.toHaveBeenCalled(); + // Followup queue drain is always called regardless of flushedCount + expect(waitForFollowupQueueDrain).toHaveBeenCalledWith(5_000); expect(markGatewayDraining).toHaveBeenCalledTimes(1); const forceExitCalls = setTimeoutSpy.mock.calls .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); - expect(forceExitCalls).toEqual([95_000, 95_000]); + expect(forceExitCalls).toEqual([95_000, 100_000]); sigterm(); await expect(exited).resolves.toBe(0); @@ -516,7 +522,10 @@ describe("runGatewayLoop", () => { const forceExitCalls = setTimeoutSpy.mock.calls .map((call) => call[1]) .filter((delay): delay is number => typeof delay === "number" && delay >= 95_000); - expect(forceExitCalls).toEqual([95_000, 95_000]); + // First arm: 
1000 + 5000 + 90000 = 96000, delay = 96000 - 1000 = 95000 + // Second arm (after 20s flush): 21000 + 5000 + 90000 + 5000 = 121000, + // delay = 121000 - 21000 = 100000 + expect(forceExitCalls).toEqual([95_000, 100_000]); sigterm(); await expect(exited).resolves.toBe(0); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 6167bc56500..08f7b39119b 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -138,42 +138,31 @@ export async function runGatewayLoop(params: { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { - // Reject new command-queue work before any awaited restart drain - // step so late arrivals fail explicitly instead of being stranded - // behind a one-shot debounce flush. This does not block followup - // queue enqueues, so flushed inbound work can still drain normally. - markGatewayDraining(); - - // Flush inbound debounce buffers first. This pushes any messages - // waiting in per-channel debounce timers (e.g. the 2500ms collect - // window) into the followup queues immediately, preventing silent - // message loss when the server reinitializes. + // Flush inbound debounce buffers BEFORE marking the gateway as + // draining so flushed messages can still enqueue into the command + // queue. This pushes any messages waiting in per-channel debounce + // timers (e.g. the 2500ms collect window) into the followup queues + // immediately, preventing silent message loss on reinit. const flushedBuffers = await flushAllInboundDebouncers({ timeoutMs: INBOUND_DEBOUNCE_FLUSH_TIMEOUT_MS, }); - // Start the restart watchdog budget after the pre-shutdown debounce - // flush so slow flush handlers do not steal time from active drain. - armForceExitTimer( - Date.now() + - SHUTDOWN_TIMEOUT_MS + - DRAIN_TIMEOUT_MS + - (flushedBuffers > 0 ? 
FOLLOWUP_DRAIN_TIMEOUT_MS : 0), - ); if (flushedBuffers > 0) { gatewayLog.info( `flushed ${flushedBuffers} pending inbound debounce buffer(s) before restart`, ); - // Give the followup queue drain loops a short window to process - // the newly flushed items before the server is torn down. - const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); - if (followupResult.drained) { - gatewayLog.info("followup queues drained after debounce flush"); - } else { - gatewayLog.warn( - `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, - ); - } } + + // Now reject new command-queue work so late arrivals fail explicitly + // instead of being stranded. This does not block followup queue + // enqueues, so already-flushed inbound work can still drain normally. + markGatewayDraining(); + + // Start the restart watchdog budget after the pre-shutdown debounce + // flush so slow flush handlers do not steal time from active drain. + armForceExitTimer( + Date.now() + SHUTDOWN_TIMEOUT_MS + DRAIN_TIMEOUT_MS + FOLLOWUP_DRAIN_TIMEOUT_MS, + ); + const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); @@ -204,6 +193,19 @@ export async function runGatewayLoop(params: { abortEmbeddedPiRun(undefined, { mode: "all" }); } } + + // Drain followup queues AFTER active tasks finish so tasks that + // produce followup work have a chance to enqueue before we wait. + // Always drain regardless of flushedCount — queued followups are + // not contingent on debouncers. + const followupResult = await waitForFollowupQueueDrain(FOLLOWUP_DRAIN_TIMEOUT_MS); + if (followupResult.drained) { + gatewayLog.info("followup queues drained before restart"); + } else { + gatewayLog.warn( + `followup queue drain timeout; ${followupResult.remaining} item(s) still pending`, + ); + } } await server?.close({