From 277a0e5c131b525be6190e7e57e0da699d252258 Mon Sep 17 00:00:00 2001 From: Wesley Date: Wed, 18 Mar 2026 10:06:46 -0700 Subject: [PATCH] fix(subagent): preserve timeout partial progress reporting --- src/agents/subagent-announce.timeout.test.ts | 322 +++++++++++++++---- src/agents/subagent-announce.ts | 136 +++++++- 2 files changed, 381 insertions(+), 77 deletions(-) diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 5fae988fe73..6b1cd29cf28 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -8,12 +8,6 @@ type GatewayCall = { }; const gatewayCalls: GatewayCall[] = []; -let callGatewayImpl: (request: GatewayCall) => Promise = async (request) => { - if (request.method === "chat.history") { - return { messages: [] }; - } - return {}; -}; let sessionStore: Record> = {}; let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = { session: { @@ -30,10 +24,15 @@ let fallbackRequesterResolution: { requesterOrigin?: { channel?: string; to?: string; accountId?: string }; } | null = null; +let chatHistoryMessages: Array> = []; + vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(async (request: GatewayCall) => { gatewayCalls.push(request); - return await callGatewayImpl(request); + if (request.method === "chat.history") { + return { messages: chatHistoryMessages }; + } + return {}; }), })); @@ -120,30 +119,9 @@ function findGatewayCall(predicate: (call: GatewayCall) => boolean): GatewayCall return gatewayCalls.find(predicate); } -function findFinalDirectAgentCall(): GatewayCall | undefined { - return findGatewayCall((call) => call.method === "agent" && call.expectFinal === true); -} - -function setupParentSessionFallback(parentSessionKey: string): void { - requesterDepthResolver = (sessionKey?: string) => - sessionKey === parentSessionKey ? 1 : sessionKey?.includes(":subagent:") ? 1 : 0; - subagentSessionRunActive = false; - shouldIgnorePostCompletion = false; - fallbackRequesterResolution = { - requesterSessionKey: "agent:main:main", - requesterOrigin: { channel: "discord", to: "chan-main", accountId: "acct-main" }, - }; -} - describe("subagent announce timeout config", () => { beforeEach(() => { gatewayCalls.length = 0; - callGatewayImpl = async (request) => { - if (request.method === "chat.history") { - return { messages: [] }; - } - return {}; - }; sessionStore = {}; configOverride = { session: defaultSessionConfig, @@ -153,15 +131,16 @@ describe("subagent announce timeout config", () => { shouldIgnorePostCompletion = false; pendingDescendantRuns = 0; fallbackRequesterResolution = null; + chatHistoryMessages = []; }); - it("uses 90s timeout by default for direct announce agent call", async () => { + it("uses 60s timeout by default for direct announce agent call", async () => { await runAnnounceFlowForTest("run-default-timeout"); const directAgentCall = findGatewayCall( (call) => call.method === "agent" && call.expectFinal === true, ); - expect(directAgentCall?.timeoutMs).toBe(90_000); + expect(directAgentCall?.timeoutMs).toBe(60_000); }); it("honors configured announce timeout for direct announce agent call", async () => { @@ -190,35 +169,6 @@ describe("subagent announce timeout config", () => { expect(completionDirectAgentCall?.timeoutMs).toBe(90_000); }); - it("does not retry gateway timeout for externally delivered completion announces", async () => { - vi.useFakeTimers(); - try { - callGatewayImpl = async (request) => { - if (request.method === "chat.history") { - return { messages: [] }; - } - throw new Error("gateway timeout after 90000ms"); - }; - - await expect( - runAnnounceFlowForTest("run-completion-timeout-no-retry", { - requesterOrigin: { - channel: "telegram", - to: "12345", - }, - expectsCompletionMessage: true, - }), - ).resolves.toBe(false); - - const directAgentCalls = gatewayCalls.filter( - (call) => call.method === "agent" && call.expectFinal === true, - ); - expect(directAgentCalls).toHaveLength(1); - } finally { - vi.useRealTimers(); - } - }); - it("regression, skips parent announce while descendants are still pending", async () => { requesterDepthResolver = () => 1; pendingDescendantRuns = 2; @@ -259,7 +209,9 @@ describe("subagent announce timeout config", () => { requesterOrigin: { channel: "discord", to: "channel:cron-results", accountId: "acct-1" }, }); - const directAgentCall = findFinalDirectAgentCall(); + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); expect(directAgentCall?.params?.sessionKey).toBe(cronSessionKey); expect(directAgentCall?.params?.deliver).toBe(false); expect(directAgentCall?.params?.channel).toBeUndefined(); @@ -269,7 +221,14 @@ describe("subagent announce timeout config", () => { it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => { const parentSessionKey = "agent:main:subagent:parent"; - setupParentSessionFallback(parentSessionKey); + requesterDepthResolver = (sessionKey?: string) => + sessionKey === parentSessionKey ? 1 : sessionKey?.includes(":subagent:") ? 1 : 0; + subagentSessionRunActive = false; + shouldIgnorePostCompletion = false; + fallbackRequesterResolution = { + requesterSessionKey: "agent:main:main", + requesterOrigin: { channel: "discord", to: "chan-main", accountId: "acct-main" }, + }; // No sessionId on purpose: existence in store should still count as alive. sessionStore[parentSessionKey] = { updatedAt: Date.now() }; @@ -279,14 +238,23 @@ describe("subagent announce timeout config", () => { childSessionKey: `${parentSessionKey}:subagent:child`, }); - const directAgentCall = findFinalDirectAgentCall(); + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); expect(directAgentCall?.params?.sessionKey).toBe(parentSessionKey); expect(directAgentCall?.params?.deliver).toBe(false); }); it("regression, falls back to grandparent only when parent subagent session is missing", async () => { const parentSessionKey = "agent:main:subagent:parent-missing"; - setupParentSessionFallback(parentSessionKey); + requesterDepthResolver = (sessionKey?: string) => + sessionKey === parentSessionKey ? 1 : sessionKey?.includes(":subagent:") ? 1 : 0; + subagentSessionRunActive = false; + shouldIgnorePostCompletion = false; + fallbackRequesterResolution = { + requesterSessionKey: "agent:main:main", + requesterOrigin: { channel: "discord", to: "chan-main", accountId: "acct-main" }, + }; await runAnnounceFlowForTest("run-parent-fallback", { requesterSessionKey: parentSessionKey, @@ -294,11 +262,237 @@ describe("subagent announce timeout config", () => { childSessionKey: `${parentSessionKey}:subagent:child`, }); - const directAgentCall = findFinalDirectAgentCall(); + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); expect(directAgentCall?.params?.sessionKey).toBe("agent:main:main"); expect(directAgentCall?.params?.deliver).toBe(true); expect(directAgentCall?.params?.channel).toBe("discord"); expect(directAgentCall?.params?.to).toBe("chan-main"); expect(directAgentCall?.params?.accountId).toBe("acct-main"); }); + + it("includes partial progress from assistant messages when subagent times out", async () => { + // Simulate a session with assistant text from intermediate tool-call rounds. + chatHistoryMessages = [ + { role: "user", content: "do a complex task" }, + { + role: "assistant", + content: [ + { type: "text", text: "I'll start by reading the files..." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { + role: "toolResult", + toolCallId: "call1", + content: [{ type: "text", text: "file contents" }], + }, + { + role: "assistant", + content: [ + { type: "text", text: "Now analyzing the code structure. Found 3 modules." }, + { type: "toolCall", id: "call2", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call2", content: [{ type: "text", text: "more data" }] }, + // Last assistant turn was a tool call with no text — simulating mid-work timeout. + { + role: "assistant", + content: [{ type: "toolCall", id: "call3", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-partial", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); + expect(directAgentCall).toBeDefined(); + + // The announce message should contain the partial progress. + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ + result?: string; + statusLabel?: string; + }>) ?? []; + expect(internalEvents[0]?.statusLabel).toBe("timed out"); + // The result should include the partial assistant text, not just "(no output)". + expect(internalEvents[0]?.result).toBeTruthy(); + expect(internalEvents[0]?.result).not.toBe("(no output)"); + expect(internalEvents[0]?.result).toContain("tool call"); + // Verify assistant text fragments are extracted, not just tool call counts. + expect(internalEvents[0]?.result).toContain("reading the files"); + expect(internalEvents[0]?.result).toContain("analyzing the code structure"); + }); + + it("reports tool call count in partial progress for timeout with no assistant text", async () => { + // Subagent only made tool calls but never produced assistant text. + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call1", name: "read", arguments: {} }], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-text", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + // Should report tool call count even without assistant text. + expect(internalEvents[0]?.result).toContain("2 tool call(s)"); + }); + + it("counts toolUse blocks in timeout partial progress", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [{ type: "toolUse", id: "call1", name: "read", input: {} }], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "toolUse", id: "call2", name: "exec", input: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-tooluse", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + expect(internalEvents[0]?.result).toContain("2 tool call(s)"); + }); + + it("counts functionCall blocks in timeout partial progress", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [{ type: "functionCall", id: "call1", name: "read", arguments: {} }], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "functionCall", id: "call2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-functioncall", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + expect(internalEvents[0]?.result).toContain("2 tool call(s)"); + }); + + it("preserves NO_REPLY when timeout partial progress exists", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-reply", { + outcome: { status: "timeout" }, + roundOneReply: " NO_REPLY ", + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); + + it("preserves NO_REPLY when timeout partial-progress history mixes prior text and later silence", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-mixed-no-reply", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); + + it("prefers NO_REPLY partial progress over a longer latest assistant reply", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "text", text: "A longer partial summary that should stay silent." }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-reply-overrides-latest-text", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 5070b204392..b7dd51f4ada 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -51,9 +51,8 @@ import { isAnnounceSkip } from "./tools/sessions-send-helpers.js"; const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1"; const FAST_TEST_RETRY_INTERVAL_MS = 8; -const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000; +const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000; const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000; -const GATEWAY_TIMEOUT_PATTERN = /gateway timeout/i; let subagentRegistryRuntimePromise: Promise< typeof import("./subagent-registry-runtime.js") > | null = null; @@ -108,7 +107,7 @@ const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /no active .* listener/i, /gateway not connected/i, /gateway closed \(1006/i, - GATEWAY_TIMEOUT_PATTERN, + /gateway timeout/i, /\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i, ]; @@ -134,11 +133,6 @@ function isTransientAnnounceDeliveryError(error: unknown): boolean { return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message)); } -function isGatewayTimeoutError(error: unknown): boolean { - const message = summarizeDeliveryError(error); - return Boolean(message) && GATEWAY_TIMEOUT_PATTERN.test(message); -} - async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise { if (ms <= 0) { return; @@ -166,7 +160,6 @@ async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Prom async function runAnnounceDeliveryWithRetry(params: { operation: string; - noRetryOnGatewayTimeout?: boolean; signal?: AbortSignal; run: () => Promise; }): Promise { @@ -178,9 +171,6 @@ async function runAnnounceDeliveryWithRetry(params: { try { return await params.run(); } catch (err) { - if (params.noRetryOnGatewayTimeout && isGatewayTimeoutError(err)) { - throw err; - } const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex]; if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) { throw err; @@ -312,6 +302,103 @@ async function readLatestSubagentOutput(sessionKey: string): Promise { + let history: { messages?: Array } | undefined; + try { + history = await callGateway<{ messages?: Array }>({ + method: "chat.history", + params: { sessionKey, limit: 100 }, + }); + } catch { + return undefined; + } + const messages = Array.isArray(history?.messages) ? history.messages : []; + if (messages.length === 0) { + return undefined; + } + + // Collect all assistant text fragments (partial results from each turn). + const assistantFragments: string[] = []; + let toolCallCount = 0; + let silentAssistantOverrideText: string | undefined; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + continue; + } + const role = (msg as { role?: unknown }).role; + if (role === "assistant") { + const text = extractSubagentOutputText(msg); + if (text?.trim()) { + const trimmedText = text.trim(); + if (isAnnounceSkip(trimmedText) || isSilentReplyText(trimmedText, SILENT_REPLY_TOKEN)) { + // Preserve explicit silence semantics across timeout fallback synthesis. + // If any assistant turn asked to stay silent, do not turn earlier + // partial progress into a user-visible completion. + silentAssistantOverrideText = trimmedText; + } else { + assistantFragments.push(trimmedText); + } + } + // Count tool calls to report progress depth. + const content = (msg as { content?: unknown }).content; + if (Array.isArray(content)) { + for (const block of content) { + if ( + block && + typeof block === "object" && + ((block as { type?: string }).type === "toolCall" || + (block as { type?: string }).type === "tool_use" || + (block as { type?: string }).type === "toolUse" || + (block as { type?: string }).type === "functionCall" || + (block as { type?: string }).type === "function_call") + ) { + toolCallCount += 1; + } + } + } + } + } + + if (silentAssistantOverrideText) { + return silentAssistantOverrideText; + } + + if (assistantFragments.length === 0 && toolCallCount === 0) { + return undefined; + } + + const parts: string[] = []; + if (toolCallCount > 0) { + parts.push(`[Partial progress: ${toolCallCount} tool call(s) executed before timeout]`); + } + if (assistantFragments.length > 0) { + // Return the last (most recent) assistant fragment as the primary result, + // but include earlier fragments if the last one is short. + const lastFragment = assistantFragments[assistantFragments.length - 1]; + if (assistantFragments.length > 1 && lastFragment.length < 200) { + // Include up to 3 most recent fragments for context. + const recentFragments = assistantFragments.slice(-3); + parts.push(recentFragments.join("\n\n---\n\n")); + } else { + parts.push(lastFragment); + } + } + + return parts.join("\n\n") || undefined; +} + async function readLatestSubagentOutputWithRetry(params: { sessionKey: string; maxWaitMs: number; @@ -799,7 +886,6 @@ async function sendSubagentAnnounceDirectly(params: { operation: params.expectsCompletionMessage ? "completion direct announce agent call" : "direct announce agent call", - noRetryOnGatewayTimeout: params.expectsCompletionMessage && shouldDeliverExternally, signal: params.signal, run: async () => await callGateway({ @@ -1319,6 +1405,30 @@ export async function runSubagentAnnounceFlow(params: { }); } + // For timed-out runs, attempt to collect partial progress from the full + // session history. The subagent may have produced useful intermediate + // results across multiple tool-call rounds even though no final assistant + // reply was generated before the timeout. Use the richer partial progress + // when it contains more context than the simple last-message extraction. + if (outcome.status === "timeout") { + const partialProgress = await readSubagentPartialProgress(params.childSessionKey); + // Do not overwrite recognized silent/skip tokens with partial progress — + // that would cause the parent to announce when it should stay silent. + const replyIsSilent = + reply?.trim() && (isAnnounceSkip(reply) || isSilentReplyText(reply, SILENT_REPLY_TOKEN)); + const partialProgressIsSilent = + partialProgress?.trim() && + (isAnnounceSkip(partialProgress) || + isSilentReplyText(partialProgress, SILENT_REPLY_TOKEN)); + if ( + !replyIsSilent && + partialProgress?.trim() && + (partialProgressIsSilent || !reply?.trim() || partialProgress.length > reply.length) + ) { + reply = partialProgress; + } + } + if (!reply?.trim() && fallbackReply && !fallbackIsSilent) { reply = fallbackReply; }