diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts new file mode 100644 index 00000000000..cec8c03e311 --- /dev/null +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -0,0 +1,421 @@ +import fs from "node:fs/promises"; +import { describe, expect, it, vi } from "vitest"; +import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js"; +import { recomputeNextRuns } from "./service/jobs.js"; +import { run } from "./service/ops.js"; +import { createCronServiceState } from "./service/state.js"; +import { ensureLoaded } from "./service/store.js"; +import type { CronJob } from "./types.js"; + +const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({ + prefix: "openclaw-cron-external-reload-", + baseTimeIso: "2026-03-19T01:44:00.000Z", +}); + +function createCronJob(params: { + id: string; + expr: string; + updatedAtMs?: number; + enabled?: boolean; + nextRunAtMs?: number; + scheduleErrorCount?: number; + lastError?: string; + lastStatus?: CronJob["state"]["lastStatus"]; + runningAtMs?: number; +}): CronJob { + return { + id: params.id, + name: params.id, + enabled: params.enabled ?? true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: params.updatedAtMs ?? Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: params.nextRunAtMs, + scheduleErrorCount: params.scheduleErrorCount, + lastError: params.lastError, + lastStatus: params.lastStatus, + runningAtMs: params.runningAtMs, + }, + }; +} + +describe("forceReload repairs externally changed schedules", () => { + it("recomputes nextRunAtMs when jobs.json changes a cron schedule outside cron.update", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-schedule-change"; + const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); + const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z"); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createCronJob({ id: jobId, expr: "30 8 * * *", nextRunAtMs: staleNextRunAtMs })], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createCronJob({ id: jobId, expr: "30 8,20 * * *", nextRunAtMs: staleNextRunAtMs })], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + const reloaded = state.store?.jobs.find((job) => job.id === jobId); + expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>; + }; + expect(persisted.jobs?.find((job) => job.id === jobId)?.state?.nextRunAtMs).toBe( + correctedNextRunAtMs, + ); + }); + + it("recomputes from updatedAtMs so delayed reload keeps newly earlier slots due", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); + const jobId = "external-schedule-change-delayed-observe"; + const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); + + const createJob = (params: { expr: string; updatedAtMs: number }) => + createCronJob({ + id: jobId, + expr: params.expr, + updatedAtMs: params.updatedAtMs, + nextRunAtMs: staleNextRunAtMs, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createJob({ expr: "30 23 * * *", updatedAtMs: Date.parse("2026-03-19T12:00:00.000Z") }), + ], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob({ expr: "* * * * *", updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z") })], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); + }); + + it("records schedule errors instead of aborting reload when an external edit is invalid", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-invalid-schedule"; + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createCronJob({ + id: jobId, + expr: "30 8 * * *", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createCronJob({ + id: jobId, + expr: "not a valid cron", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], + }); + + await expect( + ensureLoaded(state, { forceReload: true, skipRecompute: true }), + ).resolves.toBeUndefined(); + + const reloaded = state.store?.jobs[0]; + expect(reloaded?.state.nextRunAtMs).toBeUndefined(); + expect(reloaded?.state.scheduleErrorCount).toBe(1); + expect(reloaded?.state.lastError).toMatch(/^schedule error:/); + }); + + it("does not double-count a reload schedule error during the immediate recompute", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-invalid-schedule-full-recompute"; + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createCronJob({ + id: jobId, + expr: "30 8 * * *", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createCronJob({ + id: jobId, + expr: "not a valid cron", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); + + recomputeNextRuns(state); + expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); + }); + + it("keeps forceReload repairs when manual-run snapshot is merged back", async () => { + const store = await makeStorePath(); + let nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "manual-run-reload-merge"; + const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z"); + + const createJob = (params: { expr: string; enabled: boolean; nextRunAtMs?: number }) => ({ + ...createCronJob({ + id: jobId, + expr: params.expr, + enabled: params.enabled, + nextRunAtMs: params.nextRunAtMs, + }), + sessionTarget: "isolated" as const, + payload: { kind: "agentTurn", message: "tick" } as const, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob({ expr: "30 23 * * *", enabled: true, nextRunAtMs: staleNextRunAtMs })], + }); + + const runIsolatedAgentJob = vi.fn(async () => { + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob({ expr: "30 8 * * *", enabled: false, nextRunAtMs: staleNextRunAtMs })], + }); + nowMs += 500; + return { status: "ok" as const, summary: "done" }; + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + + expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true }); + + const merged = state.store?.jobs[0]; + expect(merged?.enabled).toBe(false); + expect(merged?.state.nextRunAtMs).toBeUndefined(); + expect(merged?.state.lastStatus).toBe("ok"); + }); + + it("keeps scheduleErrorCount cleared when external reload fixes schedule during force-run", async () => { + const store = await makeStorePath(); + let nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "manual-run-reload-clears-schedule-error-count"; + const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z"); + + const createJob = (expr: string) => ({ + ...createCronJob({ + id: jobId, + expr, + nextRunAtMs: staleNextRunAtMs, + scheduleErrorCount: 2, + lastError: "schedule error: invalid expression", + }), + sessionTarget: "isolated" as const, + payload: { kind: "agentTurn", message: "tick" } as const, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("30 23 * * *")], + }); + + const runIsolatedAgentJob = vi.fn(async () => { + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("30 8 * * *")], + }); + nowMs += 500; + return { status: "ok" as const, summary: "done" }; + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + + expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true }); + expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBeUndefined(); + }); + + it("preserves runningAtMs when an external reload comes from a stale file snapshot", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); + const jobId = "external-running-marker"; + const runningAtMs = Date.parse("2026-03-19T12:00:00.000Z"); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createCronJob({ + id: jobId, + expr: "30 23 * * *", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + runningAtMs, + }), + ], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createCronJob({ + id: jobId, + expr: "* * * * *", + updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z"), + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + const reloaded = state.store?.jobs[0]; + expect(reloaded?.state.runningAtMs).toBe(runningAtMs); + expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); + }); + + it("recomputes nextRunAtMs when an external every schedule changes", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-every-schedule-change"; + + const createEveryJob = (everyMs: number): CronJob => ({ + id: jobId, + name: jobId, + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { + kind: "every", + everyMs, + anchorMs: Date.parse("2026-03-19T00:00:00.000Z"), + }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: Date.parse("2026-03-20T00:00:00.000Z"), + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createEveryJob(6 * 60_000)], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createEveryJob(60_000)], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z")); + }); +}); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index dac28f4b0c9..e0ca0f32816 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -22,8 +22,12 @@ import { createNoopLogger, createRunningCronServiceState, } from "./service.test-harness.js"; -import { computeJobNextRunAtMs } from "./service/jobs.js"; -import { enqueueRun, run } from "./service/ops.js"; +import { + computeJobNextRunAtMs, + nextWakeAtMs, + recomputeNextRunsForMaintenance, +} from "./service/jobs.js"; +import { enqueueRun, list, run, status } from "./service/ops.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; import { DEFAULT_JOB_TIMEOUT_MS, @@ -1748,6 +1752,170 @@ describe("Cron issue regressions", () => { expect(job.enabled).toBe(true); }); + it("does not double-count reload schedule errors in apply path before maintenance recompute", () => { + const startedAt = Date.parse("2026-03-02T12:10:00.000Z"); + const endedAt = startedAt + 25; + const job = createIsolatedRegressionJob({ + id: "apply-result-reload-dedupe-30905", + name: "apply-result-reload-dedupe-30905", + scheduledAt: startedAt, + schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + runningAtMs: startedAt - 500, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-30905-reload-dedupe.json", + log: noopLogger as never, + nowMs: () => endedAt, + jobs: [job], + }); + state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]); + + applyJobResult(state, job, { + status: "ok", + delivered: true, + startedAt, + endedAt, + }); + + expect(job.state.scheduleErrorCount).toBe(1); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); + + recomputeNextRunsForMaintenance(state); + expect(job.state.scheduleErrorCount).toBe(1); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); + }); + + it("keeps a future wake when apply skips immediate recompute after reload schedule error", () => { + const startedAt = Date.parse("2026-03-02T12:12:00.000Z"); + const endedAt = startedAt + 25; + const job = createIsolatedRegressionJob({ + id: "apply-result-reload-wake-30905", + name: "apply-result-reload-wake-30905", + scheduledAt: startedAt, + schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + runningAtMs: startedAt - 500, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-30905-reload-wake.json", + log: noopLogger as never, + nowMs: () => endedAt, + jobs: [job], + }); + state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]); + + applyJobResult(state, job, { + status: "error", + error: "synthetic failure", + startedAt, + endedAt, + }); + + expect(job.state.scheduleErrorCount).toBe(1); + expect(job.state.nextRunAtMs).toBeUndefined(); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); + + recomputeNextRunsForMaintenance(state); + expect(job.state.scheduleErrorCount).toBe(1); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); + }); + + it("does not arm 2s wake for malformed every schedules with non-repairable missing nextRun", () => { + const nowMs = Date.parse("2026-03-02T12:20:00.000Z"); + const job = createIsolatedRegressionJob({ + id: "missing-nextrun-malformed-every", + name: "missing-nextrun-malformed-every", + scheduledAt: nowMs, + schedule: { kind: "every", everyMs: Number.NaN, anchorMs: nowMs - 60_000 }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-missing-nextrun-malformed-every.json", + log: noopLogger as never, + nowMs: () => nowMs, + jobs: [job], + }); + + expect(nextWakeAtMs(state)).toBeUndefined(); + }); + + it("does not consume reload skip markers during read-only status/list maintenance", async () => { + const nowMs = Date.parse("2026-03-02T12:25:00.000Z"); + const job = createIsolatedRegressionJob({ + id: "read-only-skip-marker", + name: "read-only-skip-marker", + scheduledAt: nowMs, + schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-read-only-skip-marker.json", + log: noopLogger as never, + nowMs: () => nowMs, + jobs: [job], + }); + state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]); + + const currentStatus = await status(state); + expect(currentStatus.nextWakeAtMs).toBe(nowMs + 2_000); + expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(true); + + const jobs = await list(state, { includeDisabled: true }); + expect(jobs).toHaveLength(1); + expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(true); + + recomputeNextRunsForMaintenance(state); + expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(false); + }); + + it("does not arm 2s wake for exhausted one-shot jobs with missing nextRun", () => { + const nowMs = Date.parse("2026-03-02T12:22:00.000Z"); + const atMs = nowMs - 60_000; + const job = createIsolatedRegressionJob({ + id: "missing-nextrun-exhausted-at", + name: "missing-nextrun-exhausted-at", + scheduledAt: nowMs, + schedule: { kind: "at", at: new Date(atMs).toISOString() }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + lastStatus: "ok", + lastRunAtMs: nowMs - 30_000, + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-missing-nextrun-exhausted-at.json", + log: noopLogger as never, + nowMs: () => nowMs, + jobs: [job], + }); + + expect(nextWakeAtMs(state)).toBeUndefined(); + }); + it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => { const nowMs = Date.now(); const everyMs = 24 * 60 * 60 * 1_000; diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index c514f7528ba..c97adb2f504 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -385,6 +385,7 @@ function createMockState(now: number, opts?: { defaultAgentId?: string }): CronS nowMs: () => now, defaultAgentId: opts?.defaultAgentId, }, + skipNextReloadRepairRecomputeJobIds: new Set(), } as unknown as CronServiceState; } diff --git a/src/cron/service.test-harness.ts b/src/cron/service.test-harness.ts index fcc62637892..a4803b5be3e 100644 --- a/src/cron/service.test-harness.ts +++ b/src/cron/service.test-harness.ts @@ -220,6 +220,7 @@ export function createMockCronStateForJobs(params: { timer: null, storeLoadedAtMs: nowMs, storeFileMtimeMs: null, + skipNextReloadRepairRecomputeJobIds: new Set(), op: Promise.resolve(), warnedDisabled: false, deps: { diff --git a/src/cron/service/jobs.schedule-error-isolation.test.ts b/src/cron/service/jobs.schedule-error-isolation.test.ts index 84cd8e0a1e9..cd55e4aa56d 100644 --- a/src/cron/service/jobs.schedule-error-isolation.test.ts +++ b/src/cron/service/jobs.schedule-error-isolation.test.ts @@ -28,6 +28,11 @@ function createMockState(jobs: CronJob[]): CronServiceState { store, timer: null, running: false, + op: Promise.resolve(), + warnedDisabled: false, + storeLoadedAtMs: null, + storeFileMtimeMs: null, + skipNextReloadRepairRecomputeJobIds: new Set(), } as unknown as CronServiceState; } @@ -201,4 +206,44 @@ describe("cron schedule error isolation", () => { expect(badJob.state.lastError).not.toContain("Cannot read properties of undefined"); expect(badJob.state.scheduleErrorCount).toBe(1); }); + + it("keeps malformed every schedules on the schedule-error path", () => { + const badJob = createJob({ + id: "bad-every", + name: "Bad Every", + schedule: { kind: "every", everyMs: Number.NaN }, + state: { + nextRunAtMs: undefined, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createMockState([badJob]); + + recomputeNextRuns(state); + + expect(badJob.state.nextRunAtMs).toBeUndefined(); + expect(badJob.state.scheduleErrorCount).toBe(2); + expect(badJob.state.lastError).toContain("invalid every schedule"); + }); + + it("keeps malformed at schedules on the schedule-error path", () => { + const badJob = createJob({ + id: "bad-at", + name: "Bad At", + schedule: { kind: "at", at: "not-a-timestamp" }, + state: { + nextRunAtMs: undefined, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createMockState([badJob]); + + recomputeNextRuns(state); + + expect(badJob.state.nextRunAtMs).toBeUndefined(); + expect(badJob.state.scheduleErrorCount).toBe(2); + expect(badJob.state.lastError).toContain("invalid at schedule"); + }); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 542ba81053d..2f1f6f4d2ab 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -33,6 +33,7 @@ import { import type { CronServiceState } from "./state.js"; const STUCK_RUN_MS = 2 * 60 * 60 * 1000; +const MISSING_NEXT_RUN_WAKE_MS = 2_000; const STAGGER_OFFSET_CACHE_MAX = 4096; const staggerOffsetCache = new Map(); @@ -238,6 +239,19 @@ export function findJobOrThrow(state: CronServiceState, id: string) { return job; } +export function removeJobById(state: CronServiceState, jobId: string): boolean { + if (!state.store) { + return false; + } + const before = state.store.jobs.length; + state.store.jobs = state.store.jobs.filter((job) => job.id !== jobId); + const removed = state.store.jobs.length !== before; + if (removed) { + state.skipNextReloadRepairRecomputeJobIds.delete(jobId); + } + return removed; +} + export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined { if (!job.enabled) { return undefined; @@ -294,6 +308,22 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return isFiniteTimestamp(next) ? next : undefined; } +export function shouldTreatUndefinedNextRunAsScheduleError(job: CronJob): boolean { + if (!job.enabled) { + return false; + } + if (job.schedule.kind === "every") { + return coerceFiniteScheduleNumber(job.schedule.everyMs) === undefined; + } + if (job.schedule.kind === "at") { + return parseAbsoluteTimeMs(job.schedule.at) === null; + } + if (job.schedule.kind === "cron") { + return job.schedule.expr.trim().length === 0; + } + return false; +} + export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined { if (!job.enabled || job.schedule.kind !== "cron") { return undefined; @@ -413,10 +443,50 @@ function walkSchedulableJobs( return changed; } -function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob; nowMs: number }) { +export function hasSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean { + return state.skipNextReloadRepairRecomputeJobIds.has(jobId); +} + +export function consumeSkipNextReloadRepairRecompute( + state: CronServiceState, + jobId: string, +): boolean { + if (!hasSkipNextReloadRepairRecompute(state, jobId)) { + return false; + } + state.skipNextReloadRepairRecomputeJobIds.delete(jobId); + return true; +} + +function recomputeJobNextRunAtMs(params: { + state: CronServiceState; + job: CronJob; + nowMs: number; + consumeReloadRepairSkip?: boolean; +}) { + const consumeReloadRepairSkip = params.consumeReloadRepairSkip ?? true; + if ( + consumeReloadRepairSkip + ? consumeSkipNextReloadRepairRecompute(params.state, params.job.id) + : hasSkipNextReloadRepairRecompute(params.state, params.job.id) + ) { + return false; + } let changed = false; try { const newNext = computeJobNextRunAtMs(params.job, params.nowMs); + if (newNext === undefined && shouldTreatUndefinedNextRunAsScheduleError(params.job)) { + const err = + params.job.schedule.kind === "every" + ? new Error("invalid every schedule: everyMs must be a finite number") + : params.job.schedule.kind === "at" + ? new Error("invalid at schedule: at must be a valid absolute timestamp") + : new Error("invalid cron schedule: expr is required"); + if (recordScheduleComputeError({ state: params.state, job: params.job, err })) { + changed = true; + } + return changed; + } if (params.job.state.nextRunAtMs !== newNext) { params.job.state.nextRunAtMs = newNext; changed = true; @@ -460,16 +530,17 @@ export function recomputeNextRuns(state: CronServiceState): boolean { */ export function recomputeNextRunsForMaintenance( state: CronServiceState, - opts?: { recomputeExpired?: boolean; nowMs?: number }, + opts?: { recomputeExpired?: boolean; nowMs?: number; consumeReloadRepairSkip?: boolean }, ): boolean { const recomputeExpired = opts?.recomputeExpired ?? false; + const consumeReloadRepairSkip = opts?.consumeReloadRepairSkip ?? true; return walkSchedulableJobs( state, ({ job, nowMs: now }) => { let changed = false; if (!isFiniteTimestamp(job.state.nextRunAtMs)) { // Missing or invalid nextRunAtMs is always repaired. - if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { + if (recomputeJobNextRunAtMs({ state, job, nowMs: now, consumeReloadRepairSkip })) { changed = true; } } else if ( @@ -482,7 +553,7 @@ export function recomputeNextRunsForMaintenance( const lastRun = job.state.lastRunAtMs; const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs; if (alreadyExecutedSlot) { - if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { + if (recomputeJobNextRunAtMs({ state, job, nowMs: now, consumeReloadRepairSkip })) { changed = true; } } @@ -495,18 +566,46 @@ export function recomputeNextRunsForMaintenance( export function nextWakeAtMs(state: CronServiceState) { const jobs = state.store?.jobs ?? []; - const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs)); - if (enabled.length === 0) { - return undefined; + let minEnabledNextRunAtMs: number | undefined; + let hasEnabledRepairableMissingNextRun = false; + const nowMs = state.deps.nowMs(); + + for (const job of jobs) { + if (!job.enabled) { + continue; + } + const nextRunAtMs = job.state.nextRunAtMs; + if (isFiniteTimestamp(nextRunAtMs)) { + minEnabledNextRunAtMs = + minEnabledNextRunAtMs === undefined + ? nextRunAtMs + : Math.min(minEnabledNextRunAtMs, nextRunAtMs); + continue; + } + // Only wake for missing nextRun values that can be repaired by recompute. + // Non-repairable malformed schedules (e.g. invalid every/at payloads) + // should not keep the scheduler in a perpetual 2s poll loop. + if ((job.state.scheduleErrorCount ?? 0) > 0) { + hasEnabledRepairableMissingNextRun = true; + continue; + } + try { + if (computeJobNextRunAtMs(job, nowMs) !== undefined) { + hasEnabledRepairableMissingNextRun = true; + } + } catch { + hasEnabledRepairableMissingNextRun = true; + } } - const first = enabled[0]?.state.nextRunAtMs; - if (!isFiniteTimestamp(first)) { - return undefined; + + if (!hasEnabledRepairableMissingNextRun) { + return minEnabledNextRunAtMs; } - return enabled.reduce((min, j) => { - const next = j.state.nextRunAtMs; - return isFiniteTimestamp(next) ? Math.min(min, next) : min; - }, first); + + const wakeForMissingNextRunAtMs = nowMs + MISSING_NEXT_RUN_WAKE_MS; + return minEnabledNextRunAtMs === undefined + ? wakeForMissingNextRunAtMs + : Math.min(minEnabledNextRunAtMs, wakeForMissingNextRunAtMs); } export function createJob(state: CronServiceState, input: CronJobCreate): CronJob { diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 69751e4dfdb..95e4f84df87 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -9,10 +9,12 @@ import { findJobOrThrow, isJobDue, nextWakeAtMs, + removeJobById, recomputeNextRuns, recomputeNextRunsForMaintenance, } from "./jobs.js"; import { locked } from "./locked.js"; +import { schedulesEqual } from "./schedule-equality.js"; import type { CronServiceState } from "./state.js"; import { ensureLoaded, persist, warnIfDisabled } from "./store.js"; import { @@ -47,6 +49,7 @@ export type CronListPageResult = { hasMore: boolean; nextOffset: number | null; }; + function mergeManualRunSnapshotAfterReload(params: { state: CronServiceState; jobId: string; @@ -55,13 +58,17 @@ function mergeManualRunSnapshotAfterReload(params: { updatedAtMs: number; state: CronJob["state"]; } | null; + baseline: { + enabled: boolean; + schedule: CronJob["schedule"]; + } | null; removed: boolean; }) { if (!params.state.store) { return; } if (params.removed) { - params.state.store.jobs = params.state.store.jobs.filter((job) => job.id !== params.jobId); + removeJobById(params.state, params.jobId); return; } if (!params.snapshot) { @@ -71,9 +78,34 @@ function mergeManualRunSnapshotAfterReload(params: { if (!reloaded) { return; } - reloaded.enabled = params.snapshot.enabled; - reloaded.updatedAtMs = params.snapshot.updatedAtMs; - reloaded.state = params.snapshot.state; + const preservedEnabled = reloaded.enabled; + const preservedNextRunAtMs = reloaded.state.nextRunAtMs; + const preservedScheduleErrorCount = reloaded.state.scheduleErrorCount; + const preservedScheduleErrorText = reloaded.state.lastError; + const externalScheduleOrEnabledChanged = + params.baseline !== null && + (preservedEnabled !== params.baseline.enabled || + !schedulesEqual(reloaded.schedule, params.baseline.schedule)); + + reloaded.updatedAtMs = Math.max(reloaded.updatedAtMs, params.snapshot.updatedAtMs); + reloaded.state = { + ...reloaded.state, + ...params.snapshot.state, + }; + + // Only preserve reload-derived schedule/enable repairs when the underlying + // schedule or enabled flag was externally changed while the manual run was executing. + // Otherwise, keep the manual-run terminal state (e.g. one-shot disable on success). + if (externalScheduleOrEnabledChanged) { + reloaded.enabled = preservedEnabled; + reloaded.state.nextRunAtMs = preservedNextRunAtMs; + reloaded.state.scheduleErrorCount = preservedScheduleErrorCount; + if (preservedScheduleErrorCount !== undefined) { + reloaded.state.lastError = preservedScheduleErrorText; + } + } else { + reloaded.enabled = params.snapshot.enabled; + } } async function ensureLoadedForRead(state: CronServiceState) { @@ -83,7 +115,7 @@ async function ensureLoadedForRead(state: CronServiceState) { } // Use the maintenance-only version so that read-only operations never // advance a past-due nextRunAtMs without executing the job (#16156). - const changed = recomputeNextRunsForMaintenance(state); + const changed = recomputeNextRunsForMaintenance(state, { consumeReloadRepairSkip: false }); if (changed) { await persist(state); } @@ -302,6 +334,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob job.state.nextRunAtMs = undefined; job.state.runningAtMs = undefined; } + state.skipNextReloadRepairRecomputeJobIds.delete(id); } else if (job.enabled) { // Non-schedule edits should not mutate other jobs, but still repair a // missing/corrupt nextRunAtMs for the updated job. @@ -326,12 +359,10 @@ export async function remove(state: CronServiceState, id: string) { return await locked(state, async () => { warnIfDisabled(state, "remove"); await ensureLoaded(state); - const before = state.store?.jobs.length ?? 0; if (!state.store) { return { ok: false, removed: false } as const; } - state.store.jobs = state.store.jobs.filter((j) => j.id !== id); - const removed = (state.store.jobs.length ?? 0) !== before; + const removed = removeJobById(state, id); await persist(state); armTimer(state); if (removed) { @@ -383,7 +414,7 @@ async function inspectManualRunPreflight( // Normalize job tick state (clears stale runningAtMs markers) before // checking if already running, so a stale marker from a crashed Phase-1 // persist does not block manual triggers for up to STUCK_RUN_MS (#17554). - recomputeNextRunsForMaintenance(state); + recomputeNextRunsForMaintenance(state, { consumeReloadRepairSkip: false }); const job = findJobOrThrow(state, id); if (typeof job.state.runningAtMs === "number") { return { ok: true, ran: false, reason: "already-running" as const }; @@ -508,8 +539,7 @@ async function finishPreparedManualRun( usage: coreResult.usage, }); - if (shouldDelete && state.store) { - state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id); + if (shouldDelete && removeJobById(state, job.id)) { emit(state, { jobId: job.id, action: "removed" }); } @@ -523,6 +553,12 @@ async function finishPreparedManualRun( updatedAtMs: job.updatedAtMs, state: structuredClone(job.state), }; + const postRunBaseline = shouldDelete + ? null + : { + enabled: executionJob.enabled, + schedule: structuredClone(executionJob.schedule), + }; const postRunRemoved = shouldDelete; // Isolated Telegram send can persist target writeback directly to disk. // Reload before final persist so manual `cron run` keeps those changes. @@ -531,6 +567,7 @@ async function finishPreparedManualRun( state, jobId, snapshot: postRunSnapshot, + baseline: postRunBaseline, removed: postRunRemoved, }); recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); diff --git a/src/cron/service/schedule-equality.ts b/src/cron/service/schedule-equality.ts new file mode 100644 index 00000000000..d4f64432ede --- /dev/null +++ b/src/cron/service/schedule-equality.ts @@ -0,0 +1,17 @@ +import type { CronJob } from "../types.js"; + +export function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean { + if (a.kind !== b.kind) { + return false; + } + if (a.kind === "at" && b.kind === "at") { + return a.at === b.at; + } + if (a.kind === "every" && b.kind === "every") { + return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs; + } + if (a.kind === "cron" && b.kind === "cron") { + return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs; + } + return false; +} diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 073efd8f459..e56c7d23bcb 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -127,6 +127,7 @@ export type CronServiceState = { warnedDisabled: boolean; storeLoadedAtMs: number | null; storeFileMtimeMs: number | null; + skipNextReloadRepairRecomputeJobIds: Set; }; export function createCronServiceState(deps: CronServiceDeps): CronServiceState { @@ -139,6 +140,7 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState warnedDisabled: false, storeLoadedAtMs: null, storeFileMtimeMs: null, + skipNextReloadRepairRecomputeJobIds: new Set(), }; } diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index d1d36e48e08..c7e2452844e 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -2,7 +2,13 @@ import fs from "node:fs"; import { normalizeStoredCronJobs } from "../store-migration.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { CronJob } from "../types.js"; -import { recomputeNextRuns } from "./jobs.js"; +import { + computeJobNextRunAtMs, + recordScheduleComputeError, + recomputeNextRuns, + shouldTreatUndefinedNextRunAsScheduleError, +} from "./jobs.js"; +import { schedulesEqual } from "./schedule-equality.js"; import type { CronServiceState } from "./state.js"; async function getFileMtimeMs(path: string): Promise { @@ -14,6 +20,118 @@ async function getFileMtimeMs(path: string): Promise { } } +function resolveExternalRepairComputeBaseMs(params: { + nowMs: number; + reloadedUpdatedAtMs: number; + previousUpdatedAtMs: number; +}): number { + const { nowMs, reloadedUpdatedAtMs, previousUpdatedAtMs } = params; + if (!Number.isFinite(reloadedUpdatedAtMs)) { + return nowMs; + } + const normalizedReloadedUpdatedAtMs = Math.max(0, Math.floor(reloadedUpdatedAtMs)); + const normalizedPreviousUpdatedAtMs = Number.isFinite(previousUpdatedAtMs) + ? Math.max(0, Math.floor(previousUpdatedAtMs)) + : Number.NEGATIVE_INFINITY; + if (normalizedReloadedUpdatedAtMs <= normalizedPreviousUpdatedAtMs) { + return nowMs; + } + return Math.min(nowMs, normalizedReloadedUpdatedAtMs); +} + +function repairNextRunsAfterExternalReload(params: { + state: CronServiceState; + previousJobs: CronJob[] | undefined; +}): boolean { + const { state, previousJobs } = params; + const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds; + if (!state.store || previousJobs === undefined) { + return false; + } + if (skipRecomputeJobIds.size > 0) { + const currentJobIds = new Set(state.store.jobs.map((job) => job.id)); + for (const jobId of skipRecomputeJobIds) { + if (!currentJobIds.has(jobId)) { + skipRecomputeJobIds.delete(jobId); + } + } + } + + const previousById = new Map(previousJobs.map((job) => [job.id, job])); + const now = state.deps.nowMs(); + let changed = false; + + for (const job of state.store.jobs) { + const previous = previousById.get(job.id); + if (!previous) { + continue; + } + if (typeof previous.state.runningAtMs === "number" && job.state.runningAtMs === undefined) { + job.state.runningAtMs = previous.state.runningAtMs; + changed = true; + } + + const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule); + const enabledChanged = previous.enabled !== job.enabled; + if (!scheduleChanged && !enabledChanged) { + continue; + } + + skipRecomputeJobIds.delete(job.id); + const computeBaseMs = resolveExternalRepairComputeBaseMs({ + nowMs: now, + reloadedUpdatedAtMs: job.updatedAtMs, + previousUpdatedAtMs: previous.updatedAtMs, + }); + let nextRunAtMs: number | undefined; + try { + nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined; + if (nextRunAtMs === undefined && shouldTreatUndefinedNextRunAsScheduleError(job)) { + const err = + job.schedule.kind === "every" + ? new Error("invalid every schedule: everyMs must be a finite number") + : new Error("invalid at schedule: at must be a valid absolute timestamp"); + if (recordScheduleComputeError({ state, job, err })) { + changed = true; + } + skipRecomputeJobIds.add(job.id); + continue; + } + if (job.state.scheduleErrorCount !== undefined) { + job.state.scheduleErrorCount = undefined; + changed = true; + } + } catch (err) { + if (recordScheduleComputeError({ state, job, err })) { + changed = true; + } + skipRecomputeJobIds.add(job.id); + continue; + } + if (job.state.nextRunAtMs !== nextRunAtMs) { + job.state.nextRunAtMs = nextRunAtMs; + changed = true; + } + if (!job.enabled && job.state.runningAtMs !== undefined) { + job.state.runningAtMs = undefined; + changed = true; + } + + state.deps.log.debug( + { + jobId: job.id, + scheduleChanged, + enabledChanged, + computeBaseMs, + nextRunAtMs: job.state.nextRunAtMs, + }, + "cron: repaired nextRunAtMs after external reload", + ); + } + + return changed; +} + export async function ensureLoaded( state: CronServiceState, opts?: { @@ -31,6 +149,7 @@ export async function ensureLoaded( // Force reload always re-reads the file to avoid missing cross-service // edits on filesystems with coarse mtime resolution. + const previousJobs = state.store?.jobs; const fileMtimeMs = await getFileMtimeMs(state.deps.storePath); const loaded = await loadCronStore(state.deps.storePath); const jobs = (loaded.jobs ?? []) as unknown as Array>; @@ -38,12 +157,16 @@ export async function ensureLoaded( state.store = { version: 1, jobs: jobs as unknown as CronJob[] }; state.storeLoadedAtMs = state.deps.nowMs(); state.storeFileMtimeMs = fileMtimeMs; + const repairedExternalReload = repairNextRunsAfterExternalReload({ + state, + previousJobs, + }); if (!opts?.skipRecompute) { recomputeNextRuns(state); } - if (mutated) { + if (mutated || repairedExternalReload) { await persist(state, { skipBackup: true }); } } diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index e12c4ae38e7..66662de5616 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -15,7 +15,9 @@ import type { import { computeJobPreviousRunAtMs, computeJobNextRunAtMs, + hasSkipNextReloadRepairRecompute, nextWakeAtMs, + removeJobById, recomputeNextRunsForMaintenance, recordScheduleComputeError, resolveJobPayloadTextForMain, @@ -368,6 +370,7 @@ export function applyJobResult( const shouldDelete = job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok"; + const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id); if (!shouldDelete) { if (job.schedule.kind === "at") { @@ -416,54 +419,58 @@ export function applyJobResult( } else if (result.status === "error" && job.enabled) { // Apply exponential backoff for errored jobs to prevent retry storms. const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1); - let normalNext: number | undefined; - try { - normalNext = - opts?.preserveSchedule && job.schedule.kind === "every" - ? computeNextWithPreservedLastRun(result.endedAt) - : computeJobNextRunAtMs(job, result.endedAt); - } catch (err) { - // If the schedule expression/timezone throws (croner edge cases), - // record the schedule error (auto-disables after repeated failures) - // and fall back to backoff-only schedule so the state update is not lost. - recordScheduleComputeError({ state, job, err }); - } - const backoffNext = result.endedAt + backoff; - // Use whichever is later: the natural next run or the backoff delay. - job.state.nextRunAtMs = - normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext; - state.deps.log.info( - { - jobId: job.id, - consecutiveErrors: job.state.consecutiveErrors, - backoffMs: backoff, - nextRunAtMs: job.state.nextRunAtMs, - }, - "cron: applying error backoff", - ); - } else if (job.enabled) { - let naturalNext: number | undefined; - try { - naturalNext = - opts?.preserveSchedule && job.schedule.kind === "every" - ? computeNextWithPreservedLastRun(result.endedAt) - : computeJobNextRunAtMs(job, result.endedAt); - } catch (err) { - // If the schedule expression/timezone throws (croner edge cases), - // record the schedule error (auto-disables after repeated failures) - // so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop. - recordScheduleComputeError({ state, job, err }); - } - if (job.schedule.kind === "cron") { - // Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS - // after the current run ended. Prevents spin-loops when the - // schedule computation lands in the same second due to - // timezone/croner edge cases (see #17821). - const minNext = result.endedAt + MIN_REFIRE_GAP_MS; + if (!skipImmediateScheduleRecompute) { + let normalNext: number | undefined; + try { + normalNext = + opts?.preserveSchedule && job.schedule.kind === "every" + ? computeNextWithPreservedLastRun(result.endedAt) + : computeJobNextRunAtMs(job, result.endedAt); + } catch (err) { + // If the schedule expression/timezone throws (croner edge cases), + // record the schedule error (auto-disables after repeated failures) + // and fall back to backoff-only schedule so the state update is not lost. + recordScheduleComputeError({ state, job, err }); + } + const backoffNext = result.endedAt + backoff; + // Use whichever is later: the natural next run or the backoff delay. job.state.nextRunAtMs = - naturalNext !== undefined ? Math.max(naturalNext, minNext) : minNext; - } else { - job.state.nextRunAtMs = naturalNext; + normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext; + state.deps.log.info( + { + jobId: job.id, + consecutiveErrors: job.state.consecutiveErrors, + backoffMs: backoff, + nextRunAtMs: job.state.nextRunAtMs, + }, + "cron: applying error backoff", + ); + } + } else if (job.enabled) { + if (!skipImmediateScheduleRecompute) { + let naturalNext: number | undefined; + try { + naturalNext = + opts?.preserveSchedule && job.schedule.kind === "every" + ? computeNextWithPreservedLastRun(result.endedAt) + : computeJobNextRunAtMs(job, result.endedAt); + } catch (err) { + // If the schedule expression/timezone throws (croner edge cases), + // record the schedule error (auto-disables after repeated failures) + // so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop. + recordScheduleComputeError({ state, job, err }); + } + if (job.schedule.kind === "cron") { + // Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS + // after the current run ended. Prevents spin-loops when the + // schedule computation lands in the same second due to + // timezone/croner edge cases (see #17821). + const minNext = result.endedAt + MIN_REFIRE_GAP_MS; + job.state.nextRunAtMs = + naturalNext !== undefined ? Math.max(naturalNext, minNext) : minNext; + } else { + job.state.nextRunAtMs = naturalNext; + } } } else { job.state.nextRunAtMs = undefined; @@ -498,8 +505,7 @@ function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOu emitJobFinished(state, job, result, result.startedAt); - if (shouldDelete) { - store.jobs = jobs.filter((entry) => entry.id !== job.id); + if (shouldDelete && removeJobById(state, job.id)) { emit(state, { jobId: job.id, action: "removed" }); } } @@ -1194,8 +1200,7 @@ export async function executeJob( emitJobFinished(state, job, coreResult, startedAt); - if (shouldDelete && state.store) { - state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); + if (shouldDelete && removeJobById(state, job.id)) { emit(state, { jobId: job.id, action: "removed" }); } }