From f1ad5a4733cca228805017857d51948211b22f89 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 21 Mar 2026 00:42:38 -0400 Subject: [PATCH] fix: address codex review comments on #46741 - Move clearActiveEmbeddedRun AFTER session lock release in cleanup sequence so restart drains and waitForEmbeddedPiRunEnd barriers stay consistent - Guard queueHandle access with sessionLockReleased flag to prevent TDZ errors - Clear memoized registry promise on import failure so retries work - Split registry load vs query errors: load failure falls back to best-effort announce; query failure still defers to avoid premature completion - Guard requesterIsSubagent path with subagentRegistryRuntime availability - Simplify runCleanupSteps: remove duplicate log.warn, use undefined check - Fix missing closing brace in timeout test from rebase --- .../run/attempt.spawn-workspace.test.ts | 62 +++++++++- src/agents/pi-embedded-runner/run/attempt.ts | 48 +++----- ...nce.registry-runtime-load-fallback.test.ts | 116 ++++++++++++++++++ src/agents/subagent-announce.timeout.test.ts | 1 + src/agents/subagent-announce.ts | 78 +++++++----- 5 files changed, 239 insertions(+), 66 deletions(-) create mode 100644 src/agents/subagent-announce.registry-runtime-load-fallback.test.ts diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index ca69e168ad6..42c97a7ce1b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -573,13 +573,17 @@ describe("runEmbeddedAttempt cleanup", () => { expect.any(Object), "agent:main:test-cleanup", ); - expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan( - hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ?? + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); expect(disposeMock).toHaveBeenCalledTimes(1); expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); }); it("skips active-run clear without masking the original error when subscribe fails before registration", async () => { @@ -631,7 +635,7 @@ describe("runEmbeddedAttempt cleanup", () => { expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); }); - it("clears the active run before waiting for the idle flush", async () => { + it("clears the active run only after idle flush and session lock release finish", async () => { const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); const sessionFile = path.join(workspaceDir, "session.jsonl"); @@ -665,11 +669,59 @@ describe("runEmbeddedAttempt cleanup", () => { expect(result.promptError).toBeNull(); expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); - expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan( - hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ?? + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); }); + + it("keeps the active run registered when session lock release fails", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const lockReleaseError = new Error("lock release failed"); + hoisted.sessionLockReleaseMock.mockRejectedValueOnce(lockReleaseError); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: createDefaultEmbeddedSession(), + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup-lock-release", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-lock-release", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(lockReleaseError); + + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + }); }); describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 814c8efbcf7..e1fdf6afd52 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -168,29 +168,22 @@ const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; async function runCleanupSteps( steps: ReadonlyArray<{ label: string; run: () => void | Promise }>, ): Promise { - let firstError: unknown; - let hasError = false; + let firstError: unknown | undefined; for (const step of steps) { try { await step.run(); } catch (err) { - if (!hasError) { + if (firstError === undefined) { firstError = err; - hasError = true; - log.warn( - `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, - ); - continue; } - log.warn( `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, ); } } - if (hasError) { + if (firstError !== undefined) { throw firstError; } } @@ -3223,6 +3216,7 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 + let sessionLockReleased = false; await runCleanupSteps([ { label: "tool result context guard removal", @@ -3230,23 +3224,6 @@ export async function runEmbeddedAttempt( removeToolResultContextGuard?.(); }, }, - { - // Clear embedded run BEFORE flushing pending tool results so that - // waitForEmbeddedPiRunEnd resolves promptly. The flush can take up to 30s - // (idle-wait timeout in wait-for-idle-before-flush.ts), but several callers - // only wait 15s (session-reset-service.ts, commands-compact.ts). Clearing - // first ensures the run is marked ended within the waiter timeout budget. - // - // This is safe because runCleanupSteps executes each step in an isolated - // try/catch — if this step throws (e.g. queueHandle is somehow invalid), - // the flush and remaining teardown still proceed. - label: "active embedded run clear", - run: () => { - if (queueHandle) { - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); - } - }, - }, { label: "pending tool result flush", run: () => @@ -3278,7 +3255,22 @@ export async function runEmbeddedAttempt( }, { label: "session lock release", - run: () => sessionLock.release(), + run: async () => { + await sessionLock.release(); + sessionLockReleased = true; + }, + }, + { + // Keep the embedded run registered until idle flush and lock release finish. + // waitForEmbeddedPiRunEnd() and restart drains rely on this barrier so yielded + // or aborted runs do not look fully idle while descendant spawns/tool cleanup + // may still be settling. + label: "active embedded run clear", + run: () => { + if (queueHandle && sessionLockReleased) { + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + } + }, }, ]); } diff --git a/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts b/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts new file mode 100644 index 00000000000..ea9da885a94 --- /dev/null +++ b/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts @@ -0,0 +1,116 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +type GatewayCall = { + method?: string; + timeoutMs?: number; + expectFinal?: boolean; + params?: Record; +}; + +const gatewayCalls: GatewayCall[] = []; +let registryRuntimeLoadAttempts = 0; + +async function importModuleWithRegistryRuntimeFailure() { + vi.resetModules(); + gatewayCalls.length = 0; + registryRuntimeLoadAttempts = 0; + + vi.doMock("../gateway/call.js", () => ({ + callGateway: vi.fn(async (request: GatewayCall) => { + gatewayCalls.push(request); + if (request.method === "chat.history") { + return { messages: [] }; + } + return {}; + }), + })); + + vi.doMock("../config/config.js", () => ({ + loadConfig: () => ({ + session: { + mainKey: "main", + scope: "per-sender", + }, + }), + })); + + vi.doMock("../config/sessions.js", () => ({ + loadSessionStore: vi.fn(() => ({ + "agent:main:main": { sessionId: "sess-main", updatedAt: 1 }, + "agent:main:subagent:worker": { sessionId: "sess-worker", updatedAt: 1 }, + })), + resolveAgentIdFromSessionKey: () => "main", + resolveStorePath: () => "/tmp/sessions-main.json", + resolveMainSessionKey: () => "agent:main:main", + })); + + vi.doMock("./pi-embedded.js", () => ({ + isEmbeddedPiRunActive: () => false, + queueEmbeddedPiMessage: () => false, + waitForEmbeddedPiRunEnd: async () => true, + })); + + vi.doMock("./subagent-depth.js", () => ({ + getSubagentDepthFromSessionStore: () => 0, + })); + + vi.doMock("./subagent-registry-runtime.js", () => { + registryRuntimeLoadAttempts += 1; + throw new Error("registry runtime load failed"); + }); + + return await import("./subagent-announce.js"); +} + +describe("subagent announce registry runtime fallback", () => { + afterEach(() => { + vi.resetModules(); + vi.doUnmock("../gateway/call.js"); + vi.doUnmock("../config/config.js"); + vi.doUnmock("../config/sessions.js"); + vi.doUnmock("./pi-embedded.js"); + vi.doUnmock("./subagent-depth.js"); + vi.doUnmock("./subagent-registry-runtime.js"); + gatewayCalls.length = 0; + registryRuntimeLoadAttempts = 0; + }); + + it("falls back to best-effort announce and retries runtime load after a failure", async () => { + const { runSubagentAnnounceFlow } = await importModuleWithRegistryRuntimeFailure(); + + const baseParams = { + childSessionKey: "agent:main:subagent:worker", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1_000, + cleanup: "keep" as const, + roundOneReply: "done", + waitForCompletion: false, + outcome: { status: "ok" as const }, + }; + + await expect( + runSubagentAnnounceFlow({ + ...baseParams, + childRunId: "run-registry-load-failure-1", + }), + ).resolves.toBe(true); + + await expect( + runSubagentAnnounceFlow({ + ...baseParams, + childRunId: "run-registry-load-failure-2", + }), + ).resolves.toBe(true); + + const directAgentCalls = gatewayCalls.filter( + (call) => call.method === "agent" && call.expectFinal === true, + ); + expect(directAgentCalls).toHaveLength(2); + expect(directAgentCalls.every((call) => call.params?.sessionKey === "agent:main:main")).toBe( + true, + ); + expect(registryRuntimeLoadAttempts).toBe(2); + }); +}); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 23f5f84c407..56c1950c5eb 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -454,6 +454,7 @@ describe("subagent announce timeout config", () => { expect( findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), ).toBeUndefined(); + }); it("regression, defers announce when registry check throws instead of sending premature completion", async () => { registryCheckShouldThrow = true; diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 96d278711fa..4c2e59ae9ec 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -59,7 +59,10 @@ let subagentRegistryRuntimePromise: Promise< > | null = null; function loadSubagentRegistryRuntime() { - subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js"); + subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js").catch((err) => { + subagentRegistryRuntimePromise = null; + throw err; + }); return subagentRegistryRuntimePromise; } @@ -1332,40 +1335,49 @@ export async function runSubagentAnnounceFlow(params: { | undefined; try { subagentRegistryRuntime = await loadSubagentRegistryRuntime(); - if ( - requesterDepth >= 1 && - subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( - targetRequesterSessionKey, - ) - ) { - return true; - } - - const pendingChildDescendantRuns = Math.max( - 0, - subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey), + } catch (err) { + defaultRuntime.log( + `[warn] Subagent announce registry runtime load failed; using best-effort announce fallback: ${summarizeDeliveryError(err)}`, ); - if (pendingChildDescendantRuns > 0 && announceType !== "cron job") { + } + if (subagentRegistryRuntime) { + try { + if ( + requesterDepth >= 1 && + subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( + targetRequesterSessionKey, + ) + ) { + return true; + } + + const pendingChildDescendantRuns = Math.max( + 0, + subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey), + ); + if (pendingChildDescendantRuns > 0 && announceType !== "cron job") { + shouldDeleteChildSession = false; + return false; + } + + if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") { + const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester( + params.childSessionKey, + { + requesterRunId: params.childRunId, + }, + ); + if (Array.isArray(directChildren) && directChildren.length > 0) { + childCompletionFindings = buildChildCompletionFindings(directChildren); + } + } + } catch { + // If the registry query fails after runtime load, defer the announce rather than + // risk a premature completion message. The deferred cleanup path will retry once + // descendants settle. shouldDeleteChildSession = false; return false; } - - if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") { - const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester( - params.childSessionKey, - { - requesterRunId: params.childRunId, - }, - ); - if (Array.isArray(directChildren) && directChildren.length > 0) { - childCompletionFindings = buildChildCompletionFindings(directChildren); - } - } - } catch { - // If the registry check fails, defer the announce rather than risk a premature - // completion message. The deferred cleanup path will retry once descendants settle. - shouldDeleteChildSession = false; - return false; } const announceId = buildAnnounceIdFromChildRun({ @@ -1453,12 +1465,12 @@ export async function runSubagentAnnounceFlow(params: { const findings = childCompletionFindings || reply || "(no output)"; let requesterIsSubagent = requesterIsInternalSession(); - if (requesterIsSubagent) { + if (requesterIsSubagent && subagentRegistryRuntime) { const { isSubagentSessionRunActive, resolveRequesterForChildSession, shouldIgnorePostCompletionAnnounceForSession, - } = subagentRegistryRuntime ?? (await loadSubagentRegistryRuntime()); + } = subagentRegistryRuntime; if (!isSubagentSessionRunActive(targetRequesterSessionKey)) { if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) { return true;