fix(cron): preserve reload repair error state

This commit is contained in:
create 2026-03-21 02:09:07 +08:00
parent 83844adec2
commit 3cb6564ac2
5 changed files with 127 additions and 38 deletions

View File

@ -27,7 +27,7 @@ import {
nextWakeAtMs,
recomputeNextRunsForMaintenance,
} from "./service/jobs.js";
import { enqueueRun, run } from "./service/ops.js";
import { enqueueRun, list, run, status } from "./service/ops.js";
import { createCronServiceState, type CronEvent } from "./service/state.js";
import {
DEFAULT_JOB_TIMEOUT_MS,
@ -1857,6 +1857,40 @@ describe("Cron issue regressions", () => {
expect(nextWakeAtMs(state)).toBeUndefined();
});
it("does not consume reload skip markers during read-only status/list maintenance", async () => {
const nowMs = Date.parse("2026-03-02T12:25:00.000Z");
const job = createIsolatedRegressionJob({
id: "read-only-skip-marker",
name: "read-only-skip-marker",
scheduledAt: nowMs,
schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-read-only-skip-marker.json",
log: noopLogger as never,
nowMs: () => nowMs,
jobs: [job],
});
state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]);
const currentStatus = await status(state);
expect(currentStatus.nextWakeAtMs).toBe(nowMs + 2_000);
expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(true);
const jobs = await list(state, { includeDisabled: true });
expect(jobs).toHaveLength(1);
expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(true);
recomputeNextRunsForMaintenance(state);
expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(false);
});
it("does not arm 2s wake for exhausted one-shot jobs with missing nextRun", () => {
const nowMs = Date.parse("2026-03-02T12:22:00.000Z");
const atMs = nowMs - 60_000;

View File

@ -206,4 +206,44 @@ describe("cron schedule error isolation", () => {
expect(badJob.state.lastError).not.toContain("Cannot read properties of undefined");
expect(badJob.state.scheduleErrorCount).toBe(1);
});
it("keeps malformed every schedules on the schedule-error path", () => {
const badJob = createJob({
id: "bad-every",
name: "Bad Every",
schedule: { kind: "every", everyMs: Number.NaN },
state: {
nextRunAtMs: undefined,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createMockState([badJob]);
recomputeNextRuns(state);
expect(badJob.state.nextRunAtMs).toBeUndefined();
expect(badJob.state.scheduleErrorCount).toBe(2);
expect(badJob.state.lastError).toContain("invalid every schedule");
});
it("keeps malformed at schedules on the schedule-error path", () => {
const badJob = createJob({
id: "bad-at",
name: "Bad At",
schedule: { kind: "at", at: "not-a-timestamp" },
state: {
nextRunAtMs: undefined,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createMockState([badJob]);
recomputeNextRuns(state);
expect(badJob.state.nextRunAtMs).toBeUndefined();
expect(badJob.state.scheduleErrorCount).toBe(2);
expect(badJob.state.lastError).toContain("invalid at schedule");
});
});

View File

@ -308,6 +308,22 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
return isFiniteTimestamp(next) ? next : undefined;
}
export function shouldTreatUndefinedNextRunAsScheduleError(job: CronJob): boolean {
if (!job.enabled) {
return false;
}
if (job.schedule.kind === "every") {
return coerceFiniteScheduleNumber(job.schedule.everyMs) === undefined;
}
if (job.schedule.kind === "at") {
return parseAbsoluteTimeMs(job.schedule.at) === null;
}
if (job.schedule.kind === "cron") {
return job.schedule.expr.trim().length === 0;
}
return false;
}
export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined {
if (!job.enabled || job.schedule.kind !== "cron") {
return undefined;
@ -442,13 +458,35 @@ export function consumeSkipNextReloadRepairRecompute(
return true;
}
function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob; nowMs: number }) {
if (consumeSkipNextReloadRepairRecompute(params.state, params.job.id)) {
function recomputeJobNextRunAtMs(params: {
state: CronServiceState;
job: CronJob;
nowMs: number;
consumeReloadRepairSkip?: boolean;
}) {
const consumeReloadRepairSkip = params.consumeReloadRepairSkip ?? true;
if (
consumeReloadRepairSkip
? consumeSkipNextReloadRepairRecompute(params.state, params.job.id)
: hasSkipNextReloadRepairRecompute(params.state, params.job.id)
) {
return false;
}
let changed = false;
try {
const newNext = computeJobNextRunAtMs(params.job, params.nowMs);
if (newNext === undefined && shouldTreatUndefinedNextRunAsScheduleError(params.job)) {
const err =
params.job.schedule.kind === "every"
? new Error("invalid every schedule: everyMs must be a finite number")
: params.job.schedule.kind === "at"
? new Error("invalid at schedule: at must be a valid absolute timestamp")
: new Error("invalid cron schedule: expr is required");
if (recordScheduleComputeError({ state: params.state, job: params.job, err })) {
changed = true;
}
return changed;
}
if (params.job.state.nextRunAtMs !== newNext) {
params.job.state.nextRunAtMs = newNext;
changed = true;
@ -492,16 +530,17 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
*/
export function recomputeNextRunsForMaintenance(
state: CronServiceState,
opts?: { recomputeExpired?: boolean; nowMs?: number },
opts?: { recomputeExpired?: boolean; nowMs?: number; consumeReloadRepairSkip?: boolean },
): boolean {
const recomputeExpired = opts?.recomputeExpired ?? false;
const consumeReloadRepairSkip = opts?.consumeReloadRepairSkip ?? true;
return walkSchedulableJobs(
state,
({ job, nowMs: now }) => {
let changed = false;
if (!isFiniteTimestamp(job.state.nextRunAtMs)) {
// Missing or invalid nextRunAtMs is always repaired.
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
if (recomputeJobNextRunAtMs({ state, job, nowMs: now, consumeReloadRepairSkip })) {
changed = true;
}
} else if (
@ -514,7 +553,7 @@ export function recomputeNextRunsForMaintenance(
const lastRun = job.state.lastRunAtMs;
const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs;
if (alreadyExecutedSlot) {
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
if (recomputeJobNextRunAtMs({ state, job, nowMs: now, consumeReloadRepairSkip })) {
changed = true;
}
}

View File

@ -115,7 +115,7 @@ async function ensureLoadedForRead(state: CronServiceState) {
}
// Use the maintenance-only version so that read-only operations never
// advance a past-due nextRunAtMs without executing the job (#16156).
const changed = recomputeNextRunsForMaintenance(state);
const changed = recomputeNextRunsForMaintenance(state, { consumeReloadRepairSkip: false });
if (changed) {
await persist(state);
}
@ -413,7 +413,7 @@ async function inspectManualRunPreflight(
// Normalize job tick state (clears stale runningAtMs markers) before
// checking if already running, so a stale marker from a crashed Phase-1
// persist does not block manual triggers for up to STUCK_RUN_MS (#17554).
recomputeNextRunsForMaintenance(state);
recomputeNextRunsForMaintenance(state, { consumeReloadRepairSkip: false });
const job = findJobOrThrow(state, id);
if (typeof job.state.runningAtMs === "number") {
return { ok: true, ran: false, reason: "already-running" as const };

View File

@ -1,10 +1,13 @@
import fs from "node:fs";
import { parseAbsoluteTimeMs } from "../parse.js";
import { coerceFiniteScheduleNumber } from "../schedule.js";
import { normalizeStoredCronJobs } from "../store-migration.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { computeJobNextRunAtMs, recordScheduleComputeError, recomputeNextRuns } from "./jobs.js";
import {
computeJobNextRunAtMs,
recordScheduleComputeError,
recomputeNextRuns,
shouldTreatUndefinedNextRunAsScheduleError,
} from "./jobs.js";
import { schedulesEqual } from "./schedule-equality.js";
import type { CronServiceState } from "./state.js";
@ -77,33 +80,6 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b
return computeBaseMs < persistedNextRunAtMs;
}
function parseAtScheduleMs(schedule: Extract<CronJob["schedule"], { kind: "at" }>): number | null {
const legacy = schedule as { at?: string; atMs?: number | string };
if (typeof legacy.atMs === "number" && Number.isFinite(legacy.atMs) && legacy.atMs > 0) {
return legacy.atMs;
}
if (typeof legacy.atMs === "string") {
return parseAbsoluteTimeMs(legacy.atMs);
}
if (typeof legacy.at === "string") {
return parseAbsoluteTimeMs(legacy.at);
}
return null;
}
function shouldTreatUndefinedNextRunAsScheduleError(job: CronJob): boolean {
if (!job.enabled) {
return false;
}
if (job.schedule.kind === "every") {
return coerceFiniteScheduleNumber(job.schedule.everyMs) === undefined;
}
if (job.schedule.kind === "at") {
return parseAtScheduleMs(job.schedule) === null;
}
return false;
}
function repairNextRunsAfterExternalReload(params: {
state: CronServiceState;
previousJobs: CronJob[] | undefined;