diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index 43d465796a7..7af4818542e 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -1,12 +1,25 @@ import type { UIMessage } from "ai"; +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; import { resolveAgentWorkspacePrefix } from "@/lib/workspace"; import { startRun, hasActiveRun, subscribeToRun, persistUserMessage, - type SseEvent, + type SseEvent as ParentSseEvent, } from "@/lib/active-runs"; +import { + hasActiveSubagent, + isSubagentRunning, + ensureRegisteredFromDisk, + subscribeToSubagent, + persistUserMessage as persistSubagentUserMessage, + reactivateSubagent, + spawnSubagentMessage, + type SseEvent as SubagentSseEvent, +} from "@/lib/subagent-runs"; +import { resolveOpenClawStateDir } from "@/lib/workspace"; // Force Node.js runtime (required for child_process) export const runtime = "nodejs"; @@ -14,11 +27,37 @@ export const runtime = "nodejs"; // Allow streaming responses up to 10 minutes export const maxDuration = 600; +function deriveSubagentParentSessionId(sessionKey: string): string { + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) {return "";} + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { + runs?: Record>; + }; + for (const entry of Object.values(raw.runs ?? {})) { + if (entry.childSessionKey !== sessionKey) {continue;} + const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; + const match = requester.match(/^agent:[^:]+:web:(.+)$/); + return match?.[1] ?? ""; + } + } catch { + // ignore + } + return ""; +} + +function ensureSubagentRegistered(sessionKey: string): boolean { + if (hasActiveSubagent(sessionKey)) {return true;} + const parentWebSessionId = deriveSubagentParentSessionId(sessionKey); + return ensureRegisteredFromDisk(sessionKey, parentWebSessionId); +} + export async function POST(req: Request) { const { messages, sessionId, - }: { messages: UIMessage[]; sessionId?: string } = await req.json(); + sessionKey, + }: { messages: UIMessage[]; sessionId?: string; sessionKey?: string } = await req.json(); // Extract the latest user message text const lastUserMessage = messages.filter((m) => m.role === "user").pop(); @@ -35,10 +74,15 @@ export async function POST(req: Request) { return new Response("No message provided", { status: 400 }); } + const isSubagentSession = typeof sessionKey === "string" && sessionKey.includes(":subagent:"); + // Reject if a run is already active for this session. - if (sessionId && hasActiveRun(sessionId)) { + if (!isSubagentSession && sessionId && hasActiveRun(sessionId)) { return new Response("Active run in progress", { status: 409 }); } + if (isSubagentSession && isSubagentRunning(sessionKey)) { + return new Response("Active subagent run in progress", { status: 409 }); + } // Resolve workspace file paths to be agent-cwd-relative. let agentMessage = userText; @@ -52,7 +96,15 @@ export async function POST(req: Request) { // Persist the user message server-side so it survives a page reload // even if the client never gets a chance to save. - if (sessionId && lastUserMessage) { + if (isSubagentSession && sessionKey && lastUserMessage) { + if (!ensureSubagentRegistered(sessionKey)) { + return new Response("Subagent not found", { status: 404 }); + } + persistSubagentUserMessage(sessionKey, { + id: lastUserMessage.id, + text: userText, + }); + } else if (sessionId && lastUserMessage) { persistUserMessage(sessionId, { id: lastUserMessage.id, content: userText, @@ -62,7 +114,14 @@ export async function POST(req: Request) { // Start the agent run (decoupled from this HTTP connection). // The child process will keep running even if this response is cancelled. - if (sessionId) { + if (isSubagentSession && sessionKey) { + if (!reactivateSubagent(sessionKey)) { + return new Response("Subagent not found", { status: 404 }); + } + if (!spawnSubagentMessage(sessionKey, agentMessage)) { + return new Response("Failed to start subagent run", { status: 500 }); + } + } else if (sessionId) { try { startRun({ sessionId, @@ -84,15 +143,38 @@ export async function POST(req: Request) { const stream = new ReadableStream({ start(controller) { - if (!sessionId) { + if (!sessionId && !sessionKey) { // No session — shouldn't happen but close gracefully. controller.close(); return; } - unsubscribe = subscribeToRun( - sessionId, - (event: SseEvent | null) => { + unsubscribe = isSubagentSession && sessionKey + ? subscribeToSubagent( + sessionKey, + (event: SubagentSseEvent | null) => { + if (closed) {return;} + if (event === null) { + closed = true; + try { + controller.close(); + } catch { + /* already closed */ + } + return; + } + try { + const json = JSON.stringify(event); + controller.enqueue(encoder.encode(`data: ${json}\n\n`)); + } catch { + /* ignore enqueue errors on closed stream */ + } + }, + { replay: false }, + ) + : subscribeToRun( + sessionId as string, + (event: ParentSseEvent | null) => { if (closed) {return;} if (event === null) { // Run completed — close the SSE stream. diff --git a/apps/web/app/api/chat/stop/route.ts b/apps/web/app/api/chat/stop/route.ts index 1642915773c..02b1a66e488 100644 --- a/apps/web/app/api/chat/stop/route.ts +++ b/apps/web/app/api/chat/stop/route.ts @@ -5,16 +5,54 @@ * The child process is sent SIGTERM and the run transitions to "error" state. */ import { abortRun } from "@/lib/active-runs"; +import { + abortSubagent, + hasActiveSubagent, + isSubagentRunning, + ensureRegisteredFromDisk, +} from "@/lib/subagent-runs"; +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; +import { resolveOpenClawStateDir } from "@/lib/workspace"; export const runtime = "nodejs"; +function deriveSubagentParentSessionId(sessionKey: string): string { + const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json"); + if (!existsSync(registryPath)) {return "";} + try { + const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { + runs?: Record>; + }; + for (const entry of Object.values(raw.runs ?? {})) { + if (entry.childSessionKey !== sessionKey) {continue;} + const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : ""; + const match = requester.match(/^agent:[^:]+:web:(.+)$/); + return match?.[1] ?? ""; + } + } catch { + // ignore + } + return ""; +} + export async function POST(req: Request) { - const body: { sessionId?: string } = await req + const body: { sessionId?: string; sessionKey?: string } = await req .json() .catch(() => ({})); + const isSubagentSession = typeof body.sessionKey === "string" && body.sessionKey.includes(":subagent:"); + if (isSubagentSession && body.sessionKey) { + if (!hasActiveSubagent(body.sessionKey)) { + const parentWebSessionId = deriveSubagentParentSessionId(body.sessionKey); + ensureRegisteredFromDisk(body.sessionKey, parentWebSessionId); + } + const aborted = isSubagentRunning(body.sessionKey) ? abortSubagent(body.sessionKey) : false; + return Response.json({ aborted }); + } + if (!body.sessionId) { - return new Response("sessionId required", { status: 400 }); + return new Response("sessionId or subagent sessionKey required", { status: 400 }); } const aborted = abortRun(body.sessionId);