CLI: add --subscribe-session-key mode for tailing agent events

This commit is contained in:
kumarabhirup 2026-02-21 11:03:11 -08:00
parent 6c3e144166
commit d57c2bbf36
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
2 changed files with 215 additions and 3 deletions

View File

@ -21,7 +21,7 @@ export function registerAgentCommands(program: Command, args: { agentChannelOpti
program
.command("agent")
.description("Run an agent turn via the Gateway (use --local for embedded)")
.requiredOption("-m, --message <text>", "Message body for the agent")
.option("-m, --message <text>", "Message body for the agent")
.option("-t, --to <number>", "Recipient number in E.164 used to derive the session key")
.option("--session-id <id>", "Use an explicit session id")
.option("--session-key <key>", "Explicit session key (e.g. agent:main:subagent:uuid)")
@ -44,6 +44,11 @@ export function registerAgentCommands(program: Command, args: { agentChannelOpti
.option("--deliver", "Send the agent's reply back to the selected channel", false)
.option("--json", "Output result as JSON", false)
.option("--stream-json", "Stream NDJSON events to stdout", false)
.option(
"--subscribe-session-key <key>",
"Subscribe to gateway events for a session key (subscribe-only, no message required)",
)
.option("--after-seq <n>", "Replay events after this global sequence cursor", "0")
.option(
"--timeout <seconds>",
"Override agent command timeout (seconds, default 600 or config value)",

View File

@ -4,8 +4,16 @@ import { listAgentIds } from "../agents/agent-scope.js";
import { DEFAULT_CHAT_CHANNEL } from "../channels/registry.js";
import { formatCliCommand } from "../cli/command-format.js";
import { withProgress } from "../cli/progress.js";
import { loadConfig } from "../config/config.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { loadConfig, resolveConfigPath, resolveStateDir } from "../config/config.js";
import {
buildGatewayConnectionDetails,
callGateway,
randomIdempotencyKey,
} from "../gateway/call.js";
import { GatewayClient } from "../gateway/client.js";
import { PROTOCOL_VERSION } from "../gateway/protocol/index.js";
import { loadOrCreateDeviceIdentity } from "../infra/device-identity.js";
import { loadGatewayTlsRuntime } from "../infra/tls/gateway.js";
import { normalizeAgentId } from "../routing/session-key.js";
import {
GATEWAY_CLIENT_MODES,
@ -313,7 +321,206 @@ async function agentViaGatewayStreamJson(opts: AgentCliOpts, _runtime: RuntimeEn
}
}
/**
* Subscribe to a session key's events via the gateway `agent.subscribe` RPC.
* Streams NDJSON to stdout until SIGTERM/SIGINT.
*/
async function agentSubscribeStreamJson(
sessionKey: string,
afterSeq: number,
_runtime: RuntimeEnv,
): Promise<void> {
const cfg = loadConfig();
const isRemoteMode = cfg.gateway?.mode === "remote";
const remote = isRemoteMode ? cfg.gateway?.remote : undefined;
const remoteUrl =
typeof remote?.url === "string" && remote.url.trim().length > 0 ? remote.url.trim() : undefined;
if (isRemoteMode && !remoteUrl) {
const configPath = resolveConfigPath(process.env, resolveStateDir(process.env));
throw new Error(
[
"gateway remote mode misconfigured: gateway.remote.url missing",
`Config: ${configPath}`,
"Fix: set gateway.remote.url, or set gateway.mode=local.",
].join("\n"),
);
}
const connectionDetails = buildGatewayConnectionDetails({ config: cfg });
const useLocalTls =
cfg.gateway?.tls?.enabled === true && !remoteUrl && connectionDetails.url.startsWith("wss://");
const tlsRuntime = useLocalTls ? await loadGatewayTlsRuntime(cfg.gateway?.tls) : undefined;
const tlsFingerprint =
(isRemoteMode && remoteUrl && typeof remote?.tlsFingerprint === "string"
? remote.tlsFingerprint.trim()
: undefined) || (tlsRuntime?.enabled ? tlsRuntime.fingerprintSha256 : undefined);
const authToken = cfg.gateway?.auth?.token;
const authPassword = cfg.gateway?.auth?.password;
const token =
isRemoteMode && remoteUrl
? typeof remote?.token === "string" && remote.token.trim().length > 0
? remote.token.trim()
: undefined
: process.env.OPENCLAW_GATEWAY_TOKEN?.trim() ||
process.env.CLAWDBOT_GATEWAY_TOKEN?.trim() ||
(typeof authToken === "string" && authToken.trim().length > 0
? authToken.trim()
: undefined);
const password =
process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() ||
process.env.CLAWDBOT_GATEWAY_PASSWORD?.trim() ||
(isRemoteMode && remoteUrl
? typeof remote?.password === "string" && remote.password.trim().length > 0
? remote.password.trim()
: undefined
: typeof authPassword === "string" && authPassword.trim().length > 0
? authPassword.trim()
: undefined);
let cursor = Math.max(0, Number.isFinite(afterSeq) ? afterSeq : 0);
const abortController = new AbortController();
let client: GatewayClient | null = null;
const onSignal = () => {
if (!abortController.signal.aborted) {
abortController.abort();
}
};
process.on("SIGTERM", onSignal);
process.on("SIGINT", onSignal);
try {
await new Promise<void>((resolve, reject) => {
let settled = false;
let aborting = false;
const settle = (err?: Error) => {
if (settled) {
return;
}
settled = true;
if (err) {
reject(err);
return;
}
resolve();
};
const onAbort = () => {
if (aborting) {
return;
}
aborting = true;
const unsubscribeAndStop = async () => {
try {
await client?.request("agent.unsubscribe", { sessionKey }).catch(() => {});
} finally {
client?.stop();
emitNdjsonLine({ event: "aborted", reason: "signal" });
settle();
}
};
void unsubscribeAndStop();
};
if (abortController.signal.aborted) {
onAbort();
return;
}
abortController.signal.addEventListener("abort", onAbort, { once: true });
client = new GatewayClient({
url: connectionDetails.url,
token,
password,
tlsFingerprint,
instanceId: randomIdempotencyKey(),
clientName: GATEWAY_CLIENT_NAMES.CLI,
clientVersion: "dev",
platform: process.platform,
mode: GATEWAY_CLIENT_MODES.CLI,
role: "operator",
scopes: ["operator.admin", "operator.approvals", "operator.pairing"],
caps: ["tool-events"],
deviceIdentity: loadOrCreateDeviceIdentity(),
minProtocol: PROTOCOL_VERSION,
maxProtocol: PROTOCOL_VERSION,
onHelloOk: async () => {
try {
await client?.request("agent.subscribe", { sessionKey, afterSeq: cursor });
} catch (err) {
client?.stop();
const error = err instanceof Error ? err : new Error(String(err));
settle(error);
}
},
onEvent: (evt) => {
if (evt.event !== "agent") {
return;
}
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: undefined;
if (!payload || payload.sessionKey !== sessionKey) {
return;
}
const globalSeq = typeof payload.globalSeq === "number" ? payload.globalSeq : undefined;
if (globalSeq !== undefined && globalSeq > cursor) {
cursor = globalSeq;
}
emitNdjsonLine({ event: evt.event, ...payload });
},
onConnectError: (err) => {
if (aborting || settled) {
return;
}
client?.stop();
settle(err);
},
onClose: (code, reason) => {
if (aborting || settled || abortController.signal.aborted) {
return;
}
// For reconnectable closes, let GatewayClient retry.
if (code === 1000) {
return;
}
client?.stop();
const reasonText = reason?.trim() || "no close reason";
settle(
new Error(
`gateway subscribe closed (${code}): ${reasonText}\n${connectionDetails.message}`,
),
);
},
});
client.start();
});
} catch (err) {
if (err instanceof DOMException && err.name === "AbortError") {
emitNdjsonLine({ event: "aborted", reason: "signal" });
return;
}
throw err;
} finally {
process.removeListener("SIGTERM", onSignal);
process.removeListener("SIGINT", onSignal);
}
}
export async function agentCliCommand(opts: AgentCliOpts, runtime: RuntimeEnv, deps?: CliDeps) {
// Subscribe-only mode: tail events for a session key with replay cursor.
const subscribeKey = (opts as Record<string, unknown>).subscribeSessionKey as string | undefined;
if (subscribeKey && opts.streamJson) {
const afterSeq =
Number.parseInt(String((opts as Record<string, unknown>).afterSeq ?? "0"), 10) || 0;
return await agentSubscribeStreamJson(subscribeKey.trim(), afterSeq, runtime);
}
// --message is required for all non-subscribe paths.
if (!opts.message?.trim()) {
throw new Error("Message (--message) is required");
}
const localOpts = {
...opts,
agentId: opts.agent,