openclaw/src/auto-reply/reply/commands-session.ts
Onur 8178ea472d
feat: thread-bound subagents on Discord (#21805)
* docs: thread-bound subagents plan

* docs: add exact thread-bound subagent implementation touchpoints

* Docs: prioritize auto thread-bound subagent flow

* Docs: add ACP harness thread-binding extensions

* Discord: add thread-bound session routing and auto-bind spawn flow

* Subagents: add focus commands and ACP/session binding lifecycle hooks

* Tests: cover thread bindings, focus commands, and ACP unbind hooks

* Docs: add plugin-hook appendix for thread-bound subagents

* Plugins: add subagent lifecycle hook events

* Core: emit subagent lifecycle hooks and decouple Discord bindings

* Discord: handle subagent bind lifecycle via plugin hooks

* Subagents: unify completion finalizer and split registry modules

* Add subagent lifecycle events module

* Hooks: fix subagent ended context key

* Discord: share thread bindings across ESM and Jiti

* Subagents: add persistent sessions_spawn mode for thread-bound sessions

* Subagents: clarify thread intro and persistent completion copy

* test(subagents): stabilize sessions_spawn lifecycle cleanup assertions

* Discord: add thread-bound session TTL with auto-unfocus

* Subagents: fail session spawns when thread bind fails

* Subagents: cover thread session failure cleanup paths

* Session: add thread binding TTL config and /session ttl controls

* Tests: align discord reaction expectations

* Agent: persist sessionFile for keyed subagent sessions

* Discord: normalize imports after conflict resolution

* Sessions: centralize sessionFile resolve/persist helper

* Discord: harden thread-bound subagent session routing

* Rebase: resolve upstream/main conflicts

* Subagents: move thread binding into hooks and split bindings modules

* Docs: add channel-agnostic subagent routing hook plan

* Agents: decouple subagent routing from Discord

* Discord: refactor thread-bound subagent flows

* Subagents: prevent duplicate end hooks and orphaned failed sessions

* Refactor: split subagent command and provider phases

* Subagents: honor hook delivery target overrides

* Discord: add thread binding kill switches and refresh plan doc

* Discord: fix thread bind channel resolution

* Routing: centralize account id normalization

* Discord: clean up thread bindings on startup failures

* Discord: add startup cleanup regression tests

* Docs: add long-term thread-bound subagent architecture

* Docs: split session binding plan and dedupe thread-bound doc

* Subagents: add channel-agnostic session binding routing

* Subagents: stabilize announce completion routing tests

* Subagents: cover multi-bound completion routing

* Subagents: suppress lifecycle hooks on failed thread bind

* tests: fix discord provider mock typing regressions

* docs/protocol: sync slash command aliases and delete param models

* fix: add changelog entry for Discord thread-bound subagents (#21805) (thanks @onutc)

---------

Co-authored-by: Shadow <hi@shadowing.dev>
2026-02-21 16:14:55 +01:00

551 lines
17 KiB
TypeScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js";
import { parseDurationMs } from "../../cli/parse-duration.js";
import { isRestartEnabled } from "../../config/commands.js";
import type { SessionEntry } from "../../config/sessions.js";
import { updateSessionStore } from "../../config/sessions.js";
import {
formatThreadBindingTtlLabel,
getThreadBindingManager,
setThreadBindingTtlBySessionKey,
} from "../../discord/monitor/thread-bindings.js";
import { logVerbose } from "../../globals.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { scheduleGatewaySigusr1Restart, triggerOpenClawRestart } from "../../infra/restart.js";
import { loadCostUsageSummary, loadSessionCostSummary } from "../../infra/session-cost-usage.js";
import { formatTokenCount, formatUsd } from "../../utils/usage-format.js";
import { parseActivationCommand } from "../group-activation.js";
import { parseSendPolicyCommand } from "../send-policy.js";
import { normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js";
import {
formatAbortReplyText,
isAbortTrigger,
resolveSessionEntryForKey,
setAbortMemory,
stopSubagentsForRequester,
} from "./abort.js";
import type { CommandHandler } from "./commands-types.js";
import { clearSessionQueues } from "./queue.js";
function resolveAbortTarget(params: {
ctx: { CommandTargetSessionKey?: string | null };
sessionKey?: string;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
}) {
const targetSessionKey = params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey;
const { entry, key } = resolveSessionEntryForKey(params.sessionStore, targetSessionKey);
if (entry && key) {
return { entry, key, sessionId: entry.sessionId };
}
if (params.sessionEntry && params.sessionKey) {
return {
entry: params.sessionEntry,
key: params.sessionKey,
sessionId: params.sessionEntry.sessionId,
};
}
return { entry: undefined, key: targetSessionKey, sessionId: undefined };
}
const SESSION_COMMAND_PREFIX = "/session";
const SESSION_TTL_OFF_VALUES = new Set(["off", "disable", "disabled", "none", "0"]);
function isDiscordSurface(params: Parameters<CommandHandler>[0]): boolean {
const channel =
params.ctx.OriginatingChannel ??
params.command.channel ??
params.ctx.Surface ??
params.ctx.Provider;
return (
String(channel ?? "")
.trim()
.toLowerCase() === "discord"
);
}
function resolveDiscordAccountId(params: Parameters<CommandHandler>[0]): string {
const accountId = typeof params.ctx.AccountId === "string" ? params.ctx.AccountId.trim() : "";
return accountId || "default";
}
function resolveSessionCommandUsage() {
return "Usage: /session ttl <duration|off> (example: /session ttl 24h)";
}
function parseSessionTtlMs(raw: string): number {
const normalized = raw.trim().toLowerCase();
if (!normalized) {
throw new Error("missing ttl");
}
if (SESSION_TTL_OFF_VALUES.has(normalized)) {
return 0;
}
if (/^\d+(?:\.\d+)?$/.test(normalized)) {
const hours = Number(normalized);
if (!Number.isFinite(hours) || hours < 0) {
throw new Error("invalid ttl");
}
return Math.round(hours * 60 * 60 * 1000);
}
return parseDurationMs(normalized, { defaultUnit: "h" });
}
function formatSessionExpiry(expiresAt: number) {
return new Date(expiresAt).toISOString();
}
async function applyAbortTarget(params: {
abortTarget: ReturnType<typeof resolveAbortTarget>;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
abortKey?: string;
}) {
const { abortTarget } = params;
if (abortTarget.sessionId) {
abortEmbeddedPiRun(abortTarget.sessionId);
}
if (abortTarget.entry && params.sessionStore && abortTarget.key) {
abortTarget.entry.abortedLastRun = true;
abortTarget.entry.updatedAt = Date.now();
params.sessionStore[abortTarget.key] = abortTarget.entry;
if (params.storePath) {
await updateSessionStore(params.storePath, (store) => {
store[abortTarget.key] = abortTarget.entry;
});
}
} else if (params.abortKey) {
setAbortMemory(params.abortKey, true);
}
}
async function persistSessionEntry(params: Parameters<CommandHandler>[0]): Promise<boolean> {
if (!params.sessionEntry || !params.sessionStore || !params.sessionKey) {
return false;
}
params.sessionEntry.updatedAt = Date.now();
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
await updateSessionStore(params.storePath, (store) => {
store[params.sessionKey] = params.sessionEntry as SessionEntry;
});
}
return true;
}
export const handleActivationCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
const activationCommand = parseActivationCommand(params.command.commandBodyNormalized);
if (!activationCommand.hasCommand) {
return null;
}
if (!params.isGroup) {
return {
shouldContinue: false,
reply: { text: "⚙️ Group activation only applies to group chats." },
};
}
if (!params.command.isAuthorizedSender) {
logVerbose(
`Ignoring /activation from unauthorized sender in group: ${params.command.senderId || "<unknown>"}`,
);
return { shouldContinue: false };
}
if (!activationCommand.mode) {
return {
shouldContinue: false,
reply: { text: "⚙️ Usage: /activation mention|always" },
};
}
if (params.sessionEntry && params.sessionStore && params.sessionKey) {
params.sessionEntry.groupActivation = activationCommand.mode;
params.sessionEntry.groupActivationNeedsSystemIntro = true;
await persistSessionEntry(params);
}
return {
shouldContinue: false,
reply: {
text: `⚙️ Group activation set to ${activationCommand.mode}.`,
},
};
};
export const handleSendPolicyCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
const sendPolicyCommand = parseSendPolicyCommand(params.command.commandBodyNormalized);
if (!sendPolicyCommand.hasCommand) {
return null;
}
if (!params.command.isAuthorizedSender) {
logVerbose(
`Ignoring /send from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
);
return { shouldContinue: false };
}
if (!sendPolicyCommand.mode) {
return {
shouldContinue: false,
reply: { text: "⚙️ Usage: /send on|off|inherit" },
};
}
if (params.sessionEntry && params.sessionStore && params.sessionKey) {
if (sendPolicyCommand.mode === "inherit") {
delete params.sessionEntry.sendPolicy;
} else {
params.sessionEntry.sendPolicy = sendPolicyCommand.mode;
}
await persistSessionEntry(params);
}
const label =
sendPolicyCommand.mode === "inherit"
? "inherit"
: sendPolicyCommand.mode === "allow"
? "on"
: "off";
return {
shouldContinue: false,
reply: { text: `⚙️ Send policy set to ${label}.` },
};
};
export const handleUsageCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
const normalized = params.command.commandBodyNormalized;
if (normalized !== "/usage" && !normalized.startsWith("/usage ")) {
return null;
}
if (!params.command.isAuthorizedSender) {
logVerbose(
`Ignoring /usage from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
);
return { shouldContinue: false };
}
const rawArgs = normalized === "/usage" ? "" : normalized.slice("/usage".length).trim();
const requested = rawArgs ? normalizeUsageDisplay(rawArgs) : undefined;
if (rawArgs.toLowerCase().startsWith("cost")) {
const sessionSummary = await loadSessionCostSummary({
sessionId: params.sessionEntry?.sessionId,
sessionEntry: params.sessionEntry,
sessionFile: params.sessionEntry?.sessionFile,
config: params.cfg,
agentId: params.agentId,
});
const summary = await loadCostUsageSummary({ days: 30, config: params.cfg });
const sessionCost = formatUsd(sessionSummary?.totalCost);
const sessionTokens = sessionSummary?.totalTokens
? formatTokenCount(sessionSummary.totalTokens)
: undefined;
const sessionMissing = sessionSummary?.missingCostEntries ?? 0;
const sessionSuffix = sessionMissing > 0 ? " (partial)" : "";
const sessionLine =
sessionCost || sessionTokens
? `Session ${sessionCost ?? "n/a"}${sessionSuffix}${sessionTokens ? ` · ${sessionTokens} tokens` : ""}`
: "Session n/a";
const todayKey = new Date().toLocaleDateString("en-CA");
const todayEntry = summary.daily.find((entry) => entry.date === todayKey);
const todayCost = formatUsd(todayEntry?.totalCost);
const todayMissing = todayEntry?.missingCostEntries ?? 0;
const todaySuffix = todayMissing > 0 ? " (partial)" : "";
const todayLine = `Today ${todayCost ?? "n/a"}${todaySuffix}`;
const last30Cost = formatUsd(summary.totals.totalCost);
const last30Missing = summary.totals.missingCostEntries;
const last30Suffix = last30Missing > 0 ? " (partial)" : "";
const last30Line = `Last 30d ${last30Cost ?? "n/a"}${last30Suffix}`;
return {
shouldContinue: false,
reply: { text: `💸 Usage cost\n${sessionLine}\n${todayLine}\n${last30Line}` },
};
}
if (rawArgs && !requested) {
return {
shouldContinue: false,
reply: { text: "⚙️ Usage: /usage off|tokens|full|cost" },
};
}
const currentRaw =
params.sessionEntry?.responseUsage ??
(params.sessionKey ? params.sessionStore?.[params.sessionKey]?.responseUsage : undefined);
const current = resolveResponseUsageMode(currentRaw);
const next = requested ?? (current === "off" ? "tokens" : current === "tokens" ? "full" : "off");
if (params.sessionEntry && params.sessionStore && params.sessionKey) {
if (next === "off") {
delete params.sessionEntry.responseUsage;
} else {
params.sessionEntry.responseUsage = next;
}
await persistSessionEntry(params);
}
return {
shouldContinue: false,
reply: {
text: `⚙️ Usage footer: ${next}.`,
},
};
};
export const handleSessionCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
const normalized = params.command.commandBodyNormalized;
if (!/^\/session(?:\s|$)/.test(normalized)) {
return null;
}
if (!params.command.isAuthorizedSender) {
logVerbose(
`Ignoring /session from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
);
return { shouldContinue: false };
}
const rest = normalized.slice(SESSION_COMMAND_PREFIX.length).trim();
const tokens = rest.split(/\s+/).filter(Boolean);
const action = tokens[0]?.toLowerCase();
if (action !== "ttl") {
return {
shouldContinue: false,
reply: { text: resolveSessionCommandUsage() },
};
}
if (!isDiscordSurface(params)) {
return {
shouldContinue: false,
reply: { text: "⚠️ /session ttl is currently available for Discord thread-bound sessions." },
};
}
const threadId =
params.ctx.MessageThreadId != null ? String(params.ctx.MessageThreadId).trim() : "";
if (!threadId) {
return {
shouldContinue: false,
reply: { text: "⚠️ /session ttl must be run inside a focused Discord thread." },
};
}
const accountId = resolveDiscordAccountId(params);
const threadBindings = getThreadBindingManager(accountId);
if (!threadBindings) {
return {
shouldContinue: false,
reply: { text: "⚠️ Discord thread bindings are unavailable for this account." },
};
}
const binding = threadBindings.getByThreadId(threadId);
if (!binding) {
return {
shouldContinue: false,
reply: { text: " This thread is not currently focused." },
};
}
const ttlArgRaw = tokens.slice(1).join("");
if (!ttlArgRaw) {
const expiresAt = binding.expiresAt;
if (typeof expiresAt === "number" && Number.isFinite(expiresAt) && expiresAt > Date.now()) {
return {
shouldContinue: false,
reply: {
text: ` Session TTL active (${formatThreadBindingTtlLabel(expiresAt - Date.now())}, auto-unfocus at ${formatSessionExpiry(expiresAt)}).`,
},
};
}
return {
shouldContinue: false,
reply: { text: " Session TTL is currently disabled for this focused session." },
};
}
const senderId = params.command.senderId?.trim() || "";
if (binding.boundBy && binding.boundBy !== "system" && senderId && senderId !== binding.boundBy) {
return {
shouldContinue: false,
reply: { text: `⚠️ Only ${binding.boundBy} can update session TTL for this thread.` },
};
}
let ttlMs: number;
try {
ttlMs = parseSessionTtlMs(ttlArgRaw);
} catch {
return {
shouldContinue: false,
reply: { text: resolveSessionCommandUsage() },
};
}
const updatedBindings = setThreadBindingTtlBySessionKey({
targetSessionKey: binding.targetSessionKey,
accountId,
ttlMs,
});
if (updatedBindings.length === 0) {
return {
shouldContinue: false,
reply: { text: "⚠️ Failed to update session TTL for the current binding." },
};
}
if (ttlMs <= 0) {
return {
shouldContinue: false,
reply: {
text: `✅ Session TTL disabled for ${updatedBindings.length} binding${updatedBindings.length === 1 ? "" : "s"}.`,
},
};
}
const expiresAt = updatedBindings[0]?.expiresAt;
const expiryLabel =
typeof expiresAt === "number" && Number.isFinite(expiresAt)
? formatSessionExpiry(expiresAt)
: "n/a";
return {
shouldContinue: false,
reply: {
text: `✅ Session TTL set to ${formatThreadBindingTtlLabel(ttlMs)} for ${updatedBindings.length} binding${updatedBindings.length === 1 ? "" : "s"} (auto-unfocus at ${expiryLabel}).`,
},
};
};
export const handleRestartCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
if (params.command.commandBodyNormalized !== "/restart") {
return null;
}
if (!params.command.isAuthorizedSender) {
logVerbose(
`Ignoring /restart from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
);
return { shouldContinue: false };
}
if (!isRestartEnabled(params.cfg)) {
return {
shouldContinue: false,
reply: {
text: "⚠️ /restart is disabled (commands.restart=false).",
},
};
}
const hasSigusr1Listener = process.listenerCount("SIGUSR1") > 0;
if (hasSigusr1Listener) {
scheduleGatewaySigusr1Restart({ reason: "/restart" });
return {
shouldContinue: false,
reply: {
text: "⚙️ Restarting OpenClaw in-process (SIGUSR1); back in a few seconds.",
},
};
}
const restartMethod = triggerOpenClawRestart();
if (!restartMethod.ok) {
const detail = restartMethod.detail ? ` Details: ${restartMethod.detail}` : "";
return {
shouldContinue: false,
reply: {
text: `⚠️ Restart failed (${restartMethod.method}).${detail}`,
},
};
}
return {
shouldContinue: false,
reply: {
text: `⚙️ Restarting OpenClaw via ${restartMethod.method}; give me a few seconds to come back online.`,
},
};
};
export const handleStopCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
if (params.command.commandBodyNormalized !== "/stop") {
return null;
}
if (!params.command.isAuthorizedSender) {
logVerbose(
`Ignoring /stop from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
);
return { shouldContinue: false };
}
const abortTarget = resolveAbortTarget({
ctx: params.ctx,
sessionKey: params.sessionKey,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
});
const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
);
}
await applyAbortTarget({
abortTarget,
sessionStore: params.sessionStore,
storePath: params.storePath,
abortKey: params.command.abortKey,
});
// Trigger internal hook for stop command
const hookEvent = createInternalHookEvent(
"command",
"stop",
abortTarget.key ?? params.sessionKey ?? "",
{
sessionEntry: abortTarget.entry ?? params.sessionEntry,
sessionId: abortTarget.sessionId,
commandSource: params.command.surface,
senderId: params.command.senderId,
},
);
await triggerInternalHook(hookEvent);
const { stopped } = stopSubagentsForRequester({
cfg: params.cfg,
requesterSessionKey: abortTarget.key ?? params.sessionKey,
});
return { shouldContinue: false, reply: { text: formatAbortReplyText(stopped) } };
};
export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
if (!isAbortTrigger(params.command.rawBodyNormalized)) {
return null;
}
const abortTarget = resolveAbortTarget({
ctx: params.ctx,
sessionKey: params.sessionKey,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
});
await applyAbortTarget({
abortTarget,
sessionStore: params.sessionStore,
storePath: params.storePath,
abortKey: params.command.abortKey,
});
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
};