import { randomUUID } from "node:crypto"; import fs from "node:fs"; import type { GatewayRequestHandlers } from "./types.js"; import { resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js"; import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js"; import { clearSessionQueues } from "../../auto-reply/reply/queue.js"; import { loadConfig } from "../../config/config.js"; import { loadSessionStore, snapshotSessionOrigin, resolveMainSessionKey, type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; import { normalizeAgentId, parseAgentSessionKey } from "../../routing/session-key.js"; import { ErrorCodes, errorShape, formatValidationErrors, validateSessionsCompactParams, validateSessionsDeleteParams, validateSessionsListParams, validateSessionsPatchParams, validateSessionsPreviewParams, validateSessionsResetParams, validateSessionsResolveParams, } from "../protocol/index.js"; import { archiveFileOnDisk, archiveSessionTranscripts, listSessionsFromStore, loadCombinedSessionStoreForGateway, loadSessionEntry, pruneLegacyStoreKeys, readSessionPreviewItemsFromTranscript, resolveGatewaySessionStoreTarget, resolveSessionModelRef, resolveSessionTranscriptCandidates, type SessionsPatchResult, type SessionsPreviewEntry, type SessionsPreviewResult, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; function migrateAndPruneSessionStoreKey(params: { cfg: ReturnType; key: string; store: Record; }) { const target = resolveGatewaySessionStoreTarget({ cfg: params.cfg, key: params.key, store: params.store, }); const primaryKey = target.canonicalKey; if (!params.store[primaryKey]) { const existingKey = target.storeKeys.find((candidate) => Boolean(params.store[candidate])); if (existingKey) { params.store[primaryKey] = params.store[existingKey]; } } pruneLegacyStoreKeys({ store: params.store, canonicalKey: primaryKey, candidates: target.storeKeys, }); return { target, primaryKey, entry: params.store[primaryKey] }; } function archiveSessionTranscriptsForSession(params: { sessionId: string | undefined; storePath: string; sessionFile?: string; agentId?: string; reason: "reset" | "deleted"; }): string[] { if (!params.sessionId) { return []; } return archiveSessionTranscripts({ sessionId: params.sessionId, storePath: params.storePath, sessionFile: params.sessionFile, agentId: params.agentId, reason: params.reason, }); } export const sessionsHandlers: GatewayRequestHandlers = { "sessions.list": ({ params, respond }) => { if (!validateSessionsListParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, ), ); return; } const p = params; const cfg = loadConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); const result = listSessionsFromStore({ cfg, storePath, store, opts: p, }); respond(true, result, undefined); }, "sessions.preview": ({ params, respond }) => { if (!validateSessionsPreviewParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.preview params: ${formatValidationErrors( validateSessionsPreviewParams.errors, )}`, ), ); return; } const p = params; const keysRaw = Array.isArray(p.keys) ? p.keys : []; const keys = keysRaw .map((key) => String(key ?? "").trim()) .filter(Boolean) .slice(0, 64); const limit = typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, p.limit) : 12; const maxChars = typeof p.maxChars === "number" && Number.isFinite(p.maxChars) ? Math.max(20, p.maxChars) : 240; if (keys.length === 0) { respond(true, { ts: Date.now(), previews: [] } satisfies SessionsPreviewResult, undefined); return; } const cfg = loadConfig(); const storeCache = new Map>(); const previews: SessionsPreviewEntry[] = []; for (const key of keys) { try { const storeTarget = resolveGatewaySessionStoreTarget({ cfg, key, scanLegacyKeys: false }); const store = storeCache.get(storeTarget.storePath) ?? loadSessionStore(storeTarget.storePath); storeCache.set(storeTarget.storePath, store); const target = resolveGatewaySessionStoreTarget({ cfg, key, store, }); const entry = target.storeKeys.map((candidate) => store[candidate]).find(Boolean); if (!entry?.sessionId) { previews.push({ key, status: "missing", items: [] }); continue; } const items = readSessionPreviewItemsFromTranscript( entry.sessionId, target.storePath, entry.sessionFile, target.agentId, limit, maxChars, ); previews.push({ key, status: items.length > 0 ? "ok" : "empty", items, }); } catch { previews.push({ key, status: "error", items: [] }); } } respond(true, { ts: Date.now(), previews } satisfies SessionsPreviewResult, undefined); }, "sessions.resolve": async ({ params, respond }) => { if (!validateSessionsResolveParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.resolve params: ${formatValidationErrors(validateSessionsResolveParams.errors)}`, ), ); return; } const p = params; const cfg = loadConfig(); const resolved = await resolveSessionKeyFromResolveParams({ cfg, p }); if (!resolved.ok) { respond(false, undefined, resolved.error); return; } respond(true, { ok: true, key: resolved.key }, undefined); }, "sessions.patch": async ({ params, respond, context }) => { if (!validateSessionsPatchParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, ), ); return; } const p = params; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; const applied = await updateSessionStore(storePath, async (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); return await applySessionsPatchToStore({ cfg, store, storeKey: primaryKey, patch: p, loadGatewayModelCatalog: context.loadGatewayModelCatalog, }); }); if (!applied.ok) { respond(false, undefined, applied.error); return; } const parsed = parseAgentSessionKey(target.canonicalKey ?? key); const agentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg)); const resolved = resolveSessionModelRef(cfg, applied.entry, agentId); const result: SessionsPatchResult = { ok: true, path: storePath, key: target.canonicalKey, entry: applied.entry, resolved: { modelProvider: resolved.provider, model: resolved.model, }, }; respond(true, result, undefined); }, "sessions.reset": async ({ params, respond }) => { if (!validateSessionsResetParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`, ), ); return; } const p = params; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; let oldSessionId: string | undefined; let oldSessionFile: string | undefined; const next = await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); const entry = store[primaryKey]; oldSessionId = entry?.sessionId; oldSessionFile = entry?.sessionFile; const now = Date.now(); const nextEntry: SessionEntry = { sessionId: randomUUID(), updatedAt: now, systemSent: false, abortedLastRun: false, thinkingLevel: entry?.thinkingLevel, verboseLevel: entry?.verboseLevel, reasoningLevel: entry?.reasoningLevel, responseUsage: entry?.responseUsage, model: entry?.model, contextTokens: entry?.contextTokens, sendPolicy: entry?.sendPolicy, label: entry?.label, origin: snapshotSessionOrigin(entry), lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, skillsSnapshot: entry?.skillsSnapshot, // Reset token counts to 0 on session reset (#1523) inputTokens: 0, outputTokens: 0, totalTokens: 0, totalTokensFresh: true, }; store[primaryKey] = nextEntry; return nextEntry; }); // Archive old transcript so it doesn't accumulate on disk (#14869). archiveSessionTranscriptsForSession({ sessionId: oldSessionId, storePath, sessionFile: oldSessionFile, agentId: target.agentId, reason: "reset", }); respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined); }, "sessions.delete": async ({ params, respond }) => { if (!validateSessionsDeleteParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`, ), ); return; } const p = params; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const cfg = loadConfig(); const mainKey = resolveMainSessionKey(cfg); const target = resolveGatewaySessionStoreTarget({ cfg, key }); if (target.canonicalKey === mainKey) { respond( false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `Cannot delete the main session (${mainKey}).`), ); return; } const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; const storePath = target.storePath; const { entry } = loadSessionEntry(key); const sessionId = entry?.sessionId; const existed = Boolean(entry); const queueKeys = new Set(target.storeKeys); queueKeys.add(target.canonicalKey); if (sessionId) { queueKeys.add(sessionId); } clearSessionQueues([...queueKeys]); stopSubagentsForRequester({ cfg, requesterSessionKey: target.canonicalKey }); if (sessionId) { abortEmbeddedPiRun(sessionId); const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000); if (!ended) { respond( false, undefined, errorShape( ErrorCodes.UNAVAILABLE, `Session ${key} is still active; try again in a moment.`, ), ); return; } } await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); if (store[primaryKey]) { delete store[primaryKey]; } }); const archived = deleteTranscript ? archiveSessionTranscriptsForSession({ sessionId, storePath, sessionFile: entry?.sessionFile, agentId: target.agentId, reason: "deleted", }) : []; respond(true, { ok: true, key: target.canonicalKey, deleted: existed, archived }, undefined); }, "sessions.compact": async ({ params, respond }) => { if (!validateSessionsCompactParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`, ), ); return; } const p = params; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const maxLines = typeof p.maxLines === "number" && Number.isFinite(p.maxLines) ? Math.max(1, Math.floor(p.maxLines)) : 400; const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; // Lock + read in a short critical section; transcript work happens outside. const compactTarget = await updateSessionStore(storePath, (store) => { const { entry, primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); return { entry, primaryKey }; }); const entry = compactTarget.entry; const sessionId = entry?.sessionId; if (!sessionId) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no sessionId", }, undefined, ); return; } const filePath = resolveSessionTranscriptCandidates( sessionId, storePath, entry?.sessionFile, target.agentId, ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no transcript", }, undefined, ); return; } const raw = fs.readFileSync(filePath, "utf-8"); const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); if (lines.length <= maxLines) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, kept: lines.length, }, undefined, ); return; } const archived = archiveFileOnDisk(filePath, "bak"); const keptLines = lines.slice(-maxLines); fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); await updateSessionStore(storePath, (store) => { const entryKey = compactTarget.primaryKey; const entryToUpdate = store[entryKey]; if (!entryToUpdate) { return; } delete entryToUpdate.inputTokens; delete entryToUpdate.outputTokens; delete entryToUpdate.totalTokens; delete entryToUpdate.totalTokensFresh; entryToUpdate.updatedAt = Date.now(); }); respond( true, { ok: true, key: target.canonicalKey, compacted: true, archived, kept: keptLines.length, }, undefined, ); }, };