fix(agent): harden undici stream timeouts for long openai-completions runs
This commit is contained in:
parent
4daaea1190
commit
05fb16d151
@ -40,6 +40,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Heartbeat/workspace-path guardrails: append explicit workspace `HEARTBEAT.md` path guidance (and `docs/heartbeat.md` avoidance) to heartbeat prompts so heartbeat runs target workspace checklists reliably across packaged install layouts. (#37037) Thanks @stofancy.
|
||||
- Subagents/kill-complete announce race: when a late `subagent-complete` lifecycle event arrives after an earlier kill marker, clear stale kill suppression/cleanup flags and re-run announce cleanup so finished runs no longer get silently swallowed. (#37024) Thanks @cmfinlan.
|
||||
- Agents/tool-result cleanup timeout hardening: on embedded runner teardown idle timeouts, clear pending tool-call state without persisting synthetic `missing tool result` entries, preventing timeout cleanups from poisoning follow-up turns; adds regression coverage for timeout clear-vs-flush behavior. (#37081) Thanks @Coyote-Den.
|
||||
- Agents/openai-completions stream timeout hardening: ensure runtime undici global dispatchers use extended streaming body/header timeouts (including env-proxy dispatcher mode) before embedded runs, reducing forced mid-stream `terminated` failures on long generations; adds regression coverage for dispatcher selection and idempotent reconfiguration. (#9708) Thanks @scottchguard.
|
||||
- Cron/OpenAI Codex OAuth refresh hardening: when `openai-codex` token refresh fails specifically on account-id extraction, reuse the cached access token instead of failing the run immediately, with regression coverage to keep non-Codex and unrelated refresh failures unchanged. (#36604) Thanks @laulopezreal.
|
||||
- Gateway/remote WS break-glass hostname support: honor `OPENCLAW_ALLOW_INSECURE_PRIVATE_WS=1` for `ws://` hostname URLs (not only private IP literals) across onboarding validation and runtime gateway connection checks, while still rejecting public IP literals and non-unicast IPv6 endpoints. (#36930) Thanks @manju-rn.
|
||||
- Routing/binding lookup scalability: pre-index route bindings by channel/account and avoid full binding-list rescans on channel-account cache rollover, preventing multi-second `resolveAgentRoute` stalls in large binding configurations. (#36915) Thanks @songchenghao.
|
||||
|
||||
@ -11,6 +11,7 @@ import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
|
||||
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
|
||||
import type { OpenClawConfig } from "../../../config/config.js";
|
||||
import { getMachineDisplayName } from "../../../infra/machine-name.js";
|
||||
import { ensureGlobalUndiciStreamTimeouts } from "../../../infra/net/undici-global-dispatcher.js";
|
||||
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
|
||||
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
|
||||
import type {
|
||||
@ -685,6 +686,7 @@ export async function runEmbeddedAttempt(
|
||||
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
|
||||
const prevCwd = process.cwd();
|
||||
const runAbortController = new AbortController();
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
log.debug(
|
||||
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
|
||||
|
||||
138
src/infra/net/undici-global-dispatcher.test.ts
Normal file
138
src/infra/net/undici-global-dispatcher.test.ts
Normal file
@ -0,0 +1,138 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const {
|
||||
Agent,
|
||||
EnvHttpProxyAgent,
|
||||
ProxyAgent,
|
||||
getGlobalDispatcher,
|
||||
setGlobalDispatcher,
|
||||
setCurrentDispatcher,
|
||||
getCurrentDispatcher,
|
||||
getDefaultAutoSelectFamily,
|
||||
} = vi.hoisted(() => {
|
||||
class Agent {
|
||||
constructor(public readonly options?: Record<string, unknown>) {}
|
||||
}
|
||||
|
||||
class EnvHttpProxyAgent {
|
||||
constructor(public readonly options?: Record<string, unknown>) {}
|
||||
}
|
||||
|
||||
class ProxyAgent {
|
||||
constructor(public readonly url: string) {}
|
||||
}
|
||||
|
||||
let currentDispatcher: unknown = new Agent();
|
||||
|
||||
const getGlobalDispatcher = vi.fn(() => currentDispatcher);
|
||||
const setGlobalDispatcher = vi.fn((next: unknown) => {
|
||||
currentDispatcher = next;
|
||||
});
|
||||
const setCurrentDispatcher = (next: unknown) => {
|
||||
currentDispatcher = next;
|
||||
};
|
||||
const getCurrentDispatcher = () => currentDispatcher;
|
||||
const getDefaultAutoSelectFamily = vi.fn(() => undefined as boolean | undefined);
|
||||
|
||||
return {
|
||||
Agent,
|
||||
EnvHttpProxyAgent,
|
||||
ProxyAgent,
|
||||
getGlobalDispatcher,
|
||||
setGlobalDispatcher,
|
||||
setCurrentDispatcher,
|
||||
getCurrentDispatcher,
|
||||
getDefaultAutoSelectFamily,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("undici", () => ({
|
||||
Agent,
|
||||
EnvHttpProxyAgent,
|
||||
getGlobalDispatcher,
|
||||
setGlobalDispatcher,
|
||||
}));
|
||||
|
||||
vi.mock("node:net", () => ({
|
||||
getDefaultAutoSelectFamily,
|
||||
}));
|
||||
|
||||
import {
|
||||
DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
|
||||
ensureGlobalUndiciStreamTimeouts,
|
||||
resetGlobalUndiciStreamTimeoutsForTests,
|
||||
} from "./undici-global-dispatcher.js";
|
||||
|
||||
describe("ensureGlobalUndiciStreamTimeouts", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
resetGlobalUndiciStreamTimeoutsForTests();
|
||||
setCurrentDispatcher(new Agent());
|
||||
getDefaultAutoSelectFamily.mockReturnValue(undefined);
|
||||
});
|
||||
|
||||
it("replaces default Agent dispatcher with extended stream timeouts", () => {
|
||||
getDefaultAutoSelectFamily.mockReturnValue(true);
|
||||
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
expect(setGlobalDispatcher).toHaveBeenCalledTimes(1);
|
||||
const next = getCurrentDispatcher() as { options?: Record<string, unknown> };
|
||||
expect(next).toBeInstanceOf(Agent);
|
||||
expect(next.options?.bodyTimeout).toBe(DEFAULT_UNDICI_STREAM_TIMEOUT_MS);
|
||||
expect(next.options?.headersTimeout).toBe(DEFAULT_UNDICI_STREAM_TIMEOUT_MS);
|
||||
expect(next.options?.connect).toEqual({
|
||||
autoSelectFamily: true,
|
||||
autoSelectFamilyAttemptTimeout: 300,
|
||||
});
|
||||
});
|
||||
|
||||
it("replaces EnvHttpProxyAgent dispatcher while preserving env-proxy mode", () => {
|
||||
getDefaultAutoSelectFamily.mockReturnValue(false);
|
||||
setCurrentDispatcher(new EnvHttpProxyAgent());
|
||||
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
expect(setGlobalDispatcher).toHaveBeenCalledTimes(1);
|
||||
const next = getCurrentDispatcher() as { options?: Record<string, unknown> };
|
||||
expect(next).toBeInstanceOf(EnvHttpProxyAgent);
|
||||
expect(next.options?.bodyTimeout).toBe(DEFAULT_UNDICI_STREAM_TIMEOUT_MS);
|
||||
expect(next.options?.headersTimeout).toBe(DEFAULT_UNDICI_STREAM_TIMEOUT_MS);
|
||||
expect(next.options?.connect).toEqual({
|
||||
autoSelectFamily: false,
|
||||
autoSelectFamilyAttemptTimeout: 300,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not override unsupported custom proxy dispatcher types", () => {
|
||||
setCurrentDispatcher(new ProxyAgent("http://proxy.test:8080"));
|
||||
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
expect(setGlobalDispatcher).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("is idempotent for unchanged dispatcher kind and network policy", () => {
|
||||
getDefaultAutoSelectFamily.mockReturnValue(true);
|
||||
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
expect(setGlobalDispatcher).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("re-applies when autoSelectFamily decision changes", () => {
|
||||
getDefaultAutoSelectFamily.mockReturnValue(true);
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
getDefaultAutoSelectFamily.mockReturnValue(false);
|
||||
ensureGlobalUndiciStreamTimeouts();
|
||||
|
||||
expect(setGlobalDispatcher).toHaveBeenCalledTimes(2);
|
||||
const next = getCurrentDispatcher() as { options?: Record<string, unknown> };
|
||||
expect(next.options?.connect).toEqual({
|
||||
autoSelectFamily: false,
|
||||
autoSelectFamilyAttemptTimeout: 300,
|
||||
});
|
||||
});
|
||||
});
|
||||
113
src/infra/net/undici-global-dispatcher.ts
Normal file
113
src/infra/net/undici-global-dispatcher.ts
Normal file
@ -0,0 +1,113 @@
|
||||
import * as net from "node:net";
|
||||
import { Agent, EnvHttpProxyAgent, getGlobalDispatcher, setGlobalDispatcher } from "undici";
|
||||
|
||||
export const DEFAULT_UNDICI_STREAM_TIMEOUT_MS = 30 * 60 * 1000;
|
||||
|
||||
const AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS = 300;
|
||||
|
||||
let lastAppliedDispatcherKey: string | null = null;
|
||||
|
||||
type DispatcherKind = "agent" | "env-proxy" | "unsupported";
|
||||
|
||||
function resolveDispatcherKind(dispatcher: unknown): DispatcherKind {
|
||||
const ctorName = (dispatcher as { constructor?: { name?: string } })?.constructor?.name;
|
||||
if (typeof ctorName !== "string" || ctorName.length === 0) {
|
||||
return "unsupported";
|
||||
}
|
||||
if (ctorName.includes("EnvHttpProxyAgent")) {
|
||||
return "env-proxy";
|
||||
}
|
||||
if (ctorName.includes("ProxyAgent")) {
|
||||
return "unsupported";
|
||||
}
|
||||
if (ctorName.includes("Agent")) {
|
||||
return "agent";
|
||||
}
|
||||
return "unsupported";
|
||||
}
|
||||
|
||||
function resolveAutoSelectFamily(): boolean | undefined {
|
||||
if (typeof net.getDefaultAutoSelectFamily !== "function") {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
return net.getDefaultAutoSelectFamily();
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveConnectOptions(
|
||||
autoSelectFamily: boolean | undefined,
|
||||
): { autoSelectFamily: boolean; autoSelectFamilyAttemptTimeout: number } | undefined {
|
||||
if (autoSelectFamily === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
autoSelectFamily,
|
||||
autoSelectFamilyAttemptTimeout: AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveDispatcherKey(params: {
|
||||
kind: DispatcherKind;
|
||||
timeoutMs: number;
|
||||
autoSelectFamily: boolean | undefined;
|
||||
}): string {
|
||||
const autoSelectToken =
|
||||
params.autoSelectFamily === undefined ? "na" : params.autoSelectFamily ? "on" : "off";
|
||||
return `${params.kind}:${params.timeoutMs}:${autoSelectToken}`;
|
||||
}
|
||||
|
||||
export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): void {
|
||||
const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
|
||||
const timeoutMs = Math.max(1, Math.floor(timeoutMsRaw));
|
||||
if (!Number.isFinite(timeoutMsRaw)) {
|
||||
return;
|
||||
}
|
||||
|
||||
let dispatcher: unknown;
|
||||
try {
|
||||
dispatcher = getGlobalDispatcher();
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
const kind = resolveDispatcherKind(dispatcher);
|
||||
if (kind === "unsupported") {
|
||||
return;
|
||||
}
|
||||
|
||||
const autoSelectFamily = resolveAutoSelectFamily();
|
||||
const nextKey = resolveDispatcherKey({ kind, timeoutMs, autoSelectFamily });
|
||||
if (lastAppliedDispatcherKey === nextKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
const connect = resolveConnectOptions(autoSelectFamily);
|
||||
try {
|
||||
if (kind === "env-proxy") {
|
||||
const proxyOptions = {
|
||||
bodyTimeout: timeoutMs,
|
||||
headersTimeout: timeoutMs,
|
||||
...(connect ? { connect } : {}),
|
||||
} as ConstructorParameters<typeof EnvHttpProxyAgent>[0];
|
||||
setGlobalDispatcher(new EnvHttpProxyAgent(proxyOptions));
|
||||
} else {
|
||||
setGlobalDispatcher(
|
||||
new Agent({
|
||||
bodyTimeout: timeoutMs,
|
||||
headersTimeout: timeoutMs,
|
||||
...(connect ? { connect } : {}),
|
||||
}),
|
||||
);
|
||||
}
|
||||
lastAppliedDispatcherKey = nextKey;
|
||||
} catch {
|
||||
// Best-effort hardening only.
|
||||
}
|
||||
}
|
||||
|
||||
export function resetGlobalUndiciStreamTimeoutsForTests(): void {
|
||||
lastAppliedDispatcherKey = null;
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user