Compare commits

...

7 Commits

Author SHA1 Message Date
Val Alexander
e0541f772e
update: auto-reply 2026-03-16 05:52:15 -05:00
Val Alexander
f285429952
Harden remote PTY gateway sessions 2026-03-16 05:52:14 -05:00
Val Alexander
71b4fa04d9
Add gateway PTY RPC methods 2026-03-16 05:52:14 -05:00
Val Alexander
da4459263d
Merge branch 'main' into feature/btw-ephemeral-side-turns 2026-03-13 22:09:33 -05:00
Val Alexander
5328399d75
feat: add /btw side-turn MVP
Coauthored with Nova.

Co-authored-by: Nova <nova@openknot.ai>
2026-03-13 22:08:14 -05:00
Val Alexander
ecedddae81
Merge branch 'main' into feature/btw-ephemeral-side-turns 2026-03-13 21:49:04 -05:00
Val Alexander
27ffeab217
chore: start /btw ephemeral side-turns draft 2026-03-13 21:29:37 -05:00
25 changed files with 1257 additions and 100 deletions

View File

@ -145,6 +145,8 @@ Common scopes:
- `operator.read` - `operator.read`
- `operator.write` - `operator.write`
- includes PTY lifecycle methods: `pty.create`, `pty.write`, `pty.resize`, `pty.kill`
- `pty.list` is readable by `operator.read` / `operator.write`
- `operator.admin` - `operator.admin`
- `operator.approvals` - `operator.approvals`
- `operator.pairing` - `operator.pairing`
@ -181,6 +183,10 @@ The Gateway treats these as **claims** and enforces server-side allowlists.
- `source`: `core` or `plugin` - `source`: `core` or `plugin`
- `pluginId`: plugin owner when `source="plugin"` - `pluginId`: plugin owner when `source="plugin"`
- `optional`: whether a plugin tool is optional - `optional`: whether a plugin tool is optional
- PTY helpers:
- `pty.create`, `pty.write`, `pty.resize`, `pty.kill` require `operator.write`
- `pty.list` is available to `operator.read` / `operator.write`
- PTY events are targeted to the owning operator connection/device: `pty.output`, `pty.exit`
## Exec approvals ## Exec approvals

View File

@ -117,6 +117,13 @@ openclaw health
### Common footguns ### Common footguns
- **Wrong port:** Gateway WS defaults to `ws://127.0.0.1:18789`; keep app + CLI on the same port. - **Wrong port:** Gateway WS defaults to `ws://127.0.0.1:18789`; keep app + CLI on the same port.
- **Remote PTY limits:** the Gateway remote terminal now enforces env-configurable limits.
- `OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER` (default `4`)
- `OPENCLAW_PTY_MAX_TOTAL_SESSIONS` (default `32`)
- `OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES` (default `65536`)
- `OPENCLAW_PTY_MIN_COLS` / `OPENCLAW_PTY_MAX_COLS` (defaults `20` / `500`)
- `OPENCLAW_PTY_MIN_ROWS` / `OPENCLAW_PTY_MAX_ROWS` (defaults `5` / `200`)
- `OPENCLAW_PTY_IDLE_TIMEOUT_MS` / `OPENCLAW_PTY_IDLE_SWEEP_INTERVAL_MS` (defaults `1800000` / `60000`)
- **Where state lives:** - **Where state lives:**
- Credentials: `~/.openclaw/credentials/` - Credentials: `~/.openclaw/credentials/`
- Sessions: `~/.openclaw/agents/<agentId>/sessions/` - Sessions: `~/.openclaw/agents/<agentId>/sessions/`

View File

@ -358,6 +358,7 @@
"@mariozechner/pi-tui": "0.57.1", "@mariozechner/pi-tui": "0.57.1",
"@modelcontextprotocol/sdk": "1.27.1", "@modelcontextprotocol/sdk": "1.27.1",
"@mozilla/readability": "^0.6.0", "@mozilla/readability": "^0.6.0",
"@pierre/diffs": "1.1.0",
"@sinclair/typebox": "0.34.48", "@sinclair/typebox": "0.34.48",
"@slack/bolt": "^4.6.0", "@slack/bolt": "^4.6.0",
"@slack/web-api": "^7.15.0", "@slack/web-api": "^7.15.0",

3
pnpm-lock.yaml generated
View File

@ -79,6 +79,9 @@ importers:
'@napi-rs/canvas': '@napi-rs/canvas':
specifier: ^0.1.89 specifier: ^0.1.89
version: 0.1.95 version: 0.1.95
'@pierre/diffs':
specifier: 1.1.0
version: 1.1.0(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
'@sinclair/typebox': '@sinclair/typebox':
specifier: 0.34.48 specifier: 0.34.48
version: 0.34.48 version: 0.34.48

View File

@ -144,6 +144,22 @@ function buildChatCommands(): ChatCommandDefinition[] {
textAlias: "/commands", textAlias: "/commands",
category: "status", category: "status",
}), }),
defineChatCommand({
key: "btw",
nativeName: "btw",
description: "Ask an ephemeral follow-up about the active session.",
textAlias: "/btw",
category: "status",
args: [
{
name: "question",
description: "Inline follow-up question",
type: "string",
required: true,
captureRemaining: true,
},
],
}),
defineChatCommand({ defineChatCommand({
key: "skill", key: "skill",
nativeName: "skill", nativeName: "skill",

View File

@ -36,6 +36,7 @@ describe("commands registry", () => {
it("exposes native specs", () => { it("exposes native specs", () => {
const specs = listNativeCommandSpecs(); const specs = listNativeCommandSpecs();
expect(specs.find((spec) => spec.name === "btw")).toBeTruthy();
expect(specs.find((spec) => spec.name === "help")).toBeTruthy(); expect(specs.find((spec) => spec.name === "help")).toBeTruthy();
expect(specs.find((spec) => spec.name === "stop")).toBeTruthy(); expect(specs.find((spec) => spec.name === "stop")).toBeTruthy();
expect(specs.find((spec) => spec.name === "skill")).toBeTruthy(); expect(specs.find((spec) => spec.name === "skill")).toBeTruthy();
@ -209,6 +210,20 @@ describe("commands registry", () => {
expect(modeArg?.choices).toEqual(["status", "on", "off"]); expect(modeArg?.choices).toEqual(["status", "on", "off"]);
}); });
it("registers /btw as an inline-question command", () => {
const command = findCommandByNativeName("btw");
expect(command).toMatchObject({
key: "btw",
nativeName: "btw",
textAliases: ["/btw"],
});
expect(command?.args?.[0]).toMatchObject({
name: "question",
required: true,
captureRemaining: true,
});
});
it("detects known text commands", () => { it("detects known text commands", () => {
const detection = getCommandDetection(); const detection = getCommandDetection();
expect(detection.exact.has("/commands")).toBe(true); expect(detection.exact.has("/commands")).toBe(true);

View File

@ -192,6 +192,7 @@ export function buildEmbeddedRunBaseParams(params: {
thinkLevel: params.run.thinkLevel, thinkLevel: params.run.thinkLevel,
verboseLevel: params.run.verboseLevel, verboseLevel: params.run.verboseLevel,
reasoningLevel: params.run.reasoningLevel, reasoningLevel: params.run.reasoningLevel,
disableTools: params.run.disableTools,
execOverrides: params.run.execOverrides, execOverrides: params.run.execOverrides,
bashElevated: params.run.bashElevated, bashElevated: params.run.bashElevated,
timeoutMs: params.run.timeoutMs, timeoutMs: params.run.timeoutMs,

View File

@ -1,3 +1,6 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest"; import { beforeEach, describe, expect, it, vi } from "vitest";
import { runPreparedReply } from "./get-reply-run.js"; import { runPreparedReply } from "./get-reply-run.js";
@ -79,6 +82,7 @@ vi.mock("./typing-mode.js", () => ({
resolveTypingMode: vi.fn().mockReturnValue("off"), resolveTypingMode: vi.fn().mockReturnValue("off"),
})); }));
import { resolveSessionFilePath } from "../../config/sessions.js";
import { runReplyAgent } from "./agent-runner.js"; import { runReplyAgent } from "./agent-runner.js";
import { routeReply } from "./route-reply.js"; import { routeReply } from "./route-reply.js";
import { drainFormattedSystemEvents } from "./session-updates.js"; import { drainFormattedSystemEvents } from "./session-updates.js";
@ -396,4 +400,63 @@ describe("runPreparedReply media-only handling", () => {
// Queue body (used by steer mode) must keep the full original text. // Queue body (used by steer mode) must keep the full original text.
expect(call?.followupRun.prompt).toContain("low steer this conversation"); expect(call?.followupRun.prompt).toContain("low steer this conversation");
}); });
it("forces /btw side turns to run as a single no-tools reply", async () => {
await runPreparedReply(
baseParams({
blockStreamingEnabled: true,
ephemeralSideTurn: { kind: "btw" },
}),
);
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
expect(call).toBeTruthy();
expect(call?.followupRun.run.disableTools).toBe(true);
expect(call?.resolvedQueue.mode).toBe("interrupt");
expect(call?.shouldSteer).toBe(false);
expect(call?.shouldFollowup).toBe(false);
expect(call?.blockStreamingEnabled).toBe(false);
expect(call?.queueKey).toBe(call?.followupRun.run.sessionId);
expect(call?.queueKey).not.toBe("session-key");
});
it("copies parent transcript into a temporary /btw session without mutating the parent", async () => {
const tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-btw-test-"));
const sourceSessionFile = path.join(tempRoot, "parent.jsonl");
const sourceTranscript = [
JSON.stringify({ type: "header", sessionId: "parent-session" }),
JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "parent question" }] },
}),
"",
].join("\n");
await fs.writeFile(sourceSessionFile, sourceTranscript, "utf-8");
let copiedTranscript = "";
vi.mocked(runReplyAgent).mockImplementationOnce(async (call) => {
copiedTranscript = await fs.readFile(call.followupRun.run.sessionFile, "utf-8");
return { text: "ok" };
});
vi.mocked(resolveSessionFilePath).mockReturnValueOnce(sourceSessionFile);
try {
await runPreparedReply(
baseParams({
ephemeralSideTurn: { kind: "btw" },
sessionEntry: {
sessionId: "parent-session",
updatedAt: 1,
sessionFile: sourceSessionFile,
},
storePath: path.join(tempRoot, "sessions.json"),
}),
);
expect(copiedTranscript).toBe(sourceTranscript);
await expect(fs.readFile(sourceSessionFile, "utf-8")).resolves.toBe(sourceTranscript);
} finally {
await fs.rm(tempRoot, { recursive: true, force: true });
}
});
}); });

View File

@ -1,4 +1,7 @@
import crypto from "node:crypto"; import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js"; import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js";
import type { ExecToolDefaults } from "../../agents/bash-tools.js"; import type { ExecToolDefaults } from "../../agents/bash-tools.js";
import { resolveFastModeState } from "../../agents/fast-mode.js"; import { resolveFastModeState } from "../../agents/fast-mode.js";
@ -53,6 +56,30 @@ import { appendUntrustedContext } from "./untrusted-context.js";
type AgentDefaults = NonNullable<OpenClawConfig["agents"]>["defaults"]; type AgentDefaults = NonNullable<OpenClawConfig["agents"]>["defaults"];
type ExecOverrides = Pick<ExecToolDefaults, "host" | "security" | "ask" | "node">; type ExecOverrides = Pick<ExecToolDefaults, "host" | "security" | "ask" | "node">;
type EphemeralSideTurn = { kind: "btw" };
async function createEphemeralSideTurnSession(params: {
agentId: string;
sessionEntry?: SessionEntry;
storePath?: string;
}) {
const sessionId = crypto.randomUUID();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-btw-"));
const sessionFile = path.join(tempDir, `${sessionId}.jsonl`);
const sourceSessionFile = params.sessionEntry
? resolveSessionFilePath(
params.sessionEntry.sessionId ?? sessionId,
params.sessionEntry,
resolveSessionFilePathOptions({ agentId: params.agentId, storePath: params.storePath }),
)
: undefined;
if (sourceSessionFile) {
await fs.copyFile(sourceSessionFile, sessionFile);
} else {
await fs.writeFile(sessionFile, "", "utf-8");
}
return { sessionId, sessionFile, tempDir };
}
function buildResetSessionNoticeText(params: { function buildResetSessionNoticeText(params: {
provider: string; provider: string;
@ -177,6 +204,7 @@ type RunPreparedReplyParams = {
storePath?: string; storePath?: string;
workspaceDir: string; workspaceDir: string;
abortedLastRun: boolean; abortedLastRun: boolean;
ephemeralSideTurn?: EphemeralSideTurn;
}; };
export async function runPreparedReply( export async function runPreparedReply(
@ -219,6 +247,7 @@ export async function runPreparedReply(
storePath, storePath,
workspaceDir, workspaceDir,
sessionStore, sessionStore,
ephemeralSideTurn,
} = params; } = params;
let { let {
sessionEntry, sessionEntry,
@ -230,6 +259,9 @@ export async function runPreparedReply(
abortedLastRun, abortedLastRun,
} = params; } = params;
let currentSystemSent = systemSent; let currentSystemSent = systemSent;
const persistedSessionStore = ephemeralSideTurn ? undefined : sessionStore;
const persistedStorePath = ephemeralSideTurn ? undefined : storePath;
const abortedLastRunForRun = ephemeralSideTurn ? false : abortedLastRun;
const isFirstTurnInSession = isNewSession || !currentSystemSent; const isFirstTurnInSession = isNewSession || !currentSystemSent;
const isGroupChat = sessionCtx.ChatType === "group"; const isGroupChat = sessionCtx.ChatType === "group";
@ -324,11 +356,11 @@ export async function runPreparedReply(
: "[User sent media without caption]"; : "[User sent media without caption]";
let prefixedBodyBase = await applySessionHints({ let prefixedBodyBase = await applySessionHints({
baseBody: effectiveBaseBody, baseBody: effectiveBaseBody,
abortedLastRun, abortedLastRun: abortedLastRunForRun,
sessionEntry, sessionEntry,
sessionStore, sessionStore: persistedSessionStore,
sessionKey, sessionKey,
storePath, storePath: persistedStorePath,
abortKey: command.abortKey, abortKey: command.abortKey,
}); });
const isGroupSession = sessionEntry?.chatType === "group" || sessionEntry?.chatType === "channel"; const isGroupSession = sessionEntry?.chatType === "group" || sessionEntry?.chatType === "channel";
@ -367,9 +399,9 @@ export async function runPreparedReply(
: undefined; : undefined;
const skillResult = await ensureSkillSnapshot({ const skillResult = await ensureSkillSnapshot({
sessionEntry, sessionEntry,
sessionStore, sessionStore: persistedSessionStore,
sessionKey, sessionKey,
storePath, storePath: persistedStorePath,
sessionId, sessionId,
isFirstTurnInSession, isFirstTurnInSession,
workspaceDir, workspaceDir,
@ -399,12 +431,18 @@ export async function runPreparedReply(
}; };
} }
resolvedThinkLevel = "high"; resolvedThinkLevel = "high";
if (sessionEntry && sessionStore && sessionKey && sessionEntry.thinkingLevel === "xhigh") { if (
!ephemeralSideTurn &&
sessionEntry &&
sessionStore &&
sessionKey &&
sessionEntry.thinkingLevel === "xhigh"
) {
sessionEntry.thinkingLevel = "high"; sessionEntry.thinkingLevel = "high";
sessionEntry.updatedAt = Date.now(); sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry; sessionStore[sessionKey] = sessionEntry;
if (storePath) { if (persistedStorePath) {
await updateSessionStore(storePath, (store) => { await updateSessionStore(persistedStorePath, (store) => {
store[sessionKey] = sessionEntry; store[sessionKey] = sessionEntry;
}); });
} }
@ -424,40 +462,53 @@ export async function runPreparedReply(
defaultModel, defaultModel,
}); });
} }
const sessionIdFinal = sessionId ?? crypto.randomUUID(); const sideTurnSession = ephemeralSideTurn
const sessionFile = resolveSessionFilePath( ? await createEphemeralSideTurnSession({
sessionIdFinal, agentId,
sessionEntry, sessionEntry,
resolveSessionFilePathOptions({ agentId, storePath }), storePath,
); })
: null;
const sessionIdFinal = sideTurnSession?.sessionId ?? sessionId ?? crypto.randomUUID();
const sessionFile =
sideTurnSession?.sessionFile ??
resolveSessionFilePath(
sessionIdFinal,
sessionEntry,
resolveSessionFilePathOptions({ agentId, storePath }),
);
// Use bodyWithEvents (events prepended, but no session hints / untrusted context) so // Use bodyWithEvents (events prepended, but no session hints / untrusted context) so
// deferred turns receive system events while keeping the same scope as effectiveBaseBody did. // deferred turns receive system events while keeping the same scope as effectiveBaseBody did.
const queueBodyBase = [threadContextNote, bodyWithEvents].filter(Boolean).join("\n\n"); const queueBodyBase = [threadContextNote, bodyWithEvents].filter(Boolean).join("\n\n");
const queuedBody = mediaNote const queuedBody = mediaNote
? [mediaNote, mediaReplyHint, queueBodyBase].filter(Boolean).join("\n").trim() ? [mediaNote, mediaReplyHint, queueBodyBase].filter(Boolean).join("\n").trim()
: queueBodyBase; : queueBodyBase;
const resolvedQueue = resolveQueueSettings({ const inheritedQueue = resolveQueueSettings({
cfg, cfg,
channel: sessionCtx.Provider, channel: sessionCtx.Provider,
sessionEntry, sessionEntry,
inlineMode: perMessageQueueMode, inlineMode: perMessageQueueMode,
inlineOptions: perMessageQueueOptions, inlineOptions: perMessageQueueOptions,
}); });
const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey ?? sessionIdFinal); const resolvedQueue = ephemeralSideTurn ? { mode: "interrupt" as const } : inheritedQueue;
const queueKey = ephemeralSideTurn ? sessionIdFinal : (sessionKey ?? sessionIdFinal);
const sessionLaneKey = resolveEmbeddedSessionLane(queueKey);
const laneSize = getQueueSize(sessionLaneKey); const laneSize = getQueueSize(sessionLaneKey);
if (resolvedQueue.mode === "interrupt" && laneSize > 0) { if (resolvedQueue.mode === "interrupt" && laneSize > 0) {
const cleared = clearCommandLane(sessionLaneKey); const cleared = clearCommandLane(sessionLaneKey);
const aborted = abortEmbeddedPiRun(sessionIdFinal); const aborted = abortEmbeddedPiRun(sessionIdFinal);
logVerbose(`Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`); logVerbose(`Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`);
} }
const queueKey = sessionKey ?? sessionIdFinal;
const isActive = isEmbeddedPiRunActive(sessionIdFinal); const isActive = isEmbeddedPiRunActive(sessionIdFinal);
const isStreaming = isEmbeddedPiRunStreaming(sessionIdFinal); const isStreaming = isEmbeddedPiRunStreaming(sessionIdFinal);
const shouldSteer = resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog"; const shouldSteer =
!ephemeralSideTurn &&
(resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog");
const shouldFollowup = const shouldFollowup =
resolvedQueue.mode === "followup" || !ephemeralSideTurn &&
resolvedQueue.mode === "collect" || (resolvedQueue.mode === "followup" ||
resolvedQueue.mode === "steer-backlog"; resolvedQueue.mode === "collect" ||
resolvedQueue.mode === "steer-backlog");
const authProfileId = await resolveSessionAuthProfileOverride({ const authProfileId = await resolveSessionAuthProfileOverride({
cfg, cfg,
provider, provider,
@ -519,6 +570,7 @@ export async function runPreparedReply(
verboseLevel: resolvedVerboseLevel, verboseLevel: resolvedVerboseLevel,
reasoningLevel: resolvedReasoningLevel, reasoningLevel: resolvedReasoningLevel,
elevatedLevel: resolvedElevatedLevel, elevatedLevel: resolvedElevatedLevel,
...(ephemeralSideTurn ? { disableTools: true } : {}),
execOverrides, execOverrides,
bashElevated: { bashElevated: {
enabled: elevatedEnabled, enabled: elevatedEnabled,
@ -534,30 +586,36 @@ export async function runPreparedReply(
}, },
}; };
return runReplyAgent({ try {
commandBody: prefixedCommandBody, return await runReplyAgent({
followupRun, commandBody: prefixedCommandBody,
queueKey, followupRun,
resolvedQueue, queueKey,
shouldSteer, resolvedQueue,
shouldFollowup, shouldSteer,
isActive, shouldFollowup,
isStreaming, isActive,
opts, isStreaming,
typing, opts,
sessionEntry, typing,
sessionStore, sessionEntry,
sessionKey, sessionStore: persistedSessionStore,
storePath, sessionKey,
defaultModel, storePath: persistedStorePath,
agentCfgContextTokens: agentCfg?.contextTokens, defaultModel,
resolvedVerboseLevel: resolvedVerboseLevel ?? "off", agentCfgContextTokens: agentCfg?.contextTokens,
isNewSession, resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
blockStreamingEnabled, isNewSession,
blockReplyChunking, blockStreamingEnabled: ephemeralSideTurn ? false : blockStreamingEnabled,
resolvedBlockStreamingBreak, blockReplyChunking,
sessionCtx, resolvedBlockStreamingBreak,
shouldInjectGroupIntro, sessionCtx,
typingMode, shouldInjectGroupIntro,
}); typingMode,
});
} finally {
if (sideTurnSession) {
await fs.rm(sideTurnSession.tempDir, { recursive: true, force: true });
}
}
} }

View File

@ -49,6 +49,23 @@ function buildNativeResetContext(): MsgContext {
}; };
} }
function buildNativeBtwContext(): MsgContext {
return {
Provider: "telegram",
Surface: "telegram",
ChatType: "direct",
Body: "/btw what changed?",
RawBody: "/btw what changed?",
CommandBody: "/btw what changed?",
CommandSource: "native",
CommandAuthorized: true,
SessionKey: "telegram:slash:123",
CommandTargetSessionKey: "agent:main:telegram:direct:123",
From: "telegram:123",
To: "slash:123",
};
}
function createContinueDirectivesResult(resetHookTriggered: boolean) { function createContinueDirectivesResult(resetHookTriggered: boolean) {
return { return {
kind: "continue" as const, kind: "continue" as const,
@ -150,4 +167,80 @@ describe("getReplyFromConfig reset-hook fallback", () => {
expect(mocks.emitResetCommandHooks).not.toHaveBeenCalled(); expect(mocks.emitResetCommandHooks).not.toHaveBeenCalled();
}); });
it("rewrites native /btw to the inline question and resolves the target session read-only", async () => {
mocks.handleInlineActions.mockResolvedValueOnce({ kind: "reply", reply: undefined });
mocks.initSessionState.mockResolvedValueOnce({
sessionCtx: { BodyStripped: "what changed?" },
sessionEntry: {
sessionId: "session-1",
updatedAt: 1,
sessionFile: "/tmp/session-1.jsonl",
},
previousSessionEntry: undefined,
sessionStore: {},
sessionKey: "agent:main:telegram:direct:123",
sessionId: "session-1",
isNewSession: false,
resetTriggered: false,
systemSent: true,
abortedLastRun: false,
storePath: "/tmp/sessions.json",
sessionScope: "per-sender",
groupResolution: undefined,
isGroup: false,
triggerBodyNormalized: "what changed?",
bodyStripped: "what changed?",
});
mocks.resolveReplyDirectives.mockResolvedValueOnce(createContinueDirectivesResult(false));
await getReplyFromConfig(buildNativeBtwContext(), undefined, {});
expect(mocks.initSessionState).toHaveBeenCalledWith(
expect.objectContaining({
ctx: expect.objectContaining({
Body: "what changed?",
CommandBody: "what changed?",
RawBody: "what changed?",
}),
readOnly: true,
}),
);
expect(mocks.resolveReplyDirectives).toHaveBeenCalledWith(
expect.objectContaining({
triggerBodyNormalized: "what changed?",
sessionCtx: expect.objectContaining({
BodyStripped: "what changed?",
}),
}),
);
});
it("returns an error for /btw when there is no active target session", async () => {
mocks.initSessionState.mockResolvedValueOnce({
sessionCtx: {},
sessionEntry: {
sessionId: "session-2",
updatedAt: 2,
},
previousSessionEntry: undefined,
sessionStore: {},
sessionKey: "agent:main:telegram:direct:123",
sessionId: "session-2",
isNewSession: true,
resetTriggered: false,
systemSent: false,
abortedLastRun: false,
storePath: "/tmp/sessions.json",
sessionScope: "per-sender",
groupResolution: undefined,
isGroup: false,
triggerBodyNormalized: "what changed?",
bodyStripped: "what changed?",
});
await expect(getReplyFromConfig(buildNativeBtwContext(), undefined, {})).resolves.toEqual({
text: "❌ No active session found for /btw.",
});
});
}); });

View File

@ -14,6 +14,7 @@ import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
import { defaultRuntime } from "../../runtime.js"; import { defaultRuntime } from "../../runtime.js";
import { normalizeStringEntries } from "../../shared/string-normalization.js"; import { normalizeStringEntries } from "../../shared/string-normalization.js";
import { resolveCommandAuthorization } from "../command-auth.js"; import { resolveCommandAuthorization } from "../command-auth.js";
import { shouldHandleTextCommands } from "../commands-registry.js";
import type { MsgContext } from "../templating.js"; import type { MsgContext } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js";
@ -54,6 +55,18 @@ function mergeSkillFilters(channelFilter?: string[], agentFilter?: string[]): st
return channel.filter((name) => agentSet.has(name)); return channel.filter((name) => agentSet.has(name));
} }
function parseBtwInlineQuestion(text: string | undefined): string | null {
const trimmed = text?.trim();
if (!trimmed) {
return null;
}
const match = trimmed.match(/^\/btw(?:\s+([\s\S]+))?$/i);
if (!match) {
return null;
}
return match[1]?.trim() ?? "";
}
export async function getReplyFromConfig( export async function getReplyFromConfig(
ctx: MsgContext, ctx: MsgContext,
opts?: GetReplyOptions, opts?: GetReplyOptions,
@ -124,6 +137,37 @@ export async function getReplyFromConfig(
opts?.onTypingController?.(typing); opts?.onTypingController?.(typing);
const finalized = finalizeInboundContext(ctx); const finalized = finalizeInboundContext(ctx);
const commandAuthorized = finalized.CommandAuthorized;
const commandAuth = resolveCommandAuthorization({
ctx: finalized,
cfg,
commandAuthorized,
});
const btwQuestion = parseBtwInlineQuestion(
finalized.BodyForCommands ?? finalized.CommandBody ?? finalized.RawBody ?? finalized.Body,
);
const allowBtwSideTurn =
typeof btwQuestion === "string" &&
shouldHandleTextCommands({
cfg,
surface: finalized.Surface ?? "",
commandSource: finalized.CommandSource,
});
const useBtwSideTurn = allowBtwSideTurn && typeof btwQuestion === "string";
if (useBtwSideTurn && !commandAuth.isAuthorizedSender) {
return undefined;
}
if (useBtwSideTurn) {
const btwQuestionText = btwQuestion ?? "";
if (btwQuestionText.length === 0) {
return { text: "⚙️ Usage: /btw <question>" };
}
finalized.Body = btwQuestionText;
finalized.BodyForAgent = btwQuestionText;
finalized.RawBody = btwQuestionText;
finalized.CommandBody = btwQuestionText;
finalized.BodyForCommands = btwQuestionText;
}
if (!isFastTestEnv) { if (!isFastTestEnv) {
await applyMediaUnderstanding({ await applyMediaUnderstanding({
@ -143,16 +187,11 @@ export async function getReplyFromConfig(
isFastTestEnv, isFastTestEnv,
}); });
const commandAuthorized = finalized.CommandAuthorized;
resolveCommandAuthorization({
ctx: finalized,
cfg,
commandAuthorized,
});
const sessionState = await initSessionState({ const sessionState = await initSessionState({
ctx: finalized, ctx: finalized,
cfg, cfg,
commandAuthorized, commandAuthorized,
readOnly: useBtwSideTurn,
}); });
let { let {
sessionCtx, sessionCtx,
@ -173,6 +212,11 @@ export async function getReplyFromConfig(
bodyStripped, bodyStripped,
} = sessionState; } = sessionState;
if (useBtwSideTurn && (isNewSession || !sessionEntry?.sessionFile?.trim())) {
typing.cleanup();
return { text: "❌ No active session found for /btw." };
}
await applyResetModelOverride({ await applyResetModelOverride({
cfg, cfg,
agentId, agentId,
@ -400,5 +444,6 @@ export async function getReplyFromConfig(
storePath, storePath,
workspaceDir, workspaceDir,
abortedLastRun, abortedLastRun,
...(useBtwSideTurn ? { ephemeralSideTurn: { kind: "btw" as const } } : {}),
}); });
} }

View File

@ -77,6 +77,7 @@ export type FollowupRun = {
}; };
timeoutMs: number; timeoutMs: number;
blockReplyBreak: "text_end" | "message_end"; blockReplyBreak: "text_end" | "message_end";
disableTools?: boolean;
ownerNumbers?: string[]; ownerNumbers?: string[];
inputProvenance?: InputProvenance; inputProvenance?: InputProvenance;
extraSystemPrompt?: string; extraSystemPrompt?: string;

View File

@ -170,8 +170,10 @@ export async function initSessionState(params: {
ctx: MsgContext; ctx: MsgContext;
cfg: OpenClawConfig; cfg: OpenClawConfig;
commandAuthorized: boolean; commandAuthorized: boolean;
readOnly?: boolean;
}): Promise<SessionInitResult> { }): Promise<SessionInitResult> {
const { ctx, cfg, commandAuthorized } = params; const { ctx, cfg, commandAuthorized } = params;
const readOnly = params.readOnly === true;
// Native slash commands (Telegram/Discord/Slack) are delivered on a separate // Native slash commands (Telegram/Discord/Slack) are delivered on a separate
// "slash session" key, but should mutate the target chat session. // "slash session" key, but should mutate the target chat session.
const targetSessionKey = const targetSessionKey =
@ -496,18 +498,24 @@ export async function initSessionState(params: {
const fallbackSessionFile = !sessionEntry.sessionFile const fallbackSessionFile = !sessionEntry.sessionFile
? resolveSessionTranscriptPath(sessionEntry.sessionId, agentId, ctx.MessageThreadId) ? resolveSessionTranscriptPath(sessionEntry.sessionId, agentId, ctx.MessageThreadId)
: undefined; : undefined;
const resolvedSessionFile = await resolveAndPersistSessionFile({ if (readOnly) {
sessionId: sessionEntry.sessionId, if (!sessionEntry.sessionFile && fallbackSessionFile) {
sessionKey, sessionEntry.sessionFile = fallbackSessionFile;
sessionStore, }
storePath, } else {
sessionEntry, const resolvedSessionFile = await resolveAndPersistSessionFile({
agentId, sessionId: sessionEntry.sessionId,
sessionsDir: path.dirname(storePath), sessionKey,
fallbackSessionFile, sessionStore,
activeSessionKey: sessionKey, storePath,
}); sessionEntry,
sessionEntry = resolvedSessionFile.sessionEntry; agentId,
sessionsDir: path.dirname(storePath),
fallbackSessionFile,
activeSessionKey: sessionKey,
});
sessionEntry = resolvedSessionFile.sessionEntry;
}
if (isNewSession) { if (isNewSession) {
sessionEntry.compactionCount = 0; sessionEntry.compactionCount = 0;
sessionEntry.memoryFlushCompactionCount = undefined; sessionEntry.memoryFlushCompactionCount = undefined;
@ -519,38 +527,40 @@ export async function initSessionState(params: {
sessionEntry.outputTokens = undefined; sessionEntry.outputTokens = undefined;
sessionEntry.contextTokens = undefined; sessionEntry.contextTokens = undefined;
} }
// Preserve per-session overrides while resetting compaction state on /new. if (!readOnly) {
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry }; // Preserve per-session overrides while resetting compaction state on /new.
await updateSessionStore( sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry };
storePath, await updateSessionStore(
(store) => {
// Preserve per-session overrides while resetting compaction state on /new.
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
if (retiredLegacyMainDelivery) {
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
}
},
{
activeSessionKey: sessionKey,
onWarn: (warning) =>
deliverSessionMaintenanceWarning({
cfg,
sessionKey,
entry: sessionEntry,
warning,
}),
},
);
// Archive old transcript so it doesn't accumulate on disk (#14869).
if (previousSessionEntry?.sessionId) {
archiveSessionTranscripts({
sessionId: previousSessionEntry.sessionId,
storePath, storePath,
sessionFile: previousSessionEntry.sessionFile, (store) => {
agentId, // Preserve per-session overrides while resetting compaction state on /new.
reason: "reset", store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
}); if (retiredLegacyMainDelivery) {
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
}
},
{
activeSessionKey: sessionKey,
onWarn: (warning) =>
deliverSessionMaintenanceWarning({
cfg,
sessionKey,
entry: sessionEntry,
warning,
}),
},
);
// Archive old transcript so it doesn't accumulate on disk (#14869).
if (previousSessionEntry?.sessionId) {
archiveSessionTranscripts({
sessionId: previousSessionEntry.sessionId,
storePath,
sessionFile: previousSessionEntry.sessionFile,
agentId,
reason: "reset",
});
}
} }
const sessionCtx: TemplateContext = { const sessionCtx: TemplateContext = {
@ -571,7 +581,7 @@ export async function initSessionState(params: {
}; };
// Run session plugin hooks (fire-and-forget) // Run session plugin hooks (fire-and-forget)
const hookRunner = getGlobalHookRunner(); const hookRunner = readOnly ? null : getGlobalHookRunner();
if (hookRunner && isNewSession) { if (hookRunner && isNewSession) {
const effectiveSessionId = sessionId ?? ""; const effectiveSessionId = sessionId ?? "";

View File

@ -67,6 +67,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
"voicewake.get", "voicewake.get",
"sessions.list", "sessions.list",
"sessions.get", "sessions.get",
"pty.list",
"sessions.preview", "sessions.preview",
"sessions.resolve", "sessions.resolve",
"sessions.usage", "sessions.usage",

View File

@ -5,6 +5,11 @@ export const ErrorCodes = {
NOT_PAIRED: "NOT_PAIRED", NOT_PAIRED: "NOT_PAIRED",
AGENT_TIMEOUT: "AGENT_TIMEOUT", AGENT_TIMEOUT: "AGENT_TIMEOUT",
INVALID_REQUEST: "INVALID_REQUEST", INVALID_REQUEST: "INVALID_REQUEST",
INVALID_PARAMS: "INVALID_PARAMS",
NOT_FOUND: "NOT_FOUND",
FORBIDDEN: "FORBIDDEN",
RESOURCE_LIMIT: "RESOURCE_LIMIT",
PAYLOAD_TOO_LARGE: "PAYLOAD_TOO_LARGE",
UNAVAILABLE: "UNAVAILABLE", UNAVAILABLE: "UNAVAILABLE",
} as const; } as const;

View File

@ -0,0 +1,132 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const spawnMock = vi.fn();
vi.mock("@lydell/node-pty", () => ({
spawn: spawnMock,
}));
function makePtyHandle() {
let dataListener: ((value: string) => void) | null = null;
let exitListener: ((event: { exitCode: number }) => void) | null = null;
return {
pid: 123,
write: vi.fn(),
resize: vi.fn(),
kill: vi.fn(),
onData: vi.fn((listener: (value: string) => void) => {
dataListener = listener;
return { dispose: vi.fn() };
}),
onExit: vi.fn((listener: (event: { exitCode: number }) => void) => {
exitListener = listener;
return { dispose: vi.fn() };
}),
emitData(value: string) {
dataListener?.(value);
},
emitExit(code: number) {
exitListener?.({ exitCode: code });
},
};
}
describe("gateway pty manager", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
delete process.env.OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER;
delete process.env.OPENCLAW_PTY_MAX_TOTAL_SESSIONS;
delete process.env.OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES;
delete process.env.OPENCLAW_PTY_MIN_COLS;
delete process.env.OPENCLAW_PTY_MAX_COLS;
delete process.env.OPENCLAW_PTY_MIN_ROWS;
delete process.env.OPENCLAW_PTY_MAX_ROWS;
});
afterEach(async () => {
const mod = await import("./pty-manager.js");
for (const session of mod.listGatewayPtySessionsByOwner("device:one")) {
mod.destroyGatewayPtySession(session.sessionId);
}
for (const session of mod.listGatewayPtySessionsByOwner("device:two")) {
mod.destroyGatewayPtySession(session.sessionId);
}
});
it("enforces per-owner and total session limits", async () => {
process.env.OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER = "1";
process.env.OPENCLAW_PTY_MAX_TOTAL_SESSIONS = "2";
spawnMock.mockImplementation(() => makePtyHandle());
const mod = await import("./pty-manager.js");
await mod.createGatewayPtySession({
owner: { ownerKey: "device:one", connId: "conn-1" },
onOutput: vi.fn(),
onExit: vi.fn(),
});
await mod.createGatewayPtySession({
owner: { ownerKey: "device:two", connId: "conn-2" },
onOutput: vi.fn(),
onExit: vi.fn(),
});
await expect(
mod.createGatewayPtySession({
owner: { ownerKey: "device:one", connId: "conn-1" },
onOutput: vi.fn(),
onExit: vi.fn(),
}),
).rejects.toMatchObject({ code: "PTY_LIMIT_REACHED" });
});
it("enforces resize and input limits and exposes lastActive metadata", async () => {
process.env.OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES = "4";
process.env.OPENCLAW_PTY_MIN_COLS = "10";
process.env.OPENCLAW_PTY_MAX_COLS = "100";
process.env.OPENCLAW_PTY_MIN_ROWS = "5";
process.env.OPENCLAW_PTY_MAX_ROWS = "50";
const handle = makePtyHandle();
spawnMock.mockImplementation(() => handle);
const mod = await import("./pty-manager.js");
const session = await mod.createGatewayPtySession({
owner: { ownerKey: "device:one", connId: "conn-1" },
onOutput: vi.fn(),
onExit: vi.fn(),
});
expect(mod.listGatewayPtySessionsByOwner("device:one")[0]).toMatchObject({
sessionId: session.sessionId,
createdAt: expect.any(Number),
lastActive: expect.any(Number),
cols: 80,
rows: 24,
});
expect(() => mod.writeGatewayPtySession(session.sessionId, "12345")).toThrowError(
/exceeds 4 bytes/,
);
expect(() => mod.resizeGatewayPtySession(session.sessionId, 9, 6)).toThrowError(
/cols must be between 10 and 100/,
);
expect(() => mod.resizeGatewayPtySession(session.sessionId, 20, 51)).toThrowError(
/rows must be between 5 and 50/,
);
});
it("kills sessions bound to a disconnected connection", async () => {
const handle = makePtyHandle();
spawnMock.mockImplementation(() => handle);
const mod = await import("./pty-manager.js");
const session = await mod.createGatewayPtySession({
owner: { ownerKey: "device:one", connId: "conn-1" },
onOutput: vi.fn(),
onExit: vi.fn(),
});
expect(mod.destroyGatewayPtySessionsForConn("conn-1")).toBe(1);
expect(mod.getGatewayPtySession(session.sessionId)).toBeUndefined();
});
});

442
src/gateway/pty-manager.ts Normal file
View File

@ -0,0 +1,442 @@
import crypto from "node:crypto";
import os from "node:os";
import path from "node:path";
export type GatewayPtyOwner = {
ownerKey: string;
connId: string;
deviceId?: string;
};
export type GatewayPtySession = {
sessionId: string;
owner: GatewayPtyOwner;
shell: string;
cwd: string;
cols: number;
rows: number;
createdAt: number;
lastActive: number;
exitedAt?: number;
exitCode?: number | null;
};
type PtyExitEvent = { exitCode: number; signal?: number };
type PtyDisposable = { dispose: () => void };
type PtySpawnHandle = {
pid: number;
write: (data: string | Buffer) => void;
resize?: (cols: number, rows: number) => void;
onData: (listener: (value: string) => void) => PtyDisposable | void;
onExit: (listener: (event: PtyExitEvent) => void) => PtyDisposable | void;
kill: (signal?: string) => void;
};
type PtySpawn = (
file: string,
args: string[] | string,
options: {
name?: string;
cols?: number;
rows?: number;
cwd?: string;
env?: Record<string, string>;
},
) => PtySpawnHandle;
type PtyModule = {
spawn?: PtySpawn;
default?: { spawn?: PtySpawn };
};
type ActiveSession = GatewayPtySession & {
pty: PtySpawnHandle;
outputDispose?: PtyDisposable | null;
exitDispose?: PtyDisposable | null;
};
export class GatewayPtyError extends Error {
code:
| "PTY_NOT_FOUND"
| "PTY_ACCESS_DENIED"
| "PTY_INVALID_ARGS"
| "PTY_LIMIT_REACHED"
| "PTY_INPUT_TOO_LARGE";
constructor(
code:
| "PTY_NOT_FOUND"
| "PTY_ACCESS_DENIED"
| "PTY_INVALID_ARGS"
| "PTY_LIMIT_REACHED"
| "PTY_INPUT_TOO_LARGE",
message: string,
) {
super(message);
this.name = "GatewayPtyError";
this.code = code;
}
}
const sessions = new Map<string, ActiveSession>();
let idleSweepTimer: NodeJS.Timeout | null = null;
function intFromEnv(name: string, fallback: number): number {
const raw = process.env[name]?.trim();
if (!raw) {
return fallback;
}
const parsed = Number(raw);
if (!Number.isFinite(parsed)) {
return fallback;
}
return Math.floor(parsed);
}
function getPtyLimits() {
const minCols = Math.max(1, intFromEnv("OPENCLAW_PTY_MIN_COLS", 20));
const maxCols = Math.max(minCols, intFromEnv("OPENCLAW_PTY_MAX_COLS", 500));
const minRows = Math.max(1, intFromEnv("OPENCLAW_PTY_MIN_ROWS", 5));
const maxRows = Math.max(minRows, intFromEnv("OPENCLAW_PTY_MAX_ROWS", 200));
return {
minCols,
maxCols,
minRows,
maxRows,
maxSessionsPerOwner: Math.max(1, intFromEnv("OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER", 4)),
maxTotalSessions: Math.max(1, intFromEnv("OPENCLAW_PTY_MAX_TOTAL_SESSIONS", 32)),
maxInputChunkBytes: Math.max(1, intFromEnv("OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES", 65536)),
idleTimeoutMs: Math.max(0, intFromEnv("OPENCLAW_PTY_IDLE_TIMEOUT_MS", 30 * 60 * 1000)),
idleSweepIntervalMs: Math.max(
1000,
intFromEnv("OPENCLAW_PTY_IDLE_SWEEP_INTERVAL_MS", 60 * 1000),
),
};
}
function sanitizeInitialDim(
value: unknown,
fallback: number,
min: number,
max: number,
label: string,
): number {
if (value == null) {
return fallback;
}
const n = typeof value === "number" ? value : Number(value);
if (!Number.isFinite(n)) {
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be a finite number`);
}
const next = Math.floor(n);
if (next < min || next > max) {
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be between ${min} and ${max}`);
}
return next;
}
function sanitizeResizeDim(
value: unknown,
current: number,
min: number,
max: number,
label: string,
): number {
if (value == null) {
return current;
}
const n = typeof value === "number" ? value : Number(value);
if (!Number.isFinite(n)) {
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be a finite number`);
}
const next = Math.floor(n);
if (next < min || next > max) {
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be between ${min} and ${max}`);
}
return next;
}
function resolveDefaultShell(): string {
const shell = (process.env.OPENCLAW_PTY_SHELL || process.env.SHELL || "").trim();
if (shell) {
return shell;
}
return process.platform === "win32" ? "powershell.exe" : "/bin/zsh";
}
function resolveAllowedShells(defaultShell: string): Set<string> {
const raw = (process.env.OPENCLAW_PTY_ALLOWED_SHELLS || "").trim();
const values = raw
? raw
.split(",")
.map((v) => v.trim())
.filter(Boolean)
: [defaultShell];
return new Set(values);
}
function resolveShell(requested?: string): string {
const defaultShell = resolveDefaultShell();
if (!requested?.trim()) {
return defaultShell;
}
const candidate = requested.trim();
const allowed = resolveAllowedShells(defaultShell);
if (!allowed.has(candidate)) {
throw new GatewayPtyError("PTY_INVALID_ARGS", `shell is not allowed: ${candidate}`);
}
return candidate;
}
function resolveCwd(requested?: string): string {
const base = process.env.OPENCLAW_PTY_CWD || process.cwd();
const home = os.homedir();
const fallback = path.resolve(base || home);
if (!requested?.trim()) {
return fallback;
}
const expanded = requested.startsWith("~/") ? path.join(home, requested.slice(2)) : requested;
return path.resolve(expanded);
}
function toStringEnv(env: NodeJS.ProcessEnv): Record<string, string> {
const out: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
if (typeof value === "string") {
out[key] = value;
}
}
return out;
}
async function loadSpawn(): Promise<PtySpawn> {
const mod = (await import("@lydell/node-pty")) as unknown as PtyModule;
const spawn = mod.spawn ?? mod.default?.spawn;
if (!spawn) {
throw new Error("PTY support is unavailable");
}
return spawn;
}
function publicSession(session: ActiveSession): GatewayPtySession {
return {
sessionId: session.sessionId,
owner: { ...session.owner },
shell: session.shell,
cwd: session.cwd,
cols: session.cols,
rows: session.rows,
createdAt: session.createdAt,
lastActive: session.lastActive,
exitedAt: session.exitedAt,
exitCode: session.exitCode,
};
}
function markActive(session: ActiveSession): void {
session.lastActive = Date.now();
}
function ensureCapacity(ownerKey: string): void {
const limits = getPtyLimits();
if (sessions.size >= limits.maxTotalSessions) {
throw new GatewayPtyError(
"PTY_LIMIT_REACHED",
`PTY session limit reached (${limits.maxTotalSessions} total)`,
);
}
const ownerSessions = Array.from(sessions.values()).filter(
(session) => session.owner.ownerKey === ownerKey,
);
if (ownerSessions.length >= limits.maxSessionsPerOwner) {
throw new GatewayPtyError(
"PTY_LIMIT_REACHED",
`PTY session limit reached (${limits.maxSessionsPerOwner} per owner)`,
);
}
}
function ensureIdleSweep(): void {
const { idleTimeoutMs, idleSweepIntervalMs } = getPtyLimits();
if (idleTimeoutMs <= 0) {
if (idleSweepTimer) {
clearInterval(idleSweepTimer);
idleSweepTimer = null;
}
return;
}
if (idleSweepTimer) {
return;
}
idleSweepTimer = setInterval(() => {
const now = Date.now();
for (const session of sessions.values()) {
if (now - session.lastActive >= idleTimeoutMs) {
destroyGatewayPtySession(session.sessionId);
}
}
if (sessions.size === 0 && idleSweepTimer) {
clearInterval(idleSweepTimer);
idleSweepTimer = null;
}
}, idleSweepIntervalMs);
idleSweepTimer.unref?.();
}
export async function createGatewayPtySession(params: {
owner: GatewayPtyOwner;
cols?: number;
rows?: number;
cwd?: string;
shell?: string;
onOutput: (event: { sessionId: string; data: string; connId: string }) => void;
onExit: (event: { sessionId: string; code: number | null; connId: string }) => void;
}): Promise<GatewayPtySession> {
ensureCapacity(params.owner.ownerKey);
const spawn = await loadSpawn();
const limits = getPtyLimits();
const cols = sanitizeInitialDim(params.cols, 80, limits.minCols, limits.maxCols, "cols");
const rows = sanitizeInitialDim(params.rows, 24, limits.minRows, limits.maxRows, "rows");
const shell = resolveShell(params.shell);
const cwd = resolveCwd(params.cwd);
const sessionId = crypto.randomUUID();
const now = Date.now();
const pty = spawn(shell, [], {
name: process.env.TERM || "xterm-256color",
cols,
rows,
cwd,
env: toStringEnv(process.env),
});
const session: ActiveSession = {
sessionId,
owner: { ...params.owner },
shell,
cwd,
cols,
rows,
createdAt: now,
lastActive: now,
pty,
};
session.outputDispose =
pty.onData((data) => {
markActive(session);
params.onOutput({ sessionId, data, connId: session.owner.connId });
}) ?? null;
session.exitDispose =
pty.onExit((event) => {
session.exitedAt = Date.now();
session.exitCode = event.exitCode ?? null;
try {
params.onExit({ sessionId, code: session.exitCode, connId: session.owner.connId });
} finally {
destroyGatewayPtySession(sessionId);
}
}) ?? null;
sessions.set(sessionId, session);
ensureIdleSweep();
return publicSession(session);
}
export function listGatewayPtySessionsByOwner(ownerKey: string): GatewayPtySession[] {
return Array.from(sessions.values())
.filter((session) => session.owner.ownerKey === ownerKey)
.map(publicSession);
}
export function getGatewayPtySession(sessionId: string): GatewayPtySession | undefined {
const session = sessions.get(sessionId);
return session ? publicSession(session) : undefined;
}
export function touchGatewayPtySessionOwner(params: { sessionId: string; connId: string }): void {
const session = sessions.get(params.sessionId);
if (!session) {
return;
}
session.owner.connId = params.connId;
markActive(session);
}
export function writeGatewayPtySession(sessionId: string, data: string): void {
const session = sessions.get(sessionId);
if (!session) {
throw new GatewayPtyError("PTY_NOT_FOUND", `PTY session not found: ${sessionId}`);
}
const byteLength = Buffer.byteLength(data, "utf8");
const { maxInputChunkBytes } = getPtyLimits();
if (byteLength > maxInputChunkBytes) {
throw new GatewayPtyError(
"PTY_INPUT_TOO_LARGE",
`PTY input exceeds ${maxInputChunkBytes} bytes`,
);
}
markActive(session);
session.pty.write(data);
}
export function resizeGatewayPtySession(sessionId: string, cols?: number, rows?: number): void {
const session = sessions.get(sessionId);
if (!session) {
throw new GatewayPtyError("PTY_NOT_FOUND", `PTY session not found: ${sessionId}`);
}
const limits = getPtyLimits();
const nextCols = sanitizeResizeDim(cols, session.cols, limits.minCols, limits.maxCols, "cols");
const nextRows = sanitizeResizeDim(rows, session.rows, limits.minRows, limits.maxRows, "rows");
session.cols = nextCols;
session.rows = nextRows;
markActive(session);
session.pty.resize?.(nextCols, nextRows);
}
export function destroyGatewayPtySession(sessionId: string): void {
const session = sessions.get(sessionId);
if (!session) {
return;
}
sessions.delete(sessionId);
try {
session.outputDispose?.dispose();
} catch {}
try {
session.exitDispose?.dispose();
} catch {}
try {
session.pty.kill("SIGKILL");
} catch {}
if (sessions.size === 0 && idleSweepTimer) {
clearInterval(idleSweepTimer);
idleSweepTimer = null;
}
}
export function destroyGatewayPtySessionsForConn(connId: string): number {
const sessionIds = Array.from(sessions.values())
.filter((session) => session.owner.connId === connId)
.map((session) => session.sessionId);
for (const sessionId of sessionIds) {
destroyGatewayPtySession(sessionId);
}
return sessionIds.length;
}
export function assertGatewayPtyOwnership(params: {
sessionId: string;
ownerKey: string;
connId: string;
}): GatewayPtySession {
const session = sessions.get(params.sessionId);
if (!session) {
throw new GatewayPtyError("PTY_NOT_FOUND", `PTY session not found: ${params.sessionId}`);
}
if (session.owner.ownerKey !== params.ownerKey) {
throw new GatewayPtyError(
"PTY_ACCESS_DENIED",
`PTY session does not belong to this gateway client: ${params.sessionId}`,
);
}
session.owner.connId = params.connId;
markActive(session);
return publicSession(session);
}

View File

@ -99,6 +99,11 @@ const BASE_METHODS = [
"agent.identity.get", "agent.identity.get",
"agent.wait", "agent.wait",
"browser.request", "browser.request",
"pty.create",
"pty.write",
"pty.resize",
"pty.kill",
"pty.list",
// WebChat WebSocket-native chat methods // WebChat WebSocket-native chat methods
"chat.history", "chat.history",
"chat.abort", "chat.abort",

View File

@ -20,6 +20,7 @@ import { logsHandlers } from "./server-methods/logs.js";
import { modelsHandlers } from "./server-methods/models.js"; import { modelsHandlers } from "./server-methods/models.js";
import { nodePendingHandlers } from "./server-methods/nodes-pending.js"; import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
import { nodeHandlers } from "./server-methods/nodes.js"; import { nodeHandlers } from "./server-methods/nodes.js";
import { ptyHandlers } from "./server-methods/pty.js";
import { pushHandlers } from "./server-methods/push.js"; import { pushHandlers } from "./server-methods/push.js";
import { sendHandlers } from "./server-methods/send.js"; import { sendHandlers } from "./server-methods/send.js";
import { sessionsHandlers } from "./server-methods/sessions.js"; import { sessionsHandlers } from "./server-methods/sessions.js";
@ -90,6 +91,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
...nodeHandlers, ...nodeHandlers,
...nodePendingHandlers, ...nodePendingHandlers,
...pushHandlers, ...pushHandlers,
...ptyHandlers,
...sendHandlers, ...sendHandlers,
...usageHandlers, ...usageHandlers,
...agentHandlers, ...agentHandlers,

View File

@ -0,0 +1,151 @@
import { ErrorCodes, errorShape } from "../protocol/index.js";
import {
assertGatewayPtyOwnership,
createGatewayPtySession,
destroyGatewayPtySession,
GatewayPtyError,
listGatewayPtySessionsByOwner,
resizeGatewayPtySession,
writeGatewayPtySession,
} from "../pty-manager.js";
import type { GatewayRequestHandlers } from "./types.js";
function getPtyOwner(client: { connect?: { device?: { id?: string } }; connId?: string } | null): {
ownerKey: string;
connId: string;
deviceId?: string;
} {
const connId = client?.connId?.trim();
if (!connId) {
throw new GatewayPtyError(
"PTY_INVALID_ARGS",
"PTY requires an authenticated gateway connection",
);
}
const deviceId = client?.connect?.device?.id?.trim() || undefined;
return {
ownerKey: deviceId ? `device:${deviceId}` : `conn:${connId}`,
connId,
deviceId,
};
}
function invalidParams(message: string) {
return errorShape(ErrorCodes.INVALID_PARAMS, message);
}
function asString(value: unknown): string | undefined {
return typeof value === "string" ? value : undefined;
}
function asNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function mapPtyError(error: unknown) {
if (error instanceof GatewayPtyError) {
switch (error.code) {
case "PTY_NOT_FOUND":
return errorShape(ErrorCodes.NOT_FOUND, error.message);
case "PTY_ACCESS_DENIED":
return errorShape(ErrorCodes.FORBIDDEN, error.message);
case "PTY_LIMIT_REACHED":
return errorShape(ErrorCodes.RESOURCE_LIMIT, error.message);
case "PTY_INPUT_TOO_LARGE":
return errorShape(ErrorCodes.PAYLOAD_TOO_LARGE, error.message);
case "PTY_INVALID_ARGS":
default:
return invalidParams(error.message);
}
}
return invalidParams(error instanceof Error ? error.message : String(error));
}
export const ptyHandlers: GatewayRequestHandlers = {
"pty.create": async ({ client, params, respond, context }) => {
try {
const owner = getPtyOwner(client);
const session = await createGatewayPtySession({
owner,
cols: asNumber(params.cols),
rows: asNumber(params.rows),
cwd: asString(params.cwd),
shell: asString(params.shell),
onOutput: ({ sessionId, data, connId }) => {
context.broadcastToConnIds("pty.output", { sessionId, data }, new Set([connId]));
},
onExit: ({ sessionId, code, connId }) => {
context.broadcastToConnIds("pty.exit", { sessionId, code }, new Set([connId]));
},
});
respond(true, { sessionId: session.sessionId, cwd: session.cwd, shell: session.shell });
} catch (error) {
respond(false, undefined, mapPtyError(error));
}
},
"pty.write": ({ client, params, respond }) => {
try {
const owner = getPtyOwner(client);
const sessionId = asString(params.sessionId)?.trim();
const data = asString(params.data);
if (!sessionId) {
respond(false, undefined, invalidParams("pty.write requires sessionId"));
return;
}
if (typeof data !== "string") {
respond(false, undefined, invalidParams("pty.write requires string data"));
return;
}
assertGatewayPtyOwnership({ sessionId, ownerKey: owner.ownerKey, connId: owner.connId });
writeGatewayPtySession(sessionId, data);
respond(true, { ok: true });
} catch (error) {
respond(false, undefined, mapPtyError(error));
}
},
"pty.resize": ({ client, params, respond }) => {
try {
const owner = getPtyOwner(client);
const sessionId = asString(params.sessionId)?.trim();
if (!sessionId) {
respond(false, undefined, invalidParams("pty.resize requires sessionId"));
return;
}
assertGatewayPtyOwnership({ sessionId, ownerKey: owner.ownerKey, connId: owner.connId });
resizeGatewayPtySession(sessionId, asNumber(params.cols), asNumber(params.rows));
respond(true, { ok: true });
} catch (error) {
respond(false, undefined, mapPtyError(error));
}
},
"pty.kill": ({ client, params, respond }) => {
try {
const owner = getPtyOwner(client);
const sessionId = asString(params.sessionId)?.trim();
if (!sessionId) {
respond(false, undefined, invalidParams("pty.kill requires sessionId"));
return;
}
assertGatewayPtyOwnership({ sessionId, ownerKey: owner.ownerKey, connId: owner.connId });
destroyGatewayPtySession(sessionId);
respond(true, { ok: true });
} catch (error) {
respond(false, undefined, mapPtyError(error));
}
},
"pty.list": ({ client, respond }) => {
try {
const owner = getPtyOwner(client);
const sessions = listGatewayPtySessionsByOwner(owner.ownerKey).map((session) => ({
sessionId: session.sessionId,
createdAt: session.createdAt,
lastActive: session.lastActive,
cols: session.cols,
rows: session.rows,
}));
respond(true, { sessions });
} catch (error) {
respond(false, undefined, mapPtyError(error));
}
},
};

View File

@ -9,6 +9,7 @@ import { isWebchatClient } from "../../utils/message-channel.js";
import type { AuthRateLimiter } from "../auth-rate-limit.js"; import type { AuthRateLimiter } from "../auth-rate-limit.js";
import type { ResolvedGatewayAuth } from "../auth.js"; import type { ResolvedGatewayAuth } from "../auth.js";
import { isLoopbackAddress } from "../net.js"; import { isLoopbackAddress } from "../net.js";
import { destroyGatewayPtySessionsForConn } from "../pty-manager.js";
import { getHandshakeTimeoutMs } from "../server-constants.js"; import { getHandshakeTimeoutMs } from "../server-constants.js";
import type { GatewayRequestContext, GatewayRequestHandlers } from "../server-methods/types.js"; import type { GatewayRequestContext, GatewayRequestHandlers } from "../server-methods/types.js";
import { formatError } from "../server-utils.js"; import { formatError } from "../server-utils.js";
@ -242,6 +243,9 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
upsertPresence(client.presenceKey, { reason: "disconnect" }); upsertPresence(client.presenceKey, { reason: "disconnect" });
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion }); broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
} }
if (client?.connId) {
destroyGatewayPtySessionsForConn(client.connId);
}
if (client?.connect?.role === "node") { if (client?.connect?.role === "node") {
const context = buildRequestContext(); const context = buildRequestContext();
const nodeId = context.nodeRegistry.unregister(connId); const nodeId = context.nodeRegistry.unregister(connId);

View File

@ -9,6 +9,7 @@ type GatewayClientMock = {
start: ReturnType<typeof vi.fn>; start: ReturnType<typeof vi.fn>;
stop: ReturnType<typeof vi.fn>; stop: ReturnType<typeof vi.fn>;
options: { clientVersion?: string }; options: { clientVersion?: string };
emitHello: (hello: Record<string, unknown>) => void;
emitClose: (info: { emitClose: (info: {
code: number; code: number;
reason?: string; reason?: string;
@ -39,6 +40,7 @@ vi.mock("./gateway.ts", () => {
constructor( constructor(
private opts: { private opts: {
clientVersion?: string; clientVersion?: string;
onHello?: (hello: Record<string, unknown>) => void;
onClose?: (info: { onClose?: (info: {
code: number; code: number;
reason: string; reason: string;
@ -52,6 +54,9 @@ vi.mock("./gateway.ts", () => {
start: this.start, start: this.start,
stop: this.stop, stop: this.stop,
options: { clientVersion: this.opts.clientVersion }, options: { clientVersion: this.opts.clientVersion },
emitHello: (hello) => {
this.opts.onHello?.(hello as never);
},
emitClose: (info) => { emitClose: (info) => {
this.opts.onClose?.({ this.opts.onClose?.({
code: info.code, code: info.code,
@ -158,6 +163,31 @@ describe("connectGateway", () => {
); );
}); });
it("falls back to the main session when persisted session keys point at cron chats", () => {
const host = createHost();
host.sessionKey = "agent:main:cron:nightly-brief";
host.settings.sessionKey = "agent:main:cron:nightly-brief";
host.settings.lastActiveSessionKey = "cron:nightly-brief";
connectGateway(host);
const client = gatewayClientInstances[0];
expect(client).toBeDefined();
client.emitHello({
snapshot: {
sessionDefaults: {
mainSessionKey: "agent:main:main",
mainKey: "main",
defaultAgentId: "main",
},
},
});
expect(host.sessionKey).toBe("agent:main:main");
expect(host.settings.sessionKey).toBe("agent:main:main");
expect(host.settings.lastActiveSessionKey).toBe("agent:main:main");
});
it("ignores stale client onEvent callbacks after reconnect", () => { it("ignores stale client onEvent callbacks after reconnect", () => {
const host = createHost(); const host = createHost();

View File

@ -118,6 +118,24 @@ export function resolveControlUiClientVersion(params: {
} }
} }
function isCronSessionKey(value: string | undefined): boolean {
const normalized = (value ?? "").trim().toLowerCase();
if (!normalized) {
return false;
}
if (normalized.startsWith("cron:")) {
return true;
}
if (!normalized.startsWith("agent:")) {
return false;
}
const parts = normalized.split(":").filter(Boolean);
if (parts.length < 3) {
return false;
}
return parts.slice(2).join(":").startsWith("cron:");
}
function normalizeSessionKeyForDefaults( function normalizeSessionKeyForDefaults(
value: string | undefined, value: string | undefined,
defaults: SessionDefaultsSnapshot, defaults: SessionDefaultsSnapshot,
@ -130,6 +148,9 @@ function normalizeSessionKeyForDefaults(
if (!raw) { if (!raw) {
return mainSessionKey; return mainSessionKey;
} }
if (isCronSessionKey(raw)) {
return mainSessionKey;
}
const mainKey = defaults.mainKey?.trim() || "main"; const mainKey = defaults.mainKey?.trim() || "main";
const defaultAgentId = defaults.defaultAgentId?.trim(); const defaultAgentId = defaults.defaultAgentId?.trim();
const isAlias = const isAlias =

View File

@ -1,6 +1,7 @@
const KEY = "openclaw.control.settings.v1"; const KEY = "openclaw.control.settings.v1";
const LEGACY_TOKEN_SESSION_KEY = "openclaw.control.token.v1"; const LEGACY_TOKEN_SESSION_KEY = "openclaw.control.token.v1";
const TOKEN_SESSION_KEY_PREFIX = "openclaw.control.token.v1:"; const TOKEN_SESSION_KEY_PREFIX = "openclaw.control.token.v1:";
const TOKEN_LOCAL_KEY_PREFIX = "openclaw.control.token.persisted.v1:";
type PersistedUiSettings = Omit<UiSettings, "token"> & { token?: never }; type PersistedUiSettings = Omit<UiSettings, "token"> & { token?: never };
@ -11,6 +12,7 @@ import { parseThemeSelection, type ThemeMode, type ThemeName } from "./theme.ts"
export type UiSettings = { export type UiSettings = {
gatewayUrl: string; gatewayUrl: string;
token: string; token: string;
rememberGatewayAuth: boolean;
sessionKey: string; sessionKey: string;
lastActiveSessionKey: string; lastActiveSessionKey: string;
theme: ThemeName; theme: ThemeName;
@ -86,6 +88,10 @@ function tokenSessionKeyForGateway(gatewayUrl: string): string {
return `${TOKEN_SESSION_KEY_PREFIX}${normalizeGatewayTokenScope(gatewayUrl)}`; return `${TOKEN_SESSION_KEY_PREFIX}${normalizeGatewayTokenScope(gatewayUrl)}`;
} }
function tokenLocalKeyForGateway(gatewayUrl: string): string {
return `${TOKEN_LOCAL_KEY_PREFIX}${normalizeGatewayTokenScope(gatewayUrl)}`;
}
function loadSessionToken(gatewayUrl: string): string { function loadSessionToken(gatewayUrl: string): string {
try { try {
const storage = getSessionStorage(); const storage = getSessionStorage();
@ -119,12 +125,37 @@ function persistSessionToken(gatewayUrl: string, token: string) {
} }
} }
function loadRememberedToken(gatewayUrl: string): string {
try {
const token = localStorage.getItem(tokenLocalKeyForGateway(gatewayUrl)) ?? "";
return token.trim();
} catch {
return "";
}
}
function persistGatewayToken(gatewayUrl: string, token: string, remember: boolean) {
try {
const normalized = token.trim();
persistSessionToken(gatewayUrl, remember ? "" : normalized);
const localKey = tokenLocalKeyForGateway(gatewayUrl);
if (remember && normalized) {
localStorage.setItem(localKey, normalized);
} else {
localStorage.removeItem(localKey);
}
} catch {
// best-effort
}
}
export function loadSettings(): UiSettings { export function loadSettings(): UiSettings {
const { pageUrl: pageDerivedUrl, effectiveUrl: defaultUrl } = deriveDefaultGatewayUrl(); const { pageUrl: pageDerivedUrl, effectiveUrl: defaultUrl } = deriveDefaultGatewayUrl();
const defaults: UiSettings = { const defaults: UiSettings = {
gatewayUrl: defaultUrl, gatewayUrl: defaultUrl,
token: loadSessionToken(defaultUrl), token: loadSessionToken(defaultUrl),
rememberGatewayAuth: false,
sessionKey: "main", sessionKey: "main",
lastActiveSessionKey: "main", lastActiveSessionKey: "main",
theme: "claw", theme: "claw",
@ -152,10 +183,12 @@ export function loadSettings(): UiSettings {
(parsed as { theme?: unknown }).theme, (parsed as { theme?: unknown }).theme,
(parsed as { themeMode?: unknown }).themeMode, (parsed as { themeMode?: unknown }).themeMode,
); );
const rememberGatewayAuth =
typeof parsed.rememberGatewayAuth === "boolean" ? parsed.rememberGatewayAuth : false;
const settings = { const settings = {
gatewayUrl, gatewayUrl,
// Gateway auth is intentionally in-memory only; scrub any legacy persisted token on load. token: rememberGatewayAuth ? loadRememberedToken(gatewayUrl) : loadSessionToken(gatewayUrl),
token: loadSessionToken(gatewayUrl), rememberGatewayAuth,
sessionKey: sessionKey:
typeof parsed.sessionKey === "string" && parsed.sessionKey.trim() typeof parsed.sessionKey === "string" && parsed.sessionKey.trim()
? parsed.sessionKey.trim() ? parsed.sessionKey.trim()
@ -205,9 +238,10 @@ export function saveSettings(next: UiSettings) {
} }
function persistSettings(next: UiSettings) { function persistSettings(next: UiSettings) {
persistSessionToken(next.gatewayUrl, next.token); persistGatewayToken(next.gatewayUrl, next.token, next.rememberGatewayAuth);
const persisted: PersistedUiSettings = { const persisted: PersistedUiSettings = {
gatewayUrl: next.gatewayUrl, gatewayUrl: next.gatewayUrl,
rememberGatewayAuth: next.rememberGatewayAuth,
sessionKey: next.sessionKey, sessionKey: next.sessionKey,
lastActiveSessionKey: next.lastActiveSessionKey, lastActiveSessionKey: next.lastActiveSessionKey,
theme: next.theme, theme: next.theme,

View File

@ -97,6 +97,17 @@ export function renderLoginGate(state: AppViewState) {
</button> </button>
</div> </div>
</label> </label>
<label class="field" style="gap:8px; flex-direction:row; align-items:center;">
<input
type="checkbox"
.checked=${state.settings.rememberGatewayAuth}
@change=${(e: Event) => {
const checked = (e.target as HTMLInputElement).checked;
state.applySettings({ ...state.settings, rememberGatewayAuth: checked });
}}
/>
<span>Remember me on this device</span>
</label>
<button <button
class="btn primary login-gate__connect" class="btn primary login-gate__connect"
@click=${() => state.connect()} @click=${() => state.connect()}