fix(health-monitor): drain active runs before abort on channel restart

- Add 30s drain window: wait for in-flight agent runs to complete before
  calling abort on health-monitor-triggered channel restarts, preventing
  silently dropped text replies (reactions survive since they fire early)
- Add 60s cooldown floor between health-monitor restarts to prevent rapid
  restart cycling when check intervals are short
- Add resolveFreshTiming callback so health evaluations use current config
  values instead of potentially stale timing captured at monitor startup

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Jason Wu 2026-03-17 13:50:14 +08:00
parent 94c27f34a1
commit 8026d56983
2 changed files with 229 additions and 13 deletions
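In code terms, the cooldown change is a one-line formula swap: the effective cooldown is now floored at 60 seconds rather than being a pure cycles-times-interval product. A minimal sketch of that arithmetic, assuming the constant name from the patch (the standalone effectiveCooldownMs helper is illustrative only, not part of the diff):

// Floor the per-channel restart cooldown so short check intervals cannot
// produce rapid restart cycling (mirrors the change in the implementation file).
const MIN_RESTART_COOLDOWN_MS = 60_000;

function effectiveCooldownMs(cooldownCycles: number, checkIntervalMs: number): number {
  return Math.max(cooldownCycles * checkIntervalMs, MIN_RESTART_COOLDOWN_MS);
}

effectiveCooldownMs(2, 5_000); // 60_000: the 60s floor wins over the 10s product
effectiveCooldownMs(1, 61_000); // 61_000: longer products pass through unchanged

The two sample calls match the values exercised by the updated cooldown tests below.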


@@ -317,13 +317,23 @@ describe("channel-health-monitor", () => {
it("restarts busy channels when run activity is stale", async () => {
const now = Date.now();
const manager = createBusyDisconnectedManager(now - 26 * 60_000);
await expectRestartedChannel(manager, "discord");
const monitor = await startAndRunCheck(manager);
// activeRuns stays 1, so drain window times out (30s).
await vi.advanceTimersByTimeAsync(31_000);
expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
monitor.stop();
});
it("restarts disconnected channels when busy flags are inherited from a prior lifecycle", async () => {
const now = Date.now();
const manager = createBusyDisconnectedManager(now - 301_000);
await expectRestartedChannel(manager, "discord");
const monitor = await startAndRunCheck(manager);
// activeRuns stays 1, so drain window times out (30s).
await vi.advanceTimersByTimeAsync(31_000);
expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
monitor.stop();
});
it("skips recently-started channels while they are still connecting", async () => {
@@ -402,7 +412,7 @@ describe("channel-health-monitor", () => {
monitor.stop();
});
it("applies cooldown — skips recently restarted channels for 2 cycles", async () => {
it("applies cooldown — enforces minimum 60s floor between restarts", async () => {
const manager = createSnapshotManager({
discord: {
default: managedStoppedAccount("crashed"),
@@ -410,11 +420,12 @@
});
const monitor = await startAndRunCheck(manager);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS);
// With checkIntervalMs=5_000, cooldownCycles=2, computed cooldown = max(10_000, 60_000) = 60s.
// Advance 55s (11 intervals) — still in cooldown.
await vi.advanceTimersByTimeAsync(55_000);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS);
// Advance another 10s — past 60s cooldown, next check triggers restart.
await vi.advanceTimersByTimeAsync(10_000);
expect(manager.startChannel).toHaveBeenCalledTimes(2);
monitor.stop();
});
@@ -425,14 +436,17 @@ describe("channel-health-monitor", () => {
default: managedStoppedAccount("keeps crashing"),
},
});
// Use 61s interval so cooldown (max(61_000, 60_000)) doesn't dominate.
const monitor = startDefaultMonitor(manager, {
checkIntervalMs: 1_000,
checkIntervalMs: 61_000,
cooldownCycles: 1,
maxRestartsPerHour: 3,
});
await vi.advanceTimersByTimeAsync(5_001);
// Restart #1 at ~61s, #2 at ~183s, #3 at ~305s (every other interval due to <= cooldown).
await vi.advanceTimersByTimeAsync(310_000);
expect(manager.startChannel).toHaveBeenCalledTimes(3);
await vi.advanceTimersByTimeAsync(1_001);
// One more full cycle pair — capped.
await vi.advanceTimersByTimeAsync(130_000);
expect(manager.startChannel).toHaveBeenCalledTimes(3);
monitor.stop();
});
@@ -493,6 +507,159 @@ describe("channel-health-monitor", () => {
monitor.stop();
});
describe("drain window", () => {
it("waits for active runs to drain before stopping a channel", async () => {
const now = Date.now();
let callCount = 0;
const manager = createSnapshotManager(
{
discord: {
default: {
running: true,
connected: false,
enabled: true,
configured: true,
lastStartAt: now - 300_000,
activeRuns: 2,
},
},
},
{
// After 3 snapshot reads (initial + 2 drain polls), simulate runs finishing.
getRuntimeSnapshot: vi.fn(() => {
callCount++;
return snapshotWith({
discord: {
default: {
running: true,
connected: false,
enabled: true,
configured: true,
lastStartAt: now - 300_000,
activeRuns: callCount >= 4 ? 0 : 2,
},
},
});
}),
},
);
const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
// First interval fires the check.
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
// Drain polls every 1s. Advance enough for drain to see activeRuns=0.
await vi.advanceTimersByTimeAsync(3_000);
expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
monitor.stop();
});
it("proceeds with stop after drain window timeout even if runs remain", async () => {
const now = Date.now();
const manager = createSnapshotManager({
discord: {
default: {
running: true,
connected: false,
enabled: true,
configured: true,
lastStartAt: now - 300_000,
// activeRuns stays > 0 forever.
activeRuns: 1,
},
},
});
const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
// First interval fires the check.
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
// Advance past the full 30s drain window.
await vi.advanceTimersByTimeAsync(31_000);
expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
monitor.stop();
});
it("skips drain when channel has no active runs", async () => {
const now = Date.now();
const snapshotFn = vi.fn(() =>
snapshotWith({
discord: {
default: {
running: true,
connected: false,
enabled: true,
configured: true,
lastStartAt: now - 300_000,
activeRuns: 0,
},
},
}),
);
const manager = createMockChannelManager({
getRuntimeSnapshot: snapshotFn,
});
const monitor = await startAndRunCheck(manager);
// stopChannel should be called without any drain delay.
expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
// Only 2 snapshot calls: one for the main check, one for the drain check.
expect(snapshotFn).toHaveBeenCalledTimes(2);
monitor.stop();
});
});
describe("cooldown floor", () => {
it("enforces 60s minimum even with short check intervals", async () => {
const manager = createSnapshotManager({
discord: {
default: managedStoppedAccount("crashed"),
},
});
// checkIntervalMs=1_000, cooldownCycles=1 → computed cooldown = max(1_000, 60_000) = 60s.
const monitor = startDefaultMonitor(manager, {
checkIntervalMs: 1_000,
cooldownCycles: 1,
});
// First restart at ~1s.
await vi.advanceTimersByTimeAsync(1_001);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
// 30s later — still in 60s cooldown.
await vi.advanceTimersByTimeAsync(30_000);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
// Past 60s cooldown floor.
await vi.advanceTimersByTimeAsync(35_000);
expect(manager.startChannel).toHaveBeenCalledTimes(2);
monitor.stop();
});
});
describe("fresh timing", () => {
it("uses resolveFreshTiming callback each cycle", async () => {
const now = Date.now();
// Start with a very long stale threshold so the channel appears healthy.
let freshStaleMs = 999_999_999;
const manager = createSlackSnapshotManager(
runningConnectedSlackAccount({
lastStartAt: now - 120_000,
lastEventAt: now - 100_000,
}),
);
const resolveFreshTiming = vi.fn(() => ({ staleEventThresholdMs: freshStaleMs }));
const monitor = startDefaultMonitor(manager, {
checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS,
resolveFreshTiming,
});
// First check — stale threshold is huge, so no restart.
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
expect(manager.stopChannel).not.toHaveBeenCalled();
expect(resolveFreshTiming).toHaveBeenCalled();
// Lower the threshold so the channel is now considered stale.
freshStaleMs = 60_000;
await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS);
expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default");
expect(manager.startChannel).toHaveBeenCalledWith("slack", "default");
monitor.stop();
});
});
describe("stale socket detection", () => {
const STALE_THRESHOLD = 30 * 60_000;

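Before the implementation diff, a caller-side sketch of the new resolveFreshTiming hook may help. The import path, loadRuntimeConfig, and the ambient channelManager are assumptions for illustration; the option name and its Partial return shape match the ChannelHealthMonitorDeps type in the diff below:

// Assumed import path and exports; loadRuntimeConfig is a hypothetical
// stand-in for however the host app exposes live config values.
import { startChannelHealthMonitor, type ChannelManager } from "./channel-health-monitor";

declare function loadRuntimeConfig(): { staleEventThresholdMs: number };
declare const channelManager: ChannelManager; // assumed to be in scope

const monitor = startChannelHealthMonitor({
  channelManager,
  // Re-resolved on every check cycle, so a config change takes effect at
  // the next evaluation instead of waiting for a monitor restart.
  resolveFreshTiming: () => ({
    staleEventThresholdMs: loadRuntimeConfig().staleEventThresholdMs,
  }),
});

Returning a Partial lets callers override only the values they manage; anything omitted falls back to the timing resolved at monitor startup.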

@@ -17,6 +17,13 @@ const DEFAULT_COOLDOWN_CYCLES = 2;
const DEFAULT_MAX_RESTARTS_PER_HOUR = 10;
const ONE_HOUR_MS = 60 * 60_000;
/** Minimum cooldown between health-monitor-triggered restarts per channel. */
const MIN_RESTART_COOLDOWN_MS = 60_000;
/** Maximum time to wait for active agent runs to drain before aborting. */
const DRAIN_WINDOW_MS = 30_000;
const DRAIN_POLL_MS = 1_000;
/**
* How long a connected channel can go without receiving any event before
* the health monitor treats it as a "stale socket" and triggers a restart.
@@ -42,6 +49,8 @@ export type ChannelHealthMonitorDeps = {
cooldownCycles?: number;
maxRestartsPerHour?: number;
abortSignal?: AbortSignal;
/** Called each check cycle to resolve fresh timing values (avoids stale config). */
resolveFreshTiming?: () => Partial<ChannelHealthTimingPolicy>;
};
export type ChannelHealthMonitor = {
@@ -73,6 +82,38 @@ function resolveTimingPolicy(
};
}
/**
* Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account
* to finish so in-flight text replies are not silently aborted.
*/
async function drainActiveRuns(
channelManager: ChannelManager,
channelId: ChannelId,
accountId: string,
isStopped: () => boolean,
): Promise<void> {
const deadline = Date.now() + DRAIN_WINDOW_MS;
let warned = false;
while (!isStopped() && Date.now() < deadline) {
const snap = channelManager.getRuntimeSnapshot();
const accountSnap = snap.channelAccounts[channelId]?.[accountId];
const activeRuns =
typeof accountSnap?.activeRuns === "number" && Number.isFinite(accountSnap.activeRuns)
? Math.max(0, Math.trunc(accountSnap.activeRuns))
: 0;
if (activeRuns === 0) {
return;
}
if (!warned) {
log.info?.(
`[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`,
);
warned = true;
}
await new Promise<void>((resolve) => setTimeout(resolve, DRAIN_POLL_MS));
}
}
export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
const {
channelManager,
@@ -83,7 +124,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
} = deps;
const timing = resolveTimingPolicy(deps);
const cooldownMs = cooldownCycles * checkIntervalMs;
const cooldownMs = Math.max(cooldownCycles * checkIntervalMs, MIN_RESTART_COOLDOWN_MS);
const restartRecords = new Map<string, RestartRecord>();
const startedAt = Date.now();
let stopped = false;
@@ -108,6 +149,13 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
return;
}
// Re-resolve timing each cycle so runtime config changes are picked up
// without waiting for a full health-monitor restart.
const freshTiming = deps.resolveFreshTiming?.();
const effectiveTiming: ChannelHealthTimingPolicy = freshTiming
? { ...timing, ...freshTiming }
: timing;
const snapshot = channelManager.getRuntimeSnapshot();
for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) {
@@ -127,8 +175,8 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
const healthPolicy: ChannelHealthPolicy = {
channelId,
now,
staleEventThresholdMs: timing.staleEventThresholdMs,
channelConnectGraceMs: timing.channelConnectGraceMs,
staleEventThresholdMs: effectiveTiming.staleEventThresholdMs,
channelConnectGraceMs: effectiveTiming.channelConnectGraceMs,
};
const health = evaluateChannelHealth(status, healthPolicy);
if (health.healthy) {
@@ -159,6 +207,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
try {
if (status.running) {
await drainActiveRuns(channelManager, channelId as ChannelId, accountId, () => stopped);
await channelManager.stopChannel(channelId as ChannelId, accountId);
}
channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
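
Taken together, the restart path for a running channel now sequences drain, then stop, then start. A condensed sketch of that flow using the names above; the startChannel step is inferred from the tests rather than shown in this hunk, and real error handling is omitted:

// Condensed and illustrative: the patched check loop above also enforces
// the cooldown floor and hourly restart budget before reaching this point.
async function restartUnhealthyAccount(
  channelManager: ChannelManager,
  channelId: ChannelId,
  accountId: string,
  isStopped: () => boolean,
): Promise<void> {
  // Give in-flight agent runs up to DRAIN_WINDOW_MS (30s) to finish so
  // their text replies are delivered before the channel is torn down.
  await drainActiveRuns(channelManager, channelId, accountId, isStopped);
  await channelManager.stopChannel(channelId, accountId);
  channelManager.resetRestartAttempts(channelId, accountId);
  await channelManager.startChannel(channelId, accountId);
}

Reactions are unaffected either way: they fire early in a run, so only the trailing text reply was at risk from an immediate abort.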