import crypto from "node:crypto";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { resolveSandboxConfigForAgent, resolveSandboxRuntimeStatus } from "../../agents/sandbox.js";
import type { OpenClawConfig } from "../../config/config.js";
import { type SessionEntry, updateSessionStoreEntry } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import type { TemplateContext } from "../templating.js";
import type { VerboseLevel } from "../thinking.js";
import type { GetReplyOptions } from "../types.js";
import {
  buildEmbeddedRunBaseParams,
  buildEmbeddedRunContexts,
  resolveModelFallbackOptions,
} from "./agent-runner-utils.js";
import {
  resolveMemoryFlushContextWindowTokens,
  resolveMemoryFlushPromptForRun,
  resolveMemoryFlushSettings,
  shouldRunMemoryFlush,
} from "./memory-flush.js";
import type { FollowupRun } from "./queue.js";
import { incrementCompactionCount } from "./session-updates.js";

/**
 * Runs a dedicated "memory flush" agent turn when the session qualifies for one.
 *
 * The flush is skipped (and `params.sessionEntry` is returned untouched) when:
 * - `resolveMemoryFlushSettings` yields nothing for the config,
 * - the session is sandboxed and its workspace access is not `"rw"`,
 * - the current run is a heartbeat,
 * - the run's provider is a CLI provider, or
 * - `shouldRunMemoryFlush` declines based on the session entry and token limits.
 *
 * Otherwise an embedded agent run is executed through `runWithModelFallback`
 * using the configured flush prompt. If a compaction "end" event is observed
 * during that run, the session's compaction count is incremented. Afterwards,
 * when both `storePath` and `sessionKey` are available, the flush timestamp and
 * compaction count are written to the session store.
 *
 * Failures of the flush run and of the metadata persistence are logged through
 * `logVerbose` and swallowed; this function does not rethrow them.
 *
 * @param params - Configuration, run description and session state for the flush.
 * @returns The entry returned by the session-store update when one was produced;
 *   otherwise the `sessionEntry` that was passed in (possibly `undefined`).
 */
export async function runMemoryFlushIfNeeded(params: {
  cfg: OpenClawConfig;
  followupRun: FollowupRun;
  sessionCtx: TemplateContext;
  opts?: GetReplyOptions;
  defaultModel: string;
  agentCfgContextTokens?: number;
  resolvedVerboseLevel: VerboseLevel;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  isHeartbeat: boolean;
}): Promise<SessionEntry | undefined> {
  const memoryFlushSettings = resolveMemoryFlushSettings(params.cfg);
  if (!memoryFlushSettings) {
    return params.sessionEntry;
  }

  // A flush is only attempted when the workspace can be written to: either the
  // session is not sandboxed, or the sandbox grants read-write workspace access.
  // Without a session key the sandbox status is not consulted at all.
  const memoryFlushWritable = (() => {
    if (!params.sessionKey) {
      return true;
    }
    const runtime = resolveSandboxRuntimeStatus({
      cfg: params.cfg,
      sessionKey: params.sessionKey,
    });
    if (!runtime.sandboxed) {
      return true;
    }
    const sandboxCfg = resolveSandboxConfigForAgent(params.cfg, runtime.agentId);
    return sandboxCfg.workspaceAccess === "rw";
  })();

  const shouldFlushMemory =
    memoryFlushWritable &&
    !params.isHeartbeat &&
    !isCliProvider(params.followupRun.run.provider, params.cfg) &&
    shouldRunMemoryFlush({
      // Prefer the explicitly supplied entry; fall back to the store lookup.
      entry:
        params.sessionEntry ??
        (params.sessionKey ? params.sessionStore?.[params.sessionKey] : undefined),
      contextWindowTokens: resolveMemoryFlushContextWindowTokens({
        modelId: params.followupRun.run.model ?? params.defaultModel,
        agentCfgContextTokens: params.agentCfgContextTokens,
      }),
      reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
      softThresholdTokens: memoryFlushSettings.softThresholdTokens,
    });
  if (!shouldFlushMemory) {
    return params.sessionEntry;
  }

  let activeSessionEntry = params.sessionEntry;
  const activeSessionStore = params.sessionStore;

  // The flush gets its own run id, registered against the session key when known.
  const flushRunId = crypto.randomUUID();
  if (params.sessionKey) {
    registerAgentRunContext(flushRunId, {
      sessionKey: params.sessionKey,
      verboseLevel: params.resolvedVerboseLevel,
    });
  }

  let memoryCompactionCompleted = false;

  // Combine the run's extra system prompt with the flush system prompt,
  // dropping whichever of the two is empty/undefined.
  const flushSystemPrompt = [
    params.followupRun.run.extraSystemPrompt,
    memoryFlushSettings.systemPrompt,
  ]
    .filter(Boolean)
    .join("\n\n");

  try {
    await runWithModelFallback({
      ...resolveModelFallbackOptions(params.followupRun.run),
      run: (provider, model) => {
        const { authProfile, embeddedContext, senderContext } = buildEmbeddedRunContexts({
          run: params.followupRun.run,
          sessionCtx: params.sessionCtx,
          hasRepliedRef: params.opts?.hasRepliedRef,
          provider,
        });
        const runBaseParams = buildEmbeddedRunBaseParams({
          run: params.followupRun.run,
          provider,
          model,
          runId: flushRunId,
          authProfile,
        });
        return runEmbeddedPiAgent({
          ...embeddedContext,
          ...senderContext,
          ...runBaseParams,
          prompt: resolveMemoryFlushPromptForRun({
            prompt: memoryFlushSettings.prompt,
            cfg: params.cfg,
          }),
          extraSystemPrompt: flushSystemPrompt,
          onAgentEvent: (evt) => {
            // Record that a compaction finished during the flush run.
            if (evt.stream === "compaction") {
              const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
              if (phase === "end") {
                memoryCompactionCompleted = true;
              }
            }
          },
        });
      },
    });

    // Start from the known compaction count (entry first, then store, then 0).
    let memoryFlushCompactionCount =
      activeSessionEntry?.compactionCount ??
      (params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ??
      0;
    if (memoryCompactionCompleted) {
      const nextCount = await incrementCompactionCount({
        sessionEntry: activeSessionEntry,
        sessionStore: activeSessionStore,
        sessionKey: params.sessionKey,
        storePath: params.storePath,
      });
      if (typeof nextCount === "number") {
        memoryFlushCompactionCount = nextCount;
      }
    }

    if (params.storePath && params.sessionKey) {
      // Persisting the flush metadata is best-effort: a failure here is logged
      // and must not be reported as a failed flush run.
      try {
        const updatedEntry = await updateSessionStoreEntry({
          storePath: params.storePath,
          sessionKey: params.sessionKey,
          update: async () => ({
            memoryFlushAt: Date.now(),
            memoryFlushCompactionCount,
          }),
        });
        if (updatedEntry) {
          activeSessionEntry = updatedEntry;
        }
      } catch (err) {
        logVerbose(`failed to persist memory flush metadata: ${String(err)}`);
      }
    }
  } catch (err) {
    logVerbose(`memory flush run failed: ${String(err)}`);
  }

  return activeSessionEntry;
}