fix(cron): repair stale cold-load schedules and stabilize jiti regression test
This commit is contained in:
parent
4aabf034eb
commit
f5eb99df3f
@ -130,6 +130,55 @@ describe("forceReload repairs externally changed cron schedules", () => {
|
||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
||||
});
|
||||
|
||||
it("repairs stale nextRunAtMs on first load after restart", async () => {
|
||||
const store = await makeStorePath();
|
||||
const nowMs = Date.parse("2026-03-19T12:10:00.000Z");
|
||||
const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z");
|
||||
const jobId = "external-schedule-change-cold-load";
|
||||
const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z");
|
||||
|
||||
const createJob = (): CronJob => ({
|
||||
id: jobId,
|
||||
name: "external schedule cold load repair",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
|
||||
updatedAtMs: editedAtMs,
|
||||
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {
|
||||
nextRunAtMs: staleNextRunAtMs,
|
||||
},
|
||||
});
|
||||
|
||||
await writeCronStoreSnapshot({
|
||||
storePath: store.storePath,
|
||||
jobs: [createJob()],
|
||||
});
|
||||
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => nowMs,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
|
||||
const reloaded = state.store?.jobs.find((job) => job.id === jobId);
|
||||
expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||
jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
|
||||
};
|
||||
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
|
||||
expect(persistedJob?.state?.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z"));
|
||||
});
|
||||
|
||||
it("records schedule errors instead of aborting reload when an external edit is invalid", async () => {
|
||||
const store = await makeStorePath();
|
||||
const nowMs = Date.parse("2026-03-19T01:44:00.000Z");
|
||||
|
||||
@ -34,13 +34,38 @@ function resolveExternalRepairComputeBaseMs(params: {
|
||||
return Math.min(nowMs, normalizedReloadedUpdatedAtMs);
|
||||
}
|
||||
|
||||
function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): boolean {
|
||||
const { job, nowMs } = params;
|
||||
if (!job.enabled) {
|
||||
return false;
|
||||
}
|
||||
if ((job.state.consecutiveErrors ?? 0) > 0) {
|
||||
return false;
|
||||
}
|
||||
if (typeof job.updatedAtMs !== "number" || !Number.isFinite(job.updatedAtMs)) {
|
||||
return false;
|
||||
}
|
||||
const persistedNextRunAtMs = job.state.nextRunAtMs;
|
||||
if (typeof persistedNextRunAtMs !== "number" || !Number.isFinite(persistedNextRunAtMs)) {
|
||||
return false;
|
||||
}
|
||||
// Cold-load repair is only for stale future schedules edited while the
|
||||
// gateway was offline. Already-due timestamps should be preserved so they can
|
||||
// execute on the next tick.
|
||||
if (persistedNextRunAtMs <= nowMs) {
|
||||
return false;
|
||||
}
|
||||
const computeBaseMs = Math.min(nowMs, Math.max(0, Math.floor(job.updatedAtMs)));
|
||||
return computeBaseMs < persistedNextRunAtMs;
|
||||
}
|
||||
|
||||
function repairNextRunsAfterExternalReload(params: {
|
||||
state: CronServiceState;
|
||||
previousJobs: CronJob[] | undefined;
|
||||
}): boolean {
|
||||
const { state, previousJobs } = params;
|
||||
const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds;
|
||||
if (!state.store || !previousJobs?.length) {
|
||||
if (!state.store) {
|
||||
return false;
|
||||
}
|
||||
if (skipRecomputeJobIds.size > 0) {
|
||||
@ -52,28 +77,28 @@ function repairNextRunsAfterExternalReload(params: {
|
||||
}
|
||||
}
|
||||
|
||||
const previousById = new Map(previousJobs.map((job) => [job.id, job]));
|
||||
const previousById = new Map((previousJobs ?? []).map((job) => [job.id, job]));
|
||||
const now = state.deps.nowMs();
|
||||
let changed = false;
|
||||
|
||||
for (const job of state.store.jobs) {
|
||||
const previous = previousById.get(job.id);
|
||||
if (!previous) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule);
|
||||
const enabledChanged = previous.enabled !== job.enabled;
|
||||
if (!scheduleChanged && !enabledChanged) {
|
||||
const coldLoadRepairCandidate =
|
||||
previousJobs === undefined && shouldRepairColdLoadNextRun({ job, nowMs: now });
|
||||
const scheduleChanged = previous ? !schedulesEqual(previous.schedule, job.schedule) : false;
|
||||
const enabledChanged = previous ? previous.enabled !== job.enabled : false;
|
||||
if (!scheduleChanged && !enabledChanged && !coldLoadRepairCandidate) {
|
||||
continue;
|
||||
}
|
||||
|
||||
skipRecomputeJobIds.delete(job.id);
|
||||
const computeBaseMs = resolveExternalRepairComputeBaseMs({
|
||||
nowMs: now,
|
||||
reloadedUpdatedAtMs: job.updatedAtMs,
|
||||
previousUpdatedAtMs: previous.updatedAtMs,
|
||||
});
|
||||
const computeBaseMs = coldLoadRepairCandidate
|
||||
? Math.min(now, Math.max(0, Math.floor(job.updatedAtMs)))
|
||||
: resolveExternalRepairComputeBaseMs({
|
||||
nowMs: now,
|
||||
reloadedUpdatedAtMs: job.updatedAtMs,
|
||||
previousUpdatedAtMs: previous?.updatedAtMs ?? Number.NEGATIVE_INFINITY,
|
||||
});
|
||||
let nextRunAtMs: number | undefined;
|
||||
try {
|
||||
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined;
|
||||
@ -102,6 +127,7 @@ function repairNextRunsAfterExternalReload(params: {
|
||||
jobId: job.id,
|
||||
scheduleChanged,
|
||||
enabledChanged,
|
||||
coldLoadRepairCandidate,
|
||||
computeBaseMs,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
},
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { pathToFileURL } from "node:url";
|
||||
import { afterAll, afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { emitDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js";
|
||||
import { withEnv } from "../test-utils/env.js";
|
||||
@ -3701,10 +3700,9 @@ export const syntheticRuntimeMarker = {
|
||||
"utf-8",
|
||||
);
|
||||
const copiedChannelRuntime = path.join(copiedExtensionRoot, "src", "channel.runtime.ts");
|
||||
const jitiBaseUrl = pathToFileURL(jitiBaseFile).href;
|
||||
|
||||
const createJiti = await getCreateJiti();
|
||||
const withoutAlias = createJiti(jitiBaseUrl, {
|
||||
const withoutAlias = createJiti(jitiBaseFile, {
|
||||
...__testing.buildPluginLoaderJitiOptions({}),
|
||||
tryNative: false,
|
||||
});
|
||||
@ -3712,7 +3710,7 @@ export const syntheticRuntimeMarker = {
|
||||
// follow the same path instead of the async import helper.
|
||||
expect(() => withoutAlias(copiedChannelRuntime)).toThrow();
|
||||
|
||||
const withAlias = createJiti(jitiBaseUrl, {
|
||||
const withAlias = createJiti(jitiBaseFile, {
|
||||
...__testing.buildPluginLoaderJitiOptions({
|
||||
"openclaw/plugin-sdk/infra-runtime": copiedChannelRuntimeShim,
|
||||
}),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user