From 38f3cb6efef7219f812a0d62dbd1c6375d1a24cb Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Tue, 3 Mar 2026 15:38:00 -0800 Subject: [PATCH] feat(web): use workspace-aware agent IDs for gateway sessions Replace hardcoded agent:main: session keys with resolveActiveAgentId() so each workspace routes to its own gateway agent. --- apps/web/app/api/chat/subagents/route.ts | 4 +- apps/web/lib/active-runs.ts | 67 ++++++++++++++++++++++-- apps/web/lib/agent-runner.ts | 10 ++-- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/apps/web/app/api/chat/subagents/route.ts b/apps/web/app/api/chat/subagents/route.ts index 35f3ab6f811..95769db1b87 100644 --- a/apps/web/app/api/chat/subagents/route.ts +++ b/apps/web/app/api/chat/subagents/route.ts @@ -1,6 +1,6 @@ import { existsSync, readFileSync } from "node:fs"; import { join } from "node:path"; -import { resolveOpenClawStateDir } from "@/lib/workspace"; +import { resolveOpenClawStateDir, resolveActiveAgentId } from "@/lib/workspace"; export const runtime = "nodejs"; @@ -38,7 +38,7 @@ export async function GET(req: Request) { return Response.json({ error: "sessionId required" }, { status: 400 }); } - const webSessionKey = `agent:main:web:${sessionId}`; + const webSessionKey = `agent:${resolveActiveAgentId()}:web:${sessionId}`; const entries = readSubagentRegistry(); const subagents = entries diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 224e24e4bb8..824a7949292 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -16,7 +16,7 @@ import { existsSync, mkdirSync, } from "node:fs"; -import { resolveWebChatDir, resolveOpenClawStateDir } from "./workspace"; +import { resolveWebChatDir, resolveOpenClawStateDir, resolveActiveAgentId } from "./workspace"; import { type AgentProcessHandle, type AgentEvent, @@ -131,6 +131,16 @@ function asRecord(value: unknown): Record | null { return value as Record; } +function resolveModelLabel(provider: unknown, model: unknown): string | null { + if (typeof model !== "string" || !model.trim()) { return null; } + const m = model.trim(); + if (typeof provider === "string" && provider.trim()) { + const p = provider.trim(); + return m.toLowerCase().startsWith(`${p.toLowerCase()}/`) ? m : `${p}/${m}`; + } + return m; +} + function extractAssistantTextFromChatPayload( data: Record | undefined, ): string { @@ -411,7 +421,7 @@ export function abortRun(sessionId: string): boolean { */ function sendGatewayAbort(sessionId: string): void { try { - const sessionKey = `agent:main:web:${sessionId}`; + const sessionKey = `agent:${resolveActiveAgentId()}:web:${sessionId}`; void callGatewayRpc("chat.abort", { sessionKey }, { timeoutMs: 4_000 }).catch( () => { // Best effort; don't let abort failures break the stop flow. @@ -717,6 +727,12 @@ function wireSubscribeOnlyProcess( emit({ type: "tool-input-available", toolCallId, toolName, input: args }); run.accumulated.parts.push({ type: "tool-invocation", toolCallId, toolName, args }); accToolMap.set(toolCallId, run.accumulated.parts.length - 1); + } else if (phase === "update") { + const partialResult = extractToolResult(ev.data?.partialResult); + if (partialResult) { + const output = buildToolOutput(partialResult); + emit({ type: "tool-output-partial", toolCallId, output }); + } } else if (phase === "result") { const isError = ev.data?.isError === true; const result = extractToolResult(ev.data?.result); @@ -742,6 +758,23 @@ function wireSubscribeOnlyProcess( } } + if (ev.event === "agent" && ev.stream === "lifecycle" && (ev.data?.phase === "fallback" || ev.data?.phase === "fallback_cleared")) { + const data = ev.data; + const selected = resolveModelLabel(data?.selectedProvider, data?.selectedModel) + ?? resolveModelLabel(data?.fromProvider, data?.fromModel); + const active = resolveModelLabel(data?.activeProvider, data?.activeModel) + ?? resolveModelLabel(data?.toProvider, data?.toModel); + if (selected && active) { + const isClear = data?.phase === "fallback_cleared"; + const reason = typeof data?.reasonSummary === "string" ? data.reasonSummary + : typeof data?.reason === "string" ? data.reason : undefined; + const label = isClear + ? `Restored to ${selected}` + : `Switched to ${active}${reason ? ` (${reason})` : ""}`; + openStatusReasoning(label); + } + } + if (ev.event === "agent" && ev.stream === "compaction") { const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined; if (phase === "start") { openStatusReasoning("Optimizing session context..."); } @@ -1093,7 +1126,7 @@ function wireChildProcess(run: ActiveRun): void { // ── Parse stdout JSON lines ── const rl = createInterface({ input: child.stdout! }); - const parentSessionKey = `agent:main:web:${run.sessionId}`; + const parentSessionKey = `agent:${resolveActiveAgentId()}:web:${run.sessionId}`; // Prevent unhandled 'error' events on the readline interface. // When the child process fails to start (e.g. ENOENT — missing script) // the stdout pipe is destroyed and readline re-emits the error. Without @@ -1237,6 +1270,12 @@ function wireChildProcess(run: ActiveRun): void { args, }); accToolMap.set(toolCallId, run.accumulated.parts.length - 1); + } else if (phase === "update") { + const partialResult = extractToolResult(ev.data?.partialResult); + if (partialResult) { + const output = buildToolOutput(partialResult); + emit({ type: "tool-output-partial", toolCallId, output }); + } } else if (phase === "result") { const isError = ev.data?.isError === true; const result = extractToolResult(ev.data?.result); @@ -1291,6 +1330,28 @@ function wireChildProcess(run: ActiveRun): void { } } + // Model fallback events + if ( + ev.event === "agent" && + ev.stream === "lifecycle" && + (ev.data?.phase === "fallback" || ev.data?.phase === "fallback_cleared") + ) { + const data = ev.data; + const selected = resolveModelLabel(data?.selectedProvider, data?.selectedModel) + ?? resolveModelLabel(data?.fromProvider, data?.fromModel); + const active = resolveModelLabel(data?.activeProvider, data?.activeModel) + ?? resolveModelLabel(data?.toProvider, data?.toModel); + if (selected && active) { + const isClear = data?.phase === "fallback_cleared"; + const reason = typeof data?.reasonSummary === "string" ? data.reasonSummary + : typeof data?.reason === "string" ? data.reason : undefined; + const label = isClear + ? `Restored to ${selected}` + : `Switched to ${active}${reason ? ` (${reason})` : ""}`; + openStatusReasoning(label); + } + } + // Chat final events can include assistant turns from runs outside // the original parent process (e.g. subagent announce follow-ups). if (ev.event === "chat") { diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index 0d061a46a65..556b4fc41fd 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -8,6 +8,7 @@ import { PassThrough } from "node:stream"; import NodeWebSocket from "ws"; import { getEffectiveProfile, + resolveActiveAgentId, resolveOpenClawStateDir, resolveWorkspaceRoot, } from "./workspace"; @@ -653,6 +654,7 @@ class GatewayProcessHandle const patch = await this.client.request("sessions.patch", { key: sessionKey, verboseLevel: "full", + reasoningLevel: "on", }); if (patch.ok) { return; @@ -880,8 +882,9 @@ export function spawnAgentProcess( if (shouldForceLegacyStream()) { return spawnLegacyAgentProcess(message, agentSessionId); } + const agentId = resolveActiveAgentId(); const sessionKey = agentSessionId - ? `agent:main:web:${agentSessionId}` + ? `agent:${agentId}:web:${agentSessionId}` : undefined; return new GatewayProcessHandle({ mode: "start", @@ -902,17 +905,18 @@ function spawnCliAgentProcess( message: string, agentSessionId?: string, ): ReturnType { + const cliAgentId = resolveActiveAgentId(); const args = [ "agent", "--agent", - "main", + cliAgentId, "--message", message, "--stream-json", ]; if (agentSessionId) { - const sessionKey = `agent:main:web:${agentSessionId}`; + const sessionKey = `agent:${cliAgentId}:web:${agentSessionId}`; args.push("--session-key", sessionKey, "--lane", "web", "--channel", "webchat"); }