diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index c8078672b97..3a1f291943c 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -22,7 +22,11 @@ import { createNoopLogger, createRunningCronServiceState, } from "./service.test-harness.js"; -import { computeJobNextRunAtMs, recomputeNextRunsForMaintenance } from "./service/jobs.js"; +import { + computeJobNextRunAtMs, + nextWakeAtMs, + recomputeNextRunsForMaintenance, +} from "./service/jobs.js"; import { enqueueRun, run } from "./service/ops.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; import { @@ -1780,11 +1784,13 @@ describe("Cron issue regressions", () => { }); expect(job.state.scheduleErrorCount).toBe(1); - expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); recomputeNextRunsForMaintenance(state); expect(job.state.scheduleErrorCount).toBe(1); expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); }); it("keeps a future wake when apply skips immediate recompute after reload schedule error", () => { @@ -1819,12 +1825,14 @@ describe("Cron issue regressions", () => { }); expect(job.state.scheduleErrorCount).toBe(1); - expect(job.state.nextRunAtMs).toBe(endedAt + 30_000); - expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + expect(job.state.nextRunAtMs).toBeUndefined(); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); recomputeNextRunsForMaintenance(state); expect(job.state.scheduleErrorCount).toBe(1); expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); }); it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => { diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 184f136d98c..6ba919bcd1c 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -33,6 +33,7 @@ import { import type { CronServiceState } from "./state.js"; const STUCK_RUN_MS = 2 * 60 * 60 * 1000; +const MISSING_NEXT_RUN_WAKE_MS = 2_000; const STAGGER_OFFSET_CACHE_MAX = 4096; const staggerOffsetCache = new Map(); @@ -526,18 +527,32 @@ export function recomputeNextRunsForMaintenance( export function nextWakeAtMs(state: CronServiceState) { const jobs = state.store?.jobs ?? []; - const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs)); - if (enabled.length === 0) { - return undefined; + let minEnabledNextRunAtMs: number | undefined; + let hasEnabledMissingNextRun = false; + + for (const job of jobs) { + if (!job.enabled) { + continue; + } + const nextRunAtMs = job.state.nextRunAtMs; + if (isFiniteTimestamp(nextRunAtMs)) { + minEnabledNextRunAtMs = + minEnabledNextRunAtMs === undefined + ? nextRunAtMs + : Math.min(minEnabledNextRunAtMs, nextRunAtMs); + continue; + } + hasEnabledMissingNextRun = true; } - const first = enabled[0]?.state.nextRunAtMs; - if (!isFiniteTimestamp(first)) { - return undefined; + + if (!hasEnabledMissingNextRun) { + return minEnabledNextRunAtMs; } - return enabled.reduce((min, j) => { - const next = j.state.nextRunAtMs; - return isFiniteTimestamp(next) ? Math.min(min, next) : min; - }, first); + + const wakeForMissingNextRunAtMs = state.deps.nowMs() + MISSING_NEXT_RUN_WAKE_MS; + return minEnabledNextRunAtMs === undefined + ? wakeForMissingNextRunAtMs + : Math.min(minEnabledNextRunAtMs, wakeForMissingNextRunAtMs); } export function createJob(state: CronServiceState, input: CronJobCreate): CronJob { diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 315be8a4fc8..66662de5616 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -15,7 +15,7 @@ import type { import { computeJobPreviousRunAtMs, computeJobNextRunAtMs, - consumeSkipNextReloadRepairRecompute, + hasSkipNextReloadRepairRecompute, nextWakeAtMs, removeJobById, recomputeNextRunsForMaintenance, @@ -370,7 +370,7 @@ export function applyJobResult( const shouldDelete = job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok"; - const skipImmediateScheduleRecompute = consumeSkipNextReloadRepairRecompute(state, job.id); + const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id); if (!shouldDelete) { if (job.schedule.kind === "at") { @@ -445,10 +445,6 @@ export function applyJobResult( }, "cron: applying error backoff", ); - } else { - // Keep a future wake so we don't stall when the one-shot skip marker - // defers immediate schedule recompute after reload repair. - job.state.nextRunAtMs = result.endedAt + backoff; } } else if (job.enabled) { if (!skipImmediateScheduleRecompute) { @@ -475,10 +471,6 @@ export function applyJobResult( } else { job.state.nextRunAtMs = naturalNext; } - } else if (job.state.nextRunAtMs === undefined) { - // Keep timer progress when immediate recompute is deferred by the - // reload-repair skip marker. - job.state.nextRunAtMs = result.endedAt + MIN_REFIRE_GAP_MS; } } else { job.state.nextRunAtMs = undefined;