diff --git a/src/agents/subagent-announce.capture-completion-reply.test.ts b/src/agents/subagent-announce.capture-completion-reply.test.ts index 9511cd9ec8a..a2cbbb1faa5 100644 --- a/src/agents/subagent-announce.capture-completion-reply.test.ts +++ b/src/agents/subagent-announce.capture-completion-reply.test.ts @@ -1,8 +1,5 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -const readLatestAssistantReplyMock = vi.fn<(sessionKey: string) => Promise>( - async (_sessionKey: string) => undefined, -); const chatHistoryMock = vi.fn<(sessionKey: string) => Promise<{ messages?: Array }>>( async (_sessionKey: string) => ({ messages: [] }), ); @@ -17,10 +14,6 @@ vi.mock("../gateway/call.js", () => ({ }), })); -vi.mock("./tools/agent-step.js", () => ({ - readLatestAssistantReply: readLatestAssistantReplyMock, -})); - describe("captureSubagentCompletionReply", () => { let previousFastTestEnv: string | undefined; let captureSubagentCompletionReply: (typeof import("./subagent-announce.js"))["captureSubagentCompletionReply"]; @@ -40,23 +33,27 @@ describe("captureSubagentCompletionReply", () => { }); beforeEach(() => { - readLatestAssistantReplyMock.mockReset().mockResolvedValue(undefined); chatHistoryMock.mockReset().mockResolvedValue({ messages: [] }); }); - it("returns immediate assistant output without polling", async () => { - readLatestAssistantReplyMock.mockResolvedValueOnce("Immediate assistant completion"); + it("returns immediate assistant output from history without polling", async () => { + chatHistoryMock.mockResolvedValueOnce({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "Immediate assistant completion" }], + }, + ], + }); const result = await captureSubagentCompletionReply("agent:main:subagent:child"); expect(result).toBe("Immediate assistant completion"); - expect(readLatestAssistantReplyMock).toHaveBeenCalledTimes(1); - expect(chatHistoryMock).not.toHaveBeenCalled(); + expect(chatHistoryMock).toHaveBeenCalledTimes(1); }); it("polls briefly and returns late tool output once available", async () => { vi.useFakeTimers(); - readLatestAssistantReplyMock.mockResolvedValue(undefined); chatHistoryMock.mockResolvedValueOnce({ messages: [] }).mockResolvedValueOnce({ messages: [ { @@ -82,7 +79,6 @@ describe("captureSubagentCompletionReply", () => { it("returns undefined when no completion output arrives before retry window closes", async () => { vi.useFakeTimers(); - readLatestAssistantReplyMock.mockResolvedValue(undefined); chatHistoryMock.mockResolvedValue({ messages: [] }); const pending = captureSubagentCompletionReply("agent:main:subagent:child"); @@ -93,4 +89,26 @@ describe("captureSubagentCompletionReply", () => { expect(chatHistoryMock).toHaveBeenCalled(); vi.useRealTimers(); }); + + it("returns partial assistant progress when the latest assistant turn is tool-only", async () => { + chatHistoryMock.mockResolvedValueOnce({ + messages: [ + { + role: "assistant", + content: [ + { type: "text", text: "Mapped the modules." }, + { type: "toolCall", id: "call-1", name: "read", arguments: {} }, + ], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }], + }, + ], + }); + + const result = await captureSubagentCompletionReply("agent:main:subagent:child"); + + expect(result).toBe("Mapped the modules."); + }); }); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 5fae988fe73..52cde0f69b0 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -29,10 +29,14 @@ let fallbackRequesterResolution: { requesterSessionKey: string; requesterOrigin?: { channel?: string; to?: string; accountId?: string }; } | null = null; +let chatHistoryMessages: Array> = []; vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(async (request: GatewayCall) => { gatewayCalls.push(request); + if (request.method === "chat.history") { + return { messages: chatHistoryMessages }; + } return await callGatewayImpl(request); }), })); @@ -138,6 +142,7 @@ function setupParentSessionFallback(parentSessionKey: string): void { describe("subagent announce timeout config", () => { beforeEach(() => { gatewayCalls.length = 0; + chatHistoryMessages = []; callGatewayImpl = async (request) => { if (request.method === "chat.history") { return { messages: [] }; @@ -270,7 +275,6 @@ describe("subagent announce timeout config", () => { it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => { const parentSessionKey = "agent:main:subagent:parent"; setupParentSessionFallback(parentSessionKey); - // No sessionId on purpose: existence in store should still count as alive. sessionStore[parentSessionKey] = { updatedAt: Date.now() }; await runAnnounceFlowForTest("run-parent-route", { @@ -301,4 +305,147 @@ describe("subagent announce timeout config", () => { expect(directAgentCall?.params?.to).toBe("chan-main"); expect(directAgentCall?.params?.accountId).toBe("acct-main"); }); + + it("uses partial progress on timeout when the child only made tool calls", async () => { + chatHistoryMessages = [ + { role: "user", content: "do a complex task" }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-1", name: "read", arguments: {} }], + }, + { role: "toolResult", toolCallId: "call-1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-3", name: "search", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-partial-progress", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findFinalDirectAgentCall(); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + expect(internalEvents[0]?.result).toContain("3 tool call(s)"); + expect(internalEvents[0]?.result).not.toContain("data"); + }); + + it("preserves NO_REPLY when timeout history ends with silence after earlier progress", async () => { + chatHistoryMessages = [ + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call-1", name: "read", arguments: {} }, + ], + }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-reply", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); + + it("prefers visible assistant progress over a later raw tool result", async () => { + chatHistoryMessages = [ + { + role: "assistant", + content: [{ type: "text", text: "Read 12 files. Narrowing the search now." }], + }, + { + role: "toolResult", + content: [{ type: "text", text: "grep output" }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-visible-assistant", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findFinalDirectAgentCall(); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + expect(internalEvents[0]?.result).toContain("Read 12 files"); + expect(internalEvents[0]?.result).not.toContain("grep output"); + }); + + it("preserves NO_REPLY when timeout partial-progress history mixes prior text and later silence", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-mixed-no-reply", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); + + it("prefers NO_REPLY partial progress over a longer latest assistant reply", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "text", text: "A longer partial summary that should stay silent." }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-reply-overrides-latest-text", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index eeef9db6b9b..ab2fbb1140e 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -47,7 +47,6 @@ import { import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { getSubagentDepthFromSessionStore } from "./subagent-depth.js"; import type { SpawnSubagentMode } from "./subagent-spawn.js"; -import { readLatestAssistantReply } from "./tools/agent-step.js"; import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js"; import { isAnnounceSkip } from "./tools/sessions-send-helpers.js"; @@ -55,7 +54,6 @@ const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1"; const FAST_TEST_RETRY_INTERVAL_MS = 8; const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000; const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000; -const GATEWAY_TIMEOUT_PATTERN = /gateway timeout/i; let subagentRegistryRuntimePromise: Promise< typeof import("./subagent-registry-runtime.js") > | null = null; @@ -74,6 +72,14 @@ type ToolResultMessage = { content?: unknown; }; +type SubagentOutputSnapshot = { + latestAssistantText?: string; + latestSilentText?: string; + latestRawText?: string; + assistantFragments: string[]; + toolCallCount: number; +}; + function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType): number { const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs; if (typeof configured !== "number" || !Number.isFinite(configured)) { @@ -110,7 +116,7 @@ const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /no active .* listener/i, /gateway not connected/i, /gateway closed \(1006/i, - GATEWAY_TIMEOUT_PATTERN, + /gateway timeout/i, /\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i, ]; @@ -136,11 +142,6 @@ function isTransientAnnounceDeliveryError(error: unknown): boolean { return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message)); } -function isGatewayTimeoutError(error: unknown): boolean { - const message = summarizeDeliveryError(error); - return Boolean(message) && GATEWAY_TIMEOUT_PATTERN.test(message); -} - async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise { if (ms <= 0) { return; @@ -168,7 +169,6 @@ async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Prom async function runAnnounceDeliveryWithRetry(params: { operation: string; - noRetryOnGatewayTimeout?: boolean; signal?: AbortSignal; run: () => Promise; }): Promise { @@ -180,9 +180,6 @@ async function runAnnounceDeliveryWithRetry(params: { try { return await params.run(); } catch (err) { - if (params.noRetryOnGatewayTimeout && isGatewayTimeoutError(err)) { - throw err; - } const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex]; if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) { throw err; @@ -287,42 +284,126 @@ function extractSubagentOutputText(message: unknown): string { return ""; } -async function readLatestSubagentOutput(sessionKey: string): Promise { - try { - const latestAssistant = await readLatestAssistantReply({ - sessionKey, - limit: 50, - }); - if (latestAssistant?.trim()) { - return latestAssistant; - } - } catch { - // Best-effort: fall back to richer history parsing below. +function countAssistantToolCalls(content: unknown): number { + if (!Array.isArray(content)) { + return 0; } + let count = 0; + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const type = (block as { type?: unknown }).type; + if ( + type === "toolCall" || + type === "tool_use" || + type === "toolUse" || + type === "functionCall" || + type === "function_call" + ) { + count += 1; + } + } + return count; +} + +function summarizeSubagentOutputHistory(messages: Array): SubagentOutputSnapshot { + const snapshot: SubagentOutputSnapshot = { + assistantFragments: [], + toolCallCount: 0, + }; + for (const message of messages) { + if (!message || typeof message !== "object") { + continue; + } + const role = (message as { role?: unknown }).role; + if (role === "assistant") { + snapshot.toolCallCount += countAssistantToolCalls((message as { content?: unknown }).content); + const text = extractSubagentOutputText(message).trim(); + if (!text) { + continue; + } + if (isAnnounceSkip(text) || isSilentReplyText(text, SILENT_REPLY_TOKEN)) { + snapshot.latestSilentText = text; + snapshot.latestAssistantText = undefined; + snapshot.assistantFragments = []; + continue; + } + snapshot.latestSilentText = undefined; + snapshot.latestAssistantText = text; + snapshot.assistantFragments.push(text); + continue; + } + const text = extractSubagentOutputText(message).trim(); + if (text) { + snapshot.latestRawText = text; + } + } + return snapshot; +} + +function formatSubagentPartialProgress( + snapshot: SubagentOutputSnapshot, + outcome?: SubagentRunOutcome, +): string | undefined { + if (snapshot.latestSilentText) { + return undefined; + } + const timedOut = outcome?.status === "timeout"; + if (snapshot.assistantFragments.length === 0 && (!timedOut || snapshot.toolCallCount === 0)) { + return undefined; + } + const parts: string[] = []; + if (timedOut && snapshot.toolCallCount > 0) { + parts.push( + `[Partial progress: ${snapshot.toolCallCount} tool call(s) executed before timeout]`, + ); + } + if (snapshot.assistantFragments.length > 0) { + parts.push(snapshot.assistantFragments.slice(-3).join("\n\n---\n\n")); + } + return parts.join("\n\n") || undefined; +} + +function selectSubagentOutputText( + snapshot: SubagentOutputSnapshot, + outcome?: SubagentRunOutcome, +): string | undefined { + if (snapshot.latestSilentText) { + return snapshot.latestSilentText; + } + if (snapshot.latestAssistantText) { + return snapshot.latestAssistantText; + } + const partialProgress = formatSubagentPartialProgress(snapshot, outcome); + if (partialProgress) { + return partialProgress; + } + return snapshot.latestRawText; +} + +async function readSubagentOutput( + sessionKey: string, + outcome?: SubagentRunOutcome, +): Promise { const history = await callGateway<{ messages?: Array }>({ method: "chat.history", - params: { sessionKey, limit: 50 }, + params: { sessionKey, limit: 100 }, }); const messages = Array.isArray(history?.messages) ? history.messages : []; - for (let i = messages.length - 1; i >= 0; i -= 1) { - const msg = messages[i]; - const text = extractSubagentOutputText(msg); - if (text) { - return text; - } - } - return undefined; + return selectSubagentOutputText(summarizeSubagentOutputHistory(messages), outcome); } async function readLatestSubagentOutputWithRetry(params: { sessionKey: string; maxWaitMs: number; + outcome?: SubagentRunOutcome; }): Promise { const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100; const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000)); let result: string | undefined; while (Date.now() < deadline) { - result = await readLatestSubagentOutput(params.sessionKey); + result = await readSubagentOutput(params.sessionKey, params.outcome); if (result?.trim()) { return result; } @@ -334,7 +415,7 @@ async function readLatestSubagentOutputWithRetry(params: { export async function captureSubagentCompletionReply( sessionKey: string, ): Promise { - const immediate = await readLatestSubagentOutput(sessionKey); + const immediate = await readSubagentOutput(sessionKey); if (immediate?.trim()) { return immediate; } @@ -811,7 +892,6 @@ async function sendSubagentAnnounceDirectly(params: { operation: params.expectsCompletionMessage ? "completion direct announce agent call" : "direct announce agent call", - noRetryOnGatewayTimeout: params.expectsCompletionMessage && shouldDeliverExternally, signal: params.signal, run: async () => await callGateway({ @@ -1321,13 +1401,14 @@ export async function runSubagentAnnounceFlow(params: { (isAnnounceSkip(fallbackReply) || isSilentReplyText(fallbackReply, SILENT_REPLY_TOKEN)); if (!reply) { - reply = await readLatestSubagentOutput(params.childSessionKey); + reply = await readSubagentOutput(params.childSessionKey, outcome); } if (!reply?.trim()) { reply = await readLatestSubagentOutputWithRetry({ sessionKey: params.childSessionKey, maxWaitMs: params.timeoutMs, + outcome, }); }