diff --git a/CHANGELOG.md b/CHANGELOG.md index 76ced29da0f..b95d8a0c2eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1035,6 +1035,7 @@ Docs: https://docs.openclaw.ai - Feishu: detect bot mentions in post messages with embedded docs when `message.mentions` is empty. (#18074) Thanks @popomore. - Agents/Sessions: align session lock watchdog hold windows with run and compaction timeout budgets (plus grace), preventing valid long-running turns from being force-unlocked mid-run while still recovering hung lock owners. (#18060) - Cron: preserve default model fallbacks for cron agent runs when only `model.primary` is overridden, so failover still follows configured fallbacks unless explicitly cleared with `fallbacks: []`. (#18210) Thanks @mahsumaktas. +- Cron/Isolation: treat non-finite `nextRunAtMs` as missing and repair isolated `every` anchor fallback so legacy jobs without valid timestamps self-heal and scheduler wake timing remains valid. (#19469) Thanks @guirguispierre. - Cron: route text-only announce output through the main session announce flow via runSubagentAnnounceFlow so cron text-only output remains visible to the initiating session. Thanks @tyler6204. - Cron: treat `timeoutSeconds: 0` as no-timeout (not clamped to 1), ensuring long-running cron runs are not prematurely terminated. Thanks @tyler6204. - Cron announce injection now targets the session determined by delivery config (`to` + channel) instead of defaulting to the current session. Thanks @tyler6204. diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 469ff498019..88eef2c9bdb 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -241,6 +241,55 @@ describe("Cron issue regressions", () => { cron.stop(); }); + it("repairs isolated every jobs missing createdAtMs and sets nextWakeAtMs", async () => { + const store = await makeStorePath(); + await fs.writeFile( + store.storePath, + JSON.stringify({ + version: 1, + jobs: [ + { + id: "legacy-isolated", + agentId: "feature-dev_planner", + sessionKey: "agent:main:main", + name: "legacy isolated", + enabled: true, + schedule: { kind: "every", everyMs: 300_000 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "poll workflow queue" }, + state: {}, + }, + ], + }), + ); + + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const status = await cron.status(); + const jobs = await cron.list({ includeDisabled: true }); + const isolated = jobs.find((job) => job.id === "legacy-isolated"); + expect(Number.isFinite(isolated?.state.nextRunAtMs)).toBe(true); + expect(Number.isFinite(status.nextWakeAtMs)).toBe(true); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs: Array<{ id: string; state?: { nextRunAtMs?: number | null } }>; + }; + const persistedIsolated = persisted.jobs.find((job) => job.id === "legacy-isolated"); + expect(typeof persistedIsolated?.state?.nextRunAtMs).toBe("number"); + expect(Number.isFinite(persistedIsolated?.state?.nextRunAtMs)).toBe(true); + + cron.stop(); + }); + it("repairs missing nextRunAtMs on non-schedule updates without touching other jobs", async () => { const store = await makeStorePath(); const cron = await startCronForStore({ storePath: store.storePath }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index db42b80ba54..488683bed96 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -63,15 +63,22 @@ function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) { return undefined; } +function isFiniteTimestamp(value: unknown): value is number { + return typeof value === "number" && Number.isFinite(value); +} + function resolveEveryAnchorMs(params: { schedule: { everyMs: number; anchorMs?: number }; fallbackAnchorMs: number; }) { const raw = params.schedule.anchorMs; - if (typeof raw === "number" && Number.isFinite(raw)) { + if (isFiniteTimestamp(raw)) { return Math.max(0, Math.floor(raw)); } - return Math.max(0, Math.floor(params.fallbackAnchorMs)); + if (isFiniteTimestamp(params.fallbackAnchorMs)) { + return Math.max(0, Math.floor(params.fallbackAnchorMs)); + } + return 0; } export function assertSupportedJobSpec(job: Pick) { @@ -144,11 +151,13 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return nextFromLastRun; } } + const fallbackAnchorMs = isFiniteTimestamp(job.createdAtMs) ? job.createdAtMs : nowMs; const anchorMs = resolveEveryAnchorMs({ schedule: job.schedule, - fallbackAnchorMs: job.createdAtMs, + fallbackAnchorMs, }); - return computeNextRunAtMs({ ...job.schedule, everyMs, anchorMs }, nowMs); + const next = computeNextRunAtMs({ ...job.schedule, everyMs, anchorMs }, nowMs); + return isFiniteTimestamp(next) ? next : undefined; } if (job.schedule.kind === "at") { // One-shot jobs stay due until they successfully finish. @@ -167,14 +176,14 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und : typeof schedule.at === "string" ? parseAbsoluteTimeMs(schedule.at) : null; - return atMs !== null ? atMs : undefined; + return atMs !== null && Number.isFinite(atMs) ? atMs : undefined; } const next = computeStaggeredCronNextRunAtMs(job, nowMs); if (next === undefined && job.schedule.kind === "cron") { const nextSecondMs = Math.floor(nowMs / 1000) * 1000 + 1000; return computeStaggeredCronNextRunAtMs(job, nextSecondMs); } - return next; + return isFiniteTimestamp(next) ? next : undefined; } /** Maximum consecutive schedule errors before auto-disabling a job. */ @@ -233,6 +242,11 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob; return { changed, skip: true }; } + if (!isFiniteTimestamp(job.state.nextRunAtMs) && job.state.nextRunAtMs !== undefined) { + job.state.nextRunAtMs = undefined; + changed = true; + } + const runningAt = job.state.runningAtMs; if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) { state.deps.log.warn( @@ -298,7 +312,7 @@ export function recomputeNextRuns(state: CronServiceState): boolean { // Preserving a still-future nextRunAtMs avoids accidentally advancing // a job that hasn't fired yet (e.g. during restart recovery). const nextRun = job.state.nextRunAtMs; - const isDueOrMissing = nextRun === undefined || now >= nextRun; + const isDueOrMissing = !isFiniteTimestamp(nextRun) || now >= nextRun; if (isDueOrMissing) { if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { changed = true; @@ -321,7 +335,7 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea // Only compute missing nextRunAtMs, do NOT recompute existing ones. // If a job was past-due but not found by findDueJobs, recomputing would // cause it to be silently skipped. - if (job.state.nextRunAtMs === undefined) { + if (!isFiniteTimestamp(job.state.nextRunAtMs)) { if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { changed = true; } @@ -332,14 +346,18 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea export function nextWakeAtMs(state: CronServiceState) { const jobs = state.store?.jobs ?? []; - const enabled = jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number"); + const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs)); if (enabled.length === 0) { return undefined; } - return enabled.reduce( - (min, j) => Math.min(min, j.state.nextRunAtMs as number), - enabled[0].state.nextRunAtMs as number, - ); + const first = enabled[0]?.state.nextRunAtMs; + if (!isFiniteTimestamp(first)) { + return undefined; + } + return enabled.reduce((min, j) => { + const next = j.state.nextRunAtMs; + return isFiniteTimestamp(next) ? Math.min(min, next) : min; + }, first); } export function createJob(state: CronServiceState, input: CronJobCreate): CronJob { diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 8267d4c970a..85c392584fd 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -250,8 +250,12 @@ export function armTimer(state: CronServiceState) { const jobCount = state.store?.jobs.length ?? 0; const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0; const withNextRun = - state.store?.jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number") - .length ?? 0; + state.store?.jobs.filter( + (j) => + j.enabled && + typeof j.state.nextRunAtMs === "number" && + Number.isFinite(j.state.nextRunAtMs), + ).length ?? 0; state.deps.log.debug( { jobCount, enabledCount, withNextRun }, "cron: armTimer skipped - no jobs with nextRunAtMs", @@ -476,7 +480,7 @@ function isRunnableJob(params: { return false; } const next = job.state.nextRunAtMs; - return typeof next === "number" && nowMs >= next; + return typeof next === "number" && Number.isFinite(next) && nowMs >= next; } function collectRunnableJobs(