fix(health-monitor): re-check state post-drain and skip stale busy drain

This commit is contained in:
Jason Wu 2026-03-19 10:06:14 +08:00
parent afa4adb202
commit 879c6e9040
2 changed files with 147 additions and 2 deletions

View File

@ -578,6 +578,99 @@ describe("channel-health-monitor", () => {
monitor.stop();
});
// Stopping the monitor while it is still waiting on an active run must
// prevent any subsequent stop/start of the channel.
it("does not restart if monitor is stopped during drain", async () => {
  const baseTime = Date.now();
  let snapshotReads = 0;
  // Assigned after the manager is built; the snapshot callback reaches it lazily.
  let monitorHandle: ReturnType<typeof startDefaultMonitor> | undefined;

  const readSnapshot = () => {
    snapshotReads += 1;
    if (snapshotReads === 2) {
      // Second snapshot read: shut the monitor down partway through the drain window.
      monitorHandle?.stop();
    }
    // Disconnected account that still reports one recently active run.
    return snapshotWith({
      discord: {
        default: {
          running: true,
          connected: false,
          enabled: true,
          configured: true,
          lastStartAt: baseTime - 300_000,
          activeRuns: 1,
          busy: true,
          lastRunActivityAt: baseTime - 5_000,
        },
      },
    });
  };

  const manager = createMockChannelManager({
    getRuntimeSnapshot: vi.fn(readSnapshot),
  });
  monitorHandle = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });

  // First advance fires the health check; second gives the drain loop time to poll
  // (the poll is what triggers the stop above).
  await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
  await vi.advanceTimersByTimeAsync(2_000);

  expect(manager.stopChannel).not.toHaveBeenCalled();
  expect(manager.startChannel).not.toHaveBeenCalled();
});
// A channel that turns healthy while the monitor is draining active runs
// must be left alone: no stop and no start.
it("does not restart if channel recovers (becomes healthy) during drain", async () => {
  const baseTime = Date.now();
  let snapshotReads = 0;

  const readSnapshot = () => {
    snapshotReads += 1;
    // From the fourth snapshot read onward the account reports as reconnected,
    // idle, and freshly active; before that it is disconnected with one busy run.
    if (snapshotReads >= 4) {
      return snapshotWith({
        discord: {
          default: {
            running: true,
            connected: true,
            enabled: true,
            configured: true,
            lastStartAt: baseTime - 300_000,
            activeRuns: 0,
            busy: false,
            lastRunActivityAt: baseTime,
          },
        },
      });
    }
    return snapshotWith({
      discord: {
        default: {
          running: true,
          connected: false,
          enabled: true,
          configured: true,
          lastStartAt: baseTime - 300_000,
          activeRuns: 1,
          busy: true,
          lastRunActivityAt: baseTime - 5_000,
        },
      },
    });
  };

  const manager = createMockChannelManager({
    getRuntimeSnapshot: vi.fn(readSnapshot),
  });
  const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });

  // Trigger the health check, then let the drain finish and the follow-up
  // health evaluation observe the recovered state.
  await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);
  await vi.advanceTimersByTimeAsync(3_000);

  expect(manager.stopChannel).not.toHaveBeenCalled();
  expect(manager.startChannel).not.toHaveBeenCalled();
  monitor.stop();
});
// When the last run activity is older than the last start, the busy flag is
// treated as stale and the restart proceeds without waiting on a drain.
it("skips drain immediately for stale inherited activeRuns (lastRunActivityAt predates lastStartAt)", async () => {
  const baseTime = Date.now();

  // Disconnected, nominally busy account whose run activity (400s ago)
  // predates its last start (300s ago), i.e. carried over from a prior lifecycle.
  const staleBusyAccount = {
    running: true,
    connected: false,
    enabled: true,
    configured: true,
    lastStartAt: baseTime - 300_000,
    activeRuns: 1,
    busy: true,
    lastRunActivityAt: baseTime - 400_000,
  };
  const manager = createSnapshotManager({
    discord: { default: staleBusyAccount },
  });

  const monitor = startDefaultMonitor(manager, { checkIntervalMs: DEFAULT_CHECK_INTERVAL_MS });
  // A single check interval is enough; no extra time is advanced for a drain window.
  await vi.advanceTimersByTimeAsync(DEFAULT_CHECK_INTERVAL_MS + 1);

  expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
  expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
  monitor.stop();
});
it("skips drain when channel has no active runs", async () => {
const now = Date.now();
const snapshotFn = vi.fn(() =>
@ -600,8 +693,8 @@ describe("channel-health-monitor", () => {
const monitor = await startAndRunCheck(manager);
// stopChannel should be called without any drain delay.
expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default");
// Only 2 snapshot calls: one for the main check, one for the drain check.
expect(snapshotFn).toHaveBeenCalledTimes(2);
// 3 snapshot calls: main health check, drain check (activeRuns=0 → returns), post-drain re-check.
expect(snapshotFn).toHaveBeenCalledTimes(3);
monitor.stop();
});
});

View File

@ -82,9 +82,15 @@ function resolveTimingPolicy(
};
}
/**
 * Maximum age, in milliseconds, of the last recorded run activity before it is
 * considered stale and no longer attributed to any live run (25 minutes).
 */
const STALE_RUN_ACTIVITY_MS = 25 * 60 * 1_000;
/**
* Wait up to {@link DRAIN_WINDOW_MS} for active agent runs on a channel/account
* to finish so in-flight text replies are not silently aborted.
*
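* Returns immediately (without waiting) when the busy state is determined to be
* stale: there is no recorded run activity, the last run activity predates the
* last start (inherited from a prior lifecycle), or the last run activity is
* older than STALE_RUN_ACTIVITY_MS. In those cases there are no real in-flight
* runs to protect.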
*/
async function drainActiveRuns(
channelManager: ChannelManager,
@ -94,6 +100,7 @@ async function drainActiveRuns(
): Promise<void> {
const deadline = Date.now() + DRAIN_WINDOW_MS;
let warned = false;
let checkedStaleness = false;
while (!isStopped() && Date.now() < deadline) {
const snap = channelManager.getRuntimeSnapshot();
const accountSnap = snap.channelAccounts[channelId]?.[accountId];
@ -104,6 +111,31 @@ async function drainActiveRuns(
if (activeRuns === 0) {
return;
}
// On the first iteration with active runs, check whether the busy state is stale
// (e.g. inherited from a prior lifecycle). If so, there are no real in-flight runs
// to protect — return immediately to avoid burning the full drain window.
if (!checkedStaleness) {
checkedStaleness = true;
const lastStartAt =
typeof accountSnap?.lastStartAt === "number" && Number.isFinite(accountSnap.lastStartAt)
? accountSnap.lastStartAt
: null;
const lastRunActivityAt =
typeof accountSnap?.lastRunActivityAt === "number" &&
Number.isFinite(accountSnap.lastRunActivityAt)
? accountSnap.lastRunActivityAt
: null;
const isStale =
lastRunActivityAt == null ||
(lastStartAt != null && lastRunActivityAt < lastStartAt) ||
Date.now() - lastRunActivityAt > STALE_RUN_ACTIVITY_MS;
if (isStale) {
log.info?.(
`[${channelId}:${accountId}] health-monitor: skipping drain — busy state is stale (inherited from prior lifecycle)`,
);
return;
}
}
if (!warned) {
log.info?.(
`[${channelId}:${accountId}] health-monitor: waiting for ${activeRuns} active run(s) to drain`,
@ -213,6 +245,26 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
accountId,
() => stopped,
);
// If the monitor was stopped during the drain window, abort the restart.
if (stopped) {
break;
}
// Re-evaluate channel health after drain: the channel may have recovered
// or reconnected while we were waiting. Only proceed if still unhealthy.
const postDrainSnap = channelManager.getRuntimeSnapshot();
const postDrainStatus = postDrainSnap.channelAccounts[channelId]?.[accountId];
if (postDrainStatus) {
const postDrainHealth = evaluateChannelHealth(postDrainStatus, {
...healthPolicy,
now: Date.now(),
});
if (postDrainHealth.healthy) {
log.info?.(
`[${channelId}:${accountId}] health-monitor: channel recovered during drain, skipping restart`,
);
continue;
}
}
await channelManager.stopChannel(channelId as ChannelId, accountId);
}
channelManager.resetRestartAttempts(channelId as ChannelId, accountId);