From 2ada811e9d713a7ce1e107a8337c67cd41c626c5 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 14:50:37 -0400 Subject: [PATCH] Pi Runner: fix timeout compaction review feedback --- .../run.overflow-compaction.test.ts | 2 + .../run.timeout-triggered-compaction.test.ts | 73 +++++++++++++++ src/agents/pi-embedded-runner/run.ts | 93 ++++++++++--------- 3 files changed, 125 insertions(+), 43 deletions(-) diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index 1f5f0b6de35..caaf175f85f 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -44,6 +44,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { mockedGlobalHookRunner.runBeforeAgentStart.mockReset(); mockedGlobalHookRunner.runBeforeCompaction.mockReset(); mockedGlobalHookRunner.runAfterCompaction.mockReset(); + mockedPickFallbackThinkingLevel.mockReset(); mockedContextEngine.info.ownsCompaction = false; mockedCompactDirect.mockResolvedValue({ ok: false, @@ -63,6 +64,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { truncatedCount: 0, reason: "no oversized tool results", }); + mockedPickFallbackThinkingLevel.mockReturnValue(undefined); mockedGlobalHookRunner.hasHooks.mockImplementation(() => false); }); diff --git a/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts b/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts index 65192a15a6b..24b7720ed81 100644 --- a/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts @@ -9,6 +9,7 @@ import { mockedContextEngine, mockedCompactDirect, mockedRunEmbeddedAttempt, + mockedPickFallbackThinkingLevel, resetRunOverflowCompactionHarnessMocks, mockedSessionLikelyHasOversizedToolResults, mockedTruncateOversizedToolResultsInSession, @@ -34,6 +35,7 @@ describe("timeout-triggered compaction", () => { mockedGlobalHookRunner.runBeforeAgentStart.mockReset(); mockedGlobalHookRunner.runBeforeCompaction.mockReset(); mockedGlobalHookRunner.runAfterCompaction.mockReset(); + mockedPickFallbackThinkingLevel.mockReset(); mockedContextEngine.info.ownsCompaction = false; mockedCompactDirect.mockResolvedValue({ ok: false, @@ -53,6 +55,7 @@ describe("timeout-triggered compaction", () => { truncatedCount: 0, reason: "no oversized tool results", }); + mockedPickFallbackThinkingLevel.mockReturnValue(undefined); mockedGlobalHookRunner.hasHooks.mockImplementation(() => false); }); @@ -170,6 +173,31 @@ describe("timeout-triggered compaction", () => { expect(result.payloads?.[0]?.text).toContain("timed out"); }); + it("does not attempt compaction for low-context timeouts on later retries", async () => { + mockedPickFallbackThinkingLevel.mockReturnValueOnce("low"); + mockedRunEmbeddedAttempt + .mockResolvedValueOnce( + makeAttemptResult({ + promptError: new Error("unsupported reasoning mode"), + }), + ) + .mockResolvedValueOnce( + makeAttemptResult({ + timedOut: true, + lastAssistant: { + usage: { total: 20000 }, + } as never, + }), + ); + + const result = await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2); + expect(mockedCompactDirect).not.toHaveBeenCalled(); + expect(result.payloads?.[0]?.isError).toBe(true); + expect(result.payloads?.[0]?.text).toContain("timed out"); + }); + it("does not attempt compaction when aborted", async () => { mockedRunEmbeddedAttempt.mockResolvedValueOnce( makeAttemptResult({ @@ -223,4 +251,49 @@ describe("timeout-triggered compaction", () => { expect(result.payloads?.[0]?.isError).toBe(true); expect(result.payloads?.[0]?.text).toContain("timed out"); }); + + it("fires compaction hooks during timeout recovery for ownsCompaction engines", async () => { + mockedContextEngine.info.ownsCompaction = true; + mockedGlobalHookRunner.hasHooks.mockImplementation( + (hookName) => hookName === "before_compaction" || hookName === "after_compaction", + ); + mockedRunEmbeddedAttempt + .mockResolvedValueOnce( + makeAttemptResult({ + timedOut: true, + lastAssistant: { + usage: { total: 160000 }, + } as never, + }), + ) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + mockedCompactDirect.mockResolvedValueOnce({ + ok: true, + compacted: true, + result: { + summary: "engine-owned timeout compaction", + tokensAfter: 70, + }, + }); + + await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledWith( + { messageCount: -1, sessionFile: "/tmp/session.json" }, + expect.objectContaining({ + sessionKey: "test-key", + }), + ); + expect(mockedGlobalHookRunner.runAfterCompaction).toHaveBeenCalledWith( + { + messageCount: -1, + compactedCount: -1, + tokenCount: 70, + sessionFile: "/tmp/session.json", + }, + expect.objectContaining({ + sessionKey: "test-key", + }), + ); + }); }); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 4936ebc6ce4..3309a7af1fb 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -881,6 +881,51 @@ export async function runEmbeddedPiAgent( ensureContextEnginesInitialized(); const contextEngine = await resolveContextEngine(params.config); try { + // When the engine owns compaction, compactEmbeddedPiSessionDirect is + // bypassed. Fire lifecycle hooks here so recovery paths still notify + // subscribers like memory extensions and usage trackers. + const runOwnsCompactionBeforeHook = async (reason: string) => { + if ( + contextEngine.info.ownsCompaction !== true || + !hookRunner?.hasHooks("before_compaction") + ) { + return; + } + try { + await hookRunner.runBeforeCompaction( + { messageCount: -1, sessionFile: params.sessionFile }, + hookCtx, + ); + } catch (hookErr) { + log.warn(`before_compaction hook failed during ${reason}: ${String(hookErr)}`); + } + }; + const runOwnsCompactionAfterHook = async ( + reason: string, + compactResult: Awaited>, + ) => { + if ( + contextEngine.info.ownsCompaction !== true || + !compactResult.ok || + !compactResult.compacted || + !hookRunner?.hasHooks("after_compaction") + ) { + return; + } + try { + await hookRunner.runAfterCompaction( + { + messageCount: -1, + compactedCount: -1, + tokenCount: compactResult.result?.tokensAfter, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (hookErr) { + log.warn(`after_compaction hook failed during ${reason}: ${String(hookErr)}`); + } + }; let authRetryPending = false; // Hoisted so the retry-limit error path can use the most recent API total. let lastTurnTotal: number | undefined; @@ -1053,16 +1098,14 @@ export async function runEmbeddedPiAgent( if (timedOut && !aborted && !timedOutDuringCompaction) { const tokenUsedRatio = lastTurnTotal != null && ctxInfo.tokens > 0 ? lastTurnTotal / ctxInfo.tokens : 0; - if ( - tokenUsedRatio > 0.65 || - (overflowCompactionAttempts === 0 && runLoopIterations > 1) - ) { + if (tokenUsedRatio > 0.65) { const timeoutDiagId = createCompactionDiagId(); log.warn( `[timeout-compaction] LLM timed out with high context usage (${Math.round(tokenUsedRatio * 100)}%); ` + `attempting compaction before retry diagId=${timeoutDiagId}`, ); let timeoutCompactResult: Awaited>; + await runOwnsCompactionBeforeHook("timeout recovery"); try { timeoutCompactResult = await contextEngine.compact({ sessionId: params.sessionId, @@ -1102,6 +1145,7 @@ export async function runEmbeddedPiAgent( ); timeoutCompactResult = { ok: false, compacted: false, reason: String(compactErr) }; } + await runOwnsCompactionAfterHook("timeout recovery", timeoutCompactResult); if (timeoutCompactResult.compacted) { autoCompactionCount += 1; log.info( @@ -1181,24 +1225,7 @@ export async function runEmbeddedPiAgent( `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, ); let compactResult: Awaited>; - // When the engine owns compaction, hooks are not fired inside - // compactEmbeddedPiSessionDirect (which is bypassed). Fire them - // here so subscribers (memory extensions, usage trackers) are - // notified even on overflow-recovery compactions. - const overflowEngineOwnsCompaction = contextEngine.info.ownsCompaction === true; - const overflowHookRunner = overflowEngineOwnsCompaction ? hookRunner : null; - if (overflowHookRunner?.hasHooks("before_compaction")) { - try { - await overflowHookRunner.runBeforeCompaction( - { messageCount: -1, sessionFile: params.sessionFile }, - hookCtx, - ); - } catch (hookErr) { - log.warn( - `before_compaction hook failed during overflow recovery: ${String(hookErr)}`, - ); - } - } + await runOwnsCompactionBeforeHook("overflow recovery"); try { compactResult = await contextEngine.compact({ sessionId: params.sessionId, @@ -1250,27 +1277,7 @@ export async function runEmbeddedPiAgent( ); compactResult = { ok: false, compacted: false, reason: String(compactErr) }; } - if ( - compactResult.ok && - compactResult.compacted && - overflowHookRunner?.hasHooks("after_compaction") - ) { - try { - await overflowHookRunner.runAfterCompaction( - { - messageCount: -1, - compactedCount: -1, - tokenCount: compactResult.result?.tokensAfter, - sessionFile: params.sessionFile, - }, - hookCtx, - ); - } catch (hookErr) { - log.warn( - `after_compaction hook failed during overflow recovery: ${String(hookErr)}`, - ); - } - } + await runOwnsCompactionAfterHook("overflow recovery", compactResult); if (compactResult.compacted) { autoCompactionCount += 1; log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);