fix(acp): harden session lifecycle against flooding
This commit is contained in:
parent
4ddc4dfd76
commit
e01011e3e4
@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Security/ACP: harden ACP bridge session management with duplicate-session refresh, idle-session reaping, oldest-idle soft-cap eviction, and burst rate limiting on session creation to reduce local DoS risk without disrupting normal IDE usage.
|
||||
- Security/Gateway: rate-limit control-plane write RPCs (`config.apply`, `config.patch`, `update.run`) to 3 requests per minute per `deviceId+clientIp`, add restart single-flight coalescing plus a 30-second restart cooldown, and log actor/device/ip with changed-path audit details for config/update-triggered restarts.
|
||||
- Commands/Doctor: skip embedding-provider warnings when `memory.backend` is `qmd`, because QMD manages embeddings internally and does not require `memorySearch` providers. (#17263) Thanks @miloudbelarebia.
|
||||
- Security/Webhooks: harden Feishu and Zalo webhook ingress with webhook-mode token preconditions, loopback-default Feishu bind host, JSON content-type enforcement, per-path rate limiting, replay dedupe for Zalo events, constant-time Zalo secret comparison, and anomaly status counters.
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { GatewayClient } from "../gateway/client.js";
|
||||
import { parseSessionMeta, resolveSessionKey } from "./session-mapper.js";
|
||||
import { createInMemorySessionStore } from "./session.js";
|
||||
@ -57,7 +57,17 @@ describe("acp session mapper", () => {
|
||||
});
|
||||
|
||||
describe("acp session manager", () => {
|
||||
const store = createInMemorySessionStore();
|
||||
let nowMs = 0;
|
||||
const now = () => nowMs;
|
||||
const advance = (ms: number) => {
|
||||
nowMs += ms;
|
||||
};
|
||||
let store = createInMemorySessionStore({ now });
|
||||
|
||||
beforeEach(() => {
|
||||
nowMs = 1_000;
|
||||
store = createInMemorySessionStore({ now });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
store.clearAllSessionsForTest();
|
||||
@ -77,4 +87,113 @@ describe("acp session manager", () => {
|
||||
expect(cancelled).toBe(true);
|
||||
expect(store.getSessionByRunId("run-1")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("refreshes existing session IDs instead of creating duplicates", () => {
|
||||
const first = store.createSession({
|
||||
sessionId: "existing",
|
||||
sessionKey: "acp:one",
|
||||
cwd: "/tmp/one",
|
||||
});
|
||||
advance(500);
|
||||
|
||||
const refreshed = store.createSession({
|
||||
sessionId: "existing",
|
||||
sessionKey: "acp:two",
|
||||
cwd: "/tmp/two",
|
||||
});
|
||||
|
||||
expect(refreshed).toBe(first);
|
||||
expect(refreshed.sessionKey).toBe("acp:two");
|
||||
expect(refreshed.cwd).toBe("/tmp/two");
|
||||
expect(refreshed.createdAt).toBe(1_000);
|
||||
expect(refreshed.lastTouchedAt).toBe(1_500);
|
||||
});
|
||||
|
||||
it("reaps idle sessions before enforcing the max session cap", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 1,
|
||||
idleTtlMs: 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
boundedStore.createSession({
|
||||
sessionId: "old",
|
||||
sessionKey: "acp:old",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
advance(2_000);
|
||||
const fresh = boundedStore.createSession({
|
||||
sessionId: "fresh",
|
||||
sessionKey: "acp:fresh",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
|
||||
expect(fresh.sessionId).toBe("fresh");
|
||||
expect(boundedStore.getSession("old")).toBeUndefined();
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("uses soft-cap eviction for the oldest idle session when full", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 2,
|
||||
idleTtlMs: 24 * 60 * 60 * 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
const first = boundedStore.createSession({
|
||||
sessionId: "first",
|
||||
sessionKey: "acp:first",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
advance(100);
|
||||
const second = boundedStore.createSession({
|
||||
sessionId: "second",
|
||||
sessionKey: "acp:second",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
const controller = new AbortController();
|
||||
boundedStore.setActiveRun(second.sessionId, "run-2", controller);
|
||||
advance(100);
|
||||
|
||||
const third = boundedStore.createSession({
|
||||
sessionId: "third",
|
||||
sessionKey: "acp:third",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
|
||||
expect(third.sessionId).toBe("third");
|
||||
expect(boundedStore.getSession(first.sessionId)).toBeUndefined();
|
||||
expect(boundedStore.getSession(second.sessionId)).toBeDefined();
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects when full and no session is evictable", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 1,
|
||||
idleTtlMs: 24 * 60 * 60 * 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
const only = boundedStore.createSession({
|
||||
sessionId: "only",
|
||||
sessionKey: "acp:only",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
boundedStore.setActiveRun(only.sessionId, "run-only", new AbortController());
|
||||
|
||||
expect(() =>
|
||||
boundedStore.createSession({
|
||||
sessionId: "next",
|
||||
sessionKey: "acp:next",
|
||||
cwd: "/tmp",
|
||||
}),
|
||||
).toThrow(/session limit reached/i);
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@ -11,17 +11,93 @@ export type AcpSessionStore = {
|
||||
clearAllSessionsForTest: () => void;
|
||||
};
|
||||
|
||||
export function createInMemorySessionStore(): AcpSessionStore {
|
||||
type AcpSessionStoreOptions = {
|
||||
maxSessions?: number;
|
||||
idleTtlMs?: number;
|
||||
now?: () => number;
|
||||
};
|
||||
|
||||
const DEFAULT_MAX_SESSIONS = 5_000;
|
||||
const DEFAULT_IDLE_TTL_MS = 24 * 60 * 60 * 1_000;
|
||||
|
||||
export function createInMemorySessionStore(options: AcpSessionStoreOptions = {}): AcpSessionStore {
|
||||
const maxSessions = Math.max(1, options.maxSessions ?? DEFAULT_MAX_SESSIONS);
|
||||
const idleTtlMs = Math.max(1_000, options.idleTtlMs ?? DEFAULT_IDLE_TTL_MS);
|
||||
const now = options.now ?? Date.now;
|
||||
const sessions = new Map<string, AcpSession>();
|
||||
const runIdToSessionId = new Map<string, string>();
|
||||
|
||||
const touchSession = (session: AcpSession, nowMs: number) => {
|
||||
session.lastTouchedAt = nowMs;
|
||||
};
|
||||
|
||||
const removeSession = (sessionId: string) => {
|
||||
const session = sessions.get(sessionId);
|
||||
if (!session) {
|
||||
return false;
|
||||
}
|
||||
if (session.activeRunId) {
|
||||
runIdToSessionId.delete(session.activeRunId);
|
||||
}
|
||||
session.abortController?.abort();
|
||||
sessions.delete(sessionId);
|
||||
return true;
|
||||
};
|
||||
|
||||
const reapIdleSessions = (nowMs: number) => {
|
||||
const idleBefore = nowMs - idleTtlMs;
|
||||
for (const [sessionId, session] of sessions.entries()) {
|
||||
if (session.activeRunId || session.abortController) {
|
||||
continue;
|
||||
}
|
||||
if (session.lastTouchedAt > idleBefore) {
|
||||
continue;
|
||||
}
|
||||
removeSession(sessionId);
|
||||
}
|
||||
};
|
||||
|
||||
const evictOldestIdleSession = () => {
|
||||
let oldestSessionId: string | null = null;
|
||||
let oldestLastTouchedAt = Number.POSITIVE_INFINITY;
|
||||
for (const [sessionId, session] of sessions.entries()) {
|
||||
if (session.activeRunId || session.abortController) {
|
||||
continue;
|
||||
}
|
||||
if (session.lastTouchedAt >= oldestLastTouchedAt) {
|
||||
continue;
|
||||
}
|
||||
oldestLastTouchedAt = session.lastTouchedAt;
|
||||
oldestSessionId = sessionId;
|
||||
}
|
||||
if (!oldestSessionId) {
|
||||
return false;
|
||||
}
|
||||
return removeSession(oldestSessionId);
|
||||
};
|
||||
|
||||
const createSession: AcpSessionStore["createSession"] = (params) => {
|
||||
const nowMs = now();
|
||||
const sessionId = params.sessionId ?? randomUUID();
|
||||
const existingSession = sessions.get(sessionId);
|
||||
if (existingSession) {
|
||||
existingSession.sessionKey = params.sessionKey;
|
||||
existingSession.cwd = params.cwd;
|
||||
touchSession(existingSession, nowMs);
|
||||
return existingSession;
|
||||
}
|
||||
reapIdleSessions(nowMs);
|
||||
if (sessions.size >= maxSessions && !evictOldestIdleSession()) {
|
||||
throw new Error(
|
||||
`ACP session limit reached (max ${maxSessions}). Close idle ACP clients and retry.`,
|
||||
);
|
||||
}
|
||||
const session: AcpSession = {
|
||||
sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
cwd: params.cwd,
|
||||
createdAt: Date.now(),
|
||||
createdAt: nowMs,
|
||||
lastTouchedAt: nowMs,
|
||||
abortController: null,
|
||||
activeRunId: null,
|
||||
};
|
||||
@ -29,11 +105,24 @@ export function createInMemorySessionStore(): AcpSessionStore {
|
||||
return session;
|
||||
};
|
||||
|
||||
const getSession: AcpSessionStore["getSession"] = (sessionId) => sessions.get(sessionId);
|
||||
const getSession: AcpSessionStore["getSession"] = (sessionId) => {
|
||||
const session = sessions.get(sessionId);
|
||||
if (session) {
|
||||
touchSession(session, now());
|
||||
}
|
||||
return session;
|
||||
};
|
||||
|
||||
const getSessionByRunId: AcpSessionStore["getSessionByRunId"] = (runId) => {
|
||||
const sessionId = runIdToSessionId.get(runId);
|
||||
return sessionId ? sessions.get(sessionId) : undefined;
|
||||
if (!sessionId) {
|
||||
return undefined;
|
||||
}
|
||||
const session = sessions.get(sessionId);
|
||||
if (session) {
|
||||
touchSession(session, now());
|
||||
}
|
||||
return session;
|
||||
};
|
||||
|
||||
const setActiveRun: AcpSessionStore["setActiveRun"] = (sessionId, runId, abortController) => {
|
||||
@ -44,6 +133,7 @@ export function createInMemorySessionStore(): AcpSessionStore {
|
||||
session.activeRunId = runId;
|
||||
session.abortController = abortController;
|
||||
runIdToSessionId.set(runId, sessionId);
|
||||
touchSession(session, now());
|
||||
};
|
||||
|
||||
const clearActiveRun: AcpSessionStore["clearActiveRun"] = (sessionId) => {
|
||||
@ -56,6 +146,7 @@ export function createInMemorySessionStore(): AcpSessionStore {
|
||||
}
|
||||
session.activeRunId = null;
|
||||
session.abortController = null;
|
||||
touchSession(session, now());
|
||||
};
|
||||
|
||||
const cancelActiveRun: AcpSessionStore["cancelActiveRun"] = (sessionId) => {
|
||||
@ -69,6 +160,7 @@ export function createInMemorySessionStore(): AcpSessionStore {
|
||||
}
|
||||
session.abortController = null;
|
||||
session.activeRunId = null;
|
||||
touchSession(session, now());
|
||||
return true;
|
||||
};
|
||||
|
||||
|
||||
78
src/acp/translator.session-rate-limit.test.ts
Normal file
78
src/acp/translator.session-rate-limit.test.ts
Normal file
@ -0,0 +1,78 @@
|
||||
import type {
|
||||
AgentSideConnection,
|
||||
LoadSessionRequest,
|
||||
NewSessionRequest,
|
||||
} from "@agentclientprotocol/sdk";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { GatewayClient } from "../gateway/client.js";
|
||||
import { createInMemorySessionStore } from "./session.js";
|
||||
import { AcpGatewayAgent } from "./translator.js";
|
||||
|
||||
function createConnection(): AgentSideConnection {
|
||||
return {
|
||||
sessionUpdate: vi.fn(async () => {}),
|
||||
} as unknown as AgentSideConnection;
|
||||
}
|
||||
|
||||
function createGateway(): GatewayClient {
|
||||
return {
|
||||
request: vi.fn(async () => ({ ok: true })),
|
||||
} as unknown as GatewayClient;
|
||||
}
|
||||
|
||||
function createNewSessionRequest(cwd = "/tmp"): NewSessionRequest {
|
||||
return {
|
||||
cwd,
|
||||
mcpServers: [],
|
||||
_meta: {},
|
||||
} as unknown as NewSessionRequest;
|
||||
}
|
||||
|
||||
function createLoadSessionRequest(sessionId: string, cwd = "/tmp"): LoadSessionRequest {
|
||||
return {
|
||||
sessionId,
|
||||
cwd,
|
||||
mcpServers: [],
|
||||
_meta: {},
|
||||
} as unknown as LoadSessionRequest;
|
||||
}
|
||||
|
||||
describe("acp session creation rate limit", () => {
|
||||
it("rate limits excessive newSession bursts", async () => {
|
||||
const sessionStore = createInMemorySessionStore();
|
||||
const agent = new AcpGatewayAgent(createConnection(), createGateway(), {
|
||||
sessionStore,
|
||||
sessionCreateRateLimit: {
|
||||
maxRequests: 2,
|
||||
windowMs: 60_000,
|
||||
},
|
||||
});
|
||||
|
||||
await agent.newSession(createNewSessionRequest());
|
||||
await agent.newSession(createNewSessionRequest());
|
||||
await expect(agent.newSession(createNewSessionRequest())).rejects.toThrow(
|
||||
/session creation rate limit exceeded/i,
|
||||
);
|
||||
|
||||
sessionStore.clearAllSessionsForTest();
|
||||
});
|
||||
|
||||
it("does not count loadSession refreshes for an existing session ID", async () => {
|
||||
const sessionStore = createInMemorySessionStore();
|
||||
const agent = new AcpGatewayAgent(createConnection(), createGateway(), {
|
||||
sessionStore,
|
||||
sessionCreateRateLimit: {
|
||||
maxRequests: 1,
|
||||
windowMs: 60_000,
|
||||
},
|
||||
});
|
||||
|
||||
await agent.loadSession(createLoadSessionRequest("shared-session"));
|
||||
await agent.loadSession(createLoadSessionRequest("shared-session"));
|
||||
await expect(agent.loadSession(createLoadSessionRequest("new-session"))).rejects.toThrow(
|
||||
/session creation rate limit exceeded/i,
|
||||
);
|
||||
|
||||
sessionStore.clearAllSessionsForTest();
|
||||
});
|
||||
});
|
||||
@ -1,4 +1,3 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type {
|
||||
Agent,
|
||||
AgentSideConnection,
|
||||
@ -20,6 +19,7 @@ import type {
|
||||
StopReason,
|
||||
} from "@agentclientprotocol/sdk";
|
||||
import { PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { GatewayClient } from "../gateway/client.js";
|
||||
import type { EventFrame } from "../gateway/protocol/index.js";
|
||||
import type { SessionsListResult } from "../gateway/session-utils.js";
|
||||
@ -50,12 +50,50 @@ type AcpGatewayAgentOptions = AcpServerOptions & {
|
||||
sessionStore?: AcpSessionStore;
|
||||
};
|
||||
|
||||
const SESSION_CREATE_RATE_LIMIT_DEFAULT_MAX_REQUESTS = 120;
|
||||
const SESSION_CREATE_RATE_LIMIT_DEFAULT_WINDOW_MS = 10_000;
|
||||
|
||||
class SessionCreateRateLimiter {
|
||||
private count = 0;
|
||||
private windowStartMs = 0;
|
||||
|
||||
constructor(
|
||||
private readonly maxRequests: number,
|
||||
private readonly windowMs: number,
|
||||
private readonly now: () => number = Date.now,
|
||||
) {}
|
||||
|
||||
consume(): { allowed: boolean; retryAfterMs: number; remaining: number } {
|
||||
const nowMs = this.now();
|
||||
if (nowMs - this.windowStartMs >= this.windowMs) {
|
||||
this.windowStartMs = nowMs;
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
if (this.count >= this.maxRequests) {
|
||||
return {
|
||||
allowed: false,
|
||||
retryAfterMs: Math.max(0, this.windowStartMs + this.windowMs - nowMs),
|
||||
remaining: 0,
|
||||
};
|
||||
}
|
||||
|
||||
this.count += 1;
|
||||
return {
|
||||
allowed: true,
|
||||
retryAfterMs: 0,
|
||||
remaining: Math.max(0, this.maxRequests - this.count),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export class AcpGatewayAgent implements Agent {
|
||||
private connection: AgentSideConnection;
|
||||
private gateway: GatewayClient;
|
||||
private opts: AcpGatewayAgentOptions;
|
||||
private log: (msg: string) => void;
|
||||
private sessionStore: AcpSessionStore;
|
||||
private sessionCreateRateLimiter: SessionCreateRateLimiter;
|
||||
private pendingPrompts = new Map<string, PendingPrompt>();
|
||||
|
||||
constructor(
|
||||
@ -68,6 +106,16 @@ export class AcpGatewayAgent implements Agent {
|
||||
this.opts = opts;
|
||||
this.log = opts.verbose ? (msg: string) => process.stderr.write(`[acp] ${msg}\n`) : () => {};
|
||||
this.sessionStore = opts.sessionStore ?? defaultAcpSessionStore;
|
||||
this.sessionCreateRateLimiter = new SessionCreateRateLimiter(
|
||||
Math.max(
|
||||
1,
|
||||
opts.sessionCreateRateLimit?.maxRequests ?? SESSION_CREATE_RATE_LIMIT_DEFAULT_MAX_REQUESTS,
|
||||
),
|
||||
Math.max(
|
||||
1_000,
|
||||
opts.sessionCreateRateLimit?.windowMs ?? SESSION_CREATE_RATE_LIMIT_DEFAULT_WINDOW_MS,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
start(): void {
|
||||
@ -124,6 +172,7 @@ export class AcpGatewayAgent implements Agent {
|
||||
if (params.mcpServers.length > 0) {
|
||||
this.log(`ignoring ${params.mcpServers.length} MCP servers`);
|
||||
}
|
||||
this.enforceSessionCreateRateLimit("newSession");
|
||||
|
||||
const sessionId = randomUUID();
|
||||
const meta = parseSessionMeta(params._meta);
|
||||
@ -154,6 +203,9 @@ export class AcpGatewayAgent implements Agent {
|
||||
if (params.mcpServers.length > 0) {
|
||||
this.log(`ignoring ${params.mcpServers.length} MCP servers`);
|
||||
}
|
||||
if (!this.sessionStore.getSession(params.sessionId)) {
|
||||
this.enforceSessionCreateRateLimit("loadSession");
|
||||
}
|
||||
|
||||
const meta = parseSessionMeta(params._meta);
|
||||
const sessionKey = await resolveSessionKey({
|
||||
@ -451,4 +503,14 @@ export class AcpGatewayAgent implements Agent {
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private enforceSessionCreateRateLimit(method: "newSession" | "loadSession"): void {
|
||||
const budget = this.sessionCreateRateLimiter.consume();
|
||||
if (budget.allowed) {
|
||||
return;
|
||||
}
|
||||
throw new Error(
|
||||
`ACP session creation rate limit exceeded for ${method}; retry after ${Math.ceil(budget.retryAfterMs / 1_000)}s.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ export type AcpSession = {
|
||||
sessionKey: string;
|
||||
cwd: string;
|
||||
createdAt: number;
|
||||
lastTouchedAt: number;
|
||||
abortController: AbortController | null;
|
||||
activeRunId: string | null;
|
||||
};
|
||||
@ -19,6 +20,10 @@ export type AcpServerOptions = {
|
||||
requireExistingSession?: boolean;
|
||||
resetSession?: boolean;
|
||||
prefixCwd?: boolean;
|
||||
sessionCreateRateLimit?: {
|
||||
maxRequests?: number;
|
||||
windowMs?: number;
|
||||
};
|
||||
verbose?: boolean;
|
||||
};
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user