Merge 7f22f798691d8a2c124d403fff535f7774796946 into 598f1826d8b2bc969aace2c6459824737667218c
This commit is contained in:
commit
559a208ea4
@ -283,4 +283,40 @@ describe("startHeartbeatRunner", () => {
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
|
||||
it("re-arms timer when runOnce throws — inner catch continues, outer finally reschedules (#45772)", async () => {
|
||||
useFakeHeartbeatTime();
|
||||
|
||||
// Regression test for #45772: a runOnce() rejection must not prevent the
|
||||
// heartbeat timer from being re-armed for the next interval.
|
||||
//
|
||||
// Expected flow (assuming the interval tick takes the per-agent loop, not the
|
||||
// targeted branch — confirm against startDefaultRunner): the per-agent catch
|
||||
// logs, advances that agent's schedule and `continue`s, then the outer
|
||||
// `finally` calls scheduleNext() once the loop has finished.
|
||||
//
|
||||
// NOTE(review): requests-in-flight skips deliberately bypass that re-arm via
|
||||
// skipFinalSchedule, so this test covers only the thrown-error path.
|
||||
let callCount = 0;
|
||||
const runSpy = vi.fn().mockImplementation(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) {
|
||||
throw new Error("simulated runOnce failure");
|
||||
}
|
||||
return { status: "ran", durationMs: 1 };
|
||||
});
|
||||
|
||||
const runner = startDefaultRunner(runSpy);
|
||||
|
||||
// Advance 30 min + 1 s (presumably just past one heartbeat interval): the
|
||||
// first run is attempted and rejects with the simulated error.
|
||||
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
// A second advance must trigger another run, proving the timer was re-armed.
|
||||
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
});
|
||||
|
||||
@ -1079,80 +1079,98 @@ export function startHeartbeatRunner(opts: {
|
||||
const now = startedAt;
|
||||
let ran = false;
|
||||
|
||||
if (requestedSessionKey || requestedAgentId) {
|
||||
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
||||
const targetAgent = state.agents.get(targetAgentId);
|
||||
if (!targetAgent) {
|
||||
scheduleNext();
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
// Track whether the wake layer handles rescheduling (requests-in-flight).
|
||||
// When true, we must NOT call scheduleNext() in the finally block because
|
||||
// it would register a 0 ms timer that races with and defeats the wake
|
||||
// layer's 1 s retry cooldown.
|
||||
let skipFinalSchedule = false;
|
||||
|
||||
// Wrap the entire run in try/finally so scheduleNext() is called on every exit
|
||||
// path (early returns and unexpected throws alike) unless skipFinalSchedule is set.
|
||||
// This prevents the heartbeat timer from permanently dying (see #45772).
|
||||
try {
|
||||
if (requestedSessionKey || requestedAgentId) {
|
||||
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
||||
const targetAgent = state.agents.get(targetAgentId);
|
||||
if (!targetAgent) {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
}
|
||||
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
||||
error: errMsg,
|
||||
});
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
return { status: "failed", reason: errMsg };
|
||||
}
|
||||
}
|
||||
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let res: HeartbeatRunResult;
|
||||
try {
|
||||
res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
} catch (err) {
|
||||
// If runOnce throws (e.g. during session compaction), advance the
|
||||
// agent's schedule so the next interval is computed correctly.
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||
advanceAgentSchedule(agent, now);
|
||||
continue;
|
||||
}
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// Do not advance the schedule — the main lane is busy and the wake
|
||||
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). We must also
|
||||
// suppress scheduleNext() in the finally block because it would
|
||||
// register a 0 ms timer that races with the wake layer's retry
|
||||
// cooldown and defeats the backoff.
|
||||
skipFinalSchedule = true;
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(agent, now);
|
||||
}
|
||||
if (res.status === "ran") {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (ran) {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
}
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
} finally {
|
||||
// Always re-arm the timer regardless of how the run exits — unless the
|
||||
// wake layer is handling rescheduling (requests-in-flight path).
|
||||
// This is the critical fix for #45772: without this, any unhandled
|
||||
// rejection or unexpected early exit permanently kills the heartbeat
|
||||
// timer because scheduleNext() is never called.
|
||||
if (!skipFinalSchedule) {
|
||||
scheduleNext();
|
||||
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
||||
error: errMsg,
|
||||
});
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
scheduleNext();
|
||||
return { status: "failed", reason: errMsg };
|
||||
}
|
||||
}
|
||||
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let res: HeartbeatRunResult;
|
||||
try {
|
||||
res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
} catch (err) {
|
||||
// If runOnce throws (e.g. during session compaction), we must still
|
||||
// advance the timer and call scheduleNext so heartbeats keep firing.
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||
advanceAgentSchedule(agent, now);
|
||||
continue;
|
||||
}
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// Do not advance the schedule — the main lane is busy and the wake
|
||||
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling
|
||||
// scheduleNext() here would register a 0 ms timer that races with
|
||||
// the wake layer's 1 s retry and wins, bypassing the cooldown.
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(agent, now);
|
||||
}
|
||||
if (res.status === "ran") {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
if (ran) {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
}
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
};
|
||||
|
||||
const wakeHandler: HeartbeatWakeHandler = async (params) =>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user