fix(cron): clear reload-skip markers on delete and harden loader regression test

This commit is contained in:
create 2026-03-20 17:47:32 +08:00
parent 9f9e6b7cfe
commit dbc2925451
10 changed files with 167 additions and 52 deletions

View File

@ -2,7 +2,7 @@ import fs from "node:fs/promises";
import { describe, expect, it, vi } from "vitest";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
import { recomputeNextRuns, recomputeNextRunsForMaintenance } from "./service/jobs.js";
import { run } from "./service/ops.js";
import { remove, run } from "./service/ops.js";
import { createCronServiceState } from "./service/state.js";
import { ensureLoaded } from "./service/store.js";
import type { CronJob } from "./types.js";
@ -456,4 +456,120 @@ describe("forceReload repairs externally changed cron schedules", () => {
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
expect(persistedJob?.state?.lastStatus).toBe("ok");
});
it("clears reload-repair skip markers when a job is removed before same-id rebuild", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "external-reload-skip-marker-id-reuse";
const createJob = (expr: string): CronJob => ({
id: jobId,
name: "external reload skip marker id reuse",
enabled: true,
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
schedule: { kind: "cron", expr, tz: "UTC", staggerMs: 0 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "tick" },
state: {},
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob("*/15 * * * *")],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob("not a valid cron")],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(true);
const removed = await remove(state, jobId);
expect(removed).toEqual({ ok: true, removed: true });
expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(false);
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob("*/5 * * * *")],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
recomputeNextRunsForMaintenance(state);
const rebuilt = state.store?.jobs.find((job) => job.id === jobId);
expect(typeof rebuilt?.state.nextRunAtMs).toBe("number");
expect(Number.isFinite(rebuilt?.state.nextRunAtMs)).toBe(true);
expect(rebuilt?.state.scheduleErrorCount).toBeUndefined();
});
it("recomputes nextRunAtMs when external every schedule changes", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "external-every-schedule-change";
const createEveryJob = (everyMs: number): CronJob => ({
id: jobId,
name: "external every schedule change",
enabled: true,
createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
schedule: {
kind: "every",
everyMs,
anchorMs: Date.parse("2026-03-19T00:00:00.000Z"),
},
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "tick" },
state: {
nextRunAtMs: Date.parse("2026-03-20T00:00:00.000Z"),
},
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createEveryJob(6 * 60_000)],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createEveryJob(60_000)],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
expect(reloaded?.schedule).toEqual({
kind: "every",
everyMs: 60_000,
anchorMs: Date.parse("2026-03-19T00:00:00.000Z"),
});
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z"));
});
});

View File

@ -385,6 +385,7 @@ function createMockState(now: number, opts?: { defaultAgentId?: string }): CronS
nowMs: () => now,
defaultAgentId: opts?.defaultAgentId,
},
skipNextReloadRepairRecomputeJobIds: new Set<string>(),
} as unknown as CronServiceState;
}

View File

@ -220,6 +220,7 @@ export function createMockCronStateForJobs(params: {
timer: null,
storeLoadedAtMs: nowMs,
storeFileMtimeMs: null,
skipNextReloadRepairRecomputeJobIds: new Set<string>(),
op: Promise.resolve(),
warnedDisabled: false,
deps: {

View File

@ -28,6 +28,11 @@ function createMockState(jobs: CronJob[]): CronServiceState {
store,
timer: null,
running: false,
op: Promise.resolve(),
warnedDisabled: false,
storeLoadedAtMs: null,
storeFileMtimeMs: null,
skipNextReloadRepairRecomputeJobIds: new Set<string>(),
} as unknown as CronServiceState;
}

View File

@ -238,6 +238,19 @@ export function findJobOrThrow(state: CronServiceState, id: string) {
return job;
}
export function removeJobById(state: CronServiceState, jobId: string): boolean {
if (!state.store) {
return false;
}
const before = state.store.jobs.length;
state.store.jobs = state.store.jobs.filter((job) => job.id !== jobId);
const removed = state.store.jobs.length !== before;
if (removed) {
state.skipNextReloadRepairRecomputeJobIds.delete(jobId);
}
return removed;
}
export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined {
if (!job.enabled) {
return undefined;
@ -414,8 +427,7 @@ function walkSchedulableJobs(
}
export function hasSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean {
const pending = state.skipNextReloadRepairRecomputeJobIds;
return pending?.has(jobId) === true;
return state.skipNextReloadRepairRecomputeJobIds.has(jobId);
}
export function consumeSkipNextReloadRepairRecompute(
@ -425,7 +437,7 @@ export function consumeSkipNextReloadRepairRecompute(
if (!hasSkipNextReloadRepairRecompute(state, jobId)) {
return false;
}
state.skipNextReloadRepairRecomputeJobIds?.delete(jobId);
state.skipNextReloadRepairRecomputeJobIds.delete(jobId);
return true;
}

View File

@ -9,10 +9,12 @@ import {
findJobOrThrow,
isJobDue,
nextWakeAtMs,
removeJobById,
recomputeNextRuns,
recomputeNextRunsForMaintenance,
} from "./jobs.js";
import { locked } from "./locked.js";
import { schedulesEqual } from "./schedule-equality.js";
import type { CronServiceState } from "./state.js";
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
import {
@ -48,22 +50,6 @@ export type CronListPageResult = {
nextOffset: number | null;
};
function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean {
if (a.kind !== b.kind) {
return false;
}
if (a.kind === "at" && b.kind === "at") {
return a.at === b.at;
}
if (a.kind === "every" && b.kind === "every") {
return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs;
}
if (a.kind === "cron" && b.kind === "cron") {
return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs;
}
return false;
}
function mergeManualRunSnapshotAfterReload(params: {
state: CronServiceState;
jobId: string;
@ -82,7 +68,7 @@ function mergeManualRunSnapshotAfterReload(params: {
return;
}
if (params.removed) {
params.state.store.jobs = params.state.store.jobs.filter((job) => job.id !== params.jobId);
removeJobById(params.state, params.jobId);
return;
}
if (!params.snapshot) {
@ -372,12 +358,10 @@ export async function remove(state: CronServiceState, id: string) {
return await locked(state, async () => {
warnIfDisabled(state, "remove");
await ensureLoaded(state);
const before = state.store?.jobs.length ?? 0;
if (!state.store) {
return { ok: false, removed: false } as const;
}
state.store.jobs = state.store.jobs.filter((j) => j.id !== id);
const removed = (state.store.jobs.length ?? 0) !== before;
const removed = removeJobById(state, id);
await persist(state);
armTimer(state);
if (removed) {
@ -554,8 +538,7 @@ async function finishPreparedManualRun(
usage: coreResult.usage,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
if (shouldDelete && removeJobById(state, job.id)) {
emit(state, { jobId: job.id, action: "removed" });
}

View File

@ -0,0 +1,17 @@
import type { CronJob } from "../types.js";
export function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean {
if (a.kind !== b.kind) {
return false;
}
if (a.kind === "at" && b.kind === "at") {
return a.at === b.at;
}
if (a.kind === "every" && b.kind === "every") {
return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs;
}
if (a.kind === "cron" && b.kind === "cron") {
return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs;
}
return false;
}

View File

@ -127,7 +127,7 @@ export type CronServiceState = {
warnedDisabled: boolean;
storeLoadedAtMs: number | null;
storeFileMtimeMs: number | null;
skipNextReloadRepairRecomputeJobIds?: Set<string>;
skipNextReloadRepairRecomputeJobIds: Set<string>;
};
export function createCronServiceState(deps: CronServiceDeps): CronServiceState {

View File

@ -3,6 +3,7 @@ import { normalizeStoredCronJobs } from "../store-migration.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { computeJobNextRunAtMs, recordScheduleComputeError, recomputeNextRuns } from "./jobs.js";
import { schedulesEqual } from "./schedule-equality.js";
import type { CronServiceState } from "./state.js";
async function getFileMtimeMs(path: string): Promise<number | null> {
@ -14,32 +15,12 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
}
}
function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean {
if (a.kind !== b.kind) {
return false;
}
if (a.kind === "at" && b.kind === "at") {
return a.at === b.at;
}
if (a.kind === "every" && b.kind === "every") {
return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs;
}
if (a.kind === "cron" && b.kind === "cron") {
return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs;
}
return false;
}
function getSkipNextReloadRepairRecomputeJobIds(state: CronServiceState): Set<string> {
return (state.skipNextReloadRepairRecomputeJobIds ??= new Set());
}
function repairNextRunsAfterExternalReload(params: {
state: CronServiceState;
previousJobs: CronJob[] | undefined;
}): boolean {
const { state, previousJobs } = params;
const skipRecomputeJobIds = getSkipNextReloadRepairRecomputeJobIds(state);
const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds;
if (!state.store || !previousJobs?.length) {
return false;
}

View File

@ -17,6 +17,7 @@ import {
computeJobNextRunAtMs,
consumeSkipNextReloadRepairRecompute,
nextWakeAtMs,
removeJobById,
recomputeNextRunsForMaintenance,
recordScheduleComputeError,
resolveJobPayloadTextForMain,
@ -512,8 +513,7 @@ function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOu
emitJobFinished(state, job, result, result.startedAt);
if (shouldDelete) {
store.jobs = jobs.filter((entry) => entry.id !== job.id);
if (shouldDelete && removeJobById(state, job.id)) {
emit(state, { jobId: job.id, action: "removed" });
}
}
@ -1208,8 +1208,7 @@ export async function executeJob(
emitJobFinished(state, job, coreResult, startedAt);
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
if (shouldDelete && removeJobById(state, job.id)) {
emit(state, { jobId: job.id, action: "removed" });
}
}