diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index 10313dc33ad..9df23ab4fad 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -283,4 +283,40 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + + it("re-arms timer when runOnce throws — inner catch continues, outer finally reschedules (#45772)", async () => { + useFakeHeartbeatTime(); + + // The inner per-agent try/catch swallows runOnce errors via `continue`. + // After the loop completes, the outer `finally` calls scheduleNext(). + // This ensures the timer is re-armed even after an error — the fix for + // #45772, where an unexpected rejection or early exit could leave + // scheduleNext() unreachable and silently stop the heartbeat. + // + // Note: the inner catch IS what handles the thrown error here. The outer + // finally still runs on loop completion, re-arming the timer. The one + // deliberate exception is the requests-in-flight skip, which sets + // skipFinalSchedule so the wake layer's retry handles rescheduling. + let callCount = 0; + const runSpy = vi.fn().mockImplementation(async () => { + callCount++; + if (callCount === 1) { + throw new Error("simulated runOnce failure"); + } + return { status: "ran", durationMs: 1 }; + }); + + const runner = startDefaultRunner(runSpy); + + // First heartbeat fires and throws — inner catch handles it, outer + // finally re-arms the timer. + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(1); + + // Second heartbeat MUST still fire — the timer must have been re-armed. 
+ await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(2); + + runner.stop(); + }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 5e6ddcf07cf..ac9f3db19b5 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -1079,80 +1079,98 @@ export function startHeartbeatRunner(opts: { const now = startedAt; let ran = false; - if (requestedSessionKey || requestedAgentId) { - const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey); - const targetAgent = state.agents.get(targetAgentId); - if (!targetAgent) { - scheduleNext(); - return { status: "skipped", reason: "disabled" }; - } - try { - const res = await runOnce({ - cfg: state.cfg, - agentId: targetAgent.agentId, - heartbeat: targetAgent.heartbeat, - reason, - sessionKey: requestedSessionKey, - deps: { runtime: state.runtime }, - }); - if (res.status !== "skipped" || res.reason !== "disabled") { - advanceAgentSchedule(targetAgent, now); + // Track whether the wake layer handles rescheduling (requests-in-flight). + // When true, we must NOT call scheduleNext() in the finally block because + // it would register a 0 ms timer that races with and defeats the wake + // layer's 1 s retry cooldown. + let skipFinalSchedule = false; + + // Wrap the entire run in try/finally so scheduleNext() is called on every + // exit path — including unexpected rejections — unless skipFinalSchedule is set. + // This prevents the heartbeat timer from permanently dying (see #45772). + try { + if (requestedSessionKey || requestedAgentId) { + const targetAgentId = requestedAgentId ?? 
resolveAgentIdFromSessionKey(requestedSessionKey); + const targetAgent = state.agents.get(targetAgentId); + if (!targetAgent) { + return { status: "skipped", reason: "disabled" }; } + try { + const res = await runOnce({ + cfg: state.cfg, + agentId: targetAgent.agentId, + heartbeat: targetAgent.heartbeat, + reason, + sessionKey: requestedSessionKey, + deps: { runtime: state.runtime }, + }); + if (res.status !== "skipped" || res.reason !== "disabled") { + advanceAgentSchedule(targetAgent, now); + } + return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res; + } catch (err) { + const errMsg = formatErrorMessage(err); + log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, { + error: errMsg, + }); + advanceAgentSchedule(targetAgent, now); + return { status: "failed", reason: errMsg }; + } + } + + for (const agent of state.agents.values()) { + if (isInterval && now < agent.nextDueMs) { + continue; + } + + let res: HeartbeatRunResult; + try { + res = await runOnce({ + cfg: state.cfg, + agentId: agent.agentId, + heartbeat: agent.heartbeat, + reason, + deps: { runtime: state.runtime }, + }); + } catch (err) { + // If runOnce throws (e.g. during session compaction), advance the + // agent's schedule so the next interval is computed correctly. + const errMsg = formatErrorMessage(err); + log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg }); + advanceAgentSchedule(agent, now); + continue; + } + if (res.status === "skipped" && res.reason === "requests-in-flight") { + // Do not advance the schedule — the main lane is busy and the wake + // layer will retry shortly (DEFAULT_RETRY_MS = 1 s). We must also + // suppress scheduleNext() in the finally block because it would + // register a 0 ms timer that races with the wake layer's retry + // cooldown and defeats the backoff. 
+ skipFinalSchedule = true; + return res; + } + if (res.status !== "skipped" || res.reason !== "disabled") { + advanceAgentSchedule(agent, now); + } + if (res.status === "ran") { + ran = true; + } + } + + if (ran) { + return { status: "ran", durationMs: Date.now() - startedAt }; + } + return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; + } finally { + // Always re-arm the timer regardless of how the run exits — unless the + // wake layer is handling rescheduling (requests-in-flight path). + // This is the critical fix for #45772: without this, any unhandled + // rejection or unexpected early exit permanently kills the heartbeat + // timer because scheduleNext() is never called. + if (!skipFinalSchedule) { scheduleNext(); - return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res; - } catch (err) { - const errMsg = formatErrorMessage(err); - log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, { - error: errMsg, - }); - advanceAgentSchedule(targetAgent, now); - scheduleNext(); - return { status: "failed", reason: errMsg }; } } - - for (const agent of state.agents.values()) { - if (isInterval && now < agent.nextDueMs) { - continue; - } - - let res: HeartbeatRunResult; - try { - res = await runOnce({ - cfg: state.cfg, - agentId: agent.agentId, - heartbeat: agent.heartbeat, - reason, - deps: { runtime: state.runtime }, - }); - } catch (err) { - // If runOnce throws (e.g. during session compaction), we must still - // advance the timer and call scheduleNext so heartbeats keep firing. - const errMsg = formatErrorMessage(err); - log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg }); - advanceAgentSchedule(agent, now); - continue; - } - if (res.status === "skipped" && res.reason === "requests-in-flight") { - // Do not advance the schedule — the main lane is busy and the wake - // layer will retry shortly (DEFAULT_RETRY_MS = 1 s). 
Calling - // scheduleNext() here would register a 0 ms timer that races with - // the wake layer's 1 s retry and wins, bypassing the cooldown. - return res; - } - if (res.status !== "skipped" || res.reason !== "disabled") { - advanceAgentSchedule(agent, now); - } - if (res.status === "ran") { - ran = true; - } - } - - scheduleNext(); - if (ran) { - return { status: "ran", durationMs: Date.now() - startedAt }; - } - return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; const wakeHandler: HeartbeatWakeHandler = async (params) =>