From 2c52012d1eeb333416104551f2c3a1ffe4aef0d8 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Mon, 9 Mar 2026 08:58:50 -0700 Subject: [PATCH] feat(chat): fix infinite loading and enable concurrent sessions Chat panel froze because `thinking: "xhigh"` silently killed gateway runs, sessions used mutable global agent state, and OpenClaw's one-run-per-agent limit blocked concurrent chats. --- apps/web/app/api/chat/chat.test.ts | 22 ++ apps/web/app/api/chat/route.ts | 6 + apps/web/app/api/chat/subagents/route.ts | 7 +- apps/web/app/api/web-sessions/route.ts | 37 ++- apps/web/app/api/web-sessions/shared.ts | 33 +++ apps/web/app/api/workspace/init/route.test.ts | 1 + apps/web/app/api/workspace/init/route.ts | 4 + apps/web/app/components/chat-panel.tsx | 45 ++- apps/web/instrumentation.ts | 3 + apps/web/lib/active-runs.test.ts | 105 +++++++ apps/web/lib/active-runs.ts | 53 +++- apps/web/lib/agent-runner.ts | 66 ++++- apps/web/lib/chat-agent-registry.test.ts | 185 +++++++++++++ apps/web/lib/chat-agent-registry.ts | 259 ++++++++++++++++++ apps/web/lib/workspace.ts | 48 ++++ 15 files changed, 845 insertions(+), 29 deletions(-) create mode 100644 apps/web/lib/chat-agent-registry.test.ts create mode 100644 apps/web/lib/chat-agent-registry.ts diff --git a/apps/web/app/api/chat/chat.test.ts b/apps/web/app/api/chat/chat.test.ts index b6490ff9b9b..494483d30ab 100644 --- a/apps/web/app/api/chat/chat.test.ts +++ b/apps/web/app/api/chat/chat.test.ts @@ -16,6 +16,18 @@ vi.mock("@/lib/workspace", () => ({ resolveAgentWorkspacePrefix: vi.fn(() => null), })); +// Mock web-sessions shared module +vi.mock("@/app/api/web-sessions/shared", () => ({ + getSessionMeta: vi.fn(() => undefined), + resolveSessionKey: vi.fn( + (sessionId: string, fallbackAgentId: string) => + `agent:${fallbackAgentId}:web:${sessionId}`, + ), + resolveSessionAgentId: vi.fn( + (_sessionId: string, fallbackAgentId: string) => fallbackAgentId, + ), +})); + describe("Chat API routes", () => { beforeEach(() => { 
vi.resetModules(); @@ -32,6 +44,16 @@ describe("Chat API routes", () => { vi.mock("@/lib/workspace", () => ({ resolveAgentWorkspacePrefix: vi.fn(() => null), })); + vi.mock("@/app/api/web-sessions/shared", () => ({ + getSessionMeta: vi.fn(() => undefined), + resolveSessionKey: vi.fn( + (sessionId: string, fallbackAgentId: string) => + `agent:${fallbackAgentId}:web:${sessionId}`, + ), + resolveSessionAgentId: vi.fn( + (_sessionId: string, fallbackAgentId: string) => fallbackAgentId, + ), + })); }); afterEach(() => { diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index 8c94b95c3f6..f138ec0505f 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -15,6 +15,7 @@ import { trackServer } from "@/lib/telemetry"; import { existsSync, readFileSync } from "node:fs"; import { join } from "node:path"; import { resolveOpenClawStateDir } from "@/lib/workspace"; +import { getSessionMeta } from "@/app/api/web-sessions/shared"; export const runtime = "nodejs"; @@ -119,11 +120,16 @@ export async function POST(req: Request) { parts: lastUserMessage.parts as unknown[], html: userHtml, }); + + const sessionMeta = getSessionMeta(sessionId); + const effectiveAgentId = sessionMeta?.chatAgentId ?? 
sessionMeta?.workspaceAgentId; + try { startRun({ sessionId, message: agentMessage, agentSessionId: sessionId, + overrideAgentId: effectiveAgentId, }); } catch (err) { return new Response( diff --git a/apps/web/app/api/chat/subagents/route.ts b/apps/web/app/api/chat/subagents/route.ts index 95769db1b87..5578bf03a0e 100644 --- a/apps/web/app/api/chat/subagents/route.ts +++ b/apps/web/app/api/chat/subagents/route.ts @@ -1,6 +1,8 @@ import { existsSync, readFileSync } from "node:fs"; import { join } from "node:path"; import { resolveOpenClawStateDir, resolveActiveAgentId } from "@/lib/workspace"; +import { getActiveRun } from "@/lib/active-runs"; +import { resolveSessionKey } from "@/app/api/web-sessions/shared"; export const runtime = "nodejs"; @@ -38,7 +40,10 @@ export async function GET(req: Request) { return Response.json({ error: "sessionId required" }, { status: 400 }); } - const webSessionKey = `agent:${resolveActiveAgentId()}:web:${sessionId}`; + const run = getActiveRun(sessionId); + const fallbackAgentId = resolveActiveAgentId(); + const webSessionKey = run?.pinnedSessionKey + ?? 
resolveSessionKey(sessionId, fallbackAgentId); const entries = readSubagentRegistry(); const subagents = entries diff --git a/apps/web/app/api/web-sessions/route.ts b/apps/web/app/api/web-sessions/route.ts index a741e7b9b49..772dea7df67 100644 --- a/apps/web/app/api/web-sessions/route.ts +++ b/apps/web/app/api/web-sessions/route.ts @@ -2,6 +2,12 @@ import { writeFileSync } from "node:fs"; import { randomUUID } from "node:crypto"; import { trackServer } from "@/lib/telemetry"; import { type WebSessionMeta, ensureDir, readIndex, writeIndex } from "./shared"; +import { + getActiveWorkspaceName, + resolveActiveAgentId, + resolveWorkspaceRoot, +} from "@/lib/workspace"; +import { allocateChatAgent } from "@/lib/chat-agent-registry"; export { type WebSessionMeta }; @@ -26,13 +32,40 @@ export async function GET(req: Request) { export async function POST(req: Request) { const body = await req.json().catch(() => ({})); const id = randomUUID(); + const now = Date.now(); + + const workspaceName = getActiveWorkspaceName() ?? undefined; + const workspaceAgentId = resolveActiveAgentId(); + const workspaceRoot = resolveWorkspaceRoot() ?? undefined; + + // Assign a pool slot agent for concurrent chat support. + // Falls back to the workspace agent if no slots are available. + let chatAgentId: string | undefined; + let effectiveAgentId = workspaceAgentId; + try { + const slot = allocateChatAgent(id); + chatAgentId = slot.chatAgentId; + effectiveAgentId = slot.chatAgentId; + } catch { + // Fall back to workspace agent + } + + const gatewaySessionKey = `agent:${effectiveAgentId}:web:${id}`; + const session: WebSessionMeta = { id, title: body.title || "New Chat", - createdAt: Date.now(), - updatedAt: Date.now(), + createdAt: now, + updatedAt: now, messageCount: 0, ...(body.filePath ? { filePath: body.filePath } : {}), + workspaceName, + workspaceRoot, + workspaceAgentId, + chatAgentId, + gatewaySessionKey, + agentMode: chatAgentId ? 
"ephemeral" : "workspace", + lastActiveAt: now, }; const sessions = readIndex(); diff --git a/apps/web/app/api/web-sessions/shared.ts b/apps/web/app/api/web-sessions/shared.ts index 03073ed9c50..28fb2db4d89 100644 --- a/apps/web/app/api/web-sessions/shared.ts +++ b/apps/web/app/api/web-sessions/shared.ts @@ -10,6 +10,20 @@ export type WebSessionMeta = { messageCount: number; /** When set, this session is scoped to a specific workspace file. */ filePath?: string; + /** Workspace name at session creation time (pinned). */ + workspaceName?: string; + /** Workspace root directory at session creation time. */ + workspaceRoot?: string; + /** The workspace's durable agent ID (e.g. "main"). */ + workspaceAgentId?: string; + /** Ephemeral chat-specific agent ID, if one was allocated. */ + chatAgentId?: string; + /** The full gateway session key used for this chat. */ + gatewaySessionKey?: string; + /** Which agent model is in use: "workspace" (shared) or "ephemeral" (per-chat). */ + agentMode?: "workspace" | "ephemeral"; + /** Last time the session had active traffic. */ + lastActiveAt?: number; }; export function ensureDir() { @@ -86,3 +100,22 @@ export function writeIndex(sessions: WebSessionMeta[]) { const dir = ensureDir(); writeFileSync(join(dir, "index.json"), JSON.stringify(sessions, null, 2)); } + +/** Look up a session's pinned metadata by ID. Returns undefined for unknown sessions. */ +export function getSessionMeta(sessionId: string): WebSessionMeta | undefined { + return readIndex().find((s) => s.id === sessionId); +} + +/** Resolve the effective agent ID for a session. + * Uses pinned metadata when available, falls back to workspace-global resolution. */ +export function resolveSessionAgentId(sessionId: string, fallbackAgentId: string): string { + const meta = getSessionMeta(sessionId); + return meta?.chatAgentId ?? meta?.workspaceAgentId ?? fallbackAgentId; +} + +/** Resolve the gateway session key for a session. 
+ * Uses pinned metadata when available, constructs from the given agent ID otherwise. */ +export function resolveSessionKey(sessionId: string, fallbackAgentId: string): string { + const meta = getSessionMeta(sessionId); + return meta?.gatewaySessionKey ?? `agent:${fallbackAgentId}:web:${sessionId}`; +} diff --git a/apps/web/app/api/workspace/init/route.test.ts b/apps/web/app/api/workspace/init/route.test.ts index 5af995c9f4b..7c5972ef06b 100644 --- a/apps/web/app/api/workspace/init/route.test.ts +++ b/apps/web/app/api/workspace/init/route.test.ts @@ -24,6 +24,7 @@ vi.mock("@/lib/workspace", () => ({ isValidWorkspaceName: vi.fn(() => true), resolveWorkspaceRoot: vi.fn(() => null), ensureAgentInConfig: vi.fn(), + ensureChatAgentPool: vi.fn(), })); describe("POST /api/workspace/init", () => { diff --git a/apps/web/app/api/workspace/init/route.ts b/apps/web/app/api/workspace/init/route.ts index 5d79274b567..79b0e95fbf9 100644 --- a/apps/web/app/api/workspace/init/route.ts +++ b/apps/web/app/api/workspace/init/route.ts @@ -14,6 +14,7 @@ import { isValidWorkspaceName, resolveWorkspaceRoot, ensureAgentInConfig, + ensureChatAgentPool, } from "@/lib/workspace"; import { BOOTSTRAP_TEMPLATE_CONTENT, @@ -204,6 +205,9 @@ export async function POST(req: Request) { // Register a per-workspace agent in openclaw.json and make it the default. ensureAgentInConfig(workspaceName, workspaceDir); + // Pre-create a pool of chat agent slots for concurrent web chat sessions. + ensureChatAgentPool(workspaceName, workspaceDir); + // Switch the UI to the new workspace. 
setUIActiveWorkspace(workspaceName); const activeWorkspace = getActiveWorkspaceName(); diff --git a/apps/web/app/components/chat-panel.tsx b/apps/web/app/components/chat-panel.tsx index 011fae2552a..e6fbb562c5d 100644 --- a/apps/web/app/components/chat-panel.tsx +++ b/apps/web/app/components/chat-panel.tsx @@ -957,6 +957,31 @@ export const ChatPanel = forwardRef( status === "submitted" || isReconnecting; + // Stream stall detection: if we stay in "submitted" (no first + // token received) for too long, surface an error and reset. + const stallTimerRef = useRef | null>(null); + useEffect(() => { + if (stallTimerRef.current) { + clearTimeout(stallTimerRef.current); + stallTimerRef.current = null; + } + if (status === "submitted") { + stallTimerRef.current = setTimeout(() => { + stallTimerRef.current = null; + if (status === "submitted") { + setStreamError("Request timed out — no response from agent. Try again or check the gateway."); + void stop(); + } + }, 90_000); + } + return () => { + if (stallTimerRef.current) { + clearTimeout(stallTimerRef.current); + stallTimerRef.current = null; + } + }; + }, [status, stop]); + // Auto-scroll to bottom on new messages, but only when the user // is already near the bottom. If the user scrolls up during // streaming, we stop auto-scrolling until they return to the @@ -1042,8 +1067,8 @@ export const ChatPanel = forwardRef( } // If the run already completed (still in the grace - // period), skip the expensive SSE replay -- the - // persisted messages we already loaded are final. + // period), skip the SSE replay -- the persisted + // messages we already loaded are final. 
if (res.headers.get("X-Run-Active") === "false") { void res.body.cancel(); return false; @@ -1265,19 +1290,21 @@ export const ChatPanel = forwardRef( // eslint-disable-next-line react-hooks/exhaustive-deps -- stable setters }, [filePath, attemptReconnect]); - // ── Non-file panel: auto-restore session on mount ── - // When the main ChatPanel remounts after navigation (e.g. user viewed - // a file then returned to chat), re-load the previously active session - // and reconnect to any active stream. + // ── Non-file panel: auto-restore session on mount or URL change ── const initialSessionHandled = useRef(false); + const lastInitialSessionRef = useRef(null); useEffect(() => { - if (filePath || isSubagentMode || !initialSessionId || initialSessionHandled.current) { + if (filePath || isSubagentMode || !initialSessionId) { + return; + } + if (initialSessionHandled.current && initialSessionId === lastInitialSessionRef.current) { return; } initialSessionHandled.current = true; + lastInitialSessionRef.current = initialSessionId; void handleSessionSelect(initialSessionId); - // eslint-disable-next-line react-hooks/exhaustive-deps -- run once on mount - }, []); + // eslint-disable-next-line react-hooks/exhaustive-deps -- intentionally re-run when initialSessionId changes + }, [initialSessionId]); // ── Subagent mode: load persisted messages + reconnect to active stream ── useEffect(() => { diff --git a/apps/web/instrumentation.ts b/apps/web/instrumentation.ts index 0138f0dd227..0911589845e 100644 --- a/apps/web/instrumentation.ts +++ b/apps/web/instrumentation.ts @@ -2,5 +2,8 @@ export async function register() { if (process.env.NEXT_RUNTIME === "nodejs") { const { startTerminalServer } = await import("./lib/terminal-server"); startTerminalServer(Number(process.env.TERMINAL_WS_PORT) || 3101); + + const { startChatAgentGc } = await import("./lib/chat-agent-registry"); + startChatAgentGc(); } } diff --git a/apps/web/lib/active-runs.test.ts b/apps/web/lib/active-runs.test.ts 
index a35b3dc8ba3..fdbf80f618f 100644 --- a/apps/web/lib/active-runs.test.ts +++ b/apps/web/lib/active-runs.test.ts @@ -2,6 +2,17 @@ import { type ChildProcess } from "node:child_process"; import { PassThrough } from "node:stream"; import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +// Mock workspace to prevent disk I/O and provide stable agent IDs +vi.mock("./workspace", () => ({ + resolveWebChatDir: vi.fn(() => "/tmp/mock-web-chat"), + resolveOpenClawStateDir: vi.fn(() => "/tmp/mock-state"), + resolveActiveAgentId: vi.fn(() => "main"), +})); + +vi.mock("./chat-agent-registry", () => ({ + markChatAgentIdle: vi.fn(), +})); + // Mock agent-runner to control spawnAgentProcess vi.mock("./agent-runner", () => ({ spawnAgentProcess: vi.fn(), @@ -104,6 +115,16 @@ describe("active-runs", () => { beforeEach(() => { vi.resetModules(); + vi.mock("./workspace", () => ({ + resolveWebChatDir: vi.fn(() => "/tmp/mock-web-chat"), + resolveOpenClawStateDir: vi.fn(() => "/tmp/mock-state"), + resolveActiveAgentId: vi.fn(() => "main"), + })); + + vi.mock("./chat-agent-registry", () => ({ + markChatAgentIdle: vi.fn(), + })); + // Re-wire mocks after resetModules vi.mock("./agent-runner", () => ({ spawnAgentProcess: vi.fn(), @@ -1470,4 +1491,88 @@ describe("active-runs", () => { child._emit("close", 0); }); }); + + // ── Pinned agent ID ──────────────────────────────────────────────── + + describe("pinned agent identity", () => { + it("captures pinnedAgentId and pinnedSessionKey at run creation", async () => { + const { startRun, getActiveRun } = await setup(); + + startRun({ + sessionId: "s-pin", + message: "hello", + agentSessionId: "s-pin", + }); + + const run = getActiveRun("s-pin"); + expect(run).toBeDefined(); + expect(run?.pinnedAgentId).toBe("main"); + expect(run?.pinnedSessionKey).toBe("agent:main:web:s-pin"); + }); + + it("uses overrideAgentId when provided", async () => { + const { startRun, getActiveRun } = await setup(); + + startRun({ + sessionId: 
"s-override", + message: "hello", + agentSessionId: "s-override", + overrideAgentId: "chat-abc123", + }); + + const run = getActiveRun("s-override"); + expect(run?.pinnedAgentId).toBe("chat-abc123"); + expect(run?.pinnedSessionKey).toBe("agent:chat-abc123:web:s-override"); + }); + }); + + // ── Chat frame forwarding ───────────────────────────────────────── + + describe("chat frame handling", () => { + it("processes chat final events with assistant text", async () => { + const { child, startRun, subscribeToRun } = await setup(); + + const events: SseEvent[] = []; + + startRun({ + sessionId: "s-chat-frame", + message: "run", + agentSessionId: "s-chat-frame", + }); + + subscribeToRun( + "s-chat-frame", + (event) => { + if (event) {events.push(event);} + }, + { replay: false }, + ); + + child._writeLine({ + event: "chat", + data: { + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "Chat final text." }], + }, + }, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect( + events.some( + (e) => + e.type === "text-delta" && + typeof e.delta === "string" && + e.delta.includes("Chat final text."), + ), + ).toBe(true); + + child.stdout.end(); + await new Promise((r) => setTimeout(r, 50)); + child._emit("close", 0); + }); + }); }); diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 3fabbea2b22..9a1e0798f95 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -23,6 +23,7 @@ import { writeFile, } from "node:fs/promises"; import { resolveWebChatDir, resolveOpenClawStateDir, resolveActiveAgentId } from "./workspace"; +import { markChatAgentIdle } from "./chat-agent-registry"; import { type AgentProcessHandle, type AgentEvent, @@ -102,6 +103,10 @@ export type ActiveRun = { _finalizeTimer?: ReturnType | null; /** @internal short reconciliation window before waiting-run completion */ _waitingFinalizeTimer?: ReturnType | null; + /** Agent ID captured at run creation time. 
Used for abort, transcript enrichment. */ + pinnedAgentId?: string; + /** Full gateway session key captured at run creation time. */ + pinnedSessionKey?: string; }; // ── Constants ── @@ -113,6 +118,8 @@ const SUBSCRIBE_RETRY_BASE_MS = 300; const SUBSCRIBE_RETRY_MAX_MS = 5_000; const SUBSCRIBE_LIFECYCLE_END_GRACE_MS = 750; const WAITING_FINALIZE_RECONCILE_MS = 5_000; +const MAX_WAITING_DURATION_MS = 10 * 60_000; +const SUBAGENT_REGISTRY_STALENESS_MS = 15 * 60_000; const SILENT_REPLY_TOKEN = "NO_REPLY"; @@ -275,10 +282,13 @@ export async function hasRunningSubagentsForParent(parentWebSessionId: string): const runs = raw?.runs; if (!runs) {return false;} const parentKeyPattern = `:web:${parentWebSessionId}`; + const now = Date.now(); for (const entry of Object.values(runs)) { const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; if (!requester.endsWith(parentKeyPattern)) {continue;} if (typeof entry.endedAt === "number") {continue;} + const createdAt = typeof entry.createdAt === "number" ? entry.createdAt : 0; + if (createdAt > 0 && now - createdAt > SUBAGENT_REGISTRY_STALENESS_MS) {continue;} return true; } } catch { /* ignore */ } @@ -505,7 +515,9 @@ export function abortRun(sessionId: string): boolean { */ function sendGatewayAbort(sessionId: string): void { try { - const sessionKey = `agent:${resolveActiveAgentId()}:web:${sessionId}`; + const run = activeRuns.get(sessionId); + const agentId = run?.pinnedAgentId ?? resolveActiveAgentId(); + const sessionKey = run?.pinnedSessionKey ?? `agent:${agentId}:web:${sessionId}`; void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch( () => { // Best effort; don't let abort failures break the stop flow. @@ -524,8 +536,10 @@ export function startRun(params: { sessionId: string; message: string; agentSessionId?: string; + /** Use a specific agent ID instead of the workspace default. 
*/ + overrideAgentId?: string; }): ActiveRun { - const { sessionId, message, agentSessionId } = params; + const { sessionId, message, agentSessionId, overrideAgentId } = params; const existing = activeRuns.get(sessionId); if (existing?.status === "running") { @@ -534,8 +548,12 @@ export function startRun(params: { // Clean up a finished run that's still in the grace period. if (existing) {cleanupRun(sessionId);} + const agentId = overrideAgentId ?? resolveActiveAgentId(); + const sessionKey = agentSessionId + ? `agent:${agentId}:web:${agentSessionId}` + : undefined; const abortController = new AbortController(); - const child = spawnAgentProcess(message, agentSessionId); + const child = spawnAgentProcess(message, agentSessionId, overrideAgentId); const run: ActiveRun = { sessionId, @@ -557,6 +575,8 @@ export function startRun(params: { _subscribeRetryTimer: null, _subscribeRetryAttempt: 0, _waitingFinalizeTimer: null, + pinnedAgentId: agentId, + pinnedSessionKey: sessionKey, }; activeRuns.set(sessionId, run); @@ -657,9 +677,10 @@ type TranscriptToolPart = { function readLatestTranscriptToolParts( sessionKey: string, + pinnedAgentId?: string, ): { sessionId: string; tools: TranscriptToolPart[] } | null { const stateDir = resolveOpenClawStateDir(); - const agentId = resolveActiveAgentId(); + const agentId = pinnedAgentId ?? 
resolveActiveAgentId(); const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json"); if (!existsSync(sessionsJsonPath)) { return null; } const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record>; @@ -829,7 +850,7 @@ function wireSubscribeOnlyProcess( if (liveStats.toolStartCount > 0) { return; } - const bundle = readLatestTranscriptToolParts(sessionKey); + const bundle = readLatestTranscriptToolParts(sessionKey, run.pinnedAgentId); if (!bundle) { return; } @@ -1145,7 +1166,8 @@ function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "c // backfill tool-invocation parts from the transcript into the web-chat JSONL. if (run.isSubscribeOnly && run.sessionKey && !hasToolParts) { const sessionKey = run.sessionKey; - setTimeout(() => { deferredTranscriptEnrich(sessionKey); }, 2_000); + const agentId = run.pinnedAgentId; + setTimeout(() => { deferredTranscriptEnrich(sessionKey, agentId); }, 2_000); } const grace = run.isSubscribeOnly ? SUBSCRIBE_CLEANUP_GRACE_MS : CLEANUP_GRACE_MS; @@ -1159,10 +1181,10 @@ function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "c * tool-invocation parts into the web-chat JSONL for a subagent session. * Matches tools to assistant messages by text content to avoid index-mapping issues. */ -function deferredTranscriptEnrich(sessionKey: string): void { +function deferredTranscriptEnrich(sessionKey: string, pinnedAgentId?: string): void { try { const stateDir = resolveOpenClawStateDir(); - const agentId = resolveActiveAgentId(); + const agentId = pinnedAgentId ?? resolveActiveAgentId(); const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json"); if (!existsSync(sessionsJsonPath)) {return;} @@ -1499,7 +1521,8 @@ function wireChildProcess(run: ActiveRun): void { // ── Parse stdout JSON lines ── const rl = createInterface({ input: child.stdout! 
}); - const parentSessionKey = `agent:${resolveActiveAgentId()}:web:${run.sessionId}`; + const pinnedAgent = run.pinnedAgentId ?? resolveActiveAgentId(); + const parentSessionKey = run.pinnedSessionKey ?? `agent:${pinnedAgent}:web:${run.sessionId}`; // Prevent unhandled 'error' events on the readline interface. // When the child process fails to start (e.g. ENOENT — missing script) // the stdout pipe is destroyed and readline re-emits the error. Without @@ -1935,12 +1958,22 @@ function wireChildProcess(run: ActiveRun): void { } flushPersistence(run).catch(() => {}); startParentSubscribeStream(run, parentSessionKey, processParentSubscribeEvent); + + // Safety: force-finalize if waiting exceeds the maximum duration + setTimeout(() => { + if (run.status === "waiting-for-subagents") { + finalizeWaitingRun(run); + } + }, MAX_WAITING_DURATION_MS); return; } // Normal completion path. run.status = exitedClean ? "completed" : "error"; + // Release the chat agent pool slot so it can be reused. + try { markChatAgentIdle(run.sessionId); } catch { /* best-effort */ } + // Final persistence flush (removes _streaming flag). flushPersistence(run).catch(() => {}); @@ -2073,6 +2106,8 @@ function finalizeWaitingRun(run: ActiveRun): void { stopSubscribeProcess(run); + try { markChatAgentIdle(run.sessionId); } catch { /* best-effort */ } + flushPersistence(run).catch(() => {}); for (const sub of run.subscribers) { diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index 335de8bfd7a..2a7bd1e8d63 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -603,7 +603,6 @@ class GatewayProcessHandle ...(sessionKey ? { sessionKey } : {}), deliver: false, channel: "webchat", - thinking: "xhigh", lane: this.params.lane ?? 
"web", timeout: 0, }); @@ -662,21 +661,33 @@ class GatewayProcessHandle if (!this.client || !sessionKey.trim()) { return; } + + const patchParams: Record = { + key: sessionKey, + thinkingLevel: "xhigh", + verboseLevel: "full", + reasoningLevel: "on", + }; + let attempt = 0; let lastMessage = ""; while (attempt < SESSIONS_PATCH_MAX_ATTEMPTS) { attempt += 1; try { - const patch = await this.client.request("sessions.patch", { - key: sessionKey, - thinkingLevel: "xhigh", - verboseLevel: "full", - reasoningLevel: "on", - }); + const patch = await this.client.request("sessions.patch", patchParams); if (patch.ok) { return; } lastMessage = frameErrorMessage(patch); + + // If the error indicates thinkingLevel is unsupported for the + // current model, retry without it rather than failing entirely. + if (lastMessage.includes("thinkingLevel") && patchParams.thinkingLevel) { + delete patchParams.thinkingLevel; + attempt = 0; + continue; + } + if ( attempt >= SESSIONS_PATCH_MAX_ATTEMPTS || !isRetryableGatewayMessage(lastMessage) @@ -686,6 +697,13 @@ class GatewayProcessHandle } catch (error) { lastMessage = error instanceof Error ? error.message : String(error); + + if (lastMessage.includes("thinkingLevel") && patchParams.thinkingLevel) { + delete patchParams.thinkingLevel; + attempt = 0; + continue; + } + if ( attempt >= SESSIONS_PATCH_MAX_ATTEMPTS || !isRetryableGatewayMessage(lastMessage) @@ -781,6 +799,36 @@ class GatewayProcessHandle return; } + if (frame.event === "chat") { + // Only forward chat frames in subscribe mode. In start mode + // the agent stream already delivers the full assistant text; + // forwarding chat finals would duplicate the output. + if (this.params.mode !== "subscribe") { + return; + } + const payload = asRecord(frame.payload) ?? {}; + const sessionKey = + typeof payload.sessionKey === "string" ? 
payload.sessionKey : undefined; + if (!this.shouldAcceptSessionEvent(sessionKey)) { + return; + } + const payloadGlobalSeq = + typeof payload.globalSeq === "number" ? payload.globalSeq : undefined; + const eventGlobalSeq = + payloadGlobalSeq ?? + (typeof frame.seq === "number" ? frame.seq : undefined); + const event: AgentEvent = { + event: "chat", + data: payload, + ...(typeof eventGlobalSeq === "number" + ? { globalSeq: eventGlobalSeq } + : {}), + ...(sessionKey ? { sessionKey } : {}), + }; + (this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`); + return; + } + if (frame.event === "error") { const payload = asRecord(frame.payload) ?? {}; const sessionKey = @@ -886,12 +934,14 @@ export async function callGatewayRpc( /** * Start an agent run via the Gateway WebSocket and return a process handle. + * @param overrideAgentId - Use a specific agent ID instead of the workspace default. */ export function spawnAgentProcess( message: string, agentSessionId?: string, + overrideAgentId?: string, ): AgentProcessHandle { - const agentId = resolveActiveAgentId(); + const agentId = overrideAgentId ?? resolveActiveAgentId(); const sessionKey = agentSessionId ? 
`agent:${agentId}:web:${agentSessionId}` : undefined; diff --git a/apps/web/lib/chat-agent-registry.test.ts b/apps/web/lib/chat-agent-registry.test.ts new file mode 100644 index 00000000000..1285ffac128 --- /dev/null +++ b/apps/web/lib/chat-agent-registry.test.ts @@ -0,0 +1,185 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { mkdtempSync, writeFileSync, readFileSync, existsSync, mkdirSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; + +let tempDir: string; + +vi.mock("./workspace", () => ({ + resolveOpenClawStateDir: () => tempDir, + resolveActiveAgentId: () => "main", + getChatSlotAgentIds: () => ["chat-slot-main-1", "chat-slot-main-2", "chat-slot-main-3"], +})); + +describe("chat-agent-registry", () => { + beforeEach(() => { + vi.resetModules(); + tempDir = mkdtempSync(join(tmpdir(), "chat-agent-test-")); + + vi.mock("./workspace", () => ({ + resolveOpenClawStateDir: () => tempDir, + resolveActiveAgentId: () => "main", + getChatSlotAgentIds: () => ["chat-slot-main-1", "chat-slot-main-2", "chat-slot-main-3"], + })); + + const configPath = join(tempDir, "openclaw.json"); + writeFileSync( + configPath, + JSON.stringify({ + agents: { + list: [ + { id: "main", workspace: "/tmp/ws", default: true }, + ], + }, + }), + ); + }); + + afterEach(() => { + vi.restoreAllMocks(); + try { rmSync(tempDir, { recursive: true, force: true }); } catch { /* ignore */ } + }); + + it("allocates a pool slot agent and writes registry", async () => { + const { allocateChatAgent, getChatAgent } = await import("./chat-agent-registry.js"); + + const entry = allocateChatAgent("session-1"); + expect(entry.chatAgentId).toBe("chat-slot-main-1"); + expect(entry.workspaceAgentId).toBe("main"); + expect(entry.state).toBe("active"); + expect(entry.sessionId).toBe("session-1"); + + const stored = getChatAgent("session-1"); + expect(stored?.chatAgentId).toBe(entry.chatAgentId); + }); + + it("reuses existing slot 
on second allocate for same session", async () => { + const { allocateChatAgent } = await import("./chat-agent-registry.js"); + + const first = allocateChatAgent("session-2"); + const second = allocateChatAgent("session-2"); + expect(second.chatAgentId).toBe(first.chatAgentId); + expect(second.state).toBe("active"); + }); + + it("assigns different pool slots to concurrent sessions", async () => { + const { allocateChatAgent } = await import("./chat-agent-registry.js"); + + const a = allocateChatAgent("session-concurrent-a"); + const b = allocateChatAgent("session-concurrent-b"); + expect(a.chatAgentId).not.toBe(b.chatAgentId); + expect(a.chatAgentId).toBe("chat-slot-main-1"); + expect(b.chatAgentId).toBe("chat-slot-main-2"); + }); + + it("falls back to workspace agent when all slots are occupied", async () => { + const { allocateChatAgent } = await import("./chat-agent-registry.js"); + + allocateChatAgent("s-fill-1"); + allocateChatAgent("s-fill-2"); + allocateChatAgent("s-fill-3"); + const overflow = allocateChatAgent("s-fill-4"); + expect(overflow.chatAgentId).toBe("main"); + }); + + it("marks agent idle and back to active on touch", async () => { + const { allocateChatAgent, markChatAgentIdle, touchChatAgent, getChatAgent } = + await import("./chat-agent-registry.js"); + + allocateChatAgent("session-3"); + markChatAgentIdle("session-3"); + expect(getChatAgent("session-3")?.state).toBe("idle"); + + touchChatAgent("session-3"); + expect(getChatAgent("session-3")?.state).toBe("active"); + }); + + it("expires idle agents past TTL and releases the slot", async () => { + const { allocateChatAgent, getChatAgent, expireIdleChatAgents } = + await import("./chat-agent-registry.js"); + + const entry = allocateChatAgent("session-4", { idleTtlMs: 1 }); + expect(entry.state).toBe("active"); + + const registry = JSON.parse(readFileSync(join(tempDir, "chat-agents.json"), "utf-8")); + registry.agents["session-4"].lastActiveAt = Date.now() - 1000; + writeFileSync(join(tempDir, 
"chat-agents.json"), JSON.stringify(registry)); + + const expired = expireIdleChatAgents(); + expect(expired).toContain("session-4"); + expect(getChatAgent("session-4")?.state).toBe("expired"); + }); + + it("resumes expired agent transparently", async () => { + const { allocateChatAgent, expireIdleChatAgents, resumeExpiredChatAgent, getChatAgent } = + await import("./chat-agent-registry.js"); + + allocateChatAgent("session-5", { idleTtlMs: 1 }); + + const registry = JSON.parse(readFileSync(join(tempDir, "chat-agents.json"), "utf-8")); + registry.agents["session-5"].lastActiveAt = Date.now() - 1000; + writeFileSync(join(tempDir, "chat-agents.json"), JSON.stringify(registry)); + + expireIdleChatAgents(); + expect(getChatAgent("session-5")?.state).toBe("expired"); + + const resumed = resumeExpiredChatAgent("session-5"); + expect(resumed?.state).toBe("active"); + expect(resumed?.chatAgentId).toMatch(/^chat-/); + }); + + it("deletes agent and releases slot", async () => { + const { allocateChatAgent, deleteChatAgent, getChatAgent } = + await import("./chat-agent-registry.js"); + + allocateChatAgent("session-6"); + deleteChatAgent("session-6"); + + expect(getChatAgent("session-6")?.state).toBe("deleted"); + }); + + it("purges deleted entries from registry", async () => { + const { allocateChatAgent, deleteChatAgent, purgeChatAgentRegistry, getChatAgent } = + await import("./chat-agent-registry.js"); + + allocateChatAgent("session-7"); + deleteChatAgent("session-7"); + expect(getChatAgent("session-7")?.state).toBe("deleted"); + + const purged = purgeChatAgentRegistry(); + expect(purged).toBe(1); + expect(getChatAgent("session-7")).toBeUndefined(); + }); + + it("ensureChatAgentForSend resumes expired agent", async () => { + const { allocateChatAgent, expireIdleChatAgents, ensureChatAgentForSend } = + await import("./chat-agent-registry.js"); + + allocateChatAgent("session-8", { idleTtlMs: 1 }); + + const registry = JSON.parse(readFileSync(join(tempDir, "chat-agents.json"), 
"utf-8")); + registry.agents["session-8"].lastActiveAt = Date.now() - 1000; + writeFileSync(join(tempDir, "chat-agents.json"), JSON.stringify(registry)); + + expireIdleChatAgents(); + + const agentId = ensureChatAgentForSend("session-8"); + expect(agentId).toMatch(/^chat-/); + }); + + it("ensureChatAgentForSend returns undefined for unknown session", async () => { + const { ensureChatAgentForSend } = await import("./chat-agent-registry.js"); + expect(ensureChatAgentForSend("nonexistent")).toBeUndefined(); + }); + + it("listChatAgents returns all entries", async () => { + const { allocateChatAgent, listChatAgents } = await import("./chat-agent-registry.js"); + + allocateChatAgent("session-a"); + allocateChatAgent("session-b"); + + const all = listChatAgents(); + expect(all.length).toBe(2); + expect(all.map((e) => e.sessionId).sort()).toEqual(["session-a", "session-b"]); + }); +}); diff --git a/apps/web/lib/chat-agent-registry.ts b/apps/web/lib/chat-agent-registry.ts new file mode 100644 index 00000000000..84d71e8c330 --- /dev/null +++ b/apps/web/lib/chat-agent-registry.ts @@ -0,0 +1,259 @@ +/** + * Chat-agent registry: assigns web chat sessions to pre-created agent + * pool slots so concurrent chats each get their own gateway agent. + * + * Architecture: + * - Each workspace has one durable "workspace agent" (e.g. "kumareth"). + * - A pool of chat agent slots (e.g. "chat-slot-kumareth-1" through "-5") + * is pre-created in openclaw.json at workspace init time. + * - Each new web chat session is assigned an available slot from the pool. + * - When a slot is released (chat completes or is deleted), it becomes + * available for the next session. + * - If all slots are occupied, falls back to the workspace agent. 
+ */ +import { existsSync, readFileSync, writeFileSync, mkdirSync } from "node:fs"; +import { join } from "node:path"; +import { resolveOpenClawStateDir, resolveActiveAgentId, getChatSlotAgentIds } from "./workspace"; + +const DEFAULT_IDLE_TTL_MS = 30 * 60_000; + +export type ChatAgentState = "active" | "idle" | "expired" | "deleted"; + +export type ChatAgentEntry = { + chatAgentId: string; + workspaceAgentId: string; + sessionId: string; + workspaceDir: string; + state: ChatAgentState; + createdAt: number; + lastActiveAt: number; + idleTtlMs: number; +}; + +type ChatAgentRegistryData = { + version: number; + agents: Record<string, ChatAgentEntry>; +}; + +function registryPath(): string { + return join(resolveOpenClawStateDir(), "chat-agents.json"); +} + +function readRegistry(): ChatAgentRegistryData { + const fp = registryPath(); + if (!existsSync(fp)) { + return { version: 1, agents: {} }; + } + try { + return JSON.parse(readFileSync(fp, "utf-8")) as ChatAgentRegistryData; + } catch { + return { version: 1, agents: {} }; + } +} + +function writeRegistry(data: ChatAgentRegistryData): void { + const fp = registryPath(); + const dir = join(fp, ".."); + if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); + } + writeFileSync(fp, JSON.stringify(data, null, 2) + "\n", "utf-8"); +} + +/** + * Assign a web chat session to an available pool slot agent. + * Does NOT write to openclaw.json -- slots are pre-created at workspace init. + * Falls back to the workspace agent if no slots are available.
+ */ +export function allocateChatAgent( + sessionId: string, + options?: { idleTtlMs?: number }, +): ChatAgentEntry { + const registry = readRegistry(); + + const existing = registry.agents[sessionId]; + if (existing && existing.state !== "expired" && existing.state !== "deleted") { + existing.lastActiveAt = Date.now(); + existing.state = "active"; + writeRegistry(registry); + return existing; + } + + const workspaceAgentId = resolveActiveAgentId(); + const now = Date.now(); + + // Find available pool slots (not assigned to an active/idle session) + const allSlots = getChatSlotAgentIds(); + const occupiedSlots = new Set( + Object.values(registry.agents) + .filter((e) => e.state === "active" || e.state === "idle") + .map((e) => e.chatAgentId), + ); + const availableSlot = allSlots.find((s) => !occupiedSlots.has(s)); + const chatAgentId = availableSlot ?? workspaceAgentId; + + const entry: ChatAgentEntry = { + chatAgentId, + workspaceAgentId, + sessionId, + workspaceDir: "", + state: "active", + createdAt: now, + lastActiveAt: now, + idleTtlMs: options?.idleTtlMs ?? DEFAULT_IDLE_TTL_MS, + }; + + registry.agents[sessionId] = entry; + writeRegistry(registry); + + return entry; +} + +/** Look up a chat agent entry by session ID. */ +export function getChatAgent(sessionId: string): ChatAgentEntry | undefined { + const registry = readRegistry(); + return registry.agents[sessionId]; +} + +/** Touch the lastActiveAt timestamp for a chat agent. */ +export function touchChatAgent(sessionId: string): void { + const registry = readRegistry(); + const entry = registry.agents[sessionId]; + if (!entry) {return;} + entry.lastActiveAt = Date.now(); + if (entry.state === "idle") { + entry.state = "active"; + } + writeRegistry(registry); +} + +/** Mark a chat agent as idle. 
*/ +export function markChatAgentIdle(sessionId: string): void { + const registry = readRegistry(); + const entry = registry.agents[sessionId]; + if (!entry || entry.state === "expired" || entry.state === "deleted") {return;} + entry.state = "idle"; + writeRegistry(registry); +} + +/** Release a chat agent slot back to the pool. */ +export function deleteChatAgent(sessionId: string): void { + const registry = readRegistry(); + const entry = registry.agents[sessionId]; + if (!entry) {return;} + entry.state = "deleted"; + writeRegistry(registry); +} + +/** + * Expire chat agents that have been idle longer than their TTL. + * Returns the list of expired session IDs. + */ +export function expireIdleChatAgents(): string[] { + const registry = readRegistry(); + const now = Date.now(); + const expired: string[] = []; + + for (const [sessionId, entry] of Object.entries(registry.agents)) { + if (entry.state === "expired" || entry.state === "deleted") {continue;} + const idleSince = now - entry.lastActiveAt; + if (idleSince > entry.idleTtlMs) { + entry.state = "expired"; + expired.push(sessionId); + } + } + + if (expired.length > 0) { + writeRegistry(registry); + } + + return expired; +} + +/** + * Try to resume an expired chat agent by re-allocating it. + * Returns the new entry, or undefined if the session has no prior agent. + */ +export function resumeExpiredChatAgent( + sessionId: string, + options?: { idleTtlMs?: number }, +): ChatAgentEntry | undefined { + const registry = readRegistry(); + const existing = registry.agents[sessionId]; + if (!existing) {return undefined;} + if (existing.state !== "expired") {return existing;} + + return allocateChatAgent(sessionId, { + idleTtlMs: options?.idleTtlMs ?? existing.idleTtlMs, + }); +} + +/** List all chat agent entries (for diagnostics). */ +export function listChatAgents(): ChatAgentEntry[] { + const registry = readRegistry(); + return Object.values(registry.agents); +} + +/** Clean up deleted entries from the registry. 
+ */ +export function purgeChatAgentRegistry(): number { + const registry = readRegistry(); + let count = 0; + for (const [sessionId, entry] of Object.entries(registry.agents)) { + if (entry.state === "deleted") { + delete registry.agents[sessionId]; + count++; + } + } + if (count > 0) { + writeRegistry(registry); + } + return count; +} + +// ── Periodic GC ── + +const GC_INTERVAL_MS = 5 * 60_000; +let gcTimer: ReturnType<typeof setInterval> | null = null; + +/** Start the background idle-GC interval (idempotent). */ +export function startChatAgentGc(): void { + if (gcTimer) {return;} + gcTimer = setInterval(() => { + try { + expireIdleChatAgents(); + purgeChatAgentRegistry(); + } catch { + // Best-effort background cleanup + } + }, GC_INTERVAL_MS); + if (typeof gcTimer === "object" && "unref" in gcTimer) { + gcTimer.unref(); + } +} + +/** Stop the background GC interval. */ +export function stopChatAgentGc(): void { + if (gcTimer) { + clearInterval(gcTimer); + gcTimer = null; + } +} + +/** + * Ensure a chat agent is valid for sending a message. + * If the agent expired, re-allocate it transparently. + * Returns the effective agent ID to use.
+ */ +export function ensureChatAgentForSend(sessionId: string): string | undefined { + const entry = getChatAgent(sessionId); + if (!entry) {return undefined;} + if (entry.state === "deleted") {return undefined;} + if (entry.state === "expired") { + const resumed = resumeExpiredChatAgent(sessionId); + return resumed?.chatAgentId; + } + touchChatAgent(sessionId); + return entry.chatAgentId; +} + +export { DEFAULT_IDLE_TTL_MS, GC_INTERVAL_MS }; diff --git a/apps/web/lib/workspace.ts b/apps/web/lib/workspace.ts index 18f23e16395..7dc424fe064 100644 --- a/apps/web/lib/workspace.ts +++ b/apps/web/lib/workspace.ts @@ -362,6 +362,54 @@ export function ensureAgentInConfig(workspaceName: string, workspaceDir: string) writeOpenClawConfig(config); } +const CHAT_SLOT_PREFIX = "chat-slot-"; +const DEFAULT_CHAT_POOL_SIZE = 5; + +/** + * Pre-create a pool of chat agent slots in `agents.list[]` so the gateway + * knows about them at startup. Each slot shares the workspace directory + * of the parent workspace agent, enabling concurrent chat sessions. + */ +export function ensureChatAgentPool(workspaceName: string, workspaceDir: string, poolSize = DEFAULT_CHAT_POOL_SIZE): void { + const config = readOpenClawConfig(); + if (!config.agents) { config.agents = {}; } + if (!Array.isArray(config.agents.list)) { config.agents.list = []; } + + const baseId = workspaceNameToAgentId(workspaceName); + let changed = false; + + for (let i = 1; i <= poolSize; i++) { + const slotId = `${CHAT_SLOT_PREFIX}${baseId}-${i}`; + const existing = config.agents.list.find((a) => a.id === slotId); + if (!existing) { + config.agents.list.push({ id: slotId, workspace: workspaceDir }); + changed = true; + } else if (existing.workspace !== workspaceDir) { + existing.workspace = workspaceDir; + changed = true; + } + } + + if (changed) { + writeOpenClawConfig(config); + } +} + +/** + * Return the list of chat slot agent IDs for a workspace. 
+ */ +export function getChatSlotAgentIds(workspaceName?: string): string[] { + const config = readOpenClawConfig(); + const list = config.agents?.list; + if (!Array.isArray(list)) { return []; } + + const baseId = workspaceNameToAgentId(workspaceName ?? getActiveWorkspaceName() ?? DEFAULT_WORKSPACE_NAME); + const prefix = `${CHAT_SLOT_PREFIX}${baseId}-`; + return list.filter((a) => a.id.startsWith(prefix)).map((a) => a.id); +} + +export { CHAT_SLOT_PREFIX, DEFAULT_CHAT_POOL_SIZE }; + /** * Flip `default: true` to the target agent in `agents.list[]`. * No-op if the list doesn't exist or the agent isn't found.