Merge pull request #85 from DenchHQ/kumareth/fix-chat-freeze-concurrent-sessions

feat(chat): fix infinite loading and enable concurrent sessions
This commit is contained in:
Kumar Abhirup 2026-03-09 09:02:11 -07:00 committed by GitHub
commit 2b79f2eda9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 845 additions and 29 deletions

View File

@ -16,6 +16,18 @@ vi.mock("@/lib/workspace", () => ({
resolveAgentWorkspacePrefix: vi.fn(() => null),
}));
// Mock web-sessions shared module
vi.mock("@/app/api/web-sessions/shared", () => ({
getSessionMeta: vi.fn(() => undefined),
resolveSessionKey: vi.fn(
(sessionId: string, fallbackAgentId: string) =>
`agent:${fallbackAgentId}:web:${sessionId}`,
),
resolveSessionAgentId: vi.fn(
(_sessionId: string, fallbackAgentId: string) => fallbackAgentId,
),
}));
describe("Chat API routes", () => {
beforeEach(() => {
vi.resetModules();
@ -32,6 +44,16 @@ describe("Chat API routes", () => {
vi.mock("@/lib/workspace", () => ({
resolveAgentWorkspacePrefix: vi.fn(() => null),
}));
vi.mock("@/app/api/web-sessions/shared", () => ({
getSessionMeta: vi.fn(() => undefined),
resolveSessionKey: vi.fn(
(sessionId: string, fallbackAgentId: string) =>
`agent:${fallbackAgentId}:web:${sessionId}`,
),
resolveSessionAgentId: vi.fn(
(_sessionId: string, fallbackAgentId: string) => fallbackAgentId,
),
}));
});
afterEach(() => {

View File

@ -15,6 +15,7 @@ import { trackServer } from "@/lib/telemetry";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir } from "@/lib/workspace";
import { getSessionMeta } from "@/app/api/web-sessions/shared";
export const runtime = "nodejs";
@ -119,11 +120,16 @@ export async function POST(req: Request) {
parts: lastUserMessage.parts as unknown[],
html: userHtml,
});
const sessionMeta = getSessionMeta(sessionId);
const effectiveAgentId = sessionMeta?.chatAgentId ?? sessionMeta?.workspaceAgentId;
try {
startRun({
sessionId,
message: agentMessage,
agentSessionId: sessionId,
overrideAgentId: effectiveAgentId,
});
} catch (err) {
return new Response(

View File

@ -1,6 +1,8 @@
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir, resolveActiveAgentId } from "@/lib/workspace";
import { getActiveRun } from "@/lib/active-runs";
import { resolveSessionKey } from "@/app/api/web-sessions/shared";
export const runtime = "nodejs";
@ -38,7 +40,10 @@ export async function GET(req: Request) {
return Response.json({ error: "sessionId required" }, { status: 400 });
}
const webSessionKey = `agent:${resolveActiveAgentId()}:web:${sessionId}`;
const run = getActiveRun(sessionId);
const fallbackAgentId = resolveActiveAgentId();
const webSessionKey = run?.pinnedSessionKey
?? resolveSessionKey(sessionId, fallbackAgentId);
const entries = readSubagentRegistry();
const subagents = entries

View File

@ -2,6 +2,12 @@ import { writeFileSync } from "node:fs";
import { randomUUID } from "node:crypto";
import { trackServer } from "@/lib/telemetry";
import { type WebSessionMeta, ensureDir, readIndex, writeIndex } from "./shared";
import {
getActiveWorkspaceName,
resolveActiveAgentId,
resolveWorkspaceRoot,
} from "@/lib/workspace";
import { allocateChatAgent } from "@/lib/chat-agent-registry";
export { type WebSessionMeta };
@ -26,13 +32,40 @@ export async function GET(req: Request) {
export async function POST(req: Request) {
const body = await req.json().catch(() => ({}));
const id = randomUUID();
const now = Date.now();
const workspaceName = getActiveWorkspaceName() ?? undefined;
const workspaceAgentId = resolveActiveAgentId();
const workspaceRoot = resolveWorkspaceRoot() ?? undefined;
// Assign a pool slot agent for concurrent chat support.
// Falls back to the workspace agent if no slots are available.
let chatAgentId: string | undefined;
let effectiveAgentId = workspaceAgentId;
try {
const slot = allocateChatAgent(id);
chatAgentId = slot.chatAgentId;
effectiveAgentId = slot.chatAgentId;
} catch {
// Fall back to workspace agent
}
const gatewaySessionKey = `agent:${effectiveAgentId}:web:${id}`;
const session: WebSessionMeta = {
id,
title: body.title || "New Chat",
createdAt: Date.now(),
updatedAt: Date.now(),
createdAt: now,
updatedAt: now,
messageCount: 0,
...(body.filePath ? { filePath: body.filePath } : {}),
workspaceName,
workspaceRoot,
workspaceAgentId,
chatAgentId,
gatewaySessionKey,
agentMode: chatAgentId ? "ephemeral" : "workspace",
lastActiveAt: now,
};
const sessions = readIndex();

View File

@ -10,6 +10,20 @@ export type WebSessionMeta = {
messageCount: number;
/** When set, this session is scoped to a specific workspace file. */
filePath?: string;
/** Workspace name at session creation time (pinned). */
workspaceName?: string;
/** Workspace root directory at session creation time. */
workspaceRoot?: string;
/** The workspace's durable agent ID (e.g. "main"). */
workspaceAgentId?: string;
/** Ephemeral chat-specific agent ID, if one was allocated. */
chatAgentId?: string;
/** The full gateway session key used for this chat. */
gatewaySessionKey?: string;
/** Which agent model is in use: "workspace" (shared) or "ephemeral" (per-chat). */
agentMode?: "workspace" | "ephemeral";
/** Last time the session had active traffic. */
lastActiveAt?: number;
};
export function ensureDir() {
@ -86,3 +100,22 @@ export function writeIndex(sessions: WebSessionMeta[]) {
const dir = ensureDir();
writeFileSync(join(dir, "index.json"), JSON.stringify(sessions, null, 2));
}
/** Look up a session's pinned metadata by ID. Returns undefined for unknown sessions. */
export function getSessionMeta(sessionId: string): WebSessionMeta | undefined {
return readIndex().find((s) => s.id === sessionId);
}
/** Resolve the effective agent ID for a session.
* Uses pinned metadata when available, falls back to workspace-global resolution. */
export function resolveSessionAgentId(sessionId: string, fallbackAgentId: string): string {
const meta = getSessionMeta(sessionId);
return meta?.chatAgentId ?? meta?.workspaceAgentId ?? fallbackAgentId;
}
/** Resolve the gateway session key for a session.
* Uses pinned metadata when available, constructs from the given agent ID otherwise. */
export function resolveSessionKey(sessionId: string, fallbackAgentId: string): string {
const meta = getSessionMeta(sessionId);
return meta?.gatewaySessionKey ?? `agent:${fallbackAgentId}:web:${sessionId}`;
}

View File

@ -24,6 +24,7 @@ vi.mock("@/lib/workspace", () => ({
isValidWorkspaceName: vi.fn(() => true),
resolveWorkspaceRoot: vi.fn(() => null),
ensureAgentInConfig: vi.fn(),
ensureChatAgentPool: vi.fn(),
}));
describe("POST /api/workspace/init", () => {

View File

@ -14,6 +14,7 @@ import {
isValidWorkspaceName,
resolveWorkspaceRoot,
ensureAgentInConfig,
ensureChatAgentPool,
} from "@/lib/workspace";
import {
BOOTSTRAP_TEMPLATE_CONTENT,
@ -204,6 +205,9 @@ export async function POST(req: Request) {
// Register a per-workspace agent in openclaw.json and make it the default.
ensureAgentInConfig(workspaceName, workspaceDir);
// Pre-create a pool of chat agent slots for concurrent web chat sessions.
ensureChatAgentPool(workspaceName, workspaceDir);
// Switch the UI to the new workspace.
setUIActiveWorkspace(workspaceName);
const activeWorkspace = getActiveWorkspaceName();

View File

@ -957,6 +957,31 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
status === "submitted" ||
isReconnecting;
// Stream stall detection: if we stay in "submitted" (no first
// token received) for too long, surface an error and reset.
const stallTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
useEffect(() => {
if (stallTimerRef.current) {
clearTimeout(stallTimerRef.current);
stallTimerRef.current = null;
}
if (status === "submitted") {
stallTimerRef.current = setTimeout(() => {
stallTimerRef.current = null;
if (status === "submitted") {
setStreamError("Request timed out — no response from agent. Try again or check the gateway.");
void stop();
}
}, 90_000);
}
return () => {
if (stallTimerRef.current) {
clearTimeout(stallTimerRef.current);
stallTimerRef.current = null;
}
};
}, [status, stop]);
// Auto-scroll to bottom on new messages, but only when the user
// is already near the bottom. If the user scrolls up during
// streaming, we stop auto-scrolling until they return to the
@ -1042,8 +1067,8 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
}
// If the run already completed (still in the grace
// period), skip the expensive SSE replay -- the
// persisted messages we already loaded are final.
// period), skip the SSE replay -- the persisted
// messages we already loaded are final.
if (res.headers.get("X-Run-Active") === "false") {
void res.body.cancel();
return false;
@ -1265,19 +1290,21 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
// eslint-disable-next-line react-hooks/exhaustive-deps -- stable setters
}, [filePath, attemptReconnect]);
// ── Non-file panel: auto-restore session on mount ──
// When the main ChatPanel remounts after navigation (e.g. user viewed
// a file then returned to chat), re-load the previously active session
// and reconnect to any active stream.
// ── Non-file panel: auto-restore session on mount or URL change ──
const initialSessionHandled = useRef(false);
const lastInitialSessionRef = useRef<string | null>(null);
useEffect(() => {
if (filePath || isSubagentMode || !initialSessionId || initialSessionHandled.current) {
if (filePath || isSubagentMode || !initialSessionId) {
return;
}
if (initialSessionHandled.current && initialSessionId === lastInitialSessionRef.current) {
return;
}
initialSessionHandled.current = true;
lastInitialSessionRef.current = initialSessionId;
void handleSessionSelect(initialSessionId);
// eslint-disable-next-line react-hooks/exhaustive-deps -- run once on mount
}, []);
// eslint-disable-next-line react-hooks/exhaustive-deps -- intentionally re-run when initialSessionId changes
}, [initialSessionId]);
// ── Subagent mode: load persisted messages + reconnect to active stream ──
useEffect(() => {

View File

@ -2,5 +2,8 @@ export async function register() {
if (process.env.NEXT_RUNTIME === "nodejs") {
const { startTerminalServer } = await import("./lib/terminal-server");
startTerminalServer(Number(process.env.TERMINAL_WS_PORT) || 3101);
const { startChatAgentGc } = await import("./lib/chat-agent-registry");
startChatAgentGc();
}
}

View File

@ -2,6 +2,17 @@ import { type ChildProcess } from "node:child_process";
import { PassThrough } from "node:stream";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
// Mock workspace to prevent disk I/O and provide stable agent IDs
vi.mock("./workspace", () => ({
resolveWebChatDir: vi.fn(() => "/tmp/mock-web-chat"),
resolveOpenClawStateDir: vi.fn(() => "/tmp/mock-state"),
resolveActiveAgentId: vi.fn(() => "main"),
}));
vi.mock("./chat-agent-registry", () => ({
markChatAgentIdle: vi.fn(),
}));
// Mock agent-runner to control spawnAgentProcess
vi.mock("./agent-runner", () => ({
spawnAgentProcess: vi.fn(),
@ -104,6 +115,16 @@ describe("active-runs", () => {
beforeEach(() => {
vi.resetModules();
vi.mock("./workspace", () => ({
resolveWebChatDir: vi.fn(() => "/tmp/mock-web-chat"),
resolveOpenClawStateDir: vi.fn(() => "/tmp/mock-state"),
resolveActiveAgentId: vi.fn(() => "main"),
}));
vi.mock("./chat-agent-registry", () => ({
markChatAgentIdle: vi.fn(),
}));
// Re-wire mocks after resetModules
vi.mock("./agent-runner", () => ({
spawnAgentProcess: vi.fn(),
@ -1470,4 +1491,88 @@ describe("active-runs", () => {
child._emit("close", 0);
});
});
// ── Pinned agent ID ────────────────────────────────────────────────
describe("pinned agent identity", () => {
it("captures pinnedAgentId and pinnedSessionKey at run creation", async () => {
const { startRun, getActiveRun } = await setup();
startRun({
sessionId: "s-pin",
message: "hello",
agentSessionId: "s-pin",
});
const run = getActiveRun("s-pin");
expect(run).toBeDefined();
expect(run?.pinnedAgentId).toBe("main");
expect(run?.pinnedSessionKey).toBe("agent:main:web:s-pin");
});
it("uses overrideAgentId when provided", async () => {
const { startRun, getActiveRun } = await setup();
startRun({
sessionId: "s-override",
message: "hello",
agentSessionId: "s-override",
overrideAgentId: "chat-abc123",
});
const run = getActiveRun("s-override");
expect(run?.pinnedAgentId).toBe("chat-abc123");
expect(run?.pinnedSessionKey).toBe("agent:chat-abc123:web:s-override");
});
});
// ── Chat frame forwarding ─────────────────────────────────────────
describe("chat frame handling", () => {
it("processes chat final events with assistant text", async () => {
const { child, startRun, subscribeToRun } = await setup();
const events: SseEvent[] = [];
startRun({
sessionId: "s-chat-frame",
message: "run",
agentSessionId: "s-chat-frame",
});
subscribeToRun(
"s-chat-frame",
(event) => {
if (event) {events.push(event);}
},
{ replay: false },
);
child._writeLine({
event: "chat",
data: {
state: "final",
message: {
role: "assistant",
content: [{ type: "text", text: "Chat final text." }],
},
},
});
await new Promise((r) => setTimeout(r, 50));
expect(
events.some(
(e) =>
e.type === "text-delta" &&
typeof e.delta === "string" &&
e.delta.includes("Chat final text."),
),
).toBe(true);
child.stdout.end();
await new Promise((r) => setTimeout(r, 50));
child._emit("close", 0);
});
});
});

View File

@ -23,6 +23,7 @@ import {
writeFile,
} from "node:fs/promises";
import { resolveWebChatDir, resolveOpenClawStateDir, resolveActiveAgentId } from "./workspace";
import { markChatAgentIdle } from "./chat-agent-registry";
import {
type AgentProcessHandle,
type AgentEvent,
@ -102,6 +103,10 @@ export type ActiveRun = {
_finalizeTimer?: ReturnType<typeof setTimeout> | null;
/** @internal short reconciliation window before waiting-run completion */
_waitingFinalizeTimer?: ReturnType<typeof setTimeout> | null;
/** Agent ID captured at run creation time. Used for abort, transcript enrichment. */
pinnedAgentId?: string;
/** Full gateway session key captured at run creation time. */
pinnedSessionKey?: string;
};
// ── Constants ──
@ -113,6 +118,8 @@ const SUBSCRIBE_RETRY_BASE_MS = 300;
const SUBSCRIBE_RETRY_MAX_MS = 5_000;
const SUBSCRIBE_LIFECYCLE_END_GRACE_MS = 750;
const WAITING_FINALIZE_RECONCILE_MS = 5_000;
const MAX_WAITING_DURATION_MS = 10 * 60_000;
const SUBAGENT_REGISTRY_STALENESS_MS = 15 * 60_000;
const SILENT_REPLY_TOKEN = "NO_REPLY";
@ -275,10 +282,13 @@ export async function hasRunningSubagentsForParent(parentWebSessionId: string):
const runs = raw?.runs;
if (!runs) {return false;}
const parentKeyPattern = `:web:${parentWebSessionId}`;
const now = Date.now();
for (const entry of Object.values(runs)) {
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
if (!requester.endsWith(parentKeyPattern)) {continue;}
if (typeof entry.endedAt === "number") {continue;}
const createdAt = typeof entry.createdAt === "number" ? entry.createdAt : 0;
if (createdAt > 0 && now - createdAt > SUBAGENT_REGISTRY_STALENESS_MS) {continue;}
return true;
}
} catch { /* ignore */ }
@ -505,7 +515,9 @@ export function abortRun(sessionId: string): boolean {
*/
function sendGatewayAbort(sessionId: string): void {
try {
const sessionKey = `agent:${resolveActiveAgentId()}:web:${sessionId}`;
const run = activeRuns.get(sessionId);
const agentId = run?.pinnedAgentId ?? resolveActiveAgentId();
const sessionKey = run?.pinnedSessionKey ?? `agent:${agentId}:web:${sessionId}`;
void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch(
() => {
// Best effort; don't let abort failures break the stop flow.
@ -524,8 +536,10 @@ export function startRun(params: {
sessionId: string;
message: string;
agentSessionId?: string;
/** Use a specific agent ID instead of the workspace default. */
overrideAgentId?: string;
}): ActiveRun {
const { sessionId, message, agentSessionId } = params;
const { sessionId, message, agentSessionId, overrideAgentId } = params;
const existing = activeRuns.get(sessionId);
if (existing?.status === "running") {
@ -534,8 +548,12 @@ export function startRun(params: {
// Clean up a finished run that's still in the grace period.
if (existing) {cleanupRun(sessionId);}
const agentId = overrideAgentId ?? resolveActiveAgentId();
const sessionKey = agentSessionId
? `agent:${agentId}:web:${agentSessionId}`
: undefined;
const abortController = new AbortController();
const child = spawnAgentProcess(message, agentSessionId);
const child = spawnAgentProcess(message, agentSessionId, overrideAgentId);
const run: ActiveRun = {
sessionId,
@ -557,6 +575,8 @@ export function startRun(params: {
_subscribeRetryTimer: null,
_subscribeRetryAttempt: 0,
_waitingFinalizeTimer: null,
pinnedAgentId: agentId,
pinnedSessionKey: sessionKey,
};
activeRuns.set(sessionId, run);
@ -657,9 +677,10 @@ type TranscriptToolPart = {
function readLatestTranscriptToolParts(
sessionKey: string,
pinnedAgentId?: string,
): { sessionId: string; tools: TranscriptToolPart[] } | null {
const stateDir = resolveOpenClawStateDir();
const agentId = resolveActiveAgentId();
const agentId = pinnedAgentId ?? resolveActiveAgentId();
const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json");
if (!existsSync(sessionsJsonPath)) { return null; }
const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record<string, Record<string, unknown>>;
@ -829,7 +850,7 @@ function wireSubscribeOnlyProcess(
if (liveStats.toolStartCount > 0) {
return;
}
const bundle = readLatestTranscriptToolParts(sessionKey);
const bundle = readLatestTranscriptToolParts(sessionKey, run.pinnedAgentId);
if (!bundle) {
return;
}
@ -1145,7 +1166,8 @@ function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "c
// backfill tool-invocation parts from the transcript into the web-chat JSONL.
if (run.isSubscribeOnly && run.sessionKey && !hasToolParts) {
const sessionKey = run.sessionKey;
setTimeout(() => { deferredTranscriptEnrich(sessionKey); }, 2_000);
const agentId = run.pinnedAgentId;
setTimeout(() => { deferredTranscriptEnrich(sessionKey, agentId); }, 2_000);
}
const grace = run.isSubscribeOnly ? SUBSCRIBE_CLEANUP_GRACE_MS : CLEANUP_GRACE_MS;
@ -1159,10 +1181,10 @@ function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "c
* tool-invocation parts into the web-chat JSONL for a subagent session.
* Matches tools to assistant messages by text content to avoid index-mapping issues.
*/
function deferredTranscriptEnrich(sessionKey: string): void {
function deferredTranscriptEnrich(sessionKey: string, pinnedAgentId?: string): void {
try {
const stateDir = resolveOpenClawStateDir();
const agentId = resolveActiveAgentId();
const agentId = pinnedAgentId ?? resolveActiveAgentId();
const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json");
if (!existsSync(sessionsJsonPath)) {return;}
@ -1499,7 +1521,8 @@ function wireChildProcess(run: ActiveRun): void {
// ── Parse stdout JSON lines ──
const rl = createInterface({ input: child.stdout! });
const parentSessionKey = `agent:${resolveActiveAgentId()}:web:${run.sessionId}`;
const pinnedAgent = run.pinnedAgentId ?? resolveActiveAgentId();
const parentSessionKey = run.pinnedSessionKey ?? `agent:${pinnedAgent}:web:${run.sessionId}`;
// Prevent unhandled 'error' events on the readline interface.
// When the child process fails to start (e.g. ENOENT — missing script)
// the stdout pipe is destroyed and readline re-emits the error. Without
@ -1935,12 +1958,22 @@ function wireChildProcess(run: ActiveRun): void {
}
flushPersistence(run).catch(() => {});
startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent);
// Safety: force-finalize if waiting exceeds the maximum duration
setTimeout(() => {
if (run.status === "waiting-for-subagents") {
finalizeWaitingRun(run);
}
}, MAX_WAITING_DURATION_MS);
return;
}
// Normal completion path.
run.status = exitedClean ? "completed" : "error";
// Release the chat agent pool slot so it can be reused.
try { markChatAgentIdle(run.sessionId); } catch { /* best-effort */ }
// Final persistence flush (removes _streaming flag).
flushPersistence(run).catch(() => {});
@ -2073,6 +2106,8 @@ function finalizeWaitingRun(run: ActiveRun): void {
stopSubscribeProcess(run);
try { markChatAgentIdle(run.sessionId); } catch { /* best-effort */ }
flushPersistence(run).catch(() => {});
for (const sub of run.subscribers) {

View File

@ -603,7 +603,6 @@ class GatewayProcessHandle
...(sessionKey ? { sessionKey } : {}),
deliver: false,
channel: "webchat",
thinking: "xhigh",
lane: this.params.lane ?? "web",
timeout: 0,
});
@ -662,21 +661,33 @@ class GatewayProcessHandle
if (!this.client || !sessionKey.trim()) {
return;
}
const patchParams: Record<string, string> = {
key: sessionKey,
thinkingLevel: "xhigh",
verboseLevel: "full",
reasoningLevel: "on",
};
let attempt = 0;
let lastMessage = "";
while (attempt < SESSIONS_PATCH_MAX_ATTEMPTS) {
attempt += 1;
try {
const patch = await this.client.request("sessions.patch", {
key: sessionKey,
thinkingLevel: "xhigh",
verboseLevel: "full",
reasoningLevel: "on",
});
const patch = await this.client.request("sessions.patch", patchParams);
if (patch.ok) {
return;
}
lastMessage = frameErrorMessage(patch);
// If the error indicates thinkingLevel is unsupported for the
// current model, retry without it rather than failing entirely.
if (lastMessage.includes("thinkingLevel") && patchParams.thinkingLevel) {
delete patchParams.thinkingLevel;
attempt = 0;
continue;
}
if (
attempt >= SESSIONS_PATCH_MAX_ATTEMPTS ||
!isRetryableGatewayMessage(lastMessage)
@ -686,6 +697,13 @@ class GatewayProcessHandle
} catch (error) {
lastMessage =
error instanceof Error ? error.message : String(error);
if (lastMessage.includes("thinkingLevel") && patchParams.thinkingLevel) {
delete patchParams.thinkingLevel;
attempt = 0;
continue;
}
if (
attempt >= SESSIONS_PATCH_MAX_ATTEMPTS ||
!isRetryableGatewayMessage(lastMessage)
@ -781,6 +799,36 @@ class GatewayProcessHandle
return;
}
if (frame.event === "chat") {
// Only forward chat frames in subscribe mode. In start mode
// the agent stream already delivers the full assistant text;
// forwarding chat finals would duplicate the output.
if (this.params.mode !== "subscribe") {
return;
}
const payload = asRecord(frame.payload) ?? {};
const sessionKey =
typeof payload.sessionKey === "string" ? payload.sessionKey : undefined;
if (!this.shouldAcceptSessionEvent(sessionKey)) {
return;
}
const payloadGlobalSeq =
typeof payload.globalSeq === "number" ? payload.globalSeq : undefined;
const eventGlobalSeq =
payloadGlobalSeq ??
(typeof frame.seq === "number" ? frame.seq : undefined);
const event: AgentEvent = {
event: "chat",
data: payload,
...(typeof eventGlobalSeq === "number"
? { globalSeq: eventGlobalSeq }
: {}),
...(sessionKey ? { sessionKey } : {}),
};
(this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
return;
}
if (frame.event === "error") {
const payload = asRecord(frame.payload) ?? {};
const sessionKey =
@ -886,12 +934,14 @@ export async function callGatewayRpc(
/**
* Start an agent run via the Gateway WebSocket and return a process handle.
* @param overrideAgentId - Use a specific agent ID instead of the workspace default.
*/
export function spawnAgentProcess(
message: string,
agentSessionId?: string,
overrideAgentId?: string,
): AgentProcessHandle {
const agentId = resolveActiveAgentId();
const agentId = overrideAgentId ?? resolveActiveAgentId();
const sessionKey = agentSessionId
? `agent:${agentId}:web:${agentSessionId}`
: undefined;

View File

@ -0,0 +1,185 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { mkdtempSync, writeFileSync, readFileSync, existsSync, mkdirSync, rmSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
let tempDir: string;
vi.mock("./workspace", () => ({
resolveOpenClawStateDir: () => tempDir,
resolveActiveAgentId: () => "main",
getChatSlotAgentIds: () => ["chat-slot-main-1", "chat-slot-main-2", "chat-slot-main-3"],
}));
describe("chat-agent-registry", () => {
beforeEach(() => {
vi.resetModules();
tempDir = mkdtempSync(join(tmpdir(), "chat-agent-test-"));
vi.mock("./workspace", () => ({
resolveOpenClawStateDir: () => tempDir,
resolveActiveAgentId: () => "main",
getChatSlotAgentIds: () => ["chat-slot-main-1", "chat-slot-main-2", "chat-slot-main-3"],
}));
const configPath = join(tempDir, "openclaw.json");
writeFileSync(
configPath,
JSON.stringify({
agents: {
list: [
{ id: "main", workspace: "/tmp/ws", default: true },
],
},
}),
);
});
afterEach(() => {
vi.restoreAllMocks();
try { rmSync(tempDir, { recursive: true, force: true }); } catch { /* ignore */ }
});
it("allocates a pool slot agent and writes registry", async () => {
const { allocateChatAgent, getChatAgent } = await import("./chat-agent-registry.js");
const entry = allocateChatAgent("session-1");
expect(entry.chatAgentId).toBe("chat-slot-main-1");
expect(entry.workspaceAgentId).toBe("main");
expect(entry.state).toBe("active");
expect(entry.sessionId).toBe("session-1");
const stored = getChatAgent("session-1");
expect(stored?.chatAgentId).toBe(entry.chatAgentId);
});
it("reuses existing slot on second allocate for same session", async () => {
const { allocateChatAgent } = await import("./chat-agent-registry.js");
const first = allocateChatAgent("session-2");
const second = allocateChatAgent("session-2");
expect(second.chatAgentId).toBe(first.chatAgentId);
expect(second.state).toBe("active");
});
it("assigns different pool slots to concurrent sessions", async () => {
const { allocateChatAgent } = await import("./chat-agent-registry.js");
const a = allocateChatAgent("session-concurrent-a");
const b = allocateChatAgent("session-concurrent-b");
expect(a.chatAgentId).not.toBe(b.chatAgentId);
expect(a.chatAgentId).toBe("chat-slot-main-1");
expect(b.chatAgentId).toBe("chat-slot-main-2");
});
it("falls back to workspace agent when all slots are occupied", async () => {
const { allocateChatAgent } = await import("./chat-agent-registry.js");
allocateChatAgent("s-fill-1");
allocateChatAgent("s-fill-2");
allocateChatAgent("s-fill-3");
const overflow = allocateChatAgent("s-fill-4");
expect(overflow.chatAgentId).toBe("main");
});
it("marks agent idle and back to active on touch", async () => {
const { allocateChatAgent, markChatAgentIdle, touchChatAgent, getChatAgent } =
await import("./chat-agent-registry.js");
allocateChatAgent("session-3");
markChatAgentIdle("session-3");
expect(getChatAgent("session-3")?.state).toBe("idle");
touchChatAgent("session-3");
expect(getChatAgent("session-3")?.state).toBe("active");
});
it("expires idle agents past TTL and releases the slot", async () => {
const { allocateChatAgent, getChatAgent, expireIdleChatAgents } =
await import("./chat-agent-registry.js");
const entry = allocateChatAgent("session-4", { idleTtlMs: 1 });
expect(entry.state).toBe("active");
const registry = JSON.parse(readFileSync(join(tempDir, "chat-agents.json"), "utf-8"));
registry.agents["session-4"].lastActiveAt = Date.now() - 1000;
writeFileSync(join(tempDir, "chat-agents.json"), JSON.stringify(registry));
const expired = expireIdleChatAgents();
expect(expired).toContain("session-4");
expect(getChatAgent("session-4")?.state).toBe("expired");
});
it("resumes expired agent transparently", async () => {
const { allocateChatAgent, expireIdleChatAgents, resumeExpiredChatAgent, getChatAgent } =
await import("./chat-agent-registry.js");
allocateChatAgent("session-5", { idleTtlMs: 1 });
const registry = JSON.parse(readFileSync(join(tempDir, "chat-agents.json"), "utf-8"));
registry.agents["session-5"].lastActiveAt = Date.now() - 1000;
writeFileSync(join(tempDir, "chat-agents.json"), JSON.stringify(registry));
expireIdleChatAgents();
expect(getChatAgent("session-5")?.state).toBe("expired");
const resumed = resumeExpiredChatAgent("session-5");
expect(resumed?.state).toBe("active");
expect(resumed?.chatAgentId).toMatch(/^chat-/);
});
it("deletes agent and releases slot", async () => {
const { allocateChatAgent, deleteChatAgent, getChatAgent } =
await import("./chat-agent-registry.js");
allocateChatAgent("session-6");
deleteChatAgent("session-6");
expect(getChatAgent("session-6")?.state).toBe("deleted");
});
it("purges deleted entries from registry", async () => {
const { allocateChatAgent, deleteChatAgent, purgeChatAgentRegistry, getChatAgent } =
await import("./chat-agent-registry.js");
allocateChatAgent("session-7");
deleteChatAgent("session-7");
expect(getChatAgent("session-7")?.state).toBe("deleted");
const purged = purgeChatAgentRegistry();
expect(purged).toBe(1);
expect(getChatAgent("session-7")).toBeUndefined();
});
it("ensureChatAgentForSend resumes expired agent", async () => {
const { allocateChatAgent, expireIdleChatAgents, ensureChatAgentForSend } =
await import("./chat-agent-registry.js");
allocateChatAgent("session-8", { idleTtlMs: 1 });
const registry = JSON.parse(readFileSync(join(tempDir, "chat-agents.json"), "utf-8"));
registry.agents["session-8"].lastActiveAt = Date.now() - 1000;
writeFileSync(join(tempDir, "chat-agents.json"), JSON.stringify(registry));
expireIdleChatAgents();
const agentId = ensureChatAgentForSend("session-8");
expect(agentId).toMatch(/^chat-/);
});
it("ensureChatAgentForSend returns undefined for unknown session", async () => {
const { ensureChatAgentForSend } = await import("./chat-agent-registry.js");
expect(ensureChatAgentForSend("nonexistent")).toBeUndefined();
});
it("listChatAgents returns all entries", async () => {
const { allocateChatAgent, listChatAgents } = await import("./chat-agent-registry.js");
allocateChatAgent("session-a");
allocateChatAgent("session-b");
const all = listChatAgents();
expect(all.length).toBe(2);
expect(all.map((e) => e.sessionId).sort()).toEqual(["session-a", "session-b"]);
});
});

View File

@ -0,0 +1,259 @@
/**
* Chat-agent registry: assigns web chat sessions to pre-created agent
* pool slots so concurrent chats each get their own gateway agent.
*
* Architecture:
* - Each workspace has one durable "workspace agent" (e.g. "kumareth").
* - A pool of chat agent slots (e.g. "chat-slot-kumareth-1" through "-5")
* is pre-created in openclaw.json at workspace init time.
* - Each new web chat session is assigned an available slot from the pool.
* - When a slot is released (chat completes or is deleted), it becomes
* available for the next session.
* - If all slots are occupied, falls back to the workspace agent.
*/
import { existsSync, readFileSync, writeFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir, resolveActiveAgentId, getChatSlotAgentIds } from "./workspace";
const DEFAULT_IDLE_TTL_MS = 30 * 60_000;
export type ChatAgentState = "active" | "idle" | "expired" | "deleted";
export type ChatAgentEntry = {
chatAgentId: string;
workspaceAgentId: string;
sessionId: string;
workspaceDir: string;
state: ChatAgentState;
createdAt: number;
lastActiveAt: number;
idleTtlMs: number;
};
type ChatAgentRegistryData = {
version: number;
agents: Record<string, ChatAgentEntry>;
};
function registryPath(): string {
return join(resolveOpenClawStateDir(), "chat-agents.json");
}
function readRegistry(): ChatAgentRegistryData {
const fp = registryPath();
if (!existsSync(fp)) {
return { version: 1, agents: {} };
}
try {
return JSON.parse(readFileSync(fp, "utf-8")) as ChatAgentRegistryData;
} catch {
return { version: 1, agents: {} };
}
}
function writeRegistry(data: ChatAgentRegistryData): void {
const fp = registryPath();
const dir = join(fp, "..");
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
writeFileSync(fp, JSON.stringify(data, null, 2) + "\n", "utf-8");
}
/**
* Assign a web chat session to an available pool slot agent.
* Does NOT write to openclaw.json -- slots are pre-created at workspace init.
* Falls back to the workspace agent if no slots are available.
*/
export function allocateChatAgent(
sessionId: string,
options?: { idleTtlMs?: number },
): ChatAgentEntry {
const registry = readRegistry();
const existing = registry.agents[sessionId];
if (existing && existing.state !== "expired" && existing.state !== "deleted") {
existing.lastActiveAt = Date.now();
existing.state = "active";
writeRegistry(registry);
return existing;
}
const workspaceAgentId = resolveActiveAgentId();
const now = Date.now();
// Find available pool slots (not assigned to an active/idle session)
const allSlots = getChatSlotAgentIds();
const occupiedSlots = new Set(
Object.values(registry.agents)
.filter((e) => e.state === "active" || e.state === "idle")
.map((e) => e.chatAgentId),
);
const availableSlot = allSlots.find((s) => !occupiedSlots.has(s));
const chatAgentId = availableSlot ?? workspaceAgentId;
const entry: ChatAgentEntry = {
chatAgentId,
workspaceAgentId,
sessionId,
workspaceDir: "",
state: "active",
createdAt: now,
lastActiveAt: now,
idleTtlMs: options?.idleTtlMs ?? DEFAULT_IDLE_TTL_MS,
};
registry.agents[sessionId] = entry;
writeRegistry(registry);
return entry;
}
/** Look up a chat agent entry by session ID. */
export function getChatAgent(sessionId: string): ChatAgentEntry | undefined {
const registry = readRegistry();
return registry.agents[sessionId];
}
/** Touch the lastActiveAt timestamp for a chat agent. */
export function touchChatAgent(sessionId: string): void {
const registry = readRegistry();
const entry = registry.agents[sessionId];
if (!entry) {return;}
entry.lastActiveAt = Date.now();
if (entry.state === "idle") {
entry.state = "active";
}
writeRegistry(registry);
}
/** Mark a chat agent as idle. */
export function markChatAgentIdle(sessionId: string): void {
const registry = readRegistry();
const entry = registry.agents[sessionId];
if (!entry || entry.state === "expired" || entry.state === "deleted") {return;}
entry.state = "idle";
writeRegistry(registry);
}
/** Release a chat agent slot back to the pool. */
export function deleteChatAgent(sessionId: string): void {
const registry = readRegistry();
const entry = registry.agents[sessionId];
if (!entry) {return;}
entry.state = "deleted";
writeRegistry(registry);
}
/**
* Expire chat agents that have been idle longer than their TTL.
* Returns the list of expired session IDs.
*/
export function expireIdleChatAgents(): string[] {
const registry = readRegistry();
const now = Date.now();
const expired: string[] = [];
for (const [sessionId, entry] of Object.entries(registry.agents)) {
if (entry.state === "expired" || entry.state === "deleted") {continue;}
const idleSince = now - entry.lastActiveAt;
if (idleSince > entry.idleTtlMs) {
entry.state = "expired";
expired.push(sessionId);
}
}
if (expired.length > 0) {
writeRegistry(registry);
}
return expired;
}
/**
* Try to resume an expired chat agent by re-allocating it.
* Returns the new entry, or undefined if the session has no prior agent.
*/
export function resumeExpiredChatAgent(
sessionId: string,
options?: { idleTtlMs?: number },
): ChatAgentEntry | undefined {
const registry = readRegistry();
const existing = registry.agents[sessionId];
if (!existing) {return undefined;}
if (existing.state !== "expired") {return existing;}
return allocateChatAgent(sessionId, {
idleTtlMs: options?.idleTtlMs ?? existing.idleTtlMs,
});
}
/** List all chat agent entries (for diagnostics). */
export function listChatAgents(): ChatAgentEntry[] {
const registry = readRegistry();
return Object.values(registry.agents);
}
/** Clean up deleted entries from the registry. */
export function purgeChatAgentRegistry(): number {
const registry = readRegistry();
let count = 0;
for (const [sessionId, entry] of Object.entries(registry.agents)) {
if (entry.state === "deleted") {
delete registry.agents[sessionId];
count++;
}
}
if (count > 0) {
writeRegistry(registry);
}
return count;
}
// ── Periodic GC ──
const GC_INTERVAL_MS = 5 * 60_000;
let gcTimer: ReturnType<typeof setInterval> | null = null;
/** Start the background idle-GC interval (idempotent). */
export function startChatAgentGc(): void {
if (gcTimer) {return;}
gcTimer = setInterval(() => {
try {
expireIdleChatAgents();
purgeChatAgentRegistry();
} catch {
// Best-effort background cleanup
}
}, GC_INTERVAL_MS);
if (typeof gcTimer === "object" && "unref" in gcTimer) {
gcTimer.unref();
}
}
/** Stop the background GC interval. */
export function stopChatAgentGc(): void {
if (gcTimer) {
clearInterval(gcTimer);
gcTimer = null;
}
}
/**
* Ensure a chat agent is valid for sending a message.
* If the agent expired, re-allocate it transparently.
* Returns the effective agent ID to use.
*/
export function ensureChatAgentForSend(sessionId: string): string | undefined {
const entry = getChatAgent(sessionId);
if (!entry) {return undefined;}
if (entry.state === "deleted") {return undefined;}
if (entry.state === "expired") {
const resumed = resumeExpiredChatAgent(sessionId);
return resumed?.chatAgentId;
}
touchChatAgent(sessionId);
return entry.chatAgentId;
}
export { DEFAULT_IDLE_TTL_MS, GC_INTERVAL_MS };

View File

@ -362,6 +362,54 @@ export function ensureAgentInConfig(workspaceName: string, workspaceDir: string)
writeOpenClawConfig(config);
}
const CHAT_SLOT_PREFIX = "chat-slot-";
const DEFAULT_CHAT_POOL_SIZE = 5;
/**
* Pre-create a pool of chat agent slots in `agents.list[]` so the gateway
* knows about them at startup. Each slot shares the workspace directory
* of the parent workspace agent, enabling concurrent chat sessions.
*/
export function ensureChatAgentPool(workspaceName: string, workspaceDir: string, poolSize = DEFAULT_CHAT_POOL_SIZE): void {
const config = readOpenClawConfig();
if (!config.agents) { config.agents = {}; }
if (!Array.isArray(config.agents.list)) { config.agents.list = []; }
const baseId = workspaceNameToAgentId(workspaceName);
let changed = false;
for (let i = 1; i <= poolSize; i++) {
const slotId = `${CHAT_SLOT_PREFIX}${baseId}-${i}`;
const existing = config.agents.list.find((a) => a.id === slotId);
if (!existing) {
config.agents.list.push({ id: slotId, workspace: workspaceDir });
changed = true;
} else if (existing.workspace !== workspaceDir) {
existing.workspace = workspaceDir;
changed = true;
}
}
if (changed) {
writeOpenClawConfig(config);
}
}
/**
* Return the list of chat slot agent IDs for a workspace.
*/
export function getChatSlotAgentIds(workspaceName?: string): string[] {
const config = readOpenClawConfig();
const list = config.agents?.list;
if (!Array.isArray(list)) { return []; }
const baseId = workspaceNameToAgentId(workspaceName ?? getActiveWorkspaceName() ?? DEFAULT_WORKSPACE_NAME);
const prefix = `${CHAT_SLOT_PREFIX}${baseId}-`;
return list.filter((a) => a.id.startsWith(prefix)).map((a) => a.id);
}
export { CHAT_SLOT_PREFIX, DEFAULT_CHAT_POOL_SIZE };
/**
* Flip `default: true` to the target agent in `agents.list[]`.
* No-op if the list doesn't exist or the agent isn't found.