Compare commits

...

5 Commits

Author SHA1 Message Date
Vincent Koc
5d4771e91e telegram: use dm thread hook session for plugin commands 2026-03-09 11:36:36 -07:00
Vincent Koc
712821e87d gateway: harden before_reset transcript loading 2026-03-09 11:36:14 -07:00
Vincent Koc
4096a9c6a1 docs: normalize contributing whitespace 2026-03-09 11:25:38 -07:00
Vincent Koc
e2419f445d gateway: await before_reset hooks after cleanup 2026-03-09 11:25:38 -07:00
Vincent Koc
72535d5508 gateway: run before_reset hooks on session reset 2026-03-09 11:25:38 -07:00
9 changed files with 371 additions and 61 deletions

View File

@ -58,7 +58,6 @@ Welcome to the lobster tank! 🦞
- **Jonathan Taylor** - ACP subsystem, Gateway features/bugs, Gog/Mog/Sog CLI's, SEDMAT
- GitHub [@visionik](https://github.com/visionik) · X: [@visionik](https://x.com/visionik)
- **Josh Lehman** - Compaction, Tlon/Urbit subsystem
- GitHub [@jalehman](https://github.com/jalehman) · X: [@jlehman\_](https://x.com/jlehman_)
@ -73,7 +72,7 @@ Welcome to the lobster tank! 🦞
- **Robin Waslander** - Security, PR triage, bug fixes
- GitHub: [@hydro13](https://github.com/hydro13) · X: [@Robin_waslander](https://x.com/Robin_waslander)
## How to Contribute
1. **Bugs & small fixes** → Open a PR!

View File

@ -40,7 +40,7 @@ describe("emitResetCommandHooks", () => {
workspaceDir: "/tmp/openclaw-workspace",
});
await vi.waitFor(() => expect(hookRunnerMocks.runBeforeReset).toHaveBeenCalledTimes(1));
expect(hookRunnerMocks.runBeforeReset).toHaveBeenCalledTimes(1);
const [, ctx] = hookRunnerMocks.runBeforeReset.mock.calls[0] ?? [];
return ctx;
}

View File

@ -1,9 +1,7 @@
import fs from "node:fs/promises";
import { resetAcpSessionInPlace } from "../../acp/persistent-bindings.js";
import { logVerbose } from "../../globals.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { isAcpSessionKey, resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { isAcpSessionKey } from "../../routing/session-key.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { shouldHandleTextCommands } from "../commands-registry.js";
import { handleAcpCommand } from "./commands-acp.js";
@ -39,6 +37,7 @@ import type {
CommandHandlerResult,
HandleCommandsParams,
} from "./commands-types.js";
import { emitBeforeResetPluginHook } from "./reset-hooks.js";
import { routeReply } from "./route-reply.js";
let HANDLERS: CommandHandler[] | null = null;
@ -56,6 +55,7 @@ export async function emitResetCommandHooks(params: {
sessionKey?: string;
sessionEntry?: HandleCommandsParams["sessionEntry"];
previousSessionEntry?: HandleCommandsParams["previousSessionEntry"];
storePath?: HandleCommandsParams["storePath"];
workspaceDir: string;
}): Promise<void> {
const hookEvent = createInternalHookEvent("command", params.action, params.sessionKey ?? "", {
@ -91,47 +91,13 @@ export async function emitResetCommandHooks(params: {
}
}
// Fire before_reset plugin hook — extract memories before session history is lost
const hookRunner = getGlobalHookRunner();
if (hookRunner?.hasHooks("before_reset")) {
const prevEntry = params.previousSessionEntry;
const sessionFile = prevEntry?.sessionFile;
// Fire-and-forget: read old session messages and run hook
void (async () => {
try {
const messages: unknown[] = [];
if (sessionFile) {
const content = await fs.readFile(sessionFile, "utf-8");
for (const line of content.split("\n")) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
messages.push(entry.message);
}
} catch {
// skip malformed lines
}
}
} else {
logVerbose("before_reset: no session file available, firing hook with empty messages");
}
await hookRunner.runBeforeReset(
{ sessionFile, messages, reason: params.action },
{
agentId: resolveAgentIdFromSessionKey(params.sessionKey),
sessionKey: params.sessionKey,
sessionId: prevEntry?.sessionId,
workspaceDir: params.workspaceDir,
},
);
} catch (err: unknown) {
logVerbose(`before_reset hook failed: ${String(err)}`);
}
})();
}
await emitBeforeResetPluginHook({
sessionKey: params.sessionKey,
previousSessionEntry: params.previousSessionEntry,
workspaceDir: params.workspaceDir,
reason: params.action,
storePath: params.storePath,
});
}
function applyAcpResetTailContext(ctx: HandleCommandsParams["ctx"], resetTail: string): void {
@ -247,6 +213,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
sessionKey: boundAcpKey,
sessionEntry: hookSessionEntry,
previousSessionEntry: hookPreviousSessionEntry,
storePath: params.storePath,
workspaceDir: params.workspaceDir,
});
if (resetTail) {
@ -286,6 +253,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
sessionKey: params.sessionKey,
sessionEntry: params.sessionEntry,
previousSessionEntry: params.previousSessionEntry,
storePath: params.storePath,
workspaceDir: params.workspaceDir,
});
}

View File

@ -0,0 +1,99 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { HookRunner } from "../../plugins/hooks.js";
const hookRunnerMocks = vi.hoisted(() => ({
hasHooks: vi.fn<HookRunner["hasHooks"]>(),
runBeforeReset: vi.fn<HookRunner["runBeforeReset"]>(),
}));
vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () =>
({
hasHooks: hookRunnerMocks.hasHooks,
runBeforeReset: hookRunnerMocks.runBeforeReset,
}) as unknown as HookRunner,
}));
const { emitBeforeResetPluginHook } = await import("./reset-hooks.js");
describe("emitBeforeResetPluginHook", () => {
let tempDir: string;
let storePath: string;
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-before-reset-"));
storePath = path.join(tempDir, "sessions.json");
hookRunnerMocks.hasHooks.mockReset();
hookRunnerMocks.runBeforeReset.mockReset();
hookRunnerMocks.hasHooks.mockImplementation((hookName) => hookName === "before_reset");
hookRunnerMocks.runBeforeReset.mockResolvedValue(undefined);
});
afterEach(async () => {
await fs.rm(tempDir, { recursive: true, force: true });
vi.restoreAllMocks();
});
it("re-resolves transcript paths within the session store directory", async () => {
const transcriptPath = path.join(tempDir, "sess-main.jsonl");
await fs.writeFile(
transcriptPath,
`${JSON.stringify({ type: "message", message: { role: "user", content: "hello" } })}\n`,
"utf-8",
);
const resolvedTranscriptPath = await fs.realpath(transcriptPath).catch(() => transcriptPath);
await emitBeforeResetPluginHook({
sessionKey: "agent:main:main",
previousSessionEntry: {
sessionId: "sess-main",
sessionFile: "../../etc/passwd",
},
workspaceDir: "/tmp/openclaw-workspace",
reason: "new",
storePath,
});
expect(hookRunnerMocks.runBeforeReset).toHaveBeenCalledWith(
expect.objectContaining({
sessionFile: resolvedTranscriptPath,
messages: [{ role: "user", content: "hello" }],
reason: "new",
}),
expect.objectContaining({
agentId: "main",
sessionKey: "agent:main:main",
sessionId: "sess-main",
}),
);
});
it("caps extracted transcript messages to a bounded maximum", async () => {
const transcriptPath = path.join(tempDir, "sess-cap.jsonl");
const lines = Array.from({ length: 1_050 }, (_, index) =>
JSON.stringify({ type: "message", message: { role: "user", content: `m-${index}` } }),
).join("\n");
await fs.writeFile(transcriptPath, `${lines}\n`, "utf-8");
await emitBeforeResetPluginHook({
sessionKey: "agent:main:main",
previousSessionEntry: {
sessionId: "sess-cap",
sessionFile: "sess-cap.jsonl",
},
workspaceDir: "/tmp/openclaw-workspace",
reason: "reset",
storePath,
});
const [event] = hookRunnerMocks.runBeforeReset.mock.calls[0] ?? [];
const messages = event?.messages;
expect(Array.isArray(messages)).toBe(true);
expect(messages).toHaveLength(1_000);
expect(messages?.[0]).toEqual({ role: "user", content: "m-0" });
expect(messages?.at(-1)).toEqual({ role: "user", content: "m-999" });
});
});

View File

@ -0,0 +1,117 @@
import { createReadStream } from "node:fs";
import fs from "node:fs/promises";
import readline from "node:readline";
import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
type BeforeResetSessionEntry = {
sessionId?: string;
sessionFile?: string;
} | null;
const MAX_BEFORE_RESET_TRANSCRIPT_BYTES = 2 * 1024 * 1024;
const MAX_BEFORE_RESET_TRANSCRIPT_LINES = 10_000;
const MAX_BEFORE_RESET_MESSAGES = 1_000;
async function readBoundedBeforeResetMessages(sessionFile: string): Promise<unknown[]> {
const stat = await fs.stat(sessionFile);
if (stat.size > MAX_BEFORE_RESET_TRANSCRIPT_BYTES) {
logVerbose(
`before_reset: transcript exceeds ${MAX_BEFORE_RESET_TRANSCRIPT_BYTES} bytes; skipping message extraction`,
);
return [];
}
const messages: unknown[] = [];
let lineCount = 0;
let bytesRead = 0;
let truncated = false;
const stream = createReadStream(sessionFile, { encoding: "utf-8" });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
try {
for await (const line of rl) {
lineCount += 1;
bytesRead += Buffer.byteLength(line, "utf-8") + 1;
if (
lineCount > MAX_BEFORE_RESET_TRANSCRIPT_LINES ||
bytesRead > MAX_BEFORE_RESET_TRANSCRIPT_BYTES ||
messages.length >= MAX_BEFORE_RESET_MESSAGES
) {
truncated = true;
break;
}
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
messages.push(entry.message);
}
} catch {
// Skip malformed transcript lines.
}
}
} finally {
rl.close();
stream.destroy();
}
if (truncated) {
logVerbose("before_reset: transcript parsing truncated to bounded limits");
}
return messages;
}
export async function emitBeforeResetPluginHook(params: {
sessionKey?: string;
previousSessionEntry?: BeforeResetSessionEntry;
workspaceDir: string;
reason: string;
storePath?: string;
}): Promise<void> {
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("before_reset")) {
return;
}
const prevEntry = params.previousSessionEntry;
const sessionId = prevEntry?.sessionId;
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: params.storePath,
});
let sessionFile: string | undefined;
try {
let messages: unknown[] = [];
if (sessionId) {
sessionFile = resolveSessionFilePath(sessionId, prevEntry ?? undefined, pathOpts);
try {
messages = await readBoundedBeforeResetMessages(sessionFile);
} catch (err: unknown) {
logVerbose(`before_reset: failed reading transcript messages: ${String(err)}`);
}
} else if (prevEntry?.sessionFile) {
logVerbose("before_reset: session file present without session id; skipping transcript read");
} else {
logVerbose("before_reset: no session file available, firing hook with empty messages");
}
await hookRunner.runBeforeReset(
{ sessionFile, messages, reason: params.reason },
{
agentId,
sessionKey: params.sessionKey,
sessionId,
workspaceDir: params.workspaceDir,
},
);
} catch (err: unknown) {
logVerbose(`before_reset hook failed: ${String(err)}`);
}
}

View File

@ -1,11 +1,12 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
import { resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js";
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js";
import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js";
import { clearSessionQueues } from "../../auto-reply/reply/queue.js";
import { emitBeforeResetPluginHook } from "../../auto-reply/reply/reset-hooks.js";
import { closeTrackedBrowserTabsForSessions } from "../../browser/session-tab-registry.js";
import { loadConfig } from "../../config/config.js";
import {
@ -502,6 +503,13 @@ export const sessionsHandlers: GatewayRequestHandlers = {
respond(false, undefined, mutationCleanupError);
return;
}
await emitBeforeResetPluginHook({
sessionKey: target.canonicalKey ?? key,
previousSessionEntry: entry,
workspaceDir: resolveAgentWorkspaceDir(cfg, target.agentId),
reason: commandReason,
storePath,
});
let oldSessionId: string | undefined;
let oldSessionFile: string | undefined;
const next = await updateSessionStore(storePath, (store) => {

View File

@ -37,6 +37,11 @@ const subagentLifecycleHookMocks = vi.hoisted(() => ({
const subagentLifecycleHookState = vi.hoisted(() => ({
hasSubagentEndedHook: true,
hasBeforeResetHook: false,
}));
const beforeResetHookMocks = vi.hoisted(() => ({
runBeforeReset: vi.fn(async () => {}),
}));
const threadBindingMocks = vi.hoisted(() => ({
@ -96,8 +101,10 @@ vi.mock("../plugins/hook-runner-global.js", async (importOriginal) => {
...actual,
getGlobalHookRunner: vi.fn(() => ({
hasHooks: (hookName: string) =>
hookName === "subagent_ended" && subagentLifecycleHookState.hasSubagentEndedHook,
(hookName === "subagent_ended" && subagentLifecycleHookState.hasSubagentEndedHook) ||
(hookName === "before_reset" && subagentLifecycleHookState.hasBeforeResetHook),
runSubagentEnded: subagentLifecycleHookMocks.runSubagentEnded,
runBeforeReset: beforeResetHookMocks.runBeforeReset,
})),
};
});
@ -220,6 +227,8 @@ describe("gateway server sessions", () => {
sessionHookMocks.triggerInternalHook.mockClear();
subagentLifecycleHookMocks.runSubagentEnded.mockClear();
subagentLifecycleHookState.hasSubagentEndedHook = true;
subagentLifecycleHookState.hasBeforeResetHook = false;
beforeResetHookMocks.runBeforeReset.mockClear();
threadBindingMocks.unbindThreadBindingsBySessionKey.mockClear();
acpRuntimeMocks.cancel.mockClear();
acpRuntimeMocks.close.mockClear();
@ -1179,6 +1188,47 @@ describe("gateway server sessions", () => {
ws.close();
});
test("sessions.reset runs before_reset plugin hooks with gateway session context", async () => {
const { dir } = await createSessionStoreDir();
await writeSingleLineSession(dir, "sess-main", "hello");
const resolvedTranscriptPath = await fs
.realpath(path.join(dir, "sess-main.jsonl"))
.catch(() => path.join(dir, "sess-main.jsonl"));
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
sessionFile: path.join(dir, "sess-main.jsonl"),
updatedAt: Date.now(),
},
},
});
subagentLifecycleHookState.hasBeforeResetHook = true;
const { ws } = await openClient();
const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", {
key: "main",
reason: "new",
});
expect(reset.ok).toBe(true);
await vi.waitFor(() => expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledTimes(1));
expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledWith(
expect.objectContaining({
sessionFile: resolvedTranscriptPath,
reason: "new",
}),
expect.objectContaining({
agentId: "main",
sessionKey: "agent:main:main",
sessionId: "sess-main",
}),
);
ws.close();
});
test("sessions.reset returns unavailable when active run does not stop", async () => {
const { dir, storePath } = await seedActiveMainSession();
const waitCallCountAtSnapshotClear: number[] = [];
@ -1188,6 +1238,7 @@ describe("gateway server sessions", () => {
embeddedRunMock.activeIds.add("sess-main");
embeddedRunMock.waitResults.set("sess-main", false);
subagentLifecycleHookState.hasBeforeResetHook = true;
const { ws } = await openClient();
@ -1204,6 +1255,7 @@ describe("gateway server sessions", () => {
);
expect(waitCallCountAtSnapshotClear).toEqual([1]);
expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).not.toHaveBeenCalled();
expect(beforeResetHookMocks.runBeforeReset).not.toHaveBeenCalled();
const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,

View File

@ -270,4 +270,58 @@ describe("registerTelegramNativeCommands", () => {
);
expect(sendMessage).not.toHaveBeenCalledWith(123, "Command not found.");
});
it("uses the DM thread session key for plugin command internal sent hooks", async () => {
const commandHandlers = new Map<string, (ctx: unknown) => Promise<void>>();
pluginCommandMocks.getPluginCommandSpecs.mockReturnValue([
{
name: "plug",
description: "Plugin command",
},
] as never);
pluginCommandMocks.matchPluginCommand.mockReturnValue({
command: { key: "plug", requireAuth: false },
args: undefined,
} as never);
registerTelegramNativeCommands({
...buildParams({
channels: {
telegram: {
dmPolicy: "open",
},
},
}),
bot: {
api: {
setMyCommands: vi.fn().mockResolvedValue(undefined),
sendMessage: vi.fn().mockResolvedValue(undefined),
},
command: vi.fn((name: string, cb: (ctx: unknown) => Promise<void>) => {
commandHandlers.set(name, cb);
}),
} as unknown as Parameters<typeof registerTelegramNativeCommands>[0]["bot"],
});
const handler = commandHandlers.get("plug");
expect(handler).toBeTruthy();
await handler?.({
match: "",
message: {
message_id: 42,
date: Math.floor(Date.now() / 1000),
chat: { id: 12345, type: "private" },
from: { id: 12345, username: "alice" },
message_thread_id: 99,
},
});
expect(deliveryMocks.deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
sessionKeyForInternalHooks: "agent:main:main:thread:12345:99",
}),
);
});
});

View File

@ -540,6 +540,20 @@ export const registerTelegramNativeCommands = ({
chunkMode: params.chunkMode,
linkPreview: telegramCfg.linkPreview,
});
const resolveDmThreadSessionKey = (params: {
baseSessionKey: string;
chatId: string | number;
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
}): string => {
const dmThreadId = params.threadSpec.scope === "dm" ? params.threadSpec.id : undefined;
if (dmThreadId == null) {
return params.baseSessionKey;
}
return resolveThreadSessionKeys({
baseSessionKey: params.baseSessionKey,
threadId: `${params.chatId}:${dmThreadId}`,
}).sessionKey;
};
if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) {
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
@ -647,17 +661,11 @@ export const registerTelegramNativeCommands = ({
});
return;
}
const baseSessionKey = route.sessionKey;
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({
baseSessionKey,
threadId: `${chatId}:${dmThreadId}`,
})
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const sessionKey = resolveDmThreadSessionKey({
baseSessionKey: route.sessionKey,
chatId,
threadSpec,
});
const { skillFilter, groupSystemPrompt } = resolveTelegramGroupPromptSettings({
groupConfig,
topicConfig,
@ -833,10 +841,15 @@ export const registerTelegramNativeCommands = ({
return;
}
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = runtimeContext;
const sessionKeyForInternalHooks = resolveDmThreadSessionKey({
baseSessionKey: route.sessionKey,
chatId,
threadSpec,
});
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
chatId,
accountId: route.accountId,
sessionKeyForInternalHooks: route.sessionKey,
sessionKeyForInternalHooks,
mirrorIsGroup: isGroup,
mirrorGroupId: isGroup ? String(chatId) : undefined,
mediaLocalRoots,