fix(cron): avoid double-counting external reload schedule errors
This commit is contained in:
parent
d1623edb18
commit
ce43e39dcb
@ -1,6 +1,7 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
|
||||
import { recomputeNextRuns, recomputeNextRunsForMaintenance } from "./service/jobs.js";
|
||||
import { createCronServiceState } from "./service/state.js";
|
||||
import { ensureLoaded } from "./service/store.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
@ -142,4 +143,108 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
||||
expect(persistedJob?.state?.lastError).toMatch(/^schedule error:/);
|
||||
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
|
||||
// Regression test: a schedule error recorded while force-reloading an externally
// edited store must not be counted again by the first recomputeNextRuns() call
// that follows, but later recomputes must keep counting.
it("does not double-count a reload schedule error during the immediate full recompute", async () => {
  const store = await makeStorePath();
  const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
  const jobId = "external-invalid-schedule-full-recompute";

  // Builds the same job every time; only the cron expression varies so the
  // second snapshot looks like an external edit of the schedule.
  const createJob = (expr: string): CronJob => ({
    id: jobId,
    name: "external invalid schedule full recompute",
    enabled: true,
    createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
    updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
    schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
    sessionTarget: "main",
    wakeMode: "next-heartbeat",
    payload: { kind: "systemEvent", text: "tick" },
    state: {
      nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
    },
  });

  // Seed the store with a valid schedule and load it into memory.
  await writeCronStoreSnapshot({
    storePath: store.storePath,
    jobs: [createJob("30 8 * * *")],
  });

  const state = createCronServiceState({
    cronEnabled: true,
    storePath: store.storePath,
    log: noopLogger,
    nowMs: () => nowMs,
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
  });

  await ensureLoaded(state, { skipRecompute: true });

  // Simulate an external writer replacing the schedule with an invalid expression.
  await writeCronStoreSnapshot({
    storePath: store.storePath,
    jobs: [createJob("not a valid cron")],
  });

  // The forced reload itself records the first schedule error.
  await ensureLoaded(state, { forceReload: true, skipRecompute: true });
  expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);

  // The recompute immediately after the reload must not count that error again.
  recomputeNextRuns(state);
  expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);

  // The skip is one-shot: the next recompute counts the still-invalid schedule.
  recomputeNextRuns(state);
  expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2);
});
|
||||
|
||||
// Regression test: same scenario as a forced reload of an externally broken
// schedule, but exercised through recomputeNextRunsForMaintenance(). The error
// recorded by the reload must not be counted again by the first maintenance
// recompute, while later maintenance recomputes must keep counting.
it("does not double-count a reload schedule error during immediate maintenance recompute", async () => {
  const store = await makeStorePath();
  const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
  const jobId = "external-invalid-schedule-maintenance";

  // Builds the same job every time; only the cron expression varies so the
  // second snapshot looks like an external edit of the schedule.
  const createJob = (expr: string): CronJob => ({
    id: jobId,
    name: "external invalid schedule maintenance",
    enabled: true,
    createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
    updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
    schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
    sessionTarget: "main",
    wakeMode: "next-heartbeat",
    payload: { kind: "systemEvent", text: "tick" },
    state: {
      nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
    },
  });

  // Seed the store with a valid schedule and load it into memory.
  await writeCronStoreSnapshot({
    storePath: store.storePath,
    jobs: [createJob("30 8 * * *")],
  });

  const state = createCronServiceState({
    cronEnabled: true,
    storePath: store.storePath,
    log: noopLogger,
    nowMs: () => nowMs,
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
  });

  await ensureLoaded(state, { skipRecompute: true });

  // Simulate an external writer replacing the schedule with an invalid expression.
  await writeCronStoreSnapshot({
    storePath: store.storePath,
    jobs: [createJob("not a valid cron")],
  });

  // The forced reload itself records the first schedule error.
  await ensureLoaded(state, { forceReload: true, skipRecompute: true });
  expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);

  // The maintenance recompute immediately after the reload must not count it again.
  recomputeNextRunsForMaintenance(state);
  expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);

  // The skip is one-shot: the next maintenance recompute counts the error.
  recomputeNextRunsForMaintenance(state);
  expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2);
});
|
||||
});
|
||||
|
||||
@ -413,7 +413,19 @@ function walkSchedulableJobs(
|
||||
return changed;
|
||||
}
|
||||
|
||||
/**
 * Consumes the one-shot "skip next recompute" marker for a job.
 *
 * Looks up `jobId` in `state.skipNextReloadRepairRecomputeJobIds` and removes
 * it when present, so the marker applies to a single recompute only.
 *
 * @param state - Cron service state holding the optional set of marked job ids.
 * @param jobId - Id of the job about to be recomputed.
 * @returns `true` if the job was marked (the marker is now cleared); `false`
 *   if the set is missing or does not contain the id.
 */
function consumeSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean {
  // Set.prototype.delete reports whether the entry existed, so one call both
  // tests for the marker and clears it; a missing set means "not marked".
  return state.skipNextReloadRepairRecomputeJobIds?.delete(jobId) ?? false;
}
|
||||
|
||||
function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob; nowMs: number }) {
|
||||
if (consumeSkipNextReloadRepairRecompute(params.state, params.job.id)) {
|
||||
return false;
|
||||
}
|
||||
let changed = false;
|
||||
try {
|
||||
const newNext = computeJobNextRunAtMs(params.job, params.nowMs);
|
||||
|
||||
@ -127,6 +127,7 @@ export type CronServiceState = {
|
||||
warnedDisabled: boolean;
|
||||
storeLoadedAtMs: number | null;
|
||||
storeFileMtimeMs: number | null;
|
||||
skipNextReloadRepairRecomputeJobIds?: Set<string>;
|
||||
};
|
||||
|
||||
export function createCronServiceState(deps: CronServiceDeps): CronServiceState {
|
||||
@ -139,6 +140,7 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState
|
||||
warnedDisabled: false,
|
||||
storeLoadedAtMs: null,
|
||||
storeFileMtimeMs: null,
|
||||
skipNextReloadRepairRecomputeJobIds: new Set(),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@ -30,11 +30,17 @@ function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
 * Returns the set stored in `state.skipNextReloadRepairRecomputeJobIds`,
 * creating and storing an empty set first when the field is unset.
 *
 * The field is optional on `CronServiceState`, so this accessor gives callers
 * a set they can mutate without a null check.
 *
 * @param state - Cron service state that owns the set.
 * @returns The same `Set` instance that is stored on `state`.
 */
function getSkipNextReloadRepairRecomputeJobIds(state: CronServiceState): Set<string> {
  // `??=` assigns only when the field is null/undefined, so an existing set
  // (and its contents) is always reused.
  return (state.skipNextReloadRepairRecomputeJobIds ??= new Set<string>());
}
|
||||
|
||||
function repairNextRunsAfterExternalReload(params: {
|
||||
state: CronServiceState;
|
||||
previousJobs: CronJob[] | undefined;
|
||||
}): boolean {
|
||||
const { state, previousJobs } = params;
|
||||
const skipRecomputeJobIds = getSkipNextReloadRepairRecomputeJobIds(state);
|
||||
skipRecomputeJobIds.clear();
|
||||
if (!state.store || !previousJobs?.length) {
|
||||
return false;
|
||||
}
|
||||
@ -55,6 +61,7 @@ function repairNextRunsAfterExternalReload(params: {
|
||||
continue;
|
||||
}
|
||||
|
||||
skipRecomputeJobIds.delete(job.id);
|
||||
let nextRunAtMs: number | undefined;
|
||||
try {
|
||||
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, now) : undefined;
|
||||
@ -66,6 +73,7 @@ function repairNextRunsAfterExternalReload(params: {
|
||||
if (recordScheduleComputeError({ state, job, err })) {
|
||||
changed = true;
|
||||
}
|
||||
skipRecomputeJobIds.add(job.id);
|
||||
continue;
|
||||
}
|
||||
if (job.state.nextRunAtMs !== nextRunAtMs) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user