openclaw/src/discord/monitor/thread-bindings.lifecycle.ts
Onur Solmaz a7d56e3554
feat: ACP thread-bound agents (#23580)
* docs: add ACP thread-bound agents plan doc

* docs: expand ACP implementation specification

* feat(acp): route ACP sessions through core dispatch and lifecycle cleanup

* feat(acp): add /acp commands and Discord spawn gate

* ACP: add acpx runtime plugin backend

* fix(subagents): defer transient lifecycle errors before announce

* Agents: harden ACP sessions_spawn and tighten spawn guidance

* Agents: require explicit ACP target for runtime spawns

* docs: expand ACP control-plane implementation plan

* ACP: harden metadata seeding and spawn guidance

* ACP: centralize runtime control-plane manager and fail-closed dispatch

* ACP: harden runtime manager and unify spawn helpers

* Commands: route ACP sessions through ACP runtime in agent command

* ACP: require persisted metadata for runtime spawns

* Sessions: preserve ACP metadata when updating entries

* Plugins: harden ACP backend registry across loaders

* ACPX: make availability probe compatible with adapters

* E2E: add manual Discord ACP plain-language smoke script

* ACPX: preserve streamed spacing across Discord delivery

* Docs: add ACP Discord streaming strategy

* ACP: harden Discord stream buffering for thread replies

* ACP: reuse shared block reply pipeline for projector

* ACP: unify streaming config and adopt coalesceIdleMs

* Docs: add temporary ACP production hardening plan

* Docs: trim temporary ACP hardening plan goals

* Docs: gate ACP thread controls by backend capabilities

* ACP: add capability-gated runtime controls and /acp operator commands

* Docs: remove temporary ACP hardening plan

* ACP: fix spawn target validation and close cache cleanup

* ACP: harden runtime dispatch and recovery paths

* ACP: split ACP command/runtime internals and centralize policy

* ACP: harden runtime lifecycle, validation, and observability

* ACP: surface runtime and backend session IDs in thread bindings

* docs: add temp plan for binding-service migration

* ACP: migrate thread binding flows to SessionBindingService

* ACP: address review feedback and preserve prompt wording

* ACPX plugin: pin runtime dependency and prefer bundled CLI

* Discord: complete binding-service migration cleanup and restore ACP plan

* Docs: add standalone ACP agents guide

* ACP: route harness intents to thread-bound ACP sessions

* ACP: fix spawn thread routing and queue-owner stall

* ACP: harden startup reconciliation and command bypass handling

* ACP: fix dispatch bypass type narrowing

* ACP: align runtime metadata to agentSessionId

* ACP: normalize session identifier handling and labels

* ACP: mark thread banner session ids provisional until first reply

* ACP: stabilize session identity mapping and startup reconciliation

* ACP: add resolved session-id notices and cwd in thread intros

* Discord: prefix thread meta notices consistently

* Discord: unify ACP/thread meta notices with gear prefix

* Discord: split thread persona naming from meta formatting

* Extensions: bump acpx plugin dependency to 0.1.9

* Agents: gate ACP prompt guidance behind acp.enabled

* Docs: remove temp experiment plan docs

* Docs: scope streaming plan to holy grail refactor

* Docs: refactor ACP agents guide for human-first flow

* Docs/Skill: add ACP feature-flag guidance and direct acpx telephone-game flow

* Docs/Skill: add OpenCode and Pi to ACP harness lists

* Docs/Skill: align ACP harness list with current acpx registry

* Dev/Test: move ACP plain-language smoke script and mark as keep

* Docs/Skill: reorder ACP harness lists with Pi first

* ACP: split control-plane manager into core/types/utils modules

* Docs: refresh ACP thread-bound agents plan

* ACP: extract dispatch lane and split manager domains

* ACP: centralize binding context and remove reverse deps

* Infra: unify system message formatting

* ACP: centralize error boundaries and session id rendering

* ACP: enforce init concurrency cap and strict meta clear

* Tests: fix ACP dispatch binding mock typing

* Tests: fix Discord thread-binding mock drift and ACP request id

* ACP: gate slash bypass and persist cleared overrides

* ACPX: await pre-abort cancel before runTurn return

* Extension: pin acpx runtime dependency to 0.1.11

* Docs: add pinned acpx install strategy for ACP extension

* Extensions/acpx: enforce strict local pinned startup

* Extensions/acpx: tighten acp-router install guidance

* ACPX: retry runtime test temp-dir cleanup

* Extensions/acpx: require proactive ACPX repair for thread spawns

* Extensions/acpx: require restart offer after acpx reinstall

* extensions/acpx: remove workspace protocol devDependency

* extensions/acpx: bump pinned acpx to 0.1.13

* extensions/acpx: sync lockfile after dependency bump

* ACPX: make runtime spawn Windows-safe

* fix: align doctor-config-flow repair tests with default-account migration (#23580) (thanks @osolmaz)
2026-02-26 11:00:09 +01:00

282 lines
7.6 KiB
TypeScript

import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import { normalizeAccountId } from "../../routing/session-key.js";
import { parseDiscordTarget } from "../targets.js";
import { resolveChannelIdForBinding } from "./thread-bindings.discord-api.js";
import { getThreadBindingManager } from "./thread-bindings.manager.js";
import {
resolveThreadBindingIntroText,
resolveThreadBindingThreadName,
} from "./thread-bindings.messages.js";
import {
BINDINGS_BY_THREAD_ID,
MANAGERS_BY_ACCOUNT_ID,
ensureBindingsLoaded,
getThreadBindingToken,
normalizeThreadBindingTtlMs,
normalizeThreadId,
rememberRecentUnboundWebhookEcho,
removeBindingRecord,
resolveBindingIdsForSession,
saveBindingsToDisk,
setBindingRecord,
shouldPersistBindingMutations,
} from "./thread-bindings.state.js";
import type { ThreadBindingRecord, ThreadBindingTargetKind } from "./thread-bindings.types.js";
export type AcpThreadBindingReconciliationResult = {
checked: number;
removed: number;
staleSessionKeys: string[];
};
function resolveBindingIdsForTargetSession(params: {
targetSessionKey: string;
accountId?: string;
targetKind?: ThreadBindingTargetKind;
}) {
ensureBindingsLoaded();
const targetSessionKey = params.targetSessionKey.trim();
if (!targetSessionKey) {
return [];
}
const accountId = params.accountId ? normalizeAccountId(params.accountId) : undefined;
return resolveBindingIdsForSession({
targetSessionKey,
accountId,
targetKind: params.targetKind,
});
}
export function listThreadBindingsForAccount(accountId?: string): ThreadBindingRecord[] {
const manager = getThreadBindingManager(accountId);
if (!manager) {
return [];
}
return manager.listBindings();
}
export function listThreadBindingsBySessionKey(params: {
targetSessionKey: string;
accountId?: string;
targetKind?: ThreadBindingTargetKind;
}): ThreadBindingRecord[] {
const ids = resolveBindingIdsForTargetSession(params);
return ids
.map((bindingKey) => BINDINGS_BY_THREAD_ID.get(bindingKey))
.filter((entry): entry is ThreadBindingRecord => Boolean(entry));
}
export async function autoBindSpawnedDiscordSubagent(params: {
accountId?: string;
channel?: string;
to?: string;
threadId?: string | number;
childSessionKey: string;
agentId: string;
label?: string;
boundBy?: string;
}): Promise<ThreadBindingRecord | null> {
const channel = params.channel?.trim().toLowerCase();
if (channel !== "discord") {
return null;
}
const manager = getThreadBindingManager(params.accountId);
if (!manager) {
return null;
}
const managerToken = getThreadBindingToken(manager.accountId);
const requesterThreadId = normalizeThreadId(params.threadId);
let channelId = "";
if (requesterThreadId) {
const existing = manager.getByThreadId(requesterThreadId);
if (existing?.channelId?.trim()) {
channelId = existing.channelId.trim();
} else {
channelId =
(await resolveChannelIdForBinding({
accountId: manager.accountId,
token: managerToken,
threadId: requesterThreadId,
})) ?? "";
}
}
if (!channelId) {
const to = params.to?.trim() || "";
if (!to) {
return null;
}
try {
const target = parseDiscordTarget(to, { defaultKind: "channel" });
if (!target || target.kind !== "channel") {
return null;
}
channelId =
(await resolveChannelIdForBinding({
accountId: manager.accountId,
token: managerToken,
threadId: target.id,
})) ?? "";
} catch {
return null;
}
}
return await manager.bindTarget({
threadId: undefined,
channelId,
createThread: true,
threadName: resolveThreadBindingThreadName({
agentId: params.agentId,
label: params.label,
}),
targetKind: "subagent",
targetSessionKey: params.childSessionKey,
agentId: params.agentId,
label: params.label,
boundBy: params.boundBy ?? "system",
introText: resolveThreadBindingIntroText({
agentId: params.agentId,
label: params.label,
sessionTtlMs: manager.getSessionTtlMs(),
}),
});
}
export function unbindThreadBindingsBySessionKey(params: {
targetSessionKey: string;
accountId?: string;
targetKind?: ThreadBindingTargetKind;
reason?: string;
sendFarewell?: boolean;
farewellText?: string;
}): ThreadBindingRecord[] {
const ids = resolveBindingIdsForTargetSession(params);
if (ids.length === 0) {
return [];
}
const removed: ThreadBindingRecord[] = [];
for (const bindingKey of ids) {
const record = BINDINGS_BY_THREAD_ID.get(bindingKey);
if (!record) {
continue;
}
const manager = MANAGERS_BY_ACCOUNT_ID.get(record.accountId);
if (manager) {
const unbound = manager.unbindThread({
threadId: record.threadId,
reason: params.reason,
sendFarewell: params.sendFarewell,
farewellText: params.farewellText,
});
if (unbound) {
removed.push(unbound);
}
continue;
}
const unbound = removeBindingRecord(bindingKey);
if (unbound) {
rememberRecentUnboundWebhookEcho(unbound);
removed.push(unbound);
}
}
if (removed.length > 0 && shouldPersistBindingMutations()) {
saveBindingsToDisk({ force: true });
}
return removed;
}
export function setThreadBindingTtlBySessionKey(params: {
targetSessionKey: string;
accountId?: string;
ttlMs: number;
}): ThreadBindingRecord[] {
const ids = resolveBindingIdsForTargetSession(params);
if (ids.length === 0) {
return [];
}
const ttlMs = normalizeThreadBindingTtlMs(params.ttlMs);
const now = Date.now();
const expiresAt = ttlMs > 0 ? now + ttlMs : 0;
const updated: ThreadBindingRecord[] = [];
for (const bindingKey of ids) {
const existing = BINDINGS_BY_THREAD_ID.get(bindingKey);
if (!existing) {
continue;
}
const nextRecord: ThreadBindingRecord = {
...existing,
boundAt: now,
expiresAt,
};
setBindingRecord(nextRecord);
updated.push(nextRecord);
}
if (updated.length > 0 && shouldPersistBindingMutations()) {
saveBindingsToDisk({ force: true });
}
return updated;
}
export function reconcileAcpThreadBindingsOnStartup(params: {
cfg: OpenClawConfig;
accountId?: string;
sendFarewell?: boolean;
}): AcpThreadBindingReconciliationResult {
const manager = getThreadBindingManager(params.accountId);
if (!manager) {
return {
checked: 0,
removed: 0,
staleSessionKeys: [],
};
}
const acpBindings = manager.listBindings().filter((binding) => binding.targetKind === "acp");
const staleBindings = acpBindings.filter((binding) => {
const sessionKey = binding.targetSessionKey.trim();
if (!sessionKey) {
return true;
}
const session = readAcpSessionEntry({
cfg: params.cfg,
sessionKey,
});
// Session store read failures are transient; never auto-unbind on uncertain reads.
if (session?.storeReadFailed) {
return false;
}
return !session?.acp;
});
if (staleBindings.length === 0) {
return {
checked: acpBindings.length,
removed: 0,
staleSessionKeys: [],
};
}
const staleSessionKeys: string[] = [];
let removed = 0;
for (const binding of staleBindings) {
staleSessionKeys.push(binding.targetSessionKey);
const unbound = manager.unbindThread({
threadId: binding.threadId,
reason: "stale-session",
sendFarewell: params.sendFarewell ?? false,
});
if (unbound) {
removed += 1;
}
}
return {
checked: acpBindings.length,
removed,
staleSessionKeys: [...new Set(staleSessionKeys)],
};
}