From 2e4631fbe427f2520cd186131ba153c191f3f26c Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 22:17:03 -0400 Subject: [PATCH 1/7] fix: defer sessions_yield announce until all concurrent tool executions settle (#46719) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When parallel_tool_calls is enabled and sessions_yield runs concurrently with sessions_spawn, the embedded run was marked ended (clearActiveEmbeddedRun) before flushPendingToolResultsAfterIdle completed. This caused the announce flow's waitForEmbeddedPiRunEnd to resolve prematurely — before spawn registrations committed to the subagent registry — leading to countPendingDescendantRuns returning 0 and a premature completion announce. Fix: move clearActiveEmbeddedRun after flushPendingToolResultsAfterIdle so the embedded run stays active until all concurrent tool executions finish. Also fix the silent catch block in runSubagentAnnounceFlow to defer (return false) instead of silently proceeding when the registry check fails. Co-Authored-By: Claude Opus 4.6 --- src/agents/pi-embedded-runner/run/attempt.ts | 7 ++- src/agents/subagent-announce.timeout.test.ts | 31 +++++++++- src/agents/subagent-announce.ts | 5 +- .../subagent-registry.nested.e2e.test.ts | 59 +++++++++++++++++++ 4 files changed, 99 insertions(+), 3 deletions(-) diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index d785218f819..21408b4ede2 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -3113,7 +3113,6 @@ export async function runEmbeddedAttempt( `CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`, ); } - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); params.abortSignal?.removeEventListener?.("abort", onAbort); } @@ -3199,6 +3198,12 @@ export async function runEmbeddedAttempt( sessionManager, clearPendingOnTimeout: true, }); + // Clear embedded run AFTER flushing pending tool results so that all concurrent + // tool executions (e.g. sessions_spawn) complete before the run is considered ended. + // This prevents premature announce when sessions_yield runs in parallel with + // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve + // until spawn registrations are committed to the subagent registry. + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); session?.dispose(); releaseWsSession(params.sessionId); await bundleMcpRuntime?.dispose(); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 52cde0f69b0..23f5f84c407 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -25,6 +25,7 @@ let requesterDepthResolver: (sessionKey?: string) => number = () => 0; let subagentSessionRunActive = true; let shouldIgnorePostCompletion = false; let pendingDescendantRuns = 0; +let registryCheckShouldThrow = false; let fallbackRequesterResolution: { requesterSessionKey: string; requesterOrigin?: { channel?: string; to?: string; accountId?: string }; @@ -68,7 +69,12 @@ vi.mock("./pi-embedded.js", () => ({ vi.mock("./subagent-registry.js", () => ({ countActiveDescendantRuns: () => 0, - countPendingDescendantRuns: () => pendingDescendantRuns, + countPendingDescendantRuns: () => { + if (registryCheckShouldThrow) { + throw new Error("registry unavailable"); + } + return pendingDescendantRuns; + }, listSubagentRunsForRequester: () => [], isSubagentSessionRunActive: () => subagentSessionRunActive, shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion, @@ -157,6 +163,7 @@ describe("subagent announce timeout config", () => { subagentSessionRunActive = true; shouldIgnorePostCompletion = false; pendingDescendantRuns = 0; + registryCheckShouldThrow = false; fallbackRequesterResolution = null; }); @@ -447,5 +454,27 @@ describe("subagent announce timeout config", () => { expect( findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), ).toBeUndefined(); + + it("regression, defers announce when registry check throws instead of sending premature completion", async () => { + registryCheckShouldThrow = true; + + const didAnnounce = await runAnnounceFlowForTest("run-registry-error"); + + expect(didAnnounce).toBe(false); + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); + + it("regression, defers announce for yield-aborted subagent with pending child spawns", async () => { + requesterDepthResolver = () => 1; + pendingDescendantRuns = 1; + + const didAnnounce = await runAnnounceFlowForTest("run-yield-pending-children", { + requesterSessionKey: "agent:main:subagent:orchestrator", + requesterDisplayKey: "subagent:orchestrator", + childSessionKey: "agent:main:subagent:orchestrator:subagent:worker", + }); + + expect(didAnnounce).toBe(false); + expect(findFinalDirectAgentCall()).toBeUndefined(); }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index ab2fbb1140e..96d278711fa 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -1362,7 +1362,10 @@ export async function runSubagentAnnounceFlow(params: { } } } catch { - // Best-effort only. + // If the registry check fails, defer the announce rather than risk a premature + // completion message. The deferred cleanup path will retry once descendants settle. + shouldDeleteChildSession = false; + return false; } const announceId = buildAnnounceIdFromChildRun({ diff --git a/src/agents/subagent-registry.nested.e2e.test.ts b/src/agents/subagent-registry.nested.e2e.test.ts index 06148705986..7cb91472caf 100644 --- a/src/agents/subagent-registry.nested.e2e.test.ts +++ b/src/agents/subagent-registry.nested.e2e.test.ts @@ -326,4 +326,63 @@ describe("subagent registry nested agent tracking", () => { expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-self")).toBe(1); expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-sibling")).toBe(1); }); + + it("yield-aborted parent with active children reports pending descendants correctly", async () => { + const { addSubagentRunForTests, countPendingDescendantRuns, countActiveDescendantRuns } = + subagentRegistry; + + // Orchestrator yielded (ended) after spawning children — simulates sessions_yield + addSubagentRunForTests({ + runId: "run-orch-yielded", + childSessionKey: "agent:main:subagent:orch-yield", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate with yield", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + cleanupHandled: false, + cleanupCompletedAt: undefined, + }); + + // Child spawned by orchestrator — still running (no endedAt) + addSubagentRunForTests({ + runId: "run-child-active", + childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1", + requesterSessionKey: "agent:main:subagent:orch-yield", + requesterDisplayKey: "orch-yield", + task: "worker task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + cleanupHandled: false, + }); + + // The orchestrator has pending descendants (child still running + orchestrator cleanup not done) + expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(1); + // The child is active under the orchestrator + expect(countActiveDescendantRuns("agent:main:subagent:orch-yield")).toBe(1); + // From main's perspective, the orchestrator itself is pending (cleanup not completed) + expect(countPendingDescendantRuns("agent:main:main")).toBe(2); + + // Child completes and finishes cleanup + addSubagentRunForTests({ + runId: "run-child-active", + childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1", + requesterSessionKey: "agent:main:subagent:orch-yield", + requesterDisplayKey: "orch-yield", + task: "worker task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + endedAt: 3, + cleanupHandled: true, + cleanupCompletedAt: 4, + }); + + // Now only the orchestrator itself remains pending (no cleanup yet) + expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(0); + expect(countPendingDescendantRuns("agent:main:main")).toBe(1); + }); }); From 37522cc01cf3f6cb162a190f45bd20b69e49d71e Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 23:18:13 -0400 Subject: [PATCH 2/7] fix(agents): preserve embedded run cleanup on flush failure --- .../run/attempt.spawn-workspace.test.ts | 95 +++++++++++++++- src/agents/pi-embedded-runner/run/attempt.ts | 101 ++++++++++++++---- 2 files changed, 173 insertions(+), 23 deletions(-) diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index 20617816e6e..9bc424f35fd 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -41,6 +41,12 @@ const hoisted = vi.hoisted(() => { const getGlobalHookRunnerMock = vi.fn<() => unknown>(() => undefined); const initializeGlobalHookRunnerMock = vi.fn(); const runContextEngineMaintenanceMock = vi.fn(async (_params?: unknown) => undefined); + const sessionLockReleaseMock = vi.fn(async () => {}); + const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {}); + const setActiveEmbeddedRunMock = vi.fn(); + const clearActiveEmbeddedRunMock = vi.fn(); + const updateActiveEmbeddedRunSnapshotMock = vi.fn(); + const releaseWsSessionMock = vi.fn(); const sessionManager = { getLeafEntry: vi.fn(() => null), branch: vi.fn(), @@ -59,6 +65,12 @@ const hoisted = vi.hoisted(() => { getGlobalHookRunnerMock, initializeGlobalHookRunnerMock, runContextEngineMaintenanceMock, + sessionLockReleaseMock, + flushPendingToolResultsAfterIdleMock, + setActiveEmbeddedRunMock, + clearActiveEmbeddedRunMock, + updateActiveEmbeddedRunSnapshotMock, + releaseWsSessionMock, sessionManager, }; }); @@ -177,12 +189,14 @@ vi.mock("../tool-result-context-guard.js", () => ({ })); vi.mock("../wait-for-idle-before-flush.js", () => ({ - flushPendingToolResultsAfterIdle: async () => {}, + flushPendingToolResultsAfterIdle: hoisted.flushPendingToolResultsAfterIdleMock, })); vi.mock("../runs.js", () => ({ - setActiveEmbeddedRun: () => {}, - clearActiveEmbeddedRun: () => {}, + setActiveEmbeddedRun: (...args: unknown[]) => hoisted.setActiveEmbeddedRunMock(...args), + clearActiveEmbeddedRun: (...args: unknown[]) => hoisted.clearActiveEmbeddedRunMock(...args), + updateActiveEmbeddedRunSnapshot: (...args: unknown[]) => + hoisted.updateActiveEmbeddedRunSnapshotMock(...args), })); vi.mock("./images.js", () => ({ @@ -214,7 +228,7 @@ vi.mock("../extra-params.js", () => ({ vi.mock("../../openai-ws-stream.js", () => ({ createOpenAIWebSocketStreamFn: vi.fn(), - releaseWsSession: () => {}, + releaseWsSession: (...args: unknown[]) => hoisted.releaseWsSessionMock(...args), })); vi.mock("../../anthropic-payload-log.js", () => ({ @@ -299,7 +313,7 @@ function resetEmbeddedAttemptHarness( hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager); hoisted.resolveSandboxContextMock.mockReset(); hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({ - release: async () => {}, + release: hoisted.sessionLockReleaseMock, }); hoisted.resolveBootstrapContextForRunMock.mockReset().mockResolvedValue({ bootstrapFiles: [], @@ -307,6 +321,12 @@ function resetEmbeddedAttemptHarness( }); hoisted.getGlobalHookRunnerMock.mockReset().mockReturnValue(undefined); hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined); + hoisted.sessionLockReleaseMock.mockReset().mockResolvedValue(undefined); + hoisted.flushPendingToolResultsAfterIdleMock.mockReset().mockResolvedValue(undefined); + hoisted.setActiveEmbeddedRunMock.mockReset(); + hoisted.clearActiveEmbeddedRunMock.mockReset(); + hoisted.updateActiveEmbeddedRunSnapshotMock.mockReset(); + hoisted.releaseWsSessionMock.mockReset(); hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null); hoisted.sessionManager.branch.mockReset(); hoisted.sessionManager.resetLeaf.mockReset(); @@ -494,6 +514,71 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { }); }); +describe("runEmbeddedAttempt cleanup", () => { + const tempPaths: string[] = []; + + beforeEach(() => { + resetEmbeddedAttemptHarness({ + subscribeImpl: createSubscriptionMock, + }); + }); + + afterEach(async () => { + await cleanupTempPaths(tempPaths); + }); + + it("clears the active run and releases session resources when idle flush fails", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const disposeMock = vi.fn(); + const flushError = new Error("flush failed"); + hoisted.flushPendingToolResultsAfterIdleMock.mockRejectedValueOnce(flushError); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: { + ...createDefaultEmbeddedSession(), + dispose: disposeMock, + }, + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-flush-failure", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(flushError); + + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.setActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledWith( + "embedded-session", + expect.any(Object), + "agent:main:test-cleanup", + ); + expect(disposeMock).toHaveBeenCalledTimes(1); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + }); +}); + describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => { const tempPaths: string[] = []; diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 21408b4ede2..789825469a7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -165,6 +165,33 @@ type PromptBuildHookRunner = { const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt"; const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; +async function runCleanupSteps( + steps: ReadonlyArray<{ label: string; run: () => void | Promise }>, +): Promise { + let firstError: unknown; + let hasError = false; + + for (const step of steps) { + try { + await step.run(); + } catch (err) { + if (!hasError) { + firstError = err; + hasError = true; + continue; + } + + log.warn( + `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, + ); + } + } + + if (hasError) { + throw firstError; + } +} + // Persist a hidden context reminder so the next turn knows why the runner stopped. function buildSessionsYieldContextMessage(message: string): string { return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`; @@ -2009,6 +2036,7 @@ export async function runEmbeddedAttempt( let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; + let queueHandle: EmbeddedPiQueueHandle | undefined; let removeToolResultContextGuard: (() => void) | undefined; try { await repairSessionFileIfNeeded({ @@ -2567,7 +2595,7 @@ export async function runEmbeddedAttempt( getCompactionCount, } = subscription; - const queueHandle: EmbeddedPiQueueHandle = { + queueHandle = { queueMessage: async (text: string) => { await activeSession.steer(text); }, @@ -3192,23 +3220,60 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 - removeToolResultContextGuard?.(); - await flushPendingToolResultsAfterIdle({ - agent: session?.agent, - sessionManager, - clearPendingOnTimeout: true, - }); - // Clear embedded run AFTER flushing pending tool results so that all concurrent - // tool executions (e.g. sessions_spawn) complete before the run is considered ended. - // This prevents premature announce when sessions_yield runs in parallel with - // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve - // until spawn registrations are committed to the subagent registry. - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); - session?.dispose(); - releaseWsSession(params.sessionId); - await bundleMcpRuntime?.dispose(); - await bundleLspRuntime?.dispose(); - await sessionLock.release(); + await runCleanupSteps([ + { + label: "tool result context guard removal", + run: () => { + removeToolResultContextGuard?.(); + }, + }, + { + label: "pending tool result flush", + run: () => + flushPendingToolResultsAfterIdle({ + agent: session?.agent, + sessionManager, + clearPendingOnTimeout: true, + }), + }, + { + // Clear embedded run AFTER flushing pending tool results so that all concurrent + // tool executions (e.g. sessions_spawn) complete before the run is considered ended. + // This prevents premature announce when sessions_yield runs in parallel with + // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve + // until spawn registrations are committed to the subagent registry. + label: "active embedded run clear", + run: () => { + if (queueHandle) { + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + } + }, + }, + { + label: "session dispose", + run: () => { + session?.dispose(); + }, + }, + { + label: "websocket session release", + run: () => { + releaseWsSession(params.sessionId); + }, + }, + { + label: "bundle MCP runtime dispose", + run: () => bundleMcpRuntime?.dispose(), + }, + { + label: "bundle LSP runtime dispose", + run: () => bundleLspRuntime?.dispose(), + }, + { + label: "session lock release", + run: () => sessionLock.release(), + }, + ]); } } finally { restoreSkillEnv?.(); From ca766f4e3ec8b545403a51c832e8b823536c62e3 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 23:46:30 -0400 Subject: [PATCH 3/7] fix: clear active embedded run before flush to stay within waiter timeout budget Addresses PR review comments: - Move clearActiveEmbeddedRun before flushPendingToolResultsAfterIdle so waitForEmbeddedPiRunEnd resolves within the 15s budget used by session-reset-service.ts and commands-compact.ts (flush can take up to 30s) - queueHandle guard (if check) prevents issues when uninitialized - runCleanupSteps already wraps each step in try/catch, so flush failures cannot prevent the clear from running (and vice versa) --- src/agents/pi-embedded-runner/run/attempt.ts | 30 +++++++++++--------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 789825469a7..25ec3d4e092 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -3227,6 +3227,23 @@ export async function runEmbeddedAttempt( removeToolResultContextGuard?.(); }, }, + { + // Clear embedded run BEFORE flushing pending tool results so that + // waitForEmbeddedPiRunEnd resolves promptly. The flush can take up to 30s + // (idle-wait timeout in wait-for-idle-before-flush.ts), but several callers + // only wait 15s (session-reset-service.ts, commands-compact.ts). Clearing + // first ensures the run is marked ended within the waiter timeout budget. + // + // This is safe because runCleanupSteps executes each step in an isolated + // try/catch — if this step throws (e.g. queueHandle is somehow invalid), + // the flush and remaining teardown still proceed. + label: "active embedded run clear", + run: () => { + if (queueHandle) { + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + } + }, + }, { label: "pending tool result flush", run: () => @@ -3236,19 +3253,6 @@ export async function runEmbeddedAttempt( clearPendingOnTimeout: true, }), }, - { - // Clear embedded run AFTER flushing pending tool results so that all concurrent - // tool executions (e.g. sessions_spawn) complete before the run is considered ended. - // This prevents premature announce when sessions_yield runs in parallel with - // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve - // until spawn registrations are committed to the subagent registry. - label: "active embedded run clear", - run: () => { - if (queueHandle) { - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); - } - }, - }, { label: "session dispose", run: () => { From e01229347a9184848a836bb5d616fb5cf89d9dd5 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sun, 15 Mar 2026 00:07:47 -0400 Subject: [PATCH 4/7] test(agents): add embedded cleanup regressions --- .../run/attempt.spawn-workspace.test.ts | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index 9bc424f35fd..ca69e168ad6 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -573,10 +573,103 @@ describe("runEmbeddedAttempt cleanup", () => { expect.any(Object), "agent:main:test-cleanup", ); + expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); expect(disposeMock).toHaveBeenCalledTimes(1); expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); }); + + it("skips active-run clear without masking the original error when subscribe fails before registration", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const disposeMock = vi.fn(); + const subscribeError = new Error("subscribe failed"); + hoisted.subscribeEmbeddedPiSessionMock.mockReset().mockImplementation(() => { + throw subscribeError; + }); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: { + ...createDefaultEmbeddedSession(), + dispose: disposeMock, + }, + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-subscribe-failure", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-subscribe-failure", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(subscribeError); + + expect(hoisted.setActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(disposeMock).toHaveBeenCalledTimes(1); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + }); + + it("clears the active run before waiting for the idle flush", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: createDefaultEmbeddedSession(), + })); + + const result = await runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup-order", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-order", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }); + + expect(result.promptError).toBeNull(); + expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); + }); }); describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => { From 0ac62697a8d16cb087cbf0944cef5025c0ce0d4c Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sun, 15 Mar 2026 00:42:45 -0400 Subject: [PATCH 5/7] Tests: cover yield spawn announce gating --- ...subagent-announce.yield-spawn-race.test.ts | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 src/agents/subagent-announce.yield-spawn-race.test.ts diff --git a/src/agents/subagent-announce.yield-spawn-race.test.ts b/src/agents/subagent-announce.yield-spawn-race.test.ts new file mode 100644 index 00000000000..1917265e733 --- /dev/null +++ b/src/agents/subagent-announce.yield-spawn-race.test.ts @@ -0,0 +1,124 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +type GatewayCall = { + method?: string; + timeoutMs?: number; + expectFinal?: boolean; + params?: Record; +}; + +const gatewayCalls: GatewayCall[] = []; +let callGatewayImpl: (request: GatewayCall) => Promise = async () => ({}); +let sessionStore: Record> = {}; +let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = { + session: { + mainKey: "main", + scope: "per-sender", + }, +}; + +vi.mock("../gateway/call.js", () => ({ + callGateway: vi.fn(async (request: GatewayCall) => { + gatewayCalls.push(request); + return await callGatewayImpl(request); + }), +})); + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => configOverride, + }; +}); + +vi.mock("../config/sessions.js", () => ({ + loadSessionStore: vi.fn(() => sessionStore), + resolveAgentIdFromSessionKey: () => "main", + resolveStorePath: () => "/tmp/sessions-main.json", + resolveMainSessionKey: () => "agent:main:main", +})); + +vi.mock("./pi-embedded.js", () => ({ + isEmbeddedPiRunActive: () => false, + queueEmbeddedPiMessage: () => false, + waitForEmbeddedPiRunEnd: async () => true, +})); + +import { runSubagentAnnounceFlow } from "./subagent-announce.js"; +import { + addSubagentRunForTests, + countPendingDescendantRuns, + resetSubagentRegistryForTests, +} from "./subagent-registry.js"; + +function findFinalDirectAgentCall(): GatewayCall | undefined { + return gatewayCalls.find((call) => call.method === "agent" && call.expectFinal === true); +} + +describe("subagent announce yield + spawn race", () => { + beforeEach(() => { + gatewayCalls.length = 0; + callGatewayImpl = async () => ({}); + sessionStore = {}; + configOverride = { + session: { + mainKey: "main", + scope: "per-sender", + }, + }; + resetSubagentRegistryForTests({ persist: false }); + }); + + it("defers announce when a yield-aborted parent still has a concurrently spawned pending child", async () => { + const parentSessionKey = "agent:main:subagent:orchestrator-race"; + + addSubagentRunForTests({ + runId: "run-orchestrator-race", + childSessionKey: parentSessionKey, + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate", + cleanup: "keep", + createdAt: 100, + startedAt: 100, + endedAt: 110, + expectsCompletionMessage: true, + }); + addSubagentRunForTests({ + runId: "run-worker-race", + childSessionKey: `${parentSessionKey}:subagent:worker`, + controllerSessionKey: parentSessionKey, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: parentSessionKey, + task: "child task", + cleanup: "keep", + createdAt: 111, + startedAt: 111, + expectsCompletionMessage: true, + }); + + // Regression guard: when sessions_spawn commits before the announce check, + // the parent must still see the pending child and defer its completion. + expect(countPendingDescendantRuns(parentSessionKey)).toBe(1); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: parentSessionKey, + childRunId: "run-orchestrator-race", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate", + timeoutMs: 1_000, + cleanup: "keep", + roundOneReply: "Yielded after concurrent sessions_spawn.", + waitForCompletion: false, + expectsCompletionMessage: true, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(false); + expect(countPendingDescendantRuns(parentSessionKey)).toBe(1); + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); +}); From 46c355bb5244df2492a2b9e3dc60d6bd5d3c1d83 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sun, 15 Mar 2026 01:07:52 -0400 Subject: [PATCH 6/7] fix(agents): log warning for first cleanup step failure in runCleanupSteps --- src/agents/pi-embedded-runner/run/attempt.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 25ec3d4e092..814c8efbcf7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -178,6 +178,9 @@ async function runCleanupSteps( if (!hasError) { firstError = err; hasError = true; + log.warn( + `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, + ); continue; } From f1ad5a4733cca228805017857d51948211b22f89 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 21 Mar 2026 00:42:38 -0400 Subject: [PATCH 7/7] fix: address codex review comments on #46741 - Move clearActiveEmbeddedRun AFTER session lock release in cleanup sequence so restart drains and waitForEmbeddedPiRunEnd barriers stay consistent - Guard queueHandle access with sessionLockReleased flag to prevent TDZ errors - Clear memoized registry promise on import failure so retries work - Split registry load vs query errors: load failure falls back to best-effort announce; query failure still defers to avoid premature completion - Guard requesterIsSubagent path with subagentRegistryRuntime availability - Simplify runCleanupSteps: remove duplicate log.warn, use undefined check - Fix missing closing brace in timeout test from rebase --- .../run/attempt.spawn-workspace.test.ts | 62 +++++++++- src/agents/pi-embedded-runner/run/attempt.ts | 48 +++----- ...nce.registry-runtime-load-fallback.test.ts | 116 ++++++++++++++++++ src/agents/subagent-announce.timeout.test.ts | 1 + src/agents/subagent-announce.ts | 78 +++++++----- 5 files changed, 239 insertions(+), 66 deletions(-) create mode 100644 src/agents/subagent-announce.registry-runtime-load-fallback.test.ts diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index ca69e168ad6..42c97a7ce1b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -573,13 +573,17 @@ describe("runEmbeddedAttempt cleanup", () => { expect.any(Object), "agent:main:test-cleanup", ); - expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan( - hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ?? + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); expect(disposeMock).toHaveBeenCalledTimes(1); expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); }); it("skips active-run clear without masking the original error when subscribe fails before registration", async () => { @@ -631,7 +635,7 @@ describe("runEmbeddedAttempt cleanup", () => { expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); }); - it("clears the active run before waiting for the idle flush", async () => { + it("clears the active run only after idle flush and session lock release finish", async () => { const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); const sessionFile = path.join(workspaceDir, "session.jsonl"); @@ -665,11 +669,59 @@ describe("runEmbeddedAttempt cleanup", () => { expect(result.promptError).toBeNull(); expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); - expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan( - hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ?? + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); }); + + it("keeps the active run registered when session lock release fails", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const lockReleaseError = new Error("lock release failed"); + hoisted.sessionLockReleaseMock.mockRejectedValueOnce(lockReleaseError); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: createDefaultEmbeddedSession(), + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup-lock-release", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-lock-release", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(lockReleaseError); + + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + }); }); describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 814c8efbcf7..e1fdf6afd52 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -168,29 +168,22 @@ const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; async function runCleanupSteps( steps: ReadonlyArray<{ label: string; run: () => void | Promise }>, ): Promise { - let firstError: unknown; - let hasError = false; + let firstError: unknown | undefined; for (const step of steps) { try { await step.run(); } catch (err) { - if (!hasError) { + if (firstError === undefined) { firstError = err; - hasError = true; - log.warn( - `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, - ); - continue; } - log.warn( `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, ); } } - if (hasError) { + if (firstError !== undefined) { throw firstError; } } @@ -3223,6 +3216,7 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 + let sessionLockReleased = false; await runCleanupSteps([ { label: "tool result context guard removal", @@ -3230,23 +3224,6 @@ export async function runEmbeddedAttempt( removeToolResultContextGuard?.(); }, }, - { - // Clear embedded run BEFORE flushing pending tool results so that - // waitForEmbeddedPiRunEnd resolves promptly. The flush can take up to 30s - // (idle-wait timeout in wait-for-idle-before-flush.ts), but several callers - // only wait 15s (session-reset-service.ts, commands-compact.ts). Clearing - // first ensures the run is marked ended within the waiter timeout budget. - // - // This is safe because runCleanupSteps executes each step in an isolated - // try/catch — if this step throws (e.g. queueHandle is somehow invalid), - // the flush and remaining teardown still proceed. - label: "active embedded run clear", - run: () => { - if (queueHandle) { - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); - } - }, - }, { label: "pending tool result flush", run: () => @@ -3278,7 +3255,22 @@ export async function runEmbeddedAttempt( }, { label: "session lock release", - run: () => sessionLock.release(), + run: async () => { + await sessionLock.release(); + sessionLockReleased = true; + }, + }, + { + // Keep the embedded run registered until idle flush and lock release finish. + // waitForEmbeddedPiRunEnd() and restart drains rely on this barrier so yielded + // or aborted runs do not look fully idle while descendant spawns/tool cleanup + // may still be settling. + label: "active embedded run clear", + run: () => { + if (queueHandle && sessionLockReleased) { + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + } + }, }, ]); } diff --git a/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts b/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts new file mode 100644 index 00000000000..ea9da885a94 --- /dev/null +++ b/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts @@ -0,0 +1,116 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +type GatewayCall = { + method?: string; + timeoutMs?: number; + expectFinal?: boolean; + params?: Record; +}; + +const gatewayCalls: GatewayCall[] = []; +let registryRuntimeLoadAttempts = 0; + +async function importModuleWithRegistryRuntimeFailure() { + vi.resetModules(); + gatewayCalls.length = 0; + registryRuntimeLoadAttempts = 0; + + vi.doMock("../gateway/call.js", () => ({ + callGateway: vi.fn(async (request: GatewayCall) => { + gatewayCalls.push(request); + if (request.method === "chat.history") { + return { messages: [] }; + } + return {}; + }), + })); + + vi.doMock("../config/config.js", () => ({ + loadConfig: () => ({ + session: { + mainKey: "main", + scope: "per-sender", + }, + }), + })); + + vi.doMock("../config/sessions.js", () => ({ + loadSessionStore: vi.fn(() => ({ + "agent:main:main": { sessionId: "sess-main", updatedAt: 1 }, + "agent:main:subagent:worker": { sessionId: "sess-worker", updatedAt: 1 }, + })), + resolveAgentIdFromSessionKey: () => "main", + resolveStorePath: () => "/tmp/sessions-main.json", + resolveMainSessionKey: () => "agent:main:main", + })); + + vi.doMock("./pi-embedded.js", () => ({ + isEmbeddedPiRunActive: () => false, + queueEmbeddedPiMessage: () => false, + waitForEmbeddedPiRunEnd: async () => true, + })); + + vi.doMock("./subagent-depth.js", () => ({ + getSubagentDepthFromSessionStore: () => 0, + })); + + vi.doMock("./subagent-registry-runtime.js", () => { + registryRuntimeLoadAttempts += 1; + throw new Error("registry runtime load failed"); + }); + + return await import("./subagent-announce.js"); +} + +describe("subagent announce registry runtime fallback", () => { + afterEach(() => { + vi.resetModules(); + vi.doUnmock("../gateway/call.js"); + vi.doUnmock("../config/config.js"); + vi.doUnmock("../config/sessions.js"); + vi.doUnmock("./pi-embedded.js"); + vi.doUnmock("./subagent-depth.js"); + vi.doUnmock("./subagent-registry-runtime.js"); + gatewayCalls.length = 0; + registryRuntimeLoadAttempts = 0; + }); + + it("falls back to best-effort announce and retries runtime load after a failure", async () => { + const { runSubagentAnnounceFlow } = await importModuleWithRegistryRuntimeFailure(); + + const baseParams = { + childSessionKey: "agent:main:subagent:worker", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1_000, + cleanup: "keep" as const, + roundOneReply: "done", + waitForCompletion: false, + outcome: { status: "ok" as const }, + }; + + await expect( + runSubagentAnnounceFlow({ + ...baseParams, + childRunId: "run-registry-load-failure-1", + }), + ).resolves.toBe(true); + + await expect( + runSubagentAnnounceFlow({ + ...baseParams, + childRunId: "run-registry-load-failure-2", + }), + ).resolves.toBe(true); + + const directAgentCalls = gatewayCalls.filter( + (call) => call.method === "agent" && call.expectFinal === true, + ); + expect(directAgentCalls).toHaveLength(2); + expect(directAgentCalls.every((call) => call.params?.sessionKey === "agent:main:main")).toBe( + true, + ); + expect(registryRuntimeLoadAttempts).toBe(2); + }); +}); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 23f5f84c407..56c1950c5eb 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -454,6 +454,7 @@ describe("subagent announce timeout config", () => { expect( findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), ).toBeUndefined(); + }); it("regression, defers announce when registry check throws instead of sending premature completion", async () => { registryCheckShouldThrow = true; diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 96d278711fa..4c2e59ae9ec 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -59,7 +59,10 @@ let subagentRegistryRuntimePromise: Promise< > | null = null; function loadSubagentRegistryRuntime() { - subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js"); + subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js").catch((err) => { + subagentRegistryRuntimePromise = null; + throw err; + }); return subagentRegistryRuntimePromise; } @@ -1332,40 +1335,49 @@ export async function runSubagentAnnounceFlow(params: { | undefined; try { subagentRegistryRuntime = await loadSubagentRegistryRuntime(); - if ( - requesterDepth >= 1 && - subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( - targetRequesterSessionKey, - ) - ) { - return true; - } - - const pendingChildDescendantRuns = Math.max( - 0, - subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey), + } catch (err) { + defaultRuntime.log( + `[warn] Subagent announce registry runtime load failed; using best-effort announce fallback: ${summarizeDeliveryError(err)}`, ); - if (pendingChildDescendantRuns > 0 && announceType !== "cron job") { + } + if (subagentRegistryRuntime) { + try { + if ( + requesterDepth >= 1 && + subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( + targetRequesterSessionKey, + ) + ) { + return true; + } + + const pendingChildDescendantRuns = Math.max( + 0, + subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey), + ); + if (pendingChildDescendantRuns > 0 && announceType !== "cron job") { + shouldDeleteChildSession = false; + return false; + } + + if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") { + const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester( + params.childSessionKey, + { + requesterRunId: params.childRunId, + }, + ); + if (Array.isArray(directChildren) && directChildren.length > 0) { + childCompletionFindings = buildChildCompletionFindings(directChildren); + } + } + } catch { + // If the registry query fails after runtime load, defer the announce rather than + // risk a premature completion message. The deferred cleanup path will retry once + // descendants settle. shouldDeleteChildSession = false; return false; } - - if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") { - const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester( - params.childSessionKey, - { - requesterRunId: params.childRunId, - }, - ); - if (Array.isArray(directChildren) && directChildren.length > 0) { - childCompletionFindings = buildChildCompletionFindings(directChildren); - } - } - } catch { - // If the registry check fails, defer the announce rather than risk a premature - // completion message. The deferred cleanup path will retry once descendants settle. - shouldDeleteChildSession = false; - return false; } const announceId = buildAnnounceIdFromChildRun({ @@ -1453,12 +1465,12 @@ export async function runSubagentAnnounceFlow(params: { const findings = childCompletionFindings || reply || "(no output)"; let requesterIsSubagent = requesterIsInternalSession(); - if (requesterIsSubagent) { + if (requesterIsSubagent && subagentRegistryRuntime) { const { isSubagentSessionRunActive, resolveRequesterForChildSession, shouldIgnorePostCompletionAnnounceForSession, - } = subagentRegistryRuntime ?? (await loadSubagentRegistryRuntime()); + } = subagentRegistryRuntime; if (!isSubagentSessionRunActive(targetRequesterSessionKey)) { if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) { return true;