From 7bd65836973f4c740c0f9360b707c8ed5dd01539 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Mon, 2 Mar 2026 18:33:01 -0800 Subject: [PATCH] feat(web): add WebSocket gateway client and RPC support to agent-runner --- apps/web/lib/agent-runner.ts | 771 ++++++++++++++++++++++++++++++++++- 1 file changed, 768 insertions(+), 3 deletions(-) diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index 5578552ff76..0d061a46a65 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -1,6 +1,16 @@ import { spawn } from "node:child_process"; +import { randomUUID } from "node:crypto"; +import { EventEmitter } from "node:events"; +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; import { createInterface } from "node:readline"; -import { getEffectiveProfile, resolveWorkspaceRoot } from "./workspace"; +import { PassThrough } from "node:stream"; +import NodeWebSocket from "ws"; +import { + getEffectiveProfile, + resolveOpenClawStateDir, + resolveWorkspaceRoot, +} from "./workspace"; export type AgentEvent = { event: string; @@ -129,6 +139,736 @@ export type AgentProcessHandle = { }; }; +type GatewayReqFrame = { + type: "req"; + id: string; + method: string; + params?: unknown; +}; + +type GatewayResFrame = { + type: "res"; + id: string; + ok: boolean; + payload?: unknown; + error?: unknown; +}; + +type GatewayEventFrame = { + type: "event"; + event: string; + seq?: number; + payload?: unknown; +}; + +type GatewayFrame = + | GatewayReqFrame + | GatewayResFrame + | GatewayEventFrame + | { type?: string; [key: string]: unknown }; + +type GatewayConnectionSettings = { + url: string; + token?: string; + password?: string; +}; + +type PendingGatewayRequest = { + resolve: (value: GatewayResFrame) => void; + reject: (error: Error) => void; + timeout: ReturnType; +}; + +type SpawnGatewayProcessParams = { + mode: "start" | "subscribe"; + message?: string; + sessionKey?: string; + afterSeq: number; +}; + +type BuildConnectParamsOptions = { + clientMode?: "webchat" | "backend" | "cli" | "ui" | "node" | "probe" | "test"; + caps?: string[]; +}; + +const DEFAULT_GATEWAY_PORT = 18_789; +const OPEN_TIMEOUT_MS = 8_000; +const REQUEST_TIMEOUT_MS = 12_000; +const DEFAULT_GATEWAY_CLIENT_CAPS = ["tool-events"]; +const SESSIONS_PATCH_RETRY_DELAY_MS = 150; +const SESSIONS_PATCH_MAX_ATTEMPTS = 2; + +type AgentSubscribeSupport = "unknown" | "supported" | "unsupported"; +let cachedAgentSubscribeSupport: AgentSubscribeSupport = "unknown"; + +function asRecord(value: unknown): Record | null { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + return value as Record; +} + +function parseJsonObject(raw: string): Record | null { + try { + const parsed = JSON.parse(raw) as unknown; + return asRecord(parsed); + } catch { + return null; + } +} + +function parsePort(value: unknown): number | undefined { + if (typeof value === "number" && Number.isFinite(value) && value > 0) { + return Math.floor(value); + } + if (typeof value === "string") { + const parsed = Number.parseInt(value, 10); + if (Number.isFinite(parsed) && parsed > 0) { + return parsed; + } + } + return undefined; +} + +function normalizeWsUrl(raw: string, fallbackPort: number): string { + const withScheme = raw.includes("://") ? raw : `ws://${raw}`; + const url = new URL(withScheme); + if (url.protocol === "http:") { + url.protocol = "ws:"; + } else if (url.protocol === "https:") { + url.protocol = "wss:"; + } + if (!url.port) { + url.port = url.protocol === "wss:" ? "443" : String(fallbackPort); + } + return url.toString(); +} + +function readGatewayConfigFromStateDir( + stateDir: string, +): Record | null { + const candidates = [join(stateDir, "openclaw.json"), join(stateDir, "config.json")]; + for (const candidate of candidates) { + if (!existsSync(candidate)) { + continue; + } + try { + const parsed = parseJsonObject(readFileSync(candidate, "utf-8")); + if (parsed) { + return parsed; + } + } catch { + // Ignore malformed config and continue to fallback behavior. + } + } + return null; +} + +function resolveGatewayConnectionSettings(): GatewayConnectionSettings { + const envUrl = process.env.OPENCLAW_GATEWAY_URL?.trim(); + const envToken = process.env.OPENCLAW_GATEWAY_TOKEN?.trim(); + const envPassword = process.env.OPENCLAW_GATEWAY_PASSWORD?.trim(); + const envPort = parsePort(process.env.OPENCLAW_GATEWAY_PORT); + + const stateDir = resolveOpenClawStateDir(); + const config = readGatewayConfigFromStateDir(stateDir); + const gateway = asRecord(config?.gateway); + const remote = asRecord(gateway?.remote); + const auth = asRecord(gateway?.auth); + + const gatewayPort = envPort ?? parsePort(gateway?.port) ?? DEFAULT_GATEWAY_PORT; + const gatewayMode = + typeof gateway?.mode === "string" ? gateway.mode.trim().toLowerCase() : ""; + const remoteUrl = + typeof remote?.url === "string" ? remote.url.trim() : undefined; + const useRemote = !envUrl && gatewayMode === "remote" && Boolean(remoteUrl); + + const rawUrl = envUrl || (useRemote ? remoteUrl! : `ws://127.0.0.1:${gatewayPort}`); + const url = normalizeWsUrl(rawUrl, gatewayPort); + + const token = + envToken || + (useRemote && typeof remote?.token === "string" + ? remote.token.trim() + : undefined) || + (typeof auth?.token === "string" ? auth.token.trim() : undefined); + + const password = + envPassword || + (useRemote && typeof remote?.password === "string" + ? remote.password.trim() + : undefined) || + (typeof auth?.password === "string" ? auth.password.trim() : undefined); + + return { url, token, password }; +} + +export function buildConnectParams( + settings: GatewayConnectionSettings, + options?: BuildConnectParamsOptions, +): Record { + const optionCaps = options?.caps; + const caps = Array.isArray(optionCaps) + ? optionCaps.filter( + (cap): cap is string => typeof cap === "string" && cap.trim().length > 0, + ) + : DEFAULT_GATEWAY_CLIENT_CAPS; + const clientMode = options?.clientMode ?? "backend"; + const auth = + settings.token || settings.password + ? { + ...(settings.token ? { token: settings.token } : {}), + ...(settings.password ? { password: settings.password } : {}), + } + : undefined; + + return { + minProtocol: 3, + maxProtocol: 3, + client: { + id: "gateway-client", + version: "dev", + platform: process.platform, + mode: clientMode, + instanceId: "ironclaw-web-server", + }, + locale: "en-US", + userAgent: "ironclaw-web", + role: "operator", + scopes: ["operator.read", "operator.write", "operator.admin"], + caps, + ...(auth ? { auth } : {}), + }; +} + +function frameErrorMessage(frame: GatewayResFrame): string { + const error = asRecord(frame.error); + if (typeof error?.message === "string" && error.message.trim()) { + return error.message; + } + if (typeof frame.error === "string" && frame.error.trim()) { + return frame.error; + } + return "Gateway request failed"; +} + +function isUnknownMethodResponse( + frame: GatewayResFrame, + methodName: string, +): boolean { + const message = frameErrorMessage(frame).trim().toLowerCase(); + if (!message.includes("unknown method")) { + return false; + } + return message.includes(methodName.toLowerCase()); +} + +function isRetryableGatewayMessage(message: string): boolean { + const normalized = message.trim().toLowerCase(); + if (!normalized) { + return false; + } + return ( + normalized.includes("timeout") || + normalized.includes("timed out") || + normalized.includes("temporar") || + normalized.includes("unavailable") || + normalized.includes("try again") || + normalized.includes("connection closed") || + normalized.includes("connection reset") + ); +} + +function toMessageText(data: unknown): string | null { + if (typeof data === "string") { + return data; + } + if (data instanceof ArrayBuffer) { + return Buffer.from(data).toString("utf-8"); + } + if (ArrayBuffer.isView(data)) { + return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString( + "utf-8", + ); + } + return null; +} + +class GatewayWsClient { + private ws: NodeWebSocket | null = null; + private pending = new Map(); + private closed = false; + + constructor( + private readonly settings: GatewayConnectionSettings, + private readonly onEvent: (frame: GatewayEventFrame) => void, + private readonly onClose: (code: number, reason: string) => void, + ) {} + + async open(timeoutMs = OPEN_TIMEOUT_MS): Promise { + if (this.ws) { + return; + } + const gatewayOrigin = this.settings.url + .replace(/^ws:/, "http:") + .replace(/^wss:/, "https:"); + const ws = new NodeWebSocket(this.settings.url, { + headers: { Origin: gatewayOrigin }, + }); + this.ws = ws; + + await new Promise((resolve, reject) => { + let settled = false; + const timer = setTimeout(() => { + if (settled) { + return; + } + settled = true; + reject(new Error("Gateway WebSocket open timeout")); + }, timeoutMs); + + const onOpen = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + resolve(); + }; + + const onError = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + reject(new Error("Gateway WebSocket connection failed")); + }; + + ws.once("open", onOpen); + ws.once("error", onError); + }); + + ws.on("message", (data: NodeWebSocket.RawData) => { + const text = toMessageText(data); + if (text != null) { + this.handleMessageText(text); + } + }); + + ws.on("close", (code: number, reason: Buffer) => { + if (this.closed) { + return; + } + this.closed = true; + this.flushPending(new Error("Gateway connection closed")); + this.onClose(code, reason.toString("utf-8")); + }); + } + + request( + method: string, + params?: unknown, + timeoutMs = REQUEST_TIMEOUT_MS, + ): Promise { + const ws = this.ws; + if (!ws || ws.readyState !== NodeWebSocket.OPEN) { + return Promise.reject(new Error("Gateway WebSocket is not connected")); + } + + return new Promise((resolve, reject) => { + const id = randomUUID(); + const frame: GatewayReqFrame = { type: "req", id, method, params }; + const timeout = setTimeout(() => { + this.pending.delete(id); + reject(new Error(`Gateway request timed out (${method})`)); + }, timeoutMs); + this.pending.set(id, { resolve, reject, timeout }); + ws.send(JSON.stringify(frame)); + }); + } + + close(code?: number, reason?: string): void { + if (this.closed) { + return; + } + this.closed = true; + this.flushPending(new Error("Gateway connection closed")); + try { + this.ws?.close(code, reason); + } catch { + // Ignore socket close failures. + } + } + + private flushPending(error: Error): void { + for (const pending of this.pending.values()) { + clearTimeout(pending.timeout); + pending.reject(error); + } + this.pending.clear(); + } + + private handleMessageText(text: string): void { + let frame: GatewayFrame | null = null; + try { + frame = JSON.parse(text) as GatewayFrame; + } catch { + return; + } + if (!frame || typeof frame !== "object" || !("type" in frame)) { + return; + } + + if (frame.type === "res") { + const response = frame as GatewayResFrame; + const pending = this.pending.get(response.id); + if (!pending) { + return; + } + this.pending.delete(response.id); + clearTimeout(pending.timeout); + pending.resolve(response); + return; + } + + if (frame.type === "event") { + this.onEvent(frame as GatewayEventFrame); + } + } +} + +class GatewayProcessHandle + extends EventEmitter + implements AgentProcessHandle +{ + public readonly stdout: NodeJS.ReadableStream | null = new PassThrough(); + public readonly stderr: NodeJS.ReadableStream | null = new PassThrough(); + private client: GatewayWsClient | null = null; + private finished = false; + private closeScheduled = false; + private requestedClose = false; + private runId: string | null = null; + + constructor(private readonly params: SpawnGatewayProcessParams) { + super(); + void this.start(); + } + + kill(signal?: NodeJS.Signals | number): boolean { + if (this.finished) { + return false; + } + this.requestedClose = true; + this.client?.close(); + const closeSignal = typeof signal === "string" ? signal : null; + this.finish(0, closeSignal); + return true; + } + + private async start(): Promise { + try { + const settings = resolveGatewayConnectionSettings(); + this.client = new GatewayWsClient( + settings, + (frame) => this.handleGatewayEvent(frame), + (code, reason) => this.handleSocketClose(code, reason), + ); + await this.client.open(); + const connectRes = await this.client.request( + "connect", + buildConnectParams(settings), + ); + if (!connectRes.ok) { + throw new Error(frameErrorMessage(connectRes)); + } + + if (this.params.sessionKey) { + await this.ensureFullToolVerbose(this.params.sessionKey); + } + + if (this.params.mode === "start") { + const sessionKey = this.params.sessionKey; + const startRes = await this.client.request("agent", { + message: this.params.message ?? "", + idempotencyKey: randomUUID(), + ...(sessionKey ? { sessionKey } : {}), + deliver: false, + channel: "webchat", + lane: "web", + timeout: 0, + }); + if (!startRes.ok) { + throw new Error(frameErrorMessage(startRes)); + } + const payload = asRecord(startRes.payload); + const runId = + payload && typeof payload.runId === "string" ? payload.runId : null; + this.runId = runId; + } else { + const sessionKey = this.params.sessionKey; + if (!sessionKey) { + throw new Error("Missing session key for subscribe mode"); + } + if (cachedAgentSubscribeSupport !== "unsupported") { + const subscribeRes = await this.client.request("agent.subscribe", { + sessionKey, + afterSeq: Math.max( + 0, + Number.isFinite(this.params.afterSeq) ? this.params.afterSeq : 0, + ), + }); + if (!subscribeRes.ok) { + if (isUnknownMethodResponse(subscribeRes, "agent.subscribe")) { + cachedAgentSubscribeSupport = "unsupported"; + (this.stderr as PassThrough).write( + "[gateway] agent.subscribe unavailable; using passive session filter mode\n", + ); + } else { + throw new Error(frameErrorMessage(subscribeRes)); + } + } else { + cachedAgentSubscribeSupport = "supported"; + } + } + } + } catch (error) { + const err = + error instanceof Error ? error : new Error(String(error)); + (this.stderr as PassThrough).write(`${err.message}\n`); + this.emit("error", err); + this.finish(1, null); + } + } + + private async ensureFullToolVerbose(sessionKey: string): Promise { + if (!this.client || !sessionKey.trim()) { + return; + } + let attempt = 0; + let lastMessage = ""; + while (attempt < SESSIONS_PATCH_MAX_ATTEMPTS) { + attempt += 1; + try { + const patch = await this.client.request("sessions.patch", { + key: sessionKey, + verboseLevel: "full", + }); + if (patch.ok) { + return; + } + lastMessage = frameErrorMessage(patch); + if ( + attempt >= SESSIONS_PATCH_MAX_ATTEMPTS || + !isRetryableGatewayMessage(lastMessage) + ) { + break; + } + } catch (error) { + lastMessage = + error instanceof Error ? error.message : String(error); + if ( + attempt >= SESSIONS_PATCH_MAX_ATTEMPTS || + !isRetryableGatewayMessage(lastMessage) + ) { + break; + } + } + await new Promise((resolve) => + setTimeout(resolve, SESSIONS_PATCH_RETRY_DELAY_MS), + ); + } + if (lastMessage.trim()) { + (this.stderr as PassThrough).write( + `[gateway] sessions.patch verboseLevel=full failed: ${lastMessage}\n`, + ); + } + } + + private shouldAcceptSessionEvent(sessionKey: string | undefined): boolean { + const expected = this.params.sessionKey; + if (!expected) { + return true; + } + if (this.params.mode === "subscribe") { + // Subscribe mode should only accept explicit events for the target session. + return sessionKey === expected; + } + if (!sessionKey) { + return true; + } + return sessionKey === expected; + } + + private handleGatewayEvent(frame: GatewayEventFrame): void { + if (this.finished) { + return; + } + if (frame.event === "connect.challenge") { + return; + } + + if (frame.event === "agent") { + const payload = asRecord(frame.payload); + if (!payload) { + return; + } + const sessionKey = + typeof payload.sessionKey === "string" ? payload.sessionKey : undefined; + if (!this.shouldAcceptSessionEvent(sessionKey)) { + return; + } + const runId = typeof payload.runId === "string" ? payload.runId : undefined; + if (this.runId && runId && runId !== this.runId) { + return; + } + const payloadGlobalSeq = + typeof payload.globalSeq === "number" ? payload.globalSeq : undefined; + const eventGlobalSeq = + payloadGlobalSeq ?? + (typeof frame.seq === "number" ? frame.seq : undefined); + if ( + typeof eventGlobalSeq === "number" && + eventGlobalSeq <= this.params.afterSeq + ) { + return; + } + + const event: AgentEvent = { + event: "agent", + ...(runId ? { runId } : {}), + ...(typeof payload.stream === "string" ? { stream: payload.stream } : {}), + ...(asRecord(payload.data) ? { data: payload.data as Record } : {}), + ...(typeof payload.seq === "number" ? { seq: payload.seq } : {}), + ...(typeof eventGlobalSeq === "number" + ? { globalSeq: eventGlobalSeq } + : {}), + ...(typeof payload.ts === "number" ? { ts: payload.ts } : {}), + ...(sessionKey ? { sessionKey } : {}), + }; + + (this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`); + + const stream = typeof payload.stream === "string" ? payload.stream : ""; + const data = asRecord(payload.data); + const phase = data && typeof data.phase === "string" ? data.phase : ""; + if ( + this.params.mode === "start" && + stream === "lifecycle" && + (phase === "end" || phase === "error") + ) { + this.scheduleClose(); + } + return; + } + + if (frame.event === "error") { + const payload = asRecord(frame.payload) ?? {}; + const sessionKey = + typeof payload.sessionKey === "string" ? payload.sessionKey : undefined; + if (!this.shouldAcceptSessionEvent(sessionKey)) { + return; + } + const payloadGlobalSeq = + typeof payload.globalSeq === "number" ? payload.globalSeq : undefined; + const eventGlobalSeq = + payloadGlobalSeq ?? + (typeof frame.seq === "number" ? frame.seq : undefined); + const event: AgentEvent = { + event: "error", + data: payload, + ...(typeof eventGlobalSeq === "number" + ? { globalSeq: eventGlobalSeq } + : {}), + ...(sessionKey ? { sessionKey } : {}), + }; + (this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`); + if (this.params.mode === "start") { + this.scheduleClose(); + } + } + } + + private scheduleClose(): void { + if (this.closeScheduled || this.finished) { + return; + } + this.closeScheduled = true; + setTimeout(() => { + if (this.finished) { + return; + } + this.requestedClose = true; + this.client?.close(); + this.finish(0, null); + }, 25); + } + + private handleSocketClose(code: number, reason: string): void { + if (this.finished) { + return; + } + if (!this.requestedClose) { + const detail = reason.trim() || `code ${code}`; + (this.stderr as PassThrough).write(`Gateway connection closed: ${detail}\n`); + } + const exitCode = this.requestedClose || code === 1000 || code === 1005 ? 0 : 1; + this.finish(exitCode, null); + } + + private finish(code: number | null, signal: NodeJS.Signals | null): void { + if (this.finished) { + return; + } + this.finished = true; + try { + (this.stdout as PassThrough).end(); + (this.stderr as PassThrough).end(); + } catch { + // Ignore stream close errors. + } + this.emit("close", code, signal); + } +} + +function shouldForceLegacyStream(): boolean { + const raw = process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM?.trim().toLowerCase(); + return raw === "1" || raw === "true" || raw === "yes"; +} + +export async function callGatewayRpc( + method: string, + params?: Record, + options?: { timeoutMs?: number }, +): Promise { + const settings = resolveGatewayConnectionSettings(); + let closed = false; + const client = new GatewayWsClient(settings, () => {}, () => { + closed = true; + }); + await client.open(); + try { + const connect = await client.request( + "connect", + buildConnectParams(settings), + options?.timeoutMs ?? REQUEST_TIMEOUT_MS, + ); + if (!connect.ok) { + throw new Error(frameErrorMessage(connect)); + } + const result = await client.request( + method, + params, + options?.timeoutMs ?? REQUEST_TIMEOUT_MS, + ); + return result; + } finally { + if (!closed) { + client.close(); + } + } +} + /** * Spawn an agent child process and return the ChildProcess handle. * Shared between `runAgent` (legacy callback API) and the ActiveRunManager. @@ -137,12 +877,30 @@ export function spawnAgentProcess( message: string, agentSessionId?: string, ): AgentProcessHandle { - return spawnLegacyAgentProcess(message, agentSessionId); + if (shouldForceLegacyStream()) { + return spawnLegacyAgentProcess(message, agentSessionId); + } + const sessionKey = agentSessionId + ? `agent:main:web:${agentSessionId}` + : undefined; + return new GatewayProcessHandle({ + mode: "start", + message, + sessionKey, + afterSeq: 0, + }); } function spawnLegacyAgentProcess( message: string, agentSessionId?: string, +): ReturnType { + return spawnCliAgentProcess(message, agentSessionId); +} + +function spawnCliAgentProcess( + message: string, + agentSessionId?: string, ): ReturnType { const args = [ "agent", @@ -178,7 +936,14 @@ export function spawnAgentSubscribeProcess( sessionKey: string, afterSeq = 0, ): AgentProcessHandle { - return spawnLegacyAgentSubscribeProcess(sessionKey, afterSeq); + if (shouldForceLegacyStream()) { + return spawnLegacyAgentSubscribeProcess(sessionKey, afterSeq); + } + return new GatewayProcessHandle({ + mode: "subscribe", + sessionKey, + afterSeq: Math.max(0, Number.isFinite(afterSeq) ? afterSeq : 0), + }); } function spawnLegacyAgentSubscribeProcess(