fix(cron): recompute next run after external schedule reload
This commit is contained in:
parent
8a05c05596
commit
2efa044a29
79
src/cron/service.external-reload-schedule-recompute.test.ts
Normal file
79
src/cron/service.external-reload-schedule-recompute.test.ts
Normal file
@ -0,0 +1,79 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
|
||||
import { createCronServiceState } from "./service/state.js";
|
||||
import { ensureLoaded } from "./service/store.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
|
||||
const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({
|
||||
prefix: "openclaw-cron-external-reload-",
|
||||
baseTimeIso: "2026-03-19T01:44:00.000Z",
|
||||
});
|
||||
|
||||
describe("forceReload repairs externally changed cron schedules", () => {
|
||||
it("recomputes nextRunAtMs when jobs.json changes schedule outside cron.update", async () => {
|
||||
const store = await makeStorePath();
|
||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||
const jobId = "external-schedule-change";
|
||||
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
||||
const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z");
|
||||
|
||||
const createJob = (expr: string): CronJob => ({
|
||||
id: jobId,
|
||||
name: "external schedule change",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
||||
schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {
|
||||
nextRunAtMs: staleNextRunAtMs,
|
||||
lastRunAtMs: Date.parse("2026-03-19T00:30:00.000Z"),
|
||||
lastStatus: "ok",
|
||||
lastRunStatus: "ok",
|
||||
},
|
||||
});
|
||||
|
||||
await writeCronStoreSnapshot({
|
||||
storePath: store.storePath,
|
||||
jobs: [createJob("30 8 * * *")],
|
||||
});
|
||||
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => nowMs,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(staleNextRunAtMs);
|
||||
|
||||
await writeCronStoreSnapshot({
|
||||
storePath: store.storePath,
|
||||
jobs: [createJob("30 8,20 * * *")],
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
||||
expect(reloaded?.schedule).toEqual({
|
||||
kind: "cron",
|
||||
expr: "30 8,20 * * *",
|
||||
tz: "Asia/Shanghai",
|
||||
staggerMs: 0,
|
||||
});
|
||||
expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs);
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||
jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
|
||||
};
|
||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
||||
expect(persistedJob?.state?.nextRunAtMs).toBe(correctedNextRunAtMs);
|
||||
});
|
||||
});
|
||||
@ -2,7 +2,7 @@ import fs from "node:fs";
|
||||
import { normalizeStoredCronJobs } from "../store-migration.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
import { computeJobNextRunAtMs, recomputeNextRuns } from "./jobs.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||
@ -14,6 +14,71 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||
}
|
||||
}
|
||||
|
||||
function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean {
|
||||
if (a.kind !== b.kind) {
|
||||
return false;
|
||||
}
|
||||
if (a.kind === "at" && b.kind === "at") {
|
||||
return a.at === b.at;
|
||||
}
|
||||
if (a.kind === "every" && b.kind === "every") {
|
||||
return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs;
|
||||
}
|
||||
if (a.kind === "cron" && b.kind === "cron") {
|
||||
return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function repairNextRunsAfterExternalReload(params: {
|
||||
state: CronServiceState;
|
||||
previousJobs: CronJob[] | undefined;
|
||||
}): boolean {
|
||||
const { state, previousJobs } = params;
|
||||
if (!state.store || !previousJobs?.length) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const previousById = new Map(previousJobs.map((job) => [job.id, job]));
|
||||
const now = state.deps.nowMs();
|
||||
let changed = false;
|
||||
|
||||
for (const job of state.store.jobs) {
|
||||
const previous = previousById.get(job.id);
|
||||
if (!previous) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule);
|
||||
const enabledChanged = previous.enabled !== job.enabled;
|
||||
if (!scheduleChanged && !enabledChanged) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, now) : undefined;
|
||||
if (job.state.nextRunAtMs !== nextRunAtMs) {
|
||||
job.state.nextRunAtMs = nextRunAtMs;
|
||||
changed = true;
|
||||
}
|
||||
if (!job.enabled && job.state.runningAtMs !== undefined) {
|
||||
job.state.runningAtMs = undefined;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
state.deps.log.debug(
|
||||
{
|
||||
jobId: job.id,
|
||||
scheduleChanged,
|
||||
enabledChanged,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
},
|
||||
"cron: repaired nextRunAtMs after external reload",
|
||||
);
|
||||
}
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
||||
export async function ensureLoaded(
|
||||
state: CronServiceState,
|
||||
opts?: {
|
||||
@ -31,6 +96,7 @@ export async function ensureLoaded(
|
||||
// Force reload always re-reads the file to avoid missing cross-service
|
||||
// edits on filesystems with coarse mtime resolution.
|
||||
|
||||
const previousJobs = state.store?.jobs;
|
||||
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||
const loaded = await loadCronStore(state.deps.storePath);
|
||||
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
@ -38,12 +104,16 @@ export async function ensureLoaded(
|
||||
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
||||
state.storeLoadedAtMs = state.deps.nowMs();
|
||||
state.storeFileMtimeMs = fileMtimeMs;
|
||||
const repairedExternalReload = repairNextRunsAfterExternalReload({
|
||||
state,
|
||||
previousJobs,
|
||||
});
|
||||
|
||||
if (!opts?.skipRecompute) {
|
||||
recomputeNextRuns(state);
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
if (mutated || repairedExternalReload) {
|
||||
await persist(state, { skipBackup: true });
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user