fix(cron): stagger missed jobs on restart to prevent gateway overload
When the gateway restarts with many overdue cron jobs, only a limited number now run immediately; the rest are rescheduled with staggered delays so they do not overwhelm the gateway.

- Add missedJobStaggerMs config (default: 5s between deferred jobs)
- Add maxMissedJobsPerRestart limit (default: 5 jobs run immediately)
- Prioritize the most overdue jobs by sorting on nextRunAtMs
- Reschedule deferred jobs to fire gradually via the normal timer

Fixes #18892
This commit is contained in:
parent
2220a58ff7
commit
41a39085d3
@ -2,7 +2,9 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { CronService } from "./service.js";
|
||||
import { createCronServiceState } from "./state.js";
|
||||
import { setupCronServiceSuite } from "./service.test-harness.js";
|
||||
import { runMissedJobs } from "./timer.js";
|
||||
|
||||
const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({
|
||||
prefix: "openclaw-cron-",
|
||||
@ -30,6 +32,21 @@ describe("CronService restart catch-up", () => {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Builds a stored-job fixture for an enabled recurring job (`kind: "every"`,
 * 60s interval) whose `state.nextRunAtMs` is the given timestamp.
 *
 * `createdAtMs`, `updatedAtMs` and the schedule's `anchorMs` are all set to
 * exactly one interval (60_000 ms) before `nextRunAtMs`. The job targets the
 * "main" session with a `systemEvent` payload.
 *
 * @param id - Job id; also embedded in the name (`job-<id>`) and the payload
 *   text (`tick-<id>`).
 * @param nextRunAtMs - Epoch milliseconds stored as the job's next run time.
 *   Callers in this file pass a value earlier than the test clock so the job
 *   is overdue when the service starts.
 * @returns A plain object literal describing the job.
 */
function createOverdueEveryJob(id: string, nextRunAtMs: number) {
|
||||
return {
|
||||
id,
|
||||
name: `job-${id}`,
|
||||
enabled: true,
|
||||
// Timestamps and anchor sit one full interval (60s) before the next run.
createdAtMs: nextRunAtMs - 60_000,
|
||||
updatedAtMs: nextRunAtMs - 60_000,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: nextRunAtMs - 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: `tick-${id}` },
|
||||
// Only nextRunAtMs is seeded; no last-run or running markers are set.
state: { nextRunAtMs },
|
||||
};
|
||||
}
|
||||
|
||||
it("executes an overdue recurring job immediately on start", async () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
@ -351,4 +368,47 @@ describe("CronService restart catch-up", () => {
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
// Verifies that missed jobs beyond the immediate-run limit are rescheduled
// relative to the clock read after catch-up, spaced by the stagger interval.
it("reschedules deferred missed jobs from the post-catchup clock so they stay in the future", async () => {
|
||||
const store = await makeStorePath();
|
||||
const startNow = Date.parse("2025-12-13T17:00:00.000Z");
|
||||
// Mutable fake clock, read by the service through the nowMs dependency below.
let now = startNow;
|
||||
|
||||
// Three overdue jobs, 60s / 50s / 40s behind the clock; "stagger-0" is the most overdue.
await writeStoreJobs(store.storePath, [
|
||||
createOverdueEveryJob("stagger-0", startNow - 60_000),
|
||||
createOverdueEveryJob("stagger-1", startNow - 50_000),
|
||||
createOverdueEveryJob("stagger-2", startNow - 40_000),
|
||||
]);
|
||||
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
// Advances the fake clock by 6s per isolated run to simulate time passing during catch-up.
// NOTE(review): the fixtures use sessionTarget "main" with a systemEvent payload;
// confirm this mock is actually invoked for them, otherwise `now` never advances
// and the "post-catchup clock" case in the test title is not exercised.
runIsolatedAgentJob: vi.fn(async () => {
|
||||
now += 6_000;
|
||||
return { status: "ok" as const, summary: "ok" };
|
||||
}),
|
||||
// Only one job may run immediately, so the other two missed jobs are deferred.
maxMissedJobsPerRestart: 1,
|
||||
missedJobStaggerMs: 5_000,
|
||||
});
|
||||
|
||||
await runMissedJobs(state);
|
||||
|
||||
// Collect the two jobs expected to be deferred (everything except "stagger-0"),
// ordered by their rescheduled next-run time.
const staggeredJobs = (state.store?.jobs ?? [])
|
||||
.filter((job) => job.id.startsWith("stagger-") && job.id !== "stagger-0")
|
||||
.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0));
|
||||
|
||||
expect(staggeredJobs).toHaveLength(2);
|
||||
// Deferred runs must land strictly after the current (post-catch-up) clock value.
expect(staggeredJobs[0]?.state.nextRunAtMs).toBeGreaterThan(now);
|
||||
expect(staggeredJobs[1]?.state.nextRunAtMs).toBeGreaterThan(
|
||||
staggeredJobs[0]?.state.nextRunAtMs ?? 0,
|
||||
);
|
||||
// Consecutive deferred jobs are spaced by exactly missedJobStaggerMs (5_000 ms).
expect((staggeredJobs[1]?.state.nextRunAtMs ?? 0) - (staggeredJobs[0]?.state.nextRunAtMs ?? 0))
|
||||
.toBe(5_000);
|
||||
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
||||
@ -48,6 +48,18 @@ export type CronServiceDeps = {
|
||||
resolveSessionStorePath?: (agentId?: string) => string;
|
||||
/** Path to the session store (sessions.json) for reaper use. */
|
||||
sessionStorePath?: string;
|
||||
/**
|
||||
* Delay in ms between missed job executions on startup.
|
||||
* Prevents overwhelming the gateway when many jobs are overdue.
|
||||
* See: https://github.com/openclaw/openclaw/issues/18892
|
||||
*/
|
||||
missedJobStaggerMs?: number;
|
||||
/**
|
||||
* Maximum number of missed jobs to run immediately on startup.
|
||||
* Additional missed jobs will be rescheduled to fire gradually.
|
||||
* See: https://github.com/openclaw/openclaw/issues/18892
|
||||
*/
|
||||
maxMissedJobsPerRestart?: number;
|
||||
enqueueSystemEvent: (
|
||||
text: string,
|
||||
opts?: { agentId?: string; sessionKey?: string; contextKey?: string },
|
||||
|
||||
@ -38,6 +38,9 @@ const MAX_TIMER_DELAY_MS = 60_000;
|
||||
* but always breaks an infinite re-trigger cycle. (See #17821)
|
||||
*/
|
||||
const MIN_REFIRE_GAP_MS = 2_000;
|
||||
|
||||
/** Fallback spacing in ms between deferred missed jobs when `deps.missedJobStaggerMs` is not provided. */
const DEFAULT_MISSED_JOB_STAGGER_MS = 5_000;
|
||||
/** Fallback cap on missed jobs run immediately at startup when `deps.maxMissedJobsPerRestart` is not provided. */
const DEFAULT_MAX_MISSED_JOBS_PER_RESTART = 5;
|
||||
const DEFAULT_FAILURE_ALERT_AFTER = 2;
|
||||
const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
|
||||
|
||||
@ -829,10 +832,18 @@ export async function runMissedJobs(
|
||||
state: CronServiceState,
|
||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||
) {
|
||||
const startupCandidates = await locked(state, async () => {
|
||||
const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS);
|
||||
const maxImmediate = Math.max(
|
||||
0,
|
||||
state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART,
|
||||
);
|
||||
const selection = await locked(state, async () => {
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
if (!state.store) {
|
||||
return [] as Array<{ jobId: string; job: CronJob }>;
|
||||
return {
|
||||
deferredJobIds: [] as string[],
|
||||
startupCandidates: [] as Array<{ jobId: string; job: CronJob }>,
|
||||
};
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
const skipJobIds = opts?.skipJobIds;
|
||||
@ -842,26 +853,47 @@ export async function runMissedJobs(
|
||||
allowCronMissedRunByLastRun: true,
|
||||
});
|
||||
if (missed.length === 0) {
|
||||
return [] as Array<{ jobId: string; job: CronJob }>;
|
||||
return {
|
||||
deferredJobIds: [] as string[],
|
||||
startupCandidates: [] as Array<{ jobId: string; job: CronJob }>,
|
||||
};
|
||||
}
|
||||
state.deps.log.info(
|
||||
{ count: missed.length, jobIds: missed.map((j) => j.id) },
|
||||
"cron: running missed jobs after restart",
|
||||
);
|
||||
for (const job of missed) {
|
||||
const sorted = missed.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0));
|
||||
const startupCandidates = sorted.slice(0, maxImmediate);
|
||||
const deferred = sorted.slice(maxImmediate);
|
||||
if (deferred.length > 0) {
|
||||
state.deps.log.info(
|
||||
{
|
||||
immediateCount: startupCandidates.length,
|
||||
deferredCount: deferred.length,
|
||||
totalMissed: missed.length,
|
||||
},
|
||||
"cron: staggering missed jobs to prevent gateway overload",
|
||||
);
|
||||
}
|
||||
if (startupCandidates.length > 0) {
|
||||
state.deps.log.info(
|
||||
{ count: startupCandidates.length, jobIds: startupCandidates.map((j) => j.id) },
|
||||
"cron: running missed jobs after restart",
|
||||
);
|
||||
}
|
||||
for (const job of startupCandidates) {
|
||||
job.state.runningAtMs = now;
|
||||
job.state.lastError = undefined;
|
||||
}
|
||||
await persist(state);
|
||||
return missed.map((job) => ({ jobId: job.id, job }));
|
||||
return {
|
||||
deferredJobIds: deferred.map((job) => job.id),
|
||||
startupCandidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
|
||||
};
|
||||
});
|
||||
|
||||
if (startupCandidates.length === 0) {
|
||||
if (selection.startupCandidates.length === 0 && selection.deferredJobIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const outcomes: Array<TimedCronRunOutcome> = [];
|
||||
for (const candidate of startupCandidates) {
|
||||
for (const candidate of selection.startupCandidates) {
|
||||
const startedAt = state.deps.nowMs();
|
||||
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
|
||||
try {
|
||||
@ -901,6 +933,19 @@ export async function runMissedJobs(
|
||||
applyOutcomeToStoredJob(state, result);
|
||||
}
|
||||
|
||||
if (selection.deferredJobIds.length > 0) {
|
||||
const baseNow = state.deps.nowMs();
|
||||
let offset = staggerMs;
|
||||
for (const jobId of selection.deferredJobIds) {
|
||||
const job = state.store.jobs.find((entry) => entry.id === jobId);
|
||||
if (!job || !job.enabled) {
|
||||
continue;
|
||||
}
|
||||
job.state.nextRunAtMs = baseNow + offset;
|
||||
offset += staggerMs;
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve any new past-due nextRunAtMs values that became due while
|
||||
// startup catch-up was running. They should execute on a future tick
|
||||
// instead of being silently advanced.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user