fix(subagent): preserve timeout partial progress reporting

This commit is contained in:
Wesley 2026-03-18 10:06:46 -07:00
parent 06832112ee
commit 277a0e5c13
2 changed files with 381 additions and 77 deletions

View File

@ -8,12 +8,6 @@ type GatewayCall = {
};
const gatewayCalls: GatewayCall[] = [];
let callGatewayImpl: (request: GatewayCall) => Promise<unknown> = async (request) => {
if (request.method === "chat.history") {
return { messages: [] };
}
return {};
};
let sessionStore: Record<string, Record<string, unknown>> = {};
let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = {
session: {
@ -30,10 +24,15 @@ let fallbackRequesterResolution: {
requesterOrigin?: { channel?: string; to?: string; accountId?: string };
} | null = null;
let chatHistoryMessages: Array<Record<string, unknown>> = [];
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(async (request: GatewayCall) => {
gatewayCalls.push(request);
return await callGatewayImpl(request);
if (request.method === "chat.history") {
return { messages: chatHistoryMessages };
}
return {};
}),
}));
@ -120,30 +119,9 @@ function findGatewayCall(predicate: (call: GatewayCall) => boolean): GatewayCall
return gatewayCalls.find(predicate);
}
/**
 * Look up the first recorded gateway call that is an `agent` request
 * flagged with `expectFinal === true`.
 *
 * @returns The matching call, or `undefined` when none was recorded.
 */
function findFinalDirectAgentCall(): GatewayCall | undefined {
  const isFinalAgentCall = (candidate: GatewayCall): boolean =>
    candidate.method === "agent" && candidate.expectFinal === true;
  return findGatewayCall(isFinalAgentCall);
}
/**
 * Prime the module-level test state for the "parent session fallback"
 * scenarios.
 *
 * - The depth resolver reports depth 1 for `parentSessionKey` and for any
 *   session key containing `:subagent:`, and depth 0 otherwise.
 * - The subagent run is marked inactive and post-completion handling is
 *   not ignored.
 * - The fallback requester resolves to the main session with a discord origin.
 *
 * @param parentSessionKey - Session key that must resolve to depth 1.
 */
function setupParentSessionFallback(parentSessionKey: string): void {
  fallbackRequesterResolution = {
    requesterSessionKey: "agent:main:main",
    requesterOrigin: { channel: "discord", to: "chan-main", accountId: "acct-main" },
  };
  shouldIgnorePostCompletion = false;
  subagentSessionRunActive = false;
  requesterDepthResolver = (sessionKey?: string) =>
    sessionKey === parentSessionKey || sessionKey?.includes(":subagent:") ? 1 : 0;
}
describe("subagent announce timeout config", () => {
beforeEach(() => {
gatewayCalls.length = 0;
callGatewayImpl = async (request) => {
if (request.method === "chat.history") {
return { messages: [] };
}
return {};
};
sessionStore = {};
configOverride = {
session: defaultSessionConfig,
@ -153,15 +131,16 @@ describe("subagent announce timeout config", () => {
shouldIgnorePostCompletion = false;
pendingDescendantRuns = 0;
fallbackRequesterResolution = null;
chatHistoryMessages = [];
});
it("uses 90s timeout by default for direct announce agent call", async () => {
it("uses 60s timeout by default for direct announce agent call", async () => {
await runAnnounceFlowForTest("run-default-timeout");
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCall?.timeoutMs).toBe(90_000);
expect(directAgentCall?.timeoutMs).toBe(60_000);
});
it("honors configured announce timeout for direct announce agent call", async () => {
@ -190,35 +169,6 @@ describe("subagent announce timeout config", () => {
expect(completionDirectAgentCall?.timeoutMs).toBe(90_000);
});
it("does not retry gateway timeout for externally delivered completion announces", async () => {
vi.useFakeTimers();
try {
callGatewayImpl = async (request) => {
if (request.method === "chat.history") {
return { messages: [] };
}
throw new Error("gateway timeout after 90000ms");
};
await expect(
runAnnounceFlowForTest("run-completion-timeout-no-retry", {
requesterOrigin: {
channel: "telegram",
to: "12345",
},
expectsCompletionMessage: true,
}),
).resolves.toBe(false);
const directAgentCalls = gatewayCalls.filter(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCalls).toHaveLength(1);
} finally {
vi.useRealTimers();
}
});
it("regression, skips parent announce while descendants are still pending", async () => {
requesterDepthResolver = () => 1;
pendingDescendantRuns = 2;
@ -259,7 +209,9 @@ describe("subagent announce timeout config", () => {
requesterOrigin: { channel: "discord", to: "channel:cron-results", accountId: "acct-1" },
});
const directAgentCall = findFinalDirectAgentCall();
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCall?.params?.sessionKey).toBe(cronSessionKey);
expect(directAgentCall?.params?.deliver).toBe(false);
expect(directAgentCall?.params?.channel).toBeUndefined();
@ -269,7 +221,14 @@ describe("subagent announce timeout config", () => {
it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => {
const parentSessionKey = "agent:main:subagent:parent";
setupParentSessionFallback(parentSessionKey);
requesterDepthResolver = (sessionKey?: string) =>
sessionKey === parentSessionKey ? 1 : sessionKey?.includes(":subagent:") ? 1 : 0;
subagentSessionRunActive = false;
shouldIgnorePostCompletion = false;
fallbackRequesterResolution = {
requesterSessionKey: "agent:main:main",
requesterOrigin: { channel: "discord", to: "chan-main", accountId: "acct-main" },
};
// No sessionId on purpose: existence in store should still count as alive.
sessionStore[parentSessionKey] = { updatedAt: Date.now() };
@ -279,14 +238,23 @@ describe("subagent announce timeout config", () => {
childSessionKey: `${parentSessionKey}:subagent:child`,
});
const directAgentCall = findFinalDirectAgentCall();
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCall?.params?.sessionKey).toBe(parentSessionKey);
expect(directAgentCall?.params?.deliver).toBe(false);
});
it("regression, falls back to grandparent only when parent subagent session is missing", async () => {
const parentSessionKey = "agent:main:subagent:parent-missing";
setupParentSessionFallback(parentSessionKey);
requesterDepthResolver = (sessionKey?: string) =>
sessionKey === parentSessionKey ? 1 : sessionKey?.includes(":subagent:") ? 1 : 0;
subagentSessionRunActive = false;
shouldIgnorePostCompletion = false;
fallbackRequesterResolution = {
requesterSessionKey: "agent:main:main",
requesterOrigin: { channel: "discord", to: "chan-main", accountId: "acct-main" },
};
await runAnnounceFlowForTest("run-parent-fallback", {
requesterSessionKey: parentSessionKey,
@ -294,11 +262,237 @@ describe("subagent announce timeout config", () => {
childSessionKey: `${parentSessionKey}:subagent:child`,
});
const directAgentCall = findFinalDirectAgentCall();
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCall?.params?.sessionKey).toBe("agent:main:main");
expect(directAgentCall?.params?.deliver).toBe(true);
expect(directAgentCall?.params?.channel).toBe("discord");
expect(directAgentCall?.params?.to).toBe("chan-main");
expect(directAgentCall?.params?.accountId).toBe("acct-main");
});
it("includes partial progress from assistant messages when subagent times out", async () => {
// Simulate a session with assistant text from intermediate tool-call rounds.
chatHistoryMessages = [
{ role: "user", content: "do a complex task" },
{
role: "assistant",
content: [
{ type: "text", text: "I'll start by reading the files..." },
{ type: "toolCall", id: "call1", name: "read", arguments: {} },
],
},
{
role: "toolResult",
toolCallId: "call1",
content: [{ type: "text", text: "file contents" }],
},
{
role: "assistant",
content: [
{ type: "text", text: "Now analyzing the code structure. Found 3 modules." },
{ type: "toolCall", id: "call2", name: "read", arguments: {} },
],
},
{ role: "toolResult", toolCallId: "call2", content: [{ type: "text", text: "more data" }] },
// Last assistant turn was a tool call with no text — simulating mid-work timeout.
{
role: "assistant",
content: [{ type: "toolCall", id: "call3", name: "exec", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-partial", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCall).toBeDefined();
// The announce message should contain the partial progress.
const internalEvents =
(directAgentCall?.params?.internalEvents as Array<{
result?: string;
statusLabel?: string;
}>) ?? [];
expect(internalEvents[0]?.statusLabel).toBe("timed out");
// The result should include the partial assistant text, not just "(no output)".
expect(internalEvents[0]?.result).toBeTruthy();
expect(internalEvents[0]?.result).not.toBe("(no output)");
expect(internalEvents[0]?.result).toContain("tool call");
// Verify assistant text fragments are extracted, not just tool call counts.
expect(internalEvents[0]?.result).toContain("reading the files");
expect(internalEvents[0]?.result).toContain("analyzing the code structure");
});
it("reports tool call count in partial progress for timeout with no assistant text", async () => {
// Subagent only made tool calls but never produced assistant text.
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [{ type: "toolCall", id: "call1", name: "read", arguments: {} }],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-no-text", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
const internalEvents =
(directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? [];
// Should report tool call count even without assistant text.
expect(internalEvents[0]?.result).toContain("2 tool call(s)");
});
it("counts toolUse blocks in timeout partial progress", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [{ type: "toolUse", id: "call1", name: "read", input: {} }],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "toolUse", id: "call2", name: "exec", input: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-tooluse", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
const internalEvents =
(directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? [];
expect(internalEvents[0]?.result).toContain("2 tool call(s)");
});
it("counts functionCall blocks in timeout partial progress", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [{ type: "functionCall", id: "call1", name: "read", arguments: {} }],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "functionCall", id: "call2", name: "exec", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-functioncall", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
const internalEvents =
(directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? [];
expect(internalEvents[0]?.result).toContain("2 tool call(s)");
});
it("preserves NO_REPLY when timeout partial progress exists", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [
{ type: "text", text: "Still working through the files." },
{ type: "toolCall", id: "call1", name: "read", arguments: {} },
],
},
];
await runAnnounceFlowForTest("run-timeout-no-reply", {
outcome: { status: "timeout" },
roundOneReply: " NO_REPLY ",
});
expect(
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
).toBeUndefined();
});
it("preserves NO_REPLY when timeout partial-progress history mixes prior text and later silence", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [
{ type: "text", text: "Still working through the files." },
{ type: "toolCall", id: "call1", name: "read", arguments: {} },
],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "text", text: "NO_REPLY" }],
},
{
role: "assistant",
content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }],
},
];
await runAnnounceFlowForTest("run-timeout-mixed-no-reply", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
expect(
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
).toBeUndefined();
});
it("prefers NO_REPLY partial progress over a longer latest assistant reply", async () => {
chatHistoryMessages = [
{ role: "user", content: "do something" },
{
role: "assistant",
content: [
{ type: "text", text: "Still working through the files." },
{ type: "toolCall", id: "call1", name: "read", arguments: {} },
],
},
{ role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] },
{
role: "assistant",
content: [{ type: "text", text: "NO_REPLY" }],
},
{
role: "assistant",
content: [{ type: "text", text: "A longer partial summary that should stay silent." }],
},
];
await runAnnounceFlowForTest("run-timeout-no-reply-overrides-latest-text", {
outcome: { status: "timeout" },
roundOneReply: undefined,
});
expect(
findGatewayCall((call) => call.method === "agent" && call.expectFinal === true),
).toBeUndefined();
});
});

View File

@ -51,9 +51,8 @@ import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
const FAST_TEST_RETRY_INTERVAL_MS = 8;
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000;
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000;
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
const GATEWAY_TIMEOUT_PATTERN = /gateway timeout/i;
let subagentRegistryRuntimePromise: Promise<
typeof import("./subagent-registry-runtime.js")
> | null = null;
@ -108,7 +107,7 @@ const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/no active .* listener/i,
/gateway not connected/i,
/gateway closed \(1006/i,
GATEWAY_TIMEOUT_PATTERN,
/gateway timeout/i,
/\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i,
];
@ -134,11 +133,6 @@ function isTransientAnnounceDeliveryError(error: unknown): boolean {
return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message));
}
/**
 * Report whether an error's delivery summary matches the gateway-timeout
 * pattern.
 *
 * @param error - Any thrown value; it is summarized via `summarizeDeliveryError`.
 * @returns `true` only when the summary is non-empty and matches
 *   `GATEWAY_TIMEOUT_PATTERN`.
 */
function isGatewayTimeoutError(error: unknown): boolean {
  const summary = summarizeDeliveryError(error);
  if (!summary) {
    return false;
  }
  return GATEWAY_TIMEOUT_PATTERN.test(summary);
}
async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise<void> {
if (ms <= 0) {
return;
@ -166,7 +160,6 @@ async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Prom
async function runAnnounceDeliveryWithRetry<T>(params: {
operation: string;
noRetryOnGatewayTimeout?: boolean;
signal?: AbortSignal;
run: () => Promise<T>;
}): Promise<T> {
@ -178,9 +171,6 @@ async function runAnnounceDeliveryWithRetry<T>(params: {
try {
return await params.run();
} catch (err) {
if (params.noRetryOnGatewayTimeout && isGatewayTimeoutError(err)) {
throw err;
}
const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex];
if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) {
throw err;
@ -312,6 +302,103 @@ async function readLatestSubagentOutput(sessionKey: string): Promise<string | un
return undefined;
}
/**
 * Build a progress summary for a subagent run that hit its timeout.
 *
 * Where `readLatestSubagentOutput` looks only at the latest message, this
 * requests up to 100 `chat.history` messages for the session and walks every
 * assistant message, collecting its trimmed text and counting tool-call
 * style content blocks (`toolCall`, `tool_use`, `toolUse`, `functionCall`,
 * `function_call`).
 *
 * @param sessionKey - Session whose history is read.
 * @returns One of:
 *   - `undefined` when the history call throws, the history is empty, or
 *     neither assistant text nor tool calls were found;
 *   - the most recent silent/skip assistant text when any assistant message
 *     carried one, so explicit silence wins over other progress;
 *   - otherwise an optional tool-call count line followed by the latest
 *     assistant text, or by up to the three latest fragments when more than
 *     one exists and the latest is shorter than 200 characters.
 *
 * @see https://github.com/openclaw/openclaw/issues/33827
 */
async function readSubagentPartialProgress(sessionKey: string): Promise<string | undefined> {
  // Content block `type` values that count as one executed tool call.
  const toolCallBlockTypes = new Set<string>([
    "toolCall",
    "tool_use",
    "toolUse",
    "functionCall",
    "function_call",
  ]);
  const isToolCallBlock = (block: unknown): boolean => {
    if (!block || typeof block !== "object") {
      return false;
    }
    const blockType = (block as { type?: unknown }).type;
    return typeof blockType === "string" && toolCallBlockTypes.has(blockType);
  };

  let response: { messages?: Array<unknown> } | undefined;
  try {
    response = await callGateway<{ messages?: Array<unknown> }>({
      method: "chat.history",
      params: { sessionKey, limit: 100 },
    });
  } catch {
    // Best effort: an unreadable history simply means no partial progress.
    return undefined;
  }

  const entries = Array.isArray(response?.messages) ? response.messages : [];
  if (entries.length === 0) {
    return undefined;
  }

  const fragments: string[] = [];
  let silentText: string | undefined;
  let toolCalls = 0;

  for (const entry of entries) {
    if (!entry || typeof entry !== "object") {
      continue;
    }
    if ((entry as { role?: unknown }).role !== "assistant") {
      continue;
    }

    const trimmed = extractSubagentOutputText(entry)?.trim();
    if (trimmed) {
      const staysSilent =
        isAnnounceSkip(trimmed) || isSilentReplyText(trimmed, SILENT_REPLY_TOKEN);
      if (staysSilent) {
        // An explicit request for silence must survive the timeout fallback:
        // remember it so earlier progress is never surfaced as a completion.
        silentText = trimmed;
      } else {
        fragments.push(trimmed);
      }
    }

    // Tool-call blocks indicate how far the run got before timing out.
    const content = (entry as { content?: unknown }).content;
    if (Array.isArray(content)) {
      toolCalls += content.filter(isToolCallBlock).length;
    }
  }

  if (silentText) {
    return silentText;
  }
  if (fragments.length === 0 && toolCalls === 0) {
    return undefined;
  }

  const sections: string[] = [];
  if (toolCalls > 0) {
    sections.push(`[Partial progress: ${toolCalls} tool call(s) executed before timeout]`);
  }
  if (fragments.length > 0) {
    const latest = fragments[fragments.length - 1];
    // A short latest fragment gets up to two predecessors for context.
    const needsContext = fragments.length > 1 && latest.length < 200;
    sections.push(needsContext ? fragments.slice(-3).join("\n\n---\n\n") : latest);
  }
  return sections.join("\n\n") || undefined;
}
async function readLatestSubagentOutputWithRetry(params: {
sessionKey: string;
maxWaitMs: number;
@ -799,7 +886,6 @@ async function sendSubagentAnnounceDirectly(params: {
operation: params.expectsCompletionMessage
? "completion direct announce agent call"
: "direct announce agent call",
noRetryOnGatewayTimeout: params.expectsCompletionMessage && shouldDeliverExternally,
signal: params.signal,
run: async () =>
await callGateway({
@ -1319,6 +1405,30 @@ export async function runSubagentAnnounceFlow(params: {
});
}
// For timed-out runs, attempt to collect partial progress from the full
// session history. The subagent may have produced useful intermediate
// results across multiple tool-call rounds even though no final assistant
// reply was generated before the timeout. Use the richer partial progress
// when it contains more context than the simple last-message extraction.
if (outcome.status === "timeout") {
const partialProgress = await readSubagentPartialProgress(params.childSessionKey);
// Do not overwrite recognized silent/skip tokens with partial progress —
// that would cause the parent to announce when it should stay silent.
const replyIsSilent =
reply?.trim() && (isAnnounceSkip(reply) || isSilentReplyText(reply, SILENT_REPLY_TOKEN));
const partialProgressIsSilent =
partialProgress?.trim() &&
(isAnnounceSkip(partialProgress) ||
isSilentReplyText(partialProgress, SILENT_REPLY_TOKEN));
if (
!replyIsSilent &&
partialProgress?.trim() &&
(partialProgressIsSilent || !reply?.trim() || partialProgress.length > reply.length)
) {
reply = partialProgress;
}
}
if (!reply?.trim() && fallbackReply && !fallbackIsSilent) {
reply = fallbackReply;
}