import { randomUUID } from "node:crypto"; import fs from "node:fs"; import { getAcpSessionManager } from "../../acp/control-plane/manager.js"; import { resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js"; import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js"; import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js"; import { clearSessionQueues } from "../../auto-reply/reply/queue.js"; import { closeTrackedBrowserTabsForSessions } from "../../browser/session-tab-registry.js"; import { loadConfig } from "../../config/config.js"; import { loadSessionStore, snapshotSessionOrigin, resolveMainSessionKey, type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; import { unbindThreadBindingsBySessionKey } from "../../discord/monitor/thread-bindings.js"; import { logVerbose } from "../../globals.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { isSubagentSessionKey, normalizeAgentId, parseAgentSessionKey, } from "../../routing/session-key.js"; import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js"; import { ErrorCodes, errorShape, validateSessionsCompactParams, validateSessionsDeleteParams, validateSessionsListParams, validateSessionsPatchParams, validateSessionsPreviewParams, validateSessionsResetParams, validateSessionsResolveParams, } from "../protocol/index.js"; import { archiveFileOnDisk, archiveSessionTranscripts, listSessionsFromStore, loadCombinedSessionStoreForGateway, loadSessionEntry, pruneLegacyStoreKeys, readSessionPreviewItemsFromTranscript, resolveGatewaySessionStoreTarget, resolveSessionModelRef, resolveSessionTranscriptCandidates, type SessionsPatchResult, type SessionsPreviewEntry, type SessionsPreviewResult, readSessionMessages, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; import type { GatewayClient, GatewayRequestHandlers, RespondFn } from "./types.js"; import { assertValidParams } from "./validation.js"; function requireSessionKey(key: unknown, respond: RespondFn): string | null { const raw = typeof key === "string" ? key : typeof key === "number" ? String(key) : typeof key === "bigint" ? String(key) : ""; const normalized = raw.trim(); if (!normalized) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return null; } return normalized; } function resolveGatewaySessionTargetFromKey(key: string) { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); return { cfg, target, storePath: target.storePath }; } function rejectWebchatSessionMutation(params: { action: "patch" | "delete"; client: GatewayClient | null; isWebchatConnect: (params: GatewayClient["connect"] | null | undefined) => boolean; respond: RespondFn; }): boolean { if (!params.client?.connect || !params.isWebchatConnect(params.client.connect)) { return false; } if (params.client.connect.client.id === GATEWAY_CLIENT_IDS.CONTROL_UI) { return false; } params.respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `webchat clients cannot ${params.action} sessions; use chat.send for session-scoped updates`, ), ); return true; } function migrateAndPruneSessionStoreKey(params: { cfg: ReturnType; key: string; store: Record; }) { const target = resolveGatewaySessionStoreTarget({ cfg: params.cfg, key: params.key, store: params.store, }); const primaryKey = target.canonicalKey; if (!params.store[primaryKey]) { const existingKey = target.storeKeys.find((candidate) => Boolean(params.store[candidate])); if (existingKey) { params.store[primaryKey] = params.store[existingKey]; } } pruneLegacyStoreKeys({ store: params.store, canonicalKey: primaryKey, candidates: target.storeKeys, }); return { target, primaryKey, entry: params.store[primaryKey] }; } function stripRuntimeModelState(entry?: SessionEntry): SessionEntry | undefined { if (!entry) { return entry; } return { ...entry, model: undefined, modelProvider: undefined, contextTokens: undefined, systemPromptReport: undefined, }; } function archiveSessionTranscriptsForSession(params: { sessionId: string | undefined; storePath: string; sessionFile?: string; agentId?: string; reason: "reset" | "deleted"; }): string[] { if (!params.sessionId) { return []; } return archiveSessionTranscripts({ sessionId: params.sessionId, storePath: params.storePath, sessionFile: params.sessionFile, agentId: params.agentId, reason: params.reason, }); } async function emitSessionUnboundLifecycleEvent(params: { targetSessionKey: string; reason: "session-reset" | "session-delete"; emitHooks?: boolean; }) { const targetKind = isSubagentSessionKey(params.targetSessionKey) ? "subagent" : "acp"; unbindThreadBindingsBySessionKey({ targetSessionKey: params.targetSessionKey, targetKind, reason: params.reason, sendFarewell: true, }); if (params.emitHooks === false) { return; } const hookRunner = getGlobalHookRunner(); if (!hookRunner?.hasHooks("subagent_ended")) { return; } await hookRunner.runSubagentEnded( { targetSessionKey: params.targetSessionKey, targetKind, reason: params.reason, sendFarewell: true, outcome: params.reason === "session-reset" ? "reset" : "deleted", }, { childSessionKey: params.targetSessionKey, }, ); } async function ensureSessionRuntimeCleanup(params: { cfg: ReturnType; key: string; target: ReturnType; sessionId?: string; }) { const closeTrackedBrowserTabs = async () => { const closeKeys = new Set([ params.key, params.target.canonicalKey, ...params.target.storeKeys, params.sessionId ?? "", ]); return await closeTrackedBrowserTabsForSessions({ sessionKeys: [...closeKeys], onWarn: (message) => logVerbose(message), }); }; const queueKeys = new Set(params.target.storeKeys); queueKeys.add(params.target.canonicalKey); if (params.sessionId) { queueKeys.add(params.sessionId); } clearSessionQueues([...queueKeys]); stopSubagentsForRequester({ cfg: params.cfg, requesterSessionKey: params.target.canonicalKey }); if (!params.sessionId) { clearBootstrapSnapshot(params.target.canonicalKey); await closeTrackedBrowserTabs(); return undefined; } abortEmbeddedPiRun(params.sessionId); const ended = await waitForEmbeddedPiRunEnd(params.sessionId, 15_000); clearBootstrapSnapshot(params.target.canonicalKey); if (ended) { await closeTrackedBrowserTabs(); return undefined; } return errorShape( ErrorCodes.UNAVAILABLE, `Session ${params.key} is still active; try again in a moment.`, ); } const ACP_RUNTIME_CLEANUP_TIMEOUT_MS = 15_000; async function runAcpCleanupStep(params: { op: () => Promise; }): Promise<{ status: "ok" } | { status: "timeout" } | { status: "error"; error: unknown }> { let timer: NodeJS.Timeout | undefined; const timeoutPromise = new Promise<{ status: "timeout" }>((resolve) => { timer = setTimeout(() => resolve({ status: "timeout" }), ACP_RUNTIME_CLEANUP_TIMEOUT_MS); }); const opPromise = params .op() .then(() => ({ status: "ok" as const })) .catch((error: unknown) => ({ status: "error" as const, error })); const outcome = await Promise.race([opPromise, timeoutPromise]); if (timer) { clearTimeout(timer); } return outcome; } async function closeAcpRuntimeForSession(params: { cfg: ReturnType; sessionKey: string; entry?: SessionEntry; reason: "session-reset" | "session-delete"; }) { if (!params.entry?.acp) { return undefined; } const acpManager = getAcpSessionManager(); const cancelOutcome = await runAcpCleanupStep({ op: async () => { await acpManager.cancelSession({ cfg: params.cfg, sessionKey: params.sessionKey, reason: params.reason, }); }, }); if (cancelOutcome.status === "timeout") { return errorShape( ErrorCodes.UNAVAILABLE, `Session ${params.sessionKey} is still active; try again in a moment.`, ); } if (cancelOutcome.status === "error") { logVerbose( `sessions.${params.reason}: ACP cancel failed for ${params.sessionKey}: ${String(cancelOutcome.error)}`, ); } const closeOutcome = await runAcpCleanupStep({ op: async () => { await acpManager.closeSession({ cfg: params.cfg, sessionKey: params.sessionKey, reason: params.reason, requireAcpSession: false, allowBackendUnavailable: true, }); }, }); if (closeOutcome.status === "timeout") { return errorShape( ErrorCodes.UNAVAILABLE, `Session ${params.sessionKey} is still active; try again in a moment.`, ); } if (closeOutcome.status === "error") { logVerbose( `sessions.${params.reason}: ACP runtime close failed for ${params.sessionKey}: ${String(closeOutcome.error)}`, ); } return undefined; } async function cleanupSessionBeforeMutation(params: { cfg: ReturnType; key: string; target: ReturnType; entry: SessionEntry | undefined; legacyKey?: string; canonicalKey?: string; reason: "session-reset" | "session-delete"; }) { const cleanupError = await ensureSessionRuntimeCleanup({ cfg: params.cfg, key: params.key, target: params.target, sessionId: params.entry?.sessionId, }); if (cleanupError) { return cleanupError; } return await closeAcpRuntimeForSession({ cfg: params.cfg, sessionKey: params.legacyKey ?? params.canonicalKey ?? params.target.canonicalKey ?? params.key, entry: params.entry, reason: params.reason, }); } export const sessionsHandlers: GatewayRequestHandlers = { "sessions.list": ({ params, respond }) => { if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) { return; } const p = params; const cfg = loadConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); const result = listSessionsFromStore({ cfg, storePath, store, opts: p, }); respond(true, result, undefined); }, "sessions.preview": ({ params, respond }) => { if (!assertValidParams(params, validateSessionsPreviewParams, "sessions.preview", respond)) { return; } const p = params; const keysRaw = Array.isArray(p.keys) ? p.keys : []; const keys = keysRaw .map((key) => String(key ?? "").trim()) .filter(Boolean) .slice(0, 64); const limit = typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, p.limit) : 12; const maxChars = typeof p.maxChars === "number" && Number.isFinite(p.maxChars) ? Math.max(20, p.maxChars) : 240; if (keys.length === 0) { respond(true, { ts: Date.now(), previews: [] } satisfies SessionsPreviewResult, undefined); return; } const cfg = loadConfig(); const storeCache = new Map>(); const previews: SessionsPreviewEntry[] = []; for (const key of keys) { try { const storeTarget = resolveGatewaySessionStoreTarget({ cfg, key, scanLegacyKeys: false }); const store = storeCache.get(storeTarget.storePath) ?? loadSessionStore(storeTarget.storePath); storeCache.set(storeTarget.storePath, store); const target = resolveGatewaySessionStoreTarget({ cfg, key, store, }); const entry = target.storeKeys.map((candidate) => store[candidate]).find(Boolean); if (!entry?.sessionId) { previews.push({ key, status: "missing", items: [] }); continue; } const items = readSessionPreviewItemsFromTranscript( entry.sessionId, target.storePath, entry.sessionFile, target.agentId, limit, maxChars, ); previews.push({ key, status: items.length > 0 ? "ok" : "empty", items, }); } catch { previews.push({ key, status: "error", items: [] }); } } respond(true, { ts: Date.now(), previews } satisfies SessionsPreviewResult, undefined); }, "sessions.resolve": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsResolveParams, "sessions.resolve", respond)) { return; } const p = params; const cfg = loadConfig(); const resolved = await resolveSessionKeyFromResolveParams({ cfg, p }); if (!resolved.ok) { respond(false, undefined, resolved.error); return; } respond(true, { ok: true, key: resolved.key }, undefined); }, "sessions.patch": async ({ params, respond, context, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsPatchParams, "sessions.patch", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } if (rejectWebchatSessionMutation({ action: "patch", client, isWebchatConnect, respond })) { return; } const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); const applied = await updateSessionStore(storePath, async (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); return await applySessionsPatchToStore({ cfg, store, storeKey: primaryKey, patch: p, loadGatewayModelCatalog: context.loadGatewayModelCatalog, }); }); if (!applied.ok) { respond(false, undefined, applied.error); return; } const parsed = parseAgentSessionKey(target.canonicalKey ?? key); const agentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg)); const resolved = resolveSessionModelRef(cfg, applied.entry, agentId); const result: SessionsPatchResult = { ok: true, path: storePath, key: target.canonicalKey, entry: applied.entry, resolved: { modelProvider: resolved.provider, model: resolved.model, }, }; respond(true, result, undefined); }, "sessions.reset": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsResetParams, "sessions.reset", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); const { entry, legacyKey, canonicalKey } = loadSessionEntry(key); const hadExistingEntry = Boolean(entry); const commandReason = p.reason === "new" ? "new" : "reset"; const hookEvent = createInternalHookEvent( "command", commandReason, target.canonicalKey ?? key, { sessionEntry: entry, previousSessionEntry: entry, commandSource: "gateway:sessions.reset", cfg, }, ); await triggerInternalHook(hookEvent); const mutationCleanupError = await cleanupSessionBeforeMutation({ cfg, key, target, entry, legacyKey, canonicalKey, reason: "session-reset", }); if (mutationCleanupError) { respond(false, undefined, mutationCleanupError); return; } let oldSessionId: string | undefined; let oldSessionFile: string | undefined; const next = await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); const entry = store[primaryKey]; const resetEntry = stripRuntimeModelState(entry); const parsed = parseAgentSessionKey(primaryKey); const sessionAgentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg)); const resolvedModel = resolveSessionModelRef(cfg, resetEntry, sessionAgentId); oldSessionId = entry?.sessionId; oldSessionFile = entry?.sessionFile; const now = Date.now(); const nextEntry: SessionEntry = { sessionId: randomUUID(), updatedAt: now, systemSent: false, abortedLastRun: false, thinkingLevel: entry?.thinkingLevel, verboseLevel: entry?.verboseLevel, reasoningLevel: entry?.reasoningLevel, responseUsage: entry?.responseUsage, model: resolvedModel.model, modelProvider: resolvedModel.provider, contextTokens: resetEntry?.contextTokens, sendPolicy: entry?.sendPolicy, label: entry?.label, origin: snapshotSessionOrigin(entry), lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, skillsSnapshot: entry?.skillsSnapshot, // Reset token counts to 0 on session reset (#1523) inputTokens: 0, outputTokens: 0, totalTokens: 0, totalTokensFresh: true, }; store[primaryKey] = nextEntry; return nextEntry; }); // Archive old transcript so it doesn't accumulate on disk (#14869). archiveSessionTranscriptsForSession({ sessionId: oldSessionId, storePath, sessionFile: oldSessionFile, agentId: target.agentId, reason: "reset", }); if (hadExistingEntry) { await emitSessionUnboundLifecycleEvent({ targetSessionKey: target.canonicalKey ?? key, reason: "session-reset", }); } respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined); }, "sessions.delete": async ({ params, respond, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsDeleteParams, "sessions.delete", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } if (rejectWebchatSessionMutation({ action: "delete", client, isWebchatConnect, respond })) { return; } const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); const mainKey = resolveMainSessionKey(cfg); if (target.canonicalKey === mainKey) { respond( false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `Cannot delete the main session (${mainKey}).`), ); return; } const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; const { entry, legacyKey, canonicalKey } = loadSessionEntry(key); const mutationCleanupError = await cleanupSessionBeforeMutation({ cfg, key, target, entry, legacyKey, canonicalKey, reason: "session-delete", }); if (mutationCleanupError) { respond(false, undefined, mutationCleanupError); return; } const sessionId = entry?.sessionId; const deleted = await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); const hadEntry = Boolean(store[primaryKey]); if (hadEntry) { delete store[primaryKey]; } return hadEntry; }); const archived = deleted && deleteTranscript ? archiveSessionTranscriptsForSession({ sessionId, storePath, sessionFile: entry?.sessionFile, agentId: target.agentId, reason: "deleted", }) : []; if (deleted) { const emitLifecycleHooks = p.emitLifecycleHooks !== false; await emitSessionUnboundLifecycleEvent({ targetSessionKey: target.canonicalKey ?? key, reason: "session-delete", emitHooks: emitLifecycleHooks, }); } respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined); }, "sessions.get": ({ params, respond }) => { const p = params; const key = requireSessionKey(p.key ?? p.sessionKey, respond); if (!key) { return; } const limit = typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, Math.floor(p.limit)) : 200; const { target, storePath } = resolveGatewaySessionTargetFromKey(key); const store = loadSessionStore(storePath); const entry = target.storeKeys.map((k) => store[k]).find(Boolean); if (!entry?.sessionId) { respond(true, { messages: [] }, undefined); return; } const allMessages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages; respond(true, { messages }, undefined); }, "sessions.compact": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsCompactParams, "sessions.compact", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } const maxLines = typeof p.maxLines === "number" && Number.isFinite(p.maxLines) ? Math.max(1, Math.floor(p.maxLines)) : 400; const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); // Lock + read in a short critical section; transcript work happens outside. const compactTarget = await updateSessionStore(storePath, (store) => { const { entry, primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); return { entry, primaryKey }; }); const entry = compactTarget.entry; const sessionId = entry?.sessionId; if (!sessionId) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no sessionId", }, undefined, ); return; } const filePath = resolveSessionTranscriptCandidates( sessionId, storePath, entry?.sessionFile, target.agentId, ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no transcript", }, undefined, ); return; } const raw = fs.readFileSync(filePath, "utf-8"); const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); if (lines.length <= maxLines) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, kept: lines.length, }, undefined, ); return; } const archived = archiveFileOnDisk(filePath, "bak"); const keptLines = lines.slice(-maxLines); fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); await updateSessionStore(storePath, (store) => { const entryKey = compactTarget.primaryKey; const entryToUpdate = store[entryKey]; if (!entryToUpdate) { return; } delete entryToUpdate.inputTokens; delete entryToUpdate.outputTokens; delete entryToUpdate.totalTokens; delete entryToUpdate.totalTokensFresh; entryToUpdate.updatedAt = Date.now(); }); respond( true, { ok: true, key: target.canonicalKey, compacted: true, archived, kept: keptLines.length, }, undefined, ); }, };