From f5eb99df3f2092be9cf9b14c2ee2cfa3d606dba1 Mon Sep 17 00:00:00 2001 From: create Date: Fri, 20 Mar 2026 19:20:00 +0800 Subject: [PATCH] fix(cron): repair stale cold-load schedules and stabilize jiti regression test --- ...external-reload-schedule-recompute.test.ts | 49 +++++++++++++++++ src/cron/service/store.ts | 54 ++++++++++++++----- src/plugins/loader.test.ts | 6 +-- 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts index 1d168c0689f..bb25f792773 100644 --- a/src/cron/service.external-reload-schedule-recompute.test.ts +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -130,6 +130,55 @@ describe("forceReload repairs externally changed cron schedules", () => { expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); }); + it("repairs stale nextRunAtMs on first load after restart", async () => { + const store = await makeStorePath(); + const nowMs = Date.parse("2026-03-19T12:10:00.000Z"); + const editedAtMs = Date.parse("2026-03-19T12:01:00.000Z"); + const jobId = "external-schedule-change-cold-load"; + const staleNextRunAtMs = Date.parse("2026-03-20T00:30:00.000Z"); + + const createJob = (): CronJob => ({ + id: jobId, + name: "external schedule cold load repair", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: editedAtMs, + schedule: { kind: "cron", expr: "* * * * *", tz: "UTC", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: { + nextRunAtMs: staleNextRunAtMs, + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob()], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await ensureLoaded(state, { skipRecompute: true }); + + const reloaded = state.store?.jobs.find((job) => job.id === jobId); + expect(reloaded?.state.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs?: Array<{ id: string; state?: { nextRunAtMs?: number } }>; + }; + const persistedJob = persisted.jobs?.find((job) => job.id === jobId); + expect(persistedJob?.state?.nextRunAtMs).toBe(Date.parse("2026-03-19T12:02:00.000Z")); + }); + it("records schedule errors instead of aborting reload when an external edit is invalid", async () => { const store = await makeStorePath(); const nowMs = Date.parse("2026-03-19T01:44:00.000Z"); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 3727b1cdd55..408a9186fae 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -34,13 +34,38 @@ function resolveExternalRepairComputeBaseMs(params: { return Math.min(nowMs, normalizedReloadedUpdatedAtMs); } +function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): boolean { + const { job, nowMs } = params; + if (!job.enabled) { + return false; + } + if ((job.state.consecutiveErrors ?? 0) > 0) { + return false; + } + if (typeof job.updatedAtMs !== "number" || !Number.isFinite(job.updatedAtMs)) { + return false; + } + const persistedNextRunAtMs = job.state.nextRunAtMs; + if (typeof persistedNextRunAtMs !== "number" || !Number.isFinite(persistedNextRunAtMs)) { + return false; + } + // Cold-load repair is only for stale future schedules edited while the + // gateway was offline. Already-due timestamps should be preserved so they can + // execute on the next tick. + if (persistedNextRunAtMs <= nowMs) { + return false; + } + const computeBaseMs = Math.min(nowMs, Math.max(0, Math.floor(job.updatedAtMs))); + return computeBaseMs < persistedNextRunAtMs; +} + function repairNextRunsAfterExternalReload(params: { state: CronServiceState; previousJobs: CronJob[] | undefined; }): boolean { const { state, previousJobs } = params; const skipRecomputeJobIds = state.skipNextReloadRepairRecomputeJobIds; - if (!state.store || !previousJobs?.length) { + if (!state.store) { return false; } if (skipRecomputeJobIds.size > 0) { @@ -52,28 +77,28 @@ function repairNextRunsAfterExternalReload(params: { } } - const previousById = new Map(previousJobs.map((job) => [job.id, job])); + const previousById = new Map((previousJobs ?? []).map((job) => [job.id, job])); const now = state.deps.nowMs(); let changed = false; for (const job of state.store.jobs) { const previous = previousById.get(job.id); - if (!previous) { - continue; - } - - const scheduleChanged = !schedulesEqual(previous.schedule, job.schedule); - const enabledChanged = previous.enabled !== job.enabled; - if (!scheduleChanged && !enabledChanged) { + const coldLoadRepairCandidate = + previousJobs === undefined && shouldRepairColdLoadNextRun({ job, nowMs: now }); + const scheduleChanged = previous ? !schedulesEqual(previous.schedule, job.schedule) : false; + const enabledChanged = previous ? previous.enabled !== job.enabled : false; + if (!scheduleChanged && !enabledChanged && !coldLoadRepairCandidate) { continue; } skipRecomputeJobIds.delete(job.id); - const computeBaseMs = resolveExternalRepairComputeBaseMs({ - nowMs: now, - reloadedUpdatedAtMs: job.updatedAtMs, - previousUpdatedAtMs: previous.updatedAtMs, - }); + const computeBaseMs = coldLoadRepairCandidate + ? Math.min(now, Math.max(0, Math.floor(job.updatedAtMs))) + : resolveExternalRepairComputeBaseMs({ + nowMs: now, + reloadedUpdatedAtMs: job.updatedAtMs, + previousUpdatedAtMs: previous?.updatedAtMs ?? Number.NEGATIVE_INFINITY, + }); let nextRunAtMs: number | undefined; try { nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined; @@ -102,6 +127,7 @@ function repairNextRunsAfterExternalReload(params: { jobId: job.id, scheduleChanged, enabledChanged, + coldLoadRepairCandidate, computeBaseMs, nextRunAtMs: job.state.nextRunAtMs, }, diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index 8af6cf927d4..d81f8c18326 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -1,7 +1,6 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { pathToFileURL } from "node:url"; import { afterAll, afterEach, describe, expect, it, vi } from "vitest"; import { emitDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js"; import { withEnv } from "../test-utils/env.js"; @@ -3701,10 +3700,9 @@ export const syntheticRuntimeMarker = { "utf-8", ); const copiedChannelRuntime = path.join(copiedExtensionRoot, "src", "channel.runtime.ts"); - const jitiBaseUrl = pathToFileURL(jitiBaseFile).href; const createJiti = await getCreateJiti(); - const withoutAlias = createJiti(jitiBaseUrl, { + const withoutAlias = createJiti(jitiBaseFile, { ...__testing.buildPluginLoaderJitiOptions({}), tryNative: false, }); @@ -3712,7 +3710,7 @@ export const syntheticRuntimeMarker = { // follow the same path instead of the async import helper. expect(() => withoutAlias(copiedChannelRuntime)).toThrow(); - const withAlias = createJiti(jitiBaseUrl, { + const withAlias = createJiti(jitiBaseFile, { ...__testing.buildPluginLoaderJitiOptions({ "openclaw/plugin-sdk/infra-runtime": copiedChannelRuntimeShim, }),