fix: serialize duplicate channel starts (#49583) (thanks @sudie-codes)

This commit is contained in:
Vignesh Natarajan 2026-03-18 01:56:28 -07:00 committed by Vignesh
parent 1040ae56b5
commit 1890089f49
3 changed files with 204 additions and 107 deletions

View File

@ -128,6 +128,7 @@ Docs: https://docs.openclaw.ai
- Plugins/subagents: forward per-run provider and model overrides through gateway plugin subagent dispatch so plugin-launched agent delegations honor explicit model selection again. (#48277) Thanks @jalehman.
- Agents/compaction: write minimal boundary summaries for empty preparations while keeping split-turn prefixes on the normal path, so no-summarizable-message sessions stop retriggering the safeguard loop. (#42215) thanks @lml2468.
- Models/chat commands: keep `/model ...@YYYYMMDD` version suffixes intact by default, but still honor matching stored numeric auth-profile overrides for the same provider. (#48896) Thanks @Alix-007.
- Gateway/channels: serialize per-account channel startup so overlapping starts do not boot the same provider twice, preventing MS Teams `EADDRINUSE` crash loops during startup and restart. (#49583) Thanks @sudie-codes.
### Fixes

View File

@ -45,6 +45,7 @@ function createTestPlugin(params?: {
startAccount?: NonNullable<ChannelPlugin<TestAccount>["gateway"]>["startAccount"];
includeDescribeAccount?: boolean;
resolveAccount?: ChannelPlugin<TestAccount>["config"]["resolveAccount"];
isConfigured?: ChannelPlugin<TestAccount>["config"]["isConfigured"];
}): ChannelPlugin<TestAccount> {
const account = params?.account ?? { enabled: true, configured: true };
const includeDescribeAccount = params?.includeDescribeAccount !== false;
@ -52,6 +53,7 @@ function createTestPlugin(params?: {
listAccountIds: () => [DEFAULT_ACCOUNT_ID],
resolveAccount: params?.resolveAccount ?? (() => account),
isEnabled: (resolved) => resolved.enabled !== false,
...(params?.isConfigured ? { isConfigured: params.isConfigured } : {}),
};
if (includeDescribeAccount) {
config.describeAccount = (resolved) => ({
@ -79,6 +81,14 @@ function createTestPlugin(params?: {
};
}
function createDeferred(): { promise: Promise<void>; resolve: () => void } {
let resolvePromise = () => {};
const promise = new Promise<void>((resolve) => {
resolvePromise = resolve;
});
return { promise, resolve: resolvePromise };
}
function installTestRegistry(plugin: ChannelPlugin<TestAccount>) {
const registry = createEmptyPluginRegistry();
registry.channels.push({
@ -189,6 +199,52 @@ describe("server-channels auto restart", () => {
expect(startAccount).toHaveBeenCalledTimes(1);
});
it("deduplicates concurrent start requests for the same account", async () => {
const startupGate = createDeferred();
const isConfigured = vi.fn(async () => {
await startupGate.promise;
return true;
});
const startAccount = vi.fn(async () => {});
installTestRegistry(createTestPlugin({ startAccount, isConfigured }));
const manager = createManager();
const firstStart = manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
const secondStart = manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
await Promise.resolve();
expect(isConfigured).toHaveBeenCalledTimes(1);
expect(startAccount).not.toHaveBeenCalled();
startupGate.resolve();
await Promise.all([firstStart, secondStart]);
expect(startAccount).toHaveBeenCalledTimes(1);
});
it("cancels a pending startup when the account is stopped mid-boot", async () => {
const startupGate = createDeferred();
const isConfigured = vi.fn(async () => {
await startupGate.promise;
return true;
});
const startAccount = vi.fn(async () => {});
installTestRegistry(createTestPlugin({ startAccount, isConfigured }));
const manager = createManager();
const startTask = manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
await Promise.resolve();
const stopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID);
startupGate.resolve();
await Promise.all([startTask, stopTask]);
expect(startAccount).not.toHaveBeenCalled();
});
it("does not resolve channelRuntime until a channel starts", async () => {
const channelRuntime = {
marker: "lazy-channel-runtime",

View File

@ -32,6 +32,7 @@ type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
type ChannelRuntimeStore = {
aborts: Map<string, AbortController>;
starting: Map<string, Promise<void>>;
tasks: Map<string, Promise<unknown>>;
runtimes: Map<string, ChannelAccountSnapshot>;
};
@ -49,6 +50,7 @@ type ChannelHealthMonitorConfig = HealthMonitorConfig & {
function createRuntimeStore(): ChannelRuntimeStore {
return {
aborts: new Map(),
starting: new Map(),
tasks: new Map(),
runtimes: new Map(),
};
@ -256,137 +258,174 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
if (store.tasks.has(id)) {
return;
}
const account = plugin.config.resolveAccount(cfg, id);
const enabled = plugin.config.isEnabled
? plugin.config.isEnabled(account, cfg)
: isAccountEnabled(account);
if (!enabled) {
setRuntime(channelId, id, {
accountId: id,
enabled: false,
configured: true,
running: false,
restartPending: false,
lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled",
});
const existingStart = store.starting.get(id);
if (existingStart) {
await existingStart;
return;
}
let configured = true;
if (plugin.config.isConfigured) {
configured = await plugin.config.isConfigured(account, cfg);
}
if (!configured) {
setRuntime(channelId, id, {
accountId: id,
enabled: true,
configured: false,
running: false,
restartPending: false,
lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured",
});
return;
}
const rKey = restartKey(channelId, id);
if (!preserveManualStop) {
manuallyStopped.delete(rKey);
}
let resolveStart: (() => void) | undefined;
const startGate = new Promise<void>((resolve) => {
resolveStart = resolve;
});
store.starting.set(id, startGate);
// Reserve the account before the first await so overlapping start calls
// cannot race into duplicate provider boots for the same account.
const abort = new AbortController();
store.aborts.set(id, abort);
if (!preserveRestartAttempts) {
restartAttempts.delete(rKey);
}
setRuntime(channelId, id, {
accountId: id,
enabled: true,
configured: true,
running: true,
restartPending: false,
lastStartAt: Date.now(),
lastError: null,
reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0,
});
let handedOffTask = false;
const log = channelLogs[channelId];
const resolvedChannelRuntime = getChannelRuntime();
const task = startAccount({
cfg,
accountId: id,
account,
runtime: channelRuntimeEnvs[channelId],
abortSignal: abort.signal,
log,
getStatus: () => getRuntime(channelId, id),
setStatus: (next) => setRuntime(channelId, id, next),
...(resolvedChannelRuntime ? { channelRuntime: resolvedChannelRuntime } : {}),
});
const trackedPromise = Promise.resolve(task)
.catch((err) => {
const message = formatErrorMessage(err);
setRuntime(channelId, id, { accountId: id, lastError: message });
log.error?.(`[${id}] channel exited: ${message}`);
})
.finally(() => {
try {
const account = plugin.config.resolveAccount(cfg, id);
const enabled = plugin.config.isEnabled
? plugin.config.isEnabled(account, cfg)
: isAccountEnabled(account);
if (!enabled) {
setRuntime(channelId, id, {
accountId: id,
enabled: false,
configured: true,
running: false,
restartPending: false,
lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled",
});
return;
}
let configured = true;
if (plugin.config.isConfigured) {
configured = await plugin.config.isConfigured(account, cfg);
}
if (!configured) {
setRuntime(channelId, id, {
accountId: id,
enabled: true,
configured: false,
running: false,
restartPending: false,
lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured",
});
return;
}
const rKey = restartKey(channelId, id);
if (!preserveManualStop) {
manuallyStopped.delete(rKey);
}
if (abort.signal.aborted || manuallyStopped.has(rKey)) {
setRuntime(channelId, id, {
accountId: id,
running: false,
restartPending: false,
lastStopAt: Date.now(),
});
})
.then(async () => {
if (manuallyStopped.has(rKey)) {
return;
}
const attempt = (restartAttempts.get(rKey) ?? 0) + 1;
restartAttempts.set(rKey, attempt);
if (attempt > MAX_RESTART_ATTEMPTS) {
return;
}
if (!preserveRestartAttempts) {
restartAttempts.delete(rKey);
}
setRuntime(channelId, id, {
accountId: id,
enabled: true,
configured: true,
running: true,
restartPending: false,
lastStartAt: Date.now(),
lastError: null,
reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0,
});
const log = channelLogs[channelId];
const resolvedChannelRuntime = getChannelRuntime();
const task = startAccount({
cfg,
accountId: id,
account,
runtime: channelRuntimeEnvs[channelId],
abortSignal: abort.signal,
log,
getStatus: () => getRuntime(channelId, id),
setStatus: (next) => setRuntime(channelId, id, next),
...(resolvedChannelRuntime ? { channelRuntime: resolvedChannelRuntime } : {}),
});
const trackedPromise = Promise.resolve(task)
.catch((err) => {
const message = formatErrorMessage(err);
setRuntime(channelId, id, { accountId: id, lastError: message });
log.error?.(`[${id}] channel exited: ${message}`);
})
.finally(() => {
setRuntime(channelId, id, {
accountId: id,
restartPending: false,
reconnectAttempts: attempt,
running: false,
lastStopAt: Date.now(),
});
log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`);
return;
}
const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt);
log.info?.(
`[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`,
);
setRuntime(channelId, id, {
accountId: id,
restartPending: true,
reconnectAttempts: attempt,
});
try {
await sleepWithAbort(delayMs, abort.signal);
})
.then(async () => {
if (manuallyStopped.has(rKey)) {
return;
}
const attempt = (restartAttempts.get(rKey) ?? 0) + 1;
restartAttempts.set(rKey, attempt);
if (attempt > MAX_RESTART_ATTEMPTS) {
setRuntime(channelId, id, {
accountId: id,
restartPending: false,
reconnectAttempts: attempt,
});
log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`);
return;
}
const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt);
log.info?.(
`[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`,
);
setRuntime(channelId, id, {
accountId: id,
restartPending: true,
reconnectAttempts: attempt,
});
try {
await sleepWithAbort(delayMs, abort.signal);
if (manuallyStopped.has(rKey)) {
return;
}
if (store.tasks.get(id) === trackedPromise) {
store.tasks.delete(id);
}
if (store.aborts.get(id) === abort) {
store.aborts.delete(id);
}
await startChannelInternal(channelId, id, {
preserveRestartAttempts: true,
preserveManualStop: true,
});
} catch {
// abort or startup failure — next crash will retry
}
})
.finally(() => {
if (store.tasks.get(id) === trackedPromise) {
store.tasks.delete(id);
}
if (store.aborts.get(id) === abort) {
store.aborts.delete(id);
}
await startChannelInternal(channelId, id, {
preserveRestartAttempts: true,
preserveManualStop: true,
});
} catch {
// abort or startup failure — next crash will retry
}
})
.finally(() => {
if (store.tasks.get(id) === trackedPromise) {
store.tasks.delete(id);
}
if (store.aborts.get(id) === abort) {
store.aborts.delete(id);
}
});
store.tasks.set(id, trackedPromise);
});
handedOffTask = true;
store.tasks.set(id, trackedPromise);
} finally {
resolveStart?.();
if (store.starting.get(id) === startGate) {
store.starting.delete(id);
}
if (!handedOffTask && store.aborts.get(id) === abort) {
store.aborts.delete(id);
}
}
}),
);
};
@ -405,6 +444,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
const cfg = loadConfig();
const knownIds = new Set<string>([
...store.aborts.keys(),
...store.starting.keys(),
...store.tasks.keys(),
...(plugin ? plugin.config.listAccountIds(cfg) : []),
]);