fix(cron): tighten cold-load repair and wake fallback heuristics

This commit is contained in:
create 2026-03-20 19:34:36 +08:00
parent f5eb99df3f
commit a7217bfcf7
4 changed files with 157 additions and 12 deletions

View File

@ -179,6 +179,93 @@ describe("forceReload repairs externally changed cron schedules", () => {
expect(persistedJob?.state?.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
});
// Cold-load scenario: the persisted nextRunAtMs is already in the past when the
// store is first loaded, but the job's updatedAtMs is later than that slot.
// The loaded job is expected to carry the next "30 23 * * *" occurrence instead.
it("repairs overdue stale nextRunAtMs on first load when edit happened later", async () => {
  // Timeline (UTC, 2026-03-19): persisted slot 12:00 -> edit 12:05 -> load 12:10.
  const persistedSlotMs = Date.parse("2026-03-19T12:00:00.000Z");
  const scheduleEditMs = Date.parse("2026-03-19T12:05:00.000Z");
  const clockMs = Date.parse("2026-03-19T12:10:00.000Z");
  const targetJobId = "external-schedule-change-cold-load-overdue";

  const { storePath } = await makeStorePath();

  // Snapshot written to disk before the service state exists.
  const persistedJob: CronJob = {
    id: targetJobId,
    name: "external schedule cold load overdue repair",
    enabled: true,
    createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
    updatedAtMs: scheduleEditMs,
    schedule: { kind: "cron", expr: "30 23 * * *", tz: "UTC", staggerMs: 0 },
    sessionTarget: "main",
    wakeMode: "next-heartbeat",
    payload: { kind: "systemEvent", text: "tick" },
    state: { nextRunAtMs: persistedSlotMs },
  };
  await writeCronStoreSnapshot({ storePath, jobs: [persistedJob] });

  const serviceState = createCronServiceState({
    cronEnabled: true,
    storePath,
    log: noopLogger,
    nowMs: () => clockMs,
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
  });
  await ensureLoaded(serviceState, { skipRecompute: true });

  const repairedJob = serviceState.store?.jobs.find((candidate) => candidate.id === targetJobId);
  expect(repairedJob?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T23:30:00.000Z"));
});
// Cold-load scenario: the persisted job has a non-zero consecutiveErrors count
// and a nextRunAtMs far ahead of the every-minute schedule it now carries.
// The loaded job is still expected to end up with the repaired slot.
it("repairs cold-load stale nextRunAtMs even when consecutiveErrors is set", async () => {
  // Timeline (UTC): edit 2026-03-19 12:01 -> load 12:10; persisted slot is 2026-03-20 00:30.
  const scheduleEditMs = Date.parse("2026-03-19T12:01:00.000Z");
  const clockMs = Date.parse("2026-03-19T12:10:00.000Z");
  const persistedSlotMs = Date.parse("2026-03-20T00:30:00.000Z");
  const targetJobId = "external-schedule-change-cold-load-consecutive-errors";

  const { storePath } = await makeStorePath();

  // Snapshot written to disk before the service state exists.
  const persistedJob: CronJob = {
    id: targetJobId,
    name: "external schedule cold load with consecutiveErrors",
    enabled: true,
    createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
    updatedAtMs: scheduleEditMs,
    schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 },
    sessionTarget: "main",
    wakeMode: "next-heartbeat",
    payload: { kind: "systemEvent", text: "tick" },
    state: {
      nextRunAtMs: persistedSlotMs,
      consecutiveErrors: 2,
    },
  };
  await writeCronStoreSnapshot({ storePath, jobs: [persistedJob] });

  const serviceState = createCronServiceState({
    cronEnabled: true,
    storePath,
    log: noopLogger,
    nowMs: () => clockMs,
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
  });
  await ensureLoaded(serviceState, { skipRecompute: true });

  const repairedJob = serviceState.store?.jobs.find((candidate) => candidate.id === targetJobId);
  expect(repairedJob?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
});
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");

View File

@ -1835,6 +1835,53 @@ describe("Cron issue regressions", () => {
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
});
// An enabled "every" job whose everyMs is NaN and whose nextRunAtMs is missing:
// nextWakeAtMs is expected to return undefined rather than a fallback wake time.
it("does not arm 2s wake for malformed every schedules with non-repairable missing nextRun", () => {
  const clockMs = Date.parse("2026-03-02T12:20:00.000Z");
  const label = "missing-nextrun-malformed-every";

  const malformedJob = createIsolatedRegressionJob({
    id: label,
    name: label,
    scheduledAt: clockMs,
    // NaN interval makes the schedule malformed; the anchor is one minute before "now".
    schedule: { kind: "every", everyMs: Number.NaN, anchorMs: clockMs - 60_000 },
    payload: { kind: "agentTurn", message: "ping" },
    state: { nextRunAtMs: undefined },
  });

  const serviceState = createRunningCronServiceState({
    storePath: "/tmp/cron-missing-nextrun-malformed-every.json",
    log: noopLogger as never,
    nowMs: () => clockMs,
    jobs: [malformedJob],
  });

  const wakeAtMs = nextWakeAtMs(serviceState);
  expect(wakeAtMs).toBeUndefined();
});
// An enabled "at" job whose target time is one minute in the past, whose last run
// finished "ok" 30 seconds ago, and whose nextRunAtMs is missing: nextWakeAtMs is
// expected to return undefined rather than a fallback wake time.
it("does not arm 2s wake for exhausted one-shot jobs with missing nextRun", () => {
  const clockMs = Date.parse("2026-03-02T12:22:00.000Z");
  const oneShotAtIso = new Date(clockMs - 60_000).toISOString();
  const label = "missing-nextrun-exhausted-at";

  const exhaustedJob = createIsolatedRegressionJob({
    id: label,
    name: label,
    scheduledAt: clockMs,
    schedule: { kind: "at", at: oneShotAtIso },
    payload: { kind: "agentTurn", message: "ping" },
    state: {
      nextRunAtMs: undefined,
      lastStatus: "ok",
      lastRunAtMs: clockMs - 30_000,
    },
  });

  const serviceState = createRunningCronServiceState({
    storePath: "/tmp/cron-missing-nextrun-exhausted-at.json",
    log: noopLogger as never,
    nowMs: () => clockMs,
    jobs: [exhaustedJob],
  });

  const wakeAtMs = nextWakeAtMs(serviceState);
  expect(wakeAtMs).toBeUndefined();
});
it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => {
const nowMs = Date.now();
const everyMs = 24 * 60 * 60 * 1_000;

View File

@ -528,7 +528,8 @@ export function recomputeNextRunsForMaintenance(
export function nextWakeAtMs(state: CronServiceState) {
const jobs = state.store?.jobs ?? [];
let minEnabledNextRunAtMs: number | undefined;
let hasEnabledMissingNextRun = false;
let hasEnabledRepairableMissingNextRun = false;
const nowMs = state.deps.nowMs();
for (const job of jobs) {
if (!job.enabled) {
@ -542,14 +543,27 @@ export function nextWakeAtMs(state: CronServiceState) {
: Math.min(minEnabledNextRunAtMs, nextRunAtMs);
continue;
}
hasEnabledMissingNextRun = true;
// Only wake for missing nextRun values that can be repaired by recompute.
// Non-repairable malformed schedules (e.g. invalid every/at payloads)
// should not keep the scheduler in a perpetual 2s poll loop.
if ((job.state.scheduleErrorCount ?? 0) > 0) {
hasEnabledRepairableMissingNextRun = true;
continue;
}
try {
if (computeJobNextRunAtMs(job, nowMs) !== undefined) {
hasEnabledRepairableMissingNextRun = true;
}
} catch {
hasEnabledRepairableMissingNextRun = true;
}
}
if (!hasEnabledMissingNextRun) {
if (!hasEnabledRepairableMissingNextRun) {
return minEnabledNextRunAtMs;
}
const wakeForMissingNextRunAtMs = state.deps.nowMs() + MISSING_NEXT_RUN_WAKE_MS;
const wakeForMissingNextRunAtMs = nowMs + MISSING_NEXT_RUN_WAKE_MS;
return minEnabledNextRunAtMs === undefined
? wakeForMissingNextRunAtMs
: Math.min(minEnabledNextRunAtMs, wakeForMissingNextRunAtMs);

View File

@ -39,9 +39,6 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b
if (!job.enabled) {
return false;
}
if ((job.state.consecutiveErrors ?? 0) > 0) {
return false;
}
if (typeof job.updatedAtMs !== "number" || !Number.isFinite(job.updatedAtMs)) {
return false;
}
@ -49,13 +46,13 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b
if (typeof persistedNextRunAtMs !== "number" || !Number.isFinite(persistedNextRunAtMs)) {
return false;
}
// Cold-load repair is only for stale future schedules edited while the
// gateway was offline. Already-due timestamps should be preserved so they can
// execute on the next tick.
const normalizedUpdatedAtMs = Math.max(0, Math.floor(job.updatedAtMs));
// If a schedule edit happened after the persisted slot, the slot is stale
// even when it is already overdue at startup.
if (persistedNextRunAtMs <= nowMs) {
return false;
return normalizedUpdatedAtMs > persistedNextRunAtMs;
}
const computeBaseMs = Math.min(nowMs, Math.max(0, Math.floor(job.updatedAtMs)));
const computeBaseMs = Math.min(nowMs, normalizedUpdatedAtMs);
return computeBaseMs < persistedNextRunAtMs;
}