fix(acp): add distributed session locking with fail-closed redis fallback
- add SessionLockManager abstraction for ACP execution locking
- add local, redis, and fail-closed lock manager implementations
- use Redis SET NX PX plus owner-checked Lua release/renew
- wire lock acquire/release/renew into ACP dispatch flow
- skip with acp_execution_locked when lock is contended or acquire fails
- document ACP session lock Redis env vars in .env.example
- add unit and dispatch tests for lock behavior
This commit is contained in:
parent
6e044ace28
commit
39d3890278
@ -78,3 +78,12 @@ OPENCLAW_GATEWAY_TOKEN=change-me-to-a-long-random-token
|
||||
# ELEVENLABS_API_KEY=...
|
||||
# XI_API_KEY=... # alias for ElevenLabs
|
||||
# DEEPGRAM_API_KEY=...
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# ACP distributed session locking (optional)
|
||||
# -----------------------------------------------------------------------------
|
||||
# Enable Redis-backed ACP session locks across workers/containers.
|
||||
# If unset, ACP falls back to REDIS_URL; if that is also unset, it uses an in-process local lock manager.
|
||||
# OPENCLAW_ACP_SESSION_LOCK_REDIS_URL=redis://:password@redis-host:6379/0
|
||||
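# Optional lock TTL in milliseconds (default: 120000). Values below 1000 are raised to 1000;
# missing, non-numeric, or non-positive values fall back to the default.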
|
||||
# OPENCLAW_ACP_SESSION_LOCK_TTL_MS=120000
|
||||
|
||||
@ -52,6 +52,13 @@ const bindingServiceMocks = vi.hoisted(() => ({
|
||||
listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []),
|
||||
}));
|
||||
|
||||
const sessionLockMocks = vi.hoisted(() => ({
|
||||
acquire: vi.fn(),
|
||||
release: vi.fn(),
|
||||
renew: vi.fn(),
|
||||
resolveTtlMs: vi.fn(() => 120_000),
|
||||
}));
|
||||
|
||||
vi.mock("../../acp/control-plane/manager.js", () => ({
|
||||
getAcpSessionManager: () => managerMocks,
|
||||
}));
|
||||
@ -87,6 +94,16 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("./session-lock-manager.js", () => ({
|
||||
getAcpSessionLockManager: () => ({
|
||||
acquire: (sessionKey: string, ttlMs: number) => sessionLockMocks.acquire(sessionKey, ttlMs),
|
||||
release: (sessionKey: string, ownerId: string) => sessionLockMocks.release(sessionKey, ownerId),
|
||||
renew: (sessionKey: string, ownerId: string, ttlMs: number) =>
|
||||
sessionLockMocks.renew(sessionKey, ownerId, ttlMs),
|
||||
}),
|
||||
resolveAcpSessionLockTtlMs: () => sessionLockMocks.resolveTtlMs(),
|
||||
}));
|
||||
|
||||
const { tryDispatchAcpReply } = await import("./dispatch-acp.js");
|
||||
const sessionKey = "agent:codex-acp:session-1";
|
||||
|
||||
@ -160,6 +177,41 @@ async function runDispatch(params: {
|
||||
});
|
||||
}
|
||||
|
||||
async function runDispatchWithTracking(params: {
|
||||
bodyForAgent: string;
|
||||
cfg?: OpenClawConfig;
|
||||
dispatcher?: ReplyDispatcher;
|
||||
shouldRouteToOriginating?: boolean;
|
||||
onReplyStart?: () => void;
|
||||
ctxOverrides?: Record<string, unknown>;
|
||||
}) {
|
||||
const recordProcessed = vi.fn();
|
||||
const markIdle = vi.fn();
|
||||
const result = await tryDispatchAcpReply({
|
||||
ctx: buildTestCtx({
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
SessionKey: sessionKey,
|
||||
BodyForAgent: params.bodyForAgent,
|
||||
...params.ctxOverrides,
|
||||
}),
|
||||
cfg: params.cfg ?? createAcpTestConfig(),
|
||||
dispatcher: params.dispatcher ?? createDispatcher().dispatcher,
|
||||
sessionKey,
|
||||
inboundAudio: false,
|
||||
shouldRouteToOriginating: params.shouldRouteToOriginating ?? false,
|
||||
...(params.shouldRouteToOriginating
|
||||
? { originatingChannel: "telegram", originatingTo: "telegram:thread-1" }
|
||||
: {}),
|
||||
shouldSendToolSummaries: true,
|
||||
bypassForCommand: false,
|
||||
...(params.onReplyStart ? { onReplyStart: params.onReplyStart } : {}),
|
||||
recordProcessed,
|
||||
markIdle,
|
||||
});
|
||||
return { result, recordProcessed, markIdle };
|
||||
}
|
||||
|
||||
async function emitToolLifecycleEvents(
|
||||
onEvent: (event: unknown) => Promise<void>,
|
||||
toolCallId: string,
|
||||
@ -232,6 +284,14 @@ describe("tryDispatchAcpReply", () => {
|
||||
sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null);
|
||||
bindingServiceMocks.listBySession.mockReset();
|
||||
bindingServiceMocks.listBySession.mockReturnValue([]);
|
||||
sessionLockMocks.acquire.mockReset();
|
||||
sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-1" });
|
||||
sessionLockMocks.release.mockReset();
|
||||
sessionLockMocks.release.mockResolvedValue(undefined);
|
||||
sessionLockMocks.renew.mockReset();
|
||||
sessionLockMocks.renew.mockResolvedValue(true);
|
||||
sessionLockMocks.resolveTtlMs.mockReset();
|
||||
sessionLockMocks.resolveTtlMs.mockReturnValue(120_000);
|
||||
});
|
||||
|
||||
it("routes ACP block output to originating channel", async () => {
|
||||
@ -435,4 +495,71 @@ describe("tryDispatchAcpReply", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("enforces ACP execution lock to prevent concurrent runs", async () => {
|
||||
setReadyAcpResolution();
|
||||
let releaseRun: (() => void) | undefined;
|
||||
let held = false;
|
||||
sessionLockMocks.acquire.mockImplementation(async () => {
|
||||
if (held) {
|
||||
return { acquired: false } as const;
|
||||
}
|
||||
held = true;
|
||||
return { acquired: true, ownerId: "owner-1" } as const;
|
||||
});
|
||||
sessionLockMocks.release.mockImplementation(async () => {
|
||||
held = false;
|
||||
});
|
||||
managerMocks.runTurn.mockImplementation(
|
||||
async () =>
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseRun = resolve;
|
||||
}),
|
||||
);
|
||||
|
||||
const firstDispatch = runDispatch({ bodyForAgent: "first" });
|
||||
await vi.waitFor(() => {
|
||||
expect(managerMocks.runTurn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
const secondResult = await runDispatch({ bodyForAgent: "second" });
|
||||
expect(secondResult?.queuedFinal).toBe(false);
|
||||
expect(managerMocks.runTurn).toHaveBeenCalledTimes(1);
|
||||
|
||||
releaseRun?.();
|
||||
await firstDispatch;
|
||||
});
|
||||
|
||||
it("marks skipped with acp_execution_locked when lock is already held", async () => {
|
||||
setReadyAcpResolution();
|
||||
sessionLockMocks.acquire.mockResolvedValue({ acquired: false });
|
||||
|
||||
const tracked = await runDispatchWithTracking({ bodyForAgent: "skip me" });
|
||||
|
||||
expect(tracked.result?.queuedFinal).toBe(false);
|
||||
expect(managerMocks.runTurn).not.toHaveBeenCalled();
|
||||
expect(tracked.recordProcessed).toHaveBeenCalledWith("skipped", {
|
||||
reason: "acp_execution_locked",
|
||||
});
|
||||
});
|
||||
|
||||
it("releases lock after successful ACP dispatch", async () => {
|
||||
setReadyAcpResolution();
|
||||
sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-success" });
|
||||
managerMocks.runTurn.mockResolvedValue(undefined);
|
||||
|
||||
await runDispatch({ bodyForAgent: "success" });
|
||||
|
||||
expect(sessionLockMocks.release).toHaveBeenCalledWith(sessionKey, "owner-success");
|
||||
});
|
||||
|
||||
it("releases lock after ACP dispatch throws", async () => {
|
||||
setReadyAcpResolution();
|
||||
sessionLockMocks.acquire.mockResolvedValue({ acquired: true, ownerId: "owner-error" });
|
||||
managerMocks.runTurn.mockRejectedValue(new Error("boom"));
|
||||
|
||||
await runDispatch({ bodyForAgent: "throws" });
|
||||
|
||||
expect(sessionLockMocks.release).toHaveBeenCalledWith(sessionKey, "owner-error");
|
||||
});
|
||||
});
|
||||
|
||||
@ -32,6 +32,7 @@ import type { FinalizedMsgContext } from "../templating.js";
|
||||
import { createAcpReplyProjector } from "./acp-projector.js";
|
||||
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js";
|
||||
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
||||
import { getAcpSessionLockManager, resolveAcpSessionLockTtlMs } from "./session-lock-manager.js";
|
||||
|
||||
type DispatchProcessedRecorder = (
|
||||
outcome: "completed" | "skipped" | "error",
|
||||
@ -229,6 +230,62 @@ export async function tryDispatchAcpReply(params: {
|
||||
onReplyStart: params.onReplyStart,
|
||||
});
|
||||
|
||||
const lockManager = getAcpSessionLockManager();
|
||||
const lockTtlMs = resolveAcpSessionLockTtlMs();
|
||||
const lockAcquireStartedAt = Date.now();
|
||||
let lockOwnerId: string | null = null;
|
||||
|
||||
try {
|
||||
const lockAcquire = await lockManager.acquire(sessionKey, lockTtlMs);
|
||||
if (!lockAcquire.acquired) {
|
||||
const counts = params.dispatcher.getQueuedCounts();
|
||||
params.recordProcessed("skipped", { reason: "acp_execution_locked" });
|
||||
params.markIdle("message_completed");
|
||||
logVerbose(
|
||||
`dispatch-acp-lock: session=${sessionKey} lock_contended=true backend_wait_ms=${Date.now() - lockAcquireStartedAt}`,
|
||||
);
|
||||
return { queuedFinal: false, counts };
|
||||
}
|
||||
lockOwnerId = lockAcquire.ownerId;
|
||||
logVerbose(
|
||||
`dispatch-acp-lock: session=${sessionKey} acquired=true owner=${lockOwnerId} ttlMs=${lockTtlMs}`,
|
||||
);
|
||||
} catch (error) {
|
||||
const counts = params.dispatcher.getQueuedCounts();
|
||||
params.recordProcessed("skipped", { reason: "acp_execution_locked" });
|
||||
params.markIdle("message_completed");
|
||||
logVerbose(
|
||||
`dispatch-acp-lock: session=${sessionKey} acquire_failed=${error instanceof Error ? error.message : String(error)}`,
|
||||
);
|
||||
return { queuedFinal: false, counts };
|
||||
}
|
||||
|
||||
let renewTimer: NodeJS.Timeout | null = null;
|
||||
if (lockOwnerId) {
|
||||
// Best-effort heartbeat: renew failures are logged, and the turn continues.
|
||||
const renewEveryMs = Math.max(1_000, Math.floor(lockTtlMs / 2));
|
||||
renewTimer = setInterval(() => {
|
||||
if (!lockOwnerId) {
|
||||
return;
|
||||
}
|
||||
void lockManager
|
||||
.renew(sessionKey, lockOwnerId, lockTtlMs)
|
||||
.then((renewed) => {
|
||||
if (!renewed) {
|
||||
logVerbose(
|
||||
`dispatch-acp-lock: session=${sessionKey} renew_failed owner=${lockOwnerId}`,
|
||||
);
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logVerbose(
|
||||
`dispatch-acp-lock: session=${sessionKey} renew_error=${error instanceof Error ? error.message : String(error)}`,
|
||||
);
|
||||
});
|
||||
}, renewEveryMs);
|
||||
renewTimer.unref?.();
|
||||
}
|
||||
|
||||
const identityPendingBeforeTurn = isSessionIdentityPending(
|
||||
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
|
||||
);
|
||||
@ -390,5 +447,21 @@ export async function tryDispatchAcpReply(params: {
|
||||
});
|
||||
params.markIdle("message_completed");
|
||||
return { queuedFinal, counts };
|
||||
} finally {
|
||||
if (renewTimer) {
|
||||
clearInterval(renewTimer);
|
||||
}
|
||||
if (lockOwnerId) {
|
||||
try {
|
||||
await lockManager.release(sessionKey, lockOwnerId);
|
||||
logVerbose(`dispatch-acp-lock: session=${sessionKey} released owner=${lockOwnerId}`);
|
||||
} catch (error) {
|
||||
logVerbose(
|
||||
`dispatch-acp-lock: session=${sessionKey} release_failed owner=${lockOwnerId} error=${error instanceof Error ? error.message : String(error)}`,
|
||||
);
|
||||
} finally {
|
||||
lockOwnerId = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
198
src/auto-reply/reply/session-lock-manager.test.ts
Normal file
198
src/auto-reply/reply/session-lock-manager.test.ts
Normal file
@ -0,0 +1,198 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
LocalSessionLockManager,
|
||||
RedisSessionLockManager,
|
||||
getAcpSessionLockManager,
|
||||
resetAcpSessionLockManagerForTests,
|
||||
resolveAcpSessionLockTtlMs,
|
||||
} from "./session-lock-manager.js";
|
||||
|
||||
describe("LocalSessionLockManager", () => {
  // A held lock rejects further acquires until its owner releases it.
  it("acquires and releases session locks", async () => {
    const locks = new LocalSessionLockManager();
    const key = "session-1";

    const initial = await locks.acquire(key, 5_000);
    expect(initial.acquired).toBe(true);
    if (!initial.acquired) {
      throw new Error("expected first lock to be acquired");
    }
    const contended = await locks.acquire(key, 5_000);
    expect(contended).toEqual({ acquired: false });

    await locks.release(key, initial.ownerId);
    const afterRelease = await locks.acquire(key, 5_000);
    expect(afterRelease.acquired).toBe(true);
  });

  // Two acquires issued together must yield exactly one winner.
  it("blocks concurrent acquire for the same session key", async () => {
    const locks = new LocalSessionLockManager();
    const key = "session-2";
    const [left, right] = await Promise.all([
      locks.acquire(key, 5_000),
      locks.acquire(key, 5_000),
    ]);
    const outcomes = [left.acquired, right.acquired].toSorted((x, y) => Number(x) - Number(y));
    expect(outcomes).toEqual([false, true]);
  });

  // Releasing with a foreign owner id must leave the lock in place.
  it("only allows the current owner to release", async () => {
    const locks = new LocalSessionLockManager();
    const key = "session-3";
    const held = await locks.acquire(key, 5_000);
    expect(held.acquired).toBe(true);
    if (!held.acquired) {
      throw new Error("expected lock to be acquired");
    }

    await locks.release(key, "wrong-owner");
    const stillHeld = await locks.acquire(key, 5_000);
    expect(stillHeld).toEqual({ acquired: false });

    await locks.release(key, held.ownerId);
    const reacquired = await locks.acquire(key, 5_000);
    expect(reacquired.acquired).toBe(true);
  });

  // Renewal by the owner pushes the expiry out; renewal by anyone else is refused.
  it("renews lock ttl only for the current owner", async () => {
    vi.useFakeTimers();
    try {
      const locks = new LocalSessionLockManager();
      const key = "session-4";
      const held = await locks.acquire(key, 1_000);
      expect(held.acquired).toBe(true);
      if (!held.acquired) {
        throw new Error("expected lock to be acquired");
      }

      // t = 700ms: still inside the original 1s TTL.
      vi.advanceTimersByTime(700);
      const foreignRenewal = await locks.renew(key, "wrong-owner", 1_000);
      expect(foreignRenewal).toBe(false);

      const ownerRenewal = await locks.renew(key, held.ownerId, 1_000);
      expect(ownerRenewal).toBe(true);

      // t = 1400ms: past the original TTL but inside the renewed one.
      vi.advanceTimersByTime(700);
      const duringRenewedTtl = await locks.acquire(key, 1_000);
      expect(duringRenewedTtl).toEqual({ acquired: false });

      // t = 1800ms: the renewed TTL (700ms + 1000ms) has elapsed.
      vi.advanceTimersByTime(400);
      const afterExpiry = await locks.acquire(key, 1_000);
      expect(afterExpiry.acquired).toBe(true);
    } finally {
      vi.useRealTimers();
    }
  });
});
|
||||
|
||||
describe("RedisSessionLockManager", () => {
  beforeEach(() => {
    vi.restoreAllMocks();
  });

  // The fake below emulates SET ... NX against an in-memory map.
  it("acquire uses NX+PX and returns false when lock exists", async () => {
    const store = new Map<string, string>();
    const runRedisCommand = vi.fn(async (args: string[]) => {
      if (args[0] !== "SET") {
        throw new Error(`unsupported command: ${args.join(" ")}`);
      }
      const [_, key, value] = args;
      if (store.has(key)) {
        return null;
      }
      store.set(key, value);
      return "OK";
    });
    const manager = new RedisSessionLockManager({
      runRedisCommand,
      ownerIdFactory: () => "owner-1",
    });

    const firstAttempt = await manager.acquire("session-5", 9_000);
    expect(firstAttempt).toEqual({ acquired: true, ownerId: "owner-1" });
    const secondAttempt = await manager.acquire("session-5", 9_000);
    expect(secondAttempt).toEqual({ acquired: false });
    expect(runRedisCommand).toHaveBeenCalledWith([
      "SET",
      "lock:acp:session:session-5",
      "owner-1",
      "NX",
      "PX",
      "9000",
    ]);
  });

  // The fake distinguishes the release and renew scripts by the Redis call they contain.
  it("only owner can release and renew", async () => {
    const store = new Map<string, string>();
    const runRedisCommand = vi.fn(async (args: string[]) => {
      switch (args[0]) {
        case "SET": {
          const [_, key, value] = args;
          if (store.has(key)) {
            return null;
          }
          store.set(key, value);
          return "OK";
        }
        case "EVAL": {
          const [_, script, __, key, ownerId] = args;
          const ownerMatches = () => store.get(key) === ownerId;
          if (script.includes("DEL")) {
            if (ownerMatches()) {
              store.delete(key);
              return 1;
            }
            return 0;
          }
          if (script.includes("PEXPIRE")) {
            const ttl = Number.parseInt(args[5] ?? "", 10);
            expect(ttl).toBe(7_000);
            return ownerMatches() ? 1 : 0;
          }
          break;
        }
        default:
          break;
      }
      throw new Error(`unsupported command: ${args.join(" ")}`);
    });
    const manager = new RedisSessionLockManager({
      runRedisCommand,
      ownerIdFactory: () => "owner-2",
    });
    const held = await manager.acquire("session-6", 7_000);
    expect(held).toEqual({ acquired: true, ownerId: "owner-2" });

    const foreignRenewal = await manager.renew("session-6", "wrong-owner", 7_000);
    expect(foreignRenewal).toBe(false);
    const ownerRenewal = await manager.renew("session-6", "owner-2", 7_000);
    expect(ownerRenewal).toBe(true);

    await manager.release("session-6", "wrong-owner");
    const afterForeignRelease = await manager.acquire("session-6", 7_000);
    expect(afterForeignRelease).toEqual({ acquired: false });

    await manager.release("session-6", "owner-2");
    const afterOwnerRelease = await manager.acquire("session-6", 7_000);
    expect(afterOwnerRelease).toEqual({ acquired: true, ownerId: "owner-2" });
  });
});
|
||||
|
||||
describe("session lock manager selection", () => {
  // The manager is cached module-wide, so every case starts from a cleared cache.
  beforeEach(() => {
    resetAcpSessionLockManagerForTests();
  });

  it("defaults to local lock manager when Redis is not configured", () => {
    const selected = getAcpSessionLockManager({});
    expect(selected).toBeInstanceOf(LocalSessionLockManager);
  });

  // An http:// URL is rejected at construction time, which must not fall back to local locks.
  it("fails closed when Redis is configured but initialization fails", async () => {
    const env = { OPENCLAW_ACP_SESSION_LOCK_REDIS_URL: "http://redis.example" };
    const selected = getAcpSessionLockManager(env);
    expect(selected).not.toBeInstanceOf(LocalSessionLockManager);
    await expect(selected.acquire("session-7", 1_000)).rejects.toThrow(
      "Redis ACP session lock manager unavailable",
    );
  });

  it("uses default ttl when env is missing or invalid", () => {
    const missing = resolveAcpSessionLockTtlMs({});
    const invalid = resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "nope" });
    const explicit = resolveAcpSessionLockTtlMs({ OPENCLAW_ACP_SESSION_LOCK_TTL_MS: "2000" });
    expect(missing).toBe(120_000);
    expect(invalid).toBe(120_000);
    expect(explicit).toBe(2_000);
  });
});
|
||||
497
src/auto-reply/reply/session-lock-manager.ts
Normal file
497
src/auto-reply/reply/session-lock-manager.ts
Normal file
@ -0,0 +1,497 @@
|
||||
import net from "node:net";
|
||||
import tls from "node:tls";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { generateSecureUuid } from "../../infra/secure-random.js";
|
||||
|
||||
/**
 * Contract for the per-session execution lock used by ACP dispatch.
 *
 * Implementations identify the holder of a lock by an opaque owner id that
 * `acquire` hands out and that `release` / `renew` must be given back.
 */
export interface SessionLockManager {
  /**
   * Tries to take the lock for `sessionKey` for `ttlMs` milliseconds.
   *
   * @returns `{ acquired: true, ownerId }` on success, `{ acquired: false }` when not acquired.
   */
  acquire(
    sessionKey: string,
    ttlMs: number,
  ): Promise<{ acquired: true; ownerId: string } | { acquired: false }>;

  /** Gives the lock for `sessionKey` back on behalf of `ownerId`. */
  release(sessionKey: string, ownerId: string): Promise<void>;

  /**
   * Requests a new `ttlMs` lifetime for the lock held by `ownerId`.
   *
   * @returns whether the renewal was applied.
   */
  renew(sessionKey: string, ownerId: string, ttlMs: number): Promise<boolean>;
}
|
||||
|
||||
/** A decoded Redis reply: an integer, a string, nil (`null`), or an array of nested replies. */
type RedisReply = string | number | null | RedisReply[];

/** Sends one Redis command, given as its argument vector, and resolves with the decoded reply. */
type RedisCommandRunner = (args: string[]) => Promise<RedisReply>;
|
||||
|
||||
/** Lock lifetime used when no valid TTL is configured (two minutes). */
const DEFAULT_ACP_SESSION_LOCK_TTL_MS = 2 * 60 * 1_000;

/** Smallest TTL a lock is ever given; shorter requests are raised to this value. */
const MIN_LOCK_TTL_MS = 1_000;

/** Prefix prepended to a session key to form the lock key. */
const LOCK_KEY_PREFIX = "lock:acp:session:";

/** Lua script: delete KEYS[1] only when its value equals ARGV[1] (the owner id). */
const RELEASE_IF_OWNER_LUA =
  'if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end';

/** Lua script: set a new expiry of ARGV[2] ms on KEYS[1] only when its value equals ARGV[1]. */
const RENEW_IF_OWNER_LUA =
  'if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("PEXPIRE", KEYS[1], ARGV[2]) else return 0 end';
|
||||
|
||||
/** Connection settings extracted from a `redis://` or `rediss://` URL. */
type RedisConnectionConfig = {
  /** Whether to connect over TLS (`rediss:` scheme). */
  tls: boolean;
  /** Server host name or address. */
  host: string;
  /** Server TCP port. */
  port: number;
  /** Optional user name sent with AUTH. */
  username?: string;
  /** Optional password sent with AUTH. */
  password?: string;
  /** Database index; a non-zero value is selected with SELECT after connecting. */
  database: number;
};
|
||||
|
||||
/** Raised for Redis error replies and for replies that cannot be decoded. */
class RedisProtocolError extends Error {}
|
||||
|
||||
/**
 * Minimal Redis client bound to one already-connected socket.
 *
 * Exactly one command may be in flight at a time: calling `sendCommand` while a
 * reply is still outstanding throws. Incoming bytes are accumulated in `buffer`
 * and decoded with `tryParseRedisReply`.
 */
class RedisSocketConnection {
  private readonly socket: net.Socket | tls.TLSSocket;
  private buffer = Buffer.alloc(0);
  private pending:
    | {
        resolve: (value: RedisReply) => void;
        reject: (error: Error) => void;
      }
    | undefined;

  constructor(socket: net.Socket | tls.TLSSocket) {
    this.socket = socket;
    this.socket.on("data", (chunk: Buffer) => {
      this.buffer = Buffer.concat([this.buffer, chunk]);
      this.flushPending();
    });
    this.socket.on("error", (error) => {
      this.rejectPending(error);
    });
    // Fires only when an inactivity timeout has been set on the socket. Without this
    // listener a server that never answers would leave the caller waiting forever.
    this.socket.on("timeout", () => {
      if (!this.pending) {
        return;
      }
      this.rejectPending(new Error("Redis command timed out."));
      this.socket.destroy();
    });
    this.socket.on("close", () => {
      this.rejectPending(new Error("Redis socket closed before response was received."));
    });
  }

  /**
   * Writes one command and resolves with its decoded reply.
   *
   * @param args command name followed by its arguments.
   * @throws Error when another command is still awaiting its reply.
   * @throws when the server answers with an error reply, the reply cannot be decoded,
   *   or the socket errors, times out, or closes before a reply arrives.
   */
  async sendCommand(args: string[]): Promise<RedisReply> {
    if (this.pending) {
      throw new Error("Redis command pipelining is not supported by this connection.");
    }
    const reply = new Promise<RedisReply>((resolve, reject) => {
      this.pending = { resolve, reject };
    });
    try {
      this.socket.write(encodeRedisCommand(args));
    } catch (error) {
      // Do not leave a waiter registered for a command that was never sent.
      this.pending = undefined;
      throw error;
    }
    // A complete reply that is already buffered is consumed immediately.
    this.flushPending();
    return await reply;
  }

  /** Ends and destroys the underlying socket. */
  close(): void {
    this.socket.end();
    this.socket.destroy();
  }

  /** Rejects the in-flight command, if any, and clears it. */
  private rejectPending(error: Error): void {
    const waiter = this.pending;
    if (!waiter) {
      return;
    }
    this.pending = undefined;
    waiter.reject(error);
  }

  /** Settles the in-flight command once the buffer holds one complete reply. */
  private flushPending(): void {
    const waiter = this.pending;
    if (!waiter) {
      return;
    }
    let parsed: ReturnType<typeof tryParseRedisReply>;
    try {
      parsed = tryParseRedisReply(this.buffer);
    } catch (error) {
      // The parser throws on malformed input. This runs inside a socket "data" handler,
      // so the failure is routed to the waiter instead of escaping as an uncaught
      // exception. The stream position is unknown afterwards, so the socket is dropped.
      this.pending = undefined;
      this.buffer = Buffer.alloc(0);
      waiter.reject(error instanceof Error ? error : new Error(String(error)));
      this.socket.destroy();
      return;
    }
    if (!parsed) {
      // Reply is still incomplete; wait for more data.
      return;
    }
    this.buffer = this.buffer.subarray(parsed.nextOffset);
    this.pending = undefined;
    if (parsed.error) {
      waiter.reject(parsed.error);
      return;
    }
    waiter.resolve(parsed.value);
  }
}
|
||||
|
||||
/**
 * Serializes a command as an array of length-prefixed strings, each part
 * terminated by CRLF. Lengths are byte lengths, not character counts.
 *
 * @param args command name followed by its arguments.
 * @returns the UTF-8 encoded frame.
 */
function encodeRedisCommand(args: string[]): Buffer {
  let frame = `*${args.length}\r\n`;
  for (const part of args) {
    frame += `$${Buffer.byteLength(part)}\r\n${part}\r\n`;
  }
  return Buffer.from(frame, "utf8");
}
|
||||
|
||||
/**
 * Reads one CRLF-terminated line from `buffer`, starting at `startOffset`.
 *
 * @returns the line text (without the terminator) and the offset just past the
 *   terminator, or `null` when no terminator is present yet.
 */
function tryReadLine(
  buffer: Buffer,
  startOffset: number,
): { line: string; nextOffset: number } | null {
  const terminatorAt = buffer.indexOf("\r\n", startOffset);
  return terminatorAt < 0
    ? null
    : {
        line: buffer.toString("utf8", startOffset, terminatorAt),
        nextOffset: terminatorAt + 2,
      };
}
|
||||
|
||||
/**
 * Decodes one Redis reply from `buffer`, starting at `startOffset`.
 *
 * Handled reply types, by leading byte: `+` simple string, `-` error, `:` integer,
 * `$` bulk string, `*` array (decoded recursively).
 *
 * @returns `null` when the reply is not fully buffered yet. Otherwise the decoded
 *   value and the offset just past the reply. For an error reply, or an array that
 *   contains one, `error` is set and `value` holds the first error's text;
 *   `nextOffset` still points past the whole reply so the caller's buffer stays
 *   aligned.
 * @throws RedisProtocolError on an unparsable integer/length, a negative length
 *   other than -1, or an unsupported type byte.
 */
function parseRedisValueAt(
  buffer: Buffer,
  startOffset: number,
): { value: RedisReply; nextOffset: number; error?: Error } | null {
  if (startOffset >= buffer.length) {
    return null;
  }
  const type = String.fromCharCode(buffer[startOffset]);
  const header = tryReadLine(buffer, startOffset + 1);
  if (!header) {
    return null;
  }
  const text = header.line;
  const afterHeader = header.nextOffset;

  switch (type) {
    case "+":
      return { value: text, nextOffset: afterHeader };
    case "-":
      return {
        value: text,
        nextOffset: afterHeader,
        error: new RedisProtocolError(`Redis error reply: ${text}`),
      };
    case ":": {
      const parsed = Number.parseInt(text, 10);
      if (!Number.isFinite(parsed)) {
        throw new RedisProtocolError(`Invalid Redis integer reply: ${text}`);
      }
      return { value: parsed, nextOffset: afterHeader };
    }
    case "$": {
      const len = Number.parseInt(text, 10);
      // -1 is the only valid negative length (nil). Anything lower would make the
      // body end before it starts and corrupt the offset.
      if (!Number.isFinite(len) || len < -1) {
        throw new RedisProtocolError(`Invalid Redis bulk length: ${text}`);
      }
      if (len === -1) {
        return { value: null, nextOffset: afterHeader };
      }
      const bodyEnd = afterHeader + len;
      // The body is followed by its own CRLF, which must be buffered as well.
      if (bodyEnd + 2 > buffer.length) {
        return null;
      }
      return {
        value: buffer.toString("utf8", afterHeader, bodyEnd),
        nextOffset: bodyEnd + 2,
      };
    }
    case "*": {
      const count = Number.parseInt(text, 10);
      if (!Number.isFinite(count) || count < -1) {
        throw new RedisProtocolError(`Invalid Redis array length: ${text}`);
      }
      if (count === -1) {
        return { value: null, nextOffset: afterHeader };
      }
      let cursor = afterHeader;
      const values: RedisReply[] = [];
      let firstFailure: { value: RedisReply; error: Error } | undefined;
      for (let idx = 0; idx < count; idx += 1) {
        const child = parseRedisValueAt(buffer, cursor);
        if (!child) {
          return null;
        }
        cursor = child.nextOffset;
        if (child.error) {
          // Keep decoding the remaining elements so `cursor` ends past the whole
          // array. Stopping here would leave the tail in the buffer to be misread
          // as the next reply.
          if (!firstFailure) {
            firstFailure = { value: child.value, error: child.error };
          }
          continue;
        }
        values.push(child.value);
      }
      if (firstFailure) {
        return { value: firstFailure.value, nextOffset: cursor, error: firstFailure.error };
      }
      return { value: values, nextOffset: cursor };
    }
    default:
      throw new RedisProtocolError(`Unsupported Redis response type: ${type}`);
  }
}
|
||||
|
||||
/**
 * Decodes the reply that starts at the beginning of `buffer`.
 *
 * @returns whatever `parseRedisValueAt` returns for offset 0: `null` while the reply
 *   is incomplete, otherwise the decoded value, the offset past it, and an optional error.
 */
function tryParseRedisReply(
  buffer: Buffer,
): { value: RedisReply; nextOffset: number; error?: Error } | null {
  const bufferStart = 0;
  return parseRedisValueAt(buffer, bufferStart);
}
|
||||
|
||||
/**
 * Turns a `redis://` or `rediss://` URL into connection settings.
 *
 * The port defaults to 6379 and the database index (the URL path) defaults to 0.
 * User name and password are percent-decoded.
 *
 * @param redisUrl URL of the form `redis[s]://[user][:password@]host[:port][/db]`.
 * @throws when the URL cannot be parsed, uses another scheme, has no host, or has an
 *   invalid port or database index.
 */
function parseRedisConnectionConfig(redisUrl: string): RedisConnectionConfig {
  const parsed = new URL(redisUrl);
  if (parsed.protocol !== "redis:" && parsed.protocol !== "rediss:") {
    throw new Error(
      `Unsupported Redis URL protocol "${parsed.protocol}". Expected redis:// or rediss://.`,
    );
  }
  // URL.hostname keeps the square brackets around an IPv6 literal; the socket
  // connect options expect the bare address.
  const host = parsed.hostname?.trim().replace(/^\[(.*)\]$/, "$1");
  if (!host) {
    throw new Error("Redis URL must include a host.");
  }
  const portRaw = parsed.port?.trim();
  const port = portRaw ? Number.parseInt(portRaw, 10) : 6379;
  if (!Number.isFinite(port) || port <= 0) {
    throw new Error(`Redis URL has invalid port: ${portRaw || "(empty)"}`);
  }
  const username = parsed.username ? decodeURIComponent(parsed.username) : undefined;
  const password = parsed.password ? decodeURIComponent(parsed.password) : undefined;
  const dbRaw = parsed.pathname.replace(/^\//, "").trim();
  // parseInt alone would accept "1x" or "0/extra" by reading only the leading
  // digits; require the whole path segment to be a decimal number.
  if (dbRaw && !/^\d+$/.test(dbRaw)) {
    throw new Error(`Redis URL has invalid database index: ${dbRaw}`);
  }
  const database = dbRaw ? Number.parseInt(dbRaw, 10) : 0;
  if (!Number.isFinite(database) || database < 0) {
    throw new Error(`Redis URL has invalid database index: ${dbRaw || "(empty)"}`);
  }
  return {
    tls: parsed.protocol === "rediss:",
    host,
    port,
    username,
    password,
    database,
  };
}
|
||||
|
||||
/**
 * Opens a socket to the configured server, then authenticates and selects the
 * database when the configuration asks for it.
 *
 * AUTH is sent only when a password is configured; SELECT only for a non-zero
 * database index. The socket is given a 10 second inactivity timeout.
 *
 * @returns a connection ready for `sendCommand`.
 * @throws when the connection fails or times out, or when AUTH / SELECT fails.
 *   The socket is destroyed in every failure case so nothing is left open.
 */
async function connectRedis(config: RedisConnectionConfig): Promise<RedisSocketConnection> {
  const socket = await new Promise<net.Socket | tls.TLSSocket>((resolve, reject) => {
    const rawSocket: net.Socket = config.tls
      ? tls.connect({ host: config.host, port: config.port })
      : net.createConnection({ host: config.host, port: config.port });
    // A TLS socket is usable once the handshake has finished ("secureConnect"),
    // not when the underlying TCP connection opens.
    const readyEvent: string = config.tls ? "secureConnect" : "connect";

    const detach = () => {
      rawSocket.off(readyEvent, onReady);
      rawSocket.off("error", onError);
      rawSocket.off("timeout", onTimeout);
    };
    const abandon = (error: Error) => {
      detach();
      // Keep a listener attached so a late error on the abandoned socket cannot
      // surface as an uncaught exception.
      rawSocket.on("error", () => {});
      rawSocket.destroy();
      reject(error);
    };
    const onReady = () => {
      detach();
      resolve(rawSocket);
    };
    const onError = (error: Error) => {
      abandon(error);
    };
    const onTimeout = () => {
      abandon(new Error("Redis connection timed out."));
    };

    rawSocket.setTimeout(10_000);
    rawSocket.on(readyEvent, onReady);
    rawSocket.on("error", onError);
    rawSocket.on("timeout", onTimeout);
  });

  const connection = new RedisSocketConnection(socket);
  try {
    if (config.password) {
      const authArgs = config.username
        ? ["AUTH", config.username, config.password]
        : ["AUTH", config.password];
      await connection.sendCommand(authArgs);
    }
    if (config.database !== 0) {
      await connection.sendCommand(["SELECT", String(config.database)]);
    }
  } catch (error) {
    // The caller never receives the connection on failure, so close it here.
    connection.close();
    throw error;
  }
  return connection;
}
|
||||
|
||||
/**
 * Builds a command runner for `redisUrl`.
 *
 * The URL is parsed once, up front, so an invalid URL throws here. Every call of
 * the returned runner opens its own connection, sends the single command, and
 * closes the connection again whether or not the command succeeded.
 */
function createRedisCommandRunner(redisUrl: string): RedisCommandRunner {
  // The repo has no shared Redis client yet, so the lock code carries its own wiring.
  const connectionConfig = parseRedisConnectionConfig(redisUrl);
  const runOnFreshConnection: RedisCommandRunner = async (args: string[]) => {
    const connection = await connectRedis(connectionConfig);
    try {
      return await connection.sendCommand(args);
    } finally {
      connection.close();
    }
  };
  return runOnFreshConnection;
}
|
||||
|
||||
/**
 * Normalizes a requested TTL: non-finite input becomes the default TTL; anything
 * else is floored to a whole number and raised to the minimum TTL if below it.
 */
function toPositiveTtlMs(ttlMs: number): number {
  return Number.isFinite(ttlMs)
    ? Math.max(MIN_LOCK_TTL_MS, Math.floor(ttlMs))
    : DEFAULT_ACP_SESSION_LOCK_TTL_MS;
}
|
||||
|
||||
/** Returns the lock key for a session: the shared key prefix followed by the session key. */
function lockKeyForSession(sessionKey: string): string {
  return LOCK_KEY_PREFIX + sessionKey;
}
|
||||
|
||||
/** One in-process lock: who holds it and when it stops being valid. */
type LocalLockEntry = {
  /** Owner id handed out when the lock was acquired. */
  ownerId: string;
  /** Expiry as epoch milliseconds, on the same clock as `Date.now()`. */
  expiresAtMs: number;
};
|
||||
|
||||
/**
 * Lock manager that keeps its locks in a `Map` owned by this instance.
 *
 * Expired entries are removed lazily: each operation first drops the entry for the
 * key it was called with if that entry's expiry has passed.
 */
export class LocalSessionLockManager implements SessionLockManager {
  private readonly locks = new Map<string, LocalLockEntry>();

  /**
   * Takes the lock for `sessionKey` unless an unexpired entry already exists.
   *
   * @returns a freshly generated owner id on success, `{ acquired: false }` otherwise.
   */
  async acquire(
    sessionKey: string,
    ttlMs: number,
  ): Promise<{ acquired: true; ownerId: string } | { acquired: false }> {
    if (this.liveEntry(sessionKey)) {
      return { acquired: false };
    }
    const ownerId = generateSecureUuid();
    const expiresAtMs = Date.now() + toPositiveTtlMs(ttlMs);
    this.locks.set(sessionKey, { ownerId, expiresAtMs });
    return { acquired: true, ownerId };
  }

  /** Removes the lock for `sessionKey` when it is currently held by `ownerId`; otherwise does nothing. */
  async release(sessionKey: string, ownerId: string): Promise<void> {
    const held = this.liveEntry(sessionKey);
    if (held && held.ownerId === ownerId) {
      this.locks.delete(sessionKey);
    }
  }

  /**
   * Restarts the TTL of the lock for `sessionKey` when it is currently held by `ownerId`.
   *
   * @returns `true` when the expiry was moved, `false` when there is no such lock.
   */
  async renew(sessionKey: string, ownerId: string, ttlMs: number): Promise<boolean> {
    const held = this.liveEntry(sessionKey);
    if (!held || held.ownerId !== ownerId) {
      return false;
    }
    // The entry object is the one stored in the map, so updating it in place is enough.
    held.expiresAtMs = Date.now() + toPositiveTtlMs(ttlMs);
    return true;
  }

  /** Returns the unexpired entry for `sessionKey`, deleting it first if it has expired. */
  private liveEntry(sessionKey: string): LocalLockEntry | undefined {
    const entry = this.locks.get(sessionKey);
    if (entry && entry.expiresAtMs <= Date.now()) {
      this.locks.delete(sessionKey);
      return undefined;
    }
    return entry;
  }
}
|
||||
|
||||
/**
 * Lock manager backed by Redis.
 *
 * A lock is a key whose value is the owner id. It is created with `SET ... NX PX`
 * and released or renewed through Lua scripts that first compare the stored value
 * with the caller's owner id.
 */
export class RedisSessionLockManager implements SessionLockManager {
  private readonly runRedisCommand: RedisCommandRunner;
  private readonly ownerIdFactory: () => string;

  /**
   * @param params.redisUrl URL used to build a command runner when none is injected.
   * @param params.runRedisCommand command runner to use instead of connecting via `redisUrl`.
   * @param params.ownerIdFactory owner id generator; defaults to `generateSecureUuid`.
   * @throws Error when neither `redisUrl` nor `runRedisCommand` is provided, or when
   *   building the runner from `redisUrl` fails.
   */
  constructor(params: {
    redisUrl?: string;
    runRedisCommand?: RedisCommandRunner;
    ownerIdFactory?: () => string;
  }) {
    const { redisUrl, runRedisCommand, ownerIdFactory } = params;
    // An injected runner wins over the URL. The three branches cover every case, so
    // no further "missing URL" fallback is needed.
    if (runRedisCommand) {
      this.runRedisCommand = runRedisCommand;
    } else if (redisUrl) {
      this.runRedisCommand = createRedisCommandRunner(redisUrl);
    } else {
      throw new Error("RedisSessionLockManager requires redisUrl or runRedisCommand.");
    }
    this.ownerIdFactory = ownerIdFactory ?? generateSecureUuid;
  }

  /**
   * Tries to create the lock key with a new owner id and the normalized TTL.
   *
   * @returns the owner id when Redis answers `OK`, `{ acquired: false }` for any other reply.
   */
  async acquire(
    sessionKey: string,
    ttlMs: number,
  ): Promise<{ acquired: true; ownerId: string } | { acquired: false }> {
    const ownerId = this.ownerIdFactory();
    const reply = await this.runRedisCommand([
      "SET",
      lockKeyForSession(sessionKey),
      ownerId,
      "NX",
      "PX",
      String(toPositiveTtlMs(ttlMs)),
    ]);
    return reply === "OK" ? { acquired: true, ownerId } : { acquired: false };
  }

  /** Runs the release script for the session's lock key with `ownerId`; the reply is ignored. */
  async release(sessionKey: string, ownerId: string): Promise<void> {
    await this.runRedisCommand([
      "EVAL",
      RELEASE_IF_OWNER_LUA,
      "1",
      lockKeyForSession(sessionKey),
      ownerId,
    ]);
  }

  /**
   * Runs the renew script for the session's lock key with `ownerId` and the normalized TTL.
   *
   * @returns `true` when the script's reply converts to the number 1.
   */
  async renew(sessionKey: string, ownerId: string, ttlMs: number): Promise<boolean> {
    const reply = await this.runRedisCommand([
      "EVAL",
      RENEW_IF_OWNER_LUA,
      "1",
      lockKeyForSession(sessionKey),
      ownerId,
      String(toPositiveTtlMs(ttlMs)),
    ]);
    return Number(reply) === 1;
  }
}
|
||||
|
||||
/**
 * Stand-in manager that never grants a lock.
 *
 * `acquire` always rejects with an error that carries the stored reason, `release`
 * does nothing, and `renew` always reports failure.
 */
class FailClosedSessionLockManager implements SessionLockManager {
  /** @param reason text appended to the error thrown by `acquire`. */
  constructor(private readonly reason: string) {}

  /** Always rejects. @throws Error naming the stored reason. */
  async acquire(
    _sessionKey: string,
    _ttlMs: number,
  ): Promise<{ acquired: true; ownerId: string } | { acquired: false }> {
    throw new Error(`Redis ACP session lock manager unavailable: ${this.reason}`);
  }

  /** No-op: this manager never hands out a lock, so there is nothing to release. */
  async release(_sessionKey: string, _ownerId: string): Promise<void> {}

  /** Always resolves to `false`. */
  async renew(_sessionKey: string, _ownerId: string, _ttlMs: number): Promise<boolean> {
    return false;
  }
}
|
||||
|
||||
/**
 * Reads the lock TTL from `OPENCLAW_ACP_SESSION_LOCK_TTL_MS`.
 *
 * @param env environment to read from; defaults to `process.env`.
 * @returns the default TTL when the variable is missing, blank, not parseable as an
 *   integer, or not positive; otherwise the parsed value after normalization by
 *   `toPositiveTtlMs`.
 */
export function resolveAcpSessionLockTtlMs(env: NodeJS.ProcessEnv = process.env): number {
  const configured = env.OPENCLAW_ACP_SESSION_LOCK_TTL_MS?.trim() ?? "";
  const ttlMs = configured === "" ? Number.NaN : Number.parseInt(configured, 10);
  if (Number.isFinite(ttlMs) && ttlMs > 0) {
    return toPositiveTtlMs(ttlMs);
  }
  return DEFAULT_ACP_SESSION_LOCK_TTL_MS;
}
|
||||
|
||||
/**
 * Picks the Redis URL to use for session locks.
 *
 * `OPENCLAW_ACP_SESSION_LOCK_REDIS_URL` is preferred; `REDIS_URL` is the fallback.
 * Values are trimmed, and a blank value counts as unset.
 *
 * @param env environment to read from; defaults to `process.env`.
 * @returns the first non-blank URL, or `null` when neither variable is set.
 */
function resolveRedisLockUrl(env: NodeJS.ProcessEnv = process.env): string | null {
  const dedicatedUrl = env.OPENCLAW_ACP_SESSION_LOCK_REDIS_URL?.trim();
  if (dedicatedUrl) {
    return dedicatedUrl;
  }
  const sharedUrl = env.REDIS_URL?.trim();
  return sharedUrl ? sharedUrl : null;
}
|
||||
|
||||
/** Module-wide cached lock manager; `null` until the first lookup (or after a test reset). */
let ACP_SESSION_LOCK_MANAGER_SINGLETON: SessionLockManager | null = null;

/**
 * Returns the lock manager for this module, creating and caching it on first use.
 *
 * Selection: no Redis URL in `env` gives the local manager; a Redis URL gives the
 * Redis manager; a Redis URL whose manager cannot be constructed gives a manager
 * that refuses every acquire. `env` is consulted only when nothing is cached yet;
 * later calls return the cached instance.
 *
 * @param env environment to read from; defaults to `process.env`.
 */
export function getAcpSessionLockManager(env: NodeJS.ProcessEnv = process.env): SessionLockManager {
  const cached = ACP_SESSION_LOCK_MANAGER_SINGLETON;
  if (cached) {
    return cached;
  }

  // Caches the chosen manager, then logs which backend was picked.
  const remember = (manager: SessionLockManager, logLine: string): SessionLockManager => {
    ACP_SESSION_LOCK_MANAGER_SINGLETON = manager;
    logVerbose(logLine);
    return manager;
  };

  const redisUrl = resolveRedisLockUrl(env);
  if (!redisUrl) {
    return remember(
      new LocalSessionLockManager(),
      "dispatch-acp-lock: backend=local redis_configured=false",
    );
  }
  try {
    return remember(new RedisSessionLockManager({ redisUrl }), "dispatch-acp-lock: backend=redis");
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return remember(
      new FailClosedSessionLockManager(message),
      `dispatch-acp-lock: backend=redis init_failed=${message} fail_closed=true`,
    );
  }
}
|
||||
|
||||
/** Clears the cached lock manager so the next lookup selects a backend again. */
export function resetAcpSessionLockManagerForTests(): void {
  ACP_SESSION_LOCK_MANAGER_SINGLETON = null;
}
|
||||
Loading…
x
Reference in New Issue
Block a user