fix(cron): recompute external reload repairs from edit time
This commit is contained in:
parent
44e912eccd
commit
576d0dbcd9
@ -79,6 +79,57 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
expect(persistedJob?.state?.nextRunAtMs).toBe(correctedNextRunAtMs);
|
expect(persistedJob?.state?.nextRunAtMs).toBe(correctedNextRunAtMs);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("recomputes from updatedAtMs so delayed reload keeps newly earlier slots due", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
||||||
|
const initialUpdatedAtMs = Date.parse("2026-03-19T12:00:00.000Z");
|
||||||
|
const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z");
|
||||||
|
const jobId = "external-schedule-change-delayed-observe";
|
||||||
|
|
||||||
|
const createJob = (params: { expr: string; updatedAtMs: number }): CronJob => ({
|
||||||
|
id: jobId,
|
||||||
|
name: "external schedule delayed observe",
|
||||||
|
enabled: true,
|
||||||
|
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
||||||
|
updatedAtMs: params.updatedAtMs,
|
||||||
|
schedule: { kind: "cron", expr: params.expr, tz: "UTC", staggerMs: 0 },
|
||||||
|
sessionTarget: "main",
|
||||||
|
wakeMode: "next-heartbeat",
|
||||||
|
payload: { kind: "systemEvent", text: "tick" },
|
||||||
|
state: {
|
||||||
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await writeCronStoreSnapshot({
|
||||||
|
storePath: store.storePath,
|
||||||
|
jobs: [createJob({ expr: "30 23 * * *", updatedAtMs: initialUpdatedAtMs })],
|
||||||
|
});
|
||||||
|
|
||||||
|
const state = createCronServiceState({
|
||||||
|
cronEnabled: true,
|
||||||
|
storePath: store.storePath,
|
||||||
|
log: noopLogger,
|
||||||
|
nowMs: () => nowMs,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||||
|
});
|
||||||
|
|
||||||
|
await ensureLoaded(state, { skipRecompute: true });
|
||||||
|
|
||||||
|
await writeCronStoreSnapshot({
|
||||||
|
storePath: store.storePath,
|
||||||
|
jobs: [createJob({ expr: "* * * * *", updatedAtMs: editedAtMs })],
|
||||||
|
});
|
||||||
|
|
||||||
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
|
|
||||||
|
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
||||||
|
expect(reloaded?.state.nextRunAtMs).toBeLessThan(nowMs);
|
||||||
|
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
||||||
|
});
|
||||||
|
|
||||||
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
|
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||||
|
|||||||
@ -15,6 +15,25 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function resolveExternalRepairComputeBaseMs(params: {
|
||||||
|
nowMs: number;
|
||||||
|
reloadedUpdatedAtMs: number;
|
||||||
|
previousUpdatedAtMs: number;
|
||||||
|
}): number {
|
||||||
|
const { nowMs, reloadedUpdatedAtMs, previousUpdatedAtMs } = params;
|
||||||
|
if (!Number.isFinite(reloadedUpdatedAtMs)) {
|
||||||
|
return nowMs;
|
||||||
|
}
|
||||||
|
const normalizedReloadedUpdatedAtMs = Math.max(0, Math.floor(reloadedUpdatedAtMs));
|
||||||
|
const normalizedPreviousUpdatedAtMs = Number.isFinite(previousUpdatedAtMs)
|
||||||
|
? Math.max(0, Math.floor(previousUpdatedAtMs))
|
||||||
|
: Number.NEGATIVE_INFINITY;
|
||||||
|
if (normalizedReloadedUpdatedAtMs <= normalizedPreviousUpdatedAtMs) {
|
||||||
|
return nowMs;
|
||||||
|
}
|
||||||
|
return Math.min(nowMs, normalizedReloadedUpdatedAtMs);
|
||||||
|
}
|
||||||
|
|
||||||
function repairNextRunsAfterExternalReload(params: {
|
function repairNextRunsAfterExternalReload(params: {
|
||||||
state: CronServiceState;
|
state: CronServiceState;
|
||||||
previousJobs: CronJob[] | undefined;
|
previousJobs: CronJob[] | undefined;
|
||||||
@ -50,9 +69,14 @@ function repairNextRunsAfterExternalReload(params: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
skipRecomputeJobIds.delete(job.id);
|
skipRecomputeJobIds.delete(job.id);
|
||||||
|
const computeBaseMs = resolveExternalRepairComputeBaseMs({
|
||||||
|
nowMs: now,
|
||||||
|
reloadedUpdatedAtMs: job.updatedAtMs,
|
||||||
|
previousUpdatedAtMs: previous.updatedAtMs,
|
||||||
|
});
|
||||||
let nextRunAtMs: number | undefined;
|
let nextRunAtMs: number | undefined;
|
||||||
try {
|
try {
|
||||||
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, now) : undefined;
|
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined;
|
||||||
if (job.state.scheduleErrorCount !== undefined) {
|
if (job.state.scheduleErrorCount !== undefined) {
|
||||||
job.state.scheduleErrorCount = undefined;
|
job.state.scheduleErrorCount = undefined;
|
||||||
changed = true;
|
changed = true;
|
||||||
@ -78,6 +102,7 @@ function repairNextRunsAfterExternalReload(params: {
|
|||||||
jobId: job.id,
|
jobId: job.id,
|
||||||
scheduleChanged,
|
scheduleChanged,
|
||||||
enabledChanged,
|
enabledChanged,
|
||||||
|
computeBaseMs,
|
||||||
nextRunAtMs: job.state.nextRunAtMs,
|
nextRunAtMs: job.state.nextRunAtMs,
|
||||||
},
|
},
|
||||||
"cron: repaired nextRunAtMs after external reload",
|
"cron: repaired nextRunAtMs after external reload",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user