Tyler Yust 3f82daefd8 feat(cron): enhance delivery modes and job configuration
- Updated isolated cron jobs to support new delivery modes: `announce` and `none`, improving output management.
- Refactored job configuration to remove legacy fields and streamline delivery settings.
- Enhanced the `CronJobEditor` UI to reflect changes in delivery options, including a new segmented control for delivery mode selection.
- Updated documentation to clarify the new delivery configurations and their implications for job execution.
- Improved tests to validate the new delivery behavior and ensure backward compatibility with legacy settings.

This update provides users with greater flexibility in managing how isolated jobs deliver their outputs, enhancing overall usability and clarity in job configurations.
2026-02-04 01:03:59 -08:00

234 lines
6.7 KiB
TypeScript

import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import type { CronJob } from "../types.js";
import type { CronEvent, CronServiceState } from "./state.js";
import { computeJobNextRunAtMs, nextWakeAtMs, resolveJobPayloadTextForMain } from "./jobs.js";
import { locked } from "./locked.js";
import { ensureLoaded, persist } from "./store.js";
const MAX_TIMEOUT_MS = 2 ** 31 - 1;
export function armTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = null;
if (!state.deps.cronEnabled) {
return;
}
const nextAt = nextWakeAtMs(state);
if (!nextAt) {
return;
}
const delay = Math.max(nextAt - state.deps.nowMs(), 0);
// Avoid TimeoutOverflowWarning when a job is far in the future.
const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS);
state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
});
}, clampedDelay);
state.timer.unref?.();
}
export async function onTimer(state: CronServiceState) {
if (state.running) {
return;
}
state.running = true;
try {
await locked(state, async () => {
await ensureLoaded(state);
await runDueJobs(state);
await persist(state);
armTimer(state);
});
} finally {
state.running = false;
}
}
export async function runDueJobs(state: CronServiceState) {
if (!state.store) {
return;
}
const now = state.deps.nowMs();
const due = state.store.jobs.filter((j) => {
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
for (const job of due) {
await executeJob(state, job, now, { forced: false });
}
}
export async function executeJob(
state: CronServiceState,
job: CronJob,
nowMs: number,
opts: { forced: boolean },
) {
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
job.state.lastError = undefined;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
let deleted = false;
const finish = async (status: "ok" | "error" | "skipped", err?: string, summary?: string) => {
const endedAt = state.deps.nowMs();
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = startedAt;
job.state.lastStatus = status;
job.state.lastDurationMs = Math.max(0, endedAt - startedAt);
job.state.lastError = err;
const shouldDelete =
job.schedule.kind === "at" && status === "ok" && job.deleteAfterRun === true;
if (!shouldDelete) {
if (job.schedule.kind === "at" && status === "ok") {
// One-shot job completed successfully; disable it.
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (job.enabled) {
job.state.nextRunAtMs = computeJobNextRunAtMs(job, endedAt);
} else {
job.state.nextRunAtMs = undefined;
}
}
emit(state, {
jobId: job.id,
action: "finished",
status,
error: err,
summary,
runAtMs: startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
deleted = true;
emit(state, { jobId: job.id, action: "removed" });
}
};
try {
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
await finish(
"skipped",
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
);
return;
}
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const maxWaitMs = 2 * 60_000;
const waitStartedAt = state.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
heartbeatResult = await state.deps.runHeartbeatOnce({ reason });
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
) {
break;
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
heartbeatResult = {
status: "skipped",
reason: "timeout waiting for main lane to become idle",
};
break;
}
await delay(250);
}
if (heartbeatResult.status === "ran") {
await finish("ok", undefined, text);
} else if (heartbeatResult.status === "skipped") {
await finish("skipped", heartbeatResult.reason, text);
} else {
await finish("error", heartbeatResult.reason, text);
}
} else {
// wakeMode is "next-heartbeat" or runHeartbeatOnce not available
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
await finish("ok", undefined, text);
}
return;
}
if (job.payload.kind !== "agentTurn") {
await finish("skipped", "isolated job requires payload.kind=agentTurn");
return;
}
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
});
if (res.status === "ok") {
await finish("ok", undefined, res.summary);
} else if (res.status === "skipped") {
await finish("skipped", undefined, res.summary);
} else {
await finish("error", res.error ?? "cron job failed", res.summary);
}
} catch (err) {
await finish("error", String(err));
} finally {
job.updatedAtMs = nowMs;
if (!opts.forced && job.enabled && !deleted) {
// Keep nextRunAtMs in sync in case the schedule advanced during a long run.
job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs());
}
}
}
export function wake(
state: CronServiceState,
opts: { mode: "now" | "next-heartbeat"; text: string },
) {
const text = opts.text.trim();
if (!text) {
return { ok: false } as const;
}
state.deps.enqueueSystemEvent(text);
if (opts.mode === "now") {
state.deps.requestHeartbeatNow({ reason: "wake" });
}
return { ok: true } as const;
}
export function stopTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = null;
}
export function emit(state: CronServiceState, evt: CronEvent) {
try {
state.deps.onEvent?.(evt);
} catch {
/* ignore */
}
}