Merge e1ab5a4679a3725c40193c0bcf9ed333fbc3ab2a into 8a05c05596ca9ba0735dafd8e359885de4c2c969
This commit is contained in:
commit
8658545d59
@ -692,6 +692,68 @@ Immediate system event without creating a job:
|
|||||||
openclaw system event --mode now --text "Next heartbeat: check battery."
|
openclaw system event --mode now --text "Next heartbeat: check battery."
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Pre-check gate (skip when nothing changed)
|
||||||
|
|
||||||
|
Recurring jobs often wake an agent when there's nothing to act on — wasting tokens on "nothing new" responses. The **pre-check gate** runs a lightweight shell command before the agent turn; if the command fails or returns empty output, the job is skipped entirely.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
openclaw cron add \
|
||||||
|
--name "PR review" \
|
||||||
|
--every "30m" \
|
||||||
|
--session isolated \
|
||||||
|
--message "Review these open PRs and flag any issues." \
|
||||||
|
--pre-check 'gh pr list --state open --json number | jq "if length > 0 then . else empty end"'
|
||||||
|
```
|
||||||
|
|
||||||
|
### How it works
|
||||||
|
|
||||||
|
| Pre-check result | Job outcome |
|
||||||
|
| ------------------------- | ---------------------------------------------------- |
|
||||||
|
| Exit 0 + non-empty stdout | Job proceeds — stdout optionally included as context |
|
||||||
|
| Exit non-zero | Job skipped (status: `skipped`) |
|
||||||
|
| Empty stdout | Job skipped (nothing to do) |
|
||||||
|
| Timeout (default: 30s) | Job skipped |
|
||||||
|
|
||||||
|
### Pre-check fields
|
||||||
|
|
||||||
|
| Field | Type | Default | Description |
|
||||||
|
| ---------------- | ---------------------------------------- | ----------- | -------------------------------------- |
|
||||||
|
| `command` | string | _required_ | Shell command to execute |
|
||||||
|
| `timeoutSeconds` | number | `30` | Kill and skip on timeout |
|
||||||
|
| `outputMode` | `"prepend"` \| `"replace"` \| `"ignore"` | `"prepend"` | How to use stdout in the agent message |
|
||||||
|
|
||||||
|
### Output modes
|
||||||
|
|
||||||
|
- **prepend** (default): Stdout is prepended to the agent message as `[Pre-check context]`, giving the agent the data without a separate lookup.
|
||||||
|
- **replace**: Stdout becomes the entire agent message (useful when the script produces the full prompt).
|
||||||
|
- **ignore**: Only the pass/fail gate matters; stdout is discarded.
|
||||||
|
|
||||||
|
### Examples
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Only summarize when new mail arrived (check returns count)
|
||||||
|
--pre-check 'python3 -c "import imaplib; m=imaplib.IMAP4_SSL(\"imap.gmail.com\"); m.login(\"...\",\"...\"); print(len(m.search(None,\"UNSEEN\")[1][0].split()))"'
|
||||||
|
|
||||||
|
# Only alert when disk usage exceeds 80%
|
||||||
|
--pre-check 'df -h / | awk "NR==2 {gsub(/%/,\"\",\$5); if(\$5>80) print \$5\"% used\"; else exit 1}"'
|
||||||
|
|
||||||
|
# Only process when a file changed since last check
|
||||||
|
--pre-check 'find /data -newer /tmp/.last-check -type f | head -5'
|
||||||
|
|
||||||
|
# Only wake when API returns non-empty results
|
||||||
|
--pre-check 'curl -sf https://api.example.com/pending | jq "if length > 0 then . else empty end"'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Updating or removing a pre-check
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Update the command
|
||||||
|
openclaw cron update <job-id> --pre-check 'new-command-here'
|
||||||
|
|
||||||
|
# Remove the pre-check (job always runs)
|
||||||
|
openclaw cron update <job-id> --pre-check ''
|
||||||
|
```
|
||||||
|
|
||||||
## Gateway API surface
|
## Gateway API surface
|
||||||
|
|
||||||
- `cron.list`, `cron.status`, `cron.add`, `cron.update`, `cron.remove`
|
- `cron.list`, `cron.status`, `cron.add`, `cron.update`, `cron.remove`
|
||||||
|
|||||||
@ -406,6 +406,29 @@ export function normalizeCronJobInput(
|
|||||||
next.delivery = coerceDelivery(base.delivery);
|
next.delivery = coerceDelivery(base.delivery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pre-check gate normalization
|
||||||
|
if (base.preCheck === null) {
|
||||||
|
next.preCheck = null; // explicit removal in patch
|
||||||
|
} else if (isRecord(base.preCheck)) {
|
||||||
|
const pc = base.preCheck;
|
||||||
|
const preCheck: Record<string, unknown> = {};
|
||||||
|
if (typeof pc.command === "string" && pc.command.trim()) {
|
||||||
|
preCheck.command = pc.command.trim();
|
||||||
|
}
|
||||||
|
if (typeof pc.timeoutSeconds === "number" && Number.isFinite(pc.timeoutSeconds)) {
|
||||||
|
preCheck.timeoutSeconds = Math.max(1, Math.floor(pc.timeoutSeconds));
|
||||||
|
}
|
||||||
|
if (typeof pc.outputMode === "string") {
|
||||||
|
const mode = pc.outputMode.trim().toLowerCase();
|
||||||
|
if (mode === "prepend" || mode === "replace" || mode === "ignore") {
|
||||||
|
preCheck.outputMode = mode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (preCheck.command) {
|
||||||
|
next.preCheck = preCheck;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ("isolation" in next) {
|
if ("isolation" in next) {
|
||||||
delete next.isolation;
|
delete next.isolation;
|
||||||
}
|
}
|
||||||
|
|||||||
104
src/cron/pre-check.test.ts
Normal file
104
src/cron/pre-check.test.ts
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
import { describe, it, expect } from "vitest";
|
||||||
|
import { runPreCheck, applyPreCheckOutput } from "./pre-check.js";
|
||||||
|
|
||||||
|
const isWindows = process.platform === "win32";
|
||||||
|
|
||||||
|
describe("cron pre-check gate", () => {
|
||||||
|
describe("runPreCheck", () => {
|
||||||
|
it("passes when command exits 0 with output", async () => {
|
||||||
|
const result = await runPreCheck({ command: 'echo "hello world"' });
|
||||||
|
expect(result.passed).toBe(true);
|
||||||
|
if (result.passed) {
|
||||||
|
expect(result.output).toContain("hello world");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("fails when command exits non-zero", async () => {
|
||||||
|
const cmd = isWindows ? "exit /b 1" : "exit 1";
|
||||||
|
const result = await runPreCheck({ command: cmd });
|
||||||
|
expect(result.passed).toBe(false);
|
||||||
|
if (!result.passed) {
|
||||||
|
expect(result.reason).toContain("exited with code 1");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("fails when command produces empty stdout", async () => {
|
||||||
|
// node -e "" produces no output on all platforms
|
||||||
|
const result = await runPreCheck({ command: 'node -e ""' });
|
||||||
|
expect(result.passed).toBe(false);
|
||||||
|
if (!result.passed) {
|
||||||
|
expect(result.reason).toContain("empty output");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("fails on timeout", async () => {
|
||||||
|
const cmd = isWindows ? "ping -n 11 127.0.0.1 > nul" : "sleep 10";
|
||||||
|
const result = await runPreCheck({
|
||||||
|
command: cmd,
|
||||||
|
timeoutSeconds: 1,
|
||||||
|
});
|
||||||
|
expect(result.passed).toBe(false);
|
||||||
|
if (!result.passed) {
|
||||||
|
expect(result.reason).toMatch(/timed out|error/i);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("passes with multi-line output", async () => {
|
||||||
|
const cmd = isWindows
|
||||||
|
? "node -e \"console.log('line1');console.log('line2');console.log('line3')\""
|
||||||
|
: 'echo "line1"; echo "line2"; echo "line3"';
|
||||||
|
const result = await runPreCheck({ command: cmd });
|
||||||
|
expect(result.passed).toBe(true);
|
||||||
|
if (result.passed) {
|
||||||
|
expect(result.output).toContain("line1");
|
||||||
|
expect(result.output).toContain("line3");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("includes stderr hint on non-zero exit", async () => {
|
||||||
|
const cmd = isWindows
|
||||||
|
? "node -e \"process.stderr.write('oops');process.exit(2)\""
|
||||||
|
: 'echo "oops" >&2; exit 2';
|
||||||
|
const result = await runPreCheck({ command: cmd });
|
||||||
|
expect(result.passed).toBe(false);
|
||||||
|
if (!result.passed) {
|
||||||
|
expect(result.reason).toContain("oops");
|
||||||
|
expect(result.reason).toContain("code 2");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles command not found", async () => {
|
||||||
|
const result = await runPreCheck({
|
||||||
|
command: "nonexistent_command_xyz_12345",
|
||||||
|
});
|
||||||
|
expect(result.passed).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("applyPreCheckOutput", () => {
|
||||||
|
it("prepends output by default", () => {
|
||||||
|
const result = applyPreCheckOutput("original message", "check data", undefined);
|
||||||
|
expect(result).toContain("[Pre-check context]");
|
||||||
|
expect(result).toContain("check data");
|
||||||
|
expect(result).toContain("original message");
|
||||||
|
// Pre-check comes before original
|
||||||
|
expect(result.indexOf("check data")).toBeLessThan(result.indexOf("original message"));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("prepends output with explicit 'prepend' mode", () => {
|
||||||
|
const result = applyPreCheckOutput("original", "data", "prepend");
|
||||||
|
expect(result).toContain("data");
|
||||||
|
expect(result).toContain("original");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("replaces original with 'replace' mode", () => {
|
||||||
|
const result = applyPreCheckOutput("original", "replacement", "replace");
|
||||||
|
expect(result).toBe("replacement");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("ignores output with 'ignore' mode", () => {
|
||||||
|
const result = applyPreCheckOutput("original", "ignored data", "ignore");
|
||||||
|
expect(result).toBe("original");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
110
src/cron/pre-check.ts
Normal file
110
src/cron/pre-check.ts
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
/**
|
||||||
|
* Cron pre-check gate: execute a lightweight shell command before an agent turn.
|
||||||
|
*
|
||||||
|
* If the command exits 0 with non-empty stdout → job proceeds (stdout available as context).
|
||||||
|
* If the command exits non-zero or stdout is empty → job is skipped (no tokens spent).
|
||||||
|
*
|
||||||
|
* This saves tokens on recurring jobs that only need attention when something changed
|
||||||
|
* (e.g., new PRs, new emails, file changes, API status changes).
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { exec } from "node:child_process";
|
||||||
|
import type { CronPreCheck } from "./types.js";
|
||||||
|
|
||||||
|
const DEFAULT_TIMEOUT_SECONDS = 30;
|
||||||
|
const MAX_OUTPUT_BYTES = 32_768; // 32 KB — enough context without overwhelming the prompt
|
||||||
|
|
||||||
|
export type PreCheckResult = { passed: true; output: string } | { passed: false; reason: string };
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a pre-check command. Returns `passed: true` with stdout if the gate
|
||||||
|
* passes, or `passed: false` with a reason if it should be skipped.
|
||||||
|
*/
|
||||||
|
export function runPreCheck(
|
||||||
|
preCheck: CronPreCheck,
|
||||||
|
opts?: { cwd?: string },
|
||||||
|
): Promise<PreCheckResult> {
|
||||||
|
const timeoutMs = (preCheck.timeoutSeconds ?? DEFAULT_TIMEOUT_SECONDS) * 1_000;
|
||||||
|
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
let settled = false;
|
||||||
|
const child = exec(preCheck.command, {
|
||||||
|
timeout: timeoutMs,
|
||||||
|
maxBuffer: MAX_OUTPUT_BYTES,
|
||||||
|
cwd: opts?.cwd,
|
||||||
|
env: { ...process.env },
|
||||||
|
});
|
||||||
|
|
||||||
|
let stdout = "";
|
||||||
|
let stderr = "";
|
||||||
|
|
||||||
|
child.stdout?.on("data", (chunk: string | Buffer) => {
|
||||||
|
stdout += String(chunk);
|
||||||
|
});
|
||||||
|
|
||||||
|
child.stderr?.on("data", (chunk: string | Buffer) => {
|
||||||
|
stderr += String(chunk);
|
||||||
|
});
|
||||||
|
|
||||||
|
child.on("error", (err) => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
resolve({ passed: false, reason: `preCheck error: ${err.message}` });
|
||||||
|
});
|
||||||
|
|
||||||
|
child.on("close", (code, signal) => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
if (signal === "SIGTERM") {
|
||||||
|
resolve({
|
||||||
|
passed: false,
|
||||||
|
reason: `preCheck timed out after ${preCheck.timeoutSeconds ?? DEFAULT_TIMEOUT_SECONDS}s`,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code !== 0) {
|
||||||
|
const hint = stderr.trim() ? ` (stderr: ${stderr.trim().slice(0, 200)})` : "";
|
||||||
|
resolve({ passed: false, reason: `preCheck exited with code ${code}${hint}` });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const trimmed = stdout.trim();
|
||||||
|
if (!trimmed) {
|
||||||
|
resolve({ passed: false, reason: "preCheck produced empty output (nothing to do)" });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Truncate if needed
|
||||||
|
const output =
|
||||||
|
trimmed.length > MAX_OUTPUT_BYTES
|
||||||
|
? trimmed.slice(0, MAX_OUTPUT_BYTES) + "\n[truncated]"
|
||||||
|
: trimmed;
|
||||||
|
|
||||||
|
resolve({ passed: true, output });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply the pre-check output to a payload message/text based on the outputMode.
|
||||||
|
*/
|
||||||
|
export function applyPreCheckOutput(
|
||||||
|
originalText: string,
|
||||||
|
preCheckOutput: string,
|
||||||
|
outputMode: CronPreCheck["outputMode"],
|
||||||
|
): string {
|
||||||
|
switch (outputMode ?? "prepend") {
|
||||||
|
case "replace":
|
||||||
|
return preCheckOutput;
|
||||||
|
case "ignore":
|
||||||
|
return originalText;
|
||||||
|
case "prepend":
|
||||||
|
default:
|
||||||
|
return `[Pre-check context]\n${preCheckOutput}\n\n${originalText}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
76
src/cron/service.pre-check.test.ts
Normal file
76
src/cron/service.pre-check.test.ts
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import type { CronJob, CronJobPatch } from "./types.js";
|
||||||
|
import { applyJobPatch } from "./service/jobs.js";
|
||||||
|
|
||||||
|
describe("applyJobPatch with preCheck", () => {
|
||||||
|
const makeJob = (preCheck?: CronJob["preCheck"]): CronJob => ({
|
||||||
|
id: "job-1",
|
||||||
|
name: "job-1",
|
||||||
|
enabled: true,
|
||||||
|
createdAtMs: Date.now(),
|
||||||
|
updatedAtMs: Date.now(),
|
||||||
|
schedule: { kind: "every", everyMs: 60_000 },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: { kind: "agentTurn", message: "do it" },
|
||||||
|
preCheck,
|
||||||
|
state: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
it("adds preCheck to a job that had none", () => {
|
||||||
|
const job = makeJob();
|
||||||
|
|
||||||
|
const patch: CronJobPatch = {
|
||||||
|
preCheck: { command: "echo new" },
|
||||||
|
};
|
||||||
|
applyJobPatch(job, patch);
|
||||||
|
expect(job.preCheck?.command).toBe("echo new");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("merges preCheck fields (partial update)", () => {
|
||||||
|
const job = makeJob({ command: "echo original", timeoutSeconds: 30 });
|
||||||
|
const patch: CronJobPatch = {
|
||||||
|
preCheck: { timeoutSeconds: 60 },
|
||||||
|
};
|
||||||
|
applyJobPatch(job, patch);
|
||||||
|
expect(job.preCheck?.command).toBe("echo original");
|
||||||
|
expect(job.preCheck?.timeoutSeconds).toBe(60);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("removes preCheck when patched with null", () => {
|
||||||
|
const job = makeJob({ command: "echo original" });
|
||||||
|
expect(job.preCheck).toBeDefined();
|
||||||
|
|
||||||
|
const patch: CronJobPatch = { preCheck: null };
|
||||||
|
applyJobPatch(job, patch);
|
||||||
|
expect(job.preCheck).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("replaces preCheck command while preserving other fields", () => {
|
||||||
|
const job = makeJob({ command: "echo old", timeoutSeconds: 30 });
|
||||||
|
const patch: CronJobPatch = {
|
||||||
|
preCheck: { command: "gh pr list --json number" },
|
||||||
|
};
|
||||||
|
applyJobPatch(job, patch);
|
||||||
|
expect(job.preCheck?.command).toBe("gh pr list --json number");
|
||||||
|
expect(job.preCheck?.timeoutSeconds).toBe(30);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("sets outputMode via patch", () => {
|
||||||
|
const job = makeJob({ command: "echo data" });
|
||||||
|
const patch: CronJobPatch = {
|
||||||
|
preCheck: { outputMode: "replace" },
|
||||||
|
};
|
||||||
|
applyJobPatch(job, patch);
|
||||||
|
expect(job.preCheck?.command).toBe("echo data");
|
||||||
|
expect(job.preCheck?.outputMode).toBe("replace");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not touch preCheck when patch omits it", () => {
|
||||||
|
const job = makeJob({ command: "echo keep" });
|
||||||
|
const patch: CronJobPatch = { name: "renamed" };
|
||||||
|
applyJobPatch(job, patch);
|
||||||
|
expect(job.preCheck?.command).toBe("echo keep");
|
||||||
|
expect(job.name).toBe("renamed");
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -20,6 +20,7 @@ import type {
|
|||||||
CronJobPatch,
|
CronJobPatch,
|
||||||
CronPayload,
|
CronPayload,
|
||||||
CronPayloadPatch,
|
CronPayloadPatch,
|
||||||
|
CronPreCheck,
|
||||||
} from "../types.js";
|
} from "../types.js";
|
||||||
import { normalizeHttpWebhookUrl } from "../webhook-url.js";
|
import { normalizeHttpWebhookUrl } from "../webhook-url.js";
|
||||||
import { resolveInitialCronDelivery } from "./initial-delivery.js";
|
import { resolveInitialCronDelivery } from "./initial-delivery.js";
|
||||||
@ -554,6 +555,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
|
|||||||
sessionTarget: input.sessionTarget,
|
sessionTarget: input.sessionTarget,
|
||||||
wakeMode: input.wakeMode,
|
wakeMode: input.wakeMode,
|
||||||
payload: input.payload,
|
payload: input.payload,
|
||||||
|
preCheck: input.preCheck,
|
||||||
delivery: resolveInitialCronDelivery(input),
|
delivery: resolveInitialCronDelivery(input),
|
||||||
failureAlert: input.failureAlert,
|
failureAlert: input.failureAlert,
|
||||||
state: {
|
state: {
|
||||||
@ -644,6 +646,13 @@ export function applyJobPatch(
|
|||||||
if (patch.state) {
|
if (patch.state) {
|
||||||
job.state = { ...job.state, ...patch.state };
|
job.state = { ...job.state, ...patch.state };
|
||||||
}
|
}
|
||||||
|
if ("preCheck" in patch) {
|
||||||
|
if (patch.preCheck === null) {
|
||||||
|
job.preCheck = undefined;
|
||||||
|
} else if (patch.preCheck) {
|
||||||
|
job.preCheck = { ...job.preCheck, ...patch.preCheck } as CronPreCheck;
|
||||||
|
}
|
||||||
|
}
|
||||||
if ("agentId" in patch) {
|
if ("agentId" in patch) {
|
||||||
job.agentId = normalizeOptionalAgentId((patch as { agentId?: unknown }).agentId);
|
job.agentId = normalizeOptionalAgentId((patch as { agentId?: unknown }).agentId);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
|
import { dirname } from "node:path";
|
||||||
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
|
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
|
||||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
|
||||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
|
||||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
|
||||||
import type {
|
import type {
|
||||||
CronDeliveryStatus,
|
CronDeliveryStatus,
|
||||||
CronJob,
|
CronJob,
|
||||||
@ -12,6 +9,12 @@ import type {
|
|||||||
CronRunStatus,
|
CronRunStatus,
|
||||||
CronRunTelemetry,
|
CronRunTelemetry,
|
||||||
} from "../types.js";
|
} from "../types.js";
|
||||||
|
import type { CronEvent, CronServiceState } from "./state.js";
|
||||||
|
import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
|
||||||
|
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||||
|
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||||
|
import { runPreCheck, applyPreCheckOutput } from "../pre-check.js";
|
||||||
|
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||||
import {
|
import {
|
||||||
computeJobPreviousRunAtMs,
|
computeJobPreviousRunAtMs,
|
||||||
computeJobNextRunAtMs,
|
computeJobNextRunAtMs,
|
||||||
@ -21,7 +24,6 @@ import {
|
|||||||
resolveJobPayloadTextForMain,
|
resolveJobPayloadTextForMain,
|
||||||
} from "./jobs.js";
|
} from "./jobs.js";
|
||||||
import { locked } from "./locked.js";
|
import { locked } from "./locked.js";
|
||||||
import type { CronEvent, CronServiceState } from "./state.js";
|
|
||||||
import { ensureLoaded, persist } from "./store.js";
|
import { ensureLoaded, persist } from "./store.js";
|
||||||
import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-policy.js";
|
import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-policy.js";
|
||||||
|
|
||||||
@ -1009,6 +1011,29 @@ export async function executeJobCore(
|
|||||||
): Promise<
|
): Promise<
|
||||||
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
|
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
|
||||||
> {
|
> {
|
||||||
|
// ── Pre-check gate ─────────────────────────────────────────────────
|
||||||
|
// If the job has a preCheck, run the lightweight shell command first.
|
||||||
|
// Skip the entire agent turn (no tokens spent) if the gate fails.
|
||||||
|
let preCheckOutput: string | undefined;
|
||||||
|
if (job.preCheck?.command) {
|
||||||
|
// Run pre-check from the session store directory (near the agent workspace)
|
||||||
|
// or fall back to the gateway's working directory.
|
||||||
|
const cwd = state.deps.storePath ? dirname(state.deps.storePath) : undefined;
|
||||||
|
const result = await runPreCheck(job.preCheck, { cwd });
|
||||||
|
if (!result.passed) {
|
||||||
|
state.deps.log.debug(
|
||||||
|
{ jobId: job.id, jobName: job.name, reason: result.reason },
|
||||||
|
"cron: preCheck gate failed, skipping job",
|
||||||
|
);
|
||||||
|
return { status: "skipped", error: result.reason };
|
||||||
|
}
|
||||||
|
preCheckOutput = result.output;
|
||||||
|
state.deps.log.debug(
|
||||||
|
{ jobId: job.id, jobName: job.name, outputLen: preCheckOutput.length },
|
||||||
|
"cron: preCheck gate passed",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
const resolveAbortError = () => ({
|
const resolveAbortError = () => ({
|
||||||
status: "error" as const,
|
status: "error" as const,
|
||||||
error: timeoutErrorMessage(),
|
error: timeoutErrorMessage(),
|
||||||
@ -1039,7 +1064,7 @@ export async function executeJobCore(
|
|||||||
return resolveAbortError();
|
return resolveAbortError();
|
||||||
}
|
}
|
||||||
if (job.sessionTarget === "main") {
|
if (job.sessionTarget === "main") {
|
||||||
const text = resolveJobPayloadTextForMain(job);
|
let text = resolveJobPayloadTextForMain(job);
|
||||||
if (!text) {
|
if (!text) {
|
||||||
const kind = job.payload.kind;
|
const kind = job.payload.kind;
|
||||||
return {
|
return {
|
||||||
@ -1050,6 +1075,10 @@ export async function executeJobCore(
|
|||||||
: 'main job requires payload.kind="systemEvent"',
|
: 'main job requires payload.kind="systemEvent"',
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
// Apply pre-check output to main session text
|
||||||
|
if (preCheckOutput) {
|
||||||
|
text = applyPreCheckOutput(text, preCheckOutput, job.preCheck?.outputMode);
|
||||||
|
}
|
||||||
// Preserve the job session namespace for main-target reminders so heartbeat
|
// Preserve the job session namespace for main-target reminders so heartbeat
|
||||||
// routing can deliver follow-through in the originating channel/thread.
|
// routing can deliver follow-through in the originating channel/thread.
|
||||||
// Downstream gateway wiring canonicalizes/guards this key per agent.
|
// Downstream gateway wiring canonicalizes/guards this key per agent.
|
||||||
@ -1130,9 +1159,15 @@ export async function executeJobCore(
|
|||||||
return resolveAbortError();
|
return resolveAbortError();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply pre-check output to isolated agent message
|
||||||
|
let agentMessage = job.payload.message;
|
||||||
|
if (preCheckOutput) {
|
||||||
|
agentMessage = applyPreCheckOutput(agentMessage, preCheckOutput, job.preCheck?.outputMode);
|
||||||
|
}
|
||||||
|
|
||||||
const res = await state.deps.runIsolatedAgentJob({
|
const res = await state.deps.runIsolatedAgentJob({
|
||||||
job,
|
job,
|
||||||
message: job.payload.message,
|
message: agentMessage,
|
||||||
abortSignal,
|
abortSignal,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
import type { FailoverReason } from "../agents/pi-embedded-helpers.js";
|
import type { FailoverReason } from "../agents/pi-embedded-helpers.js";
|
||||||
import type { ChannelId } from "../channels/plugins/types.js";
|
import type { ChannelId } from "../channels/plugins/types.js";
|
||||||
import type { CronJobBase } from "./types-shared.js";
|
|
||||||
|
|
||||||
export type CronSchedule =
|
export type CronSchedule =
|
||||||
| { kind: "at"; at: string }
|
| { kind: "at"; at: string }
|
||||||
@ -43,6 +42,45 @@ export type CronDeliveryPatch = Partial<CronDelivery>;
|
|||||||
export type CronRunStatus = "ok" | "error" | "skipped";
|
export type CronRunStatus = "ok" | "error" | "skipped";
|
||||||
export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested";
|
export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pre-check gate: a lightweight shell command that runs before the agent turn.
|
||||||
|
* If the command exits 0 and produces non-empty stdout, the job proceeds with
|
||||||
|
* stdout as context. If it exits non-zero or produces empty stdout, the job
|
||||||
|
* is skipped — saving tokens when there's nothing to do.
|
||||||
|
*
|
||||||
|
* Example: `preCheck: { command: "gh pr list --state open --json number | jq 'if length > 0 then . else empty end'" }`
|
||||||
|
* Only wakes the agent when there are open PRs.
|
||||||
|
*/
|
||||||
|
export type CronPreCheck = {
|
||||||
|
/** Shell command to execute. Runs in the agent workspace directory. */
|
||||||
|
command: string;
|
||||||
|
/** Timeout in seconds (default: 30). Killed + skipped on timeout. */
|
||||||
|
timeoutSeconds?: number;
|
||||||
|
/**
|
||||||
|
* What to do with stdout when the check passes:
|
||||||
|
* - "prepend" (default): prepend stdout to the agent message/system-event as context
|
||||||
|
* - "replace": use stdout as the entire message (replaces payload text/message)
|
||||||
|
* - "ignore": discard stdout, just use the gate result
|
||||||
|
*/
|
||||||
|
outputMode?: "prepend" | "replace" | "ignore";
|
||||||
|
};
|
||||||
|
|
||||||
|
export type CronPayload =
|
||||||
|
| { kind: "systemEvent"; text: string }
|
||||||
|
| {
|
||||||
|
kind: "agentTurn";
|
||||||
|
message: string;
|
||||||
|
/** Optional model override (provider/model or alias). */
|
||||||
|
model?: string;
|
||||||
|
thinking?: string;
|
||||||
|
timeoutSeconds?: number;
|
||||||
|
allowUnsafeExternalContent?: boolean;
|
||||||
|
deliver?: boolean;
|
||||||
|
channel?: CronMessageChannel;
|
||||||
|
to?: string;
|
||||||
|
bestEffortDeliver?: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
export type CronUsageSummary = {
|
export type CronUsageSummary = {
|
||||||
input_tokens?: number;
|
input_tokens?: number;
|
||||||
output_tokens?: number;
|
output_tokens?: number;
|
||||||
@ -78,8 +116,6 @@ export type CronFailureAlert = {
|
|||||||
accountId?: string;
|
accountId?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type CronPayload = { kind: "systemEvent"; text: string } | CronAgentTurnPayload;
|
|
||||||
|
|
||||||
export type CronPayloadPatch = { kind: "systemEvent"; text?: string } | CronAgentTurnPayloadPatch;
|
export type CronPayloadPatch = { kind: "systemEvent"; text?: string } | CronAgentTurnPayloadPatch;
|
||||||
|
|
||||||
type CronAgentTurnPayloadFields = {
|
type CronAgentTurnPayloadFields = {
|
||||||
@ -99,10 +135,6 @@ type CronAgentTurnPayloadFields = {
|
|||||||
bestEffortDeliver?: boolean;
|
bestEffortDeliver?: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
type CronAgentTurnPayload = {
|
|
||||||
kind: "agentTurn";
|
|
||||||
} & CronAgentTurnPayloadFields;
|
|
||||||
|
|
||||||
type CronAgentTurnPayloadPatch = {
|
type CronAgentTurnPayloadPatch = {
|
||||||
kind: "agentTurn";
|
kind: "agentTurn";
|
||||||
} & Partial<CronAgentTurnPayloadFields>;
|
} & Partial<CronAgentTurnPayloadFields>;
|
||||||
@ -132,14 +164,24 @@ export type CronJobState = {
|
|||||||
lastDelivered?: boolean;
|
lastDelivered?: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type CronJob = CronJobBase<
|
export type CronJob = {
|
||||||
CronSchedule,
|
id: string;
|
||||||
CronSessionTarget,
|
agentId?: string;
|
||||||
CronWakeMode,
|
name: string;
|
||||||
CronPayload,
|
description?: string;
|
||||||
CronDelivery,
|
enabled: boolean;
|
||||||
CronFailureAlert | false
|
deleteAfterRun?: boolean;
|
||||||
> & {
|
createdAtMs: number;
|
||||||
|
updatedAtMs: number;
|
||||||
|
schedule: CronSchedule;
|
||||||
|
sessionTarget: CronSessionTarget;
|
||||||
|
wakeMode: CronWakeMode;
|
||||||
|
payload: CronPayload;
|
||||||
|
/** Optional pre-check gate. Runs a shell command before the agent turn;
|
||||||
|
* skips the job (no tokens spent) if the command fails or returns empty. */
|
||||||
|
preCheck?: CronPreCheck;
|
||||||
|
delivery?: CronDelivery;
|
||||||
|
failureAlert?: CronFailureAlert | false;
|
||||||
state: CronJobState;
|
state: CronJobState;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -152,8 +194,13 @@ export type CronJobCreate = Omit<CronJob, "id" | "createdAtMs" | "updatedAtMs" |
|
|||||||
state?: Partial<CronJobState>;
|
state?: Partial<CronJobState>;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type CronJobPatch = Partial<Omit<CronJob, "id" | "createdAtMs" | "state" | "payload">> & {
|
export type CronPreCheckPatch = Partial<CronPreCheck>;
|
||||||
|
|
||||||
|
export type CronJobPatch = Partial<
|
||||||
|
Omit<CronJob, "id" | "createdAtMs" | "state" | "payload" | "preCheck">
|
||||||
|
> & {
|
||||||
payload?: CronPayloadPatch;
|
payload?: CronPayloadPatch;
|
||||||
|
preCheck?: CronPreCheckPatch | null;
|
||||||
delivery?: CronDeliveryPatch;
|
delivery?: CronDeliveryPatch;
|
||||||
state?: Partial<CronJobState>;
|
state?: Partial<CronJobState>;
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user