diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e01313986e..8dd74fd3313 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Security/ACP: harden ACP bridge session management with duplicate-session refresh, idle-session reaping, oldest-idle soft-cap eviction, and burst rate limiting on session creation to reduce local DoS risk without disrupting normal IDE usage. - Security/Gateway: rate-limit control-plane write RPCs (`config.apply`, `config.patch`, `update.run`) to 3 requests per minute per `deviceId+clientIp`, add restart single-flight coalescing plus a 30-second restart cooldown, and log actor/device/ip with changed-path audit details for config/update-triggered restarts. - Commands/Doctor: skip embedding-provider warnings when `memory.backend` is `qmd`, because QMD manages embeddings internally and does not require `memorySearch` providers. (#17263) Thanks @miloudbelarebia. - Security/Webhooks: harden Feishu and Zalo webhook ingress with webhook-mode token preconditions, loopback-default Feishu bind host, JSON content-type enforcement, per-path rate limiting, replay dedupe for Zalo events, constant-time Zalo secret comparison, and anomaly status counters. diff --git a/src/acp/session-mapper.test.ts b/src/acp/session-mapper.test.ts index ac06dcf4b89..be026e632a8 100644 --- a/src/acp/session-mapper.test.ts +++ b/src/acp/session-mapper.test.ts @@ -1,4 +1,4 @@ -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { GatewayClient } from "../gateway/client.js"; import { parseSessionMeta, resolveSessionKey } from "./session-mapper.js"; import { createInMemorySessionStore } from "./session.js"; @@ -57,7 +57,17 @@ describe("acp session mapper", () => { }); describe("acp session manager", () => { - const store = createInMemorySessionStore(); + let nowMs = 0; + const now = () => nowMs; + const advance = (ms: number) => { + nowMs += ms; + }; + let store = createInMemorySessionStore({ now }); + + beforeEach(() => { + nowMs = 1_000; + store = createInMemorySessionStore({ now }); + }); afterEach(() => { store.clearAllSessionsForTest(); @@ -77,4 +87,113 @@ describe("acp session manager", () => { expect(cancelled).toBe(true); expect(store.getSessionByRunId("run-1")).toBeUndefined(); }); + + it("refreshes existing session IDs instead of creating duplicates", () => { + const first = store.createSession({ + sessionId: "existing", + sessionKey: "acp:one", + cwd: "/tmp/one", + }); + advance(500); + + const refreshed = store.createSession({ + sessionId: "existing", + sessionKey: "acp:two", + cwd: "/tmp/two", + }); + + expect(refreshed).toBe(first); + expect(refreshed.sessionKey).toBe("acp:two"); + expect(refreshed.cwd).toBe("/tmp/two"); + expect(refreshed.createdAt).toBe(1_000); + expect(refreshed.lastTouchedAt).toBe(1_500); + }); + + it("reaps idle sessions before enforcing the max session cap", () => { + const boundedStore = createInMemorySessionStore({ + maxSessions: 1, + idleTtlMs: 1_000, + now, + }); + try { + boundedStore.createSession({ + sessionId: "old", + sessionKey: "acp:old", + cwd: "/tmp", + }); + advance(2_000); + const fresh = boundedStore.createSession({ + sessionId: "fresh", + sessionKey: "acp:fresh", + cwd: "/tmp", + }); + + expect(fresh.sessionId).toBe("fresh"); + expect(boundedStore.getSession("old")).toBeUndefined(); + } finally { + boundedStore.clearAllSessionsForTest(); + } + }); + + it("uses soft-cap eviction for the oldest idle session when full", () => { + const boundedStore = createInMemorySessionStore({ + maxSessions: 2, + idleTtlMs: 24 * 60 * 60 * 1_000, + now, + }); + try { + const first = boundedStore.createSession({ + sessionId: "first", + sessionKey: "acp:first", + cwd: "/tmp", + }); + advance(100); + const second = boundedStore.createSession({ + sessionId: "second", + sessionKey: "acp:second", + cwd: "/tmp", + }); + const controller = new AbortController(); + boundedStore.setActiveRun(second.sessionId, "run-2", controller); + advance(100); + + const third = boundedStore.createSession({ + sessionId: "third", + sessionKey: "acp:third", + cwd: "/tmp", + }); + + expect(third.sessionId).toBe("third"); + expect(boundedStore.getSession(first.sessionId)).toBeUndefined(); + expect(boundedStore.getSession(second.sessionId)).toBeDefined(); + } finally { + boundedStore.clearAllSessionsForTest(); + } + }); + + it("rejects when full and no session is evictable", () => { + const boundedStore = createInMemorySessionStore({ + maxSessions: 1, + idleTtlMs: 24 * 60 * 60 * 1_000, + now, + }); + try { + const only = boundedStore.createSession({ + sessionId: "only", + sessionKey: "acp:only", + cwd: "/tmp", + }); + boundedStore.setActiveRun(only.sessionId, "run-only", new AbortController()); + + expect(() => + boundedStore.createSession({ + sessionId: "next", + sessionKey: "acp:next", + cwd: "/tmp", + }), + ).toThrow(/session limit reached/i); + } finally { + boundedStore.clearAllSessionsForTest(); + } + }); }); diff --git a/src/acp/session.ts b/src/acp/session.ts index 3214b08c301..92c45d87522 100644 --- a/src/acp/session.ts +++ b/src/acp/session.ts @@ -11,17 +11,93 @@ export type AcpSessionStore = { clearAllSessionsForTest: () => void; }; -export function createInMemorySessionStore(): AcpSessionStore { +type AcpSessionStoreOptions = { + maxSessions?: number; + idleTtlMs?: number; + now?: () => number; +}; + +const DEFAULT_MAX_SESSIONS = 5_000; +const DEFAULT_IDLE_TTL_MS = 24 * 60 * 60 * 1_000; + +export function createInMemorySessionStore(options: AcpSessionStoreOptions = {}): AcpSessionStore { + const maxSessions = Math.max(1, options.maxSessions ?? DEFAULT_MAX_SESSIONS); + const idleTtlMs = Math.max(1_000, options.idleTtlMs ?? DEFAULT_IDLE_TTL_MS); + const now = options.now ?? Date.now; const sessions = new Map(); const runIdToSessionId = new Map(); + const touchSession = (session: AcpSession, nowMs: number) => { + session.lastTouchedAt = nowMs; + }; + + const removeSession = (sessionId: string) => { + const session = sessions.get(sessionId); + if (!session) { + return false; + } + if (session.activeRunId) { + runIdToSessionId.delete(session.activeRunId); + } + session.abortController?.abort(); + sessions.delete(sessionId); + return true; + }; + + const reapIdleSessions = (nowMs: number) => { + const idleBefore = nowMs - idleTtlMs; + for (const [sessionId, session] of sessions.entries()) { + if (session.activeRunId || session.abortController) { + continue; + } + if (session.lastTouchedAt > idleBefore) { + continue; + } + removeSession(sessionId); + } + }; + + const evictOldestIdleSession = () => { + let oldestSessionId: string | null = null; + let oldestLastTouchedAt = Number.POSITIVE_INFINITY; + for (const [sessionId, session] of sessions.entries()) { + if (session.activeRunId || session.abortController) { + continue; + } + if (session.lastTouchedAt >= oldestLastTouchedAt) { + continue; + } + oldestLastTouchedAt = session.lastTouchedAt; + oldestSessionId = sessionId; + } + if (!oldestSessionId) { + return false; + } + return removeSession(oldestSessionId); + }; + const createSession: AcpSessionStore["createSession"] = (params) => { + const nowMs = now(); const sessionId = params.sessionId ?? randomUUID(); + const existingSession = sessions.get(sessionId); + if (existingSession) { + existingSession.sessionKey = params.sessionKey; + existingSession.cwd = params.cwd; + touchSession(existingSession, nowMs); + return existingSession; + } + reapIdleSessions(nowMs); + if (sessions.size >= maxSessions && !evictOldestIdleSession()) { + throw new Error( + `ACP session limit reached (max ${maxSessions}). Close idle ACP clients and retry.`, + ); + } const session: AcpSession = { sessionId, sessionKey: params.sessionKey, cwd: params.cwd, - createdAt: Date.now(), + createdAt: nowMs, + lastTouchedAt: nowMs, abortController: null, activeRunId: null, }; @@ -29,11 +105,24 @@ export function createInMemorySessionStore(): AcpSessionStore { return session; }; - const getSession: AcpSessionStore["getSession"] = (sessionId) => sessions.get(sessionId); + const getSession: AcpSessionStore["getSession"] = (sessionId) => { + const session = sessions.get(sessionId); + if (session) { + touchSession(session, now()); + } + return session; + }; const getSessionByRunId: AcpSessionStore["getSessionByRunId"] = (runId) => { const sessionId = runIdToSessionId.get(runId); - return sessionId ? sessions.get(sessionId) : undefined; + if (!sessionId) { + return undefined; + } + const session = sessions.get(sessionId); + if (session) { + touchSession(session, now()); + } + return session; }; const setActiveRun: AcpSessionStore["setActiveRun"] = (sessionId, runId, abortController) => { @@ -44,6 +133,7 @@ export function createInMemorySessionStore(): AcpSessionStore { session.activeRunId = runId; session.abortController = abortController; runIdToSessionId.set(runId, sessionId); + touchSession(session, now()); }; const clearActiveRun: AcpSessionStore["clearActiveRun"] = (sessionId) => { @@ -56,6 +146,7 @@ export function createInMemorySessionStore(): AcpSessionStore { } session.activeRunId = null; session.abortController = null; + touchSession(session, now()); }; const cancelActiveRun: AcpSessionStore["cancelActiveRun"] = (sessionId) => { @@ -69,6 +160,7 @@ export function createInMemorySessionStore(): AcpSessionStore { } session.abortController = null; session.activeRunId = null; + touchSession(session, now()); return true; }; diff --git a/src/acp/translator.session-rate-limit.test.ts b/src/acp/translator.session-rate-limit.test.ts new file mode 100644 index 00000000000..f8e31914be6 --- /dev/null +++ b/src/acp/translator.session-rate-limit.test.ts @@ -0,0 +1,78 @@ +import type { + AgentSideConnection, + LoadSessionRequest, + NewSessionRequest, +} from "@agentclientprotocol/sdk"; +import { describe, expect, it, vi } from "vitest"; +import type { GatewayClient } from "../gateway/client.js"; +import { createInMemorySessionStore } from "./session.js"; +import { AcpGatewayAgent } from "./translator.js"; + +function createConnection(): AgentSideConnection { + return { + sessionUpdate: vi.fn(async () => {}), + } as unknown as AgentSideConnection; +} + +function createGateway(): GatewayClient { + return { + request: vi.fn(async () => ({ ok: true })), + } as unknown as GatewayClient; +} + +function createNewSessionRequest(cwd = "/tmp"): NewSessionRequest { + return { + cwd, + mcpServers: [], + _meta: {}, + } as unknown as NewSessionRequest; +} + +function createLoadSessionRequest(sessionId: string, cwd = "/tmp"): LoadSessionRequest { + return { + sessionId, + cwd, + mcpServers: [], + _meta: {}, + } as unknown as LoadSessionRequest; +} + +describe("acp session creation rate limit", () => { + it("rate limits excessive newSession bursts", async () => { + const sessionStore = createInMemorySessionStore(); + const agent = new AcpGatewayAgent(createConnection(), createGateway(), { + sessionStore, + sessionCreateRateLimit: { + maxRequests: 2, + windowMs: 60_000, + }, + }); + + await agent.newSession(createNewSessionRequest()); + await agent.newSession(createNewSessionRequest()); + await expect(agent.newSession(createNewSessionRequest())).rejects.toThrow( + /session creation rate limit exceeded/i, + ); + + sessionStore.clearAllSessionsForTest(); + }); + + it("does not count loadSession refreshes for an existing session ID", async () => { + const sessionStore = createInMemorySessionStore(); + const agent = new AcpGatewayAgent(createConnection(), createGateway(), { + sessionStore, + sessionCreateRateLimit: { + maxRequests: 1, + windowMs: 60_000, + }, + }); + + await agent.loadSession(createLoadSessionRequest("shared-session")); + await agent.loadSession(createLoadSessionRequest("shared-session")); + await expect(agent.loadSession(createLoadSessionRequest("new-session"))).rejects.toThrow( + /session creation rate limit exceeded/i, + ); + + sessionStore.clearAllSessionsForTest(); + }); +}); diff --git a/src/acp/translator.ts b/src/acp/translator.ts index 3b8def1ec38..dc63020a3a4 100644 --- a/src/acp/translator.ts +++ b/src/acp/translator.ts @@ -1,4 +1,3 @@ -import { randomUUID } from "node:crypto"; import type { Agent, AgentSideConnection, @@ -20,6 +19,7 @@ import type { StopReason, } from "@agentclientprotocol/sdk"; import { PROTOCOL_VERSION } from "@agentclientprotocol/sdk"; +import { randomUUID } from "node:crypto"; import type { GatewayClient } from "../gateway/client.js"; import type { EventFrame } from "../gateway/protocol/index.js"; import type { SessionsListResult } from "../gateway/session-utils.js"; @@ -50,12 +50,50 @@ type AcpGatewayAgentOptions = AcpServerOptions & { sessionStore?: AcpSessionStore; }; +const SESSION_CREATE_RATE_LIMIT_DEFAULT_MAX_REQUESTS = 120; +const SESSION_CREATE_RATE_LIMIT_DEFAULT_WINDOW_MS = 10_000; + +class SessionCreateRateLimiter { + private count = 0; + private windowStartMs = 0; + + constructor( + private readonly maxRequests: number, + private readonly windowMs: number, + private readonly now: () => number = Date.now, + ) {} + + consume(): { allowed: boolean; retryAfterMs: number; remaining: number } { + const nowMs = this.now(); + if (nowMs - this.windowStartMs >= this.windowMs) { + this.windowStartMs = nowMs; + this.count = 0; + } + + if (this.count >= this.maxRequests) { + return { + allowed: false, + retryAfterMs: Math.max(0, this.windowStartMs + this.windowMs - nowMs), + remaining: 0, + }; + } + + this.count += 1; + return { + allowed: true, + retryAfterMs: 0, + remaining: Math.max(0, this.maxRequests - this.count), + }; + } +} + export class AcpGatewayAgent implements Agent { private connection: AgentSideConnection; private gateway: GatewayClient; private opts: AcpGatewayAgentOptions; private log: (msg: string) => void; private sessionStore: AcpSessionStore; + private sessionCreateRateLimiter: SessionCreateRateLimiter; private pendingPrompts = new Map(); constructor( @@ -68,6 +106,16 @@ export class AcpGatewayAgent implements Agent { this.opts = opts; this.log = opts.verbose ? (msg: string) => process.stderr.write(`[acp] ${msg}\n`) : () => {}; this.sessionStore = opts.sessionStore ?? defaultAcpSessionStore; + this.sessionCreateRateLimiter = new SessionCreateRateLimiter( + Math.max( + 1, + opts.sessionCreateRateLimit?.maxRequests ?? SESSION_CREATE_RATE_LIMIT_DEFAULT_MAX_REQUESTS, + ), + Math.max( + 1_000, + opts.sessionCreateRateLimit?.windowMs ?? SESSION_CREATE_RATE_LIMIT_DEFAULT_WINDOW_MS, + ), + ); } start(): void { @@ -124,6 +172,7 @@ export class AcpGatewayAgent implements Agent { if (params.mcpServers.length > 0) { this.log(`ignoring ${params.mcpServers.length} MCP servers`); } + this.enforceSessionCreateRateLimit("newSession"); const sessionId = randomUUID(); const meta = parseSessionMeta(params._meta); @@ -154,6 +203,9 @@ export class AcpGatewayAgent implements Agent { if (params.mcpServers.length > 0) { this.log(`ignoring ${params.mcpServers.length} MCP servers`); } + if (!this.sessionStore.getSession(params.sessionId)) { + this.enforceSessionCreateRateLimit("loadSession"); + } const meta = parseSessionMeta(params._meta); const sessionKey = await resolveSessionKey({ @@ -451,4 +503,14 @@ export class AcpGatewayAgent implements Agent { }, }); } + + private enforceSessionCreateRateLimit(method: "newSession" | "loadSession"): void { + const budget = this.sessionCreateRateLimiter.consume(); + if (budget.allowed) { + return; + } + throw new Error( + `ACP session creation rate limit exceeded for ${method}; retry after ${Math.ceil(budget.retryAfterMs / 1_000)}s.`, + ); + } } diff --git a/src/acp/types.ts b/src/acp/types.ts index b6c713442b1..b266f6a5eef 100644 --- a/src/acp/types.ts +++ b/src/acp/types.ts @@ -6,6 +6,7 @@ export type AcpSession = { sessionKey: string; cwd: string; createdAt: number; + lastTouchedAt: number; abortController: AbortController | null; activeRunId: string | null; }; @@ -19,6 +20,10 @@ export type AcpServerOptions = { requireExistingSession?: boolean; resetSession?: boolean; prefixCwd?: boolean; + sessionCreateRateLimit?: { + maxRequests?: number; + windowMs?: number; + }; verbose?: boolean; };