fix(web): subagent tool events — live streaming + persisted enrichment

Subagent sessions were missing tool call displays because the gateway's
passive subscribe mode (agent.subscribe unsupported) only broadcasts
assistant + lifecycle events, not tool events.

Three fixes:
- Use start-mode GatewayProcessHandle for subagent follow-up messages
  so the agent RPC streams all events (including tools) on the same
  WebSocket connection
- Enrich persisted subagent JSONL at load time from the gateway's
  on-disk session transcript when tool-invocation parts are missing
- Best-effort enrichment at finalization time from gateway transcript
  for subscribe-only runs that didn't receive tool events
This commit is contained in:
kumarabhirup 2026-03-03 16:26:34 -08:00
parent 775e0e99f8
commit 045b73f42e
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
4 changed files with 238 additions and 12 deletions

View File

@ -9,7 +9,6 @@ import {
persistUserMessage,
persistSubscribeUserMessage,
reactivateSubscribeRun,
sendSubagentFollowUp,
type SseEvent,
} from "@/lib/active-runs";
import { existsSync, readFileSync } from "node:fs";
@ -100,10 +99,7 @@ export async function POST(req: Request) {
id: lastUserMessage.id,
text: userText,
});
reactivateSubscribeRun(sessionKey);
if (!sendSubagentFollowUp(sessionKey, agentMessage)) {
return new Response("Failed to send subagent message", { status: 500 });
}
reactivateSubscribeRun(sessionKey, agentMessage);
} else if (sessionId && lastUserMessage) {
persistUserMessage(sessionId, {
id: lastUserMessage.id,
@ -144,6 +140,9 @@ export async function POST(req: Request) {
try { controller.close(); } catch { /* already closed */ }
return;
}
// Skip custom event types not in the AI SDK v6 data stream schema;
// they're only consumed by the reconnection parser (processEvent).
if (event.type === "tool-output-partial") {return;}
try {
const json = JSON.stringify(event);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));

View File

@ -1,6 +1,6 @@
import { readFileSync, existsSync, unlinkSync } from "node:fs";
import { readFileSync, existsSync, unlinkSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { resolveWebChatDir } from "@/lib/workspace";
import { resolveWebChatDir, resolveOpenClawStateDir, resolveActiveAgentId } from "@/lib/workspace";
import { readIndex, writeIndex } from "../shared";
export const dynamic = "force-dynamic";
@ -17,6 +17,99 @@ export type ChatLine = {
timestamp: string;
};
/**
* For subagent sessions whose persisted parts lack tool-invocation entries,
* backfill from the gateway's on-disk session transcript (which always
* stores the full conversation including tool calls).
*/
function enrichSubagentMessages(sessionKey: string, messages: ChatLine[], webChatPath: string): ChatLine[] {
const assistantMsgs = messages.filter((m) => m.role === "assistant");
const hasToolParts = assistantMsgs.some((m) =>
m.parts?.some((p) => p.type === "tool-invocation" || p.type === "dynamic-tool"),
);
if (hasToolParts) {return messages;}
try {
const stateDir = resolveOpenClawStateDir();
const agentId = resolveActiveAgentId();
const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json");
if (!existsSync(sessionsJsonPath)) {return messages;}
const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record<string, Record<string, unknown>>;
const sessionData = sessions[sessionKey];
const sessionId = typeof sessionData?.sessionId === "string" ? sessionData.sessionId : null;
if (!sessionId) {return messages;}
const transcriptPath = join(stateDir, "agents", agentId, "sessions", `${sessionId}.jsonl`);
if (!existsSync(transcriptPath)) {return messages;}
const entries = readFileSync(transcriptPath, "utf-8")
.split("\n")
.filter((l) => l.trim())
.map((l) => { try { return JSON.parse(l); } catch { return null; } })
.filter(Boolean) as Array<Record<string, unknown>>;
const toolParts: Array<Record<string, unknown>> = [];
const toolResults = new Map<string, Record<string, unknown>>();
for (const entry of entries) {
if (entry.type !== "message") {continue;}
const msg = entry.message as Record<string, unknown> | undefined;
if (!msg) {continue;}
const content = msg.content;
if (msg.role === "toolResult" && typeof msg.toolCallId === "string") {
const text = Array.isArray(content)
? (content as Array<Record<string, unknown>>)
.filter((c) => c.type === "text" && typeof c.text === "string")
.map((c) => c.text as string)
.join("\n")
: typeof content === "string" ? content : "";
toolResults.set(msg.toolCallId, { text: text.slice(0, 500) });
}
if (Array.isArray(content)) {
for (const part of content as Array<Record<string, unknown>>) {
if (part.type === "toolCall" && typeof part.id === "string" && typeof part.name === "string") {
toolParts.push({
type: "tool-invocation",
toolCallId: part.id,
toolName: part.name,
args: (part.arguments as Record<string, unknown>) ?? {},
});
}
}
}
}
if (toolParts.length === 0) {return messages;}
for (const tp of toolParts) {
const result = toolResults.get(tp.toolCallId as string);
if (result) { tp.result = result; }
}
// Inject tool parts into assistant messages: place them before text parts
const enriched = messages.map((m) => {
if (m.role !== "assistant") {return m;}
const existingParts = m.parts ?? [{ type: "text", text: m.content }];
const textParts = existingParts.filter((p) => p.type === "text");
const otherParts = existingParts.filter((p) => p.type !== "text");
return { ...m, parts: [...otherParts, ...toolParts, ...textParts] };
});
// Persist the enriched data back so future loads don't need to re-enrich
try {
const lines = enriched.map((m) => JSON.stringify(m));
writeFileSync(webChatPath, lines.join("\n") + "\n");
} catch { /* best effort */ }
return enriched;
} catch {
return messages;
}
}
/** GET /api/web-sessions/[id] — read all messages for a web chat session */
export async function GET(
_request: Request,
@ -30,7 +123,7 @@ export async function GET(
}
const content = readFileSync(filePath, "utf-8");
const messages: ChatLine[] = content
let messages: ChatLine[] = content
.trim()
.split("\n")
.filter((line) => line.trim())
@ -43,6 +136,10 @@ export async function GET(
})
.filter((m): m is ChatLine => m !== null);
if (id.includes(":subagent:")) {
messages = enrichSubagentMessages(id, messages, filePath);
}
return Response.json({ id, messages });
}

View File

@ -22,6 +22,7 @@ import {
type AgentEvent,
spawnAgentProcess,
spawnAgentSubscribeProcess,
spawnAgentStartForSession,
callGatewayRpc,
extractToolResult,
buildToolOutput,
@ -285,7 +286,7 @@ export function subscribeToRun(
* Reactivate a completed subscribe-only run for a follow-up message.
* Resets status to "running" and restarts the subscribe stream.
*/
export function reactivateSubscribeRun(sessionKey: string): boolean {
export function reactivateSubscribeRun(sessionKey: string, message?: string): boolean {
const run = activeRuns.get(sessionKey);
if (!run?.isSubscribeOnly) {return false;}
if (run.status === "running") {return true;}
@ -295,6 +296,7 @@ export function reactivateSubscribeRun(sessionKey: string): boolean {
if (run._finalizeTimer) {clearTimeout(run._finalizeTimer); run._finalizeTimer = null;}
clearWaitingFinalizeTimer(run);
resetSubscribeRetryState(run);
stopSubscribeProcess(run);
run.accumulated = {
id: `assistant-${sessionKey}-${Date.now()}`,
@ -302,7 +304,12 @@ export function reactivateSubscribeRun(sessionKey: string): boolean {
parts: [],
};
const newChild = spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
// When a follow-up message is provided, use start mode so the `agent`
// RPC streams ALL events (including tool events) on the same connection.
// In passive subscribe mode, tool events are not broadcast by the gateway.
const newChild = message
? spawnAgentStartForSession(message, sessionKey)
: spawnAgentSubscribeProcess(sessionKey, run.lastGlobalSeq);
run._subscribeProcess = newChild;
run.childProcess = newChild;
wireSubscribeOnlyProcess(run, newChild, sessionKey);
@ -512,6 +519,15 @@ export function startSubscribeRun(params: {
return activeRuns.get(sessionKey)!;
}
// Patch verbose level BEFORE spawning the subscribe process so tool
// events are generated for events that occur after this point.
// The subscribe process also patches, but this gives us a head start.
void callGatewayRpc(
"sessions.patch",
{ key: sessionKey, verboseLevel: "full", reasoningLevel: "on" },
{ timeoutMs: 4_000 },
).catch(() => {});
const abortController = new AbortController();
const subscribeChild = spawnAgentSubscribeProcess(sessionKey, 0);
@ -718,7 +734,6 @@ function wireSubscribeOnlyProcess(
const phase = typeof ev.data?.phase === "string" ? ev.data.phase : undefined;
const toolCallId = typeof ev.data?.toolCallId === "string" ? ev.data.toolCallId : "";
const toolName = typeof ev.data?.name === "string" ? ev.data.name : "";
if (phase === "start") {
closeReasoning();
closeText();
@ -879,12 +894,108 @@ function wireSubscribeOnlyProcess(
run._subscribeProcess = child;
}
/**
* When the subscribe process doesn't receive tool events (gateway
* `agent.subscribe` unsupported), backfill tool-invocation parts from
* the gateway's on-disk session transcript.
*/
function enrichFromGatewayTranscript(run: ActiveRun): void {
if (!run.isSubscribeOnly || !run.sessionKey) {return;}
if (run.accumulated.parts.some((p) => p.type === "tool-invocation")) {return;}
try {
const stateDir = resolveOpenClawStateDir();
const agentId = resolveActiveAgentId();
const sessionsJsonPath = join(stateDir, "agents", agentId, "sessions", "sessions.json");
if (!existsSync(sessionsJsonPath)) {return;}
const sessions = JSON.parse(readFileSync(sessionsJsonPath, "utf-8")) as Record<string, Record<string, unknown>>;
const sessionData = sessions[run.sessionKey];
const sessionId = typeof sessionData?.sessionId === "string" ? sessionData.sessionId : null;
if (!sessionId) {return;}
const transcriptPath = join(stateDir, "agents", agentId, "sessions", `${sessionId}.jsonl`);
if (!existsSync(transcriptPath)) {return;}
const entries = readFileSync(transcriptPath, "utf-8")
.split("\n")
.filter((l) => l.trim())
.map((l) => { try { return JSON.parse(l); } catch { return null; } })
.filter(Boolean) as Array<Record<string, unknown>>;
const toolParts: AccumulatedPart[] = [];
const toolResults = new Map<string, Record<string, unknown>>();
for (const entry of entries) {
if (entry.type !== "message") {continue;}
const msg = entry.message as Record<string, unknown> | undefined;
if (!msg) {continue;}
const content = msg.content;
if (msg.role === "toolResult" && typeof msg.toolCallId === "string") {
const text = Array.isArray(content)
? (content as Array<Record<string, unknown>>)
.filter((c) => c.type === "text" && typeof c.text === "string")
.map((c) => c.text as string)
.join("\n")
: typeof content === "string" ? content : "";
toolResults.set(msg.toolCallId, { text: text.slice(0, 500) });
}
if (Array.isArray(content)) {
for (const part of content as Array<Record<string, unknown>>) {
if (part.type === "toolCall" && typeof part.id === "string" && typeof part.name === "string") {
toolParts.push({
type: "tool-invocation",
toolCallId: part.id,
toolName: part.name,
args: (part.arguments as Record<string, unknown>) ?? {},
});
}
}
}
}
if (toolParts.length === 0) {return;}
for (const tp of toolParts) {
if (tp.type === "tool-invocation") {
const result = toolResults.get(tp.toolCallId);
if (result) { tp.result = result; }
}
}
const textParts = run.accumulated.parts.filter((p) => p.type === "text");
const nonTextParts = run.accumulated.parts.filter((p) => p.type !== "text");
run.accumulated.parts = [...nonTextParts, ...toolParts, ...textParts];
// Also emit SSE events for tool parts so reconnecting clients see them
for (const tp of toolParts) {
if (tp.type === "tool-invocation") {
const toolEvent = { type: "tool-input-start", toolCallId: tp.toolCallId, toolName: tp.toolName };
run.eventBuffer.push(toolEvent);
const inputEvent = { type: "tool-input-available", toolCallId: tp.toolCallId, toolName: tp.toolName, input: tp.args };
run.eventBuffer.push(inputEvent);
if (tp.result) {
const outputEvent = { type: "tool-output-available", toolCallId: tp.toolCallId, output: tp.result };
run.eventBuffer.push(outputEvent);
}
}
}
} catch {
// Best effort -- don't break finalization
}
}
function finalizeSubscribeRun(run: ActiveRun, status: "completed" | "error" = "completed"): void {
if (run.status !== "running") { return; }
if (run._finalizeTimer) { clearTimeout(run._finalizeTimer); run._finalizeTimer = null; }
clearWaitingFinalizeTimer(run);
resetSubscribeRetryState(run);
enrichFromGatewayTranscript(run);
run.status = status;
flushPersistence(run);

View File

@ -185,6 +185,7 @@ type SpawnGatewayProcessParams = {
message?: string;
sessionKey?: string;
afterSeq: number;
lane?: string;
};
type BuildConnectParamsOptions = {
@ -596,7 +597,7 @@ class GatewayProcessHandle
...(sessionKey ? { sessionKey } : {}),
deliver: false,
channel: "webchat",
lane: "web",
lane: this.params.lane ?? "web",
timeout: 0,
});
if (!startRes.ok) {
@ -950,6 +951,24 @@ export function spawnAgentSubscribeProcess(
});
}
/**
* Spawn a start-mode agent process for a subagent follow-up message.
* Uses the `agent` RPC which receives ALL events (including tool events)
* on the same WebSocket connection, unlike passive subscribe mode.
*/
export function spawnAgentStartForSession(
message: string,
sessionKey: string,
): AgentProcessHandle {
return new GatewayProcessHandle({
mode: "start",
message,
sessionKey,
afterSeq: 0,
lane: "subagent",
});
}
function spawnLegacyAgentSubscribeProcess(
sessionKey: string,
afterSeq = 0,