From 2e4631fbe427f2520cd186131ba153c191f3f26c Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 22:17:03 -0400 Subject: [PATCH] fix: defer sessions_yield announce until all concurrent tool executions settle (#46719) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When parallel_tool_calls is enabled and sessions_yield runs concurrently with sessions_spawn, the embedded run is marked ended (clearActiveEmbeddedRun) before flushPendingToolResultsAfterIdle completes. This causes the announce flow's waitForEmbeddedPiRunEnd to resolve prematurely — before spawn registrations are committed to the subagent registry — so countPendingDescendantRuns returns 0 and a premature completion announcement is sent. Fix: move clearActiveEmbeddedRun after flushPendingToolResultsAfterIdle so the embedded run stays active until all concurrent tool executions finish. Also change the silent catch block in runSubagentAnnounceFlow to defer (return false) instead of silently proceeding when the registry check fails. 
Co-Authored-By: Claude Opus 4.6 --- src/agents/pi-embedded-runner/run/attempt.ts | 7 ++- src/agents/subagent-announce.timeout.test.ts | 31 +++++++++- src/agents/subagent-announce.ts | 5 +- .../subagent-registry.nested.e2e.test.ts | 59 +++++++++++++++++++ 4 files changed, 99 insertions(+), 3 deletions(-) diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index d785218f819..21408b4ede2 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -3113,7 +3113,6 @@ export async function runEmbeddedAttempt( `CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`, ); } - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); params.abortSignal?.removeEventListener?.("abort", onAbort); } @@ -3199,6 +3198,12 @@ export async function runEmbeddedAttempt( sessionManager, clearPendingOnTimeout: true, }); + // Clear embedded run AFTER flushing pending tool results so that all concurrent + // tool executions (e.g. sessions_spawn) complete before the run is considered ended. + // This prevents premature announce when sessions_yield runs in parallel with + // sessions_spawn — the announce flow's waitForEmbeddedPiRunEnd won't resolve + // until spawn registrations are committed to the subagent registry. 
+ clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); session?.dispose(); releaseWsSession(params.sessionId); await bundleMcpRuntime?.dispose(); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 52cde0f69b0..23f5f84c407 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -25,6 +25,7 @@ let requesterDepthResolver: (sessionKey?: string) => number = () => 0; let subagentSessionRunActive = true; let shouldIgnorePostCompletion = false; let pendingDescendantRuns = 0; +let registryCheckShouldThrow = false; let fallbackRequesterResolution: { requesterSessionKey: string; requesterOrigin?: { channel?: string; to?: string; accountId?: string }; @@ -68,7 +69,12 @@ vi.mock("./pi-embedded.js", () => ({ vi.mock("./subagent-registry.js", () => ({ countActiveDescendantRuns: () => 0, - countPendingDescendantRuns: () => pendingDescendantRuns, + countPendingDescendantRuns: () => { + if (registryCheckShouldThrow) { + throw new Error("registry unavailable"); + } + return pendingDescendantRuns; + }, listSubagentRunsForRequester: () => [], isSubagentSessionRunActive: () => subagentSessionRunActive, shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion, @@ -157,6 +163,7 @@ describe("subagent announce timeout config", () => { subagentSessionRunActive = true; shouldIgnorePostCompletion = false; pendingDescendantRuns = 0; + registryCheckShouldThrow = false; fallbackRequesterResolution = null; }); @@ -447,5 +454,27 @@ describe("subagent announce timeout config", () => { expect( findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), ).toBeUndefined(); + + it("regression, defers announce when registry check throws instead of sending premature completion", async () => { + registryCheckShouldThrow = true; + + const didAnnounce = await runAnnounceFlowForTest("run-registry-error"); + + 
expect(didAnnounce).toBe(false); + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); + + it("regression, defers announce for yield-aborted subagent with pending child spawns", async () => { + requesterDepthResolver = () => 1; + pendingDescendantRuns = 1; + + const didAnnounce = await runAnnounceFlowForTest("run-yield-pending-children", { + requesterSessionKey: "agent:main:subagent:orchestrator", + requesterDisplayKey: "subagent:orchestrator", + childSessionKey: "agent:main:subagent:orchestrator:subagent:worker", + }); + + expect(didAnnounce).toBe(false); + expect(findFinalDirectAgentCall()).toBeUndefined(); }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index ab2fbb1140e..96d278711fa 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -1362,7 +1362,10 @@ export async function runSubagentAnnounceFlow(params: { } } } catch { - // Best-effort only. + // If the registry check fails, defer the announce rather than risk a premature + // completion message. The deferred cleanup path will retry once descendants settle. 
+ shouldDeleteChildSession = false; + return false; } const announceId = buildAnnounceIdFromChildRun({ diff --git a/src/agents/subagent-registry.nested.e2e.test.ts b/src/agents/subagent-registry.nested.e2e.test.ts index 06148705986..7cb91472caf 100644 --- a/src/agents/subagent-registry.nested.e2e.test.ts +++ b/src/agents/subagent-registry.nested.e2e.test.ts @@ -326,4 +326,63 @@ describe("subagent registry nested agent tracking", () => { expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-self")).toBe(1); expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-sibling")).toBe(1); }); + + it("yield-aborted parent with active children reports pending descendants correctly", async () => { + const { addSubagentRunForTests, countPendingDescendantRuns, countActiveDescendantRuns } = + subagentRegistry; + + // Orchestrator yielded (ended) after spawning children — simulates sessions_yield + addSubagentRunForTests({ + runId: "run-orch-yielded", + childSessionKey: "agent:main:subagent:orch-yield", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate with yield", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + cleanupHandled: false, + cleanupCompletedAt: undefined, + }); + + // Child spawned by orchestrator — still running (no endedAt) + addSubagentRunForTests({ + runId: "run-child-active", + childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1", + requesterSessionKey: "agent:main:subagent:orch-yield", + requesterDisplayKey: "orch-yield", + task: "worker task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + cleanupHandled: false, + }); + + // The orchestrator has pending descendants (child still running + orchestrator cleanup not done) + expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(1); + // The child is active under the orchestrator + expect(countActiveDescendantRuns("agent:main:subagent:orch-yield")).toBe(1); + // From main's 
perspective, the orchestrator itself is pending (cleanup not completed) + expect(countPendingDescendantRuns("agent:main:main")).toBe(2); + + // Child completes and finishes cleanup + addSubagentRunForTests({ + runId: "run-child-active", + childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1", + requesterSessionKey: "agent:main:subagent:orch-yield", + requesterDisplayKey: "orch-yield", + task: "worker task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + endedAt: 3, + cleanupHandled: true, + cleanupCompletedAt: 4, + }); + + // Now only the orchestrator itself remains pending (no cleanup yet) + expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(0); + expect(countPendingDescendantRuns("agent:main:main")).toBe(1); + }); });