feat(chat): add stream status labels and partial tool output

Keep a visible streaming status indicator throughout the assistant turn and forward partial tool output to the UI so users see real-time progress.
This commit is contained in:
kumarabhirup 2026-03-15 00:31:19 -07:00
parent 8838cc4e16
commit 2c2164ed2c
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
5 changed files with 456 additions and 7 deletions

View File

@ -13,7 +13,17 @@ vi.mock("@/lib/active-runs", () => ({
// Mock workspace module
vi.mock("@/lib/workspace", () => ({
ensureManagedWorkspaceRouting: vi.fn(),
getActiveWorkspaceName: vi.fn(() => "default"),
resolveActiveAgentId: vi.fn(() => "main"),
resolveAgentWorkspacePrefix: vi.fn(() => null),
resolveOpenClawStateDir: vi.fn(() => "/home/testuser/.openclaw-dench"),
resolveWorkspaceDirForName: vi.fn((name: string) =>
name === "default"
? "/home/testuser/.openclaw-dench/workspace"
: `/home/testuser/.openclaw-dench/workspace-${name}`,
),
resolveWorkspaceRoot: vi.fn(() => "/home/testuser/.openclaw-dench/workspace"),
}));
// Mock web-sessions shared module
@ -42,7 +52,17 @@ describe("Chat API routes", () => {
getRunningSessionIds: vi.fn(() => []),
}));
vi.mock("@/lib/workspace", () => ({
ensureManagedWorkspaceRouting: vi.fn(),
getActiveWorkspaceName: vi.fn(() => "default"),
resolveActiveAgentId: vi.fn(() => "main"),
resolveAgentWorkspacePrefix: vi.fn(() => null),
resolveOpenClawStateDir: vi.fn(() => "/home/testuser/.openclaw-dench"),
resolveWorkspaceDirForName: vi.fn((name: string) =>
name === "default"
? "/home/testuser/.openclaw-dench/workspace"
: `/home/testuser/.openclaw-dench/workspace-${name}`,
),
resolveWorkspaceRoot: vi.fn(() => "/home/testuser/.openclaw-dench/workspace"),
}));
vi.mock("@/app/api/web-sessions/shared", () => ({
getSessionMeta: vi.fn(() => undefined),
@ -115,6 +135,40 @@ describe("Chat API routes", () => {
expect(startRun).toHaveBeenCalled();
});
it("maps partial tool output into AI SDK preliminary output chunks", async () => {
const { hasActiveRun, subscribeToRun } = await import("@/lib/active-runs");
vi.mocked(hasActiveRun).mockReturnValue(false);
vi.mocked(subscribeToRun).mockImplementation(((_sessionId, callback) => {
callback({
type: "tool-output-partial",
toolCallId: "tool-1",
output: { text: "partial output" },
} as never);
callback(null);
return () => {};
}) as never);
const { POST } = await import("./route.js");
const req = new Request("http://localhost/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: [
{ id: "m1", role: "user", parts: [{ type: "text", text: "hello" }] },
],
sessionId: "s1",
}),
});
const res = await POST(req);
const body = await res.text();
expect(body).toContain('"type":"tool-output-available"');
expect(body).toContain('"toolCallId":"tool-1"');
expect(body).toContain('"preliminary":true');
expect(body).toContain('"text":"partial output"');
expect(body).not.toContain("tool-output-partial");
});
it("does not reuse an old run when sessionId is absent", async () => {
const { startRun, hasActiveRun, subscribeToRun, persistUserMessage } = await import("@/lib/active-runs");
vi.mocked(hasActiveRun).mockReturnValue(true);
@ -161,6 +215,48 @@ describe("Chat API routes", () => {
expect(persistUserMessage).toHaveBeenCalledWith("s1", expect.objectContaining({ id: "m1" }));
});
it("repairs managed workspace routing before starting a persisted session run", async () => {
const { ensureManagedWorkspaceRouting } = await import("@/lib/workspace");
const { getSessionMeta } = await import("@/app/api/web-sessions/shared");
const { startRun, hasActiveRun, subscribeToRun } = await import("@/lib/active-runs");
vi.mocked(hasActiveRun).mockReturnValue(false);
vi.mocked(subscribeToRun).mockReturnValue(() => {});
vi.mocked(getSessionMeta).mockReturnValue({
id: "s1",
title: "Chat",
createdAt: 1,
updatedAt: 1,
messageCount: 1,
workspaceName: "default",
workspaceRoot: "/home/testuser/.openclaw-dench/workspace",
workspaceAgentId: "main",
chatAgentId: "chat-slot-main-2",
} as never);
const { POST } = await import("./route.js");
const req = new Request("http://localhost/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: [
{ id: "m1", role: "user", parts: [{ type: "text", text: "repair routing" }] },
],
sessionId: "s1",
}),
});
await POST(req);
expect(ensureManagedWorkspaceRouting).toHaveBeenCalledWith(
"default",
"/home/testuser/.openclaw-dench/workspace",
{ markDefault: false },
);
expect(startRun).toHaveBeenCalledWith(
expect.objectContaining({
overrideAgentId: "chat-slot-main-2",
}),
);
});
it("resolves workspace file paths in message", async () => {
const { resolveAgentWorkspacePrefix } = await import("@/lib/workspace");
vi.mocked(resolveAgentWorkspacePrefix).mockReturnValue("workspace");

View File

@ -1,5 +1,13 @@
import type { UIMessage } from "ai";
import { resolveAgentWorkspacePrefix } from "@/lib/workspace";
import {
resolveActiveAgentId,
resolveAgentWorkspacePrefix,
resolveOpenClawStateDir,
resolveWorkspaceDirForName,
resolveWorkspaceRoot,
getActiveWorkspaceName,
ensureManagedWorkspaceRouting,
} from "@/lib/workspace";
import {
startRun,
startSubscribeRun,
@ -14,7 +22,6 @@ import {
import { trackServer } from "@/lib/telemetry";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir } from "@/lib/workspace";
import { getSessionMeta } from "@/app/api/web-sessions/shared";
export const runtime = "nodejs";
@ -40,6 +47,22 @@ function deriveSubagentInfo(sessionKey: string): { parentSessionId: string; task
return null;
}
function normalizeLiveStreamEvent(event: SseEvent): SseEvent {
  // AI SDK's UI stream schema has no `tool-output-partial` chunk type; it
  // models in-flight tool output as repeated `tool-output-available` chunks
  // carrying `preliminary: true` until the tool finishes. Translate on the
  // way out so the client parser never sees the internal event name.
  if (event.type !== "tool-output-partial") {
    return event;
  }
  const { toolCallId, output } = event;
  return {
    type: "tool-output-available",
    toolCallId,
    output,
    preliminary: true,
  };
}
export async function POST(req: Request) {
const {
messages,
@ -122,7 +145,19 @@ export async function POST(req: Request) {
});
const sessionMeta = getSessionMeta(sessionId);
const effectiveAgentId = sessionMeta?.chatAgentId ?? sessionMeta?.workspaceAgentId;
const workspaceName =
sessionMeta?.workspaceName
?? getActiveWorkspaceName()
?? "default";
const workspaceRoot =
sessionMeta?.workspaceRoot
?? resolveWorkspaceRoot()
?? resolveWorkspaceDirForName(workspaceName);
ensureManagedWorkspaceRouting(workspaceName, workspaceRoot, { markDefault: false });
const effectiveAgentId =
sessionMeta?.chatAgentId
?? sessionMeta?.workspaceAgentId
?? resolveActiveAgentId();
try {
startRun({
@ -168,11 +203,8 @@ export async function POST(req: Request) {
try { controller.close(); } catch { /* already closed */ }
return;
}
// Skip custom event types not in the AI SDK v6 data stream schema;
// they're only consumed by the reconnection parser (processEvent).
if (event.type === "tool-output-partial") {return;}
try {
const json = JSON.stringify(event);
const json = JSON.stringify(normalizeLiveStreamEvent(event));
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
} catch { /* ignore */ }
},

View File

@ -72,6 +72,33 @@ describe("createStreamParser", () => {
]);
});
it("keeps partial tool output visible without marking the tool complete", () => {
const parser = createStreamParser();
parser.processEvent({
type: "tool-input-start",
toolCallId: "tool-1",
toolName: "readFile",
});
parser.processEvent({
type: "tool-output-partial",
toolCallId: "tool-1",
output: { text: "first chunk" },
});
expect(parser.getParts()).toEqual([
{
type: "dynamic-tool",
toolCallId: "tool-1",
toolName: "readFile",
state: "input-available",
input: {},
output: { text: "first chunk" },
preliminary: true,
},
]);
});
it("closes reasoning state on reasoning-end to prevent stuck streaming badges", () => {
const parser = createStreamParser();

View File

@ -0,0 +1,99 @@
import type { UIMessage } from "ai";
import { describe, expect, it } from "vitest";
import {
getStreamActivityLabel,
hasAssistantText,
isStatusReasoningText,
} from "./chat-stream-status";
// Builds a minimal assistant-turn fixture around the given parts; the cast
// keeps the fixture terse (UIMessage has fields the tests never read).
function assistantMessage(parts: UIMessage["parts"]): UIMessage {
  const message = {
    id: "assistant-1",
    role: "assistant",
    parts,
  };
  return message as UIMessage;
}
describe("chat stream status helpers", () => {
  // Flags describing an in-flight stream with no session-level activity;
  // individual tests spread and override what they need.
  const streamingFlags = {
    loadingSession: false,
    isReconnecting: false,
    status: "streaming" as const,
    hasRunningSubagents: false,
  };

  it("detects status reasoning labels that should stay out of the transcript body", () => {
    expect(isStatusReasoningText("Preparing response...")).toBe(true);
    const multiLineStatus =
      "Optimizing session context...\nRetrying with compacted context...";
    expect(isStatusReasoningText(multiLineStatus)).toBe(true);
    expect(isStatusReasoningText("Planning the requested changes")).toBe(false);
  });

  it("keeps the stream activity row visible after assistant text has started", () => {
    const message = assistantMessage([
      { type: "text", text: "Drafting the final answer now..." },
    ] as UIMessage["parts"]);
    const label = getStreamActivityLabel({
      ...streamingFlags,
      lastMessage: message,
    });
    expect(label).toBe("Still streaming...");
    expect(hasAssistantText(message)).toBe(true);
  });

  it("prefers gateway status reasoning over the generic streaming label", () => {
    const message = assistantMessage([
      {
        type: "reasoning",
        text: "Optimizing session context...\nRetrying with compacted context...",
      },
    ] as UIMessage["parts"]);
    const label = getStreamActivityLabel({
      ...streamingFlags,
      lastMessage: message,
    });
    expect(label).toBe("Optimizing session context... Retrying with compacted context...");
  });

  it("surfaces the active tool name while a tool call is still running", () => {
    const message = assistantMessage([
      {
        type: "dynamic-tool",
        toolName: "read_file",
        toolCallId: "tool-1",
        state: "input-available",
        input: {},
      },
    ] as UIMessage["parts"]);
    const label = getStreamActivityLabel({
      ...streamingFlags,
      lastMessage: message,
    });
    expect(label).toBe("Running Read File...");
  });

  it("shows waiting for subagents as the top-priority active status", () => {
    const message = assistantMessage([
      { type: "text", text: "Initial draft is ready." },
    ] as UIMessage["parts"]);
    const label = getStreamActivityLabel({
      ...streamingFlags,
      hasRunningSubagents: true,
      lastMessage: message,
    });
    expect(label).toBe("Waiting for subagents...");
  });
});

View File

@ -0,0 +1,195 @@
import type { UIMessage } from "ai";
// Reasoning text that begins with one of these labels is a gateway-emitted
// status banner (progress indicator) rather than real model reasoning, and
// should be routed to the activity row instead of the transcript body.
export const STREAM_STATUS_REASONING_LABELS = [
  "Preparing response...",
  "Optimizing session context...",
  "Waiting for subagent results...",
  "Waiting for subagents...",
] as const;

// Lifecycle states of the chat stream; presumably mirrors the AI SDK
// `useChat` status values — TODO confirm against the consuming hook.
type ChatStatus = "submitted" | "streaming" | "ready" | "error";

// A single entry of a UI message's `parts` array.
type MessagePart = UIMessage["parts"][number];
// Squash every run of whitespace (including newlines) into a single space
// and drop leading/trailing whitespace, yielding a one-line label.
function collapseWhitespace(text: string): string {
  const squashed = text.replace(/\s+/g, " ");
  return squashed.trim();
}
// Turns a wire tool identifier into a display label: strip the `tool-`
// prefix, convert snake/kebab case to spaced words, then title-case each
// word. Falls back to the literal "tool" when nothing survives cleanup.
function humanizeToolName(toolName: string): string {
  const words = toolName
    .replace(/^tool-/, "")
    .replace(/[_-]+/g, " ")
    .trim();
  return words ? words.replace(/\b\w/g, (char) => char.toUpperCase()) : "tool";
}
// Extracts a display-worthy tool name from a message part. Dynamic tool
// parts carry an explicit `toolName`; typed tool parts (`tool-*`) may expose
// a `title` or `toolName`, otherwise the type suffix is used. Non-tool parts
// yield null.
function resolveToolName(part: MessagePart): string | null {
  if (part.type === "dynamic-tool") {
    return typeof part.toolName === "string" ? part.toolName : null;
  }
  if (!part.type.startsWith("tool-")) {
    return null;
  }
  const candidate = part as {
    type: string;
    title?: unknown;
    toolName?: unknown;
  };
  // Prefer the human-authored title, then the raw tool name.
  for (const value of [candidate.title, candidate.toolName]) {
    if (typeof value === "string" && value.trim()) {
      return value;
    }
  }
  return part.type.replace(/^tool-/, "");
}
// Derives a tool lifecycle state from a message part, or null for non-tool
// parts. When no explicit `state` is present on a typed tool part, infer
// "error" from a non-empty errorText, "output-available" from the presence
// of a result/output key, and "input-available" otherwise.
function resolveToolState(part: MessagePart): string | null {
  if (part.type === "dynamic-tool") {
    return typeof part.state === "string" ? part.state : "input-available";
  }
  if (!part.type.startsWith("tool-")) {
    return null;
  }
  const candidate = part as {
    state?: unknown;
    errorText?: unknown;
    output?: unknown;
    result?: unknown;
  };
  if (typeof candidate.state === "string") {
    return candidate.state;
  }
  if (typeof candidate.errorText === "string" && candidate.errorText.trim()) {
    return "error";
  }
  const hasResult = "result" in candidate || "output" in candidate;
  return hasResult ? "output-available" : "input-available";
}
// True when the message is an assistant turn that has produced at least one
// non-empty text part — i.e. visible answer text has started streaming.
export function hasAssistantText(message: UIMessage | null): boolean {
  if (message?.role !== "assistant") {
    return false;
  }
  return message.parts.some((part) => {
    if (part.type !== "text") {
      return false;
    }
    const text = (part as { text?: unknown }).text;
    return typeof text === "string" && text.length > 0;
  });
}
// A reasoning chunk is a status banner (not real model reasoning) when it
// opens with one of the known gateway status labels.
export function isStatusReasoningText(text: string): boolean {
  for (const label of STREAM_STATUS_REASONING_LABELS) {
    if (text.startsWith(label)) {
      return true;
    }
  }
  return false;
}
// Returns the newest reasoning part that is a recognized status banner,
// collapsed to one line — or null when no such part exists. Scans newest
// first so the most recent status wins.
function getLatestStatusReasoning(parts: UIMessage["parts"]): string | null {
  for (const part of [...parts].reverse()) {
    if (part.type !== "reasoning") {
      continue;
    }
    const raw = (part as { text?: unknown }).text;
    const text = typeof raw === "string" ? collapseWhitespace(raw) : "";
    if (text && isStatusReasoningText(text)) {
      return text;
    }
  }
  return null;
}
// Finds the newest tool part that is still in flight (not completed or
// errored) and produces an activity label for it; null when every tool is
// settled. Subagent spawns get a dedicated label.
function getRunningToolLabel(parts: UIMessage["parts"]): string | null {
  for (const part of [...parts].reverse()) {
    const state = resolveToolState(part);
    const stillRunning =
      state !== null && state !== "output-available" && state !== "error";
    if (!stillRunning) {
      continue;
    }
    const toolName = resolveToolName(part);
    if (!toolName) {
      continue;
    }
    return toolName === "sessions_spawn"
      ? "Starting subagent..."
      : `Running ${humanizeToolName(toolName)}...`;
  }
  return null;
}
// Picks the single activity label to show under the transcript. Priority
// order matters: session loading > reconnect > running subagents > gateway
// status reasoning > running tool > generic per-status labels. Returns null
// when the stream is idle ("ready"/"error" with no pending activity).
export function getStreamActivityLabel({
  loadingSession,
  isReconnecting,
  status,
  hasRunningSubagents,
  lastMessage,
}: {
  loadingSession: boolean;
  isReconnecting: boolean;
  status: ChatStatus;
  hasRunningSubagents: boolean;
  lastMessage: UIMessage | null;
}): string | null {
  if (loadingSession) {
    return "Loading session...";
  }
  if (isReconnecting) {
    return "Resuming stream...";
  }
  if (hasRunningSubagents) {
    return "Waiting for subagents...";
  }
  if (lastMessage?.role === "assistant") {
    // Status reasoning outranks a running tool when both are present.
    const assistantLabel =
      getLatestStatusReasoning(lastMessage.parts) ??
      getRunningToolLabel(lastMessage.parts);
    if (assistantLabel) {
      return assistantLabel;
    }
  }
  switch (status) {
    case "submitted":
      return "Thinking...";
    case "streaming":
      return hasAssistantText(lastMessage) ? "Still streaming..." : "Streaming...";
    default:
      return null;
  }
}