Tyler Yust b8f66c260d
Agents: add nested subagent orchestration controls and reduce subagent token waste (#14447)
* Agents: add subagent orchestration controls

* Agents: add subagent orchestration controls (WIP uncommitted changes)

* feat(subagents): add depth-based spawn gating for sub-sub-agents

* feat(subagents): tool policy, registry, and announce chain for nested agents

* feat(subagents): system prompt, docs, changelog for nested sub-agents

* fix(subagents): prevent model fallback override, show model during active runs, and block context overflow fallback

Bug 1: When a session has an explicit model override (e.g., gpt/openai-codex),
the fallback candidate logic in resolveFallbackCandidates silently appended the
global primary model (opus) as a backstop. On reinjection/steer with a transient
error, the session could fall back to opus, which has a smaller context window,
and crash. Fix: when storedModelOverride is set, pass fallbacksOverride ?? []
instead of undefined, preventing the implicit primary backstop.

Bug 2: Active subagents showed 'model n/a' in /subagents list because
resolveModelDisplay only read entry.model/modelProvider (populated after run
completes). Fix: fall back to modelOverride/providerOverride fields which are
populated at spawn time via sessions.patch.

Bug 3: Context overflow errors (prompt too long, context_length_exceeded) could
theoretically escape runEmbeddedPiAgent and be treated as failover candidates
in runWithModelFallback, causing a switch to a model with a smaller context
window. Fix: in runWithModelFallback, detect context overflow errors via
isLikelyContextOverflowError and rethrow them immediately instead of trying the
next model candidate.
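
A minimal sketch of Bug 1 and Bug 3 together, under stated assumptions: the types, parameter shapes, and the message-based overflow heuristic below are hypothetical stand-ins, not the real signatures of resolveFallbackCandidates, runWithModelFallback, or isLikelyContextOverflowError.

```typescript
// Hypothetical shapes for illustration; the real code may differ.
type ModelRef = { provider: string; model: string };

// Assumption: a simple message heuristic standing in for isLikelyContextOverflowError.
function isLikelyContextOverflowError(message: string): boolean {
  const text = message.toLowerCase();
  return text.includes("prompt too long") || text.includes("context_length_exceeded");
}

// Bug 1: `undefined` means "use implicit defaults, including the global primary",
// while an explicit array (even an empty one) means "use exactly these fallbacks".
function resolveFallbackCandidates(params: {
  requested: ModelRef;
  globalPrimary: ModelRef;
  fallbacksOverride?: ModelRef[];
}): ModelRef[] {
  const candidates = [params.requested, ...(params.fallbacksOverride ?? [])];
  if (params.fallbacksOverride === undefined) {
    candidates.push(params.globalPrimary);
  }
  return candidates;
}

// Bug 3: context overflow is rethrown immediately instead of trying the next
// candidate, which might have an even smaller context window.
async function runWithModelFallback<T>(
  candidates: ModelRef[],
  run: (ref: ModelRef) => Promise<T>,
): Promise<T> {
  let lastError: unknown;
  for (const candidate of candidates) {
    try {
      return await run(candidate);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      if (isLikelyContextOverflowError(message)) {
        throw err;
      }
      lastError = err;
    }
  }
  throw lastError ?? new Error("No model candidates available");
}
```

With a stored override, a caller would pass `fallbacksOverride ?? []` so the candidate list never grows an implicit primary entry.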

* fix(subagents): track spawn depth in session store and fix announce routing for nested agents

* Fix compaction status tracking and dedupe overflow compaction triggers

* fix(subagents): enforce depth block via session store and implement cascade kill

* fix: inject group chat context into system prompt

* fix(subagents): always write model to session store at spawn time

* Preserve spawnDepth when agent handler rewrites session entry

* fix(subagents): suppress announce on steer-restart

* fix(subagents): fallback spawned session model to runtime default

* fix(subagents): enforce spawn depth when caller key resolves by sessionId

* feat(subagents): implement active-first ordering for numeric targets and enhance task display

- Added a test to verify that subagents with numeric targets follow an active-first list ordering.
- Updated `resolveSubagentTarget` to sort subagent runs based on active status and recent activity.
- Enhanced task display in command responses to prevent truncation of long task descriptions.
- Introduced new utility functions for compacting task text and managing subagent run states.
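
A rough sketch of active-first ordering for numeric targets. The `RunSummary` shape and both function names are assumptions for illustration; `resolveSubagentTarget` itself is not reproduced here.

```typescript
// Hypothetical run shape: a run without endedAt is considered active.
type RunSummary = { runId: string; startedAt: number; endedAt?: number };

// Active runs sort first; within each group, most recent activity sorts first.
function sortActiveFirst(runs: RunSummary[]): RunSummary[] {
  return [...runs].sort((a, b) => {
    const aActive = a.endedAt === undefined;
    const bActive = b.endedAt === undefined;
    if (aActive !== bActive) {
      return aActive ? -1 : 1;
    }
    const aRecent = a.endedAt ?? a.startedAt;
    const bRecent = b.endedAt ?? b.startedAt;
    return bRecent - aRecent;
  });
}

// Resolve a 1-based numeric target against the same ordering the list shows.
function resolveNumericTarget(runs: RunSummary[], target: number): RunSummary | undefined {
  if (!Number.isInteger(target) || target < 1) {
    return undefined;
  }
  return sortActiveFirst(runs)[target - 1];
}
```

Sorting before indexing keeps target `1` pointing at the first entry of the displayed list rather than at registry insertion order.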

* fix(subagents): show model for active runs via run record fallback

When the spawned model matches the agent's default model, the session
store's override fields are intentionally cleared (isDefault: true).
The model/modelProvider fields are only populated after the run
completes. This left active subagents showing 'model n/a'.

Fix: store the resolved model on SubagentRunRecord at registration
time, and use it as a fallback in both display paths (subagents tool
and /subagents command) when the session store entry has no model info.

Changes:
- SubagentRunRecord: add optional model field
- registerSubagentRun: accept and persist model param
- sessions-spawn-tool: pass resolvedModel to registerSubagentRun
- subagents-tool: pass run record model as fallback to resolveModelDisplay
- commands-subagents: pass run record model as fallback to resolveModelDisplay
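
The fallback chain described above could look roughly like this. The entry shape and the second parameter are assumptions; only the field names mentioned in this message (model, modelProvider, modelOverride, providerOverride) come from the source.

```typescript
// Hypothetical subset of a session store entry.
type SessionEntryLike = {
  model?: string;
  modelProvider?: string;
  modelOverride?: string;
  providerOverride?: string;
};

function joinModelRef(provider?: string, model?: string): string | undefined {
  const m = model?.trim();
  if (!m) {
    return undefined;
  }
  const p = provider?.trim();
  return p ? `${p}/${m}` : m;
}

// Order: completed-run fields, then spawn-time overrides, then the model
// stored on the run record, then the placeholder.
function resolveModelDisplay(entry?: SessionEntryLike, runRecordModel?: string): string {
  return (
    joinModelRef(entry?.modelProvider, entry?.model) ??
    joinModelRef(entry?.providerOverride, entry?.modelOverride) ??
    (runRecordModel?.trim() || undefined) ??
    "model n/a"
  );
}
```

Because the run record is written at registration time, the third step covers active runs whose override fields were cleared for matching the default model.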

* feat(chat): implement session key resolution and reset on sidebar navigation

- Added functions to resolve the main session key and reset chat state when switching sessions from the sidebar.
- Updated the `renderTab` function to handle session key changes when navigating to the chat tab.
- Introduced a test to verify that the session resets to "main" when opening chat from the sidebar navigation.

* fix: subagent timeout=0 passthrough and fallback prompt duplication

Bug 1: runTimeoutSeconds=0 now means 'no timeout' instead of applying 600s default
- sessions-spawn-tool: default to undefined (not 0) when neither timeout param
  is provided; use != null check so explicit 0 passes through to gateway
- agent.ts: accept 0 as valid timeout (resolveAgentTimeoutMs already handles
  0 → MAX_SAFE_TIMEOUT_MS)

Bug 2: model fallback no longer re-injects the original prompt as a duplicate
- agent.ts: track fallback attempt index; on retries use a short continuation
  message instead of the full original prompt since the session file already
  contains it from the first attempt
- Also skip re-sending images on fallback retries (already in session)
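
One way to picture the Bug 2 fix, as a sketch only: the `AttemptInput` type, the helper name, and the continuation wording are all hypothetical and do not come from agent.ts.

```typescript
// Hypothetical input for one model attempt.
type AttemptInput = { prompt: string; images: string[] };

// Assumption: the actual continuation text in agent.ts is not shown in this message.
const CONTINUATION_PROMPT = "Continue from the existing session transcript.";

// The first attempt sends the full prompt and images. Later attempts send only
// a short continuation, because the session file already holds the original.
function buildAttemptInput(original: AttemptInput, attemptIndex: number): AttemptInput {
  if (attemptIndex === 0) {
    return original;
  }
  return { prompt: CONTINUATION_PROMPT, images: [] };
}
```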

* feat(subagents): truncate long task descriptions in subagents command output

- Introduced a new utility function to format task previews, limiting their length to improve readability.
- Updated the command handler to use the new formatting function, ensuring task descriptions are truncated appropriately.
- Adjusted related tests to verify that long task descriptions are now truncated in the output.
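
A small sketch of such a preview formatter. The function name and the 110-character default are assumptions; the actual limit is not stated in this message.

```typescript
// Collapse whitespace, then cap the length and mark the cut with an ellipsis.
// Assumes maxChars >= 1.
function formatTaskPreview(task: string, maxChars = 110): string {
  const compact = task.replace(/\s+/g, " ").trim();
  if (compact.length <= maxChars) {
    return compact;
  }
  return `${compact.slice(0, maxChars - 1).trimEnd()}…`;
}
```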

* refactor(subagents): update subagent registry path resolution and improve command output formatting

- Replaced direct import of STATE_DIR with a utility function to resolve the state directory dynamically.
- Enhanced the formatting of command output for active and recent subagents, adding separators for better readability.
- Updated related tests to reflect changes in command output structure.

* fix(subagent): default sessions_spawn to no timeout when runTimeoutSeconds omitted

The previous fix (75a791106) correctly handled the case where
runTimeoutSeconds was explicitly set to 0 ("no timeout"). However,
when models omit the parameter entirely (which is common since the
schema marks it as optional), runTimeoutSeconds resolved to undefined.

undefined flowed through the chain as:
  sessions_spawn → timeout: undefined (since undefined != null is false)
  → gateway agent handler → agentCommand opts.timeout: undefined
  → resolveAgentTimeoutMs({ overrideSeconds: undefined })
  → DEFAULT_AGENT_TIMEOUT_SECONDS (600s = 10 minutes)

This caused subagents to be killed at exactly 10 minutes even though
the user's intent (via TOOLS.md) was for subagents to run without a
timeout.

Fix: default runTimeoutSeconds to 0 (no timeout) when neither
runTimeoutSeconds nor timeoutSeconds is provided by the caller.
Subagent spawns are long-running by design and should not inherit the
600s agent-command default timeout.
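
The chain above can be sketched as follows. The constant value for MAX_SAFE_TIMEOUT_MS and the helper `resolveSpawnTimeoutSeconds` are assumptions; only the 600s default and the "0 means no timeout" rule come from this message.

```typescript
const DEFAULT_AGENT_TIMEOUT_SECONDS = 600;
// Assumption: a large value that stays within the 32-bit timer limit.
const MAX_SAFE_TIMEOUT_MS = 2147000000;

// undefined/null or invalid input gives the 600s default; 0 means "no timeout".
function resolveAgentTimeoutMs(opts: { overrideSeconds?: number | null }): number {
  const override = opts.overrideSeconds;
  if (override == null || !Number.isFinite(override) || override < 0) {
    return DEFAULT_AGENT_TIMEOUT_SECONDS * 1000;
  }
  if (override === 0) {
    return MAX_SAFE_TIMEOUT_MS;
  }
  return Math.min(Math.floor(override * 1000), MAX_SAFE_TIMEOUT_MS);
}

// Hypothetical spawn-side helper: an omitted parameter becomes 0 so it never
// reaches resolveAgentTimeoutMs as undefined.
function resolveSpawnTimeoutSeconds(runTimeoutSeconds?: number): number {
  return typeof runTimeoutSeconds === "number" && Number.isFinite(runTimeoutSeconds)
    ? Math.max(0, Math.floor(runTimeoutSeconds))
    : 0;
}
```

The key point is that the default is chosen at the spawn boundary, so downstream layers see an explicit 0 rather than a missing value.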

* fix(subagent): accept timeout=0 in agent-via-gateway path (second 600s default)

* fix: thread timeout override through getReplyFromConfig dispatch path

getReplyFromConfig called resolveAgentTimeoutMs({ cfg }) with no override,
always falling back to the config default (600s). Add timeoutOverrideSeconds
to GetReplyOptions and pass it through as overrideSeconds so callers of the
dispatch chain can specify a custom timeout (0 = no timeout).

This complements the existing timeout threading in agentCommand and the
cron isolated-agent runner, which already pass overrideSeconds correctly.

* feat(model-fallback): normalize OpenAI Codex model references and enhance fallback handling

- Added normalization for OpenAI Codex model references, specifically converting "gpt-5.3-codex" to "openai-codex" before execution.
- Updated the `resolveFallbackCandidates` function to utilize the new normalization logic.
- Enhanced tests to verify the correct behavior of model normalization and fallback mechanisms.
- Introduced a new test case to ensure that the normalization process works as expected for various input formats.
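
A minimal sketch of alias-based normalization. The alias table and function name are hypothetical, and whether the real code rewrites the provider, the model id, or the whole reference is not specified here; this treats the reference as a single string.

```typescript
// Hypothetical alias table; only the one mapping named above is included.
const MODEL_REF_ALIASES = new Map<string, string>([["gpt-5.3-codex", "openai-codex"]]);

// Trim and case-fold the lookup key, but return unknown references unchanged
// (apart from trimming) so other models are not affected.
function normalizeModelRef(raw: string): string {
  const trimmed = raw.trim();
  return MODEL_REF_ALIASES.get(trimmed.toLowerCase()) ?? trimmed;
}
```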

* feat(tests): add unit tests for steer failure behavior in openclaw-tools

- Introduced a new test file to validate the behavior of subagents when steer replacement dispatch fails.
- Implemented tests to ensure that the announce behavior is restored correctly and that the suppression reason is cleared as expected.
- Enhanced the subagent registry with a new function to clear steer restart suppression.
- Updated related components to support the new test scenarios.

* fix(subagents): replace stop command with kill in slash commands and documentation

- Updated the `/subagents` command to replace `stop` with `kill` for consistency in controlling sub-agent runs.
- Modified related documentation to reflect the change in command usage.
- Removed legacy timeoutSeconds references from the sessions-spawn-tool schema and tests to streamline timeout handling.
- Enhanced tests to ensure correct behavior of the updated commands and their interactions.

* feat(tests): add unit tests for readLatestAssistantReply function

- Introduced a new test file for the `readLatestAssistantReply` function to validate its behavior with various message scenarios.
- Implemented tests to ensure the function correctly retrieves the latest assistant message and handles cases where the latest message has no text.
- Mocked the gateway call to simulate different message histories for comprehensive testing.

* feat(tests): enhance subagent kill-all cascade tests and announce formatting

- Added a new test to verify that the `kill-all` command cascades through ended parents to active descendants in subagents.
- Updated the subagent announce formatting tests to reflect changes in message structure, including the replacement of "Findings:" with "Result:" and the addition of new expectations for message content.
- Improved the handling of long findings and stats in the announce formatting logic to ensure concise output.
- Refactored related functions to enhance clarity and maintainability in the subagent registry and tools.
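
The cascade described in the first bullet can be sketched as a breadth-first walk. The `RunNode` shape and function name are assumptions, not the registry's actual API.

```typescript
// Hypothetical registry row: spawnedBy links a child to its parent session key.
type RunNode = { sessionKey: string; spawnedBy?: string; endedAt?: number };

// Walk the whole spawn tree under rootKey. Ended parents are not killed again,
// but the walk continues through them so active grandchildren are still found.
function collectActiveDescendants(runs: RunNode[], rootKey: string): string[] {
  const childrenByParent = new Map<string, RunNode[]>();
  for (const run of runs) {
    if (!run.spawnedBy) {
      continue;
    }
    const siblings = childrenByParent.get(run.spawnedBy) ?? [];
    siblings.push(run);
    childrenByParent.set(run.spawnedBy, siblings);
  }
  const toKill: string[] = [];
  const seen = new Set<string>([rootKey]);
  const queue: string[] = [rootKey];
  while (queue.length > 0) {
    const parent = queue.shift() as string;
    for (const child of childrenByParent.get(parent) ?? []) {
      if (seen.has(child.sessionKey)) {
        continue;
      }
      seen.add(child.sessionKey);
      if (child.endedAt === undefined) {
        toKill.push(child.sessionKey);
      }
      queue.push(child.sessionKey);
    }
  }
  return toKill;
}
```

The `seen` set guards against cycles in malformed registry data.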

* refactor(subagent): update announce formatting and remove unused constants

- Modified the subagent announce formatting to replace "Findings:" with "Result:" and adjusted related expectations in tests.
- Removed constants for maximum announce findings characters and summary words, simplifying the announcement logic.
- Updated the handling of findings to retain full content instead of truncating, ensuring more informative outputs.
- Cleaned up unused imports in the commands-subagents file to enhance code clarity.

* feat(tests): enhance billing error handling in user-facing text

- Added tests to ensure that normal text mentioning billing plans is not rewritten, preserving user context.
- Updated the `isBillingErrorMessage` and `sanitizeUserFacingText` functions to improve handling of billing-related messages.
- Introduced new test cases for various scenarios involving billing messages to ensure accurate processing and output.
- Enhanced the subagent announce flow to correctly manage active descendant runs, preventing premature announcements.

* feat(subagent): enhance workflow guidance and auto-announcement clarity

- Added a new guideline in the subagent system prompt to emphasize trust in push-based completion, discouraging busy polling for status updates.
- Updated documentation to clarify that sub-agents will automatically announce their results, improving user understanding of the workflow.
- Enhanced tests to verify the new guidance on avoiding polling loops and to ensure the accuracy of the updated prompts.

* fix(cron): avoid announcing interim subagent spawn acks

* chore: clean post-rebase imports

* fix(cron): fall back to child replies when parent stays interim

* fix(subagents): make active-run guidance advisory

* fix(subagents): update announce flow to handle active descendants and enhance test coverage

- Modified the announce flow to defer announcements when active descendant runs are present, ensuring accurate status reporting.
- Updated tests to verify the new behavior, including scenarios where no fallback requester is available and ensuring proper handling of finished subagents.
- Enhanced the announce formatting to include an `expectFinal` flag for better clarity in the announcement process.

* fix(subagents): enhance announce flow and formatting for user updates

- Updated the announce flow to provide clearer instructions for user updates based on active subagent runs and requester context.
- Refactored the announcement logic to improve clarity and ensure internal context remains private.
- Enhanced tests to verify the new message expectations and formatting, including updated prompts for user-facing updates.
- Introduced a new function to build reply instructions based on session context, improving the overall announcement process.

* fix: resolve prep blockers and changelog placement (#14447) (thanks @tyler6204)

* fix: restore cron delivery-plan import after rebase (#14447) (thanks @tyler6204)

* fix: resolve test failures from rebase conflicts (#14447) (thanks @tyler6204)

* fix: apply formatting after rebase (#14447) (thanks @tyler6204)
2026-02-14 22:03:45 -08:00


import type { AgentMessage } from "@mariozechner/pi-agent-core";
import {
  createAgentSession,
  estimateTokens,
  SessionManager,
  SettingsManager,
} from "@mariozechner/pi-coding-agent";
import fs from "node:fs/promises";
import os from "node:os";
import type { ReasoningLevel, ThinkLevel } from "../../auto-reply/thinking.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { ExecElevatedDefaults } from "../bash-tools.js";
import type { EmbeddedPiCompactResult } from "./types.js";
import { resolveHeartbeatPrompt } from "../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../config/channel-capabilities.js";
import { getMachineDisplayName } from "../../infra/machine-name.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { type enqueueCommand, enqueueCommandInLane } from "../../process/command-queue.js";
import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js";
import { resolveSignalReactionLevel } from "../../signal/reaction-level.js";
import { resolveTelegramInlineButtonsScope } from "../../telegram/inline-buttons.js";
import { resolveTelegramReactionLevel } from "../../telegram/reaction-level.js";
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
import { resolveUserPath } from "../../utils.js";
import { normalizeMessageChannel } from "../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { resolveOpenClawAgentDir } from "../agent-paths.js";
import { resolveSessionAgentIds } from "../agent-scope.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js";
import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js";
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js";
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js";
import { resolveOpenClawDocsPath } from "../docs-path.js";
import { getApiKeyForModel, resolveModelAuthMode } from "../model-auth.js";
import { ensureOpenClawModelsJson } from "../models-config.js";
import {
  ensureSessionHeader,
  validateAnthropicTurns,
  validateGeminiTurns,
} from "../pi-embedded-helpers.js";
import {
  ensurePiCompactionReserveTokens,
  resolveCompactionReserveTokensFloor,
} from "../pi-settings.js";
import { createOpenClawCodingTools } from "../pi-tools.js";
import { resolveSandboxContext } from "../sandbox.js";
import { repairSessionFileIfNeeded } from "../session-file-repair.js";
import { guardSessionManager } from "../session-tool-result-guard-wrapper.js";
import { sanitizeToolUseResultPairing } from "../session-transcript-repair.js";
import { acquireSessionWriteLock } from "../session-write-lock.js";
import { detectRuntimeShell } from "../shell-utils.js";
import {
  applySkillEnvOverrides,
  applySkillEnvOverridesFromSnapshot,
  loadWorkspaceSkillEntries,
  resolveSkillsPromptForRun,
  type SkillSnapshot,
} from "../skills.js";
import { resolveTranscriptPolicy } from "../transcript-policy.js";
import { compactWithSafetyTimeout } from "./compaction-safety-timeout.js";
import { buildEmbeddedExtensionPaths } from "./extensions.js";
import {
  logToolSchemasForGoogle,
  sanitizeSessionHistory,
  sanitizeToolsForGoogle,
} from "./google.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js";
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { buildModelAliasLines, resolveModel } from "./model.js";
import { buildEmbeddedSandboxInfo } from "./sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "./session-manager-cache.js";
import {
  applySystemPromptOverrideToSession,
  buildEmbeddedSystemPrompt,
  createSystemPromptOverride,
} from "./system-prompt.js";
import { splitSdkTools } from "./tool-split.js";
import { describeUnknownError, mapThinkingLevel } from "./utils.js";
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";

export type CompactEmbeddedPiSessionParams = {
  sessionId: string;
  runId?: string;
  sessionKey?: string;
  messageChannel?: string;
  messageProvider?: string;
  agentAccountId?: string;
  authProfileId?: string;
  /** Group id for channel-level tool policy resolution. */
  groupId?: string | null;
  /** Group channel label (e.g. #general) for channel-level tool policy resolution. */
  groupChannel?: string | null;
  /** Group space label (e.g. guild/team id) for channel-level tool policy resolution. */
  groupSpace?: string | null;
  /** Parent session key for subagent policy inheritance. */
  spawnedBy?: string | null;
  /** Whether the sender is an owner (required for owner-only tools). */
  senderIsOwner?: boolean;
  sessionFile: string;
  workspaceDir: string;
  agentDir?: string;
  config?: OpenClawConfig;
  skillsSnapshot?: SkillSnapshot;
  provider?: string;
  model?: string;
  thinkLevel?: ThinkLevel;
  reasoningLevel?: ReasoningLevel;
  bashElevated?: ExecElevatedDefaults;
  customInstructions?: string;
  trigger?: "overflow" | "manual";
  diagId?: string;
  attempt?: number;
  maxAttempts?: number;
  lane?: string;
  enqueue?: typeof enqueueCommand;
  extraSystemPrompt?: string;
  ownerNumbers?: string[];
};

type CompactionMessageMetrics = {
  messages: number;
  historyTextChars: number;
  toolResultChars: number;
  estTokens?: number;
  contributors: Array<{ role: string; chars: number; tool?: string }>;
};

function createCompactionDiagId(): string {
  return `cmp-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
}

function getMessageTextChars(msg: AgentMessage): number {
  const content = (msg as { content?: unknown }).content;
  if (typeof content === "string") {
    return content.length;
  }
  if (!Array.isArray(content)) {
    return 0;
  }
  let total = 0;
  for (const block of content) {
    if (!block || typeof block !== "object") {
      continue;
    }
    const text = (block as { text?: unknown }).text;
    if (typeof text === "string") {
      total += text.length;
    }
  }
  return total;
}

function resolveMessageToolLabel(msg: AgentMessage): string | undefined {
  const candidate =
    (msg as { toolName?: unknown }).toolName ??
    (msg as { name?: unknown }).name ??
    (msg as { tool?: unknown }).tool;
  return typeof candidate === "string" && candidate.trim().length > 0 ? candidate : undefined;
}

function summarizeCompactionMessages(messages: AgentMessage[]): CompactionMessageMetrics {
  let historyTextChars = 0;
  let toolResultChars = 0;
  const contributors: Array<{ role: string; chars: number; tool?: string }> = [];
  let estTokens = 0;
  let tokenEstimationFailed = false;
  for (const msg of messages) {
    const role = typeof msg.role === "string" ? msg.role : "unknown";
    const chars = getMessageTextChars(msg);
    historyTextChars += chars;
    if (role === "toolResult") {
      toolResultChars += chars;
    }
    contributors.push({ role, chars, tool: resolveMessageToolLabel(msg) });
    if (!tokenEstimationFailed) {
      try {
        estTokens += estimateTokens(msg);
      } catch {
        tokenEstimationFailed = true;
      }
    }
  }
  return {
    messages: messages.length,
    historyTextChars,
    toolResultChars,
    estTokens: tokenEstimationFailed ? undefined : estTokens,
    contributors: contributors.toSorted((a, b) => b.chars - a.chars).slice(0, 3),
  };
}

function classifyCompactionReason(reason?: string): string {
  const text = (reason ?? "").trim().toLowerCase();
  if (!text) {
    return "unknown";
  }
  if (text.includes("nothing to compact")) {
    return "no_compactable_entries";
  }
  if (text.includes("below threshold")) {
    return "below_threshold";
  }
  if (text.includes("already compacted")) {
    return "already_compacted_recently";
  }
  if (text.includes("guard")) {
    return "guard_blocked";
  }
  if (text.includes("summary")) {
    return "summary_failed";
  }
  if (text.includes("timed out") || text.includes("timeout")) {
    return "timeout";
  }
  if (
    text.includes("400") ||
    text.includes("401") ||
    text.includes("403") ||
    text.includes("429")
  ) {
    return "provider_error_4xx";
  }
  if (
    text.includes("500") ||
    text.includes("502") ||
    text.includes("503") ||
    text.includes("504")
  ) {
    return "provider_error_5xx";
  }
  return "unknown";
}

/**
* Core compaction logic without lane queueing.
* Use this when already inside a session/global lane to avoid deadlocks.
*/
export async function compactEmbeddedPiSessionDirect(
  params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> {
  const startedAt = Date.now();
  const diagId = params.diagId?.trim() || createCompactionDiagId();
  const trigger = params.trigger ?? "manual";
  const attempt = params.attempt ?? 1;
  const maxAttempts = params.maxAttempts ?? 1;
  const runId = params.runId ?? params.sessionId;
  const resolvedWorkspace = resolveUserPath(params.workspaceDir);
  const prevCwd = process.cwd();
  const provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER;
  const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL;
  const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
  await ensureOpenClawModelsJson(params.config, agentDir);
  const { model, error, authStorage, modelRegistry } = resolveModel(
    provider,
    modelId,
    agentDir,
    params.config,
  );
  if (!model) {
    const reason = error ?? `Unknown model: ${provider}/${modelId}`;
    log.warn(
      `[compaction-diag] end runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` +
        `diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` +
        `attempt=${attempt} maxAttempts=${maxAttempts} outcome=failed reason=${classifyCompactionReason(reason)} ` +
        `durationMs=${Date.now() - startedAt}`,
    );
    return {
      ok: false,
      compacted: false,
      reason,
    };
  }
  try {
    const apiKeyInfo = await getApiKeyForModel({
      model,
      cfg: params.config,
      profileId: params.authProfileId,
      agentDir,
    });
    if (!apiKeyInfo.apiKey) {
      if (apiKeyInfo.mode !== "aws-sdk") {
        throw new Error(
          `No API key resolved for provider "${model.provider}" (auth mode: ${apiKeyInfo.mode}).`,
        );
      }
    } else if (model.provider === "github-copilot") {
      const { resolveCopilotApiToken } = await import("../../providers/github-copilot-token.js");
      const copilotToken = await resolveCopilotApiToken({
        githubToken: apiKeyInfo.apiKey,
      });
      authStorage.setRuntimeApiKey(model.provider, copilotToken.token);
    } else {
      authStorage.setRuntimeApiKey(model.provider, apiKeyInfo.apiKey);
    }
  } catch (err) {
    const reason = describeUnknownError(err);
    log.warn(
      `[compaction-diag] end runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` +
        `diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` +
        `attempt=${attempt} maxAttempts=${maxAttempts} outcome=failed reason=${classifyCompactionReason(reason)} ` +
        `durationMs=${Date.now() - startedAt}`,
    );
    return {
      ok: false,
      compacted: false,
      reason,
    };
  }

  await fs.mkdir(resolvedWorkspace, { recursive: true });
  const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
  const sandbox = await resolveSandboxContext({
    config: params.config,
    sessionKey: sandboxSessionKey,
    workspaceDir: resolvedWorkspace,
  });
  const effectiveWorkspace = sandbox?.enabled
    ? sandbox.workspaceAccess === "rw"
      ? resolvedWorkspace
      : sandbox.workspaceDir
    : resolvedWorkspace;
  await fs.mkdir(effectiveWorkspace, { recursive: true });
  await ensureSessionHeader({
    sessionFile: params.sessionFile,
    sessionId: params.sessionId,
    cwd: effectiveWorkspace,
  });

  let restoreSkillEnv: (() => void) | undefined;
  process.chdir(effectiveWorkspace);
  try {
    const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
    const skillEntries = shouldLoadSkillEntries
      ? loadWorkspaceSkillEntries(effectiveWorkspace)
      : [];
    restoreSkillEnv = params.skillsSnapshot
      ? applySkillEnvOverridesFromSnapshot({
          snapshot: params.skillsSnapshot,
          config: params.config,
        })
      : applySkillEnvOverrides({
          skills: skillEntries ?? [],
          config: params.config,
        });
    const skillsPrompt = resolveSkillsPromptForRun({
      skillsSnapshot: params.skillsSnapshot,
      entries: shouldLoadSkillEntries ? skillEntries : undefined,
      config: params.config,
      workspaceDir: effectiveWorkspace,
    });

    const sessionLabel = params.sessionKey ?? params.sessionId;
    const { contextFiles } = await resolveBootstrapContextForRun({
      workspaceDir: effectiveWorkspace,
      config: params.config,
      sessionKey: params.sessionKey,
      sessionId: params.sessionId,
      warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }),
    });
    const runAbortController = new AbortController();
    const toolsRaw = createOpenClawCodingTools({
      exec: {
        elevated: params.bashElevated,
      },
      sandbox,
      messageProvider: params.messageChannel ?? params.messageProvider,
      agentAccountId: params.agentAccountId,
      sessionKey: params.sessionKey ?? params.sessionId,
      groupId: params.groupId,
      groupChannel: params.groupChannel,
      groupSpace: params.groupSpace,
      spawnedBy: params.spawnedBy,
      senderIsOwner: params.senderIsOwner,
      agentDir,
      workspaceDir: effectiveWorkspace,
      config: params.config,
      abortSignal: runAbortController.signal,
      modelProvider: model.provider,
      modelId,
      modelAuthMode: resolveModelAuthMode(model.provider, params.config),
    });
    const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider });
    logToolSchemasForGoogle({ tools, provider });
    const machineName = await getMachineDisplayName();
    const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider);
    let runtimeCapabilities = runtimeChannel
      ? (resolveChannelCapabilities({
          cfg: params.config,
          channel: runtimeChannel,
          accountId: params.agentAccountId,
        }) ?? [])
      : undefined;
    if (runtimeChannel === "telegram" && params.config) {
      const inlineButtonsScope = resolveTelegramInlineButtonsScope({
        cfg: params.config,
        accountId: params.agentAccountId ?? undefined,
      });
      if (inlineButtonsScope !== "off") {
        if (!runtimeCapabilities) {
          runtimeCapabilities = [];
        }
        if (
          !runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons")
        ) {
          runtimeCapabilities.push("inlineButtons");
        }
      }
    }
    const reactionGuidance =
      runtimeChannel && params.config
        ? (() => {
            if (runtimeChannel === "telegram") {
              const resolved = resolveTelegramReactionLevel({
                cfg: params.config,
                accountId: params.agentAccountId ?? undefined,
              });
              const level = resolved.agentReactionGuidance;
              return level ? { level, channel: "Telegram" } : undefined;
            }
            if (runtimeChannel === "signal") {
              const resolved = resolveSignalReactionLevel({
                cfg: params.config,
                accountId: params.agentAccountId ?? undefined,
              });
              const level = resolved.agentReactionGuidance;
              return level ? { level, channel: "Signal" } : undefined;
            }
            return undefined;
          })()
        : undefined;
    // Resolve channel-specific message actions for system prompt
    const channelActions = runtimeChannel
      ? listChannelSupportedActions({
          cfg: params.config,
          channel: runtimeChannel,
        })
      : undefined;
    const messageToolHints = runtimeChannel
      ? resolveChannelMessageToolHints({
          cfg: params.config,
          channel: runtimeChannel,
          accountId: params.agentAccountId,
        })
      : undefined;

    const runtimeInfo = {
      host: machineName,
      os: `${os.type()} ${os.release()}`,
      arch: os.arch(),
      node: process.version,
      model: `${provider}/${modelId}`,
      shell: detectRuntimeShell(),
      channel: runtimeChannel,
      capabilities: runtimeCapabilities,
      channelActions,
    };
    const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
    const reasoningTagHint = isReasoningTagProvider(provider);
    const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone);
    const userTimeFormat = resolveUserTimeFormat(params.config?.agents?.defaults?.timeFormat);
    const userTime = formatUserTime(new Date(), userTimezone, userTimeFormat);
    const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
      sessionKey: params.sessionKey,
      config: params.config,
    });
    const isDefaultAgent = sessionAgentId === defaultAgentId;
    const promptMode =
      isSubagentSessionKey(params.sessionKey) || isCronSessionKey(params.sessionKey)
        ? "minimal"
        : "full";
    const docsPath = await resolveOpenClawDocsPath({
      workspaceDir: effectiveWorkspace,
      argv1: process.argv[1],
      cwd: process.cwd(),
      moduleUrl: import.meta.url,
    });
    const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined;
    const appendPrompt = buildEmbeddedSystemPrompt({
      workspaceDir: effectiveWorkspace,
      defaultThinkLevel: params.thinkLevel,
      reasoningLevel: params.reasoningLevel ?? "off",
      extraSystemPrompt: params.extraSystemPrompt,
      ownerNumbers: params.ownerNumbers,
      reasoningTagHint,
      heartbeatPrompt: isDefaultAgent
        ? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt)
        : undefined,
      skillsPrompt,
      docsPath: docsPath ?? undefined,
      ttsHint,
      promptMode,
      runtimeInfo,
      reactionGuidance,
      messageToolHints,
      sandboxInfo,
      tools,
      modelAliasLines: buildModelAliasLines(params.config),
      userTimezone,
      userTime,
      userTimeFormat,
      contextFiles,
      memoryCitationsMode: params.config?.memory?.citations,
    });
    const systemPromptOverride = createSystemPromptOverride(appendPrompt);

    const sessionLock = await acquireSessionWriteLock({
      sessionFile: params.sessionFile,
    });
    try {
      await repairSessionFileIfNeeded({
        sessionFile: params.sessionFile,
        warn: (message) => log.warn(message),
      });
      await prewarmSessionFile(params.sessionFile);
      const transcriptPolicy = resolveTranscriptPolicy({
        modelApi: model.api,
        provider,
        modelId,
      });
      const sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), {
        agentId: sessionAgentId,
        sessionKey: params.sessionKey,
        allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults,
      });
      trackSessionManagerAccess(params.sessionFile);
      const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir);
      ensurePiCompactionReserveTokens({
        settingsManager,
        minReserveTokens: resolveCompactionReserveTokensFloor(params.config),
      });
      // Call for side effects (sets compaction/pruning runtime state)
      buildEmbeddedExtensionPaths({
        cfg: params.config,
        sessionManager,
        provider,
        modelId,
        model,
      });

      const { builtInTools, customTools } = splitSdkTools({
        tools,
        sandboxEnabled: !!sandbox?.enabled,
      });

      const { session } = await createAgentSession({
        cwd: resolvedWorkspace,
        agentDir,
        authStorage,
        modelRegistry,
        model,
        thinkingLevel: mapThinkingLevel(params.thinkLevel),
        tools: builtInTools,
        customTools,
        sessionManager,
        settingsManager,
      });
      applySystemPromptOverrideToSession(session, systemPromptOverride());

      try {
        const prior = await sanitizeSessionHistory({
          messages: session.messages,
          modelApi: model.api,
          modelId,
          provider,
          sessionManager,
          sessionId: params.sessionId,
          policy: transcriptPolicy,
        });
        const validatedGemini = transcriptPolicy.validateGeminiTurns
          ? validateGeminiTurns(prior)
          : prior;
        const validated = transcriptPolicy.validateAnthropicTurns
          ? validateAnthropicTurns(validatedGemini)
          : validatedGemini;
        // Capture full message history BEFORE limiting: plugins need the complete conversation
        const preCompactionMessages = [...session.messages];
        const truncated = limitHistoryTurns(
          validated,
          getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
        );
        // Re-run tool_use/tool_result pairing repair after truncation, since
        // limitHistoryTurns can orphan tool_result blocks by removing the
        // assistant message that contained the matching tool_use.
        const limited = transcriptPolicy.repairToolUseResultPairing
          ? sanitizeToolUseResultPairing(truncated)
          : truncated;
        if (limited.length > 0) {
          session.agent.replaceMessages(limited);
        }
// Run before_compaction hooks (fire-and-forget).
// The session JSONL already contains all messages on disk, so plugins
// can read sessionFile asynchronously and process in parallel with
// the compaction LLM call — no need to block or wait for after_compaction.
const hookRunner = getGlobalHookRunner();
const hookCtx = {
agentId: params.sessionKey?.split(":")[0] ?? "main",
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageChannel ?? params.messageProvider,
};
if (hookRunner?.hasHooks("before_compaction")) {
hookRunner
.runBeforeCompaction(
{
messageCount: preCompactionMessages.length,
compactingCount: limited.length,
messages: preCompactionMessages,
sessionFile: params.sessionFile,
},
hookCtx,
)
.catch((hookErr: unknown) => {
log.warn(`before_compaction hook failed: ${String(hookErr)}`);
});
}
const diagEnabled = log.isEnabled("debug");
const preMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined;
if (diagEnabled && preMetrics) {
log.debug(
`[compaction-diag] start runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` +
`diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` +
`attempt=${attempt} maxAttempts=${maxAttempts} ` +
`pre.messages=${preMetrics.messages} pre.historyTextChars=${preMetrics.historyTextChars} ` +
`pre.toolResultChars=${preMetrics.toolResultChars} pre.estTokens=${preMetrics.estTokens ?? "unknown"}`,
);
log.debug(
`[compaction-diag] contributors diagId=${diagId} top=${JSON.stringify(preMetrics.contributors)}`,
);
}
const compactStartedAt = Date.now();
const result = await compactWithSafetyTimeout(() =>
session.compact(params.customInstructions),
);
// Estimate tokens after compaction by summing token estimates for remaining messages
let tokensAfter: number | undefined;
try {
tokensAfter = 0;
for (const message of session.messages) {
tokensAfter += estimateTokens(message);
}
// Sanity check: compaction should not increase the token count. If the
// estimate exceeds tokensBefore, discard it rather than report a bad value.
if (tokensAfter > result.tokensBefore) {
tokensAfter = undefined;
}
} catch {
// If estimation fails, leave tokensAfter undefined
tokensAfter = undefined;
}
// Run after_compaction hooks (fire-and-forget).
// Also includes sessionFile for plugins that only need to act after
// compaction completes (e.g. analytics, cleanup).
if (hookRunner?.hasHooks("after_compaction")) {
hookRunner
.runAfterCompaction(
{
messageCount: session.messages.length,
tokenCount: tokensAfter,
compactedCount: limited.length - session.messages.length,
sessionFile: params.sessionFile,
},
hookCtx,
)
.catch((hookErr: unknown) => {
log.warn(`after_compaction hook failed: ${String(hookErr)}`);
});
}
const postMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined;
if (diagEnabled && preMetrics && postMetrics) {
log.debug(
`[compaction-diag] end runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` +
`diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` +
`attempt=${attempt} maxAttempts=${maxAttempts} outcome=compacted reason=none ` +
`durationMs=${Date.now() - compactStartedAt} retrying=false ` +
`post.messages=${postMetrics.messages} post.historyTextChars=${postMetrics.historyTextChars} ` +
`post.toolResultChars=${postMetrics.toolResultChars} post.estTokens=${postMetrics.estTokens ?? "unknown"} ` +
`delta.messages=${postMetrics.messages - preMetrics.messages} ` +
`delta.historyTextChars=${postMetrics.historyTextChars - preMetrics.historyTextChars} ` +
`delta.toolResultChars=${postMetrics.toolResultChars - preMetrics.toolResultChars} ` +
`delta.estTokens=${typeof preMetrics.estTokens === "number" && typeof postMetrics.estTokens === "number" ? postMetrics.estTokens - preMetrics.estTokens : "unknown"}`,
);
}
return {
ok: true,
compacted: true,
result: {
summary: result.summary,
firstKeptEntryId: result.firstKeptEntryId,
tokensBefore: result.tokensBefore,
tokensAfter,
details: result.details,
},
};
} finally {
await flushPendingToolResultsAfterIdle({
agent: session.agent,
sessionManager,
});
session.dispose();
}
} finally {
await sessionLock.release();
}
} catch (err) {
const reason = describeUnknownError(err);
log.warn(
`[compaction-diag] end runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` +
`diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` +
`attempt=${attempt} maxAttempts=${maxAttempts} outcome=failed reason=${classifyCompactionReason(reason)} ` +
`durationMs=${Date.now() - startedAt}`,
);
return {
ok: false,
compacted: false,
reason,
};
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
}
/**
* Compacts a session with lane queueing (session lane + global lane).
* Use this from outside a lane context. If already inside a lane, use
* `compactEmbeddedPiSessionDirect` to avoid deadlocks.
*/
export async function compactEmbeddedPiSession(
params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> {
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =
params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts));
return enqueueCommandInLane(sessionLane, () =>
enqueueGlobal(async () => compactEmbeddedPiSessionDirect(params)),
);
}