fix: scope telegram polling restart to telegram errors (#43799)
* fix: scope telegram polling restart to telegram errors * fix: make telegram error tagging best-effort * fix: scope telegram polling restart to telegram errors (#43799)
This commit is contained in:
parent
82e3ac21ee
commit
ed0ec57a7b
@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Telegram/final preview delivery: split active preview lifecycle from cleanup retention so missing archived preview edits avoid duplicate fallback sends without clearing the live preview or blocking later in-place finalization. (#41662) thanks @hougangdev.
|
||||
- Telegram/final preview delivery followup: keep ambiguous missing-`message_id` finals only when a preview was already visible, while first-preview/no-id cases still fall back so Telegram users do not lose the final reply. (#41932) thanks @hougangdev.
|
||||
- Telegram/final preview cleanup follow-up: clear stale cleanup-retain state only for transient preview finals so archived-preview retains no longer leave a stale partial bubble beside a later fallback-sent final. (#41763) Thanks @obviyus.
|
||||
- Telegram/poll restarts: scope process-level polling restarts to real Telegram `getUpdates` failures so unrelated network errors, such as Slack DNS misses, no longer bounce Telegram polling. (#43799) Thanks @obviyus.
|
||||
- Gateway/auth: allow one trusted device-token retry on shared-token mismatch with recovery hints to prevent reconnect churn during token drift. (#42507) Thanks @joshavant.
|
||||
- Gateway/config errors: surface up to three validation issues in top-level `config.set`, `config.patch`, and `config.apply` error messages while preserving structured issue details. (#42664) Thanks @huntharo.
|
||||
- Agents/Azure OpenAI Responses: include the `azure-openai` provider in the Responses API store override so Azure OpenAI multi-turn cron jobs and embedded agent runs no longer fail with HTTP 400 "store is set to false". (#42934, fixes #42800) Thanks @ademczuk.
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { botCtorSpy } from "./bot.create-telegram-bot.test-harness.js";
|
||||
import { createTelegramBot } from "./bot.js";
|
||||
import { getTelegramNetworkErrorOrigin } from "./network-errors.js";
|
||||
|
||||
describe("createTelegramBot fetch abort", () => {
|
||||
it("aborts wrapped client fetch when fetchAbortSignal aborts", async () => {
|
||||
const originalFetch = globalThis.fetch;
|
||||
const shutdown = new AbortController();
|
||||
const fetchSpy = vi.fn(
|
||||
(_input: RequestInfo | URL, init?: RequestInit) =>
|
||||
@ -13,22 +13,78 @@ describe("createTelegramBot fetch abort", () => {
|
||||
signal.addEventListener("abort", () => resolve(signal), { once: true });
|
||||
}),
|
||||
);
|
||||
globalThis.fetch = fetchSpy as unknown as typeof fetch;
|
||||
try {
|
||||
botCtorSpy.mockClear();
|
||||
createTelegramBot({ token: "tok", fetchAbortSignal: shutdown.signal });
|
||||
const clientFetch = (botCtorSpy.mock.calls.at(-1)?.[1] as { client?: { fetch?: unknown } })
|
||||
?.client?.fetch as (input: RequestInfo | URL, init?: RequestInit) => Promise<unknown>;
|
||||
expect(clientFetch).toBeTypeOf("function");
|
||||
botCtorSpy.mockClear();
|
||||
createTelegramBot({
|
||||
token: "tok",
|
||||
fetchAbortSignal: shutdown.signal,
|
||||
proxyFetch: fetchSpy as unknown as typeof fetch,
|
||||
});
|
||||
const clientFetch = (botCtorSpy.mock.calls.at(-1)?.[1] as { client?: { fetch?: unknown } })
|
||||
?.client?.fetch as (input: RequestInfo | URL, init?: RequestInit) => Promise<unknown>;
|
||||
expect(clientFetch).toBeTypeOf("function");
|
||||
|
||||
const observedSignalPromise = clientFetch("https://example.test");
|
||||
shutdown.abort(new Error("shutdown"));
|
||||
const observedSignal = (await observedSignalPromise) as AbortSignal;
|
||||
const observedSignalPromise = clientFetch("https://example.test");
|
||||
shutdown.abort(new Error("shutdown"));
|
||||
const observedSignal = (await observedSignalPromise) as AbortSignal;
|
||||
|
||||
expect(observedSignal).toBeInstanceOf(AbortSignal);
|
||||
expect(observedSignal.aborted).toBe(true);
|
||||
} finally {
|
||||
globalThis.fetch = originalFetch;
|
||||
}
|
||||
expect(observedSignal).toBeInstanceOf(AbortSignal);
|
||||
expect(observedSignal.aborted).toBe(true);
|
||||
});
|
||||
|
||||
it("tags wrapped Telegram fetch failures with the Bot API method", async () => {
|
||||
const shutdown = new AbortController();
|
||||
const fetchError = Object.assign(new TypeError("fetch failed"), {
|
||||
cause: Object.assign(new Error("connect timeout"), {
|
||||
code: "UND_ERR_CONNECT_TIMEOUT",
|
||||
}),
|
||||
});
|
||||
const fetchSpy = vi.fn(async () => {
|
||||
throw fetchError;
|
||||
});
|
||||
botCtorSpy.mockClear();
|
||||
createTelegramBot({
|
||||
token: "tok",
|
||||
fetchAbortSignal: shutdown.signal,
|
||||
proxyFetch: fetchSpy as unknown as typeof fetch,
|
||||
});
|
||||
const clientFetch = (botCtorSpy.mock.calls.at(-1)?.[1] as { client?: { fetch?: unknown } })
|
||||
?.client?.fetch as (input: RequestInfo | URL, init?: RequestInit) => Promise<unknown>;
|
||||
expect(clientFetch).toBeTypeOf("function");
|
||||
|
||||
await expect(clientFetch("https://api.telegram.org/bot123456:ABC/getUpdates")).rejects.toBe(
|
||||
fetchError,
|
||||
);
|
||||
expect(getTelegramNetworkErrorOrigin(fetchError)).toEqual({
|
||||
method: "getupdates",
|
||||
url: "https://api.telegram.org/bot123456:ABC/getUpdates",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves the original fetch error when tagging cannot attach metadata", async () => {
|
||||
const shutdown = new AbortController();
|
||||
const frozenError = Object.freeze(
|
||||
Object.assign(new TypeError("fetch failed"), {
|
||||
cause: Object.assign(new Error("connect timeout"), {
|
||||
code: "UND_ERR_CONNECT_TIMEOUT",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
const fetchSpy = vi.fn(async () => {
|
||||
throw frozenError;
|
||||
});
|
||||
botCtorSpy.mockClear();
|
||||
createTelegramBot({
|
||||
token: "tok",
|
||||
fetchAbortSignal: shutdown.signal,
|
||||
proxyFetch: fetchSpy as unknown as typeof fetch,
|
||||
});
|
||||
const clientFetch = (botCtorSpy.mock.calls.at(-1)?.[1] as { client?: { fetch?: unknown } })
|
||||
?.client?.fetch as (input: RequestInfo | URL, init?: RequestInit) => Promise<unknown>;
|
||||
expect(clientFetch).toBeTypeOf("function");
|
||||
|
||||
await expect(clientFetch("https://api.telegram.org/bot123456:ABC/getUpdates")).rejects.toBe(
|
||||
frozenError,
|
||||
);
|
||||
expect(getTelegramNetworkErrorOrigin(frozenError)).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@ -39,6 +39,7 @@ import {
|
||||
} from "./bot-updates.js";
|
||||
import { buildTelegramGroupPeerId, resolveTelegramStreamMode } from "./bot/helpers.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { tagTelegramNetworkError } from "./network-errors.js";
|
||||
import { createTelegramSendChatActionHandler } from "./sendchataction-401-backoff.js";
|
||||
import { getTelegramSequentialKey } from "./sequential-key.js";
|
||||
import { createTelegramThreadBindingManager } from "./thread-bindings.js";
|
||||
@ -68,6 +69,34 @@ export type TelegramBotOptions = {
|
||||
|
||||
export { getTelegramSequentialKey };
|
||||
|
||||
function readRequestUrl(input: RequestInfo | URL): string | null {
|
||||
if (typeof input === "string") {
|
||||
return input;
|
||||
}
|
||||
if (input instanceof URL) {
|
||||
return input.toString();
|
||||
}
|
||||
if (typeof input === "object" && input !== null && "url" in input) {
|
||||
const url = (input as { url?: unknown }).url;
|
||||
return typeof url === "string" ? url : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function extractTelegramApiMethod(input: RequestInfo | URL): string | null {
|
||||
const url = readRequestUrl(input);
|
||||
if (!url) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const pathname = new URL(url).pathname;
|
||||
const segments = pathname.split("/").filter(Boolean);
|
||||
return segments.length > 0 ? (segments.at(-1) ?? null) : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
const runtime: RuntimeEnv = opts.runtime ?? createNonExitingRuntime();
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
@ -147,6 +176,23 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
});
|
||||
}) as unknown as NonNullable<ApiClientOptions["fetch"]>;
|
||||
}
|
||||
if (finalFetch) {
|
||||
const baseFetch = finalFetch;
|
||||
finalFetch = ((input: RequestInfo | URL, init?: RequestInit) => {
|
||||
return Promise.resolve(baseFetch(input, init)).catch((err: unknown) => {
|
||||
try {
|
||||
tagTelegramNetworkError(err, {
|
||||
method: extractTelegramApiMethod(input),
|
||||
url: readRequestUrl(input),
|
||||
});
|
||||
} catch {
|
||||
// Tagging is best-effort; preserve the original fetch failure if the
|
||||
// error object cannot accept extra metadata.
|
||||
}
|
||||
throw err;
|
||||
});
|
||||
}) as unknown as NonNullable<ApiClientOptions["fetch"]>;
|
||||
}
|
||||
|
||||
const timeoutSeconds =
|
||||
typeof telegramCfg?.timeoutSeconds === "number" && Number.isFinite(telegramCfg.timeoutSeconds)
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { monitorTelegramProvider } from "./monitor.js";
|
||||
import { tagTelegramNetworkError } from "./network-errors.js";
|
||||
|
||||
type MockCtx = {
|
||||
message: {
|
||||
@ -102,6 +103,15 @@ function makeRecoverableFetchError() {
|
||||
});
|
||||
}
|
||||
|
||||
function makeTaggedPollingFetchError() {
|
||||
const err = makeRecoverableFetchError();
|
||||
tagTelegramNetworkError(err, {
|
||||
method: "getUpdates",
|
||||
url: "https://api.telegram.org/bot123456:ABC/getUpdates",
|
||||
});
|
||||
return err;
|
||||
}
|
||||
|
||||
const createAbortTask = (
|
||||
abort: AbortController,
|
||||
beforeAbort?: () => void,
|
||||
@ -453,7 +463,7 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
|
||||
await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1));
|
||||
|
||||
expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true);
|
||||
expect(emitUnhandledRejection(makeTaggedPollingFetchError())).toBe(true);
|
||||
await monitor;
|
||||
|
||||
expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1);
|
||||
@ -496,13 +506,54 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
expect(firstSignal).toBeInstanceOf(AbortSignal);
|
||||
expect((firstSignal as AbortSignal).aborted).toBe(false);
|
||||
|
||||
expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true);
|
||||
expect(emitUnhandledRejection(makeTaggedPollingFetchError())).toBe(true);
|
||||
await monitor;
|
||||
|
||||
expect((firstSignal as AbortSignal).aborted).toBe(true);
|
||||
expect(stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("ignores unrelated process-level network errors while telegram polling is active", async () => {
|
||||
const abort = new AbortController();
|
||||
let running = true;
|
||||
let releaseTask: (() => void) | undefined;
|
||||
const stop = vi.fn(async () => {
|
||||
running = false;
|
||||
releaseTask?.();
|
||||
});
|
||||
|
||||
runSpy.mockImplementationOnce(() =>
|
||||
makeRunnerStub({
|
||||
task: () =>
|
||||
new Promise<void>((resolve) => {
|
||||
releaseTask = resolve;
|
||||
}),
|
||||
stop,
|
||||
isRunning: () => running,
|
||||
}),
|
||||
);
|
||||
|
||||
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
|
||||
await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1));
|
||||
|
||||
const slackDnsError = Object.assign(
|
||||
new Error("A request error occurred: getaddrinfo ENOTFOUND slack.com"),
|
||||
{
|
||||
code: "ENOTFOUND",
|
||||
hostname: "slack.com",
|
||||
},
|
||||
);
|
||||
expect(emitUnhandledRejection(slackDnsError)).toBe(false);
|
||||
|
||||
abort.abort();
|
||||
await monitor;
|
||||
|
||||
expect(stop).toHaveBeenCalledTimes(1);
|
||||
expect(computeBackoff).not.toHaveBeenCalled();
|
||||
expect(sleepWithAbort).not.toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("passes configured webhookHost to webhook listener", async () => {
|
||||
await monitorTelegramProvider({
|
||||
token: "tok",
|
||||
|
||||
@ -9,7 +9,10 @@ import type { RuntimeEnv } from "../runtime.js";
|
||||
import { resolveTelegramAccount } from "./accounts.js";
|
||||
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
|
||||
import { TelegramExecApprovalHandler } from "./exec-approvals-handler.js";
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
import {
|
||||
isRecoverableTelegramNetworkError,
|
||||
isTelegramPollingNetworkError,
|
||||
} from "./network-errors.js";
|
||||
import { TelegramPollingSession } from "./polling-session.js";
|
||||
import { makeProxyFetch } from "./proxy.js";
|
||||
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
|
||||
@ -78,13 +81,14 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
|
||||
const unregisterHandler = registerUnhandledRejectionHandler((err) => {
|
||||
const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
if (isGrammyHttpError(err) && isNetworkError) {
|
||||
const isTelegramPollingError = isTelegramPollingNetworkError(err);
|
||||
if (isGrammyHttpError(err) && isNetworkError && isTelegramPollingError) {
|
||||
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
const activeRunner = pollingSession?.activeRunner;
|
||||
if (isNetworkError && activeRunner && activeRunner.isRunning()) {
|
||||
if (isNetworkError && isTelegramPollingError && activeRunner && activeRunner.isRunning()) {
|
||||
pollingSession?.markForceRestarted();
|
||||
pollingSession?.abortActiveFetch();
|
||||
void activeRunner.stop().catch(() => {});
|
||||
|
||||
@ -1,12 +1,37 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
getTelegramNetworkErrorOrigin,
|
||||
isRecoverableTelegramNetworkError,
|
||||
isSafeToRetrySendError,
|
||||
isTelegramClientRejection,
|
||||
isTelegramPollingNetworkError,
|
||||
isTelegramServerError,
|
||||
tagTelegramNetworkError,
|
||||
} from "./network-errors.js";
|
||||
|
||||
describe("isRecoverableTelegramNetworkError", () => {
|
||||
it("tracks Telegram polling origin separately from generic network matching", () => {
|
||||
const slackDnsError = Object.assign(
|
||||
new Error("A request error occurred: getaddrinfo ENOTFOUND slack.com"),
|
||||
{
|
||||
code: "ENOTFOUND",
|
||||
hostname: "slack.com",
|
||||
},
|
||||
);
|
||||
expect(isRecoverableTelegramNetworkError(slackDnsError)).toBe(true);
|
||||
expect(isTelegramPollingNetworkError(slackDnsError)).toBe(false);
|
||||
|
||||
tagTelegramNetworkError(slackDnsError, {
|
||||
method: "getUpdates",
|
||||
url: "https://api.telegram.org/bot123456:ABC/getUpdates",
|
||||
});
|
||||
expect(getTelegramNetworkErrorOrigin(slackDnsError)).toEqual({
|
||||
method: "getupdates",
|
||||
url: "https://api.telegram.org/bot123456:ABC/getUpdates",
|
||||
});
|
||||
expect(isTelegramPollingNetworkError(slackDnsError)).toBe(true);
|
||||
});
|
||||
|
||||
it("detects recoverable error codes", () => {
|
||||
const err = Object.assign(new Error("timeout"), { code: "ETIMEDOUT" });
|
||||
expect(isRecoverableTelegramNetworkError(err)).toBe(true);
|
||||
|
||||
@ -5,6 +5,8 @@ import {
|
||||
readErrorName,
|
||||
} from "../infra/errors.js";
|
||||
|
||||
const TELEGRAM_NETWORK_ORIGIN = Symbol("openclaw.telegram.network-origin");
|
||||
|
||||
const RECOVERABLE_ERROR_CODES = new Set([
|
||||
"ECONNRESET",
|
||||
"ECONNREFUSED",
|
||||
@ -101,6 +103,51 @@ function getErrorCode(err: unknown): string | undefined {
|
||||
}
|
||||
|
||||
export type TelegramNetworkErrorContext = "polling" | "send" | "webhook" | "unknown";
|
||||
export type TelegramNetworkErrorOrigin = {
|
||||
method?: string | null;
|
||||
url?: string | null;
|
||||
};
|
||||
|
||||
function normalizeTelegramNetworkMethod(method?: string | null): string | null {
|
||||
const trimmed = method?.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
return trimmed.toLowerCase();
|
||||
}
|
||||
|
||||
export function tagTelegramNetworkError(err: unknown, origin: TelegramNetworkErrorOrigin): void {
|
||||
if (!err || typeof err !== "object") {
|
||||
return;
|
||||
}
|
||||
Object.defineProperty(err, TELEGRAM_NETWORK_ORIGIN, {
|
||||
value: {
|
||||
method: normalizeTelegramNetworkMethod(origin.method),
|
||||
url: typeof origin.url === "string" && origin.url.trim() ? origin.url : null,
|
||||
} satisfies TelegramNetworkErrorOrigin,
|
||||
configurable: true,
|
||||
});
|
||||
}
|
||||
|
||||
export function getTelegramNetworkErrorOrigin(err: unknown): TelegramNetworkErrorOrigin | null {
|
||||
for (const candidate of collectTelegramErrorCandidates(err)) {
|
||||
if (!candidate || typeof candidate !== "object") {
|
||||
continue;
|
||||
}
|
||||
const origin = (candidate as Record<PropertyKey, unknown>)[TELEGRAM_NETWORK_ORIGIN];
|
||||
if (!origin || typeof origin !== "object") {
|
||||
continue;
|
||||
}
|
||||
const method = "method" in origin && typeof origin.method === "string" ? origin.method : null;
|
||||
const url = "url" in origin && typeof origin.url === "string" ? origin.url : null;
|
||||
return { method, url };
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function isTelegramPollingNetworkError(err: unknown): boolean {
|
||||
return getTelegramNetworkErrorOrigin(err)?.method === "getupdates";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the error is safe to retry for a non-idempotent Telegram send operation
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user