From 879c6e9040ac825cb63ec36bd47e980650aea844 Mon Sep 17 00:00:00 2001
From: Jason Wu
Date: Thu, 19 Mar 2026 10:06:14 +0800
Subject: [PATCH] fix(health-monitor): re-check state post-drain and skip stale busy drain

---
 src/gateway/channel-health-monitor.test.ts | 97 +++++++++++++++++++++-
 src/gateway/channel-health-monitor.ts      | 52 ++++++++++++
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts
index b237ce7feb2..98739a10ada 100644
--- a/src/gateway/channel-health-monitor.test.ts
+++ b/src/gateway/channel-health-monitor.test.ts
@@ -578,6 +578,99 @@ describe("channel-health-monitor", () => {
     monitor.stop();
   });
 
+  it("does not restart if monitor is stopped during drain", async () => {
+    const now = Date.now();
+    let monitorRef: ReturnType<typeof startDefaultMonitor> | undefined;
+    let callCount = 0;
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() => {
+        callCount++;
+        // Stop the monitor partway through the drain window.
+        if (callCount === 2) {
+          monitorRef?.stop();
+        }
+        return snapshotWith({
+          discord: {
+            default: {
+              running: true,
+              connected: false,
+              enabled: true,
+              configured: true,
+              lastStartAt: now - 300_000,
+              activeRuns: 1,
+              busy: true,
+              lastRunActivityAt: now - 5_000,
+            },
+          },
+        });
+      }),
+    });
+    monitorRef = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+    await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+    // Advance enough for drain to be polled (which triggers stop).
+    await vi.advanceTimersByTimeAsync(2_000);
+    expect(manager.stopChannel).not.toHaveBeenCalled();
+    expect(manager.startChannel).not.toHaveBeenCalled();
+  });
+
+  it("does not restart if channel recovers (becomes healthy) during drain", async () => {
+    const now = Date.now();
+    let callCount = 0;
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() => {
+        callCount++;
+        const recovered = callCount >= 4;
+        return snapshotWith({
+          discord: {
+            default: {
+              running: true,
+              // Channel reconnects on the 4th snapshot call (post-drain re-check).
+              connected: recovered,
+              enabled: true,
+              configured: true,
+              lastStartAt: now - 300_000,
+              activeRuns: recovered ? 0 : 1,
+              busy: !recovered,
+              lastRunActivityAt: recovered ? now : now - 5_000,
+            },
+          },
+        });
+      }),
+    });
+    const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+    await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+    // Advance enough for drain to finish and post-drain re-check to see healthy.
+    await vi.advanceTimersByTimeAsync(3_000);
+    expect(manager.stopChannel).not.toHaveBeenCalled();
+    expect(manager.startChannel).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("skips drain immediately for stale inherited activeRuns (lastRunActivityAt predates lastStartAt)", async () => {
+    const now = Date.now();
+    const manager = createSnapshotManager({
+      discord: {
+        default: {
+          running: true,
+          connected: false,
+          enabled: true,
+          configured: true,
+          lastStartAt: now - 300_000,
+          activeRuns: 1,
+          busy: true,
+          // Activity predates lastStartAt — inherited from prior lifecycle.
+          lastRunActivityAt: now - 400_000,
+        },
+      },
+    });
+    const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+    await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+    // Should NOT need the 30s drain window — restarts immediately.
+    expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
+    expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
+    monitor.stop();
+  });
+
   it("skips drain when channel has no active runs", async () => {
     const now = Date.now();
     const snapshotFn = vi.fn(() =>
@@ -600,8 +693,8 @@ describe("channel-health-monitor", () => {
     const monitor = await startAndRunCheck(manager);
     // stopChannel should be called without any drain delay.
     expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
-    // Only 2 snapshot calls: one for the main check, one for the drain check.
-    expect(snapshotFn).toHaveBeenCalledTimes(2);
+    // 3 snapshot calls: main health check, drain check (activeRuns=0 → returns), post-drain re-check.
+    expect(snapshotFn).toHaveBeenCalledTimes(3);
     monitor.stop();
   });
 });
diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
index 40ef3e3b8d3..dee5853eeaa 100644
--- a/src/gateway/channel-health-monitor.ts
+++ b/src/gateway/channel-health-monitor.ts
@@ -82,9 +82,15 @@ function resolveTimingPolicy(
   };
 }
 
+/** How long before run activity is considered stale / not belonging to any live run. */
+const STALE_RUN_ACTIVITY_MS = 25 * 60_000;
+
 /**
  * Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account
  * to finish so in-flight text replies are not silently aborted.
+ *
+ * Returns immediately (without waiting) when the busy state is determined to be
+ * stale — i.e. inherited from a prior lifecycle with no real in-flight runs.
  */
 async function drainActiveRuns(
   channelManager: ChannelManager,
@@ -94,6 +100,7 @@
 ): Promise<void> {
   const deadline = Date.now() + DRAIN_WINDOW_MS;
   let warned = false;
+  let checkedStaleness = false;
   while (!isStopped() && Date.now() < deadline) {
     const snap = channelManager.getRuntimeSnapshot();
     const accountSnap = snap.channelAccounts[channelId]?.[accountId];
@@ -104,6 +111,31 @@
     if (activeRuns === 0) {
       return;
     }
+    // On the first iteration with active runs, check whether the busy state is stale
+    // (e.g. inherited from a prior lifecycle). If so, there are no real in-flight runs
+    // to protect — return immediately to avoid burning the full drain window.
+    if (!checkedStaleness) {
+      checkedStaleness = true;
+      const lastStartAt =
+        typeof accountSnap?.lastStartAt === "number" && Number.isFinite(accountSnap.lastStartAt)
+          ? accountSnap.lastStartAt
+          : null;
+      const lastRunActivityAt =
+        typeof accountSnap?.lastRunActivityAt === "number" &&
+        Number.isFinite(accountSnap.lastRunActivityAt)
+          ? accountSnap.lastRunActivityAt
+          : null;
+      const isStale =
+        lastRunActivityAt == null ||
+        (lastStartAt != null && lastRunActivityAt < lastStartAt) ||
+        Date.now() - lastRunActivityAt > STALE_RUN_ACTIVITY_MS;
+      if (isStale) {
+        log.info?.(
+          `[${channelId}:${accountId}] health-monitor: skipping drain — busy state is stale (inherited from prior lifecycle)`,
+        );
+        return;
+      }
+    }
     if (!warned) {
       log.info?.(
         `[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`,
@@ -213,6 +245,26 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
         accountId,
         () => stopped,
       );
+      // If the monitor was stopped during the drain window, abort the restart.
+      if (stopped) {
+        break;
+      }
+      // Re-evaluate channel health after drain: the channel may have recovered
+      // or reconnected while we were waiting. Only proceed if still unhealthy.
+      const postDrainSnap = channelManager.getRuntimeSnapshot();
+      const postDrainStatus = postDrainSnap.channelAccounts[channelId]?.[accountId];
+      if (postDrainStatus) {
+        const postDrainHealth = evaluateChannelHealth(postDrainStatus, {
+          ...healthPolicy,
+          now: Date.now(),
+        });
+        if (postDrainHealth.healthy) {
+          log.info?.(
+            `[${channelId}:${accountId}] health-monitor: channel recovered during drain, skipping restart`,
+          );
+          continue;
+        }
+      }
       await channelManager.stopChannel(channelId as ChannelId, accountId);
     }
     channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
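
The behavioral core of the patch is the ordering in the restart path: drain first, then re-check both the monitor's stopped flag and the channel's health before tearing anything down. Below is a condensed sketch of that decision flow; restartIfStillUnhealthy and its parameters are hypothetical stand-in names, not part of this codebase:

// Sketch only: stand-in names, not the real monitor API.
type HealthVerdict = { healthy: boolean };

async function restartIfStillUnhealthy(
  drain: () => Promise<void>,
  isStopped: () => boolean,
  checkHealth: () => HealthVerdict,
  restart: () => Promise<void>,
): Promise<void> {
  await drain(); // bounded wait for in-flight runs (returns early when busy state is stale)
  if (isStopped()) {
    return; // monitor stopped mid-drain: no restart (first new test)
  }
  if (checkHealth().healthy) {
    return; // channel recovered mid-drain: no restart (second new test)
  }
  await restart(); // still unhealthy: stop + start
}

Without the two early returns, the monitor would bounce a channel that no longer needs it, which is exactly what the first two tests pin down.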
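The staleness heuristic is self-contained enough to state in isolation. A minimal sketch, assuming the snapshot exposes lastStartAt and lastRunActivityAt as epoch milliseconds; AccountSnapshotLike and isBusyStateStale are illustrative names, not the real types:

// Sketch of the three stale-busy conditions added to drainActiveRuns.
type AccountSnapshotLike = {
  lastStartAt?: number;
  lastRunActivityAt?: number;
};

// Mirrors the constant the patch adds (25 minutes).
const STALE_RUN_ACTIVITY_MS = 25 * 60_000;

function asFiniteNumber(value: unknown): number | null {
  return typeof value === "number" && Number.isFinite(value) ? value : null;
}

function isBusyStateStale(snap: AccountSnapshotLike | undefined, now: number): boolean {
  const lastStartAt = asFiniteNumber(snap?.lastStartAt);
  const lastRunActivityAt = asFiniteNumber(snap?.lastRunActivityAt);
  return (
    // 1. Busy but no activity ever recorded: nothing real to drain.
    lastRunActivityAt == null ||
    // 2. Activity predates the current lifecycle start: inherited counter.
    (lastStartAt != null && lastRunActivityAt < lastStartAt) ||
    // 3. Activity older than the stale threshold: the run is long gone.
    now - lastRunActivityAt > STALE_RUN_ACTIVITY_MS
  );
}

// The third test's shape: activity at now - 400s predates a start at now - 300s.
const now = Date.now();
console.log(isBusyStateStale({ lastStartAt: now - 300_000, lastRunActivityAt: now - 400_000 }, now)); // true
console.log(isBusyStateStale({ lastStartAt: now - 300_000, lastRunActivityAt: now - 5_000 }, now)); // false

The 25-minute threshold (STALE_RUN_ACTIVITY_MS) is the patch's own constant, and the check runs only once per drain (via the checkedStaleness flag), so a run that produces activity mid-drain is never re-classified as stale.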