Merge 02c83082847b3204cf69ca6846baedad54d7fa46 into 8a05c05596ca9ba0735dafd8e359885de4c2c969
This commit is contained in:
commit
b39171bdc3
@ -2,7 +2,7 @@ import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { resetLogger, setLoggerOverride } from "../logging/logger.js";
|
||||
import type { AuthProfileStore } from "./auth-profiles.js";
|
||||
@ -189,6 +189,28 @@ const MODEL_COOLDOWN_MESSAGE = "model_cooldown: All credentials for model gpt-5
|
||||
// SDK/transport compatibility marker, not a provider API contract.
|
||||
const CONNECTION_ERROR_MESSAGE = "Connection error.";
|
||||
|
||||
const originalEmptyStreamRetry = process.env.OPENCLAW_EMPTY_STREAM_RETRY;
|
||||
const originalEmptyStreamRetryMin = process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS;
|
||||
const originalEmptyStreamRetryMax = process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS;
|
||||
|
||||
afterEach(() => {
|
||||
if (originalEmptyStreamRetry === undefined) {
|
||||
delete process.env.OPENCLAW_EMPTY_STREAM_RETRY;
|
||||
} else {
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY = originalEmptyStreamRetry;
|
||||
}
|
||||
if (originalEmptyStreamRetryMin === undefined) {
|
||||
delete process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS;
|
||||
} else {
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS = originalEmptyStreamRetryMin;
|
||||
}
|
||||
if (originalEmptyStreamRetryMax === undefined) {
|
||||
delete process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS;
|
||||
} else {
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS = originalEmptyStreamRetryMax;
|
||||
}
|
||||
});
|
||||
|
||||
describe("runWithModelFallback", () => {
|
||||
it("keeps openai gpt-5.3 codex on the openai provider before running", async () => {
|
||||
const cfg = makeCfg();
|
||||
@ -206,6 +228,80 @@ describe("runWithModelFallback", () => {
|
||||
expect(run).toHaveBeenCalledWith("openai", "gpt-5.3-codex");
|
||||
});
|
||||
|
||||
it("retries once on empty-stream errors before falling back", async () => {
|
||||
const cfg = makeCfg();
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS = "0";
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS = "0";
|
||||
const run = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error("request ended without sending any chunks"))
|
||||
.mockResolvedValueOnce("ok");
|
||||
|
||||
const result = await runWithModelFallback({
|
||||
cfg,
|
||||
provider: "openai",
|
||||
model: "gpt-4.1-mini",
|
||||
run,
|
||||
});
|
||||
|
||||
expect(result.result).toBe("ok");
|
||||
expect(run).toHaveBeenCalledTimes(2);
|
||||
expect(run.mock.calls).toEqual([
|
||||
["openai", "gpt-4.1-mini"],
|
||||
["openai", "gpt-4.1-mini"],
|
||||
]);
|
||||
expect(result.attempts[0]?.error).toContain("Empty stream before first chunk");
|
||||
});
|
||||
|
||||
it("falls back after empty-stream retry still fails", async () => {
|
||||
const cfg = makeCfg();
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS = "0";
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS = "0";
|
||||
const run = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error("request ended without sending any chunks"))
|
||||
.mockRejectedValueOnce(new Error("request ended without sending any chunks"))
|
||||
.mockResolvedValueOnce("ok");
|
||||
|
||||
const result = await runWithModelFallback({
|
||||
cfg,
|
||||
provider: "openai",
|
||||
model: "gpt-4.1-mini",
|
||||
run,
|
||||
});
|
||||
|
||||
expect(result.result).toBe("ok");
|
||||
expect(run).toHaveBeenCalledTimes(3);
|
||||
expect(run.mock.calls).toEqual([
|
||||
["openai", "gpt-4.1-mini"],
|
||||
["openai", "gpt-4.1-mini"],
|
||||
["anthropic", "claude-haiku-3-5"],
|
||||
]);
|
||||
});
|
||||
|
||||
it("can disable empty-stream in-model retry with feature flag", async () => {
|
||||
const cfg = makeCfg();
|
||||
process.env.OPENCLAW_EMPTY_STREAM_RETRY = "0";
|
||||
const run = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error("request ended without sending any chunks"))
|
||||
.mockResolvedValueOnce("ok");
|
||||
|
||||
const result = await runWithModelFallback({
|
||||
cfg,
|
||||
provider: "openai",
|
||||
model: "gpt-4.1-mini",
|
||||
run,
|
||||
});
|
||||
|
||||
expect(result.result).toBe("ok");
|
||||
expect(run).toHaveBeenCalledTimes(2);
|
||||
expect(run.mock.calls).toEqual([
|
||||
["openai", "gpt-4.1-mini"],
|
||||
["anthropic", "claude-haiku-3-5"],
|
||||
]);
|
||||
});
|
||||
|
||||
it("falls back on unrecognized errors when candidates remain", async () => {
|
||||
const cfg = makeCfg();
|
||||
const run = vi.fn().mockRejectedValueOnce(new Error("bad request")).mockResolvedValueOnce("ok");
|
||||
|
||||
@ -63,6 +63,43 @@ function shouldRethrowAbort(err: unknown): boolean {
|
||||
return isFallbackAbortError(err) && !isTimeoutError(err);
|
||||
}
|
||||
|
||||
const EMPTY_STREAM_ERROR_RE =
|
||||
/request ended without sending any chunks|stream ended before first chunk/i;
|
||||
|
||||
function isEmptyStreamError(err: unknown): boolean {
|
||||
if (typeof err === "string") {
|
||||
return EMPTY_STREAM_ERROR_RE.test(err);
|
||||
}
|
||||
if (err instanceof Error) {
|
||||
return EMPTY_STREAM_ERROR_RE.test(err.message);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function isEmptyStreamRetryEnabled(): boolean {
|
||||
const raw = process.env.OPENCLAW_EMPTY_STREAM_RETRY;
|
||||
if (raw == null || raw === "") {
|
||||
return true;
|
||||
}
|
||||
const normalized = String(raw).trim().toLowerCase();
|
||||
return !["0", "false", "off", "no"].includes(normalized);
|
||||
}
|
||||
|
||||
function resolveEmptyStreamRetryDelayMs(): number {
|
||||
const minRaw = Number(process.env.OPENCLAW_EMPTY_STREAM_RETRY_MIN_MS);
|
||||
const maxRaw = Number(process.env.OPENCLAW_EMPTY_STREAM_RETRY_MAX_MS);
|
||||
const min = Number.isFinite(minRaw) && minRaw >= 0 ? Math.floor(minRaw) : 300;
|
||||
const max = Number.isFinite(maxRaw) && maxRaw >= min ? Math.floor(maxRaw) : 800;
|
||||
if (max <= min) {
|
||||
return min;
|
||||
}
|
||||
return Math.floor(min + Math.random() * (max - min + 1));
|
||||
}
|
||||
|
||||
async function sleepEmptyStreamRetry(ms: number): Promise<void> {
|
||||
await new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function createModelCandidateCollector(allowlist: Set<string> | null | undefined): {
|
||||
candidates: ModelCandidate[];
|
||||
addExplicitCandidate: (candidate: ModelCandidate) => void;
|
||||
@ -176,6 +213,35 @@ async function runFallbackAttempt<T>(params: {
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
if (isEmptyStreamError(runResult.error) && isEmptyStreamRetryEnabled()) {
|
||||
const retryDelayMs = resolveEmptyStreamRetryDelayMs();
|
||||
params.attempts.push({
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
error: `Empty stream before first chunk; retrying once after ${retryDelayMs}ms`,
|
||||
});
|
||||
await sleepEmptyStreamRetry(retryDelayMs);
|
||||
|
||||
const retryResult = await runFallbackCandidate({
|
||||
run: params.run,
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
options: params.options,
|
||||
});
|
||||
if (retryResult.ok) {
|
||||
return {
|
||||
success: buildFallbackSuccess({
|
||||
result: retryResult.result,
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
attempts: params.attempts,
|
||||
}),
|
||||
};
|
||||
}
|
||||
return { error: retryResult.error };
|
||||
}
|
||||
|
||||
return { error: runResult.error };
|
||||
}
|
||||
|
||||
|
||||
@ -731,6 +731,11 @@ describe("classifyFailoverReason", () => {
|
||||
expect(classifyFailoverReason(OPENAI_RATE_LIMIT_MESSAGE)).toBe("rate_limit");
|
||||
expect(classifyFailoverReason(GEMINI_RESOURCE_EXHAUSTED_MESSAGE)).toBe("rate_limit");
|
||||
expect(classifyFailoverReason(ANTHROPIC_OVERLOADED_PAYLOAD)).toBe("overloaded");
|
||||
expect(
|
||||
classifyFailoverReason(
|
||||
'500 {"error":{"type":"new_api_error","message":"当前模型 claude-opus-4-6 负载已经达到上限,请稍后重试"}}',
|
||||
),
|
||||
).toBe("overloaded");
|
||||
expect(classifyFailoverReason(OPENROUTER_CREDITS_MESSAGE)).toBe("billing");
|
||||
expect(classifyFailoverReason(TOGETHER_PAYMENT_REQUIRED_MESSAGE)).toBe("billing");
|
||||
expect(classifyFailoverReason(TOGETHER_ENGINE_OVERLOADED_MESSAGE)).toBe("overloaded");
|
||||
@ -781,6 +786,7 @@ describe("classifyFailoverReason", () => {
|
||||
expect(classifyFailoverReason(INSUFFICIENT_QUOTA_PAYLOAD)).toBe("billing");
|
||||
expect(classifyFailoverReason("deadline exceeded")).toBe("timeout");
|
||||
expect(classifyFailoverReason("request ended without sending any chunks")).toBe("timeout");
|
||||
expect(classifyFailoverReason("stream ended before first chunk")).toBe("timeout");
|
||||
expect(classifyFailoverReason("Connection error.")).toBe("timeout");
|
||||
expect(classifyFailoverReason("fetch failed")).toBe("timeout");
|
||||
expect(classifyFailoverReason("network error: ECONNREFUSED")).toBe("timeout");
|
||||
|
||||
@ -17,7 +17,7 @@ const ERROR_PATTERNS = {
|
||||
"tokens per day",
|
||||
],
|
||||
overloaded: [
|
||||
/overloaded_error|"type"\s*:\s*"overloaded_error"/i,
|
||||
/overloaded_error|"type"\s*:\s*"overloaded_error"|负载(?:已经)?达到上限/i,
|
||||
"overloaded",
|
||||
// Match "service unavailable" only when combined with an explicit overload
|
||||
// indicator — a generic 503 from a proxy/CDN should not be classified as
|
||||
@ -47,6 +47,7 @@ const ERROR_PATTERNS = {
|
||||
/\benotfound\b/i,
|
||||
/\beai_again\b/i,
|
||||
/without sending (?:any )?chunks?/i,
|
||||
"stream ended before first chunk",
|
||||
/\bstop reason:\s*(?:abort|error|malformed_response|network_error)\b/i,
|
||||
/\breason:\s*(?:abort|error|malformed_response|network_error)\b/i,
|
||||
/\bunhandled stop reason:\s*(?:abort|error|malformed_response|network_error)\b/i,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user