fix(cron): narrow external reload repair scope
This commit is contained in:
parent
3cb6564ac2
commit
089077d985
@ -1,8 +1,8 @@
|
|||||||
import fs from "node:fs/promises";
|
import fs from "node:fs/promises";
|
||||||
import { describe, expect, it, vi } from "vitest";
|
import { describe, expect, it, vi } from "vitest";
|
||||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
|
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
|
||||||
import { recomputeNextRuns, recomputeNextRunsForMaintenance } from "./service/jobs.js";
|
import { recomputeNextRuns } from "./service/jobs.js";
|
||||||
import { remove, run } from "./service/ops.js";
|
import { run } from "./service/ops.js";
|
||||||
import { createCronServiceState } from "./service/state.js";
|
import { createCronServiceState } from "./service/state.js";
|
||||||
import { ensureLoaded } from "./service/store.js";
|
import { ensureLoaded } from "./service/store.js";
|
||||||
import type { CronJob } from "./types.js";
|
import type { CronJob } from "./types.js";
|
||||||
@ -12,35 +12,48 @@ const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({
|
|||||||
baseTimeIso: "2026-03-19T01:44:00.000Z",
|
baseTimeIso: "2026-03-19T01:44:00.000Z",
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("forceReload repairs externally changed cron schedules", () => {
|
function createCronJob(params: {
|
||||||
it("recomputes nextRunAtMs when jobs.json changes schedule outside cron.update", async () => {
|
id: string;
|
||||||
|
expr: string;
|
||||||
|
updatedAtMs?: number;
|
||||||
|
enabled?: boolean;
|
||||||
|
nextRunAtMs?: number;
|
||||||
|
scheduleErrorCount?: number;
|
||||||
|
lastError?: string;
|
||||||
|
lastStatus?: CronJob["state"]["lastStatus"];
|
||||||
|
runningAtMs?: number;
|
||||||
|
}): CronJob {
|
||||||
|
return {
|
||||||
|
id: params.id,
|
||||||
|
name: params.id,
|
||||||
|
enabled: params.enabled ?? true,
|
||||||
|
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
||||||
|
updatedAtMs: params.updatedAtMs ?? Date.parse("2026-03-19T01:44:00.000Z"),
|
||||||
|
schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
||||||
|
sessionTarget: "main",
|
||||||
|
wakeMode: "next-heartbeat",
|
||||||
|
payload: { kind: "systemEvent", text: "tick" },
|
||||||
|
state: {
|
||||||
|
nextRunAtMs: params.nextRunAtMs,
|
||||||
|
scheduleErrorCount: params.scheduleErrorCount,
|
||||||
|
lastError: params.lastError,
|
||||||
|
lastStatus: params.lastStatus,
|
||||||
|
runningAtMs: params.runningAtMs,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("forceReload repairs externally changed schedules", () => {
|
||||||
|
it("recomputes nextRunAtMs when jobs.json changes a cron schedule outside cron.update", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||||
const jobId = "external-schedule-change";
|
const jobId = "external-schedule-change";
|
||||||
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
||||||
const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z");
|
const correctedNextRunAtMs = Date.parse("2026-03-19T12:30:00.000Z");
|
||||||
|
|
||||||
const createJob = (expr: string): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external schedule change",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
lastRunAtMs: Date.parse("2026-03-19T00:30:00.000Z"),
|
|
||||||
lastStatus: "ok",
|
|
||||||
lastRunStatus: "ok",
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("30 8 * * *")],
|
jobs: [createCronJob({ id: jobId, expr: "30 8 * * *", nextRunAtMs: staleNextRunAtMs })],
|
||||||
});
|
});
|
||||||
|
|
||||||
const state = createCronServiceState({
|
const state = createCronServiceState({
|
||||||
@ -54,56 +67,44 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
await ensureLoaded(state, { skipRecompute: true });
|
||||||
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(staleNextRunAtMs);
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("30 8,20 * * *")],
|
jobs: [createCronJob({ id: jobId, expr: "30 8,20 * * *", nextRunAtMs: staleNextRunAtMs })],
|
||||||
});
|
});
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
||||||
expect(reloaded?.schedule).toEqual({
|
|
||||||
kind: "cron",
|
|
||||||
expr: "30 8,20 * * *",
|
|
||||||
tz: "Asia/Shanghai",
|
|
||||||
staggerMs: 0,
|
|
||||||
});
|
|
||||||
expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs);
|
expect(reloaded?.state.nextRunAtMs).toBe(correctedNextRunAtMs);
|
||||||
|
|
||||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||||
jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
|
jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
|
||||||
};
|
};
|
||||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
expect(persisted.jobs?.find((job) => job.id === jobId)?.state?.nextRunAtMs).toBe(
|
||||||
expect(persistedJob?.state?.nextRunAtMs).toBe(correctedNextRunAtMs);
|
correctedNextRunAtMs,
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("recomputes from updatedAtMs so delayed reload keeps newly earlier slots due", async () => {
|
it("recomputes from updatedAtMs so delayed reload keeps newly earlier slots due", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
||||||
const initialUpdatedAtMs = Date.parse("2026-03-19T12:00:00.000Z");
|
|
||||||
const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z");
|
|
||||||
const jobId = "external-schedule-change-delayed-observe";
|
const jobId = "external-schedule-change-delayed-observe";
|
||||||
|
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
||||||
|
|
||||||
const createJob = (params: { expr: string; updatedAtMs: number }): CronJob => ({
|
const createJob = (params: { expr: string; updatedAtMs: number }) =>
|
||||||
id: jobId,
|
createCronJob({
|
||||||
name: "external schedule delayed observe",
|
id: jobId,
|
||||||
enabled: true,
|
expr: params.expr,
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
updatedAtMs: params.updatedAtMs,
|
||||||
updatedAtMs: params.updatedAtMs,
|
nextRunAtMs: staleNextRunAtMs,
|
||||||
schedule: { kind: "cron", expr: params.expr, tz: "UTC", staggerMs: 0 },
|
});
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob({ expr: "30 23 * * *", updatedAtMs: initialUpdatedAtMs })],
|
jobs: [
|
||||||
|
createJob({ expr: "30 23 * * *", updatedAtMs: Date.parse("2026-03-19T12:00:00.000Z") }),
|
||||||
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const state = createCronServiceState({
|
const state = createCronServiceState({
|
||||||
@ -120,150 +121,12 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob({ expr: "* * * * *", updatedAtMs: editedAtMs })],
|
jobs: [createJob({ expr: "* * * * *", updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z") })],
|
||||||
});
|
});
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
||||||
expect(reloaded?.state.nextRunAtMs).toBeLessThan(nowMs);
|
|
||||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("repairs stale nextRunAtMs on first load after restart", async () => {
|
|
||||||
const store = await makeStorePath();
|
|
||||||
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
|
||||||
const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z");
|
|
||||||
const jobId = "external-schedule-change-cold-load";
|
|
||||||
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
|
||||||
|
|
||||||
const createJob = (): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external schedule cold load repair",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: editedAtMs,
|
|
||||||
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob()],
|
|
||||||
});
|
|
||||||
|
|
||||||
const state = createCronServiceState({
|
|
||||||
cronEnabled: true,
|
|
||||||
storePath: store.storePath,
|
|
||||||
log: noopLogger,
|
|
||||||
nowMs: () => nowMs,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
|
||||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
|
||||||
|
|
||||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
|
||||||
jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
|
|
||||||
};
|
|
||||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
|
||||||
expect(persistedJob?.state?.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("repairs overdue stale nextRunAtMs on first load when edit happened later", async () => {
|
|
||||||
const store = await makeStorePath();
|
|
||||||
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
|
||||||
const staleDueNextRunAtMs = Date.parse("2026-03-19T12:00:00.000Z");
|
|
||||||
const editedAtMs = Date.parse("2026-03-19T12:05:00.000Z");
|
|
||||||
const jobId = "external-schedule-change-cold-load-overdue";
|
|
||||||
|
|
||||||
const createJob = (): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external schedule cold load overdue repair",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: editedAtMs,
|
|
||||||
schedule: { kind: "cron", expr: "30 23 * * *", tz: "UTC", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: staleDueNextRunAtMs,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob()],
|
|
||||||
});
|
|
||||||
|
|
||||||
const state = createCronServiceState({
|
|
||||||
cronEnabled: true,
|
|
||||||
storePath: store.storePath,
|
|
||||||
log: noopLogger,
|
|
||||||
nowMs: () => nowMs,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
|
||||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T23:30:00.000Z"));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("repairs cold-load stale nextRunAtMs even when consecutiveErrors is set", async () => {
|
|
||||||
const store = await makeStorePath();
|
|
||||||
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
|
||||||
const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z");
|
|
||||||
const jobId = "external-schedule-change-cold-load-consecutive-errors";
|
|
||||||
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
|
||||||
|
|
||||||
const createJob = (): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external schedule cold load with consecutiveErrors",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: editedAtMs,
|
|
||||||
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
consecutiveErrors: 2,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob()],
|
|
||||||
});
|
|
||||||
|
|
||||||
const state = createCronServiceState({
|
|
||||||
cronEnabled: true,
|
|
||||||
storePath: store.storePath,
|
|
||||||
log: noopLogger,
|
|
||||||
nowMs: () => nowMs,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
|
||||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
|
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
|
||||||
@ -271,27 +134,15 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||||
const jobId = "external-invalid-schedule";
|
const jobId = "external-invalid-schedule";
|
||||||
|
|
||||||
const createJob = (expr: string): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external invalid schedule",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
|
||||||
lastRunAtMs: Date.parse("2026-03-19T00:30:00.000Z"),
|
|
||||||
lastStatus: "ok",
|
|
||||||
lastRunStatus: "ok",
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("30 8 * * *")],
|
jobs: [
|
||||||
|
createCronJob({
|
||||||
|
id: jobId,
|
||||||
|
expr: "30 8 * * *",
|
||||||
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
|
}),
|
||||||
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const state = createCronServiceState({
|
const state = createCronServiceState({
|
||||||
@ -308,53 +159,39 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("not a valid cron")],
|
jobs: [
|
||||||
|
createCronJob({
|
||||||
|
id: jobId,
|
||||||
|
expr: "not a valid cron",
|
||||||
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
|
}),
|
||||||
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
await expect(
|
await expect(
|
||||||
ensureLoaded(state, { forceReload: true, skipRecompute: true }),
|
ensureLoaded(state, { forceReload: true, skipRecompute: true }),
|
||||||
).resolves.toBeUndefined();
|
).resolves.toBeUndefined();
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
const reloaded = state.store?.jobs[0];
|
||||||
expect(reloaded?.state.nextRunAtMs).toBeUndefined();
|
expect(reloaded?.state.nextRunAtMs).toBeUndefined();
|
||||||
expect(reloaded?.state.scheduleErrorCount).toBe(1);
|
expect(reloaded?.state.scheduleErrorCount).toBe(1);
|
||||||
expect(reloaded?.state.lastError).toMatch(/^schedule error:/);
|
expect(reloaded?.state.lastError).toMatch(/^schedule error:/);
|
||||||
|
|
||||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
|
||||||
jobs?: Array<{
|
|
||||||
id: string;
|
|
||||||
state?: { scheduleErrorCount?: number; lastError?: string; nextRunAtMs?: number };
|
|
||||||
}>;
|
|
||||||
};
|
|
||||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
|
||||||
expect(persistedJob?.state?.scheduleErrorCount).toBe(1);
|
|
||||||
expect(persistedJob?.state?.lastError).toMatch(/^schedule error:/);
|
|
||||||
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("does not double-count a reload schedule error during the immediate full recompute", async () => {
|
it("does not double-count a reload schedule error during the immediate recompute", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||||
const jobId = "external-invalid-schedule-full-recompute";
|
const jobId = "external-invalid-schedule-full-recompute";
|
||||||
|
|
||||||
const createJob = (expr: string): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external invalid schedule full recompute",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("30 8 * * *")],
|
jobs: [
|
||||||
|
createCronJob({
|
||||||
|
id: jobId,
|
||||||
|
expr: "30 8 * * *",
|
||||||
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
|
}),
|
||||||
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const state = createCronServiceState({
|
const state = createCronServiceState({
|
||||||
@ -371,7 +208,13 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("not a valid cron")],
|
jobs: [
|
||||||
|
createCronJob({
|
||||||
|
id: jobId,
|
||||||
|
expr: "not a valid cron",
|
||||||
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
|
}),
|
||||||
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
@ -379,116 +222,6 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
|
|
||||||
recomputeNextRuns(state);
|
recomputeNextRuns(state);
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
||||||
|
|
||||||
recomputeNextRuns(state);
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("does not double-count a reload schedule error during immediate maintenance recompute", async () => {
|
|
||||||
const store = await makeStorePath();
|
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
|
||||||
const jobId = "external-invalid-schedule-maintenance";
|
|
||||||
|
|
||||||
const createJob = (expr: string): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external invalid schedule maintenance",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob("30 8 * * *")],
|
|
||||||
});
|
|
||||||
|
|
||||||
const state = createCronServiceState({
|
|
||||||
cronEnabled: true,
|
|
||||||
storePath: store.storePath,
|
|
||||||
log: noopLogger,
|
|
||||||
nowMs: () => nowMs,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob("not a valid cron")],
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
|
||||||
|
|
||||||
recomputeNextRunsForMaintenance(state);
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
|
||||||
|
|
||||||
recomputeNextRunsForMaintenance(state);
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("preserves the one-shot skip across a second forceReload before maintenance recompute", async () => {
|
|
||||||
const store = await makeStorePath();
|
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
|
||||||
const jobId = "external-invalid-schedule-second-reload";
|
|
||||||
|
|
||||||
const createJob = (expr: string): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external invalid schedule second reload",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob("30 8 * * *")],
|
|
||||||
});
|
|
||||||
|
|
||||||
const state = createCronServiceState({
|
|
||||||
cronEnabled: true,
|
|
||||||
storePath: store.storePath,
|
|
||||||
log: noopLogger,
|
|
||||||
nowMs: () => nowMs,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob("not a valid cron")],
|
|
||||||
});
|
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
|
||||||
|
|
||||||
recomputeNextRunsForMaintenance(state);
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(1);
|
|
||||||
|
|
||||||
recomputeNextRunsForMaintenance(state);
|
|
||||||
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBe(2);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("keeps forceReload repairs when manual-run snapshot is merged back", async () => {
|
it("keeps forceReload repairs when manual-run snapshot is merged back", async () => {
|
||||||
@ -497,49 +230,26 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
const jobId = "manual-run-reload-merge";
|
const jobId = "manual-run-reload-merge";
|
||||||
const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z");
|
const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z");
|
||||||
|
|
||||||
const createJob = (params: {
|
const createJob = (params: { expr: string; enabled: boolean; nextRunAtMs?: number }) => ({
|
||||||
expr: string;
|
...createCronJob({
|
||||||
enabled: boolean;
|
id: jobId,
|
||||||
nextRunAtMs?: number;
|
expr: params.expr,
|
||||||
lastStatus?: CronJob["state"]["lastStatus"];
|
enabled: params.enabled,
|
||||||
}): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "manual run reload merge",
|
|
||||||
enabled: params.enabled,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "isolated",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "agentTurn", message: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: params.nextRunAtMs,
|
nextRunAtMs: params.nextRunAtMs,
|
||||||
lastStatus: params.lastStatus,
|
}),
|
||||||
},
|
sessionTarget: "isolated" as const,
|
||||||
|
payload: { kind: "agentTurn", message: "tick" } as const,
|
||||||
});
|
});
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [
|
jobs: [createJob({ expr: "30 23 * * *", enabled: true, nextRunAtMs: staleNextRunAtMs })],
|
||||||
createJob({
|
|
||||||
expr: "30 23 * * *",
|
|
||||||
enabled: true,
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const runIsolatedAgentJob = vi.fn(async () => {
|
const runIsolatedAgentJob = vi.fn(async () => {
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [
|
jobs: [createJob({ expr: "30 8 * * *", enabled: false, nextRunAtMs: staleNextRunAtMs })],
|
||||||
createJob({
|
|
||||||
expr: "30 8 * * *",
|
|
||||||
enabled: false,
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
lastStatus: "error",
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
});
|
});
|
||||||
nowMs += 500;
|
nowMs += 500;
|
||||||
return { status: "ok" as const, summary: "done" };
|
return { status: "ok" as const, summary: "done" };
|
||||||
@ -555,33 +265,12 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
runIsolatedAgentJob,
|
runIsolatedAgentJob,
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = await run(state, jobId, "force");
|
expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true });
|
||||||
expect(result).toEqual({ ok: true, ran: true });
|
|
||||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
|
||||||
|
|
||||||
const merged = state.store?.jobs.find((job) => job.id === jobId);
|
const merged = state.store?.jobs[0];
|
||||||
expect(merged?.schedule).toEqual({
|
|
||||||
kind: "cron",
|
|
||||||
expr: "30 8 * * *",
|
|
||||||
tz: "Asia/Shanghai",
|
|
||||||
staggerMs: 0,
|
|
||||||
});
|
|
||||||
expect(merged?.enabled).toBe(false);
|
expect(merged?.enabled).toBe(false);
|
||||||
expect(merged?.state.nextRunAtMs).toBeUndefined();
|
expect(merged?.state.nextRunAtMs).toBeUndefined();
|
||||||
expect(merged?.state.lastStatus).toBe("ok");
|
expect(merged?.state.lastStatus).toBe("ok");
|
||||||
expect(merged?.state.lastRunAtMs).toBeDefined();
|
|
||||||
|
|
||||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
|
||||||
jobs?: Array<{
|
|
||||||
id: string;
|
|
||||||
enabled?: boolean;
|
|
||||||
state?: { nextRunAtMs?: number; lastStatus?: string };
|
|
||||||
}>;
|
|
||||||
};
|
|
||||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
|
||||||
expect(persistedJob?.enabled).toBe(false);
|
|
||||||
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
|
|
||||||
expect(persistedJob?.state?.lastStatus).toBe("ok");
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("keeps scheduleErrorCount cleared when external reload fixes schedule during force-run", async () => {
|
it("keeps scheduleErrorCount cleared when external reload fixes schedule during force-run", async () => {
|
||||||
@ -590,51 +279,27 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
const jobId = "manual-run-reload-clears-schedule-error-count";
|
const jobId = "manual-run-reload-clears-schedule-error-count";
|
||||||
const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z");
|
const staleNextRunAtMs = Date.parse("2026-03-19T23:30:00.000Z");
|
||||||
|
|
||||||
const createJob = (params: {
|
const createJob = (expr: string) => ({
|
||||||
expr: string;
|
...createCronJob({
|
||||||
scheduleErrorCount?: number;
|
id: jobId,
|
||||||
lastError?: string;
|
expr,
|
||||||
nextRunAtMs?: number;
|
nextRunAtMs: staleNextRunAtMs,
|
||||||
}): CronJob => ({
|
scheduleErrorCount: 2,
|
||||||
id: jobId,
|
lastError: "schedule error: invalid expression",
|
||||||
name: "manual run reload clears schedule error count",
|
}),
|
||||||
enabled: true,
|
sessionTarget: "isolated" as const,
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
payload: { kind: "agentTurn", message: "tick" } as const,
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr: params.expr, tz: "Asia/Shanghai", staggerMs: 0 },
|
|
||||||
sessionTarget: "isolated",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "agentTurn", message: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: params.nextRunAtMs,
|
|
||||||
scheduleErrorCount: params.scheduleErrorCount,
|
|
||||||
lastError: params.lastError,
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [
|
jobs: [createJob("30 23 * * *")],
|
||||||
createJob({
|
|
||||||
expr: "30 23 * * *",
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
scheduleErrorCount: 2,
|
|
||||||
lastError: "cron: invalid expression",
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const runIsolatedAgentJob = vi.fn(async () => {
|
const runIsolatedAgentJob = vi.fn(async () => {
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [
|
jobs: [createJob("30 8 * * *")],
|
||||||
createJob({
|
|
||||||
expr: "30 8 * * *",
|
|
||||||
nextRunAtMs: staleNextRunAtMs,
|
|
||||||
scheduleErrorCount: 2,
|
|
||||||
lastError: "cron: invalid expression",
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
});
|
});
|
||||||
nowMs += 500;
|
nowMs += 500;
|
||||||
return { status: "ok" as const, summary: "done" };
|
return { status: "ok" as const, summary: "done" };
|
||||||
@ -650,110 +315,26 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
runIsolatedAgentJob,
|
runIsolatedAgentJob,
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = await run(state, jobId, "force");
|
expect(await run(state, jobId, "force")).toEqual({ ok: true, ran: true });
|
||||||
expect(result).toEqual({ ok: true, ran: true });
|
expect(state.store?.jobs[0]?.state.scheduleErrorCount).toBeUndefined();
|
||||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
|
||||||
|
|
||||||
const merged = state.store?.jobs.find((job) => job.id === jobId);
|
|
||||||
expect(merged?.schedule).toEqual({
|
|
||||||
kind: "cron",
|
|
||||||
expr: "30 8 * * *",
|
|
||||||
tz: "Asia/Shanghai",
|
|
||||||
staggerMs: 0,
|
|
||||||
});
|
|
||||||
expect(merged?.state.scheduleErrorCount).toBeUndefined();
|
|
||||||
|
|
||||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
|
||||||
jobs?: Array<{
|
|
||||||
id: string;
|
|
||||||
state?: { scheduleErrorCount?: number };
|
|
||||||
}>;
|
|
||||||
};
|
|
||||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
|
||||||
expect(persistedJob?.state?.scheduleErrorCount).toBeUndefined();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("keeps one-shot terminal disable state when manual force-run reloads unchanged store", async () => {
|
it("preserves runningAtMs when an external reload comes from a stale file snapshot", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
let nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
||||||
const jobId = "manual-run-at-terminal-state";
|
const jobId = "external-running-marker";
|
||||||
const scheduledAtMs = nowMs + 60_000;
|
const runningAtMs = Date.parse("2026-03-19T12:00:00.000Z");
|
||||||
|
|
||||||
const createJob = (): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "manual run at terminal state",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "at", at: new Date(scheduledAtMs).toISOString() },
|
|
||||||
sessionTarget: "isolated",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "agentTurn", message: "tick" },
|
|
||||||
state: {
|
|
||||||
nextRunAtMs: scheduledAtMs,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob()],
|
jobs: [
|
||||||
});
|
createCronJob({
|
||||||
|
id: jobId,
|
||||||
const state = createCronServiceState({
|
expr: "30 23 * * *",
|
||||||
cronEnabled: true,
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
storePath: store.storePath,
|
runningAtMs,
|
||||||
log: noopLogger,
|
}),
|
||||||
nowMs: () => nowMs,
|
],
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => {
|
|
||||||
nowMs += 500;
|
|
||||||
return { status: "ok" as const, summary: "done" };
|
|
||||||
}),
|
|
||||||
});
|
|
||||||
|
|
||||||
const result = await run(state, jobId, "force");
|
|
||||||
expect(result).toEqual({ ok: true, ran: true });
|
|
||||||
|
|
||||||
const merged = state.store?.jobs.find((job) => job.id === jobId);
|
|
||||||
expect(merged?.enabled).toBe(false);
|
|
||||||
expect(merged?.state.nextRunAtMs).toBeUndefined();
|
|
||||||
expect(merged?.state.lastStatus).toBe("ok");
|
|
||||||
|
|
||||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
|
||||||
jobs?: Array<{
|
|
||||||
id: string;
|
|
||||||
enabled?: boolean;
|
|
||||||
state?: { nextRunAtMs?: number; lastStatus?: string };
|
|
||||||
}>;
|
|
||||||
};
|
|
||||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
|
||||||
expect(persistedJob?.enabled).toBe(false);
|
|
||||||
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
|
|
||||||
expect(persistedJob?.state?.lastStatus).toBe("ok");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("clears reload-repair skip markers when a job is removed before same-id rebuild", async () => {
|
|
||||||
const store = await makeStorePath();
|
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
|
||||||
const jobId = "external-reload-skip-marker-id-reuse";
|
|
||||||
|
|
||||||
const createJob = (expr: string): CronJob => ({
|
|
||||||
id: jobId,
|
|
||||||
name: "external reload skip marker id reuse",
|
|
||||||
enabled: true,
|
|
||||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
|
||||||
schedule: { kind: "cron", expr, tz: "UTC", staggerMs: 0 },
|
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
state: {},
|
|
||||||
});
|
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob("*/15 * * * *")],
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const state = createCronServiceState({
|
const state = createCronServiceState({
|
||||||
@ -770,36 +351,31 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
|
|
||||||
await writeCronStoreSnapshot({
|
await writeCronStoreSnapshot({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
jobs: [createJob("not a valid cron")],
|
jobs: [
|
||||||
|
createCronJob({
|
||||||
|
id: jobId,
|
||||||
|
expr: "* * * * *",
|
||||||
|
updatedAtMs: Date.parse("2026-03-19T12:01:00.000Z"),
|
||||||
|
nextRunAtMs: Date.parse("2026-03-20T00:30:00.000Z"),
|
||||||
|
}),
|
||||||
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(true);
|
|
||||||
|
|
||||||
const removed = await remove(state, jobId);
|
const reloaded = state.store?.jobs[0];
|
||||||
expect(removed).toEqual({ ok: true, removed: true });
|
expect(reloaded?.state.runningAtMs).toBe(runningAtMs);
|
||||||
expect(state.skipNextReloadRepairRecomputeJobIds.has(jobId)).toBe(false);
|
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
||||||
|
|
||||||
await writeCronStoreSnapshot({
|
|
||||||
storePath: store.storePath,
|
|
||||||
jobs: [createJob("*/5 * * * *")],
|
|
||||||
});
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
|
||||||
recomputeNextRunsForMaintenance(state);
|
|
||||||
|
|
||||||
const rebuilt = state.store?.jobs.find((job) => job.id === jobId);
|
|
||||||
expect(typeof rebuilt?.state.nextRunAtMs).toBe("number");
|
|
||||||
expect(Number.isFinite(rebuilt?.state.nextRunAtMs)).toBe(true);
|
|
||||||
expect(rebuilt?.state.scheduleErrorCount).toBeUndefined();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("recomputes nextRunAtMs when external every schedule changes", async () => {
|
it("recomputes nextRunAtMs when an external every schedule changes", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||||
const jobId = "external-every-schedule-change";
|
const jobId = "external-every-schedule-change";
|
||||||
|
|
||||||
const createEveryJob = (everyMs: number): CronJob => ({
|
const createEveryJob = (everyMs: number): CronJob => ({
|
||||||
id: jobId,
|
id: jobId,
|
||||||
name: "external every schedule change",
|
name: jobId,
|
||||||
enabled: true,
|
enabled: true,
|
||||||
createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"),
|
createdAtMs: Date.parse("2026-03-18T00:00:00.000Z"),
|
||||||
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
|
||||||
@ -840,12 +416,6 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
|||||||
|
|
||||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
|
|
||||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z"));
|
||||||
expect(reloaded?.schedule).toEqual({
|
|
||||||
kind: "every",
|
|
||||||
everyMs: 60_000,
|
|
||||||
anchorMs: Date.parse("2026-03-19T00:00:00.000Z"),
|
|
||||||
});
|
|
||||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T01:44:00.000Z"));
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -334,6 +334,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
|
|||||||
job.state.nextRunAtMs = undefined;
|
job.state.nextRunAtMs = undefined;
|
||||||
job.state.runningAtMs = undefined;
|
job.state.runningAtMs = undefined;
|
||||||
}
|
}
|
||||||
|
state.skipNextReloadRepairRecomputeJobIds.delete(id);
|
||||||
} else if (job.enabled) {
|
} else if (job.enabled) {
|
||||||
// Non-schedule edits should not mutate other jobs, but still repair a
|
// Non-schedule edits should not mutate other jobs, but still repair a
|
||||||
// missing/corrupt nextRunAtMs for the updated job.
|
// missing/corrupt nextRunAtMs for the updated job.
|
||||||
|
|||||||
@ -39,54 +39,13 @@ function resolveExternalRepairComputeBaseMs(params: {
|
|||||||
return Math.min(nowMs, normalizedReloadedUpdatedAtMs);
|
return Math.min(nowMs, normalizedReloadedUpdatedAtMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
function hasPendingErrorBackoff(job: CronJob, nowMs: number): boolean {
|
|
||||||
const nextRunAtMs = job.state.nextRunAtMs;
|
|
||||||
if (typeof nextRunAtMs !== "number" || !Number.isFinite(nextRunAtMs) || nextRunAtMs <= nowMs) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const consecutiveErrors = job.state.consecutiveErrors;
|
|
||||||
if (
|
|
||||||
typeof consecutiveErrors !== "number" ||
|
|
||||||
!Number.isFinite(consecutiveErrors) ||
|
|
||||||
consecutiveErrors <= 0
|
|
||||||
) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return job.state.lastStatus === "error";
|
|
||||||
}
|
|
||||||
|
|
||||||
function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): boolean {
|
|
||||||
const { job, nowMs } = params;
|
|
||||||
if (!job.enabled) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (typeof job.updatedAtMs !== "number" || !Number.isFinite(job.updatedAtMs)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const persistedNextRunAtMs = job.state.nextRunAtMs;
|
|
||||||
if (typeof persistedNextRunAtMs !== "number" || !Number.isFinite(persistedNextRunAtMs)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const normalizedUpdatedAtMs = Math.max(0, Math.floor(job.updatedAtMs));
|
|
||||||
// If a schedule edit happened after the persisted slot, the slot is stale
|
|
||||||
// even when it is already overdue at startup.
|
|
||||||
if (persistedNextRunAtMs <= nowMs) {
|
|
||||||
return normalizedUpdatedAtMs > persistedNextRunAtMs;
|
|
||||||
}
|
|
||||||
if (hasPendingErrorBackoff(job, nowMs)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const computeBaseMs = Math.min(nowMs, normalizedUpdatedAtMs);
|
|
||||||
return computeBaseMs < persistedNextRunAtMs;
|
|
||||||
}
|
|
||||||
|
|
||||||
function repairNextRunsAfterExternalReload(params: {
|
function repairNextRunsAfterExternalReload(params: {
|
||||||
state: CronServiceState;
|
state: CronServiceState;
|
||||||
previousJobs: CronJob[] | undefined;
|
previousJobs: CronJob[] | undefined;
|
||||||
}): boolean {
|
}): boolean {
|
||||||
const { state, previousJobs } = params;
|
const { state, previousJobs } = params;
|
||||||
const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds;
|
const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds;
|
||||||
if (!state.store) {
|
if (!state.store || previousJobs === undefined) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (skipRecomputeJobIds.size > 0) {
|
if (skipRecomputeJobIds.size > 0) {
|
||||||
@ -98,28 +57,32 @@ function repairNextRunsAfterExternalReload(params: {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const previousById = new Map((previousJobs ?? []).map((job) => [job.id, job]));
|
const previousById = new Map(previousJobs.map((job) => [job.id, job]));
|
||||||
const now = state.deps.nowMs();
|
const now = state.deps.nowMs();
|
||||||
let changed = false;
|
let changed = false;
|
||||||
|
|
||||||
for (const job of state.store.jobs) {
|
for (const job of state.store.jobs) {
|
||||||
const previous = previousById.get(job.id);
|
const previous = previousById.get(job.id);
|
||||||
const coldLoadRepairCandidate =
|
if (!previous) {
|
||||||
previousJobs === undefined && shouldRepairColdLoadNextRun({ job, nowMs: now });
|
continue;
|
||||||
const scheduleChanged = previous ? !schedulesEqual(previous.schedule, job.schedule) : false;
|
}
|
||||||
const enabledChanged = previous ? previous.enabled !== job.enabled : false;
|
if (typeof previous.state.runningAtMs === "number" && job.state.runningAtMs === undefined) {
|
||||||
if (!scheduleChanged && !enabledChanged && !coldLoadRepairCandidate) {
|
job.state.runningAtMs = previous.state.runningAtMs;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule);
|
||||||
|
const enabledChanged = previous.enabled !== job.enabled;
|
||||||
|
if (!scheduleChanged && !enabledChanged) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
skipRecomputeJobIds.delete(job.id);
|
skipRecomputeJobIds.delete(job.id);
|
||||||
const computeBaseMs = coldLoadRepairCandidate
|
const computeBaseMs = resolveExternalRepairComputeBaseMs({
|
||||||
? Math.min(now, Math.max(0, Math.floor(job.updatedAtMs)))
|
nowMs: now,
|
||||||
: resolveExternalRepairComputeBaseMs({
|
reloadedUpdatedAtMs: job.updatedAtMs,
|
||||||
nowMs: now,
|
previousUpdatedAtMs: previous.updatedAtMs,
|
||||||
reloadedUpdatedAtMs: job.updatedAtMs,
|
});
|
||||||
previousUpdatedAtMs: previous?.updatedAtMs ?? Number.NEGATIVE_INFINITY,
|
|
||||||
});
|
|
||||||
let nextRunAtMs: number | undefined;
|
let nextRunAtMs: number | undefined;
|
||||||
try {
|
try {
|
||||||
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined;
|
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined;
|
||||||
@ -159,7 +122,6 @@ function repairNextRunsAfterExternalReload(params: {
|
|||||||
jobId: job.id,
|
jobId: job.id,
|
||||||
scheduleChanged,
|
scheduleChanged,
|
||||||
enabledChanged,
|
enabledChanged,
|
||||||
coldLoadRepairCandidate,
|
|
||||||
computeBaseMs,
|
computeBaseMs,
|
||||||
nextRunAtMs: job.state.nextRunAtMs,
|
nextRunAtMs: job.state.nextRunAtMs,
|
||||||
},
|
},
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user