👌 IMPROVE: stream json shows tools and thinking now and so does nextjs web app

This commit is contained in:
kumarabhirup 2026-02-08 21:59:08 -08:00
parent 12ebd39308
commit 22345bdd71
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
9 changed files with 1148 additions and 246 deletions

View File

@ -1,5 +1,5 @@
import type { UIMessage } from "ai";
import { runAgent } from "@/lib/agent-runner";
import { runAgent, type ToolResult } from "@/lib/agent-runner";
// Force Node.js runtime (required for child_process)
export const runtime = "nodejs";
@ -7,100 +7,261 @@ export const runtime = "nodejs";
// Allow streaming responses up to 10 minutes
export const maxDuration = 600;
export async function POST(req: Request) {
const { messages }: { messages: UIMessage[] } = await req.json();
// Extract the latest user message text
const lastUserMessage = messages.filter((m) => m.role === "user").pop();
const userText =
lastUserMessage?.parts
?.filter((p): p is { type: "text"; text: string } => p.type === "text")
.map((p) => p.text)
.join("\n") ?? "";
console.log("[chat] Received message:", userText);
if (!userText.trim()) {
return new Response("No message provided", { status: 400 });
}
// Create a custom SSE stream
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
const textPartId = `text-${Date.now()}`;
let started = false;
const writeEvent = (data: unknown) => {
const json = JSON.stringify(data);
console.log("[chat] SSE write:", json);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
};
console.log("[chat] Starting agent stream...");
try {
await runAgent(userText, {
onTextDelta: (delta) => {
console.log("[chat] Text delta:", delta);
if (!started) {
console.log("[chat] Writing text-start");
writeEvent({ type: "text-start", id: textPartId });
started = true;
}
writeEvent({ type: "text-delta", id: textPartId, delta });
},
onLifecycleEnd: () => {
console.log("[chat] Lifecycle end, started:", started);
if (started) {
writeEvent({ type: "text-end", id: textPartId });
}
},
onError: (err) => {
console.error("[chat] Agent error:", err);
if (!started) {
writeEvent({ type: "text-start", id: textPartId });
writeEvent({
type: "text-delta",
id: textPartId,
delta: `Error starting agent: ${err.message}`,
});
writeEvent({ type: "text-end", id: textPartId });
}
},
onClose: (code) => {
console.log("[chat] Agent closed with code:", code, "started:", started);
// If we never started text, emit an empty response
if (!started) {
writeEvent({ type: "text-start", id: textPartId });
writeEvent({
type: "text-delta",
id: textPartId,
delta: "(No response from agent)",
});
writeEvent({ type: "text-end", id: textPartId });
}
},
});
console.log("[chat] Agent stream complete");
} catch (error) {
console.error("[chat] Stream error:", error);
writeEvent({
type: "error",
error: error instanceof Error ? error.message : String(error),
});
} finally {
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
},
});
/**
* Build a flat output object from the agent's tool result so the frontend
* can render tool output text, exit codes, etc.
*/
function buildToolOutput(
result?: ToolResult,
): Record<string, unknown> {
if (!result) return {};
const out: Record<string, unknown> = {};
if (result.text) out.text = result.text;
if (result.details) {
// Forward useful details (exit code, duration, status, cwd)
for (const key of [
"exitCode",
"status",
"durationMs",
"cwd",
"error",
"reason",
]) {
if (result.details[key] !== undefined)
out[key] = result.details[key];
}
}
return out;
}
export async function POST(req: Request) {
const { messages }: { messages: UIMessage[] } = await req.json();
// Extract the latest user message text
const lastUserMessage = messages.filter((m) => m.role === "user").pop();
const userText =
lastUserMessage?.parts
?.filter(
(p): p is { type: "text"; text: string } => p.type === "text",
)
.map((p) => p.text)
.join("\n") ?? "";
if (!userText.trim()) {
return new Response("No message provided", { status: 400 });
}
// Create a custom SSE stream using the AI SDK v6 data stream wire format.
// DefaultChatTransport parses these events into UIMessage parts automatically.
const encoder = new TextEncoder();
let closed = false;
const abortController = new AbortController();
const stream = new ReadableStream({
async start(controller) {
// Use incrementing IDs so multi-round reasoning/text cycles get
// unique part IDs (avoids conflicts in the AI SDK transport).
let idCounter = 0;
const nextId = (prefix: string) =>
`${prefix}-${Date.now()}-${++idCounter}`;
let currentTextId = "";
let currentReasoningId = "";
let textStarted = false;
let reasoningStarted = false;
// Track whether ANY text was ever sent across the full run.
// onLifecycleEnd closes the text part (textStarted→false), so
// onClose can't rely on textStarted alone to detect "no output".
let everSentText = false;
/** Write an SSE event; silently no-ops if the stream was already cancelled. */
const writeEvent = (data: unknown) => {
if (closed) return;
const json = JSON.stringify(data);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
};
/** Close the reasoning part if open. */
const closeReasoning = () => {
if (reasoningStarted) {
writeEvent({
type: "reasoning-end",
id: currentReasoningId,
});
reasoningStarted = false;
}
};
/** Close the text part if open. */
const closeText = () => {
if (textStarted) {
writeEvent({ type: "text-end", id: currentTextId });
textStarted = false;
}
};
try {
await runAgent(userText, abortController.signal, {
onThinkingDelta: (delta) => {
if (!reasoningStarted) {
currentReasoningId = nextId("reasoning");
writeEvent({
type: "reasoning-start",
id: currentReasoningId,
});
reasoningStarted = true;
}
writeEvent({
type: "reasoning-delta",
id: currentReasoningId,
delta,
});
},
onTextDelta: (delta) => {
// Close reasoning once text starts streaming
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
textStarted = true;
}
everSentText = true;
writeEvent({
type: "text-delta",
id: currentTextId,
delta,
});
},
onToolStart: (toolCallId, toolName, args) => {
// Close open reasoning/text parts before tool events
closeReasoning();
closeText();
writeEvent({
type: "tool-input-start",
toolCallId,
toolName,
});
// Include actual tool arguments so the frontend can
// display what the tool is doing (command, path, etc.)
writeEvent({
type: "tool-input-available",
toolCallId,
toolName,
input: args ?? {},
});
},
onToolEnd: (
toolCallId,
_toolName,
isError,
result,
) => {
if (isError) {
const errorText =
result?.text ||
(result?.details?.error as
| string
| undefined) ||
"Tool execution failed";
writeEvent({
type: "tool-output-error",
toolCallId,
errorText,
});
} else {
// Include the actual tool output (text, exit code, etc.)
writeEvent({
type: "tool-output-available",
toolCallId,
output: buildToolOutput(result),
});
}
},
onLifecycleEnd: () => {
closeReasoning();
closeText();
},
onError: (err) => {
console.error("[chat] Agent error:", err);
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
textStarted = true;
}
writeEvent({
type: "text-delta",
id: currentTextId,
delta: `Error starting agent: ${err.message}`,
});
writeEvent({ type: "text-end", id: currentTextId });
textStarted = false;
},
onClose: (_code) => {
closeReasoning();
if (!everSentText) {
// No text was ever sent during the entire run
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
writeEvent({
type: "text-delta",
id: currentTextId,
delta: "(No response from agent)",
});
writeEvent({
type: "text-end",
id: currentTextId,
});
} else {
// Ensure any still-open text part is closed
closeText();
}
},
});
} catch (error) {
console.error("[chat] Stream error:", error);
writeEvent({
type: "error",
errorText:
error instanceof Error
? error.message
: String(error),
});
} finally {
if (!closed) {
closed = true;
controller.close();
}
}
},
cancel() {
// Client disconnected (e.g. user hit stop) — tear down gracefully.
closed = true;
abortController.abort();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
},
});
}

View File

@ -8,8 +8,11 @@ export const maxDuration = 30;
/** POST /api/new-session — send /new to the agent to start a fresh backend session */
export async function POST() {
return new Promise<Response>((resolve) => {
runAgent("/new", {
runAgent("/new", undefined, {
onTextDelta: () => {},
onThinkingDelta: () => {},
onToolStart: () => {},
onToolEnd: () => {},
onLifecycleEnd: () => {},
onError: (err) => {
console.error("[new-session] Error:", err);

View File

@ -9,7 +9,12 @@ const WEB_CHAT_DIR = join(homedir(), ".openclaw", "web-chat");
export type ChatLine = {
id: string;
role: "user" | "assistant";
/** Plain text summary (always present, used for sidebar / backward compat). */
content: string;
/** Full UIMessage parts array reasoning, tool calls, outputs, text.
* Present for sessions saved after the rich-persistence update;
* absent for older sessions (fall back to `content` as a text part). */
parts?: Array<Record<string, unknown>>;
timestamp: string;
};

View File

@ -0,0 +1,480 @@
"use client";
import { useEffect, useRef, useState } from "react";
/* ─── Public types ─── */
export type ChainPart =
| { kind: "reasoning"; text: string; isStreaming: boolean }
| {
kind: "tool";
toolName: string;
toolCallId: string;
status: "running" | "done" | "error";
args?: Record<string, unknown>;
output?: Record<string, unknown>;
errorText?: string;
};
/* ─── Main component ─── */
export function ChainOfThought({ parts }: { parts: ChainPart[] }) {
const [isOpen, setIsOpen] = useState(true);
const prevActiveRef = useRef(true);
const isActive = parts.some(
(p) =>
(p.kind === "reasoning" && p.isStreaming) ||
(p.kind === "tool" && p.status === "running"),
);
// Auto-collapse once all steps finish (active → inactive transition)
useEffect(() => {
if (prevActiveRef.current && !isActive && parts.length > 0) {
setIsOpen(false);
}
prevActiveRef.current = isActive;
}, [isActive, parts.length]);
// Aggregate reasoning text from all reasoning parts
const reasoningText = parts
.filter(
(p): p is Extract<ChainPart, { kind: "reasoning" }> =>
p.kind === "reasoning",
)
.map((p) => p.text)
.join("");
const isReasoningStreaming = parts.some(
(p) => p.kind === "reasoning" && p.isStreaming,
);
// Tool steps
const tools = parts.filter(
(p): p is Extract<ChainPart, { kind: "tool" }> => p.kind === "tool",
);
const completedTools = tools.filter((t) => t.status === "done").length;
const activeTool = tools.find((t) => t.status === "running");
// Header label summarizes current/completed activity
let headerLabel: string;
if (isActive) {
if (activeTool) {
// Show what the active tool is doing
const summary = getToolSummary(
activeTool.toolName,
activeTool.args,
);
headerLabel = summary || formatToolName(activeTool.toolName);
} else {
headerLabel = "Thinking";
}
} else if (tools.length > 0) {
headerLabel = `Reasoned with ${completedTools} tool${completedTools !== 1 ? "s" : ""}`;
} else {
headerLabel = "Reasoned";
}
return (
<div className="my-2 rounded-lg border border-[var(--color-border)] overflow-hidden">
{/* Trigger */}
<button
type="button"
onClick={() => setIsOpen((v) => !v)}
className="w-full flex items-center gap-2 px-3 py-2 text-xs text-[var(--color-text-muted)] hover:bg-[var(--color-surface-hover)] transition-colors cursor-pointer"
>
<SparkleIcon className="w-3.5 h-3.5 flex-shrink-0 opacity-70" />
<span className="font-medium truncate">{headerLabel}</span>
{isActive && (
<span className="w-1.5 h-1.5 rounded-full bg-[var(--color-accent)] animate-pulse flex-shrink-0" />
)}
<ChevronIcon
className={`w-3 h-3 ml-auto flex-shrink-0 transition-transform duration-200 ${
isOpen ? "" : "-rotate-90"
}`}
/>
</button>
{/* Collapsible content (smooth CSS grid animation) */}
<div
className="grid transition-[grid-template-rows] duration-200 ease-out"
style={{ gridTemplateRows: isOpen ? "1fr" : "0fr" }}
>
<div className="overflow-hidden">
<div className="px-3 pb-3 space-y-2">
{/* Reasoning text block */}
{reasoningText && (
<ReasoningText
text={reasoningText}
isStreaming={isReasoningStreaming}
/>
)}
{/* Tool step timeline */}
{tools.length > 0 && (
<div className="flex flex-col gap-1">
{tools.map((tool) => (
<ToolStep key={tool.toolCallId} {...tool} />
))}
</div>
)}
</div>
</div>
</div>
</div>
);
}
/* ─── Sub-components ─── */
/** Expandable reasoning text display */
function ReasoningText({
text,
isStreaming,
}: {
text: string;
isStreaming: boolean;
}) {
const [expanded, setExpanded] = useState(false);
const isLong = text.length > 300;
return (
<div>
<div
className={`text-[11px] text-[var(--color-text-muted)] whitespace-pre-wrap leading-relaxed opacity-60 ${
!expanded && isLong
? "max-h-20 overflow-hidden"
: "max-h-64 overflow-y-auto"
}`}
>
{text}
{isStreaming && (
<span className="inline-block w-1 h-3 ml-0.5 bg-[var(--color-accent)] opacity-60 animate-pulse align-text-bottom" />
)}
</div>
{isLong && !expanded && (
<button
type="button"
onClick={() => setExpanded(true)}
className="text-[11px] text-[var(--color-accent)] hover:underline mt-0.5 cursor-pointer"
>
Show more
</button>
)}
</div>
);
}
/** Rich tool step with args display and collapsible output */
function ToolStep({
toolName,
status,
args,
output,
errorText,
}: {
toolName: string;
status: "running" | "done" | "error";
args?: Record<string, unknown>;
output?: Record<string, unknown>;
errorText?: string;
}) {
const [showOutput, setShowOutput] = useState(false);
const displayType = getToolDisplayType(toolName);
const primaryArg = getPrimaryArg(toolName, args);
const outputText =
typeof output?.text === "string" ? output.text : undefined;
const exitCode =
output?.exitCode !== undefined ? Number(output.exitCode) : undefined;
return (
<div className="flex flex-col gap-1">
{/* Tool name + status */}
<div className="flex items-center gap-2 text-xs">
{status === "running" && (
<span className="w-3 h-3 border border-[var(--color-text-muted)] border-t-[var(--color-accent)] rounded-full animate-spin flex-shrink-0" />
)}
{status === "done" && (
<CheckIcon className="w-3 h-3 text-green-400 flex-shrink-0" />
)}
{status === "error" && (
<XIcon className="w-3 h-3 text-red-400 flex-shrink-0" />
)}
<span
className={`font-medium truncate ${
status === "running"
? "text-[var(--color-text)]"
: "text-[var(--color-text-muted)]"
}`}
>
{formatToolName(toolName)}
</span>
{/* Exit code badge for bash/exec tools */}
{exitCode !== undefined && exitCode !== 0 && (
<span className="text-[10px] text-red-400 font-mono">
exit {exitCode}
</span>
)}
</div>
{/* Primary argument: command, path, query, code, etc. */}
{primaryArg && (
<div className="ml-5">
{displayType === "bash" ? (
<CodeBlock
content={`$ ${primaryArg}`}
maxLines={3}
/>
) : displayType === "code" ? (
<CodeBlock content={primaryArg} maxLines={8} />
) : (
<div className="text-[11px] font-mono text-[var(--color-text-muted)] opacity-70 truncate">
{primaryArg}
</div>
)}
</div>
)}
{/* Error message */}
{status === "error" && errorText && (
<div className="ml-5 text-[11px] text-red-400 font-mono bg-red-900/10 rounded px-2 py-1">
{errorText}
</div>
)}
{/* Tool output */}
{outputText && status === "done" && (
<div className="ml-5">
<button
type="button"
onClick={() => setShowOutput((v) => !v)}
className="text-[10px] text-[var(--color-accent)] hover:underline cursor-pointer"
>
{showOutput ? "Hide output" : "Show output"}
</button>
{showOutput && (
<CodeBlock
content={outputText}
maxLines={20}
/>
)}
</div>
)}
</div>
);
}
/** Monospace code block with optional line limit */
function CodeBlock({
content,
maxLines = 10,
}: {
content: string;
maxLines?: number;
}) {
const [expanded, setExpanded] = useState(false);
const lines = content.split("\n");
const isLong = lines.length > maxLines;
const displayContent =
!expanded && isLong
? lines.slice(0, maxLines).join("\n") + "\n..."
: content;
return (
<div>
<pre className="text-[11px] font-mono text-[var(--color-text-muted)] bg-[var(--color-bg)] rounded px-2 py-1.5 overflow-x-auto whitespace-pre-wrap break-all max-h-64 overflow-y-auto leading-relaxed">
{displayContent}
</pre>
{isLong && !expanded && (
<button
type="button"
onClick={() => setExpanded(true)}
className="text-[10px] text-[var(--color-accent)] hover:underline mt-0.5 cursor-pointer"
>
Show all {lines.length} lines
</button>
)}
</div>
);
}
/* ─── Tool classification helpers ─── */
type ToolDisplayType = "bash" | "code" | "file" | "search" | "generic";
function getToolDisplayType(toolName: string): ToolDisplayType {
const name = toolName.toLowerCase().replace(/[_-]/g, "");
if (
["bash", "shell", "execute", "exec", "terminal", "command"].some((k) =>
name.includes(k),
)
)
return "bash";
if (
["runcode", "python", "javascript", "typescript", "notebook"].some(
(k) => name.includes(k),
)
)
return "code";
if (
["file", "read", "write", "create", "edit", "str_replace"].some((k) =>
name.includes(k),
)
)
return "file";
if (
["search", "web", "grep", "find", "glob"].some((k) =>
name.includes(k),
)
)
return "search";
return "generic";
}
function getPrimaryArg(
toolName: string,
args?: Record<string, unknown>,
): string | undefined {
if (!args) return undefined;
const type = getToolDisplayType(toolName);
switch (type) {
case "bash":
return strArg(args, "command") ?? strArg(args, "cmd");
case "code":
return strArg(args, "code") ?? strArg(args, "script");
case "file":
return (
strArg(args, "path") ??
strArg(args, "file") ??
strArg(args, "file_path")
);
case "search":
return (
strArg(args, "query") ??
strArg(args, "search") ??
strArg(args, "pattern") ??
strArg(args, "q")
);
default: {
// Return first short string arg
for (const val of Object.values(args)) {
if (typeof val === "string" && val.length > 0 && val.length < 200) return val;
}
return undefined;
}
}
}
/** Safely extract a string value from an args object */
function strArg(
args: Record<string, unknown>,
key: string,
): string | undefined {
const val = args[key];
return typeof val === "string" && val.length > 0 ? val : undefined;
}
/** Build a short summary for the active tool (shown in collapsed header) */
function getToolSummary(
toolName: string,
args?: Record<string, unknown>,
): string | undefined {
if (!args) return undefined;
const type = getToolDisplayType(toolName);
const primary = getPrimaryArg(toolName, args);
if (!primary) return undefined;
switch (type) {
case "bash": {
// Show first 40 chars of command
const short =
primary.length > 40 ? primary.slice(0, 40) + "..." : primary;
return `Running: ${short}`;
}
case "file": {
return `Reading ${primary.split("/").pop()}`;
}
case "search": {
return `Searching: ${primary}`;
}
default:
return undefined;
}
}
/* ─── Helpers ─── */
/** Convert tool_name_like_this → Tool Name Like This */
function formatToolName(name: string): string {
return name
.replace(/_/g, " ")
.replace(/\b\w/g, (c) => c.toUpperCase())
.trim();
}
/* ─── Inline SVG icons (avoids adding lucide-react dep) ─── */
function SparkleIcon({ className }: { className?: string }) {
return (
<svg
className={className}
viewBox="0 0 16 16"
fill="currentColor"
xmlns="http://www.w3.org/2000/svg"
>
<path d="M8 0L9.8 6.2L16 8L9.8 9.8L8 16L6.2 9.8L0 8L6.2 6.2Z" />
</svg>
);
}
function ChevronIcon({ className }: { className?: string }) {
return (
<svg
className={className}
viewBox="0 0 12 12"
fill="none"
stroke="currentColor"
strokeWidth="1.5"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M3 4.5L6 7.5L9 4.5" />
</svg>
);
}
function CheckIcon({ className }: { className?: string }) {
return (
<svg
className={className}
viewBox="0 0 12 12"
fill="none"
stroke="currentColor"
strokeWidth="1.5"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M2.5 6L5 8.5L9.5 3.5" />
</svg>
);
}
function XIcon({ className }: { className?: string }) {
return (
<svg
className={className}
viewBox="0 0 12 12"
fill="none"
stroke="currentColor"
strokeWidth="1.5"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M3 3L9 9M9 3L3 9" />
</svg>
);
}

View File

@ -1,56 +1,153 @@
"use client";
import type { UIMessage } from "ai";
import { ChainOfThought, type ChainPart } from "./chain-of-thought";
/* ─── Part grouping ─── */
type MessageSegment =
| { type: "text"; text: string }
| { type: "chain"; parts: ChainPart[] };
/** Map AI SDK tool state string to a simplified status */
function toolStatus(state: string): "running" | "done" | "error" {
if (state === "output-available") return "done";
if (state === "error") return "error";
return "running";
}
/**
* Group consecutive non-text parts (reasoning + tools) into chain-of-thought
* blocks, with text parts standing alone between them.
*/
function groupParts(parts: UIMessage["parts"]): MessageSegment[] {
const segments: MessageSegment[] = [];
let chain: ChainPart[] = [];
const flush = () => {
if (chain.length > 0) {
segments.push({ type: "chain", parts: [...chain] });
chain = [];
}
};
for (const part of parts) {
if (part.type === "text") {
flush();
segments.push({
type: "text",
text: (part as { type: "text"; text: string }).text,
});
} else if (part.type === "reasoning") {
const rp = part as {
type: "reasoning";
text: string;
state?: string;
};
chain.push({
kind: "reasoning",
text: rp.text,
isStreaming: rp.state === "streaming",
});
} else if (part.type === "dynamic-tool") {
const tp = part as {
type: "dynamic-tool";
toolName: string;
toolCallId: string;
state: string;
input?: unknown;
output?: unknown;
};
chain.push({
kind: "tool",
toolName: tp.toolName,
toolCallId: tp.toolCallId,
status: toolStatus(tp.state),
args: asRecord(tp.input),
output: asRecord(tp.output),
});
} else if (part.type.startsWith("tool-")) {
const tp = part as {
type: string;
toolCallId: string;
toolName?: string;
state?: string;
title?: string;
input?: unknown;
output?: unknown;
};
chain.push({
kind: "tool",
toolName:
tp.title ??
tp.toolName ??
part.type.replace("tool-", ""),
toolCallId: tp.toolCallId,
status: toolStatus(tp.state ?? "input-available"),
args: asRecord(tp.input),
output: asRecord(tp.output),
});
}
}
flush();
return segments;
}
/** Safely cast unknown to Record if it's a non-null object */
function asRecord(
val: unknown,
): Record<string, unknown> | undefined {
if (val && typeof val === "object" && !Array.isArray(val))
return val as Record<string, unknown>;
return undefined;
}
/* ─── Chat message ─── */
export function ChatMessage({ message }: { message: UIMessage }) {
const isUser = message.role === "user";
const isUser = message.role === "user";
const segments = groupParts(message.parts);
return (
<div className={`flex gap-3 py-4 ${isUser ? "justify-end" : "justify-start"}`}>
{!isUser && (
<div className="flex-shrink-0 w-8 h-8 rounded-full bg-[var(--color-accent)] flex items-center justify-center text-white text-sm font-bold">
O
</div>
)}
return (
<div
className={`flex gap-3 py-4 ${isUser ? "justify-end" : "justify-start"}`}
>
{!isUser && (
<div className="flex-shrink-0 w-8 h-8 rounded-full bg-[var(--color-accent)] flex items-center justify-center text-white text-sm font-bold">
O
</div>
)}
<div
className={`max-w-[75%] rounded-2xl px-4 py-3 ${
isUser
? "bg-[var(--color-accent)] text-white"
: "bg-[var(--color-surface)] text-[var(--color-text)]"
}`}
>
{message.parts.map((part, index) => {
if (part.type === "text") {
return (
<div key={index} className="whitespace-pre-wrap text-[15px] leading-relaxed">
{part.text}
</div>
);
}
if (part.type.startsWith("tool-")) {
const toolPart = part as { type: string; toolCallId: string; state?: string; title?: string };
return (
<div
key={index}
className="text-xs text-[var(--color-text-muted)] mt-2 px-2 py-1 bg-[var(--color-bg)] rounded font-mono"
>
Tool: {toolPart.title ?? toolPart.toolCallId}
{toolPart.state === "result" && (
<span className="ml-2 text-green-400">done</span>
)}
</div>
);
}
return null;
})}
</div>
<div
className={`max-w-[75%] rounded-2xl px-4 py-3 ${
isUser
? "bg-[var(--color-accent)] text-white"
: "bg-[var(--color-surface)] text-[var(--color-text)]"
}`}
>
{segments.map((segment, index) => {
if (segment.type === "text") {
return (
<div
key={index}
className="whitespace-pre-wrap text-[15px] leading-relaxed"
>
{segment.text}
</div>
);
}
return (
<ChainOfThought key={index} parts={segment.parts} />
);
})}
</div>
{isUser && (
<div className="flex-shrink-0 w-8 h-8 rounded-full bg-[var(--color-border)] flex items-center justify-center text-[var(--color-text-muted)] text-sm font-bold">
U
</div>
)}
</div>
);
{isUser && (
<div className="flex-shrink-0 w-8 h-8 rounded-full bg-[var(--color-border)] flex items-center justify-center text-[var(--color-text-muted)] text-sm font-bold">
U
</div>
)}
</div>
);
}

View File

@ -31,17 +31,26 @@ export default function Home() {
setSidebarRefreshKey((k) => k + 1);
}, []);
/** Persist messages to the web session's .jsonl file */
/** Persist messages to the web session's .jsonl file.
* Saves the full `parts` array (reasoning, tool calls, output, text)
* alongside a plain-text `content` field for backward compat / sidebar. */
const saveMessages = useCallback(
async (
sessionId: string,
msgs: Array<{ id: string; role: string; content: string }>,
msgs: Array<{
id: string;
role: string;
content: string;
parts?: unknown[];
}>,
title?: string,
) => {
const toSave = msgs.map((m) => ({
id: m.id,
role: m.role,
content: m.content,
// Persist full UIMessage parts so reasoning + tool calls survive reload
...(m.parts ? { parts: m.parts } : {}),
timestamp: new Date().toISOString(),
}));
try {
@ -91,13 +100,16 @@ export default function Home() {
const isNowReady = status === "ready";
if (wasStreaming && isNowReady && currentSessionId) {
// Save any unsaved messages (typically the assistant response)
// Save any unsaved messages (typically the assistant response).
// Include the full parts array so reasoning, tool calls, and their
// outputs persist across session reloads.
const unsaved = messages.filter((m) => !savedMessageIdsRef.current.has(m.id));
if (unsaved.length > 0) {
const toSave = unsaved.map((m) => ({
id: m.id,
role: m.role,
content: getMessageText(m),
parts: m.parts,
}));
saveMessages(currentSessionId, toSave);
}
@ -127,10 +139,15 @@ export default function Home() {
refreshSidebar();
}
// Save the user message immediately
// Save the user message immediately (include parts for consistency)
const userMsgId = `user-${Date.now()}`;
await saveMessages(sessionId, [
{ id: userMsgId, role: "user", content: userText },
{
id: userMsgId,
role: "user",
content: userText,
parts: [{ type: "text", text: userText }],
},
]);
// Send to agent
@ -155,15 +172,18 @@ export default function Home() {
id: string;
role: "user" | "assistant";
content: string;
parts?: Array<Record<string, unknown>>;
}> = data.messages || [];
// Convert to UIMessage format and mark all as saved
// Convert to UIMessage format and mark all as saved.
// Restore from saved `parts` if available (preserves reasoning,
// tool calls, output), falling back to plain text for old sessions.
const uiMessages = sessionMessages.map((msg) => {
savedMessageIdsRef.current.add(msg.id);
return {
id: msg.id,
role: msg.role,
parts: [{ type: "text" as const, text: msg.content }],
parts: msg.parts ?? [{ type: "text" as const, text: msg.content }],
};
});

View File

@ -3,98 +3,221 @@ import { createInterface } from "node:readline";
import { join } from "node:path";
export type AgentEvent = {
event: string;
runId?: string;
stream?: string;
data?: Record<string, unknown>;
seq?: number;
ts?: number;
sessionKey?: string;
status?: string;
result?: {
payloads?: Array<{ text?: string; mediaUrl?: string | null }>;
meta?: Record<string, unknown>;
};
event: string;
runId?: string;
stream?: string;
data?: Record<string, unknown>;
seq?: number;
ts?: number;
sessionKey?: string;
status?: string;
result?: {
payloads?: Array<{ text?: string; mediaUrl?: string | null }>;
meta?: Record<string, unknown>;
};
};
/** Extracted text + details from a tool result event. */
export type ToolResult = {
text?: string;
details?: Record<string, unknown>;
};
export type AgentCallback = {
onTextDelta: (delta: string) => void;
onLifecycleEnd: () => void;
onError: (error: Error) => void;
onClose: (code: number | null) => void;
onTextDelta: (delta: string) => void;
onThinkingDelta: (delta: string) => void;
onToolStart: (
toolCallId: string,
toolName: string,
args?: Record<string, unknown>,
) => void;
onToolEnd: (
toolCallId: string,
toolName: string,
isError: boolean,
result?: ToolResult,
) => void;
onLifecycleEnd: () => void;
onError: (error: Error) => void;
onClose: (code: number | null) => void;
};
/**
* Spawn the openclaw agent and stream its output
* Extract text content from the agent's tool result object.
* The result has `content: Array<{ type: "text", text: string } | ...>` and
* optional `details` (exit codes, file paths, etc.).
*/
export async function runAgent(message: string, callback: AgentCallback): Promise<void> {
// Get repo root - construct path dynamically at runtime
const cwd = process.cwd();
const root = cwd.endsWith(join("apps", "web")) ? join(cwd, "..", "..") : cwd;
function extractToolResult(
raw: unknown,
): ToolResult | undefined {
if (!raw || typeof raw !== "object") return undefined;
const r = raw as Record<string, unknown>;
// Construct script path at runtime to avoid static analysis
const pathParts = ["scripts", "run-node.mjs"];
const scriptPath = join(root, ...pathParts);
// Extract text from content blocks
const content = Array.isArray(r.content) ? r.content : [];
const textParts: string[] = [];
for (const block of content) {
if (
block &&
typeof block === "object" &&
(block as Record<string, unknown>).type === "text" &&
typeof (block as Record<string, unknown>).text === "string"
) {
textParts.push((block as Record<string, unknown>).text as string);
}
}
return new Promise<void>((resolve, reject) => {
const child = spawn(
"node",
[scriptPath, "agent", "--agent", "main", "--message", message, "--stream-json"],
{
cwd: root,
env: { ...process.env },
stdio: ["ignore", "pipe", "pipe"],
},
);
const text = textParts.length > 0 ? textParts.join("\n") : undefined;
const details =
r.details && typeof r.details === "object"
? (r.details as Record<string, unknown>)
: undefined;
const rl = createInterface({ input: child.stdout });
rl.on("line", (line: string) => {
if (!line.trim()) return;
let event: AgentEvent;
try {
event = JSON.parse(line) as AgentEvent;
} catch (err) {
console.log("[agent-runner] Non-JSON line:", line);
return; // skip non-JSON lines
}
console.log("[agent-runner] Event:", event.event, event.stream, event.data);
// Handle assistant text deltas
if (event.event === "agent" && event.stream === "assistant") {
const delta = typeof event.data?.delta === "string" ? event.data.delta : undefined;
if (delta) {
console.log("[agent-runner] Delta:", delta);
callback.onTextDelta(delta);
}
}
// Handle lifecycle end
if (
event.event === "agent" &&
event.stream === "lifecycle" &&
event.data?.phase === "end"
) {
console.log("[agent-runner] Lifecycle end");
callback.onLifecycleEnd();
}
});
child.on("close", (code) => {
callback.onClose(code);
resolve();
});
child.on("error", (err) => {
callback.onError(err);
resolve();
});
// Log stderr for debugging
child.stderr?.on("data", (chunk: Buffer) => {
console.error("[openclaw stderr]", chunk.toString());
});
});
return { text, details };
}
/**
* Spawn the openclaw agent and stream its output.
* Pass an AbortSignal to kill the child process when the caller cancels.
*/
export async function runAgent(
message: string,
signal: AbortSignal | undefined,
callback: AgentCallback,
): Promise<void> {
// Get repo root - construct path dynamically at runtime
const cwd = process.cwd();
const root = cwd.endsWith(join("apps", "web"))
? join(cwd, "..", "..")
: cwd;
// Construct script path at runtime to avoid static analysis
const pathParts = ["scripts", "run-node.mjs"];
const scriptPath = join(root, ...pathParts);
return new Promise<void>((resolve) => {
const child = spawn(
"node",
[
scriptPath,
"agent",
"--agent",
"main",
"--message",
message,
"--stream-json",
// Run embedded (--local) so we get ALL events (tool, thinking,
// lifecycle) unfiltered. The gateway path drops tool events
// unless verbose is explicitly "on".
"--local",
],
{
cwd: root,
env: { ...process.env },
stdio: ["ignore", "pipe", "pipe"],
},
);
// Kill the child process if the caller aborts (e.g. user hit stop).
if (signal) {
const onAbort = () => child.kill("SIGTERM");
if (signal.aborted) {
child.kill("SIGTERM");
} else {
signal.addEventListener("abort", onAbort, { once: true });
child.on("close", () =>
signal.removeEventListener("abort", onAbort),
);
}
}
const rl = createInterface({ input: child.stdout });
rl.on("line", (line: string) => {
if (!line.trim()) return;
let event: AgentEvent;
try {
event = JSON.parse(line) as AgentEvent;
} catch {
console.log("[agent-runner] Non-JSON line:", line);
return; // skip non-JSON lines
}
// Handle assistant text deltas
if (event.event === "agent" && event.stream === "assistant") {
const delta =
typeof event.data?.delta === "string"
? event.data.delta
: undefined;
if (delta) {
callback.onTextDelta(delta);
}
}
// Handle thinking/reasoning deltas
if (event.event === "agent" && event.stream === "thinking") {
const delta =
typeof event.data?.delta === "string"
? event.data.delta
: undefined;
if (delta) {
callback.onThinkingDelta(delta);
}
}
// Handle tool execution events
if (event.event === "agent" && event.stream === "tool") {
const phase =
typeof event.data?.phase === "string"
? event.data.phase
: undefined;
const toolCallId =
typeof event.data?.toolCallId === "string"
? event.data.toolCallId
: "";
const toolName =
typeof event.data?.name === "string"
? event.data.name
: "";
if (phase === "start") {
const args =
event.data?.args &&
typeof event.data.args === "object"
? (event.data.args as Record<string, unknown>)
: undefined;
callback.onToolStart(toolCallId, toolName, args);
} else if (phase === "result") {
const isError = event.data?.isError === true;
const result = extractToolResult(event.data?.result);
callback.onToolEnd(toolCallId, toolName, isError, result);
}
}
// Handle lifecycle end
if (
event.event === "agent" &&
event.stream === "lifecycle" &&
event.data?.phase === "end"
) {
callback.onLifecycleEnd();
}
});
child.on("close", (code) => {
callback.onClose(code);
resolve();
});
child.on("error", (err) => {
callback.onError(err);
resolve();
});
// Log stderr for debugging
child.stderr?.on("data", (chunk: Buffer) => {
console.error("[openclaw stderr]", chunk.toString());
});
});
}

View File

@ -109,10 +109,9 @@ export function handleMessageUpdate(
}
}
if (ctx.state.streamReasoning) {
// Handle partial <think> tags: stream whatever reasoning is visible so far.
ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer));
}
// Extract <think>-tagged reasoning and emit for all consumers (NDJSON/SSE, TUI).
// The emitReasoningStream function handles dedup and delta computation internally.
ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer));
const next = ctx
.stripBlockTags(ctx.state.deltaBuffer, {

View File

@ -7,6 +7,7 @@ import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.t
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-spans.js";
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
@ -474,9 +475,6 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
};
const emitReasoningStream = (text: string) => {
if (!state.streamReasoning || !params.onReasoningStream) {
return;
}
const formatted = formatReasoningMessage(text);
if (!formatted) {
return;
@ -484,10 +482,26 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (formatted === state.lastStreamedReasoning) {
return;
}
// Compute delta before updating state
const prev = state.lastStreamedReasoning ?? "";
const delta = formatted.startsWith(prev) ? formatted.slice(prev.length) : formatted;
state.lastStreamedReasoning = formatted;
void params.onReasoningStream({
text: formatted,
});
// Always emit as agent event so NDJSON/SSE consumers (e.g. --stream-json, web UI) receive thinking
if (delta && params.runId) {
emitAgentEvent({
runId: params.runId,
stream: "thinking",
data: { delta, text: formatted },
});
}
// Callback-based emission (TUI, messaging channels) still guarded by streamReasoning
if (state.streamReasoning && params.onReasoningStream) {
void params.onReasoningStream({
text: formatted,
});
}
};
const resetForCompactionRetry = () => {