From 02c83082847b3204cf69ca6846baedad54d7fa46 Mon Sep 17 00:00:00 2001 From: huangliuliu-bot Date: Tue, 10 Mar 2026 19:38:13 +0800 Subject: [PATCH] feat(agents): retry empty-stream once before model fallback --- src/agents/model-fallback.test.ts | 98 ++++++++++++++++++- src/agents/model-fallback.ts | 66 +++++++++++++ ...dded-helpers.isbillingerrormessage.test.ts | 6 ++ .../pi-embedded-helpers/failover-matches.ts | 3 +- 4 files changed, 171 insertions(+), 2 deletions(-) diff --git a/src/agents/model-fallback.test.ts b/src/agents/model-fallback.test.ts index f8422b4aa14..0ac210436dd 100644 --- a/src/agents/model-fallback.test.ts +++ b/src/agents/model-fallback.test.ts @@ -2,7 +2,7 @@ import crypto from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import { resetLogger, setLoggerOverride } from "../logging/logger.js"; import type { AuthProfileStore } from "./auth-profiles.js"; @@ -189,6 +189,28 @@ const MODEL_COOLDOWN_MESSAGE = "model_cooldown: All credentials for model gpt-5 // SDK/transport compatibility marker, not a provider API contract. 
const CONNECTION_ERROR_MESSAGE = "Connection error."; +const originalEmptyStreamRetry = process.env.OPENCLAW_EMPTY_STREAM_RETRY; +const originalEmptyStreamRetryMin = process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS; +const originalEmptyStreamRetryMax = process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS; + +afterEach(() => { + if (originalEmptyStreamRetry === undefined) { + delete process.env.OPENCLAW_EMPTY_STREAM_RETRY; + } else { + process.env.OPENCLAW_EMPTY_STREAM_RETRY = originalEmptyStreamRetry; + } + if (originalEmptyStreamRetryMin === undefined) { + delete process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS; + } else { + process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS = originalEmptyStreamRetryMin; + } + if (originalEmptyStreamRetryMax === undefined) { + delete process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS; + } else { + process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS = originalEmptyStreamRetryMax; + } +}); + describe("runWithModelFallback", () => { it("keeps openai gpt-5.3 codex on the openai provider before running", async () => { const cfg = makeCfg(); @@ -206,6 +228,80 @@ describe("runWithModelFallback", () => { expect(run).toHaveBeenCalledWith("openai", "gpt-5.3-codex"); }); + it("retries once on empty-stream errors before falling back", async () => { + const cfg = makeCfg(); + process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS = "0"; + process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS = "0"; + const run = vi + .fn() + .mockRejectedValueOnce(new Error("request ended without sending any chunks")) + .mockResolvedValueOnce("ok"); + + const result = await runWithModelFallback({ + cfg, + provider: "openai", + model: "gpt-4.1-mini", + run, + }); + + expect(result.result).toBe("ok"); + expect(run).toHaveBeenCalledTimes(2); + expect(run.mock.calls).toEqual([ + ["openai", "gpt-4.1-mini"], + ["openai", "gpt-4.1-mini"], + ]); + expect(result.attempts[0]?.error).toContain("Empty stream before first chunk"); + }); + + it("falls back after empty-stream retry still fails", async () => { + 
const cfg = makeCfg(); + process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS = "0"; + process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS = "0"; + const run = vi + .fn() + .mockRejectedValueOnce(new Error("request ended without sending any chunks")) + .mockRejectedValueOnce(new Error("request ended without sending any chunks")) + .mockResolvedValueOnce("ok"); + + const result = await runWithModelFallback({ + cfg, + provider: "openai", + model: "gpt-4.1-mini", + run, + }); + + expect(result.result).toBe("ok"); + expect(run).toHaveBeenCalledTimes(3); + expect(run.mock.calls).toEqual([ + ["openai", "gpt-4.1-mini"], + ["openai", "gpt-4.1-mini"], + ["anthropic", "claude-haiku-3-5"], + ]); + }); + + it("can disable empty-stream in-model retry with feature flag", async () => { + const cfg = makeCfg(); + process.env.OPENCLAW_EMPTY_STREAM_RETRY = "0"; + const run = vi + .fn() + .mockRejectedValueOnce(new Error("request ended without sending any chunks")) + .mockResolvedValueOnce("ok"); + + const result = await runWithModelFallback({ + cfg, + provider: "openai", + model: "gpt-4.1-mini", + run, + }); + + expect(result.result).toBe("ok"); + expect(run).toHaveBeenCalledTimes(2); + expect(run.mock.calls).toEqual([ + ["openai", "gpt-4.1-mini"], + ["anthropic", "claude-haiku-3-5"], + ]); + }); + it("falls back on unrecognized errors when candidates remain", async () => { const cfg = makeCfg(); const run = vi.fn().mockRejectedValueOnce(new Error("bad request")).mockResolvedValueOnce("ok"); diff --git a/src/agents/model-fallback.ts b/src/agents/model-fallback.ts index 5fd6e533a1a..8dbd6261e36 100644 --- a/src/agents/model-fallback.ts +++ b/src/agents/model-fallback.ts @@ -63,6 +63,43 @@ function shouldRethrowAbort(err: unknown): boolean { return isFallbackAbortError(err) && !isTimeoutError(err); } +const EMPTY_STREAM_ERROR_RE = + /request ended without sending any chunks|stream ended before first chunk/i; + +function isEmptyStreamError(err: unknown): boolean { + if (typeof err === "string") { 
+    return EMPTY_STREAM_ERROR_RE.test(err);
+  }
+  if (err instanceof Error) {
+    return EMPTY_STREAM_ERROR_RE.test(err.message);
+  }
+  return false;
+}
+
+function isEmptyStreamRetryEnabled(): boolean {
+  const raw = process.env.OPENCLAW_EMPTY_STREAM_RETRY;
+  if (raw == null || raw === "") {
+    return true;
+  }
+  const normalized = String(raw).trim().toLowerCase();
+  return !["0", "false", "off", "no"].includes(normalized);
+}
+
+function resolveEmptyStreamRetryDelayMs(): number {
+  const minRaw = Number(process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS);
+  const maxRaw = Number(process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS);
+  const min = Number.isFinite(minRaw) && minRaw >= 0 ? Math.floor(minRaw) : 300;
+  const max = Number.isFinite(maxRaw) && maxRaw >= min ? Math.floor(maxRaw) : 800;
+  if (max <= min) {
+    return min;
+  }
+  return Math.floor(min + Math.random() * (max - min + 1));
+}
+
+async function sleepEmptyStreamRetry(ms: number): Promise<void> {
+  await new Promise((resolve) => setTimeout(resolve, ms));
+}
+
 function createModelCandidateCollector(allowlist: Set<string> | null | undefined): {
   candidates: ModelCandidate[];
   addExplicitCandidate: (candidate: ModelCandidate) => void;
@@ -176,6 +213,35 @@ async function runFallbackAttempt(params: {
         }),
       };
     }
+
+    if (isEmptyStreamError(runResult.error) && isEmptyStreamRetryEnabled()) {
+      const retryDelayMs = resolveEmptyStreamRetryDelayMs();
+      params.attempts.push({
+        provider: params.provider,
+        model: params.model,
+        error: `Empty stream before first chunk; retrying once after ${retryDelayMs}ms`,
+      });
+      await sleepEmptyStreamRetry(retryDelayMs);
+
+      const retryResult = await runFallbackCandidate({
+        run: params.run,
+        provider: params.provider,
+        model: params.model,
+        options: params.options,
+      });
+      if (retryResult.ok) {
+        return {
+          success: buildFallbackSuccess({
+            result: retryResult.result,
+            provider: params.provider,
+            model: params.model,
+            attempts: params.attempts,
+          }),
+        };
+      }
+      return { error: retryResult.error };
+    
} + return { error: runResult.error }; } diff --git a/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts b/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts index 8c0a0b1994d..bbe03f27958 100644 --- a/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts +++ b/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts @@ -731,6 +731,11 @@ describe("classifyFailoverReason", () => { expect(classifyFailoverReason(OPENAI_RATE_LIMIT_MESSAGE)).toBe("rate_limit"); expect(classifyFailoverReason(GEMINI_RESOURCE_EXHAUSTED_MESSAGE)).toBe("rate_limit"); expect(classifyFailoverReason(ANTHROPIC_OVERLOADED_PAYLOAD)).toBe("overloaded"); + expect( + classifyFailoverReason( + '500 {"error":{"type":"new_api_error","message":"当前模型 claude-opus-4-6 负载已经达到上限,请稍后重试"}}', + ), + ).toBe("overloaded"); expect(classifyFailoverReason(OPENROUTER_CREDITS_MESSAGE)).toBe("billing"); expect(classifyFailoverReason(TOGETHER_PAYMENT_REQUIRED_MESSAGE)).toBe("billing"); expect(classifyFailoverReason(TOGETHER_ENGINE_OVERLOADED_MESSAGE)).toBe("overloaded"); @@ -781,6 +786,7 @@ describe("classifyFailoverReason", () => { expect(classifyFailoverReason(INSUFFICIENT_QUOTA_PAYLOAD)).toBe("billing"); expect(classifyFailoverReason("deadline exceeded")).toBe("timeout"); expect(classifyFailoverReason("request ended without sending any chunks")).toBe("timeout"); + expect(classifyFailoverReason("stream ended before first chunk")).toBe("timeout"); expect(classifyFailoverReason("Connection error.")).toBe("timeout"); expect(classifyFailoverReason("fetch failed")).toBe("timeout"); expect(classifyFailoverReason("network error: ECONNREFUSED")).toBe("timeout"); diff --git a/src/agents/pi-embedded-helpers/failover-matches.ts b/src/agents/pi-embedded-helpers/failover-matches.ts index 9f6e83e9461..3f7370fac0e 100644 --- a/src/agents/pi-embedded-helpers/failover-matches.ts +++ b/src/agents/pi-embedded-helpers/failover-matches.ts @@ -17,7 +17,7 @@ const ERROR_PATTERNS = { "tokens per day", ], 
overloaded: [ - /overloaded_error|"type"\s*:\s*"overloaded_error"/i, + /overloaded_error|"type"\s*:\s*"overloaded_error"|负载(?:已经)?达到上限/i, "overloaded", // Match "service unavailable" only when combined with an explicit overload // indicator — a generic 503 from a proxy/CDN should not be classified as @@ -47,6 +47,7 @@ const ERROR_PATTERNS = { /\benotfound\b/i, /\beai_again\b/i, /without sending (?:any )?chunks?/i, + "stream ended before first chunk", /\bstop reason:\s*(?:abort|error|malformed_response|network_error)\b/i, /\breason:\s*(?:abort|error|malformed_response|network_error)\b/i, /\bunhandled stop reason:\s*(?:abort|error|malformed_response|network_error)\b/i,