From 8026d569830a14f6219c20bfb35ac35d8584a05b Mon Sep 17 00:00:00 2001 From: Jason Wu Date: Tue, 17 Mar 2026 13:50:14 +0800 Subject: [PATCH 1/6] fix(health-monitor): drain active runs before abort on channel restart - Add 30s drain window: wait for in-flight agent runs to complete before calling abort on health-monitor-triggered channel restarts, preventing silently dropped text replies (reactions survive since they fire early) - Add 60s cooldown floor between health-monitor restarts to prevent rapid restart cycling when check intervals are short - Add resolveFreshTiming callback so health evaluations use current config values instead of potentially stale timing captured at monitor startup Co-Authored-By: Claude Opus 4.6 (1M context) --- src/gateway/channel-health-monitor.test.ts | 187 +++++++++++++++++++-- src/gateway/channel-health-monitor.ts | 55 +++++- 2 files changed, 229 insertions(+), 13 deletions(-) diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index efc392f8ee0..b237ce7feb2 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -317,13 +317,23 @@ describe("channel-health-monitor", () => { it("restarts busy channels when run activity is stale", async () => { const now = Date.now(); const manager = createBusyDisconnectedManager(now - 26 * 60_000); - await expectRestartedChannel(manager, "discord"); + const monitor = await startAndRunCheck(manager); + // activeRuns stays 1, so drain window times out (30s). + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); }); it("restarts disconnected channels when busy flags are inherited from a prior lifecycle", async () => { const now = Date.now(); const manager = createBusyDisconnectedManager(now - 301_000); - await expectRestartedChannel(manager, "discord"); + const monitor = await startAndRunCheck(manager); + // activeRuns stays 1, so drain window times out (30s). + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); }); it("skips recently-started channels while they are still connecting", async () => { @@ -402,7 +412,7 @@ describe("channel-health-monitor", () => { monitor.stop(); }); - it("applies cooldown — skips recently restarted channels for 2 cycles", async () => { + it("applies cooldown — enforces minimum 60s floor between restarts", async () => { const manager = createSnapshotManager({ discord: { default: managedStoppedAccount("crashed"), @@ -410,11 +420,12 @@ describe("channel-health-monitor", () => { }); const monitor = await startAndRunCheck(manager); expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + // With checkIntervalMs=5_000, cooldownCycles=2, computed cooldown = max(10_000, 60_000) = 60s. + // Advance 55s (11 intervals) — still in cooldown. + await vi.advanceTimersByTimeAsync(55_000); expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); - expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + // Advance another 10s — past 60s cooldown, next check triggers restart. 
+ await vi.advanceTimersByTimeAsync(10_000); expect(manager.startChannel).toHaveBeenCalledTimes(2); monitor.stop(); }); @@ -425,14 +436,17 @@ describe("channel-health-monitor", () => { default: managedStoppedAccount("keeps crashing"), }, }); + // Use 61s interval so cooldown (max(61_000, 60_000)) doesn't dominate. const monitor = startDefaultMonitor(manager, { - checkIntervalMs: 1_000, + checkIntervalMs: 61_000, cooldownCycles: 1, maxRestartsPerHour: 3, }); - await vi.advanceTimersByTimeAsync(5_001); + // Restart #1 at ~61s, #2 at ~183s, #3 at ~305s (every other interval due to <= cooldown). + await vi.advanceTimersByTimeAsync(310_000); expect(manager.startChannel).toHaveBeenCalledTimes(3); - await vi.advanceTimersByTimeAsync(1_001); + // One more full cycle pair — capped. + await vi.advanceTimersByTimeAsync(130_000); expect(manager.startChannel).toHaveBeenCalledTimes(3); monitor.stop(); }); @@ -493,6 +507,159 @@ describe("channel-health-monitor", () => { monitor.stop(); }); + describe("drain window", () => { + it("waits for active runs to drain before stopping a channel", async () => { + const now = Date.now(); + let callCount = 0; + const manager = createSnapshotManager( + { + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 2, + }, + }, + }, + { + // After 3 snapshot reads (initial + 2 drain polls), simulate runs finishing. + getRuntimeSnapshot: vi.fn(() => { + callCount++; + return snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: callCount >= 4 ? 0 : 2, + }, + }, + }); + }), + }, + ); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + // First interval fires the check. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Drain polls every 1s. Advance enough for drain to see activeRuns=0. + await vi.advanceTimersByTimeAsync(3_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("proceeds with stop after drain window timeout even if runs remain", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + // activeRuns stays > 0 forever. + activeRuns: 1, + }, + }, + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + // First interval fires the check. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Advance past the full 30s drain window. + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("skips drain when channel has no active runs", async () => { + const now = Date.now(); + const snapshotFn = vi.fn(() => + snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 0, + }, + }, + }), + ); + const manager = createMockChannelManager({ + getRuntimeSnapshot: snapshotFn, + }); + const monitor = await startAndRunCheck(manager); + // stopChannel should be called without any drain delay. 
+ expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + // Only 2 snapshot calls: one for the main check, one for the drain check. + expect(snapshotFn).toHaveBeenCalledTimes(2); + monitor.stop(); + }); + }); + + describe("cooldown floor", () => { + it("enforces 60s minimum even with short check intervals", async () => { + const manager = createSnapshotManager({ + discord: { + default: managedStoppedAccount("crashed"), + }, + }); + // checkIntervalMs=1_000, cooldownCycles=1 → computed cooldown = max(1_000, 60_000) = 60s. + const monitor = startDefaultMonitor(manager, { + checkIntervalMs: 1_000, + cooldownCycles: 1, + }); + // First restart at ~1s. + await vi.advanceTimersByTimeAsync(1_001); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + // 30s later — still in 60s cooldown. + await vi.advanceTimersByTimeAsync(30_000); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + // Past 60s cooldown floor. + await vi.advanceTimersByTimeAsync(35_000); + expect(manager.startChannel).toHaveBeenCalledTimes(2); + monitor.stop(); + }); + }); + + describe("fresh timing", () => { + it("uses resolveFreshTiming callback each cycle", async () => { + const now = Date.now(); + // Start with a very long stale threshold so the channel appears healthy. + let freshStaleMs = 999_999_999; + const manager = createSlackSnapshotManager( + runningConnectedSlackAccount({ + lastStartAt: now - 120_000, + lastEventAt: now - 100_000, + }), + ); + const resolveFreshTiming = vi.fn(() => ({ staleEventThresholdMs: freshStaleMs })); + const monitor = startDefaultMonitor(manager, { + checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS, + resolveFreshTiming, + }); + // First check — stale threshold is huge, so no restart. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(resolveFreshTiming).toHaveBeenCalled(); + // Lower the threshold so the channel is now considered stale. + freshStaleMs = 60_000; + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); + monitor.stop(); + }); + }); + describe("stale socket detection", () => { const STALE_THRESHOLD = 30 * 60_000; diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 809beb1abb8..f0fca813b65 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -17,6 +17,13 @@ const DEFAULT_COOLDOWN_CYCLES = 2; const DEFAULT_MAX_RESTARTS_PER_HOUR = 10; const ONE_HOUR_MS = 60 * 60_000; +/** Minimum cooldown between health-monitor-triggered restarts per channel. */ +const MIN_RESTART_COOLDOWN_MS = 60_000; + +/** Maximum time to wait for active agent runs to drain before aborting. */ +const DRAIN_WINDOW_MS = 30_000; +const DRAIN_POLL_MS = 1_000; + /** * How long a connected channel can go without receiving any event before * the health monitor treats it as a "stale socket" and triggers a restart. @@ -42,6 +49,8 @@ export type ChannelHealthMonitorDeps = { cooldownCycles?: number; maxRestartsPerHour?: number; abortSignal?: AbortSignal; + /** Called each check cycle to resolve fresh timing values (avoids stale config). 
+   */
+  resolveFreshTiming?: () => Partial<ChannelHealthTimingPolicy>;
 };
 
 export type ChannelHealthMonitor = {
@@ -73,6 +82,38 @@ function resolveTimingPolicy(
   };
 }
 
+/**
+ * Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account
+ * to finish so in-flight text replies are not silently aborted.
+ */
+async function drainActiveRuns(
+  channelManager: ChannelManager,
+  channelId: ChannelId,
+  accountId: string,
+  stopped: boolean,
+): Promise<void> {
+  const deadline = Date.now() + DRAIN_WINDOW_MS;
+  let warned = false;
+  while (!stopped && Date.now() < deadline) {
+    const snap = channelManager.getRuntimeSnapshot();
+    const accountSnap = snap.channelAccounts[channelId]?.[accountId];
+    const activeRuns =
+      typeof accountSnap?.activeRuns === "number" && Number.isFinite(accountSnap.activeRuns)
+        ? Math.max(0, Math.trunc(accountSnap.activeRuns))
+        : 0;
+    if (activeRuns === 0) {
+      return;
+    }
+    if (!warned) {
+      log.info?.(
+        `[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`,
+      );
+      warned = true;
+    }
+    await new Promise((resolve) => setTimeout(resolve, DRAIN_POLL_MS));
+  }
+}
+
 export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
   const {
     channelManager,
@@ -83,7 +124,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
   } = deps;
 
   const timing = resolveTimingPolicy(deps);
-  const cooldownMs = cooldownCycles * checkIntervalMs;
+  const cooldownMs = Math.max(cooldownCycles * checkIntervalMs, MIN_RESTART_COOLDOWN_MS);
   const restartRecords = new Map();
   const startedAt = Date.now();
   let stopped = false;
@@ -108,6 +149,13 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
       return;
     }
 
+    // Re-resolve timing each cycle so runtime config changes are picked up
+    // without waiting for a full health-monitor restart.
+    const freshTiming = deps.resolveFreshTiming?.();
+    const effectiveTiming: ChannelHealthTimingPolicy = freshTiming
+      ? { ...timing, ...freshTiming }
+      : timing;
+
     const snapshot = channelManager.getRuntimeSnapshot();
 
     for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) {
@@ -127,8 +175,8 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
       const healthPolicy: ChannelHealthPolicy = {
         channelId,
         now,
-        staleEventThresholdMs: timing.staleEventThresholdMs,
-        channelConnectGraceMs: timing.channelConnectGraceMs,
+        staleEventThresholdMs: effectiveTiming.staleEventThresholdMs,
+        channelConnectGraceMs: effectiveTiming.channelConnectGraceMs,
       };
       const health = evaluateChannelHealth(status, healthPolicy);
       if (health.healthy) {
@@ -159,6 +207,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
 
       try {
         if (status.running) {
+          await drainActiveRuns(channelManager, channelId as ChannelId, accountId, stopped);
           await channelManager.stopChannel(channelId as ChannelId, accountId);
         }
         channelManager.resetRestartAttempts(channelId as ChannelId, accountId);

From afa4adb202aa85a003e549a1309453d6cccc6711 Mon Sep 17 00:00:00 2001
From: Jason Wu
Date: Wed, 18 Mar 2026 19:52:34 +0800
Subject: [PATCH 2/6] fix: pass stopped as predicate and set cooldown clock after drain

- Change drainActiveRuns parameter from `stopped: boolean` (snapshot) to
  `isStopped: () => boolean` so mid-drain stop() is visible
- Set lastRestartAt = Date.now() after drain+restart completes, not before
  (was using stale `now` from check-loop start)
---
 src/gateway/channel-health-monitor.ts | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
index f0fca813b65..40ef3e3b8d3 100644
--- a/src/gateway/channel-health-monitor.ts
+++ b/src/gateway/channel-health-monitor.ts
@@ -90,11 +90,11 @@ async function drainActiveRuns(
   channelManager: ChannelManager,
   channelId: ChannelId,
   accountId: string,
-  stopped: boolean,
+  isStopped: () => boolean,
 ): Promise<void> {
   const deadline = Date.now() + DRAIN_WINDOW_MS;
   let warned = false;
-  while (!stopped && Date.now() < deadline) {
+  while (!isStopped() && Date.now() < deadline) {
     const snap = channelManager.getRuntimeSnapshot();
     const accountSnap = snap.channelAccounts[channelId]?.[accountId];
     const activeRuns =
@@ -207,13 +207,19 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
 
       try {
         if (status.running) {
-          await drainActiveRuns(channelManager, channelId as ChannelId, accountId, stopped);
+          await drainActiveRuns(
+            channelManager,
+            channelId as ChannelId,
+            accountId,
+            () => stopped,
+          );
           await channelManager.stopChannel(channelId as ChannelId, accountId);
         }
         channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
         await channelManager.startChannel(channelId as ChannelId, accountId);
-        record.lastRestartAt = now;
-        record.restartsThisHour.push({ at: now });
+        const restartedAt = Date.now();
+        record.lastRestartAt = restartedAt;
+        record.restartsThisHour.push({ at: restartedAt });
         restartRecords.set(key, record);
       } catch (err) {
         log.error?.(

From 879c6e9040ac825cb63ec36bd47e980650aea844 Mon Sep 17 00:00:00 2001
From: Jason Wu
Date: Thu, 19 Mar 2026 10:06:14 +0800
Subject: [PATCH 3/6] fix(health-monitor): re-check state post-drain and skip stale busy drain

---
 src/gateway/channel-health-monitor.test.ts | 97 +++++++++++++++++++++-
 src/gateway/channel-health-monitor.ts      | 52 ++++++++++++
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts
index b237ce7feb2..98739a10ada 100644
--- a/src/gateway/channel-health-monitor.test.ts
+++ b/src/gateway/channel-health-monitor.test.ts
@@ -578,6 +578,99 @@ describe("channel-health-monitor", () => {
       monitor.stop();
     });
 
+    it("does not restart if monitor is stopped during drain", async () => {
+      const now = Date.now();
+      let monitorRef: ReturnType<typeof startDefaultMonitor> | undefined;
+      let callCount = 0;
+      const manager = createMockChannelManager({
+        getRuntimeSnapshot: vi.fn(() => {
+          callCount++;
+          // Stop the monitor partway through the drain window.
+          if (callCount === 2) {
+            monitorRef?.stop();
+          }
+          return snapshotWith({
+            discord: {
+              default: {
+                running: true,
+                connected: false,
+                enabled: true,
+                configured: true,
+                lastStartAt: now - 300_000,
+                activeRuns: 1,
+                busy: true,
+                lastRunActivityAt: now - 5_000,
+              },
+            },
+          });
+        }),
+      });
+      monitorRef = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+      await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+      // Advance enough for drain to be polled (which triggers stop).
+      await vi.advanceTimersByTimeAsync(2_000);
+      expect(manager.stopChannel).not.toHaveBeenCalled();
+      expect(manager.startChannel).not.toHaveBeenCalled();
+    });
+
+    it("does not restart if channel recovers (becomes healthy) during drain", async () => {
+      const now = Date.now();
+      let callCount = 0;
+      const manager = createMockChannelManager({
+        getRuntimeSnapshot: vi.fn(() => {
+          callCount++;
+          const recovered = callCount >= 4;
+          return snapshotWith({
+            discord: {
+              default: {
+                running: true,
+                // Channel reconnects on the 4th snapshot call (post-drain re-check).
+                connected: recovered,
+                enabled: true,
+                configured: true,
+                lastStartAt: now - 300_000,
+                activeRuns: recovered ? 0 : 1,
+                busy: !recovered,
+                lastRunActivityAt: recovered ? now : now - 5_000,
+              },
+            },
+          });
+        }),
+      });
+      const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+      await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+      // Advance enough for drain to finish and post-drain re-check to see healthy.
+      await vi.advanceTimersByTimeAsync(3_000);
+      expect(manager.stopChannel).not.toHaveBeenCalled();
+      expect(manager.startChannel).not.toHaveBeenCalled();
+      monitor.stop();
+    });
+
+    it("skips drain immediately for stale inherited activeRuns (lastRunActivityAt predates lastStartAt)", async () => {
+      const now = Date.now();
+      const manager = createSnapshotManager({
+        discord: {
+          default: {
+            running: true,
+            connected: false,
+            enabled: true,
+            configured: true,
+            lastStartAt: now - 300_000,
+            activeRuns: 1,
+            busy: true,
+            // Activity predates lastStartAt — inherited from prior lifecycle.
+            lastRunActivityAt: now - 400_000,
+          },
+        },
+      });
+      const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+      await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+      // Should NOT need 30s drain window — restarts immediately.
+      expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
+      expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
+      monitor.stop();
+    });
+
     it("skips drain when channel has no active runs", async () => {
       const now = Date.now();
       const snapshotFn = vi.fn(() =>
@@ -600,8 +693,8 @@ describe("channel-health-monitor", () => {
       const monitor = await startAndRunCheck(manager);
       // stopChannel should be called without any drain delay.
       expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
-      // Only 2 snapshot calls: one for the main check, one for the drain check.
-      expect(snapshotFn).toHaveBeenCalledTimes(2);
+      // 3 snapshot calls: main health check, drain check (activeRuns=0 → returns), post-drain re-check.
+      expect(snapshotFn).toHaveBeenCalledTimes(3);
       monitor.stop();
     });
   });

diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
index 40ef3e3b8d3..dee5853eeaa 100644
--- a/src/gateway/channel-health-monitor.ts
+++ b/src/gateway/channel-health-monitor.ts
@@ -82,9 +82,15 @@ function resolveTimingPolicy(
   };
 }
 
+/** How long before run activity is considered stale / not belonging to any live run. */
+const STALE_RUN_ACTIVITY_MS = 25 * 60_000;
+
 /**
  * Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account
  * to finish so in-flight text replies are not silently aborted.
+ *
+ * Returns immediately (without waiting) when the busy state is determined to be
+ * stale — i.e. inherited from a prior lifecycle with no real in-flight runs.
  */
 async function drainActiveRuns(
@@ -94,6 +100,7 @@ async function drainActiveRuns(
 ): Promise<void> {
   const deadline = Date.now() + DRAIN_WINDOW_MS;
   let warned = false;
+  let checkedStaleness = false;
   while (!isStopped() && Date.now() < deadline) {
     const snap = channelManager.getRuntimeSnapshot();
     const accountSnap = snap.channelAccounts[channelId]?.[accountId];
     const activeRuns =
@@ -104,6 +111,31 @@ async function drainActiveRuns(
     if (activeRuns === 0) {
       return;
     }
+    // On the first iteration with active runs, check whether the busy state is stale
+    // (e.g. inherited from a prior lifecycle). If so, there are no real in-flight runs
+    // to protect — return immediately to avoid burning the full drain window.
+    if (!checkedStaleness) {
+      checkedStaleness = true;
+      const lastStartAt =
+        typeof accountSnap?.lastStartAt === "number" && Number.isFinite(accountSnap.lastStartAt)
+          ? accountSnap.lastStartAt
+          : null;
+      const lastRunActivityAt =
+        typeof accountSnap?.lastRunActivityAt === "number" &&
+        Number.isFinite(accountSnap.lastRunActivityAt)
+          ? accountSnap.lastRunActivityAt
+          : null;
+      const isStale =
+        lastRunActivityAt == null ||
+        (lastStartAt != null && lastRunActivityAt < lastStartAt) ||
+        Date.now() - lastRunActivityAt > STALE_RUN_ACTIVITY_MS;
+      if (isStale) {
+        log.info?.(
+          `[${channelId}:${accountId}] health-monitor: skipping drain — busy state is stale (inherited from prior lifecycle)`,
+        );
+        return;
+      }
+    }
     if (!warned) {
       log.info?.(
         `[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`,
@@ -213,6 +245,26 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
             accountId,
             () => stopped,
           );
+          // If the monitor was stopped during the drain window, abort the restart.
+          if (stopped) {
+            break;
+          }
+          // Re-evaluate channel health after drain: the channel may have recovered
+          // or reconnected while we were waiting. Only proceed if still unhealthy.
+          const postDrainSnap = channelManager.getRuntimeSnapshot();
+          const postDrainStatus = postDrainSnap.channelAccounts[channelId]?.[accountId];
+          if (postDrainStatus) {
+            const postDrainHealth = evaluateChannelHealth(postDrainStatus, {
+              ...healthPolicy,
+              now: Date.now(),
+            });
+            if (postDrainHealth.healthy) {
+              log.info?.(
+                `[${channelId}:${accountId}] health-monitor: channel recovered during drain, skipping restart`,
+              );
+              continue;
+            }
+          }
           await channelManager.stopChannel(channelId as ChannelId, accountId);
         }
         channelManager.resetRestartAttempts(channelId as ChannelId, accountId);

From 269578dfaf04878a923486e4177fc0f862f5f959 Mon Sep 17 00:00:00 2001
From: Jason Wu
Date: Fri, 20 Mar 2026 10:01:05 +0800
Subject: [PATCH 4/6] fix(health-monitor): re-prune hourly restart budget after drain window

---
 src/gateway/channel-health-monitor.ts | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
index dee5853eeaa..a56dd8a8183 100644
--- a/src/gateway/channel-health-monitor.ts
+++ b/src/gateway/channel-health-monitor.ts
@@ -265,6 +265,15 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
               continue;
             }
           }
+          // Re-prune the hourly bucket with a fresh timestamp so that entries
+          // which aged out during the drain window are not counted against the cap.
+          pruneOldRestarts(record, Date.now());
+          if (record.restartsThisHour.length >= maxRestartsPerHour) {
+            log.warn?.(
+              `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit after drain, skipping`,
+            );
+            continue;
+          }
           await channelManager.stopChannel(channelId as ChannelId, accountId);
         }
         channelManager.resetRestartAttempts(channelId as ChannelId, accountId);

From 8a9a9cbbe598715cc5b18b44801a0fa0372c6763 Mon Sep 17 00:00:00 2001
From: Jason Wu
Date: Sat, 21 Mar 2026 10:02:33 +0800
Subject: [PATCH 5/6] fix(health-monitor): abort the whole check pass when the monitor is stopped during drain

`break` only exited the innermost per-account loop, so other channels could
still be restarted after stop(); `return` aborts the entire check pass.
---
 src/gateway/channel-health-monitor.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
index a56dd8a8183..20eaf8c5dd6 100644
--- a/src/gateway/channel-health-monitor.ts
+++ b/src/gateway/channel-health-monitor.ts
@@ -247,7 +247,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
           );
           // If the monitor was stopped during the drain window, abort the restart.
           if (stopped) {
-            break;
+            return;
          }
           // Re-evaluate channel health after drain: the channel may have recovered
           // or reconnected while we were waiting. Only proceed if still unhealthy.
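
(Review aid, not part of the applied diffs: the stale-busy heuristic that
PATCH 3/6 adds inside drainActiveRuns, restated as a standalone TypeScript
function. The constant mirrors STALE_RUN_ACTIVITY_MS above; the function name
and example values are illustrative only.)

    // A busy flag is "stale" when no recorded run activity could belong to a
    // run started in the current channel lifecycle.
    const STALE_RUN_ACTIVITY_MS = 25 * 60_000;

    function isBusyStateStale(
      lastStartAt: number | null,
      lastRunActivityAt: number | null,
      now: number,
    ): boolean {
      return (
        // No activity recorded at all: nothing in flight to protect.
        lastRunActivityAt == null ||
        // Activity predates the current start: inherited from a prior lifecycle.
        (lastStartAt != null && lastRunActivityAt < lastStartAt) ||
        // Activity is older than any plausible live run.
        now - lastRunActivityAt > STALE_RUN_ACTIVITY_MS
      );
    }

    // e.g. activity recorded before the current start is skipped, not drained:
    // isBusyStateStale(now - 300_000, now - 400_000, now) === true
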
From e316f68fd6e81c6e21e70c30a0432e662199b5bd Mon Sep 17 00:00:00 2001
From: Jason Wu
Date: Sat, 21 Mar 2026 10:47:50 +0800
Subject: [PATCH 6/6] fix(health-monitor): skip restart when account removed or monitor disabled post-drain

---
 src/gateway/channel-health-monitor.test.ts | 79 ++++++++++++++++++++++
 src/gateway/channel-health-monitor.ts      | 14 ++++
 2 files changed, 93 insertions(+)

diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts
index 98739a10ada..c51e1da324f 100644
--- a/src/gateway/channel-health-monitor.test.ts
+++ b/src/gateway/channel-health-monitor.test.ts
@@ -671,6 +671,85 @@ describe("channel-health-monitor", () => {
       monitor.stop();
     });
 
+    it("skips restart when account is removed during drain", async () => {
+      const now = Date.now();
+      let callCount = 0;
+      const manager = createMockChannelManager({
+        getRuntimeSnapshot: vi.fn(() => {
+          callCount++;
+          // After the initial health check + first drain poll, simulate account removal.
+          const removed = callCount >= 3;
+          return snapshotWith(
+            removed
+              ? {}
+              : {
+                  discord: {
+                    default: {
+                      running: true,
+                      connected: false,
+                      enabled: true,
+                      configured: true,
+                      lastStartAt: now - 300_000,
+                      activeRuns: 1,
+                      busy: true,
+                      lastRunActivityAt: now - 5_000,
+                    },
+                  },
+                },
+          );
+        }),
+      });
+      const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+      await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+      // Drain polls until activeRuns=0 / account removed.
+      await vi.advanceTimersByTimeAsync(3_000);
+      expect(manager.stopChannel).not.toHaveBeenCalled();
+      expect(manager.startChannel).not.toHaveBeenCalled();
+      monitor.stop();
+    });
+
+    it("skips restart when health monitor is disabled during drain", async () => {
+      const now = Date.now();
+      let callCount = 0;
+      let monitorEnabled = true;
+      const manager = createMockChannelManager({
+        getRuntimeSnapshot: vi.fn(() => {
+          callCount++;
+          const drained = callCount >= 4;
+          return snapshotWith({
+            discord: {
+              default: {
+                running: true,
+                connected: false,
+                enabled: true,
+                configured: true,
+                lastStartAt: now - 300_000,
+                activeRuns: drained ? 0 : 1,
+                busy: !drained,
+                lastRunActivityAt: now - 5_000,
+              },
+            },
+          });
+        }),
+        isHealthMonitorEnabled: vi.fn(() => {
+          // Disable after drain starts (simulating operator hot-reload).
+          if (!monitorEnabled) {
+            return false;
+          }
+          return true;
+        }),
+      });
+      const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
+      await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
+      // Disable health monitor while drain is in progress.
+      monitorEnabled = false;
+      // Drain finishes after a few polls.
+      await vi.advanceTimersByTimeAsync(3_000);
+      expect(manager.stopChannel).not.toHaveBeenCalled();
+      expect(manager.startChannel).not.toHaveBeenCalled();
+      monitor.stop();
+    });
+
     it("skips drain when channel has no active runs", async () => {
       const now = Date.now();
       const snapshotFn = vi.fn(() =>
diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
index 20eaf8c5dd6..65c95f2d473 100644
--- a/src/gateway/channel-health-monitor.ts
+++ b/src/gateway/channel-health-monitor.ts
@@ -253,6 +253,13 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
           // Re-evaluate channel health after drain: the channel may have recovered
           // or reconnected while we were waiting. Only proceed if still unhealthy.
const postDrainSnap = channelManager.getRuntimeSnapshot(); const postDrainStatus = postDrainSnap.channelAccounts[channelId]?.[accountId]; + // Account was removed during drain (config hot-reload) — do not resurrect it. + if (!postDrainStatus) { + log.debug?.( + `[${channelId}:${accountId}] health-monitor: account removed during drain, skipping restart`, + ); + continue; + } if (postDrainStatus) { const postDrainHealth = evaluateChannelHealth(postDrainStatus, { ...healthPolicy, @@ -265,6 +272,13 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann continue; } } + // Re-check monitor enablement after drain — operator may have disabled it. + if (!channelManager.isHealthMonitorEnabled(channelId as ChannelId, accountId)) { + log.info?.( + `[${channelId}:${accountId}] health-monitor: monitor disabled during drain, skipping restart`, + ); + continue; + } // Re-prune the hourly bucket with a fresh timestamp so that entries // which aged out during the drain window are not counted against the cap. pruneOldRestarts(record, Date.now());
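
(Usage sketch, not part of the series: one way a gateway caller might wire the
new resolveFreshTiming option so each check cycle sees current timing config.
loadGatewayConfig() and its fields are hypothetical stand-ins; only
startChannelHealthMonitor and the resolveFreshTiming option come from this
series.)

    import { startChannelHealthMonitor } from "./channel-health-monitor";

    const monitor = startChannelHealthMonitor({
      channelManager,
      // Re-read timing every cycle instead of capturing it once at startup.
      resolveFreshTiming: () => {
        // Hypothetical accessor for the gateway's current (possibly
        // hot-reloaded) configuration.
        const cfg = loadGatewayConfig();
        return {
          staleEventThresholdMs: cfg.staleEventThresholdMs,
          channelConnectGraceMs: cfg.channelConnectGraceMs,
        };
      },
    });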