diff --git a/apps/web/app/api/chat/chat.test.ts b/apps/web/app/api/chat/chat.test.ts index 494483d30ab..48a23e7e030 100644 --- a/apps/web/app/api/chat/chat.test.ts +++ b/apps/web/app/api/chat/chat.test.ts @@ -13,7 +13,17 @@ vi.mock("@/lib/active-runs", () => ({ // Mock workspace module vi.mock("@/lib/workspace", () => ({ + ensureManagedWorkspaceRouting: vi.fn(), + getActiveWorkspaceName: vi.fn(() => "default"), + resolveActiveAgentId: vi.fn(() => "main"), resolveAgentWorkspacePrefix: vi.fn(() => null), + resolveOpenClawStateDir: vi.fn(() => "/home/testuser/.openclaw-dench"), + resolveWorkspaceDirForName: vi.fn((name: string) => + name === "default" + ? "/home/testuser/.openclaw-dench/workspace" + : `/home/testuser/.openclaw-dench/workspace-${name}`, + ), + resolveWorkspaceRoot: vi.fn(() => "/home/testuser/.openclaw-dench/workspace"), })); // Mock web-sessions shared module @@ -42,7 +52,17 @@ describe("Chat API routes", () => { getRunningSessionIds: vi.fn(() => []), })); vi.mock("@/lib/workspace", () => ({ + ensureManagedWorkspaceRouting: vi.fn(), + getActiveWorkspaceName: vi.fn(() => "default"), + resolveActiveAgentId: vi.fn(() => "main"), resolveAgentWorkspacePrefix: vi.fn(() => null), + resolveOpenClawStateDir: vi.fn(() => "/home/testuser/.openclaw-dench"), + resolveWorkspaceDirForName: vi.fn((name: string) => + name === "default" + ? "/home/testuser/.openclaw-dench/workspace" + : `/home/testuser/.openclaw-dench/workspace-${name}`, + ), + resolveWorkspaceRoot: vi.fn(() => "/home/testuser/.openclaw-dench/workspace"), })); vi.mock("@/app/api/web-sessions/shared", () => ({ getSessionMeta: vi.fn(() => undefined), @@ -115,6 +135,40 @@ describe("Chat API routes", () => { expect(startRun).toHaveBeenCalled(); }); + it("maps partial tool output into AI SDK preliminary output chunks", async () => { + const { hasActiveRun, subscribeToRun } = await import("@/lib/active-runs"); + vi.mocked(hasActiveRun).mockReturnValue(false); + vi.mocked(subscribeToRun).mockImplementation(((_sessionId, callback) => { + callback({ + type: "tool-output-partial", + toolCallId: "tool-1", + output: { text: "partial output" }, + } as never); + callback(null); + return () => {}; + }) as never); + + const { POST } = await import("./route.js"); + const req = new Request("http://localhost/api/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + messages: [ + { id: "m1", role: "user", parts: [{ type: "text", text: "hello" }] }, + ], + sessionId: "s1", + }), + }); + const res = await POST(req); + const body = await res.text(); + + expect(body).toContain('"type":"tool-output-available"'); + expect(body).toContain('"toolCallId":"tool-1"'); + expect(body).toContain('"preliminary":true'); + expect(body).toContain('"text":"partial output"'); + expect(body).not.toContain("tool-output-partial"); + }); + it("does not reuse an old run when sessionId is absent", async () => { const { startRun, hasActiveRun, subscribeToRun, persistUserMessage } = await import("@/lib/active-runs"); vi.mocked(hasActiveRun).mockReturnValue(true); @@ -161,6 +215,48 @@ describe("Chat API routes", () => { expect(persistUserMessage).toHaveBeenCalledWith("s1", expect.objectContaining({ id: "m1" })); }); + it("repairs managed workspace routing before starting a persisted session run", async () => { + const { ensureManagedWorkspaceRouting } = await import("@/lib/workspace"); + const { getSessionMeta } = await import("@/app/api/web-sessions/shared"); + const { startRun, hasActiveRun, subscribeToRun } = await import("@/lib/active-runs"); + vi.mocked(hasActiveRun).mockReturnValue(false); + vi.mocked(subscribeToRun).mockReturnValue(() => {}); + vi.mocked(getSessionMeta).mockReturnValue({ + id: "s1", + title: "Chat", + createdAt: 1, + updatedAt: 1, + messageCount: 1, + workspaceName: "default", + workspaceRoot: "/home/testuser/.openclaw-dench/workspace", + workspaceAgentId: "main", + chatAgentId: "chat-slot-main-2", + } as never); + + const { POST } = await import("./route.js"); + const req = new Request("http://localhost/api/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + messages: [ + { id: "m1", role: "user", parts: [{ type: "text", text: "repair routing" }] }, + ], + sessionId: "s1", + }), + }); + await POST(req); + expect(ensureManagedWorkspaceRouting).toHaveBeenCalledWith( + "default", + "/home/testuser/.openclaw-dench/workspace", + { markDefault: false }, + ); + expect(startRun).toHaveBeenCalledWith( + expect.objectContaining({ + overrideAgentId: "chat-slot-main-2", + }), + ); + }); + it("resolves workspace file paths in message", async () => { const { resolveAgentWorkspacePrefix } = await import("@/lib/workspace"); vi.mocked(resolveAgentWorkspacePrefix).mockReturnValue("workspace"); diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index f138ec0505f..e5443438ce3 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -1,5 +1,13 @@ import type { UIMessage } from "ai"; -import { resolveAgentWorkspacePrefix } from "@/lib/workspace"; +import { + resolveActiveAgentId, + resolveAgentWorkspacePrefix, + resolveOpenClawStateDir, + resolveWorkspaceDirForName, + resolveWorkspaceRoot, + getActiveWorkspaceName, + ensureManagedWorkspaceRouting, +} from "@/lib/workspace"; import { startRun, startSubscribeRun, @@ -14,7 +22,6 @@ import { import { trackServer } from "@/lib/telemetry"; import { existsSync, readFileSync } from "node:fs"; import { join } from "node:path"; -import { resolveOpenClawStateDir } from "@/lib/workspace"; import { getSessionMeta } from "@/app/api/web-sessions/shared"; export const runtime = "nodejs"; @@ -40,6 +47,22 @@ function deriveSubagentInfo(sessionKey: string): { parentSessionId: string; task return null; } +function normalizeLiveStreamEvent(event: SseEvent): SseEvent { + // AI SDK's UI stream schema does not define `tool-output-partial`. + // It expects repeated `tool-output-available` chunks with + // `preliminary: true` while the tool is still running. + if (event.type === "tool-output-partial") { + return { + type: "tool-output-available", + toolCallId: event.toolCallId, + output: event.output, + preliminary: true, + }; + } + + return event; +} + export async function POST(req: Request) { const { messages, @@ -122,7 +145,19 @@ export async function POST(req: Request) { }); const sessionMeta = getSessionMeta(sessionId); - const effectiveAgentId = sessionMeta?.chatAgentId ?? sessionMeta?.workspaceAgentId; + const workspaceName = + sessionMeta?.workspaceName + ?? getActiveWorkspaceName() + ?? "default"; + const workspaceRoot = + sessionMeta?.workspaceRoot + ?? resolveWorkspaceRoot() + ?? resolveWorkspaceDirForName(workspaceName); + ensureManagedWorkspaceRouting(workspaceName, workspaceRoot, { markDefault: false }); + const effectiveAgentId = + sessionMeta?.chatAgentId + ?? sessionMeta?.workspaceAgentId + ?? resolveActiveAgentId(); try { startRun({ @@ -168,11 +203,8 @@ export async function POST(req: Request) { try { controller.close(); } catch { /* already closed */ } return; } - // Skip custom event types not in the AI SDK v6 data stream schema; - // they're only consumed by the reconnection parser (processEvent). - if (event.type === "tool-output-partial") {return;} try { - const json = JSON.stringify(event); + const json = JSON.stringify(normalizeLiveStreamEvent(event)); controller.enqueue(encoder.encode(`data: ${json}\n\n`)); } catch { /* ignore */ } }, diff --git a/apps/web/app/components/chat-panel.stream-parser.test.ts b/apps/web/app/components/chat-panel.stream-parser.test.ts index 48cb9999aec..a1651cf9554 100644 --- a/apps/web/app/components/chat-panel.stream-parser.test.ts +++ b/apps/web/app/components/chat-panel.stream-parser.test.ts @@ -72,6 +72,33 @@ describe("createStreamParser", () => { ]); }); + it("keeps partial tool output visible without marking the tool complete", () => { + const parser = createStreamParser(); + + parser.processEvent({ + type: "tool-input-start", + toolCallId: "tool-1", + toolName: "readFile", + }); + parser.processEvent({ + type: "tool-output-partial", + toolCallId: "tool-1", + output: { text: "first chunk" }, + }); + + expect(parser.getParts()).toEqual([ + { + type: "dynamic-tool", + toolCallId: "tool-1", + toolName: "readFile", + state: "input-available", + input: {}, + output: { text: "first chunk" }, + preliminary: true, + }, + ]); + }); + it("closes reasoning state on reasoning-end to prevent stuck streaming badges", () => { const parser = createStreamParser(); diff --git a/apps/web/app/components/chat-stream-status.test.ts b/apps/web/app/components/chat-stream-status.test.ts new file mode 100644 index 00000000000..a02efb156ef --- /dev/null +++ b/apps/web/app/components/chat-stream-status.test.ts @@ -0,0 +1,99 @@ +import type { UIMessage } from "ai"; +import { describe, expect, it } from "vitest"; +import { + getStreamActivityLabel, + hasAssistantText, + isStatusReasoningText, +} from "./chat-stream-status"; + +function assistantMessage(parts: UIMessage["parts"]): UIMessage { + return { + id: "assistant-1", + role: "assistant", + parts, + } as UIMessage; +} + +describe("chat stream status helpers", () => { + it("detects status reasoning labels that should stay out of the transcript body", () => { + expect(isStatusReasoningText("Preparing response...")).toBe(true); + expect( + isStatusReasoningText( + "Optimizing session context...\nRetrying with compacted context...", + ), + ).toBe(true); + expect(isStatusReasoningText("Planning the requested changes")).toBe(false); + }); + + it("keeps the stream activity row visible after assistant text has started", () => { + const label = getStreamActivityLabel({ + loadingSession: false, + isReconnecting: false, + status: "streaming", + hasRunningSubagents: false, + lastMessage: assistantMessage([ + { type: "text", text: "Drafting the final answer now..." }, + ] as UIMessage["parts"]), + }); + + expect(label).toBe("Still streaming..."); + expect( + hasAssistantText( + assistantMessage([ + { type: "text", text: "Drafting the final answer now..." }, + ] as UIMessage["parts"]), + ), + ).toBe(true); + }); + + it("prefers gateway status reasoning over the generic streaming label", () => { + const label = getStreamActivityLabel({ + loadingSession: false, + isReconnecting: false, + status: "streaming", + hasRunningSubagents: false, + lastMessage: assistantMessage([ + { + type: "reasoning", + text: "Optimizing session context...\nRetrying with compacted context...", + }, + ] as UIMessage["parts"]), + }); + + expect(label).toBe("Optimizing session context... Retrying with compacted context..."); + }); + + it("surfaces the active tool name while a tool call is still running", () => { + const label = getStreamActivityLabel({ + loadingSession: false, + isReconnecting: false, + status: "streaming", + hasRunningSubagents: false, + lastMessage: assistantMessage([ + { + type: "dynamic-tool", + toolName: "read_file", + toolCallId: "tool-1", + state: "input-available", + input: {}, + }, + ] as UIMessage["parts"]), + }); + + expect(label).toBe("Running Read File..."); + }); + + it("shows waiting for subagents as the top-priority active status", () => { + const label = getStreamActivityLabel({ + loadingSession: false, + isReconnecting: false, + status: "streaming", + hasRunningSubagents: true, + lastMessage: assistantMessage([ + { type: "text", text: "Initial draft is ready." }, + ] as UIMessage["parts"]), + }); + + expect(label).toBe("Waiting for subagents..."); + }); +}); diff --git a/apps/web/app/components/chat-stream-status.ts b/apps/web/app/components/chat-stream-status.ts new file mode 100644 index 00000000000..33a92bc408f --- /dev/null +++ b/apps/web/app/components/chat-stream-status.ts @@ -0,0 +1,195 @@ +import type { UIMessage } from "ai"; + +export const STREAM_STATUS_REASONING_LABELS = [ + "Preparing response...", + "Optimizing session context...", + "Waiting for subagent results...", + "Waiting for subagents...", +] as const; + +type ChatStatus = "submitted" | "streaming" | "ready" | "error"; +type MessagePart = UIMessage["parts"][number]; + +function collapseWhitespace(text: string): string { + return text.trim().replace(/\s+/g, " "); +} + +function humanizeToolName(toolName: string): string { + const normalized = toolName + .replace(/^tool-/, "") + .replace(/[_-]+/g, " ") + .trim(); + + if (!normalized) { + return "tool"; + } + + return normalized.replace(/\b\w/g, (char) => char.toUpperCase()); +} + +function resolveToolName(part: MessagePart): string | null { + if (part.type === "dynamic-tool") { + return typeof part.toolName === "string" ? part.toolName : null; + } + + if (!part.type.startsWith("tool-")) { + return null; + } + + const toolPart = part as { + type: string; + title?: unknown; + toolName?: unknown; + }; + + if (typeof toolPart.title === "string" && toolPart.title.trim()) { + return toolPart.title; + } + if (typeof toolPart.toolName === "string" && toolPart.toolName.trim()) { + return toolPart.toolName; + } + + return part.type.replace(/^tool-/, ""); +} + +function resolveToolState(part: MessagePart): string | null { + if (part.type === "dynamic-tool") { + return typeof part.state === "string" + ? part.state + : "input-available"; + } + + if (!part.type.startsWith("tool-")) { + return null; + } + + const toolPart = part as { + state?: unknown; + errorText?: unknown; + output?: unknown; + result?: unknown; + }; + + if (typeof toolPart.state === "string") { + return toolPart.state; + } + if (typeof toolPart.errorText === "string" && toolPart.errorText.trim()) { + return "error"; + } + if ("result" in toolPart || "output" in toolPart) { + return "output-available"; + } + + return "input-available"; +} + +export function hasAssistantText(message: UIMessage | null): boolean { + return Boolean( + message?.role === "assistant" && + message.parts.some( + (part) => + part.type === "text" && + typeof (part as { text?: unknown }).text === "string" && + (part as { text: string }).text.length > 0, + ), + ); +} + +export function isStatusReasoningText(text: string): boolean { + return STREAM_STATUS_REASONING_LABELS.some((label) => + text.startsWith(label), + ); +} + +function getLatestStatusReasoning(parts: UIMessage["parts"]): string | null { + for (let i = parts.length - 1; i >= 0; i--) { + const part = parts[i]; + if (part.type !== "reasoning") { + continue; + } + + const text = + typeof (part as { text?: unknown }).text === "string" + ? collapseWhitespace((part as { text: string }).text) + : ""; + + if (text && isStatusReasoningText(text)) { + return text; + } + } + + return null; +} + +function getRunningToolLabel(parts: UIMessage["parts"]): string | null { + for (let i = parts.length - 1; i >= 0; i--) { + const part = parts[i]; + const state = resolveToolState(part); + if (!state || state === "output-available" || state === "error") { + continue; + } + + const toolName = resolveToolName(part); + if (!toolName) { + continue; + } + + if (toolName === "sessions_spawn") { + return "Starting subagent..."; + } + + return `Running ${humanizeToolName(toolName)}...`; + } + + return null; +} + +export function getStreamActivityLabel({ + loadingSession, + isReconnecting, + status, + hasRunningSubagents, + lastMessage, +}: { + loadingSession: boolean; + isReconnecting: boolean; + status: ChatStatus; + hasRunningSubagents: boolean; + lastMessage: UIMessage | null; +}): string | null { + if (loadingSession) { + return "Loading session..."; + } + + if (isReconnecting) { + return "Resuming stream..."; + } + + if (hasRunningSubagents) { + return "Waiting for subagents..."; + } + + if (lastMessage?.role === "assistant") { + const statusReasoning = getLatestStatusReasoning(lastMessage.parts); + if (statusReasoning) { + return statusReasoning; + } + + const runningTool = getRunningToolLabel(lastMessage.parts); + if (runningTool) { + return runningTool; + } + } + + if (status === "submitted") { + return "Thinking..."; + } + + if (status === "streaming") { + return hasAssistantText(lastMessage) + ? "Still streaming..." + : "Streaming..."; + } + + return null; +}