diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 04ac09fa33b..8c2a6cb0fde 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -1,4 +1,6 @@ import fs from "node:fs"; +import { parseAbsoluteTimeMs } from "../parse.js"; +import { coerceFiniteScheduleNumber } from "../schedule.js"; import { normalizeStoredCronJobs } from "../store-migration.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { CronJob } from "../types.js"; @@ -34,6 +36,22 @@ function resolveExternalRepairComputeBaseMs(params: { return Math.min(nowMs, normalizedReloadedUpdatedAtMs); } +function hasPendingErrorBackoff(job: CronJob, nowMs: number): boolean { + const nextRunAtMs = job.state.nextRunAtMs; + if (typeof nextRunAtMs !== "number" || !Number.isFinite(nextRunAtMs) || nextRunAtMs <= nowMs) { + return false; + } + const consecutiveErrors = job.state.consecutiveErrors; + if ( + typeof consecutiveErrors !== "number" || + !Number.isFinite(consecutiveErrors) || + consecutiveErrors <= 0 + ) { + return false; + } + return job.state.lastStatus === "error"; +} + function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): boolean { const { job, nowMs } = params; if (!job.enabled) { @@ -52,10 +70,40 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b if (persistedNextRunAtMs <= nowMs) { return normalizedUpdatedAtMs > persistedNextRunAtMs; } + if (hasPendingErrorBackoff(job, nowMs)) { + return false; + } const computeBaseMs = Math.min(nowMs, normalizedUpdatedAtMs); return computeBaseMs < persistedNextRunAtMs; } +function parseAtScheduleMs(schedule: Extract): number | null { + const legacy = schedule as { at?: string; atMs?: number | string }; + if (typeof legacy.atMs === "number" && Number.isFinite(legacy.atMs) && legacy.atMs > 0) { + return legacy.atMs; + } + if (typeof legacy.atMs === "string") { + return parseAbsoluteTimeMs(legacy.atMs); + } + if (typeof legacy.at === "string") { + return parseAbsoluteTimeMs(legacy.at); + } + return null; +} + +function shouldTreatUndefinedNextRunAsScheduleError(job: CronJob): boolean { + if (!job.enabled) { + return false; + } + if (job.schedule.kind === "every") { + return coerceFiniteScheduleNumber(job.schedule.everyMs) === undefined; + } + if (job.schedule.kind === "at") { + return parseAtScheduleMs(job.schedule) === null; + } + return false; +} + function repairNextRunsAfterExternalReload(params: { state: CronServiceState; previousJobs: CronJob[] | undefined; @@ -99,6 +147,17 @@ function repairNextRunsAfterExternalReload(params: { let nextRunAtMs: number | undefined; try { nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined; + if (nextRunAtMs === undefined && shouldTreatUndefinedNextRunAsScheduleError(job)) { + const err = + job.schedule.kind === "every" + ? new Error("invalid every schedule: everyMs must be a finite number") + : new Error("invalid at schedule: at must be a valid absolute timestamp"); + if (recordScheduleComputeError({ state, job, err })) { + changed = true; + } + skipRecomputeJobIds.add(job.id); + continue; + } if (job.state.scheduleErrorCount !== undefined) { job.state.scheduleErrorCount = undefined; changed = true; diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index d81f8c18326..8af6cf927d4 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { pathToFileURL } from "node:url"; import { afterAll, afterEach, describe, expect, it, vi } from "vitest"; import { emitDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js"; import { withEnv } from "../test-utils/env.js"; @@ -3700,9 +3701,10 @@ export const syntheticRuntimeMarker = { "utf-8", ); const copiedChannelRuntime = path.join(copiedExtensionRoot, "src", "channel.runtime.ts"); + const jitiBaseUrl = pathToFileURL(jitiBaseFile).href; const createJiti = await getCreateJiti(); - const withoutAlias = createJiti(jitiBaseFile, { + const withoutAlias = createJiti(jitiBaseUrl, { ...__testing.buildPluginLoaderJitiOptions({}), tryNative: false, }); @@ -3710,7 +3712,7 @@ export const syntheticRuntimeMarker = { // follow the same path instead of the async import helper. expect(() => withoutAlias(copiedChannelRuntime)).toThrow(); - const withAlias = createJiti(jitiBaseFile, { + const withAlias = createJiti(jitiBaseUrl, { ...__testing.buildPluginLoaderJitiOptions({ "openclaw/plugin-sdk/infra-runtime": copiedChannelRuntimeShim, }),