Merge 9717195e4e8980347ef3cbdbd3c1b9f337c58088 into 598f1826d8b2bc969aace2c6459824737667218c

This commit is contained in:
xydt-610 2026-03-21 12:16:22 +08:00 committed by GitHub
commit b5cbe88a3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 93 additions and 0 deletions

View File

@ -1,5 +1,8 @@
import fs from "node:fs/promises";
import { enqueueCommandInLane } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import { resolveCronRunLogPath } from "../run-log.js";
import { purgeCronJobSessions } from "../session-reaper.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js";
import {
@ -330,12 +333,32 @@ export async function remove(state: CronServiceState, id: string) {
if (!state.store) {
return { ok: false, removed: false } as const;
}
const job = state.store.jobs.find((j) => j.id === id);
state.store.jobs = state.store.jobs.filter((j) => j.id !== id);
const removed = (state.store.jobs.length ?? 0) !== before;
await persist(state);
armTimer(state);
if (removed) {
emit(state, { jobId: id, action: "removed" });
// Clean up run log file
const logPath = resolveCronRunLogPath({ storePath: state.deps.storePath, jobId: id });
void fs.unlink(logPath).catch(() => undefined);
// Clean up sessions from the session store
if (job && state.deps.resolveSessionStorePath) {
const sessionStorePath = state.deps.resolveSessionStorePath(job.agentId);
void purgeCronJobSessions({
jobId: id,
sessionStorePath,
log: state.deps.log,
}).catch((err) => {
state.deps.log.warn(
{ err: String(err), jobId: id },
"cron: failed to purge job sessions on removal",
);
});
}
}
return { ok: true, removed } as const;
});

View File

@ -14,6 +14,7 @@ import {
} from "../config/sessions.js";
import type { CronConfig } from "../config/types.cron.js";
import { cleanupArchivedSessionTranscripts } from "../gateway/session-utils.fs.js";
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
import { isCronRunSessionKey } from "../sessions/session-key-utils.js";
import type { Logger } from "./service/state.js";
@ -146,6 +147,75 @@ export async function sweepCronRunSessions(params: {
return { swept: true, pruned };
}
/**
* Purge all sessions associated with a deleted cron job.
* Removes both the base session and all run sessions.
*/
export async function purgeCronJobSessions(params: {
jobId: string;
sessionStorePath: string;
log: Logger;
}): Promise<void> {
const storePath = params.sessionStorePath;
const jobId = params.jobId;
const removedSessions = new Map<string, string | undefined>();
try {
await updateSessionStore(storePath, (store) => {
for (const key of Object.keys(store)) {
const parsed = parseAgentSessionKey(key);
if (!parsed) {
continue;
}
// Match base session (cron:<jobId>) or run session (cron:<jobId>:run:<uuid>)
const isBase = parsed.rest === `cron:${jobId}`;
const isRun = parsed.rest.startsWith(`cron:${jobId}:run:`);
if (isBase || isRun) {
const entry = store[key];
if (entry) {
removedSessions.set(entry.sessionId, entry.sessionFile);
}
delete store[key];
}
}
});
} catch (err) {
params.log.warn({ err: String(err), jobId }, "cron-reaper: failed to purge job sessions");
return;
}
// Cleanup transcripts for purged sessions
if (removedSessions.size > 0) {
try {
const store = loadSessionStore(storePath, { skipCache: true });
const referencedSessionIds = new Set(
Object.values(store)
.map((entry) => entry?.sessionId)
.filter((id): id is string => Boolean(id)),
);
const archivedDirs = archiveRemovedSessionTranscripts({
removedSessionFiles: removedSessions,
referencedSessionIds,
storePath,
reason: "deleted",
restrictToStoreDir: true,
});
if (archivedDirs.size > 0) {
await cleanupArchivedSessionTranscripts({
directories: [...archivedDirs],
olderThanMs: 0, // delete immediately
reason: "deleted",
nowMs: Date.now(),
});
}
} catch (err) {
params.log.warn({ err: String(err), jobId }, "cron-reaper: purged transcript cleanup failed");
}
}
}
/** Reset the throttle timer (for tests). */
export function resetReaperThrottle(): void {
lastSweepAtMsByStore.clear();