From 9f9e6b7cfe0954af8101c9a2e6b8849996d7faf0 Mon Sep 17 00:00:00 2001 From: create Date: Fri, 20 Mar 2026 16:59:33 +0800 Subject: [PATCH] fix(cron): handle latest review edge cases in manual merge and apply wake --- ...external-reload-schedule-recompute.test.ts | 60 +++++++++++++++++++ src/cron/service.issue-regressions.test.ts | 41 ++++++++++++- src/cron/service/ops.ts | 53 +++++++++++++--- src/cron/service/timer.ts | 12 +++- 4 files changed, 155 insertions(+), 11 deletions(-) diff --git a/src/cron/service.external-reload-schedule-recompute.test.ts b/src/cron/service.external-reload-schedule-recompute.test.ts index 7fde2d75d3c..29029c3a197 100644 --- a/src/cron/service.external-reload-schedule-recompute.test.ts +++ b/src/cron/service.external-reload-schedule-recompute.test.ts @@ -396,4 +396,64 @@ describe("forceReload repairs externally changed cron schedules", () => { expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); expect(persistedJob?.state?.lastStatus).toBe("ok"); }); + + it("keeps one-shot terminal disable state when manual force-run reloads unchanged store", async () => { + const store = await makeStorePath(); + let nowMs = Date.parse("2026-03-19T01:44:00.000Z"); + const jobId = "manual-run-at-terminal-state"; + const scheduledAtMs = nowMs + 60_000; + + const createJob = (): CronJob => ({ + id: jobId, + name: "manual run at terminal state", + enabled: true, + createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"), + updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"), + schedule: { kind: "at", at: new Date(scheduledAtMs).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "tick" }, + state: { + nextRunAtMs: scheduledAtMs, + }, + }); + + await writeCronStoreSnapshot({ + storePath: store.storePath, + jobs: [createJob()], + }); + + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => nowMs, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => { + nowMs += 500; + return { status: "ok" as const, summary: "done" }; + }), + }); + + const result = await run(state, jobId, "force"); + expect(result).toEqual({ ok: true, ran: true }); + + const merged = state.store?.jobs.find((job) => job.id === jobId); + expect(merged?.enabled).toBe(false); + expect(merged?.state.nextRunAtMs).toBeUndefined(); + expect(merged?.state.lastStatus).toBe("ok"); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs?: Array<{ + id: string; + enabled?: boolean; + state?: { nextRunAtMs?: number; lastStatus?: string }; + }>; + }; + const persistedJob = persisted.jobs?.find((job) => job.id === jobId); + expect(persistedJob?.enabled).toBe(false); + expect(persistedJob?.state?.nextRunAtMs).toBeUndefined(); + expect(persistedJob?.state?.lastStatus).toBe("ok"); + }); }); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index e29a059f39d..c8078672b97 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -1780,14 +1780,51 @@ describe("Cron issue regressions", () => { }); expect(job.state.scheduleErrorCount).toBe(1); - expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); recomputeNextRunsForMaintenance(state); expect(job.state.scheduleErrorCount).toBe(1); expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); + }); + + it("keeps a future wake when apply skips immediate recompute after reload schedule error", () => { + const startedAt = Date.parse("2026-03-02T12:12:00.000Z"); + const endedAt = startedAt + 25; + const job = createIsolatedRegressionJob({ + id: "apply-result-reload-wake-30905", + name: "apply-result-reload-wake-30905", + scheduledAt: startedAt, + schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" }, + payload: { kind: "agentTurn", message: "ping" }, + state: { + nextRunAtMs: undefined, + runningAtMs: startedAt - 500, + scheduleErrorCount: 1, + lastError: "schedule error: previous", + }, + }); + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-30905-reload-wake.json", + log: noopLogger as never, + nowMs: () => endedAt, + jobs: [job], + }); + state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]); + + applyJobResult(state, job, { + status: "error", + error: "synthetic failure", + startedAt, + endedAt, + }); + + expect(job.state.scheduleErrorCount).toBe(1); + expect(job.state.nextRunAtMs).toBe(endedAt + 30_000); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); recomputeNextRunsForMaintenance(state); - expect(job.state.scheduleErrorCount).toBe(2); + expect(job.state.scheduleErrorCount).toBe(1); + expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false); }); it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => { diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index e932e666cae..58b12933839 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -47,13 +47,35 @@ export type CronListPageResult = { hasMore: boolean; nextOffset: number | null; }; + +function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean { + if (a.kind !== b.kind) { + return false; + } + if (a.kind === "at" && b.kind === "at") { + return a.at === b.at; + } + if (a.kind === "every" && b.kind === "every") { + return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs; + } + if (a.kind === "cron" && b.kind === "cron") { + return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs; + } + return false; +} + function mergeManualRunSnapshotAfterReload(params: { state: CronServiceState; jobId: string; snapshot: { + enabled: boolean; updatedAtMs: number; state: CronJob["state"]; } | null; + baseline: { + enabled: boolean; + schedule: CronJob["schedule"]; + } | null; removed: boolean; }) { if (!params.state.store) { @@ -74,6 +96,10 @@ function mergeManualRunSnapshotAfterReload(params: { const preservedNextRunAtMs = reloaded.state.nextRunAtMs; const preservedScheduleErrorCount = reloaded.state.scheduleErrorCount; const preservedScheduleErrorText = reloaded.state.lastError; + const externalScheduleOrEnabledChanged = + params.baseline !== null && + (preservedEnabled !== params.baseline.enabled || + !schedulesEqual(reloaded.schedule, params.baseline.schedule)); reloaded.updatedAtMs = Math.max(reloaded.updatedAtMs, params.snapshot.updatedAtMs); reloaded.state = { @@ -81,13 +107,18 @@ function mergeManualRunSnapshotAfterReload(params: { ...params.snapshot.state, }; - // Keep externally reloaded schedule/enable repairs even when a manual run - // snapshot was captured before forceReload. - reloaded.enabled = preservedEnabled; - reloaded.state.nextRunAtMs = preservedNextRunAtMs; - if (preservedScheduleErrorCount !== undefined) { - reloaded.state.scheduleErrorCount = preservedScheduleErrorCount; - reloaded.state.lastError = preservedScheduleErrorText; + // Only preserve reload-derived schedule/enable repairs when the underlying + // schedule or enabled flag was externally changed while the manual run was executing. + // Otherwise, keep the manual-run terminal state (e.g. one-shot disable on success). + if (externalScheduleOrEnabledChanged) { + reloaded.enabled = preservedEnabled; + reloaded.state.nextRunAtMs = preservedNextRunAtMs; + if (preservedScheduleErrorCount !== undefined) { + reloaded.state.scheduleErrorCount = preservedScheduleErrorCount; + reloaded.state.lastError = preservedScheduleErrorText; + } + } else { + reloaded.enabled = params.snapshot.enabled; } } @@ -534,9 +565,16 @@ async function finishPreparedManualRun( const postRunSnapshot = shouldDelete ? null : { + enabled: job.enabled, updatedAtMs: job.updatedAtMs, state: structuredClone(job.state), }; + const postRunBaseline = shouldDelete + ? null + : { + enabled: executionJob.enabled, + schedule: structuredClone(executionJob.schedule), + }; const postRunRemoved = shouldDelete; // Isolated Telegram send can persist target writeback directly to disk. // Reload before final persist so manual `cron run` keeps those changes. @@ -545,6 +583,7 @@ async function finishPreparedManualRun( state, jobId, snapshot: postRunSnapshot, + baseline: postRunBaseline, removed: postRunRemoved, }); recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index b3c130b124b..ecac35d5fe8 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -15,7 +15,7 @@ import type { import { computeJobPreviousRunAtMs, computeJobNextRunAtMs, - hasSkipNextReloadRepairRecompute, + consumeSkipNextReloadRepairRecompute, nextWakeAtMs, recomputeNextRunsForMaintenance, recordScheduleComputeError, @@ -369,7 +369,7 @@ export function applyJobResult( const shouldDelete = job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok"; - const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id); + const skipImmediateScheduleRecompute = consumeSkipNextReloadRepairRecompute(state, job.id); if (!shouldDelete) { if (job.schedule.kind === "at") { @@ -444,6 +444,10 @@ export function applyJobResult( }, "cron: applying error backoff", ); + } else { + // Keep a future wake so we don't stall when the one-shot skip marker + // defers immediate schedule recompute after reload repair. + job.state.nextRunAtMs = result.endedAt + backoff; } } else if (job.enabled) { if (!skipImmediateScheduleRecompute) { @@ -470,6 +474,10 @@ export function applyJobResult( } else { job.state.nextRunAtMs = naturalNext; } + } else if (job.state.nextRunAtMs === undefined) { + // Keep timer progress when immediate recompute is deferred by the + // reload-repair skip marker. + job.state.nextRunAtMs = result.endedAt + MIN_REFIRE_GAP_MS; } } else { job.state.nextRunAtMs = undefined;