From d57c2bbf36d02d7b522b2149a774559cbf47bcd1 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sat, 21 Feb 2026 11:03:11 -0800 Subject: [PATCH] CLI: add --subscribe-session-key mode for tailing agent events --- src/cli/program/register.agent.ts | 7 +- src/commands/agent-via-gateway.ts | 211 +++++++++++++++++++++++++++++- 2 files changed, 215 insertions(+), 3 deletions(-) diff --git a/src/cli/program/register.agent.ts b/src/cli/program/register.agent.ts index 39bff8614c9..96027d1f85f 100644 --- a/src/cli/program/register.agent.ts +++ b/src/cli/program/register.agent.ts @@ -21,7 +21,7 @@ export function registerAgentCommands(program: Command, args: { agentChannelOpti program .command("agent") .description("Run an agent turn via the Gateway (use --local for embedded)") - .requiredOption("-m, --message ", "Message body for the agent") + .option("-m, --message ", "Message body for the agent") .option("-t, --to ", "Recipient number in E.164 used to derive the session key") .option("--session-id ", "Use an explicit session id") .option("--session-key ", "Explicit session key (e.g. agent:main:subagent:uuid)") @@ -44,6 +44,11 @@ export function registerAgentCommands(program: Command, args: { agentChannelOpti .option("--deliver", "Send the agent's reply back to the selected channel", false) .option("--json", "Output result as JSON", false) .option("--stream-json", "Stream NDJSON events to stdout", false) + .option( + "--subscribe-session-key ", + "Subscribe to gateway events for a session key (subscribe-only, no message required)", + ) + .option("--after-seq ", "Replay events after this global sequence cursor", "0") .option( "--timeout ", "Override agent command timeout (seconds, default 600 or config value)", diff --git a/src/commands/agent-via-gateway.ts b/src/commands/agent-via-gateway.ts index 3dfe89854f7..86b75dd26f8 100644 --- a/src/commands/agent-via-gateway.ts +++ b/src/commands/agent-via-gateway.ts @@ -4,8 +4,16 @@ import { listAgentIds } from "../agents/agent-scope.js"; import { DEFAULT_CHAT_CHANNEL } from "../channels/registry.js"; import { formatCliCommand } from "../cli/command-format.js"; import { withProgress } from "../cli/progress.js"; -import { loadConfig } from "../config/config.js"; -import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; +import { loadConfig, resolveConfigPath, resolveStateDir } from "../config/config.js"; +import { + buildGatewayConnectionDetails, + callGateway, + randomIdempotencyKey, +} from "../gateway/call.js"; +import { GatewayClient } from "../gateway/client.js"; +import { PROTOCOL_VERSION } from "../gateway/protocol/index.js"; +import { loadOrCreateDeviceIdentity } from "../infra/device-identity.js"; +import { loadGatewayTlsRuntime } from "../infra/tls/gateway.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { GATEWAY_CLIENT_MODES, @@ -313,7 +321,206 @@ async function agentViaGatewayStreamJson(opts: AgentCliOpts, _runtime: RuntimeEn } } +/** + * Subscribe to a session key's events via the gateway `agent.subscribe` RPC. + * Streams NDJSON to stdout until SIGTERM/SIGINT. + */ +async function agentSubscribeStreamJson( + sessionKey: string, + afterSeq: number, + _runtime: RuntimeEnv, +): Promise { + const cfg = loadConfig(); + const isRemoteMode = cfg.gateway?.mode === "remote"; + const remote = isRemoteMode ? cfg.gateway?.remote : undefined; + const remoteUrl = + typeof remote?.url === "string" && remote.url.trim().length > 0 ? remote.url.trim() : undefined; + if (isRemoteMode && !remoteUrl) { + const configPath = resolveConfigPath(process.env, resolveStateDir(process.env)); + throw new Error( + [ + "gateway remote mode misconfigured: gateway.remote.url missing", + `Config: ${configPath}`, + "Fix: set gateway.remote.url, or set gateway.mode=local.", + ].join("\n"), + ); + } + const connectionDetails = buildGatewayConnectionDetails({ config: cfg }); + const useLocalTls = + cfg.gateway?.tls?.enabled === true && !remoteUrl && connectionDetails.url.startsWith("wss://"); + const tlsRuntime = useLocalTls ? await loadGatewayTlsRuntime(cfg.gateway?.tls) : undefined; + const tlsFingerprint = + (isRemoteMode && remoteUrl && typeof remote?.tlsFingerprint === "string" + ? remote.tlsFingerprint.trim() + : undefined) || (tlsRuntime?.enabled ? tlsRuntime.fingerprintSha256 : undefined); + const authToken = cfg.gateway?.auth?.token; + const authPassword = cfg.gateway?.auth?.password; + const token = + isRemoteMode && remoteUrl + ? typeof remote?.token === "string" && remote.token.trim().length > 0 + ? remote.token.trim() + : undefined + : process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || + process.env.CLAWDBOT_GATEWAY_TOKEN?.trim() || + (typeof authToken === "string" && authToken.trim().length > 0 + ? authToken.trim() + : undefined); + const password = + process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || + process.env.CLAWDBOT_GATEWAY_PASSWORD?.trim() || + (isRemoteMode && remoteUrl + ? typeof remote?.password === "string" && remote.password.trim().length > 0 + ? remote.password.trim() + : undefined + : typeof authPassword === "string" && authPassword.trim().length > 0 + ? authPassword.trim() + : undefined); + + let cursor = Math.max(0, Number.isFinite(afterSeq) ? afterSeq : 0); + const abortController = new AbortController(); + let client: GatewayClient | null = null; + const onSignal = () => { + if (!abortController.signal.aborted) { + abortController.abort(); + } + }; + process.on("SIGTERM", onSignal); + process.on("SIGINT", onSignal); + + try { + await new Promise((resolve, reject) => { + let settled = false; + let aborting = false; + + const settle = (err?: Error) => { + if (settled) { + return; + } + settled = true; + if (err) { + reject(err); + return; + } + resolve(); + }; + + const onAbort = () => { + if (aborting) { + return; + } + aborting = true; + const unsubscribeAndStop = async () => { + try { + await client?.request("agent.unsubscribe", { sessionKey }).catch(() => {}); + } finally { + client?.stop(); + emitNdjsonLine({ event: "aborted", reason: "signal" }); + settle(); + } + }; + void unsubscribeAndStop(); + }; + + if (abortController.signal.aborted) { + onAbort(); + return; + } + abortController.signal.addEventListener("abort", onAbort, { once: true }); + + client = new GatewayClient({ + url: connectionDetails.url, + token, + password, + tlsFingerprint, + instanceId: randomIdempotencyKey(), + clientName: GATEWAY_CLIENT_NAMES.CLI, + clientVersion: "dev", + platform: process.platform, + mode: GATEWAY_CLIENT_MODES.CLI, + role: "operator", + scopes: ["operator.admin", "operator.approvals", "operator.pairing"], + caps: ["tool-events"], + deviceIdentity: loadOrCreateDeviceIdentity(), + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, + onHelloOk: async () => { + try { + await client?.request("agent.subscribe", { sessionKey, afterSeq: cursor }); + } catch (err) { + client?.stop(); + const error = err instanceof Error ? err : new Error(String(err)); + settle(error); + } + }, + onEvent: (evt) => { + if (evt.event !== "agent") { + return; + } + const payload = + evt.payload && typeof evt.payload === "object" + ? (evt.payload as Record) + : undefined; + if (!payload || payload.sessionKey !== sessionKey) { + return; + } + const globalSeq = typeof payload.globalSeq === "number" ? payload.globalSeq : undefined; + if (globalSeq !== undefined && globalSeq > cursor) { + cursor = globalSeq; + } + emitNdjsonLine({ event: evt.event, ...payload }); + }, + onConnectError: (err) => { + if (aborting || settled) { + return; + } + client?.stop(); + settle(err); + }, + onClose: (code, reason) => { + if (aborting || settled || abortController.signal.aborted) { + return; + } + // For reconnectable closes, let GatewayClient retry. + if (code === 1000) { + return; + } + client?.stop(); + const reasonText = reason?.trim() || "no close reason"; + settle( + new Error( + `gateway subscribe closed (${code}): ${reasonText}\n${connectionDetails.message}`, + ), + ); + }, + }); + client.start(); + }); + } catch (err) { + if (err instanceof DOMException && err.name === "AbortError") { + emitNdjsonLine({ event: "aborted", reason: "signal" }); + return; + } + throw err; + } finally { + process.removeListener("SIGTERM", onSignal); + process.removeListener("SIGINT", onSignal); + } +} + export async function agentCliCommand(opts: AgentCliOpts, runtime: RuntimeEnv, deps?: CliDeps) { + // Subscribe-only mode: tail events for a session key with replay cursor. + const subscribeKey = (opts as Record).subscribeSessionKey as string | undefined; + if (subscribeKey && opts.streamJson) { + const afterSeq = + Number.parseInt(String((opts as Record).afterSeq ?? "0"), 10) || 0; + return await agentSubscribeStreamJson(subscribeKey.trim(), afterSeq, runtime); + } + + // --message is required for all non-subscribe paths. + if (!opts.message?.trim()) { + throw new Error("Message (--message) is required"); + } + const localOpts = { ...opts, agentId: opts.agent,