fix(cron): avoid cold-load backoff regression and restore loader jiti base
This commit is contained in:
parent
a7217bfcf7
commit
83844adec2
@ -1,4 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { coerceFiniteScheduleNumber } from "../schedule.js";
|
||||
import { normalizeStoredCronJobs } from "../store-migration.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
@ -34,6 +36,22 @@ function resolveExternalRepairComputeBaseMs(params: {
|
||||
return Math.min(nowMs, normalizedReloadedUpdatedAtMs);
|
||||
}
|
||||
|
||||
function hasPendingErrorBackoff(job: CronJob, nowMs: number): boolean {
|
||||
const nextRunAtMs = job.state.nextRunAtMs;
|
||||
if (typeof nextRunAtMs !== "number" || !Number.isFinite(nextRunAtMs) || nextRunAtMs <= nowMs) {
|
||||
return false;
|
||||
}
|
||||
const consecutiveErrors = job.state.consecutiveErrors;
|
||||
if (
|
||||
typeof consecutiveErrors !== "number" ||
|
||||
!Number.isFinite(consecutiveErrors) ||
|
||||
consecutiveErrors <= 0
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
return job.state.lastStatus === "error";
|
||||
}
|
||||
|
||||
function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): boolean {
|
||||
const { job, nowMs } = params;
|
||||
if (!job.enabled) {
|
||||
@ -52,10 +70,40 @@ function shouldRepairColdLoadNextRun(params: { job: CronJob; nowMs: number }): b
|
||||
if (persistedNextRunAtMs <= nowMs) {
|
||||
return normalizedUpdatedAtMs > persistedNextRunAtMs;
|
||||
}
|
||||
if (hasPendingErrorBackoff(job, nowMs)) {
|
||||
return false;
|
||||
}
|
||||
const computeBaseMs = Math.min(nowMs, normalizedUpdatedAtMs);
|
||||
return computeBaseMs < persistedNextRunAtMs;
|
||||
}
|
||||
|
||||
function parseAtScheduleMs(schedule: Extract<CronJob["schedule"], { kind: "at" }>): number | null {
|
||||
const legacy = schedule as { at?: string; atMs?: number | string };
|
||||
if (typeof legacy.atMs === "number" && Number.isFinite(legacy.atMs) && legacy.atMs > 0) {
|
||||
return legacy.atMs;
|
||||
}
|
||||
if (typeof legacy.atMs === "string") {
|
||||
return parseAbsoluteTimeMs(legacy.atMs);
|
||||
}
|
||||
if (typeof legacy.at === "string") {
|
||||
return parseAbsoluteTimeMs(legacy.at);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function shouldTreatUndefinedNextRunAsScheduleError(job: CronJob): boolean {
|
||||
if (!job.enabled) {
|
||||
return false;
|
||||
}
|
||||
if (job.schedule.kind === "every") {
|
||||
return coerceFiniteScheduleNumber(job.schedule.everyMs) === undefined;
|
||||
}
|
||||
if (job.schedule.kind === "at") {
|
||||
return parseAtScheduleMs(job.schedule) === null;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function repairNextRunsAfterExternalReload(params: {
|
||||
state: CronServiceState;
|
||||
previousJobs: CronJob[] | undefined;
|
||||
@ -99,6 +147,17 @@ function repairNextRunsAfterExternalReload(params: {
|
||||
let nextRunAtMs: number | undefined;
|
||||
try {
|
||||
nextRunAtMs = job.enabled ? computeJobNextRunAtMs(job, computeBaseMs) : undefined;
|
||||
if (nextRunAtMs === undefined && shouldTreatUndefinedNextRunAsScheduleError(job)) {
|
||||
const err =
|
||||
job.schedule.kind === "every"
|
||||
? new Error("invalid every schedule: everyMs must be a finite number")
|
||||
: new Error("invalid at schedule: at must be a valid absolute timestamp");
|
||||
if (recordScheduleComputeError({ state, job, err })) {
|
||||
changed = true;
|
||||
}
|
||||
skipRecomputeJobIds.add(job.id);
|
||||
continue;
|
||||
}
|
||||
if (job.state.scheduleErrorCount !== undefined) {
|
||||
job.state.scheduleErrorCount = undefined;
|
||||
changed = true;
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { pathToFileURL } from "node:url";
|
||||
import { afterAll, afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { emitDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js";
|
||||
import { withEnv } from "../test-utils/env.js";
|
||||
@ -3700,9 +3701,10 @@ export const syntheticRuntimeMarker = {
|
||||
"utf-8",
|
||||
);
|
||||
const copiedChannelRuntime = path.join(copiedExtensionRoot, "src", "channel.runtime.ts");
|
||||
const jitiBaseUrl = pathToFileURL(jitiBaseFile).href;
|
||||
|
||||
const createJiti = await getCreateJiti();
|
||||
const withoutAlias = createJiti(jitiBaseFile, {
|
||||
const withoutAlias = createJiti(jitiBaseUrl, {
|
||||
...__testing.buildPluginLoaderJitiOptions({}),
|
||||
tryNative: false,
|
||||
});
|
||||
@ -3710,7 +3712,7 @@ export const syntheticRuntimeMarker = {
|
||||
// follow the same path instead of the async import helper.
|
||||
expect(() => withoutAlias(copiedChannelRuntime)).toThrow();
|
||||
|
||||
const withAlias = createJiti(jitiBaseFile, {
|
||||
const withAlias = createJiti(jitiBaseUrl, {
|
||||
...__testing.buildPluginLoaderJitiOptions({
|
||||
"openclaw/plugin-sdk/infra-runtime": copiedChannelRuntimeShim,
|
||||
}),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user