diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 2f5f3d04d5f..fd5d4033e0a 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -11,10 +11,7 @@ import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../../config/config.js"; import { getMachineDisplayName } from "../../../infra/machine-name.js"; -import { - ensureGlobalUndiciEnvProxyDispatcher, - ensureGlobalUndiciStreamTimeouts, -} from "../../../infra/net/undici-global-dispatcher.js"; +import { ensureGlobalUndiciStreamTimeouts } from "../../../infra/net/undici-global-dispatcher.js"; import { MAX_IMAGE_BYTES } from "../../../media/constants.js"; import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; import type { @@ -46,6 +43,7 @@ import { listChannelSupportedActions, resolveChannelMessageToolHints, } from "../../channel-tools.js"; +import { resolveAgentCortexPromptContext } from "../../cortex.js"; import { ensureCustomApiRegistered } from "../../custom-api-registry.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../defaults.js"; import { resolveOpenClawDocsPath } from "../../docs-path.js"; @@ -127,7 +125,6 @@ import { installToolResultContextGuard } from "../tool-result-context-guard.js"; import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; -import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js"; import { selectCompactionTimeoutSnapshot, shouldFlagCompactionTimeout, @@ -233,14 +230,15 @@ export function wrapOllamaCompatNumCtx(baseFn: StreamFn | undefined, numCtx: num ...options, onPayload: (payload: unknown) => { if (!payload || typeof payload !== "object") { - return options?.onPayload?.(payload, model); + options?.onPayload?.(payload); + return; } const payloadRecord = payload as Record; if (!payloadRecord.options || typeof payloadRecord.options !== "object") { payloadRecord.options = {}; } (payloadRecord.options as Record).num_ctx = numCtx; - return options?.onPayload?.(payload, model); + options?.onPayload?.(payload); }, }); } @@ -752,9 +750,6 @@ export async function runEmbeddedAttempt( const resolvedWorkspace = resolveUserPath(params.workspaceDir); const prevCwd = process.cwd(); const runAbortController = new AbortController(); - // Proxy bootstrap must happen before timeout tuning so the timeouts wrap the - // active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher. - ensureGlobalUndiciEnvProxyDispatcher(); ensureGlobalUndiciStreamTimeouts(); log.debug( @@ -1544,7 +1539,6 @@ export async function runEmbeddedAttempt( toolMetas, unsubscribe, waitForCompactionRetry, - isCompactionInFlight, getMessagingToolSentTexts, getMessagingToolSentMediaUrls, getMessagingToolSentTargets, @@ -1654,6 +1648,14 @@ export async function runEmbeddedAttempt( hookRunner, legacyBeforeAgentStartResult: params.legacyBeforeAgentStartResult, }); + const cortexPromptContext = await resolveAgentCortexPromptContext({ + cfg: params.config, + agentId: sessionAgentId, + workspaceDir: params.workspaceDir, + promptMode, + sessionId: params.sessionId, + channelId: params.messageChannel ?? params.messageProvider ?? undefined, + }); { if (hookResult?.prependContext) { effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`; @@ -1661,6 +1663,9 @@ export async function runEmbeddedAttempt( `hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`, ); } + if (cortexPromptContext.error) { + log.warn(`cortex prompt context unavailable: ${cortexPromptContext.error}`); + } const legacySystemPrompt = typeof hookResult?.systemPrompt === "string" ? hookResult.systemPrompt.trim() : ""; if (legacySystemPrompt) { @@ -1670,16 +1675,22 @@ export async function runEmbeddedAttempt( } const prependedOrAppendedSystemPrompt = composeSystemPromptWithHookContext({ baseSystemPrompt: systemPromptText, - prependSystemContext: hookResult?.prependSystemContext, + prependSystemContext: joinPresentTextSegments([ + cortexPromptContext.context, + hookResult?.prependSystemContext, + ]), appendSystemContext: hookResult?.appendSystemContext, }); if (prependedOrAppendedSystemPrompt) { - const prependSystemLen = hookResult?.prependSystemContext?.trim().length ?? 0; + const prependSystemLen = joinPresentTextSegments([ + cortexPromptContext.context, + hookResult?.prependSystemContext, + ])?.trim().length; const appendSystemLen = hookResult?.appendSystemContext?.trim().length ?? 0; applySystemPromptOverrideToSession(activeSession, prependedOrAppendedSystemPrompt); systemPromptText = prependedOrAppendedSystemPrompt; log.debug( - `hooks: applied prependSystemContext/appendSystemContext (${prependSystemLen}+${appendSystemLen} chars)`, + `hooks: applied prependSystemContext/appendSystemContext (${prependSystemLen ?? 0}+${appendSystemLen} chars)`, ); } } @@ -1774,8 +1785,6 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, - trigger: params.trigger, - channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { @@ -1808,7 +1817,6 @@ export async function runEmbeddedAttempt( // Only trust snapshot if compaction wasn't running before or after capture const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot; const preCompactionSessionId = activeSession.sessionId; - const COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS = 60_000; try { // Flush buffered block replies before waiting for compaction so the @@ -1819,21 +1827,7 @@ export async function runEmbeddedAttempt( await params.onBlockReplyFlush(); } - const compactionRetryWait = await waitForCompactionRetryWithAggregateTimeout({ - waitForCompactionRetry, - abortable, - aggregateTimeoutMs: COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS, - isCompactionStillInFlight: isCompactionInFlight, - }); - if (compactionRetryWait.timedOut) { - timedOutDuringCompaction = true; - if (!isProbeSession) { - log.warn( - `compaction retry aggregate timeout (${COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS}ms): ` + - `proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`, - ); - } - } + await abortable(waitForCompactionRetry()); } catch (err) { if (isRunnerAbortError(err)) { if (!promptError) { @@ -1984,8 +1978,6 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, - trigger: params.trigger, - channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { @@ -2046,8 +2038,6 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, - trigger: params.trigger, - channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => {