fix: address codex review comments on #46741

- Move clearActiveEmbeddedRun AFTER session lock release in cleanup sequence
  so restart drains and waitForEmbeddedPiRunEnd barriers stay consistent
- Guard queueHandle access with sessionLockReleased flag to prevent TDZ errors
- Clear memoized registry promise on import failure so retries work
- Split registry load vs query errors: load failure falls back to best-effort
  announce; query failure still defers to avoid premature completion
- Guard requesterIsSubagent path with subagentRegistryRuntime availability
- Simplify runCleanupSteps: remove duplicate log.warn, use undefined check
- Fix missing closing brace in timeout test from rebase
This commit is contained in:
Joey Krug 2026-03-21 00:42:38 -04:00
parent 46c355bb52
commit f1ad5a4733
5 changed files with 239 additions and 66 deletions

View File

@ -573,13 +573,17 @@ describe("runEmbeddedAttempt cleanup", () => {
expect.any(Object),
"agent:main:test-cleanup",
);
expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ??
expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ??
Number.POSITIVE_INFINITY,
);
expect(disposeMock).toHaveBeenCalledTimes(1);
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ??
Number.POSITIVE_INFINITY,
);
});
it("skips active-run clear without masking the original error when subscribe fails before registration", async () => {
@ -631,7 +635,7 @@ describe("runEmbeddedAttempt cleanup", () => {
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
});
it("clears the active run before waiting for the idle flush", async () => {
it("clears the active run only after idle flush and session lock release finish", async () => {
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
const sessionFile = path.join(workspaceDir, "session.jsonl");
@ -665,11 +669,59 @@ describe("runEmbeddedAttempt cleanup", () => {
expect(result.promptError).toBeNull();
expect(hoisted.clearActiveEmbeddedRunMock).toHaveBeenCalledTimes(1);
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
expect(hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0] ??
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.clearActiveEmbeddedRunMock.mock.invocationCallOrder[0] ??
Number.POSITIVE_INFINITY,
);
});
it("keeps the active run registered when session lock release fails", async () => {
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-workspace-"));
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cleanup-agent-"));
const sessionFile = path.join(workspaceDir, "session.jsonl");
tempPaths.push(workspaceDir, agentDir);
await fs.writeFile(sessionFile, "", "utf8");
const lockReleaseError = new Error("lock release failed");
hoisted.sessionLockReleaseMock.mockRejectedValueOnce(lockReleaseError);
hoisted.createAgentSessionMock.mockImplementation(async () => ({
session: createDefaultEmbeddedSession(),
}));
await expect(
runEmbeddedAttempt({
sessionId: "embedded-session",
sessionKey: "agent:main:test-cleanup-lock-release",
sessionFile,
workspaceDir,
agentDir,
config: {},
prompt: "hello",
timeoutMs: 10_000,
runId: "run-cleanup-lock-release",
provider: "openai",
modelId: "gpt-test",
model: testModel,
authStorage: {} as AuthStorage,
modelRegistry: {} as ModelRegistry,
thinkLevel: "off",
senderIsOwner: true,
disableMessageTool: true,
}),
).rejects.toThrow(lockReleaseError);
expect(hoisted.flushPendingToolResultsAfterIdleMock).toHaveBeenCalledTimes(1);
expect(hoisted.sessionLockReleaseMock).toHaveBeenCalledTimes(1);
expect(hoisted.clearActiveEmbeddedRunMock).not.toHaveBeenCalled();
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
expect(hoisted.flushPendingToolResultsAfterIdleMock.mock.invocationCallOrder[0]).toBeLessThan(
hoisted.sessionLockReleaseMock.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
});
});
describe("runEmbeddedAttempt bootstrap warning prompt assembly", () => {

View File

@ -168,29 +168,22 @@ const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield";
async function runCleanupSteps(
steps: ReadonlyArray<{ label: string; run: () => void | Promise<void> }>,
): Promise<void> {
let firstError: unknown;
let hasError = false;
let firstError: unknown | undefined;
for (const step of steps) {
try {
await step.run();
} catch (err) {
if (!hasError) {
if (firstError === undefined) {
firstError = err;
hasError = true;
log.warn(
`embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`,
);
continue;
}
log.warn(
`embedded attempt cleanup failed during ${step.label}: ${describeUnknownError(err)}`,
);
}
}
if (hasError) {
if (firstError !== undefined) {
throw firstError;
}
}
@ -3223,6 +3216,7 @@ export async function runEmbeddedAttempt(
// flushPendingToolResults() fires while tools are still executing, inserting
// synthetic "missing tool result" errors and causing silent agent failures.
// See: https://github.com/openclaw/openclaw/issues/8643
let sessionLockReleased = false;
await runCleanupSteps([
{
label: "tool result context guard removal",
@ -3230,23 +3224,6 @@ export async function runEmbeddedAttempt(
removeToolResultContextGuard?.();
},
},
{
// Clear embedded run BEFORE flushing pending tool results so that
// waitForEmbeddedPiRunEnd resolves promptly. The flush can take up to 30s
// (idle-wait timeout in wait-for-idle-before-flush.ts), but several callers
// only wait 15s (session-reset-service.ts, commands-compact.ts). Clearing
// first ensures the run is marked ended within the waiter timeout budget.
//
// This is safe because runCleanupSteps executes each step in an isolated
// try/catch — if this step throws (e.g. queueHandle is somehow invalid),
// the flush and remaining teardown still proceed.
label: "active embedded run clear",
run: () => {
if (queueHandle) {
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
}
},
},
{
label: "pending tool result flush",
run: () =>
@ -3278,7 +3255,22 @@ export async function runEmbeddedAttempt(
},
{
label: "session lock release",
run: () => sessionLock.release(),
run: async () => {
await sessionLock.release();
sessionLockReleased = true;
},
},
{
// Keep the embedded run registered until idle flush and lock release finish.
// waitForEmbeddedPiRunEnd() and restart drains rely on this barrier so yielded
// or aborted runs do not look fully idle while descendant spawns/tool cleanup
// may still be settling.
label: "active embedded run clear",
run: () => {
if (queueHandle && sessionLockReleased) {
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
}
},
},
]);
}

View File

@ -0,0 +1,116 @@
import { afterEach, describe, expect, it, vi } from "vitest";
/**
 * Shape of the request objects recorded from the mocked `callGateway`.
 * Only the fields this test file touches are declared, and all are optional.
 */
type GatewayCall = {
// Gateway method name; the mock special-cases "chat.history" and the test filters on "agent".
method?: string;
// Not inspected by this test file.
timeoutMs?: number;
// The test only counts calls where this is strictly `true`.
expectFinal?: boolean;
// Request payload; the test reads `params.sessionKey` from it.
params?: Record<string, unknown>;
};
// Every request handed to the mocked `callGateway`, in call order.
// Emptied by the import helper and again in `afterEach`.
const gatewayCalls: GatewayCall[] = [];
// How many times the mock factory for "./subagent-registry-runtime.js" has run.
let registryRuntimeLoadAttempts = 0;
/**
 * Imports a fresh copy of `./subagent-announce.js` with its collaborators mocked
 * and with the `./subagent-registry-runtime.js` mock factory throwing every time
 * it is invoked.
 *
 * Before registering the mocks, the module registry is reset and the shared
 * `gatewayCalls` / `registryRuntimeLoadAttempts` bookkeeping is cleared.
 *
 * @returns The freshly imported `./subagent-announce.js` module namespace.
 */
async function importModuleWithRegistryRuntimeFailure() {
  // Start from an empty module cache and clean counters.
  vi.resetModules();
  gatewayCalls.length = 0;
  registryRuntimeLoadAttempts = 0;

  // Gateway: record each request; only "chat.history" gets a non-empty reply.
  vi.doMock("../gateway/call.js", () => {
    const callGateway = vi.fn(async (request: GatewayCall) => {
      gatewayCalls.push(request);
      return request.method === "chat.history" ? { messages: [] } : {};
    });
    return { callGateway };
  });

  // Config: a fixed session configuration, rebuilt on every loadConfig() call.
  vi.doMock("../config/config.js", () => {
    const loadConfig = () => {
      const session = { mainKey: "main", scope: "per-sender" };
      return { session };
    };
    return { loadConfig };
  });

  // Session store: one main session and one subagent worker session.
  vi.doMock("../config/sessions.js", () => {
    const loadSessionStore = vi.fn(() => ({
      "agent:main:main": { sessionId: "sess-main", updatedAt: 1 },
      "agent:main:subagent:worker": { sessionId: "sess-worker", updatedAt: 1 },
    }));
    return {
      loadSessionStore,
      resolveAgentIdFromSessionKey: () => "main",
      resolveStorePath: () => "/tmp/sessions-main.json",
      resolveMainSessionKey: () => "agent:main:main",
    };
  });

  // Embedded runs: never active, never queue a message, waits report `true`.
  vi.doMock("./pi-embedded.js", () => {
    return {
      isEmbeddedPiRunActive: () => false,
      queueEmbeddedPiMessage: () => false,
      waitForEmbeddedPiRunEnd: async () => true,
    };
  });

  vi.doMock("./subagent-depth.js", () => {
    return { getSubagentDepthFromSessionStore: () => 0 };
  });

  // Registry runtime: count each factory invocation, then fail it.
  vi.doMock("./subagent-registry-runtime.js", () => {
    registryRuntimeLoadAttempts += 1;
    throw new Error("registry runtime load failed");
  });

  const announceModule = await import("./subagent-announce.js");
  return announceModule;
}
describe("subagent announce registry runtime fallback", () => {
  // Undo every mock the import helper registers and reset the shared bookkeeping.
  afterEach(() => {
    vi.resetModules();
    vi.doUnmock("../gateway/call.js");
    vi.doUnmock("../config/config.js");
    vi.doUnmock("../config/sessions.js");
    vi.doUnmock("./pi-embedded.js");
    vi.doUnmock("./subagent-depth.js");
    vi.doUnmock("./subagent-registry-runtime.js");
    gatewayCalls.length = 0;
    registryRuntimeLoadAttempts = 0;
  });

  it("falls back to best-effort announce and retries runtime load after a failure", async () => {
    const { runSubagentAnnounceFlow: announce } = await importModuleWithRegistryRuntimeFailure();

    // Everything except the child run id is shared by both announce attempts.
    const sharedParams = {
      childSessionKey: "agent:main:subagent:worker",
      requesterSessionKey: "agent:main:main",
      requesterDisplayKey: "main",
      task: "do thing",
      timeoutMs: 1_000,
      cleanup: "keep" as const,
      roundOneReply: "done",
      waitForCompletion: false,
      outcome: { status: "ok" as const },
    };

    // Run the two announces one after the other; each must still resolve to true.
    const childRunIds = ["run-registry-load-failure-1", "run-registry-load-failure-2"];
    for (const childRunId of childRunIds) {
      await expect(announce({ ...sharedParams, childRunId })).resolves.toBe(true);
    }

    // Both announces must have gone out as final "agent" calls to the main session.
    const finalAgentCalls = gatewayCalls.filter(
      ({ method, expectFinal }) => method === "agent" && expectFinal === true,
    );
    expect(finalAgentCalls).toHaveLength(2);
    const allTargetMainSession = finalAgentCalls.every(
      ({ params }) => params?.sessionKey === "agent:main:main",
    );
    expect(allTargetMainSession).toBe(true);

    // The failing factory ran once per announce, so the failed load was not memoized.
    expect(registryRuntimeLoadAttempts).toBe(2);
  });
});

View File

@ -454,6 +454,7 @@ describe("subagent announce timeout config", () => {
expect(
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
).toBeUndefined();
});
it("regression, defers announce when registry check throws instead of sending premature completion", async () => {
registryCheckShouldThrow = true;

View File

@ -59,7 +59,10 @@ let subagentRegistryRuntimePromise: Promise<
> | null = null;
function loadSubagentRegistryRuntime() {
subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js");
subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js").catch((err) => {
subagentRegistryRuntimePromise = null;
throw err;
});
return subagentRegistryRuntimePromise;
}
@ -1332,40 +1335,49 @@ export async function runSubagentAnnounceFlow(params: {
| undefined;
try {
subagentRegistryRuntime = await loadSubagentRegistryRuntime();
if (
requesterDepth >= 1 &&
subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession(
targetRequesterSessionKey,
)
) {
return true;
}
const pendingChildDescendantRuns = Math.max(
0,
subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey),
} catch (err) {
defaultRuntime.log(
`[warn] Subagent announce registry runtime load failed; using best-effort announce fallback: ${summarizeDeliveryError(err)}`,
);
if (pendingChildDescendantRuns > 0 && announceType !== "cron job") {
}
if (subagentRegistryRuntime) {
try {
if (
requesterDepth >= 1 &&
subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession(
targetRequesterSessionKey,
)
) {
return true;
}
const pendingChildDescendantRuns = Math.max(
0,
subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey),
);
if (pendingChildDescendantRuns > 0 && announceType !== "cron job") {
shouldDeleteChildSession = false;
return false;
}
if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") {
const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester(
params.childSessionKey,
{
requesterRunId: params.childRunId,
},
);
if (Array.isArray(directChildren) && directChildren.length > 0) {
childCompletionFindings = buildChildCompletionFindings(directChildren);
}
}
} catch {
// If the registry query fails after runtime load, defer the announce rather than
// risk a premature completion message. The deferred cleanup path will retry once
// descendants settle.
shouldDeleteChildSession = false;
return false;
}
if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") {
const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester(
params.childSessionKey,
{
requesterRunId: params.childRunId,
},
);
if (Array.isArray(directChildren) && directChildren.length > 0) {
childCompletionFindings = buildChildCompletionFindings(directChildren);
}
}
} catch {
// If the registry check fails, defer the announce rather than risk a premature
// completion message. The deferred cleanup path will retry once descendants settle.
shouldDeleteChildSession = false;
return false;
}
const announceId = buildAnnounceIdFromChildRun({
@ -1453,12 +1465,12 @@ export async function runSubagentAnnounceFlow(params: {
const findings = childCompletionFindings || reply || "(no output)";
let requesterIsSubagent = requesterIsInternalSession();
if (requesterIsSubagent) {
if (requesterIsSubagent && subagentRegistryRuntime) {
const {
isSubagentSessionRunActive,
resolveRequesterForChildSession,
shouldIgnorePostCompletionAnnounceForSession,
} = subagentRegistryRuntime ?? (await loadSubagentRegistryRuntime());
} = subagentRegistryRuntime;
if (!isSubagentSessionRunActive(targetRequesterSessionKey)) {
if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) {
return true;