Merge 089077d985f9cf4de1ce66511bd8dff7f7d451b8 into 8a05c05596ca9ba0735dafd8e359885de4c2c969

This commit is contained in:
RichardCao 2026-03-21 05:59:25 +00:00 committed by GitHub
commit 075d433c4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 999 additions and 80 deletions

View File

@ -0,0 +1,421 @@
import fs from "node:fs/promises";
import { describe, expect, it, vi } from "vitest";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
import { recomputeNextRuns } from "./service/jobs.js";
import { run } from "./service/ops.js";
import { createCronServiceState } from "./service/state.js";
import { ensureLoaded } from "./service/store.js";
import type { CronJob } from "./types.js";
const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({
prefix: "openclaw-cron-external-reload-",
baseTimeIso: "2026-03-19T01:44:00.000Z",
});
function createCronJob(params: {
id: string;
expr: string;
updatedAtMs?: number;
enabled?: boolean;
nextRunAtMs?: number;
scheduleErrorCount?: number;
lastError?: string;
lastStatus?: CronJob["state"]["lastStatus"];
runningAtMs?: number;
}): CronJob {
return {
id: params.id,
name: params.id,
enabled: params.enabled ?? true,
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
updatedAtMs: params.updatedAtMs ?? Date.parse("2026-03-19T01:44:00.000Z"),
schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "tick" },
state: {
nextRunAtMs: params.nextRunAtMs,
scheduleErrorCount: params.scheduleErrorCount,
lastError: params.lastError,
lastStatus: params.lastStatus,
runningAtMs: params.runningAtMs,
},
};
}
describe("forceReload repairs externally changed schedules", () => {
it("recomputes nextRunAtMs when jobs.json changes a cron schedule outside cron.update", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "external-schedule-change";
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z");
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createCronJob({ id: jobId, expr: "30 8 * * *", nextRunAtMs: staleNextRunAtMs })],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createCronJob({ id: jobId, expr: "30 8,20 * * *", nextRunAtMs: staleNextRunAtMs })],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs);
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
};
expect(persisted.jobs?.find((job) => job.id === jobId)?.state?.nextRunAtMs).toBe(
correctedNextRunAtMs,
);
});
it("recomputes from updatedAtMs so delayed reload keeps newly earlier slots due", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
const jobId = "external-schedule-change-delayed-observe";
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
const createJob = (params: { expr: string; updatedAtMs: number }) =>
createCronJob({
id: jobId,
expr: params.expr,
updatedAtMs: params.updatedAtMs,
nextRunAtMs: staleNextRunAtMs,
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createJob({ expr: "30 23 * * *", updatedAtMs: Date.parse("2026-03-19T12:00:00.000Z") }),
],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob({ expr: "* * * * *", updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z") })],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
});
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "external-invalid-schedule";
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createCronJob({
id: jobId,
expr: "30 8 * * *",
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
}),
],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createCronJob({
id: jobId,
expr: "not a valid cron",
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
}),
],
});
await expect(
ensureLoaded(state, { forceReload: true, skipRecompute: true }),
).resolves.toBeUndefined();
const reloaded = state.store?.jobs[0];
expect(reloaded?.state.nextRunAtMs).toBeUndefined();
expect(reloaded?.state.scheduleErrorCount).toBe(1);
expect(reloaded?.state.lastError).toMatch(/^schedule error:/);
});
it("does not double-count a reload schedule error during the immediate recompute", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "external-invalid-schedule-full-recompute";
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createCronJob({
id: jobId,
expr: "30 8 * * *",
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
}),
],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createCronJob({
id: jobId,
expr: "not a valid cron",
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
}),
],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
recomputeNextRuns(state);
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
});
it("keeps forceReload repairs when manual-run snapshot is merged back", async () => {
const store = await makeStorePath();
let nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "manual-run-reload-merge";
const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z");
const createJob = (params: { expr: string; enabled: boolean; nextRunAtMs?: number }) => ({
...createCronJob({
id: jobId,
expr: params.expr,
enabled: params.enabled,
nextRunAtMs: params.nextRunAtMs,
}),
sessionTarget: "isolated" as const,
payload: { kind: "agentTurn", message: "tick" } as const,
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob({ expr: "30 23 * * *", enabled: true, nextRunAtMs: staleNextRunAtMs })],
});
const runIsolatedAgentJob = vi.fn(async () => {
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob({ expr: "30 8 * * *", enabled: false, nextRunAtMs: staleNextRunAtMs })],
});
nowMs += 500;
return { status: "ok" as const, summary: "done" };
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
});
expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true });
const merged = state.store?.jobs[0];
expect(merged?.enabled).toBe(false);
expect(merged?.state.nextRunAtMs).toBeUndefined();
expect(merged?.state.lastStatus).toBe("ok");
});
it("keeps scheduleErrorCount cleared when external reload fixes schedule during force-run", async () => {
const store = await makeStorePath();
let nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "manual-run-reload-clears-schedule-error-count";
const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z");
const createJob = (expr: string) => ({
...createCronJob({
id: jobId,
expr,
nextRunAtMs: staleNextRunAtMs,
scheduleErrorCount: 2,
lastError: "schedule error: invalid expression",
}),
sessionTarget: "isolated" as const,
payload: { kind: "agentTurn", message: "tick" } as const,
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob("30 23 * * *")],
});
const runIsolatedAgentJob = vi.fn(async () => {
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob("30 8 * * *")],
});
nowMs += 500;
return { status: "ok" as const, summary: "done" };
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
});
expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true });
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBeUndefined();
});
it("preserves runningAtMs when an external reload comes from a stale file snapshot", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
const jobId = "external-running-marker";
const runningAtMs = Date.parse("2026-03-19T12:00:00.000Z");
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createCronJob({
id: jobId,
expr: "30 23 * * *",
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
runningAtMs,
}),
],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [
createCronJob({
id: jobId,
expr: "* * * * *",
updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z"),
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
}),
],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const reloaded = state.store?.jobs[0];
expect(reloaded?.state.runningAtMs).toBe(runningAtMs);
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
});
it("recomputes nextRunAtMs when an external every schedule changes", async () => {
const store = await makeStorePath();
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "external-every-schedule-change";
const createEveryJob = (everyMs: number): CronJob => ({
id: jobId,
name: jobId,
enabled: true,
createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
schedule: {
kind: "every",
everyMs,
anchorMs: Date.parse("2026-03-19T00:00:00.000Z"),
},
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "tick" },
state: {
nextRunAtMs: Date.parse("2026-03-20T00:00:00.000Z"),
},
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createEveryJob(6 * 60_000)],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await ensureLoaded(state, { skipRecompute: true });
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createEveryJob(60_000)],
});
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z"));
});
});

View File

@ -22,8 +22,12 @@ import {
createNoopLogger,
createRunningCronServiceState,
} from "./service.test-harness.js";
import { computeJobNextRunAtMs } from "./service/jobs.js";
import { enqueueRun, run } from "./service/ops.js";
import {
computeJobNextRunAtMs,
nextWakeAtMs,
recomputeNextRunsForMaintenance,
} from "./service/jobs.js";
import { enqueueRun, list, run, status } from "./service/ops.js";
import { createCronServiceState, type CronEvent } from "./service/state.js";
import {
DEFAULT_JOB_TIMEOUT_MS,
@ -1748,6 +1752,170 @@ describe("Cron issue regressions", () => {
expect(job.enabled).toBe(true);
});
it("does not double-count reload schedule errors in apply path before maintenance recompute", () => {
const startedAt = Date.parse("2026-03-02T12:10:00.000Z");
const endedAt = startedAt + 25;
const job = createIsolatedRegressionJob({
id: "apply-result-reload-dedupe-30905",
name: "apply-result-reload-dedupe-30905",
scheduledAt: startedAt,
schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
runningAtMs: startedAt - 500,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-30905-reload-dedupe.json",
log: noopLogger as never,
nowMs: () => endedAt,
jobs: [job],
});
state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]);
applyJobResult(state, job, {
status: "ok",
delivered: true,
startedAt,
endedAt,
});
expect(job.state.scheduleErrorCount).toBe(1);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true);
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
recomputeNextRunsForMaintenance(state);
expect(job.state.scheduleErrorCount).toBe(1);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
});
it("keeps a future wake when apply skips immediate recompute after reload schedule error", () => {
const startedAt = Date.parse("2026-03-02T12:12:00.000Z");
const endedAt = startedAt + 25;
const job = createIsolatedRegressionJob({
id: "apply-result-reload-wake-30905",
name: "apply-result-reload-wake-30905",
scheduledAt: startedAt,
schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
runningAtMs: startedAt - 500,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-30905-reload-wake.json",
log: noopLogger as never,
nowMs: () => endedAt,
jobs: [job],
});
state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]);
applyJobResult(state, job, {
status: "error",
error: "synthetic failure",
startedAt,
endedAt,
});
expect(job.state.scheduleErrorCount).toBe(1);
expect(job.state.nextRunAtMs).toBeUndefined();
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true);
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
recomputeNextRunsForMaintenance(state);
expect(job.state.scheduleErrorCount).toBe(1);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
expect(nextWakeAtMs(state)).toBe(endedAt + 2_000);
});
it("does not arm 2s wake for malformed every schedules with non-repairable missing nextRun", () => {
const nowMs = Date.parse("2026-03-02T12:20:00.000Z");
const job = createIsolatedRegressionJob({
id: "missing-nextrun-malformed-every",
name: "missing-nextrun-malformed-every",
scheduledAt: nowMs,
schedule: { kind: "every", everyMs: Number.NaN, anchorMs: nowMs - 60_000 },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-missing-nextrun-malformed-every.json",
log: noopLogger as never,
nowMs: () => nowMs,
jobs: [job],
});
expect(nextWakeAtMs(state)).toBeUndefined();
});
it("does not consume reload skip markers during read-only status/list maintenance", async () => {
const nowMs = Date.parse("2026-03-02T12:25:00.000Z");
const job = createIsolatedRegressionJob({
id: "read-only-skip-marker",
name: "read-only-skip-marker",
scheduledAt: nowMs,
schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-read-only-skip-marker.json",
log: noopLogger as never,
nowMs: () => nowMs,
jobs: [job],
});
state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]);
const currentStatus = await status(state);
expect(currentStatus.nextWakeAtMs).toBe(nowMs + 2_000);
expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(true);
const jobs = await list(state, { includeDisabled: true });
expect(jobs).toHaveLength(1);
expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(true);
recomputeNextRunsForMaintenance(state);
expect(state.skipNextReloadRepairRecomputeJobIds.has(job.id)).toBe(false);
});
it("does not arm 2s wake for exhausted one-shot jobs with missing nextRun", () => {
const nowMs = Date.parse("2026-03-02T12:22:00.000Z");
const atMs = nowMs - 60_000;
const job = createIsolatedRegressionJob({
id: "missing-nextrun-exhausted-at",
name: "missing-nextrun-exhausted-at",
scheduledAt: nowMs,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
lastStatus: "ok",
lastRunAtMs: nowMs - 30_000,
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-missing-nextrun-exhausted-at.json",
log: noopLogger as never,
nowMs: () => nowMs,
jobs: [job],
});
expect(nextWakeAtMs(state)).toBeUndefined();
});
it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => {
const nowMs = Date.now();
const everyMs = 24 * 60 * 60 * 1_000;

View File

@ -385,6 +385,7 @@ function createMockState(now: number, opts?: { defaultAgentId?: string }): CronS
nowMs: () => now,
defaultAgentId: opts?.defaultAgentId,
},
skipNextReloadRepairRecomputeJobIds: new Set<string>(),
} as unknown as CronServiceState;
}

View File

@ -220,6 +220,7 @@ export function createMockCronStateForJobs(params: {
timer: null,
storeLoadedAtMs: nowMs,
storeFileMtimeMs: null,
skipNextReloadRepairRecomputeJobIds: new Set<string>(),
op: Promise.resolve(),
warnedDisabled: false,
deps: {

View File

@ -28,6 +28,11 @@ function createMockState(jobs: CronJob[]): CronServiceState {
store,
timer: null,
running: false,
op: Promise.resolve(),
warnedDisabled: false,
storeLoadedAtMs: null,
storeFileMtimeMs: null,
skipNextReloadRepairRecomputeJobIds: new Set<string>(),
} as unknown as CronServiceState;
}
@ -201,4 +206,44 @@ describe("cron schedule error isolation", () => {
expect(badJob.state.lastError).not.toContain("Cannot read properties of undefined");
expect(badJob.state.scheduleErrorCount).toBe(1);
});
it("keeps malformed every schedules on the schedule-error path", () => {
const badJob = createJob({
id: "bad-every",
name: "Bad Every",
schedule: { kind: "every", everyMs: Number.NaN },
state: {
nextRunAtMs: undefined,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createMockState([badJob]);
recomputeNextRuns(state);
expect(badJob.state.nextRunAtMs).toBeUndefined();
expect(badJob.state.scheduleErrorCount).toBe(2);
expect(badJob.state.lastError).toContain("invalid every schedule");
});
it("keeps malformed at schedules on the schedule-error path", () => {
const badJob = createJob({
id: "bad-at",
name: "Bad At",
schedule: { kind: "at", at: "not-a-timestamp" },
state: {
nextRunAtMs: undefined,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createMockState([badJob]);
recomputeNextRuns(state);
expect(badJob.state.nextRunAtMs).toBeUndefined();
expect(badJob.state.scheduleErrorCount).toBe(2);
expect(badJob.state.lastError).toContain("invalid at schedule");
});
});

View File

@ -33,6 +33,7 @@ import {
import type { CronServiceState } from "./state.js";
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
const MISSING_NEXT_RUN_WAKE_MS = 2_000;
const STAGGER_OFFSET_CACHE_MAX = 4096;
const staggerOffsetCache = new Map<string, number>();
@ -238,6 +239,19 @@ export function findJobOrThrow(state: CronServiceState, id: string) {
return job;
}
export function removeJobById(state: CronServiceState, jobId: string): boolean {
if (!state.store) {
return false;
}
const before = state.store.jobs.length;
state.store.jobs = state.store.jobs.filter((job) => job.id !== jobId);
const removed = state.store.jobs.length !== before;
if (removed) {
state.skipNextReloadRepairRecomputeJobIds.delete(jobId);
}
return removed;
}
export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined {
if (!job.enabled) {
return undefined;
@ -294,6 +308,22 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
return isFiniteTimestamp(next) ? next : undefined;
}
export function shouldTreatUndefinedNextRunAsScheduleError(job: CronJob): boolean {
if (!job.enabled) {
return false;
}
if (job.schedule.kind === "every") {
return coerceFiniteScheduleNumber(job.schedule.everyMs) === undefined;
}
if (job.schedule.kind === "at") {
return parseAbsoluteTimeMs(job.schedule.at) === null;
}
if (job.schedule.kind === "cron") {
return job.schedule.expr.trim().length === 0;
}
return false;
}
export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined {
if (!job.enabled || job.schedule.kind !== "cron") {
return undefined;
@ -413,10 +443,50 @@ function walkSchedulableJobs(
return changed;
}
function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob; nowMs: number }) {
export function hasSkipNextReloadRepairRecompute(state: CronServiceState, jobId: string): boolean {
return state.skipNextReloadRepairRecomputeJobIds.has(jobId);
}
export function consumeSkipNextReloadRepairRecompute(
state: CronServiceState,
jobId: string,
): boolean {
if (!hasSkipNextReloadRepairRecompute(state, jobId)) {
return false;
}
state.skipNextReloadRepairRecomputeJobIds.delete(jobId);
return true;
}
function recomputeJobNextRunAtMs(params: {
state: CronServiceState;
job: CronJob;
nowMs: number;
consumeReloadRepairSkip?: boolean;
}) {
const consumeReloadRepairSkip = params.consumeReloadRepairSkip ?? true;
if (
consumeReloadRepairSkip
? consumeSkipNextReloadRepairRecompute(params.state, params.job.id)
: hasSkipNextReloadRepairRecompute(params.state, params.job.id)
) {
return false;
}
let changed = false;
try {
const newNext = computeJobNextRunAtMs(params.job, params.nowMs);
if (newNext === undefined && shouldTreatUndefinedNextRunAsScheduleError(params.job)) {
const err =
params.job.schedule.kind === "every"
? new Error("invalid every schedule: everyMs must be a finite number")
: params.job.schedule.kind === "at"
? new Error("invalid at schedule: at must be a valid absolute timestamp")
: new Error("invalid cron schedule: expr is required");
if (recordScheduleComputeError({ state: params.state, job: params.job, err })) {
changed = true;
}
return changed;
}
if (params.job.state.nextRunAtMs !== newNext) {
params.job.state.nextRunAtMs = newNext;
changed = true;
@ -460,16 +530,17 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
*/
export function recomputeNextRunsForMaintenance(
state: CronServiceState,
opts?: { recomputeExpired?: boolean; nowMs?: number },
opts?: { recomputeExpired?: boolean; nowMs?: number; consumeReloadRepairSkip?: boolean },
): boolean {
const recomputeExpired = opts?.recomputeExpired ?? false;
const consumeReloadRepairSkip = opts?.consumeReloadRepairSkip ?? true;
return walkSchedulableJobs(
state,
({ job, nowMs: now }) => {
let changed = false;
if (!isFiniteTimestamp(job.state.nextRunAtMs)) {
// Missing or invalid nextRunAtMs is always repaired.
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
if (recomputeJobNextRunAtMs({ state, job, nowMs: now, consumeReloadRepairSkip })) {
changed = true;
}
} else if (
@ -482,7 +553,7 @@ export function recomputeNextRunsForMaintenance(
const lastRun = job.state.lastRunAtMs;
const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs;
if (alreadyExecutedSlot) {
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
if (recomputeJobNextRunAtMs({ state, job, nowMs: now, consumeReloadRepairSkip })) {
changed = true;
}
}
@ -495,18 +566,46 @@ export function recomputeNextRunsForMaintenance(
export function nextWakeAtMs(state: CronServiceState) {
const jobs = state.store?.jobs ?? [];
const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs));
if (enabled.length === 0) {
return undefined;
let minEnabledNextRunAtMs: number | undefined;
let hasEnabledRepairableMissingNextRun = false;
const nowMs = state.deps.nowMs();
for (const job of jobs) {
if (!job.enabled) {
continue;
}
const nextRunAtMs = job.state.nextRunAtMs;
if (isFiniteTimestamp(nextRunAtMs)) {
minEnabledNextRunAtMs =
minEnabledNextRunAtMs === undefined
? nextRunAtMs
: Math.min(minEnabledNextRunAtMs, nextRunAtMs);
continue;
}
// Only wake for missing nextRun values that can be repaired by recompute.
// Non-repairable malformed schedules (e.g. invalid every/at payloads)
// should not keep the scheduler in a perpetual 2s poll loop.
if ((job.state.scheduleErrorCount ?? 0) > 0) {
hasEnabledRepairableMissingNextRun = true;
continue;
}
try {
if (computeJobNextRunAtMs(job, nowMs) !== undefined) {
hasEnabledRepairableMissingNextRun = true;
}
} catch {
hasEnabledRepairableMissingNextRun = true;
}
}
const first = enabled[0]?.state.nextRunAtMs;
if (!isFiniteTimestamp(first)) {
return undefined;
if (!hasEnabledRepairableMissingNextRun) {
return minEnabledNextRunAtMs;
}
return enabled.reduce((min, j) => {
const next = j.state.nextRunAtMs;
return isFiniteTimestamp(next) ? Math.min(min, next) : min;
}, first);
const wakeForMissingNextRunAtMs = nowMs + MISSING_NEXT_RUN_WAKE_MS;
return minEnabledNextRunAtMs === undefined
? wakeForMissingNextRunAtMs
: Math.min(minEnabledNextRunAtMs, wakeForMissingNextRunAtMs);
}
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {

View File

@ -9,10 +9,12 @@ import {
findJobOrThrow,
isJobDue,
nextWakeAtMs,
removeJobById,
recomputeNextRuns,
recomputeNextRunsForMaintenance,
} from "./jobs.js";
import { locked } from "./locked.js";
import { schedulesEqual } from "./schedule-equality.js";
import type { CronServiceState } from "./state.js";
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
import {
@ -47,6 +49,7 @@ export type CronListPageResult = {
hasMore: boolean;
nextOffset: number | null;
};
function mergeManualRunSnapshotAfterReload(params: {
state: CronServiceState;
jobId: string;
@ -55,13 +58,17 @@ function mergeManualRunSnapshotAfterReload(params: {
updatedAtMs: number;
state: CronJob["state"];
} | null;
baseline: {
enabled: boolean;
schedule: CronJob["schedule"];
} | null;
removed: boolean;
}) {
if (!params.state.store) {
return;
}
if (params.removed) {
params.state.store.jobs = params.state.store.jobs.filter((job) => job.id !== params.jobId);
removeJobById(params.state, params.jobId);
return;
}
if (!params.snapshot) {
@ -71,9 +78,34 @@ function mergeManualRunSnapshotAfterReload(params: {
if (!reloaded) {
return;
}
reloaded.enabled = params.snapshot.enabled;
reloaded.updatedAtMs = params.snapshot.updatedAtMs;
reloaded.state = params.snapshot.state;
const preservedEnabled = reloaded.enabled;
const preservedNextRunAtMs = reloaded.state.nextRunAtMs;
const preservedScheduleErrorCount = reloaded.state.scheduleErrorCount;
const preservedScheduleErrorText = reloaded.state.lastError;
const externalScheduleOrEnabledChanged =
params.baseline !== null &&
(preservedEnabled !== params.baseline.enabled ||
!schedulesEqual(reloaded.schedule, params.baseline.schedule));
reloaded.updatedAtMs = Math.max(reloaded.updatedAtMs, params.snapshot.updatedAtMs);
reloaded.state = {
...reloaded.state,
...params.snapshot.state,
};
// Only preserve reload-derived schedule/enable repairs when the underlying
// schedule or enabled flag was externally changed while the manual run was executing.
// Otherwise, keep the manual-run terminal state (e.g. one-shot disable on success).
if (externalScheduleOrEnabledChanged) {
reloaded.enabled = preservedEnabled;
reloaded.state.nextRunAtMs = preservedNextRunAtMs;
reloaded.state.scheduleErrorCount = preservedScheduleErrorCount;
if (preservedScheduleErrorCount !== undefined) {
reloaded.state.lastError = preservedScheduleErrorText;
}
} else {
reloaded.enabled = params.snapshot.enabled;
}
}
async function ensureLoadedForRead(state: CronServiceState) {
@ -83,7 +115,7 @@ async function ensureLoadedForRead(state: CronServiceState) {
}
// Use the maintenance-only version so that read-only operations never
// advance a past-due nextRunAtMs without executing the job (#16156).
const changed = recomputeNextRunsForMaintenance(state);
const changed = recomputeNextRunsForMaintenance(state, { consumeReloadRepairSkip: false });
if (changed) {
await persist(state);
}
@ -302,6 +334,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
job.state.nextRunAtMs = undefined;
job.state.runningAtMs = undefined;
}
state.skipNextReloadRepairRecomputeJobIds.delete(id);
} else if (job.enabled) {
// Non-schedule edits should not mutate other jobs, but still repair a
// missing/corrupt nextRunAtMs for the updated job.
@ -326,12 +359,10 @@ export async function remove(state: CronServiceState, id: string) {
return await locked(state, async () => {
warnIfDisabled(state, "remove");
await ensureLoaded(state);
const before = state.store?.jobs.length ?? 0;
if (!state.store) {
return { ok: false, removed: false } as const;
}
state.store.jobs = state.store.jobs.filter((j) => j.id !== id);
const removed = (state.store.jobs.length ?? 0) !== before;
const removed = removeJobById(state, id);
await persist(state);
armTimer(state);
if (removed) {
@ -383,7 +414,7 @@ async function inspectManualRunPreflight(
// Normalize job tick state (clears stale runningAtMs markers) before
// checking if already running, so a stale marker from a crashed Phase-1
// persist does not block manual triggers for up to STUCK_RUN_MS (#17554).
recomputeNextRunsForMaintenance(state);
recomputeNextRunsForMaintenance(state, { consumeReloadRepairSkip: false });
const job = findJobOrThrow(state, id);
if (typeof job.state.runningAtMs === "number") {
return { ok: true, ran: false, reason: "already-running" as const };
@ -508,8 +539,7 @@ async function finishPreparedManualRun(
usage: coreResult.usage,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
if (shouldDelete && removeJobById(state, job.id)) {
emit(state, { jobId: job.id, action: "removed" });
}
@ -523,6 +553,12 @@ async function finishPreparedManualRun(
updatedAtMs: job.updatedAtMs,
state: structuredClone(job.state),
};
const postRunBaseline = shouldDelete
? null
: {
enabled: executionJob.enabled,
schedule: structuredClone(executionJob.schedule),
};
const postRunRemoved = shouldDelete;
// Isolated Telegram send can persist target writeback directly to disk.
// Reload before final persist so manual `cron run` keeps those changes.
@ -531,6 +567,7 @@ async function finishPreparedManualRun(
state,
jobId,
snapshot: postRunSnapshot,
baseline: postRunBaseline,
removed: postRunRemoved,
});
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });

View File

@ -0,0 +1,17 @@
import type { CronJob } from "../types.js";
export function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean {
if (a.kind !== b.kind) {
return false;
}
if (a.kind === "at" && b.kind === "at") {
return a.at === b.at;
}
if (a.kind === "every" && b.kind === "every") {
return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs;
}
if (a.kind === "cron" && b.kind === "cron") {
return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs;
}
return false;
}

View File

@ -127,6 +127,7 @@ export type CronServiceState = {
warnedDisabled: boolean;
storeLoadedAtMs: number | null;
storeFileMtimeMs: number | null;
skipNextReloadRepairRecomputeJobIds: Set<string>;
};
export function createCronServiceState(deps: CronServiceDeps): CronServiceState {
@ -139,6 +140,7 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState
warnedDisabled: false,
storeLoadedAtMs: null,
storeFileMtimeMs: null,
skipNextReloadRepairRecomputeJobIds: new Set(),
};
}

View File

@ -2,7 +2,13 @@ import fs from "node:fs";
import { normalizeStoredCronJobs } from "../store-migration.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { recomputeNextRuns } from "./jobs.js";
import {
computeJobNextRunAtMs,
recordScheduleComputeError,
recomputeNextRuns,
shouldTreatUndefinedNextRunAsScheduleError,
} from "./jobs.js";
import { schedulesEqual } from "./schedule-equality.js";
import type { CronServiceState } from "./state.js";
async function getFileMtimeMs(path: string): Promise<number | null> {
@ -14,6 +20,118 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
}
}
function resolveExternalRepairComputeBaseMs(params: {
nowMs: number;
reloadedUpdatedAtMs: number;
previousUpdatedAtMs: number;
}): number {
const { nowMs, reloadedUpdatedAtMs, previousUpdatedAtMs } = params;
if (!Number.isFinite(reloadedUpdatedAtMs)) {
return nowMs;
}
const normalizedReloadedUpdatedAtMs = Math.max(0, Math.floor(reloadedUpdatedAtMs));
const normalizedPreviousUpdatedAtMs = Number.isFinite(previousUpdatedAtMs)
? Math.max(0, Math.floor(previousUpdatedAtMs))
: Number.NEGATIVE_INFINITY;
if (normalizedReloadedUpdatedAtMs <= normalizedPreviousUpdatedAtMs) {
return nowMs;
}
return Math.min(nowMs, normalizedReloadedUpdatedAtMs);
}
function repairNextRunsAfterExternalReload(params: {
state: CronServiceState;
previousJobs: CronJob[] | undefined;
}): boolean {
const { state, previousJobs } = params;
const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds;
if (!state.store || previousJobs === undefined) {
return false;
}
if (skipRecomputeJobIds.size > 0) {
const currentJobIds = new Set(state.store.jobs.map((job) => job.id));
for (const jobId of skipRecomputeJobIds) {
if (!currentJobIds.has(jobId)) {
skipRecomputeJobIds.delete(jobId);
}
}
}
const previousById = new Map(previousJobs.map((job) => [job.id, job]));
const now = state.deps.nowMs();
let changed = false;
for (const job of state.store.jobs) {
const previous = previousById.get(job.id);
if (!previous) {
continue;
}
if (typeof previous.state.runningAtMs === "number" && job.state.runningAtMs === undefined) {
job.state.runningAtMs = previous.state.runningAtMs;
changed = true;
}
const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule);
const enabledChanged = previous.enabled !== job.enabled;
if (!scheduleChanged && !enabledChanged) {
continue;
}
skipRecomputeJobIds.delete(job.id);
const computeBaseMs = resolveExternalRepairComputeBaseMs({
nowMs: now,
reloadedUpdatedAtMs: job.updatedAtMs,
previousUpdatedAtMs: previous.updatedAtMs,
});
let nextRunAtMs: number | undefined;
try {
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined;
if (nextRunAtMs === undefined && shouldTreatUndefinedNextRunAsScheduleError(job)) {
const err =
job.schedule.kind === "every"
? new Error("invalid every schedule: everyMs must be a finite number")
: new Error("invalid at schedule: at must be a valid absolute timestamp");
if (recordScheduleComputeError({ state, job, err })) {
changed = true;
}
skipRecomputeJobIds.add(job.id);
continue;
}
if (job.state.scheduleErrorCount !== undefined) {
job.state.scheduleErrorCount = undefined;
changed = true;
}
} catch (err) {
if (recordScheduleComputeError({ state, job, err })) {
changed = true;
}
skipRecomputeJobIds.add(job.id);
continue;
}
if (job.state.nextRunAtMs !== nextRunAtMs) {
job.state.nextRunAtMs = nextRunAtMs;
changed = true;
}
if (!job.enabled && job.state.runningAtMs !== undefined) {
job.state.runningAtMs = undefined;
changed = true;
}
state.deps.log.debug(
{
jobId: job.id,
scheduleChanged,
enabledChanged,
computeBaseMs,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: repaired nextRunAtMs after external reload",
);
}
return changed;
}
export async function ensureLoaded(
state: CronServiceState,
opts?: {
@ -31,6 +149,7 @@ export async function ensureLoaded(
// Force reload always re-reads the file to avoid missing cross-service
// edits on filesystems with coarse mtime resolution.
const previousJobs = state.store?.jobs;
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
const loaded = await loadCronStore(state.deps.storePath);
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
@ -38,12 +157,16 @@ export async function ensureLoaded(
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
state.storeLoadedAtMs = state.deps.nowMs();
state.storeFileMtimeMs = fileMtimeMs;
const repairedExternalReload = repairNextRunsAfterExternalReload({
state,
previousJobs,
});
if (!opts?.skipRecompute) {
recomputeNextRuns(state);
}
if (mutated) {
if (mutated || repairedExternalReload) {
await persist(state, { skipBackup: true });
}
}

View File

@ -15,7 +15,9 @@ import type {
import {
computeJobPreviousRunAtMs,
computeJobNextRunAtMs,
hasSkipNextReloadRepairRecompute,
nextWakeAtMs,
removeJobById,
recomputeNextRunsForMaintenance,
recordScheduleComputeError,
resolveJobPayloadTextForMain,
@ -368,6 +370,7 @@ export function applyJobResult(
const shouldDelete =
job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok";
const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id);
if (!shouldDelete) {
if (job.schedule.kind === "at") {
@ -416,54 +419,58 @@ export function applyJobResult(
} else if (result.status === "error" && job.enabled) {
// Apply exponential backoff for errored jobs to prevent retry storms.
const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1);
let normalNext: number | undefined;
try {
normalNext =
opts?.preserveSchedule && job.schedule.kind === "every"
? computeNextWithPreservedLastRun(result.endedAt)
: computeJobNextRunAtMs(job, result.endedAt);
} catch (err) {
// If the schedule expression/timezone throws (croner edge cases),
// record the schedule error (auto-disables after repeated failures)
// and fall back to backoff-only schedule so the state update is not lost.
recordScheduleComputeError({ state, job, err });
}
const backoffNext = result.endedAt + backoff;
// Use whichever is later: the natural next run or the backoff delay.
job.state.nextRunAtMs =
normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext;
state.deps.log.info(
{
jobId: job.id,
consecutiveErrors: job.state.consecutiveErrors,
backoffMs: backoff,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: applying error backoff",
);
} else if (job.enabled) {
let naturalNext: number | undefined;
try {
naturalNext =
opts?.preserveSchedule && job.schedule.kind === "every"
? computeNextWithPreservedLastRun(result.endedAt)
: computeJobNextRunAtMs(job, result.endedAt);
} catch (err) {
// If the schedule expression/timezone throws (croner edge cases),
// record the schedule error (auto-disables after repeated failures)
// so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop.
recordScheduleComputeError({ state, job, err });
}
if (job.schedule.kind === "cron") {
// Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS
// after the current run ended. Prevents spin-loops when the
// schedule computation lands in the same second due to
// timezone/croner edge cases (see #17821).
const minNext = result.endedAt + MIN_REFIRE_GAP_MS;
if (!skipImmediateScheduleRecompute) {
let normalNext: number | undefined;
try {
normalNext =
opts?.preserveSchedule && job.schedule.kind === "every"
? computeNextWithPreservedLastRun(result.endedAt)
: computeJobNextRunAtMs(job, result.endedAt);
} catch (err) {
// If the schedule expression/timezone throws (croner edge cases),
// record the schedule error (auto-disables after repeated failures)
// and fall back to backoff-only schedule so the state update is not lost.
recordScheduleComputeError({ state, job, err });
}
const backoffNext = result.endedAt + backoff;
// Use whichever is later: the natural next run or the backoff delay.
job.state.nextRunAtMs =
naturalNext !== undefined ? Math.max(naturalNext, minNext) : minNext;
} else {
job.state.nextRunAtMs = naturalNext;
normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext;
state.deps.log.info(
{
jobId: job.id,
consecutiveErrors: job.state.consecutiveErrors,
backoffMs: backoff,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: applying error backoff",
);
}
} else if (job.enabled) {
if (!skipImmediateScheduleRecompute) {
let naturalNext: number | undefined;
try {
naturalNext =
opts?.preserveSchedule && job.schedule.kind === "every"
? computeNextWithPreservedLastRun(result.endedAt)
: computeJobNextRunAtMs(job, result.endedAt);
} catch (err) {
// If the schedule expression/timezone throws (croner edge cases),
// record the schedule error (auto-disables after repeated failures)
// so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop.
recordScheduleComputeError({ state, job, err });
}
if (job.schedule.kind === "cron") {
// Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS
// after the current run ended. Prevents spin-loops when the
// schedule computation lands in the same second due to
// timezone/croner edge cases (see #17821).
const minNext = result.endedAt + MIN_REFIRE_GAP_MS;
job.state.nextRunAtMs =
naturalNext !== undefined ? Math.max(naturalNext, minNext) : minNext;
} else {
job.state.nextRunAtMs = naturalNext;
}
}
} else {
job.state.nextRunAtMs = undefined;
@ -498,8 +505,7 @@ function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOu
emitJobFinished(state, job, result, result.startedAt);
if (shouldDelete) {
store.jobs = jobs.filter((entry) => entry.id !== job.id);
if (shouldDelete && removeJobById(state, job.id)) {
emit(state, { jobId: job.id, action: "removed" });
}
}
@ -1194,8 +1200,7 @@ export async function executeJob(
emitJobFinished(state, job, coreResult, startedAt);
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
if (shouldDelete && removeJobById(state, job.id)) {
emit(state, { jobId: job.id, action: "removed" });
}
}