From 54d048bfb9351cf0b3d0872647e2550a0b04f3a9 Mon Sep 17 00:00:00 2001 From: kumarabhirup Date: Sat, 21 Feb 2026 12:32:32 -0800 Subject: [PATCH] web: merge subagent-stream into unified stream route --- apps/web/app/api/chat/stream/route.ts | 103 +++++++++++++++++- .../web/app/api/chat/subagent-stream/route.ts | 101 ----------------- 2 files changed, 97 insertions(+), 107 deletions(-) delete mode 100644 apps/web/app/api/chat/subagent-stream/route.ts diff --git a/apps/web/app/api/chat/stream/route.ts b/apps/web/app/api/chat/stream/route.ts index 4434f950eb7..c1c16fd563e 100644 --- a/apps/web/app/api/chat/stream/route.ts +++ b/apps/web/app/api/chat/stream/route.ts @@ -10,21 +10,112 @@ import { getActiveRun, subscribeToRun, - type SseEvent, + type SseEvent as ParentSseEvent, } from "@/lib/active-runs"; +import { + subscribeToSubagent, + hasActiveSubagent, + isSubagentRunning, + ensureRegisteredFromDisk, + ensureSubagentStreamable, + type SseEvent as SubagentSseEvent, +} from "@/lib/subagent-runs"; +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; +import { resolveOpenClawStateDir } from "@/lib/workspace"; export const runtime = "nodejs"; export const maxDuration = 600; +function deriveSubagentParentSessionId(sessionKey: string): string { + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) {return "";} + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { + runs?: Record>; + }; + for (const entry of Object.values(raw.runs ?? {})) { + if (entry.childSessionKey !== sessionKey) {continue;} + const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; + const match = requester.match(/^agent:[^:]+:web:(.+)$/); + return match?.[1] ?? ""; + } + } catch { + // ignore + } + return ""; +} + export async function GET(req: Request) { const url = new URL(req.url); const sessionId = url.searchParams.get("sessionId"); + const sessionKey = url.searchParams.get("sessionKey"); + const isSubagentSession = typeof sessionKey === "string" && sessionKey.includes(":subagent:"); - if (!sessionId) { - return new Response("sessionId required", { status: 400 }); + if (!sessionId && !sessionKey) { + return new Response("sessionId or subagent sessionKey required", { status: 400 }); } - const run = getActiveRun(sessionId); + if (isSubagentSession && sessionKey) { + if (!hasActiveSubagent(sessionKey)) { + const parentWebSessionId = deriveSubagentParentSessionId(sessionKey); + const registered = ensureRegisteredFromDisk(sessionKey, parentWebSessionId); + if (!registered && !hasActiveSubagent(sessionKey)) { + return Response.json({ active: false }, { status: 404 }); + } + } + ensureSubagentStreamable(sessionKey); + const isActive = isSubagentRunning(sessionKey); + const encoder = new TextEncoder(); + let closed = false; + let unsubscribe: (() => void) | null = null; + + const stream = new ReadableStream({ + start(controller) { + unsubscribe = subscribeToSubagent( + sessionKey, + (event: SubagentSseEvent | null) => { + if (closed) {return;} + if (event === null) { + closed = true; + try { + controller.close(); + } catch { + /* already closed */ + } + return; + } + try { + const json = JSON.stringify(event); + controller.enqueue(encoder.encode(`data: ${json}\n\n`)); + } catch { + /* ignore enqueue errors on closed stream */ + } + }, + { replay: true }, + ); + + if (!unsubscribe) { + closed = true; + controller.close(); + } + }, + cancel() { + closed = true; + unsubscribe?.(); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Run-Active": isActive ? "true" : "false", + }, + }); + } + const run = getActiveRun(sessionId as string); if (!run) { return Response.json({ active: false }, { status: 404 }); } @@ -49,8 +140,8 @@ export async function GET(req: Request) { // subscribeToRun with replay=true replays the full event buffer // synchronously, then subscribes for live events. unsubscribe = subscribeToRun( - sessionId, - (event: SseEvent | null) => { + sessionId as string, + (event: ParentSseEvent | null) => { if (closed) {return;} if (event === null) { // Run completed — close the SSE stream. diff --git a/apps/web/app/api/chat/subagent-stream/route.ts b/apps/web/app/api/chat/subagent-stream/route.ts deleted file mode 100644 index a682a17acbd..00000000000 --- a/apps/web/app/api/chat/subagent-stream/route.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { subscribeToSubagent, hasActiveSubagent, isSubagentRunning, ensureRegisteredFromDisk, ensureSubagentStreamable } from "@/lib/subagent-runs"; -import type { SseEvent } from "@/lib/subagent-runs"; -import { existsSync, readFileSync } from "node:fs"; -import { join } from "node:path"; -import { resolveOpenClawStateDir } from "@/lib/workspace"; - -export const runtime = "nodejs"; -export const maxDuration = 600; - -/** - * Ensure the subagent is registered in the in-memory SubagentRunManager. - * Tries the shared ensureRegisteredFromDisk helper first, which reads the - * on-disk registry (~/.openclaw/subagents/runs.json). - */ -function ensureRegistered(sessionKey: string): boolean { - if (hasActiveSubagent(sessionKey)) {return true;} - - // Look up the parent web session ID from the on-disk registry - const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); - if (!existsSync(registryPath)) {return false;} - - try { - const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { runs?: Record> }; - const runs = raw?.runs; - if (!runs) {return false;} - - for (const entry of Object.values(runs)) { - if (entry.childSessionKey === sessionKey) { - const rsk = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; - const webIdMatch = rsk.match(/^agent:[^:]+:web:(.+)$/); - const parentWebSessionId = webIdMatch?.[1] ?? ""; - return ensureRegisteredFromDisk(sessionKey, parentWebSessionId); - } - } - } catch { /* ignore */ } - - return false; -} - -export async function GET(req: Request) { - const url = new URL(req.url); - const sessionKey = url.searchParams.get("sessionKey"); - - if (!sessionKey) { - return new Response("sessionKey required", { status: 400 }); - } - - // Lazily register the subagent so events get buffered - const registered = ensureRegistered(sessionKey); - if (!registered && !hasActiveSubagent(sessionKey)) { - return new Response("Subagent not found", { status: 404 }); - } - - // For still-running subagents rehydrated from disk, activate the gateway - // WebSocket subscription so new events arrive in real time. - ensureSubagentStreamable(sessionKey); - - const isActive = isSubagentRunning(sessionKey); - const encoder = new TextEncoder(); - let closed = false; - let unsubscribe: (() => void) | null = null; - - const stream = new ReadableStream({ - start(controller) { - unsubscribe = subscribeToSubagent( - sessionKey, - (event: SseEvent | null) => { - if (closed) {return;} - if (event === null) { - closed = true; - try { controller.close(); } catch { /* already closed */ } - return; - } - try { - const json = JSON.stringify(event); - controller.enqueue(encoder.encode(`data: ${json}\n\n`)); - } catch { /* ignore */ } - }, - { replay: true }, - ); - - if (!unsubscribe) { - closed = true; - controller.close(); - } - }, - cancel() { - closed = true; - unsubscribe?.(); - }, - }); - - return new Response(stream, { - headers: { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache, no-transform", - Connection: "keep-alive", - "X-Run-Active": isActive ? "true" : "false", - }, - }); -}