diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index 0eb2b2dbb35..99bc172a4cb 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -1,5 +1,5 @@ import type { UIMessage } from "ai"; -import { runAgent } from "@/lib/agent-runner"; +import { runAgent, type ToolResult } from "@/lib/agent-runner"; // Force Node.js runtime (required for child_process) export const runtime = "nodejs"; @@ -7,100 +7,261 @@ export const runtime = "nodejs"; // Allow streaming responses up to 10 minutes export const maxDuration = 600; -export async function POST(req: Request) { - const { messages }: { messages: UIMessage[] } = await req.json(); - - // Extract the latest user message text - const lastUserMessage = messages.filter((m) => m.role === "user").pop(); - const userText = - lastUserMessage?.parts - ?.filter((p): p is { type: "text"; text: string } => p.type === "text") - .map((p) => p.text) - .join("\n") ?? ""; - - console.log("[chat] Received message:", userText); - - if (!userText.trim()) { - return new Response("No message provided", { status: 400 }); - } - - // Create a custom SSE stream - const encoder = new TextEncoder(); - const stream = new ReadableStream({ - async start(controller) { - const textPartId = `text-${Date.now()}`; - let started = false; - - const writeEvent = (data: unknown) => { - const json = JSON.stringify(data); - console.log("[chat] SSE write:", json); - controller.enqueue(encoder.encode(`data: ${json}\n\n`)); - }; - - console.log("[chat] Starting agent stream..."); - - try { - await runAgent(userText, { - onTextDelta: (delta) => { - console.log("[chat] Text delta:", delta); - if (!started) { - console.log("[chat] Writing text-start"); - writeEvent({ type: "text-start", id: textPartId }); - started = true; - } - writeEvent({ type: "text-delta", id: textPartId, delta }); - }, - onLifecycleEnd: () => { - console.log("[chat] Lifecycle end, started:", started); - if (started) { - writeEvent({ type: "text-end", id: textPartId }); - } - }, - onError: (err) => { - console.error("[chat] Agent error:", err); - if (!started) { - writeEvent({ type: "text-start", id: textPartId }); - writeEvent({ - type: "text-delta", - id: textPartId, - delta: `Error starting agent: ${err.message}`, - }); - writeEvent({ type: "text-end", id: textPartId }); - } - }, - onClose: (code) => { - console.log("[chat] Agent closed with code:", code, "started:", started); - // If we never started text, emit an empty response - if (!started) { - writeEvent({ type: "text-start", id: textPartId }); - writeEvent({ - type: "text-delta", - id: textPartId, - delta: "(No response from agent)", - }); - writeEvent({ type: "text-end", id: textPartId }); - } - }, - }); - - console.log("[chat] Agent stream complete"); - } catch (error) { - console.error("[chat] Stream error:", error); - writeEvent({ - type: "error", - error: error instanceof Error ? error.message : String(error), - }); - } finally { - controller.close(); - } - }, - }); - - return new Response(stream, { - headers: { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache, no-transform", - Connection: "keep-alive", - }, - }); +/** + * Build a flat output object from the agent's tool result so the frontend + * can render tool output text, exit codes, etc. + */ +function buildToolOutput( + result?: ToolResult, +): Record { + if (!result) return {}; + const out: Record = {}; + if (result.text) out.text = result.text; + if (result.details) { + // Forward useful details (exit code, duration, status, cwd) + for (const key of [ + "exitCode", + "status", + "durationMs", + "cwd", + "error", + "reason", + ]) { + if (result.details[key] !== undefined) + out[key] = result.details[key]; + } + } + return out; +} + +export async function POST(req: Request) { + const { messages }: { messages: UIMessage[] } = await req.json(); + + // Extract the latest user message text + const lastUserMessage = messages.filter((m) => m.role === "user").pop(); + const userText = + lastUserMessage?.parts + ?.filter( + (p): p is { type: "text"; text: string } => p.type === "text", + ) + .map((p) => p.text) + .join("\n") ?? ""; + + if (!userText.trim()) { + return new Response("No message provided", { status: 400 }); + } + + // Create a custom SSE stream using the AI SDK v6 data stream wire format. + // DefaultChatTransport parses these events into UIMessage parts automatically. + const encoder = new TextEncoder(); + let closed = false; + const abortController = new AbortController(); + const stream = new ReadableStream({ + async start(controller) { + // Use incrementing IDs so multi-round reasoning/text cycles get + // unique part IDs (avoids conflicts in the AI SDK transport). + let idCounter = 0; + const nextId = (prefix: string) => + `${prefix}-${Date.now()}-${++idCounter}`; + + let currentTextId = ""; + let currentReasoningId = ""; + let textStarted = false; + let reasoningStarted = false; + // Track whether ANY text was ever sent across the full run. + // onLifecycleEnd closes the text part (textStarted→false), so + // onClose can't rely on textStarted alone to detect "no output". + let everSentText = false; + + /** Write an SSE event; silently no-ops if the stream was already cancelled. */ + const writeEvent = (data: unknown) => { + if (closed) return; + const json = JSON.stringify(data); + controller.enqueue(encoder.encode(`data: ${json}\n\n`)); + }; + + /** Close the reasoning part if open. */ + const closeReasoning = () => { + if (reasoningStarted) { + writeEvent({ + type: "reasoning-end", + id: currentReasoningId, + }); + reasoningStarted = false; + } + }; + + /** Close the text part if open. */ + const closeText = () => { + if (textStarted) { + writeEvent({ type: "text-end", id: currentTextId }); + textStarted = false; + } + }; + + try { + await runAgent(userText, abortController.signal, { + onThinkingDelta: (delta) => { + if (!reasoningStarted) { + currentReasoningId = nextId("reasoning"); + writeEvent({ + type: "reasoning-start", + id: currentReasoningId, + }); + reasoningStarted = true; + } + writeEvent({ + type: "reasoning-delta", + id: currentReasoningId, + delta, + }); + }, + + onTextDelta: (delta) => { + // Close reasoning once text starts streaming + closeReasoning(); + + if (!textStarted) { + currentTextId = nextId("text"); + writeEvent({ + type: "text-start", + id: currentTextId, + }); + textStarted = true; + } + everSentText = true; + writeEvent({ + type: "text-delta", + id: currentTextId, + delta, + }); + }, + + onToolStart: (toolCallId, toolName, args) => { + // Close open reasoning/text parts before tool events + closeReasoning(); + closeText(); + + writeEvent({ + type: "tool-input-start", + toolCallId, + toolName, + }); + // Include actual tool arguments so the frontend can + // display what the tool is doing (command, path, etc.) + writeEvent({ + type: "tool-input-available", + toolCallId, + toolName, + input: args ?? {}, + }); + }, + + onToolEnd: ( + toolCallId, + _toolName, + isError, + result, + ) => { + if (isError) { + const errorText = + result?.text || + (result?.details?.error as + | string + | undefined) || + "Tool execution failed"; + writeEvent({ + type: "tool-output-error", + toolCallId, + errorText, + }); + } else { + // Include the actual tool output (text, exit code, etc.) + writeEvent({ + type: "tool-output-available", + toolCallId, + output: buildToolOutput(result), + }); + } + }, + + onLifecycleEnd: () => { + closeReasoning(); + closeText(); + }, + + onError: (err) => { + console.error("[chat] Agent error:", err); + closeReasoning(); + if (!textStarted) { + currentTextId = nextId("text"); + writeEvent({ + type: "text-start", + id: currentTextId, + }); + textStarted = true; + } + writeEvent({ + type: "text-delta", + id: currentTextId, + delta: `Error starting agent: ${err.message}`, + }); + writeEvent({ type: "text-end", id: currentTextId }); + textStarted = false; + }, + + onClose: (_code) => { + closeReasoning(); + if (!everSentText) { + // No text was ever sent during the entire run + currentTextId = nextId("text"); + writeEvent({ + type: "text-start", + id: currentTextId, + }); + writeEvent({ + type: "text-delta", + id: currentTextId, + delta: "(No response from agent)", + }); + writeEvent({ + type: "text-end", + id: currentTextId, + }); + } else { + // Ensure any still-open text part is closed + closeText(); + } + }, + }); + } catch (error) { + console.error("[chat] Stream error:", error); + writeEvent({ + type: "error", + errorText: + error instanceof Error + ? error.message + : String(error), + }); + } finally { + if (!closed) { + closed = true; + controller.close(); + } + } + }, + cancel() { + // Client disconnected (e.g. user hit stop) — tear down gracefully. + closed = true; + abortController.abort(); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + }, + }); } diff --git a/apps/web/app/api/new-session/route.ts b/apps/web/app/api/new-session/route.ts index 84ee3ba2626..986a4791cd0 100644 --- a/apps/web/app/api/new-session/route.ts +++ b/apps/web/app/api/new-session/route.ts @@ -8,8 +8,11 @@ export const maxDuration = 30; /** POST /api/new-session — send /new to the agent to start a fresh backend session */ export async function POST() { return new Promise((resolve) => { - runAgent("/new", { + runAgent("/new", undefined, { onTextDelta: () => {}, + onThinkingDelta: () => {}, + onToolStart: () => {}, + onToolEnd: () => {}, onLifecycleEnd: () => {}, onError: (err) => { console.error("[new-session] Error:", err); diff --git a/apps/web/app/api/web-sessions/[id]/route.ts b/apps/web/app/api/web-sessions/[id]/route.ts index fb299f1edf0..8607c9c9ba5 100644 --- a/apps/web/app/api/web-sessions/[id]/route.ts +++ b/apps/web/app/api/web-sessions/[id]/route.ts @@ -9,7 +9,12 @@ const WEB_CHAT_DIR = join(homedir(), ".openclaw", "web-chat"); export type ChatLine = { id: string; role: "user" | "assistant"; + /** Plain text summary (always present, used for sidebar / backward compat). */ content: string; + /** Full UIMessage parts array — reasoning, tool calls, outputs, text. + * Present for sessions saved after the rich-persistence update; + * absent for older sessions (fall back to `content` as a text part). */ + parts?: Array>; timestamp: string; }; diff --git a/apps/web/app/components/chain-of-thought.tsx b/apps/web/app/components/chain-of-thought.tsx new file mode 100644 index 00000000000..9670aa43392 --- /dev/null +++ b/apps/web/app/components/chain-of-thought.tsx @@ -0,0 +1,480 @@ +"use client"; + +import { useEffect, useRef, useState } from "react"; + +/* ─── Public types ─── */ + +export type ChainPart = + | { kind: "reasoning"; text: string; isStreaming: boolean } + | { + kind: "tool"; + toolName: string; + toolCallId: string; + status: "running" | "done" | "error"; + args?: Record; + output?: Record; + errorText?: string; + }; + +/* ─── Main component ─── */ + +export function ChainOfThought({ parts }: { parts: ChainPart[] }) { + const [isOpen, setIsOpen] = useState(true); + const prevActiveRef = useRef(true); + + const isActive = parts.some( + (p) => + (p.kind === "reasoning" && p.isStreaming) || + (p.kind === "tool" && p.status === "running"), + ); + + // Auto-collapse once all steps finish (active → inactive transition) + useEffect(() => { + if (prevActiveRef.current && !isActive && parts.length > 0) { + setIsOpen(false); + } + prevActiveRef.current = isActive; + }, [isActive, parts.length]); + + // Aggregate reasoning text from all reasoning parts + const reasoningText = parts + .filter( + (p): p is Extract => + p.kind === "reasoning", + ) + .map((p) => p.text) + .join(""); + const isReasoningStreaming = parts.some( + (p) => p.kind === "reasoning" && p.isStreaming, + ); + + // Tool steps + const tools = parts.filter( + (p): p is Extract => p.kind === "tool", + ); + const completedTools = tools.filter((t) => t.status === "done").length; + const activeTool = tools.find((t) => t.status === "running"); + + // Header label summarizes current/completed activity + let headerLabel: string; + if (isActive) { + if (activeTool) { + // Show what the active tool is doing + const summary = getToolSummary( + activeTool.toolName, + activeTool.args, + ); + headerLabel = summary || formatToolName(activeTool.toolName); + } else { + headerLabel = "Thinking"; + } + } else if (tools.length > 0) { + headerLabel = `Reasoned with ${completedTools} tool${completedTools !== 1 ? "s" : ""}`; + } else { + headerLabel = "Reasoned"; + } + + return ( +
+ {/* Trigger */} + + + {/* Collapsible content (smooth CSS grid animation) */} +
+
+
+ {/* Reasoning text block */} + {reasoningText && ( + + )} + + {/* Tool step timeline */} + {tools.length > 0 && ( +
+ {tools.map((tool) => ( + + ))} +
+ )} +
+
+
+
+ ); +} + +/* ─── Sub-components ─── */ + +/** Expandable reasoning text display */ +function ReasoningText({ + text, + isStreaming, +}: { + text: string; + isStreaming: boolean; +}) { + const [expanded, setExpanded] = useState(false); + const isLong = text.length > 300; + + return ( +
+
+ {text} + {isStreaming && ( + + )} +
+ {isLong && !expanded && ( + + )} +
+ ); +} + +/** Rich tool step with args display and collapsible output */ +function ToolStep({ + toolName, + status, + args, + output, + errorText, +}: { + toolName: string; + status: "running" | "done" | "error"; + args?: Record; + output?: Record; + errorText?: string; +}) { + const [showOutput, setShowOutput] = useState(false); + const displayType = getToolDisplayType(toolName); + const primaryArg = getPrimaryArg(toolName, args); + const outputText = + typeof output?.text === "string" ? output.text : undefined; + const exitCode = + output?.exitCode !== undefined ? Number(output.exitCode) : undefined; + + return ( +
+ {/* Tool name + status */} +
+ {status === "running" && ( + + )} + {status === "done" && ( + + )} + {status === "error" && ( + + )} + + + {formatToolName(toolName)} + + + {/* Exit code badge for bash/exec tools */} + {exitCode !== undefined && exitCode !== 0 && ( + + exit {exitCode} + + )} +
+ + {/* Primary argument: command, path, query, code, etc. */} + {primaryArg && ( +
+ {displayType === "bash" ? ( + + ) : displayType === "code" ? ( + + ) : ( +
+ {primaryArg} +
+ )} +
+ )} + + {/* Error message */} + {status === "error" && errorText && ( +
+ {errorText} +
+ )} + + {/* Tool output */} + {outputText && status === "done" && ( +
+ + {showOutput && ( + + )} +
+ )} +
+ ); +} + +/** Monospace code block with optional line limit */ +function CodeBlock({ + content, + maxLines = 10, +}: { + content: string; + maxLines?: number; +}) { + const [expanded, setExpanded] = useState(false); + const lines = content.split("\n"); + const isLong = lines.length > maxLines; + const displayContent = + !expanded && isLong + ? lines.slice(0, maxLines).join("\n") + "\n..." + : content; + + return ( +
+
+				{displayContent}
+			
+ {isLong && !expanded && ( + + )} +
+ ); +} + +/* ─── Tool classification helpers ─── */ + +type ToolDisplayType = "bash" | "code" | "file" | "search" | "generic"; + +function getToolDisplayType(toolName: string): ToolDisplayType { + const name = toolName.toLowerCase().replace(/[_-]/g, ""); + if ( + ["bash", "shell", "execute", "exec", "terminal", "command"].some((k) => + name.includes(k), + ) + ) + return "bash"; + if ( + ["runcode", "python", "javascript", "typescript", "notebook"].some( + (k) => name.includes(k), + ) + ) + return "code"; + if ( + ["file", "read", "write", "create", "edit", "str_replace"].some((k) => + name.includes(k), + ) + ) + return "file"; + if ( + ["search", "web", "grep", "find", "glob"].some((k) => + name.includes(k), + ) + ) + return "search"; + return "generic"; +} + +function getPrimaryArg( + toolName: string, + args?: Record, +): string | undefined { + if (!args) return undefined; + const type = getToolDisplayType(toolName); + switch (type) { + case "bash": + return strArg(args, "command") ?? strArg(args, "cmd"); + case "code": + return strArg(args, "code") ?? strArg(args, "script"); + case "file": + return ( + strArg(args, "path") ?? + strArg(args, "file") ?? + strArg(args, "file_path") + ); + case "search": + return ( + strArg(args, "query") ?? + strArg(args, "search") ?? + strArg(args, "pattern") ?? + strArg(args, "q") + ); + default: { + // Return first short string arg + for (const val of Object.values(args)) { + if (typeof val === "string" && val.length > 0 && val.length < 200) return val; + } + return undefined; + } + } +} + +/** Safely extract a string value from an args object */ +function strArg( + args: Record, + key: string, +): string | undefined { + const val = args[key]; + return typeof val === "string" && val.length > 0 ? val : undefined; +} + +/** Build a short summary for the active tool (shown in collapsed header) */ +function getToolSummary( + toolName: string, + args?: Record, +): string | undefined { + if (!args) return undefined; + const type = getToolDisplayType(toolName); + const primary = getPrimaryArg(toolName, args); + if (!primary) return undefined; + + switch (type) { + case "bash": { + // Show first 40 chars of command + const short = + primary.length > 40 ? primary.slice(0, 40) + "..." : primary; + return `Running: ${short}`; + } + case "file": { + return `Reading ${primary.split("/").pop()}`; + } + case "search": { + return `Searching: ${primary}`; + } + default: + return undefined; + } +} + +/* ─── Helpers ─── */ + +/** Convert tool_name_like_this → Tool Name Like This */ +function formatToolName(name: string): string { + return name + .replace(/_/g, " ") + .replace(/\b\w/g, (c) => c.toUpperCase()) + .trim(); +} + +/* ─── Inline SVG icons (avoids adding lucide-react dep) ─── */ + +function SparkleIcon({ className }: { className?: string }) { + return ( + + + + ); +} + +function ChevronIcon({ className }: { className?: string }) { + return ( + + + + ); +} + +function CheckIcon({ className }: { className?: string }) { + return ( + + + + ); +} + +function XIcon({ className }: { className?: string }) { + return ( + + + + ); +} diff --git a/apps/web/app/components/chat-message.tsx b/apps/web/app/components/chat-message.tsx index 082b94ed967..bdacb5ef005 100644 --- a/apps/web/app/components/chat-message.tsx +++ b/apps/web/app/components/chat-message.tsx @@ -1,56 +1,153 @@ "use client"; import type { UIMessage } from "ai"; +import { ChainOfThought, type ChainPart } from "./chain-of-thought"; + +/* ─── Part grouping ─── */ + +type MessageSegment = + | { type: "text"; text: string } + | { type: "chain"; parts: ChainPart[] }; + +/** Map AI SDK tool state string to a simplified status */ +function toolStatus(state: string): "running" | "done" | "error" { + if (state === "output-available") return "done"; + if (state === "error") return "error"; + return "running"; +} + +/** + * Group consecutive non-text parts (reasoning + tools) into chain-of-thought + * blocks, with text parts standing alone between them. + */ +function groupParts(parts: UIMessage["parts"]): MessageSegment[] { + const segments: MessageSegment[] = []; + let chain: ChainPart[] = []; + + const flush = () => { + if (chain.length > 0) { + segments.push({ type: "chain", parts: [...chain] }); + chain = []; + } + }; + + for (const part of parts) { + if (part.type === "text") { + flush(); + segments.push({ + type: "text", + text: (part as { type: "text"; text: string }).text, + }); + } else if (part.type === "reasoning") { + const rp = part as { + type: "reasoning"; + text: string; + state?: string; + }; + chain.push({ + kind: "reasoning", + text: rp.text, + isStreaming: rp.state === "streaming", + }); + } else if (part.type === "dynamic-tool") { + const tp = part as { + type: "dynamic-tool"; + toolName: string; + toolCallId: string; + state: string; + input?: unknown; + output?: unknown; + }; + chain.push({ + kind: "tool", + toolName: tp.toolName, + toolCallId: tp.toolCallId, + status: toolStatus(tp.state), + args: asRecord(tp.input), + output: asRecord(tp.output), + }); + } else if (part.type.startsWith("tool-")) { + const tp = part as { + type: string; + toolCallId: string; + toolName?: string; + state?: string; + title?: string; + input?: unknown; + output?: unknown; + }; + chain.push({ + kind: "tool", + toolName: + tp.title ?? + tp.toolName ?? + part.type.replace("tool-", ""), + toolCallId: tp.toolCallId, + status: toolStatus(tp.state ?? "input-available"), + args: asRecord(tp.input), + output: asRecord(tp.output), + }); + } + } + + flush(); + return segments; +} + +/** Safely cast unknown to Record if it's a non-null object */ +function asRecord( + val: unknown, +): Record | undefined { + if (val && typeof val === "object" && !Array.isArray(val)) + return val as Record; + return undefined; +} + +/* ─── Chat message ─── */ export function ChatMessage({ message }: { message: UIMessage }) { - const isUser = message.role === "user"; + const isUser = message.role === "user"; + const segments = groupParts(message.parts); - return ( -
- {!isUser && ( -
- O -
- )} + return ( +
+ {!isUser && ( +
+ O +
+ )} -
- {message.parts.map((part, index) => { - if (part.type === "text") { - return ( -
- {part.text} -
- ); - } - if (part.type.startsWith("tool-")) { - const toolPart = part as { type: string; toolCallId: string; state?: string; title?: string }; - return ( -
- Tool: {toolPart.title ?? toolPart.toolCallId} - {toolPart.state === "result" && ( - done - )} -
- ); - } - return null; - })} -
+
+ {segments.map((segment, index) => { + if (segment.type === "text") { + return ( +
+ {segment.text} +
+ ); + } + return ( + + ); + })} +
- {isUser && ( -
- U -
- )} -
- ); + {isUser && ( +
+ U +
+ )} +
+ ); } diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 5d665d9f9f2..c45368b7e1f 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -31,17 +31,26 @@ export default function Home() { setSidebarRefreshKey((k) => k + 1); }, []); - /** Persist messages to the web session's .jsonl file */ + /** Persist messages to the web session's .jsonl file. + * Saves the full `parts` array (reasoning, tool calls, output, text) + * alongside a plain-text `content` field for backward compat / sidebar. */ const saveMessages = useCallback( async ( sessionId: string, - msgs: Array<{ id: string; role: string; content: string }>, + msgs: Array<{ + id: string; + role: string; + content: string; + parts?: unknown[]; + }>, title?: string, ) => { const toSave = msgs.map((m) => ({ id: m.id, role: m.role, content: m.content, + // Persist full UIMessage parts so reasoning + tool calls survive reload + ...(m.parts ? { parts: m.parts } : {}), timestamp: new Date().toISOString(), })); try { @@ -91,13 +100,16 @@ export default function Home() { const isNowReady = status === "ready"; if (wasStreaming && isNowReady && currentSessionId) { - // Save any unsaved messages (typically the assistant response) + // Save any unsaved messages (typically the assistant response). + // Include the full parts array so reasoning, tool calls, and their + // outputs persist across session reloads. const unsaved = messages.filter((m) => !savedMessageIdsRef.current.has(m.id)); if (unsaved.length > 0) { const toSave = unsaved.map((m) => ({ id: m.id, role: m.role, content: getMessageText(m), + parts: m.parts, })); saveMessages(currentSessionId, toSave); } @@ -127,10 +139,15 @@ export default function Home() { refreshSidebar(); } - // Save the user message immediately + // Save the user message immediately (include parts for consistency) const userMsgId = `user-${Date.now()}`; await saveMessages(sessionId, [ - { id: userMsgId, role: "user", content: userText }, + { + id: userMsgId, + role: "user", + content: userText, + parts: [{ type: "text", text: userText }], + }, ]); // Send to agent @@ -155,15 +172,18 @@ export default function Home() { id: string; role: "user" | "assistant"; content: string; + parts?: Array>; }> = data.messages || []; - // Convert to UIMessage format and mark all as saved + // Convert to UIMessage format and mark all as saved. + // Restore from saved `parts` if available (preserves reasoning, + // tool calls, output), falling back to plain text for old sessions. const uiMessages = sessionMessages.map((msg) => { savedMessageIdsRef.current.add(msg.id); return { id: msg.id, role: msg.role, - parts: [{ type: "text" as const, text: msg.content }], + parts: msg.parts ?? [{ type: "text" as const, text: msg.content }], }; }); diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index b62f7a8c3fe..46d33ab5f85 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -3,98 +3,221 @@ import { createInterface } from "node:readline"; import { join } from "node:path"; export type AgentEvent = { - event: string; - runId?: string; - stream?: string; - data?: Record; - seq?: number; - ts?: number; - sessionKey?: string; - status?: string; - result?: { - payloads?: Array<{ text?: string; mediaUrl?: string | null }>; - meta?: Record; - }; + event: string; + runId?: string; + stream?: string; + data?: Record; + seq?: number; + ts?: number; + sessionKey?: string; + status?: string; + result?: { + payloads?: Array<{ text?: string; mediaUrl?: string | null }>; + meta?: Record; + }; +}; + +/** Extracted text + details from a tool result event. */ +export type ToolResult = { + text?: string; + details?: Record; }; export type AgentCallback = { - onTextDelta: (delta: string) => void; - onLifecycleEnd: () => void; - onError: (error: Error) => void; - onClose: (code: number | null) => void; + onTextDelta: (delta: string) => void; + onThinkingDelta: (delta: string) => void; + onToolStart: ( + toolCallId: string, + toolName: string, + args?: Record, + ) => void; + onToolEnd: ( + toolCallId: string, + toolName: string, + isError: boolean, + result?: ToolResult, + ) => void; + onLifecycleEnd: () => void; + onError: (error: Error) => void; + onClose: (code: number | null) => void; }; /** - * Spawn the openclaw agent and stream its output + * Extract text content from the agent's tool result object. + * The result has `content: Array<{ type: "text", text: string } | ...>` and + * optional `details` (exit codes, file paths, etc.). */ -export async function runAgent(message: string, callback: AgentCallback): Promise { - // Get repo root - construct path dynamically at runtime - const cwd = process.cwd(); - const root = cwd.endsWith(join("apps", "web")) ? join(cwd, "..", "..") : cwd; +function extractToolResult( + raw: unknown, +): ToolResult | undefined { + if (!raw || typeof raw !== "object") return undefined; + const r = raw as Record; - // Construct script path at runtime to avoid static analysis - const pathParts = ["scripts", "run-node.mjs"]; - const scriptPath = join(root, ...pathParts); + // Extract text from content blocks + const content = Array.isArray(r.content) ? r.content : []; + const textParts: string[] = []; + for (const block of content) { + if ( + block && + typeof block === "object" && + (block as Record).type === "text" && + typeof (block as Record).text === "string" + ) { + textParts.push((block as Record).text as string); + } + } - return new Promise((resolve, reject) => { - const child = spawn( - "node", - [scriptPath, "agent", "--agent", "main", "--message", message, "--stream-json"], - { - cwd: root, - env: { ...process.env }, - stdio: ["ignore", "pipe", "pipe"], - }, - ); + const text = textParts.length > 0 ? textParts.join("\n") : undefined; + const details = + r.details && typeof r.details === "object" + ? (r.details as Record) + : undefined; - const rl = createInterface({ input: child.stdout }); - - rl.on("line", (line: string) => { - if (!line.trim()) return; - - let event: AgentEvent; - try { - event = JSON.parse(line) as AgentEvent; - } catch (err) { - console.log("[agent-runner] Non-JSON line:", line); - return; // skip non-JSON lines - } - - console.log("[agent-runner] Event:", event.event, event.stream, event.data); - - // Handle assistant text deltas - if (event.event === "agent" && event.stream === "assistant") { - const delta = typeof event.data?.delta === "string" ? event.data.delta : undefined; - if (delta) { - console.log("[agent-runner] Delta:", delta); - callback.onTextDelta(delta); - } - } - - // Handle lifecycle end - if ( - event.event === "agent" && - event.stream === "lifecycle" && - event.data?.phase === "end" - ) { - console.log("[agent-runner] Lifecycle end"); - callback.onLifecycleEnd(); - } - }); - - child.on("close", (code) => { - callback.onClose(code); - resolve(); - }); - - child.on("error", (err) => { - callback.onError(err); - resolve(); - }); - - // Log stderr for debugging - child.stderr?.on("data", (chunk: Buffer) => { - console.error("[openclaw stderr]", chunk.toString()); - }); - }); + return { text, details }; +} + +/** + * Spawn the openclaw agent and stream its output. + * Pass an AbortSignal to kill the child process when the caller cancels. + */ +export async function runAgent( + message: string, + signal: AbortSignal | undefined, + callback: AgentCallback, +): Promise { + // Get repo root - construct path dynamically at runtime + const cwd = process.cwd(); + const root = cwd.endsWith(join("apps", "web")) + ? join(cwd, "..", "..") + : cwd; + + // Construct script path at runtime to avoid static analysis + const pathParts = ["scripts", "run-node.mjs"]; + const scriptPath = join(root, ...pathParts); + + return new Promise((resolve) => { + const child = spawn( + "node", + [ + scriptPath, + "agent", + "--agent", + "main", + "--message", + message, + "--stream-json", + // Run embedded (--local) so we get ALL events (tool, thinking, + // lifecycle) unfiltered. The gateway path drops tool events + // unless verbose is explicitly "on". + "--local", + ], + { + cwd: root, + env: { ...process.env }, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + + // Kill the child process if the caller aborts (e.g. user hit stop). + if (signal) { + const onAbort = () => child.kill("SIGTERM"); + if (signal.aborted) { + child.kill("SIGTERM"); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + child.on("close", () => + signal.removeEventListener("abort", onAbort), + ); + } + } + + const rl = createInterface({ input: child.stdout }); + + rl.on("line", (line: string) => { + if (!line.trim()) return; + + let event: AgentEvent; + try { + event = JSON.parse(line) as AgentEvent; + } catch { + console.log("[agent-runner] Non-JSON line:", line); + return; // skip non-JSON lines + } + + // Handle assistant text deltas + if (event.event === "agent" && event.stream === "assistant") { + const delta = + typeof event.data?.delta === "string" + ? event.data.delta + : undefined; + if (delta) { + callback.onTextDelta(delta); + } + } + + // Handle thinking/reasoning deltas + if (event.event === "agent" && event.stream === "thinking") { + const delta = + typeof event.data?.delta === "string" + ? event.data.delta + : undefined; + if (delta) { + callback.onThinkingDelta(delta); + } + } + + // Handle tool execution events + if (event.event === "agent" && event.stream === "tool") { + const phase = + typeof event.data?.phase === "string" + ? event.data.phase + : undefined; + const toolCallId = + typeof event.data?.toolCallId === "string" + ? event.data.toolCallId + : ""; + const toolName = + typeof event.data?.name === "string" + ? event.data.name + : ""; + + if (phase === "start") { + const args = + event.data?.args && + typeof event.data.args === "object" + ? (event.data.args as Record) + : undefined; + callback.onToolStart(toolCallId, toolName, args); + } else if (phase === "result") { + const isError = event.data?.isError === true; + const result = extractToolResult(event.data?.result); + callback.onToolEnd(toolCallId, toolName, isError, result); + } + } + + // Handle lifecycle end + if ( + event.event === "agent" && + event.stream === "lifecycle" && + event.data?.phase === "end" + ) { + callback.onLifecycleEnd(); + } + }); + + child.on("close", (code) => { + callback.onClose(code); + resolve(); + }); + + child.on("error", (err) => { + callback.onError(err); + resolve(); + }); + + // Log stderr for debugging + child.stderr?.on("data", (chunk: Buffer) => { + console.error("[openclaw stderr]", chunk.toString()); + }); + }); } diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 840d5c74b76..67c59335deb 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -109,10 +109,9 @@ export function handleMessageUpdate( } } - if (ctx.state.streamReasoning) { - // Handle partial tags: stream whatever reasoning is visible so far. - ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer)); - } + // Extract -tagged reasoning and emit for all consumers (NDJSON/SSE, TUI). + // The emitReasoningStream function handles dedup and delta computation internally. + ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer)); const next = ctx .stripBlockTags(ctx.state.deltaBuffer, { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index e9853775065..4044895cb05 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -7,6 +7,7 @@ import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.t import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js"; import { formatToolAggregate } from "../auto-reply/tool-meta.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-spans.js"; import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; @@ -474,9 +475,6 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar }; const emitReasoningStream = (text: string) => { - if (!state.streamReasoning || !params.onReasoningStream) { - return; - } const formatted = formatReasoningMessage(text); if (!formatted) { return; @@ -484,10 +482,26 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (formatted === state.lastStreamedReasoning) { return; } + // Compute delta before updating state + const prev = state.lastStreamedReasoning ?? ""; + const delta = formatted.startsWith(prev) ? formatted.slice(prev.length) : formatted; state.lastStreamedReasoning = formatted; - void params.onReasoningStream({ - text: formatted, - }); + + // Always emit as agent event so NDJSON/SSE consumers (e.g. --stream-json, web UI) receive thinking + if (delta && params.runId) { + emitAgentEvent({ + runId: params.runId, + stream: "thinking", + data: { delta, text: formatted }, + }); + } + + // Callback-based emission (TUI, messaging channels) still guarded by streamReasoning + if (state.streamReasoning && params.onReasoningStream) { + void params.onReasoningStream({ + text: formatted, + }); + } }; const resetForCompactionRetry = () => {