fix(agents): preserve embedded run cleanup on flush failure

This commit is contained in:
Joey Krug 2026-03-14 23:18:13 -04:00
parent 2e4631fbe4
commit 37522cc01c
2 changed files with 173 additions and 23 deletions

View File

@ -41,6 +41,12 @@ const hoisted = vi.hoisted(() => {
const getGlobalHookRunnerMock = vi.fn<() => unknown>(() => undefined);
const initializeGlobalHookRunnerMock = vi.fn();
const runContextEngineMaintenanceMock = vi.fn(async (_params?: unknown) => undefined);
const sessionLockReleaseMock = vi.fn(async () => {});
const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {});
const setActiveEmbeddedRunMock = vi.fn();
const clearActiveEmbeddedRunMock = vi.fn();
const updateActiveEmbeddedRunSnapshotMock = vi.fn();
const releaseWsSessionMock = vi.fn();
const sessionManager = {
getLeafEntry: vi.fn(() => null),
branch: vi.fn(),
@ -59,6 +65,12 @@ const hoisted = vi.hoisted(() => {
getGlobalHookRunnerMock,
initializeGlobalHookRunnerMock,
runContextEngineMaintenanceMock,
sessionLockReleaseMock,
flushPendingToolResultsAfterIdleMock,
setActiveEmbeddedRunMock,
clearActiveEmbeddedRunMock,
updateActiveEmbeddedRunSnapshotMock,
releaseWsSessionMock,
sessionManager,
};
});
@ -177,12 +189,14 @@ vi.mock("../tool-result-context-guard.js", () => ({
}));
vi.mock("../wait-for-idle-before-flush.js", () => ({
flushPendingToolResultsAfterIdle: async () => {},
flushPendingToolResultsAfterIdle: hoisted.flushPendingToolResultsAfterIdleMock,
}));
vi.mock("../runs.js", () => ({
setActiveEmbeddedRun: () => {},
clearActiveEmbeddedRun: () => {},
setActiveEmbeddedRun: (...args: unknown[]) => hoisted.setActiveEmbeddedRunMock(...args),
clearActiveEmbeddedRun: (...args: unknown[]) => hoisted.clearActiveEmbeddedRunMock(...args),
updateActiveEmbeddedRunSnapshot: (...args: unknown[]) =>
hoisted.updateActiveEmbeddedRunSnapshotMock(...args),
}));
vi.mock("./images.js", () => ({
@ -214,7 +228,7 @@ vi.mock("../extra-params.js", () => ({
vi.mock("../../openai-ws-stream.js", () => ({
createOpenAIWebSocketStreamFn: vi.fn(),
releaseWsSession: () => {},
releaseWsSession: (...args: unknown[]) => hoisted.releaseWsSessionMock(...args),
}));
vi.mock("../../anthropic-payload-log.js", () => ({
@ -299,7 +313,7 @@ function resetEmbeddedAttemptHarness(
hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager);
hoisted.resolveSandboxContextMock.mockReset();
hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({
release: async () => {},
release: hoisted.sessionLockReleaseMock,
});
hoisted.resolveBootstrapContextForRunMock.mockReset().mockResolvedValue({
bootstrapFiles: [],
@ -307,6 +321,12 @@ function resetEmbeddedAttemptHarness(
});
hoisted.getGlobalHookRunnerMock.mockReset().mockReturnValue(undefined);
hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined);
hoisted.sessionLockReleaseMock.mockReset().mockResolvedValue(undefined);
hoisted.flushPendingToolResultsAfterIdleMock.mockReset().mockResolvedValue(undefined);
hoisted.setActiveEmbeddedRunMock.mockReset();
hoisted.clearActiveEmbeddedRunMock.mockReset();
hoisted.updateActiveEmbeddedRunSnapshotMock.mockReset();
hoisted.releaseWsSessionMock.mockReset();
hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null);
hoisted.sessionManager.branch.mockReset();
hoisted.sessionManager.resetLeaf.mockReset();
@ -494,6 +514,71 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => {
});
});
// Verifies that runEmbeddedAttempt still runs its teardown steps (active-run clear,
// session dispose, websocket release, session lock release) when the idle flush of
// pending tool results rejects, and that the flush error is what the caller sees.
describe("runEmbeddedAttempt cleanup", () => {
// Temp directories created by the tests in this suite; removed in afterEach.
const tempPaths: string[] = [];
beforeEach(() => {
// Reset every hoisted mock to its default behaviour before each test.
resetEmbeddedAttemptHarness({
subscribeImpl: createSubscriptionMock,
});
});
afterEach(async () => {
await cleanupTempPaths(tempPaths);
});
it("clears the active run and releases session resources when idle flush fails", async () => {
// Arrange: real temp workspace/agent directories and an empty session file.
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
const sessionFile = path.join(workspaceDir, "session.jsonl");
tempPaths.push(workspaceDir, agentDir);
await fs.writeFile(sessionFile, "", "utf8");
const disposeMock = vi.fn();
const flushError = new Error("flush failed");
// Make the next flushPendingToolResultsAfterIdle call reject with flushError.
hoisted.flushPendingToolResultsAfterIdleMock.mockRejectedValueOnce(flushError);
// Hand back the default embedded session with dispose swapped for a spy so the
// dispose call can be asserted below.
hoisted.createAgentSessionMock.mockImplementation(async () => ({
session: {
...createDefaultEmbeddedSession(),
dispose: disposeMock,
},
}));
// Act: the attempt is expected to reject with the flush error itself.
await expect(
runEmbeddedAttempt({
sessionId: "embedded-session",
sessionKey: "agent:main:test-cleanup",
sessionFile,
workspaceDir,
agentDir,
config: {},
prompt: "hello",
timeoutMs: 10_000,
runId: "run-cleanup-flush-failure",
provider: "openai",
modelId: "gpt-test",
model: testModel,
authStorage: {} as AuthStorage,
modelRegistry: {} as ModelRegistry,
thinkLevel: "off",
senderIsOwner: true,
disableMessageTool: true,
}),
).rejects.toThrow(flushError);
// Assert: the flush was attempted exactly once and the run had been registered.
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
expect(hoisted.setActiveEmbeddedRunMock).toHaveBeenCalledTimes(1);
// Assert: the active run was cleared with the session id, some queue-handle object,
// and the session key.
expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledWith(
"embedded-session",
expect.any(Object),
"agent:main:test-cleanup",
);
// Assert: the remaining session resources were released despite the flush failure.
expect(disposeMock).toHaveBeenCalledTimes(1);
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
});
});
describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => {
const tempPaths: string[] = [];

View File

@ -165,6 +165,33 @@ type PromptBuildHookRunner = {
const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt";
const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield";
/**
 * Runs every cleanup step in order, awaiting each one, and keeps going when a
 * step throws so that a single failure cannot skip the remaining steps.
 *
 * The first failure is rethrown unchanged once all steps have run. Any later
 * failure is only reported through `log.warn`, tagged with its step label.
 *
 * @param steps - Labelled cleanup callbacks, executed sequentially.
 * @throws The first value thrown (or rejected) by any step.
 */
async function runCleanupSteps(
  steps: ReadonlyArray<{ label: string; run: () => void | Promise<void> }>,
): Promise<void> {
  // Wrapped in an object so that a thrown `undefined` still counts as a failure.
  let initialFailure: { readonly reason: unknown } | undefined;

  for (const step of steps) {
    try {
      await step.run();
    } catch (reason) {
      if (initialFailure === undefined) {
        // Remember the first failure; it is surfaced to the caller at the end.
        initialFailure = { reason };
      } else {
        log.warn(
          `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(reason)}`,
        );
      }
    }
  }

  if (initialFailure !== undefined) {
    throw initialFailure.reason;
  }
}
// Persist a hidden context reminder so the next turn knows why the runner stopped.
function buildSessionsYieldContextMessage(message: string): string {
return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`;
@ -2009,6 +2036,7 @@ export async function runEmbeddedAttempt(
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
let queueHandle: EmbeddedPiQueueHandle | undefined;
let removeToolResultContextGuard: (() => void) | undefined;
try {
await repairSessionFileIfNeeded({
@ -2567,7 +2595,7 @@ export async function runEmbeddedAttempt(
getCompactionCount,
} = subscription;
const queueHandle: EmbeddedPiQueueHandle = {
queueHandle = {
queueMessage: async (text: string) => {
await activeSession.steer(text);
},
@ -3192,23 +3220,60 @@ export async function runEmbeddedAttempt(
// flushPendingToolResults() fires while tools are still executing, inserting
// synthetic "missing tool result" errors and causing silent agent failures.
// See: https://github.com/openclaw/openclaw/issues/8643
removeToolResultContextGuard?.();
await flushPendingToolResultsAfterIdle({
agent: session?.agent,
sessionManager,
clearPendingOnTimeout: true,
});
// Clear embedded run AFTER flushing pending tool results so that all concurrent
// tool executions (e.g. sessions_spawn) complete before the run is considered ended.
// This prevents premature announce when sessions_yield runs in parallel with
// sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve
// until spawn registrations are committed to the subagent registry.
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
session?.dispose();
releaseWsSession(params.sessionId);
await bundleMcpRuntime?.dispose();
await bundleLspRuntime?.dispose();
await sessionLock.release();
await runCleanupSteps([
{
label: "tool result context guard removal",
run: () => {
removeToolResultContextGuard?.();
},
},
{
label: "pending tool result flush",
run: () =>
flushPendingToolResultsAfterIdle({
agent: session?.agent,
sessionManager,
clearPendingOnTimeout: true,
}),
},
{
// Clear embedded run AFTER flushing pending tool results so that all concurrent
// tool executions (e.g. sessions_spawn) complete before the run is considered ended.
// This prevents premature announce when sessions_yield runs in parallel with
// sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve
// until spawn registrations are committed to the subagent registry.
label: "active embedded run clear",
run: () => {
if (queueHandle) {
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
}
},
},
{
label: "session dispose",
run: () => {
session?.dispose();
},
},
{
label: "websocket session release",
run: () => {
releaseWsSession(params.sessionId);
},
},
{
label: "bundle MCP runtime dispose",
run: () => bundleMcpRuntime?.dispose(),
},
{
label: "bundle LSP runtime dispose",
run: () => bundleLspRuntime?.dispose(),
},
{
label: "session lock release",
run: () => sessionLock.release(),
},
]);
}
} finally {
restoreSkillEnv?.();