From 536ae596671112a06bcd50be24d258115d877048 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sat, 21 Feb 2026 12:32:43 -0800 Subject: [PATCH] web: add interactive messaging UI to subagent panel Co-authored-by: Cursor --- apps/web/app/components/subagent-panel.tsx | 630 ++++++++++++--------- 1 file changed, 352 insertions(+), 278 deletions(-) diff --git a/apps/web/app/components/subagent-panel.tsx b/apps/web/app/components/subagent-panel.tsx index da7daa9f937..aa3f9b33f73 100644 --- a/apps/web/app/components/subagent-panel.tsx +++ b/apps/web/app/components/subagent-panel.tsx @@ -1,107 +1,10 @@ "use client"; -import { useEffect, useRef, useState, useMemo } from "react"; -import { ChatMessage } from "./chat-message"; import type { UIMessage } from "ai"; - -type ParsedPart = - | { type: "text"; text: string } - | { type: "reasoning"; text: string; state?: string } - | { - type: "dynamic-tool"; - toolName: string; - toolCallId: string; - state: string; - input?: Record; - output?: Record; - }; - -function createSubagentStreamParser() { - const parts: ParsedPart[] = []; - let currentTextIdx = -1; - let currentReasoningIdx = -1; - - function processEvent(event: Record) { - const t = event.type as string; - switch (t) { - case "reasoning-start": - parts.push({ type: "reasoning", text: "", state: "streaming" }); - currentReasoningIdx = parts.length - 1; - break; - case "reasoning-delta": { - if (currentReasoningIdx >= 0) { - const p = parts[currentReasoningIdx] as { type: "reasoning"; text: string }; - p.text += event.delta as string; - } - break; - } - case "reasoning-end": - if (currentReasoningIdx >= 0) { - const p = parts[currentReasoningIdx] as { type: "reasoning"; state?: string }; - delete p.state; - } - currentReasoningIdx = -1; - break; - case "text-start": - parts.push({ type: "text", text: "" }); - currentTextIdx = parts.length - 1; - break; - case "text-delta": { - if (currentTextIdx >= 0) { - const p = parts[currentTextIdx] as { type: "text"; text: string }; - p.text += event.delta as string; - } - break; - } - case "text-end": - currentTextIdx = -1; - break; - case "tool-input-start": - parts.push({ - type: "dynamic-tool", - toolCallId: event.toolCallId as string, - toolName: event.toolName as string, - state: "input-available", - input: {}, - }); - break; - case "tool-input-available": - for (let i = parts.length - 1; i >= 0; i--) { - const p = parts[i]; - if (p.type === "dynamic-tool" && p.toolCallId === event.toolCallId) { - p.input = (event.input as Record) ?? {}; - break; - } - } - break; - case "tool-output-available": - for (let i = parts.length - 1; i >= 0; i--) { - const p = parts[i]; - if (p.type === "dynamic-tool" && p.toolCallId === event.toolCallId) { - p.state = "output-available"; - p.output = (event.output as Record) ?? {}; - break; - } - } - break; - case "tool-output-error": - for (let i = parts.length - 1; i >= 0; i--) { - const p = parts[i]; - if (p.type === "dynamic-tool" && p.toolCallId === event.toolCallId) { - p.state = "error"; - p.output = { error: event.errorText as string }; - break; - } - } - break; - } - } - - return { - processEvent, - getParts: (): ParsedPart[] => parts.map((p) => ({ ...p })), - }; -} +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { ChatMessage } from "./chat-message"; +import { createStreamParser } from "./chat-panel"; +import { ChatEditor, type ChatEditorHandle } from "./tiptap/chat-editor"; type SubagentPanelProps = { sessionKey: string; @@ -110,20 +13,268 @@ type SubagentPanelProps = { onBack: () => void; }; +type QueuedMessage = { + id: string; + text: string; + mentionedFiles: Array<{ name: string; path: string }>; +}; + +function taskMessage(sessionKey: string, task: string): UIMessage { + return { + id: `task-${sessionKey}`, + role: "user", + parts: [{ type: "text", text: task }], + } as UIMessage; +} + +function buildMessagesFromParsed( + sessionKey: string, + task: string, + parts: Array>, +): UIMessage[] { + const messages: UIMessage[] = [taskMessage(sessionKey, task)]; + let assistantParts: UIMessage["parts"] = []; + let assistantCount = 0; + let userCount = 0; + + const pushAssistant = () => { + if (assistantParts.length === 0) {return;} + messages.push({ + id: `assistant-${sessionKey}-${assistantCount++}`, + role: "assistant", + parts: assistantParts, + } as UIMessage); + assistantParts = []; + }; + + for (const part of parts) { + if (part.type === "user-message") { + pushAssistant(); + messages.push({ + id: (part.id as string | undefined) ?? `user-${sessionKey}-${userCount++}`, + role: "user", + parts: [{ type: "text", text: (part.text as string) ?? "" }], + } as UIMessage); + continue; + } + assistantParts.push(part as UIMessage["parts"][number]); + } + pushAssistant(); + return messages; +} + export function SubagentPanel({ sessionKey, task, label, onBack }: SubagentPanelProps) { - const [messages, setMessages] = useState< - Array<{ id: string; role: "assistant"; parts: UIMessage["parts"] }> - >([]); - const [isStreaming, setIsStreaming] = useState(true); + const editorRef = useRef(null); + const [editorEmpty, setEditorEmpty] = useState(true); + const [messages, setMessages] = useState(() => [taskMessage(sessionKey, task)]); + const [queuedMessages, setQueuedMessages] = useState([]); + const [isStreaming, setIsStreaming] = useState(false); const [connected, setConnected] = useState(false); + const [isReconnecting, setIsReconnecting] = useState(false); const messagesEndRef = useRef(null); const scrollContainerRef = useRef(null); const userScrolledAwayRef = useRef(false); - const abortRef = useRef(null); + const streamAbortRef = useRef(null); + const scrollRafRef = useRef(0); const displayLabel = label || (task.length > 60 ? task.slice(0, 60) + "..." : task); - // Auto-scroll + const streamFromResponse = useCallback( + async ( + res: Response, + onUpdate: (parts: Array>) => void, + signal: AbortSignal, + ) => { + if (!res.body) {return;} + const parser = createStreamParser(); + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let frameRequested = false; + while (true) { + const { done, value } = await reader.read(); + if (done) {break;} + buffer += decoder.decode(value, { stream: true }); + let idx; + while ((idx = buffer.indexOf("\n\n")) !== -1) { + const chunk = buffer.slice(0, idx); + buffer = buffer.slice(idx + 2); + if (chunk.startsWith("data: ")) { + try { + const event = JSON.parse(chunk.slice(6)) as Record; + parser.processEvent(event); + } catch { + // ignore malformed event + } + } + } + if (!frameRequested) { + frameRequested = true; + requestAnimationFrame(() => { + frameRequested = false; + if (!signal.aborted) { + onUpdate(parser.getParts() as Array>); + } + }); + } + } + if (!signal.aborted) { + onUpdate(parser.getParts() as Array>); + } + }, + [], + ); + + const reconnect = useCallback(async () => { + streamAbortRef.current?.abort(); + const abort = new AbortController(); + streamAbortRef.current = abort; + setIsReconnecting(true); + try { + const res = await fetch(`/api/chat/stream?sessionKey=${encodeURIComponent(sessionKey)}`, { + signal: abort.signal, + }); + if (!res.ok || !res.body) { + setConnected(false); + setIsStreaming(false); + return; + } + setConnected(true); + setIsStreaming(res.headers.get("X-Run-Active") !== "false"); + await streamFromResponse( + res, + (parts) => setMessages(buildMessagesFromParsed(sessionKey, task, parts)), + abort.signal, + ); + } catch (err) { + if ((err as Error).name !== "AbortError") { + console.error("Subagent reconnect error:", err); + } + } finally { + setIsReconnecting(false); + if (!abort.signal.aborted) { + setIsStreaming(false); + streamAbortRef.current = null; + } + } + }, [sessionKey, task, streamFromResponse]); + + const sendSubagentMessage = useCallback( + async (text: string, mentionedFiles: Array<{ name: string; path: string }>) => { + const trimmed = text.trim(); + const hasMentions = mentionedFiles.length > 0; + if (!trimmed && !hasMentions) {return;} + + const allFilePaths = mentionedFiles.map((f) => f.path); + const payloadText = allFilePaths.length > 0 + ? `[Attached files: ${allFilePaths.join(", ")}]\n\n${trimmed}` + : trimmed; + + const optimisticUser: UIMessage = { + id: `user-${Date.now()}-${Math.random().toString(36).slice(2)}`, + role: "user", + parts: [{ type: "text", text: payloadText }], + } as UIMessage; + const baseMessages = [...messages, optimisticUser]; + setMessages(baseMessages); + + streamAbortRef.current?.abort(); + const abort = new AbortController(); + streamAbortRef.current = abort; + setIsStreaming(true); + setConnected(true); + + try { + const res = await fetch("/api/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + signal: abort.signal, + body: JSON.stringify({ + sessionKey, + messages: [optimisticUser], + }), + }); + if (!res.ok || !res.body) { + setIsStreaming(false); + return; + } + await streamFromResponse( + res, + (parts) => { + const assistantMsg: UIMessage = { + id: `assistant-${sessionKey}-${Date.now()}`, + role: "assistant", + parts: parts as UIMessage["parts"], + } as UIMessage; + setMessages([...baseMessages, assistantMsg]); + }, + abort.signal, + ); + } catch (err) { + if ((err as Error).name !== "AbortError") { + console.error("Subagent send error:", err); + } + } finally { + if (!abort.signal.aborted) { + setIsStreaming(false); + streamAbortRef.current = null; + } + } + }, + [messages, sessionKey, streamFromResponse], + ); + + const handleEditorSubmit = useCallback( + async (text: string, mentionedFiles: Array<{ name: string; path: string }>) => { + if (isStreaming || isReconnecting) { + setQueuedMessages((prev) => [ + ...prev, + { + id: crypto.randomUUID(), + text, + mentionedFiles, + }, + ]); + return; + } + await sendSubagentMessage(text, mentionedFiles); + }, + [isStreaming, isReconnecting, sendSubagentMessage], + ); + + const handleStop = useCallback(async () => { + streamAbortRef.current?.abort(); + streamAbortRef.current = null; + setIsStreaming(false); + setIsReconnecting(false); + try { + await fetch("/api/chat/stop", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ sessionKey }), + }); + } catch { + // ignore + } + }, [sessionKey]); + + useEffect(() => { + void reconnect(); + return () => { + streamAbortRef.current?.abort(); + }; + }, [reconnect]); + + useEffect(() => { + const wasBusy = isStreaming || isReconnecting; + if (wasBusy || queuedMessages.length === 0) {return;} + const [next, ...rest] = queuedMessages; + setQueuedMessages(rest); + queueMicrotask(() => { + void sendSubagentMessage(next.text, next.mentionedFiles); + }); + }, [isStreaming, isReconnecting, queuedMessages, sendSubagentMessage]); + useEffect(() => { const el = scrollContainerRef.current; if (!el) {return;} @@ -137,108 +288,23 @@ export function SubagentPanel({ sessionKey, task, label, onBack }: SubagentPanel useEffect(() => { if (userScrolledAwayRef.current) {return;} - messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + if (scrollRafRef.current) {return;} + scrollRafRef.current = requestAnimationFrame(() => { + scrollRafRef.current = 0; + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }); }, [messages]); - // Reset state when switching between subagents - useEffect(() => { - setMessages([]); - setIsStreaming(true); - setConnected(false); - userScrolledAwayRef.current = false; - }, [sessionKey]); - - // Connect to subagent SSE stream - useEffect(() => { - const abort = new AbortController(); - abortRef.current = abort; - - const connect = async () => { - try { - const res = await fetch( - `/api/chat/subagent-stream?sessionKey=${encodeURIComponent(sessionKey)}`, - { signal: abort.signal }, - ); - - if (!res.ok || !res.body) { - setIsStreaming(false); - return; - } - - const isActive = res.headers.get("X-Run-Active") !== "false"; - setConnected(true); - setIsStreaming(isActive); - - const parser = createSubagentStreamParser(); - const reader = res.body.getReader(); - const decoder = new TextDecoder(); - const msgId = `subagent-${sessionKey}`; - let buffer = ""; - let frameRequested = false; - - const updateUI = () => { - if (abort.signal.aborted) {return;} - const assistantMsg = { - id: msgId, - role: "assistant" as const, - parts: parser.getParts() as UIMessage["parts"], - }; - setMessages([assistantMsg]); - }; - - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- loop - while (true) { - const { done, value } = await reader.read(); - if (done) {break;} - - buffer += decoder.decode(value, { stream: true }); - - let idx; - while ((idx = buffer.indexOf("\n\n")) !== -1) { - const chunk = buffer.slice(0, idx); - buffer = buffer.slice(idx + 2); - - if (chunk.startsWith("data: ")) { - try { - const event = JSON.parse(chunk.slice(6)); - parser.processEvent(event); - } catch { /* skip */ } - } - } - - if (!frameRequested) { - frameRequested = true; - requestAnimationFrame(() => { - frameRequested = false; - updateUI(); - }); - } - } - - updateUI(); - setIsStreaming(false); - } catch (err) { - if ((err as Error).name !== "AbortError") { - console.error("Subagent stream error:", err); - } - setIsStreaming(false); - } - }; - - void connect(); - return () => { abort.abort(); }; - }, [sessionKey]); - const statusLabel = useMemo(() => { - if (!connected && isStreaming) {return "Connecting...";} + if (!connected && (isStreaming || isReconnecting)) {return "Connecting...";} + if (isReconnecting) {return "Resuming stream...";} if (isStreaming) {return "Streaming...";} return "Completed"; - }, [connected, isStreaming]); + }, [connected, isStreaming, isReconnecting]); return (
- {/* Header */}
- +
-
- - Subagent - -

- {displayLabel} -

-
+

+ {displayLabel} +

{statusLabel}

- {isStreaming && ( - - )}
- {/* Messages */}
- {messages.length === 0 && isStreaming ? ( -
-
-
-

- Waiting for subagent... -

-
-
- ) : messages.length === 0 ? ( -
-

- No output from subagent. -

-
- ) : ( -
- {messages.map((message, i) => ( - - ))} -
-
- )} +
+ {messages.map((message, i) => ( + + ))} +
+
- {/* Task description */} - {task && task.length > 60 && ( -
-
- Task description -

{task}

-
+
+
+ {queuedMessages.length > 0 && ( +
+
+
+ Queued ({queuedMessages.length}) +
+
+ {queuedMessages.map((msg) => ( +
+

+ {msg.text} +

+ +
+ ))} +
+
+
+ )} + setEditorEmpty(isEmpty)} + placeholder={isStreaming || isReconnecting ? "Type to queue a message..." : "Type @ to mention files..."} + /> +
+
+ {(isStreaming || isReconnecting) && ( + + )} + +
+
- )} +
);