From edbd266e7782f344926a7716d517c1a356edb4b3 Mon Sep 17 00:00:00 2001 From: chenwenxiong <381152212@qq.com> Date: Sun, 15 Mar 2026 01:55:32 +0800 Subject: [PATCH] cron: clean up sessions and run logs on job removal (#46369) --- src/cron/service/ops.ts | 23 +++++++++++++ src/cron/session-reaper.ts | 70 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 69751e4dfdb..3048b0ce9bf 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -1,5 +1,8 @@ +import fs from "node:fs/promises"; import { enqueueCommandInLane } from "../../process/command-queue.js"; import { CommandLane } from "../../process/lanes.js"; +import { resolveCronRunLogPath } from "../run-log.js"; +import { purgeCronJobSessions } from "../session-reaper.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js"; import { @@ -330,12 +333,32 @@ export async function remove(state: CronServiceState, id: string) { if (!state.store) { return { ok: false, removed: false } as const; } + const job = state.store.jobs.find((j) => j.id === id); state.store.jobs = state.store.jobs.filter((j) => j.id !== id); const removed = (state.store.jobs.length ?? 0) !== before; await persist(state); armTimer(state); if (removed) { emit(state, { jobId: id, action: "removed" }); + + // Clean up run log file + const logPath = resolveCronRunLogPath({ storePath: state.deps.storePath, jobId: id }); + void fs.unlink(logPath).catch(() => undefined); + + // Clean up sessions from the session store + if (job && state.deps.resolveSessionStorePath) { + const sessionStorePath = state.deps.resolveSessionStorePath(job.agentId); + void purgeCronJobSessions({ + jobId: id, + sessionStorePath, + log: state.deps.log, + }).catch((err) => { + state.deps.log.warn( + { err: String(err), jobId: id }, + "cron: failed to purge job sessions on removal", + ); + }); + } } return { ok: true, removed } as const; }); diff --git a/src/cron/session-reaper.ts b/src/cron/session-reaper.ts index dd0094d4c57..313adad9d14 100644 --- a/src/cron/session-reaper.ts +++ b/src/cron/session-reaper.ts @@ -14,6 +14,7 @@ import { } from "../config/sessions.js"; import type { CronConfig } from "../config/types.cron.js"; import { cleanupArchivedSessionTranscripts } from "../gateway/session-utils.fs.js"; +import { parseAgentSessionKey } from "../sessions/session-key-utils.js"; import { isCronRunSessionKey } from "../sessions/session-key-utils.js"; import type { Logger } from "./service/state.js"; @@ -146,6 +147,75 @@ export async function sweepCronRunSessions(params: { return { swept: true, pruned }; } +/** + * Purge all sessions associated with a deleted cron job. + * Removes both the base session and all run sessions. + */ +export async function purgeCronJobSessions(params: { + jobId: string; + sessionStorePath: string; + log: Logger; +}): Promise { + const storePath = params.sessionStorePath; + const jobId = params.jobId; + const removedSessions = new Map(); + + try { + await updateSessionStore(storePath, (store) => { + for (const key of Object.keys(store)) { + const parsed = parseAgentSessionKey(key); + if (!parsed) { + continue; + } + + // Match base session (cron:) or run session (cron::run:) + const isBase = parsed.rest === `cron:${jobId}`; + const isRun = parsed.rest.startsWith(`cron:${jobId}:run:`); + + if (isBase || isRun) { + const entry = store[key]; + if (entry) { + removedSessions.set(entry.sessionId, entry.sessionFile); + } + delete store[key]; + } + } + }); + } catch (err) { + params.log.warn({ err: String(err), jobId }, "cron-reaper: failed to purge job sessions"); + return; + } + + // Cleanup transcripts for purged sessions + if (removedSessions.size > 0) { + try { + const store = loadSessionStore(storePath, { skipCache: true }); + const referencedSessionIds = new Set( + Object.values(store) + .map((entry) => entry?.sessionId) + .filter((id): id is string => Boolean(id)), + ); + const archivedDirs = archiveRemovedSessionTranscripts({ + removedSessionFiles: removedSessions, + referencedSessionIds, + storePath, + reason: "deleted", + restrictToStoreDir: true, + }); + if (archivedDirs.size > 0) { + await cleanupArchivedSessionTranscripts({ + directories: [...archivedDirs], + olderThanMs: 0, // delete immediately + reason: "deleted", + nowMs: Date.now(), + }); + } + } catch (err) { + params.log.warn({ err: String(err), jobId }, "cron-reaper: purged transcript cleanup failed"); + } + } +} + /** Reset the throttle timer (for tests). */ export function resetReaperThrottle(): void { lastSweepAtMsByStore.clear();