From 39d38902789d5a46aa2db26d91327a74c47c99db Mon Sep 17 00:00:00 2001 From: Dominic Lewis Date: Wed, 18 Mar 2026 18:30:45 -0400 Subject: [PATCH 1/2] fix(acp): add distributed session locking with fail-closed redis fallback - add SessionLockManager abstraction for ACP execution locking - add local, redis, and fail-closed lock manager implementations - use Redis SET NX PX plus owner-checked Lua release/renew - wire lock acquire/release/renew into ACP dispatch flow - skip with acp_execution_locked when lock is contended or acquire fails - document ACP session lock Redis env vars in .env.example - add unit and dispatch tests for lock behavior --- .env.example | 9 + src/auto-reply/reply/dispatch-acp.test.ts | 127 +++++ src/auto-reply/reply/dispatch-acp.ts | 73 +++ .../reply/session-lock-manager.test.ts | 198 +++++++ src/auto-reply/reply/session-lock-manager.ts | 497 ++++++++++++++++++ 5 files changed, 904 insertions(+) create mode 100644 src/auto-reply/reply/session-lock-manager.test.ts create mode 100644 src/auto-reply/reply/session-lock-manager.ts diff --git a/.env.example b/.env.example index 41df435b8f9..effceb33b6e 100644 --- a/.env.example +++ b/.env.example @@ -78,3 +78,12 @@ OPENCLAW_GATEWAY_TOKEN=change-me-to-a-long-random-token # ELEVENLABS_API_KEY=... # XI_API_KEY=... # alias for ElevenLabs # DEEPGRAM_API_KEY=... + +# ----------------------------------------------------------------------------- +# ACP distributed session locking (optional) +# ----------------------------------------------------------------------------- +# Enable Redis-backed ACP session locks across workers/containers. +# If unset, ACP uses an in-process local lock manager. +# OPENCLAW_ACP_SESSION_LOCK_REDIS_URL=redis://:password@redis-host:6379/0 +# Optional lock TTL in milliseconds (default: 120000). 
+# OPENCLAW_ACP_SESSION_LOCK_TTL_MS=120000 diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index b19f2edde09..4676c870096 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -52,6 +52,13 @@ const bindingServiceMocks = vi.hoisted(() => ({ listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []), })); +const sessionLockMocks = vi.hoisted(() => ({ + acquire: vi.fn(), + release: vi.fn(), + renew: vi.fn(), + resolveTtlMs: vi.fn(() => 120_000), +})); + vi.mock("../../acp/control-plane/manager.js", () => ({ getAcpSessionManager: () => managerMocks, })); @@ -87,6 +94,16 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({ }), })); +vi.mock("./session-lock-manager.js", () => ({ + getAcpSessionLockManager: () => ({ + acquire: (sessionKey: string, ttlMs: number) => sessionLockMocks.acquire(sessionKey, ttlMs), + release: (sessionKey: string, ownerId: string) => sessionLockMocks.release(sessionKey, ownerId), + renew: (sessionKey: string, ownerId: string, ttlMs: number) => + sessionLockMocks.renew(sessionKey, ownerId, ttlMs), + }), + resolveAcpSessionLockTtlMs: () => sessionLockMocks.resolveTtlMs(), +})); + const { tryDispatchAcpReply } = await import("./dispatch-acp.js"); const sessionKey = "agent:codex-acp:session-1"; @@ -160,6 +177,41 @@ async function runDispatch(params: { }); } +async function runDispatchWithTracking(params: { + bodyForAgent: string; + cfg?: OpenClawConfig; + dispatcher?: ReplyDispatcher; + shouldRouteToOriginating?: boolean; + onReplyStart?: () => void; + ctxOverrides?: Record; +}) { + const recordProcessed = vi.fn(); + const markIdle = vi.fn(); + const result = await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: sessionKey, + BodyForAgent: params.bodyForAgent, + ...params.ctxOverrides, + }), + cfg: params.cfg ?? 
createAcpTestConfig(), + dispatcher: params.dispatcher ?? createDispatcher().dispatcher, + sessionKey, + inboundAudio: false, + shouldRouteToOriginating: params.shouldRouteToOriginating ?? false, + ...(params.shouldRouteToOriginating + ? { originatingChannel: "telegram", originatingTo: "telegram:thread-1" } + : {}), + shouldSendToolSummaries: true, + bypassForCommand: false, + ...(params.onReplyStart ? { onReplyStart: params.onReplyStart } : {}), + recordProcessed, + markIdle, + }); + return { result, recordProcessed, markIdle }; +} + async function emitToolLifecycleEvents( onEvent: (event: unknown) => Promise, toolCallId: string, @@ -232,6 +284,14 @@ describe("tryDispatchAcpReply", () => { sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null); bindingServiceMocks.listBySession.mockReset(); bindingServiceMocks.listBySession.mockReturnValue([]); + sessionLockMocks.acquire.mockReset(); + sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-1" }); + sessionLockMocks.release.mockReset(); + sessionLockMocks.release.mockResolvedValue(undefined); + sessionLockMocks.renew.mockReset(); + sessionLockMocks.renew.mockResolvedValue(true); + sessionLockMocks.resolveTtlMs.mockReset(); + sessionLockMocks.resolveTtlMs.mockReturnValue(120_000); }); it("routes ACP block output to originating channel", async () => { @@ -435,4 +495,71 @@ describe("tryDispatchAcpReply", () => { }), ); }); + + it("enforces ACP execution lock to prevent concurrent runs", async () => { + setReadyAcpResolution(); + let releaseRun: (() => void) | undefined; + let held = false; + sessionLockMocks.acquire.mockImplementation(async () => { + if (held) { + return { acquired: false } as const; + } + held = true; + return { acquired: true, ownerId: "owner-1" } as const; + }); + sessionLockMocks.release.mockImplementation(async () => { + held = false; + }); + managerMocks.runTurn.mockImplementation( + async () => + await new Promise((resolve) => { + releaseRun = resolve; + }), + ); + + 
const firstDispatch = runDispatch({ bodyForAgent: "first" }); + await vi.waitFor(() => { + expect(managerMocks.runTurn).toHaveBeenCalledTimes(1); + }); + + const secondResult = await runDispatch({ bodyForAgent: "second" }); + expect(secondResult?.queuedFinal).toBe(false); + expect(managerMocks.runTurn).toHaveBeenCalledTimes(1); + + releaseRun?.(); + await firstDispatch; + }); + + it("marks skipped with acp_execution_locked when lock is already held", async () => { + setReadyAcpResolution(); + sessionLockMocks.acquire.mockResolvedValue({ acquired: false }); + + const tracked = await runDispatchWithTracking({ bodyForAgent: "skip me" }); + + expect(tracked.result?.queuedFinal).toBe(false); + expect(managerMocks.runTurn).not.toHaveBeenCalled(); + expect(tracked.recordProcessed).toHaveBeenCalledWith("skipped", { + reason: "acp_execution_locked", + }); + }); + + it("releases lock after successful ACP dispatch", async () => { + setReadyAcpResolution(); + sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-success" }); + managerMocks.runTurn.mockResolvedValue(undefined); + + await runDispatch({ bodyForAgent: "success" }); + + expect(sessionLockMocks.release).toHaveBeenCalledWith(sessionKey, "owner-success"); + }); + + it("releases lock after ACP dispatch throws", async () => { + setReadyAcpResolution(); + sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-error" }); + managerMocks.runTurn.mockRejectedValue(new Error("boom")); + + await runDispatch({ bodyForAgent: "throws" }); + + expect(sessionLockMocks.release).toHaveBeenCalledWith(sessionKey, "owner-error"); + }); }); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 8fc7110fc4c..7cc24d4ed75 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -32,6 +32,7 @@ import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; 
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +import { getAcpSessionLockManager, resolveAcpSessionLockTtlMs } from "./session-lock-manager.js"; type DispatchProcessedRecorder = ( outcome: "completed" | "skipped" | "error", @@ -229,6 +230,62 @@ export async function tryDispatchAcpReply(params: { onReplyStart: params.onReplyStart, }); + const lockManager = getAcpSessionLockManager(); + const lockTtlMs = resolveAcpSessionLockTtlMs(); + const lockAcquireStartedAt = Date.now(); + let lockOwnerId: string | null = null; + + try { + const lockAcquire = await lockManager.acquire(sessionKey, lockTtlMs); + if (!lockAcquire.acquired) { + const counts = params.dispatcher.getQueuedCounts(); + params.recordProcessed("skipped", { reason: "acp_execution_locked" }); + params.markIdle("message_completed"); + logVerbose( + `dispatch-acp-lock: session=${sessionKey} lock_contended=true backend_wait_ms=${Date.now() - lockAcquireStartedAt}`, + ); + return { queuedFinal: false, counts }; + } + lockOwnerId = lockAcquire.ownerId; + logVerbose( + `dispatch-acp-lock: session=${sessionKey} acquired=true owner=${lockOwnerId} ttlMs=${lockTtlMs}`, + ); + } catch (error) { + const counts = params.dispatcher.getQueuedCounts(); + params.recordProcessed("skipped", { reason: "acp_execution_locked" }); + params.markIdle("message_completed"); + logVerbose( + `dispatch-acp-lock: session=${sessionKey} acquire_failed=${error instanceof Error ? error.message : String(error)}`, + ); + return { queuedFinal: false, counts }; + } + + let renewTimer: NodeJS.Timeout | null = null; + if (lockOwnerId) { + // Best-effort heartbeat: renew failures are logged, and the turn continues. 
+ const renewEveryMs = Math.max(1_000, Math.floor(lockTtlMs / 2)); + renewTimer = setInterval(() => { + if (!lockOwnerId) { + return; + } + void lockManager + .renew(sessionKey, lockOwnerId, lockTtlMs) + .then((renewed) => { + if (!renewed) { + logVerbose( + `dispatch-acp-lock: session=${sessionKey} renew_failed owner=${lockOwnerId}`, + ); + } + }) + .catch((error) => { + logVerbose( + `dispatch-acp-lock: session=${sessionKey} renew_error=${error instanceof Error ? error.message : String(error)}`, + ); + }); + }, renewEveryMs); + renewTimer.unref?.(); + } + const identityPendingBeforeTurn = isSessionIdentityPending( resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined), ); @@ -390,5 +447,21 @@ export async function tryDispatchAcpReply(params: { }); params.markIdle("message_completed"); return { queuedFinal, counts }; + } finally { + if (renewTimer) { + clearInterval(renewTimer); + } + if (lockOwnerId) { + try { + await lockManager.release(sessionKey, lockOwnerId); + logVerbose(`dispatch-acp-lock: session=${sessionKey} released owner=${lockOwnerId}`); + } catch (error) { + logVerbose( + `dispatch-acp-lock: session=${sessionKey} release_failed owner=${lockOwnerId} error=${error instanceof Error ? 
error.message : String(error)}`, + ); + } finally { + lockOwnerId = null; + } + } } } diff --git a/src/auto-reply/reply/session-lock-manager.test.ts b/src/auto-reply/reply/session-lock-manager.test.ts new file mode 100644 index 00000000000..cf13bbc8065 --- /dev/null +++ b/src/auto-reply/reply/session-lock-manager.test.ts @@ -0,0 +1,198 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + LocalSessionLockManager, + RedisSessionLockManager, + getAcpSessionLockManager, + resetAcpSessionLockManagerForTests, + resolveAcpSessionLockTtlMs, +} from "./session-lock-manager.js"; + +describe("LocalSessionLockManager", () => { + it("acquires and releases session locks", async () => { + const manager = new LocalSessionLockManager(); + + const first = await manager.acquire("session-1", 5_000); + expect(first.acquired).toBe(true); + if (!first.acquired) { + throw new Error("expected first lock to be acquired"); + } + const second = await manager.acquire("session-1", 5_000); + expect(second).toEqual({ acquired: false }); + + await manager.release("session-1", first.ownerId); + const third = await manager.acquire("session-1", 5_000); + expect(third.acquired).toBe(true); + }); + + it("blocks concurrent acquire for the same session key", async () => { + const manager = new LocalSessionLockManager(); + const [a, b] = await Promise.all([ + manager.acquire("session-2", 5_000), + manager.acquire("session-2", 5_000), + ]); + expect([a.acquired, b.acquired].toSorted((x, y) => Number(x) - Number(y))).toEqual([ + false, + true, + ]); + }); + + it("only allows the current owner to release", async () => { + const manager = new LocalSessionLockManager(); + const first = await manager.acquire("session-3", 5_000); + expect(first.acquired).toBe(true); + if (!first.acquired) { + throw new Error("expected lock to be acquired"); + } + + await manager.release("session-3", "wrong-owner"); + const blocked = await manager.acquire("session-3", 5_000); + expect(blocked).toEqual({ 
acquired: false }); + + await manager.release("session-3", first.ownerId); + const reopened = await manager.acquire("session-3", 5_000); + expect(reopened.acquired).toBe(true); + }); + + it("renews lock ttl only for the current owner", async () => { + vi.useFakeTimers(); + try { + const manager = new LocalSessionLockManager(); + const first = await manager.acquire("session-4", 1_000); + expect(first.acquired).toBe(true); + if (!first.acquired) { + throw new Error("expected lock to be acquired"); + } + + vi.advanceTimersByTime(700); + const wrongOwnerRenewed = await manager.renew("session-4", "wrong-owner", 1_000); + expect(wrongOwnerRenewed).toBe(false); + + const renewed = await manager.renew("session-4", first.ownerId, 1_000); + expect(renewed).toBe(true); + + vi.advanceTimersByTime(700); + const blocked = await manager.acquire("session-4", 1_000); + expect(blocked).toEqual({ acquired: false }); + + vi.advanceTimersByTime(400); + const afterExpiry = await manager.acquire("session-4", 1_000); + expect(afterExpiry.acquired).toBe(true); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("RedisSessionLockManager", () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it("acquire uses NX+PX and returns false when lock exists", async () => { + const values = new Map(); + const runRedisCommand = vi.fn(async (args: string[]) => { + if (args[0] === "SET") { + const [_, key, value] = args; + if (values.has(key)) { + return null; + } + values.set(key, value); + return "OK"; + } + throw new Error(`unsupported command: ${args.join(" ")}`); + }); + const manager = new RedisSessionLockManager({ + runRedisCommand, + ownerIdFactory: () => "owner-1", + }); + + const acquired = await manager.acquire("session-5", 9_000); + expect(acquired).toEqual({ acquired: true, ownerId: "owner-1" }); + const blocked = await manager.acquire("session-5", 9_000); + expect(blocked).toEqual({ acquired: false }); + expect(runRedisCommand).toHaveBeenCalledWith([ + "SET", + 
"lock:acp:session:session-5", + "owner-1", + "NX", + "PX", + "9000", + ]); + }); + + it("only owner can release and renew", async () => { + const values = new Map(); + const runRedisCommand = vi.fn(async (args: string[]) => { + if (args[0] === "SET") { + const [_, key, value] = args; + if (values.has(key)) { + return null; + } + values.set(key, value); + return "OK"; + } + if (args[0] === "EVAL") { + const [_, script, __, key, ownerId] = args; + if (script.includes("DEL")) { + if (values.get(key) === ownerId) { + values.delete(key); + return 1; + } + return 0; + } + if (script.includes("PEXPIRE")) { + const ttl = Number.parseInt(args[5] ?? "", 10); + expect(ttl).toBe(7_000); + return values.get(key) === ownerId ? 1 : 0; + } + } + throw new Error(`unsupported command: ${args.join(" ")}`); + }); + const manager = new RedisSessionLockManager({ + runRedisCommand, + ownerIdFactory: () => "owner-2", + }); + const acquired = await manager.acquire("session-6", 7_000); + expect(acquired).toEqual({ acquired: true, ownerId: "owner-2" }); + + const wrongOwnerRenew = await manager.renew("session-6", "wrong-owner", 7_000); + expect(wrongOwnerRenew).toBe(false); + const ownerRenew = await manager.renew("session-6", "owner-2", 7_000); + expect(ownerRenew).toBe(true); + + await manager.release("session-6", "wrong-owner"); + const stillBlocked = await manager.acquire("session-6", 7_000); + expect(stillBlocked).toEqual({ acquired: false }); + + await manager.release("session-6", "owner-2"); + const reopened = await manager.acquire("session-6", 7_000); + expect(reopened).toEqual({ acquired: true, ownerId: "owner-2" }); + }); +}); + +describe("session lock manager selection", () => { + beforeEach(() => { + resetAcpSessionLockManagerForTests(); + }); + + it("defaults to local lock manager when Redis is not configured", () => { + const manager = getAcpSessionLockManager({}); + expect(manager).toBeInstanceOf(LocalSessionLockManager); + }); + + it("fails closed when Redis is configured but 
initialization fails", async () => { + const manager = getAcpSessionLockManager({ + OPENCLAW_ACP_SESSION_LOCK_REDIS_URL: "http://redis.example", + }); + expect(manager).not.toBeInstanceOf(LocalSessionLockManager); + await expect(manager.acquire("session-7", 1_000)).rejects.toThrow( + "Redis ACP session lock manager unavailable", + ); + }); + + it("uses default ttl when env is missing or invalid", () => { + expect(resolveAcpSessionLockTtlMs({})).toBe(120_000); + expect(resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "nope" })).toBe(120_000); + expect(resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "2000" })).toBe(2_000); + }); +}); diff --git a/src/auto-reply/reply/session-lock-manager.ts b/src/auto-reply/reply/session-lock-manager.ts new file mode 100644 index 00000000000..36388e0bc9f --- /dev/null +++ b/src/auto-reply/reply/session-lock-manager.ts @@ -0,0 +1,497 @@ +import net from "node:net"; +import tls from "node:tls"; +import { logVerbose } from "../../globals.js"; +import { generateSecureUuid } from "../../infra/secure-random.js"; + +export interface SessionLockManager { + acquire( + sessionKey: string, + ttlMs: number, + ): Promise<{ acquired: true; ownerId: string } | { acquired: false }>; + release(sessionKey: string, ownerId: string): Promise; + renew(sessionKey: string, ownerId: string, ttlMs: number): Promise; +} + +type RedisReply = number | string | null | RedisReply[]; + +type RedisCommandRunner = (args: string[]) => Promise; + +const DEFAULT_ACP_SESSION_LOCK_TTL_MS = 120_000; +const MIN_LOCK_TTL_MS = 1_000; +const LOCK_KEY_PREFIX = "lock:acp:session:"; +const RELEASE_IF_OWNER_LUA = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end`; +const RENEW_IF_OWNER_LUA = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("PEXPIRE", KEYS[1], ARGV[2]) else return 0 end`; + +type RedisConnectionConfig = { + tls: boolean; + host: string; + port: number; + username?: 
string; + password?: string; + database: number; +}; + +class RedisProtocolError extends Error {} + +class RedisSocketConnection { + private readonly socket: net.Socket | tls.TLSSocket; + private buffer = Buffer.alloc(0); + private pending: + | { + resolve: (value: RedisReply) => void; + reject: (error: Error) => void; + } + | undefined; + + constructor(socket: net.Socket | tls.TLSSocket) { + this.socket = socket; + this.socket.on("data", (chunk: Buffer) => { + this.buffer = Buffer.concat([this.buffer, chunk]); + this.flushPending(); + }); + this.socket.on("error", (error) => { + if (this.pending) { + this.pending.reject(error); + this.pending = undefined; + } + }); + this.socket.on("close", () => { + if (this.pending) { + this.pending.reject(new Error("Redis socket closed before response was received.")); + this.pending = undefined; + } + }); + } + + async sendCommand(args: string[]): Promise { + if (this.pending) { + throw new Error("Redis command pipelining is not supported by this connection."); + } + const payload = encodeRedisCommand(args); + this.socket.write(payload); + const parsed = tryParseRedisReply(this.buffer); + if (parsed) { + this.buffer = this.buffer.subarray(parsed.nextOffset); + if (parsed.error) { + throw parsed.error; + } + return parsed.value; + } + return await new Promise((resolve, reject) => { + this.pending = { resolve, reject }; + }); + } + + close(): void { + this.socket.end(); + this.socket.destroy(); + } + + private flushPending(): void { + if (!this.pending) { + return; + } + const parsed = tryParseRedisReply(this.buffer); + if (!parsed) { + return; + } + this.buffer = this.buffer.subarray(parsed.nextOffset); + const pending = this.pending; + this.pending = undefined; + if (parsed.error) { + pending.reject(parsed.error); + return; + } + pending.resolve(parsed.value); + } +} + +function encodeRedisCommand(args: string[]): Buffer { + const lines: string[] = [`*${args.length}`]; + for (const arg of args) { + 
lines.push(`$${Buffer.byteLength(arg)}`); + lines.push(arg); + } + return Buffer.from(`${lines.join("\r\n")}\r\n`, "utf8"); +} + +function tryReadLine( + buffer: Buffer, + startOffset: number, +): { line: string; nextOffset: number } | null { + const lineEnd = buffer.indexOf("\r\n", startOffset); + if (lineEnd < 0) { + return null; + } + const line = buffer.toString("utf8", startOffset, lineEnd); + return { line, nextOffset: lineEnd + 2 }; +} + +function parseRedisValueAt( + buffer: Buffer, + startOffset: number, +): { value: RedisReply; nextOffset: number; error?: Error } | null { + if (startOffset >= buffer.length) { + return null; + } + const type = String.fromCharCode(buffer[startOffset]); + const line = tryReadLine(buffer, startOffset + 1); + if (!line) { + return null; + } + if (type === "+") { + return { value: line.line, nextOffset: line.nextOffset }; + } + if (type === "-") { + return { + value: line.line, + nextOffset: line.nextOffset, + error: new RedisProtocolError(`Redis error reply: ${line.line}`), + }; + } + if (type === ":") { + const parsed = Number.parseInt(line.line, 10); + if (!Number.isFinite(parsed)) { + throw new RedisProtocolError(`Invalid Redis integer reply: ${line.line}`); + } + return { value: parsed, nextOffset: line.nextOffset }; + } + if (type === "$") { + const len = Number.parseInt(line.line, 10); + if (!Number.isFinite(len)) { + throw new RedisProtocolError(`Invalid Redis bulk length: ${line.line}`); + } + if (len === -1) { + return { value: null, nextOffset: line.nextOffset }; + } + const bodyEnd = line.nextOffset + len; + if (bodyEnd + 2 > buffer.length) { + return null; + } + const text = buffer.toString("utf8", line.nextOffset, bodyEnd); + return { value: text, nextOffset: bodyEnd + 2 }; + } + if (type === "*") { + const count = Number.parseInt(line.line, 10); + if (!Number.isFinite(count)) { + throw new RedisProtocolError(`Invalid Redis array length: ${line.line}`); + } + if (count === -1) { + return { value: null, nextOffset: 
line.nextOffset }; + } + let nextOffset = line.nextOffset; + const values: RedisReply[] = []; + for (let idx = 0; idx < count; idx += 1) { + const child = parseRedisValueAt(buffer, nextOffset); + if (!child) { + return null; + } + nextOffset = child.nextOffset; + if (child.error) { + return { + value: child.value, + nextOffset, + error: child.error, + }; + } + values.push(child.value); + } + return { value: values, nextOffset }; + } + throw new RedisProtocolError(`Unsupported Redis response type: ${type}`); +} + +function tryParseRedisReply( + buffer: Buffer, +): { value: RedisReply; nextOffset: number; error?: Error } | null { + return parseRedisValueAt(buffer, 0); +} + +function parseRedisConnectionConfig(redisUrl: string): RedisConnectionConfig { + const parsed = new URL(redisUrl); + if (parsed.protocol !== "redis:" && parsed.protocol !== "rediss:") { + throw new Error( + `Unsupported Redis URL protocol "${parsed.protocol}". Expected redis:// or rediss://.`, + ); + } + const host = parsed.hostname?.trim(); + if (!host) { + throw new Error("Redis URL must include a host."); + } + const portRaw = parsed.port?.trim(); + const port = portRaw ? Number.parseInt(portRaw, 10) : 6379; + if (!Number.isFinite(port) || port <= 0) { + throw new Error(`Redis URL has invalid port: ${portRaw || "(empty)"}`); + } + const username = parsed.username ? decodeURIComponent(parsed.username) : undefined; + const password = parsed.password ? decodeURIComponent(parsed.password) : undefined; + const dbRaw = parsed.pathname.replace(/^\//, "").trim(); + const database = dbRaw ? 
Number.parseInt(dbRaw, 10) : 0; + if (!Number.isFinite(database) || database < 0) { + throw new Error(`Redis URL has invalid database index: ${dbRaw || "(empty)"}`); + } + return { + tls: parsed.protocol === "rediss:", + host, + port, + username, + password, + database, + }; +} + +async function connectRedis(config: RedisConnectionConfig): Promise { + const socket: net.Socket | tls.TLSSocket = await new Promise((resolve, reject) => { + const connectHandler = () => { + cleanup(); + resolve(rawSocket); + }; + const errorHandler = (error: Error) => { + cleanup(); + reject(error); + }; + const timeoutHandler = () => { + cleanup(); + reject(new Error("Redis connection timed out.")); + }; + const cleanup = () => { + rawSocket.off("connect", connectHandler); + rawSocket.off("error", errorHandler); + rawSocket.off("timeout", timeoutHandler); + }; + const rawSocket = config.tls + ? tls.connect({ host: config.host, port: config.port }) + : net.createConnection({ host: config.host, port: config.port }); + rawSocket.setTimeout(10_000); + rawSocket.on("connect", connectHandler); + rawSocket.on("error", errorHandler); + rawSocket.on("timeout", timeoutHandler); + }); + const connection = new RedisSocketConnection(socket); + if (config.password) { + const authArgs = config.username + ? ["AUTH", config.username, config.password] + : ["AUTH", config.password]; + await connection.sendCommand(authArgs); + } + if (config.database !== 0) { + await connection.sendCommand(["SELECT", String(config.database)]); + } + return connection; +} + +function createRedisCommandRunner(redisUrl: string): RedisCommandRunner { + // No shared Redis client currently exists in this repo; keep lock-scoped RESP wiring local. 
+ const config = parseRedisConnectionConfig(redisUrl); + return async (args: string[]) => { + const connection = await connectRedis(config); + try { + return await connection.sendCommand(args); + } finally { + connection.close(); + } + }; +} + +function toPositiveTtlMs(ttlMs: number): number { + if (!Number.isFinite(ttlMs)) { + return DEFAULT_ACP_SESSION_LOCK_TTL_MS; + } + return Math.max(MIN_LOCK_TTL_MS, Math.floor(ttlMs)); +} + +function lockKeyForSession(sessionKey: string): string { + return `${LOCK_KEY_PREFIX}${sessionKey}`; +} + +type LocalLockEntry = { + ownerId: string; + expiresAtMs: number; +}; + +export class LocalSessionLockManager implements SessionLockManager { + private readonly locks = new Map(); + + async acquire( + sessionKey: string, + ttlMs: number, + ): Promise<{ acquired: true; ownerId: string } | { acquired: false }> { + this.clearIfExpired(sessionKey); + if (this.locks.has(sessionKey)) { + return { acquired: false }; + } + const ownerId = generateSecureUuid(); + this.locks.set(sessionKey, { + ownerId, + expiresAtMs: Date.now() + toPositiveTtlMs(ttlMs), + }); + return { acquired: true, ownerId }; + } + + async release(sessionKey: string, ownerId: string): Promise { + this.clearIfExpired(sessionKey); + const lock = this.locks.get(sessionKey); + if (!lock || lock.ownerId !== ownerId) { + return; + } + this.locks.delete(sessionKey); + } + + async renew(sessionKey: string, ownerId: string, ttlMs: number): Promise { + this.clearIfExpired(sessionKey); + const lock = this.locks.get(sessionKey); + if (!lock || lock.ownerId !== ownerId) { + return false; + } + lock.expiresAtMs = Date.now() + toPositiveTtlMs(ttlMs); + this.locks.set(sessionKey, lock); + return true; + } + + private clearIfExpired(sessionKey: string): void { + const lock = this.locks.get(sessionKey); + if (!lock) { + return; + } + if (lock.expiresAtMs <= Date.now()) { + this.locks.delete(sessionKey); + } + } +} + +export class RedisSessionLockManager implements SessionLockManager { + 
private readonly runRedisCommand: RedisCommandRunner; + private readonly ownerIdFactory: () => string; + + constructor(params: { + redisUrl?: string; + runRedisCommand?: RedisCommandRunner; + ownerIdFactory?: () => string; + }) { + if (!params.redisUrl && !params.runRedisCommand) { + throw new Error("RedisSessionLockManager requires redisUrl or runRedisCommand."); + } + this.runRedisCommand = + params.runRedisCommand ?? + createRedisCommandRunner( + params.redisUrl ?? + (() => { + throw new Error("Missing redisUrl"); + })(), + ); + this.ownerIdFactory = params.ownerIdFactory ?? generateSecureUuid; + } + + async acquire( + sessionKey: string, + ttlMs: number, + ): Promise<{ acquired: true; ownerId: string } | { acquired: false }> { + const ownerId = this.ownerIdFactory(); + const result = await this.runRedisCommand([ + "SET", + lockKeyForSession(sessionKey), + ownerId, + "NX", + "PX", + String(toPositiveTtlMs(ttlMs)), + ]); + if (result === "OK") { + return { acquired: true, ownerId }; + } + return { acquired: false }; + } + + async release(sessionKey: string, ownerId: string): Promise { + await this.runRedisCommand([ + "EVAL", + RELEASE_IF_OWNER_LUA, + "1", + lockKeyForSession(sessionKey), + ownerId, + ]); + } + + async renew(sessionKey: string, ownerId: string, ttlMs: number): Promise { + const result = await this.runRedisCommand([ + "EVAL", + RENEW_IF_OWNER_LUA, + "1", + lockKeyForSession(sessionKey), + ownerId, + String(toPositiveTtlMs(ttlMs)), + ]); + return Number(result) === 1; + } +} + +class FailClosedSessionLockManager implements SessionLockManager { + private readonly reason: string; + + constructor(reason: string) { + this.reason = reason; + } + + async acquire( + _sessionKey: string, + _ttlMs: number, + ): Promise<{ acquired: true; ownerId: string } | { acquired: false }> { + throw new Error(`Redis ACP session lock manager unavailable: ${this.reason}`); + } + + async release(_sessionKey: string, _ownerId: string): Promise {} + + async renew(_sessionKey: 
string, _ownerId: string, _ttlMs: number): Promise { + return false; + } +} + +export function resolveAcpSessionLockTtlMs(env: NodeJS.ProcessEnv = process.env): number { + const raw = env.OPENCLAW_ACP_SESSION_LOCK_TTL_MS?.trim(); + if (!raw) { + return DEFAULT_ACP_SESSION_LOCK_TTL_MS; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_ACP_SESSION_LOCK_TTL_MS; + } + return toPositiveTtlMs(parsed); +} + +function resolveRedisLockUrl(env: NodeJS.ProcessEnv = process.env): string | null { + const candidates = [env.OPENCLAW_ACP_SESSION_LOCK_REDIS_URL, env.REDIS_URL]; + for (const candidate of candidates) { + const normalized = candidate?.trim(); + if (normalized) { + return normalized; + } + } + return null; +} + +let ACP_SESSION_LOCK_MANAGER_SINGLETON: SessionLockManager | null = null; + +export function getAcpSessionLockManager(env: NodeJS.ProcessEnv = process.env): SessionLockManager { + if (ACP_SESSION_LOCK_MANAGER_SINGLETON) { + return ACP_SESSION_LOCK_MANAGER_SINGLETON; + } + const redisUrl = resolveRedisLockUrl(env); + if (!redisUrl) { + ACP_SESSION_LOCK_MANAGER_SINGLETON = new LocalSessionLockManager(); + logVerbose("dispatch-acp-lock: backend=local redis_configured=false"); + return ACP_SESSION_LOCK_MANAGER_SINGLETON; + } + try { + ACP_SESSION_LOCK_MANAGER_SINGLETON = new RedisSessionLockManager({ redisUrl }); + logVerbose("dispatch-acp-lock: backend=redis"); + return ACP_SESSION_LOCK_MANAGER_SINGLETON; + } catch (error) { + const message = error instanceof Error ? 
error.message : String(error); + ACP_SESSION_LOCK_MANAGER_SINGLETON = new FailClosedSessionLockManager(message); + logVerbose(`dispatch-acp-lock: backend=redis init_failed=${message} fail_closed=true`); + return ACP_SESSION_LOCK_MANAGER_SINGLETON; + } +} + +export function resetAcpSessionLockManagerForTests(): void { + ACP_SESSION_LOCK_MANAGER_SINGLETON = null; +} From 475141fdd78214d7e1b2125e2f26cb7b69fab683 Mon Sep 17 00:00:00 2001 From: Dominic Lewis Date: Wed, 18 Mar 2026 19:29:51 -0400 Subject: [PATCH 2/2] fix(acp): add per-command timeout and remove REDIS_URL fallback C-1: sendCommand now races against a 10s per-command timeout so a Redis instance that accepts the TCP connection but then stalls will reject and flow into the existing fail-closed path instead of hanging dispatch indefinitely. C-3: resolveRedisLockUrl no longer falls back to REDIS_URL. ACP Redis locking now activates only when OPENCLAW_ACP_SESSION_LOCK_REDIS_URL is explicitly set, preventing unrelated Redis deployments from being silently opted into fail-closed ACP locking. Tests: added two cases to session-lock-manager.test.ts confirming that REDIS_URL alone selects the local backend, and that OPENCLAW_ACP_SESSION_LOCK_REDIS_URL takes precedence and triggers fail-closed when the URL is invalid. 
---
 .../reply/session-lock-manager.test.ts       | 21 +++++++++++++++
 src/auto-reply/reply/session-lock-manager.ts | 26 ++++++++++---------
 2 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/src/auto-reply/reply/session-lock-manager.test.ts b/src/auto-reply/reply/session-lock-manager.test.ts
index cf13bbc8065..2ae2696f809 100644
--- a/src/auto-reply/reply/session-lock-manager.test.ts
+++ b/src/auto-reply/reply/session-lock-manager.test.ts
@@ -195,4 +195,25 @@ describe("session lock manager selection", () => {
     expect(resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "nope" })).toBe(120_000);
     expect(resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "2000" })).toBe(2_000);
   });
+
+  // C-3: the REDIS_URL fallback was removed, so a generic REDIS_URL alone must select the local backend.
+  it("does not activate Redis backend when only REDIS_URL is set", () => {
+    const manager = getAcpSessionLockManager({ REDIS_URL: "redis://localhost:6379" });
+    expect(manager).toBeInstanceOf(LocalSessionLockManager);
+  });
+
+  it("activates Redis backend only when OPENCLAW_ACP_SESSION_LOCK_REDIS_URL is set", () => {
+    const manager = getAcpSessionLockManager({
+      REDIS_URL: "redis://localhost:6379",
+      OPENCLAW_ACP_SESSION_LOCK_REDIS_URL: "http://redis.example",
+    });
+    // Init is expected to fail (non-Redis scheme); selection must stay fail-closed, never local.
+    expect(manager).not.toBeInstanceOf(LocalSessionLockManager);
+  });
 });
+
+// C-1: the per-command timeout in sendCommand is implemented inside RedisSocketConnection, which
+// is private and only reachable through createRedisCommandRunner (the real TCP path). Unit tests
+// inject runRedisCommand into RedisSessionLockManager and therefore never touch socket I/O, so the
+// Promise.race timeout is currently verified by code inspection only. Covering it would need a
+// network-level integration test against a real (or mock) TCP server.
diff --git a/src/auto-reply/reply/session-lock-manager.ts b/src/auto-reply/reply/session-lock-manager.ts
index 36388e0bc9f..98d804019f0 100644
--- a/src/auto-reply/reply/session-lock-manager.ts
+++ b/src/auto-reply/reply/session-lock-manager.ts
@@ -63,7 +63,15 @@ class RedisSocketConnection {
     });
   }

-  async sendCommand(args: string[]): Promise {
+  /**
+   * Sends a single command on this connection and resolves with its reply value.
+   *
+   * @param args - Command name followed by its arguments.
+   * @param timeoutMs - Maximum time to wait for the reply, in milliseconds (default 10s).
+   * @throws Error if another command is still pending on this connection (no pipelining),
+   *   or if no reply arrives within `timeoutMs`.
+   */
+  async sendCommand(args: string[], timeoutMs = 10_000): Promise {
     if (this.pending) {
       throw new Error("Redis command pipelining is not supported by this connection.");
     }
@@ -77,9 +85,24 @@ class RedisSocketConnection {
       }
       return parsed.value;
     }
-    return await new Promise((resolve, reject) => {
-      this.pending = { resolve, reject };
-    });
+    let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
+    try {
+      return await Promise.race([
+        new Promise((resolve, reject) => {
+          this.pending = { resolve, reject };
+        }),
+        new Promise((_, reject) => {
+          timeoutTimer = setTimeout(
+            () => reject(new Error(`Redis command timed out after ${timeoutMs}ms.`)),
+            timeoutMs,
+          );
+        }),
+      ]);
+    } finally {
+      // Cancel the timer however the race settles; otherwise each completed command would
+      // leave a live timer behind for up to `timeoutMs`, keeping the event loop busy.
+      clearTimeout(timeoutTimer);
+    }
   }

   close(): void {
@@ -458,14 +481,12 @@ export function resolveAcpSessionLockTtlMs(env: NodeJS.ProcessEnv = process.env)
 }

+/**
+ * Returns the trimmed OPENCLAW_ACP_SESSION_LOCK_REDIS_URL, or null when it is unset or blank.
+ * The generic REDIS_URL is deliberately not consulted, so Redis locking is strictly opt-in.
+ */
 function resolveRedisLockUrl(env: NodeJS.ProcessEnv = process.env): string | null {
-  const candidates = [env.OPENCLAW_ACP_SESSION_LOCK_REDIS_URL, env.REDIS_URL];
-  for (const candidate of candidates) {
-    const normalized = candidate?.trim();
-    if (normalized) {
-      return normalized;
-    }
-  }
-  return null;
+  const normalized = env.OPENCLAW_ACP_SESSION_LOCK_REDIS_URL?.trim();
+  return normalized || null;
 }

 let ACP_SESSION_LOCK_MANAGER_SINGLETON: SessionLockManager | null = null;