Merge 475141fdd78214d7e1b2125e2f26cb7b69fab683 into 5e417b44e1540f528d2ae63e3e20229a902d1db2

This commit is contained in:
Dnick20 2026-03-20 20:14:10 -07:00 committed by GitHub
commit 6d4c199af7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 927 additions and 0 deletions

View File

@ -78,3 +78,12 @@ OPENCLAW_GATEWAY_TOKEN=change-me-to-a-long-random-token
# ELEVENLABS_API_KEY=...
# XI_API_KEY=... # alias for ElevenLabs
# DEEPGRAM_API_KEY=...
# -----------------------------------------------------------------------------
# ACP distributed session locking (optional)
# -----------------------------------------------------------------------------
# Enable Redis-backed ACP session locks across workers/containers.
# If unset, ACP uses an in-process local lock manager.
# OPENCLAW_ACP_SESSION_LOCK_REDIS_URL=redis://:password@redis-host:6379/0
# Optional lock TTL in milliseconds (default: 120000).
# OPENCLAW_ACP_SESSION_LOCK_TTL_MS=120000

View File

@ -52,6 +52,13 @@ const bindingServiceMocks = vi.hoisted(() => ({
listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []),
}));
const sessionLockMocks = vi.hoisted(() => ({
acquire: vi.fn(),
release: vi.fn(),
renew: vi.fn(),
resolveTtlMs: vi.fn(() => 120_000),
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => managerMocks,
}));
@ -87,6 +94,16 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({
}),
}));
vi.mock("./session-lock-manager.js", () => ({
getAcpSessionLockManager: () => ({
acquire: (sessionKey: string, ttlMs: number) => sessionLockMocks.acquire(sessionKey, ttlMs),
release: (sessionKey: string, ownerId: string) => sessionLockMocks.release(sessionKey, ownerId),
renew: (sessionKey: string, ownerId: string, ttlMs: number) =>
sessionLockMocks.renew(sessionKey, ownerId, ttlMs),
}),
resolveAcpSessionLockTtlMs: () => sessionLockMocks.resolveTtlMs(),
}));
const { tryDispatchAcpReply } = await import("./dispatch-acp.js");
const sessionKey = "agent:codex-acp:session-1";
@ -160,6 +177,41 @@ async function runDispatch(params: {
});
}
async function runDispatchWithTracking(params: {
bodyForAgent: string;
cfg?: OpenClawConfig;
dispatcher?: ReplyDispatcher;
shouldRouteToOriginating?: boolean;
onReplyStart?: () => void;
ctxOverrides?: Record<string, unknown>;
}) {
const recordProcessed = vi.fn();
const markIdle = vi.fn();
const result = await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: sessionKey,
BodyForAgent: params.bodyForAgent,
...params.ctxOverrides,
}),
cfg: params.cfg ?? createAcpTestConfig(),
dispatcher: params.dispatcher ?? createDispatcher().dispatcher,
sessionKey,
inboundAudio: false,
shouldRouteToOriginating: params.shouldRouteToOriginating ?? false,
...(params.shouldRouteToOriginating
? { originatingChannel: "telegram", originatingTo: "telegram:thread-1" }
: {}),
shouldSendToolSummaries: true,
bypassForCommand: false,
...(params.onReplyStart ? { onReplyStart: params.onReplyStart } : {}),
recordProcessed,
markIdle,
});
return { result, recordProcessed, markIdle };
}
async function emitToolLifecycleEvents(
onEvent: (event: unknown) => Promise<void>,
toolCallId: string,
@ -232,6 +284,14 @@ describe("tryDispatchAcpReply", () => {
sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null);
bindingServiceMocks.listBySession.mockReset();
bindingServiceMocks.listBySession.mockReturnValue([]);
sessionLockMocks.acquire.mockReset();
sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-1" });
sessionLockMocks.release.mockReset();
sessionLockMocks.release.mockResolvedValue(undefined);
sessionLockMocks.renew.mockReset();
sessionLockMocks.renew.mockResolvedValue(true);
sessionLockMocks.resolveTtlMs.mockReset();
sessionLockMocks.resolveTtlMs.mockReturnValue(120_000);
});
it("routes ACP block output to originating channel", async () => {
@ -435,4 +495,71 @@ describe("tryDispatchAcpReply", () => {
}),
);
});
it("enforces ACP execution lock to prevent concurrent runs", async () => {
setReadyAcpResolution();
let releaseRun: (() => void) | undefined;
let held = false;
sessionLockMocks.acquire.mockImplementation(async () => {
if (held) {
return { acquired: false } as const;
}
held = true;
return { acquired: true, ownerId: "owner-1" } as const;
});
sessionLockMocks.release.mockImplementation(async () => {
held = false;
});
managerMocks.runTurn.mockImplementation(
async () =>
await new Promise<void>((resolve) => {
releaseRun = resolve;
}),
);
const firstDispatch = runDispatch({ bodyForAgent: "first" });
await vi.waitFor(() => {
expect(managerMocks.runTurn).toHaveBeenCalledTimes(1);
});
const secondResult = await runDispatch({ bodyForAgent: "second" });
expect(secondResult?.queuedFinal).toBe(false);
expect(managerMocks.runTurn).toHaveBeenCalledTimes(1);
releaseRun?.();
await firstDispatch;
});
it("marks skipped with acp_execution_locked when lock is already held", async () => {
setReadyAcpResolution();
sessionLockMocks.acquire.mockResolvedValue({ acquired: false });
const tracked = await runDispatchWithTracking({ bodyForAgent: "skip me" });
expect(tracked.result?.queuedFinal).toBe(false);
expect(managerMocks.runTurn).not.toHaveBeenCalled();
expect(tracked.recordProcessed).toHaveBeenCalledWith("skipped", {
reason: "acp_execution_locked",
});
});
it("releases lock after successful ACP dispatch", async () => {
setReadyAcpResolution();
sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-success" });
managerMocks.runTurn.mockResolvedValue(undefined);
await runDispatch({ bodyForAgent: "success" });
expect(sessionLockMocks.release).toHaveBeenCalledWith(sessionKey, "owner-success");
});
it("releases lock after ACP dispatch throws", async () => {
setReadyAcpResolution();
sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-error" });
managerMocks.runTurn.mockRejectedValue(new Error("boom"));
await runDispatch({ bodyForAgent: "throws" });
expect(sessionLockMocks.release).toHaveBeenCalledWith(sessionKey, "owner-error");
});
});

View File

@ -32,6 +32,7 @@ import type { FinalizedMsgContext } from "../templating.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { getAcpSessionLockManager, resolveAcpSessionLockTtlMs } from "./session-lock-manager.js";
type DispatchProcessedRecorder = (
outcome: "completed" | "skipped" | "error",
@ -229,6 +230,62 @@ export async function tryDispatchAcpReply(params: {
onReplyStart: params.onReplyStart,
});
const lockManager = getAcpSessionLockManager();
const lockTtlMs = resolveAcpSessionLockTtlMs();
const lockAcquireStartedAt = Date.now();
let lockOwnerId: string | null = null;
try {
const lockAcquire = await lockManager.acquire(sessionKey, lockTtlMs);
if (!lockAcquire.acquired) {
const counts = params.dispatcher.getQueuedCounts();
params.recordProcessed("skipped", { reason: "acp_execution_locked" });
params.markIdle("message_completed");
logVerbose(
`dispatch-acp-lock: session=${sessionKey} lock_contended=true backend_wait_ms=${Date.now() - lockAcquireStartedAt}`,
);
return { queuedFinal: false, counts };
}
lockOwnerId = lockAcquire.ownerId;
logVerbose(
`dispatch-acp-lock: session=${sessionKey} acquired=true owner=${lockOwnerId} ttlMs=${lockTtlMs}`,
);
} catch (error) {
const counts = params.dispatcher.getQueuedCounts();
params.recordProcessed("skipped", { reason: "acp_execution_locked" });
params.markIdle("message_completed");
logVerbose(
`dispatch-acp-lock: session=${sessionKey} acquire_failed=${error instanceof Error ? error.message : String(error)}`,
);
return { queuedFinal: false, counts };
}
let renewTimer: NodeJS.Timeout | null = null;
if (lockOwnerId) {
// Best-effort heartbeat: renew failures are logged, and the turn continues.
const renewEveryMs = Math.max(1_000, Math.floor(lockTtlMs / 2));
renewTimer = setInterval(() => {
if (!lockOwnerId) {
return;
}
void lockManager
.renew(sessionKey, lockOwnerId, lockTtlMs)
.then((renewed) => {
if (!renewed) {
logVerbose(
`dispatch-acp-lock: session=${sessionKey} renew_failed owner=${lockOwnerId}`,
);
}
})
.catch((error) => {
logVerbose(
`dispatch-acp-lock: session=${sessionKey} renew_error=${error instanceof Error ? error.message : String(error)}`,
);
});
}, renewEveryMs);
renewTimer.unref?.();
}
const identityPendingBeforeTurn = isSessionIdentityPending(
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
);
@ -390,5 +447,21 @@ export async function tryDispatchAcpReply(params: {
});
params.markIdle("message_completed");
return { queuedFinal, counts };
} finally {
if (renewTimer) {
clearInterval(renewTimer);
}
if (lockOwnerId) {
try {
await lockManager.release(sessionKey, lockOwnerId);
logVerbose(`dispatch-acp-lock: session=${sessionKey} released owner=${lockOwnerId}`);
} catch (error) {
logVerbose(
`dispatch-acp-lock: session=${sessionKey} release_failed owner=${lockOwnerId} error=${error instanceof Error ? error.message : String(error)}`,
);
} finally {
lockOwnerId = null;
}
}
}
}

View File

@ -0,0 +1,219 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
LocalSessionLockManager,
RedisSessionLockManager,
getAcpSessionLockManager,
resetAcpSessionLockManagerForTests,
resolveAcpSessionLockTtlMs,
} from "./session-lock-manager.js";
describe("LocalSessionLockManager", () => {
it("acquires and releases session locks", async () => {
const manager = new LocalSessionLockManager();
const first = await manager.acquire("session-1", 5_000);
expect(first.acquired).toBe(true);
if (!first.acquired) {
throw new Error("expected first lock to be acquired");
}
const second = await manager.acquire("session-1", 5_000);
expect(second).toEqual({ acquired: false });
await manager.release("session-1", first.ownerId);
const third = await manager.acquire("session-1", 5_000);
expect(third.acquired).toBe(true);
});
it("blocks concurrent acquire for the same session key", async () => {
const manager = new LocalSessionLockManager();
const [a, b] = await Promise.all([
manager.acquire("session-2", 5_000),
manager.acquire("session-2", 5_000),
]);
expect([a.acquired, b.acquired].toSorted((x, y) => Number(x) - Number(y))).toEqual([
false,
true,
]);
});
it("only allows the current owner to release", async () => {
const manager = new LocalSessionLockManager();
const first = await manager.acquire("session-3", 5_000);
expect(first.acquired).toBe(true);
if (!first.acquired) {
throw new Error("expected lock to be acquired");
}
await manager.release("session-3", "wrong-owner");
const blocked = await manager.acquire("session-3", 5_000);
expect(blocked).toEqual({ acquired: false });
await manager.release("session-3", first.ownerId);
const reopened = await manager.acquire("session-3", 5_000);
expect(reopened.acquired).toBe(true);
});
it("renews lock ttl only for the current owner", async () => {
vi.useFakeTimers();
try {
const manager = new LocalSessionLockManager();
const first = await manager.acquire("session-4", 1_000);
expect(first.acquired).toBe(true);
if (!first.acquired) {
throw new Error("expected lock to be acquired");
}
vi.advanceTimersByTime(700);
const wrongOwnerRenewed = await manager.renew("session-4", "wrong-owner", 1_000);
expect(wrongOwnerRenewed).toBe(false);
const renewed = await manager.renew("session-4", first.ownerId, 1_000);
expect(renewed).toBe(true);
vi.advanceTimersByTime(700);
const blocked = await manager.acquire("session-4", 1_000);
expect(blocked).toEqual({ acquired: false });
vi.advanceTimersByTime(400);
const afterExpiry = await manager.acquire("session-4", 1_000);
expect(afterExpiry.acquired).toBe(true);
} finally {
vi.useRealTimers();
}
});
});
describe("RedisSessionLockManager", () => {
beforeEach(() => {
vi.restoreAllMocks();
});
it("acquire uses NX+PX and returns false when lock exists", async () => {
const values = new Map<string, string>();
const runRedisCommand = vi.fn(async (args: string[]) => {
if (args[0] === "SET") {
const [_, key, value] = args;
if (values.has(key)) {
return null;
}
values.set(key, value);
return "OK";
}
throw new Error(`unsupported command: ${args.join(" ")}`);
});
const manager = new RedisSessionLockManager({
runRedisCommand,
ownerIdFactory: () => "owner-1",
});
const acquired = await manager.acquire("session-5", 9_000);
expect(acquired).toEqual({ acquired: true, ownerId: "owner-1" });
const blocked = await manager.acquire("session-5", 9_000);
expect(blocked).toEqual({ acquired: false });
expect(runRedisCommand).toHaveBeenCalledWith([
"SET",
"lock:acp:session:session-5",
"owner-1",
"NX",
"PX",
"9000",
]);
});
it("only owner can release and renew", async () => {
const values = new Map<string, string>();
const runRedisCommand = vi.fn(async (args: string[]) => {
if (args[0] === "SET") {
const [_, key, value] = args;
if (values.has(key)) {
return null;
}
values.set(key, value);
return "OK";
}
if (args[0] === "EVAL") {
const [_, script, __, key, ownerId] = args;
if (script.includes("DEL")) {
if (values.get(key) === ownerId) {
values.delete(key);
return 1;
}
return 0;
}
if (script.includes("PEXPIRE")) {
const ttl = Number.parseInt(args[5] ?? "", 10);
expect(ttl).toBe(7_000);
return values.get(key) === ownerId ? 1 : 0;
}
}
throw new Error(`unsupported command: ${args.join(" ")}`);
});
const manager = new RedisSessionLockManager({
runRedisCommand,
ownerIdFactory: () => "owner-2",
});
const acquired = await manager.acquire("session-6", 7_000);
expect(acquired).toEqual({ acquired: true, ownerId: "owner-2" });
const wrongOwnerRenew = await manager.renew("session-6", "wrong-owner", 7_000);
expect(wrongOwnerRenew).toBe(false);
const ownerRenew = await manager.renew("session-6", "owner-2", 7_000);
expect(ownerRenew).toBe(true);
await manager.release("session-6", "wrong-owner");
const stillBlocked = await manager.acquire("session-6", 7_000);
expect(stillBlocked).toEqual({ acquired: false });
await manager.release("session-6", "owner-2");
const reopened = await manager.acquire("session-6", 7_000);
expect(reopened).toEqual({ acquired: true, ownerId: "owner-2" });
});
});
describe("session lock manager selection", () => {
beforeEach(() => {
resetAcpSessionLockManagerForTests();
});
it("defaults to local lock manager when Redis is not configured", () => {
const manager = getAcpSessionLockManager({});
expect(manager).toBeInstanceOf(LocalSessionLockManager);
});
it("fails closed when Redis is configured but initialization fails", async () => {
const manager = getAcpSessionLockManager({
OPENCLAW_ACP_SESSION_LOCK_REDIS_URL: "http://redis.example",
});
expect(manager).not.toBeInstanceOf(LocalSessionLockManager);
await expect(manager.acquire("session-7", 1_000)).rejects.toThrow(
"Redis ACP session lock manager unavailable",
);
});
it("uses default ttl when env is missing or invalid", () => {
expect(resolveAcpSessionLockTtlMs({})).toBe(120_000);
expect(resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "nope" })).toBe(120_000);
expect(resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "2000" })).toBe(2_000);
});
// C-3: REDIS_URL fallback removed — generic REDIS_URL must not activate Redis-backed locking
it("does not activate Redis backend when only REDIS_URL is set", () => {
const manager = getAcpSessionLockManager({ REDIS_URL: "redis://localhost:6379" });
expect(manager).toBeInstanceOf(LocalSessionLockManager);
});
it("activates Redis backend only when OPENCLAW_ACP_SESSION_LOCK_REDIS_URL is set", () => {
const manager = getAcpSessionLockManager({
REDIS_URL: "redis://localhost:6379",
OPENCLAW_ACP_SESSION_LOCK_REDIS_URL: "http://redis.example",
});
// init will fail (bad scheme) but it must not fall back to local — must be fail-closed
expect(manager).not.toBeInstanceOf(LocalSessionLockManager);
});
});
// C-1: sendCommand per-command timeout lives inside RedisSocketConnection, which is private
// and only reachable via createRedisCommandRunner (real TCP path). That path is not exercised
// in unit tests because RedisSessionLockManager accepts a runRedisCommand injection that
// bypasses socket I/O entirely. The Promise.race timeout is covered by code inspection; a
// network-level integration test would require a real (or mock) TCP server.

View File

@ -0,0 +1,499 @@
import net from "node:net";
import tls from "node:tls";
import { logVerbose } from "../../globals.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
export interface SessionLockManager {
acquire(
sessionKey: string,
ttlMs: number,
): Promise<{ acquired: true; ownerId: string } | { acquired: false }>;
release(sessionKey: string, ownerId: string): Promise<void>;
renew(sessionKey: string, ownerId: string, ttlMs: number): Promise<boolean>;
}
type RedisReply = number | string | null | RedisReply[];
type RedisCommandRunner = (args: string[]) => Promise<RedisReply>;
const DEFAULT_ACP_SESSION_LOCK_TTL_MS = 120_000;
const MIN_LOCK_TTL_MS = 1_000;
const LOCK_KEY_PREFIX = "lock:acp:session:";
const RELEASE_IF_OWNER_LUA = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end`;
const RENEW_IF_OWNER_LUA = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("PEXPIRE", KEYS[1], ARGV[2]) else return 0 end`;
type RedisConnectionConfig = {
tls: boolean;
host: string;
port: number;
username?: string;
password?: string;
database: number;
};
class RedisProtocolError extends Error {}
class RedisSocketConnection {
private readonly socket: net.Socket | tls.TLSSocket;
private buffer = Buffer.alloc(0);
private pending:
| {
resolve: (value: RedisReply) => void;
reject: (error: Error) => void;
}
| undefined;
constructor(socket: net.Socket | tls.TLSSocket) {
this.socket = socket;
this.socket.on("data", (chunk: Buffer) => {
this.buffer = Buffer.concat([this.buffer, chunk]);
this.flushPending();
});
this.socket.on("error", (error) => {
if (this.pending) {
this.pending.reject(error);
this.pending = undefined;
}
});
this.socket.on("close", () => {
if (this.pending) {
this.pending.reject(new Error("Redis socket closed before response was received."));
this.pending = undefined;
}
});
}
async sendCommand(args: string[], timeoutMs = 10_000): Promise<RedisReply> {
if (this.pending) {
throw new Error("Redis command pipelining is not supported by this connection.");
}
const payload = encodeRedisCommand(args);
this.socket.write(payload);
const parsed = tryParseRedisReply(this.buffer);
if (parsed) {
this.buffer = this.buffer.subarray(parsed.nextOffset);
if (parsed.error) {
throw parsed.error;
}
return parsed.value;
}
return await Promise.race([
new Promise<RedisReply>((resolve, reject) => {
this.pending = { resolve, reject };
}),
new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error(`Redis command timed out after ${timeoutMs}ms.`)),
timeoutMs,
);
}),
]);
}
close(): void {
this.socket.end();
this.socket.destroy();
}
private flushPending(): void {
if (!this.pending) {
return;
}
const parsed = tryParseRedisReply(this.buffer);
if (!parsed) {
return;
}
this.buffer = this.buffer.subarray(parsed.nextOffset);
const pending = this.pending;
this.pending = undefined;
if (parsed.error) {
pending.reject(parsed.error);
return;
}
pending.resolve(parsed.value);
}
}
function encodeRedisCommand(args: string[]): Buffer {
const lines: string[] = [`*${args.length}`];
for (const arg of args) {
lines.push(`$${Buffer.byteLength(arg)}`);
lines.push(arg);
}
return Buffer.from(`${lines.join("\r\n")}\r\n`, "utf8");
}
function tryReadLine(
buffer: Buffer,
startOffset: number,
): { line: string; nextOffset: number } | null {
const lineEnd = buffer.indexOf("\r\n", startOffset);
if (lineEnd < 0) {
return null;
}
const line = buffer.toString("utf8", startOffset, lineEnd);
return { line, nextOffset: lineEnd + 2 };
}
function parseRedisValueAt(
buffer: Buffer,
startOffset: number,
): { value: RedisReply; nextOffset: number; error?: Error } | null {
if (startOffset >= buffer.length) {
return null;
}
const type = String.fromCharCode(buffer[startOffset]);
const line = tryReadLine(buffer, startOffset + 1);
if (!line) {
return null;
}
if (type === "+") {
return { value: line.line, nextOffset: line.nextOffset };
}
if (type === "-") {
return {
value: line.line,
nextOffset: line.nextOffset,
error: new RedisProtocolError(`Redis error reply: ${line.line}`),
};
}
if (type === ":") {
const parsed = Number.parseInt(line.line, 10);
if (!Number.isFinite(parsed)) {
throw new RedisProtocolError(`Invalid Redis integer reply: ${line.line}`);
}
return { value: parsed, nextOffset: line.nextOffset };
}
if (type === "$") {
const len = Number.parseInt(line.line, 10);
if (!Number.isFinite(len)) {
throw new RedisProtocolError(`Invalid Redis bulk length: ${line.line}`);
}
if (len === -1) {
return { value: null, nextOffset: line.nextOffset };
}
const bodyEnd = line.nextOffset + len;
if (bodyEnd + 2 > buffer.length) {
return null;
}
const text = buffer.toString("utf8", line.nextOffset, bodyEnd);
return { value: text, nextOffset: bodyEnd + 2 };
}
if (type === "*") {
const count = Number.parseInt(line.line, 10);
if (!Number.isFinite(count)) {
throw new RedisProtocolError(`Invalid Redis array length: ${line.line}`);
}
if (count === -1) {
return { value: null, nextOffset: line.nextOffset };
}
let nextOffset = line.nextOffset;
const values: RedisReply[] = [];
for (let idx = 0; idx < count; idx += 1) {
const child = parseRedisValueAt(buffer, nextOffset);
if (!child) {
return null;
}
nextOffset = child.nextOffset;
if (child.error) {
return {
value: child.value,
nextOffset,
error: child.error,
};
}
values.push(child.value);
}
return { value: values, nextOffset };
}
throw new RedisProtocolError(`Unsupported Redis response type: ${type}`);
}
function tryParseRedisReply(
buffer: Buffer,
): { value: RedisReply; nextOffset: number; error?: Error } | null {
return parseRedisValueAt(buffer, 0);
}
function parseRedisConnectionConfig(redisUrl: string): RedisConnectionConfig {
const parsed = new URL(redisUrl);
if (parsed.protocol !== "redis:" && parsed.protocol !== "rediss:") {
throw new Error(
`Unsupported Redis URL protocol "${parsed.protocol}". Expected redis:// or rediss://.`,
);
}
const host = parsed.hostname?.trim();
if (!host) {
throw new Error("Redis URL must include a host.");
}
const portRaw = parsed.port?.trim();
const port = portRaw ? Number.parseInt(portRaw, 10) : 6379;
if (!Number.isFinite(port) || port <= 0) {
throw new Error(`Redis URL has invalid port: ${portRaw || "(empty)"}`);
}
const username = parsed.username ? decodeURIComponent(parsed.username) : undefined;
const password = parsed.password ? decodeURIComponent(parsed.password) : undefined;
const dbRaw = parsed.pathname.replace(/^\//, "").trim();
const database = dbRaw ? Number.parseInt(dbRaw, 10) : 0;
if (!Number.isFinite(database) || database < 0) {
throw new Error(`Redis URL has invalid database index: ${dbRaw || "(empty)"}`);
}
return {
tls: parsed.protocol === "rediss:",
host,
port,
username,
password,
database,
};
}
async function connectRedis(config: RedisConnectionConfig): Promise<RedisSocketConnection> {
const socket: net.Socket | tls.TLSSocket = await new Promise((resolve, reject) => {
const connectHandler = () => {
cleanup();
resolve(rawSocket);
};
const errorHandler = (error: Error) => {
cleanup();
reject(error);
};
const timeoutHandler = () => {
cleanup();
reject(new Error("Redis connection timed out."));
};
const cleanup = () => {
rawSocket.off("connect", connectHandler);
rawSocket.off("error", errorHandler);
rawSocket.off("timeout", timeoutHandler);
};
const rawSocket = config.tls
? tls.connect({ host: config.host, port: config.port })
: net.createConnection({ host: config.host, port: config.port });
rawSocket.setTimeout(10_000);
rawSocket.on("connect", connectHandler);
rawSocket.on("error", errorHandler);
rawSocket.on("timeout", timeoutHandler);
});
const connection = new RedisSocketConnection(socket);
if (config.password) {
const authArgs = config.username
? ["AUTH", config.username, config.password]
: ["AUTH", config.password];
await connection.sendCommand(authArgs);
}
if (config.database !== 0) {
await connection.sendCommand(["SELECT", String(config.database)]);
}
return connection;
}
function createRedisCommandRunner(redisUrl: string): RedisCommandRunner {
// No shared Redis client currently exists in this repo; keep lock-scoped RESP wiring local.
const config = parseRedisConnectionConfig(redisUrl);
return async (args: string[]) => {
const connection = await connectRedis(config);
try {
return await connection.sendCommand(args);
} finally {
connection.close();
}
};
}
function toPositiveTtlMs(ttlMs: number): number {
if (!Number.isFinite(ttlMs)) {
return DEFAULT_ACP_SESSION_LOCK_TTL_MS;
}
return Math.max(MIN_LOCK_TTL_MS, Math.floor(ttlMs));
}
function lockKeyForSession(sessionKey: string): string {
return `${LOCK_KEY_PREFIX}${sessionKey}`;
}
type LocalLockEntry = {
ownerId: string;
expiresAtMs: number;
};
export class LocalSessionLockManager implements SessionLockManager {
private readonly locks = new Map<string, LocalLockEntry>();
async acquire(
sessionKey: string,
ttlMs: number,
): Promise<{ acquired: true; ownerId: string } | { acquired: false }> {
this.clearIfExpired(sessionKey);
if (this.locks.has(sessionKey)) {
return { acquired: false };
}
const ownerId = generateSecureUuid();
this.locks.set(sessionKey, {
ownerId,
expiresAtMs: Date.now() + toPositiveTtlMs(ttlMs),
});
return { acquired: true, ownerId };
}
async release(sessionKey: string, ownerId: string): Promise<void> {
this.clearIfExpired(sessionKey);
const lock = this.locks.get(sessionKey);
if (!lock || lock.ownerId !== ownerId) {
return;
}
this.locks.delete(sessionKey);
}
async renew(sessionKey: string, ownerId: string, ttlMs: number): Promise<boolean> {
this.clearIfExpired(sessionKey);
const lock = this.locks.get(sessionKey);
if (!lock || lock.ownerId !== ownerId) {
return false;
}
lock.expiresAtMs = Date.now() + toPositiveTtlMs(ttlMs);
this.locks.set(sessionKey, lock);
return true;
}
private clearIfExpired(sessionKey: string): void {
const lock = this.locks.get(sessionKey);
if (!lock) {
return;
}
if (lock.expiresAtMs <= Date.now()) {
this.locks.delete(sessionKey);
}
}
}
export class RedisSessionLockManager implements SessionLockManager {
private readonly runRedisCommand: RedisCommandRunner;
private readonly ownerIdFactory: () => string;
constructor(params: {
redisUrl?: string;
runRedisCommand?: RedisCommandRunner;
ownerIdFactory?: () => string;
}) {
if (!params.redisUrl && !params.runRedisCommand) {
throw new Error("RedisSessionLockManager requires redisUrl or runRedisCommand.");
}
this.runRedisCommand =
params.runRedisCommand ??
createRedisCommandRunner(
params.redisUrl ??
(() => {
throw new Error("Missing redisUrl");
})(),
);
this.ownerIdFactory = params.ownerIdFactory ?? generateSecureUuid;
}
async acquire(
sessionKey: string,
ttlMs: number,
): Promise<{ acquired: true; ownerId: string } | { acquired: false }> {
const ownerId = this.ownerIdFactory();
const result = await this.runRedisCommand([
"SET",
lockKeyForSession(sessionKey),
ownerId,
"NX",
"PX",
String(toPositiveTtlMs(ttlMs)),
]);
if (result === "OK") {
return { acquired: true, ownerId };
}
return { acquired: false };
}
async release(sessionKey: string, ownerId: string): Promise<void> {
await this.runRedisCommand([
"EVAL",
RELEASE_IF_OWNER_LUA,
"1",
lockKeyForSession(sessionKey),
ownerId,
]);
}
async renew(sessionKey: string, ownerId: string, ttlMs: number): Promise<boolean> {
const result = await this.runRedisCommand([
"EVAL",
RENEW_IF_OWNER_LUA,
"1",
lockKeyForSession(sessionKey),
ownerId,
String(toPositiveTtlMs(ttlMs)),
]);
return Number(result) === 1;
}
}
class FailClosedSessionLockManager implements SessionLockManager {
private readonly reason: string;
constructor(reason: string) {
this.reason = reason;
}
async acquire(
_sessionKey: string,
_ttlMs: number,
): Promise<{ acquired: true; ownerId: string } | { acquired: false }> {
throw new Error(`Redis ACP session lock manager unavailable: ${this.reason}`);
}
async release(_sessionKey: string, _ownerId: string): Promise<void> {}
async renew(_sessionKey: string, _ownerId: string, _ttlMs: number): Promise<boolean> {
return false;
}
}
export function resolveAcpSessionLockTtlMs(env: NodeJS.ProcessEnv = process.env): number {
const raw = env.OPENCLAW_ACP_SESSION_LOCK_TTL_MS?.trim();
if (!raw) {
return DEFAULT_ACP_SESSION_LOCK_TTL_MS;
}
const parsed = Number.parseInt(raw, 10);
if (!Number.isFinite(parsed) || parsed <= 0) {
return DEFAULT_ACP_SESSION_LOCK_TTL_MS;
}
return toPositiveTtlMs(parsed);
}
function resolveRedisLockUrl(env: NodeJS.ProcessEnv = process.env): string | null {
const normalized = env.OPENCLAW_ACP_SESSION_LOCK_REDIS_URL?.trim();
return normalized || null;
}
let ACP_SESSION_LOCK_MANAGER_SINGLETON: SessionLockManager | null = null;
export function getAcpSessionLockManager(env: NodeJS.ProcessEnv = process.env): SessionLockManager {
if (ACP_SESSION_LOCK_MANAGER_SINGLETON) {
return ACP_SESSION_LOCK_MANAGER_SINGLETON;
}
const redisUrl = resolveRedisLockUrl(env);
if (!redisUrl) {
ACP_SESSION_LOCK_MANAGER_SINGLETON = new LocalSessionLockManager();
logVerbose("dispatch-acp-lock: backend=local redis_configured=false");
return ACP_SESSION_LOCK_MANAGER_SINGLETON;
}
try {
ACP_SESSION_LOCK_MANAGER_SINGLETON = new RedisSessionLockManager({ redisUrl });
logVerbose("dispatch-acp-lock: backend=redis");
return ACP_SESSION_LOCK_MANAGER_SINGLETON;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
ACP_SESSION_LOCK_MANAGER_SINGLETON = new FailClosedSessionLockManager(message);
logVerbose(`dispatch-acp-lock: backend=redis init_failed=${message} fail_closed=true`);
return ACP_SESSION_LOCK_MANAGER_SINGLETON;
}
}
export function resetAcpSessionLockManagerForTests(): void {
ACP_SESSION_LOCK_MANAGER_SINGLETON = null;
}