fix(cron): handle latest review edge cases in manual merge and apply wake

This commit is contained in:
create 2026-03-20 16:59:33 +08:00
parent c3338cdb9f
commit 9f9e6b7cfe
4 changed files with 155 additions and 11 deletions

View File

@ -396,4 +396,64 @@ describe("forceReload repairs externally changed cron schedules", () => {
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
expect(persistedJob?.state?.lastStatus).toBe("ok");
});
it("keeps one-shot terminal disable state when manual force-run reloads unchanged store", async () => {
const store = await makeStorePath();
let nowMs = Date.parse("2026-03-19T01:44:00.000Z");
const jobId = "manual-run-at-terminal-state";
const scheduledAtMs = nowMs + 60_000;
const createJob = (): CronJob => ({
id: jobId,
name: "manual run at terminal state",
enabled: true,
createdAtMs: Date.parse("2026-03-18T00:30:00.000Z"),
updatedAtMs: Date.parse("2026-03-19T01:44:00.000Z"),
schedule: { kind: "at", at: new Date(scheduledAtMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "tick" },
state: {
nextRunAtMs: scheduledAtMs,
},
});
await writeCronStoreSnapshot({
storePath: store.storePath,
jobs: [createJob()],
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => {
nowMs += 500;
return { status: "ok" as const, summary: "done" };
}),
});
const result = await run(state, jobId, "force");
expect(result).toEqual({ ok: true, ran: true });
const merged = state.store?.jobs.find((job) => job.id === jobId);
expect(merged?.enabled).toBe(false);
expect(merged?.state.nextRunAtMs).toBeUndefined();
expect(merged?.state.lastStatus).toBe("ok");
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
jobs?: Array<{
id: string;
enabled?: boolean;
state?: { nextRunAtMs?: number; lastStatus?: string };
}>;
};
const persistedJob = persisted.jobs?.find((job) => job.id === jobId);
expect(persistedJob?.enabled).toBe(false);
expect(persistedJob?.state?.nextRunAtMs).toBeUndefined();
expect(persistedJob?.state?.lastStatus).toBe("ok");
});
});

View File

@ -1780,14 +1780,51 @@ describe("Cron issue regressions", () => {
});
expect(job.state.scheduleErrorCount).toBe(1);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(true);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
recomputeNextRunsForMaintenance(state);
expect(job.state.scheduleErrorCount).toBe(1);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
});
it("keeps a future wake when apply skips immediate recompute after reload schedule error", () => {
const startedAt = Date.parse("2026-03-02T12:12:00.000Z");
const endedAt = startedAt + 25;
const job = createIsolatedRegressionJob({
id: "apply-result-reload-wake-30905",
name: "apply-result-reload-wake-30905",
scheduledAt: startedAt,
schedule: { kind: "cron", expr: "0 7 * * *", tz: "Invalid/Timezone" },
payload: { kind: "agentTurn", message: "ping" },
state: {
nextRunAtMs: undefined,
runningAtMs: startedAt - 500,
scheduleErrorCount: 1,
lastError: "schedule error: previous",
},
});
const state = createRunningCronServiceState({
storePath: "/tmp/cron-30905-reload-wake.json",
log: noopLogger as never,
nowMs: () => endedAt,
jobs: [job],
});
state.skipNextReloadRepairRecomputeJobIds = new Set([job.id]);
applyJobResult(state, job, {
status: "error",
error: "synthetic failure",
startedAt,
endedAt,
});
expect(job.state.scheduleErrorCount).toBe(1);
expect(job.state.nextRunAtMs).toBe(endedAt + 30_000);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
recomputeNextRunsForMaintenance(state);
expect(job.state.scheduleErrorCount).toBe(2);
expect(job.state.scheduleErrorCount).toBe(1);
expect(state.skipNextReloadRepairRecomputeJobIds?.has(job.id)).toBe(false);
});
it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => {

View File

@ -47,13 +47,35 @@ export type CronListPageResult = {
hasMore: boolean;
nextOffset: number | null;
};
function schedulesEqual(a: CronJob["schedule"], b: CronJob["schedule"]): boolean {
if (a.kind !== b.kind) {
return false;
}
if (a.kind === "at" && b.kind === "at") {
return a.at === b.at;
}
if (a.kind === "every" && b.kind === "every") {
return a.everyMs === b.everyMs && a.anchorMs === b.anchorMs;
}
if (a.kind === "cron" && b.kind === "cron") {
return a.expr === b.expr && a.tz === b.tz && a.staggerMs === b.staggerMs;
}
return false;
}
function mergeManualRunSnapshotAfterReload(params: {
state: CronServiceState;
jobId: string;
snapshot: {
enabled: boolean;
updatedAtMs: number;
state: CronJob["state"];
} | null;
baseline: {
enabled: boolean;
schedule: CronJob["schedule"];
} | null;
removed: boolean;
}) {
if (!params.state.store) {
@ -74,6 +96,10 @@ function mergeManualRunSnapshotAfterReload(params: {
const preservedNextRunAtMs = reloaded.state.nextRunAtMs;
const preservedScheduleErrorCount = reloaded.state.scheduleErrorCount;
const preservedScheduleErrorText = reloaded.state.lastError;
const externalScheduleOrEnabledChanged =
params.baseline !== null &&
(preservedEnabled !== params.baseline.enabled ||
!schedulesEqual(reloaded.schedule, params.baseline.schedule));
reloaded.updatedAtMs = Math.max(reloaded.updatedAtMs, params.snapshot.updatedAtMs);
reloaded.state = {
@ -81,13 +107,18 @@ function mergeManualRunSnapshotAfterReload(params: {
...params.snapshot.state,
};
// Keep externally reloaded schedule/enable repairs even when a manual run
// snapshot was captured before forceReload.
reloaded.enabled = preservedEnabled;
reloaded.state.nextRunAtMs = preservedNextRunAtMs;
if (preservedScheduleErrorCount !== undefined) {
reloaded.state.scheduleErrorCount = preservedScheduleErrorCount;
reloaded.state.lastError = preservedScheduleErrorText;
// Only preserve reload-derived schedule/enable repairs when the underlying
// schedule or enabled flag was externally changed while the manual run was executing.
// Otherwise, keep the manual-run terminal state (e.g. one-shot disable on success).
if (externalScheduleOrEnabledChanged) {
reloaded.enabled = preservedEnabled;
reloaded.state.nextRunAtMs = preservedNextRunAtMs;
if (preservedScheduleErrorCount !== undefined) {
reloaded.state.scheduleErrorCount = preservedScheduleErrorCount;
reloaded.state.lastError = preservedScheduleErrorText;
}
} else {
reloaded.enabled = params.snapshot.enabled;
}
}
@ -534,9 +565,16 @@ async function finishPreparedManualRun(
const postRunSnapshot = shouldDelete
? null
: {
enabled: job.enabled,
updatedAtMs: job.updatedAtMs,
state: structuredClone(job.state),
};
const postRunBaseline = shouldDelete
? null
: {
enabled: executionJob.enabled,
schedule: structuredClone(executionJob.schedule),
};
const postRunRemoved = shouldDelete;
// Isolated Telegram send can persist target writeback directly to disk.
// Reload before final persist so manual `cron run` keeps those changes.
@ -545,6 +583,7 @@ async function finishPreparedManualRun(
state,
jobId,
snapshot: postRunSnapshot,
baseline: postRunBaseline,
removed: postRunRemoved,
});
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });

View File

@ -15,7 +15,7 @@ import type {
import {
computeJobPreviousRunAtMs,
computeJobNextRunAtMs,
hasSkipNextReloadRepairRecompute,
consumeSkipNextReloadRepairRecompute,
nextWakeAtMs,
recomputeNextRunsForMaintenance,
recordScheduleComputeError,
@ -369,7 +369,7 @@ export function applyJobResult(
const shouldDelete =
job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok";
const skipImmediateScheduleRecompute = hasSkipNextReloadRepairRecompute(state, job.id);
const skipImmediateScheduleRecompute = consumeSkipNextReloadRepairRecompute(state, job.id);
if (!shouldDelete) {
if (job.schedule.kind === "at") {
@ -444,6 +444,10 @@ export function applyJobResult(
},
"cron: applying error backoff",
);
} else {
// Keep a future wake so we don't stall when the one-shot skip marker
// defers immediate schedule recompute after reload repair.
job.state.nextRunAtMs = result.endedAt + backoff;
}
} else if (job.enabled) {
if (!skipImmediateScheduleRecompute) {
@ -470,6 +474,10 @@ export function applyJobResult(
} else {
job.state.nextRunAtMs = naturalNext;
}
} else if (job.state.nextRunAtMs === undefined) {
// Keep timer progress when immediate recompute is deferred by the
// reload-repair skip marker.
job.state.nextRunAtMs = result.endedAt + MIN_REFIRE_GAP_MS;
}
} else {
job.state.nextRunAtMs = undefined;