From 77d97f24937518d22be900bba3823e18777ecfc9 Mon Sep 17 00:00:00 2001 From: Rene Date: Sun, 15 Mar 2026 16:54:26 +0800 Subject: [PATCH] fix(heartbeat): move scheduleNext to finally block to prevent timer death (#45772) The heartbeat timer permanently stops after 1-2 triggers because scheduleNext() is called on each individual return path in run(). If an unhandled rejection or silent error exits the function before reaching any of those calls, the timer is never re-armed and heartbeats stop permanently. This fix wraps the run() body in try/finally with scheduleNext() in the finally block, guaranteeing the timer is always re-armed regardless of how the function exits. The three early guard returns (stopped, disabled, no agents) remain above the try/finally since they legitimately should not reschedule. Fixes #45772 --- src/infra/heartbeat-runner.scheduler.test.ts | 30 ++++ src/infra/heartbeat-runner.ts | 145 ++++++++++--------- 2 files changed, 106 insertions(+), 69 deletions(-) diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index 10313dc33ad..1d6fbd66b55 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -283,4 +283,34 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + + it("re-arms timer after runOnce rejects with an unhandled promise rejection (#45772)", async () => { + useFakeHeartbeatTime(); + + let callCount = 0; + const runSpy = vi.fn().mockImplementation(async () => { + callCount++; + if (callCount === 1) { + // Simulate an unhandled rejection that escapes the inner try/catch + // (e.g. from session resolution or preflight code outside the + // existing error handling). Before the fix, this would permanently + // kill the heartbeat timer. + return Promise.reject(new Error("unexpected async failure")); + } + return { status: "ran", durationMs: 1 }; + }); + + const runner = startDefaultRunner(runSpy); + + // First heartbeat fires and rejects + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(1); + + // Second heartbeat MUST still fire — the timer must have been re-armed + // despite the rejection. This is the core assertion for #45772. + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(2); + + runner.stop(); + }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 34b3a7b5f86..68d151949a9 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -1079,80 +1079,87 @@ export function startHeartbeatRunner(opts: { const now = startedAt; let ran = false; - if (requestedSessionKey || requestedAgentId) { - const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey); - const targetAgent = state.agents.get(targetAgentId); - if (!targetAgent) { - scheduleNext(); - return { status: "skipped", reason: "disabled" }; - } - try { - const res = await runOnce({ - cfg: state.cfg, - agentId: targetAgent.agentId, - heartbeat: targetAgent.heartbeat, - reason, - sessionKey: requestedSessionKey, - deps: { runtime: state.runtime }, - }); - if (res.status !== "skipped" || res.reason !== "disabled") { - advanceAgentSchedule(targetAgent, now); + // Wrap the entire run in try/finally so scheduleNext() is ALWAYS called, + // even if an unexpected rejection or silent error exits the function early. + // This prevents the heartbeat timer from permanently dying (see #45772). + try { + if (requestedSessionKey || requestedAgentId) { + const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey); + const targetAgent = state.agents.get(targetAgentId); + if (!targetAgent) { + return { status: "skipped", reason: "disabled" }; + } + try { + const res = await runOnce({ + cfg: state.cfg, + agentId: targetAgent.agentId, + heartbeat: targetAgent.heartbeat, + reason, + sessionKey: requestedSessionKey, + deps: { runtime: state.runtime }, + }); + if (res.status !== "skipped" || res.reason !== "disabled") { + advanceAgentSchedule(targetAgent, now); + } + return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res; + } catch (err) { + const errMsg = formatErrorMessage(err); + log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, { + error: errMsg, + }); + advanceAgentSchedule(targetAgent, now); + return { status: "failed", reason: errMsg }; } - scheduleNext(); - return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res; - } catch (err) { - const errMsg = formatErrorMessage(err); - log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, { - error: errMsg, - }); - advanceAgentSchedule(targetAgent, now); - scheduleNext(); - return { status: "failed", reason: errMsg }; - } - } - - for (const agent of state.agents.values()) { - if (isInterval && now < agent.nextDueMs) { - continue; } - let res: HeartbeatRunResult; - try { - res = await runOnce({ - cfg: state.cfg, - agentId: agent.agentId, - heartbeat: agent.heartbeat, - reason, - deps: { runtime: state.runtime }, - }); - } catch (err) { - // If runOnce throws (e.g. during session compaction), we must still - // advance the timer and call scheduleNext so heartbeats keep firing. - const errMsg = formatErrorMessage(err); - log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg }); - advanceAgentSchedule(agent, now); - continue; - } - if (res.status === "skipped" && res.reason === "requests-in-flight") { - // Do not advance the schedule — the main lane is busy and the wake - // layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling - // scheduleNext() here would register a 0 ms timer that races with - // the wake layer's 1 s retry and wins, bypassing the cooldown. - return res; - } - if (res.status !== "skipped" || res.reason !== "disabled") { - advanceAgentSchedule(agent, now); - } - if (res.status === "ran") { - ran = true; - } - } + for (const agent of state.agents.values()) { + if (isInterval && now < agent.nextDueMs) { + continue; + } - scheduleNext(); - if (ran) { - return { status: "ran", durationMs: Date.now() - startedAt }; + let res: HeartbeatRunResult; + try { + res = await runOnce({ + cfg: state.cfg, + agentId: agent.agentId, + heartbeat: agent.heartbeat, + reason, + deps: { runtime: state.runtime }, + }); + } catch (err) { + // If runOnce throws (e.g. during session compaction), we must still + // advance the timer and call scheduleNext so heartbeats keep firing. + const errMsg = formatErrorMessage(err); + log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg }); + advanceAgentSchedule(agent, now); + continue; + } + if (res.status === "skipped" && res.reason === "requests-in-flight") { + // Do not advance the schedule — the main lane is busy and the wake + // layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling + // scheduleNext() here would register a 0 ms timer that races with + // the wake layer's 1 s retry and wins, bypassing the cooldown. + return res; + } + if (res.status !== "skipped" || res.reason !== "disabled") { + advanceAgentSchedule(agent, now); + } + if (res.status === "ran") { + ran = true; + } + } + + if (ran) { + return { status: "ran", durationMs: Date.now() - startedAt }; + } + return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; + } finally { + // Always re-arm the timer regardless of how the run exits. + // This is the critical fix for #45772: without this, any unhandled + // rejection or unexpected early exit permanently kills the heartbeat + // timer because scheduleNext() is never called. + scheduleNext(); } - return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; const wakeHandler: HeartbeatWakeHandler = async (params) =>