Merge remote-tracking branch 'origin/main' into codex/cortex-openclaw-integration

This commit is contained in:
Junebugg1214 2026-03-20 23:20:32 -04:00
commit e2318d3cd1
3 changed files with 297 additions and 51 deletions

View File

@ -1,8 +1,5 @@
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const readLatestAssistantReplyMock = vi.fn<(sessionKey: string) => Promise<string | undefined>>(
async (_sessionKey: string) => undefined,
);
const chatHistoryMock = vi.fn<(sessionKey: string) => Promise<{ messages?: Array<unknown> }>>(
async (_sessionKey: string) => ({ messages: [] }),
);
@ -17,10 +14,6 @@ vi.mock("../gateway/call.js", () => ({
}),
}));
vi.mock("./tools/agent-step.js", () => ({
readLatestAssistantReply: readLatestAssistantReplyMock,
}));
describe("captureSubagentCompletionReply", () => {
let previousFastTestEnv: string | undefined;
let captureSubagentCompletionReply: (typeof import("./subagent-announce.js"))["captureSubagentCompletionReply"];
@ -40,23 +33,27 @@ describe("captureSubagentCompletionReply", () => {
});
beforeEach(() => {
readLatestAssistantReplyMock.mockReset().mockResolvedValue(undefined);
chatHistoryMock.mockReset().mockResolvedValue({ messages: [] });
});
it("returns immediate assistant output without polling", async () => {
readLatestAssistantReplyMock.mockResolvedValueOnce("Immediate assistant completion");
it("returns immediate assistant output from history without polling", async () => {
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "Immediate assistant completion" }],
},
],
});
const result = await captureSubagentCompletionReply("agent:main:subagent:child");
expect(result).toBe("Immediate assistant completion");
expect(readLatestAssistantReplyMock).toHaveBeenCalledTimes(1);
expect(chatHistoryMock).not.toHaveBeenCalled();
expect(chatHistoryMock).toHaveBeenCalledTimes(1);
});
it("polls briefly and returns late tool output once available", async () => {
vi.useFakeTimers();
readLatestAssistantReplyMock.mockResolvedValue(undefined);
chatHistoryMock.mockResolvedValueOnce({ messages: [] }).mockResolvedValueOnce({
messages: [
{
@ -82,7 +79,6 @@ describe("captureSubagentCompletionReply", () => {
it("returns undefined when no completion output arrives before retry window closes", async () => {
vi.useFakeTimers();
readLatestAssistantReplyMock.mockResolvedValue(undefined);
chatHistoryMock.mockResolvedValue({ messages: [] });
const pending = captureSubagentCompletionReply("agent:main:subagent:child");
@ -93,4 +89,26 @@ describe("captureSubagentCompletionReply", () => {
expect(chatHistoryMock).toHaveBeenCalled();
vi.useRealTimers();
});
it("returns partial assistant progress when the latest assistant turn is tool-only", async () => {
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "assistant",
content: [
{ type: "text", text: "Mapped the modules." },
{ type: "toolCall", id: "call-1", name: "read", arguments: {} },
],
},
{
role: "assistant",
content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }],
},
],
});
const result = await captureSubagentCompletionReply("agent:main:subagent:child");
expect(result).toBe("Mapped the modules.");
});
});

View File

@ -29,10 +29,14 @@ let fallbackRequesterResolution: {
requesterSessionKey: string;
requesterOrigin?: { channel?: string; to?: string; accountId?: string };
} | null = null;
let chatHistoryMessages: Array<Record<string, unknown>> = [];
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(async (request: GatewayCall) => {
gatewayCalls.push(request);
if (request.method === "chat.history") {
return { messages: chatHistoryMessages };
}
return await callGatewayImpl(request);
}),
}));
@ -138,6 +142,7 @@ function setupParentSessionFallback(parentSessionKey: string): void {
describe("subagent announce timeout config", () => {
beforeEach(() => {
gatewayCalls.length = 0;
chatHistoryMessages = [];
callGatewayImpl = async (request) => {
if (request.method === "chat.history") {
return { messages: [] };
@ -270,7 +275,6 @@ describe("subagent announce timeout config", () => {
it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => {
const parentSessionKey = "agent:main:subagent:parent";
setupParentSessionFallback(parentSessionKey);
// No sessionId on purpose: existence in store should still count as alive.
sessionStore[parentSessionKey] = { updatedAt: Date.now() };
await runAnnounceFlowForTest("run-parent-route", {
@ -301,4 +305,147 @@ describe("subagent announce timeout config", () => {
expect(directAgentCall?.params?.to).toBe("chan-main");
expect(directAgentCall?.params?.accountId).toBe("acct-main");
});
it("uses partial progress on timeout when the child only made tool calls", async () => {
chatHistoryMessages = [
{ role: "user", content: "do a complex task" },
{
role: "assistant",
content: [{ type: "toolCall", id: "call-1", name: "read", arguments: {} }],
},
{ role: "toolResult", toolCallId: "call-1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }],
},
{
role: "assistant",
content: [{ type: "toolCall", id: "call-3", name: "search", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-partial-progress", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
const directAgentCall = findFinalDirectAgentCall();
const internalEvents =
(directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? [];
expect(internalEvents[0]?.result).toContain("3 tool call(s)");
expect(internalEvents[0]?.result).not.toContain("data");
});
it("preserves NO_REPLY when timeout history ends with silence after earlier progress", async () => {
chatHistoryMessages = [
{
role: "assistant",
content: [
{ type: "text", text: "Still working through the files." },
{ type: "toolCall", id: "call-1", name: "read", arguments: {} },
],
},
{
role: "assistant",
content: [{ type: "text", text: "NO_REPLY" }],
},
{
role: "assistant",
content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-no-reply", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
expect(findFinalDirectAgentCall()).toBeUndefined();
});
it("prefers visible assistant progress over a later raw tool result", async () => {
chatHistoryMessages = [
{
role: "assistant",
content: [{ type: "text", text: "Read 12 files. Narrowing the search now." }],
},
{
role: "toolResult",
content: [{ type: "text", text: "grep output" }],
},
];
await runAnnounceFlowForTest("run-timeout-visible-assistant", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
const directAgentCall = findFinalDirectAgentCall();
const internalEvents =
(directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? [];
expect(internalEvents[0]?.result).toContain("Read 12 files");
expect(internalEvents[0]?.result).not.toContain("grep output");
});
it("preserves NO_REPLY when timeout partial-progress history mixes prior text and later silence", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [
{ type: "text", text: "Still working through the files." },
{ type: "toolCall", id: "call1", name: "read", arguments: {} },
],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "text", text: "NO_REPLY" }],
},
{
role: "assistant",
content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-mixed-no-reply", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
expect(
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
).toBeUndefined();
});
it("prefers NO_REPLY partial progress over a longer latest assistant reply", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [
{ type: "text", text: "Still working through the files." },
{ type: "toolCall", id: "call1", name: "read", arguments: {} },
],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "text", text: "NO_REPLY" }],
},
{
role: "assistant",
content: [{ type: "text", text: "A longer partial summary that should stay silent." }],
},
];
await runAnnounceFlowForTest("run-timeout-no-reply-overrides-latest-text", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
expect(
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
).toBeUndefined();
});
});

View File

@ -47,7 +47,6 @@ import {
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
import { readLatestAssistantReply } from "./tools/agent-step.js";
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
@ -55,7 +54,6 @@ const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
const FAST_TEST_RETRY_INTERVAL_MS = 8;
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000;
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
const GATEWAY_TIMEOUT_PATTERN = /gateway timeout/i;
let subagentRegistryRuntimePromise: Promise<
typeof import("./subagent-registry-runtime.js")
> | null = null;
@ -74,6 +72,14 @@ type ToolResultMessage = {
content?: unknown;
};
type SubagentOutputSnapshot = {
latestAssistantText?: string;
latestSilentText?: string;
latestRawText?: string;
assistantFragments: string[];
toolCallCount: number;
};
function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): number {
const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs;
if (typeof configured !== "number" || !Number.isFinite(configured)) {
@ -110,7 +116,7 @@ const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/no active .* listener/i,
/gateway not connected/i,
/gateway closed \(1006/i,
GATEWAY_TIMEOUT_PATTERN,
/gateway timeout/i,
/\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i,
];
@ -136,11 +142,6 @@ function isTransientAnnounceDeliveryError(error: unknown): boolean {
return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message));
}
function isGatewayTimeoutError(error: unknown): boolean {
const message = summarizeDeliveryError(error);
return Boolean(message) && GATEWAY_TIMEOUT_PATTERN.test(message);
}
async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise<void> {
if (ms <= 0) {
return;
@ -168,7 +169,6 @@ async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Prom
async function runAnnounceDeliveryWithRetry<T>(params: {
operation: string;
noRetryOnGatewayTimeout?: boolean;
signal?: AbortSignal;
run: () => Promise<T>;
}): Promise<T> {
@ -180,9 +180,6 @@ async function runAnnounceDeliveryWithRetry<T>(params: {
try {
return await params.run();
} catch (err) {
if (params.noRetryOnGatewayTimeout && isGatewayTimeoutError(err)) {
throw err;
}
const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex];
if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) {
throw err;
@ -287,42 +284,126 @@ function extractSubagentOutputText(message: unknown): string {
return "";
}
async function readLatestSubagentOutput(sessionKey: string): Promise<string | undefined> {
try {
const latestAssistant = await readLatestAssistantReply({
sessionKey,
limit: 50,
});
if (latestAssistant?.trim()) {
return latestAssistant;
}
} catch {
// Best-effort: fall back to richer history parsing below.
function countAssistantToolCalls(content: unknown): number {
if (!Array.isArray(content)) {
return 0;
}
let count = 0;
for (const block of content) {
if (!block || typeof block !== "object") {
continue;
}
const type = (block as { type?: unknown }).type;
if (
type === "toolCall" ||
type === "tool_use" ||
type === "toolUse" ||
type === "functionCall" ||
type === "function_call"
) {
count += 1;
}
}
return count;
}
function summarizeSubagentOutputHistory(messages: Array<unknown>): SubagentOutputSnapshot {
const snapshot: SubagentOutputSnapshot = {
assistantFragments: [],
toolCallCount: 0,
};
for (const message of messages) {
if (!message || typeof message !== "object") {
continue;
}
const role = (message as { role?: unknown }).role;
if (role === "assistant") {
snapshot.toolCallCount += countAssistantToolCalls((message as { content?: unknown }).content);
const text = extractSubagentOutputText(message).trim();
if (!text) {
continue;
}
if (isAnnounceSkip(text) || isSilentReplyText(text, SILENT_REPLY_TOKEN)) {
snapshot.latestSilentText = text;
snapshot.latestAssistantText = undefined;
snapshot.assistantFragments = [];
continue;
}
snapshot.latestSilentText = undefined;
snapshot.latestAssistantText = text;
snapshot.assistantFragments.push(text);
continue;
}
const text = extractSubagentOutputText(message).trim();
if (text) {
snapshot.latestRawText = text;
}
}
return snapshot;
}
function formatSubagentPartialProgress(
snapshot: SubagentOutputSnapshot,
outcome?: SubagentRunOutcome,
): string | undefined {
if (snapshot.latestSilentText) {
return undefined;
}
const timedOut = outcome?.status === "timeout";
if (snapshot.assistantFragments.length === 0 && (!timedOut || snapshot.toolCallCount === 0)) {
return undefined;
}
const parts: string[] = [];
if (timedOut && snapshot.toolCallCount > 0) {
parts.push(
`[Partial progress: ${snapshot.toolCallCount} tool call(s) executed before timeout]`,
);
}
if (snapshot.assistantFragments.length > 0) {
parts.push(snapshot.assistantFragments.slice(-3).join("\n\n---\n\n"));
}
return parts.join("\n\n") || undefined;
}
function selectSubagentOutputText(
snapshot: SubagentOutputSnapshot,
outcome?: SubagentRunOutcome,
): string | undefined {
if (snapshot.latestSilentText) {
return snapshot.latestSilentText;
}
if (snapshot.latestAssistantText) {
return snapshot.latestAssistantText;
}
const partialProgress = formatSubagentPartialProgress(snapshot, outcome);
if (partialProgress) {
return partialProgress;
}
return snapshot.latestRawText;
}
async function readSubagentOutput(
sessionKey: string,
outcome?: SubagentRunOutcome,
): Promise<string | undefined> {
const history = await callGateway<{ messages?: Array<unknown> }>({
method: "chat.history",
params: { sessionKey, limit: 50 },
params: { sessionKey, limit: 100 },
});
const messages = Array.isArray(history?.messages) ? history.messages : [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const msg = messages[i];
const text = extractSubagentOutputText(msg);
if (text) {
return text;
}
}
return undefined;
return selectSubagentOutputText(summarizeSubagentOutputHistory(messages), outcome);
}
async function readLatestSubagentOutputWithRetry(params: {
sessionKey: string;
maxWaitMs: number;
outcome?: SubagentRunOutcome;
}): Promise<string | undefined> {
const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100;
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
let result: string | undefined;
while (Date.now() < deadline) {
result = await readLatestSubagentOutput(params.sessionKey);
result = await readSubagentOutput(params.sessionKey, params.outcome);
if (result?.trim()) {
return result;
}
@ -334,7 +415,7 @@ async function readLatestSubagentOutputWithRetry(params: {
export async function captureSubagentCompletionReply(
sessionKey: string,
): Promise<string | undefined> {
const immediate = await readLatestSubagentOutput(sessionKey);
const immediate = await readSubagentOutput(sessionKey);
if (immediate?.trim()) {
return immediate;
}
@ -811,7 +892,6 @@ async function sendSubagentAnnounceDirectly(params: {
operation: params.expectsCompletionMessage
? "completion direct announce agent call"
: "direct announce agent call",
noRetryOnGatewayTimeout: params.expectsCompletionMessage && shouldDeliverExternally,
signal: params.signal,
run: async () =>
await callGateway({
@ -1321,13 +1401,14 @@ export async function runSubagentAnnounceFlow(params: {
(isAnnounceSkip(fallbackReply) || isSilentReplyText(fallbackReply, SILENT_REPLY_TOKEN));
if (!reply) {
reply = await readLatestSubagentOutput(params.childSessionKey);
reply = await readSubagentOutput(params.childSessionKey, outcome);
}
if (!reply?.trim()) {
reply = await readLatestSubagentOutputWithRetry({
sessionKey: params.childSessionKey,
maxWaitMs: params.timeoutMs,
outcome,
});
}