gateway: run before_reset hooks on session reset

This commit is contained in:
Vincent Koc 2026-03-09 09:48:25 -07:00
parent 7b88249c9e
commit 72535d5508
4 changed files with 124 additions and 46 deletions

View File

@ -1,9 +1,7 @@
import fs from "node:fs/promises";
import { resetAcpSessionInPlace } from "../../acp/persistent-bindings.js";
import { logVerbose } from "../../globals.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { isAcpSessionKey, resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { isAcpSessionKey } from "../../routing/session-key.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { shouldHandleTextCommands } from "../commands-registry.js";
import { handleAcpCommand } from "./commands-acp.js";
@ -39,6 +37,7 @@ import type {
CommandHandlerResult,
HandleCommandsParams,
} from "./commands-types.js";
import { emitBeforeResetPluginHook } from "./reset-hooks.js";
import { routeReply } from "./route-reply.js";
let HANDLERS: CommandHandler[] | null = null;
@ -91,47 +90,12 @@ export async function emitResetCommandHooks(params: {
}
}
// Fire before_reset plugin hook — extract memories before session history is lost
const hookRunner = getGlobalHookRunner();
if (hookRunner?.hasHooks("before_reset")) {
const prevEntry = params.previousSessionEntry;
const sessionFile = prevEntry?.sessionFile;
// Fire-and-forget: read old session messages and run hook
void (async () => {
try {
const messages: unknown[] = [];
if (sessionFile) {
const content = await fs.readFile(sessionFile, "utf-8");
for (const line of content.split("\n")) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
messages.push(entry.message);
}
} catch {
// skip malformed lines
}
}
} else {
logVerbose("before_reset: no session file available, firing hook with empty messages");
}
await hookRunner.runBeforeReset(
{ sessionFile, messages, reason: params.action },
{
agentId: resolveAgentIdFromSessionKey(params.sessionKey),
sessionKey: params.sessionKey,
sessionId: prevEntry?.sessionId,
workspaceDir: params.workspaceDir,
},
);
} catch (err: unknown) {
logVerbose(`before_reset hook failed: ${String(err)}`);
}
})();
}
emitBeforeResetPluginHook({
sessionKey: params.sessionKey,
previousSessionEntry: params.previousSessionEntry,
workspaceDir: params.workspaceDir,
reason: params.action,
});
}
function applyAcpResetTailContext(ctx: HandleCommandsParams["ctx"], resetTail: string): void {

View File

@ -0,0 +1,60 @@
import fs from "node:fs/promises";
import { logVerbose } from "../../globals.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
type BeforeResetSessionEntry = {
sessionId?: string;
sessionFile?: string;
} | null;
export function emitBeforeResetPluginHook(params: {
sessionKey?: string;
previousSessionEntry?: BeforeResetSessionEntry;
workspaceDir: string;
reason: string;
}): void {
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("before_reset")) {
return;
}
const prevEntry = params.previousSessionEntry;
const sessionFile = prevEntry?.sessionFile;
// Fire-and-forget: read old session messages and run hook before reset mutates the store.
void (async () => {
try {
const messages: unknown[] = [];
if (sessionFile) {
const content = await fs.readFile(sessionFile, "utf-8");
for (const line of content.split("\n")) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
messages.push(entry.message);
}
} catch {
// Skip malformed transcript lines.
}
}
} else {
logVerbose("before_reset: no session file available, firing hook with empty messages");
}
await hookRunner.runBeforeReset(
{ sessionFile, messages, reason: params.reason },
{
agentId: resolveAgentIdFromSessionKey(params.sessionKey),
sessionKey: params.sessionKey,
sessionId: prevEntry?.sessionId,
workspaceDir: params.workspaceDir,
},
);
} catch (err: unknown) {
logVerbose(`before_reset hook failed: ${String(err)}`);
}
})();
}

View File

@ -1,11 +1,12 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
import { resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js";
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js";
import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js";
import { clearSessionQueues } from "../../auto-reply/reply/queue.js";
import { emitBeforeResetPluginHook } from "../../auto-reply/reply/reset-hooks.js";
import { closeTrackedBrowserTabsForSessions } from "../../browser/session-tab-registry.js";
import { loadConfig } from "../../config/config.js";
import {
@ -489,6 +490,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
},
);
await triggerInternalHook(hookEvent);
emitBeforeResetPluginHook({
sessionKey: target.canonicalKey ?? key,
previousSessionEntry: entry,
workspaceDir: resolveAgentWorkspaceDir(cfg, target.agentId),
reason: commandReason,
});
const mutationCleanupError = await cleanupSessionBeforeMutation({
cfg,
key,

View File

@ -37,6 +37,11 @@ const subagentLifecycleHookMocks = vi.hoisted(() => ({
const subagentLifecycleHookState = vi.hoisted(() => ({
hasSubagentEndedHook: true,
hasBeforeResetHook: false,
}));
const beforeResetHookMocks = vi.hoisted(() => ({
runBeforeReset: vi.fn(async () => {}),
}));
const threadBindingMocks = vi.hoisted(() => ({
@ -96,8 +101,10 @@ vi.mock("../plugins/hook-runner-global.js", async (importOriginal) => {
...actual,
getGlobalHookRunner: vi.fn(() => ({
hasHooks: (hookName: string) =>
hookName === "subagent_ended" && subagentLifecycleHookState.hasSubagentEndedHook,
(hookName === "subagent_ended" && subagentLifecycleHookState.hasSubagentEndedHook) ||
(hookName === "before_reset" && subagentLifecycleHookState.hasBeforeResetHook),
runSubagentEnded: subagentLifecycleHookMocks.runSubagentEnded,
runBeforeReset: beforeResetHookMocks.runBeforeReset,
})),
};
});
@ -220,6 +227,8 @@ describe("gateway server sessions", () => {
sessionHookMocks.triggerInternalHook.mockClear();
subagentLifecycleHookMocks.runSubagentEnded.mockClear();
subagentLifecycleHookState.hasSubagentEndedHook = true;
subagentLifecycleHookState.hasBeforeResetHook = false;
beforeResetHookMocks.runBeforeReset.mockClear();
threadBindingMocks.unbindThreadBindingsBySessionKey.mockClear();
acpRuntimeMocks.cancel.mockClear();
acpRuntimeMocks.close.mockClear();
@ -1179,6 +1188,44 @@ describe("gateway server sessions", () => {
ws.close();
});
test("sessions.reset runs before_reset plugin hooks with gateway session context", async () => {
const { dir } = await createSessionStoreDir();
await writeSingleLineSession(dir, "sess-main", "hello");
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
sessionFile: path.join(dir, "sess-main.jsonl"),
updatedAt: Date.now(),
},
},
});
subagentLifecycleHookState.hasBeforeResetHook = true;
const { ws } = await openClient();
const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", {
key: "main",
reason: "new",
});
expect(reset.ok).toBe(true);
await vi.waitFor(() => expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledTimes(1));
expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledWith(
expect.objectContaining({
sessionFile: path.join(dir, "sess-main.jsonl"),
reason: "new",
}),
expect.objectContaining({
agentId: "main",
sessionKey: "agent:main:main",
sessionId: "sess-main",
}),
);
ws.close();
});
test("sessions.reset returns unavailable when active run does not stop", async () => {
const { dir, storePath } = await seedActiveMainSession();
const waitCallCountAtSnapshotClear: number[] = [];