diff --git a/src/infra/bonjour.test.ts b/src/infra/bonjour.test.ts index d8f976fdc41..efccc1ce8b1 100644 --- a/src/infra/bonjour.test.ts +++ b/src/infra/bonjour.test.ts @@ -27,21 +27,32 @@ function mockCiaoService(params?: { advertise?: ReturnType; destroy?: ReturnType; serviceState?: string; + stateRef?: { value: string }; on?: ReturnType; }) { const advertise = params?.advertise ?? vi.fn().mockResolvedValue(undefined); const destroy = params?.destroy ?? vi.fn().mockResolvedValue(undefined); const on = params?.on ?? vi.fn(); createService.mockImplementation((options: Record) => { - return { + const service = { advertise, destroy, - serviceState: params?.serviceState ?? "announced", on, getFQDN: () => `${asString(options.type, "service")}.${asString(options.domain, "local")}.`, getHostname: () => asString(options.hostname, "unknown"), getPort: () => Number(options.port ?? -1), }; + Object.defineProperty(service, "serviceState", { + configurable: true, + enumerable: true, + get: () => params?.stateRef?.value ?? params?.serviceState ?? "announced", + set: (value: string) => { + if (params?.stateRef) { + params.stateRef.value = value; + } + }, + }); + return service; }); return { advertise, destroy, on }; } @@ -254,7 +265,7 @@ describe("gateway bonjour advertiser", () => { expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("advertise failed")); // watchdog should attempt re-advertise at the 60s interval tick - await vi.advanceTimersByTimeAsync(60_000); + await vi.advanceTimersByTimeAsync(15_000); expect(advertise).toHaveBeenCalledTimes(2); await started.stop(); @@ -283,6 +294,43 @@ describe("gateway bonjour advertiser", () => { await started.stop(); }); + it("recreates the advertiser when ciao gets stuck announcing", async () => { + enableAdvertiserUnitMode(); + vi.useFakeTimers(); + + const stateRef = { value: "announcing" }; + const destroy = vi.fn().mockResolvedValue(undefined); + const advertise = vi.fn().mockImplementation(() => { + if (advertise.mock.calls.length === 1) { + stateRef.value = "announcing"; + return new Promise(() => {}); + } + stateRef.value = "announced"; + return Promise.resolve(); + }); + mockCiaoService({ advertise, destroy, stateRef }); + + const started = await startGatewayBonjourAdvertiser({ + gatewayPort: 18789, + sshPort: 2222, + }); + + expect(createService).toHaveBeenCalledTimes(1); + expect(advertise).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(15_000); + + expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("restarting advertiser")); + expect(createService).toHaveBeenCalledTimes(2); + expect(advertise).toHaveBeenCalledTimes(2); + expect(destroy).toHaveBeenCalledTimes(1); + expect(shutdown).toHaveBeenCalledTimes(1); + + await started.stop(); + expect(destroy).toHaveBeenCalledTimes(2); + expect(shutdown).toHaveBeenCalledTimes(2); + }); + it("normalizes hostnames with domains for service names", async () => { // Allow advertiser to run in unit tests. delete process.env.VITEST; diff --git a/src/infra/bonjour.ts b/src/infra/bonjour.ts index 7d405741a0e..9e7790e2065 100644 --- a/src/infra/bonjour.ts +++ b/src/infra/bonjour.ts @@ -58,6 +58,32 @@ type BonjourService = { serviceState: string; }; +type BonjourCycle = { + responder: { + createService: (options: { + name: string; + type: string; + protocol: unknown; + port: number; + domain: string; + hostname: string; + txt: Record; + }) => unknown; + shutdown: () => Promise; + }; + services: Array<{ label: string; svc: BonjourService }>; + cleanupUnhandledRejection?: () => void; +}; + +type ServiceStateTracker = { + state: string; + sinceMs: number; +}; + +const WATCHDOG_INTERVAL_MS = 5_000; +const REPAIR_DEBOUNCE_MS = 30_000; +const STUCK_ANNOUNCING_MS = 8_000; + function serviceSummary(label: string, svc: BonjourService): string { let fqdn = "unknown"; let hostname = "unknown"; @@ -89,7 +115,6 @@ export async function startGatewayBonjourAdvertiser( } const { getResponder, Protocol } = await import("@homebridge/ciao"); - const responder = getResponder(); // mDNS service instance names are single DNS labels; dots in hostnames (like // `Mac.localdomain`) can confuse some resolvers/browsers and break discovery. @@ -133,8 +158,6 @@ export async function startGatewayBonjourAdvertiser( txtBase.cliPath = opts.cliPath.trim(); } - const services: Array<{ label: string; svc: BonjourService }> = []; - // Build TXT record for the gateway service. // In minimal mode, omit sshPort to avoid advertising SSH availability. const gatewayTxt: Record = { @@ -145,25 +168,91 @@ export async function startGatewayBonjourAdvertiser( gatewayTxt.sshPort = String(opts.sshPort ?? 22); } - const gateway = responder.createService({ - name: safeServiceName(instanceName), - type: "openclaw-gw", - protocol: Protocol.TCP, - port: opts.gatewayPort, - domain: "local", - hostname, - txt: gatewayTxt, - }); - services.push({ - label: "gateway", - svc: gateway as unknown as BonjourService, - }); + function createCycle(): BonjourCycle { + const responder = getResponder(); + const services: Array<{ label: string; svc: BonjourService }> = []; - let ciaoCancellationRejectionHandler: (() => void) | undefined; - if (services.length > 0) { - ciaoCancellationRejectionHandler = registerUnhandledRejectionHandler( - ignoreCiaoCancellationRejection, - ); + const gateway = responder.createService({ + name: safeServiceName(instanceName), + type: "openclaw-gw", + protocol: Protocol.TCP, + port: opts.gatewayPort, + domain: "local", + hostname, + txt: gatewayTxt, + }); + services.push({ + label: "gateway", + svc: gateway as unknown as BonjourService, + }); + + const cleanupUnhandledRejection = + services.length > 0 + ? registerUnhandledRejectionHandler(ignoreCiaoCancellationRejection) + : undefined; + + return { responder, services, cleanupUnhandledRejection }; + } + + async function stopCycle(cycle: BonjourCycle | null) { + if (!cycle) { + return; + } + for (const { svc } of cycle.services) { + try { + await svc.destroy(); + } catch { + /* ignore */ + } + } + try { + await cycle.responder.shutdown(); + } catch { + /* ignore */ + } finally { + cycle.cleanupUnhandledRejection?.(); + } + } + + function attachConflictListeners(services: Array<{ label: string; svc: BonjourService }>) { + for (const { label, svc } of services) { + try { + svc.on("name-change", (name: unknown) => { + const next = typeof name === "string" ? name : String(name); + logWarn(`bonjour: ${label} name conflict resolved; newName=${JSON.stringify(next)}`); + }); + svc.on("hostname-change", (nextHostname: unknown) => { + const next = typeof nextHostname === "string" ? nextHostname : String(nextHostname); + logWarn( + `bonjour: ${label} hostname conflict resolved; newHostname=${JSON.stringify(next)}`, + ); + }); + } catch (err) { + logDebug(`bonjour: failed to attach listeners for ${label}: ${String(err)}`); + } + } + } + + function startAdvertising(services: Array<{ label: string; svc: BonjourService }>) { + for (const { label, svc } of services) { + try { + void svc + .advertise() + .then(() => { + // Keep this out of stdout/stderr (menubar + tests) but capture in the rolling log. + getLogger().info(`bonjour: advertised ${serviceSummary(label, svc)}`); + }) + .catch((err) => { + logWarn( + `bonjour: advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + ); + }); + } catch (err) { + logWarn( + `bonjour: advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + ); + } + } } logDebug( @@ -172,55 +261,72 @@ export async function startGatewayBonjourAdvertiser( )}, gatewayPort=${opts.gatewayPort}${opts.minimal ? ", minimal=true" : `, sshPort=${opts.sshPort ?? 22}`})`, ); - for (const { label, svc } of services) { - try { - svc.on("name-change", (name: unknown) => { - const next = typeof name === "string" ? name : String(name); - logWarn(`bonjour: ${label} name conflict resolved; newName=${JSON.stringify(next)}`); - }); - svc.on("hostname-change", (nextHostname: unknown) => { - const next = typeof nextHostname === "string" ? nextHostname : String(nextHostname); - logWarn( - `bonjour: ${label} hostname conflict resolved; newHostname=${JSON.stringify(next)}`, - ); - }); - } catch (err) { - logDebug(`bonjour: failed to attach listeners for ${label}: ${String(err)}`); - } - } + let stopped = false; + let recreatePromise: Promise | null = null; + let cycle = createCycle(); + const stateTracker = new Map(); + attachConflictListeners(cycle.services); + startAdvertising(cycle.services); - // Do not block gateway startup on mDNS probing/announce. Advertising can take - // multiple seconds depending on network state; the gateway should come up even - // if Bonjour is slow or fails. - for (const { label, svc } of services) { - try { - void svc - .advertise() - .then(() => { - // Keep this out of stdout/stderr (menubar + tests) but capture in the rolling log. - getLogger().info(`bonjour: advertised ${serviceSummary(label, svc)}`); - }) - .catch((err) => { - logWarn( - `bonjour: advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, - ); - }); - } catch (err) { - logWarn( - `bonjour: advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, - ); + const updateStateTrackers = (services: Array<{ label: string; svc: BonjourService }>) => { + const now = Date.now(); + for (const { label, svc } of services) { + const nextState = typeof svc.serviceState === "string" ? svc.serviceState : "unknown"; + const current = stateTracker.get(label); + if (!current || current.state !== nextState) { + stateTracker.set(label, { state: nextState, sinceMs: now }); + } } - } + }; + + const recreateAdvertiser = async (reason: string) => { + if (stopped) { + return; + } + if (recreatePromise) { + return recreatePromise; + } + recreatePromise = (async () => { + logWarn(`bonjour: restarting advertiser (${reason})`); + const previous = cycle; + cycle = createCycle(); + stateTracker.clear(); + attachConflictListeners(cycle.services); + startAdvertising(cycle.services); + await stopCycle(previous); + })().finally(() => { + recreatePromise = null; + }); + return recreatePromise; + }; // Watchdog: if we ever end up in an unannounced state (e.g. after sleep/wake or // interface churn), try to re-advertise instead of requiring a full gateway restart. const lastRepairAttempt = new Map(); const watchdog = setInterval(() => { - for (const { label, svc } of services) { + if (stopped || recreatePromise) { + return; + } + updateStateTrackers(cycle.services); + for (const { label, svc } of cycle.services) { const stateUnknown = (svc as { serviceState?: unknown }).serviceState; if (typeof stateUnknown !== "string") { continue; } + const tracked = stateTracker.get(label); + if ( + stateUnknown === "announcing" && + tracked && + Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS + ) { + void recreateAdvertiser( + `service stuck announcing for ${Date.now() - tracked.sinceMs}ms (${serviceSummary( + label, + svc, + )})`, + ); + return; + } if (stateUnknown === "announced" || stateUnknown === "announcing") { continue; } @@ -233,7 +339,7 @@ export async function startGatewayBonjourAdvertiser( } const now = Date.now(); const last = lastRepairAttempt.get(key) ?? 0; - if (now - last < 30_000) { + if (now - last < REPAIR_DEBOUNCE_MS) { continue; } lastRepairAttempt.set(key, now); @@ -256,26 +362,15 @@ export async function startGatewayBonjourAdvertiser( ); } } - }, 60_000); + }, WATCHDOG_INTERVAL_MS); watchdog.unref?.(); return { stop: async () => { + stopped = true; clearInterval(watchdog); - for (const { svc } of services) { - try { - await svc.destroy(); - } catch { - /* ignore */ - } - } - try { - await responder.shutdown(); - } catch { - /* ignore */ - } finally { - ciaoCancellationRejectionHandler?.(); - } + await recreatePromise; + await stopCycle(cycle); }, }; }