From 6f938c3a9965b788a8e8f9fe1d253af0919843e0 Mon Sep 17 00:00:00 2001 From: GatewayJ <835269233@qq.com> Date: Wed, 18 Mar 2026 22:54:11 +0800 Subject: [PATCH] config: address Nacos PR review (listener body, onChange-on-change-only, backoff, runtime refresh, dotenv) Made-with: Cursor --- src/config/io.ts | 51 +++++++++++++++++++++++++++++- src/config/sources/nacos-client.ts | 27 +++++++++++----- src/gateway/server.impl.ts | 2 ++ 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/src/config/io.ts b/src/config/io.ts index 6a5fbd697f2..63f1f5b2792 100644 --- a/src/config/io.ts +++ b/src/config/io.ts @@ -1497,8 +1497,32 @@ export async function readBestEffortConfig(): Promise { return snapshot.valid ? loadConfig() : snapshot.config; } +function buildSnapshotFromRuntimeSnapshots(path: string): ConfigFileSnapshot | null { + if (!runtimeConfigSnapshot) return null; + const config = runtimeConfigSnapshot; + const resolved = runtimeConfigSourceSnapshot ?? config; + const raw = JSON.stringify(config); + return { + path, + exists: true, + raw, + parsed: config as unknown, + resolved: resolved as unknown, + valid: true, + config, + hash: hashConfigRaw(raw), + issues: [], + warnings: [], + legacyIssues: [], + }; +} + export async function readConfigFileSnapshot(): Promise { const source = getConfigSource(); + if (source?.kind === "nacos") { + const fromRuntime = buildSnapshotFromRuntimeSnapshots("nacos:openclaw.json"); + if (fromRuntime) return fromRuntime; + } if (source !== null) { return source.readSnapshot(); } @@ -1508,6 +1532,13 @@ export async function readConfigFileSnapshot(): Promise { export async function readConfigFileSnapshotForWrite(): Promise { const source = getConfigSource(); if (source?.kind === "nacos") { + const fromRuntime = buildSnapshotFromRuntimeSnapshots("nacos:openclaw.json"); + if (fromRuntime) { + return { + snapshot: fromRuntime, + writeOptions: { expectedConfigPath: fromRuntime.path }, + }; + } const snapshot = await source.readSnapshot(); return { snapshot, @@ -1530,7 +1561,25 @@ export async function writeConfigFile( const issueMessage = issue?.message ?? "invalid"; throw new Error(formatConfigValidationFailure(pathLabel, issueMessage)); } - setRuntimeConfigSnapshot(cfg); + setRuntimeConfigSnapshot(cfg, cfg); + const refreshHandler = runtimeConfigSnapshotRefreshHandler; + if (refreshHandler) { + try { + const refreshed = await refreshHandler.refresh({ sourceConfig: cfg }); + if (refreshed) return; + } catch (error) { + try { + refreshHandler.clearOnRefreshFailure?.(); + } catch { + // Keep the original refresh failure as the surfaced error. + } + const detail = error instanceof Error ? error.message : String(error); + throw new ConfigRuntimeRefreshError( + `Config was written (Nacos in-memory), but runtime snapshot refresh failed: ${detail}`, + { cause: error }, + ); + } + } return; } const io = createConfigIO(); diff --git a/src/config/sources/nacos-client.ts b/src/config/sources/nacos-client.ts index 548fae9ef25..eccbb6d3963 100644 --- a/src/config/sources/nacos-client.ts +++ b/src/config/sources/nacos-client.ts @@ -1,8 +1,11 @@ /** * Nacos config client: fetch config via Open API and long-poll for changes. * No nacos npm dependency; plain HTTP only. Uses opts.fetch for test injection. + * Listener uses Nacos v1 format: Listening-Configs=%02%02%02%01 */ +import crypto from "node:crypto"; + export type NacosConfigClientOptions = { serverAddr: string; dataId: string; @@ -41,25 +44,32 @@ export function createNacosConfigClient(opts: NacosConfigClientOptions): NacosCo const listenerUrl = `${base}/nacos/v1/cs/configs/listener`; + // MD5 of last-fetched content; used by listener so Nacos can compare and hold connection until change. + let lastContentMD5 = ""; + return { async fetchConfig(): Promise { const res = await doFetch(getConfigUrl()); if (!res.ok) { throw new Error(`Nacos get config failed: ${res.status} ${res.statusText}`); } - return res.text(); + const text = await res.text(); + lastContentMD5 = crypto.createHash("md5").update(text).digest("hex"); + return text; }, subscribe(onChange: () => void): () => void { let stopped = false; + const STX = "\x02"; + const SOH = "\x01"; const poll = async (): Promise => { if (stopped) return; - const body = new URLSearchParams({ - dataId: opts.dataId, - group: opts.group, - }); - if (opts.tenant) body.set("tenant", opts.tenant); + // Nacos v1 listener expects Listening-Configs=%02%02%02%01 + const tenant = opts.tenant ?? ""; + const listeningConfigs = + `${opts.dataId}${STX}${opts.group}${STX}${lastContentMD5}${STX}${tenant}${SOH}`; + const body = new URLSearchParams({ "Listening-Configs": listeningConfigs }); try { const res = await doFetch(listenerUrl, { method: "POST", @@ -71,10 +81,11 @@ export function createNacosConfigClient(opts: NacosConfigClientOptions): NacosCo }); if (stopped) return; if (res.ok) { - onChange(); + const text = await res.text(); + if (text.trim()) onChange(); } } catch { - // Ignore errors; loop will retry or exit when stopped + await new Promise((r) => setTimeout(r, 5000)); } if (!stopped) { void poll(); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 56d4b40c3b6..ba12fd934b5 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -32,6 +32,7 @@ import { resolveControlUiRootOverrideSync, resolveControlUiRootSync, } from "../infra/control-ui-assets.js"; +import { loadDotEnv } from "../infra/dotenv.js"; import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { logAcceptedEnvOption } from "../infra/env.js"; import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js"; @@ -381,6 +382,7 @@ export async function startGatewayServer( description: "raw stream log path override", }); + loadDotEnv({ quiet: true }); setConfigSource(resolveConfigSource(process.env)); let configSnapshot = await readConfigFileSnapshot();