diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts index bb25f792773..16af1888e78 100644 --- a/src/cron/service.external-reload-schedule-recompute.test.ts +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -179,6 +179,93 @@ describe("forceReload repairs externally changed cron schedules", () => { expect(persistedJob?.state?.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); }); + it("repairs overdue stale nextRunAtMs on first load when edit happened later", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); + const staleDueNextRunAtMs = Date.parse("2026-03-19T12:00:00.000Z"); + const editedAtMs = Date.parse("2026-03-19T12:05:00.000Z"); + const jobId = "external-schedule-change-cold-load-overdue"; + + const createJob = (): CronJob => ({ + id: jobId, + name: "external schedule cold load overdue repair", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: editedAtMs, + schedule: { kind: "cron", expr: "30 23 * * *", tz: "UTC", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: staleDueNextRunAtMs, + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob()], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + const reloaded = state.store?.jobs.find((job) => job.id === jobId); + expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T23:30:00.000Z")); + }); + + it("repairs cold-load stale nextRunAtMs even when consecutiveErrors is set", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); + const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z"); + const jobId = "external-schedule-change-cold-load-consecutive-errors"; + const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); + + const createJob = (): CronJob => ({ + id: jobId, + name: "external schedule cold load with consecutiveErrors", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: editedAtMs, + schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: staleNextRunAtMs, + consecutiveErrors: 2, + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob()], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + const reloaded = state.store?.jobs.find((job) => job.id === jobId); + expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); + }); + it("records schedule errors instead of aborting reload when an external edit is invalid", async () => { const store = await makeStorePath(); const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 3a1f291943c..baf73b71192 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -1835,6 +1835,53 @@ describe("Cron issue regressions", () => { expect(nextWakeAtMs(state)).toBe(endedAt + 2_000); }); + it("does not arm 2s wake for malformed every schedules with non-repairable missing nextRun", () => { + const nowMs = Date.parse("2026-03-02T12:20:00.000Z"); + const job = createIsolatedRegressionJob({ + id: "missing-nextrun-malformed-every", + name: "missing-nextrun-malformed-every", + scheduledAt: nowMs, + schedule: { kind: "every", everyMs: Number.NaN, anchorMs: nowMs - 60_000 }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-missing-nextrun-malformed-every.json", + log: noopLogger as never, + nowMs: () => nowMs, + jobs: [job], + }); + + expect(nextWakeAtMs(state)).toBeUndefined(); + }); + + it("does not arm 2s wake for exhausted one-shot jobs with missing nextRun", () => { + const nowMs = Date.parse("2026-03-02T12:22:00.000Z"); + const atMs = nowMs - 60_000; + const job = createIsolatedRegressionJob({ + id: "missing-nextrun-exhausted-at", + name: "missing-nextrun-exhausted-at", + scheduledAt: nowMs, + schedule: { kind: "at", at: new Date(atMs).toISOString() }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + lastStatus: "ok", + lastRunAtMs: nowMs - 30_000, + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-missing-nextrun-exhausted-at.json", + log: noopLogger as never, + nowMs: () => nowMs, + jobs: [job], + }); + + expect(nextWakeAtMs(state)).toBeUndefined(); + }); + it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => { const nowMs = Date.now(); const everyMs = 24 * 60 * 60 * 1_000; diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 6ba919bcd1c..3c087c94331 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -528,7 +528,8 @@ export function recomputeNextRunsForMaintenance( export function nextWakeAtMs(state: CronServiceState) { const jobs = state.store?.jobs ?? []; let minEnabledNextRunAtMs: number | undefined; - let hasEnabledMissingNextRun = false; + let hasEnabledRepairableMissingNextRun = false; + const nowMs = state.deps.nowMs(); for (const job of jobs) { if (!job.enabled) { @@ -542,14 +543,27 @@ export function nextWakeAtMs(state: CronServiceState) { : Math.min(minEnabledNextRunAtMs, nextRunAtMs); continue; } - hasEnabledMissingNextRun = true; + // Only wake for missing nextRun values that can be repaired by recompute. + // Non-repairable malformed schedules (e.g. invalid every/at payloads) + // should not keep the scheduler in a perpetual 2s poll loop. + if ((job.state.scheduleErrorCount ?? 0) > 0) { + hasEnabledRepairableMissingNextRun = true; + continue; + } + try { + if (computeJobNextRunAtMs(job, nowMs) !== undefined) { + hasEnabledRepairableMissingNextRun = true; + } + } catch { + hasEnabledRepairableMissingNextRun = true; + } } - if (!hasEnabledMissingNextRun) { + if (!hasEnabledRepairableMissingNextRun) { return minEnabledNextRunAtMs; } - const wakeForMissingNextRunAtMs = state.deps.nowMs() + MISSING_NEXT_RUN_WAKE_MS; + const wakeForMissingNextRunAtMs = nowMs + MISSING_NEXT_RUN_WAKE_MS; return minEnabledNextRunAtMs === undefined ? wakeForMissingNextRunAtMs : Math.min(minEnabledNextRunAtMs, wakeForMissingNextRunAtMs); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 408a9186fae..04ac09fa33b 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -39,9 +39,6 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b if (!job.enabled) { return false; } - if ((job.state.consecutiveErrors ?? 0) > 0) { - return false; - } if (typeof job.updatedAtMs !== "number" || !Number.isFinite(job.updatedAtMs)) { return false; } @@ -49,13 +46,13 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b if (typeof persistedNextRunAtMs !== "number" || !Number.isFinite(persistedNextRunAtMs)) { return false; } - // Cold-load repair is only for stale future schedules edited while the - // gateway was offline. Already-due timestamps should be preserved so they can - // execute on the next tick. + const normalizedUpdatedAtMs = Math.max(0, Math.floor(job.updatedAtMs)); + // If a schedule edit happened after the persisted slot, the slot is stale + // even when it is already overdue at startup. if (persistedNextRunAtMs <= nowMs) { - return false; + return normalizedUpdatedAtMs > persistedNextRunAtMs; } - const computeBaseMs = Math.min(nowMs, Math.max(0, Math.floor(job.updatedAtMs))); + const computeBaseMs = Math.min(nowMs, normalizedUpdatedAtMs); return computeBaseMs < persistedNextRunAtMs; }