Merge f1ad5a4733cca228805017857d51948211b22f89 into 6b4c24c2e55b5b4013277bd799525086f6a0c40f
This commit is contained in:
commit
52b97327df
@ -41,6 +41,12 @@ const hoisted = vi.hoisted(() => {
|
||||
const getGlobalHookRunnerMock = vi.fn<() => unknown>(() => undefined);
|
||||
const initializeGlobalHookRunnerMock = vi.fn();
|
||||
const runContextEngineMaintenanceMock = vi.fn(async (_params?: unknown) => undefined);
|
||||
const sessionLockReleaseMock = vi.fn(async () => {});
|
||||
const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {});
|
||||
const setActiveEmbeddedRunMock = vi.fn();
|
||||
const clearActiveEmbeddedRunMock = vi.fn();
|
||||
const updateActiveEmbeddedRunSnapshotMock = vi.fn();
|
||||
const releaseWsSessionMock = vi.fn();
|
||||
const sessionManager = {
|
||||
getLeafEntry: vi.fn(() => null),
|
||||
branch: vi.fn(),
|
||||
@ -59,6 +65,12 @@ const hoisted = vi.hoisted(() => {
|
||||
getGlobalHookRunnerMock,
|
||||
initializeGlobalHookRunnerMock,
|
||||
runContextEngineMaintenanceMock,
|
||||
sessionLockReleaseMock,
|
||||
flushPendingToolResultsAfterIdleMock,
|
||||
setActiveEmbeddedRunMock,
|
||||
clearActiveEmbeddedRunMock,
|
||||
updateActiveEmbeddedRunSnapshotMock,
|
||||
releaseWsSessionMock,
|
||||
sessionManager,
|
||||
};
|
||||
});
|
||||
@ -177,12 +189,14 @@ vi.mock("../tool-result-context-guard.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../wait-for-idle-before-flush.js", () => ({
|
||||
flushPendingToolResultsAfterIdle: async () => {},
|
||||
flushPendingToolResultsAfterIdle: hoisted.flushPendingToolResultsAfterIdleMock,
|
||||
}));
|
||||
|
||||
vi.mock("../runs.js", () => ({
|
||||
setActiveEmbeddedRun: () => {},
|
||||
clearActiveEmbeddedRun: () => {},
|
||||
setActiveEmbeddedRun: (...args: unknown[]) => hoisted.setActiveEmbeddedRunMock(...args),
|
||||
clearActiveEmbeddedRun: (...args: unknown[]) => hoisted.clearActiveEmbeddedRunMock(...args),
|
||||
updateActiveEmbeddedRunSnapshot: (...args: unknown[]) =>
|
||||
hoisted.updateActiveEmbeddedRunSnapshotMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./images.js", () => ({
|
||||
@ -214,7 +228,7 @@ vi.mock("../extra-params.js", () => ({
|
||||
|
||||
vi.mock("../../openai-ws-stream.js", () => ({
|
||||
createOpenAIWebSocketStreamFn: vi.fn(),
|
||||
releaseWsSession: () => {},
|
||||
releaseWsSession: (...args: unknown[]) => hoisted.releaseWsSessionMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../../anthropic-payload-log.js", () => ({
|
||||
@ -299,7 +313,7 @@ function resetEmbeddedAttemptHarness(
|
||||
hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager);
|
||||
hoisted.resolveSandboxContextMock.mockReset();
|
||||
hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({
|
||||
release: async () => {},
|
||||
release: hoisted.sessionLockReleaseMock,
|
||||
});
|
||||
hoisted.resolveBootstrapContextForRunMock.mockReset().mockResolvedValue({
|
||||
bootstrapFiles: [],
|
||||
@ -307,6 +321,12 @@ function resetEmbeddedAttemptHarness(
|
||||
});
|
||||
hoisted.getGlobalHookRunnerMock.mockReset().mockReturnValue(undefined);
|
||||
hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.sessionLockReleaseMock.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.flushPendingToolResultsAfterIdleMock.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.setActiveEmbeddedRunMock.mockReset();
|
||||
hoisted.clearActiveEmbeddedRunMock.mockReset();
|
||||
hoisted.updateActiveEmbeddedRunSnapshotMock.mockReset();
|
||||
hoisted.releaseWsSessionMock.mockReset();
|
||||
hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null);
|
||||
hoisted.sessionManager.branch.mockReset();
|
||||
hoisted.sessionManager.resetLeaf.mockReset();
|
||||
@ -494,6 +514,216 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("runEmbeddedAttempt cleanup", () => {
|
||||
const tempPaths: string[] = [];
|
||||
|
||||
beforeEach(() => {
|
||||
resetEmbeddedAttemptHarness({
|
||||
subscribeImpl: createSubscriptionMock,
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await cleanupTempPaths(tempPaths);
|
||||
});
|
||||
|
||||
it("clears the active run and releases session resources when idle flush fails", async () => {
|
||||
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
|
||||
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
|
||||
const sessionFile = path.join(workspaceDir, "session.jsonl");
|
||||
tempPaths.push(workspaceDir, agentDir);
|
||||
await fs.writeFile(sessionFile, "", "utf8");
|
||||
|
||||
const disposeMock = vi.fn();
|
||||
const flushError = new Error("flush failed");
|
||||
hoisted.flushPendingToolResultsAfterIdleMock.mockRejectedValueOnce(flushError);
|
||||
hoisted.createAgentSessionMock.mockImplementation(async () => ({
|
||||
session: {
|
||||
...createDefaultEmbeddedSession(),
|
||||
dispose: disposeMock,
|
||||
},
|
||||
}));
|
||||
|
||||
await expect(
|
||||
runEmbeddedAttempt({
|
||||
sessionId: "embedded-session",
|
||||
sessionKey: "agent:main:test-cleanup",
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
config: {},
|
||||
prompt: "hello",
|
||||
timeoutMs: 10_000,
|
||||
runId: "run-cleanup-flush-failure",
|
||||
provider: "openai",
|
||||
modelId: "gpt-test",
|
||||
model: testModel,
|
||||
authStorage: {} as AuthStorage,
|
||||
modelRegistry: {} as ModelRegistry,
|
||||
thinkLevel: "off",
|
||||
senderIsOwner: true,
|
||||
disableMessageTool: true,
|
||||
}),
|
||||
).rejects.toThrow(flushError);
|
||||
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.setActiveEmbeddedRunMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledWith(
|
||||
"embedded-session",
|
||||
expect.any(Object),
|
||||
"agent:main:test-cleanup",
|
||||
);
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ??
|
||||
Number.POSITIVE_INFINITY,
|
||||
);
|
||||
expect(disposeMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
|
||||
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ??
|
||||
Number.POSITIVE_INFINITY,
|
||||
);
|
||||
});
|
||||
|
||||
it("skips active-run clear without masking the original error when subscribe fails before registration", async () => {
|
||||
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
|
||||
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
|
||||
const sessionFile = path.join(workspaceDir, "session.jsonl");
|
||||
tempPaths.push(workspaceDir, agentDir);
|
||||
await fs.writeFile(sessionFile, "", "utf8");
|
||||
|
||||
const disposeMock = vi.fn();
|
||||
const subscribeError = new Error("subscribe failed");
|
||||
hoisted.subscribeEmbeddedPiSessionMock.mockReset().mockImplementation(() => {
|
||||
throw subscribeError;
|
||||
});
|
||||
hoisted.createAgentSessionMock.mockImplementation(async () => ({
|
||||
session: {
|
||||
...createDefaultEmbeddedSession(),
|
||||
dispose: disposeMock,
|
||||
},
|
||||
}));
|
||||
|
||||
await expect(
|
||||
runEmbeddedAttempt({
|
||||
sessionId: "embedded-session",
|
||||
sessionKey: "agent:main:test-subscribe-failure",
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
config: {},
|
||||
prompt: "hello",
|
||||
timeoutMs: 10_000,
|
||||
runId: "run-cleanup-subscribe-failure",
|
||||
provider: "openai",
|
||||
modelId: "gpt-test",
|
||||
model: testModel,
|
||||
authStorage: {} as AuthStorage,
|
||||
modelRegistry: {} as ModelRegistry,
|
||||
thinkLevel: "off",
|
||||
senderIsOwner: true,
|
||||
disableMessageTool: true,
|
||||
}),
|
||||
).rejects.toThrow(subscribeError);
|
||||
|
||||
expect(hoisted.setActiveEmbeddedRunMock).not.toHaveBeenCalled();
|
||||
expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled();
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
|
||||
expect(disposeMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
|
||||
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("clears the active run only after idle flush and session lock release finish", async () => {
|
||||
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
|
||||
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
|
||||
const sessionFile = path.join(workspaceDir, "session.jsonl");
|
||||
tempPaths.push(workspaceDir, agentDir);
|
||||
await fs.writeFile(sessionFile, "", "utf8");
|
||||
|
||||
hoisted.createAgentSessionMock.mockImplementation(async () => ({
|
||||
session: createDefaultEmbeddedSession(),
|
||||
}));
|
||||
|
||||
const result = await runEmbeddedAttempt({
|
||||
sessionId: "embedded-session",
|
||||
sessionKey: "agent:main:test-cleanup-order",
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
config: {},
|
||||
prompt: "hello",
|
||||
timeoutMs: 10_000,
|
||||
runId: "run-cleanup-order",
|
||||
provider: "openai",
|
||||
modelId: "gpt-test",
|
||||
model: testModel,
|
||||
authStorage: {} as AuthStorage,
|
||||
modelRegistry: {} as ModelRegistry,
|
||||
thinkLevel: "off",
|
||||
senderIsOwner: true,
|
||||
disableMessageTool: true,
|
||||
});
|
||||
|
||||
expect(result.promptError).toBeNull();
|
||||
expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
|
||||
);
|
||||
expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ??
|
||||
Number.POSITIVE_INFINITY,
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps the active run registered when session lock release fails", async () => {
|
||||
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
|
||||
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
|
||||
const sessionFile = path.join(workspaceDir, "session.jsonl");
|
||||
tempPaths.push(workspaceDir, agentDir);
|
||||
await fs.writeFile(sessionFile, "", "utf8");
|
||||
|
||||
const lockReleaseError = new Error("lock release failed");
|
||||
hoisted.sessionLockReleaseMock.mockRejectedValueOnce(lockReleaseError);
|
||||
hoisted.createAgentSessionMock.mockImplementation(async () => ({
|
||||
session: createDefaultEmbeddedSession(),
|
||||
}));
|
||||
|
||||
await expect(
|
||||
runEmbeddedAttempt({
|
||||
sessionId: "embedded-session",
|
||||
sessionKey: "agent:main:test-cleanup-lock-release",
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
config: {},
|
||||
prompt: "hello",
|
||||
timeoutMs: 10_000,
|
||||
runId: "run-cleanup-lock-release",
|
||||
provider: "openai",
|
||||
modelId: "gpt-test",
|
||||
model: testModel,
|
||||
authStorage: {} as AuthStorage,
|
||||
modelRegistry: {} as ModelRegistry,
|
||||
thinkLevel: "off",
|
||||
senderIsOwner: true,
|
||||
disableMessageTool: true,
|
||||
}),
|
||||
).rejects.toThrow(lockReleaseError);
|
||||
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled();
|
||||
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
|
||||
expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => {
|
||||
const tempPaths: string[] = [];
|
||||
|
||||
|
||||
@ -165,6 +165,29 @@ type PromptBuildHookRunner = {
|
||||
const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt";
|
||||
const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield";
|
||||
|
||||
async function runCleanupSteps(
|
||||
steps: ReadonlyArray<{ label: string; run: () => void | Promise<void> }>,
|
||||
): Promise<void> {
|
||||
let firstError: unknown | undefined;
|
||||
|
||||
for (const step of steps) {
|
||||
try {
|
||||
await step.run();
|
||||
} catch (err) {
|
||||
if (firstError === undefined) {
|
||||
firstError = err;
|
||||
}
|
||||
log.warn(
|
||||
`embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (firstError !== undefined) {
|
||||
throw firstError;
|
||||
}
|
||||
}
|
||||
|
||||
// Persist a hidden context reminder so the next turn knows why the runner stopped.
|
||||
function buildSessionsYieldContextMessage(message: string): string {
|
||||
return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`;
|
||||
@ -2009,6 +2032,7 @@ export async function runEmbeddedAttempt(
|
||||
|
||||
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
|
||||
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
|
||||
let queueHandle: EmbeddedPiQueueHandle | undefined;
|
||||
let removeToolResultContextGuard: (() => void) | undefined;
|
||||
try {
|
||||
await repairSessionFileIfNeeded({
|
||||
@ -2567,7 +2591,7 @@ export async function runEmbeddedAttempt(
|
||||
getCompactionCount,
|
||||
} = subscription;
|
||||
|
||||
const queueHandle: EmbeddedPiQueueHandle = {
|
||||
queueHandle = {
|
||||
queueMessage: async (text: string) => {
|
||||
await activeSession.steer(text);
|
||||
},
|
||||
@ -3113,7 +3137,6 @@ export async function runEmbeddedAttempt(
|
||||
`CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`,
|
||||
);
|
||||
}
|
||||
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
|
||||
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
||||
}
|
||||
|
||||
@ -3193,17 +3216,63 @@ export async function runEmbeddedAttempt(
|
||||
// flushPendingToolResults() fires while tools are still executing, inserting
|
||||
// synthetic "missing tool result" errors and causing silent agent failures.
|
||||
// See: https://github.com/openclaw/openclaw/issues/8643
|
||||
removeToolResultContextGuard?.();
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: session?.agent,
|
||||
sessionManager,
|
||||
clearPendingOnTimeout: true,
|
||||
});
|
||||
session?.dispose();
|
||||
releaseWsSession(params.sessionId);
|
||||
await bundleMcpRuntime?.dispose();
|
||||
await bundleLspRuntime?.dispose();
|
||||
await sessionLock.release();
|
||||
let sessionLockReleased = false;
|
||||
await runCleanupSteps([
|
||||
{
|
||||
label: "tool result context guard removal",
|
||||
run: () => {
|
||||
removeToolResultContextGuard?.();
|
||||
},
|
||||
},
|
||||
{
|
||||
label: "pending tool result flush",
|
||||
run: () =>
|
||||
flushPendingToolResultsAfterIdle({
|
||||
agent: session?.agent,
|
||||
sessionManager,
|
||||
clearPendingOnTimeout: true,
|
||||
}),
|
||||
},
|
||||
{
|
||||
label: "session dispose",
|
||||
run: () => {
|
||||
session?.dispose();
|
||||
},
|
||||
},
|
||||
{
|
||||
label: "websocket session release",
|
||||
run: () => {
|
||||
releaseWsSession(params.sessionId);
|
||||
},
|
||||
},
|
||||
{
|
||||
label: "bundle MCP runtime dispose",
|
||||
run: () => bundleMcpRuntime?.dispose(),
|
||||
},
|
||||
{
|
||||
label: "bundle LSP runtime dispose",
|
||||
run: () => bundleLspRuntime?.dispose(),
|
||||
},
|
||||
{
|
||||
label: "session lock release",
|
||||
run: async () => {
|
||||
await sessionLock.release();
|
||||
sessionLockReleased = true;
|
||||
},
|
||||
},
|
||||
{
|
||||
// Keep the embedded run registered until idle flush and lock release finish.
|
||||
// waitForEmbeddedPiRunEnd() and restart drains rely on this barrier so yielded
|
||||
// or aborted runs do not look fully idle while descendant spawns/tool cleanup
|
||||
// may still be settling.
|
||||
label: "active embedded run clear",
|
||||
run: () => {
|
||||
if (queueHandle && sessionLockReleased) {
|
||||
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
|
||||
}
|
||||
},
|
||||
},
|
||||
]);
|
||||
}
|
||||
} finally {
|
||||
restoreSkillEnv?.();
|
||||
|
||||
@ -0,0 +1,116 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
type GatewayCall = {
|
||||
method?: string;
|
||||
timeoutMs?: number;
|
||||
expectFinal?: boolean;
|
||||
params?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
const gatewayCalls: GatewayCall[] = [];
|
||||
let registryRuntimeLoadAttempts = 0;
|
||||
|
||||
async function importModuleWithRegistryRuntimeFailure() {
|
||||
vi.resetModules();
|
||||
gatewayCalls.length = 0;
|
||||
registryRuntimeLoadAttempts = 0;
|
||||
|
||||
vi.doMock("../gateway/call.js", () => ({
|
||||
callGateway: vi.fn(async (request: GatewayCall) => {
|
||||
gatewayCalls.push(request);
|
||||
if (request.method === "chat.history") {
|
||||
return { messages: [] };
|
||||
}
|
||||
return {};
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.doMock("../config/config.js", () => ({
|
||||
loadConfig: () => ({
|
||||
session: {
|
||||
mainKey: "main",
|
||||
scope: "per-sender",
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.doMock("../config/sessions.js", () => ({
|
||||
loadSessionStore: vi.fn(() => ({
|
||||
"agent:main:main": { sessionId: "sess-main", updatedAt: 1 },
|
||||
"agent:main:subagent:worker": { sessionId: "sess-worker", updatedAt: 1 },
|
||||
})),
|
||||
resolveAgentIdFromSessionKey: () => "main",
|
||||
resolveStorePath: () => "/tmp/sessions-main.json",
|
||||
resolveMainSessionKey: () => "agent:main:main",
|
||||
}));
|
||||
|
||||
vi.doMock("./pi-embedded.js", () => ({
|
||||
isEmbeddedPiRunActive: () => false,
|
||||
queueEmbeddedPiMessage: () => false,
|
||||
waitForEmbeddedPiRunEnd: async () => true,
|
||||
}));
|
||||
|
||||
vi.doMock("./subagent-depth.js", () => ({
|
||||
getSubagentDepthFromSessionStore: () => 0,
|
||||
}));
|
||||
|
||||
vi.doMock("./subagent-registry-runtime.js", () => {
|
||||
registryRuntimeLoadAttempts += 1;
|
||||
throw new Error("registry runtime load failed");
|
||||
});
|
||||
|
||||
return await import("./subagent-announce.js");
|
||||
}
|
||||
|
||||
describe("subagent announce registry runtime fallback", () => {
|
||||
afterEach(() => {
|
||||
vi.resetModules();
|
||||
vi.doUnmock("../gateway/call.js");
|
||||
vi.doUnmock("../config/config.js");
|
||||
vi.doUnmock("../config/sessions.js");
|
||||
vi.doUnmock("./pi-embedded.js");
|
||||
vi.doUnmock("./subagent-depth.js");
|
||||
vi.doUnmock("./subagent-registry-runtime.js");
|
||||
gatewayCalls.length = 0;
|
||||
registryRuntimeLoadAttempts = 0;
|
||||
});
|
||||
|
||||
it("falls back to best-effort announce and retries runtime load after a failure", async () => {
|
||||
const { runSubagentAnnounceFlow } = await importModuleWithRegistryRuntimeFailure();
|
||||
|
||||
const baseParams = {
|
||||
childSessionKey: "agent:main:subagent:worker",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "do thing",
|
||||
timeoutMs: 1_000,
|
||||
cleanup: "keep" as const,
|
||||
roundOneReply: "done",
|
||||
waitForCompletion: false,
|
||||
outcome: { status: "ok" as const },
|
||||
};
|
||||
|
||||
await expect(
|
||||
runSubagentAnnounceFlow({
|
||||
...baseParams,
|
||||
childRunId: "run-registry-load-failure-1",
|
||||
}),
|
||||
).resolves.toBe(true);
|
||||
|
||||
await expect(
|
||||
runSubagentAnnounceFlow({
|
||||
...baseParams,
|
||||
childRunId: "run-registry-load-failure-2",
|
||||
}),
|
||||
).resolves.toBe(true);
|
||||
|
||||
const directAgentCalls = gatewayCalls.filter(
|
||||
(call) => call.method === "agent" && call.expectFinal === true,
|
||||
);
|
||||
expect(directAgentCalls).toHaveLength(2);
|
||||
expect(directAgentCalls.every((call) => call.params?.sessionKey === "agent:main:main")).toBe(
|
||||
true,
|
||||
);
|
||||
expect(registryRuntimeLoadAttempts).toBe(2);
|
||||
});
|
||||
});
|
||||
@ -25,6 +25,7 @@ let requesterDepthResolver: (sessionKey?: string) => number = () => 0;
|
||||
let subagentSessionRunActive = true;
|
||||
let shouldIgnorePostCompletion = false;
|
||||
let pendingDescendantRuns = 0;
|
||||
let registryCheckShouldThrow = false;
|
||||
let fallbackRequesterResolution: {
|
||||
requesterSessionKey: string;
|
||||
requesterOrigin?: { channel?: string; to?: string; accountId?: string };
|
||||
@ -68,7 +69,12 @@ vi.mock("./pi-embedded.js", () => ({
|
||||
|
||||
vi.mock("./subagent-registry.js", () => ({
|
||||
countActiveDescendantRuns: () => 0,
|
||||
countPendingDescendantRuns: () => pendingDescendantRuns,
|
||||
countPendingDescendantRuns: () => {
|
||||
if (registryCheckShouldThrow) {
|
||||
throw new Error("registry unavailable");
|
||||
}
|
||||
return pendingDescendantRuns;
|
||||
},
|
||||
listSubagentRunsForRequester: () => [],
|
||||
isSubagentSessionRunActive: () => subagentSessionRunActive,
|
||||
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
|
||||
@ -157,6 +163,7 @@ describe("subagent announce timeout config", () => {
|
||||
subagentSessionRunActive = true;
|
||||
shouldIgnorePostCompletion = false;
|
||||
pendingDescendantRuns = 0;
|
||||
registryCheckShouldThrow = false;
|
||||
fallbackRequesterResolution = null;
|
||||
});
|
||||
|
||||
@ -448,4 +455,27 @@ describe("subagent announce timeout config", () => {
|
||||
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it("regression, defers announce when registry check throws instead of sending premature completion", async () => {
|
||||
registryCheckShouldThrow = true;
|
||||
|
||||
const didAnnounce = await runAnnounceFlowForTest("run-registry-error");
|
||||
|
||||
expect(didAnnounce).toBe(false);
|
||||
expect(findFinalDirectAgentCall()).toBeUndefined();
|
||||
});
|
||||
|
||||
it("regression, defers announce for yield-aborted subagent with pending child spawns", async () => {
|
||||
requesterDepthResolver = () => 1;
|
||||
pendingDescendantRuns = 1;
|
||||
|
||||
const didAnnounce = await runAnnounceFlowForTest("run-yield-pending-children", {
|
||||
requesterSessionKey: "agent:main:subagent:orchestrator",
|
||||
requesterDisplayKey: "subagent:orchestrator",
|
||||
childSessionKey: "agent:main:subagent:orchestrator:subagent:worker",
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(false);
|
||||
expect(findFinalDirectAgentCall()).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@ -59,7 +59,10 @@ let subagentRegistryRuntimePromise: Promise<
|
||||
> | null = null;
|
||||
|
||||
function loadSubagentRegistryRuntime() {
|
||||
subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js");
|
||||
subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js").catch((err) => {
|
||||
subagentRegistryRuntimePromise = null;
|
||||
throw err;
|
||||
});
|
||||
return subagentRegistryRuntimePromise;
|
||||
}
|
||||
|
||||
@ -1332,37 +1335,49 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
| undefined;
|
||||
try {
|
||||
subagentRegistryRuntime = await loadSubagentRegistryRuntime();
|
||||
if (
|
||||
requesterDepth >= 1 &&
|
||||
subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession(
|
||||
targetRequesterSessionKey,
|
||||
)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const pendingChildDescendantRuns = Math.max(
|
||||
0,
|
||||
subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey),
|
||||
} catch (err) {
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent announce registry runtime load failed; using best-effort announce fallback: ${summarizeDeliveryError(err)}`,
|
||||
);
|
||||
if (pendingChildDescendantRuns > 0 && announceType !== "cron job") {
|
||||
}
|
||||
if (subagentRegistryRuntime) {
|
||||
try {
|
||||
if (
|
||||
requesterDepth >= 1 &&
|
||||
subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession(
|
||||
targetRequesterSessionKey,
|
||||
)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const pendingChildDescendantRuns = Math.max(
|
||||
0,
|
||||
subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey),
|
||||
);
|
||||
if (pendingChildDescendantRuns > 0 && announceType !== "cron job") {
|
||||
shouldDeleteChildSession = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") {
|
||||
const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester(
|
||||
params.childSessionKey,
|
||||
{
|
||||
requesterRunId: params.childRunId,
|
||||
},
|
||||
);
|
||||
if (Array.isArray(directChildren) && directChildren.length > 0) {
|
||||
childCompletionFindings = buildChildCompletionFindings(directChildren);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// If the registry query fails after runtime load, defer the announce rather than
|
||||
// risk a premature completion message. The deferred cleanup path will retry once
|
||||
// descendants settle.
|
||||
shouldDeleteChildSession = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") {
|
||||
const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester(
|
||||
params.childSessionKey,
|
||||
{
|
||||
requesterRunId: params.childRunId,
|
||||
},
|
||||
);
|
||||
if (Array.isArray(directChildren) && directChildren.length > 0) {
|
||||
childCompletionFindings = buildChildCompletionFindings(directChildren);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Best-effort only.
|
||||
}
|
||||
|
||||
const announceId = buildAnnounceIdFromChildRun({
|
||||
@ -1450,12 +1465,12 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
const findings = childCompletionFindings || reply || "(no output)";
|
||||
|
||||
let requesterIsSubagent = requesterIsInternalSession();
|
||||
if (requesterIsSubagent) {
|
||||
if (requesterIsSubagent && subagentRegistryRuntime) {
|
||||
const {
|
||||
isSubagentSessionRunActive,
|
||||
resolveRequesterForChildSession,
|
||||
shouldIgnorePostCompletionAnnounceForSession,
|
||||
} = subagentRegistryRuntime ?? (await loadSubagentRegistryRuntime());
|
||||
} = subagentRegistryRuntime;
|
||||
if (!isSubagentSessionRunActive(targetRequesterSessionKey)) {
|
||||
if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) {
|
||||
return true;
|
||||
|
||||
124
src/agents/subagent-announce.yield-spawn-race.test.ts
Normal file
124
src/agents/subagent-announce.yield-spawn-race.test.ts
Normal file
@ -0,0 +1,124 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
type GatewayCall = {
|
||||
method?: string;
|
||||
timeoutMs?: number;
|
||||
expectFinal?: boolean;
|
||||
params?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
const gatewayCalls: GatewayCall[] = [];
|
||||
let callGatewayImpl: (request: GatewayCall) => Promise<unknown> = async () => ({});
|
||||
let sessionStore: Record<string, Record<string, unknown>> = {};
|
||||
let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = {
|
||||
session: {
|
||||
mainKey: "main",
|
||||
scope: "per-sender",
|
||||
},
|
||||
};
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: vi.fn(async (request: GatewayCall) => {
|
||||
gatewayCalls.push(request);
|
||||
return await callGatewayImpl(request);
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../config/config.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../config/config.js")>();
|
||||
return {
|
||||
...actual,
|
||||
loadConfig: () => configOverride,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../config/sessions.js", () => ({
|
||||
loadSessionStore: vi.fn(() => sessionStore),
|
||||
resolveAgentIdFromSessionKey: () => "main",
|
||||
resolveStorePath: () => "/tmp/sessions-main.json",
|
||||
resolveMainSessionKey: () => "agent:main:main",
|
||||
}));
|
||||
|
||||
vi.mock("./pi-embedded.js", () => ({
|
||||
isEmbeddedPiRunActive: () => false,
|
||||
queueEmbeddedPiMessage: () => false,
|
||||
waitForEmbeddedPiRunEnd: async () => true,
|
||||
}));
|
||||
|
||||
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
|
||||
import {
|
||||
addSubagentRunForTests,
|
||||
countPendingDescendantRuns,
|
||||
resetSubagentRegistryForTests,
|
||||
} from "./subagent-registry.js";
|
||||
|
||||
function findFinalDirectAgentCall(): GatewayCall | undefined {
|
||||
return gatewayCalls.find((call) => call.method === "agent" && call.expectFinal === true);
|
||||
}
|
||||
|
||||
describe("subagent announce yield + spawn race", () => {
|
||||
beforeEach(() => {
|
||||
gatewayCalls.length = 0;
|
||||
callGatewayImpl = async () => ({});
|
||||
sessionStore = {};
|
||||
configOverride = {
|
||||
session: {
|
||||
mainKey: "main",
|
||||
scope: "per-sender",
|
||||
},
|
||||
};
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
});
|
||||
|
||||
it("defers announce when a yield-aborted parent still has a concurrently spawned pending child", async () => {
|
||||
const parentSessionKey = "agent:main:subagent:orchestrator-race";
|
||||
|
||||
addSubagentRunForTests({
|
||||
runId: "run-orchestrator-race",
|
||||
childSessionKey: parentSessionKey,
|
||||
controllerSessionKey: "agent:main:main",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "orchestrate",
|
||||
cleanup: "keep",
|
||||
createdAt: 100,
|
||||
startedAt: 100,
|
||||
endedAt: 110,
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
addSubagentRunForTests({
|
||||
runId: "run-worker-race",
|
||||
childSessionKey: `${parentSessionKey}:subagent:worker`,
|
||||
controllerSessionKey: parentSessionKey,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
requesterDisplayKey: parentSessionKey,
|
||||
task: "child task",
|
||||
cleanup: "keep",
|
||||
createdAt: 111,
|
||||
startedAt: 111,
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
// Regression guard: when sessions_spawn commits before the announce check,
|
||||
// the parent must still see the pending child and defer its completion.
|
||||
expect(countPendingDescendantRuns(parentSessionKey)).toBe(1);
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: parentSessionKey,
|
||||
childRunId: "run-orchestrator-race",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "orchestrate",
|
||||
timeoutMs: 1_000,
|
||||
cleanup: "keep",
|
||||
roundOneReply: "Yielded after concurrent sessions_spawn.",
|
||||
waitForCompletion: false,
|
||||
expectsCompletionMessage: true,
|
||||
outcome: { status: "ok" },
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(false);
|
||||
expect(countPendingDescendantRuns(parentSessionKey)).toBe(1);
|
||||
expect(findFinalDirectAgentCall()).toBeUndefined();
|
||||
});
|
||||
});
|
||||
@ -326,4 +326,63 @@ describe("subagent registry nested agent tracking", () => {
|
||||
expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-self")).toBe(1);
|
||||
expect(countPendingDescendantRunsExcludingRun("agent:main:main", "run-sibling")).toBe(1);
|
||||
});
|
||||
|
||||
it("yield-aborted parent with active children reports pending descendants correctly", async () => {
|
||||
const { addSubagentRunForTests, countPendingDescendantRuns, countActiveDescendantRuns } =
|
||||
subagentRegistry;
|
||||
|
||||
// Orchestrator yielded (ended) after spawning children — simulates sessions_yield
|
||||
addSubagentRunForTests({
|
||||
runId: "run-orch-yielded",
|
||||
childSessionKey: "agent:main:subagent:orch-yield",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "orchestrate with yield",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
endedAt: 2,
|
||||
cleanupHandled: false,
|
||||
cleanupCompletedAt: undefined,
|
||||
});
|
||||
|
||||
// Child spawned by orchestrator — still running (no endedAt)
|
||||
addSubagentRunForTests({
|
||||
runId: "run-child-active",
|
||||
childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1",
|
||||
requesterSessionKey: "agent:main:subagent:orch-yield",
|
||||
requesterDisplayKey: "orch-yield",
|
||||
task: "worker task",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 2,
|
||||
cleanupHandled: false,
|
||||
});
|
||||
|
||||
// The orchestrator has pending descendants (child still running + orchestrator cleanup not done)
|
||||
expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(1);
|
||||
// The child is active under the orchestrator
|
||||
expect(countActiveDescendantRuns("agent:main:subagent:orch-yield")).toBe(1);
|
||||
// From main's perspective, the orchestrator itself is pending (cleanup not completed)
|
||||
expect(countPendingDescendantRuns("agent:main:main")).toBe(2);
|
||||
|
||||
// Child completes and finishes cleanup
|
||||
addSubagentRunForTests({
|
||||
runId: "run-child-active",
|
||||
childSessionKey: "agent:main:subagent:orch-yield:subagent:worker-1",
|
||||
requesterSessionKey: "agent:main:subagent:orch-yield",
|
||||
requesterDisplayKey: "orch-yield",
|
||||
task: "worker task",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 2,
|
||||
endedAt: 3,
|
||||
cleanupHandled: true,
|
||||
cleanupCompletedAt: 4,
|
||||
});
|
||||
|
||||
// Now only the orchestrator itself remains pending (no cleanup yet)
|
||||
expect(countPendingDescendantRuns("agent:main:subagent:orch-yield")).toBe(0);
|
||||
expect(countPendingDescendantRuns("agent:main:main")).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user