diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts index 95bed250642..7fde2d75d3c 100644 --- a/src/cron/service.external-reload-schedule-recompute.test.ts +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import { describe, expect, it, vi } from "vitest"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js"; import { recomputeNextRuns, recomputeNextRunsForMaintenance } from "./service/jobs.js"; +import { run } from "./service/ops.js"; import { createCronServiceState } from "./service/state.js"; import { ensureLoaded } from "./service/store.js"; import type { CronJob } from "./types.js"; @@ -302,4 +303,97 @@ describe("forceReload repairs externally changed cron schedules", () => { recomputeNextRunsForMaintenance(state); expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2); }); + + it("keeps forceReload repairs when manual-run snapshot is merged back", async () => { + const store = await makeStorePath(); + let nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "manual-run-reload-merge"; + const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z"); + + const createJob = (params: { + expr: string; + enabled: boolean; + nextRunAtMs?: number; + lastStatus?: CronJob["state"]["lastStatus"]; + }): CronJob => ({ + id: jobId, + name: "manual run reload merge", + enabled: params.enabled, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "tick" }, + state: { + nextRunAtMs: params.nextRunAtMs, + lastStatus: params.lastStatus, + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createJob({ + expr: "30 23 * * *", + enabled: true, + nextRunAtMs: staleNextRunAtMs, + }), + ], + }); + + const runIsolatedAgentJob = vi.fn(async () => { + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [ + createJob({ + expr: "30 8 * * *", + enabled: false, + nextRunAtMs: staleNextRunAtMs, + lastStatus: "error", + }), + ], + }); + nowMs += 500; + return { status: "ok" as const, summary: "done" }; + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + + const result = await run(state, jobId, "force"); + expect(result).toEqual({ ok: true, ran: true }); + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + const merged = state.store?.jobs.find((job) => job.id === jobId); + expect(merged?.schedule).toEqual({ + kind: "cron", + expr: "30 8 * * *", + tz: "Asia/Shanghai", + staggerMs: 0, + }); + expect(merged?.enabled).toBe(false); + expect(merged?.state.nextRunAtMs).toBeUndefined(); + expect(merged?.state.lastStatus).toBe("ok"); + expect(merged?.state.lastRunAtMs).toBeDefined(); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs?: Array<{ + id: string; + enabled?: boolean; + state?: { nextRunAtMs?: number; lastStatus?: string }; + }>; + }; + const persistedJob = persisted.jobs?.find((job) => job.id === jobId); + expect(persistedJob?.enabled).toBe(false); + expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); + expect(persistedJob?.state?.lastStatus).toBe("ok"); + }); }); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index dac28f4b0c9..e29a059f39d 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -22,7 +22,7 @@ import { createNoopLogger, createRunningCronServiceState, } from "./service.test-harness.js"; -import { computeJobNextRunAtMs } from "./service/jobs.js"; +import { computeJobNextRunAtMs, recomputeNextRunsForMaintenance } from "./service/jobs.js"; import { enqueueRun, run } from "./service/ops.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; import { @@ -1748,6 +1748,48 @@ describe("Cron issue regressions", () => { expect(job.enabled).toBe(true); }); + it("does not double-count reload schedule errors in apply path before maintenance recompute", () => { + const startedAt = Date.parse("2026-03-02T12:10:00.000Z"); + const endedAt = startedAt + 25; + const job = createIsolatedRegressionJob({ + id: "apply-result-reload-dedupe-30905", + name: "apply-result-reload-dedupe-30905", + scheduledAt: startedAt, + schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + runningAtMs: startedAt - 500, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-30905-reload-dedupe.json", + log: noopLogger as never, + nowMs: () => endedAt, + jobs: [job], + }); + state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]); + + applyJobResult(state, job, { + status: "ok", + delivered: true, + startedAt, + endedAt, + }); + + expect(job.state.scheduleErrorCount).toBe(1); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true); + + recomputeNextRunsForMaintenance(state); + expect(job.state.scheduleErrorCount).toBe(1); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + + recomputeNextRunsForMaintenance(state); + expect(job.state.scheduleErrorCount).toBe(2); + }); + it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => { const nowMs = Date.now(); const everyMs = 24 * 60 * 60 * 1_000; diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 8f853ea8e92..db90f1502f1 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -413,12 +413,19 @@ function walkSchedulableJobs( return changed; } -function consumeSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean { +export function hasSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean { const pending = state.skipNextReloadRepairRecomputeJobIds; - if (!pending?.has(jobId)) { + return pending?.has(jobId) === true; +} + +export function consumeSkipNextReloadRepairRecompute( + state: CronServiceState, + jobId: string, +): boolean { + if (!hasSkipNextReloadRepairRecompute(state, jobId)) { return false; } - pending.delete(jobId); + state.skipNextReloadRepairRecomputeJobIds?.delete(jobId); return true; } diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 69751e4dfdb..e932e666cae 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -51,7 +51,6 @@ function mergeManualRunSnapshotAfterReload(params: { state: CronServiceState; jobId: string; snapshot: { - enabled: boolean; updatedAtMs: number; state: CronJob["state"]; } | null; @@ -71,9 +70,25 @@ function mergeManualRunSnapshotAfterReload(params: { if (!reloaded) { return; } - reloaded.enabled = params.snapshot.enabled; - reloaded.updatedAtMs = params.snapshot.updatedAtMs; - reloaded.state = params.snapshot.state; + const preservedEnabled = reloaded.enabled; + const preservedNextRunAtMs = reloaded.state.nextRunAtMs; + const preservedScheduleErrorCount = reloaded.state.scheduleErrorCount; + const preservedScheduleErrorText = reloaded.state.lastError; + + reloaded.updatedAtMs = Math.max(reloaded.updatedAtMs, params.snapshot.updatedAtMs); + reloaded.state = { + ...reloaded.state, + ...params.snapshot.state, + }; + + // Keep externally reloaded schedule/enable repairs even when a manual run + // snapshot was captured before forceReload. + reloaded.enabled = preservedEnabled; + reloaded.state.nextRunAtMs = preservedNextRunAtMs; + if (preservedScheduleErrorCount !== undefined) { + reloaded.state.scheduleErrorCount = preservedScheduleErrorCount; + reloaded.state.lastError = preservedScheduleErrorText; + } } async function ensureLoadedForRead(state: CronServiceState) { @@ -519,7 +534,6 @@ async function finishPreparedManualRun( const postRunSnapshot = shouldDelete ? null : { - enabled: job.enabled, updatedAtMs: job.updatedAtMs, state: structuredClone(job.state), }; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index e12c4ae38e7..b3c130b124b 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -15,6 +15,7 @@ import type { import { computeJobPreviousRunAtMs, computeJobNextRunAtMs, + hasSkipNextReloadRepairRecompute, nextWakeAtMs, recomputeNextRunsForMaintenance, recordScheduleComputeError, @@ -368,6 +369,7 @@ export function applyJobResult( const shouldDelete = job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok"; + const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id); if (!shouldDelete) { if (job.schedule.kind === "at") { @@ -416,54 +418,58 @@ export function applyJobResult( } else if (result.status === "error" && job.enabled) { // Apply exponential backoff for errored jobs to prevent retry storms. const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1); - let normalNext: number | undefined; - try { - normalNext = - opts?.preserveSchedule && job.schedule.kind === "every" - ? computeNextWithPreservedLastRun(result.endedAt) - : computeJobNextRunAtMs(job, result.endedAt); - } catch (err) { - // If the schedule expression/timezone throws (croner edge cases), - // record the schedule error (auto-disables after repeated failures) - // and fall back to backoff-only schedule so the state update is not lost. - recordScheduleComputeError({ state, job, err }); - } - const backoffNext = result.endedAt + backoff; - // Use whichever is later: the natural next run or the backoff delay. - job.state.nextRunAtMs = - normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext; - state.deps.log.info( - { - jobId: job.id, - consecutiveErrors: job.state.consecutiveErrors, - backoffMs: backoff, - nextRunAtMs: job.state.nextRunAtMs, - }, - "cron: applying error backoff", - ); - } else if (job.enabled) { - let naturalNext: number | undefined; - try { - naturalNext = - opts?.preserveSchedule && job.schedule.kind === "every" - ? computeNextWithPreservedLastRun(result.endedAt) - : computeJobNextRunAtMs(job, result.endedAt); - } catch (err) { - // If the schedule expression/timezone throws (croner edge cases), - // record the schedule error (auto-disables after repeated failures) - // so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop. - recordScheduleComputeError({ state, job, err }); - } - if (job.schedule.kind === "cron") { - // Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS - // after the current run ended. Prevents spin-loops when the - // schedule computation lands in the same second due to - // timezone/croner edge cases (see #17821). - const minNext = result.endedAt + MIN_REFIRE_GAP_MS; + if (!skipImmediateScheduleRecompute) { + let normalNext: number | undefined; + try { + normalNext = + opts?.preserveSchedule && job.schedule.kind === "every" + ? computeNextWithPreservedLastRun(result.endedAt) + : computeJobNextRunAtMs(job, result.endedAt); + } catch (err) { + // If the schedule expression/timezone throws (croner edge cases), + // record the schedule error (auto-disables after repeated failures) + // and fall back to backoff-only schedule so the state update is not lost. + recordScheduleComputeError({ state, job, err }); + } + const backoffNext = result.endedAt + backoff; + // Use whichever is later: the natural next run or the backoff delay. job.state.nextRunAtMs = - naturalNext !== undefined ? Math.max(naturalNext, minNext) : minNext; - } else { - job.state.nextRunAtMs = naturalNext; + normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext; + state.deps.log.info( + { + jobId: job.id, + consecutiveErrors: job.state.consecutiveErrors, + backoffMs: backoff, + nextRunAtMs: job.state.nextRunAtMs, + }, + "cron: applying error backoff", + ); + } + } else if (job.enabled) { + if (!skipImmediateScheduleRecompute) { + let naturalNext: number | undefined; + try { + naturalNext = + opts?.preserveSchedule && job.schedule.kind === "every" + ? computeNextWithPreservedLastRun(result.endedAt) + : computeJobNextRunAtMs(job, result.endedAt); + } catch (err) { + // If the schedule expression/timezone throws (croner edge cases), + // record the schedule error (auto-disables after repeated failures) + // so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop. + recordScheduleComputeError({ state, job, err }); + } + if (job.schedule.kind === "cron") { + // Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS + // after the current run ended. Prevents spin-loops when the + // schedule computation lands in the same second due to + // timezone/croner edge cases (see #17821). + const minNext = result.endedAt + MIN_REFIRE_GAP_MS; + job.state.nextRunAtMs = + naturalNext !== undefined ? Math.max(naturalNext, minNext) : minNext; + } else { + job.state.nextRunAtMs = naturalNext; + } } } else { job.state.nextRunAtMs = undefined;