feat: integrate Cortex local memory into OpenClaw

This commit is contained in:
Marc J Saint-jour 2026-03-12 18:41:02 -04:00
parent 99bd165459
commit 020947d29c

View File

@ -11,10 +11,7 @@ import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import type { OpenClawConfig } from "../../../config/config.js";
import { getMachineDisplayName } from "../../../infra/machine-name.js";
import {
ensureGlobalUndiciEnvProxyDispatcher,
ensureGlobalUndiciStreamTimeouts,
} from "../../../infra/net/undici-global-dispatcher.js";
import { ensureGlobalUndiciStreamTimeouts } from "../../../infra/net/undici-global-dispatcher.js";
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
import type {
@ -46,6 +43,7 @@ import {
listChannelSupportedActions,
resolveChannelMessageToolHints,
} from "../../channel-tools.js";
import { resolveAgentCortexPromptContext } from "../../cortex.js";
import { ensureCustomApiRegistered } from "../../custom-api-registry.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../defaults.js";
import { resolveOpenClawDocsPath } from "../../docs-path.js";
@ -127,7 +125,6 @@ import { installToolResultContextGuard } from "../tool-result-context-guard.js";
import { splitSdkTools } from "../tool-split.js";
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js";
import {
selectCompactionTimeoutSnapshot,
shouldFlagCompactionTimeout,
@ -233,14 +230,15 @@ export function wrapOllamaCompatNumCtx(baseFn: StreamFn | undefined, numCtx: num
...options,
onPayload: (payload: unknown) => {
if (!payload || typeof payload !== "object") {
return options?.onPayload?.(payload, model);
options?.onPayload?.(payload);
return;
}
const payloadRecord = payload as Record<string, unknown>;
if (!payloadRecord.options || typeof payloadRecord.options !== "object") {
payloadRecord.options = {};
}
(payloadRecord.options as Record<string, unknown>).num_ctx = numCtx;
return options?.onPayload?.(payload, model);
options?.onPayload?.(payload);
},
});
}
@ -752,9 +750,6 @@ export async function runEmbeddedAttempt(
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const runAbortController = new AbortController();
// Proxy bootstrap must happen before timeout tuning so the timeouts wrap the
// active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher.
ensureGlobalUndiciEnvProxyDispatcher();
ensureGlobalUndiciStreamTimeouts();
log.debug(
@ -1544,7 +1539,6 @@ export async function runEmbeddedAttempt(
toolMetas,
unsubscribe,
waitForCompactionRetry,
isCompactionInFlight,
getMessagingToolSentTexts,
getMessagingToolSentMediaUrls,
getMessagingToolSentTargets,
@ -1654,6 +1648,14 @@ export async function runEmbeddedAttempt(
hookRunner,
legacyBeforeAgentStartResult: params.legacyBeforeAgentStartResult,
});
const cortexPromptContext = await resolveAgentCortexPromptContext({
cfg: params.config,
agentId: sessionAgentId,
workspaceDir: params.workspaceDir,
promptMode,
sessionId: params.sessionId,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
});
{
if (hookResult?.prependContext) {
effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`;
@ -1661,6 +1663,9 @@ export async function runEmbeddedAttempt(
`hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`,
);
}
if (cortexPromptContext.error) {
log.warn(`cortex prompt context unavailable: ${cortexPromptContext.error}`);
}
const legacySystemPrompt =
typeof hookResult?.systemPrompt === "string" ? hookResult.systemPrompt.trim() : "";
if (legacySystemPrompt) {
@ -1670,16 +1675,22 @@ export async function runEmbeddedAttempt(
}
const prependedOrAppendedSystemPrompt = composeSystemPromptWithHookContext({
baseSystemPrompt: systemPromptText,
prependSystemContext: hookResult?.prependSystemContext,
prependSystemContext: joinPresentTextSegments([
cortexPromptContext.context,
hookResult?.prependSystemContext,
]),
appendSystemContext: hookResult?.appendSystemContext,
});
if (prependedOrAppendedSystemPrompt) {
const prependSystemLen = hookResult?.prependSystemContext?.trim().length ?? 0;
const prependSystemLen = joinPresentTextSegments([
cortexPromptContext.context,
hookResult?.prependSystemContext,
])?.trim().length;
const appendSystemLen = hookResult?.appendSystemContext?.trim().length ?? 0;
applySystemPromptOverrideToSession(activeSession, prependedOrAppendedSystemPrompt);
systemPromptText = prependedOrAppendedSystemPrompt;
log.debug(
`hooks: applied prependSystemContext/appendSystemContext (${prependSystemLen}+${appendSystemLen} chars)`,
`hooks: applied prependSystemContext/appendSystemContext (${prependSystemLen ?? 0}+${appendSystemLen} chars)`,
);
}
}
@ -1774,8 +1785,6 @@ export async function runEmbeddedAttempt(
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
},
)
.catch((err) => {
@ -1808,7 +1817,6 @@ export async function runEmbeddedAttempt(
// Only trust snapshot if compaction wasn't running before or after capture
const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot;
const preCompactionSessionId = activeSession.sessionId;
const COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS = 60_000;
try {
// Flush buffered block replies before waiting for compaction so the
@ -1819,21 +1827,7 @@ export async function runEmbeddedAttempt(
await params.onBlockReplyFlush();
}
const compactionRetryWait = await waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable,
aggregateTimeoutMs: COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS,
isCompactionStillInFlight: isCompactionInFlight,
});
if (compactionRetryWait.timedOut) {
timedOutDuringCompaction = true;
if (!isProbeSession) {
log.warn(
`compaction retry aggregate timeout (${COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS}ms): ` +
`proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`,
);
}
}
await abortable(waitForCompactionRetry());
} catch (err) {
if (isRunnerAbortError(err)) {
if (!promptError) {
@ -1984,8 +1978,6 @@ export async function runEmbeddedAttempt(
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
},
)
.catch((err) => {
@ -2046,8 +2038,6 @@ export async function runEmbeddedAttempt(
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
},
)
.catch((err) => {