feat(compaction): truncate session JSONL after compaction to prevent unbounded growth (#41021)
Merged via squash. Prepared head SHA: fa50b635800f20b0732d4f34c6da404db4dbc95f Co-authored-by: thirumaleshp <85149081+thirumaleshp@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman
This commit is contained in:
parent
4c60956d8e
commit
c6968c39d6
@ -179,6 +179,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins/update: let `openclaw plugins update <npm-spec>` target tracked npm installs by dist-tag or exact version, and preserve the recorded npm spec for later id-based updates. (#49998) Thanks @huntharo.
|
||||
- Tests/CLI: reduce command-secret gateway test import pressure while keeping the real protocol payload validator in place, so the isolated lane no longer carries the heavier runtime-web and message-channel graphs. (#50663) Thanks @huntharo.
|
||||
- Gateway/plugins: share plugin interactive callback routing and plugin bind approval state across duplicate module graphs so Telegram Codex picker buttons and plugin bind approvals no longer fall through to normal inbound message routing. (#50722) Thanks @huntharo.
|
||||
- Agents/compaction: add an opt-in post-compaction session JSONL truncation step that drops summarized transcript entries while preserving the retained branch tail and live session metadata. (#41021) thanks @thirumaleshp.
|
||||
|
||||
### Breaking
|
||||
|
||||
|
||||
@ -96,6 +96,7 @@ import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-disco
|
||||
import { buildModelAliasLines, resolveModelAsync } from "./model.js";
|
||||
import { buildEmbeddedSandboxInfo } from "./sandbox-info.js";
|
||||
import { prewarmSessionFile, trackSessionManagerAccess } from "./session-manager-cache.js";
|
||||
import { truncateSessionAfterCompaction } from "./session-truncation.js";
|
||||
import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js";
|
||||
import {
|
||||
applySystemPromptOverrideToSession,
|
||||
@ -1085,6 +1086,25 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
});
|
||||
}
|
||||
}
|
||||
// Truncate session file to remove compacted entries (#39953)
|
||||
if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) {
|
||||
try {
|
||||
const truncResult = await truncateSessionAfterCompaction({
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
if (truncResult.truncated) {
|
||||
log.info(
|
||||
`[compaction] post-compaction truncation removed ${truncResult.entriesRemoved} entries ` +
|
||||
`(sessionKey=${params.sessionKey ?? params.sessionId})`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("[compaction] post-compaction truncation failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
errorStack: err instanceof Error ? err.stack : undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
return {
|
||||
ok: true,
|
||||
compacted: true,
|
||||
|
||||
368
src/agents/pi-embedded-runner/session-truncation.test.ts
Normal file
368
src/agents/pi-embedded-runner/session-truncation.test.ts
Normal file
@ -0,0 +1,368 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js";
|
||||
import { truncateSessionAfterCompaction } from "./session-truncation.js";
|
||||
|
||||
let tmpDir: string;
|
||||
|
||||
async function createTmpDir(): Promise<string> {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "session-truncation-test-"));
|
||||
return tmpDir;
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
if (tmpDir) {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {});
|
||||
}
|
||||
});
|
||||
|
||||
function makeAssistant(text: string, timestamp: number) {
|
||||
return makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text }],
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
function createSessionWithCompaction(sessionDir: string): string {
|
||||
const sm = SessionManager.create(sessionDir, sessionDir);
|
||||
// Add messages before compaction
|
||||
sm.appendMessage({ role: "user", content: "hello", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("hi there", 2));
|
||||
sm.appendMessage({ role: "user", content: "do something", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("done", 4));
|
||||
|
||||
// Add compaction (summarizing the above)
|
||||
const branch = sm.getBranch();
|
||||
const firstKeptId = branch[branch.length - 1].id;
|
||||
sm.appendCompaction("Summary of conversation so far.", firstKeptId, 5000);
|
||||
|
||||
// Add messages after compaction
|
||||
sm.appendMessage({ role: "user", content: "next task", timestamp: 5 });
|
||||
sm.appendMessage(makeAssistant("working on it", 6));
|
||||
|
||||
return sm.getSessionFile()!;
|
||||
}
|
||||
|
||||
describe("truncateSessionAfterCompaction", () => {
|
||||
it("removes entries before compaction and keeps entries after (#39953)", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sessionFile = createSessionWithCompaction(dir);
|
||||
|
||||
// Verify pre-truncation state
|
||||
const smBefore = SessionManager.open(sessionFile);
|
||||
const entriesBefore = smBefore.getEntries().length;
|
||||
expect(entriesBefore).toBeGreaterThan(5); // 4 messages + compaction + 2 messages
|
||||
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
expect(result.entriesRemoved).toBeGreaterThan(0);
|
||||
expect(result.bytesAfter).toBeLessThan(result.bytesBefore!);
|
||||
|
||||
// Verify post-truncation: file is still a valid session
|
||||
const smAfter = SessionManager.open(sessionFile);
|
||||
const entriesAfter = smAfter.getEntries().length;
|
||||
expect(entriesAfter).toBeLessThan(entriesBefore);
|
||||
|
||||
// The branch should contain the firstKeptEntryId message (unsummarized
|
||||
// tail), compaction, and post-compaction messages
|
||||
const branchAfter = smAfter.getBranch();
|
||||
// The firstKeptEntryId message is preserved as the new root
|
||||
expect(branchAfter[0].type).toBe("message");
|
||||
expect(branchAfter[0].parentId).toBeNull();
|
||||
expect(branchAfter[1].type).toBe("compaction");
|
||||
|
||||
// Session context should still work
|
||||
const ctx = smAfter.buildSessionContext();
|
||||
expect(ctx.messages.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("skips truncation when no compaction entry exists", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sm = SessionManager.create(dir, dir);
|
||||
// appendMessage implicitly creates the session file
|
||||
sm.appendMessage({ role: "user", content: "hello", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("hi", 2));
|
||||
sm.appendMessage({ role: "user", content: "bye", timestamp: 3 });
|
||||
const sessionFile = sm.getSessionFile()!;
|
||||
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(false);
|
||||
expect(result.reason).toBe("no compaction entry found");
|
||||
});
|
||||
|
||||
it("is idempotent — second truncation is a no-op", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sessionFile = createSessionWithCompaction(dir);
|
||||
|
||||
const first = await truncateSessionAfterCompaction({ sessionFile });
|
||||
expect(first.truncated).toBe(true);
|
||||
|
||||
// Run again — no message entries left to remove
|
||||
const second = await truncateSessionAfterCompaction({ sessionFile });
|
||||
expect(second.truncated).toBe(false);
|
||||
});
|
||||
|
||||
it("archives original file when archivePath is provided (#39953)", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sessionFile = createSessionWithCompaction(dir);
|
||||
const archivePath = path.join(dir, "archive", "backup.jsonl");
|
||||
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile, archivePath });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
const archiveExists = await fs
|
||||
.stat(archivePath)
|
||||
.then(() => true)
|
||||
.catch(() => false);
|
||||
expect(archiveExists).toBe(true);
|
||||
|
||||
// Archive should be larger than truncated file (it has the full history)
|
||||
const archiveSize = (await fs.stat(archivePath)).size;
|
||||
const truncatedSize = (await fs.stat(sessionFile)).size;
|
||||
expect(archiveSize).toBeGreaterThan(truncatedSize);
|
||||
});
|
||||
|
||||
it("handles multiple compaction cycles (#39953)", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sm = SessionManager.create(dir, dir);
|
||||
|
||||
// First cycle: messages + compaction
|
||||
sm.appendMessage({ role: "user", content: "cycle 1 message 1", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("response 1", 2));
|
||||
const branch1 = sm.getBranch();
|
||||
sm.appendCompaction("Summary of cycle 1.", branch1[branch1.length - 1].id, 3000);
|
||||
|
||||
// Second cycle: more messages + another compaction
|
||||
sm.appendMessage({ role: "user", content: "cycle 2 message 1", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("response 2", 4));
|
||||
const branch2 = sm.getBranch();
|
||||
sm.appendCompaction("Summary of cycles 1 and 2.", branch2[branch2.length - 1].id, 6000);
|
||||
|
||||
// Post-compaction messages
|
||||
sm.appendMessage({ role: "user", content: "final question", timestamp: 5 });
|
||||
|
||||
const sessionFile = sm.getSessionFile()!;
|
||||
const entriesBefore = sm.getEntries().length;
|
||||
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
|
||||
// Should preserve both compactions (older compactions are non-message state)
|
||||
// but remove the summarized message entries
|
||||
const smAfter = SessionManager.open(sessionFile);
|
||||
const branchAfter = smAfter.getBranch();
|
||||
expect(branchAfter[0].type).toBe("compaction");
|
||||
|
||||
// Both compaction entries are preserved (non-message state is kept)
|
||||
const compactionEntries = branchAfter.filter((e) => e.type === "compaction");
|
||||
expect(compactionEntries).toHaveLength(2);
|
||||
|
||||
// But message entries before the latest compaction were removed
|
||||
const entriesAfter = smAfter.getEntries().length;
|
||||
expect(entriesAfter).toBeLessThan(entriesBefore);
|
||||
|
||||
// Only the firstKeptEntryId message should remain before the latest compaction
|
||||
const latestCompIdx = branchAfter.findIndex(
|
||||
(e) => e.type === "compaction" && e === compactionEntries[compactionEntries.length - 1],
|
||||
);
|
||||
const messagesBeforeLatest = branchAfter
|
||||
.slice(0, latestCompIdx)
|
||||
.filter((e) => e.type === "message");
|
||||
expect(messagesBeforeLatest).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("preserves non-message session state during truncation", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sm = SessionManager.create(dir, dir);
|
||||
|
||||
// Messages before compaction
|
||||
sm.appendMessage({ role: "user", content: "hello", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("hi", 2));
|
||||
|
||||
// Non-message state entries interleaved with messages
|
||||
sm.appendModelChange("anthropic", "claude-sonnet-4-5-20250514");
|
||||
sm.appendThinkingLevelChange("high");
|
||||
sm.appendCustomEntry("my-extension", { key: "value" });
|
||||
sm.appendSessionInfo("my session");
|
||||
|
||||
sm.appendMessage({ role: "user", content: "do task", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("done", 4));
|
||||
|
||||
// Compaction summarizing the conversation
|
||||
const branch = sm.getBranch();
|
||||
const firstKeptId = branch[branch.length - 1].id;
|
||||
sm.appendCompaction("Summary.", firstKeptId, 5000);
|
||||
|
||||
// Post-compaction messages
|
||||
sm.appendMessage({ role: "user", content: "next", timestamp: 5 });
|
||||
|
||||
const sessionFile = sm.getSessionFile()!;
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
|
||||
// Verify non-message entries are preserved
|
||||
const smAfter = SessionManager.open(sessionFile);
|
||||
const allAfter = smAfter.getEntries();
|
||||
const types = allAfter.map((e) => e.type);
|
||||
|
||||
expect(types).toContain("model_change");
|
||||
expect(types).toContain("thinking_level_change");
|
||||
expect(types).toContain("custom");
|
||||
expect(types).toContain("session_info");
|
||||
expect(types).toContain("compaction");
|
||||
|
||||
// Only the firstKeptEntryId message should remain before the compaction
|
||||
// (all other messages before it were summarized and removed)
|
||||
const branchAfter = smAfter.getBranch();
|
||||
const compIdx = branchAfter.findIndex((e) => e.type === "compaction");
|
||||
const msgsBefore = branchAfter.slice(0, compIdx).filter((e) => e.type === "message");
|
||||
expect(msgsBefore).toHaveLength(1);
|
||||
|
||||
// Session context should still work
|
||||
const ctx = smAfter.buildSessionContext();
|
||||
expect(ctx.messages.length).toBeGreaterThan(0);
|
||||
// Non-message state entries are preserved in the truncated file
|
||||
expect(ctx.model).toBeDefined();
|
||||
expect(ctx.thinkingLevel).toBe("high");
|
||||
});
|
||||
|
||||
it("drops label entries whose target message was truncated", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sm = SessionManager.create(dir, dir);
|
||||
|
||||
// Messages before compaction
|
||||
sm.appendMessage({ role: "user", content: "hello", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("hi", 2));
|
||||
sm.appendMessage({ role: "user", content: "do task", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("done", 4));
|
||||
|
||||
// Capture a pre-compaction message that will be summarized away.
|
||||
const branch = sm.getBranch();
|
||||
const preCompactionMsgId = branch[1].id; // "hi" message
|
||||
|
||||
// Compaction summarizing the conversation
|
||||
const firstKeptId = branch[branch.length - 1].id;
|
||||
sm.appendCompaction("Summary.", firstKeptId, 5000);
|
||||
|
||||
// Post-compaction messages
|
||||
sm.appendMessage({ role: "user", content: "next", timestamp: 5 });
|
||||
sm.appendLabelChange(preCompactionMsgId, "my-label");
|
||||
|
||||
const sessionFile = sm.getSessionFile()!;
|
||||
const labelEntry = sm.getEntries().find((entry) => entry.type === "label");
|
||||
expect(labelEntry?.parentId).not.toBe(preCompactionMsgId);
|
||||
|
||||
const smBefore = SessionManager.open(sessionFile);
|
||||
expect(smBefore.getLabel(preCompactionMsgId)).toBe("my-label");
|
||||
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
|
||||
// Verify label metadata was dropped with the removed target message.
|
||||
const smAfter = SessionManager.open(sessionFile);
|
||||
const allAfter = smAfter.getEntries();
|
||||
const labels = allAfter.filter((e) => e.type === "label");
|
||||
expect(labels).toHaveLength(0);
|
||||
expect(smAfter.getLabel(preCompactionMsgId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves the firstKeptEntryId unsummarized tail", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sm = SessionManager.create(dir, dir);
|
||||
|
||||
// Build a conversation where firstKeptEntryId is NOT the last message
|
||||
sm.appendMessage({ role: "user", content: "msg1", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("resp1", 2));
|
||||
sm.appendMessage({ role: "user", content: "msg2", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("resp2", 4));
|
||||
|
||||
const branch = sm.getBranch();
|
||||
// Set firstKeptEntryId to the second message — so msg1 is summarized
|
||||
// but msg2, resp2, and everything after are the unsummarized tail.
|
||||
const firstKeptId = branch[1].id; // "resp1"
|
||||
sm.appendCompaction("Summary of msg1.", firstKeptId, 2000);
|
||||
|
||||
sm.appendMessage({ role: "user", content: "next", timestamp: 5 });
|
||||
|
||||
const sessionFile = sm.getSessionFile()!;
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
// Only msg1 was summarized (1 entry removed)
|
||||
expect(result.entriesRemoved).toBe(1);
|
||||
|
||||
// Verify the unsummarized tail is preserved
|
||||
const smAfter = SessionManager.open(sessionFile);
|
||||
const branchAfter = smAfter.getBranch();
|
||||
const types = branchAfter.map((e) => e.type);
|
||||
// resp1 (firstKeptEntryId), msg2, resp2, compaction, next
|
||||
expect(types).toEqual(["message", "message", "message", "compaction", "message"]);
|
||||
|
||||
// buildSessionContext should include the unsummarized tail
|
||||
const ctx = smAfter.buildSessionContext();
|
||||
expect(ctx.messages.length).toBeGreaterThan(2);
|
||||
});
|
||||
|
||||
it("preserves unsummarized sibling branches during truncation", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const sm = SessionManager.create(dir, dir);
|
||||
|
||||
// Build main conversation
|
||||
sm.appendMessage({ role: "user", content: "hello", timestamp: 1 });
|
||||
sm.appendMessage(makeAssistant("hi there", 2));
|
||||
|
||||
// Save a branch point
|
||||
const branchPoint = sm.getBranch();
|
||||
const branchFromId = branchPoint[branchPoint.length - 1].id;
|
||||
|
||||
// Continue main branch
|
||||
sm.appendMessage({ role: "user", content: "do task A", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("done A", 4));
|
||||
|
||||
// Create a sibling branch from the earlier point
|
||||
sm.branch(branchFromId);
|
||||
sm.appendMessage({ role: "user", content: "do task B instead", timestamp: 5 });
|
||||
const siblingMsg = sm.appendMessage(makeAssistant("done B", 6));
|
||||
|
||||
// Go back to main branch tip and add compaction there
|
||||
sm.branch(branchFromId);
|
||||
sm.appendMessage({ role: "user", content: "do task A", timestamp: 3 });
|
||||
sm.appendMessage(makeAssistant("done A take 2", 7));
|
||||
const mainBranch = sm.getBranch();
|
||||
const firstKeptId = mainBranch[mainBranch.length - 1].id;
|
||||
sm.appendCompaction("Summary of main branch.", firstKeptId, 5000);
|
||||
sm.appendMessage({ role: "user", content: "next", timestamp: 8 });
|
||||
|
||||
const sessionFile = sm.getSessionFile()!;
|
||||
|
||||
const entriesBefore = sm.getEntries();
|
||||
|
||||
const result = await truncateSessionAfterCompaction({ sessionFile });
|
||||
|
||||
expect(result.truncated).toBe(true);
|
||||
|
||||
// Verify sibling branch is preserved in the full entry list
|
||||
const smAfter = SessionManager.open(sessionFile);
|
||||
const allAfter = smAfter.getEntries();
|
||||
|
||||
// The sibling branch message should still exist
|
||||
const siblingAfter = allAfter.find((e) => e.id === siblingMsg);
|
||||
expect(siblingAfter).toBeDefined();
|
||||
|
||||
// The tree should have entries from both branches
|
||||
const tree = smAfter.getTree();
|
||||
expect(tree.length).toBeGreaterThan(0);
|
||||
|
||||
// Total entries should be less (main branch messages removed) but not zero
|
||||
expect(allAfter.length).toBeGreaterThan(0);
|
||||
expect(allAfter.length).toBeLessThan(entriesBefore.length);
|
||||
});
|
||||
});
|
||||
226
src/agents/pi-embedded-runner/session-truncation.ts
Normal file
226
src/agents/pi-embedded-runner/session-truncation.ts
Normal file
@ -0,0 +1,226 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { CompactionEntry, SessionEntry } from "@mariozechner/pi-coding-agent";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { log } from "./logger.js";
|
||||
|
||||
/**
|
||||
* Truncate a session JSONL file after compaction by removing only the
|
||||
* message entries that the compaction actually summarized.
|
||||
*
|
||||
* After compaction, the session file still contains all historical entries
|
||||
* even though `buildSessionContext()` logically skips entries before
|
||||
* `firstKeptEntryId`. Over many compaction cycles this causes unbounded
|
||||
* file growth (issue #39953).
|
||||
*
|
||||
* This function rewrites the file keeping:
|
||||
* 1. The session header
|
||||
* 2. All non-message session state (custom, model_change, thinking_level_change,
|
||||
* session_info, custom_message, compaction entries)
|
||||
* Note: label and branch_summary entries referencing removed messages are
|
||||
* also dropped to avoid dangling metadata.
|
||||
* 3. All entries from sibling branches not covered by the compaction
|
||||
* 4. The unsummarized tail: entries from `firstKeptEntryId` through (and
|
||||
* including) the compaction entry, plus all entries after it
|
||||
*
|
||||
* Only `message` entries in the current branch that precede the compaction's
|
||||
* `firstKeptEntryId` are removed — they are the entries the compaction
|
||||
* actually summarized. Entries from `firstKeptEntryId` onward are preserved
|
||||
* because `buildSessionContext()` expects them when reconstructing the
|
||||
* session. Entries whose parent was removed are re-parented to the nearest
|
||||
* kept ancestor (or become roots).
|
||||
*/
|
||||
export async function truncateSessionAfterCompaction(params: {
|
||||
sessionFile: string;
|
||||
/** Optional path to archive the pre-truncation file. */
|
||||
archivePath?: string;
|
||||
}): Promise<TruncationResult> {
|
||||
const { sessionFile } = params;
|
||||
|
||||
let sm: SessionManager;
|
||||
try {
|
||||
sm = SessionManager.open(sessionFile);
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
log.warn(`[session-truncation] Failed to open session file: ${reason}`);
|
||||
return { truncated: false, entriesRemoved: 0, reason };
|
||||
}
|
||||
|
||||
const header = sm.getHeader();
|
||||
if (!header) {
|
||||
return { truncated: false, entriesRemoved: 0, reason: "missing session header" };
|
||||
}
|
||||
|
||||
const branch = sm.getBranch();
|
||||
if (branch.length === 0) {
|
||||
return { truncated: false, entriesRemoved: 0, reason: "empty session" };
|
||||
}
|
||||
|
||||
// Find the latest compaction entry in the current branch
|
||||
let latestCompactionIdx = -1;
|
||||
for (let i = branch.length - 1; i >= 0; i--) {
|
||||
if (branch[i].type === "compaction") {
|
||||
latestCompactionIdx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (latestCompactionIdx < 0) {
|
||||
return { truncated: false, entriesRemoved: 0, reason: "no compaction entry found" };
|
||||
}
|
||||
|
||||
// Nothing to truncate if compaction is already at root
|
||||
if (latestCompactionIdx === 0) {
|
||||
return { truncated: false, entriesRemoved: 0, reason: "compaction already at root" };
|
||||
}
|
||||
|
||||
// The compaction's firstKeptEntryId marks the start of the "unsummarized
|
||||
// tail" — entries from firstKeptEntryId through the compaction that
|
||||
// buildSessionContext() expects to find when reconstructing the session.
|
||||
// Only entries *before* firstKeptEntryId were actually summarized.
|
||||
const compactionEntry = branch[latestCompactionIdx] as CompactionEntry;
|
||||
const { firstKeptEntryId } = compactionEntry;
|
||||
|
||||
// Collect IDs of entries in the current branch that were actually summarized
|
||||
// (everything before firstKeptEntryId). Entries from firstKeptEntryId through
|
||||
// the compaction are the unsummarized tail and must be preserved.
|
||||
const summarizedBranchIds = new Set<string>();
|
||||
for (let i = 0; i < latestCompactionIdx; i++) {
|
||||
if (firstKeptEntryId && branch[i].id === firstKeptEntryId) {
|
||||
break; // Everything from here to the compaction is the unsummarized tail
|
||||
}
|
||||
summarizedBranchIds.add(branch[i].id);
|
||||
}
|
||||
|
||||
// Operate on the full transcript so sibling branches and tree metadata
|
||||
// are not silently dropped.
|
||||
const allEntries = sm.getEntries();
|
||||
|
||||
// Only remove message-type entries that the compaction actually summarized.
|
||||
// Non-message session state (custom, model_change, thinking_level_change,
|
||||
// session_info, custom_message) is preserved even if it sits in the
|
||||
// summarized portion of the branch.
|
||||
//
|
||||
// label and branch_summary entries that reference removed message IDs are
|
||||
// also dropped to avoid dangling metadata (consistent with the approach in
|
||||
// tool-result-truncation.ts).
|
||||
const removedIds = new Set<string>();
|
||||
for (const entry of allEntries) {
|
||||
if (summarizedBranchIds.has(entry.id) && entry.type === "message") {
|
||||
removedIds.add(entry.id);
|
||||
}
|
||||
}
|
||||
|
||||
// Labels bookmark targetId while parentId just records the leaf when the
|
||||
// label was changed, so targetId determines whether the label is still valid.
|
||||
// Branch summaries still hang off the summarized branch via parentId.
|
||||
for (const entry of allEntries) {
|
||||
if (entry.type === "label" && removedIds.has(entry.targetId)) {
|
||||
removedIds.add(entry.id);
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
entry.type === "branch_summary" &&
|
||||
entry.parentId !== null &&
|
||||
removedIds.has(entry.parentId)
|
||||
) {
|
||||
removedIds.add(entry.id);
|
||||
}
|
||||
}
|
||||
|
||||
if (removedIds.size === 0) {
|
||||
return { truncated: false, entriesRemoved: 0, reason: "no entries to remove" };
|
||||
}
|
||||
|
||||
// Build an id→entry map for walking parent chains during re-parenting.
|
||||
const entryById = new Map<string, SessionEntry>();
|
||||
for (const entry of allEntries) {
|
||||
entryById.set(entry.id, entry);
|
||||
}
|
||||
|
||||
// Keep every entry that was not removed, re-parenting where necessary so
|
||||
// the tree stays connected.
|
||||
const keptEntries: SessionEntry[] = [];
|
||||
for (const entry of allEntries) {
|
||||
if (removedIds.has(entry.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Walk up the parent chain to find the nearest kept ancestor.
|
||||
let newParentId = entry.parentId;
|
||||
while (newParentId !== null && removedIds.has(newParentId)) {
|
||||
const parent = entryById.get(newParentId);
|
||||
newParentId = parent?.parentId ?? null;
|
||||
}
|
||||
|
||||
if (newParentId !== entry.parentId) {
|
||||
keptEntries.push({ ...entry, parentId: newParentId });
|
||||
} else {
|
||||
keptEntries.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
const entriesRemoved = removedIds.size;
|
||||
const totalEntriesBefore = allEntries.length;
|
||||
|
||||
// Get file size before truncation
|
||||
let bytesBefore = 0;
|
||||
try {
|
||||
const stat = await fs.stat(sessionFile);
|
||||
bytesBefore = stat.size;
|
||||
} catch {
|
||||
// If stat fails, continue anyway
|
||||
}
|
||||
|
||||
// Archive original file if requested
|
||||
if (params.archivePath) {
|
||||
try {
|
||||
const archiveDir = path.dirname(params.archivePath);
|
||||
await fs.mkdir(archiveDir, { recursive: true });
|
||||
await fs.copyFile(sessionFile, params.archivePath);
|
||||
log.info(`[session-truncation] Archived pre-truncation file to ${params.archivePath}`);
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
log.warn(`[session-truncation] Failed to archive: ${reason}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Write truncated file atomically (temp + rename)
|
||||
const lines: string[] = [JSON.stringify(header), ...keptEntries.map((e) => JSON.stringify(e))];
|
||||
const content = lines.join("\n") + "\n";
|
||||
|
||||
const tmpFile = `${sessionFile}.truncate-tmp`;
|
||||
try {
|
||||
await fs.writeFile(tmpFile, content, "utf-8");
|
||||
await fs.rename(tmpFile, sessionFile);
|
||||
} catch (err) {
|
||||
// Clean up temp file on failure
|
||||
try {
|
||||
await fs.unlink(tmpFile);
|
||||
} catch {
|
||||
// Ignore cleanup errors
|
||||
}
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
log.warn(`[session-truncation] Failed to write truncated file: ${reason}`);
|
||||
return { truncated: false, entriesRemoved: 0, reason };
|
||||
}
|
||||
|
||||
const bytesAfter = Buffer.byteLength(content, "utf-8");
|
||||
|
||||
log.info(
|
||||
`[session-truncation] Truncated session file: ` +
|
||||
`entriesBefore=${totalEntriesBefore} entriesAfter=${keptEntries.length} ` +
|
||||
`removed=${entriesRemoved} bytesBefore=${bytesBefore} bytesAfter=${bytesAfter} ` +
|
||||
`reduction=${bytesBefore > 0 ? ((1 - bytesAfter / bytesBefore) * 100).toFixed(1) : "?"}%`,
|
||||
);
|
||||
|
||||
return { truncated: true, entriesRemoved, bytesBefore, bytesAfter };
|
||||
}
|
||||
|
||||
export type TruncationResult = {
|
||||
truncated: boolean;
|
||||
entriesRemoved: number;
|
||||
bytesBefore?: number;
|
||||
bytesAfter?: number;
|
||||
reason?: string;
|
||||
};
|
||||
@ -390,6 +390,7 @@ const TARGET_KEYS = [
|
||||
"agents.defaults.compaction.postCompactionSections",
|
||||
"agents.defaults.compaction.timeoutSeconds",
|
||||
"agents.defaults.compaction.model",
|
||||
"agents.defaults.compaction.truncateAfterCompaction",
|
||||
"agents.defaults.compaction.memoryFlush",
|
||||
"agents.defaults.compaction.memoryFlush.enabled",
|
||||
"agents.defaults.compaction.memoryFlush.softThresholdTokens",
|
||||
|
||||
@ -1050,6 +1050,8 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"Maximum time in seconds allowed for a single compaction operation before it is aborted (default: 900). Increase this for very large sessions that need more time to summarize, or decrease it to fail faster on unresponsive models.",
|
||||
"agents.defaults.compaction.model":
|
||||
"Optional provider/model override used only for compaction summarization. Set this when you want compaction to run on a different model than the session default, and leave it unset to keep using the primary agent model.",
|
||||
"agents.defaults.compaction.truncateAfterCompaction":
|
||||
"When enabled, rewrites the session JSONL file after compaction to remove entries that were summarized. Prevents unbounded file growth in long-running sessions with many compaction cycles. Default: false.",
|
||||
"agents.defaults.compaction.memoryFlush":
|
||||
"Pre-compaction memory flush settings that run an agentic memory write before heavy compaction. Keep enabled for long sessions so salient context is persisted before aggressive trimming.",
|
||||
"agents.defaults.compaction.memoryFlush.enabled":
|
||||
|
||||
@ -467,6 +467,7 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"agents.defaults.compaction.postCompactionSections": "Post-Compaction Context Sections",
|
||||
"agents.defaults.compaction.timeoutSeconds": "Compaction Timeout (Seconds)",
|
||||
"agents.defaults.compaction.model": "Compaction Model Override",
|
||||
"agents.defaults.compaction.truncateAfterCompaction": "Truncate After Compaction",
|
||||
"agents.defaults.compaction.memoryFlush": "Compaction Memory Flush",
|
||||
"agents.defaults.compaction.memoryFlush.enabled": "Compaction Memory Flush Enabled",
|
||||
"agents.defaults.compaction.memoryFlush.softThresholdTokens":
|
||||
|
||||
@ -342,6 +342,12 @@ export type AgentCompactionConfig = {
|
||||
model?: string;
|
||||
/** Maximum time in seconds for a single compaction operation (default: 900). */
|
||||
timeoutSeconds?: number;
|
||||
/**
|
||||
* Truncate the session JSONL file after compaction to remove entries that
|
||||
* were summarized. Prevents unbounded file growth in long-running sessions.
|
||||
* Default: false (existing behavior preserved).
|
||||
*/
|
||||
truncateAfterCompaction?: boolean;
|
||||
};
|
||||
|
||||
export type AgentCompactionMemoryFlushConfig = {
|
||||
|
||||
@ -1,18 +1,8 @@
|
||||
import { execFileSync } from "node:child_process";
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { pathToFileURL } from "node:url";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { __testing } from "./loader.js";
|
||||
|
||||
type CreateJiti = typeof import("jiti").createJiti;
|
||||
|
||||
let createJitiPromise: Promise<CreateJiti> | undefined;
|
||||
|
||||
async function getCreateJiti() {
|
||||
createJitiPromise ??= import("jiti").then(({ createJiti }) => createJiti);
|
||||
return createJitiPromise;
|
||||
}
|
||||
|
||||
const tempRoots: string[] = [];
|
||||
|
||||
@ -39,7 +29,6 @@ describe("plugin loader git path regression", () => {
|
||||
const copiedPluginSdkDir = path.join(copiedExtensionRoot, "plugin-sdk");
|
||||
mkdirSafe(copiedSourceDir);
|
||||
mkdirSafe(copiedPluginSdkDir);
|
||||
|
||||
const jitiBaseFile = path.join(copiedSourceDir, "__jiti-base__.mjs");
|
||||
fs.writeFileSync(jitiBaseFile, "export {};\n", "utf-8");
|
||||
fs.writeFileSync(
|
||||
@ -69,29 +58,46 @@ export const copiedRuntimeMarker = {
|
||||
`,
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const copiedChannelRuntime = path.join(copiedExtensionRoot, "src", "channel.runtime.ts");
|
||||
const jitiBaseUrl = pathToFileURL(jitiBaseFile).href;
|
||||
const createJiti = await getCreateJiti();
|
||||
const withoutAlias = createJiti(jitiBaseUrl, {
|
||||
...__testing.buildPluginLoaderJitiOptions({}),
|
||||
const script = `
|
||||
import { createJiti } from "jiti";
|
||||
const withoutAlias = createJiti(${JSON.stringify(jitiBaseFile)}, {
|
||||
interopDefault: true,
|
||||
tryNative: false,
|
||||
extensions: [".ts", ".tsx", ".mts", ".cts", ".mtsx", ".ctsx", ".js", ".mjs", ".cjs", ".json"],
|
||||
});
|
||||
// The production loader uses sync Jiti evaluation, so this regression test
|
||||
// should exercise the same seam instead of Jiti's async import helper.
|
||||
expect(() => withoutAlias(copiedChannelRuntime)).toThrow();
|
||||
|
||||
const withAlias = createJiti(jitiBaseUrl, {
|
||||
...__testing.buildPluginLoaderJitiOptions({
|
||||
"openclaw/plugin-sdk/channel-runtime": copiedChannelRuntimeShim,
|
||||
}),
|
||||
let withoutAliasThrew = false;
|
||||
try {
|
||||
withoutAlias(${JSON.stringify(copiedChannelRuntime)});
|
||||
} catch {
|
||||
withoutAliasThrew = true;
|
||||
}
|
||||
const withAlias = createJiti(${JSON.stringify(jitiBaseFile)}, {
|
||||
interopDefault: true,
|
||||
tryNative: false,
|
||||
});
|
||||
expect(withAlias(copiedChannelRuntime)).toMatchObject({
|
||||
copiedRuntimeMarker: {
|
||||
PAIRING_APPROVED_MESSAGE: "paired",
|
||||
resolveOutboundSendDep: expect.any(Function),
|
||||
extensions: [".ts", ".tsx", ".mts", ".cts", ".mtsx", ".ctsx", ".js", ".mjs", ".cjs", ".json"],
|
||||
alias: {
|
||||
"openclaw/plugin-sdk/channel-runtime": ${JSON.stringify(copiedChannelRuntimeShim)},
|
||||
},
|
||||
});
|
||||
const mod = withAlias(${JSON.stringify(copiedChannelRuntime)});
|
||||
console.log(JSON.stringify({
|
||||
withoutAliasThrew,
|
||||
marker: mod.copiedRuntimeMarker?.PAIRING_APPROVED_MESSAGE,
|
||||
dep: mod.copiedRuntimeMarker?.resolveOutboundSendDep?.(),
|
||||
}));
|
||||
`;
|
||||
const raw = execFileSync(process.execPath, ["--input-type=module", "--eval", script], {
|
||||
cwd: process.cwd(),
|
||||
encoding: "utf-8",
|
||||
});
|
||||
const result = JSON.parse(raw) as {
|
||||
withoutAliasThrew: boolean;
|
||||
marker?: string;
|
||||
dep?: string;
|
||||
};
|
||||
expect(result.withoutAliasThrew).toBe(true);
|
||||
expect(result.marker).toBe("paired");
|
||||
expect(result.dep).toBe("shimmed");
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user