382 lines
12 KiB
TypeScript
Raw Normal View History

2026-01-04 04:05:18 +01:00
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
resolveEmbeddedSessionLane,
waitForEmbeddedPiRunEnd,
} from "../../agents/pi-embedded.js";
import { loadConfig } from "../../config/config.js";
import {
loadSessionStore,
resolveMainSessionKey,
type SessionEntry,
saveSessionStore,
} from "../../config/sessions.js";
import { clearCommandLane } from "../../process/command-queue.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateSessionsCompactParams,
validateSessionsDeleteParams,
validateSessionsListParams,
validateSessionsPatchParams,
validateSessionsResetParams,
validateSessionsResolveParams,
2026-01-04 04:05:18 +01:00
} from "../protocol/index.js";
import {
archiveFileOnDisk,
listSessionsFromStore,
loadCombinedSessionStoreForGateway,
resolveGatewaySessionStoreTarget,
2026-01-04 04:05:18 +01:00
resolveSessionTranscriptCandidates,
type SessionsPatchResult,
} from "../session-utils.js";
import { applySessionsPatchToStore } from "../sessions-patch.js";
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
2026-01-04 04:05:18 +01:00
import type { GatewayRequestHandlers } from "./types.js";
/** Store target shape returned by `resolveGatewaySessionStoreTarget`. */
type SessionStoreTarget = ReturnType<typeof resolveGatewaySessionStoreTarget>;

/**
 * Builds the INVALID_REQUEST error sent when a method's params fail validation.
 *
 * @param method - Gateway method name, e.g. `"sessions.list"`.
 * @param errors - The `errors` property of the validator that rejected the params.
 * @returns The error payload to pass as the third argument of `respond`.
 */
function invalidParamsError(
  method: string,
  errors: Parameters<typeof formatValidationErrors>[0],
): ReturnType<typeof errorShape> {
  return errorShape(
    ErrorCodes.INVALID_REQUEST,
    `invalid ${method} params: ${formatValidationErrors(errors)}`,
  );
}

/**
 * Coerces a raw `key` param to a trimmed string; nullish input becomes `""`.
 */
function normalizeSessionKey(raw: unknown): string {
  return String(raw ?? "").trim();
}

/**
 * Loads the session store for `target` and makes sure the entry (if any) lives
 * under the primary store key.
 *
 * The primary key is the first element of `target.storeKeys`, falling back to
 * the requested key. If an entry is found under another candidate key and the
 * primary slot is empty, the entry is moved to the primary key in the returned
 * in-memory store. Nothing is written to disk here; callers decide when to save.
 *
 * @param target - Resolved store target for the requested session key.
 * @param requestedKey - The key supplied by the caller, used as a fallback.
 * @returns The loaded (possibly re-keyed) store and the primary key to use.
 */
function loadStoreAtPrimaryKey(
  target: SessionStoreTarget,
  requestedKey: string,
): { store: ReturnType<typeof loadSessionStore>; primaryKey: string } {
  const store = loadSessionStore(target.storePath);
  const primaryKey = target.storeKeys[0] ?? requestedKey;
  const existingKey = target.storeKeys.find((candidate) => store[candidate]);
  if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
    store[primaryKey] = store[existingKey];
    delete store[existingKey];
  }
  return { store, primaryKey };
}

/**
 * Gateway request handlers for the `sessions.*` methods.
 *
 * Every handler validates its params first and answers through `respond`
 * (`respond(ok, payload, error)`); validation failures are reported as
 * INVALID_REQUEST errors.
 */
export const sessionsHandlers: GatewayRequestHandlers = {
  /**
   * Lists sessions from the combined gateway session store, passing the
   * validated params through as listing options.
   */
  "sessions.list": ({ params, respond }) => {
    if (!validateSessionsListParams(params)) {
      respond(
        false,
        undefined,
        invalidParamsError("sessions.list", validateSessionsListParams.errors),
      );
      return;
    }
    const p = params as import("../protocol/index.js").SessionsListParams;
    const cfg = loadConfig();
    const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
    const result = listSessionsFromStore({
      cfg,
      storePath,
      store,
      opts: p,
    });
    respond(true, result, undefined);
  },

  /**
   * Resolves the given params to a session key and returns it, or forwards the
   * resolver's error.
   */
  "sessions.resolve": ({ params, respond }) => {
    if (!validateSessionsResolveParams(params)) {
      respond(
        false,
        undefined,
        invalidParamsError("sessions.resolve", validateSessionsResolveParams.errors),
      );
      return;
    }
    const p = params as import("../protocol/index.js").SessionsResolveParams;
    const cfg = loadConfig();
    const resolved = resolveSessionKeyFromResolveParams({ cfg, p });
    if (!resolved.ok) {
      respond(false, undefined, resolved.error);
      return;
    }
    respond(true, { ok: true, key: resolved.key }, undefined);
  },

  /**
   * Applies a patch to a session entry and saves the store.
   *
   * The store is only written when the patch was applied successfully.
   */
  "sessions.patch": async ({ params, respond, context }) => {
    if (!validateSessionsPatchParams(params)) {
      respond(
        false,
        undefined,
        invalidParamsError("sessions.patch", validateSessionsPatchParams.errors),
      );
      return;
    }
    const p = params as import("../protocol/index.js").SessionsPatchParams;
    const key = normalizeSessionKey(p.key);
    if (!key) {
      respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required"));
      return;
    }
    const cfg = loadConfig();
    const target = resolveGatewaySessionStoreTarget({ cfg, key });
    const storePath = target.storePath;
    const { store, primaryKey } = loadStoreAtPrimaryKey(target, key);
    const applied = await applySessionsPatchToStore({
      cfg,
      store,
      storeKey: primaryKey,
      patch: p,
      loadGatewayModelCatalog: context.loadGatewayModelCatalog,
    });
    if (!applied.ok) {
      respond(false, undefined, applied.error);
      return;
    }
    await saveSessionStore(storePath, store);
    const result: SessionsPatchResult = {
      ok: true,
      path: storePath,
      key: target.canonicalKey,
      entry: applied.entry,
    };
    respond(true, result, undefined);
  },

  /**
   * Replaces a session entry with a fresh one that has a new `sessionId`.
   *
   * Only the fields listed below are carried over from the previous entry (if
   * there was one); everything else on the old entry is dropped.
   */
  "sessions.reset": async ({ params, respond }) => {
    if (!validateSessionsResetParams(params)) {
      respond(
        false,
        undefined,
        invalidParamsError("sessions.reset", validateSessionsResetParams.errors),
      );
      return;
    }
    const p = params as import("../protocol/index.js").SessionsResetParams;
    const key = normalizeSessionKey(p.key);
    if (!key) {
      respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required"));
      return;
    }
    const cfg = loadConfig();
    const target = resolveGatewaySessionStoreTarget({ cfg, key });
    const storePath = target.storePath;
    const { store, primaryKey } = loadStoreAtPrimaryKey(target, key);
    const entry = store[primaryKey];
    const now = Date.now();
    const next: SessionEntry = {
      sessionId: randomUUID(),
      updatedAt: now,
      systemSent: false,
      abortedLastRun: false,
      thinkingLevel: entry?.thinkingLevel,
      verboseLevel: entry?.verboseLevel,
      reasoningLevel: entry?.reasoningLevel,
      responseUsage: entry?.responseUsage,
      model: entry?.model,
      contextTokens: entry?.contextTokens,
      sendPolicy: entry?.sendPolicy,
      label: entry?.label,
      lastChannel: entry?.lastChannel,
      lastTo: entry?.lastTo,
      skillsSnapshot: entry?.skillsSnapshot,
    };
    store[primaryKey] = next;
    await saveSessionStore(storePath, store);
    respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined);
  },

  /**
   * Deletes a session entry and, unless `deleteTranscript` is `false`, archives
   * its transcript files.
   *
   * The main session cannot be deleted. If an embedded run is active for the
   * session it is aborted and awaited for up to 15 seconds; when it does not
   * end in time the handler answers UNAVAILABLE without touching the store.
   */
  "sessions.delete": async ({ params, respond }) => {
    if (!validateSessionsDeleteParams(params)) {
      respond(
        false,
        undefined,
        invalidParamsError("sessions.delete", validateSessionsDeleteParams.errors),
      );
      return;
    }
    const p = params as import("../protocol/index.js").SessionsDeleteParams;
    const key = normalizeSessionKey(p.key);
    if (!key) {
      respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required"));
      return;
    }
    const cfg = loadConfig();
    const mainKey = resolveMainSessionKey(cfg);
    const target = resolveGatewaySessionStoreTarget({ cfg, key });
    if (target.canonicalKey === mainKey) {
      respond(
        false,
        undefined,
        errorShape(ErrorCodes.INVALID_REQUEST, `Cannot delete the main session (${mainKey}).`),
      );
      return;
    }
    // Transcripts are removed by default; only an explicit boolean overrides that.
    const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
    const storePath = target.storePath;
    const { store, primaryKey } = loadStoreAtPrimaryKey(target, key);
    const entry = store[primaryKey];
    const sessionId = entry?.sessionId;
    const existed = Boolean(entry);
    clearCommandLane(resolveEmbeddedSessionLane(target.canonicalKey));
    if (sessionId && isEmbeddedPiRunActive(sessionId)) {
      abortEmbeddedPiRun(sessionId);
      const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000);
      if (!ended) {
        respond(
          false,
          undefined,
          errorShape(
            ErrorCodes.UNAVAILABLE,
            `Session ${key} is still active; try again in a moment.`,
          ),
        );
        return;
      }
    }
    if (existed) delete store[primaryKey];
    // Saved even when nothing was deleted so that a key migration performed
    // while loading the store is still persisted.
    await saveSessionStore(storePath, store);
    const archived: string[] = [];
    if (deleteTranscript && sessionId) {
      const candidates = resolveSessionTranscriptCandidates(
        sessionId,
        storePath,
        entry?.sessionFile,
        target.agentId,
      );
      for (const candidate of candidates) {
        if (!fs.existsSync(candidate)) continue;
        try {
          archived.push(archiveFileOnDisk(candidate, "deleted"));
        } catch {
          // Best-effort.
        }
      }
    }
    respond(true, { ok: true, key: target.canonicalKey, deleted: existed, archived }, undefined);
  },

  /**
   * Truncates a session transcript to its last `maxLines` non-blank lines.
   *
   * `maxLines` defaults to 400 and is floored and clamped to at least 1. When
   * the session has no id, no transcript file exists, or the transcript is
   * already short enough, the handler answers with `compacted: false`.
   * Otherwise the file is archived, rewritten with the kept lines, and the
   * entry's token counters are cleared before the store is saved.
   */
  "sessions.compact": async ({ params, respond }) => {
    if (!validateSessionsCompactParams(params)) {
      respond(
        false,
        undefined,
        invalidParamsError("sessions.compact", validateSessionsCompactParams.errors),
      );
      return;
    }
    const p = params as import("../protocol/index.js").SessionsCompactParams;
    const key = normalizeSessionKey(p.key);
    if (!key) {
      respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required"));
      return;
    }
    const maxLines =
      typeof p.maxLines === "number" && Number.isFinite(p.maxLines)
        ? Math.max(1, Math.floor(p.maxLines))
        : 400;
    const cfg = loadConfig();
    const target = resolveGatewaySessionStoreTarget({ cfg, key });
    const storePath = target.storePath;
    const { store, primaryKey } = loadStoreAtPrimaryKey(target, key);
    const entry = store[primaryKey];
    const sessionId = entry?.sessionId;
    if (!sessionId) {
      respond(
        true,
        {
          ok: true,
          key: target.canonicalKey,
          compacted: false,
          reason: "no sessionId",
        },
        undefined,
      );
      return;
    }
    // Use the first candidate transcript path that actually exists on disk.
    const filePath = resolveSessionTranscriptCandidates(
      sessionId,
      storePath,
      entry?.sessionFile,
      target.agentId,
    ).find((candidate) => fs.existsSync(candidate));
    if (!filePath) {
      respond(
        true,
        {
          ok: true,
          key: target.canonicalKey,
          compacted: false,
          reason: "no transcript",
        },
        undefined,
      );
      return;
    }
    const raw = fs.readFileSync(filePath, "utf-8");
    // Blank / whitespace-only lines are not counted and are not kept.
    const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0);
    if (lines.length <= maxLines) {
      respond(
        true,
        {
          ok: true,
          key: target.canonicalKey,
          compacted: false,
          kept: lines.length,
        },
        undefined,
      );
      return;
    }
    const archived = archiveFileOnDisk(filePath, "bak");
    const keptLines = lines.slice(-maxLines);
    fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
    const current = store[primaryKey];
    if (current) {
      // Clear the token counters alongside the transcript rewrite.
      delete current.inputTokens;
      delete current.outputTokens;
      delete current.totalTokens;
      current.updatedAt = Date.now();
      await saveSessionStore(storePath, store);
    }
    respond(
      true,
      {
        ok: true,
        key: target.canonicalKey,
        compacted: true,
        archived,
        kept: keptLines.length,
      },
      undefined,
    );
  },
};