Pi Runner: fix timeout compaction review feedback

This commit is contained in:
Joey Krug 2026-03-14 14:50:37 -04:00
parent d7a92284ff
commit 2ada811e9d
3 changed files with 125 additions and 43 deletions

View File

@ -44,6 +44,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
mockedGlobalHookRunner.runBeforeAgentStart.mockReset();
mockedGlobalHookRunner.runBeforeCompaction.mockReset();
mockedGlobalHookRunner.runAfterCompaction.mockReset();
mockedPickFallbackThinkingLevel.mockReset();
mockedContextEngine.info.ownsCompaction = false;
mockedCompactDirect.mockResolvedValue({
ok: false,
@ -63,6 +64,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
truncatedCount: 0,
reason: "no oversized tool results",
});
mockedPickFallbackThinkingLevel.mockReturnValue(undefined);
mockedGlobalHookRunner.hasHooks.mockImplementation(() => false);
});

View File

@ -9,6 +9,7 @@ import {
mockedContextEngine,
mockedCompactDirect,
mockedRunEmbeddedAttempt,
mockedPickFallbackThinkingLevel,
resetRunOverflowCompactionHarnessMocks,
mockedSessionLikelyHasOversizedToolResults,
mockedTruncateOversizedToolResultsInSession,
@ -34,6 +35,7 @@ describe("timeout-triggered compaction", () => {
mockedGlobalHookRunner.runBeforeAgentStart.mockReset();
mockedGlobalHookRunner.runBeforeCompaction.mockReset();
mockedGlobalHookRunner.runAfterCompaction.mockReset();
mockedPickFallbackThinkingLevel.mockReset();
mockedContextEngine.info.ownsCompaction = false;
mockedCompactDirect.mockResolvedValue({
ok: false,
@ -53,6 +55,7 @@ describe("timeout-triggered compaction", () => {
truncatedCount: 0,
reason: "no oversized tool results",
});
mockedPickFallbackThinkingLevel.mockReturnValue(undefined);
mockedGlobalHookRunner.hasHooks.mockImplementation(() => false);
});
@ -170,6 +173,31 @@ describe("timeout-triggered compaction", () => {
expect(result.payloads?.[0]?.text).toContain("timed out");
});
it("does not attempt compaction for low-context timeouts on later retries", async () => {
mockedPickFallbackThinkingLevel.mockReturnValueOnce("low");
mockedRunEmbeddedAttempt
.mockResolvedValueOnce(
makeAttemptResult({
promptError: new Error("unsupported reasoning mode"),
}),
)
.mockResolvedValueOnce(
makeAttemptResult({
timedOut: true,
lastAssistant: {
usage: { total: 20000 },
} as never,
}),
);
const result = await runEmbeddedPiAgent(overflowBaseRunParams);
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
expect(mockedCompactDirect).not.toHaveBeenCalled();
expect(result.payloads?.[0]?.isError).toBe(true);
expect(result.payloads?.[0]?.text).toContain("timed out");
});
it("does not attempt compaction when aborted", async () => {
mockedRunEmbeddedAttempt.mockResolvedValueOnce(
makeAttemptResult({
@ -223,4 +251,49 @@ describe("timeout-triggered compaction", () => {
expect(result.payloads?.[0]?.isError).toBe(true);
expect(result.payloads?.[0]?.text).toContain("timed out");
});
it("fires compaction hooks during timeout recovery for ownsCompaction engines", async () => {
mockedContextEngine.info.ownsCompaction = true;
mockedGlobalHookRunner.hasHooks.mockImplementation(
(hookName) => hookName === "before_compaction" || hookName === "after_compaction",
);
mockedRunEmbeddedAttempt
.mockResolvedValueOnce(
makeAttemptResult({
timedOut: true,
lastAssistant: {
usage: { total: 160000 },
} as never,
}),
)
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
mockedCompactDirect.mockResolvedValueOnce({
ok: true,
compacted: true,
result: {
summary: "engine-owned timeout compaction",
tokensAfter: 70,
},
});
await runEmbeddedPiAgent(overflowBaseRunParams);
expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledWith(
{ messageCount: -1, sessionFile: "/tmp/session.json" },
expect.objectContaining({
sessionKey: "test-key",
}),
);
expect(mockedGlobalHookRunner.runAfterCompaction).toHaveBeenCalledWith(
{
messageCount: -1,
compactedCount: -1,
tokenCount: 70,
sessionFile: "/tmp/session.json",
},
expect.objectContaining({
sessionKey: "test-key",
}),
);
});
});

View File

@ -881,6 +881,51 @@ export async function runEmbeddedPiAgent(
ensureContextEnginesInitialized();
const contextEngine = await resolveContextEngine(params.config);
try {
// When the engine owns compaction, compactEmbeddedPiSessionDirect is
// bypassed. Fire lifecycle hooks here so recovery paths still notify
// subscribers like memory extensions and usage trackers.
const runOwnsCompactionBeforeHook = async (reason: string) => {
if (
contextEngine.info.ownsCompaction !== true ||
!hookRunner?.hasHooks("before_compaction")
) {
return;
}
try {
await hookRunner.runBeforeCompaction(
{ messageCount: -1, sessionFile: params.sessionFile },
hookCtx,
);
} catch (hookErr) {
log.warn(`before_compaction hook failed during ${reason}: ${String(hookErr)}`);
}
};
const runOwnsCompactionAfterHook = async (
reason: string,
compactResult: Awaited<ReturnType<typeof contextEngine.compact>>,
) => {
if (
contextEngine.info.ownsCompaction !== true ||
!compactResult.ok ||
!compactResult.compacted ||
!hookRunner?.hasHooks("after_compaction")
) {
return;
}
try {
await hookRunner.runAfterCompaction(
{
messageCount: -1,
compactedCount: -1,
tokenCount: compactResult.result?.tokensAfter,
sessionFile: params.sessionFile,
},
hookCtx,
);
} catch (hookErr) {
log.warn(`after_compaction hook failed during ${reason}: ${String(hookErr)}`);
}
};
let authRetryPending = false;
// Hoisted so the retry-limit error path can use the most recent API total.
let lastTurnTotal: number | undefined;
@ -1053,16 +1098,14 @@ export async function runEmbeddedPiAgent(
if (timedOut && !aborted && !timedOutDuringCompaction) {
const tokenUsedRatio =
lastTurnTotal != null && ctxInfo.tokens > 0 ? lastTurnTotal / ctxInfo.tokens : 0;
if (
tokenUsedRatio > 0.65 ||
(overflowCompactionAttempts === 0 && runLoopIterations > 1)
) {
if (tokenUsedRatio > 0.65) {
const timeoutDiagId = createCompactionDiagId();
log.warn(
`[timeout-compaction] LLM timed out with high context usage (${Math.round(tokenUsedRatio * 100)}%); ` +
`attempting compaction before retry diagId=${timeoutDiagId}`,
);
let timeoutCompactResult: Awaited<ReturnType<typeof contextEngine.compact>>;
await runOwnsCompactionBeforeHook("timeout recovery");
try {
timeoutCompactResult = await contextEngine.compact({
sessionId: params.sessionId,
@ -1102,6 +1145,7 @@ export async function runEmbeddedPiAgent(
);
timeoutCompactResult = { ok: false, compacted: false, reason: String(compactErr) };
}
await runOwnsCompactionAfterHook("timeout recovery", timeoutCompactResult);
if (timeoutCompactResult.compacted) {
autoCompactionCount += 1;
log.info(
@ -1181,24 +1225,7 @@ export async function runEmbeddedPiAgent(
`context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`,
);
let compactResult: Awaited<ReturnType<typeof contextEngine.compact>>;
// When the engine owns compaction, hooks are not fired inside
// compactEmbeddedPiSessionDirect (which is bypassed). Fire them
// here so subscribers (memory extensions, usage trackers) are
// notified even on overflow-recovery compactions.
const overflowEngineOwnsCompaction = contextEngine.info.ownsCompaction === true;
const overflowHookRunner = overflowEngineOwnsCompaction ? hookRunner : null;
if (overflowHookRunner?.hasHooks("before_compaction")) {
try {
await overflowHookRunner.runBeforeCompaction(
{ messageCount: -1, sessionFile: params.sessionFile },
hookCtx,
);
} catch (hookErr) {
log.warn(
`before_compaction hook failed during overflow recovery: ${String(hookErr)}`,
);
}
}
await runOwnsCompactionBeforeHook("overflow recovery");
try {
compactResult = await contextEngine.compact({
sessionId: params.sessionId,
@ -1250,27 +1277,7 @@ export async function runEmbeddedPiAgent(
);
compactResult = { ok: false, compacted: false, reason: String(compactErr) };
}
if (
compactResult.ok &&
compactResult.compacted &&
overflowHookRunner?.hasHooks("after_compaction")
) {
try {
await overflowHookRunner.runAfterCompaction(
{
messageCount: -1,
compactedCount: -1,
tokenCount: compactResult.result?.tokensAfter,
sessionFile: params.sessionFile,
},
hookCtx,
);
} catch (hookErr) {
log.warn(
`after_compaction hook failed during overflow recovery: ${String(hookErr)}`,
);
}
}
await runOwnsCompactionAfterHook("overflow recovery", compactResult);
if (compactResult.compacted) {
autoCompactionCount += 1;
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);