diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index efc392f8ee0..b237ce7feb2 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -317,13 +317,23 @@ describe("channel-health-monitor", () => { it("restarts busy channels when run activity is stale", async () => { const now = Date.now(); const manager = createBusyDisconnectedManager(now - 26 * 60_000); - await expectRestartedChannel(manager, "discord"); + const monitor = await startAndRunCheck(manager); + // activeRuns stays 1, so drain window times out (30s). + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); }); it("restarts disconnected channels when busy flags are inherited from a prior lifecycle", async () => { const now = Date.now(); const manager = createBusyDisconnectedManager(now - 301_000); - await expectRestartedChannel(manager, "discord"); + const monitor = await startAndRunCheck(manager); + // activeRuns stays 1, so drain window times out (30s). 
+ await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); }); it("skips recently-started channels while they are still connecting", async () => { @@ -402,7 +412,7 @@ describe("channel-health-monitor", () => { monitor.stop(); }); - it("applies cooldown — skips recently restarted channels for 2 cycles", async () => { + it("applies cooldown — enforces minimum 60s floor between restarts", async () => { const manager = createSnapshotManager({ discord: { default: managedStoppedAccount("crashed"), @@ -410,11 +420,12 @@ describe("channel-health-monitor", () => { }); const monitor = await startAndRunCheck(manager); expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + // With checkIntervalMs=5_000, cooldownCycles=2, computed cooldown = max(10_000, 60_000) = 60s. + // Advance 55s (11 intervals) — still in cooldown. + await vi.advanceTimersByTimeAsync(55_000); expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); - expect(manager.startChannel).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + // Advance another 10s — past 60s cooldown, next check triggers restart. + await vi.advanceTimersByTimeAsync(10_000); expect(manager.startChannel).toHaveBeenCalledTimes(2); monitor.stop(); }); @@ -425,14 +436,17 @@ describe("channel-health-monitor", () => { default: managedStoppedAccount("keeps crashing"), }, }); + // Use 61s interval so cooldown (max(61_000, 60_000)) doesn't dominate. const monitor = startDefaultMonitor(manager, { - checkIntervalMs: 1_000, + checkIntervalMs: 61_000, cooldownCycles: 1, maxRestartsPerHour: 3, }); - await vi.advanceTimersByTimeAsync(5_001); + // Restart #1 at ~61s, #2 at ~183s, #3 at ~305s (every other interval due to <= cooldown). 
+ await vi.advanceTimersByTimeAsync(310_000); expect(manager.startChannel).toHaveBeenCalledTimes(3); - await vi.advanceTimersByTimeAsync(1_001); + // One more full cycle pair — capped. + await vi.advanceTimersByTimeAsync(130_000); expect(manager.startChannel).toHaveBeenCalledTimes(3); monitor.stop(); }); @@ -493,6 +507,159 @@ describe("channel-health-monitor", () => { monitor.stop(); }); + describe("drain window", () => { + it("waits for active runs to drain before stopping a channel", async () => { + const now = Date.now(); + let callCount = 0; + const manager = createSnapshotManager( + { + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 2, + }, + }, + }, + { + // After 3 snapshot reads (initial + 2 drain polls), simulate runs finishing. + getRuntimeSnapshot: vi.fn(() => { + callCount++; + return snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: callCount >= 4 ? 0 : 2, + }, + }, + }); + }), + }, + ); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + // First interval fires the check. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Drain polls every 1s. Advance enough for drain to see activeRuns=0. + await vi.advanceTimersByTimeAsync(3_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("proceeds with stop after drain window timeout even if runs remain", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + // activeRuns stays > 0 forever. 
+ activeRuns: 1, + }, + }, + }); + const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS }); + // First interval fires the check. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + // Advance past the full 30s drain window. + await vi.advanceTimersByTimeAsync(31_000); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); + monitor.stop(); + }); + + it("skips drain when channel has no active runs", async () => { + const now = Date.now(); + const snapshotFn = vi.fn(() => + snapshotWith({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 0, + }, + }, + }), + ); + const manager = createMockChannelManager({ + getRuntimeSnapshot: snapshotFn, + }); + const monitor = await startAndRunCheck(manager); + // stopChannel should be called without any drain delay. + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + // Only 2 snapshot calls: one for the main check, one for the drain check. + expect(snapshotFn).toHaveBeenCalledTimes(2); + monitor.stop(); + }); + }); + + describe("cooldown floor", () => { + it("enforces 60s minimum even with short check intervals", async () => { + const manager = createSnapshotManager({ + discord: { + default: managedStoppedAccount("crashed"), + }, + }); + // checkIntervalMs=1_000, cooldownCycles=1 → computed cooldown = max(1_000, 60_000) = 60s. + const monitor = startDefaultMonitor(manager, { + checkIntervalMs: 1_000, + cooldownCycles: 1, + }); + // First restart at ~1s. + await vi.advanceTimersByTimeAsync(1_001); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + // 30s later — still in 60s cooldown. + await vi.advanceTimersByTimeAsync(30_000); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + // Past 60s cooldown floor. 
+ await vi.advanceTimersByTimeAsync(35_000); + expect(manager.startChannel).toHaveBeenCalledTimes(2); + monitor.stop(); + }); + }); + + describe("fresh timing", () => { + it("uses resolveFreshTiming callback each cycle", async () => { + const now = Date.now(); + // Start with a very long stale threshold so the channel appears healthy. + let freshStaleMs = 999_999_999; + const manager = createSlackSnapshotManager( + runningConnectedSlackAccount({ + lastStartAt: now - 120_000, + lastEventAt: now - 100_000, + }), + ); + const resolveFreshTiming = vi.fn(() => ({ staleEventThresholdMs: freshStaleMs })); + const monitor = startDefaultMonitor(manager, { + checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS, + resolveFreshTiming, + }); + // First check — stale threshold is huge, so no restart. + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(resolveFreshTiming).toHaveBeenCalled(); + // Lower the threshold so the channel is now considered stale. + freshStaleMs = 60_000; + await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); + monitor.stop(); + }); + }); + describe("stale socket detection", () => { const STALE_THRESHOLD = 30 * 60_000; diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 809beb1abb8..f0fca813b65 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -17,6 +17,13 @@ const DEFAULT_COOLDOWN_CYCLES = 2; const DEFAULT_MAX_RESTARTS_PER_HOUR = 10; const ONE_HOUR_MS = 60 * 60_000; +/** Minimum cooldown between health-monitor-triggered restarts per channel. */ +const MIN_RESTART_COOLDOWN_MS = 60_000; + +/** Maximum time to wait for active agent runs to drain before aborting. 
*/ +const DRAIN_WINDOW_MS = 30_000; +const DRAIN_POLL_MS = 1_000; + /** * How long a connected channel can go without receiving any event before * the health monitor treats it as a "stale socket" and triggers a restart. @@ -42,6 +49,8 @@ export type ChannelHealthMonitorDeps = { cooldownCycles?: number; maxRestartsPerHour?: number; abortSignal?: AbortSignal; + /** Called each check cycle to resolve fresh timing values (avoids stale config). */ + resolveFreshTiming?: () => Partial<ChannelHealthTimingPolicy>; }; export type ChannelHealthMonitor = { @@ -73,6 +82,38 @@ function resolveTimingPolicy( }; } +/** + * Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account + * to finish so in-flight text replies are not silently aborted. + */ +async function drainActiveRuns( + channelManager: ChannelManager, + channelId: ChannelId, + accountId: string, + isStopped: () => boolean, +): Promise<void> { + const deadline = Date.now() + DRAIN_WINDOW_MS; + let warned = false; + while (!isStopped() && Date.now() < deadline) { + const snap = channelManager.getRuntimeSnapshot(); + const accountSnap = snap.channelAccounts[channelId]?.[accountId]; + const activeRuns = + typeof accountSnap?.activeRuns === "number" && Number.isFinite(accountSnap.activeRuns) + ?
Math.max(0, Math.trunc(accountSnap.activeRuns)) : 0; + if (activeRuns === 0) { + return; + } + if (!warned) { + log.info?.( + `[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`, + ); + warned = true; + } + await new Promise((resolve) => setTimeout(resolve, DRAIN_POLL_MS)); + } +} + export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor { const { channelManager, @@ -83,7 +124,7 @@ } = deps; const timing = resolveTimingPolicy(deps); - const cooldownMs = cooldownCycles * checkIntervalMs; + const cooldownMs = Math.max(cooldownCycles * checkIntervalMs, MIN_RESTART_COOLDOWN_MS); const restartRecords = new Map(); const startedAt = Date.now(); let stopped = false; @@ -108,6 +149,13 @@ return; } + // Re-resolve timing each cycle so runtime config changes are picked up + // without waiting for a full health-monitor restart. + const freshTiming = deps.resolveFreshTiming?.(); + const effectiveTiming: ChannelHealthTimingPolicy = freshTiming + ?
{ ...timing, ...freshTiming } : timing; + const snapshot = channelManager.getRuntimeSnapshot(); for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) { @@ -127,8 +175,8 @@ const healthPolicy: ChannelHealthPolicy = { channelId, now, - staleEventThresholdMs: timing.staleEventThresholdMs, - channelConnectGraceMs: timing.channelConnectGraceMs, + staleEventThresholdMs: effectiveTiming.staleEventThresholdMs, + channelConnectGraceMs: effectiveTiming.channelConnectGraceMs, }; const health = evaluateChannelHealth(status, healthPolicy); if (health.healthy) { @@ -159,6 +207,7 @@ try { if (status.running) { + await drainActiveRuns(channelManager, channelId as ChannelId, accountId, () => stopped); await channelManager.stopChannel(channelId as ChannelId, accountId); } channelManager.resetRestartAttempts(channelId as ChannelId, accountId);