From 4da6a7f21206ab6b097323165b4dbc7d2ba6e147 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 16:39:14 +0100 Subject: [PATCH] refactor(restart): extract stale pid cleanup and supervisor markers --- src/infra/process-respawn.test.ts | 12 +-- src/infra/process-respawn.ts | 24 +----- src/infra/restart-stale-pids.ts | 127 ++++++++++++++++++++++++++++++ src/infra/restart.test.ts | 114 ++++++++++++++++++++++++--- src/infra/restart.ts | 98 +---------------------- src/infra/supervisor-markers.ts | 20 +++++ 6 files changed, 259 insertions(+), 136 deletions(-) create mode 100644 src/infra/restart-stale-pids.ts create mode 100644 src/infra/supervisor-markers.ts diff --git a/src/infra/process-respawn.test.ts b/src/infra/process-respawn.test.ts index e92463ad00e..13c1bc6a08d 100644 --- a/src/infra/process-respawn.test.ts +++ b/src/infra/process-respawn.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { captureFullEnv } from "../test-utils/env.js"; +import { SUPERVISOR_HINT_ENV_VARS } from "./supervisor-markers.js"; const spawnMock = vi.hoisted(() => vi.fn()); @@ -21,14 +22,9 @@ afterEach(() => { }); function clearSupervisorHints() { - delete process.env.LAUNCH_JOB_LABEL; - delete process.env.LAUNCH_JOB_NAME; - delete process.env.OPENCLAW_LAUNCHD_LABEL; - delete process.env.INVOCATION_ID; - delete process.env.SYSTEMD_EXEC_PID; - delete process.env.JOURNAL_STREAM; - delete process.env.OPENCLAW_SYSTEMD_UNIT; - delete process.env.OPENCLAW_SERVICE_MARKER; + for (const key of SUPERVISOR_HINT_ENV_VARS) { + delete process.env[key]; + } } describe("restartGatewayProcessWithFreshPid", () => { diff --git a/src/infra/process-respawn.ts b/src/infra/process-respawn.ts index 8ad4e1992a9..d77721cd088 100644 --- a/src/infra/process-respawn.ts +++ b/src/infra/process-respawn.ts @@ -1,4 +1,5 @@ import { spawn } from "node:child_process"; +import { hasSupervisorHint } from "./supervisor-markers.js"; type RespawnMode = "spawned" | "supervised" | "disabled" | "failed"; @@ -8,24 +9,6 @@ export type GatewayRespawnResult = { detail?: string; }; -const SUPERVISOR_HINT_ENV_VARS = [ - // macOS launchd — native env vars (may be set by launchd itself) - "LAUNCH_JOB_LABEL", - "LAUNCH_JOB_NAME", - // macOS launchd — OpenClaw's own plist generator sets these via - // buildServiceEnvironment() in service-env.ts. launchd does NOT - // automatically inject LAUNCH_JOB_LABEL into the child environment, - // so OPENCLAW_LAUNCHD_LABEL is the reliable supervised-mode signal. - "OPENCLAW_LAUNCHD_LABEL", - // Linux systemd - "INVOCATION_ID", - "SYSTEMD_EXEC_PID", - "JOURNAL_STREAM", - "OPENCLAW_SYSTEMD_UNIT", - // Generic service marker (set by both launchd and systemd plist/unit generators) - "OPENCLAW_SERVICE_MARKER", -]; - function isTruthy(value: string | undefined): boolean { if (!value) { return false; @@ -35,10 +18,7 @@ function isTruthy(value: string | undefined): boolean { } function isLikelySupervisedProcess(env: NodeJS.ProcessEnv = process.env): boolean { - return SUPERVISOR_HINT_ENV_VARS.some((key) => { - const value = env[key]; - return typeof value === "string" && value.trim().length > 0; - }); + return hasSupervisorHint(env); } /** diff --git a/src/infra/restart-stale-pids.ts b/src/infra/restart-stale-pids.ts new file mode 100644 index 00000000000..bbab76f8374 --- /dev/null +++ b/src/infra/restart-stale-pids.ts @@ -0,0 +1,127 @@ +import { spawnSync } from "node:child_process"; +import { resolveGatewayPort } from "../config/paths.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { resolveLsofCommandSync } from "./ports-lsof.js"; + +const SPAWN_TIMEOUT_MS = 2000; +const STALE_SIGTERM_WAIT_MS = 300; +const STALE_SIGKILL_WAIT_MS = 200; + +const restartLog = createSubsystemLogger("restart"); +let sleepSyncOverride: ((ms: number) => void) | null = null; + +function sleepSync(ms: number): void { + const timeoutMs = Math.max(0, Math.floor(ms)); + if (timeoutMs <= 0) { + return; + } + if (sleepSyncOverride) { + sleepSyncOverride(timeoutMs); + return; + } + try { + const lock = new Int32Array(new SharedArrayBuffer(4)); + Atomics.wait(lock, 0, 0, timeoutMs); + } catch { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + // Best-effort fallback when Atomics.wait is unavailable. + } + } +} + +/** + * Find PIDs of gateway processes listening on the given port using synchronous lsof. + * Returns only PIDs that belong to openclaw gateway processes (not the current process). + */ +export function findGatewayPidsOnPortSync(port: number): number[] { + if (process.platform === "win32") { + return []; + } + const lsof = resolveLsofCommandSync(); + const res = spawnSync(lsof, ["-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-Fpc"], { + encoding: "utf8", + timeout: SPAWN_TIMEOUT_MS, + }); + if (res.error || res.status !== 0) { + return []; + } + const pids: number[] = []; + let currentPid: number | undefined; + let currentCmd: string | undefined; + for (const line of res.stdout.split(/\r?\n/).filter(Boolean)) { + if (line.startsWith("p")) { + if (currentPid != null && currentCmd && currentCmd.toLowerCase().includes("openclaw")) { + pids.push(currentPid); + } + const parsed = Number.parseInt(line.slice(1), 10); + currentPid = Number.isFinite(parsed) && parsed > 0 ? parsed : undefined; + currentCmd = undefined; + } else if (line.startsWith("c")) { + currentCmd = line.slice(1); + } + } + if (currentPid != null && currentCmd && currentCmd.toLowerCase().includes("openclaw")) { + pids.push(currentPid); + } + return pids.filter((pid) => pid !== process.pid); +} + +/** + * Synchronously terminate stale gateway processes. + * Sends SIGTERM, waits briefly, then SIGKILL for survivors. + */ +function terminateStaleProcessesSync(pids: number[]): number[] { + if (pids.length === 0) { + return []; + } + const killed: number[] = []; + for (const pid of pids) { + try { + process.kill(pid, "SIGTERM"); + killed.push(pid); + } catch { + // ESRCH — already gone + } + } + if (killed.length === 0) { + return killed; + } + sleepSync(STALE_SIGTERM_WAIT_MS); + for (const pid of killed) { + try { + process.kill(pid, 0); + process.kill(pid, "SIGKILL"); + } catch { + // already gone + } + } + sleepSync(STALE_SIGKILL_WAIT_MS); + return killed; +} + +/** + * Inspect the gateway port and kill any stale gateway processes holding it. + * Called before service restart commands to prevent port conflicts. + */ +export function cleanStaleGatewayProcessesSync(): number[] { + try { + const port = resolveGatewayPort(undefined, process.env); + const stalePids = findGatewayPidsOnPortSync(port); + if (stalePids.length === 0) { + return []; + } + restartLog.warn( + `killing ${stalePids.length} stale gateway process(es) before restart: ${stalePids.join(", ")}`, + ); + return terminateStaleProcessesSync(stalePids); + } catch { + return []; + } +} + +export const __testing = { + setSleepSyncOverride(fn: ((ms: number) => void) | null) { + sleepSyncOverride = fn; + }, +}; diff --git a/src/infra/restart.test.ts b/src/infra/restart.test.ts index cc858933a2b..0203817016c 100644 --- a/src/infra/restart.test.ts +++ b/src/infra/restart.test.ts @@ -1,19 +1,111 @@ -import { describe, expect, it } from "vitest"; -import { findGatewayPidsOnPortSync } from "./restart.js"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const spawnSyncMock = vi.hoisted(() => vi.fn()); +const resolveLsofCommandSyncMock = vi.hoisted(() => vi.fn()); +const resolveGatewayPortMock = vi.hoisted(() => vi.fn()); + +vi.mock("node:child_process", () => ({ + spawnSync: (...args: unknown[]) => spawnSyncMock(...args), +})); + +vi.mock("./ports-lsof.js", () => ({ + resolveLsofCommandSync: (...args: unknown[]) => resolveLsofCommandSyncMock(...args), +})); + +vi.mock("../config/paths.js", () => ({ + resolveGatewayPort: (...args: unknown[]) => resolveGatewayPortMock(...args), +})); + +import { + __testing, + cleanStaleGatewayProcessesSync, + findGatewayPidsOnPortSync, +} from "./restart-stale-pids.js"; + +beforeEach(() => { + spawnSyncMock.mockReset(); + resolveLsofCommandSyncMock.mockReset(); + resolveGatewayPortMock.mockReset(); + + resolveLsofCommandSyncMock.mockReturnValue("/usr/sbin/lsof"); + resolveGatewayPortMock.mockReturnValue(18789); + __testing.setSleepSyncOverride(() => {}); +}); + +afterEach(() => { + __testing.setSleepSyncOverride(null); + vi.restoreAllMocks(); +}); describe("findGatewayPidsOnPortSync", () => { - it("returns an empty array for a port with no listeners", () => { - const pids = findGatewayPidsOnPortSync(19999); - expect(pids).toEqual([]); - }); + it("parses lsof output and filters non-openclaw/current processes", () => { + spawnSyncMock.mockReturnValue({ + error: undefined, + status: 0, + stdout: [ + `p${process.pid}`, + "copenclaw", + "p4100", + "copenclaw-gateway", + "p4200", + "cnode", + "p4300", + "cOpenClaw", + ].join("\n"), + }); - it("never includes the current process PID", () => { const pids = findGatewayPidsOnPortSync(18789); - expect(pids).not.toContain(process.pid); + + expect(pids).toEqual([4100, 4300]); + expect(spawnSyncMock).toHaveBeenCalledWith( + "/usr/sbin/lsof", + ["-nP", "-iTCP:18789", "-sTCP:LISTEN", "-Fpc"], + expect.objectContaining({ encoding: "utf8", timeout: 2000 }), + ); }); - it("returns an array (not undefined or null) on any port", () => { - const pids = findGatewayPidsOnPortSync(0); - expect(Array.isArray(pids)).toBe(true); + it("returns empty when lsof fails", () => { + spawnSyncMock.mockReturnValue({ + error: undefined, + status: 1, + stdout: "", + stderr: "lsof failed", + }); + + expect(findGatewayPidsOnPortSync(18789)).toEqual([]); + }); +}); + +describe("cleanStaleGatewayProcessesSync", () => { + it("kills stale gateway pids discovered on the gateway port", () => { + spawnSyncMock.mockReturnValue({ + error: undefined, + status: 0, + stdout: ["p6001", "copenclaw", "p6002", "copenclaw-gateway"].join("\n"), + }); + const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); + + const killed = cleanStaleGatewayProcessesSync(); + + expect(killed).toEqual([6001, 6002]); + expect(resolveGatewayPortMock).toHaveBeenCalledWith(undefined, process.env); + expect(killSpy).toHaveBeenCalledWith(6001, "SIGTERM"); + expect(killSpy).toHaveBeenCalledWith(6002, "SIGTERM"); + expect(killSpy).toHaveBeenCalledWith(6001, "SIGKILL"); + expect(killSpy).toHaveBeenCalledWith(6002, "SIGKILL"); + }); + + it("returns empty when no stale listeners are found", () => { + spawnSyncMock.mockReturnValue({ + error: undefined, + status: 0, + stdout: "", + }); + const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); + + const killed = cleanStaleGatewayProcessesSync(); + + expect(killed).toEqual([]); + expect(killSpy).not.toHaveBeenCalled(); }); }); diff --git a/src/infra/restart.ts b/src/infra/restart.ts index 35cad07175f..c84dfc6f7ac 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -1,11 +1,10 @@ import { spawnSync } from "node:child_process"; -import { resolveGatewayPort } from "../config/paths.js"; import { resolveGatewayLaunchAgentLabel, resolveGatewaySystemdServiceName, } from "../daemon/constants.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { resolveLsofCommandSync } from "./ports-lsof.js"; +import { cleanStaleGatewayProcessesSync, findGatewayPidsOnPortSync } from "./restart-stale-pids.js"; export type RestartAttempt = { ok: boolean; @@ -22,6 +21,8 @@ const RESTART_COOLDOWN_MS = 30_000; const restartLog = createSubsystemLogger("restart"); +export { findGatewayPidsOnPortSync }; + let sigusr1AuthorizedCount = 0; let sigusr1AuthorizedUntil = 0; let sigusr1ExternalAllowed = false; @@ -285,99 +286,6 @@ function normalizeSystemdUnit(raw?: string, profile?: string): string { return unit.endsWith(".service") ? unit : `${unit}.service`; } -/** - * Find PIDs of gateway processes listening on the given port using synchronous lsof. - * Returns only PIDs that belong to openclaw gateway processes (not the current process). - */ -export function findGatewayPidsOnPortSync(port: number): number[] { - if (process.platform === "win32") { - return []; - } - const lsof = resolveLsofCommandSync(); - const res = spawnSync(lsof, ["-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-Fpc"], { - encoding: "utf8", - timeout: SPAWN_TIMEOUT_MS, - }); - if (res.error || res.status !== 0) { - return []; - } - const pids: number[] = []; - let currentPid: number | undefined; - let currentCmd: string | undefined; - for (const line of res.stdout.split(/\r?\n/).filter(Boolean)) { - if (line.startsWith("p")) { - if (currentPid != null && currentCmd && currentCmd.toLowerCase().includes("openclaw")) { - pids.push(currentPid); - } - const parsed = Number.parseInt(line.slice(1), 10); - currentPid = Number.isFinite(parsed) && parsed > 0 ? parsed : undefined; - currentCmd = undefined; - } else if (line.startsWith("c")) { - currentCmd = line.slice(1); - } - } - if (currentPid != null && currentCmd && currentCmd.toLowerCase().includes("openclaw")) { - pids.push(currentPid); - } - return pids.filter((pid) => pid !== process.pid); -} - -const STALE_SIGTERM_WAIT_MS = 300; -const STALE_SIGKILL_WAIT_MS = 200; - -/** - * Synchronously terminate stale gateway processes. - * Sends SIGTERM, waits briefly, then SIGKILL for survivors. - */ -function terminateStaleProcessesSync(pids: number[]): number[] { - if (pids.length === 0) { - return []; - } - const killed: number[] = []; - for (const pid of pids) { - try { - process.kill(pid, "SIGTERM"); - killed.push(pid); - } catch { - // ESRCH — already gone - } - } - if (killed.length === 0) { - return killed; - } - spawnSync("sleep", [String(STALE_SIGTERM_WAIT_MS / 1000)], { timeout: 2000 }); - for (const pid of killed) { - try { - process.kill(pid, 0); - process.kill(pid, "SIGKILL"); - } catch { - // already gone - } - } - spawnSync("sleep", [String(STALE_SIGKILL_WAIT_MS / 1000)], { timeout: 2000 }); - return killed; -} - -/** - * Inspect the gateway port and kill any stale gateway processes holding it. - * Called before service restart commands to prevent port conflicts. - */ -function cleanStaleGatewayProcessesSync(): number[] { - try { - const port = resolveGatewayPort(undefined, process.env); - const stalePids = findGatewayPidsOnPortSync(port); - if (stalePids.length === 0) { - return []; - } - restartLog.warn( - `killing ${stalePids.length} stale gateway process(es) before restart: ${stalePids.join(", ")}`, - ); - return terminateStaleProcessesSync(stalePids); - } catch { - return []; - } -} - export function triggerOpenClawRestart(): RestartAttempt { if (process.env.VITEST || process.env.NODE_ENV === "test") { return { ok: true, method: "supervisor", detail: "test mode" }; diff --git a/src/infra/supervisor-markers.ts b/src/infra/supervisor-markers.ts new file mode 100644 index 00000000000..231bece5e3d --- /dev/null +++ b/src/infra/supervisor-markers.ts @@ -0,0 +1,20 @@ +export const SUPERVISOR_HINT_ENV_VARS = [ + // macOS launchd + "LAUNCH_JOB_LABEL", + "LAUNCH_JOB_NAME", + // OpenClaw service env markers + "OPENCLAW_LAUNCHD_LABEL", + "OPENCLAW_SYSTEMD_UNIT", + "OPENCLAW_SERVICE_MARKER", + // Linux systemd + "INVOCATION_ID", + "SYSTEMD_EXEC_PID", + "JOURNAL_STREAM", +] as const; + +export function hasSupervisorHint(env: NodeJS.ProcessEnv = process.env): boolean { + return SUPERVISOR_HINT_ENV_VARS.some((key) => { + const value = env[key]; + return typeof value === "string" && value.trim().length > 0; + }); +}