579 lines
18 KiB
TypeScript
Raw Normal View History

feat: ACP thread-bound agents (#23580) * docs: add ACP thread-bound agents plan doc * docs: expand ACP implementation specification * feat(acp): route ACP sessions through core dispatch and lifecycle cleanup * feat(acp): add /acp commands and Discord spawn gate * ACP: add acpx runtime plugin backend * fix(subagents): defer transient lifecycle errors before announce * Agents: harden ACP sessions_spawn and tighten spawn guidance * Agents: require explicit ACP target for runtime spawns * docs: expand ACP control-plane implementation plan * ACP: harden metadata seeding and spawn guidance * ACP: centralize runtime control-plane manager and fail-closed dispatch * ACP: harden runtime manager and unify spawn helpers * Commands: route ACP sessions through ACP runtime in agent command * ACP: require persisted metadata for runtime spawns * Sessions: preserve ACP metadata when updating entries * Plugins: harden ACP backend registry across loaders * ACPX: make availability probe compatible with adapters * E2E: add manual Discord ACP plain-language smoke script * ACPX: preserve streamed spacing across Discord delivery * Docs: add ACP Discord streaming strategy * ACP: harden Discord stream buffering for thread replies * ACP: reuse shared block reply pipeline for projector * ACP: unify streaming config and adopt coalesceIdleMs * Docs: add temporary ACP production hardening plan * Docs: trim temporary ACP hardening plan goals * Docs: gate ACP thread controls by backend capabilities * ACP: add capability-gated runtime controls and /acp operator commands * Docs: remove temporary ACP hardening plan * ACP: fix spawn target validation and close cache cleanup * ACP: harden runtime dispatch and recovery paths * ACP: split ACP command/runtime internals and centralize policy * ACP: harden runtime lifecycle, validation, and observability * ACP: surface runtime and backend session IDs in thread bindings * docs: add temp plan for binding-service migration * ACP: migrate thread binding flows to SessionBindingService * ACP: address review feedback and preserve prompt wording * ACPX plugin: pin runtime dependency and prefer bundled CLI * Discord: complete binding-service migration cleanup and restore ACP plan * Docs: add standalone ACP agents guide * ACP: route harness intents to thread-bound ACP sessions * ACP: fix spawn thread routing and queue-owner stall * ACP: harden startup reconciliation and command bypass handling * ACP: fix dispatch bypass type narrowing * ACP: align runtime metadata to agentSessionId * ACP: normalize session identifier handling and labels * ACP: mark thread banner session ids provisional until first reply * ACP: stabilize session identity mapping and startup reconciliation * ACP: add resolved session-id notices and cwd in thread intros * Discord: prefix thread meta notices consistently * Discord: unify ACP/thread meta notices with gear prefix * Discord: split thread persona naming from meta formatting * Extensions: bump acpx plugin dependency to 0.1.9 * Agents: gate ACP prompt guidance behind acp.enabled * Docs: remove temp experiment plan docs * Docs: scope streaming plan to holy grail refactor * Docs: refactor ACP agents guide for human-first flow * Docs/Skill: add ACP feature-flag guidance and direct acpx telephone-game flow * Docs/Skill: add OpenCode and Pi to ACP harness lists * Docs/Skill: align ACP harness list with current acpx registry * Dev/Test: move ACP plain-language smoke script and mark as keep * Docs/Skill: reorder ACP harness lists with Pi first * ACP: split control-plane manager into core/types/utils modules * Docs: refresh ACP thread-bound agents plan * ACP: extract dispatch lane and split manager domains * ACP: centralize binding context and remove reverse deps * Infra: unify system message formatting * ACP: centralize error boundaries and session id rendering * ACP: enforce init concurrency cap and strict meta clear * Tests: fix ACP dispatch binding mock typing * Tests: fix Discord thread-binding mock drift and ACP request id * ACP: gate slash bypass and persist cleared overrides * ACPX: await pre-abort cancel before runTurn return * Extension: pin acpx runtime dependency to 0.1.11 * Docs: add pinned acpx install strategy for ACP extension * Extensions/acpx: enforce strict local pinned startup * Extensions/acpx: tighten acp-router install guidance * ACPX: retry runtime test temp-dir cleanup * Extensions/acpx: require proactive ACPX repair for thread spawns * Extensions/acpx: require restart offer after acpx reinstall * extensions/acpx: remove workspace protocol devDependency * extensions/acpx: bump pinned acpx to 0.1.13 * extensions/acpx: sync lockfile after dependency bump * ACPX: make runtime spawn Windows-safe * fix: align doctor-config-flow repair tests with default-account migration (#23580) (thanks @osolmaz)
2026-02-26 11:00:09 +01:00
import { createInterface } from "node:readline";
import type {
AcpRuntimeCapabilities,
AcpRuntimeDoctorReport,
AcpRuntime,
AcpRuntimeEnsureInput,
AcpRuntimeErrorCode,
AcpRuntimeEvent,
AcpRuntimeHandle,
AcpRuntimeStatus,
AcpRuntimeTurnInput,
PluginLogger,
} from "openclaw/plugin-sdk";
import { AcpRuntimeError } from "openclaw/plugin-sdk";
import {
ACPX_LOCAL_INSTALL_COMMAND,
ACPX_PINNED_VERSION,
type ResolvedAcpxPluginConfig,
} from "./config.js";
import { checkPinnedAcpxVersion } from "./ensure.js";
import {
parseJsonLines,
parsePromptEventLine,
toAcpxErrorEvent,
} from "./runtime-internals/events.js";
import {
resolveSpawnFailure,
spawnAndCollect,
spawnWithResolvedCommand,
waitForExit,
} from "./runtime-internals/process.js";
import {
asOptionalString,
asTrimmedString,
buildPermissionArgs,
deriveAgentFromSessionKey,
isRecord,
type AcpxHandleState,
type AcpxJsonObject,
} from "./runtime-internals/shared.js";
export const ACPX_BACKEND_ID = "acpx";
const ACPX_RUNTIME_HANDLE_PREFIX = "acpx:v1:";
const DEFAULT_AGENT_FALLBACK = "codex";
const ACPX_CAPABILITIES: AcpRuntimeCapabilities = {
controls: ["session/set_mode", "session/set_config_option", "session/status"],
};
export function encodeAcpxRuntimeHandleState(state: AcpxHandleState): string {
const payload = Buffer.from(JSON.stringify(state), "utf8").toString("base64url");
return `${ACPX_RUNTIME_HANDLE_PREFIX}${payload}`;
}
export function decodeAcpxRuntimeHandleState(runtimeSessionName: string): AcpxHandleState | null {
const trimmed = runtimeSessionName.trim();
if (!trimmed.startsWith(ACPX_RUNTIME_HANDLE_PREFIX)) {
return null;
}
const encoded = trimmed.slice(ACPX_RUNTIME_HANDLE_PREFIX.length);
if (!encoded) {
return null;
}
try {
const raw = Buffer.from(encoded, "base64url").toString("utf8");
const parsed = JSON.parse(raw) as unknown;
if (!isRecord(parsed)) {
return null;
}
const name = asTrimmedString(parsed.name);
const agent = asTrimmedString(parsed.agent);
const cwd = asTrimmedString(parsed.cwd);
const mode = asTrimmedString(parsed.mode);
const acpxRecordId = asOptionalString(parsed.acpxRecordId);
const backendSessionId = asOptionalString(parsed.backendSessionId);
const agentSessionId = asOptionalString(parsed.agentSessionId);
if (!name || !agent || !cwd) {
return null;
}
if (mode !== "persistent" && mode !== "oneshot") {
return null;
}
return {
name,
agent,
cwd,
mode,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
} catch {
return null;
}
}
export class AcpxRuntime implements AcpRuntime {
private healthy = false;
private readonly logger?: PluginLogger;
private readonly queueOwnerTtlSeconds: number;
constructor(
private readonly config: ResolvedAcpxPluginConfig,
opts?: {
logger?: PluginLogger;
queueOwnerTtlSeconds?: number;
},
) {
this.logger = opts?.logger;
const requestedQueueOwnerTtlSeconds = opts?.queueOwnerTtlSeconds;
this.queueOwnerTtlSeconds =
typeof requestedQueueOwnerTtlSeconds === "number" &&
Number.isFinite(requestedQueueOwnerTtlSeconds) &&
requestedQueueOwnerTtlSeconds >= 0
? requestedQueueOwnerTtlSeconds
: this.config.queueOwnerTtlSeconds;
}
isHealthy(): boolean {
return this.healthy;
}
async probeAvailability(): Promise<void> {
const versionCheck = await checkPinnedAcpxVersion({
command: this.config.command,
cwd: this.config.cwd,
expectedVersion: ACPX_PINNED_VERSION,
});
if (!versionCheck.ok) {
this.healthy = false;
return;
}
try {
const result = await spawnAndCollect({
command: this.config.command,
args: ["--help"],
cwd: this.config.cwd,
});
this.healthy = result.error == null && (result.code ?? 0) === 0;
} catch {
this.healthy = false;
}
}
async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
const sessionName = asTrimmedString(input.sessionKey);
if (!sessionName) {
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required.");
}
const agent = asTrimmedString(input.agent);
if (!agent) {
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP agent id is required.");
}
const cwd = asTrimmedString(input.cwd) || this.config.cwd;
const mode = input.mode;
const events = await this.runControlCommand({
args: this.buildControlArgs({
cwd,
command: [agent, "sessions", "ensure", "--name", sessionName],
}),
cwd,
fallbackCode: "ACP_SESSION_INIT_FAILED",
});
const ensuredEvent = events.find(
(event) =>
asOptionalString(event.agentSessionId) ||
asOptionalString(event.acpxSessionId) ||
asOptionalString(event.acpxRecordId),
);
const acpxRecordId = ensuredEvent ? asOptionalString(ensuredEvent.acpxRecordId) : undefined;
const agentSessionId = ensuredEvent ? asOptionalString(ensuredEvent.agentSessionId) : undefined;
const backendSessionId = ensuredEvent
? asOptionalString(ensuredEvent.acpxSessionId)
: undefined;
return {
sessionKey: input.sessionKey,
backend: ACPX_BACKEND_ID,
runtimeSessionName: encodeAcpxRuntimeHandleState({
name: sessionName,
agent,
cwd,
mode,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
}),
cwd,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
}
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
const state = this.resolveHandleState(input.handle);
const args = this.buildPromptArgs({
agent: state.agent,
sessionName: state.name,
cwd: state.cwd,
});
const cancelOnAbort = async () => {
await this.cancel({
handle: input.handle,
reason: "abort-signal",
}).catch((err) => {
this.logger?.warn?.(`acpx runtime abort-cancel failed: ${String(err)}`);
});
};
const onAbort = () => {
void cancelOnAbort();
};
if (input.signal?.aborted) {
await cancelOnAbort();
return;
}
if (input.signal) {
input.signal.addEventListener("abort", onAbort, { once: true });
}
const child = spawnWithResolvedCommand({
command: this.config.command,
args,
cwd: state.cwd,
});
child.stdin.on("error", () => {
// Ignore EPIPE when the child exits before stdin flush completes.
});
child.stdin.end(input.text);
let stderr = "";
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
let sawDone = false;
let sawError = false;
const lines = createInterface({ input: child.stdout });
try {
for await (const line of lines) {
const parsed = parsePromptEventLine(line);
if (!parsed) {
continue;
}
if (parsed.type === "done") {
sawDone = true;
}
if (parsed.type === "error") {
sawError = true;
}
yield parsed;
}
const exit = await waitForExit(child);
if (exit.error) {
const spawnFailure = resolveSpawnFailure(exit.error, state.cwd);
if (spawnFailure === "missing-command") {
this.healthy = false;
throw new AcpRuntimeError(
"ACP_BACKEND_UNAVAILABLE",
`acpx command not found: ${this.config.command}`,
{ cause: exit.error },
);
}
if (spawnFailure === "missing-cwd") {
throw new AcpRuntimeError(
"ACP_TURN_FAILED",
`ACP runtime working directory does not exist: ${state.cwd}`,
{ cause: exit.error },
);
}
throw new AcpRuntimeError("ACP_TURN_FAILED", exit.error.message, { cause: exit.error });
}
if ((exit.code ?? 0) !== 0 && !sawError) {
yield {
type: "error",
message: stderr.trim() || `acpx exited with code ${exit.code ?? "unknown"}`,
};
return;
}
if (!sawDone && !sawError) {
yield { type: "done" };
}
} finally {
lines.close();
if (input.signal) {
input.signal.removeEventListener("abort", onAbort);
}
}
}
getCapabilities(): AcpRuntimeCapabilities {
return ACPX_CAPABILITIES;
}
async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
const state = this.resolveHandleState(input.handle);
const events = await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "status", "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0];
if (!detail) {
return {
summary: "acpx status unavailable",
};
}
const status = asTrimmedString(detail.status) || "unknown";
const acpxRecordId = asOptionalString(detail.acpxRecordId);
const acpxSessionId = asOptionalString(detail.acpxSessionId);
const agentSessionId = asOptionalString(detail.agentSessionId);
const pid = typeof detail.pid === "number" && Number.isFinite(detail.pid) ? detail.pid : null;
const summary = [
`status=${status}`,
acpxRecordId ? `acpxRecordId=${acpxRecordId}` : null,
acpxSessionId ? `acpxSessionId=${acpxSessionId}` : null,
pid != null ? `pid=${pid}` : null,
]
.filter(Boolean)
.join(" ");
return {
summary,
...(acpxRecordId ? { acpxRecordId } : {}),
...(acpxSessionId ? { backendSessionId: acpxSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
details: detail,
};
}
async setMode(input: { handle: AcpRuntimeHandle; mode: string }): Promise<void> {
const state = this.resolveHandleState(input.handle);
const mode = asTrimmedString(input.mode);
if (!mode) {
throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP runtime mode is required.");
}
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "set-mode", mode, "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
});
}
async setConfigOption(input: {
handle: AcpRuntimeHandle;
key: string;
value: string;
}): Promise<void> {
const state = this.resolveHandleState(input.handle);
const key = asTrimmedString(input.key);
const value = asTrimmedString(input.value);
if (!key || !value) {
throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP config option key/value are required.");
}
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "set", key, value, "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
});
}
async doctor(): Promise<AcpRuntimeDoctorReport> {
const versionCheck = await checkPinnedAcpxVersion({
command: this.config.command,
cwd: this.config.cwd,
expectedVersion: ACPX_PINNED_VERSION,
});
if (!versionCheck.ok) {
this.healthy = false;
const details = [
`expected=${versionCheck.expectedVersion}`,
versionCheck.installedVersion ? `installed=${versionCheck.installedVersion}` : null,
].filter((detail): detail is string => Boolean(detail));
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: versionCheck.message,
installCommand: versionCheck.installCommand,
details,
};
}
try {
const result = await spawnAndCollect({
command: this.config.command,
args: ["--help"],
cwd: this.config.cwd,
});
if (result.error) {
const spawnFailure = resolveSpawnFailure(result.error, this.config.cwd);
if (spawnFailure === "missing-command") {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: `acpx command not found: ${this.config.command}`,
installCommand: ACPX_LOCAL_INSTALL_COMMAND,
};
}
if (spawnFailure === "missing-cwd") {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: `ACP runtime working directory does not exist: ${this.config.cwd}`,
};
}
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: result.error.message,
details: [String(result.error)],
};
}
if ((result.code ?? 0) !== 0) {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: result.stderr.trim() || `acpx exited with code ${result.code ?? "unknown"}`,
};
}
this.healthy = true;
return {
ok: true,
message: `acpx command available (${this.config.command}, version ${versionCheck.version})`,
};
} catch (error) {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: error instanceof Error ? error.message : String(error),
};
}
}
async cancel(input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void> {
const state = this.resolveHandleState(input.handle);
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "cancel", "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
}
async close(input: { handle: AcpRuntimeHandle; reason: string }): Promise<void> {
const state = this.resolveHandleState(input.handle);
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "sessions", "close", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
}
private resolveHandleState(handle: AcpRuntimeHandle): AcpxHandleState {
const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName);
if (decoded) {
return decoded;
}
const legacyName = asTrimmedString(handle.runtimeSessionName);
if (!legacyName) {
throw new AcpRuntimeError(
"ACP_SESSION_INIT_FAILED",
"Invalid acpx runtime handle: runtimeSessionName is missing.",
);
}
return {
name: legacyName,
agent: deriveAgentFromSessionKey(handle.sessionKey, DEFAULT_AGENT_FALLBACK),
cwd: this.config.cwd,
mode: "persistent",
};
}
private buildControlArgs(params: { cwd: string; command: string[] }): string[] {
return ["--format", "json", "--json-strict", "--cwd", params.cwd, ...params.command];
}
private buildPromptArgs(params: { agent: string; sessionName: string; cwd: string }): string[] {
const args = [
"--format",
"json",
"--json-strict",
"--cwd",
params.cwd,
...buildPermissionArgs(this.config.permissionMode),
"--non-interactive-permissions",
this.config.nonInteractivePermissions,
];
if (this.config.timeoutSeconds) {
args.push("--timeout", String(this.config.timeoutSeconds));
}
args.push("--ttl", String(this.queueOwnerTtlSeconds));
args.push(params.agent, "prompt", "--session", params.sessionName, "--file", "-");
return args;
}
private async runControlCommand(params: {
args: string[];
cwd: string;
fallbackCode: AcpRuntimeErrorCode;
ignoreNoSession?: boolean;
}): Promise<AcpxJsonObject[]> {
const result = await spawnAndCollect({
command: this.config.command,
args: params.args,
cwd: params.cwd,
});
if (result.error) {
const spawnFailure = resolveSpawnFailure(result.error, params.cwd);
if (spawnFailure === "missing-command") {
this.healthy = false;
throw new AcpRuntimeError(
"ACP_BACKEND_UNAVAILABLE",
`acpx command not found: ${this.config.command}`,
{ cause: result.error },
);
}
if (spawnFailure === "missing-cwd") {
throw new AcpRuntimeError(
params.fallbackCode,
`ACP runtime working directory does not exist: ${params.cwd}`,
{ cause: result.error },
);
}
throw new AcpRuntimeError(params.fallbackCode, result.error.message, { cause: result.error });
}
const events = parseJsonLines(result.stdout);
const errorEvent = events.map((event) => toAcpxErrorEvent(event)).find(Boolean) ?? null;
if (errorEvent) {
if (params.ignoreNoSession && errorEvent.code === "NO_SESSION") {
return events;
}
throw new AcpRuntimeError(
params.fallbackCode,
errorEvent.code ? `${errorEvent.code}: ${errorEvent.message}` : errorEvent.message,
);
}
if ((result.code ?? 0) !== 0) {
throw new AcpRuntimeError(
params.fallbackCode,
result.stderr.trim() || `acpx exited with code ${result.code ?? "unknown"}`,
);
}
return events;
}
}