import crypto from "node:crypto"; import fs from "node:fs"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js"; import { estimateMessagesTokens } from "../../agents/compaction.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; import { isCliProvider } from "../../agents/model-selection.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { resolveSandboxConfigForAgent, resolveSandboxRuntimeStatus } from "../../agents/sandbox.js"; import { derivePromptTokens, hasNonzeroUsage, normalizeUsage, type UsageLike, } from "../../agents/usage.js"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveAgentIdFromSessionKey, resolveSessionFilePath, resolveSessionFilePathOptions, type SessionEntry, updateSessionStoreEntry, } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import type { TemplateContext } from "../templating.js"; import type { VerboseLevel } from "../thinking.js"; import type { GetReplyOptions } from "../types.js"; import { buildEmbeddedRunBaseParams, buildEmbeddedRunContexts, resolveModelFallbackOptions, } from "./agent-runner-utils.js"; import { hasAlreadyFlushedForCurrentCompaction, resolveMemoryFlushContextWindowTokens, resolveMemoryFlushPromptForRun, resolveMemoryFlushSettings, shouldRunMemoryFlush, } from "./memory-flush.js"; import type { FollowupRun } from "./queue.js"; import { incrementCompactionCount } from "./session-updates.js"; export function estimatePromptTokensForMemoryFlush(prompt?: string): number | undefined { const trimmed = prompt?.trim(); if (!trimmed) { return undefined; } const message: AgentMessage = { role: "user", content: trimmed, timestamp: Date.now() }; const tokens = estimateMessagesTokens([message]); if (!Number.isFinite(tokens) || tokens <= 0) { return undefined; } return Math.ceil(tokens); } export function resolveEffectivePromptTokens( basePromptTokens?: number, lastOutputTokens?: number, promptTokenEstimate?: number, ): number { const base = Math.max(0, basePromptTokens ?? 0); const output = Math.max(0, lastOutputTokens ?? 0); const estimate = Math.max(0, promptTokenEstimate ?? 0); // Flush gating projects the next input context by adding the previous // completion and the current user prompt estimate. return base + output + estimate; } export type SessionTranscriptUsageSnapshot = { promptTokens?: number; outputTokens?: number; }; // Keep a generous near-threshold window so large assistant outputs still trigger // transcript reads in time to flip memory-flush gating when needed. const TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS = 8192; const TRANSCRIPT_TAIL_CHUNK_BYTES = 64 * 1024; function parseUsageFromTranscriptLine(line: string): ReturnType | undefined { const trimmed = line.trim(); if (!trimmed) { return undefined; } try { const parsed = JSON.parse(trimmed) as { message?: { usage?: UsageLike }; usage?: UsageLike; }; const usageRaw = parsed.message?.usage ?? parsed.usage; const usage = normalizeUsage(usageRaw); if (usage && hasNonzeroUsage(usage)) { return usage; } } catch { // ignore bad lines } return undefined; } function resolveSessionLogPath( sessionId?: string, sessionEntry?: SessionEntry, sessionKey?: string, opts?: { storePath?: string }, ): string | undefined { if (!sessionId) { return undefined; } try { const transcriptPath = ( sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined )?.transcriptPath?.trim(); const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath; const agentId = resolveAgentIdFromSessionKey(sessionKey); const pathOpts = resolveSessionFilePathOptions({ agentId, storePath: opts?.storePath, }); // Normalize sessionFile through resolveSessionFilePath so relative entries // are resolved against the sessions dir/store layout, not process.cwd(). return resolveSessionFilePath( sessionId, sessionFile ? { sessionFile } : sessionEntry, pathOpts, ); } catch { return undefined; } } function deriveTranscriptUsageSnapshot( usage: ReturnType | undefined, ): SessionTranscriptUsageSnapshot | undefined { if (!usage) { return undefined; } const promptTokens = derivePromptTokens(usage); const outputRaw = usage.output; const outputTokens = typeof outputRaw === "number" && Number.isFinite(outputRaw) && outputRaw > 0 ? outputRaw : undefined; if (!(typeof promptTokens === "number") && !(typeof outputTokens === "number")) { return undefined; } return { promptTokens, outputTokens, }; } type SessionLogSnapshot = { byteSize?: number; usage?: SessionTranscriptUsageSnapshot; }; async function readSessionLogSnapshot(params: { sessionId?: string; sessionEntry?: SessionEntry; sessionKey?: string; opts?: { storePath?: string }; includeByteSize: boolean; includeUsage: boolean; }): Promise { const logPath = resolveSessionLogPath( params.sessionId, params.sessionEntry, params.sessionKey, params.opts, ); if (!logPath) { return {}; } const snapshot: SessionLogSnapshot = {}; if (params.includeByteSize) { try { const stat = await fs.promises.stat(logPath); const size = Math.floor(stat.size); snapshot.byteSize = Number.isFinite(size) && size >= 0 ? size : undefined; } catch { snapshot.byteSize = undefined; } } if (params.includeUsage) { try { const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath); snapshot.usage = deriveTranscriptUsageSnapshot(lastUsage); } catch { snapshot.usage = undefined; } } return snapshot; } async function readLastNonzeroUsageFromSessionLog(logPath: string) { const handle = await fs.promises.open(logPath, "r"); try { const stat = await handle.stat(); let position = stat.size; let leadingPartial = ""; while (position > 0) { const chunkSize = Math.min(TRANSCRIPT_TAIL_CHUNK_BYTES, position); const start = position - chunkSize; const buffer = Buffer.allocUnsafe(chunkSize); const { bytesRead } = await handle.read(buffer, 0, chunkSize, start); if (bytesRead <= 0) { break; } const chunk = buffer.toString("utf-8", 0, bytesRead); const combined = `${chunk}${leadingPartial}`; const lines = combined.split(/\n+/); leadingPartial = lines.shift() ?? ""; for (let i = lines.length - 1; i >= 0; i -= 1) { const usage = parseUsageFromTranscriptLine(lines[i] ?? ""); if (usage) { return usage; } } position = start; } return parseUsageFromTranscriptLine(leadingPartial); } finally { await handle.close(); } } export async function readPromptTokensFromSessionLog( sessionId?: string, sessionEntry?: SessionEntry, sessionKey?: string, opts?: { storePath?: string }, ): Promise { const snapshot = await readSessionLogSnapshot({ sessionId, sessionEntry, sessionKey, opts, includeByteSize: false, includeUsage: true, }); return snapshot.usage; } export async function runMemoryFlushIfNeeded(params: { cfg: OpenClawConfig; followupRun: FollowupRun; promptForEstimate?: string; sessionCtx: TemplateContext; opts?: GetReplyOptions; defaultModel: string; agentCfgContextTokens?: number; resolvedVerboseLevel: VerboseLevel; sessionEntry?: SessionEntry; sessionStore?: Record; sessionKey?: string; storePath?: string; isHeartbeat: boolean; }): Promise { const memoryFlushSettings = resolveMemoryFlushSettings(params.cfg); if (!memoryFlushSettings) { return params.sessionEntry; } const memoryFlushWritable = (() => { if (!params.sessionKey) { return true; } const runtime = resolveSandboxRuntimeStatus({ cfg: params.cfg, sessionKey: params.sessionKey, }); if (!runtime.sandboxed) { return true; } const sandboxCfg = resolveSandboxConfigForAgent(params.cfg, runtime.agentId); return sandboxCfg.workspaceAccess === "rw"; })(); const isCli = isCliProvider(params.followupRun.run.provider, params.cfg); const canAttemptFlush = memoryFlushWritable && !params.isHeartbeat && !isCli; let entry = params.sessionEntry ?? (params.sessionKey ? params.sessionStore?.[params.sessionKey] : undefined); const contextWindowTokens = resolveMemoryFlushContextWindowTokens({ modelId: params.followupRun.run.model ?? params.defaultModel, agentCfgContextTokens: params.agentCfgContextTokens, }); const promptTokenEstimate = estimatePromptTokensForMemoryFlush( params.promptForEstimate ?? params.followupRun.prompt, ); const persistedPromptTokensRaw = entry?.totalTokens; const persistedPromptTokens = typeof persistedPromptTokensRaw === "number" && Number.isFinite(persistedPromptTokensRaw) && persistedPromptTokensRaw > 0 ? persistedPromptTokensRaw : undefined; const hasFreshPersistedPromptTokens = typeof persistedPromptTokens === "number" && entry?.totalTokensFresh === true; const flushThreshold = contextWindowTokens - memoryFlushSettings.reserveTokensFloor - memoryFlushSettings.softThresholdTokens; // When totals are stale/unknown, derive prompt + last output from transcript so memory // flush can still be evaluated against projected next-input size. // // When totals are fresh, only read the transcript when we're close enough to the // threshold that missing the last output tokens could flip the decision. const shouldReadTranscriptForOutput = canAttemptFlush && entry && hasFreshPersistedPromptTokens && typeof promptTokenEstimate === "number" && Number.isFinite(promptTokenEstimate) && flushThreshold > 0 && (persistedPromptTokens ?? 0) + promptTokenEstimate >= flushThreshold - TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS; const shouldReadTranscript = Boolean( canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput), ); const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes; const shouldCheckTranscriptSizeForForcedFlush = Boolean( canAttemptFlush && entry && Number.isFinite(forceFlushTranscriptBytes) && forceFlushTranscriptBytes > 0, ); const shouldReadSessionLog = shouldReadTranscript || shouldCheckTranscriptSizeForForcedFlush; const sessionLogSnapshot = shouldReadSessionLog ? await readSessionLogSnapshot({ sessionId: params.followupRun.run.sessionId, sessionEntry: entry, sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey, opts: { storePath: params.storePath }, includeByteSize: shouldCheckTranscriptSizeForForcedFlush, includeUsage: shouldReadTranscript, }) : undefined; const transcriptByteSize = sessionLogSnapshot?.byteSize; const shouldForceFlushByTranscriptSize = typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes; const transcriptUsageSnapshot = sessionLogSnapshot?.usage; const transcriptPromptTokens = transcriptUsageSnapshot?.promptTokens; const transcriptOutputTokens = transcriptUsageSnapshot?.outputTokens; const hasReliableTranscriptPromptTokens = typeof transcriptPromptTokens === "number" && Number.isFinite(transcriptPromptTokens) && transcriptPromptTokens > 0; const shouldPersistTranscriptPromptTokens = hasReliableTranscriptPromptTokens && (!hasFreshPersistedPromptTokens || (transcriptPromptTokens ?? 0) > (persistedPromptTokens ?? 0)); if (entry && shouldPersistTranscriptPromptTokens) { const nextEntry = { ...entry, totalTokens: transcriptPromptTokens, totalTokensFresh: true, }; entry = nextEntry; if (params.sessionKey && params.sessionStore) { params.sessionStore[params.sessionKey] = nextEntry; } if (params.storePath && params.sessionKey) { try { const updatedEntry = await updateSessionStoreEntry({ storePath: params.storePath, sessionKey: params.sessionKey, update: async () => ({ totalTokens: transcriptPromptTokens, totalTokensFresh: true }), }); if (updatedEntry) { entry = updatedEntry; if (params.sessionStore) { params.sessionStore[params.sessionKey] = updatedEntry; } } } catch (err) { logVerbose(`failed to persist derived prompt totalTokens: ${String(err)}`); } } } const promptTokensSnapshot = Math.max( hasFreshPersistedPromptTokens ? (persistedPromptTokens ?? 0) : 0, hasReliableTranscriptPromptTokens ? (transcriptPromptTokens ?? 0) : 0, ); const hasFreshPromptTokensSnapshot = promptTokensSnapshot > 0 && (hasFreshPersistedPromptTokens || hasReliableTranscriptPromptTokens); const projectedTokenCount = hasFreshPromptTokensSnapshot ? resolveEffectivePromptTokens( promptTokensSnapshot, transcriptOutputTokens, promptTokenEstimate, ) : undefined; const tokenCountForFlush = typeof projectedTokenCount === "number" && Number.isFinite(projectedTokenCount) && projectedTokenCount > 0 ? projectedTokenCount : undefined; // Diagnostic logging to understand why memory flush may not trigger. logVerbose( `memoryFlush check: sessionKey=${params.sessionKey} ` + `tokenCount=${tokenCountForFlush ?? "undefined"} ` + `contextWindow=${contextWindowTokens} threshold=${flushThreshold} ` + `isHeartbeat=${params.isHeartbeat} isCli=${isCli} memoryFlushWritable=${memoryFlushWritable} ` + `compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` + `persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` + `promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` + `projectedTokenCount=${projectedTokenCount ?? "undefined"} transcriptBytes=${transcriptByteSize ?? "undefined"} ` + `forceFlushTranscriptBytes=${forceFlushTranscriptBytes} forceFlushByTranscriptSize=${shouldForceFlushByTranscriptSize}`, ); const shouldFlushMemory = (memoryFlushSettings && memoryFlushWritable && !params.isHeartbeat && !isCli && shouldRunMemoryFlush({ entry, tokenCount: tokenCountForFlush, contextWindowTokens, reserveTokensFloor: memoryFlushSettings.reserveTokensFloor, softThresholdTokens: memoryFlushSettings.softThresholdTokens, })) || (shouldForceFlushByTranscriptSize && entry != null && !hasAlreadyFlushedForCurrentCompaction(entry)); if (!shouldFlushMemory) { return entry ?? params.sessionEntry; } logVerbose( `memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`, ); let activeSessionEntry = entry ?? params.sessionEntry; const activeSessionStore = params.sessionStore; let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( activeSessionEntry?.systemPromptReport ?? (params.sessionKey ? activeSessionStore?.[params.sessionKey]?.systemPromptReport : undefined), ); const flushRunId = crypto.randomUUID(); if (params.sessionKey) { registerAgentRunContext(flushRunId, { sessionKey: params.sessionKey, verboseLevel: params.resolvedVerboseLevel, }); } let memoryCompactionCompleted = false; const flushSystemPrompt = [ params.followupRun.run.extraSystemPrompt, memoryFlushSettings.systemPrompt, ] .filter(Boolean) .join("\n\n"); try { await runWithModelFallback({ ...resolveModelFallbackOptions(params.followupRun.run), run: async (provider, model, runOptions) => { const { authProfile, embeddedContext, senderContext } = buildEmbeddedRunContexts({ run: params.followupRun.run, sessionCtx: params.sessionCtx, hasRepliedRef: params.opts?.hasRepliedRef, provider, }); const runBaseParams = buildEmbeddedRunBaseParams({ run: params.followupRun.run, provider, model, runId: flushRunId, authProfile, allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, }); const result = await runEmbeddedPiAgent({ ...embeddedContext, ...senderContext, ...runBaseParams, trigger: "memory", prompt: resolveMemoryFlushPromptForRun({ prompt: memoryFlushSettings.prompt, cfg: params.cfg, }), extraSystemPrompt: flushSystemPrompt, bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignature: bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1], onAgentEvent: (evt) => { if (evt.stream === "compaction") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; if (phase === "end") { memoryCompactionCompleted = true; } } }, }); bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, ); return result; }, }); let memoryFlushCompactionCount = activeSessionEntry?.compactionCount ?? (params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ?? 0; if (memoryCompactionCompleted) { const nextCount = await incrementCompactionCount({ sessionEntry: activeSessionEntry, sessionStore: activeSessionStore, sessionKey: params.sessionKey, storePath: params.storePath, }); if (typeof nextCount === "number") { memoryFlushCompactionCount = nextCount; } } if (params.storePath && params.sessionKey) { try { const updatedEntry = await updateSessionStoreEntry({ storePath: params.storePath, sessionKey: params.sessionKey, update: async () => ({ memoryFlushAt: Date.now(), memoryFlushCompactionCount, }), }); if (updatedEntry) { activeSessionEntry = updatedEntry; } } catch (err) { logVerbose(`failed to persist memory flush metadata: ${String(err)}`); } } } catch (err) { logVerbose(`memory flush run failed: ${String(err)}`); } return activeSessionEntry; }