diff --git a/CHANGELOG.md b/CHANGELOG.md index 210ce179a32..e5ed05e4ae0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ Docs: https://docs.openclaw.ai - Docs/plugins: add the community DingTalk plugin listing to the docs catalog. (#29913) Thanks @sliverp. - Docs/plugins: add the community QQbot plugin listing to the docs catalog. (#29898) Thanks @sliverp. - Plugins/context engines: pass the embedded runner `modelId` into context-engine `assemble()` so plugins can adapt context formatting per model. (#47437) thanks @jscianna. +- Plugins/context engines: add transcript maintenance rewrites for context engines, preserve active-branch transcript metadata during rewrites, and harden overflow-recovery truncation to rewrite sessions under the normal session write lock. (#51191) Thanks @jalehman. ### Fixes diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 1a97501959e..f8f486f230f 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -623,6 +623,36 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { } }); + it("runs maintain after successful compaction with a transcript rewrite helper", async () => { + const maintain = vi.fn(async (_params?: unknown) => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + resolveContextEngineMock.mockResolvedValue({ + info: { ownsCompaction: true }, + compact: contextEngineCompactMock, + maintain, + } as never); + + const result = await compactEmbeddedPiSession(wrappedCompactionArgs()); + + expect(result.ok).toBe(true); + expect(maintain).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: TEST_SESSION_KEY, + sessionFile: TEST_SESSION_FILE, + runtimeContext: expect.objectContaining({ + workspaceDir: TEST_WORKSPACE_DIR, + }), + }), + ); + const runtimeContext = ( + maintain.mock.calls[0]?.[0] as { runtimeContext?: Record } | undefined + )?.runtimeContext; + expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function"); + }); + it("does not fire after_compaction when compaction fails", async () => { hookRunner.hasHooks.mockReturnValue(true); const sync = vi.fn(async () => {}); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index d76a01ed5af..dd5806421a0 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -83,6 +83,7 @@ import { compactWithSafetyTimeout, resolveCompactionTimeoutMs, } from "./compaction-safety-timeout.js"; +import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; import { buildEmbeddedExtensionFactories } from "./extensions.js"; import { logToolSchemasForGoogle, @@ -1226,6 +1227,16 @@ export async function compactEmbeddedPiSession( force: params.trigger === "manual", runtimeContext: params as Record, }); + if (result.ok && result.compacted) { + await runContextEngineMaintenance({ + contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "compaction", + runtimeContext: params as Record, + }); + } if (engineOwnsCompaction && result.ok && result.compacted) { await runPostCompactionSideEffects({ config: params.config, diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts new file mode 100644 index 00000000000..3c62e463620 --- /dev/null +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -0,0 +1,150 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({ + changed: true, + bytesFreed: 77, + rewrittenEntries: 1, +})); +const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown) => ({ + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, +})); + +vi.mock("./transcript-rewrite.js", () => ({ + rewriteTranscriptEntriesInSessionManager: (params: unknown) => + rewriteTranscriptEntriesInSessionManagerMock(params), + rewriteTranscriptEntriesInSessionFile: (params: unknown) => + rewriteTranscriptEntriesInSessionFileMock(params), +})); + +import { + buildContextEngineMaintenanceRuntimeContext, + runContextEngineMaintenance, +} from "./context-engine-maintenance.js"; + +describe("buildContextEngineMaintenanceRuntimeContext", () => { + beforeEach(() => { + rewriteTranscriptEntriesInSessionManagerMock.mockClear(); + rewriteTranscriptEntriesInSessionFileMock.mockClear(); + }); + + it("adds a transcript rewrite helper that targets the current session file", async () => { + const runtimeContext = buildContextEngineMaintenanceRuntimeContext({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + runtimeContext: { workspaceDir: "/tmp/workspace" }, + }); + + expect(runtimeContext.workspaceDir).toBe("/tmp/workspace"); + expect(typeof runtimeContext.rewriteTranscriptEntries).toBe("function"); + + const result = await runtimeContext.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + + expect(result).toEqual({ + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }); + expect(rewriteTranscriptEntriesInSessionFileMock).toHaveBeenCalledWith({ + sessionFile: "/tmp/session.jsonl", + sessionId: "session-1", + sessionKey: "agent:main:session-1", + request: { + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }, + }); + }); + + it("reuses the active session manager when one is provided", async () => { + const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters< + typeof buildContextEngineMaintenanceRuntimeContext + >[0]["sessionManager"]; + const runtimeContext = buildContextEngineMaintenanceRuntimeContext({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + sessionManager, + }); + + const result = await runtimeContext.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + + expect(result).toEqual({ + changed: true, + bytesFreed: 77, + rewrittenEntries: 1, + }); + expect(rewriteTranscriptEntriesInSessionManagerMock).toHaveBeenCalledWith({ + sessionManager, + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); + }); +}); + +describe("runContextEngineMaintenance", () => { + beforeEach(() => { + rewriteTranscriptEntriesInSessionManagerMock.mockClear(); + rewriteTranscriptEntriesInSessionFileMock.mockClear(); + }); + + it("passes a rewrite-capable runtime context into maintain()", async () => { + const maintain = vi.fn(async (_params?: unknown) => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + + const result = await runContextEngineMaintenance({ + contextEngine: { + info: { id: "test", name: "Test Engine" }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }) => ({ messages, estimatedTokens: 0 }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + }, + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + reason: "turn", + runtimeContext: { workspaceDir: "/tmp/workspace" }, + }); + + expect(result).toEqual({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }); + expect(maintain).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + runtimeContext: expect.objectContaining({ + workspaceDir: "/tmp/workspace", + }), + }), + ); + const runtimeContext = ( + maintain.mock.calls[0]?.[0] as + | { runtimeContext?: { rewriteTranscriptEntries?: (request: unknown) => Promise } } + | undefined + )?.runtimeContext as + | { rewriteTranscriptEntries?: (request: unknown) => Promise } + | undefined; + expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function"); + }); +}); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts new file mode 100644 index 00000000000..88e417f5757 --- /dev/null +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -0,0 +1,83 @@ +import type { + ContextEngine, + ContextEngineMaintenanceResult, + ContextEngineRuntimeContext, +} from "../../context-engine/types.js"; +import { log } from "./logger.js"; +import { + rewriteTranscriptEntriesInSessionFile, + rewriteTranscriptEntriesInSessionManager, +} from "./transcript-rewrite.js"; + +/** + * Attach runtime-owned transcript rewrite helpers to an existing + * context-engine runtime context payload. + */ +export function buildContextEngineMaintenanceRuntimeContext(params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; +}): ContextEngineRuntimeContext { + return { + ...params.runtimeContext, + rewriteTranscriptEntries: async (request) => { + if (params.sessionManager) { + return rewriteTranscriptEntriesInSessionManager({ + sessionManager: params.sessionManager, + replacements: request.replacements, + }); + } + return await rewriteTranscriptEntriesInSessionFile({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + request, + }); + }, + }; +} + +/** + * Run optional context-engine transcript maintenance and normalize the result. + */ +export async function runContextEngineMaintenance(params: { + contextEngine?: ContextEngine; + sessionId: string; + sessionKey?: string; + sessionFile: string; + reason: "bootstrap" | "compaction" | "turn"; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; +}): Promise { + if (typeof params.contextEngine?.maintain !== "function") { + return undefined; + } + + try { + const result = await params.contextEngine.maintain({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + runtimeContext: buildContextEngineMaintenanceRuntimeContext({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + }), + }); + if (result.changed) { + log.info( + `[context-engine] maintenance(${params.reason}) changed transcript ` + + `rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` + + `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, + ); + } + return result; + } catch (err) { + log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`); + return undefined; + } +} diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts index 9e7853ef7d5..10c13dfe6fc 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts @@ -66,6 +66,7 @@ export const mockedEnsureRuntimePluginsLoaded = vi.fn<(params?: unknown) => void export const mockedPrepareProviderRuntimeAuth = vi.fn(async () => undefined); export const mockedRunEmbeddedAttempt = vi.fn<(params: unknown) => Promise>(); +export const mockedRunContextEngineMaintenance = vi.fn(async () => undefined); export const mockedSessionLikelyHasOversizedToolResults = vi.fn(() => false); export const mockedTruncateOversizedToolResultsInSession = vi.fn< () => Promise @@ -173,6 +174,8 @@ export function resetRunOverflowCompactionHarnessMocks(): void { mockedPrepareProviderRuntimeAuth.mockReset(); mockedPrepareProviderRuntimeAuth.mockResolvedValue(undefined); mockedRunEmbeddedAttempt.mockReset(); + mockedRunContextEngineMaintenance.mockReset(); + mockedRunContextEngineMaintenance.mockResolvedValue(undefined); mockedSessionLikelyHasOversizedToolResults.mockReset(); mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false); mockedTruncateOversizedToolResultsInSession.mockReset(); @@ -303,6 +306,10 @@ export async function loadRunOverflowCompactionHarness(): Promise<{ runEmbeddedAttempt: mockedRunEmbeddedAttempt, })); + vi.doMock("./context-engine-maintenance.js", () => ({ + runContextEngineMaintenance: mockedRunContextEngineMaintenance, + })); + vi.doMock("./model.js", () => ({ resolveModelAsync: vi.fn(async () => ({ model: { diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index 1f5f0b6de35..56b4fbf0186 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -16,6 +16,7 @@ import { mockedContextEngine, mockedCompactDirect, mockedRunEmbeddedAttempt, + mockedRunContextEngineMaintenance, resetRunOverflowCompactionHarnessMocks, mockedSessionLikelyHasOversizedToolResults, mockedTruncateOversizedToolResultsInSession, @@ -35,6 +36,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { beforeEach(() => { mockedRunEmbeddedAttempt.mockReset(); + mockedRunContextEngineMaintenance.mockReset(); mockedCompactDirect.mockReset(); mockedCoerceToFailoverError.mockReset(); mockedDescribeFailoverError.mockReset(); @@ -50,6 +52,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { compacted: false, reason: "nothing to compact", }); + mockedRunContextEngineMaintenance.mockResolvedValue(undefined); mockedCoerceToFailoverError.mockReturnValue(null); mockedDescribeFailoverError.mockImplementation((err: unknown) => ({ message: err instanceof Error ? err.message : String(err), @@ -241,6 +244,37 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { ); }); + it("runs maintenance after successful overflow-recovery compaction", async () => { + mockedContextEngine.info.ownsCompaction = true; + mockedRunEmbeddedAttempt + .mockResolvedValueOnce(makeAttemptResult({ promptError: makeOverflowError() })) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + mockedCompactDirect.mockResolvedValueOnce({ + ok: true, + compacted: true, + result: { + summary: "engine-owned compaction", + tokensAfter: 50, + }, + }); + + await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedRunContextEngineMaintenance).toHaveBeenCalledWith( + expect.objectContaining({ + contextEngine: mockedContextEngine, + sessionId: "test-session", + sessionKey: "test-key", + sessionFile: "/tmp/session.json", + reason: "compaction", + runtimeContext: expect.objectContaining({ + trigger: "overflow", + authProfileId: "test-profile", + }), + }), + ); + }); + it("guards thrown engine-owned overflow compaction attempts", async () => { mockedContextEngine.info.ownsCompaction = true; mockedGlobalHookRunner.hasHooks.mockImplementation( diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index a35c03d98ca..0c66203992f 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -66,6 +66,7 @@ import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js"; import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js"; import { buildEmbeddedCompactionRuntimeContext } from "./compaction-runtime-context.js"; +import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { resolveModelAsync } from "./model.js"; @@ -1131,6 +1132,39 @@ export async function runEmbeddedPiAgent( } } try { + const overflowCompactionRuntimeContext = { + ...buildEmbeddedCompactionRuntimeContext({ + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + currentChannelId: params.currentChannelId, + currentThreadTs: params.currentThreadTs, + currentMessageId: params.currentMessageId, + authProfileId: lastProfileId, + workspaceDir: resolvedWorkspace, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + senderId: params.senderId, + provider, + modelId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + }), + runId: params.runId, + trigger: "overflow", + ...(observedOverflowTokens !== undefined + ? { currentTokenCount: observedOverflowTokens } + : {}), + diagId: overflowDiagId, + attempt: overflowCompactionAttempts, + maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + }; compactResult = await contextEngine.compact({ sessionId: params.sessionId, sessionKey: params.sessionKey, @@ -1141,40 +1175,18 @@ export async function runEmbeddedPiAgent( : {}), force: true, compactionTarget: "budget", - runtimeContext: { - ...buildEmbeddedCompactionRuntimeContext({ - sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - currentChannelId: params.currentChannelId, - currentThreadTs: params.currentThreadTs, - currentMessageId: params.currentMessageId, - authProfileId: lastProfileId, - workspaceDir: resolvedWorkspace, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - senderId: params.senderId, - provider, - modelId, - thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - }), - runId: params.runId, - trigger: "overflow", - ...(observedOverflowTokens !== undefined - ? { currentTokenCount: observedOverflowTokens } - : {}), - diagId: overflowDiagId, - attempt: overflowCompactionAttempts, - maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, - }, + runtimeContext: overflowCompactionRuntimeContext, }); + if (compactResult.ok && compactResult.compacted) { + await runContextEngineMaintenance({ + contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "compaction", + runtimeContext: overflowCompactionRuntimeContext, + }); + } } catch (compactErr) { log.warn( `contextEngine.compact() threw during overflow recovery for ${provider}/${modelId}: ${String(compactErr)}`, diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index 082442045d3..20617816e6e 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -40,6 +40,7 @@ const hoisted = vi.hoisted(() => { })); const getGlobalHookRunnerMock = vi.fn<() => unknown>(() => undefined); const initializeGlobalHookRunnerMock = vi.fn(); + const runContextEngineMaintenanceMock = vi.fn(async (_params?: unknown) => undefined); const sessionManager = { getLeafEntry: vi.fn(() => null), branch: vi.fn(), @@ -57,6 +58,7 @@ const hoisted = vi.hoisted(() => { resolveBootstrapContextForRunMock, getGlobalHookRunnerMock, initializeGlobalHookRunnerMock, + runContextEngineMaintenanceMock, sessionManager, }; }); @@ -126,6 +128,10 @@ vi.mock("../skills-runtime.js", () => ({ }), })); +vi.mock("../context-engine-maintenance.js", () => ({ + runContextEngineMaintenance: (params: unknown) => hoisted.runContextEngineMaintenanceMock(params), +})); + vi.mock("../../docs-path.js", () => ({ resolveOpenClawDocsPath: async () => undefined, })); @@ -300,6 +306,7 @@ function resetEmbeddedAttemptHarness( contextFiles: [], }); hoisted.getGlobalHookRunnerMock.mockReset().mockReturnValue(undefined); + hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined); hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null); hoisted.sessionManager.branch.mockReset(); hoisted.sessionManager.resetLeaf.mockReset(); @@ -852,4 +859,55 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { }), ).toBe(true); }); + + it("skips maintenance when afterTurn fails", async () => { + const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); + const afterTurn = vi.fn(async () => { + throw new Error("afterTurn failed"); + }); + + const result = await runAttemptWithContextEngine({ + bootstrap, + assemble, + afterTurn, + }); + + expect(result.promptError).toBeNull(); + expect(afterTurn).toHaveBeenCalled(); + expect(hoisted.runContextEngineMaintenanceMock).not.toHaveBeenCalledWith( + expect.objectContaining({ reason: "turn" }), + ); + }); + + it("runs startup maintenance for existing sessions even without bootstrap()", async () => { + const { assemble } = createContextEngineBootstrapAndAssemble(); + + const result = await runAttemptWithContextEngine({ + assemble, + }); + + expect(result.promptError).toBeNull(); + expect(hoisted.runContextEngineMaintenanceMock).toHaveBeenCalledWith( + expect.objectContaining({ reason: "bootstrap" }), + ); + }); + + it("skips maintenance when ingestBatch fails", async () => { + const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); + const ingestBatch = vi.fn(async () => { + throw new Error("ingestBatch failed"); + }); + + const result = await runAttemptWithContextEngine({ + bootstrap, + assemble, + ingestBatch, + }); + + expect(result.promptError).toBeNull(); + expect(ingestBatch).toHaveBeenCalled(); + expect(hoisted.runContextEngineMaintenanceMock).not.toHaveBeenCalledWith( + expect.objectContaining({ reason: "turn" }), + ); + }); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 31752946e96..346629566ea 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -106,6 +106,7 @@ import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-tt import type { CompactEmbeddedPiSessionParams } from "../compact.js"; import { buildEmbeddedCompactionRuntimeContext } from "../compaction-runtime-context.js"; import { resolveCompactionTimeoutMs } from "../compaction-safety-timeout.js"; +import { runContextEngineMaintenance } from "../context-engine-maintenance.js"; import { buildEmbeddedExtensionFactories } from "../extensions.js"; import { applyExtraParamsToAgent } from "../extra-params.js"; import { @@ -2035,12 +2036,27 @@ export async function runEmbeddedAttempt( }); trackSessionManagerAccess(params.sessionFile); - if (hadSessionFile && params.contextEngine?.bootstrap) { + if (hadSessionFile && (params.contextEngine?.bootstrap || params.contextEngine?.maintain)) { try { - await params.contextEngine.bootstrap({ + if (typeof params.contextEngine?.bootstrap === "function") { + await params.contextEngine.bootstrap({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + } + await runContextEngineMaintenance({ + contextEngine: params.contextEngine, sessionId: params.sessionId, sessionKey: params.sessionKey, sessionFile: params.sessionFile, + reason: "bootstrap", + sessionManager, + runtimeContext: buildAfterTurnRuntimeContext({ + attempt: params, + workspaceDir: effectiveWorkspace, + agentDir, + }), }); } catch (bootstrapErr) { log.warn(`context engine bootstrap failed: ${String(bootstrapErr)}`); @@ -2978,6 +2994,7 @@ export async function runEmbeddedAttempt( workspaceDir: effectiveWorkspace, agentDir, }); + let postTurnFinalizationSucceeded = true; if (typeof params.contextEngine.afterTurn === "function") { try { @@ -2991,6 +3008,7 @@ export async function runEmbeddedAttempt( runtimeContext: afterTurnRuntimeContext, }); } catch (afterTurnErr) { + postTurnFinalizationSucceeded = false; log.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); } } else { @@ -3005,6 +3023,7 @@ export async function runEmbeddedAttempt( messages: newMessages, }); } catch (ingestErr) { + postTurnFinalizationSucceeded = false; log.warn(`context engine ingest failed: ${String(ingestErr)}`); } } else { @@ -3016,12 +3035,25 @@ export async function runEmbeddedAttempt( message: msg, }); } catch (ingestErr) { + postTurnFinalizationSucceeded = false; log.warn(`context engine ingest failed: ${String(ingestErr)}`); } } } } } + + if (!promptError && !aborted && !yieldAborted && postTurnFinalizationSucceeded) { + await runContextEngineMaintenance({ + contextEngine: params.contextEngine, + sessionId: sessionIdUsed, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "turn", + sessionManager, + runtimeContext: afterTurnRuntimeContext, + }); + } } cacheTrace?.recordStage("session:after", { diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.test.ts b/src/agents/pi-embedded-runner/tool-result-truncation.test.ts index b65ed0a65e8..016130ff23d 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.test.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.test.ts @@ -1,13 +1,26 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage, ToolResultMessage, UserMessage } from "@mariozechner/pi-ai"; -import { describe, expect, it } from "vitest"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; + +const acquireSessionWriteLockReleaseMock = vi.hoisted(() => vi.fn(async () => {})); +const acquireSessionWriteLockMock = vi.hoisted(() => + vi.fn(async (_params?: unknown) => ({ release: acquireSessionWriteLockReleaseMock })), +); + +vi.mock("../session-write-lock.js", () => ({ + acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params), +})); + import { truncateToolResultText, truncateToolResultMessage, calculateMaxToolResultChars, getToolResultTextLength, truncateOversizedToolResultsInMessages, + truncateOversizedToolResultsInSession, isOversizedToolResult, sessionLikelyHasOversizedToolResults, HARD_MAX_TOOL_RESULT_CHARS, @@ -16,6 +29,12 @@ import { let testTimestamp = 1; const nextTimestamp = () => testTimestamp++; +beforeEach(() => { + testTimestamp = 1; + acquireSessionWriteLockMock.mockClear(); + acquireSessionWriteLockReleaseMock.mockClear(); +}); + function makeToolResult(text: string, toolCallId = "call_1"): ToolResultMessage { return { role: "toolResult", @@ -248,6 +267,54 @@ describe("truncateOversizedToolResultsInMessages", () => { }); }); +describe("truncateOversizedToolResultsInSession", () => { + it("acquires the session write lock before rewriting oversized tool results", async () => { + const sessionFile = "/tmp/tool-result-truncation-session.jsonl"; + const sessionManager = SessionManager.inMemory(); + sessionManager.appendMessage(makeUserMessage("hello")); + sessionManager.appendMessage(makeAssistantMessage("reading file")); + sessionManager.appendMessage(makeToolResult("x".repeat(500_000))); + + const openSpy = vi + .spyOn(SessionManager, "open") + .mockReturnValue(sessionManager as unknown as ReturnType); + const listener = vi.fn(); + const cleanup = onSessionTranscriptUpdate(listener); + + try { + const result = await truncateOversizedToolResultsInSession({ + sessionFile, + contextWindowTokens: 128_000, + sessionKey: "agent:main:test", + }); + + expect(result.truncated).toBe(true); + expect(result.truncatedCount).toBe(1); + expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({ sessionFile }); + expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1); + expect(listener).toHaveBeenCalledWith({ sessionFile }); + + const branch = sessionManager.getBranch(); + const rewrittenToolResult = branch.find( + (entry) => entry.type === "message" && entry.message.role === "toolResult", + ); + expect(rewrittenToolResult?.type).toBe("message"); + if ( + rewrittenToolResult?.type !== "message" || + rewrittenToolResult.message.role !== "toolResult" + ) { + throw new Error("expected rewritten tool result"); + } + const rewrittenText = getFirstToolResultText(rewrittenToolResult.message); + expect(rewrittenText.length).toBeLessThan(500_000); + expect(rewrittenText).toContain("truncated"); + } finally { + cleanup(); + openSpy.mockRestore(); + } + }); +}); + describe("sessionLikelyHasOversizedToolResults", () => { it("returns false when no tool results are oversized", () => { const messages = [makeUserMessage("hello"), makeToolResult("small result")]; diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.ts b/src/agents/pi-embedded-runner/tool-result-truncation.ts index c8cbd1124bb..675c70228a3 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.ts @@ -1,7 +1,10 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { TextContent } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; +import { acquireSessionWriteLock } from "../session-write-lock.js"; import { log } from "./logger.js"; +import { rewriteTranscriptEntriesInSessionManager } from "./transcript-rewrite.js"; /** * Maximum share of the context window a single tool result should occupy. @@ -211,8 +214,10 @@ export async function truncateOversizedToolResultsInSession(params: { }): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> { const { sessionFile, contextWindowTokens } = params; const maxChars = calculateMaxToolResultChars(contextWindowTokens); + let sessionLock: Awaited> | undefined; try { + sessionLock = await acquireSessionWriteLock({ sessionFile }); const sessionManager = SessionManager.open(sessionFile); const branch = sessionManager.getBranch(); @@ -246,87 +251,46 @@ export async function truncateOversizedToolResultsInSession(params: { return { truncated: false, truncatedCount: 0, reason: "no oversized tool results" }; } - // Branch from the parent of the first oversized entry - const firstOversizedIdx = oversizedIndices[0]; - const firstOversizedEntry = branch[firstOversizedIdx]; - const branchFromId = firstOversizedEntry.parentId; - - if (!branchFromId) { - // The oversized entry is the root - very unusual but handle it - sessionManager.resetLeaf(); - } else { - sessionManager.branch(branchFromId); - } - - // Re-append all entries from the first oversized one onwards, - // with truncated tool results - const oversizedSet = new Set(oversizedIndices); - let truncatedCount = 0; - - for (let i = firstOversizedIdx; i < branch.length; i++) { - const entry = branch[i]; - - if (entry.type === "message") { - let message = entry.message; - - if (oversizedSet.has(i)) { - message = truncateToolResultMessage(message, maxChars); - truncatedCount++; - const newLength = getToolResultTextLength(message); - log.info( - `[tool-result-truncation] Truncated tool result: ` + - `originalEntry=${entry.id} newChars=${newLength} ` + - `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, - ); - } - - // appendMessage expects Message | CustomMessage | BashExecutionMessage - sessionManager.appendMessage(message as Parameters[0]); - } else if (entry.type === "compaction") { - sessionManager.appendCompaction( - entry.summary, - entry.firstKeptEntryId, - entry.tokensBefore, - entry.details, - entry.fromHook, - ); - } else if (entry.type === "thinking_level_change") { - sessionManager.appendThinkingLevelChange(entry.thinkingLevel); - } else if (entry.type === "model_change") { - sessionManager.appendModelChange(entry.provider, entry.modelId); - } else if (entry.type === "custom") { - sessionManager.appendCustomEntry(entry.customType, entry.data); - } else if (entry.type === "custom_message") { - sessionManager.appendCustomMessageEntry( - entry.customType, - entry.content, - entry.display, - entry.details, - ); - } else if (entry.type === "branch_summary") { - // Branch summaries reference specific entry IDs - skip to avoid inconsistency - continue; - } else if (entry.type === "label") { - // Labels reference specific entry IDs - skip to avoid inconsistency - continue; - } else if (entry.type === "session_info") { - if (entry.name) { - sessionManager.appendSessionInfo(entry.name); - } + const replacements = oversizedIndices.flatMap((index) => { + const entry = branch[index]; + if (!entry || entry.type !== "message") { + return []; } + const message = truncateToolResultMessage(entry.message, maxChars); + const newLength = getToolResultTextLength(message); + log.info( + `[tool-result-truncation] Truncated tool result: ` + + `originalEntry=${entry.id} newChars=${newLength} ` + + `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, + ); + return [{ entryId: entry.id, message }]; + }); + + const rewriteResult = rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements, + }); + if (rewriteResult.changed) { + emitSessionTranscriptUpdate(sessionFile); } log.info( - `[tool-result-truncation] Truncated ${truncatedCount} tool result(s) in session ` + + `[tool-result-truncation] Truncated ${rewriteResult.rewrittenEntries} tool result(s) in session ` + `(contextWindow=${contextWindowTokens} maxChars=${maxChars}) ` + `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, ); - return { truncated: true, truncatedCount }; + return { + truncated: rewriteResult.changed, + truncatedCount: rewriteResult.rewrittenEntries, + reason: rewriteResult.reason, + }; } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); log.warn(`[tool-result-truncation] Failed to truncate: ${errMsg}`); return { truncated: false, truncatedCount: 0, reason: errMsg }; + } finally { + await sessionLock?.release(); } } diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts new file mode 100644 index 00000000000..0e698244962 --- /dev/null +++ b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts @@ -0,0 +1,402 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; +import { installSessionToolResultGuard } from "../session-tool-result-guard.js"; + +const acquireSessionWriteLockReleaseMock = vi.hoisted(() => vi.fn(async () => {})); +const acquireSessionWriteLockMock = vi.hoisted(() => + vi.fn(async (_params?: unknown) => ({ release: acquireSessionWriteLockReleaseMock })), +); + +vi.mock("../session-write-lock.js", () => ({ + acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params), +})); + +import { + rewriteTranscriptEntriesInSessionFile, + rewriteTranscriptEntriesInSessionManager, +} from "./transcript-rewrite.js"; + +type AppendMessage = Parameters[0]; + +function asAppendMessage(message: unknown): AppendMessage { + return message as AppendMessage; +} + +function getBranchMessages(sessionManager: SessionManager): AgentMessage[] { + return sessionManager + .getBranch() + .filter((entry) => entry.type === "message") + .map((entry) => entry.message); +} + +beforeEach(() => { + acquireSessionWriteLockMock.mockClear(); + acquireSessionWriteLockReleaseMock.mockClear(); +}); + +describe("rewriteTranscriptEntriesInSessionManager", () => { + it("branches from the first replaced message and re-appends the remaining suffix", () => { + const sessionManager = SessionManager.inMemory(); + sessionManager.appendMessage( + asAppendMessage({ + role: "user", + content: "read file", + timestamp: 1, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + timestamp: 2, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "x".repeat(8_000) }], + isError: false, + timestamp: 3, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "text", text: "summarized" }], + timestamp: 4, + }), + ); + + const toolResultEntry = sessionManager + .getBranch() + .find((entry) => entry.type === "message" && entry.message.role === "toolResult"); + expect(toolResultEntry).toBeDefined(); + + const result = rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements: [ + { + entryId: toolResultEntry!.id, + message: { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "[externalized file_123]" }], + isError: false, + timestamp: 3, + }, + }, + ], + }); + + expect(result).toMatchObject({ + changed: true, + rewrittenEntries: 1, + }); + expect(result.bytesFreed).toBeGreaterThan(0); + + const branchMessages = getBranchMessages(sessionManager); + expect(branchMessages.map((message) => message.role)).toEqual([ + "user", + "assistant", + "toolResult", + "assistant", + ]); + const rewrittenToolResult = branchMessages[2] as Extract; + expect(rewrittenToolResult.content).toEqual([ + { type: "text", text: "[externalized file_123]" }, + ]); + }); + + it("preserves active-branch labels after rewritten entries are re-appended", () => { + const sessionManager = SessionManager.inMemory(); + sessionManager.appendMessage( + asAppendMessage({ + role: "user", + content: "read file", + timestamp: 1, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + timestamp: 2, + }), + ); + const toolResultEntryId = sessionManager.appendMessage( + asAppendMessage({ + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "x".repeat(8_000) }], + isError: false, + timestamp: 3, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "text", text: "summarized" }], + timestamp: 4, + }), + ); + + const summaryEntry = sessionManager + .getBranch() + .find( + (entry) => + entry.type === "message" && + entry.message.role === "assistant" && + Array.isArray(entry.message.content) && + entry.message.content.some((part) => part.type === "text" && part.text === "summarized"), + ); + expect(summaryEntry).toBeDefined(); + sessionManager.appendLabelChange(summaryEntry!.id, "bookmark"); + + const result = rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements: [ + { + entryId: toolResultEntryId, + message: { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "[externalized file_123]" }], + isError: false, + timestamp: 3, + }, + }, + ], + }); + + expect(result.changed).toBe(true); + const rewrittenSummaryEntry = sessionManager + .getBranch() + .find( + (entry) => + entry.type === "message" && + entry.message.role === "assistant" && + Array.isArray(entry.message.content) && + entry.message.content.some((part) => part.type === "text" && part.text === "summarized"), + ); + expect(rewrittenSummaryEntry).toBeDefined(); + expect(sessionManager.getLabel(rewrittenSummaryEntry!.id)).toBe("bookmark"); + expect(sessionManager.getBranch().some((entry) => entry.type === "label")).toBe(true); + }); + + it("remaps compaction keep markers when rewritten entries change ids", () => { + const sessionManager = SessionManager.inMemory(); + sessionManager.appendMessage( + asAppendMessage({ + role: "user", + content: "read file", + timestamp: 1, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + timestamp: 2, + }), + ); + const toolResultEntryId = sessionManager.appendMessage( + asAppendMessage({ + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "x".repeat(8_000) }], + isError: false, + timestamp: 3, + }), + ); + const keptAssistantEntryId = sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "text", text: "keep me" }], + timestamp: 4, + }), + ); + sessionManager.appendCompaction("summary", keptAssistantEntryId, 123); + + const result = rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements: [ + { + entryId: toolResultEntryId, + message: { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "[externalized file_123]" }], + isError: false, + timestamp: 3, + }, + }, + ], + }); + + expect(result.changed).toBe(true); + const branch = sessionManager.getBranch(); + const keptAssistantEntry = branch.find( + (entry) => + entry.type === "message" && + entry.message.role === "assistant" && + Array.isArray(entry.message.content) && + entry.message.content.some((part) => part.type === "text" && part.text === "keep me"), + ); + const compactionEntry = branch.find((entry) => entry.type === "compaction"); + + expect(keptAssistantEntry).toBeDefined(); + expect(compactionEntry).toBeDefined(); + expect(compactionEntry?.firstKeptEntryId).toBe(keptAssistantEntry?.id); + expect(compactionEntry?.firstKeptEntryId).not.toBe(keptAssistantEntryId); + }); + + it("bypasses persistence hooks when replaying rewritten messages", () => { + const sessionManager = SessionManager.inMemory(); + sessionManager.appendMessage( + asAppendMessage({ + role: "user", + content: "run tool", + timestamp: 1, + }), + ); + const toolResultEntryId = sessionManager.appendMessage( + asAppendMessage({ + role: "toolResult", + toolCallId: "call_1", + toolName: "exec", + content: [{ type: "text", text: "before rewrite" }], + isError: false, + timestamp: 2, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "assistant", + content: [{ type: "text", text: "summarized" }], + timestamp: 3, + }), + ); + installSessionToolResultGuard(sessionManager, { + transformToolResultForPersistence: (message) => ({ + ...(message as Extract), + content: [{ type: "text", text: "[hook transformed]" }], + }), + beforeMessageWriteHook: ({ message }) => + message.role === "assistant" ? { block: true } : undefined, + }); + + const result = rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements: [ + { + entryId: toolResultEntryId, + message: { + role: "toolResult", + toolCallId: "call_1", + toolName: "exec", + content: [{ type: "text", text: "[exact replacement]" }], + isError: false, + timestamp: 2, + }, + }, + ], + }); + + expect(result.changed).toBe(true); + const branchMessages = getBranchMessages(sessionManager); + expect(branchMessages.map((message) => message.role)).toEqual([ + "user", + "toolResult", + "assistant", + ]); + expect((branchMessages[1] as Extract).content).toEqual([ + { type: "text", text: "[exact replacement]" }, + ]); + expect(branchMessages[2]).toMatchObject({ + role: "assistant", + content: [{ type: "text", text: "summarized" }], + }); + }); +}); + +describe("rewriteTranscriptEntriesInSessionFile", () => { + it("emits transcript updates when the active branch changes", async () => { + const sessionFile = "/tmp/session.jsonl"; + const sessionManager = SessionManager.inMemory(); + sessionManager.appendMessage( + asAppendMessage({ + role: "user", + content: "run tool", + timestamp: 1, + }), + ); + sessionManager.appendMessage( + asAppendMessage({ + role: "toolResult", + toolCallId: "call_1", + toolName: "exec", + content: [{ type: "text", text: "y".repeat(6_000) }], + isError: false, + timestamp: 2, + }), + ); + + const toolResultEntry = sessionManager + .getBranch() + .find((entry) => entry.type === "message" && entry.message.role === "toolResult"); + expect(toolResultEntry).toBeDefined(); + + const openSpy = vi + .spyOn(SessionManager, "open") + .mockReturnValue(sessionManager as unknown as ReturnType); + const listener = vi.fn(); + const cleanup = onSessionTranscriptUpdate(listener); + + try { + const result = await rewriteTranscriptEntriesInSessionFile({ + sessionFile, + sessionKey: "agent:main:test", + request: { + replacements: [ + { + entryId: toolResultEntry!.id, + message: { + role: "toolResult", + toolCallId: "call_1", + toolName: "exec", + content: [{ type: "text", text: "[file_ref:file_abc]" }], + isError: false, + timestamp: 2, + }, + }, + ], + }, + }); + + expect(result.changed).toBe(true); + expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({ + sessionFile, + }); + expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1); + expect(listener).toHaveBeenCalledWith({ sessionFile }); + + const rewrittenToolResult = getBranchMessages(sessionManager)[1] as Extract< + AgentMessage, + { role: "toolResult" } + >; + expect(rewrittenToolResult.content).toEqual([{ type: "text", text: "[file_ref:file_abc]" }]); + } finally { + cleanup(); + openSpy.mockRestore(); + } + }); +}); diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.ts b/src/agents/pi-embedded-runner/transcript-rewrite.ts new file mode 100644 index 00000000000..48d93d445b6 --- /dev/null +++ b/src/agents/pi-embedded-runner/transcript-rewrite.ts @@ -0,0 +1,232 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { + TranscriptRewriteReplacement, + TranscriptRewriteRequest, + TranscriptRewriteResult, +} from "../../context-engine/types.js"; +import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; +import { getRawSessionAppendMessage } from "../session-tool-result-guard.js"; +import { acquireSessionWriteLock } from "../session-write-lock.js"; +import { log } from "./logger.js"; + +type SessionManagerLike = ReturnType; +type SessionBranchEntry = ReturnType[number]; + +function estimateMessageBytes(message: AgentMessage): number { + return Buffer.byteLength(JSON.stringify(message), "utf8"); +} + +function remapEntryId( + entryId: string | null | undefined, + rewrittenEntryIds: ReadonlyMap, +): string | null { + if (!entryId) { + return null; + } + return rewrittenEntryIds.get(entryId) ?? entryId; +} + +function appendBranchEntry(params: { + sessionManager: SessionManagerLike; + entry: SessionBranchEntry; + rewrittenEntryIds: ReadonlyMap; + appendMessage: SessionManagerLike["appendMessage"]; +}): string { + const { sessionManager, entry, rewrittenEntryIds, appendMessage } = params; + if (entry.type === "message") { + return appendMessage(entry.message as Parameters[0]); + } + if (entry.type === "compaction") { + return sessionManager.appendCompaction( + entry.summary, + remapEntryId(entry.firstKeptEntryId, rewrittenEntryIds) ?? entry.firstKeptEntryId, + entry.tokensBefore, + entry.details, + entry.fromHook, + ); + } + if (entry.type === "thinking_level_change") { + return sessionManager.appendThinkingLevelChange(entry.thinkingLevel); + } + if (entry.type === "model_change") { + return sessionManager.appendModelChange(entry.provider, entry.modelId); + } + if (entry.type === "custom") { + return sessionManager.appendCustomEntry(entry.customType, entry.data); + } + if (entry.type === "custom_message") { + return sessionManager.appendCustomMessageEntry( + entry.customType, + entry.content, + entry.display, + entry.details, + ); + } + if (entry.type === "session_info") { + if (entry.name) { + return sessionManager.appendSessionInfo(entry.name); + } + return sessionManager.appendSessionInfo(""); + } + if (entry.type === "branch_summary") { + return sessionManager.branchWithSummary( + remapEntryId(entry.parentId, rewrittenEntryIds), + entry.summary, + entry.details, + entry.fromHook, + ); + } + return sessionManager.appendLabelChange( + remapEntryId(entry.targetId, rewrittenEntryIds) ?? entry.targetId, + entry.label, + ); +} + +/** + * Safely rewrites transcript message entries on the active branch by branching + * from the first rewritten message's parent and re-appending the suffix. + */ +export function rewriteTranscriptEntriesInSessionManager(params: { + sessionManager: SessionManagerLike; + replacements: TranscriptRewriteReplacement[]; +}): TranscriptRewriteResult { + const replacementsById = new Map( + params.replacements + .filter((replacement) => replacement.entryId.trim().length > 0) + .map((replacement) => [replacement.entryId, replacement.message]), + ); + if (replacementsById.size === 0) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "no replacements requested", + }; + } + + const branch = params.sessionManager.getBranch(); + if (branch.length === 0) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "empty session", + }; + } + + const matchedIndices: number[] = []; + let bytesFreed = 0; + + for (let index = 0; index < branch.length; index++) { + const entry = branch[index]; + if (entry.type !== "message") { + continue; + } + const replacement = replacementsById.get(entry.id); + if (!replacement) { + continue; + } + const originalBytes = estimateMessageBytes(entry.message); + const replacementBytes = estimateMessageBytes(replacement); + matchedIndices.push(index); + bytesFreed += Math.max(0, originalBytes - replacementBytes); + } + + if (matchedIndices.length === 0) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "no matching message entries", + }; + } + + const firstMatchedEntry = branch[matchedIndices[0]] as + | Extract + | undefined; + // matchedIndices only contains indices of branch "message" entries. + if (!firstMatchedEntry) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "invalid first rewrite target", + }; + } + + if (!firstMatchedEntry.parentId) { + params.sessionManager.resetLeaf(); + } else { + params.sessionManager.branch(firstMatchedEntry.parentId); + } + + // Maintenance rewrites should preserve the exact requested history without + // re-running persistence hooks or size truncation on replayed messages. + const appendMessage = getRawSessionAppendMessage(params.sessionManager); + const rewrittenEntryIds = new Map(); + for (let index = matchedIndices[0]; index < branch.length; index++) { + const entry = branch[index]; + const replacement = entry.type === "message" ? replacementsById.get(entry.id) : undefined; + const newEntryId = + replacement === undefined + ? appendBranchEntry({ + sessionManager: params.sessionManager, + entry, + rewrittenEntryIds, + appendMessage, + }) + : appendMessage(replacement as Parameters[0]); + rewrittenEntryIds.set(entry.id, newEntryId); + } + + return { + changed: true, + bytesFreed, + rewrittenEntries: matchedIndices.length, + }; +} + +/** + * Open a transcript file, rewrite message entries on the active branch, and + * emit a transcript update when the active branch changed. + */ +export async function rewriteTranscriptEntriesInSessionFile(params: { + sessionFile: string; + sessionId?: string; + sessionKey?: string; + request: TranscriptRewriteRequest; +}): Promise { + let sessionLock: Awaited> | undefined; + try { + sessionLock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, + }); + const sessionManager = SessionManager.open(params.sessionFile); + const result = rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements: params.request.replacements, + }); + if (result.changed) { + emitSessionTranscriptUpdate(params.sessionFile); + log.info( + `[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` + + `${result.rewrittenEntries === 1 ? "y" : "ies"} ` + + `bytesFreed=${result.bytesFreed} ` + + `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, + ); + } + return result; + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + log.warn(`[transcript-rewrite] failed: ${reason}`); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason, + }; + } finally { + await sessionLock?.release(); + } +} diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index 1060ae8b2bc..36150800fd5 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -16,6 +16,11 @@ import { extractToolCallsFromAssistant, extractToolResultId } from "./tool-call- const GUARD_TRUNCATION_SUFFIX = "\n\n⚠️ [Content truncated during persistence — original exceeded size limit. " + "Use offset/limit parameters or request specific sections for large content.]"; +const RAW_APPEND_MESSAGE = Symbol("openclaw.session.rawAppendMessage"); + +type SessionManagerWithRawAppend = SessionManager & { + [RAW_APPEND_MESSAGE]?: SessionManager["appendMessage"]; +}; /** * Truncate oversized text content blocks in a tool result message. @@ -68,6 +73,16 @@ function normalizePersistedToolResultName( return toolResult; } +/** + * Return the unguarded appendMessage implementation for a session manager. + */ +export function getRawSessionAppendMessage( + sessionManager: SessionManager, +): SessionManager["appendMessage"] { + const rawAppend = (sessionManager as SessionManagerWithRawAppend)[RAW_APPEND_MESSAGE]; + return rawAppend ?? sessionManager.appendMessage.bind(sessionManager); +} + export function installSessionToolResultGuard( sessionManager: SessionManager, opts?: { @@ -109,7 +124,8 @@ export function installSessionToolResultGuard( clearPendingToolResults: () => void; getPendingIds: () => string[]; } { - const originalAppend = sessionManager.appendMessage.bind(sessionManager); + const originalAppend = getRawSessionAppendMessage(sessionManager); + (sessionManager as SessionManagerWithRawAppend)[RAW_APPEND_MESSAGE] = originalAppend; const pendingState = createPendingToolCallState(); const persistMessage = (message: AgentMessage) => { const transformer = opts?.transformMessageForPersistence; diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts index cf24bfd7a07..9596c4e310b 100644 --- a/src/context-engine/context-engine.test.ts +++ b/src/context-engine/context-engine.test.ts @@ -20,6 +20,7 @@ import type { ContextEngineInfo, AssembleResult, CompactResult, + ContextEngineMaintenanceResult, IngestResult, } from "./types.js"; @@ -118,6 +119,7 @@ class LegacySessionKeyStrictEngine implements ContextEngine { readonly ingestCalls: Array> = []; readonly assembleCalls: Array> = []; readonly compactCalls: Array> = []; + readonly maintainCalls: Array> = []; readonly ingestedMessages: AgentMessage[] = []; private rejectSessionKey(params: { sessionKey?: string }): void { @@ -172,6 +174,21 @@ class LegacySessionKeyStrictEngine implements ContextEngine { }, }; } + + async maintain(params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + runtimeContext?: Record; + }): Promise { + this.maintainCalls.push({ ...params }); + this.rejectSessionKey(params); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + } } class SessionKeyRuntimeErrorEngine implements ContextEngine { @@ -463,6 +480,24 @@ describe("Legacy sessionKey compatibility", () => { expect(strictEngine.ingestedMessages).toEqual([firstMessage, secondMessage]); }); + it("retries strict maintain once and memoizes legacy mode there too", async () => { + const engineId = `legacy-sessionkey-maintain-${Date.now().toString(36)}`; + const strictEngine = new LegacySessionKeyStrictEngine(); + registerContextEngine(engineId, () => strictEngine); + + const engine = await resolveContextEngine(configWithSlot(engineId)); + + await engine.maintain?.({ + sessionId: "s1", + sessionKey: "agent:main:test", + sessionFile: "/tmp/session.json", + }); + + expect(strictEngine.maintainCalls).toHaveLength(2); + expect(strictEngine.maintainCalls[0]).toHaveProperty("sessionKey", "agent:main:test"); + expect(strictEngine.maintainCalls[1]).not.toHaveProperty("sessionKey"); + }); + it("does not retry non-compat runtime errors", async () => { const engineId = `sessionkey-runtime-${Date.now().toString(36)}`; const runtimeErrorEngine = new SessionKeyRuntimeErrorEngine(); diff --git a/src/context-engine/index.ts b/src/context-engine/index.ts index 09cc4c8e94e..fef9105d8be 100644 --- a/src/context-engine/index.ts +++ b/src/context-engine/index.ts @@ -3,7 +3,12 @@ export type { ContextEngineInfo, AssembleResult, CompactResult, + ContextEngineMaintenanceResult, + ContextEngineRuntimeContext, IngestResult, + TranscriptRewriteReplacement, + TranscriptRewriteRequest, + TranscriptRewriteResult, } from "./types.js"; export { diff --git a/src/context-engine/registry.ts b/src/context-engine/registry.ts index 2c5cac439c0..123227a7067 100644 --- a/src/context-engine/registry.ts +++ b/src/context-engine/registry.ts @@ -16,6 +16,7 @@ type RegisterContextEngineForOwnerOptions = { const LEGACY_SESSION_KEY_COMPAT = Symbol.for("openclaw.contextEngine.sessionKeyCompat"); const SESSION_KEY_COMPAT_METHODS = [ "bootstrap", + "maintain", "ingest", "ingestBatch", "afterTurn", diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts index 438ae625d2d..98f3f376cbf 100644 --- a/src/context-engine/types.ts +++ b/src/context-engine/types.ts @@ -57,7 +57,43 @@ export type SubagentSpawnPreparation = { }; export type SubagentEndReason = "deleted" | "completed" | "swept" | "released"; -export type ContextEngineRuntimeContext = Record; + +export type TranscriptRewriteReplacement = { + /** Existing transcript entry id to replace on the active branch. */ + entryId: string; + /** Replacement message content for that entry. */ + message: AgentMessage; +}; + +export type TranscriptRewriteRequest = { + /** Message entry replacements to apply in one branch-and-reappend pass. */ + replacements: TranscriptRewriteReplacement[]; +}; + +export type TranscriptRewriteResult = { + /** Whether the active branch changed. */ + changed: boolean; + /** Estimated bytes removed from the active branch message payloads. */ + bytesFreed: number; + /** Number of transcript message entries rewritten. */ + rewrittenEntries: number; + /** Optional reason when no rewrite occurred. */ + reason?: string; +}; + +export type ContextEngineMaintenanceResult = TranscriptRewriteResult; + +export type ContextEngineRuntimeContext = Record & { + /** + * Safe transcript rewrite helper implemented by the runtime. + * + * Engines decide what is safe to rewrite; the runtime owns how the session + * DAG is updated on disk. + */ + rewriteTranscriptEntries?: ( + request: TranscriptRewriteRequest, + ) => Promise; +}; /** * ContextEngine defines the pluggable contract for context management. @@ -78,6 +114,19 @@ export interface ContextEngine { sessionFile: string; }): Promise; + /** + * Run transcript maintenance after bootstrap, successful turns, or compaction. + * + * Engines can use runtimeContext.rewriteTranscriptEntries() to request safe + * branch-and-reappend transcript rewrites without depending on Pi internals. + */ + maintain?(params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + runtimeContext?: ContextEngineRuntimeContext; + }): Promise; + /** * Ingest a single message into the engine's store. */ diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 20f8a34672a..c80dbc37eaf 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -65,6 +65,15 @@ export type { ReplyPayload } from "../auto-reply/types.js"; export type { WizardPrompter } from "../wizard/prompts.js"; export type { ContextEngineFactory } from "../context-engine/registry.js"; export type { DiagnosticEventPayload } from "../infra/diagnostic-events.js"; +export type { + ContextEngine, + ContextEngineInfo, + ContextEngineMaintenanceResult, + ContextEngineRuntimeContext, + TranscriptRewriteReplacement, + TranscriptRewriteRequest, + TranscriptRewriteResult, +} from "../context-engine/types.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; export { registerContextEngine } from "../context-engine/registry.js";