diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index ac678a1a730..a0a0e0b2cee 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -9,7 +9,6 @@ import { persistUserMessage, persistSubscribeUserMessage, reactivateSubscribeRun, - sendSubagentFollowUp, type SseEvent, } from "@/lib/active-runs"; import { existsSync, readFileSync } from "node:fs"; @@ -100,10 +99,7 @@ export async function POST(req: Request) { id: lastUserMessage.id, text: userText, }); - reactivateSubscribeRun(sessionKey); - if (!sendSubagentFollowUp(sessionKey, agentMessage)) { - return new Response("Failed to send subagent message", { status: 500 }); - } + reactivateSubscribeRun(sessionKey, agentMessage); } else if (sessionId && lastUserMessage) { persistUserMessage(sessionId, { id: lastUserMessage.id, @@ -144,6 +140,9 @@ export async function POST(req: Request) { try { controller.close(); } catch { /* already closed */ } return; } + // Skip custom event types not in the AI SDK v6 data stream schema; + // they're only consumed by the reconnection parser (processEvent). + if (event.type === "tool-output-partial") {return;} try { const json = JSON.stringify(event); controller.enqueue(encoder.encode(`data: ${json}\n\n`)); diff --git a/apps/web/app/api/web-sessions/[id]/route.ts b/apps/web/app/api/web-sessions/[id]/route.ts index 83f98a72bb4..d79881d4d9a 100644 --- a/apps/web/app/api/web-sessions/[id]/route.ts +++ b/apps/web/app/api/web-sessions/[id]/route.ts @@ -1,6 +1,6 @@ -import { readFileSync, existsSync, unlinkSync } from "node:fs"; +import { readFileSync, existsSync, unlinkSync, writeFileSync } from "node:fs"; import { join } from "node:path"; -import { resolveWebChatDir } from "@/lib/workspace"; +import { resolveWebChatDir, resolveOpenClawStateDir, resolveActiveAgentId } from "@/lib/workspace"; import { readIndex, writeIndex } from "../shared"; export const dynamic = "force-dynamic"; @@ -17,6 +17,99 @@ export type ChatLine = { timestamp: string; }; +/** + * For subagent sessions whose persisted parts lack tool-invocation entries, + * backfill from the gateway's on-disk session transcript (which always + * stores the full conversation including tool calls). + */ +function enrichSubagentMessages(sessionKey: string, messages: ChatLine[], webChatPath: string): ChatLine[] { + const assistantMsgs = messages.filter((m) => m.role === "assistant"); + const hasToolParts = assistantMsgs.some((m) => + m.parts?.some((p) => p.type === "tool-invocation" || p.type === "dynamic-tool"), + ); + if (hasToolParts) {return messages;} + + try { + const stateDir = resolveOpenClawStateDir(); + const agentId = resolveActiveAgentId(); + const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json"); + if (!existsSync(sessionsJsonPath)) {return messages;} + + const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record>; + const sessionData = sessions[sessionKey]; + const sessionId = typeof sessionData?.sessionId === "string" ? sessionData.sessionId : null; + if (!sessionId) {return messages;} + + const transcriptPath = join(stateDir, "agents", agentId, "sessions", `${sessionId}.jsonl`); + if (!existsSync(transcriptPath)) {return messages;} + + const entries = readFileSync(transcriptPath, "utf-8") + .split("\n") + .filter((l) => l.trim()) + .map((l) => { try { return JSON.parse(l); } catch { return null; } }) + .filter(Boolean) as Array>; + + const toolParts: Array> = []; + const toolResults = new Map>(); + + for (const entry of entries) { + if (entry.type !== "message") {continue;} + const msg = entry.message as Record | undefined; + if (!msg) {continue;} + const content = msg.content; + + if (msg.role === "toolResult" && typeof msg.toolCallId === "string") { + const text = Array.isArray(content) + ? (content as Array>) + .filter((c) => c.type === "text" && typeof c.text === "string") + .map((c) => c.text as string) + .join("\n") + : typeof content === "string" ? content : ""; + toolResults.set(msg.toolCallId, { text: text.slice(0, 500) }); + } + + if (Array.isArray(content)) { + for (const part of content as Array>) { + if (part.type === "toolCall" && typeof part.id === "string" && typeof part.name === "string") { + toolParts.push({ + type: "tool-invocation", + toolCallId: part.id, + toolName: part.name, + args: (part.arguments as Record) ?? {}, + }); + } + } + } + } + + if (toolParts.length === 0) {return messages;} + + for (const tp of toolParts) { + const result = toolResults.get(tp.toolCallId as string); + if (result) { tp.result = result; } + } + + // Inject tool parts into assistant messages: place them before text parts + const enriched = messages.map((m) => { + if (m.role !== "assistant") {return m;} + const existingParts = m.parts ?? [{ type: "text", text: m.content }]; + const textParts = existingParts.filter((p) => p.type === "text"); + const otherParts = existingParts.filter((p) => p.type !== "text"); + return { ...m, parts: [...otherParts, ...toolParts, ...textParts] }; + }); + + // Persist the enriched data back so future loads don't need to re-enrich + try { + const lines = enriched.map((m) => JSON.stringify(m)); + writeFileSync(webChatPath, lines.join("\n") + "\n"); + } catch { /* best effort */ } + + return enriched; + } catch { + return messages; + } +} + /** GET /api/web-sessions/[id] — read all messages for a web chat session */ export async function GET( _request: Request, @@ -30,7 +123,7 @@ export async function GET( } const content = readFileSync(filePath, "utf-8"); - const messages: ChatLine[] = content + let messages: ChatLine[] = content .trim() .split("\n") .filter((line) => line.trim()) @@ -43,6 +136,10 @@ export async function GET( }) .filter((m): m is ChatLine => m !== null); + if (id.includes(":subagent:")) { + messages = enrichSubagentMessages(id, messages, filePath); + } + return Response.json({ id, messages }); } diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts index 824a7949292..3197343efa6 100644 --- a/apps/web/lib/active-runs.ts +++ b/apps/web/lib/active-runs.ts @@ -22,6 +22,7 @@ import { type AgentEvent, spawnAgentProcess, spawnAgentSubscribeProcess, + spawnAgentStartForSession, callGatewayRpc, extractToolResult, buildToolOutput, @@ -285,7 +286,7 @@ export function subscribeToRun( * Reactivate a completed subscribe-only run for a follow-up message. * Resets status to "running" and restarts the subscribe stream. */ -export function reactivateSubscribeRun(sessionKey: string): boolean { +export function reactivateSubscribeRun(sessionKey: string, message?: string): boolean { const run = activeRuns.get(sessionKey); if (!run?.isSubscribeOnly) {return false;} if (run.status === "running") {return true;} @@ -295,6 +296,7 @@ export function reactivateSubscribeRun(sessionKey: string): boolean { if (run._finalizeTimer) {clearTimeout(run._finalizeTimer); run._finalizeTimer = null;} clearWaitingFinalizeTimer(run); resetSubscribeRetryState(run); + stopSubscribeProcess(run); run.accumulated = { id: `assistant-${sessionKey}-${Date.now()}`, @@ -302,7 +304,12 @@ export function reactivateSubscribeRun(sessionKey: string): boolean { parts: [], }; - const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq); + // When a follow-up message is provided, use start mode so the `agent` + // RPC streams ALL events (including tool events) on the same connection. + // In passive subscribe mode, tool events are not broadcast by the gateway. + const newChild = message + ? spawnAgentStartForSession(message, sessionKey) + : spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq); run._subscribeProcess = newChild; run.childProcess = newChild; wireSubscribeOnlyProcess(run, newChild, sessionKey); @@ -512,6 +519,15 @@ export function startSubscribeRun(params: { return activeRuns.get(sessionKey)!; } + // Patch verbose level BEFORE spawning the subscribe process so tool + // events are generated for events that occur after this point. + // The subscribe process also patches, but this gives us a head start. + void callGatewayRpc( + "sessions.patch", + { key: sessionKey, verboseLevel: "full", reasoningLevel: "on" }, + { timeoutMs: 4_000 }, + ).catch(() => {}); + const abortController = new AbortController(); const subscribeChild = spawnAgentSubscribeProcess(sessionKey, 0); @@ -718,7 +734,6 @@ function wireSubscribeOnlyProcess( const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined; const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : ""; const toolName = typeof ev.data?.name === "string" ? ev.data.name : ""; - if (phase === "start") { closeReasoning(); closeText(); @@ -879,12 +894,108 @@ function wireSubscribeOnlyProcess( run._subscribeProcess = child; } +/** + * When the subscribe process doesn't receive tool events (gateway + * `agent.subscribe` unsupported), backfill tool-invocation parts from + * the gateway's on-disk session transcript. + */ +function enrichFromGatewayTranscript(run: ActiveRun): void { + if (!run.isSubscribeOnly || !run.sessionKey) {return;} + if (run.accumulated.parts.some((p) => p.type === "tool-invocation")) {return;} + + try { + const stateDir = resolveOpenClawStateDir(); + const agentId = resolveActiveAgentId(); + const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json"); + if (!existsSync(sessionsJsonPath)) {return;} + + const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record>; + const sessionData = sessions[run.sessionKey]; + const sessionId = typeof sessionData?.sessionId === "string" ? sessionData.sessionId : null; + if (!sessionId) {return;} + + const transcriptPath = join(stateDir, "agents", agentId, "sessions", `${sessionId}.jsonl`); + if (!existsSync(transcriptPath)) {return;} + + const entries = readFileSync(transcriptPath, "utf-8") + .split("\n") + .filter((l) => l.trim()) + .map((l) => { try { return JSON.parse(l); } catch { return null; } }) + .filter(Boolean) as Array>; + + const toolParts: AccumulatedPart[] = []; + const toolResults = new Map>(); + + for (const entry of entries) { + if (entry.type !== "message") {continue;} + const msg = entry.message as Record | undefined; + if (!msg) {continue;} + const content = msg.content; + + if (msg.role === "toolResult" && typeof msg.toolCallId === "string") { + const text = Array.isArray(content) + ? (content as Array>) + .filter((c) => c.type === "text" && typeof c.text === "string") + .map((c) => c.text as string) + .join("\n") + : typeof content === "string" ? content : ""; + toolResults.set(msg.toolCallId, { text: text.slice(0, 500) }); + } + + if (Array.isArray(content)) { + for (const part of content as Array>) { + if (part.type === "toolCall" && typeof part.id === "string" && typeof part.name === "string") { + toolParts.push({ + type: "tool-invocation", + toolCallId: part.id, + toolName: part.name, + args: (part.arguments as Record) ?? {}, + }); + } + } + } + } + + if (toolParts.length === 0) {return;} + + for (const tp of toolParts) { + if (tp.type === "tool-invocation") { + const result = toolResults.get(tp.toolCallId); + if (result) { tp.result = result; } + } + } + + const textParts = run.accumulated.parts.filter((p) => p.type === "text"); + const nonTextParts = run.accumulated.parts.filter((p) => p.type !== "text"); + run.accumulated.parts = [...nonTextParts, ...toolParts, ...textParts]; + + // Also emit SSE events for tool parts so reconnecting clients see them + for (const tp of toolParts) { + if (tp.type === "tool-invocation") { + const toolEvent = { type: "tool-input-start", toolCallId: tp.toolCallId, toolName: tp.toolName }; + run.eventBuffer.push(toolEvent); + const inputEvent = { type: "tool-input-available", toolCallId: tp.toolCallId, toolName: tp.toolName, input: tp.args }; + run.eventBuffer.push(inputEvent); + if (tp.result) { + const outputEvent = { type: "tool-output-available", toolCallId: tp.toolCallId, output: tp.result }; + run.eventBuffer.push(outputEvent); + } + } + } + + } catch { + // Best effort -- don't break finalization + } +} + function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void { if (run.status !== "running") { return; } if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; } clearWaitingFinalizeTimer(run); resetSubscribeRetryState(run); + enrichFromGatewayTranscript(run); + run.status = status; flushPersistence(run); diff --git a/apps/web/lib/agent-runner.ts b/apps/web/lib/agent-runner.ts index 556b4fc41fd..9361976f8e0 100644 --- a/apps/web/lib/agent-runner.ts +++ b/apps/web/lib/agent-runner.ts @@ -185,6 +185,7 @@ type SpawnGatewayProcessParams = { message?: string; sessionKey?: string; afterSeq: number; + lane?: string; }; type BuildConnectParamsOptions = { @@ -596,7 +597,7 @@ class GatewayProcessHandle ...(sessionKey ? { sessionKey } : {}), deliver: false, channel: "webchat", - lane: "web", + lane: this.params.lane ?? "web", timeout: 0, }); if (!startRes.ok) { @@ -950,6 +951,24 @@ export function spawnAgentSubscribeProcess( }); } +/** + * Spawn a start-mode agent process for a subagent follow-up message. + * Uses the `agent` RPC which receives ALL events (including tool events) + * on the same WebSocket connection, unlike passive subscribe mode. + */ +export function spawnAgentStartForSession( + message: string, + sessionKey: string, +): AgentProcessHandle { + return new GatewayProcessHandle({ + mode: "start", + message, + sessionKey, + afterSeq: 0, + lane: "subagent", + }); +} + function spawnLegacyAgentSubscribeProcess( sessionKey: string, afterSeq = 0,