diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index efc392f8ee0..c51e1da324f 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -317,13 +317,23 @@ describe("channel-health-monitor", () => { it("restarts busy channels when run activity is stale", async () => { const now = Date.now(); const manager = createBusyDisconnectedManager(now - 26 * 60_000); - await expectRestartedChannel(manager, "discord"); + const monitor = await startAndRunCheck(manager); + // activeRuns stays 1, so drain window times out (30s). + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); }); it("restarts disconnected channels when busy flags are inherited from a prior lifecycle", async () => { const now = Date.now(); const manager = createBusyDisconnectedManager(now - 301_000); - await expectRestartedChannel(manager, "discord"); + const monitor = await startAndRunCheck(manager); + // activeRuns stays 1, so drain window times out (30s). 
+ await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); }); it("skips recently-started channels while they are still connecting", async () => { @@ -402,7 +412,7 @@ describe("channel-health-monitor", () => { monitor.stop(); }); - it("applies cooldown — skips recently restarted channels for 2 cycles", async () => { + it("applies cooldown — enforces minimum 60s floor between restarts", async () => { const manager = createSnapshotManager({ discord: { default: managedStoppedAccount("crashed"), @@ -410,11 +420,12 @@ describe("channel-health-monitor", () => { }); const monitor = await startAndRunCheck(manager); expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + // With checkIntervalMs=5_000, cooldownCycles=2, computed cooldown = max(10_000, 60_000) = 60s. + // Advance 55s (11 intervals) — still in cooldown. + await vi.advanceTimersByTimeAsync(55_000); expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); - expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + // Advance another 10s — past 60s cooldown, next check triggers restart. + await vi.advanceTimersByTimeAsync(10_000); expect(manager.startChannel).toHaveBeenCalledTimes(2); monitor.stop(); }); @@ -425,14 +436,17 @@ describe("channel-health-monitor", () => { default: managedStoppedAccount("keeps crashing"), }, }); + // Use 61s interval so cooldown (max(61_000, 60_000)) doesn't dominate. const monitor = startDefaultMonitor(manager, { - checkIntervalMs: 1_000, + checkIntervalMs: 61_000, cooldownCycles: 1, maxRestartsPerHour: 3, }); - await vi.advanceTimersByTimeAsync(5_001); + // Restart #1 at ~61s, #2 at ~183s, #3 at ~305s (every other interval due to <= cooldown). 
+ await vi.advanceTimersByTimeAsync(310_000); expect(manager.startChannel).toHaveBeenCalledTimes(3); - await vi.advanceTimersByTimeAsync(1_001); + // One more full cycle pair — capped. + await vi.advanceTimersByTimeAsync(130_000); expect(manager.startChannel).toHaveBeenCalledTimes(3); monitor.stop(); }); @@ -493,6 +507,331 @@ describe("channel-health-monitor", () => { monitor.stop(); }); + describe("drain window", () => { + it("waits for active runs to drain before stopping a channel", async () => { + const now = Date.now(); + let callCount = 0; + const manager = createSnapshotManager( + { + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 2, + }, + }, + }, + { + // After 3 snapshot reads (initial + 2 drain polls), simulate runs finishing. + getRuntimeSnapshot: vi.fn(() => { + callCount++; + return snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: callCount >= 4 ? 0 : 2, + }, + }, + }); + }), + }, + ); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + // First interval fires the check. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Drain polls every 1s. Advance enough for drain to see activeRuns=0. + await vi.advanceTimersByTimeAsync(3_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("proceeds with stop after drain window timeout even if runs remain", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + // activeRuns stays > 0 forever. 
+ activeRuns: 1, + }, + }, + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + // First interval fires the check. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Advance past the full 30s drain window. + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("does not restart if monitor is stopped during drain", async () => { + const now = Date.now(); + let monitorRef: ReturnType<typeof startDefaultMonitor> | undefined; + let callCount = 0; + const manager = createMockChannelManager({ + getRuntimeSnapshot: vi.fn(() => { + callCount++; + // Stop the monitor partway through the drain window. + if (callCount === 2) { + monitorRef?.stop(); + } + return snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 1, + busy: true, + lastRunActivityAt: now - 5_000, + }, + }, + }); + }), + }); + monitorRef = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Advance enough for drain to be polled (which triggers stop). + await vi.advanceTimersByTimeAsync(2_000); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(manager.startChannel).not.toHaveBeenCalled(); + }); + + it("does not restart if channel recovers (becomes healthy) during drain", async () => { + const now = Date.now(); + let callCount = 0; + const manager = createMockChannelManager({ + getRuntimeSnapshot: vi.fn(() => { + callCount++; + const recovered = callCount >= 4; + return snapshotWith({ + discord: { + default: { + running: true, + // Channel reconnects on the 4th snapshot call (post-drain re-check). 
+ connected: recovered, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: recovered ? 0 : 1, + busy: !recovered, + lastRunActivityAt: recovered ? now : now - 5_000, + }, + }, + }); + }), + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Advance enough for drain to finish and post-drain re-check to see healthy. + await vi.advanceTimersByTimeAsync(3_000); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(manager.startChannel).not.toHaveBeenCalled(); + monitor.stop(); + }); + + it("skips drain immediately for stale inherited activeRuns (lastRunActivityAt predates lastStartAt)", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 1, + busy: true, + // Activity predates lastStartAt — inherited from prior lifecycle. + lastRunActivityAt: now - 400_000, + }, + }, + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Should NOT need 30s drain window — restarts immediately. + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("skips restart when account is removed during drain", async () => { + const now = Date.now(); + let callCount = 0; + const manager = createMockChannelManager({ + getRuntimeSnapshot: vi.fn(() => { + callCount++; + // After the initial health check + first drain poll, simulate account removal. + const removed = callCount >= 3; + return snapshotWith( + removed + ? 
{} + : { + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: callCount >= 3 ? 0 : 1, + busy: true, + lastRunActivityAt: now - 5_000, + }, + }, + }, + ); + }), + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Drain polls until activeRuns=0 / account removed. + await vi.advanceTimersByTimeAsync(3_000); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(manager.startChannel).not.toHaveBeenCalled(); + monitor.stop(); + }); + + it("skips restart when health monitor is disabled during drain", async () => { + const now = Date.now(); + let callCount = 0; + let monitorEnabled = true; + const manager = createMockChannelManager({ + getRuntimeSnapshot: vi.fn(() => { + callCount++; + const drained = callCount >= 4; + return snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: drained ? 0 : 1, + busy: !drained, + lastRunActivityAt: now - 5_000, + }, + }, + }); + }), + isHealthMonitorEnabled: vi.fn(() => { + // Disable after drain starts (simulating operator hot-reload). + if (!monitorEnabled) { + return false; + } + return true; + }), + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Disable health monitor while drain is in progress. + monitorEnabled = false; + // Drain finishes after a few polls. 
+ await vi.advanceTimersByTimeAsync(3_000); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(manager.startChannel).not.toHaveBeenCalled(); + monitor.stop(); + }); + + it("skips drain when channel has no active runs", async () => { + const now = Date.now(); + const snapshotFn = vi.fn(() => + snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 0, + }, + }, + }), + ); + const manager = createMockChannelManager({ + getRuntimeSnapshot: snapshotFn, + }); + const monitor = await startAndRunCheck(manager); + // stopChannel should be called without any drain delay. + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + // 3 snapshot calls: main health check, drain check (activeRuns=0 → returns), post-drain re-check. + expect(snapshotFn).toHaveBeenCalledTimes(3); + monitor.stop(); + }); + }); + + describe("cooldown floor", () => { + it("enforces 60s minimum even with short check intervals", async () => { + const manager = createSnapshotManager({ + discord: { + default: managedStoppedAccount("crashed"), + }, + }); + // checkIntervalMs=1_000, cooldownCycles=1 → computed cooldown = max(1_000, 60_000) = 60s. + const monitor = startDefaultMonitor(manager, { + checkIntervalMs: 1_000, + cooldownCycles: 1, + }); + // First restart at ~1s. + await vi.advanceTimersByTimeAsync(1_001); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + // 30s later — still in 60s cooldown. + await vi.advanceTimersByTimeAsync(30_000); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + // Past 60s cooldown floor. + await vi.advanceTimersByTimeAsync(35_000); + expect(manager.startChannel).toHaveBeenCalledTimes(2); + monitor.stop(); + }); + }); + + describe("fresh timing", () => { + it("uses resolveFreshTiming callback each cycle", async () => { + const now = Date.now(); + // Start with a very long stale threshold so the channel appears healthy. 
+ let freshStaleMs = 999_999_999; + const manager = createSlackSnapshotManager( + runningConnectedSlackAccount({ + lastStartAt: now - 120_000, + lastEventAt: now - 100_000, + }), + ); + const resolveFreshTiming = vi.fn(() => ({ staleEventThresholdMs: freshStaleMs })); + const monitor = startDefaultMonitor(manager, { + checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS, + resolveFreshTiming, + }); + // First check — stale threshold is huge, so no restart. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(resolveFreshTiming).toHaveBeenCalled(); + // Lower the threshold so the channel is now considered stale. + freshStaleMs = 60_000; + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); + monitor.stop(); + }); + }); + describe("stale socket detection", () => { const STALE_THRESHOLD = 30 * 60_000; diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 809beb1abb8..65c95f2d473 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -17,6 +17,13 @@ const DEFAULT_COOLDOWN_CYCLES = 2; const DEFAULT_MAX_RESTARTS_PER_HOUR = 10; const ONE_HOUR_MS = 60 * 60_000; +/** Minimum cooldown between health-monitor-triggered restarts per channel. */ +const MIN_RESTART_COOLDOWN_MS = 60_000; + +/** Maximum time to wait for active agent runs to drain before aborting. */ +const DRAIN_WINDOW_MS = 30_000; +const DRAIN_POLL_MS = 1_000; + /** * How long a connected channel can go without receiving any event before * the health monitor treats it as a "stale socket" and triggers a restart. 
@@ -42,6 +49,8 @@ export type ChannelHealthMonitorDeps = { cooldownCycles?: number; maxRestartsPerHour?: number; abortSignal?: AbortSignal; + /** Called each check cycle to resolve fresh timing values (avoids stale config). */ + resolveFreshTiming?: () => Partial<ChannelHealthTimingPolicy>; }; export type ChannelHealthMonitor = { @@ -73,6 +82,70 @@ function resolveTimingPolicy( }; } +/** How long before run activity is considered stale / not belonging to any live run. */ +const STALE_RUN_ACTIVITY_MS = 25 * 60_000; + +/** + * Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account + * to finish so in-flight text replies are not silently aborted. + * + * Returns immediately (without waiting) when the busy state is determined to be + * stale — i.e. inherited from a prior lifecycle with no real in-flight runs. + */ +async function drainActiveRuns( + channelManager: ChannelManager, + channelId: ChannelId, + accountId: string, + isStopped: () => boolean, +): Promise<void> { + const deadline = Date.now() + DRAIN_WINDOW_MS; + let warned = false; + let checkedStaleness = false; + while (!isStopped() && Date.now() < deadline) { + const snap = channelManager.getRuntimeSnapshot(); + const accountSnap = snap.channelAccounts[channelId]?.[accountId]; + const activeRuns = + typeof accountSnap?.activeRuns === "number" && Number.isFinite(accountSnap.activeRuns) + ? Math.max(0, Math.trunc(accountSnap.activeRuns)) + : 0; + if (activeRuns === 0) { + return; + } + // On the first iteration with active runs, check whether the busy state is stale + // (e.g. inherited from a prior lifecycle). If so, there are no real in-flight runs + // to protect — return immediately to avoid burning the full drain window. + if (!checkedStaleness) { + checkedStaleness = true; + const lastStartAt = + typeof accountSnap?.lastStartAt === "number" && Number.isFinite(accountSnap.lastStartAt) + ? 
accountSnap.lastStartAt + : null; + const lastRunActivityAt = + typeof accountSnap?.lastRunActivityAt === "number" && + Number.isFinite(accountSnap.lastRunActivityAt) + ? accountSnap.lastRunActivityAt + : null; + const isStale = + lastRunActivityAt == null || + (lastStartAt != null && lastRunActivityAt < lastStartAt) || + Date.now() - lastRunActivityAt > STALE_RUN_ACTIVITY_MS; + if (isStale) { + log.info?.( + `[${channelId}:${accountId}] health-monitor: skipping drain — busy state is stale (inherited from prior lifecycle)`, + ); + return; + } + } + if (!warned) { + log.info?.( + `[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`, + ); + warned = true; + } + await new Promise((resolve) => setTimeout(resolve, DRAIN_POLL_MS)); + } +} + export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor { const { channelManager, @@ -83,7 +156,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann } = deps; const timing = resolveTimingPolicy(deps); - const cooldownMs = cooldownCycles * checkIntervalMs; + const cooldownMs = Math.max(cooldownCycles * checkIntervalMs, MIN_RESTART_COOLDOWN_MS); const restartRecords = new Map(); const startedAt = Date.now(); let stopped = false; @@ -108,6 +181,13 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann return; } + // Re-resolve timing each cycle so runtime config changes are picked up + // without waiting for a full health-monitor restart. + const freshTiming = deps.resolveFreshTiming?.(); + const effectiveTiming: ChannelHealthTimingPolicy = freshTiming + ? 
{ ...timing, ...freshTiming } + : timing; + const snapshot = channelManager.getRuntimeSnapshot(); for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) { @@ -127,8 +207,8 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann const healthPolicy: ChannelHealthPolicy = { channelId, now, - staleEventThresholdMs: timing.staleEventThresholdMs, - channelConnectGraceMs: timing.channelConnectGraceMs, + staleEventThresholdMs: effectiveTiming.staleEventThresholdMs, + channelConnectGraceMs: effectiveTiming.channelConnectGraceMs, }; const health = evaluateChannelHealth(status, healthPolicy); if (health.healthy) { @@ -159,12 +239,62 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann try { if (status.running) { + await drainActiveRuns( + channelManager, + channelId as ChannelId, + accountId, + () => stopped, + ); + // If the monitor was stopped during the drain window, abort the restart. + if (stopped) { + return; + } + // Re-evaluate channel health after drain: the channel may have recovered + // or reconnected while we were waiting. Only proceed if still unhealthy. + const postDrainSnap = channelManager.getRuntimeSnapshot(); + const postDrainStatus = postDrainSnap.channelAccounts[channelId]?.[accountId]; + // Account was removed during drain (config hot-reload) — do not resurrect it. + if (!postDrainStatus) { + log.debug?.( + `[${channelId}:${accountId}] health-monitor: account removed during drain, skipping restart`, + ); + continue; + } + if (postDrainStatus) { + const postDrainHealth = evaluateChannelHealth(postDrainStatus, { + ...healthPolicy, + now: Date.now(), + }); + if (postDrainHealth.healthy) { + log.info?.( + `[${channelId}:${accountId}] health-monitor: channel recovered during drain, skipping restart`, + ); + continue; + } + } + // Re-check monitor enablement after drain — operator may have disabled it. 
+ if (!channelManager.isHealthMonitorEnabled(channelId as ChannelId, accountId)) { + log.info?.( + `[${channelId}:${accountId}] health-monitor: monitor disabled during drain, skipping restart`, + ); + continue; + } + // Re-prune the hourly bucket with a fresh timestamp so that entries + // which aged out during the drain window are not counted against the cap. + pruneOldRestarts(record, Date.now()); + if (record.restartsThisHour.length >= maxRestartsPerHour) { + log.warn?.( + `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit after drain, skipping`, + ); + continue; + } await channelManager.stopChannel(channelId as ChannelId, accountId); } channelManager.resetRestartAttempts(channelId as ChannelId, accountId); await channelManager.startChannel(channelId as ChannelId, accountId); - record.lastRestartAt = now; - record.restartsThisHour.push({ at: now }); + const restartedAt = Date.now(); + record.lastRestartAt = restartedAt; + record.restartsThisHour.push({ at: restartedAt }); restartRecords.set(key, record); } catch (err) { log.error?.(