fix(cron): avoid synthetic reruns after reload-repair defer
This commit is contained in:
parent
576d0dbcd9
commit
4aabf034eb
@ -22,7 +22,11 @@ import {
|
||||
createNoopLogger,
|
||||
createRunningCronServiceState,
|
||||
} from "./service.test-harness.js";
|
||||
import { computeJobNextRunAtMs, recomputeNextRunsForMaintenance } from "./service/jobs.js";
|
||||
import {
|
||||
computeJobNextRunAtMs,
|
||||
nextWakeAtMs,
|
||||
recomputeNextRunsForMaintenance,
|
||||
} from "./service/jobs.js";
|
||||
import { enqueueRun, run } from "./service/ops.js";
|
||||
import { createCronServiceState, type CronEvent } from "./service/state.js";
|
||||
import {
|
||||
@ -1780,11 +1784,13 @@ describe("Cron issue regressions", () => {
|
||||
});
|
||||
|
||||
expect(job.state.scheduleErrorCount).toBe(1);
|
||||
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
|
||||
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true);
|
||||
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
|
||||
|
||||
recomputeNextRunsForMaintenance(state);
|
||||
expect(job.state.scheduleErrorCount).toBe(1);
|
||||
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
|
||||
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
|
||||
});
|
||||
|
||||
it("keeps a future wake when apply skips immediate recompute after reload schedule error", () => {
|
||||
@ -1819,12 +1825,14 @@ describe("Cron issue regressions", () => {
|
||||
});
|
||||
|
||||
expect(job.state.scheduleErrorCount).toBe(1);
|
||||
expect(job.state.nextRunAtMs).toBe(endedAt + 30_000);
|
||||
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
|
||||
expect(job.state.nextRunAtMs).toBeUndefined();
|
||||
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true);
|
||||
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
|
||||
|
||||
recomputeNextRunsForMaintenance(state);
|
||||
expect(job.state.scheduleErrorCount).toBe(1);
|
||||
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
|
||||
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
|
||||
});
|
||||
|
||||
it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => {
|
||||
|
||||
@ -33,6 +33,7 @@ import {
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
|
||||
const MISSING_NEXT_RUN_WAKE_MS = 2_000;
|
||||
const STAGGER_OFFSET_CACHE_MAX = 4096;
|
||||
const staggerOffsetCache = new Map<string, number>();
|
||||
|
||||
@ -526,18 +527,32 @@ export function recomputeNextRunsForMaintenance(
|
||||
|
||||
export function nextWakeAtMs(state: CronServiceState) {
|
||||
const jobs = state.store?.jobs ?? [];
|
||||
const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs));
|
||||
if (enabled.length === 0) {
|
||||
return undefined;
|
||||
let minEnabledNextRunAtMs: number | undefined;
|
||||
let hasEnabledMissingNextRun = false;
|
||||
|
||||
for (const job of jobs) {
|
||||
if (!job.enabled) {
|
||||
continue;
|
||||
}
|
||||
const nextRunAtMs = job.state.nextRunAtMs;
|
||||
if (isFiniteTimestamp(nextRunAtMs)) {
|
||||
minEnabledNextRunAtMs =
|
||||
minEnabledNextRunAtMs === undefined
|
||||
? nextRunAtMs
|
||||
: Math.min(minEnabledNextRunAtMs, nextRunAtMs);
|
||||
continue;
|
||||
}
|
||||
hasEnabledMissingNextRun = true;
|
||||
}
|
||||
const first = enabled[0]?.state.nextRunAtMs;
|
||||
if (!isFiniteTimestamp(first)) {
|
||||
return undefined;
|
||||
|
||||
if (!hasEnabledMissingNextRun) {
|
||||
return minEnabledNextRunAtMs;
|
||||
}
|
||||
return enabled.reduce((min, j) => {
|
||||
const next = j.state.nextRunAtMs;
|
||||
return isFiniteTimestamp(next) ? Math.min(min, next) : min;
|
||||
}, first);
|
||||
|
||||
const wakeForMissingNextRunAtMs = state.deps.nowMs() + MISSING_NEXT_RUN_WAKE_MS;
|
||||
return minEnabledNextRunAtMs === undefined
|
||||
? wakeForMissingNextRunAtMs
|
||||
: Math.min(minEnabledNextRunAtMs, wakeForMissingNextRunAtMs);
|
||||
}
|
||||
|
||||
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {
|
||||
|
||||
@ -15,7 +15,7 @@ import type {
|
||||
import {
|
||||
computeJobPreviousRunAtMs,
|
||||
computeJobNextRunAtMs,
|
||||
consumeSkipNextReloadRepairRecompute,
|
||||
hasSkipNextReloadRepairRecompute,
|
||||
nextWakeAtMs,
|
||||
removeJobById,
|
||||
recomputeNextRunsForMaintenance,
|
||||
@ -370,7 +370,7 @@ export function applyJobResult(
|
||||
|
||||
const shouldDelete =
|
||||
job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok";
|
||||
const skipImmediateScheduleRecompute = consumeSkipNextReloadRepairRecompute(state, job.id);
|
||||
const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id);
|
||||
|
||||
if (!shouldDelete) {
|
||||
if (job.schedule.kind === "at") {
|
||||
@ -445,10 +445,6 @@ export function applyJobResult(
|
||||
},
|
||||
"cron: applying error backoff",
|
||||
);
|
||||
} else {
|
||||
// Keep a future wake so we don't stall when the one-shot skip marker
|
||||
// defers immediate schedule recompute after reload repair.
|
||||
job.state.nextRunAtMs = result.endedAt + backoff;
|
||||
}
|
||||
} else if (job.enabled) {
|
||||
if (!skipImmediateScheduleRecompute) {
|
||||
@ -475,10 +471,6 @@ export function applyJobResult(
|
||||
} else {
|
||||
job.state.nextRunAtMs = naturalNext;
|
||||
}
|
||||
} else if (job.state.nextRunAtMs === undefined) {
|
||||
// Keep timer progress when immediate recompute is deferred by the
|
||||
// reload-repair skip marker.
|
||||
job.state.nextRunAtMs = result.endedAt + MIN_REFIRE_GAP_MS;
|
||||
}
|
||||
} else {
|
||||
job.state.nextRunAtMs = undefined;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user