fix(heartbeat): move scheduleNext to finally block to prevent timer death (#45772)
The heartbeat timer permanently stops after one or two triggers because scheduleNext() is called separately on each return path in run(). If an unhandled rejection or a silent error exits the function before any of those calls is reached, the timer is never re-armed and heartbeats stop for good.

This fix wraps the body of run() in try/finally and calls scheduleNext() from the finally block, which guarantees the timer is re-armed no matter how the function exits. The three early guard returns (stopped, disabled, no agents) stay above the try/finally because they legitimately should not reschedule.

Fixes #45772.
This commit is contained in:
parent
b9e08a6839
commit
77d97f2493
@ -283,4 +283,34 @@ describe("startHeartbeatRunner", () => {
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
|
||||
it("re-arms timer after runOnce rejects with an unhandled promise rejection (#45772)", async () => {
|
||||
useFakeHeartbeatTime();
|
||||
|
||||
let callCount = 0;
|
||||
const runSpy = vi.fn().mockImplementation(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) {
|
||||
// Simulate an unhandled rejection that escapes the inner try/catch
|
||||
// (e.g. from session resolution or preflight code outside the
|
||||
// existing error handling). Before the fix, this would permanently
|
||||
// kill the heartbeat timer.
|
||||
return Promise.reject(new Error("unexpected async failure"));
|
||||
}
|
||||
return { status: "ran", durationMs: 1 };
|
||||
});
|
||||
|
||||
const runner = startDefaultRunner(runSpy);
|
||||
|
||||
// First heartbeat fires and rejects
|
||||
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Second heartbeat MUST still fire — the timer must have been re-armed
|
||||
// despite the rejection. This is the core assertion for #45772.
|
||||
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
});
|
||||
|
||||
@ -1079,80 +1079,87 @@ export function startHeartbeatRunner(opts: {
|
||||
const now = startedAt;
|
||||
let ran = false;
|
||||
|
||||
if (requestedSessionKey || requestedAgentId) {
|
||||
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
||||
const targetAgent = state.agents.get(targetAgentId);
|
||||
if (!targetAgent) {
|
||||
scheduleNext();
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
// Wrap the entire run in try/finally so scheduleNext() is ALWAYS called,
|
||||
// even if an unexpected rejection or silent error exits the function early.
|
||||
// This prevents the heartbeat timer from permanently dying (see #45772).
|
||||
try {
|
||||
if (requestedSessionKey || requestedAgentId) {
|
||||
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
||||
const targetAgent = state.agents.get(targetAgentId);
|
||||
if (!targetAgent) {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
}
|
||||
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
||||
error: errMsg,
|
||||
});
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
return { status: "failed", reason: errMsg };
|
||||
}
|
||||
scheduleNext();
|
||||
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
||||
error: errMsg,
|
||||
});
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
scheduleNext();
|
||||
return { status: "failed", reason: errMsg };
|
||||
}
|
||||
}
|
||||
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let res: HeartbeatRunResult;
|
||||
try {
|
||||
res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
} catch (err) {
|
||||
// If runOnce throws (e.g. during session compaction), we must still
|
||||
// advance the timer and call scheduleNext so heartbeats keep firing.
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||
advanceAgentSchedule(agent, now);
|
||||
continue;
|
||||
}
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// Do not advance the schedule — the main lane is busy and the wake
|
||||
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling
|
||||
// scheduleNext() here would register a 0 ms timer that races with
|
||||
// the wake layer's 1 s retry and wins, bypassing the cooldown.
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(agent, now);
|
||||
}
|
||||
if (res.status === "ran") {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
if (ran) {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
let res: HeartbeatRunResult;
|
||||
try {
|
||||
res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
} catch (err) {
|
||||
// If runOnce throws (e.g. during session compaction), we must still
|
||||
// advance the timer and call scheduleNext so heartbeats keep firing.
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||
advanceAgentSchedule(agent, now);
|
||||
continue;
|
||||
}
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// Do not advance the schedule — the main lane is busy and the wake
|
||||
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling
|
||||
// scheduleNext() here would register a 0 ms timer that races with
|
||||
// the wake layer's 1 s retry and wins, bypassing the cooldown.
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(agent, now);
|
||||
}
|
||||
if (res.status === "ran") {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (ran) {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
}
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
} finally {
|
||||
// Always re-arm the timer regardless of how the run exits.
|
||||
// This is the critical fix for #45772: without this, any unhandled
|
||||
// rejection or unexpected early exit permanently kills the heartbeat
|
||||
// timer because scheduleNext() is never called.
|
||||
scheduleNext();
|
||||
}
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
};
|
||||
|
||||
const wakeHandler: HeartbeatWakeHandler = async (params) =>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user