diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts new file mode 100644 index 00000000000..7c45d2cac29 --- /dev/null +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -0,0 +1,79 @@ +import fs from "node:fs/promises"; +import { describe, expect, it, vi } from "vitest"; +import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js"; +import { createCronServiceState } from "./service/state.js"; +import { ensureLoaded } from "./service/store.js"; +import type { CronJob } from "./types.js"; + +const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({ + prefix: "openclaw-cron-external-reload-", + baseTimeIso: "2026-03-19T01:44:00.000Z", +}); + +describe("forceReload repairs externally changed cron schedules", () => { + it("recomputes nextRunAtMs when jobs.json changes schedule outside cron.update", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "external-schedule-change"; + const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); + const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z"); + + const createJob = (expr: string): CronJob => ({ + id: jobId, + name: "external schedule change", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: staleNextRunAtMs, + lastRunAtMs: Date.parse("2026-03-19T00:30:00.000Z"), + lastStatus: "ok", + lastRunStatus: "ok", + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("30 8 * * *")], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(staleNextRunAtMs); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob("30 8,20 * * *")], + }); + + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + const reloaded = state.store?.jobs.find((job) => job.id === jobId); + expect(reloaded?.schedule).toEqual({ + kind: "cron", + expr: "30 8,20 * * *", + tz: "Asia/Shanghai", + staggerMs: 0, + }); + expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>; + }; + const persistedJob = persisted.jobs?.find((job) => job.id === jobId); + expect(persistedJob?.state?.nextRunAtMs).toBe(correctedNextRunAtMs); + }); +}); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index d1d36e48e08..37942ccd367 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import { normalizeStoredCronJobs } from "../store-migration.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { CronJob } from "../types.js"; -import { recomputeNextRuns } from "./jobs.js"; +import { computeJobNextRunAtMs, recomputeNextRuns } from "./jobs.js"; import type { CronServiceState } from "./state.js"; async function getFileMtimeMs(path: string): Promise { @@ -14,6 +14,71 @@ async function getFileMtimeMs(path: string): Promise { } } +function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean { + if (a.kind !== b.kind) { + return false; + } + if (a.kind === "at" && b.kind === "at") { + return a.at === b.at; + } + if (a.kind === "every" && b.kind === "every") { + return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs; + } + if (a.kind === "cron" && b.kind === "cron") { + return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs; + } + return false; +} + +function repairNextRunsAfterExternalReload(params: { + state: CronServiceState; + previousJobs: CronJob[] | undefined; +}): boolean { + const { state, previousJobs } = params; + if (!state.store || !previousJobs?.length) { + return false; + } + + const previousById = new Map(previousJobs.map((job) => [job.id, job])); + const now = state.deps.nowMs(); + let changed = false; + + for (const job of state.store.jobs) { + const previous = previousById.get(job.id); + if (!previous) { + continue; + } + + const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule); + const enabledChanged = previous.enabled !== job.enabled; + if (!scheduleChanged && !enabledChanged) { + continue; + } + + const nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, now) : undefined; + if (job.state.nextRunAtMs !== nextRunAtMs) { + job.state.nextRunAtMs = nextRunAtMs; + changed = true; + } + if (!job.enabled && job.state.runningAtMs !== undefined) { + job.state.runningAtMs = undefined; + changed = true; + } + + state.deps.log.debug( + { + jobId: job.id, + scheduleChanged, + enabledChanged, + nextRunAtMs: job.state.nextRunAtMs, + }, + "cron: repaired nextRunAtMs after external reload", + ); + } + + return changed; +} + export async function ensureLoaded( state: CronServiceState, opts?: { @@ -31,6 +96,7 @@ export async function ensureLoaded( // Force reload always re-reads the file to avoid missing cross-service // edits on filesystems with coarse mtime resolution. + const previousJobs = state.store?.jobs; const fileMtimeMs = await getFileMtimeMs(state.deps.storePath); const loaded = await loadCronStore(state.deps.storePath); const jobs = (loaded.jobs ?? []) as unknown as Array>; @@ -38,12 +104,16 @@ export async function ensureLoaded( state.store = { version: 1, jobs: jobs as unknown as CronJob[] }; state.storeLoadedAtMs = state.deps.nowMs(); state.storeFileMtimeMs = fileMtimeMs; + const repairedExternalReload = repairNextRunsAfterExternalReload({ + state, + previousJobs, + }); if (!opts?.skipRecompute) { recomputeNextRuns(state); } - if (mutated) { + if (mutated || repairedExternalReload) { await persist(state, { skipBackup: true }); } }