diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d99a6fdcff..471970d48d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -128,6 +128,7 @@ Docs: https://docs.openclaw.ai - Plugins/subagents: forward per-run provider and model overrides through gateway plugin subagent dispatch so plugin-launched agent delegations honor explicit model selection again. (#48277) Thanks @jalehman. - Agents/compaction: write minimal boundary summaries for empty preparations while keeping split-turn prefixes on the normal path, so no-summarizable-message sessions stop retriggering the safeguard loop. (#42215) thanks @lml2468. - Models/chat commands: keep `/model ...@YYYYMMDD` version suffixes intact by default, but still honor matching stored numeric auth-profile overrides for the same provider. (#48896) Thanks @Alix-007. +- Gateway/channels: serialize per-account channel startup so overlapping starts do not boot the same provider twice, preventing MS Teams `EADDRINUSE` crash loops during startup and restart. (#49583) Thanks @sudie-codes. ### Fixes diff --git a/src/gateway/server-channels.test.ts b/src/gateway/server-channels.test.ts index 2e886962d33..01dd6aa17d3 100644 --- a/src/gateway/server-channels.test.ts +++ b/src/gateway/server-channels.test.ts @@ -45,6 +45,7 @@ function createTestPlugin(params?: { startAccount?: NonNullable["gateway"]>["startAccount"]; includeDescribeAccount?: boolean; resolveAccount?: ChannelPlugin["config"]["resolveAccount"]; + isConfigured?: ChannelPlugin["config"]["isConfigured"]; }): ChannelPlugin { const account = params?.account ?? { enabled: true, configured: true }; const includeDescribeAccount = params?.includeDescribeAccount !== false; @@ -52,6 +53,7 @@ function createTestPlugin(params?: { listAccountIds: () => [DEFAULT_ACCOUNT_ID], resolveAccount: params?.resolveAccount ?? (() => account), isEnabled: (resolved) => resolved.enabled !== false, + ...(params?.isConfigured ? { isConfigured: params.isConfigured } : {}), }; if (includeDescribeAccount) { config.describeAccount = (resolved) => ({ @@ -79,6 +81,14 @@ function createTestPlugin(params?: { }; } +function createDeferred(): { promise: Promise; resolve: () => void } { + let resolvePromise = () => {}; + const promise = new Promise((resolve) => { + resolvePromise = resolve; + }); + return { promise, resolve: resolvePromise }; +} + function installTestRegistry(plugin: ChannelPlugin) { const registry = createEmptyPluginRegistry(); registry.channels.push({ @@ -189,6 +199,52 @@ describe("server-channels auto restart", () => { expect(startAccount).toHaveBeenCalledTimes(1); }); + it("deduplicates concurrent start requests for the same account", async () => { + const startupGate = createDeferred(); + const isConfigured = vi.fn(async () => { + await startupGate.promise; + return true; + }); + const startAccount = vi.fn(async () => {}); + + installTestRegistry(createTestPlugin({ startAccount, isConfigured })); + const manager = createManager(); + + const firstStart = manager.startChannel("discord", DEFAULT_ACCOUNT_ID); + const secondStart = manager.startChannel("discord", DEFAULT_ACCOUNT_ID); + + await Promise.resolve(); + expect(isConfigured).toHaveBeenCalledTimes(1); + expect(startAccount).not.toHaveBeenCalled(); + + startupGate.resolve(); + await Promise.all([firstStart, secondStart]); + + expect(startAccount).toHaveBeenCalledTimes(1); + }); + + it("cancels a pending startup when the account is stopped mid-boot", async () => { + const startupGate = createDeferred(); + const isConfigured = vi.fn(async () => { + await startupGate.promise; + return true; + }); + const startAccount = vi.fn(async () => {}); + + installTestRegistry(createTestPlugin({ startAccount, isConfigured })); + const manager = createManager(); + + const startTask = manager.startChannel("discord", DEFAULT_ACCOUNT_ID); + await Promise.resolve(); + + const stopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID); + startupGate.resolve(); + + await Promise.all([startTask, stopTask]); + + expect(startAccount).not.toHaveBeenCalled(); + }); + it("does not resolve channelRuntime until a channel starts", async () => { const channelRuntime = { marker: "lazy-channel-runtime", diff --git a/src/gateway/server-channels.ts b/src/gateway/server-channels.ts index a016826f69b..16cad24b07d 100644 --- a/src/gateway/server-channels.ts +++ b/src/gateway/server-channels.ts @@ -32,6 +32,7 @@ type SubsystemLogger = ReturnType; type ChannelRuntimeStore = { aborts: Map; + starting: Map>; tasks: Map>; runtimes: Map; }; @@ -49,6 +50,7 @@ type ChannelHealthMonitorConfig = HealthMonitorConfig & { function createRuntimeStore(): ChannelRuntimeStore { return { aborts: new Map(), + starting: new Map(), tasks: new Map(), runtimes: new Map(), }; @@ -256,137 +258,174 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage if (store.tasks.has(id)) { return; } - const account = plugin.config.resolveAccount(cfg, id); - const enabled = plugin.config.isEnabled - ? plugin.config.isEnabled(account, cfg) - : isAccountEnabled(account); - if (!enabled) { - setRuntime(channelId, id, { - accountId: id, - enabled: false, - configured: true, - running: false, - restartPending: false, - lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled", - }); + const existingStart = store.starting.get(id); + if (existingStart) { + await existingStart; return; } - let configured = true; - if (plugin.config.isConfigured) { - configured = await plugin.config.isConfigured(account, cfg); - } - if (!configured) { - setRuntime(channelId, id, { - accountId: id, - enabled: true, - configured: false, - running: false, - restartPending: false, - lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured", - }); - return; - } - - const rKey = restartKey(channelId, id); - if (!preserveManualStop) { - manuallyStopped.delete(rKey); - } + let resolveStart: (() => void) | undefined; + const startGate = new Promise((resolve) => { + resolveStart = resolve; + }); + store.starting.set(id, startGate); + // Reserve the account before the first await so overlapping start calls + // cannot race into duplicate provider boots for the same account. const abort = new AbortController(); store.aborts.set(id, abort); - if (!preserveRestartAttempts) { - restartAttempts.delete(rKey); - } - setRuntime(channelId, id, { - accountId: id, - enabled: true, - configured: true, - running: true, - restartPending: false, - lastStartAt: Date.now(), - lastError: null, - reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0, - }); + let handedOffTask = false; - const log = channelLogs[channelId]; - const resolvedChannelRuntime = getChannelRuntime(); - const task = startAccount({ - cfg, - accountId: id, - account, - runtime: channelRuntimeEnvs[channelId], - abortSignal: abort.signal, - log, - getStatus: () => getRuntime(channelId, id), - setStatus: (next) => setRuntime(channelId, id, next), - ...(resolvedChannelRuntime ? { channelRuntime: resolvedChannelRuntime } : {}), - }); - const trackedPromise = Promise.resolve(task) - .catch((err) => { - const message = formatErrorMessage(err); - setRuntime(channelId, id, { accountId: id, lastError: message }); - log.error?.(`[${id}] channel exited: ${message}`); - }) - .finally(() => { + try { + const account = plugin.config.resolveAccount(cfg, id); + const enabled = plugin.config.isEnabled + ? plugin.config.isEnabled(account, cfg) + : isAccountEnabled(account); + if (!enabled) { + setRuntime(channelId, id, { + accountId: id, + enabled: false, + configured: true, + running: false, + restartPending: false, + lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled", + }); + return; + } + + let configured = true; + if (plugin.config.isConfigured) { + configured = await plugin.config.isConfigured(account, cfg); + } + if (!configured) { + setRuntime(channelId, id, { + accountId: id, + enabled: true, + configured: false, + running: false, + restartPending: false, + lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured", + }); + return; + } + + const rKey = restartKey(channelId, id); + if (!preserveManualStop) { + manuallyStopped.delete(rKey); + } + + if (abort.signal.aborted || manuallyStopped.has(rKey)) { setRuntime(channelId, id, { accountId: id, running: false, + restartPending: false, lastStopAt: Date.now(), }); - }) - .then(async () => { - if (manuallyStopped.has(rKey)) { - return; - } - const attempt = (restartAttempts.get(rKey) ?? 0) + 1; - restartAttempts.set(rKey, attempt); - if (attempt > MAX_RESTART_ATTEMPTS) { + return; + } + + if (!preserveRestartAttempts) { + restartAttempts.delete(rKey); + } + setRuntime(channelId, id, { + accountId: id, + enabled: true, + configured: true, + running: true, + restartPending: false, + lastStartAt: Date.now(), + lastError: null, + reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0, + }); + + const log = channelLogs[channelId]; + const resolvedChannelRuntime = getChannelRuntime(); + const task = startAccount({ + cfg, + accountId: id, + account, + runtime: channelRuntimeEnvs[channelId], + abortSignal: abort.signal, + log, + getStatus: () => getRuntime(channelId, id), + setStatus: (next) => setRuntime(channelId, id, next), + ...(resolvedChannelRuntime ? { channelRuntime: resolvedChannelRuntime } : {}), + }); + const trackedPromise = Promise.resolve(task) + .catch((err) => { + const message = formatErrorMessage(err); + setRuntime(channelId, id, { accountId: id, lastError: message }); + log.error?.(`[${id}] channel exited: ${message}`); + }) + .finally(() => { setRuntime(channelId, id, { accountId: id, - restartPending: false, - reconnectAttempts: attempt, + running: false, + lastStopAt: Date.now(), }); - log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`); - return; - } - const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt); - log.info?.( - `[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`, - ); - setRuntime(channelId, id, { - accountId: id, - restartPending: true, - reconnectAttempts: attempt, - }); - try { - await sleepWithAbort(delayMs, abort.signal); + }) + .then(async () => { if (manuallyStopped.has(rKey)) { return; } + const attempt = (restartAttempts.get(rKey) ?? 0) + 1; + restartAttempts.set(rKey, attempt); + if (attempt > MAX_RESTART_ATTEMPTS) { + setRuntime(channelId, id, { + accountId: id, + restartPending: false, + reconnectAttempts: attempt, + }); + log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`); + return; + } + const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt); + log.info?.( + `[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`, + ); + setRuntime(channelId, id, { + accountId: id, + restartPending: true, + reconnectAttempts: attempt, + }); + try { + await sleepWithAbort(delayMs, abort.signal); + if (manuallyStopped.has(rKey)) { + return; + } + if (store.tasks.get(id) === trackedPromise) { + store.tasks.delete(id); + } + if (store.aborts.get(id) === abort) { + store.aborts.delete(id); + } + await startChannelInternal(channelId, id, { + preserveRestartAttempts: true, + preserveManualStop: true, + }); + } catch { + // abort or startup failure — next crash will retry + } + }) + .finally(() => { if (store.tasks.get(id) === trackedPromise) { store.tasks.delete(id); } if (store.aborts.get(id) === abort) { store.aborts.delete(id); } - await startChannelInternal(channelId, id, { - preserveRestartAttempts: true, - preserveManualStop: true, - }); - } catch { - // abort or startup failure — next crash will retry - } - }) - .finally(() => { - if (store.tasks.get(id) === trackedPromise) { - store.tasks.delete(id); - } - if (store.aborts.get(id) === abort) { - store.aborts.delete(id); - } - }); - store.tasks.set(id, trackedPromise); + }); + handedOffTask = true; + store.tasks.set(id, trackedPromise); + } finally { + resolveStart?.(); + if (store.starting.get(id) === startGate) { + store.starting.delete(id); + } + if (!handedOffTask && store.aborts.get(id) === abort) { + store.aborts.delete(id); + } + } }), ); }; @@ -405,6 +444,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage const cfg = loadConfig(); const knownIds = new Set([ ...store.aborts.keys(), + ...store.starting.keys(), ...store.tasks.keys(), ...(plugin ? plugin.config.listAccountIds(cfg) : []), ]);