diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index d58683aedea..b8a70ce0f99 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -692,6 +692,68 @@ Immediate system event without creating a job: openclaw system event --mode now --text "Next heartbeat: check battery." ``` +## Pre-check gate (skip when nothing changed) + +Recurring jobs often wake an agent when there's nothing to act on — wasting tokens on "nothing new" responses. The **pre-check gate** runs a lightweight shell command before the agent turn; if the command fails or returns empty output, the job is skipped entirely. + +```bash +openclaw cron add \ + --name "PR review" \ + --every "30m" \ + --session isolated \ + --message "Review these open PRs and flag any issues." \ + --pre-check 'gh pr list --state open --json number | jq "if length > 0 then . else empty end"' +``` + +### How it works + +| Pre-check result | Job outcome | +| ------------------------- | ---------------------------------------------------- | +| Exit 0 + non-empty stdout | Job proceeds — stdout optionally included as context | +| Exit non-zero | Job skipped (status: `skipped`) | +| Empty stdout | Job skipped (nothing to do) | +| Timeout (default: 30s) | Job skipped | + +### Pre-check fields + +| Field | Type | Default | Description | +| ---------------- | ---------------------------------------- | ----------- | -------------------------------------- | +| `command` | string | _required_ | Shell command to execute | +| `timeoutSeconds` | number | `30` | Kill and skip on timeout | +| `outputMode` | `"prepend"` \| `"replace"` \| `"ignore"` | `"prepend"` | How to use stdout in the agent message | + +### Output modes + +- **prepend** (default): Stdout is prepended to the agent message as `[Pre-check context]`, giving the agent the data without a separate lookup. +- **replace**: Stdout becomes the entire agent message (useful when the script produces the full prompt). +- **ignore**: Only the pass/fail gate matters; stdout is discarded. + +### Examples + +```bash +# Only summarize when new mail arrived (check returns count) +--pre-check 'python3 -c "import imaplib; m=imaplib.IMAP4_SSL(\"imap.gmail.com\"); m.login(\"...\",\"...\"); print(len(m.search(None,\"UNSEEN\")[1][0].split()))"' + +# Only alert when disk usage exceeds 80% +--pre-check 'df -h / | awk "NR==2 {gsub(/%/,\"\",\$5); if(\$5>80) print \$5\"% used\"; else exit 1}"' + +# Only process when a file changed since last check +--pre-check 'find /data -newer /tmp/.last-check -type f | head -5' + +# Only wake when API returns non-empty results +--pre-check 'curl -sf https://api.example.com/pending | jq "if length > 0 then . else empty end"' +``` + +### Updating or removing a pre-check + +```bash +# Update the command +openclaw cron update --pre-check 'new-command-here' + +# Remove the pre-check (job always runs) +openclaw cron update --pre-check '' +``` + ## Gateway API surface - `cron.list`, `cron.status`, `cron.add`, `cron.update`, `cron.remove` diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index b1afdfaaa12..eb0a87469c1 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -406,6 +406,29 @@ export function normalizeCronJobInput( next.delivery = coerceDelivery(base.delivery); } + // Pre-check gate normalization + if (base.preCheck === null) { + next.preCheck = null; // explicit removal in patch + } else if (isRecord(base.preCheck)) { + const pc = base.preCheck; + const preCheck: Record = {}; + if (typeof pc.command === "string" && pc.command.trim()) { + preCheck.command = pc.command.trim(); + } + if (typeof pc.timeoutSeconds === "number" && Number.isFinite(pc.timeoutSeconds)) { + preCheck.timeoutSeconds = Math.max(1, Math.floor(pc.timeoutSeconds)); + } + if (typeof pc.outputMode === "string") { + const mode = pc.outputMode.trim().toLowerCase(); + if (mode === "prepend" || mode === "replace" || mode === "ignore") { + preCheck.outputMode = mode; + } + } + if (preCheck.command) { + next.preCheck = preCheck; + } + } + if ("isolation" in next) { delete next.isolation; } diff --git a/src/cron/pre-check.test.ts b/src/cron/pre-check.test.ts new file mode 100644 index 00000000000..6fc3b78be5d --- /dev/null +++ b/src/cron/pre-check.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect } from "vitest"; +import { runPreCheck, applyPreCheckOutput } from "./pre-check.js"; + +const isWindows = process.platform === "win32"; + +describe("cron pre-check gate", () => { + describe("runPreCheck", () => { + it("passes when command exits 0 with output", async () => { + const result = await runPreCheck({ command: 'echo "hello world"' }); + expect(result.passed).toBe(true); + if (result.passed) { + expect(result.output).toContain("hello world"); + } + }); + + it("fails when command exits non-zero", async () => { + const cmd = isWindows ? "exit /b 1" : "exit 1"; + const result = await runPreCheck({ command: cmd }); + expect(result.passed).toBe(false); + if (!result.passed) { + expect(result.reason).toContain("exited with code 1"); + } + }); + + it("fails when command produces empty stdout", async () => { + // node -e "" produces no output on all platforms + const result = await runPreCheck({ command: 'node -e ""' }); + expect(result.passed).toBe(false); + if (!result.passed) { + expect(result.reason).toContain("empty output"); + } + }); + + it("fails on timeout", async () => { + const cmd = isWindows ? "ping -n 11 127.0.0.1 > nul" : "sleep 10"; + const result = await runPreCheck({ + command: cmd, + timeoutSeconds: 1, + }); + expect(result.passed).toBe(false); + if (!result.passed) { + expect(result.reason).toMatch(/timed out|error/i); + } + }); + + it("passes with multi-line output", async () => { + const cmd = isWindows + ? "node -e \"console.log('line1');console.log('line2');console.log('line3')\"" + : 'echo "line1"; echo "line2"; echo "line3"'; + const result = await runPreCheck({ command: cmd }); + expect(result.passed).toBe(true); + if (result.passed) { + expect(result.output).toContain("line1"); + expect(result.output).toContain("line3"); + } + }); + + it("includes stderr hint on non-zero exit", async () => { + const cmd = isWindows + ? "node -e \"process.stderr.write('oops');process.exit(2)\"" + : 'echo "oops" >&2; exit 2'; + const result = await runPreCheck({ command: cmd }); + expect(result.passed).toBe(false); + if (!result.passed) { + expect(result.reason).toContain("oops"); + expect(result.reason).toContain("code 2"); + } + }); + + it("handles command not found", async () => { + const result = await runPreCheck({ + command: "nonexistent_command_xyz_12345", + }); + expect(result.passed).toBe(false); + }); + }); + + describe("applyPreCheckOutput", () => { + it("prepends output by default", () => { + const result = applyPreCheckOutput("original message", "check data", undefined); + expect(result).toContain("[Pre-check context]"); + expect(result).toContain("check data"); + expect(result).toContain("original message"); + // Pre-check comes before original + expect(result.indexOf("check data")).toBeLessThan(result.indexOf("original message")); + }); + + it("prepends output with explicit 'prepend' mode", () => { + const result = applyPreCheckOutput("original", "data", "prepend"); + expect(result).toContain("data"); + expect(result).toContain("original"); + }); + + it("replaces original with 'replace' mode", () => { + const result = applyPreCheckOutput("original", "replacement", "replace"); + expect(result).toBe("replacement"); + }); + + it("ignores output with 'ignore' mode", () => { + const result = applyPreCheckOutput("original", "ignored data", "ignore"); + expect(result).toBe("original"); + }); + }); +}); diff --git a/src/cron/pre-check.ts b/src/cron/pre-check.ts new file mode 100644 index 00000000000..974db5fd87d --- /dev/null +++ b/src/cron/pre-check.ts @@ -0,0 +1,110 @@ +/** + * Cron pre-check gate: execute a lightweight shell command before an agent turn. + * + * If the command exits 0 with non-empty stdout → job proceeds (stdout available as context). + * If the command exits non-zero or stdout is empty → job is skipped (no tokens spent). + * + * This saves tokens on recurring jobs that only need attention when something changed + * (e.g., new PRs, new emails, file changes, API status changes). + */ + +import { exec } from "node:child_process"; +import type { CronPreCheck } from "./types.js"; + +const DEFAULT_TIMEOUT_SECONDS = 30; +const MAX_OUTPUT_BYTES = 32_768; // 32 KB — enough context without overwhelming the prompt + +export type PreCheckResult = { passed: true; output: string } | { passed: false; reason: string }; + +/** + * Run a pre-check command. Returns `passed: true` with stdout if the gate + * passes, or `passed: false` with a reason if it should be skipped. + */ +export function runPreCheck( + preCheck: CronPreCheck, + opts?: { cwd?: string }, +): Promise { + const timeoutMs = (preCheck.timeoutSeconds ?? DEFAULT_TIMEOUT_SECONDS) * 1_000; + + return new Promise((resolve) => { + let settled = false; + const child = exec(preCheck.command, { + timeout: timeoutMs, + maxBuffer: MAX_OUTPUT_BYTES, + cwd: opts?.cwd, + env: { ...process.env }, + }); + + let stdout = ""; + let stderr = ""; + + child.stdout?.on("data", (chunk: string | Buffer) => { + stdout += String(chunk); + }); + + child.stderr?.on("data", (chunk: string | Buffer) => { + stderr += String(chunk); + }); + + child.on("error", (err) => { + if (settled) { + return; + } + settled = true; + resolve({ passed: false, reason: `preCheck error: ${err.message}` }); + }); + + child.on("close", (code, signal) => { + if (settled) { + return; + } + settled = true; + if (signal === "SIGTERM") { + resolve({ + passed: false, + reason: `preCheck timed out after ${preCheck.timeoutSeconds ?? DEFAULT_TIMEOUT_SECONDS}s`, + }); + return; + } + + if (code !== 0) { + const hint = stderr.trim() ? ` (stderr: ${stderr.trim().slice(0, 200)})` : ""; + resolve({ passed: false, reason: `preCheck exited with code ${code}${hint}` }); + return; + } + + const trimmed = stdout.trim(); + if (!trimmed) { + resolve({ passed: false, reason: "preCheck produced empty output (nothing to do)" }); + return; + } + + // Truncate if needed + const output = + trimmed.length > MAX_OUTPUT_BYTES + ? trimmed.slice(0, MAX_OUTPUT_BYTES) + "\n[truncated]" + : trimmed; + + resolve({ passed: true, output }); + }); + }); +} + +/** + * Apply the pre-check output to a payload message/text based on the outputMode. + */ +export function applyPreCheckOutput( + originalText: string, + preCheckOutput: string, + outputMode: CronPreCheck["outputMode"], +): string { + switch (outputMode ?? "prepend") { + case "replace": + return preCheckOutput; + case "ignore": + return originalText; + case "prepend": + default: + return `[Pre-check context]\n${preCheckOutput}\n\n${originalText}`; + } +} diff --git a/src/cron/service.pre-check.test.ts b/src/cron/service.pre-check.test.ts new file mode 100644 index 00000000000..c66e8bf70b9 --- /dev/null +++ b/src/cron/service.pre-check.test.ts @@ -0,0 +1,76 @@ +import { describe, expect, it } from "vitest"; +import type { CronJob, CronJobPatch } from "./types.js"; +import { applyJobPatch } from "./service/jobs.js"; + +describe("applyJobPatch with preCheck", () => { + const makeJob = (preCheck?: CronJob["preCheck"]): CronJob => ({ + id: "job-1", + name: "job-1", + enabled: true, + createdAtMs: Date.now(), + updatedAtMs: Date.now(), + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "do it" }, + preCheck, + state: {}, + }); + + it("adds preCheck to a job that had none", () => { + const job = makeJob(); + + const patch: CronJobPatch = { + preCheck: { command: "echo new" }, + }; + applyJobPatch(job, patch); + expect(job.preCheck?.command).toBe("echo new"); + }); + + it("merges preCheck fields (partial update)", () => { + const job = makeJob({ command: "echo original", timeoutSeconds: 30 }); + const patch: CronJobPatch = { + preCheck: { timeoutSeconds: 60 }, + }; + applyJobPatch(job, patch); + expect(job.preCheck?.command).toBe("echo original"); + expect(job.preCheck?.timeoutSeconds).toBe(60); + }); + + it("removes preCheck when patched with null", () => { + const job = makeJob({ command: "echo original" }); + expect(job.preCheck).toBeDefined(); + + const patch: CronJobPatch = { preCheck: null }; + applyJobPatch(job, patch); + expect(job.preCheck).toBeUndefined(); + }); + + it("replaces preCheck command while preserving other fields", () => { + const job = makeJob({ command: "echo old", timeoutSeconds: 30 }); + const patch: CronJobPatch = { + preCheck: { command: "gh pr list --json number" }, + }; + applyJobPatch(job, patch); + expect(job.preCheck?.command).toBe("gh pr list --json number"); + expect(job.preCheck?.timeoutSeconds).toBe(30); + }); + + it("sets outputMode via patch", () => { + const job = makeJob({ command: "echo data" }); + const patch: CronJobPatch = { + preCheck: { outputMode: "replace" }, + }; + applyJobPatch(job, patch); + expect(job.preCheck?.command).toBe("echo data"); + expect(job.preCheck?.outputMode).toBe("replace"); + }); + + it("does not touch preCheck when patch omits it", () => { + const job = makeJob({ command: "echo keep" }); + const patch: CronJobPatch = { name: "renamed" }; + applyJobPatch(job, patch); + expect(job.preCheck?.command).toBe("echo keep"); + expect(job.name).toBe("renamed"); + }); +}); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 542ba81053d..df6e019f448 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -20,6 +20,7 @@ import type { CronJobPatch, CronPayload, CronPayloadPatch, + CronPreCheck, } from "../types.js"; import { normalizeHttpWebhookUrl } from "../webhook-url.js"; import { resolveInitialCronDelivery } from "./initial-delivery.js"; @@ -554,6 +555,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo sessionTarget: input.sessionTarget, wakeMode: input.wakeMode, payload: input.payload, + preCheck: input.preCheck, delivery: resolveInitialCronDelivery(input), failureAlert: input.failureAlert, state: { @@ -644,6 +646,13 @@ export function applyJobPatch( if (patch.state) { job.state = { ...job.state, ...patch.state }; } + if ("preCheck" in patch) { + if (patch.preCheck === null) { + job.preCheck = undefined; + } else if (patch.preCheck) { + job.preCheck = { ...job.preCheck, ...patch.preCheck } as CronPreCheck; + } + } if ("agentId" in patch) { job.agentId = normalizeOptionalAgentId((patch as { agentId?: unknown }).agentId); } diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index e12c4ae38e7..3978ac34570 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,9 +1,6 @@ -import { resolveFailoverReasonFromError } from "../../agents/failover-error.js"; +import { dirname } from "node:path"; import type { CronConfig, CronRetryOn } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; -import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; -import { resolveCronDeliveryPlan } from "../delivery.js"; -import { sweepCronRunSessions } from "../session-reaper.js"; import type { CronDeliveryStatus, CronJob, @@ -12,6 +9,12 @@ import type { CronRunStatus, CronRunTelemetry, } from "../types.js"; +import type { CronEvent, CronServiceState } from "./state.js"; +import { resolveFailoverReasonFromError } from "../../agents/failover-error.js"; +import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; +import { resolveCronDeliveryPlan } from "../delivery.js"; +import { runPreCheck, applyPreCheckOutput } from "../pre-check.js"; +import { sweepCronRunSessions } from "../session-reaper.js"; import { computeJobPreviousRunAtMs, computeJobNextRunAtMs, @@ -21,7 +24,6 @@ import { resolveJobPayloadTextForMain, } from "./jobs.js"; import { locked } from "./locked.js"; -import type { CronEvent, CronServiceState } from "./state.js"; import { ensureLoaded, persist } from "./store.js"; import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-policy.js"; @@ -1009,6 +1011,29 @@ export async function executeJobCore( ): Promise< CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } > { + // ── Pre-check gate ───────────────────────────────────────────────── + // If the job has a preCheck, run the lightweight shell command first. + // Skip the entire agent turn (no tokens spent) if the gate fails. + let preCheckOutput: string | undefined; + if (job.preCheck?.command) { + // Run pre-check from the session store directory (near the agent workspace) + // or fall back to the gateway's working directory. + const cwd = state.deps.storePath ? dirname(state.deps.storePath) : undefined; + const result = await runPreCheck(job.preCheck, { cwd }); + if (!result.passed) { + state.deps.log.debug( + { jobId: job.id, jobName: job.name, reason: result.reason }, + "cron: preCheck gate failed, skipping job", + ); + return { status: "skipped", error: result.reason }; + } + preCheckOutput = result.output; + state.deps.log.debug( + { jobId: job.id, jobName: job.name, outputLen: preCheckOutput.length }, + "cron: preCheck gate passed", + ); + } + const resolveAbortError = () => ({ status: "error" as const, error: timeoutErrorMessage(), @@ -1039,7 +1064,7 @@ export async function executeJobCore( return resolveAbortError(); } if (job.sessionTarget === "main") { - const text = resolveJobPayloadTextForMain(job); + let text = resolveJobPayloadTextForMain(job); if (!text) { const kind = job.payload.kind; return { @@ -1050,6 +1075,10 @@ export async function executeJobCore( : 'main job requires payload.kind="systemEvent"', }; } + // Apply pre-check output to main session text + if (preCheckOutput) { + text = applyPreCheckOutput(text, preCheckOutput, job.preCheck?.outputMode); + } // Preserve the job session namespace for main-target reminders so heartbeat // routing can deliver follow-through in the originating channel/thread. // Downstream gateway wiring canonicalizes/guards this key per agent. @@ -1130,9 +1159,15 @@ export async function executeJobCore( return resolveAbortError(); } + // Apply pre-check output to isolated agent message + let agentMessage = job.payload.message; + if (preCheckOutput) { + agentMessage = applyPreCheckOutput(agentMessage, preCheckOutput, job.preCheck?.outputMode); + } + const res = await state.deps.runIsolatedAgentJob({ job, - message: job.payload.message, + message: agentMessage, abortSignal, }); diff --git a/src/cron/types.ts b/src/cron/types.ts index 02078d15424..4deae0a7c4e 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -1,6 +1,5 @@ import type { FailoverReason } from "../agents/pi-embedded-helpers.js"; import type { ChannelId } from "../channels/plugins/types.js"; -import type { CronJobBase } from "./types-shared.js"; export type CronSchedule = | { kind: "at"; at: string } @@ -43,6 +42,45 @@ export type CronDeliveryPatch = Partial; export type CronRunStatus = "ok" | "error" | "skipped"; export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested"; +/** + * Pre-check gate: a lightweight shell command that runs before the agent turn. + * If the command exits 0 and produces non-empty stdout, the job proceeds with + * stdout as context. If it exits non-zero or produces empty stdout, the job + * is skipped — saving tokens when there's nothing to do. + * + * Example: `preCheck: { command: "gh pr list --state open --json number | jq 'if length > 0 then . else empty end'" }` + * Only wakes the agent when there are open PRs. + */ +export type CronPreCheck = { + /** Shell command to execute. Runs in the agent workspace directory. */ + command: string; + /** Timeout in seconds (default: 30). Killed + skipped on timeout. */ + timeoutSeconds?: number; + /** + * What to do with stdout when the check passes: + * - "prepend" (default): prepend stdout to the agent message/system-event as context + * - "replace": use stdout as the entire message (replaces payload text/message) + * - "ignore": discard stdout, just use the gate result + */ + outputMode?: "prepend" | "replace" | "ignore"; +}; + +export type CronPayload = + | { kind: "systemEvent"; text: string } + | { + kind: "agentTurn"; + message: string; + /** Optional model override (provider/model or alias). */ + model?: string; + thinking?: string; + timeoutSeconds?: number; + allowUnsafeExternalContent?: boolean; + deliver?: boolean; + channel?: CronMessageChannel; + to?: string; + bestEffortDeliver?: boolean; + }; + export type CronUsageSummary = { input_tokens?: number; output_tokens?: number; @@ -78,8 +116,6 @@ export type CronFailureAlert = { accountId?: string; }; -export type CronPayload = { kind: "systemEvent"; text: string } | CronAgentTurnPayload; - export type CronPayloadPatch = { kind: "systemEvent"; text?: string } | CronAgentTurnPayloadPatch; type CronAgentTurnPayloadFields = { @@ -99,10 +135,6 @@ type CronAgentTurnPayloadFields = { bestEffortDeliver?: boolean; }; -type CronAgentTurnPayload = { - kind: "agentTurn"; -} & CronAgentTurnPayloadFields; - type CronAgentTurnPayloadPatch = { kind: "agentTurn"; } & Partial; @@ -132,14 +164,24 @@ export type CronJobState = { lastDelivered?: boolean; }; -export type CronJob = CronJobBase< - CronSchedule, - CronSessionTarget, - CronWakeMode, - CronPayload, - CronDelivery, - CronFailureAlert | false -> & { +export type CronJob = { + id: string; + agentId?: string; + name: string; + description?: string; + enabled: boolean; + deleteAfterRun?: boolean; + createdAtMs: number; + updatedAtMs: number; + schedule: CronSchedule; + sessionTarget: CronSessionTarget; + wakeMode: CronWakeMode; + payload: CronPayload; + /** Optional pre-check gate. Runs a shell command before the agent turn; + * skips the job (no tokens spent) if the command fails or returns empty. */ + preCheck?: CronPreCheck; + delivery?: CronDelivery; + failureAlert?: CronFailureAlert | false; state: CronJobState; }; @@ -152,8 +194,13 @@ export type CronJobCreate = Omit; }; -export type CronJobPatch = Partial> & { +export type CronPreCheckPatch = Partial; + +export type CronJobPatch = Partial< + Omit +> & { payload?: CronPayloadPatch; + preCheck?: CronPreCheckPatch | null; delivery?: CronDeliveryPatch; state?: Partial; };