Compare commits
7 Commits
main
...
feature/bt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0541f772e | ||
|
|
f285429952 | ||
|
|
71b4fa04d9 | ||
|
|
da4459263d | ||
|
|
5328399d75 | ||
|
|
ecedddae81 | ||
|
|
27ffeab217 |
@ -145,6 +145,8 @@ Common scopes:
|
||||
|
||||
- `operator.read`
|
||||
- `operator.write`
|
||||
- includes PTY lifecycle methods: `pty.create`, `pty.write`, `pty.resize`, `pty.kill`
|
||||
- `pty.list` is readable by `operator.read` / `operator.write`
|
||||
- `operator.admin`
|
||||
- `operator.approvals`
|
||||
- `operator.pairing`
|
||||
@ -181,6 +183,10 @@ The Gateway treats these as **claims** and enforces server-side allowlists.
|
||||
- `source`: `core` or `plugin`
|
||||
- `pluginId`: plugin owner when `source="plugin"`
|
||||
- `optional`: whether a plugin tool is optional
|
||||
- PTY helpers:
|
||||
- `pty.create`, `pty.write`, `pty.resize`, `pty.kill` require `operator.write`
|
||||
- `pty.list` is available to `operator.read` / `operator.write`
|
||||
- PTY events are targeted to the owning operator connection/device: `pty.output`, `pty.exit`
|
||||
|
||||
## Exec approvals
|
||||
|
||||
|
||||
@ -117,6 +117,13 @@ openclaw health
|
||||
### Common footguns
|
||||
|
||||
- **Wrong port:** Gateway WS defaults to `ws://127.0.0.1:18789`; keep app + CLI on the same port.
|
||||
- **Remote PTY limits:** the Gateway remote terminal now enforces env-configurable limits.
|
||||
- `OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER` (default `4`)
|
||||
- `OPENCLAW_PTY_MAX_TOTAL_SESSIONS` (default `32`)
|
||||
- `OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES` (default `65536`)
|
||||
- `OPENCLAW_PTY_MIN_COLS` / `OPENCLAW_PTY_MAX_COLS` (defaults `20` / `500`)
|
||||
- `OPENCLAW_PTY_MIN_ROWS` / `OPENCLAW_PTY_MAX_ROWS` (defaults `5` / `200`)
|
||||
- `OPENCLAW_PTY_IDLE_TIMEOUT_MS` / `OPENCLAW_PTY_IDLE_SWEEP_INTERVAL_MS` (defaults `1800000` / `60000`)
|
||||
- **Where state lives:**
|
||||
- Credentials: `~/.openclaw/credentials/`
|
||||
- Sessions: `~/.openclaw/agents/<agentId>/sessions/`
|
||||
|
||||
@ -358,6 +358,7 @@
|
||||
"@mariozechner/pi-tui": "0.57.1",
|
||||
"@modelcontextprotocol/sdk": "1.27.1",
|
||||
"@mozilla/readability": "^0.6.0",
|
||||
"@pierre/diffs": "1.1.0",
|
||||
"@sinclair/typebox": "0.34.48",
|
||||
"@slack/bolt": "^4.6.0",
|
||||
"@slack/web-api": "^7.15.0",
|
||||
|
||||
3
pnpm-lock.yaml
generated
3
pnpm-lock.yaml
generated
@ -79,6 +79,9 @@ importers:
|
||||
'@napi-rs/canvas':
|
||||
specifier: ^0.1.89
|
||||
version: 0.1.95
|
||||
'@pierre/diffs':
|
||||
specifier: 1.1.0
|
||||
version: 1.1.0(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
|
||||
'@sinclair/typebox':
|
||||
specifier: 0.34.48
|
||||
version: 0.34.48
|
||||
|
||||
@ -144,6 +144,22 @@ function buildChatCommands(): ChatCommandDefinition[] {
|
||||
textAlias: "/commands",
|
||||
category: "status",
|
||||
}),
|
||||
defineChatCommand({
|
||||
key: "btw",
|
||||
nativeName: "btw",
|
||||
description: "Ask an ephemeral follow-up about the active session.",
|
||||
textAlias: "/btw",
|
||||
category: "status",
|
||||
args: [
|
||||
{
|
||||
name: "question",
|
||||
description: "Inline follow-up question",
|
||||
type: "string",
|
||||
required: true,
|
||||
captureRemaining: true,
|
||||
},
|
||||
],
|
||||
}),
|
||||
defineChatCommand({
|
||||
key: "skill",
|
||||
nativeName: "skill",
|
||||
|
||||
@ -36,6 +36,7 @@ describe("commands registry", () => {
|
||||
|
||||
it("exposes native specs", () => {
|
||||
const specs = listNativeCommandSpecs();
|
||||
expect(specs.find((spec) => spec.name === "btw")).toBeTruthy();
|
||||
expect(specs.find((spec) => spec.name === "help")).toBeTruthy();
|
||||
expect(specs.find((spec) => spec.name === "stop")).toBeTruthy();
|
||||
expect(specs.find((spec) => spec.name === "skill")).toBeTruthy();
|
||||
@ -209,6 +210,20 @@ describe("commands registry", () => {
|
||||
expect(modeArg?.choices).toEqual(["status", "on", "off"]);
|
||||
});
|
||||
|
||||
it("registers /btw as an inline-question command", () => {
|
||||
const command = findCommandByNativeName("btw");
|
||||
expect(command).toMatchObject({
|
||||
key: "btw",
|
||||
nativeName: "btw",
|
||||
textAliases: ["/btw"],
|
||||
});
|
||||
expect(command?.args?.[0]).toMatchObject({
|
||||
name: "question",
|
||||
required: true,
|
||||
captureRemaining: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("detects known text commands", () => {
|
||||
const detection = getCommandDetection();
|
||||
expect(detection.exact.has("/commands")).toBe(true);
|
||||
|
||||
@ -192,6 +192,7 @@ export function buildEmbeddedRunBaseParams(params: {
|
||||
thinkLevel: params.run.thinkLevel,
|
||||
verboseLevel: params.run.verboseLevel,
|
||||
reasoningLevel: params.run.reasoningLevel,
|
||||
disableTools: params.run.disableTools,
|
||||
execOverrides: params.run.execOverrides,
|
||||
bashElevated: params.run.bashElevated,
|
||||
timeoutMs: params.run.timeoutMs,
|
||||
|
||||
@ -1,3 +1,6 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { runPreparedReply } from "./get-reply-run.js";
|
||||
|
||||
@ -79,6 +82,7 @@ vi.mock("./typing-mode.js", () => ({
|
||||
resolveTypingMode: vi.fn().mockReturnValue("off"),
|
||||
}));
|
||||
|
||||
import { resolveSessionFilePath } from "../../config/sessions.js";
|
||||
import { runReplyAgent } from "./agent-runner.js";
|
||||
import { routeReply } from "./route-reply.js";
|
||||
import { drainFormattedSystemEvents } from "./session-updates.js";
|
||||
@ -396,4 +400,63 @@ describe("runPreparedReply media-only handling", () => {
|
||||
// Queue body (used by steer mode) must keep the full original text.
|
||||
expect(call?.followupRun.prompt).toContain("low steer this conversation");
|
||||
});
|
||||
|
||||
it("forces /btw side turns to run as a single no-tools reply", async () => {
|
||||
await runPreparedReply(
|
||||
baseParams({
|
||||
blockStreamingEnabled: true,
|
||||
ephemeralSideTurn: { kind: "btw" },
|
||||
}),
|
||||
);
|
||||
|
||||
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
|
||||
expect(call).toBeTruthy();
|
||||
expect(call?.followupRun.run.disableTools).toBe(true);
|
||||
expect(call?.resolvedQueue.mode).toBe("interrupt");
|
||||
expect(call?.shouldSteer).toBe(false);
|
||||
expect(call?.shouldFollowup).toBe(false);
|
||||
expect(call?.blockStreamingEnabled).toBe(false);
|
||||
expect(call?.queueKey).toBe(call?.followupRun.run.sessionId);
|
||||
expect(call?.queueKey).not.toBe("session-key");
|
||||
});
|
||||
|
||||
it("copies parent transcript into a temporary /btw session without mutating the parent", async () => {
|
||||
const tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-btw-test-"));
|
||||
const sourceSessionFile = path.join(tempRoot, "parent.jsonl");
|
||||
const sourceTranscript = [
|
||||
JSON.stringify({ type: "header", sessionId: "parent-session" }),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: { role: "user", content: [{ type: "text", text: "parent question" }] },
|
||||
}),
|
||||
"",
|
||||
].join("\n");
|
||||
await fs.writeFile(sourceSessionFile, sourceTranscript, "utf-8");
|
||||
|
||||
let copiedTranscript = "";
|
||||
vi.mocked(runReplyAgent).mockImplementationOnce(async (call) => {
|
||||
copiedTranscript = await fs.readFile(call.followupRun.run.sessionFile, "utf-8");
|
||||
return { text: "ok" };
|
||||
});
|
||||
vi.mocked(resolveSessionFilePath).mockReturnValueOnce(sourceSessionFile);
|
||||
|
||||
try {
|
||||
await runPreparedReply(
|
||||
baseParams({
|
||||
ephemeralSideTurn: { kind: "btw" },
|
||||
sessionEntry: {
|
||||
sessionId: "parent-session",
|
||||
updatedAt: 1,
|
||||
sessionFile: sourceSessionFile,
|
||||
},
|
||||
storePath: path.join(tempRoot, "sessions.json"),
|
||||
}),
|
||||
);
|
||||
|
||||
expect(copiedTranscript).toBe(sourceTranscript);
|
||||
await expect(fs.readFile(sourceSessionFile, "utf-8")).resolves.toBe(sourceTranscript);
|
||||
} finally {
|
||||
await fs.rm(tempRoot, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js";
|
||||
import type { ExecToolDefaults } from "../../agents/bash-tools.js";
|
||||
import { resolveFastModeState } from "../../agents/fast-mode.js";
|
||||
@ -53,6 +56,30 @@ import { appendUntrustedContext } from "./untrusted-context.js";
|
||||
|
||||
type AgentDefaults = NonNullable<OpenClawConfig["agents"]>["defaults"];
|
||||
type ExecOverrides = Pick<ExecToolDefaults, "host" | "security" | "ask" | "node">;
|
||||
type EphemeralSideTurn = { kind: "btw" };
|
||||
|
||||
async function createEphemeralSideTurnSession(params: {
|
||||
agentId: string;
|
||||
sessionEntry?: SessionEntry;
|
||||
storePath?: string;
|
||||
}) {
|
||||
const sessionId = crypto.randomUUID();
|
||||
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-btw-"));
|
||||
const sessionFile = path.join(tempDir, `${sessionId}.jsonl`);
|
||||
const sourceSessionFile = params.sessionEntry
|
||||
? resolveSessionFilePath(
|
||||
params.sessionEntry.sessionId ?? sessionId,
|
||||
params.sessionEntry,
|
||||
resolveSessionFilePathOptions({ agentId: params.agentId, storePath: params.storePath }),
|
||||
)
|
||||
: undefined;
|
||||
if (sourceSessionFile) {
|
||||
await fs.copyFile(sourceSessionFile, sessionFile);
|
||||
} else {
|
||||
await fs.writeFile(sessionFile, "", "utf-8");
|
||||
}
|
||||
return { sessionId, sessionFile, tempDir };
|
||||
}
|
||||
|
||||
function buildResetSessionNoticeText(params: {
|
||||
provider: string;
|
||||
@ -177,6 +204,7 @@ type RunPreparedReplyParams = {
|
||||
storePath?: string;
|
||||
workspaceDir: string;
|
||||
abortedLastRun: boolean;
|
||||
ephemeralSideTurn?: EphemeralSideTurn;
|
||||
};
|
||||
|
||||
export async function runPreparedReply(
|
||||
@ -219,6 +247,7 @@ export async function runPreparedReply(
|
||||
storePath,
|
||||
workspaceDir,
|
||||
sessionStore,
|
||||
ephemeralSideTurn,
|
||||
} = params;
|
||||
let {
|
||||
sessionEntry,
|
||||
@ -230,6 +259,9 @@ export async function runPreparedReply(
|
||||
abortedLastRun,
|
||||
} = params;
|
||||
let currentSystemSent = systemSent;
|
||||
const persistedSessionStore = ephemeralSideTurn ? undefined : sessionStore;
|
||||
const persistedStorePath = ephemeralSideTurn ? undefined : storePath;
|
||||
const abortedLastRunForRun = ephemeralSideTurn ? false : abortedLastRun;
|
||||
|
||||
const isFirstTurnInSession = isNewSession || !currentSystemSent;
|
||||
const isGroupChat = sessionCtx.ChatType === "group";
|
||||
@ -324,11 +356,11 @@ export async function runPreparedReply(
|
||||
: "[User sent media without caption]";
|
||||
let prefixedBodyBase = await applySessionHints({
|
||||
baseBody: effectiveBaseBody,
|
||||
abortedLastRun,
|
||||
abortedLastRun: abortedLastRunForRun,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionStore: persistedSessionStore,
|
||||
sessionKey,
|
||||
storePath,
|
||||
storePath: persistedStorePath,
|
||||
abortKey: command.abortKey,
|
||||
});
|
||||
const isGroupSession = sessionEntry?.chatType === "group" || sessionEntry?.chatType === "channel";
|
||||
@ -367,9 +399,9 @@ export async function runPreparedReply(
|
||||
: undefined;
|
||||
const skillResult = await ensureSkillSnapshot({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionStore: persistedSessionStore,
|
||||
sessionKey,
|
||||
storePath,
|
||||
storePath: persistedStorePath,
|
||||
sessionId,
|
||||
isFirstTurnInSession,
|
||||
workspaceDir,
|
||||
@ -399,12 +431,18 @@ export async function runPreparedReply(
|
||||
};
|
||||
}
|
||||
resolvedThinkLevel = "high";
|
||||
if (sessionEntry && sessionStore && sessionKey && sessionEntry.thinkingLevel === "xhigh") {
|
||||
if (
|
||||
!ephemeralSideTurn &&
|
||||
sessionEntry &&
|
||||
sessionStore &&
|
||||
sessionKey &&
|
||||
sessionEntry.thinkingLevel === "xhigh"
|
||||
) {
|
||||
sessionEntry.thinkingLevel = "high";
|
||||
sessionEntry.updatedAt = Date.now();
|
||||
sessionStore[sessionKey] = sessionEntry;
|
||||
if (storePath) {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
if (persistedStorePath) {
|
||||
await updateSessionStore(persistedStorePath, (store) => {
|
||||
store[sessionKey] = sessionEntry;
|
||||
});
|
||||
}
|
||||
@ -424,40 +462,53 @@ export async function runPreparedReply(
|
||||
defaultModel,
|
||||
});
|
||||
}
|
||||
const sessionIdFinal = sessionId ?? crypto.randomUUID();
|
||||
const sessionFile = resolveSessionFilePath(
|
||||
sessionIdFinal,
|
||||
sessionEntry,
|
||||
resolveSessionFilePathOptions({ agentId, storePath }),
|
||||
);
|
||||
const sideTurnSession = ephemeralSideTurn
|
||||
? await createEphemeralSideTurnSession({
|
||||
agentId,
|
||||
sessionEntry,
|
||||
storePath,
|
||||
})
|
||||
: null;
|
||||
const sessionIdFinal = sideTurnSession?.sessionId ?? sessionId ?? crypto.randomUUID();
|
||||
const sessionFile =
|
||||
sideTurnSession?.sessionFile ??
|
||||
resolveSessionFilePath(
|
||||
sessionIdFinal,
|
||||
sessionEntry,
|
||||
resolveSessionFilePathOptions({ agentId, storePath }),
|
||||
);
|
||||
// Use bodyWithEvents (events prepended, but no session hints / untrusted context) so
|
||||
// deferred turns receive system events while keeping the same scope as effectiveBaseBody did.
|
||||
const queueBodyBase = [threadContextNote, bodyWithEvents].filter(Boolean).join("\n\n");
|
||||
const queuedBody = mediaNote
|
||||
? [mediaNote, mediaReplyHint, queueBodyBase].filter(Boolean).join("\n").trim()
|
||||
: queueBodyBase;
|
||||
const resolvedQueue = resolveQueueSettings({
|
||||
const inheritedQueue = resolveQueueSettings({
|
||||
cfg,
|
||||
channel: sessionCtx.Provider,
|
||||
sessionEntry,
|
||||
inlineMode: perMessageQueueMode,
|
||||
inlineOptions: perMessageQueueOptions,
|
||||
});
|
||||
const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey ?? sessionIdFinal);
|
||||
const resolvedQueue = ephemeralSideTurn ? { mode: "interrupt" as const } : inheritedQueue;
|
||||
const queueKey = ephemeralSideTurn ? sessionIdFinal : (sessionKey ?? sessionIdFinal);
|
||||
const sessionLaneKey = resolveEmbeddedSessionLane(queueKey);
|
||||
const laneSize = getQueueSize(sessionLaneKey);
|
||||
if (resolvedQueue.mode === "interrupt" && laneSize > 0) {
|
||||
const cleared = clearCommandLane(sessionLaneKey);
|
||||
const aborted = abortEmbeddedPiRun(sessionIdFinal);
|
||||
logVerbose(`Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`);
|
||||
}
|
||||
const queueKey = sessionKey ?? sessionIdFinal;
|
||||
const isActive = isEmbeddedPiRunActive(sessionIdFinal);
|
||||
const isStreaming = isEmbeddedPiRunStreaming(sessionIdFinal);
|
||||
const shouldSteer = resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog";
|
||||
const shouldSteer =
|
||||
!ephemeralSideTurn &&
|
||||
(resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog");
|
||||
const shouldFollowup =
|
||||
resolvedQueue.mode === "followup" ||
|
||||
resolvedQueue.mode === "collect" ||
|
||||
resolvedQueue.mode === "steer-backlog";
|
||||
!ephemeralSideTurn &&
|
||||
(resolvedQueue.mode === "followup" ||
|
||||
resolvedQueue.mode === "collect" ||
|
||||
resolvedQueue.mode === "steer-backlog");
|
||||
const authProfileId = await resolveSessionAuthProfileOverride({
|
||||
cfg,
|
||||
provider,
|
||||
@ -519,6 +570,7 @@ export async function runPreparedReply(
|
||||
verboseLevel: resolvedVerboseLevel,
|
||||
reasoningLevel: resolvedReasoningLevel,
|
||||
elevatedLevel: resolvedElevatedLevel,
|
||||
...(ephemeralSideTurn ? { disableTools: true } : {}),
|
||||
execOverrides,
|
||||
bashElevated: {
|
||||
enabled: elevatedEnabled,
|
||||
@ -534,30 +586,36 @@ export async function runPreparedReply(
|
||||
},
|
||||
};
|
||||
|
||||
return runReplyAgent({
|
||||
commandBody: prefixedCommandBody,
|
||||
followupRun,
|
||||
queueKey,
|
||||
resolvedQueue,
|
||||
shouldSteer,
|
||||
shouldFollowup,
|
||||
isActive,
|
||||
isStreaming,
|
||||
opts,
|
||||
typing,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey,
|
||||
storePath,
|
||||
defaultModel,
|
||||
agentCfgContextTokens: agentCfg?.contextTokens,
|
||||
resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
|
||||
isNewSession,
|
||||
blockStreamingEnabled,
|
||||
blockReplyChunking,
|
||||
resolvedBlockStreamingBreak,
|
||||
sessionCtx,
|
||||
shouldInjectGroupIntro,
|
||||
typingMode,
|
||||
});
|
||||
try {
|
||||
return await runReplyAgent({
|
||||
commandBody: prefixedCommandBody,
|
||||
followupRun,
|
||||
queueKey,
|
||||
resolvedQueue,
|
||||
shouldSteer,
|
||||
shouldFollowup,
|
||||
isActive,
|
||||
isStreaming,
|
||||
opts,
|
||||
typing,
|
||||
sessionEntry,
|
||||
sessionStore: persistedSessionStore,
|
||||
sessionKey,
|
||||
storePath: persistedStorePath,
|
||||
defaultModel,
|
||||
agentCfgContextTokens: agentCfg?.contextTokens,
|
||||
resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
|
||||
isNewSession,
|
||||
blockStreamingEnabled: ephemeralSideTurn ? false : blockStreamingEnabled,
|
||||
blockReplyChunking,
|
||||
resolvedBlockStreamingBreak,
|
||||
sessionCtx,
|
||||
shouldInjectGroupIntro,
|
||||
typingMode,
|
||||
});
|
||||
} finally {
|
||||
if (sideTurnSession) {
|
||||
await fs.rm(sideTurnSession.tempDir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,6 +49,23 @@ function buildNativeResetContext(): MsgContext {
|
||||
};
|
||||
}
|
||||
|
||||
function buildNativeBtwContext(): MsgContext {
|
||||
return {
|
||||
Provider: "telegram",
|
||||
Surface: "telegram",
|
||||
ChatType: "direct",
|
||||
Body: "/btw what changed?",
|
||||
RawBody: "/btw what changed?",
|
||||
CommandBody: "/btw what changed?",
|
||||
CommandSource: "native",
|
||||
CommandAuthorized: true,
|
||||
SessionKey: "telegram:slash:123",
|
||||
CommandTargetSessionKey: "agent:main:telegram:direct:123",
|
||||
From: "telegram:123",
|
||||
To: "slash:123",
|
||||
};
|
||||
}
|
||||
|
||||
function createContinueDirectivesResult(resetHookTriggered: boolean) {
|
||||
return {
|
||||
kind: "continue" as const,
|
||||
@ -150,4 +167,80 @@ describe("getReplyFromConfig reset-hook fallback", () => {
|
||||
|
||||
expect(mocks.emitResetCommandHooks).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rewrites native /btw to the inline question and resolves the target session read-only", async () => {
|
||||
mocks.handleInlineActions.mockResolvedValueOnce({ kind: "reply", reply: undefined });
|
||||
mocks.initSessionState.mockResolvedValueOnce({
|
||||
sessionCtx: { BodyStripped: "what changed?" },
|
||||
sessionEntry: {
|
||||
sessionId: "session-1",
|
||||
updatedAt: 1,
|
||||
sessionFile: "/tmp/session-1.jsonl",
|
||||
},
|
||||
previousSessionEntry: undefined,
|
||||
sessionStore: {},
|
||||
sessionKey: "agent:main:telegram:direct:123",
|
||||
sessionId: "session-1",
|
||||
isNewSession: false,
|
||||
resetTriggered: false,
|
||||
systemSent: true,
|
||||
abortedLastRun: false,
|
||||
storePath: "/tmp/sessions.json",
|
||||
sessionScope: "per-sender",
|
||||
groupResolution: undefined,
|
||||
isGroup: false,
|
||||
triggerBodyNormalized: "what changed?",
|
||||
bodyStripped: "what changed?",
|
||||
});
|
||||
mocks.resolveReplyDirectives.mockResolvedValueOnce(createContinueDirectivesResult(false));
|
||||
|
||||
await getReplyFromConfig(buildNativeBtwContext(), undefined, {});
|
||||
|
||||
expect(mocks.initSessionState).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
ctx: expect.objectContaining({
|
||||
Body: "what changed?",
|
||||
CommandBody: "what changed?",
|
||||
RawBody: "what changed?",
|
||||
}),
|
||||
readOnly: true,
|
||||
}),
|
||||
);
|
||||
expect(mocks.resolveReplyDirectives).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
triggerBodyNormalized: "what changed?",
|
||||
sessionCtx: expect.objectContaining({
|
||||
BodyStripped: "what changed?",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns an error for /btw when there is no active target session", async () => {
|
||||
mocks.initSessionState.mockResolvedValueOnce({
|
||||
sessionCtx: {},
|
||||
sessionEntry: {
|
||||
sessionId: "session-2",
|
||||
updatedAt: 2,
|
||||
},
|
||||
previousSessionEntry: undefined,
|
||||
sessionStore: {},
|
||||
sessionKey: "agent:main:telegram:direct:123",
|
||||
sessionId: "session-2",
|
||||
isNewSession: true,
|
||||
resetTriggered: false,
|
||||
systemSent: false,
|
||||
abortedLastRun: false,
|
||||
storePath: "/tmp/sessions.json",
|
||||
sessionScope: "per-sender",
|
||||
groupResolution: undefined,
|
||||
isGroup: false,
|
||||
triggerBodyNormalized: "what changed?",
|
||||
bodyStripped: "what changed?",
|
||||
});
|
||||
|
||||
await expect(getReplyFromConfig(buildNativeBtwContext(), undefined, {})).resolves.toEqual({
|
||||
text: "❌ No active session found for /btw.",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -14,6 +14,7 @@ import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { normalizeStringEntries } from "../../shared/string-normalization.js";
|
||||
import { resolveCommandAuthorization } from "../command-auth.js";
|
||||
import { shouldHandleTextCommands } from "../commands-registry.js";
|
||||
import type { MsgContext } from "../templating.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../tokens.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
@ -54,6 +55,18 @@ function mergeSkillFilters(channelFilter?: string[], agentFilter?: string[]): st
|
||||
return channel.filter((name) => agentSet.has(name));
|
||||
}
|
||||
|
||||
function parseBtwInlineQuestion(text: string | undefined): string | null {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const match = trimmed.match(/^\/btw(?:\s+([\s\S]+))?$/i);
|
||||
if (!match) {
|
||||
return null;
|
||||
}
|
||||
return match[1]?.trim() ?? "";
|
||||
}
|
||||
|
||||
export async function getReplyFromConfig(
|
||||
ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
@ -124,6 +137,37 @@ export async function getReplyFromConfig(
|
||||
opts?.onTypingController?.(typing);
|
||||
|
||||
const finalized = finalizeInboundContext(ctx);
|
||||
const commandAuthorized = finalized.CommandAuthorized;
|
||||
const commandAuth = resolveCommandAuthorization({
|
||||
ctx: finalized,
|
||||
cfg,
|
||||
commandAuthorized,
|
||||
});
|
||||
const btwQuestion = parseBtwInlineQuestion(
|
||||
finalized.BodyForCommands ?? finalized.CommandBody ?? finalized.RawBody ?? finalized.Body,
|
||||
);
|
||||
const allowBtwSideTurn =
|
||||
typeof btwQuestion === "string" &&
|
||||
shouldHandleTextCommands({
|
||||
cfg,
|
||||
surface: finalized.Surface ?? "",
|
||||
commandSource: finalized.CommandSource,
|
||||
});
|
||||
const useBtwSideTurn = allowBtwSideTurn && typeof btwQuestion === "string";
|
||||
if (useBtwSideTurn && !commandAuth.isAuthorizedSender) {
|
||||
return undefined;
|
||||
}
|
||||
if (useBtwSideTurn) {
|
||||
const btwQuestionText = btwQuestion ?? "";
|
||||
if (btwQuestionText.length === 0) {
|
||||
return { text: "⚙️ Usage: /btw <question>" };
|
||||
}
|
||||
finalized.Body = btwQuestionText;
|
||||
finalized.BodyForAgent = btwQuestionText;
|
||||
finalized.RawBody = btwQuestionText;
|
||||
finalized.CommandBody = btwQuestionText;
|
||||
finalized.BodyForCommands = btwQuestionText;
|
||||
}
|
||||
|
||||
if (!isFastTestEnv) {
|
||||
await applyMediaUnderstanding({
|
||||
@ -143,16 +187,11 @@ export async function getReplyFromConfig(
|
||||
isFastTestEnv,
|
||||
});
|
||||
|
||||
const commandAuthorized = finalized.CommandAuthorized;
|
||||
resolveCommandAuthorization({
|
||||
ctx: finalized,
|
||||
cfg,
|
||||
commandAuthorized,
|
||||
});
|
||||
const sessionState = await initSessionState({
|
||||
ctx: finalized,
|
||||
cfg,
|
||||
commandAuthorized,
|
||||
readOnly: useBtwSideTurn,
|
||||
});
|
||||
let {
|
||||
sessionCtx,
|
||||
@ -173,6 +212,11 @@ export async function getReplyFromConfig(
|
||||
bodyStripped,
|
||||
} = sessionState;
|
||||
|
||||
if (useBtwSideTurn && (isNewSession || !sessionEntry?.sessionFile?.trim())) {
|
||||
typing.cleanup();
|
||||
return { text: "❌ No active session found for /btw." };
|
||||
}
|
||||
|
||||
await applyResetModelOverride({
|
||||
cfg,
|
||||
agentId,
|
||||
@ -400,5 +444,6 @@ export async function getReplyFromConfig(
|
||||
storePath,
|
||||
workspaceDir,
|
||||
abortedLastRun,
|
||||
...(useBtwSideTurn ? { ephemeralSideTurn: { kind: "btw" as const } } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
@ -77,6 +77,7 @@ export type FollowupRun = {
|
||||
};
|
||||
timeoutMs: number;
|
||||
blockReplyBreak: "text_end" | "message_end";
|
||||
disableTools?: boolean;
|
||||
ownerNumbers?: string[];
|
||||
inputProvenance?: InputProvenance;
|
||||
extraSystemPrompt?: string;
|
||||
|
||||
@ -170,8 +170,10 @@ export async function initSessionState(params: {
|
||||
ctx: MsgContext;
|
||||
cfg: OpenClawConfig;
|
||||
commandAuthorized: boolean;
|
||||
readOnly?: boolean;
|
||||
}): Promise<SessionInitResult> {
|
||||
const { ctx, cfg, commandAuthorized } = params;
|
||||
const readOnly = params.readOnly === true;
|
||||
// Native slash commands (Telegram/Discord/Slack) are delivered on a separate
|
||||
// "slash session" key, but should mutate the target chat session.
|
||||
const targetSessionKey =
|
||||
@ -496,18 +498,24 @@ export async function initSessionState(params: {
|
||||
const fallbackSessionFile = !sessionEntry.sessionFile
|
||||
? resolveSessionTranscriptPath(sessionEntry.sessionId, agentId, ctx.MessageThreadId)
|
||||
: undefined;
|
||||
const resolvedSessionFile = await resolveAndPersistSessionFile({
|
||||
sessionId: sessionEntry.sessionId,
|
||||
sessionKey,
|
||||
sessionStore,
|
||||
storePath,
|
||||
sessionEntry,
|
||||
agentId,
|
||||
sessionsDir: path.dirname(storePath),
|
||||
fallbackSessionFile,
|
||||
activeSessionKey: sessionKey,
|
||||
});
|
||||
sessionEntry = resolvedSessionFile.sessionEntry;
|
||||
if (readOnly) {
|
||||
if (!sessionEntry.sessionFile && fallbackSessionFile) {
|
||||
sessionEntry.sessionFile = fallbackSessionFile;
|
||||
}
|
||||
} else {
|
||||
const resolvedSessionFile = await resolveAndPersistSessionFile({
|
||||
sessionId: sessionEntry.sessionId,
|
||||
sessionKey,
|
||||
sessionStore,
|
||||
storePath,
|
||||
sessionEntry,
|
||||
agentId,
|
||||
sessionsDir: path.dirname(storePath),
|
||||
fallbackSessionFile,
|
||||
activeSessionKey: sessionKey,
|
||||
});
|
||||
sessionEntry = resolvedSessionFile.sessionEntry;
|
||||
}
|
||||
if (isNewSession) {
|
||||
sessionEntry.compactionCount = 0;
|
||||
sessionEntry.memoryFlushCompactionCount = undefined;
|
||||
@ -519,38 +527,40 @@ export async function initSessionState(params: {
|
||||
sessionEntry.outputTokens = undefined;
|
||||
sessionEntry.contextTokens = undefined;
|
||||
}
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry };
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
|
||||
if (retiredLegacyMainDelivery) {
|
||||
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
|
||||
}
|
||||
},
|
||||
{
|
||||
activeSessionKey: sessionKey,
|
||||
onWarn: (warning) =>
|
||||
deliverSessionMaintenanceWarning({
|
||||
cfg,
|
||||
sessionKey,
|
||||
entry: sessionEntry,
|
||||
warning,
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
// Archive old transcript so it doesn't accumulate on disk (#14869).
|
||||
if (previousSessionEntry?.sessionId) {
|
||||
archiveSessionTranscripts({
|
||||
sessionId: previousSessionEntry.sessionId,
|
||||
if (!readOnly) {
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry };
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
sessionFile: previousSessionEntry.sessionFile,
|
||||
agentId,
|
||||
reason: "reset",
|
||||
});
|
||||
(store) => {
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
|
||||
if (retiredLegacyMainDelivery) {
|
||||
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
|
||||
}
|
||||
},
|
||||
{
|
||||
activeSessionKey: sessionKey,
|
||||
onWarn: (warning) =>
|
||||
deliverSessionMaintenanceWarning({
|
||||
cfg,
|
||||
sessionKey,
|
||||
entry: sessionEntry,
|
||||
warning,
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
// Archive old transcript so it doesn't accumulate on disk (#14869).
|
||||
if (previousSessionEntry?.sessionId) {
|
||||
archiveSessionTranscripts({
|
||||
sessionId: previousSessionEntry.sessionId,
|
||||
storePath,
|
||||
sessionFile: previousSessionEntry.sessionFile,
|
||||
agentId,
|
||||
reason: "reset",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const sessionCtx: TemplateContext = {
|
||||
@ -571,7 +581,7 @@ export async function initSessionState(params: {
|
||||
};
|
||||
|
||||
// Run session plugin hooks (fire-and-forget)
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
const hookRunner = readOnly ? null : getGlobalHookRunner();
|
||||
if (hookRunner && isNewSession) {
|
||||
const effectiveSessionId = sessionId ?? "";
|
||||
|
||||
|
||||
@ -67,6 +67,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"voicewake.get",
|
||||
"sessions.list",
|
||||
"sessions.get",
|
||||
"pty.list",
|
||||
"sessions.preview",
|
||||
"sessions.resolve",
|
||||
"sessions.usage",
|
||||
|
||||
@ -5,6 +5,11 @@ export const ErrorCodes = {
|
||||
NOT_PAIRED: "NOT_PAIRED",
|
||||
AGENT_TIMEOUT: "AGENT_TIMEOUT",
|
||||
INVALID_REQUEST: "INVALID_REQUEST",
|
||||
INVALID_PARAMS: "INVALID_PARAMS",
|
||||
NOT_FOUND: "NOT_FOUND",
|
||||
FORBIDDEN: "FORBIDDEN",
|
||||
RESOURCE_LIMIT: "RESOURCE_LIMIT",
|
||||
PAYLOAD_TOO_LARGE: "PAYLOAD_TOO_LARGE",
|
||||
UNAVAILABLE: "UNAVAILABLE",
|
||||
} as const;
|
||||
|
||||
|
||||
132
src/gateway/pty-manager.test.ts
Normal file
132
src/gateway/pty-manager.test.ts
Normal file
@ -0,0 +1,132 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const spawnMock = vi.fn();
|
||||
|
||||
vi.mock("@lydell/node-pty", () => ({
|
||||
spawn: spawnMock,
|
||||
}));
|
||||
|
||||
function makePtyHandle() {
|
||||
let dataListener: ((value: string) => void) | null = null;
|
||||
let exitListener: ((event: { exitCode: number }) => void) | null = null;
|
||||
return {
|
||||
pid: 123,
|
||||
write: vi.fn(),
|
||||
resize: vi.fn(),
|
||||
kill: vi.fn(),
|
||||
onData: vi.fn((listener: (value: string) => void) => {
|
||||
dataListener = listener;
|
||||
return { dispose: vi.fn() };
|
||||
}),
|
||||
onExit: vi.fn((listener: (event: { exitCode: number }) => void) => {
|
||||
exitListener = listener;
|
||||
return { dispose: vi.fn() };
|
||||
}),
|
||||
emitData(value: string) {
|
||||
dataListener?.(value);
|
||||
},
|
||||
emitExit(code: number) {
|
||||
exitListener?.({ exitCode: code });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("gateway pty manager", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetModules();
|
||||
vi.clearAllMocks();
|
||||
delete process.env.OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER;
|
||||
delete process.env.OPENCLAW_PTY_MAX_TOTAL_SESSIONS;
|
||||
delete process.env.OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES;
|
||||
delete process.env.OPENCLAW_PTY_MIN_COLS;
|
||||
delete process.env.OPENCLAW_PTY_MAX_COLS;
|
||||
delete process.env.OPENCLAW_PTY_MIN_ROWS;
|
||||
delete process.env.OPENCLAW_PTY_MAX_ROWS;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
const mod = await import("./pty-manager.js");
|
||||
for (const session of mod.listGatewayPtySessionsByOwner("device:one")) {
|
||||
mod.destroyGatewayPtySession(session.sessionId);
|
||||
}
|
||||
for (const session of mod.listGatewayPtySessionsByOwner("device:two")) {
|
||||
mod.destroyGatewayPtySession(session.sessionId);
|
||||
}
|
||||
});
|
||||
|
||||
it("enforces per-owner and total session limits", async () => {
|
||||
process.env.OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER = "1";
|
||||
process.env.OPENCLAW_PTY_MAX_TOTAL_SESSIONS = "2";
|
||||
spawnMock.mockImplementation(() => makePtyHandle());
|
||||
const mod = await import("./pty-manager.js");
|
||||
|
||||
await mod.createGatewayPtySession({
|
||||
owner: { ownerKey: "device:one", connId: "conn-1" },
|
||||
onOutput: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
});
|
||||
await mod.createGatewayPtySession({
|
||||
owner: { ownerKey: "device:two", connId: "conn-2" },
|
||||
onOutput: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
mod.createGatewayPtySession({
|
||||
owner: { ownerKey: "device:one", connId: "conn-1" },
|
||||
onOutput: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
}),
|
||||
).rejects.toMatchObject({ code: "PTY_LIMIT_REACHED" });
|
||||
});
|
||||
|
||||
it("enforces resize and input limits and exposes lastActive metadata", async () => {
|
||||
process.env.OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES = "4";
|
||||
process.env.OPENCLAW_PTY_MIN_COLS = "10";
|
||||
process.env.OPENCLAW_PTY_MAX_COLS = "100";
|
||||
process.env.OPENCLAW_PTY_MIN_ROWS = "5";
|
||||
process.env.OPENCLAW_PTY_MAX_ROWS = "50";
|
||||
const handle = makePtyHandle();
|
||||
spawnMock.mockImplementation(() => handle);
|
||||
const mod = await import("./pty-manager.js");
|
||||
|
||||
const session = await mod.createGatewayPtySession({
|
||||
owner: { ownerKey: "device:one", connId: "conn-1" },
|
||||
onOutput: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
});
|
||||
|
||||
expect(mod.listGatewayPtySessionsByOwner("device:one")[0]).toMatchObject({
|
||||
sessionId: session.sessionId,
|
||||
createdAt: expect.any(Number),
|
||||
lastActive: expect.any(Number),
|
||||
cols: 80,
|
||||
rows: 24,
|
||||
});
|
||||
|
||||
expect(() => mod.writeGatewayPtySession(session.sessionId, "12345")).toThrowError(
|
||||
/exceeds 4 bytes/,
|
||||
);
|
||||
expect(() => mod.resizeGatewayPtySession(session.sessionId, 9, 6)).toThrowError(
|
||||
/cols must be between 10 and 100/,
|
||||
);
|
||||
expect(() => mod.resizeGatewayPtySession(session.sessionId, 20, 51)).toThrowError(
|
||||
/rows must be between 5 and 50/,
|
||||
);
|
||||
});
|
||||
|
||||
it("kills sessions bound to a disconnected connection", async () => {
|
||||
const handle = makePtyHandle();
|
||||
spawnMock.mockImplementation(() => handle);
|
||||
const mod = await import("./pty-manager.js");
|
||||
|
||||
const session = await mod.createGatewayPtySession({
|
||||
owner: { ownerKey: "device:one", connId: "conn-1" },
|
||||
onOutput: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
});
|
||||
|
||||
expect(mod.destroyGatewayPtySessionsForConn("conn-1")).toBe(1);
|
||||
expect(mod.getGatewayPtySession(session.sessionId)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
442
src/gateway/pty-manager.ts
Normal file
442
src/gateway/pty-manager.ts
Normal file
@ -0,0 +1,442 @@
|
||||
import crypto from "node:crypto";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
export type GatewayPtyOwner = {
|
||||
ownerKey: string;
|
||||
connId: string;
|
||||
deviceId?: string;
|
||||
};
|
||||
|
||||
export type GatewayPtySession = {
|
||||
sessionId: string;
|
||||
owner: GatewayPtyOwner;
|
||||
shell: string;
|
||||
cwd: string;
|
||||
cols: number;
|
||||
rows: number;
|
||||
createdAt: number;
|
||||
lastActive: number;
|
||||
exitedAt?: number;
|
||||
exitCode?: number | null;
|
||||
};
|
||||
|
||||
type PtyExitEvent = { exitCode: number; signal?: number };
|
||||
type PtyDisposable = { dispose: () => void };
|
||||
type PtySpawnHandle = {
|
||||
pid: number;
|
||||
write: (data: string | Buffer) => void;
|
||||
resize?: (cols: number, rows: number) => void;
|
||||
onData: (listener: (value: string) => void) => PtyDisposable | void;
|
||||
onExit: (listener: (event: PtyExitEvent) => void) => PtyDisposable | void;
|
||||
kill: (signal?: string) => void;
|
||||
};
|
||||
|
||||
type PtySpawn = (
|
||||
file: string,
|
||||
args: string[] | string,
|
||||
options: {
|
||||
name?: string;
|
||||
cols?: number;
|
||||
rows?: number;
|
||||
cwd?: string;
|
||||
env?: Record<string, string>;
|
||||
},
|
||||
) => PtySpawnHandle;
|
||||
|
||||
type PtyModule = {
|
||||
spawn?: PtySpawn;
|
||||
default?: { spawn?: PtySpawn };
|
||||
};
|
||||
|
||||
type ActiveSession = GatewayPtySession & {
|
||||
pty: PtySpawnHandle;
|
||||
outputDispose?: PtyDisposable | null;
|
||||
exitDispose?: PtyDisposable | null;
|
||||
};
|
||||
|
||||
export class GatewayPtyError extends Error {
|
||||
code:
|
||||
| "PTY_NOT_FOUND"
|
||||
| "PTY_ACCESS_DENIED"
|
||||
| "PTY_INVALID_ARGS"
|
||||
| "PTY_LIMIT_REACHED"
|
||||
| "PTY_INPUT_TOO_LARGE";
|
||||
|
||||
constructor(
|
||||
code:
|
||||
| "PTY_NOT_FOUND"
|
||||
| "PTY_ACCESS_DENIED"
|
||||
| "PTY_INVALID_ARGS"
|
||||
| "PTY_LIMIT_REACHED"
|
||||
| "PTY_INPUT_TOO_LARGE",
|
||||
message: string,
|
||||
) {
|
||||
super(message);
|
||||
this.name = "GatewayPtyError";
|
||||
this.code = code;
|
||||
}
|
||||
}
|
||||
|
||||
const sessions = new Map<string, ActiveSession>();
|
||||
let idleSweepTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
function intFromEnv(name: string, fallback: number): number {
|
||||
const raw = process.env[name]?.trim();
|
||||
if (!raw) {
|
||||
return fallback;
|
||||
}
|
||||
const parsed = Number(raw);
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return fallback;
|
||||
}
|
||||
return Math.floor(parsed);
|
||||
}
|
||||
|
||||
function getPtyLimits() {
|
||||
const minCols = Math.max(1, intFromEnv("OPENCLAW_PTY_MIN_COLS", 20));
|
||||
const maxCols = Math.max(minCols, intFromEnv("OPENCLAW_PTY_MAX_COLS", 500));
|
||||
const minRows = Math.max(1, intFromEnv("OPENCLAW_PTY_MIN_ROWS", 5));
|
||||
const maxRows = Math.max(minRows, intFromEnv("OPENCLAW_PTY_MAX_ROWS", 200));
|
||||
return {
|
||||
minCols,
|
||||
maxCols,
|
||||
minRows,
|
||||
maxRows,
|
||||
maxSessionsPerOwner: Math.max(1, intFromEnv("OPENCLAW_PTY_MAX_SESSIONS_PER_OWNER", 4)),
|
||||
maxTotalSessions: Math.max(1, intFromEnv("OPENCLAW_PTY_MAX_TOTAL_SESSIONS", 32)),
|
||||
maxInputChunkBytes: Math.max(1, intFromEnv("OPENCLAW_PTY_MAX_INPUT_CHUNK_BYTES", 65536)),
|
||||
idleTimeoutMs: Math.max(0, intFromEnv("OPENCLAW_PTY_IDLE_TIMEOUT_MS", 30 * 60 * 1000)),
|
||||
idleSweepIntervalMs: Math.max(
|
||||
1000,
|
||||
intFromEnv("OPENCLAW_PTY_IDLE_SWEEP_INTERVAL_MS", 60 * 1000),
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
function sanitizeInitialDim(
|
||||
value: unknown,
|
||||
fallback: number,
|
||||
min: number,
|
||||
max: number,
|
||||
label: string,
|
||||
): number {
|
||||
if (value == null) {
|
||||
return fallback;
|
||||
}
|
||||
const n = typeof value === "number" ? value : Number(value);
|
||||
if (!Number.isFinite(n)) {
|
||||
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be a finite number`);
|
||||
}
|
||||
const next = Math.floor(n);
|
||||
if (next < min || next > max) {
|
||||
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be between ${min} and ${max}`);
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
function sanitizeResizeDim(
|
||||
value: unknown,
|
||||
current: number,
|
||||
min: number,
|
||||
max: number,
|
||||
label: string,
|
||||
): number {
|
||||
if (value == null) {
|
||||
return current;
|
||||
}
|
||||
const n = typeof value === "number" ? value : Number(value);
|
||||
if (!Number.isFinite(n)) {
|
||||
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be a finite number`);
|
||||
}
|
||||
const next = Math.floor(n);
|
||||
if (next < min || next > max) {
|
||||
throw new GatewayPtyError("PTY_INVALID_ARGS", `${label} must be between ${min} and ${max}`);
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
function resolveDefaultShell(): string {
|
||||
const shell = (process.env.OPENCLAW_PTY_SHELL || process.env.SHELL || "").trim();
|
||||
if (shell) {
|
||||
return shell;
|
||||
}
|
||||
return process.platform === "win32" ? "powershell.exe" : "/bin/zsh";
|
||||
}
|
||||
|
||||
function resolveAllowedShells(defaultShell: string): Set<string> {
|
||||
const raw = (process.env.OPENCLAW_PTY_ALLOWED_SHELLS || "").trim();
|
||||
const values = raw
|
||||
? raw
|
||||
.split(",")
|
||||
.map((v) => v.trim())
|
||||
.filter(Boolean)
|
||||
: [defaultShell];
|
||||
return new Set(values);
|
||||
}
|
||||
|
||||
function resolveShell(requested?: string): string {
|
||||
const defaultShell = resolveDefaultShell();
|
||||
if (!requested?.trim()) {
|
||||
return defaultShell;
|
||||
}
|
||||
const candidate = requested.trim();
|
||||
const allowed = resolveAllowedShells(defaultShell);
|
||||
if (!allowed.has(candidate)) {
|
||||
throw new GatewayPtyError("PTY_INVALID_ARGS", `shell is not allowed: ${candidate}`);
|
||||
}
|
||||
return candidate;
|
||||
}
|
||||
|
||||
function resolveCwd(requested?: string): string {
|
||||
const base = process.env.OPENCLAW_PTY_CWD || process.cwd();
|
||||
const home = os.homedir();
|
||||
const fallback = path.resolve(base || home);
|
||||
if (!requested?.trim()) {
|
||||
return fallback;
|
||||
}
|
||||
const expanded = requested.startsWith("~/") ? path.join(home, requested.slice(2)) : requested;
|
||||
return path.resolve(expanded);
|
||||
}
|
||||
|
||||
function toStringEnv(env: NodeJS.ProcessEnv): Record<string, string> {
|
||||
const out: Record<string, string> = {};
|
||||
for (const [key, value] of Object.entries(env)) {
|
||||
if (typeof value === "string") {
|
||||
out[key] = value;
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
async function loadSpawn(): Promise<PtySpawn> {
|
||||
const mod = (await import("@lydell/node-pty")) as unknown as PtyModule;
|
||||
const spawn = mod.spawn ?? mod.default?.spawn;
|
||||
if (!spawn) {
|
||||
throw new Error("PTY support is unavailable");
|
||||
}
|
||||
return spawn;
|
||||
}
|
||||
|
||||
function publicSession(session: ActiveSession): GatewayPtySession {
|
||||
return {
|
||||
sessionId: session.sessionId,
|
||||
owner: { ...session.owner },
|
||||
shell: session.shell,
|
||||
cwd: session.cwd,
|
||||
cols: session.cols,
|
||||
rows: session.rows,
|
||||
createdAt: session.createdAt,
|
||||
lastActive: session.lastActive,
|
||||
exitedAt: session.exitedAt,
|
||||
exitCode: session.exitCode,
|
||||
};
|
||||
}
|
||||
|
||||
function markActive(session: ActiveSession): void {
|
||||
session.lastActive = Date.now();
|
||||
}
|
||||
|
||||
function ensureCapacity(ownerKey: string): void {
|
||||
const limits = getPtyLimits();
|
||||
if (sessions.size >= limits.maxTotalSessions) {
|
||||
throw new GatewayPtyError(
|
||||
"PTY_LIMIT_REACHED",
|
||||
`PTY session limit reached (${limits.maxTotalSessions} total)`,
|
||||
);
|
||||
}
|
||||
const ownerSessions = Array.from(sessions.values()).filter(
|
||||
(session) => session.owner.ownerKey === ownerKey,
|
||||
);
|
||||
if (ownerSessions.length >= limits.maxSessionsPerOwner) {
|
||||
throw new GatewayPtyError(
|
||||
"PTY_LIMIT_REACHED",
|
||||
`PTY session limit reached (${limits.maxSessionsPerOwner} per owner)`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function ensureIdleSweep(): void {
|
||||
const { idleTimeoutMs, idleSweepIntervalMs } = getPtyLimits();
|
||||
if (idleTimeoutMs <= 0) {
|
||||
if (idleSweepTimer) {
|
||||
clearInterval(idleSweepTimer);
|
||||
idleSweepTimer = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (idleSweepTimer) {
|
||||
return;
|
||||
}
|
||||
idleSweepTimer = setInterval(() => {
|
||||
const now = Date.now();
|
||||
for (const session of sessions.values()) {
|
||||
if (now - session.lastActive >= idleTimeoutMs) {
|
||||
destroyGatewayPtySession(session.sessionId);
|
||||
}
|
||||
}
|
||||
if (sessions.size === 0 && idleSweepTimer) {
|
||||
clearInterval(idleSweepTimer);
|
||||
idleSweepTimer = null;
|
||||
}
|
||||
}, idleSweepIntervalMs);
|
||||
idleSweepTimer.unref?.();
|
||||
}
|
||||
|
||||
export async function createGatewayPtySession(params: {
|
||||
owner: GatewayPtyOwner;
|
||||
cols?: number;
|
||||
rows?: number;
|
||||
cwd?: string;
|
||||
shell?: string;
|
||||
onOutput: (event: { sessionId: string; data: string; connId: string }) => void;
|
||||
onExit: (event: { sessionId: string; code: number | null; connId: string }) => void;
|
||||
}): Promise<GatewayPtySession> {
|
||||
ensureCapacity(params.owner.ownerKey);
|
||||
const spawn = await loadSpawn();
|
||||
const limits = getPtyLimits();
|
||||
const cols = sanitizeInitialDim(params.cols, 80, limits.minCols, limits.maxCols, "cols");
|
||||
const rows = sanitizeInitialDim(params.rows, 24, limits.minRows, limits.maxRows, "rows");
|
||||
const shell = resolveShell(params.shell);
|
||||
const cwd = resolveCwd(params.cwd);
|
||||
const sessionId = crypto.randomUUID();
|
||||
const now = Date.now();
|
||||
const pty = spawn(shell, [], {
|
||||
name: process.env.TERM || "xterm-256color",
|
||||
cols,
|
||||
rows,
|
||||
cwd,
|
||||
env: toStringEnv(process.env),
|
||||
});
|
||||
const session: ActiveSession = {
|
||||
sessionId,
|
||||
owner: { ...params.owner },
|
||||
shell,
|
||||
cwd,
|
||||
cols,
|
||||
rows,
|
||||
createdAt: now,
|
||||
lastActive: now,
|
||||
pty,
|
||||
};
|
||||
session.outputDispose =
|
||||
pty.onData((data) => {
|
||||
markActive(session);
|
||||
params.onOutput({ sessionId, data, connId: session.owner.connId });
|
||||
}) ?? null;
|
||||
session.exitDispose =
|
||||
pty.onExit((event) => {
|
||||
session.exitedAt = Date.now();
|
||||
session.exitCode = event.exitCode ?? null;
|
||||
try {
|
||||
params.onExit({ sessionId, code: session.exitCode, connId: session.owner.connId });
|
||||
} finally {
|
||||
destroyGatewayPtySession(sessionId);
|
||||
}
|
||||
}) ?? null;
|
||||
sessions.set(sessionId, session);
|
||||
ensureIdleSweep();
|
||||
return publicSession(session);
|
||||
}
|
||||
|
||||
export function listGatewayPtySessionsByOwner(ownerKey: string): GatewayPtySession[] {
|
||||
return Array.from(sessions.values())
|
||||
.filter((session) => session.owner.ownerKey === ownerKey)
|
||||
.map(publicSession);
|
||||
}
|
||||
|
||||
export function getGatewayPtySession(sessionId: string): GatewayPtySession | undefined {
|
||||
const session = sessions.get(sessionId);
|
||||
return session ? publicSession(session) : undefined;
|
||||
}
|
||||
|
||||
export function touchGatewayPtySessionOwner(params: { sessionId: string; connId: string }): void {
|
||||
const session = sessions.get(params.sessionId);
|
||||
if (!session) {
|
||||
return;
|
||||
}
|
||||
session.owner.connId = params.connId;
|
||||
markActive(session);
|
||||
}
|
||||
|
||||
export function writeGatewayPtySession(sessionId: string, data: string): void {
|
||||
const session = sessions.get(sessionId);
|
||||
if (!session) {
|
||||
throw new GatewayPtyError("PTY_NOT_FOUND", `PTY session not found: ${sessionId}`);
|
||||
}
|
||||
const byteLength = Buffer.byteLength(data, "utf8");
|
||||
const { maxInputChunkBytes } = getPtyLimits();
|
||||
if (byteLength > maxInputChunkBytes) {
|
||||
throw new GatewayPtyError(
|
||||
"PTY_INPUT_TOO_LARGE",
|
||||
`PTY input exceeds ${maxInputChunkBytes} bytes`,
|
||||
);
|
||||
}
|
||||
markActive(session);
|
||||
session.pty.write(data);
|
||||
}
|
||||
|
||||
export function resizeGatewayPtySession(sessionId: string, cols?: number, rows?: number): void {
|
||||
const session = sessions.get(sessionId);
|
||||
if (!session) {
|
||||
throw new GatewayPtyError("PTY_NOT_FOUND", `PTY session not found: ${sessionId}`);
|
||||
}
|
||||
const limits = getPtyLimits();
|
||||
const nextCols = sanitizeResizeDim(cols, session.cols, limits.minCols, limits.maxCols, "cols");
|
||||
const nextRows = sanitizeResizeDim(rows, session.rows, limits.minRows, limits.maxRows, "rows");
|
||||
session.cols = nextCols;
|
||||
session.rows = nextRows;
|
||||
markActive(session);
|
||||
session.pty.resize?.(nextCols, nextRows);
|
||||
}
|
||||
|
||||
export function destroyGatewayPtySession(sessionId: string): void {
|
||||
const session = sessions.get(sessionId);
|
||||
if (!session) {
|
||||
return;
|
||||
}
|
||||
sessions.delete(sessionId);
|
||||
try {
|
||||
session.outputDispose?.dispose();
|
||||
} catch {}
|
||||
try {
|
||||
session.exitDispose?.dispose();
|
||||
} catch {}
|
||||
try {
|
||||
session.pty.kill("SIGKILL");
|
||||
} catch {}
|
||||
if (sessions.size === 0 && idleSweepTimer) {
|
||||
clearInterval(idleSweepTimer);
|
||||
idleSweepTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
export function destroyGatewayPtySessionsForConn(connId: string): number {
|
||||
const sessionIds = Array.from(sessions.values())
|
||||
.filter((session) => session.owner.connId === connId)
|
||||
.map((session) => session.sessionId);
|
||||
for (const sessionId of sessionIds) {
|
||||
destroyGatewayPtySession(sessionId);
|
||||
}
|
||||
return sessionIds.length;
|
||||
}
|
||||
|
||||
export function assertGatewayPtyOwnership(params: {
|
||||
sessionId: string;
|
||||
ownerKey: string;
|
||||
connId: string;
|
||||
}): GatewayPtySession {
|
||||
const session = sessions.get(params.sessionId);
|
||||
if (!session) {
|
||||
throw new GatewayPtyError("PTY_NOT_FOUND", `PTY session not found: ${params.sessionId}`);
|
||||
}
|
||||
if (session.owner.ownerKey !== params.ownerKey) {
|
||||
throw new GatewayPtyError(
|
||||
"PTY_ACCESS_DENIED",
|
||||
`PTY session does not belong to this gateway client: ${params.sessionId}`,
|
||||
);
|
||||
}
|
||||
session.owner.connId = params.connId;
|
||||
markActive(session);
|
||||
return publicSession(session);
|
||||
}
|
||||
@ -99,6 +99,11 @@ const BASE_METHODS = [
|
||||
"agent.identity.get",
|
||||
"agent.wait",
|
||||
"browser.request",
|
||||
"pty.create",
|
||||
"pty.write",
|
||||
"pty.resize",
|
||||
"pty.kill",
|
||||
"pty.list",
|
||||
// WebChat WebSocket-native chat methods
|
||||
"chat.history",
|
||||
"chat.abort",
|
||||
|
||||
@ -20,6 +20,7 @@ import { logsHandlers } from "./server-methods/logs.js";
|
||||
import { modelsHandlers } from "./server-methods/models.js";
|
||||
import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
|
||||
import { nodeHandlers } from "./server-methods/nodes.js";
|
||||
import { ptyHandlers } from "./server-methods/pty.js";
|
||||
import { pushHandlers } from "./server-methods/push.js";
|
||||
import { sendHandlers } from "./server-methods/send.js";
|
||||
import { sessionsHandlers } from "./server-methods/sessions.js";
|
||||
@ -90,6 +91,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
|
||||
...nodeHandlers,
|
||||
...nodePendingHandlers,
|
||||
...pushHandlers,
|
||||
...ptyHandlers,
|
||||
...sendHandlers,
|
||||
...usageHandlers,
|
||||
...agentHandlers,
|
||||
|
||||
151
src/gateway/server-methods/pty.ts
Normal file
151
src/gateway/server-methods/pty.ts
Normal file
@ -0,0 +1,151 @@
|
||||
import { ErrorCodes, errorShape } from "../protocol/index.js";
|
||||
import {
|
||||
assertGatewayPtyOwnership,
|
||||
createGatewayPtySession,
|
||||
destroyGatewayPtySession,
|
||||
GatewayPtyError,
|
||||
listGatewayPtySessionsByOwner,
|
||||
resizeGatewayPtySession,
|
||||
writeGatewayPtySession,
|
||||
} from "../pty-manager.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
function getPtyOwner(client: { connect?: { device?: { id?: string } }; connId?: string } | null): {
|
||||
ownerKey: string;
|
||||
connId: string;
|
||||
deviceId?: string;
|
||||
} {
|
||||
const connId = client?.connId?.trim();
|
||||
if (!connId) {
|
||||
throw new GatewayPtyError(
|
||||
"PTY_INVALID_ARGS",
|
||||
"PTY requires an authenticated gateway connection",
|
||||
);
|
||||
}
|
||||
const deviceId = client?.connect?.device?.id?.trim() || undefined;
|
||||
return {
|
||||
ownerKey: deviceId ? `device:${deviceId}` : `conn:${connId}`,
|
||||
connId,
|
||||
deviceId,
|
||||
};
|
||||
}
|
||||
|
||||
function invalidParams(message: string) {
|
||||
return errorShape(ErrorCodes.INVALID_PARAMS, message);
|
||||
}
|
||||
|
||||
function asString(value: unknown): string | undefined {
|
||||
return typeof value === "string" ? value : undefined;
|
||||
}
|
||||
|
||||
function asNumber(value: unknown): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
|
||||
function mapPtyError(error: unknown) {
|
||||
if (error instanceof GatewayPtyError) {
|
||||
switch (error.code) {
|
||||
case "PTY_NOT_FOUND":
|
||||
return errorShape(ErrorCodes.NOT_FOUND, error.message);
|
||||
case "PTY_ACCESS_DENIED":
|
||||
return errorShape(ErrorCodes.FORBIDDEN, error.message);
|
||||
case "PTY_LIMIT_REACHED":
|
||||
return errorShape(ErrorCodes.RESOURCE_LIMIT, error.message);
|
||||
case "PTY_INPUT_TOO_LARGE":
|
||||
return errorShape(ErrorCodes.PAYLOAD_TOO_LARGE, error.message);
|
||||
case "PTY_INVALID_ARGS":
|
||||
default:
|
||||
return invalidParams(error.message);
|
||||
}
|
||||
}
|
||||
return invalidParams(error instanceof Error ? error.message : String(error));
|
||||
}
|
||||
|
||||
export const ptyHandlers: GatewayRequestHandlers = {
|
||||
"pty.create": async ({ client, params, respond, context }) => {
|
||||
try {
|
||||
const owner = getPtyOwner(client);
|
||||
const session = await createGatewayPtySession({
|
||||
owner,
|
||||
cols: asNumber(params.cols),
|
||||
rows: asNumber(params.rows),
|
||||
cwd: asString(params.cwd),
|
||||
shell: asString(params.shell),
|
||||
onOutput: ({ sessionId, data, connId }) => {
|
||||
context.broadcastToConnIds("pty.output", { sessionId, data }, new Set([connId]));
|
||||
},
|
||||
onExit: ({ sessionId, code, connId }) => {
|
||||
context.broadcastToConnIds("pty.exit", { sessionId, code }, new Set([connId]));
|
||||
},
|
||||
});
|
||||
respond(true, { sessionId: session.sessionId, cwd: session.cwd, shell: session.shell });
|
||||
} catch (error) {
|
||||
respond(false, undefined, mapPtyError(error));
|
||||
}
|
||||
},
|
||||
"pty.write": ({ client, params, respond }) => {
|
||||
try {
|
||||
const owner = getPtyOwner(client);
|
||||
const sessionId = asString(params.sessionId)?.trim();
|
||||
const data = asString(params.data);
|
||||
if (!sessionId) {
|
||||
respond(false, undefined, invalidParams("pty.write requires sessionId"));
|
||||
return;
|
||||
}
|
||||
if (typeof data !== "string") {
|
||||
respond(false, undefined, invalidParams("pty.write requires string data"));
|
||||
return;
|
||||
}
|
||||
assertGatewayPtyOwnership({ sessionId, ownerKey: owner.ownerKey, connId: owner.connId });
|
||||
writeGatewayPtySession(sessionId, data);
|
||||
respond(true, { ok: true });
|
||||
} catch (error) {
|
||||
respond(false, undefined, mapPtyError(error));
|
||||
}
|
||||
},
|
||||
"pty.resize": ({ client, params, respond }) => {
|
||||
try {
|
||||
const owner = getPtyOwner(client);
|
||||
const sessionId = asString(params.sessionId)?.trim();
|
||||
if (!sessionId) {
|
||||
respond(false, undefined, invalidParams("pty.resize requires sessionId"));
|
||||
return;
|
||||
}
|
||||
assertGatewayPtyOwnership({ sessionId, ownerKey: owner.ownerKey, connId: owner.connId });
|
||||
resizeGatewayPtySession(sessionId, asNumber(params.cols), asNumber(params.rows));
|
||||
respond(true, { ok: true });
|
||||
} catch (error) {
|
||||
respond(false, undefined, mapPtyError(error));
|
||||
}
|
||||
},
|
||||
"pty.kill": ({ client, params, respond }) => {
|
||||
try {
|
||||
const owner = getPtyOwner(client);
|
||||
const sessionId = asString(params.sessionId)?.trim();
|
||||
if (!sessionId) {
|
||||
respond(false, undefined, invalidParams("pty.kill requires sessionId"));
|
||||
return;
|
||||
}
|
||||
assertGatewayPtyOwnership({ sessionId, ownerKey: owner.ownerKey, connId: owner.connId });
|
||||
destroyGatewayPtySession(sessionId);
|
||||
respond(true, { ok: true });
|
||||
} catch (error) {
|
||||
respond(false, undefined, mapPtyError(error));
|
||||
}
|
||||
},
|
||||
"pty.list": ({ client, respond }) => {
|
||||
try {
|
||||
const owner = getPtyOwner(client);
|
||||
const sessions = listGatewayPtySessionsByOwner(owner.ownerKey).map((session) => ({
|
||||
sessionId: session.sessionId,
|
||||
createdAt: session.createdAt,
|
||||
lastActive: session.lastActive,
|
||||
cols: session.cols,
|
||||
rows: session.rows,
|
||||
}));
|
||||
respond(true, { sessions });
|
||||
} catch (error) {
|
||||
respond(false, undefined, mapPtyError(error));
|
||||
}
|
||||
},
|
||||
};
|
||||
@ -9,6 +9,7 @@ import { isWebchatClient } from "../../utils/message-channel.js";
|
||||
import type { AuthRateLimiter } from "../auth-rate-limit.js";
|
||||
import type { ResolvedGatewayAuth } from "../auth.js";
|
||||
import { isLoopbackAddress } from "../net.js";
|
||||
import { destroyGatewayPtySessionsForConn } from "../pty-manager.js";
|
||||
import { getHandshakeTimeoutMs } from "../server-constants.js";
|
||||
import type { GatewayRequestContext, GatewayRequestHandlers } from "../server-methods/types.js";
|
||||
import { formatError } from "../server-utils.js";
|
||||
@ -242,6 +243,9 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
|
||||
upsertPresence(client.presenceKey, { reason: "disconnect" });
|
||||
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
|
||||
}
|
||||
if (client?.connId) {
|
||||
destroyGatewayPtySessionsForConn(client.connId);
|
||||
}
|
||||
if (client?.connect?.role === "node") {
|
||||
const context = buildRequestContext();
|
||||
const nodeId = context.nodeRegistry.unregister(connId);
|
||||
|
||||
@ -9,6 +9,7 @@ type GatewayClientMock = {
|
||||
start: ReturnType<typeof vi.fn>;
|
||||
stop: ReturnType<typeof vi.fn>;
|
||||
options: { clientVersion?: string };
|
||||
emitHello: (hello: Record<string, unknown>) => void;
|
||||
emitClose: (info: {
|
||||
code: number;
|
||||
reason?: string;
|
||||
@ -39,6 +40,7 @@ vi.mock("./gateway.ts", () => {
|
||||
constructor(
|
||||
private opts: {
|
||||
clientVersion?: string;
|
||||
onHello?: (hello: Record<string, unknown>) => void;
|
||||
onClose?: (info: {
|
||||
code: number;
|
||||
reason: string;
|
||||
@ -52,6 +54,9 @@ vi.mock("./gateway.ts", () => {
|
||||
start: this.start,
|
||||
stop: this.stop,
|
||||
options: { clientVersion: this.opts.clientVersion },
|
||||
emitHello: (hello) => {
|
||||
this.opts.onHello?.(hello as never);
|
||||
},
|
||||
emitClose: (info) => {
|
||||
this.opts.onClose?.({
|
||||
code: info.code,
|
||||
@ -158,6 +163,31 @@ describe("connectGateway", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to the main session when persisted session keys point at cron chats", () => {
|
||||
const host = createHost();
|
||||
host.sessionKey = "agent:main:cron:nightly-brief";
|
||||
host.settings.sessionKey = "agent:main:cron:nightly-brief";
|
||||
host.settings.lastActiveSessionKey = "cron:nightly-brief";
|
||||
|
||||
connectGateway(host);
|
||||
const client = gatewayClientInstances[0];
|
||||
expect(client).toBeDefined();
|
||||
|
||||
client.emitHello({
|
||||
snapshot: {
|
||||
sessionDefaults: {
|
||||
mainSessionKey: "agent:main:main",
|
||||
mainKey: "main",
|
||||
defaultAgentId: "main",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(host.sessionKey).toBe("agent:main:main");
|
||||
expect(host.settings.sessionKey).toBe("agent:main:main");
|
||||
expect(host.settings.lastActiveSessionKey).toBe("agent:main:main");
|
||||
});
|
||||
|
||||
it("ignores stale client onEvent callbacks after reconnect", () => {
|
||||
const host = createHost();
|
||||
|
||||
|
||||
@ -118,6 +118,24 @@ export function resolveControlUiClientVersion(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function isCronSessionKey(value: string | undefined): boolean {
|
||||
const normalized = (value ?? "").trim().toLowerCase();
|
||||
if (!normalized) {
|
||||
return false;
|
||||
}
|
||||
if (normalized.startsWith("cron:")) {
|
||||
return true;
|
||||
}
|
||||
if (!normalized.startsWith("agent:")) {
|
||||
return false;
|
||||
}
|
||||
const parts = normalized.split(":").filter(Boolean);
|
||||
if (parts.length < 3) {
|
||||
return false;
|
||||
}
|
||||
return parts.slice(2).join(":").startsWith("cron:");
|
||||
}
|
||||
|
||||
function normalizeSessionKeyForDefaults(
|
||||
value: string | undefined,
|
||||
defaults: SessionDefaultsSnapshot,
|
||||
@ -130,6 +148,9 @@ function normalizeSessionKeyForDefaults(
|
||||
if (!raw) {
|
||||
return mainSessionKey;
|
||||
}
|
||||
if (isCronSessionKey(raw)) {
|
||||
return mainSessionKey;
|
||||
}
|
||||
const mainKey = defaults.mainKey?.trim() || "main";
|
||||
const defaultAgentId = defaults.defaultAgentId?.trim();
|
||||
const isAlias =
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
const KEY = "openclaw.control.settings.v1";
|
||||
const LEGACY_TOKEN_SESSION_KEY = "openclaw.control.token.v1";
|
||||
const TOKEN_SESSION_KEY_PREFIX = "openclaw.control.token.v1:";
|
||||
const TOKEN_LOCAL_KEY_PREFIX = "openclaw.control.token.persisted.v1:";
|
||||
|
||||
type PersistedUiSettings = Omit<UiSettings, "token"> & { token?: never };
|
||||
|
||||
@ -11,6 +12,7 @@ import { parseThemeSelection, type ThemeMode, type ThemeName } from "./theme.ts"
|
||||
export type UiSettings = {
|
||||
gatewayUrl: string;
|
||||
token: string;
|
||||
rememberGatewayAuth: boolean;
|
||||
sessionKey: string;
|
||||
lastActiveSessionKey: string;
|
||||
theme: ThemeName;
|
||||
@ -86,6 +88,10 @@ function tokenSessionKeyForGateway(gatewayUrl: string): string {
|
||||
return `${TOKEN_SESSION_KEY_PREFIX}${normalizeGatewayTokenScope(gatewayUrl)}`;
|
||||
}
|
||||
|
||||
function tokenLocalKeyForGateway(gatewayUrl: string): string {
|
||||
return `${TOKEN_LOCAL_KEY_PREFIX}${normalizeGatewayTokenScope(gatewayUrl)}`;
|
||||
}
|
||||
|
||||
function loadSessionToken(gatewayUrl: string): string {
|
||||
try {
|
||||
const storage = getSessionStorage();
|
||||
@ -119,12 +125,37 @@ function persistSessionToken(gatewayUrl: string, token: string) {
|
||||
}
|
||||
}
|
||||
|
||||
function loadRememberedToken(gatewayUrl: string): string {
|
||||
try {
|
||||
const token = localStorage.getItem(tokenLocalKeyForGateway(gatewayUrl)) ?? "";
|
||||
return token.trim();
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
function persistGatewayToken(gatewayUrl: string, token: string, remember: boolean) {
|
||||
try {
|
||||
const normalized = token.trim();
|
||||
persistSessionToken(gatewayUrl, remember ? "" : normalized);
|
||||
const localKey = tokenLocalKeyForGateway(gatewayUrl);
|
||||
if (remember && normalized) {
|
||||
localStorage.setItem(localKey, normalized);
|
||||
} else {
|
||||
localStorage.removeItem(localKey);
|
||||
}
|
||||
} catch {
|
||||
// best-effort
|
||||
}
|
||||
}
|
||||
|
||||
export function loadSettings(): UiSettings {
|
||||
const { pageUrl: pageDerivedUrl, effectiveUrl: defaultUrl } = deriveDefaultGatewayUrl();
|
||||
|
||||
const defaults: UiSettings = {
|
||||
gatewayUrl: defaultUrl,
|
||||
token: loadSessionToken(defaultUrl),
|
||||
rememberGatewayAuth: false,
|
||||
sessionKey: "main",
|
||||
lastActiveSessionKey: "main",
|
||||
theme: "claw",
|
||||
@ -152,10 +183,12 @@ export function loadSettings(): UiSettings {
|
||||
(parsed as { theme?: unknown }).theme,
|
||||
(parsed as { themeMode?: unknown }).themeMode,
|
||||
);
|
||||
const rememberGatewayAuth =
|
||||
typeof parsed.rememberGatewayAuth === "boolean" ? parsed.rememberGatewayAuth : false;
|
||||
const settings = {
|
||||
gatewayUrl,
|
||||
// Gateway auth is intentionally in-memory only; scrub any legacy persisted token on load.
|
||||
token: loadSessionToken(gatewayUrl),
|
||||
token: rememberGatewayAuth ? loadRememberedToken(gatewayUrl) : loadSessionToken(gatewayUrl),
|
||||
rememberGatewayAuth,
|
||||
sessionKey:
|
||||
typeof parsed.sessionKey === "string" && parsed.sessionKey.trim()
|
||||
? parsed.sessionKey.trim()
|
||||
@ -205,9 +238,10 @@ export function saveSettings(next: UiSettings) {
|
||||
}
|
||||
|
||||
function persistSettings(next: UiSettings) {
|
||||
persistSessionToken(next.gatewayUrl, next.token);
|
||||
persistGatewayToken(next.gatewayUrl, next.token, next.rememberGatewayAuth);
|
||||
const persisted: PersistedUiSettings = {
|
||||
gatewayUrl: next.gatewayUrl,
|
||||
rememberGatewayAuth: next.rememberGatewayAuth,
|
||||
sessionKey: next.sessionKey,
|
||||
lastActiveSessionKey: next.lastActiveSessionKey,
|
||||
theme: next.theme,
|
||||
|
||||
@ -97,6 +97,17 @@ export function renderLoginGate(state: AppViewState) {
|
||||
</button>
|
||||
</div>
|
||||
</label>
|
||||
<label class="field" style="gap:8px; flex-direction:row; align-items:center;">
|
||||
<input
|
||||
type="checkbox"
|
||||
.checked=${state.settings.rememberGatewayAuth}
|
||||
@change=${(e: Event) => {
|
||||
const checked = (e.target as HTMLInputElement).checked;
|
||||
state.applySettings({ ...state.settings, rememberGatewayAuth: checked });
|
||||
}}
|
||||
/>
|
||||
<span>Remember me on this device</span>
|
||||
</label>
|
||||
<button
|
||||
class="btn primary login-gate__connect"
|
||||
@click=${() => state.connect()}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user