diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts index 29029c3a197..d8378dab51d 100644 --- a/src/cron/service.external-reload-schedule-recompute.test.ts +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import { describe, expect, it, vi } from "vitest"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js"; import { recomputeNextRuns, recomputeNextRunsForMaintenance } from "./service/jobs.js"; -import { run } from "./service/ops.js"; +import { remove, run } from "./service/ops.js"; import { createCronServiceState } from "./service/state.js"; import { ensureLoaded } from "./service/store.js"; import type { CronJob } from "./types.js"; @@ -456,4 +456,120 @@ describe("forceReload repairs externally changed cron schedules", () => { expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); expect(persistedJob?.state?.lastStatus).toBe("ok"); }); + + it("clears reload-repair skip markers when a job is removed before same-id rebuild", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-reload-skip-marker-id-reuse"; + + const createJob = (expr: string): CronJob => ({ + id: jobId, + name: "external reload skip marker id reuse", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { kind: "cron", expr, tz: "UTC", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("*/15 * * * *")], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("not a valid cron")], + }); + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(true); + + const removed = await remove(state, jobId); + expect(removed).toEqual({ ok: true, removed: true }); + expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(false); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("*/5 * * * *")], + }); + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + recomputeNextRunsForMaintenance(state); + + const rebuilt = state.store?.jobs.find((job) => job.id === jobId); + expect(typeof rebuilt?.state.nextRunAtMs).toBe("number"); + expect(Number.isFinite(rebuilt?.state.nextRunAtMs)).toBe(true); + expect(rebuilt?.state.scheduleErrorCount).toBeUndefined(); + }); + + it("recomputes nextRunAtMs when external every schedule changes", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-every-schedule-change"; + + const createEveryJob = (everyMs: number): CronJob => ({ + id: jobId, + name: "external every schedule change", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { + kind: "every", + everyMs, + anchorMs: Date.parse("2026-03-19T00:00:00.000Z"), + }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: Date.parse("2026-03-20T00:00:00.000Z"), + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createEveryJob(6 * 60_000)], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createEveryJob(60_000)], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + const reloaded = state.store?.jobs.find((job) => job.id === jobId); + expect(reloaded?.schedule).toEqual({ + kind: "every", + everyMs: 60_000, + anchorMs: Date.parse("2026-03-19T00:00:00.000Z"), + }); + expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z")); + }); }); diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index c514f7528ba..c97adb2f504 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -385,6 +385,7 @@ function createMockState(now: number, opts?: { defaultAgentId?: string }): CronS nowMs: () => now, defaultAgentId: opts?.defaultAgentId, }, + skipNextReloadRepairRecomputeJobIds: new Set(), } as unknown as CronServiceState; } diff --git a/src/cron/service.test-harness.ts b/src/cron/service.test-harness.ts index fcc62637892..a4803b5be3e 100644 --- a/src/cron/service.test-harness.ts +++ b/src/cron/service.test-harness.ts @@ -220,6 +220,7 @@ export function createMockCronStateForJobs(params: { timer: null, storeLoadedAtMs: nowMs, storeFileMtimeMs: null, + skipNextReloadRepairRecomputeJobIds: new Set(), op: Promise.resolve(), warnedDisabled: false, deps: { diff --git a/src/cron/service/jobs.schedule-error-isolation.test.ts b/src/cron/service/jobs.schedule-error-isolation.test.ts index 84cd8e0a1e9..3604c4ab45b 100644 --- a/src/cron/service/jobs.schedule-error-isolation.test.ts +++ b/src/cron/service/jobs.schedule-error-isolation.test.ts @@ -28,6 +28,11 @@ function createMockState(jobs: CronJob[]): CronServiceState { store, timer: null, running: false, + op: Promise.resolve(), + warnedDisabled: false, + storeLoadedAtMs: null, + storeFileMtimeMs: null, + skipNextReloadRepairRecomputeJobIds: new Set(), } as unknown as CronServiceState; } diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index db90f1502f1..184f136d98c 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -238,6 +238,19 @@ export function findJobOrThrow(state: CronServiceState, id: string) { return job; } +export function removeJobById(state: CronServiceState, jobId: string): boolean { + if (!state.store) { + return false; + } + const before = state.store.jobs.length; + state.store.jobs = state.store.jobs.filter((job) => job.id !== jobId); + const removed = state.store.jobs.length !== before; + if (removed) { + state.skipNextReloadRepairRecomputeJobIds.delete(jobId); + } + return removed; +} + export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined { if (!job.enabled) { return undefined; @@ -414,8 +427,7 @@ function walkSchedulableJobs( } export function hasSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean { - const pending = state.skipNextReloadRepairRecomputeJobIds; - return pending?.has(jobId) === true; + return state.skipNextReloadRepairRecomputeJobIds.has(jobId); } export function consumeSkipNextReloadRepairRecompute( @@ -425,7 +437,7 @@ export function consumeSkipNextReloadRepairRecompute( if (!hasSkipNextReloadRepairRecompute(state, jobId)) { return false; } - state.skipNextReloadRepairRecomputeJobIds?.delete(jobId); + state.skipNextReloadRepairRecomputeJobIds.delete(jobId); return true; } diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 58b12933839..a59dd2c2251 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -9,10 +9,12 @@ import { findJobOrThrow, isJobDue, nextWakeAtMs, + removeJobById, recomputeNextRuns, recomputeNextRunsForMaintenance, } from "./jobs.js"; import { locked } from "./locked.js"; +import { schedulesEqual } from "./schedule-equality.js"; import type { CronServiceState } from "./state.js"; import { ensureLoaded, persist, warnIfDisabled } from "./store.js"; import { @@ -48,22 +50,6 @@ export type CronListPageResult = { nextOffset: number | null; }; -function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean { - if (a.kind !== b.kind) { - return false; - } - if (a.kind === "at" && b.kind === "at") { - return a.at === b.at; - } - if (a.kind === "every" && b.kind === "every") { - return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs; - } - if (a.kind === "cron" && b.kind === "cron") { - return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs; - } - return false; -} - function mergeManualRunSnapshotAfterReload(params: { state: CronServiceState; jobId: string; @@ -82,7 +68,7 @@ function mergeManualRunSnapshotAfterReload(params: { return; } if (params.removed) { - params.state.store.jobs = params.state.store.jobs.filter((job) => job.id !== params.jobId); + removeJobById(params.state, params.jobId); return; } if (!params.snapshot) { @@ -372,12 +358,10 @@ export async function remove(state: CronServiceState, id: string) { return await locked(state, async () => { warnIfDisabled(state, "remove"); await ensureLoaded(state); - const before = state.store?.jobs.length ?? 0; if (!state.store) { return { ok: false, removed: false } as const; } - state.store.jobs = state.store.jobs.filter((j) => j.id !== id); - const removed = (state.store.jobs.length ?? 0) !== before; + const removed = removeJobById(state, id); await persist(state); armTimer(state); if (removed) { @@ -554,8 +538,7 @@ async function finishPreparedManualRun( usage: coreResult.usage, }); - if (shouldDelete && state.store) { - state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id); + if (shouldDelete && removeJobById(state, job.id)) { emit(state, { jobId: job.id, action: "removed" }); } diff --git a/src/cron/service/schedule-equality.ts b/src/cron/service/schedule-equality.ts new file mode 100644 index 00000000000..d4f64432ede --- /dev/null +++ b/src/cron/service/schedule-equality.ts @@ -0,0 +1,17 @@ +import type { CronJob } from "../types.js"; + +export function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean { + if (a.kind !== b.kind) { + return false; + } + if (a.kind === "at" && b.kind === "at") { + return a.at === b.at; + } + if (a.kind === "every" && b.kind === "every") { + return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs; + } + if (a.kind === "cron" && b.kind === "cron") { + return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs; + } + return false; +} diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index d51464c29d2..e56c7d23bcb 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -127,7 +127,7 @@ export type CronServiceState = { warnedDisabled: boolean; storeLoadedAtMs: number | null; storeFileMtimeMs: number | null; - skipNextReloadRepairRecomputeJobIds?: Set; + skipNextReloadRepairRecomputeJobIds: Set; }; export function createCronServiceState(deps: CronServiceDeps): CronServiceState { diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 470b7178522..c4a64791a1e 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -3,6 +3,7 @@ import { normalizeStoredCronJobs } from "../store-migration.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { CronJob } from "../types.js"; import { computeJobNextRunAtMs, recordScheduleComputeError, recomputeNextRuns } from "./jobs.js"; +import { schedulesEqual } from "./schedule-equality.js"; import type { CronServiceState } from "./state.js"; async function getFileMtimeMs(path: string): Promise { @@ -14,32 +15,12 @@ async function getFileMtimeMs(path: string): Promise { } } -function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean { - if (a.kind !== b.kind) { - return false; - } - if (a.kind === "at" && b.kind === "at") { - return a.at === b.at; - } - if (a.kind === "every" && b.kind === "every") { - return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs; - } - if (a.kind === "cron" && b.kind === "cron") { - return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs; - } - return false; -} - -function getSkipNextReloadRepairRecomputeJobIds(state: CronServiceState): Set { - return (state.skipNextReloadRepairRecomputeJobIds ??= new Set()); -} - function repairNextRunsAfterExternalReload(params: { state: CronServiceState; previousJobs: CronJob[] | undefined; }): boolean { const { state, previousJobs } = params; - const skipRecomputeJobIds = getSkipNextReloadRepairRecomputeJobIds(state); + const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds; if (!state.store || !previousJobs?.length) { return false; } diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index ecac35d5fe8..315be8a4fc8 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -17,6 +17,7 @@ import { computeJobNextRunAtMs, consumeSkipNextReloadRepairRecompute, nextWakeAtMs, + removeJobById, recomputeNextRunsForMaintenance, recordScheduleComputeError, resolveJobPayloadTextForMain, @@ -512,8 +513,7 @@ function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOu emitJobFinished(state, job, result, result.startedAt); - if (shouldDelete) { - store.jobs = jobs.filter((entry) => entry.id !== job.id); + if (shouldDelete && removeJobById(state, job.id)) { emit(state, { jobId: job.id, action: "removed" }); } } @@ -1208,8 +1208,7 @@ export async function executeJob( emitJobFinished(state, job, coreResult, startedAt); - if (shouldDelete && state.store) { - state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); + if (shouldDelete && removeJobById(state, job.id)) { emit(state, { jobId: job.id, action: "removed" }); } }