diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts index 307af0f9cb4..6dff6efc530 100644 --- a/src/cron/service.restart-catchup.test.ts +++ b/src/cron/service.restart-catchup.test.ts @@ -2,7 +2,9 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import { CronService } from "./service.js"; +import { createCronServiceState } from "./state.js"; import { setupCronServiceSuite } from "./service.test-harness.js"; +import { runMissedJobs } from "./timer.js"; const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({ prefix: "openclaw-cron-", @@ -30,6 +32,21 @@ describe("CronService restart catch-up", () => { }); } + function createOverdueEveryJob(id: string, nextRunAtMs: number) { + return { + id, + name: `job-${id}`, + enabled: true, + createdAtMs: nextRunAtMs - 60_000, + updatedAtMs: nextRunAtMs - 60_000, + schedule: { kind: "every", everyMs: 60_000, anchorMs: nextRunAtMs - 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: `tick-${id}` }, + state: { nextRunAtMs }, + }; + } + it("executes an overdue recurring job immediately on start", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); @@ -351,4 +368,47 @@ describe("CronService restart catch-up", () => { cron.stop(); await store.cleanup(); }); + + it("reschedules deferred missed jobs from the post-catchup clock so they stay in the future", async () => { + const store = await makeStorePath(); + const startNow = Date.parse("2025-12-13T17:00:00.000Z"); + let now = startNow; + + await writeStoreJobs(store.storePath, [ + createOverdueEveryJob("stagger-0", startNow - 60_000), + createOverdueEveryJob("stagger-1", startNow - 50_000), + createOverdueEveryJob("stagger-2", startNow - 40_000), + ]); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => { + now += 6_000; + return { status: "ok" as const, summary: "ok" }; + }), + maxMissedJobsPerRestart: 1, + missedJobStaggerMs: 5_000, + }); + + await runMissedJobs(state); + + const staggeredJobs = (state.store?.jobs ?? []) + .filter((job) => job.id.startsWith("stagger-") && job.id !== "stagger-0") + .toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0)); + + expect(staggeredJobs).toHaveLength(2); + expect(staggeredJobs[0]?.state.nextRunAtMs).toBeGreaterThan(now); + expect(staggeredJobs[1]?.state.nextRunAtMs).toBeGreaterThan( + staggeredJobs[0]?.state.nextRunAtMs ?? 0, + ); + expect((staggeredJobs[1]?.state.nextRunAtMs ?? 0) - (staggeredJobs[0]?.state.nextRunAtMs ?? 0)) + .toBe(5_000); + + await store.cleanup(); + }); }); diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 1e42ae089cd..073efd8f459 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -48,6 +48,18 @@ export type CronServiceDeps = { resolveSessionStorePath?: (agentId?: string) => string; /** Path to the session store (sessions.json) for reaper use. */ sessionStorePath?: string; + /** + * Delay in ms between missed job executions on startup. + * Prevents overwhelming the gateway when many jobs are overdue. + * See: https://github.com/openclaw/openclaw/issues/18892 + */ + missedJobStaggerMs?: number; + /** + * Maximum number of missed jobs to run immediately on startup. + * Additional missed jobs will be rescheduled to fire gradually. + * See: https://github.com/openclaw/openclaw/issues/18892 + */ + maxMissedJobsPerRestart?: number; enqueueSystemEvent: ( text: string, opts?: { agentId?: string; sessionKey?: string; contextKey?: string }, diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 3f50ca757e8..8f005bd8dbb 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -38,6 +38,9 @@ const MAX_TIMER_DELAY_MS = 60_000; * but always breaks an infinite re-trigger cycle. (See #17821) */ const MIN_REFIRE_GAP_MS = 2_000; + +const DEFAULT_MISSED_JOB_STAGGER_MS = 5_000; +const DEFAULT_MAX_MISSED_JOBS_PER_RESTART = 5; const DEFAULT_FAILURE_ALERT_AFTER = 2; const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour @@ -829,10 +832,18 @@ export async function runMissedJobs( state: CronServiceState, opts?: { skipJobIds?: ReadonlySet }, ) { - const startupCandidates = await locked(state, async () => { + const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS); + const maxImmediate = Math.max( + 0, + state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART, + ); + const selection = await locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); if (!state.store) { - return [] as Array<{ jobId: string; job: CronJob }>; + return { + deferredJobIds: [] as string[], + startupCandidates: [] as Array<{ jobId: string; job: CronJob }>, + }; } const now = state.deps.nowMs(); const skipJobIds = opts?.skipJobIds; @@ -842,26 +853,47 @@ export async function runMissedJobs( allowCronMissedRunByLastRun: true, }); if (missed.length === 0) { - return [] as Array<{ jobId: string; job: CronJob }>; + return { + deferredJobIds: [] as string[], + startupCandidates: [] as Array<{ jobId: string; job: CronJob }>, + }; } - state.deps.log.info( - { count: missed.length, jobIds: missed.map((j) => j.id) }, - "cron: running missed jobs after restart", - ); - for (const job of missed) { + const sorted = missed.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0)); + const startupCandidates = sorted.slice(0, maxImmediate); + const deferred = sorted.slice(maxImmediate); + if (deferred.length > 0) { + state.deps.log.info( + { + immediateCount: startupCandidates.length, + deferredCount: deferred.length, + totalMissed: missed.length, + }, + "cron: staggering missed jobs to prevent gateway overload", + ); + } + if (startupCandidates.length > 0) { + state.deps.log.info( + { count: startupCandidates.length, jobIds: startupCandidates.map((j) => j.id) }, + "cron: running missed jobs after restart", + ); + } + for (const job of startupCandidates) { job.state.runningAtMs = now; job.state.lastError = undefined; } await persist(state); - return missed.map((job) => ({ jobId: job.id, job })); + return { + deferredJobIds: deferred.map((job) => job.id), + startupCandidates: startupCandidates.map((job) => ({ jobId: job.id, job })), + }; }); - if (startupCandidates.length === 0) { + if (selection.startupCandidates.length === 0 && selection.deferredJobIds.length === 0) { return; } const outcomes: Array = []; - for (const candidate of startupCandidates) { + for (const candidate of selection.startupCandidates) { const startedAt = state.deps.nowMs(); emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt }); try { @@ -901,6 +933,19 @@ export async function runMissedJobs( applyOutcomeToStoredJob(state, result); } + if (selection.deferredJobIds.length > 0) { + const baseNow = state.deps.nowMs(); + let offset = staggerMs; + for (const jobId of selection.deferredJobIds) { + const job = state.store.jobs.find((entry) => entry.id === jobId); + if (!job || !job.enabled) { + continue; + } + job.state.nextRunAtMs = baseNow + offset; + offset += staggerMs; + } + } + // Preserve any new past-due nextRunAtMs values that became due while // startup catch-up was running. They should execute on a future tick // instead of being silently advanced.