From 089077d985f9cf4de1ce66511bd8dff7f7d451b8 Mon Sep 17 00:00:00 2001 From: create Date: Sat, 21 Mar 2026 13:48:35 +0800 Subject: [PATCH] fix(cron): narrow external reload repair scope --- ...external-reload-schedule-recompute.test.ts | 702 ++++-------------- src/cron/service/ops.ts | 1 + src/cron/service/store.ts | 74 +- 3 files changed, 155 insertions(+), 622 deletions(-) diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts index 16af1888e78..cec8c03e311 100644 --- a/src/cron/service.external-reload-schedule-recompute.test.ts +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -1,8 +1,8 @@ import fs from "node:fs/promises"; import { describe, expect, it, vi } from "vitest"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js"; -import { recomputeNextRuns, recomputeNextRunsForMaintenance } from "./service/jobs.js"; -import { remove, run } from "./service/ops.js"; +import { recomputeNextRuns } from "./service/jobs.js"; +import { run } from "./service/ops.js"; import { createCronServiceState } from "./service/state.js"; import { ensureLoaded } from "./service/store.js"; import type { CronJob } from "./types.js"; @@ -12,35 +12,48 @@ const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({ baseTimeIso: "2026-03-19T01:44:00.000Z", }); -describe("forceReload repairs externally changed cron schedules", () => { - it("recomputes nextRunAtMs when jobs.json changes schedule outside cron.update", async () => { +function createCronJob(params: { + id: string; + expr: string; + updatedAtMs?: number; + enabled?: boolean; + nextRunAtMs?: number; + scheduleErrorCount?: number; + lastError?: string; + lastStatus?: CronJob["state"]["lastStatus"]; + runningAtMs?: number; +}): CronJob { + return { + id: params.id, + name: params.id, + enabled: params.enabled ?? true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: params.updatedAtMs ?? Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: params.nextRunAtMs, + scheduleErrorCount: params.scheduleErrorCount, + lastError: params.lastError, + lastStatus: params.lastStatus, + runningAtMs: params.runningAtMs, + }, + }; +} + +describe("forceReload repairs externally changed schedules", () => { + it("recomputes nextRunAtMs when jobs.json changes a cron schedule outside cron.update", async () => { const store = await makeStorePath(); const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); const jobId = "external-schedule-change"; const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z"); - const createJob = (expr: string): CronJob => ({ - id: jobId, - name: "external schedule change", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: staleNextRunAtMs, - lastRunAtMs: Date.parse("2026-03-19T00:30:00.000Z"), - lastStatus: "ok", - lastRunStatus: "ok", - }, - }); - await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("30 8 * * *")], + jobs: [createCronJob({ id: jobId, expr: "30 8 * * *", nextRunAtMs: staleNextRunAtMs })], }); const state = createCronServiceState({ @@ -54,56 +67,44 @@ describe("forceReload repairs externally changed cron schedules", () => { }); await ensureLoaded(state, { skipRecompute: true }); - expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(staleNextRunAtMs); await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("30 8,20 * * *")], + jobs: [createCronJob({ id: jobId, expr: "30 8,20 * * *", nextRunAtMs: staleNextRunAtMs })], }); await ensureLoaded(state, { forceReload: true, skipRecompute: true }); const reloaded = state.store?.jobs.find((job) => job.id === jobId); - expect(reloaded?.schedule).toEqual({ - kind: "cron", - expr: "30 8,20 * * *", - tz: "Asia/Shanghai", - staggerMs: 0, - }); expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs); const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>; }; - const persistedJob = persisted.jobs?.find((job) => job.id === jobId); - expect(persistedJob?.state?.nextRunAtMs).toBe(correctedNextRunAtMs); + expect(persisted.jobs?.find((job) => job.id === jobId)?.state?.nextRunAtMs).toBe( + correctedNextRunAtMs, + ); }); it("recomputes from updatedAtMs so delayed reload keeps newly earlier slots due", async () => { const store = await makeStorePath(); const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); - const initialUpdatedAtMs = Date.parse("2026-03-19T12:00:00.000Z"); - const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z"); const jobId = "external-schedule-change-delayed-observe"; + const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); - const createJob = (params: { expr: string; updatedAtMs: number }): CronJob => ({ - id: jobId, - name: "external schedule delayed observe", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: params.updatedAtMs, - schedule: { kind: "cron", expr: params.expr, tz: "UTC", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), - }, - }); + const createJob = (params: { expr: string; updatedAtMs: number }) => + createCronJob({ + id: jobId, + expr: params.expr, + updatedAtMs: params.updatedAtMs, + nextRunAtMs: staleNextRunAtMs, + }); await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob({ expr: "30 23 * * *", updatedAtMs: initialUpdatedAtMs })], + jobs: [ + createJob({ expr: "30 23 * * *", updatedAtMs: Date.parse("2026-03-19T12:00:00.000Z") }), + ], }); const state = createCronServiceState({ @@ -120,150 +121,12 @@ describe("forceReload repairs externally changed cron schedules", () => { await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob({ expr: "* * * * *", updatedAtMs: editedAtMs })], + jobs: [createJob({ expr: "* * * * *", updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z") })], }); await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - const reloaded = state.store?.jobs.find((job) => job.id === jobId); - expect(reloaded?.state.nextRunAtMs).toBeLessThan(nowMs); - expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); - }); - - it("repairs stale nextRunAtMs on first load after restart", async () => { - const store = await makeStorePath(); - const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); - const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z"); - const jobId = "external-schedule-change-cold-load"; - const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); - - const createJob = (): CronJob => ({ - id: jobId, - name: "external schedule cold load repair", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: editedAtMs, - schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: staleNextRunAtMs, - }, - }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob()], - }); - - const state = createCronServiceState({ - cronEnabled: true, - storePath: store.storePath, - log: noopLogger, - nowMs: () => nowMs, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), - }); - - await ensureLoaded(state, { skipRecompute: true }); - - const reloaded = state.store?.jobs.find((job) => job.id === jobId); - expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); - - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>; - }; - const persistedJob = persisted.jobs?.find((job) => job.id === jobId); - expect(persistedJob?.state?.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); - }); - - it("repairs overdue stale nextRunAtMs on first load when edit happened later", async () => { - const store = await makeStorePath(); - const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); - const staleDueNextRunAtMs = Date.parse("2026-03-19T12:00:00.000Z"); - const editedAtMs = Date.parse("2026-03-19T12:05:00.000Z"); - const jobId = "external-schedule-change-cold-load-overdue"; - - const createJob = (): CronJob => ({ - id: jobId, - name: "external schedule cold load overdue repair", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: editedAtMs, - schedule: { kind: "cron", expr: "30 23 * * *", tz: "UTC", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: staleDueNextRunAtMs, - }, - }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob()], - }); - - const state = createCronServiceState({ - cronEnabled: true, - storePath: store.storePath, - log: noopLogger, - nowMs: () => nowMs, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), - }); - - await ensureLoaded(state, { skipRecompute: true }); - - const reloaded = state.store?.jobs.find((job) => job.id === jobId); - expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T23:30:00.000Z")); - }); - - it("repairs cold-load stale nextRunAtMs even when consecutiveErrors is set", async () => { - const store = await makeStorePath(); - const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); - const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z"); - const jobId = "external-schedule-change-cold-load-consecutive-errors"; - const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); - - const createJob = (): CronJob => ({ - id: jobId, - name: "external schedule cold load with consecutiveErrors", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: editedAtMs, - schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: staleNextRunAtMs, - consecutiveErrors: 2, - }, - }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob()], - }); - - const state = createCronServiceState({ - cronEnabled: true, - storePath: store.storePath, - log: noopLogger, - nowMs: () => nowMs, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), - }); - - await ensureLoaded(state, { skipRecompute: true }); - - const reloaded = state.store?.jobs.find((job) => job.id === jobId); - expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); + expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); }); it("records schedule errors instead of aborting reload when an external edit is invalid", async () => { @@ -271,27 +134,15 @@ describe("forceReload repairs externally changed cron schedules", () => { const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); const jobId = "external-invalid-schedule"; - const createJob = (expr: string): CronJob => ({ - id: jobId, - name: "external invalid schedule", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), - lastRunAtMs: Date.parse("2026-03-19T00:30:00.000Z"), - lastStatus: "ok", - lastRunStatus: "ok", - }, - }); - await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("30 8 * * *")], + jobs: [ + createCronJob({ + id: jobId, + expr: "30 8 * * *", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], }); const state = createCronServiceState({ @@ -308,53 +159,39 @@ describe("forceReload repairs externally changed cron schedules", () => { await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("not a valid cron")], + jobs: [ + createCronJob({ + id: jobId, + expr: "not a valid cron", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], }); await expect( ensureLoaded(state, { forceReload: true, skipRecompute: true }), ).resolves.toBeUndefined(); - const reloaded = state.store?.jobs.find((job) => job.id === jobId); + const reloaded = state.store?.jobs[0]; expect(reloaded?.state.nextRunAtMs).toBeUndefined(); expect(reloaded?.state.scheduleErrorCount).toBe(1); expect(reloaded?.state.lastError).toMatch(/^schedule error:/); - - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs?: Array<{ - id: string; - state?: { scheduleErrorCount?: number; lastError?: string; nextRunAtMs?: number }; - }>; - }; - const persistedJob = persisted.jobs?.find((job) => job.id === jobId); - expect(persistedJob?.state?.scheduleErrorCount).toBe(1); - expect(persistedJob?.state?.lastError).toMatch(/^schedule error:/); - expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); }); - it("does not double-count a reload schedule error during the immediate full recompute", async () => { + it("does not double-count a reload schedule error during the immediate recompute", async () => { const store = await makeStorePath(); const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); const jobId = "external-invalid-schedule-full-recompute"; - const createJob = (expr: string): CronJob => ({ - id: jobId, - name: "external invalid schedule full recompute", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), - }, - }); - await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("30 8 * * *")], + jobs: [ + createCronJob({ + id: jobId, + expr: "30 8 * * *", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], }); const state = createCronServiceState({ @@ -371,7 +208,13 @@ describe("forceReload repairs externally changed cron schedules", () => { await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("not a valid cron")], + jobs: [ + createCronJob({ + id: jobId, + expr: "not a valid cron", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], }); await ensureLoaded(state, { forceReload: true, skipRecompute: true }); @@ -379,116 +222,6 @@ describe("forceReload repairs externally changed cron schedules", () => { recomputeNextRuns(state); expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); - - recomputeNextRuns(state); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2); - }); - - it("does not double-count a reload schedule error during immediate maintenance recompute", async () => { - const store = await makeStorePath(); - const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); - const jobId = "external-invalid-schedule-maintenance"; - - const createJob = (expr: string): CronJob => ({ - id: jobId, - name: "external invalid schedule maintenance", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), - }, - }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob("30 8 * * *")], - }); - - const state = createCronServiceState({ - cronEnabled: true, - storePath: store.storePath, - log: noopLogger, - nowMs: () => nowMs, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), - }); - - await ensureLoaded(state, { skipRecompute: true }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob("not a valid cron")], - }); - - await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); - - recomputeNextRunsForMaintenance(state); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); - - recomputeNextRunsForMaintenance(state); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2); - }); - - it("preserves the one-shot skip across a second forceReload before maintenance recompute", async () => { - const store = await makeStorePath(); - const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); - const jobId = "external-invalid-schedule-second-reload"; - - const createJob = (expr: string): CronJob => ({ - id: jobId, - name: "external invalid schedule second reload", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: { - nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), - }, - }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob("30 8 * * *")], - }); - - const state = createCronServiceState({ - cronEnabled: true, - storePath: store.storePath, - log: noopLogger, - nowMs: () => nowMs, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), - }); - - await ensureLoaded(state, { skipRecompute: true }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob("not a valid cron")], - }); - - await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); - - await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); - - recomputeNextRunsForMaintenance(state); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1); - - recomputeNextRunsForMaintenance(state); - expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2); }); it("keeps forceReload repairs when manual-run snapshot is merged back", async () => { @@ -497,49 +230,26 @@ describe("forceReload repairs externally changed cron schedules", () => { const jobId = "manual-run-reload-merge"; const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z"); - const createJob = (params: { - expr: string; - enabled: boolean; - nextRunAtMs?: number; - lastStatus?: CronJob["state"]["lastStatus"]; - }): CronJob => ({ - id: jobId, - name: "manual run reload merge", - enabled: params.enabled, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: "tick" }, - state: { + const createJob = (params: { expr: string; enabled: boolean; nextRunAtMs?: number }) => ({ + ...createCronJob({ + id: jobId, + expr: params.expr, + enabled: params.enabled, nextRunAtMs: params.nextRunAtMs, - lastStatus: params.lastStatus, - }, + }), + sessionTarget: "isolated" as const, + payload: { kind: "agentTurn", message: "tick" } as const, }); await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [ - createJob({ - expr: "30 23 * * *", - enabled: true, - nextRunAtMs: staleNextRunAtMs, - }), - ], + jobs: [createJob({ expr: "30 23 * * *", enabled: true, nextRunAtMs: staleNextRunAtMs })], }); const runIsolatedAgentJob = vi.fn(async () => { await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [ - createJob({ - expr: "30 8 * * *", - enabled: false, - nextRunAtMs: staleNextRunAtMs, - lastStatus: "error", - }), - ], + jobs: [createJob({ expr: "30 8 * * *", enabled: false, nextRunAtMs: staleNextRunAtMs })], }); nowMs += 500; return { status: "ok" as const, summary: "done" }; @@ -555,33 +265,12 @@ describe("forceReload repairs externally changed cron schedules", () => { runIsolatedAgentJob, }); - const result = await run(state, jobId, "force"); - expect(result).toEqual({ ok: true, ran: true }); - expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true }); - const merged = state.store?.jobs.find((job) => job.id === jobId); - expect(merged?.schedule).toEqual({ - kind: "cron", - expr: "30 8 * * *", - tz: "Asia/Shanghai", - staggerMs: 0, - }); + const merged = state.store?.jobs[0]; expect(merged?.enabled).toBe(false); expect(merged?.state.nextRunAtMs).toBeUndefined(); expect(merged?.state.lastStatus).toBe("ok"); - expect(merged?.state.lastRunAtMs).toBeDefined(); - - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs?: Array<{ - id: string; - enabled?: boolean; - state?: { nextRunAtMs?: number; lastStatus?: string }; - }>; - }; - const persistedJob = persisted.jobs?.find((job) => job.id === jobId); - expect(persistedJob?.enabled).toBe(false); - expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); - expect(persistedJob?.state?.lastStatus).toBe("ok"); }); it("keeps scheduleErrorCount cleared when external reload fixes schedule during force-run", async () => { @@ -590,51 +279,27 @@ describe("forceReload repairs externally changed cron schedules", () => { const jobId = "manual-run-reload-clears-schedule-error-count"; const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z"); - const createJob = (params: { - expr: string; - scheduleErrorCount?: number; - lastError?: string; - nextRunAtMs?: number; - }): CronJob => ({ - id: jobId, - name: "manual run reload clears schedule error count", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: "tick" }, - state: { - nextRunAtMs: params.nextRunAtMs, - scheduleErrorCount: params.scheduleErrorCount, - lastError: params.lastError, - }, + const createJob = (expr: string) => ({ + ...createCronJob({ + id: jobId, + expr, + nextRunAtMs: staleNextRunAtMs, + scheduleErrorCount: 2, + lastError: "schedule error: invalid expression", + }), + sessionTarget: "isolated" as const, + payload: { kind: "agentTurn", message: "tick" } as const, }); await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [ - createJob({ - expr: "30 23 * * *", - nextRunAtMs: staleNextRunAtMs, - scheduleErrorCount: 2, - lastError: "cron: invalid expression", - }), - ], + jobs: [createJob("30 23 * * *")], }); const runIsolatedAgentJob = vi.fn(async () => { await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [ - createJob({ - expr: "30 8 * * *", - nextRunAtMs: staleNextRunAtMs, - scheduleErrorCount: 2, - lastError: "cron: invalid expression", - }), - ], + jobs: [createJob("30 8 * * *")], }); nowMs += 500; return { status: "ok" as const, summary: "done" }; @@ -650,110 +315,26 @@ describe("forceReload repairs externally changed cron schedules", () => { runIsolatedAgentJob, }); - const result = await run(state, jobId, "force"); - expect(result).toEqual({ ok: true, ran: true }); - expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); - - const merged = state.store?.jobs.find((job) => job.id === jobId); - expect(merged?.schedule).toEqual({ - kind: "cron", - expr: "30 8 * * *", - tz: "Asia/Shanghai", - staggerMs: 0, - }); - expect(merged?.state.scheduleErrorCount).toBeUndefined(); - - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs?: Array<{ - id: string; - state?: { scheduleErrorCount?: number }; - }>; - }; - const persistedJob = persisted.jobs?.find((job) => job.id === jobId); - expect(persistedJob?.state?.scheduleErrorCount).toBeUndefined(); + expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true }); + expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBeUndefined(); }); - it("keeps one-shot terminal disable state when manual force-run reloads unchanged store", async () => { + it("preserves runningAtMs when an external reload comes from a stale file snapshot", async () => { const store = await makeStorePath(); - let nowMs = Date.parse("2026-03-19T01:44:00.000Z"); - const jobId = "manual-run-at-terminal-state"; - const scheduledAtMs = nowMs + 60_000; - - const createJob = (): CronJob => ({ - id: jobId, - name: "manual run at terminal state", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "at", at: new Date(scheduledAtMs).toISOString() }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: "tick" }, - state: { - nextRunAtMs: scheduledAtMs, - }, - }); + const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); + const jobId = "external-running-marker"; + const runningAtMs = Date.parse("2026-03-19T12:00:00.000Z"); await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob()], - }); - - const state = createCronServiceState({ - cronEnabled: true, - storePath: store.storePath, - log: noopLogger, - nowMs: () => nowMs, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => { - nowMs += 500; - return { status: "ok" as const, summary: "done" }; - }), - }); - - const result = await run(state, jobId, "force"); - expect(result).toEqual({ ok: true, ran: true }); - - const merged = state.store?.jobs.find((job) => job.id === jobId); - expect(merged?.enabled).toBe(false); - expect(merged?.state.nextRunAtMs).toBeUndefined(); - expect(merged?.state.lastStatus).toBe("ok"); - - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs?: Array<{ - id: string; - enabled?: boolean; - state?: { nextRunAtMs?: number; lastStatus?: string }; - }>; - }; - const persistedJob = persisted.jobs?.find((job) => job.id === jobId); - expect(persistedJob?.enabled).toBe(false); - expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); - expect(persistedJob?.state?.lastStatus).toBe("ok"); - }); - - it("clears reload-repair skip markers when a job is removed before same-id rebuild", async () => { - const store = await makeStorePath(); - const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); - const jobId = "external-reload-skip-marker-id-reuse"; - - const createJob = (expr: string): CronJob => ({ - id: jobId, - name: "external reload skip marker id reuse", - enabled: true, - createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), - updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), - schedule: { kind: "cron", expr, tz: "UTC", staggerMs: 0 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, - state: {}, - }); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob("*/15 * * * *")], + jobs: [ + createCronJob({ + id: jobId, + expr: "30 23 * * *", + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + runningAtMs, + }), + ], }); const state = createCronServiceState({ @@ -770,36 +351,31 @@ describe("forceReload repairs externally changed cron schedules", () => { await writeCronStoreSnapshot({ storePath: store.storePath, - jobs: [createJob("not a valid cron")], + jobs: [ + createCronJob({ + id: jobId, + expr: "* * * * *", + updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z"), + nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"), + }), + ], }); + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(true); - const removed = await remove(state, jobId); - expect(removed).toEqual({ ok: true, removed: true }); - expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(false); - - await writeCronStoreSnapshot({ - storePath: store.storePath, - jobs: [createJob("*/5 * * * *")], - }); - await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - recomputeNextRunsForMaintenance(state); - - const rebuilt = state.store?.jobs.find((job) => job.id === jobId); - expect(typeof rebuilt?.state.nextRunAtMs).toBe("number"); - expect(Number.isFinite(rebuilt?.state.nextRunAtMs)).toBe(true); - expect(rebuilt?.state.scheduleErrorCount).toBeUndefined(); + const reloaded = state.store?.jobs[0]; + expect(reloaded?.state.runningAtMs).toBe(runningAtMs); + expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); }); - it("recomputes nextRunAtMs when external every schedule changes", async () => { + it("recomputes nextRunAtMs when an external every schedule changes", async () => { const store = await makeStorePath(); const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); const jobId = "external-every-schedule-change"; const createEveryJob = (everyMs: number): CronJob => ({ id: jobId, - name: "external every schedule change", + name: jobId, enabled: true, createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"), updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), @@ -840,12 +416,6 @@ describe("forceReload repairs externally changed cron schedules", () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - const reloaded = state.store?.jobs.find((job) => job.id === jobId); - expect(reloaded?.schedule).toEqual({ - kind: "every", - everyMs: 60_000, - anchorMs: Date.parse("2026-03-19T00:00:00.000Z"), - }); - expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z")); + expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z")); }); }); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 9ae602933ce..95e4f84df87 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -334,6 +334,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob job.state.nextRunAtMs = undefined; job.state.runningAtMs = undefined; } + state.skipNextReloadRepairRecomputeJobIds.delete(id); } else if (job.enabled) { // Non-schedule edits should not mutate other jobs, but still repair a // missing/corrupt nextRunAtMs for the updated job. diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 2c4f0ecf022..c7e2452844e 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -39,54 +39,13 @@ function resolveExternalRepairComputeBaseMs(params: { return Math.min(nowMs, normalizedReloadedUpdatedAtMs); } -function hasPendingErrorBackoff(job: CronJob, nowMs: number): boolean { - const nextRunAtMs = job.state.nextRunAtMs; - if (typeof nextRunAtMs !== "number" || !Number.isFinite(nextRunAtMs) || nextRunAtMs <= nowMs) { - return false; - } - const consecutiveErrors = job.state.consecutiveErrors; - if ( - typeof consecutiveErrors !== "number" || - !Number.isFinite(consecutiveErrors) || - consecutiveErrors <= 0 - ) { - return false; - } - return job.state.lastStatus === "error"; -} - -function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): boolean { - const { job, nowMs } = params; - if (!job.enabled) { - return false; - } - if (typeof job.updatedAtMs !== "number" || !Number.isFinite(job.updatedAtMs)) { - return false; - } - const persistedNextRunAtMs = job.state.nextRunAtMs; - if (typeof persistedNextRunAtMs !== "number" || !Number.isFinite(persistedNextRunAtMs)) { - return false; - } - const normalizedUpdatedAtMs = Math.max(0, Math.floor(job.updatedAtMs)); - // If a schedule edit happened after the persisted slot, the slot is stale - // even when it is already overdue at startup. - if (persistedNextRunAtMs <= nowMs) { - return normalizedUpdatedAtMs > persistedNextRunAtMs; - } - if (hasPendingErrorBackoff(job, nowMs)) { - return false; - } - const computeBaseMs = Math.min(nowMs, normalizedUpdatedAtMs); - return computeBaseMs < persistedNextRunAtMs; -} - function repairNextRunsAfterExternalReload(params: { state: CronServiceState; previousJobs: CronJob[] | undefined; }): boolean { const { state, previousJobs } = params; const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds; - if (!state.store) { + if (!state.store || previousJobs === undefined) { return false; } if (skipRecomputeJobIds.size > 0) { @@ -98,28 +57,32 @@ function repairNextRunsAfterExternalReload(params: { } } - const previousById = new Map((previousJobs ?? []).map((job) => [job.id, job])); + const previousById = new Map(previousJobs.map((job) => [job.id, job])); const now = state.deps.nowMs(); let changed = false; for (const job of state.store.jobs) { const previous = previousById.get(job.id); - const coldLoadRepairCandidate = - previousJobs === undefined && shouldRepairColdLoadNextRun({ job, nowMs: now }); - const scheduleChanged = previous ? !schedulesEqual(previous.schedule, job.schedule) : false; - const enabledChanged = previous ? previous.enabled !== job.enabled : false; - if (!scheduleChanged && !enabledChanged && !coldLoadRepairCandidate) { + if (!previous) { + continue; + } + if (typeof previous.state.runningAtMs === "number" && job.state.runningAtMs === undefined) { + job.state.runningAtMs = previous.state.runningAtMs; + changed = true; + } + + const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule); + const enabledChanged = previous.enabled !== job.enabled; + if (!scheduleChanged && !enabledChanged) { continue; } skipRecomputeJobIds.delete(job.id); - const computeBaseMs = coldLoadRepairCandidate - ? Math.min(now, Math.max(0, Math.floor(job.updatedAtMs))) - : resolveExternalRepairComputeBaseMs({ - nowMs: now, - reloadedUpdatedAtMs: job.updatedAtMs, - previousUpdatedAtMs: previous?.updatedAtMs ?? Number.NEGATIVE_INFINITY, - }); + const computeBaseMs = resolveExternalRepairComputeBaseMs({ + nowMs: now, + reloadedUpdatedAtMs: job.updatedAtMs, + previousUpdatedAtMs: previous.updatedAtMs, + }); let nextRunAtMs: number | undefined; try { nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined; @@ -159,7 +122,6 @@ function repairNextRunsAfterExternalReload(params: { jobId: job.id, scheduleChanged, enabledChanged, - coldLoadRepairCandidate, computeBaseMs, nextRunAtMs: job.state.nextRunAtMs, },