diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index 20617816e6e..9bc424f35fd 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -41,6 +41,12 @@ const hoisted = vi.hoisted(() => { const getGlobalHookRunnerMock = vi.fn<() => unknown>(() => undefined); const initializeGlobalHookRunnerMock = vi.fn(); const runContextEngineMaintenanceMock = vi.fn(async (_params?: unknown) => undefined); + const sessionLockReleaseMock = vi.fn(async () => {}); + const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {}); + const setActiveEmbeddedRunMock = vi.fn(); + const clearActiveEmbeddedRunMock = vi.fn(); + const updateActiveEmbeddedRunSnapshotMock = vi.fn(); + const releaseWsSessionMock = vi.fn(); const sessionManager = { getLeafEntry: vi.fn(() => null), branch: vi.fn(), @@ -59,6 +65,12 @@ const hoisted = vi.hoisted(() => { getGlobalHookRunnerMock, initializeGlobalHookRunnerMock, runContextEngineMaintenanceMock, + sessionLockReleaseMock, + flushPendingToolResultsAfterIdleMock, + setActiveEmbeddedRunMock, + clearActiveEmbeddedRunMock, + updateActiveEmbeddedRunSnapshotMock, + releaseWsSessionMock, sessionManager, }; }); @@ -177,12 +189,14 @@ vi.mock("../tool-result-context-guard.js", () => ({ })); vi.mock("../wait-for-idle-before-flush.js", () => ({ - flushPendingToolResultsAfterIdle: async () => {}, + flushPendingToolResultsAfterIdle: hoisted.flushPendingToolResultsAfterIdleMock, })); vi.mock("../runs.js", () => ({ - setActiveEmbeddedRun: () => {}, - clearActiveEmbeddedRun: () => {}, + setActiveEmbeddedRun: (...args: unknown[]) => hoisted.setActiveEmbeddedRunMock(...args), + clearActiveEmbeddedRun: (...args: unknown[]) => hoisted.clearActiveEmbeddedRunMock(...args), + updateActiveEmbeddedRunSnapshot: (...args: unknown[]) => + hoisted.updateActiveEmbeddedRunSnapshotMock(...args), })); vi.mock("./images.js", () => ({ @@ -214,7 +228,7 @@ vi.mock("../extra-params.js", () => ({ vi.mock("../../openai-ws-stream.js", () => ({ createOpenAIWebSocketStreamFn: vi.fn(), - releaseWsSession: () => {}, + releaseWsSession: (...args: unknown[]) => hoisted.releaseWsSessionMock(...args), })); vi.mock("../../anthropic-payload-log.js", () => ({ @@ -299,7 +313,7 @@ function resetEmbeddedAttemptHarness( hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager); hoisted.resolveSandboxContextMock.mockReset(); hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({ - release: async () => {}, + release: hoisted.sessionLockReleaseMock, }); hoisted.resolveBootstrapContextForRunMock.mockReset().mockResolvedValue({ bootstrapFiles: [], @@ -307,6 +321,12 @@ function resetEmbeddedAttemptHarness( }); hoisted.getGlobalHookRunnerMock.mockReset().mockReturnValue(undefined); hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined); + hoisted.sessionLockReleaseMock.mockReset().mockResolvedValue(undefined); + hoisted.flushPendingToolResultsAfterIdleMock.mockReset().mockResolvedValue(undefined); + hoisted.setActiveEmbeddedRunMock.mockReset(); + hoisted.clearActiveEmbeddedRunMock.mockReset(); + hoisted.updateActiveEmbeddedRunSnapshotMock.mockReset(); + hoisted.releaseWsSessionMock.mockReset(); hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null); hoisted.sessionManager.branch.mockReset(); hoisted.sessionManager.resetLeaf.mockReset(); @@ -494,6 +514,71 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { }); }); +describe("runEmbeddedAttempt cleanup", () => { + const tempPaths: string[] = []; + + beforeEach(() => { + resetEmbeddedAttemptHarness({ + subscribeImpl: createSubscriptionMock, + }); + }); + + afterEach(async () => { + await cleanupTempPaths(tempPaths); + }); + + it("clears the active run and releases session resources when idle flush fails", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const disposeMock = vi.fn(); + const flushError = new Error("flush failed"); + hoisted.flushPendingToolResultsAfterIdleMock.mockRejectedValueOnce(flushError); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: { + ...createDefaultEmbeddedSession(), + dispose: disposeMock, + }, + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-flush-failure", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(flushError); + + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.setActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledWith( + "embedded-session", + expect.any(Object), + "agent:main:test-cleanup", + ); + expect(disposeMock).toHaveBeenCalledTimes(1); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + }); +}); + describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => { const tempPaths: string[] = []; diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 21408b4ede2..789825469a7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -165,6 +165,33 @@ type PromptBuildHookRunner = { const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt"; const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; +async function runCleanupSteps( + steps: ReadonlyArray<{ label: string; run: () => void | Promise }>, +): Promise { + let firstError: unknown; + let hasError = false; + + for (const step of steps) { + try { + await step.run(); + } catch (err) { + if (!hasError) { + firstError = err; + hasError = true; + continue; + } + + log.warn( + `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, + ); + } + } + + if (hasError) { + throw firstError; + } +} + // Persist a hidden context reminder so the next turn knows why the runner stopped. function buildSessionsYieldContextMessage(message: string): string { return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`; @@ -2009,6 +2036,7 @@ export async function runEmbeddedAttempt( let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; + let queueHandle: EmbeddedPiQueueHandle | undefined; let removeToolResultContextGuard: (() => void) | undefined; try { await repairSessionFileIfNeeded({ @@ -2567,7 +2595,7 @@ export async function runEmbeddedAttempt( getCompactionCount, } = subscription; - const queueHandle: EmbeddedPiQueueHandle = { + queueHandle = { queueMessage: async (text: string) => { await activeSession.steer(text); }, @@ -3192,23 +3220,60 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 - removeToolResultContextGuard?.(); - await flushPendingToolResultsAfterIdle({ - agent: session?.agent, - sessionManager, - clearPendingOnTimeout: true, - }); - // Clear embedded run AFTER flushing pending tool results so that all concurrent - // tool executions (e.g. sessions_spawn) complete before the run is considered ended. - // This prevents premature announce when sessions_yield runs in parallel with - // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve - // until spawn registrations are committed to the subagent registry. - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); - session?.dispose(); - releaseWsSession(params.sessionId); - await bundleMcpRuntime?.dispose(); - await bundleLspRuntime?.dispose(); - await sessionLock.release(); + await runCleanupSteps([ + { + label: "tool result context guard removal", + run: () => { + removeToolResultContextGuard?.(); + }, + }, + { + label: "pending tool result flush", + run: () => + flushPendingToolResultsAfterIdle({ + agent: session?.agent, + sessionManager, + clearPendingOnTimeout: true, + }), + }, + { + // Clear embedded run AFTER flushing pending tool results so that all concurrent + // tool executions (e.g. sessions_spawn) complete before the run is considered ended. + // This prevents premature announce when sessions_yield runs in parallel with + // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve + // until spawn registrations are committed to the subagent registry. + label: "active embedded run clear", + run: () => { + if (queueHandle) { + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + } + }, + }, + { + label: "session dispose", + run: () => { + session?.dispose(); + }, + }, + { + label: "websocket session release", + run: () => { + releaseWsSession(params.sessionId); + }, + }, + { + label: "bundle MCP runtime dispose", + run: () => bundleMcpRuntime?.dispose(), + }, + { + label: "bundle LSP runtime dispose", + run: () => bundleLspRuntime?.dispose(), + }, + { + label: "session lock release", + run: () => sessionLock.release(), + }, + ]); } } finally { restoreSkillEnv?.();