feat(web): add WebSocket gateway client and RPC support to agent-runner
This commit is contained in:
parent
ba1a66d222
commit
7bd6583697
@ -1,6 +1,16 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { EventEmitter } from "node:events";
|
||||
import { existsSync, readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { createInterface } from "node:readline";
|
||||
import { getEffectiveProfile, resolveWorkspaceRoot } from "./workspace";
|
||||
import { PassThrough } from "node:stream";
|
||||
import NodeWebSocket from "ws";
|
||||
import {
|
||||
getEffectiveProfile,
|
||||
resolveOpenClawStateDir,
|
||||
resolveWorkspaceRoot,
|
||||
} from "./workspace";
|
||||
|
||||
export type AgentEvent = {
|
||||
event: string;
|
||||
@ -129,6 +139,736 @@ export type AgentProcessHandle = {
|
||||
};
|
||||
};
|
||||
|
||||
type GatewayReqFrame = {
|
||||
type: "req";
|
||||
id: string;
|
||||
method: string;
|
||||
params?: unknown;
|
||||
};
|
||||
|
||||
type GatewayResFrame = {
|
||||
type: "res";
|
||||
id: string;
|
||||
ok: boolean;
|
||||
payload?: unknown;
|
||||
error?: unknown;
|
||||
};
|
||||
|
||||
type GatewayEventFrame = {
|
||||
type: "event";
|
||||
event: string;
|
||||
seq?: number;
|
||||
payload?: unknown;
|
||||
};
|
||||
|
||||
type GatewayFrame =
|
||||
| GatewayReqFrame
|
||||
| GatewayResFrame
|
||||
| GatewayEventFrame
|
||||
| { type?: string; [key: string]: unknown };
|
||||
|
||||
type GatewayConnectionSettings = {
|
||||
url: string;
|
||||
token?: string;
|
||||
password?: string;
|
||||
};
|
||||
|
||||
type PendingGatewayRequest = {
|
||||
resolve: (value: GatewayResFrame) => void;
|
||||
reject: (error: Error) => void;
|
||||
timeout: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
|
||||
type SpawnGatewayProcessParams = {
|
||||
mode: "start" | "subscribe";
|
||||
message?: string;
|
||||
sessionKey?: string;
|
||||
afterSeq: number;
|
||||
};
|
||||
|
||||
type BuildConnectParamsOptions = {
|
||||
clientMode?: "webchat" | "backend" | "cli" | "ui" | "node" | "probe" | "test";
|
||||
caps?: string[];
|
||||
};
|
||||
|
||||
const DEFAULT_GATEWAY_PORT = 18_789;
|
||||
const OPEN_TIMEOUT_MS = 8_000;
|
||||
const REQUEST_TIMEOUT_MS = 12_000;
|
||||
const DEFAULT_GATEWAY_CLIENT_CAPS = ["tool-events"];
|
||||
const SESSIONS_PATCH_RETRY_DELAY_MS = 150;
|
||||
const SESSIONS_PATCH_MAX_ATTEMPTS = 2;
|
||||
|
||||
type AgentSubscribeSupport = "unknown" | "supported" | "unsupported";
|
||||
let cachedAgentSubscribeSupport: AgentSubscribeSupport = "unknown";
|
||||
|
||||
function asRecord(value: unknown): Record<string, unknown> | null {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
return null;
|
||||
}
|
||||
return value as Record<string, unknown>;
|
||||
}
|
||||
|
||||
function parseJsonObject(raw: string): Record<string, unknown> | null {
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as unknown;
|
||||
return asRecord(parsed);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function parsePort(value: unknown): number | undefined {
|
||||
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
|
||||
return Math.floor(value);
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const parsed = Number.parseInt(value, 10);
|
||||
if (Number.isFinite(parsed) && parsed > 0) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function normalizeWsUrl(raw: string, fallbackPort: number): string {
|
||||
const withScheme = raw.includes("://") ? raw : `ws://${raw}`;
|
||||
const url = new URL(withScheme);
|
||||
if (url.protocol === "http:") {
|
||||
url.protocol = "ws:";
|
||||
} else if (url.protocol === "https:") {
|
||||
url.protocol = "wss:";
|
||||
}
|
||||
if (!url.port) {
|
||||
url.port = url.protocol === "wss:" ? "443" : String(fallbackPort);
|
||||
}
|
||||
return url.toString();
|
||||
}
|
||||
|
||||
function readGatewayConfigFromStateDir(
|
||||
stateDir: string,
|
||||
): Record<string, unknown> | null {
|
||||
const candidates = [join(stateDir, "openclaw.json"), join(stateDir, "config.json")];
|
||||
for (const candidate of candidates) {
|
||||
if (!existsSync(candidate)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const parsed = parseJsonObject(readFileSync(candidate, "utf-8"));
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
} catch {
|
||||
// Ignore malformed config and continue to fallback behavior.
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function resolveGatewayConnectionSettings(): GatewayConnectionSettings {
|
||||
const envUrl = process.env.OPENCLAW_GATEWAY_URL?.trim();
|
||||
const envToken = process.env.OPENCLAW_GATEWAY_TOKEN?.trim();
|
||||
const envPassword = process.env.OPENCLAW_GATEWAY_PASSWORD?.trim();
|
||||
const envPort = parsePort(process.env.OPENCLAW_GATEWAY_PORT);
|
||||
|
||||
const stateDir = resolveOpenClawStateDir();
|
||||
const config = readGatewayConfigFromStateDir(stateDir);
|
||||
const gateway = asRecord(config?.gateway);
|
||||
const remote = asRecord(gateway?.remote);
|
||||
const auth = asRecord(gateway?.auth);
|
||||
|
||||
const gatewayPort = envPort ?? parsePort(gateway?.port) ?? DEFAULT_GATEWAY_PORT;
|
||||
const gatewayMode =
|
||||
typeof gateway?.mode === "string" ? gateway.mode.trim().toLowerCase() : "";
|
||||
const remoteUrl =
|
||||
typeof remote?.url === "string" ? remote.url.trim() : undefined;
|
||||
const useRemote = !envUrl && gatewayMode === "remote" && Boolean(remoteUrl);
|
||||
|
||||
const rawUrl = envUrl || (useRemote ? remoteUrl! : `ws://127.0.0.1:${gatewayPort}`);
|
||||
const url = normalizeWsUrl(rawUrl, gatewayPort);
|
||||
|
||||
const token =
|
||||
envToken ||
|
||||
(useRemote && typeof remote?.token === "string"
|
||||
? remote.token.trim()
|
||||
: undefined) ||
|
||||
(typeof auth?.token === "string" ? auth.token.trim() : undefined);
|
||||
|
||||
const password =
|
||||
envPassword ||
|
||||
(useRemote && typeof remote?.password === "string"
|
||||
? remote.password.trim()
|
||||
: undefined) ||
|
||||
(typeof auth?.password === "string" ? auth.password.trim() : undefined);
|
||||
|
||||
return { url, token, password };
|
||||
}
|
||||
|
||||
export function buildConnectParams(
|
||||
settings: GatewayConnectionSettings,
|
||||
options?: BuildConnectParamsOptions,
|
||||
): Record<string, unknown> {
|
||||
const optionCaps = options?.caps;
|
||||
const caps = Array.isArray(optionCaps)
|
||||
? optionCaps.filter(
|
||||
(cap): cap is string => typeof cap === "string" && cap.trim().length > 0,
|
||||
)
|
||||
: DEFAULT_GATEWAY_CLIENT_CAPS;
|
||||
const clientMode = options?.clientMode ?? "backend";
|
||||
const auth =
|
||||
settings.token || settings.password
|
||||
? {
|
||||
...(settings.token ? { token: settings.token } : {}),
|
||||
...(settings.password ? { password: settings.password } : {}),
|
||||
}
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
minProtocol: 3,
|
||||
maxProtocol: 3,
|
||||
client: {
|
||||
id: "gateway-client",
|
||||
version: "dev",
|
||||
platform: process.platform,
|
||||
mode: clientMode,
|
||||
instanceId: "ironclaw-web-server",
|
||||
},
|
||||
locale: "en-US",
|
||||
userAgent: "ironclaw-web",
|
||||
role: "operator",
|
||||
scopes: ["operator.read", "operator.write", "operator.admin"],
|
||||
caps,
|
||||
...(auth ? { auth } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function frameErrorMessage(frame: GatewayResFrame): string {
|
||||
const error = asRecord(frame.error);
|
||||
if (typeof error?.message === "string" && error.message.trim()) {
|
||||
return error.message;
|
||||
}
|
||||
if (typeof frame.error === "string" && frame.error.trim()) {
|
||||
return frame.error;
|
||||
}
|
||||
return "Gateway request failed";
|
||||
}
|
||||
|
||||
function isUnknownMethodResponse(
|
||||
frame: GatewayResFrame,
|
||||
methodName: string,
|
||||
): boolean {
|
||||
const message = frameErrorMessage(frame).trim().toLowerCase();
|
||||
if (!message.includes("unknown method")) {
|
||||
return false;
|
||||
}
|
||||
return message.includes(methodName.toLowerCase());
|
||||
}
|
||||
|
||||
function isRetryableGatewayMessage(message: string): boolean {
|
||||
const normalized = message.trim().toLowerCase();
|
||||
if (!normalized) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
normalized.includes("timeout") ||
|
||||
normalized.includes("timed out") ||
|
||||
normalized.includes("temporar") ||
|
||||
normalized.includes("unavailable") ||
|
||||
normalized.includes("try again") ||
|
||||
normalized.includes("connection closed") ||
|
||||
normalized.includes("connection reset")
|
||||
);
|
||||
}
|
||||
|
||||
function toMessageText(data: unknown): string | null {
|
||||
if (typeof data === "string") {
|
||||
return data;
|
||||
}
|
||||
if (data instanceof ArrayBuffer) {
|
||||
return Buffer.from(data).toString("utf-8");
|
||||
}
|
||||
if (ArrayBuffer.isView(data)) {
|
||||
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString(
|
||||
"utf-8",
|
||||
);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
class GatewayWsClient {
|
||||
private ws: NodeWebSocket | null = null;
|
||||
private pending = new Map<string, PendingGatewayRequest>();
|
||||
private closed = false;
|
||||
|
||||
constructor(
|
||||
private readonly settings: GatewayConnectionSettings,
|
||||
private readonly onEvent: (frame: GatewayEventFrame) => void,
|
||||
private readonly onClose: (code: number, reason: string) => void,
|
||||
) {}
|
||||
|
||||
async open(timeoutMs = OPEN_TIMEOUT_MS): Promise<void> {
|
||||
if (this.ws) {
|
||||
return;
|
||||
}
|
||||
const gatewayOrigin = this.settings.url
|
||||
.replace(/^ws:/, "http:")
|
||||
.replace(/^wss:/, "https:");
|
||||
const ws = new NodeWebSocket(this.settings.url, {
|
||||
headers: { Origin: gatewayOrigin },
|
||||
});
|
||||
this.ws = ws;
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
let settled = false;
|
||||
const timer = setTimeout(() => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
reject(new Error("Gateway WebSocket open timeout"));
|
||||
}, timeoutMs);
|
||||
|
||||
const onOpen = () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
};
|
||||
|
||||
const onError = () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timer);
|
||||
reject(new Error("Gateway WebSocket connection failed"));
|
||||
};
|
||||
|
||||
ws.once("open", onOpen);
|
||||
ws.once("error", onError);
|
||||
});
|
||||
|
||||
ws.on("message", (data: NodeWebSocket.RawData) => {
|
||||
const text = toMessageText(data);
|
||||
if (text != null) {
|
||||
this.handleMessageText(text);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("close", (code: number, reason: Buffer) => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
this.flushPending(new Error("Gateway connection closed"));
|
||||
this.onClose(code, reason.toString("utf-8"));
|
||||
});
|
||||
}
|
||||
|
||||
request(
|
||||
method: string,
|
||||
params?: unknown,
|
||||
timeoutMs = REQUEST_TIMEOUT_MS,
|
||||
): Promise<GatewayResFrame> {
|
||||
const ws = this.ws;
|
||||
if (!ws || ws.readyState !== NodeWebSocket.OPEN) {
|
||||
return Promise.reject(new Error("Gateway WebSocket is not connected"));
|
||||
}
|
||||
|
||||
return new Promise<GatewayResFrame>((resolve, reject) => {
|
||||
const id = randomUUID();
|
||||
const frame: GatewayReqFrame = { type: "req", id, method, params };
|
||||
const timeout = setTimeout(() => {
|
||||
this.pending.delete(id);
|
||||
reject(new Error(`Gateway request timed out (${method})`));
|
||||
}, timeoutMs);
|
||||
this.pending.set(id, { resolve, reject, timeout });
|
||||
ws.send(JSON.stringify(frame));
|
||||
});
|
||||
}
|
||||
|
||||
close(code?: number, reason?: string): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
this.flushPending(new Error("Gateway connection closed"));
|
||||
try {
|
||||
this.ws?.close(code, reason);
|
||||
} catch {
|
||||
// Ignore socket close failures.
|
||||
}
|
||||
}
|
||||
|
||||
private flushPending(error: Error): void {
|
||||
for (const pending of this.pending.values()) {
|
||||
clearTimeout(pending.timeout);
|
||||
pending.reject(error);
|
||||
}
|
||||
this.pending.clear();
|
||||
}
|
||||
|
||||
private handleMessageText(text: string): void {
|
||||
let frame: GatewayFrame | null = null;
|
||||
try {
|
||||
frame = JSON.parse(text) as GatewayFrame;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (!frame || typeof frame !== "object" || !("type" in frame)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.type === "res") {
|
||||
const response = frame as GatewayResFrame;
|
||||
const pending = this.pending.get(response.id);
|
||||
if (!pending) {
|
||||
return;
|
||||
}
|
||||
this.pending.delete(response.id);
|
||||
clearTimeout(pending.timeout);
|
||||
pending.resolve(response);
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.type === "event") {
|
||||
this.onEvent(frame as GatewayEventFrame);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class GatewayProcessHandle
|
||||
extends EventEmitter
|
||||
implements AgentProcessHandle
|
||||
{
|
||||
public readonly stdout: NodeJS.ReadableStream | null = new PassThrough();
|
||||
public readonly stderr: NodeJS.ReadableStream | null = new PassThrough();
|
||||
private client: GatewayWsClient | null = null;
|
||||
private finished = false;
|
||||
private closeScheduled = false;
|
||||
private requestedClose = false;
|
||||
private runId: string | null = null;
|
||||
|
||||
constructor(private readonly params: SpawnGatewayProcessParams) {
|
||||
super();
|
||||
void this.start();
|
||||
}
|
||||
|
||||
kill(signal?: NodeJS.Signals | number): boolean {
|
||||
if (this.finished) {
|
||||
return false;
|
||||
}
|
||||
this.requestedClose = true;
|
||||
this.client?.close();
|
||||
const closeSignal = typeof signal === "string" ? signal : null;
|
||||
this.finish(0, closeSignal);
|
||||
return true;
|
||||
}
|
||||
|
||||
private async start(): Promise<void> {
|
||||
try {
|
||||
const settings = resolveGatewayConnectionSettings();
|
||||
this.client = new GatewayWsClient(
|
||||
settings,
|
||||
(frame) => this.handleGatewayEvent(frame),
|
||||
(code, reason) => this.handleSocketClose(code, reason),
|
||||
);
|
||||
await this.client.open();
|
||||
const connectRes = await this.client.request(
|
||||
"connect",
|
||||
buildConnectParams(settings),
|
||||
);
|
||||
if (!connectRes.ok) {
|
||||
throw new Error(frameErrorMessage(connectRes));
|
||||
}
|
||||
|
||||
if (this.params.sessionKey) {
|
||||
await this.ensureFullToolVerbose(this.params.sessionKey);
|
||||
}
|
||||
|
||||
if (this.params.mode === "start") {
|
||||
const sessionKey = this.params.sessionKey;
|
||||
const startRes = await this.client.request("agent", {
|
||||
message: this.params.message ?? "",
|
||||
idempotencyKey: randomUUID(),
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
deliver: false,
|
||||
channel: "webchat",
|
||||
lane: "web",
|
||||
timeout: 0,
|
||||
});
|
||||
if (!startRes.ok) {
|
||||
throw new Error(frameErrorMessage(startRes));
|
||||
}
|
||||
const payload = asRecord(startRes.payload);
|
||||
const runId =
|
||||
payload && typeof payload.runId === "string" ? payload.runId : null;
|
||||
this.runId = runId;
|
||||
} else {
|
||||
const sessionKey = this.params.sessionKey;
|
||||
if (!sessionKey) {
|
||||
throw new Error("Missing session key for subscribe mode");
|
||||
}
|
||||
if (cachedAgentSubscribeSupport !== "unsupported") {
|
||||
const subscribeRes = await this.client.request("agent.subscribe", {
|
||||
sessionKey,
|
||||
afterSeq: Math.max(
|
||||
0,
|
||||
Number.isFinite(this.params.afterSeq) ? this.params.afterSeq : 0,
|
||||
),
|
||||
});
|
||||
if (!subscribeRes.ok) {
|
||||
if (isUnknownMethodResponse(subscribeRes, "agent.subscribe")) {
|
||||
cachedAgentSubscribeSupport = "unsupported";
|
||||
(this.stderr as PassThrough).write(
|
||||
"[gateway] agent.subscribe unavailable; using passive session filter mode\n",
|
||||
);
|
||||
} else {
|
||||
throw new Error(frameErrorMessage(subscribeRes));
|
||||
}
|
||||
} else {
|
||||
cachedAgentSubscribeSupport = "supported";
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
const err =
|
||||
error instanceof Error ? error : new Error(String(error));
|
||||
(this.stderr as PassThrough).write(`${err.message}\n`);
|
||||
this.emit("error", err);
|
||||
this.finish(1, null);
|
||||
}
|
||||
}
|
||||
|
||||
private async ensureFullToolVerbose(sessionKey: string): Promise<void> {
|
||||
if (!this.client || !sessionKey.trim()) {
|
||||
return;
|
||||
}
|
||||
let attempt = 0;
|
||||
let lastMessage = "";
|
||||
while (attempt < SESSIONS_PATCH_MAX_ATTEMPTS) {
|
||||
attempt += 1;
|
||||
try {
|
||||
const patch = await this.client.request("sessions.patch", {
|
||||
key: sessionKey,
|
||||
verboseLevel: "full",
|
||||
});
|
||||
if (patch.ok) {
|
||||
return;
|
||||
}
|
||||
lastMessage = frameErrorMessage(patch);
|
||||
if (
|
||||
attempt >= SESSIONS_PATCH_MAX_ATTEMPTS ||
|
||||
!isRetryableGatewayMessage(lastMessage)
|
||||
) {
|
||||
break;
|
||||
}
|
||||
} catch (error) {
|
||||
lastMessage =
|
||||
error instanceof Error ? error.message : String(error);
|
||||
if (
|
||||
attempt >= SESSIONS_PATCH_MAX_ATTEMPTS ||
|
||||
!isRetryableGatewayMessage(lastMessage)
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, SESSIONS_PATCH_RETRY_DELAY_MS),
|
||||
);
|
||||
}
|
||||
if (lastMessage.trim()) {
|
||||
(this.stderr as PassThrough).write(
|
||||
`[gateway] sessions.patch verboseLevel=full failed: ${lastMessage}\n`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private shouldAcceptSessionEvent(sessionKey: string | undefined): boolean {
|
||||
const expected = this.params.sessionKey;
|
||||
if (!expected) {
|
||||
return true;
|
||||
}
|
||||
if (this.params.mode === "subscribe") {
|
||||
// Subscribe mode should only accept explicit events for the target session.
|
||||
return sessionKey === expected;
|
||||
}
|
||||
if (!sessionKey) {
|
||||
return true;
|
||||
}
|
||||
return sessionKey === expected;
|
||||
}
|
||||
|
||||
private handleGatewayEvent(frame: GatewayEventFrame): void {
|
||||
if (this.finished) {
|
||||
return;
|
||||
}
|
||||
if (frame.event === "connect.challenge") {
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.event === "agent") {
|
||||
const payload = asRecord(frame.payload);
|
||||
if (!payload) {
|
||||
return;
|
||||
}
|
||||
const sessionKey =
|
||||
typeof payload.sessionKey === "string" ? payload.sessionKey : undefined;
|
||||
if (!this.shouldAcceptSessionEvent(sessionKey)) {
|
||||
return;
|
||||
}
|
||||
const runId = typeof payload.runId === "string" ? payload.runId : undefined;
|
||||
if (this.runId && runId && runId !== this.runId) {
|
||||
return;
|
||||
}
|
||||
const payloadGlobalSeq =
|
||||
typeof payload.globalSeq === "number" ? payload.globalSeq : undefined;
|
||||
const eventGlobalSeq =
|
||||
payloadGlobalSeq ??
|
||||
(typeof frame.seq === "number" ? frame.seq : undefined);
|
||||
if (
|
||||
typeof eventGlobalSeq === "number" &&
|
||||
eventGlobalSeq <= this.params.afterSeq
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const event: AgentEvent = {
|
||||
event: "agent",
|
||||
...(runId ? { runId } : {}),
|
||||
...(typeof payload.stream === "string" ? { stream: payload.stream } : {}),
|
||||
...(asRecord(payload.data) ? { data: payload.data as Record<string, unknown> } : {}),
|
||||
...(typeof payload.seq === "number" ? { seq: payload.seq } : {}),
|
||||
...(typeof eventGlobalSeq === "number"
|
||||
? { globalSeq: eventGlobalSeq }
|
||||
: {}),
|
||||
...(typeof payload.ts === "number" ? { ts: payload.ts } : {}),
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
};
|
||||
|
||||
(this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
|
||||
|
||||
const stream = typeof payload.stream === "string" ? payload.stream : "";
|
||||
const data = asRecord(payload.data);
|
||||
const phase = data && typeof data.phase === "string" ? data.phase : "";
|
||||
if (
|
||||
this.params.mode === "start" &&
|
||||
stream === "lifecycle" &&
|
||||
(phase === "end" || phase === "error")
|
||||
) {
|
||||
this.scheduleClose();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.event === "error") {
|
||||
const payload = asRecord(frame.payload) ?? {};
|
||||
const sessionKey =
|
||||
typeof payload.sessionKey === "string" ? payload.sessionKey : undefined;
|
||||
if (!this.shouldAcceptSessionEvent(sessionKey)) {
|
||||
return;
|
||||
}
|
||||
const payloadGlobalSeq =
|
||||
typeof payload.globalSeq === "number" ? payload.globalSeq : undefined;
|
||||
const eventGlobalSeq =
|
||||
payloadGlobalSeq ??
|
||||
(typeof frame.seq === "number" ? frame.seq : undefined);
|
||||
const event: AgentEvent = {
|
||||
event: "error",
|
||||
data: payload,
|
||||
...(typeof eventGlobalSeq === "number"
|
||||
? { globalSeq: eventGlobalSeq }
|
||||
: {}),
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
};
|
||||
(this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
|
||||
if (this.params.mode === "start") {
|
||||
this.scheduleClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleClose(): void {
|
||||
if (this.closeScheduled || this.finished) {
|
||||
return;
|
||||
}
|
||||
this.closeScheduled = true;
|
||||
setTimeout(() => {
|
||||
if (this.finished) {
|
||||
return;
|
||||
}
|
||||
this.requestedClose = true;
|
||||
this.client?.close();
|
||||
this.finish(0, null);
|
||||
}, 25);
|
||||
}
|
||||
|
||||
private handleSocketClose(code: number, reason: string): void {
|
||||
if (this.finished) {
|
||||
return;
|
||||
}
|
||||
if (!this.requestedClose) {
|
||||
const detail = reason.trim() || `code ${code}`;
|
||||
(this.stderr as PassThrough).write(`Gateway connection closed: ${detail}\n`);
|
||||
}
|
||||
const exitCode = this.requestedClose || code === 1000 || code === 1005 ? 0 : 1;
|
||||
this.finish(exitCode, null);
|
||||
}
|
||||
|
||||
private finish(code: number | null, signal: NodeJS.Signals | null): void {
|
||||
if (this.finished) {
|
||||
return;
|
||||
}
|
||||
this.finished = true;
|
||||
try {
|
||||
(this.stdout as PassThrough).end();
|
||||
(this.stderr as PassThrough).end();
|
||||
} catch {
|
||||
// Ignore stream close errors.
|
||||
}
|
||||
this.emit("close", code, signal);
|
||||
}
|
||||
}
|
||||
|
||||
function shouldForceLegacyStream(): boolean {
|
||||
const raw = process.env.IRONCLAW_WEB_FORCE_LEGACY_STREAM?.trim().toLowerCase();
|
||||
return raw === "1" || raw === "true" || raw === "yes";
|
||||
}
|
||||
|
||||
export async function callGatewayRpc(
|
||||
method: string,
|
||||
params?: Record<string, unknown>,
|
||||
options?: { timeoutMs?: number },
|
||||
): Promise<GatewayResFrame> {
|
||||
const settings = resolveGatewayConnectionSettings();
|
||||
let closed = false;
|
||||
const client = new GatewayWsClient(settings, () => {}, () => {
|
||||
closed = true;
|
||||
});
|
||||
await client.open();
|
||||
try {
|
||||
const connect = await client.request(
|
||||
"connect",
|
||||
buildConnectParams(settings),
|
||||
options?.timeoutMs ?? REQUEST_TIMEOUT_MS,
|
||||
);
|
||||
if (!connect.ok) {
|
||||
throw new Error(frameErrorMessage(connect));
|
||||
}
|
||||
const result = await client.request(
|
||||
method,
|
||||
params,
|
||||
options?.timeoutMs ?? REQUEST_TIMEOUT_MS,
|
||||
);
|
||||
return result;
|
||||
} finally {
|
||||
if (!closed) {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn an agent child process and return the ChildProcess handle.
|
||||
* Shared between `runAgent` (legacy callback API) and the ActiveRunManager.
|
||||
@ -137,12 +877,30 @@ export function spawnAgentProcess(
|
||||
message: string,
|
||||
agentSessionId?: string,
|
||||
): AgentProcessHandle {
|
||||
return spawnLegacyAgentProcess(message, agentSessionId);
|
||||
if (shouldForceLegacyStream()) {
|
||||
return spawnLegacyAgentProcess(message, agentSessionId);
|
||||
}
|
||||
const sessionKey = agentSessionId
|
||||
? `agent:main:web:${agentSessionId}`
|
||||
: undefined;
|
||||
return new GatewayProcessHandle({
|
||||
mode: "start",
|
||||
message,
|
||||
sessionKey,
|
||||
afterSeq: 0,
|
||||
});
|
||||
}
|
||||
|
||||
function spawnLegacyAgentProcess(
|
||||
message: string,
|
||||
agentSessionId?: string,
|
||||
): ReturnType<typeof spawn> {
|
||||
return spawnCliAgentProcess(message, agentSessionId);
|
||||
}
|
||||
|
||||
function spawnCliAgentProcess(
|
||||
message: string,
|
||||
agentSessionId?: string,
|
||||
): ReturnType<typeof spawn> {
|
||||
const args = [
|
||||
"agent",
|
||||
@ -178,7 +936,14 @@ export function spawnAgentSubscribeProcess(
|
||||
sessionKey: string,
|
||||
afterSeq = 0,
|
||||
): AgentProcessHandle {
|
||||
return spawnLegacyAgentSubscribeProcess(sessionKey, afterSeq);
|
||||
if (shouldForceLegacyStream()) {
|
||||
return spawnLegacyAgentSubscribeProcess(sessionKey, afterSeq);
|
||||
}
|
||||
return new GatewayProcessHandle({
|
||||
mode: "subscribe",
|
||||
sessionKey,
|
||||
afterSeq: Math.max(0, Number.isFinite(afterSeq) ? afterSeq : 0),
|
||||
});
|
||||
}
|
||||
|
||||
function spawnLegacyAgentSubscribeProcess(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user