diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index fd5d4033e0a..50fce712a61 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -11,7 +11,10 @@ import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../../config/config.js"; import { getMachineDisplayName } from "../../../infra/machine-name.js"; -import { ensureGlobalUndiciStreamTimeouts } from "../../../infra/net/undici-global-dispatcher.js"; +import { + ensureGlobalUndiciEnvProxyDispatcher, + ensureGlobalUndiciStreamTimeouts, +} from "../../../infra/net/undici-global-dispatcher.js"; import { MAX_IMAGE_BYTES } from "../../../media/constants.js"; import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; import type { @@ -125,6 +128,7 @@ import { installToolResultContextGuard } from "../tool-result-context-guard.js"; import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; +import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js"; import { selectCompactionTimeoutSnapshot, shouldFlagCompactionTimeout, @@ -145,6 +149,186 @@ type PromptBuildHookRunner = { ) => Promise; }; +const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt"; +const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; + +// Persist a hidden context reminder so the next turn knows why the runner stopped. 
/**
 * Builds the hidden reminder text stored after a `sessions_yield` stop.
 *
 * @param message - Caller-supplied yield payload; copied verbatim to the start of the result.
 * @returns `message`, a blank line, then a fixed bracketed context note.
 */
function buildSessionsYieldContextMessage(message: string): string {
  return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`;
}

/**
 * Return a synthetic aborted response so pi-agent-core unwinds without a real provider call.
 *
 * The returned object has two members:
 * - an async iterator that completes immediately without yielding any event;
 * - `result()`, which resolves to an assistant message with a single empty
 *   text block, `stopReason: "aborted"`, and all usage/cost counters at zero.
 *
 * @param model - Source of the `api`, `provider` and `model` fields; each
 *   missing field falls back to an empty string (`model.id` feeds `model`).
 * @returns The stream-like object described above. Every `result()` call
 *   resolves to the same message object, whose timestamp is taken once when
 *   this function is invoked.
 */
function createYieldAbortedResponse(model: { api?: string; provider?: string; id?: string }): {
  // The generator body is empty: it yields nothing and returns nothing.
  [Symbol.asyncIterator]: () => AsyncGenerator<never, void, unknown>;
  result: () => Promise<{
    role: "assistant";
    content: Array<{ type: "text"; text: string }>;
    stopReason: "aborted";
    api: string;
    provider: string;
    model: string;
    usage: {
      input: number;
      output: number;
      cacheRead: number;
      cacheWrite: number;
      totalTokens: number;
      cost: {
        input: number;
        output: number;
        cacheRead: number;
        cacheWrite: number;
        total: number;
      };
    };
    timestamp: number;
  }>;
} {
  const message = {
    role: "assistant" as const,
    content: [{ type: "text" as const, text: "" }],
    stopReason: "aborted" as const,
    api: model.api ?? "",
    provider: model.provider ?? "",
    model: model.id ?? "",
    // No provider call was made, so nothing was consumed or billed.
    usage: {
      input: 0,
      output: 0,
      cacheRead: 0,
      cacheWrite: 0,
      totalTokens: 0,
      cost: {
        input: 0,
        output: 0,
        cacheRead: 0,
        cacheWrite: 0,
        total: 0,
      },
    },
    timestamp: Date.now(),
  };
  return {
    async *[Symbol.asyncIterator]() {},
    result: async () => message,
  };
}

/**
 * Queue a hidden steering message so pi-agent-core skips any remaining tool calls.
 *
 * Sends one `custom` message through `activeSession.agent.steer`, tagged with
 * the module-level `SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE` and `display: false`.
 *
 * @param activeSession - Session whose agent receives the steering message.
 */
function queueSessionsYieldInterruptMessage(activeSession: {
  agent: { steer: (message: AgentMessage) => void };
}) {
  activeSession.agent.steer({
    role: "custom",
    customType: SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE,
    content: "[sessions_yield interrupt]",
    display: false,
    details: { source: "sessions_yield" },
    timestamp: Date.now(),
  });
}

// Append the caller-provided yield payload as a hidden session message once the run is idle.
+async function persistSessionsYieldContextMessage( + activeSession: { + sendCustomMessage: ( + message: { + customType: string; + content: string; + display: boolean; + details?: Record; + }, + options?: { triggerTurn?: boolean }, + ) => Promise; + }, + message: string, +) { + await activeSession.sendCustomMessage( + { + customType: SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE, + content: buildSessionsYieldContextMessage(message), + display: false, + details: { source: "sessions_yield", message }, + }, + { triggerTurn: false }, + ); +} + +// Remove the synthetic yield interrupt + aborted assistant entry from the live transcript. +function stripSessionsYieldArtifacts(activeSession: { + messages: AgentMessage[]; + agent: { replaceMessages: (messages: AgentMessage[]) => void }; + sessionManager?: unknown; +}) { + const strippedMessages = activeSession.messages.slice(); + while (strippedMessages.length > 0) { + const last = strippedMessages.at(-1) as + | AgentMessage + | { role?: string; customType?: string; stopReason?: string }; + if (last?.role === "assistant" && "stopReason" in last && last.stopReason === "aborted") { + strippedMessages.pop(); + continue; + } + if ( + last?.role === "custom" && + "customType" in last && + last.customType === SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE + ) { + strippedMessages.pop(); + continue; + } + break; + } + if (strippedMessages.length !== activeSession.messages.length) { + activeSession.agent.replaceMessages(strippedMessages); + } + + const sessionManager = activeSession.sessionManager as + | { + fileEntries?: Array<{ + type?: string; + id?: string; + parentId?: string | null; + message?: { role?: string; stopReason?: string }; + customType?: string; + }>; + byId?: Map; + leafId?: string | null; + _rewriteFile?: () => void; + } + | undefined; + const fileEntries = sessionManager?.fileEntries; + const byId = sessionManager?.byId; + if (!fileEntries || !byId) { + return; + } + + let changed = false; + while (fileEntries.length > 1) { + const 
last = fileEntries.at(-1); + if (!last || last.type === "session") { + break; + } + const isYieldAbortAssistant = + last.type === "message" && + last.message?.role === "assistant" && + last.message?.stopReason === "aborted"; + const isYieldInterruptMessage = + last.type === "custom_message" && last.customType === SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE; + if (!isYieldAbortAssistant && !isYieldInterruptMessage) { + break; + } + fileEntries.pop(); + if (last.id) { + byId.delete(last.id); + } + sessionManager.leafId = last.parentId ?? null; + changed = true; + } + if (changed) { + sessionManager._rewriteFile?.(); + } +} + export function isOllamaCompatProvider(model: { provider?: string; baseUrl?: string; @@ -230,32 +414,83 @@ export function wrapOllamaCompatNumCtx(baseFn: StreamFn | undefined, numCtx: num ...options, onPayload: (payload: unknown) => { if (!payload || typeof payload !== "object") { - options?.onPayload?.(payload); - return; + return options?.onPayload?.(payload, model); } const payloadRecord = payload as Record; if (!payloadRecord.options || typeof payloadRecord.options !== "object") { payloadRecord.options = {}; } (payloadRecord.options as Record).num_ctx = numCtx; - options?.onPayload?.(payload); + return options?.onPayload?.(payload, model); }, }); } -function normalizeToolCallNameForDispatch(rawName: string, allowedToolNames?: Set): string { - const trimmed = rawName.trim(); - if (!trimmed) { - // Keep whitespace-only placeholders unchanged so they do not collapse to - // empty names (which can later surface as toolName="" loops). 
+function resolveCaseInsensitiveAllowedToolName( + rawName: string, + allowedToolNames?: Set, +): string | null { + if (!allowedToolNames || allowedToolNames.size === 0) { + return null; + } + const folded = rawName.toLowerCase(); + let caseInsensitiveMatch: string | null = null; + for (const name of allowedToolNames) { + if (name.toLowerCase() !== folded) { + continue; + } + if (caseInsensitiveMatch && caseInsensitiveMatch !== name) { + return null; + } + caseInsensitiveMatch = name; + } + return caseInsensitiveMatch; +} + +function resolveExactAllowedToolName( + rawName: string, + allowedToolNames?: Set, +): string | null { + if (!allowedToolNames || allowedToolNames.size === 0) { + return null; + } + if (allowedToolNames.has(rawName)) { return rawName; } - if (!allowedToolNames || allowedToolNames.size === 0) { - return trimmed; + const normalized = normalizeToolName(rawName); + if (allowedToolNames.has(normalized)) { + return normalized; + } + return ( + resolveCaseInsensitiveAllowedToolName(rawName, allowedToolNames) ?? 
+ resolveCaseInsensitiveAllowedToolName(normalized, allowedToolNames) + ); +} + +function buildStructuredToolNameCandidates(rawName: string): string[] { + const trimmed = rawName.trim(); + if (!trimmed) { + return []; } - const candidateNames = new Set([trimmed, normalizeToolName(trimmed)]); + const candidates: string[] = []; + const seen = new Set(); + const addCandidate = (value: string) => { + const candidate = value.trim(); + if (!candidate || seen.has(candidate)) { + return; + } + seen.add(candidate); + candidates.push(candidate); + }; + + addCandidate(trimmed); + addCandidate(normalizeToolName(trimmed)); + const normalizedDelimiter = trimmed.replace(/\//g, "."); + addCandidate(normalizedDelimiter); + addCandidate(normalizeToolName(normalizedDelimiter)); + const segments = normalizedDelimiter .split(".") .map((segment) => segment.trim()) @@ -263,11 +498,23 @@ function normalizeToolCallNameForDispatch(rawName: string, allowedToolNames?: Se if (segments.length > 1) { for (let index = 1; index < segments.length; index += 1) { const suffix = segments.slice(index).join("."); - candidateNames.add(suffix); - candidateNames.add(normalizeToolName(suffix)); + addCandidate(suffix); + addCandidate(normalizeToolName(suffix)); } } + return candidates; +} + +function resolveStructuredAllowedToolName( + rawName: string, + allowedToolNames?: Set, +): string | null { + if (!allowedToolNames || allowedToolNames.size === 0) { + return null; + } + + const candidateNames = buildStructuredToolNameCandidates(rawName); for (const candidate of candidateNames) { if (allowedToolNames.has(candidate)) { return candidate; @@ -275,23 +522,116 @@ function normalizeToolCallNameForDispatch(rawName: string, allowedToolNames?: Se } for (const candidate of candidateNames) { - const folded = candidate.toLowerCase(); - let caseInsensitiveMatch: string | null = null; - for (const name of allowedToolNames) { - if (name.toLowerCase() !== folded) { - continue; - } - if (caseInsensitiveMatch && 
caseInsensitiveMatch !== name) { - return candidate; - } - caseInsensitiveMatch = name; - } + const caseInsensitiveMatch = resolveCaseInsensitiveAllowedToolName(candidate, allowedToolNames); if (caseInsensitiveMatch) { return caseInsensitiveMatch; } } - return trimmed; + return null; +} + +function inferToolNameFromToolCallId( + rawId: string | undefined, + allowedToolNames?: Set, +): string | null { + if (!rawId || !allowedToolNames || allowedToolNames.size === 0) { + return null; + } + const id = rawId.trim(); + if (!id) { + return null; + } + + const candidateTokens = new Set(); + const addToken = (value: string) => { + const trimmed = value.trim(); + if (!trimmed) { + return; + } + candidateTokens.add(trimmed); + candidateTokens.add(trimmed.replace(/[:._/-]\d+$/, "")); + candidateTokens.add(trimmed.replace(/\d+$/, "")); + + const normalizedDelimiter = trimmed.replace(/\//g, "."); + candidateTokens.add(normalizedDelimiter); + candidateTokens.add(normalizedDelimiter.replace(/[:._-]\d+$/, "")); + candidateTokens.add(normalizedDelimiter.replace(/\d+$/, "")); + + for (const prefixPattern of [/^functions?[._-]?/i, /^tools?[._-]?/i]) { + const stripped = normalizedDelimiter.replace(prefixPattern, ""); + if (stripped !== normalizedDelimiter) { + candidateTokens.add(stripped); + candidateTokens.add(stripped.replace(/[:._-]\d+$/, "")); + candidateTokens.add(stripped.replace(/\d+$/, "")); + } + } + }; + + const preColon = id.split(":")[0] ?? 
id; + for (const seed of [id, preColon]) { + addToken(seed); + } + + let singleMatch: string | null = null; + for (const candidate of candidateTokens) { + const matched = resolveStructuredAllowedToolName(candidate, allowedToolNames); + if (!matched) { + continue; + } + if (singleMatch && singleMatch !== matched) { + return null; + } + singleMatch = matched; + } + + return singleMatch; +} + +function looksLikeMalformedToolNameCounter(rawName: string): boolean { + const normalizedDelimiter = rawName.trim().replace(/\//g, "."); + return ( + /^(?:functions?|tools?)[._-]?/i.test(normalizedDelimiter) && + /(?:[:._-]\d+|\d+)$/.test(normalizedDelimiter) + ); +} + +function normalizeToolCallNameForDispatch( + rawName: string, + allowedToolNames?: Set, + rawToolCallId?: string, +): string { + const trimmed = rawName.trim(); + if (!trimmed) { + // Keep whitespace-only placeholders unchanged unless we can safely infer + // a canonical name from toolCallId and allowlist. + return inferToolNameFromToolCallId(rawToolCallId, allowedToolNames) ?? rawName; + } + if (!allowedToolNames || allowedToolNames.size === 0) { + return trimmed; + } + + const exact = resolveExactAllowedToolName(trimmed, allowedToolNames); + if (exact) { + return exact; + } + // Some providers put malformed toolCallId-like strings into `name` + // itself (for example `functionsread3`). Recover conservatively from the + // name token before consulting the separate id so explicit names like + // `someOtherTool` are preserved. + const inferredFromName = inferToolNameFromToolCallId(trimmed, allowedToolNames); + if (inferredFromName) { + return inferredFromName; + } + + // If the explicit name looks like a provider-mangled tool-call id with a + // numeric suffix, fail closed when inference is ambiguous instead of routing + // to whichever structured candidate happens to match. 
+ if (looksLikeMalformedToolNameCounter(trimmed)) { + return trimmed; + } + + return resolveStructuredAllowedToolName(trimmed, allowedToolNames) ?? trimmed; } function isToolCallBlockType(type: unknown): boolean { @@ -367,13 +707,21 @@ function trimWhitespaceFromToolCallNamesInMessage( if (!block || typeof block !== "object") { continue; } - const typedBlock = block as { type?: unknown; name?: unknown }; - if (!isToolCallBlockType(typedBlock.type) || typeof typedBlock.name !== "string") { + const typedBlock = block as { type?: unknown; name?: unknown; id?: unknown }; + if (!isToolCallBlockType(typedBlock.type)) { continue; } - const normalized = normalizeToolCallNameForDispatch(typedBlock.name, allowedToolNames); - if (normalized !== typedBlock.name) { - typedBlock.name = normalized; + const rawId = typeof typedBlock.id === "string" ? typedBlock.id : undefined; + if (typeof typedBlock.name === "string") { + const normalized = normalizeToolCallNameForDispatch(typedBlock.name, allowedToolNames, rawId); + if (normalized !== typedBlock.name) { + typedBlock.name = normalized; + } + continue; + } + const inferred = inferToolNameFromToolCallId(rawId, allowedToolNames); + if (inferred) { + typedBlock.name = inferred; } } normalizeToolCallIdsInMessage(message); @@ -434,6 +782,281 @@ export function wrapStreamFnTrimToolCallNames( }; } +function extractBalancedJsonPrefix(raw: string): string | null { + let start = 0; + while (start < raw.length && /\s/.test(raw[start] ?? 
"")) { + start += 1; + } + const startChar = raw[start]; + if (startChar !== "{" && startChar !== "[") { + return null; + } + + let depth = 0; + let inString = false; + let escaped = false; + for (let i = start; i < raw.length; i += 1) { + const char = raw[i]; + if (char === undefined) { + break; + } + if (inString) { + if (escaped) { + escaped = false; + } else if (char === "\\") { + escaped = true; + } else if (char === '"') { + inString = false; + } + continue; + } + if (char === '"') { + inString = true; + continue; + } + if (char === "{" || char === "[") { + depth += 1; + continue; + } + if (char === "}" || char === "]") { + depth -= 1; + if (depth === 0) { + return raw.slice(start, i + 1); + } + } + } + return null; +} + +const MAX_TOOLCALL_REPAIR_BUFFER_CHARS = 64_000; +const MAX_TOOLCALL_REPAIR_TRAILING_CHARS = 3; +const TOOLCALL_REPAIR_ALLOWED_TRAILING_RE = /^[^\s{}[\]":,\\]{1,3}$/; + +function shouldAttemptMalformedToolCallRepair(partialJson: string, delta: string): boolean { + if (/[}\]]/.test(delta)) { + return true; + } + const trimmedDelta = delta.trim(); + return ( + trimmedDelta.length > 0 && + trimmedDelta.length <= MAX_TOOLCALL_REPAIR_TRAILING_CHARS && + /[}\]]/.test(partialJson) + ); +} + +type ToolCallArgumentRepair = { + args: Record; + trailingSuffix: string; +}; + +function tryParseMalformedToolCallArguments(raw: string): ToolCallArgumentRepair | undefined { + if (!raw.trim()) { + return undefined; + } + try { + JSON.parse(raw); + return undefined; + } catch { + const jsonPrefix = extractBalancedJsonPrefix(raw); + if (!jsonPrefix) { + return undefined; + } + const suffix = raw.slice(raw.indexOf(jsonPrefix) + jsonPrefix.length).trim(); + if ( + suffix.length === 0 || + suffix.length > MAX_TOOLCALL_REPAIR_TRAILING_CHARS || + !TOOLCALL_REPAIR_ALLOWED_TRAILING_RE.test(suffix) + ) { + return undefined; + } + try { + const parsed = JSON.parse(jsonPrefix) as unknown; + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? 
{ args: parsed as Record, trailingSuffix: suffix } + : undefined; + } catch { + return undefined; + } + } +} + +function repairToolCallArgumentsInMessage( + message: unknown, + contentIndex: number, + repairedArgs: Record, +): void { + if (!message || typeof message !== "object") { + return; + } + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content)) { + return; + } + const block = content[contentIndex]; + if (!block || typeof block !== "object") { + return; + } + const typedBlock = block as { type?: unknown; arguments?: unknown }; + if (!isToolCallBlockType(typedBlock.type)) { + return; + } + typedBlock.arguments = repairedArgs; +} + +function clearToolCallArgumentsInMessage(message: unknown, contentIndex: number): void { + if (!message || typeof message !== "object") { + return; + } + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content)) { + return; + } + const block = content[contentIndex]; + if (!block || typeof block !== "object") { + return; + } + const typedBlock = block as { type?: unknown; arguments?: unknown }; + if (!isToolCallBlockType(typedBlock.type)) { + return; + } + typedBlock.arguments = {}; +} + +function repairMalformedToolCallArgumentsInMessage( + message: unknown, + repairedArgsByIndex: Map>, +): void { + if (!message || typeof message !== "object") { + return; + } + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content)) { + return; + } + for (const [index, repairedArgs] of repairedArgsByIndex.entries()) { + repairToolCallArgumentsInMessage(message, index, repairedArgs); + } +} + +function wrapStreamRepairMalformedToolCallArguments( + stream: ReturnType, +): ReturnType { + const partialJsonByIndex = new Map(); + const repairedArgsByIndex = new Map>(); + const disabledIndices = new Set(); + const loggedRepairIndices = new Set(); + const originalResult = stream.result.bind(stream); + stream.result = async () => { + const message = await 
originalResult(); + repairMalformedToolCallArgumentsInMessage(message, repairedArgsByIndex); + partialJsonByIndex.clear(); + repairedArgsByIndex.clear(); + disabledIndices.clear(); + loggedRepairIndices.clear(); + return message; + }; + + const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream); + (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = + function () { + const iterator = originalAsyncIterator(); + return { + async next() { + const result = await iterator.next(); + if (!result.done && result.value && typeof result.value === "object") { + const event = result.value as { + type?: unknown; + contentIndex?: unknown; + delta?: unknown; + partial?: unknown; + message?: unknown; + toolCall?: unknown; + }; + if ( + typeof event.contentIndex === "number" && + Number.isInteger(event.contentIndex) && + event.type === "toolcall_delta" && + typeof event.delta === "string" + ) { + if (disabledIndices.has(event.contentIndex)) { + return result; + } + const nextPartialJson = + (partialJsonByIndex.get(event.contentIndex) ?? 
"") + event.delta; + if (nextPartialJson.length > MAX_TOOLCALL_REPAIR_BUFFER_CHARS) { + partialJsonByIndex.delete(event.contentIndex); + repairedArgsByIndex.delete(event.contentIndex); + disabledIndices.add(event.contentIndex); + return result; + } + partialJsonByIndex.set(event.contentIndex, nextPartialJson); + if (shouldAttemptMalformedToolCallRepair(nextPartialJson, event.delta)) { + const repair = tryParseMalformedToolCallArguments(nextPartialJson); + if (repair) { + repairedArgsByIndex.set(event.contentIndex, repair.args); + repairToolCallArgumentsInMessage(event.partial, event.contentIndex, repair.args); + repairToolCallArgumentsInMessage(event.message, event.contentIndex, repair.args); + if (!loggedRepairIndices.has(event.contentIndex)) { + loggedRepairIndices.add(event.contentIndex); + log.warn( + `repairing kimi-coding tool call arguments after ${repair.trailingSuffix.length} trailing chars`, + ); + } + } else { + repairedArgsByIndex.delete(event.contentIndex); + clearToolCallArgumentsInMessage(event.partial, event.contentIndex); + clearToolCallArgumentsInMessage(event.message, event.contentIndex); + } + } + } + if ( + typeof event.contentIndex === "number" && + Number.isInteger(event.contentIndex) && + event.type === "toolcall_end" + ) { + const repairedArgs = repairedArgsByIndex.get(event.contentIndex); + if (repairedArgs) { + if (event.toolCall && typeof event.toolCall === "object") { + (event.toolCall as { arguments?: unknown }).arguments = repairedArgs; + } + repairToolCallArgumentsInMessage(event.partial, event.contentIndex, repairedArgs); + repairToolCallArgumentsInMessage(event.message, event.contentIndex, repairedArgs); + } + partialJsonByIndex.delete(event.contentIndex); + disabledIndices.delete(event.contentIndex); + loggedRepairIndices.delete(event.contentIndex); + } + } + return result; + }, + async return(value?: unknown) { + return iterator.return?.(value) ?? 
{ done: true as const, value: undefined }; + }, + async throw(error?: unknown) { + return iterator.throw?.(error) ?? { done: true as const, value: undefined }; + }, + }; + }; + + return stream; +} + +export function wrapStreamFnRepairMalformedToolCallArguments(baseFn: StreamFn): StreamFn { + return (model, context, options) => { + const maybeStream = baseFn(model, context, options); + if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) { + return Promise.resolve(maybeStream).then((stream) => + wrapStreamRepairMalformedToolCallArguments(stream), + ); + } + return wrapStreamRepairMalformedToolCallArguments(maybeStream); + }; +} + +function shouldRepairMalformedAnthropicToolCallArguments(provider?: string): boolean { + return normalizeProviderId(provider ?? "") === "kimi-coding"; +} + // --------------------------------------------------------------------------- // xAI / Grok: decode HTML entities in tool call arguments // --------------------------------------------------------------------------- @@ -750,6 +1373,9 @@ export async function runEmbeddedAttempt( const resolvedWorkspace = resolveUserPath(params.workspaceDir); const prevCwd = process.cwd(); const runAbortController = new AbortController(); + // Proxy bootstrap must happen before timeout tuning so the timeouts wrap the + // active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher. 
+ ensureGlobalUndiciEnvProxyDispatcher(); ensureGlobalUndiciStreamTimeouts(); log.debug( @@ -841,6 +1467,13 @@ export async function runEmbeddedAttempt( config: params.config, sessionAgentId, }); + // Track sessions_yield tool invocation (callback pattern, like clientToolCallDetected) + let yieldDetected = false; + let yieldMessage: string | null = null; + // Late-binding reference so onYield can abort the session (declared after tool creation) + let abortSessionForYield: (() => void) | null = null; + let queueYieldInterruptForSession: (() => void) | null = null; + let yieldAbortSettled: Promise | null = null; // Check if the model supports native image input const modelHasVision = params.model.input?.includes("image") ?? false; const toolsRaw = params.disableTools @@ -870,6 +1503,10 @@ export async function runEmbeddedAttempt( runId: params.runId, agentDir, workspaceDir: effectiveWorkspace, + // When sandboxing uses a copied workspace (`ro` or `none`), effectiveWorkspace points + // at the sandbox copy. Spawned subagents should inherit the real workspace instead. + spawnWorkspaceDir: + sandbox?.enabled && sandbox.workspaceAccess !== "rw" ? resolvedWorkspace : undefined, config: params.config, abortSignal: runAbortController.signal, modelProvider: params.model.provider, @@ -885,6 +1522,13 @@ export async function runEmbeddedAttempt( requireExplicitMessageTarget: params.requireExplicitMessageTarget ?? 
isSubagentSessionKey(params.sessionKey), disableMessageTool: params.disableMessageTool, + onYield: (message) => { + yieldDetected = true; + yieldMessage = message; + queueYieldInterruptForSession?.(); + runAbortController.abort("sessions_yield"); + abortSessionForYield?.(); + }, }); const toolsEnabled = supportsModelTools(params.model); const tools = sanitizeToolsForGoogle({ @@ -1098,6 +1742,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.bootstrap({ sessionId: params.sessionId, + sessionKey: params.sessionKey, sessionFile: params.sessionFile, }); } catch (bootstrapErr) { @@ -1195,6 +1840,12 @@ export async function runEmbeddedAttempt( throw new Error("Embedded agent session missing"); } const activeSession = session; + abortSessionForYield = () => { + yieldAbortSettled = Promise.resolve(activeSession.abort()); + }; + queueYieldInterruptForSession = () => { + queueSessionsYieldInterruptMessage(activeSession); + }; removeToolResultContextGuard = installToolResultContextGuard({ agent: activeSession.agent, contextWindowTokens: Math.max( @@ -1366,6 +2017,17 @@ export async function runEmbeddedAttempt( }; } + const innerStreamFn = activeSession.agent.streamFn; + activeSession.agent.streamFn = (model, context, options) => { + const signal = runAbortController.signal as AbortSignal & { reason?: unknown }; + if (yieldDetected && signal.aborted && signal.reason === "sessions_yield") { + return createYieldAbortedResponse(model) as unknown as Awaited< + ReturnType + >; + } + return innerStreamFn(model, context, options); + }; + // Some models emit tool names with surrounding whitespace (e.g. " read "). // pi-agent-core dispatches tool calls with exact string matching, so normalize // names on the live response stream before tool execution. 
@@ -1374,6 +2036,15 @@ export async function runEmbeddedAttempt( allowedToolNames, ); + if ( + params.model.api === "anthropic-messages" && + shouldRepairMalformedAnthropicToolCallArguments(params.provider) + ) { + activeSession.agent.streamFn = wrapStreamFnRepairMalformedToolCallArguments( + activeSession.agent.streamFn, + ); + } + if (isXaiProvider(params.provider, params.modelId)) { activeSession.agent.streamFn = wrapStreamFnDecodeXaiToolCallArguments( activeSession.agent.streamFn, @@ -1424,6 +2095,7 @@ export async function runEmbeddedAttempt( try { const assembled = await params.contextEngine.assemble({ sessionId: params.sessionId, + sessionKey: params.sessionKey, messages: activeSession.messages, tokenBudget: params.contextTokenBudget, }); @@ -1457,6 +2129,7 @@ export async function runEmbeddedAttempt( } let aborted = Boolean(params.abortSignal?.aborted); + let yieldAborted = false; let timedOut = false; let timedOutDuringCompaction = false; const getAbortReason = (signal: AbortSignal): unknown => @@ -1539,6 +2212,7 @@ export async function runEmbeddedAttempt( toolMetas, unsubscribe, waitForCompactionRetry, + isCompactionInFlight, getMessagingToolSentTexts, getMessagingToolSentMediaUrls, getMessagingToolSentTargets, @@ -1648,15 +2322,15 @@ export async function runEmbeddedAttempt( hookRunner, legacyBeforeAgentStartResult: params.legacyBeforeAgentStartResult, }); - const cortexPromptContext = await resolveAgentCortexPromptContext({ - cfg: params.config, - agentId: sessionAgentId, - workspaceDir: params.workspaceDir, - promptMode, - sessionId: params.sessionId, - channelId: params.messageChannel ?? params.messageProvider ?? undefined, - }); { + const cortexPromptContext = await resolveAgentCortexPromptContext({ + cfg: params.config, + agentId: sessionAgentId, + workspaceDir: params.workspaceDir, + promptMode, + sessionId: params.sessionId, + channelId: params.messageChannel ?? params.messageProvider ?? 
undefined, + }); if (hookResult?.prependContext) { effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`; log.debug( @@ -1785,6 +2459,8 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, + trigger: params.trigger, + channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { @@ -1800,8 +2476,29 @@ export async function runEmbeddedAttempt( await abortable(activeSession.prompt(effectivePrompt)); } } catch (err) { - promptError = err; - promptErrorSource = "prompt"; + // Yield-triggered abort is intentional — treat as clean stop, not error. + // Check the abort reason to distinguish from external aborts (timeout, user cancel) + // that may race after yieldDetected is set. + yieldAborted = + yieldDetected && + isRunnerAbortError(err) && + err instanceof Error && + err.cause === "sessions_yield"; + if (yieldAborted) { + aborted = false; + // Ensure the session abort has fully settled before proceeding. + if (yieldAbortSettled) { + // eslint-disable-next-line @typescript-eslint/await-thenable -- abort() returns Promise per AgentSession.d.ts + await yieldAbortSettled; + } + stripSessionsYieldArtifacts(activeSession); + if (yieldMessage) { + await persistSessionsYieldContextMessage(activeSession, yieldMessage); + } + } else { + promptError = err; + promptErrorSource = "prompt"; + } } finally { log.debug( `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, @@ -1817,6 +2514,7 @@ export async function runEmbeddedAttempt( // Only trust snapshot if compaction wasn't running before or after capture const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? 
null : snapshot; const preCompactionSessionId = activeSession.sessionId; + const COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS = 60_000; try { // Flush buffered block replies before waiting for compaction so the @@ -1827,7 +2525,25 @@ export async function runEmbeddedAttempt( await params.onBlockReplyFlush(); } - await abortable(waitForCompactionRetry()); + // Skip compaction wait when yield aborted the run — the signal is + // already tripped and abortable() would immediately reject. + const compactionRetryWait = yieldAborted + ? { timedOut: false } + : await waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable, + aggregateTimeoutMs: COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS, + isCompactionStillInFlight: isCompactionInFlight, + }); + if (compactionRetryWait.timedOut) { + timedOutDuringCompaction = true; + if (!isProbeSession) { + log.warn( + `compaction retry aggregate timeout (${COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS}ms): ` + + `proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`, + ); + } + } } catch (err) { if (isRunnerAbortError(err)) { if (!promptError) { @@ -1844,14 +2560,19 @@ export async function runEmbeddedAttempt( } } + // Check if ANY compaction occurred during the entire attempt (prompt + retry). + // Using a cumulative count (> 0) instead of a delta check avoids missing + // compactions that complete during activeSession.prompt() before the delta + // baseline is sampled. const compactionOccurredThisAttempt = getCompactionCount() > 0; - // Append cache-TTL timestamp AFTER prompt + compaction retry completes. // Previously this was before the prompt, which caused a custom entry to be // inserted between compaction and the next prompt — breaking the // prepareCompaction() guard that checks the last entry type, leading to // double-compaction. See: https://github.com/openclaw/openclaw/issues/9282 // Skip when timed out during compaction — session state may be inconsistent. 
+ // Also skip when compaction ran this attempt — appending a custom entry + // after compaction would break the guard again. See: #28491 if (!timedOutDuringCompaction && !compactionOccurredThisAttempt) { const shouldTrackCacheTtl = params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" && @@ -1912,6 +2633,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.afterTurn({ sessionId: sessionIdUsed, + sessionKey: params.sessionKey, sessionFile: params.sessionFile, messages: messagesSnapshot, prePromptMessageCount, @@ -1929,6 +2651,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.ingestBatch({ sessionId: sessionIdUsed, + sessionKey: params.sessionKey, messages: newMessages, }); } catch (ingestErr) { @@ -1939,6 +2662,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.ingest({ sessionId: sessionIdUsed, + sessionKey: params.sessionKey, message: msg, }); } catch (ingestErr) { @@ -1978,6 +2702,8 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, + trigger: params.trigger, + channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { @@ -2038,6 +2764,8 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, + trigger: params.trigger, + channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { @@ -2071,6 +2799,7 @@ export async function runEmbeddedAttempt( compactionCount: getCompactionCount(), // Client tool call detected (OpenResponses hosted tools) clientToolCall: clientToolCallDetected ?? undefined, + yieldDetected: yieldDetected || undefined, }; } finally { // Always tear down the session (and release the lock) before we leave this attempt.