diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index 20617816e6e..42c97a7ce1b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -41,6 +41,12 @@ const hoisted = vi.hoisted(() => { const getGlobalHookRunnerMock = vi.fn<() => unknown>(() => undefined); const initializeGlobalHookRunnerMock = vi.fn(); const runContextEngineMaintenanceMock = vi.fn(async (_params?: unknown) => undefined); + const sessionLockReleaseMock = vi.fn(async () => {}); + const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {}); + const setActiveEmbeddedRunMock = vi.fn(); + const clearActiveEmbeddedRunMock = vi.fn(); + const updateActiveEmbeddedRunSnapshotMock = vi.fn(); + const releaseWsSessionMock = vi.fn(); const sessionManager = { getLeafEntry: vi.fn(() => null), branch: vi.fn(), @@ -59,6 +65,12 @@ const hoisted = vi.hoisted(() => { getGlobalHookRunnerMock, initializeGlobalHookRunnerMock, runContextEngineMaintenanceMock, + sessionLockReleaseMock, + flushPendingToolResultsAfterIdleMock, + setActiveEmbeddedRunMock, + clearActiveEmbeddedRunMock, + updateActiveEmbeddedRunSnapshotMock, + releaseWsSessionMock, sessionManager, }; }); @@ -177,12 +189,14 @@ vi.mock("../tool-result-context-guard.js", () => ({ })); vi.mock("../wait-for-idle-before-flush.js", () => ({ - flushPendingToolResultsAfterIdle: async () => {}, + flushPendingToolResultsAfterIdle: hoisted.flushPendingToolResultsAfterIdleMock, })); vi.mock("../runs.js", () => ({ - setActiveEmbeddedRun: () => {}, - clearActiveEmbeddedRun: () => {}, + setActiveEmbeddedRun: (...args: unknown[]) => hoisted.setActiveEmbeddedRunMock(...args), + clearActiveEmbeddedRun: (...args: unknown[]) => hoisted.clearActiveEmbeddedRunMock(...args), + updateActiveEmbeddedRunSnapshot: (...args: unknown[]) => + hoisted.updateActiveEmbeddedRunSnapshotMock(...args), })); vi.mock("./images.js", () => ({ @@ -214,7 +228,7 @@ vi.mock("../extra-params.js", () => ({ vi.mock("../../openai-ws-stream.js", () => ({ createOpenAIWebSocketStreamFn: vi.fn(), - releaseWsSession: () => {}, + releaseWsSession: (...args: unknown[]) => hoisted.releaseWsSessionMock(...args), })); vi.mock("../../anthropic-payload-log.js", () => ({ @@ -299,7 +313,7 @@ function resetEmbeddedAttemptHarness( hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager); hoisted.resolveSandboxContextMock.mockReset(); hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({ - release: async () => {}, + release: hoisted.sessionLockReleaseMock, }); hoisted.resolveBootstrapContextForRunMock.mockReset().mockResolvedValue({ bootstrapFiles: [], @@ -307,6 +321,12 @@ function resetEmbeddedAttemptHarness( }); hoisted.getGlobalHookRunnerMock.mockReset().mockReturnValue(undefined); hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined); + hoisted.sessionLockReleaseMock.mockReset().mockResolvedValue(undefined); + hoisted.flushPendingToolResultsAfterIdleMock.mockReset().mockResolvedValue(undefined); + hoisted.setActiveEmbeddedRunMock.mockReset(); + hoisted.clearActiveEmbeddedRunMock.mockReset(); + hoisted.updateActiveEmbeddedRunSnapshotMock.mockReset(); + hoisted.releaseWsSessionMock.mockReset(); hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null); hoisted.sessionManager.branch.mockReset(); hoisted.sessionManager.resetLeaf.mockReset(); @@ -494,6 +514,216 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { }); }); +describe("runEmbeddedAttempt cleanup", () => { + const tempPaths: string[] = []; + + beforeEach(() => { + resetEmbeddedAttemptHarness({ + subscribeImpl: createSubscriptionMock, + }); + }); + + afterEach(async () => { + await cleanupTempPaths(tempPaths); + }); + + it("clears the active run and releases session resources when idle flush fails", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const disposeMock = vi.fn(); + const flushError = new Error("flush failed"); + hoisted.flushPendingToolResultsAfterIdleMock.mockRejectedValueOnce(flushError); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: { + ...createDefaultEmbeddedSession(), + dispose: disposeMock, + }, + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-flush-failure", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(flushError); + + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.setActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledWith( + "embedded-session", + expect.any(Object), + "agent:main:test-cleanup", + ); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); + expect(disposeMock).toHaveBeenCalledTimes(1); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); + }); + + it("skips active-run clear without masking the original error when subscribe fails before registration", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const disposeMock = vi.fn(); + const subscribeError = new Error("subscribe failed"); + hoisted.subscribeEmbeddedPiSessionMock.mockReset().mockImplementation(() => { + throw subscribeError; + }); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: { + ...createDefaultEmbeddedSession(), + dispose: disposeMock, + }, + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-subscribe-failure", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-subscribe-failure", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(subscribeError); + + expect(hoisted.setActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(disposeMock).toHaveBeenCalledTimes(1); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + }); + + it("clears the active run only after idle flush and session lock release finish", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: createDefaultEmbeddedSession(), + })); + + const result = await runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup-order", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-order", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }); + + expect(result.promptError).toBeNull(); + expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledTimes(1); + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ?? + Number.POSITIVE_INFINITY, + ); + }); + + it("keeps the active run registered when session lock release fails", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + + const lockReleaseError = new Error("lock release failed"); + hoisted.sessionLockReleaseMock.mockRejectedValueOnce(lockReleaseError); + hoisted.createAgentSessionMock.mockImplementation(async () => ({ + session: createDefaultEmbeddedSession(), + })); + + await expect( + runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey: "agent:main:test-cleanup-lock-release", + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-cleanup-lock-release", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + }), + ).rejects.toThrow(lockReleaseError); + + expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1); + expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1); + expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled(); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan( + hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + }); +}); + describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => { const tempPaths: string[] = []; diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index d785218f819..e1fdf6afd52 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -165,6 +165,29 @@ type PromptBuildHookRunner = { const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt"; const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; +async function runCleanupSteps( + steps: ReadonlyArray<{ label: string; run: () => void | Promise }>, +): Promise { + let firstError: unknown | undefined; + + for (const step of steps) { + try { + await step.run(); + } catch (err) { + if (firstError === undefined) { + firstError = err; + } + log.warn( + `embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`, + ); + } + } + + if (firstError !== undefined) { + throw firstError; + } +} + // Persist a hidden context reminder so the next turn knows why the runner stopped. function buildSessionsYieldContextMessage(message: string): string { return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`; @@ -2009,6 +2032,7 @@ export async function runEmbeddedAttempt( let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; + let queueHandle: EmbeddedPiQueueHandle | undefined; let removeToolResultContextGuard: (() => void) | undefined; try { await repairSessionFileIfNeeded({ @@ -2567,7 +2591,7 @@ export async function runEmbeddedAttempt( getCompactionCount, } = subscription; - const queueHandle: EmbeddedPiQueueHandle = { + queueHandle = { queueMessage: async (text: string) => { await activeSession.steer(text); }, @@ -3113,7 +3137,6 @@ export async function runEmbeddedAttempt( `CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`, ); } - clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); params.abortSignal?.removeEventListener?.("abort", onAbort); } @@ -3193,17 +3216,63 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 - removeToolResultContextGuard?.(); - await flushPendingToolResultsAfterIdle({ - agent: session?.agent, - sessionManager, - clearPendingOnTimeout: true, - }); - session?.dispose(); - releaseWsSession(params.sessionId); - await bundleMcpRuntime?.dispose(); - await bundleLspRuntime?.dispose(); - await sessionLock.release(); + let sessionLockReleased = false; + await runCleanupSteps([ + { + label: "tool result context guard removal", + run: () => { + removeToolResultContextGuard?.(); + }, + }, + { + label: "pending tool result flush", + run: () => + flushPendingToolResultsAfterIdle({ + agent: session?.agent, + sessionManager, + clearPendingOnTimeout: true, + }), + }, + { + label: "session dispose", + run: () => { + session?.dispose(); + }, + }, + { + label: "websocket session release", + run: () => { + releaseWsSession(params.sessionId); + }, + }, + { + label: "bundle MCP runtime dispose", + run: () => bundleMcpRuntime?.dispose(), + }, + { + label: "bundle LSP runtime dispose", + run: () => bundleLspRuntime?.dispose(), + }, + { + label: "session lock release", + run: async () => { + await sessionLock.release(); + sessionLockReleased = true; + }, + }, + { + // Keep the embedded run registered until idle flush and lock release finish. + // waitForEmbeddedPiRunEnd() and restart drains rely on this barrier so yielded + // or aborted runs do not look fully idle while descendant spawns/tool cleanup + // may still be settling. + label: "active embedded run clear", + run: () => { + if (queueHandle && sessionLockReleased) { + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + } + }, + }, + ]); } } finally { restoreSkillEnv?.(); diff --git a/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts b/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts new file mode 100644 index 00000000000..ea9da885a94 --- /dev/null +++ b/src/agents/subagent-announce.registry-runtime-load-fallback.test.ts @@ -0,0 +1,116 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +type GatewayCall = { + method?: string; + timeoutMs?: number; + expectFinal?: boolean; + params?: Record; +}; + +const gatewayCalls: GatewayCall[] = []; +let registryRuntimeLoadAttempts = 0; + +async function importModuleWithRegistryRuntimeFailure() { + vi.resetModules(); + gatewayCalls.length = 0; + registryRuntimeLoadAttempts = 0; + + vi.doMock("../gateway/call.js", () => ({ + callGateway: vi.fn(async (request: GatewayCall) => { + gatewayCalls.push(request); + if (request.method === "chat.history") { + return { messages: [] }; + } + return {}; + }), + })); + + vi.doMock("../config/config.js", () => ({ + loadConfig: () => ({ + session: { + mainKey: "main", + scope: "per-sender", + }, + }), + })); + + vi.doMock("../config/sessions.js", () => ({ + loadSessionStore: vi.fn(() => ({ + "agent:main:main": { sessionId: "sess-main", updatedAt: 1 }, + "agent:main:subagent:worker": { sessionId: "sess-worker", updatedAt: 1 }, + })), + resolveAgentIdFromSessionKey: () => "main", + resolveStorePath: () => "/tmp/sessions-main.json", + resolveMainSessionKey: () => "agent:main:main", + })); + + vi.doMock("./pi-embedded.js", () => ({ + isEmbeddedPiRunActive: () => false, + queueEmbeddedPiMessage: () => false, + waitForEmbeddedPiRunEnd: async () => true, + })); + + vi.doMock("./subagent-depth.js", () => ({ + getSubagentDepthFromSessionStore: () => 0, + })); + + vi.doMock("./subagent-registry-runtime.js", () => { + registryRuntimeLoadAttempts += 1; + throw new Error("registry runtime load failed"); + }); + + return await import("./subagent-announce.js"); +} + +describe("subagent announce registry runtime fallback", () => { + afterEach(() => { + vi.resetModules(); + vi.doUnmock("../gateway/call.js"); + vi.doUnmock("../config/config.js"); + vi.doUnmock("../config/sessions.js"); + vi.doUnmock("./pi-embedded.js"); + vi.doUnmock("./subagent-depth.js"); + vi.doUnmock("./subagent-registry-runtime.js"); + gatewayCalls.length = 0; + registryRuntimeLoadAttempts = 0; + }); + + it("falls back to best-effort announce and retries runtime load after a failure", async () => { + const { runSubagentAnnounceFlow } = await importModuleWithRegistryRuntimeFailure(); + + const baseParams = { + childSessionKey: "agent:main:subagent:worker", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1_000, + cleanup: "keep" as const, + roundOneReply: "done", + waitForCompletion: false, + outcome: { status: "ok" as const }, + }; + + await expect( + runSubagentAnnounceFlow({ + ...baseParams, + childRunId: "run-registry-load-failure-1", + }), + ).resolves.toBe(true); + + await expect( + runSubagentAnnounceFlow({ + ...baseParams, + childRunId: "run-registry-load-failure-2", + }), + ).resolves.toBe(true); + + const directAgentCalls = gatewayCalls.filter( + (call) => call.method === "agent" && call.expectFinal === true, + ); + expect(directAgentCalls).toHaveLength(2); + expect(directAgentCalls.every((call) => call.params?.sessionKey === "agent:main:main")).toBe( + true, + ); + expect(registryRuntimeLoadAttempts).toBe(2); + }); +}); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 52cde0f69b0..56c1950c5eb 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -25,6 +25,7 @@ let requesterDepthResolver: (sessionKey?: string) => number = () => 0; let subagentSessionRunActive = true; let shouldIgnorePostCompletion = false; let pendingDescendantRuns = 0; +let registryCheckShouldThrow = false; let fallbackRequesterResolution: { requesterSessionKey: string; requesterOrigin?: { channel?: string; to?: string; accountId?: string }; @@ -68,7 +69,12 @@ vi.mock("./pi-embedded.js", () => ({ vi.mock("./subagent-registry.js", () => ({ countActiveDescendantRuns: () => 0, - countPendingDescendantRuns: () => pendingDescendantRuns, + countPendingDescendantRuns: () => { + if (registryCheckShouldThrow) { + throw new Error("registry unavailable"); + } + return pendingDescendantRuns; + }, listSubagentRunsForRequester: () => [], isSubagentSessionRunActive: () => subagentSessionRunActive, shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion, @@ -157,6 +163,7 @@ describe("subagent announce timeout config", () => { subagentSessionRunActive = true; shouldIgnorePostCompletion = false; pendingDescendantRuns = 0; + registryCheckShouldThrow = false; fallbackRequesterResolution = null; }); @@ -448,4 +455,27 @@ describe("subagent announce timeout config", () => { findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), ).toBeUndefined(); }); + + it("regression, defers announce when registry check throws instead of sending premature completion", async () => { + registryCheckShouldThrow = true; + + const didAnnounce = await runAnnounceFlowForTest("run-registry-error"); + + expect(didAnnounce).toBe(false); + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); + + it("regression, defers announce for yield-aborted subagent with pending child spawns", async () => { + requesterDepthResolver = () => 1; + pendingDescendantRuns = 1; + + const didAnnounce = await runAnnounceFlowForTest("run-yield-pending-children", { + requesterSessionKey: "agent:main:subagent:orchestrator", + requesterDisplayKey: "subagent:orchestrator", + childSessionKey: "agent:main:subagent:orchestrator:subagent:worker", + }); + + expect(didAnnounce).toBe(false); + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index ab2fbb1140e..4c2e59ae9ec 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -59,7 +59,10 @@ let subagentRegistryRuntimePromise: Promise< > | null = null; function loadSubagentRegistryRuntime() { - subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js"); + subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js").catch((err) => { + subagentRegistryRuntimePromise = null; + throw err; + }); return subagentRegistryRuntimePromise; } @@ -1332,37 +1335,49 @@ export async function runSubagentAnnounceFlow(params: { | undefined; try { subagentRegistryRuntime = await loadSubagentRegistryRuntime(); - if ( - requesterDepth >= 1 && - subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( - targetRequesterSessionKey, - ) - ) { - return true; - } - - const pendingChildDescendantRuns = Math.max( - 0, - subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey), + } catch (err) { + defaultRuntime.log( + `[warn] Subagent announce registry runtime load failed; using best-effort announce fallback: ${summarizeDeliveryError(err)}`, ); - if (pendingChildDescendantRuns > 0 && announceType !== "cron job") { + } + if (subagentRegistryRuntime) { + try { + if ( + requesterDepth >= 1 && + subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( + targetRequesterSessionKey, + ) + ) { + return true; + } + + const pendingChildDescendantRuns = Math.max( + 0, + subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey), + ); + if (pendingChildDescendantRuns > 0 && announceType !== "cron job") { + shouldDeleteChildSession = false; + return false; + } + + if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") { + const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester( + params.childSessionKey, + { + requesterRunId: params.childRunId, + }, + ); + if (Array.isArray(directChildren) && directChildren.length > 0) { + childCompletionFindings = buildChildCompletionFindings(directChildren); + } + } + } catch { + // If the registry query fails after runtime load, defer the announce rather than + // risk a premature completion message. The deferred cleanup path will retry once + // descendants settle. shouldDeleteChildSession = false; return false; } - - if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") { - const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester( - params.childSessionKey, - { - requesterRunId: params.childRunId, - }, - ); - if (Array.isArray(directChildren) && directChildren.length > 0) { - childCompletionFindings = buildChildCompletionFindings(directChildren); - } - } - } catch { - // Best-effort only. } const announceId = buildAnnounceIdFromChildRun({ @@ -1450,12 +1465,12 @@ export async function runSubagentAnnounceFlow(params: { const findings = childCompletionFindings || reply || "(no output)"; let requesterIsSubagent = requesterIsInternalSession(); - if (requesterIsSubagent) { + if (requesterIsSubagent && subagentRegistryRuntime) { const { isSubagentSessionRunActive, resolveRequesterForChildSession, shouldIgnorePostCompletionAnnounceForSession, - } = subagentRegistryRuntime ?? (await loadSubagentRegistryRuntime()); + } = subagentRegistryRuntime; if (!isSubagentSessionRunActive(targetRequesterSessionKey)) { if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) { return true; diff --git a/src/agents/subagent-announce.yield-spawn-race.test.ts b/src/agents/subagent-announce.yield-spawn-race.test.ts new file mode 100644 index 00000000000..1917265e733 --- /dev/null +++ b/src/agents/subagent-announce.yield-spawn-race.test.ts @@ -0,0 +1,124 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +type GatewayCall = { + method?: string; + timeoutMs?: number; + expectFinal?: boolean; + params?: Record; +}; + +const gatewayCalls: GatewayCall[] = []; +let callGatewayImpl: (request: GatewayCall) => Promise = async () => ({}); +let sessionStore: Record> = {}; +let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = { + session: { + mainKey: "main", + scope: "per-sender", + }, +}; + +vi.mock("../gateway/call.js", () => ({ + callGateway: vi.fn(async (request: GatewayCall) => { + gatewayCalls.push(request); + return await callGatewayImpl(request); + }), +})); + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => configOverride, + }; +}); + +vi.mock("../config/sessions.js", () => ({ + loadSessionStore: vi.fn(() => sessionStore), + resolveAgentIdFromSessionKey: () => "main", + resolveStorePath: () => "/tmp/sessions-main.json", + resolveMainSessionKey: () => "agent:main:main", +})); + +vi.mock("./pi-embedded.js", () => ({ + isEmbeddedPiRunActive: () => false, + queueEmbeddedPiMessage: () => false, + waitForEmbeddedPiRunEnd: async () => true, +})); + +import { runSubagentAnnounceFlow } from "./subagent-announce.js"; +import { + addSubagentRunForTests, + countPendingDescendantRuns, + resetSubagentRegistryForTests, +} from "./subagent-registry.js"; + +function findFinalDirectAgentCall(): GatewayCall | undefined { + return gatewayCalls.find((call) => call.method === "agent" && call.expectFinal === true); +} + +describe("subagent announce yield + spawn race", () => { + beforeEach(() => { + gatewayCalls.length = 0; + callGatewayImpl = async () => ({}); + sessionStore = {}; + configOverride = { + session: { + mainKey: "main", + scope: "per-sender", + }, + }; + resetSubagentRegistryForTests({ persist: false }); + }); + + it("defers announce when a yield-aborted parent still has a concurrently spawned pending child", async () => { + const parentSessionKey = "agent:main:subagent:orchestrator-race"; + + addSubagentRunForTests({ + runId: "run-orchestrator-race", + childSessionKey: parentSessionKey, + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate", + cleanup: "keep", + createdAt: 100, + startedAt: 100, + endedAt: 110, + expectsCompletionMessage: true, + }); + addSubagentRunForTests({ + runId: "run-worker-race", + childSessionKey: `${parentSessionKey}:subagent:worker`, + controllerSessionKey: parentSessionKey, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: parentSessionKey, + task: "child task", + cleanup: "keep", + createdAt: 111, + startedAt: 111, + expectsCompletionMessage: true, + }); + + // Regression guard: when sessions_spawn commits before the announce check, + // the parent must still see the pending child and defer its completion. + expect(countPendingDescendantRuns(parentSessionKey)).toBe(1); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: parentSessionKey, + childRunId: "run-orchestrator-race", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate", + timeoutMs: 1_000, + cleanup: "keep", + roundOneReply: "Yielded after concurrent sessions_spawn.", + waitForCompletion: false, + expectsCompletionMessage: true, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(false); + expect(countPendingDescendantRuns(parentSessionKey)).toBe(1); + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); +}); diff --git a/src/agents/subagent-registry.nested.e2e.test.ts b/src/agents/subagent-registry.nested.e2e.test.ts index 06148705986..7cb91472caf 100644 --- a/src/agents/subagent-registry.nested.e2e.test.ts +++ b/src/agents/subagent-registry.nested.e2e.test.ts @@ -326,4 +326,63 @@ describe("subagent registry nested agent tracking", () => { expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-self")).toBe(1); expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-sibling")).toBe(1); }); + + it("yield-aborted parent with active children reports pending descendants correctly", async () => { + const { addSubagentRunForTests, countPendingDescendantRuns, countActiveDescendantRuns } = + subagentRegistry; + + // Orchestrator yielded (ended) after spawning children — simulates sessions_yield + addSubagentRunForTests({ + runId: "run-orch-yielded", + childSessionKey: "agent:main:subagent:orch-yield", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "orchestrate with yield", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + cleanupHandled: false, + cleanupCompletedAt: undefined, + }); + + // Child spawned by orchestrator — still running (no endedAt) + addSubagentRunForTests({ + runId: "run-child-active", + childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1", + requesterSessionKey: "agent:main:subagent:orch-yield", + requesterDisplayKey: "orch-yield", + task: "worker task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + cleanupHandled: false, + }); + + // The orchestrator has pending descendants (child still running + orchestrator cleanup not done) + expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(1); + // The child is active under the orchestrator + expect(countActiveDescendantRuns("agent:main:subagent:orch-yield")).toBe(1); + // From main's perspective, the orchestrator itself is pending (cleanup not completed) + expect(countPendingDescendantRuns("agent:main:main")).toBe(2); + + // Child completes and finishes cleanup + addSubagentRunForTests({ + runId: "run-child-active", + childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1", + requesterSessionKey: "agent:main:subagent:orch-yield", + requesterDisplayKey: "orch-yield", + task: "worker task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + endedAt: 3, + cleanupHandled: true, + cleanupCompletedAt: 4, + }); + + // Now only the orchestrator itself remains pending (no cleanup yet) + expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(0); + expect(countPendingDescendantRuns("agent:main:main")).toBe(1); + }); });