Merge d8f4873df414218e1cf0cdd9a48dfa7407ea43f1 into 6b4c24c2e55b5b4013277bd799525086f6a0c40f
This commit is contained in:
commit
65122d2227
@ -10,6 +10,7 @@ public enum ErrorCode: String, Codable, Sendable {
|
||||
case agentTimeout = "AGENT_TIMEOUT"
|
||||
case invalidRequest = "INVALID_REQUEST"
|
||||
case unavailable = "UNAVAILABLE"
|
||||
case permissionDenied = "PERMISSION_DENIED"
|
||||
}
|
||||
|
||||
public struct ConnectParams: Codable, Sendable {
|
||||
@ -2910,6 +2911,7 @@ public struct CronListParams: Codable, Sendable {
|
||||
public let enabled: AnyCodable?
|
||||
public let sortby: AnyCodable?
|
||||
public let sortdir: AnyCodable?
|
||||
public let callersessionkey: String?
|
||||
|
||||
public init(
|
||||
includedisabled: Bool?,
|
||||
@ -2918,7 +2920,8 @@ public struct CronListParams: Codable, Sendable {
|
||||
query: String?,
|
||||
enabled: AnyCodable?,
|
||||
sortby: AnyCodable?,
|
||||
sortdir: AnyCodable?)
|
||||
sortdir: AnyCodable?,
|
||||
callersessionkey: String?)
|
||||
{
|
||||
self.includedisabled = includedisabled
|
||||
self.limit = limit
|
||||
@ -2927,6 +2930,7 @@ public struct CronListParams: Codable, Sendable {
|
||||
self.enabled = enabled
|
||||
self.sortby = sortby
|
||||
self.sortdir = sortdir
|
||||
self.callersessionkey = callersessionkey
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
@ -2937,6 +2941,7 @@ public struct CronListParams: Codable, Sendable {
|
||||
case enabled
|
||||
case sortby = "sortBy"
|
||||
case sortdir = "sortDir"
|
||||
case callersessionkey = "callerSessionKey"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ public enum ErrorCode: String, Codable, Sendable {
|
||||
case agentTimeout = "AGENT_TIMEOUT"
|
||||
case invalidRequest = "INVALID_REQUEST"
|
||||
case unavailable = "UNAVAILABLE"
|
||||
case permissionDenied = "PERMISSION_DENIED"
|
||||
}
|
||||
|
||||
public struct ConnectParams: Codable, Sendable {
|
||||
@ -2910,6 +2911,7 @@ public struct CronListParams: Codable, Sendable {
|
||||
public let enabled: AnyCodable?
|
||||
public let sortby: AnyCodable?
|
||||
public let sortdir: AnyCodable?
|
||||
public let callersessionkey: String?
|
||||
|
||||
public init(
|
||||
includedisabled: Bool?,
|
||||
@ -2918,7 +2920,8 @@ public struct CronListParams: Codable, Sendable {
|
||||
query: String?,
|
||||
enabled: AnyCodable?,
|
||||
sortby: AnyCodable?,
|
||||
sortdir: AnyCodable?)
|
||||
sortdir: AnyCodable?,
|
||||
callersessionkey: String?)
|
||||
{
|
||||
self.includedisabled = includedisabled
|
||||
self.limit = limit
|
||||
@ -2927,6 +2930,7 @@ public struct CronListParams: Codable, Sendable {
|
||||
self.enabled = enabled
|
||||
self.sortby = sortby
|
||||
self.sortdir = sortdir
|
||||
self.callersessionkey = callersessionkey
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
@ -2937,6 +2941,7 @@ public struct CronListParams: Codable, Sendable {
|
||||
case enabled
|
||||
case sortby = "sortBy"
|
||||
case sortdir = "sortDir"
|
||||
case callersessionkey = "callerSessionKey"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
343
src/cron/service.session-isolation.test.ts
Normal file
343
src/cron/service.session-isolation.test.ts
Normal file
@ -0,0 +1,343 @@
|
||||
/**
|
||||
* Tests for per-agent/session isolation of cron job visibility and mutations.
|
||||
* Covers issue #35447: cron.list, remove, update, and run must be scoped to
|
||||
* the calling agent/session unless the caller holds admin (ownerOverride).
|
||||
*/
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { CronService } from "./service.js";
|
||||
import {
|
||||
createCronStoreHarness,
|
||||
createMockCronStateForJobs,
|
||||
createNoopLogger,
|
||||
installCronTestHooks,
|
||||
} from "./service.test-harness.js";
|
||||
import { enqueueRun, listPage } from "./service/ops.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
|
||||
const logger = createNoopLogger();
|
||||
const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-isolation-" });
|
||||
installCronTestHooks({ logger });
|
||||
|
||||
const AGENT_A_KEY = "telegram:direct:111";
|
||||
const AGENT_B_KEY = "telegram:direct:222";
|
||||
|
||||
function makeCronService(storePath: string) {
|
||||
return new CronService({
|
||||
storePath,
|
||||
cronEnabled: true,
|
||||
log: logger,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "done" })),
|
||||
});
|
||||
}
|
||||
|
||||
const BASE_JOB_ADD = {
|
||||
enabled: true,
|
||||
schedule: { kind: "every" as const, everyMs: 60_000 },
|
||||
sessionTarget: "isolated" as const,
|
||||
wakeMode: "now" as const,
|
||||
payload: { kind: "agentTurn" as const, message: "tick" },
|
||||
} as const;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers for listPage unit tests (no disk I/O needed)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeMockJob(id: string, overrides?: Partial<CronJob>): CronJob {
|
||||
return {
|
||||
id,
|
||||
name: `job-${id}`,
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "*/5 * * * *", tz: "UTC" },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: "tick" },
|
||||
state: { nextRunAtMs: Date.parse("2026-03-17T12:00:00.000Z") },
|
||||
createdAtMs: Date.parse("2026-03-17T10:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-03-17T10:00:00.000Z"),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeMockJobSet() {
|
||||
return [
|
||||
makeMockJob("job-a", { sessionKey: AGENT_A_KEY }),
|
||||
makeMockJob("job-b", { sessionKey: AGENT_B_KEY }),
|
||||
makeMockJob("job-legacy"), // no agentId / sessionKey
|
||||
];
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// listPage isolation (unit tests, no disk I/O)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("listPage session isolation", () => {
|
||||
it("returns only owned jobs and legacy jobs when callerSessionKey is provided", async () => {
|
||||
const state = createMockCronStateForJobs({ jobs: makeMockJobSet() });
|
||||
const page = await listPage(state, { callerSessionKey: AGENT_A_KEY });
|
||||
|
||||
const ids = page.jobs.map((j) => j.id);
|
||||
expect(ids).toContain("job-a");
|
||||
expect(ids).toContain("job-legacy"); // no owner → accessible
|
||||
expect(ids).not.toContain("job-b");
|
||||
});
|
||||
|
||||
it("returns only jobs belonging to agent-b when using agent-b session key", async () => {
|
||||
const state = createMockCronStateForJobs({ jobs: makeMockJobSet() });
|
||||
const page = await listPage(state, { callerSessionKey: AGENT_B_KEY });
|
||||
|
||||
const ids = page.jobs.map((j) => j.id);
|
||||
expect(ids).toContain("job-b");
|
||||
expect(ids).toContain("job-legacy");
|
||||
expect(ids).not.toContain("job-a");
|
||||
});
|
||||
|
||||
it("returns all jobs when ownerOverride is true (admin bypass)", async () => {
|
||||
const state = createMockCronStateForJobs({ jobs: makeMockJobSet() });
|
||||
const page = await listPage(state, {
|
||||
callerSessionKey: AGENT_A_KEY,
|
||||
ownerOverride: true,
|
||||
});
|
||||
|
||||
expect(page.jobs).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("returns all jobs when no caller identity is provided (backward compat)", async () => {
|
||||
const state = createMockCronStateForJobs({ jobs: makeMockJobSet() });
|
||||
const page = await listPage(state, {});
|
||||
|
||||
expect(page.jobs).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("total and pagination counters reflect the filtered set", async () => {
|
||||
const state = createMockCronStateForJobs({ jobs: makeMockJobSet() });
|
||||
const page = await listPage(state, { callerSessionKey: AGENT_A_KEY });
|
||||
|
||||
// job-a + job-legacy = 2
|
||||
expect(page.total).toBe(2);
|
||||
expect(page.jobs).toHaveLength(2);
|
||||
expect(page.hasMore).toBe(false);
|
||||
});
|
||||
|
||||
it("matches by agentId when callerAgentId is set", async () => {
|
||||
const jobs = [
|
||||
makeMockJob("job-with-agent-id", { agentId: "agent-abc" }),
|
||||
makeMockJob("job-other-agent", { agentId: "agent-xyz" }),
|
||||
];
|
||||
const state = createMockCronStateForJobs({ jobs });
|
||||
const page = await listPage(state, { callerAgentId: "agent-abc" });
|
||||
|
||||
const ids = page.jobs.map((j) => j.id);
|
||||
expect(ids).toContain("job-with-agent-id");
|
||||
expect(ids).not.toContain("job-other-agent");
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// remove ownership enforcement (integration tests with real store)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("remove ownership enforcement", () => {
|
||||
it("throws CRON_PERMISSION_DENIED when agent-b tries to remove agent-a's job", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
|
||||
await expect(cron.remove(jobA.id, { callerSessionKey: AGENT_B_KEY })).rejects.toMatchObject({
|
||||
code: "CRON_PERMISSION_DENIED",
|
||||
});
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows agent-a to remove its own job", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
const result = await cron.remove(jobA.id, { callerSessionKey: AGENT_A_KEY });
|
||||
expect(result).toEqual({ ok: true, removed: true });
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows removal when ownerOverride is true regardless of ownership", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
const result = await cron.remove(jobA.id, {
|
||||
callerSessionKey: AGENT_B_KEY,
|
||||
ownerOverride: true,
|
||||
});
|
||||
expect(result).toEqual({ ok: true, removed: true });
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows removal of legacy (no-owner) jobs by any caller", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const legacy = await cron.add({ ...BASE_JOB_ADD, name: "legacy-job" });
|
||||
const result = await cron.remove(legacy.id, { callerSessionKey: AGENT_B_KEY });
|
||||
expect(result).toEqual({ ok: true, removed: true });
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows removal when no caller identity is provided (backward compat)", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
const result = await cron.remove(jobA.id);
|
||||
expect(result).toEqual({ ok: true, removed: true });
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// update ownership enforcement (integration tests with real store)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("update ownership enforcement", () => {
|
||||
it("throws CRON_PERMISSION_DENIED when agent-b tries to update agent-a's job", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
|
||||
await expect(
|
||||
cron.update(jobA.id, { name: "renamed" }, { callerSessionKey: AGENT_B_KEY }),
|
||||
).rejects.toMatchObject({ code: "CRON_PERMISSION_DENIED" });
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows agent-a to update its own job", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
const updated = await cron.update(
|
||||
jobA.id,
|
||||
{ name: "updated-name" },
|
||||
{ callerSessionKey: AGENT_A_KEY },
|
||||
);
|
||||
expect(updated.name).toBe("updated-name");
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows update when ownerOverride is true", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const jobA = await cron.add({ ...BASE_JOB_ADD, name: "job-a", sessionKey: AGENT_A_KEY });
|
||||
const updated = await cron.update(
|
||||
jobA.id,
|
||||
{ name: "admin-override" },
|
||||
{ callerSessionKey: AGENT_B_KEY, ownerOverride: true },
|
||||
);
|
||||
expect(updated.name).toBe("admin-override");
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("allows update of legacy (no-owner) jobs by any caller", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const cron = makeCronService(storePath);
|
||||
await cron.start();
|
||||
|
||||
try {
|
||||
const legacy = await cron.add({ ...BASE_JOB_ADD, name: "legacy" });
|
||||
const updated = await cron.update(
|
||||
legacy.id,
|
||||
{ name: "patched" },
|
||||
{ callerSessionKey: AGENT_A_KEY },
|
||||
);
|
||||
expect(updated.name).toBe("patched");
|
||||
} finally {
|
||||
cron.stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// enqueueRun ownership enforcement (integration tests with real store)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("enqueueRun ownership enforcement", () => {
|
||||
// These tests exercise the caller ownership check inside inspectManualRunPreflight,
|
||||
// which fires before any background I/O is enqueued. We use in-memory state so
|
||||
// there are no filesystem side effects or cleanup races.
|
||||
|
||||
it("throws CRON_PERMISSION_DENIED when agent-b tries to run agent-a's job", async () => {
|
||||
const jobs = [makeMockJob("job-a", { sessionKey: AGENT_A_KEY })];
|
||||
const state = createMockCronStateForJobs({ jobs });
|
||||
|
||||
await expect(
|
||||
enqueueRun(state, "job-a", "force", { callerSessionKey: AGENT_B_KEY }),
|
||||
).rejects.toMatchObject({ code: "CRON_PERMISSION_DENIED" });
|
||||
});
|
||||
|
||||
it("allows agent-a to run its own job (permission check passes)", async () => {
|
||||
const jobs = [makeMockJob("job-a", { sessionKey: AGENT_A_KEY })];
|
||||
const state = createMockCronStateForJobs({ jobs });
|
||||
|
||||
// enqueueRun resolves (ok:true) once the permission check passes and
|
||||
// the run is enqueued. The background execution will fail on persist (mock
|
||||
// state has no real storePath) but that does not affect the permission result.
|
||||
const result = await enqueueRun(state, "job-a", "force", { callerSessionKey: AGENT_A_KEY });
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("allows enqueueRun when ownerOverride is true", async () => {
|
||||
const jobs = [makeMockJob("job-a", { sessionKey: AGENT_A_KEY })];
|
||||
const state = createMockCronStateForJobs({ jobs });
|
||||
|
||||
const result = await enqueueRun(state, "job-a", "force", {
|
||||
callerSessionKey: AGENT_B_KEY,
|
||||
ownerOverride: true,
|
||||
});
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("allows enqueueRun of legacy (no-owner) jobs by any caller", async () => {
|
||||
const jobs = [makeMockJob("job-legacy")]; // no agentId / sessionKey
|
||||
const state = createMockCronStateForJobs({ jobs });
|
||||
|
||||
const result = await enqueueRun(state, "job-legacy", "force", {
|
||||
callerSessionKey: AGENT_B_KEY,
|
||||
});
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
});
|
||||
@ -1,4 +1,5 @@
|
||||
import * as ops from "./service/ops.js";
|
||||
import type { CronMutationCallerOptions } from "./service/ops.js";
|
||||
import { type CronServiceDeps, createCronServiceState } from "./service/state.js";
|
||||
import type { CronJob, CronJobCreate, CronJobPatch } from "./types.js";
|
||||
|
||||
@ -34,20 +35,20 @@ export class CronService {
|
||||
return await ops.add(this.state, input);
|
||||
}
|
||||
|
||||
async update(id: string, patch: CronJobPatch) {
|
||||
return await ops.update(this.state, id, patch);
|
||||
async update(id: string, patch: CronJobPatch, caller?: CronMutationCallerOptions) {
|
||||
return await ops.update(this.state, id, patch, caller);
|
||||
}
|
||||
|
||||
async remove(id: string) {
|
||||
return await ops.remove(this.state, id);
|
||||
async remove(id: string, caller?: CronMutationCallerOptions) {
|
||||
return await ops.remove(this.state, id, caller);
|
||||
}
|
||||
|
||||
async run(id: string, mode?: "due" | "force") {
|
||||
return await ops.run(this.state, id, mode);
|
||||
async run(id: string, mode?: "due" | "force", caller?: CronMutationCallerOptions) {
|
||||
return await ops.run(this.state, id, mode, caller);
|
||||
}
|
||||
|
||||
async enqueueRun(id: string, mode?: "due" | "force") {
|
||||
return await ops.enqueueRun(this.state, id, mode);
|
||||
async enqueueRun(id: string, mode?: "due" | "force", caller?: CronMutationCallerOptions) {
|
||||
return await ops.enqueueRun(this.state, id, mode, caller);
|
||||
}
|
||||
|
||||
getJob(id: string): CronJob | undefined {
|
||||
|
||||
@ -37,6 +37,21 @@ export type CronListPageOptions = {
|
||||
enabled?: CronJobsEnabledFilter;
|
||||
sortBy?: CronJobsSortBy;
|
||||
sortDir?: CronSortDir;
|
||||
/** When set, restricts results to jobs owned by this agentId. Ignored when ownerOverride is true. */
|
||||
callerAgentId?: string;
|
||||
/** When set, restricts results to jobs owned by this sessionKey. Ignored when ownerOverride is true. */
|
||||
callerSessionKey?: string;
|
||||
/** When true, bypasses caller-scoped filtering (admin/owner sessions only). */
|
||||
ownerOverride?: boolean;
|
||||
};
|
||||
|
||||
export type CronMutationCallerOptions = {
|
||||
/** agentId of the caller requesting the mutation. */
|
||||
callerAgentId?: string;
|
||||
/** sessionKey of the caller requesting the mutation. */
|
||||
callerSessionKey?: string;
|
||||
/** When true, bypasses ownership checks (admin/owner sessions only). */
|
||||
ownerOverride?: boolean;
|
||||
};
|
||||
|
||||
export type CronListPageResult = {
|
||||
@ -47,6 +62,40 @@ export type CronListPageResult = {
|
||||
hasMore: boolean;
|
||||
nextOffset: number | null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns true when the caller is permitted to mutate or read the given job.
|
||||
*
|
||||
* Ownership is determined by matching either agentId or sessionKey.
|
||||
* When ownerOverride is true the check is skipped (admin/owner callers).
|
||||
* When neither callerAgentId nor callerSessionKey are provided (e.g. direct
|
||||
* CLI usage), the check is also skipped so backward compatibility is preserved.
|
||||
*/
|
||||
function callerOwnsJob(
|
||||
job: { agentId?: string; sessionKey?: string },
|
||||
caller: CronMutationCallerOptions,
|
||||
): boolean {
|
||||
if (caller.ownerOverride) {
|
||||
return true;
|
||||
}
|
||||
// No caller identity available — preserve backward compat (local CLI, tests).
|
||||
if (!caller.callerAgentId && !caller.callerSessionKey) {
|
||||
return true;
|
||||
}
|
||||
if (caller.callerAgentId && job.agentId && caller.callerAgentId === job.agentId) {
|
||||
return true;
|
||||
}
|
||||
if (caller.callerSessionKey && job.sessionKey && caller.callerSessionKey === job.sessionKey) {
|
||||
return true;
|
||||
}
|
||||
// Job has no owner metadata — allow access so pre-existing jobs without
|
||||
// agentId/sessionKey remain accessible.
|
||||
if (!job.agentId && !job.sessionKey) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function mergeManualRunSnapshotAfterReload(params: {
|
||||
state: CronServiceState;
|
||||
jobId: string;
|
||||
@ -201,8 +250,16 @@ export async function listPage(state: CronServiceState, opts?: CronListPageOptio
|
||||
const enabledFilter = resolveEnabledFilter(opts);
|
||||
const sortBy = opts?.sortBy ?? "nextRunAtMs";
|
||||
const sortDir = opts?.sortDir ?? "asc";
|
||||
const callerOpts: CronMutationCallerOptions = {
|
||||
callerAgentId: opts?.callerAgentId,
|
||||
callerSessionKey: opts?.callerSessionKey,
|
||||
ownerOverride: opts?.ownerOverride,
|
||||
};
|
||||
const source = state.store?.jobs ?? [];
|
||||
const filtered = source.filter((job) => {
|
||||
if (!callerOwnsJob(job, callerOpts)) {
|
||||
return false;
|
||||
}
|
||||
if (enabledFilter === "enabled" && !job.enabled) {
|
||||
return false;
|
||||
}
|
||||
@ -268,11 +325,21 @@ export async function add(state: CronServiceState, input: CronJobCreate) {
|
||||
});
|
||||
}
|
||||
|
||||
export async function update(state: CronServiceState, id: string, patch: CronJobPatch) {
|
||||
export async function update(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
patch: CronJobPatch,
|
||||
caller?: CronMutationCallerOptions,
|
||||
) {
|
||||
return await locked(state, async () => {
|
||||
warnIfDisabled(state, "update");
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
const job = findJobOrThrow(state, id);
|
||||
if (caller && !callerOwnsJob(job, caller)) {
|
||||
throw Object.assign(new Error(`cron: permission denied for update on job ${id}`), {
|
||||
code: "CRON_PERMISSION_DENIED",
|
||||
});
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
applyJobPatch(job, patch, { defaultAgentId: state.deps.defaultAgentId });
|
||||
if (job.schedule.kind === "every") {
|
||||
@ -322,7 +389,11 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
|
||||
});
|
||||
}
|
||||
|
||||
export async function remove(state: CronServiceState, id: string) {
|
||||
export async function remove(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
caller?: CronMutationCallerOptions,
|
||||
) {
|
||||
return await locked(state, async () => {
|
||||
warnIfDisabled(state, "remove");
|
||||
await ensureLoaded(state);
|
||||
@ -330,6 +401,14 @@ export async function remove(state: CronServiceState, id: string) {
|
||||
if (!state.store) {
|
||||
return { ok: false, removed: false } as const;
|
||||
}
|
||||
if (caller) {
|
||||
const target = state.store.jobs.find((j) => j.id === id);
|
||||
if (target && !callerOwnsJob(target, caller)) {
|
||||
throw Object.assign(new Error(`cron: permission denied for remove on job ${id}`), {
|
||||
code: "CRON_PERMISSION_DENIED",
|
||||
});
|
||||
}
|
||||
}
|
||||
state.store.jobs = state.store.jobs.filter((j) => j.id !== id);
|
||||
const removed = (state.store.jobs.length ?? 0) !== before;
|
||||
await persist(state);
|
||||
@ -376,6 +455,7 @@ async function inspectManualRunPreflight(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
caller?: CronMutationCallerOptions,
|
||||
): Promise<ManualRunPreflightResult> {
|
||||
return await locked(state, async () => {
|
||||
warnIfDisabled(state, "run");
|
||||
@ -385,6 +465,11 @@ async function inspectManualRunPreflight(
|
||||
// persist does not block manual triggers for up to STUCK_RUN_MS (#17554).
|
||||
recomputeNextRunsForMaintenance(state);
|
||||
const job = findJobOrThrow(state, id);
|
||||
if (caller && !callerOwnsJob(job, caller)) {
|
||||
throw Object.assign(new Error(`cron: permission denied for run on job ${id}`), {
|
||||
code: "CRON_PERMISSION_DENIED",
|
||||
});
|
||||
}
|
||||
if (typeof job.state.runningAtMs === "number") {
|
||||
return { ok: true, ran: false, reason: "already-running" as const };
|
||||
}
|
||||
@ -401,8 +486,9 @@ async function inspectManualRunDisposition(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
caller?: CronMutationCallerOptions,
|
||||
): Promise<ManualRunDisposition | { ok: false }> {
|
||||
const result = await inspectManualRunPreflight(state, id, mode);
|
||||
const result = await inspectManualRunPreflight(state, id, mode, caller);
|
||||
if (!result.ok) {
|
||||
return result;
|
||||
}
|
||||
@ -416,8 +502,9 @@ async function prepareManualRun(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
caller?: CronMutationCallerOptions,
|
||||
): Promise<PreparedManualRun> {
|
||||
const preflight = await inspectManualRunPreflight(state, id, mode);
|
||||
const preflight = await inspectManualRunPreflight(state, id, mode, caller);
|
||||
if (!preflight.ok) {
|
||||
return preflight;
|
||||
}
|
||||
@ -539,8 +626,13 @@ async function finishPreparedManualRun(
|
||||
});
|
||||
}
|
||||
|
||||
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
|
||||
const prepared = await prepareManualRun(state, id, mode);
|
||||
export async function run(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
caller?: CronMutationCallerOptions,
|
||||
) {
|
||||
const prepared = await prepareManualRun(state, id, mode, caller);
|
||||
if (!prepared.ok || !prepared.ran) {
|
||||
return prepared;
|
||||
}
|
||||
@ -548,8 +640,13 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
|
||||
return { ok: true, ran: true } as const;
|
||||
}
|
||||
|
||||
export async function enqueueRun(state: CronServiceState, id: string, mode?: "due" | "force") {
|
||||
const disposition = await inspectManualRunDisposition(state, id, mode);
|
||||
export async function enqueueRun(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
caller?: CronMutationCallerOptions,
|
||||
) {
|
||||
const disposition = await inspectManualRunDisposition(state, id, mode, caller);
|
||||
if (!disposition.ok || !("runnable" in disposition && disposition.runnable)) {
|
||||
return disposition;
|
||||
}
|
||||
@ -558,6 +655,8 @@ export async function enqueueRun(state: CronServiceState, id: string, mode?: "du
|
||||
void enqueueCommandInLane(
|
||||
CommandLane.Cron,
|
||||
async () => {
|
||||
// Note: caller check already passed in inspectManualRunDisposition above;
|
||||
// re-run without caller so the internal execution path is not gated twice.
|
||||
const result = await run(state, id, mode);
|
||||
if (result.ok && "ran" in result && !result.ran) {
|
||||
state.deps.log.info(
|
||||
|
||||
@ -79,6 +79,59 @@ describe("cron protocol validators", () => {
|
||||
expect(validateCronListParams({ offset: -1 })).toBe(false);
|
||||
});
|
||||
|
||||
it("accepts callerSessionKey on list params", () => {
|
||||
expect(validateCronListParams({ callerSessionKey: "telegram:direct:111" })).toBe(true);
|
||||
expect(validateCronListParams({ callerSessionKey: "" })).toBe(false);
|
||||
});
|
||||
|
||||
it("accepts callerSessionKey on update params", () => {
|
||||
expect(
|
||||
validateCronUpdateParams({
|
||||
id: "job-1",
|
||||
patch: { enabled: false },
|
||||
callerSessionKey: "telegram:direct:111",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
validateCronUpdateParams({
|
||||
jobId: "job-2",
|
||||
patch: { enabled: true },
|
||||
callerSessionKey: "telegram:direct:222",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
validateCronUpdateParams({ id: "job-1", patch: { enabled: false }, callerSessionKey: "" }),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("accepts callerSessionKey on remove params", () => {
|
||||
expect(validateCronRemoveParams({ id: "job-1", callerSessionKey: "telegram:direct:111" })).toBe(
|
||||
true,
|
||||
);
|
||||
expect(
|
||||
validateCronRemoveParams({ jobId: "job-2", callerSessionKey: "telegram:direct:222" }),
|
||||
).toBe(true);
|
||||
expect(validateCronRemoveParams({ id: "job-1", callerSessionKey: "" })).toBe(false);
|
||||
});
|
||||
|
||||
it("accepts callerSessionKey on run params", () => {
|
||||
expect(
|
||||
validateCronRunParams({
|
||||
id: "job-1",
|
||||
mode: "force",
|
||||
callerSessionKey: "telegram:direct:111",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
validateCronRunParams({
|
||||
jobId: "job-2",
|
||||
mode: "due",
|
||||
callerSessionKey: "telegram:direct:222",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(validateCronRunParams({ id: "job-1", callerSessionKey: "" })).toBe(false);
|
||||
});
|
||||
|
||||
it("enforces runs limit minimum for id and jobId selectors", () => {
|
||||
expect(validateCronRunsParams({ id: "job-1", limit: 1 })).toBe(true);
|
||||
expect(validateCronRunsParams({ jobId: "job-2", limit: 1 })).toBe(true);
|
||||
|
||||
@ -275,6 +275,7 @@ export const CronListParamsSchema = Type.Object(
|
||||
enabled: Type.Optional(CronJobsEnabledFilterSchema),
|
||||
sortBy: Type.Optional(CronJobsSortBySchema),
|
||||
sortDir: Type.Optional(CronSortDirSchema),
|
||||
callerSessionKey: Type.Optional(NonEmptyString),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
@ -312,12 +313,16 @@ export const CronJobPatchSchema = Type.Object(
|
||||
|
||||
export const CronUpdateParamsSchema = cronIdOrJobIdParams({
|
||||
patch: CronJobPatchSchema,
|
||||
callerSessionKey: Type.Optional(NonEmptyString),
|
||||
});
|
||||
|
||||
export const CronRemoveParamsSchema = cronIdOrJobIdParams({});
|
||||
export const CronRemoveParamsSchema = cronIdOrJobIdParams({
|
||||
callerSessionKey: Type.Optional(NonEmptyString),
|
||||
});
|
||||
|
||||
export const CronRunParamsSchema = cronIdOrJobIdParams({
|
||||
mode: Type.Optional(Type.Union([Type.Literal("due"), Type.Literal("force")])),
|
||||
callerSessionKey: Type.Optional(NonEmptyString),
|
||||
});
|
||||
|
||||
export const CronRunsParamsSchema = Type.Object(
|
||||
|
||||
@ -6,6 +6,7 @@ export const ErrorCodes = {
|
||||
AGENT_TIMEOUT: "AGENT_TIMEOUT",
|
||||
INVALID_REQUEST: "INVALID_REQUEST",
|
||||
UNAVAILABLE: "UNAVAILABLE",
|
||||
PERMISSION_DENIED: "PERMISSION_DENIED",
|
||||
} as const;
|
||||
|
||||
export type ErrorCode = (typeof ErrorCodes)[keyof typeof ErrorCodes];
|
||||
|
||||
61
src/gateway/server-methods/cron.caller-options.test.ts
Normal file
61
src/gateway/server-methods/cron.caller-options.test.ts
Normal file
@ -0,0 +1,61 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { ADMIN_SCOPE, READ_SCOPE, WRITE_SCOPE } from "../method-scopes.js";
|
||||
import { resolveCronCallerOptions } from "./cron.js";
|
||||
import type { GatewayClient } from "./types.js";
|
||||
|
||||
function makeClient(scopes: string[]): GatewayClient {
|
||||
return {
|
||||
connect: {
|
||||
minProtocol: 1,
|
||||
maxProtocol: 1,
|
||||
client: {
|
||||
id: "openclaw-control-ui",
|
||||
version: "1.0.0",
|
||||
platform: "test",
|
||||
mode: "operator",
|
||||
},
|
||||
scopes,
|
||||
},
|
||||
} as unknown as GatewayClient;
|
||||
}
|
||||
|
||||
describe("resolveCronCallerOptions", () => {
|
||||
it("sets ownerOverride=true for admin without sessionKey", () => {
|
||||
const opts = resolveCronCallerOptions(makeClient([ADMIN_SCOPE]));
|
||||
expect(opts.ownerOverride).toBe(true);
|
||||
expect(opts.callerSessionKey).toBeUndefined();
|
||||
});
|
||||
|
||||
it("sets ownerOverride=false for admin WITH sessionKey", () => {
|
||||
const opts = resolveCronCallerOptions(makeClient([ADMIN_SCOPE]), "telegram:direct:111");
|
||||
expect(opts.ownerOverride).toBe(false);
|
||||
expect(opts.callerSessionKey).toBe("telegram:direct:111");
|
||||
});
|
||||
|
||||
it("sets ownerOverride=false for non-admin without sessionKey", () => {
|
||||
const opts = resolveCronCallerOptions(makeClient([READ_SCOPE]));
|
||||
expect(opts.ownerOverride).toBe(false);
|
||||
expect(opts.callerSessionKey).toBeUndefined();
|
||||
});
|
||||
|
||||
it("sets ownerOverride=false for non-admin with sessionKey", () => {
|
||||
const opts = resolveCronCallerOptions(
|
||||
makeClient([READ_SCOPE, WRITE_SCOPE]),
|
||||
"discord:channel:ops",
|
||||
);
|
||||
expect(opts.ownerOverride).toBe(false);
|
||||
expect(opts.callerSessionKey).toBe("discord:channel:ops");
|
||||
});
|
||||
|
||||
it("handles null client gracefully", () => {
|
||||
const opts = resolveCronCallerOptions(null, "telegram:direct:111");
|
||||
expect(opts.ownerOverride).toBe(false);
|
||||
expect(opts.callerSessionKey).toBe("telegram:direct:111");
|
||||
});
|
||||
|
||||
it("handles null client without sessionKey", () => {
|
||||
const opts = resolveCronCallerOptions(null);
|
||||
expect(opts.ownerOverride).toBe(false);
|
||||
expect(opts.callerSessionKey).toBeUndefined();
|
||||
});
|
||||
});
|
||||
@ -4,8 +4,10 @@ import {
|
||||
readCronRunLogEntriesPageAll,
|
||||
resolveCronRunLogPath,
|
||||
} from "../../cron/run-log.js";
|
||||
import type { CronMutationCallerOptions } from "../../cron/service/ops.js";
|
||||
import type { CronJobCreate, CronJobPatch } from "../../cron/types.js";
|
||||
import { validateScheduleTimestamp } from "../../cron/validate-timestamp.js";
|
||||
import { ADMIN_SCOPE } from "../method-scopes.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
@ -19,7 +21,37 @@ import {
|
||||
validateCronUpdateParams,
|
||||
validateWakeParams,
|
||||
} from "../protocol/index.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
import type { GatewayClient, GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
/**
|
||||
* Resolves the caller identity and admin-bypass flag from the connected client.
|
||||
*
|
||||
* When the caller supplies a `callerSessionKey` it is explicitly requesting
|
||||
* session-scoped access (multi-agent / multi-user deployments). In that case
|
||||
* the ownership check in the service layer must fire even if the client holds
|
||||
* `ADMIN_SCOPE`, so `ownerOverride` stays false.
|
||||
*
|
||||
* `ownerOverride` is only true when the client is an admin that did **not**
|
||||
* supply a session key — the typical local-CLI / control-UI case where a
|
||||
* single operator manages all jobs.
|
||||
*/
|
||||
export function resolveCronCallerOptions(
|
||||
client: GatewayClient | null,
|
||||
callerSessionKey?: string,
|
||||
): CronMutationCallerOptions {
|
||||
const scopes: readonly string[] = Array.isArray(client?.connect?.scopes)
|
||||
? (client.connect.scopes as string[])
|
||||
: [];
|
||||
const isAdmin = scopes.includes(ADMIN_SCOPE);
|
||||
// Only bypass ownership when the caller is an admin that did NOT supply a
|
||||
// session key. A present session key signals session-scoped intent, so the
|
||||
// service-layer ownership check must still run.
|
||||
const ownerOverride = isAdmin && !callerSessionKey;
|
||||
return {
|
||||
callerSessionKey: callerSessionKey ?? undefined,
|
||||
ownerOverride,
|
||||
};
|
||||
}
|
||||
|
||||
export const cronHandlers: GatewayRequestHandlers = {
|
||||
wake: ({ params, respond, context }) => {
|
||||
@ -41,7 +73,7 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
const result = context.cron.wake({ mode: p.mode, text: p.text });
|
||||
respond(true, result, undefined);
|
||||
},
|
||||
"cron.list": async ({ params, respond, context }) => {
|
||||
"cron.list": async ({ params, respond, context, client }) => {
|
||||
if (!validateCronListParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
@ -61,7 +93,9 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
enabled?: "all" | "enabled" | "disabled";
|
||||
sortBy?: "nextRunAtMs" | "updatedAtMs" | "name";
|
||||
sortDir?: "asc" | "desc";
|
||||
callerSessionKey?: string;
|
||||
};
|
||||
const callerOpts = resolveCronCallerOptions(client, p.callerSessionKey);
|
||||
const page = await context.cron.listPage({
|
||||
includeDisabled: p.includeDisabled,
|
||||
limit: p.limit,
|
||||
@ -70,6 +104,8 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
enabled: p.enabled,
|
||||
sortBy: p.sortBy,
|
||||
sortDir: p.sortDir,
|
||||
callerSessionKey: callerOpts.callerSessionKey,
|
||||
ownerOverride: callerOpts.ownerOverride,
|
||||
});
|
||||
respond(true, page, undefined);
|
||||
},
|
||||
@ -122,7 +158,7 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
context.logGateway.info("cron: job created", { jobId: job.id, schedule: jobCreate.schedule });
|
||||
respond(true, job, undefined);
|
||||
},
|
||||
"cron.update": async ({ params, respond, context }) => {
|
||||
"cron.update": async ({ params, respond, context, client }) => {
|
||||
const normalizedPatch = normalizeCronJobPatch((params as { patch?: unknown } | null)?.patch);
|
||||
const candidate =
|
||||
normalizedPatch && typeof params === "object" && params !== null
|
||||
@ -143,6 +179,7 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
id?: string;
|
||||
jobId?: string;
|
||||
patch: Record<string, unknown>;
|
||||
callerSessionKey?: string;
|
||||
};
|
||||
const jobId = p.id ?? p.jobId;
|
||||
if (!jobId) {
|
||||
@ -165,11 +202,20 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const job = await context.cron.update(jobId, patch);
|
||||
context.logGateway.info("cron: job updated", { jobId });
|
||||
respond(true, job, undefined);
|
||||
const callerOpts = resolveCronCallerOptions(client, p.callerSessionKey);
|
||||
try {
|
||||
const job = await context.cron.update(jobId, patch, callerOpts);
|
||||
context.logGateway.info("cron: job updated", { jobId });
|
||||
respond(true, job, undefined);
|
||||
} catch (err) {
|
||||
if ((err as { code?: string } | null)?.code === "CRON_PERMISSION_DENIED") {
|
||||
respond(false, undefined, errorShape(ErrorCodes.PERMISSION_DENIED, "permission denied"));
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
"cron.remove": async ({ params, respond, context }) => {
|
||||
"cron.remove": async ({ params, respond, context, client }) => {
|
||||
if (!validateCronRemoveParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
@ -181,7 +227,7 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const p = params as { id?: string; jobId?: string };
|
||||
const p = params as { id?: string; jobId?: string; callerSessionKey?: string };
|
||||
const jobId = p.id ?? p.jobId;
|
||||
if (!jobId) {
|
||||
respond(
|
||||
@ -191,13 +237,22 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const result = await context.cron.remove(jobId);
|
||||
if (result.removed) {
|
||||
context.logGateway.info("cron: job removed", { jobId });
|
||||
const callerOpts = resolveCronCallerOptions(client, p.callerSessionKey);
|
||||
try {
|
||||
const result = await context.cron.remove(jobId, callerOpts);
|
||||
if (result.removed) {
|
||||
context.logGateway.info("cron: job removed", { jobId });
|
||||
}
|
||||
respond(true, result, undefined);
|
||||
} catch (err) {
|
||||
if ((err as { code?: string } | null)?.code === "CRON_PERMISSION_DENIED") {
|
||||
respond(false, undefined, errorShape(ErrorCodes.PERMISSION_DENIED, "permission denied"));
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
respond(true, result, undefined);
|
||||
},
|
||||
"cron.run": async ({ params, respond, context }) => {
|
||||
"cron.run": async ({ params, respond, context, client }) => {
|
||||
if (!validateCronRunParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
@ -209,7 +264,12 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const p = params as { id?: string; jobId?: string; mode?: "due" | "force" };
|
||||
const p = params as {
|
||||
id?: string;
|
||||
jobId?: string;
|
||||
mode?: "due" | "force";
|
||||
callerSessionKey?: string;
|
||||
};
|
||||
const jobId = p.id ?? p.jobId;
|
||||
if (!jobId) {
|
||||
respond(
|
||||
@ -219,8 +279,17 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const result = await context.cron.enqueueRun(jobId, p.mode ?? "force");
|
||||
respond(true, result, undefined);
|
||||
const callerOpts = resolveCronCallerOptions(client, p.callerSessionKey);
|
||||
try {
|
||||
const result = await context.cron.enqueueRun(jobId, p.mode ?? "force", callerOpts);
|
||||
respond(true, result, undefined);
|
||||
} catch (err) {
|
||||
if ((err as { code?: string } | null)?.code === "CRON_PERMISSION_DENIED") {
|
||||
respond(false, undefined, errorShape(ErrorCodes.PERMISSION_DENIED, "permission denied"));
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
"cron.runs": async ({ params, respond, context }) => {
|
||||
if (!validateCronRunsParams(params)) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user