web: support subagent sessions in chat and stop API routes

This commit is contained in:
kumarabhirup 2026-02-21 12:32:37 -08:00
parent 54d048bfb9
commit 7cdcf61639
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
2 changed files with 131 additions and 11 deletions

View File

@ -1,12 +1,25 @@
import type { UIMessage } from "ai";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveAgentWorkspacePrefix } from "@/lib/workspace";
import {
startRun,
hasActiveRun,
subscribeToRun,
persistUserMessage,
type SseEvent,
type SseEvent as ParentSseEvent,
} from "@/lib/active-runs";
import {
hasActiveSubagent,
isSubagentRunning,
ensureRegisteredFromDisk,
subscribeToSubagent,
persistUserMessage as persistSubagentUserMessage,
reactivateSubagent,
spawnSubagentMessage,
type SseEvent as SubagentSseEvent,
} from "@/lib/subagent-runs";
import { resolveOpenClawStateDir } from "@/lib/workspace";
// Force Node.js runtime (required for child_process)
export const runtime = "nodejs";
@ -14,11 +27,37 @@ export const runtime = "nodejs";
// Allow streaming responses up to 10 minutes
export const maxDuration = 600;
function deriveSubagentParentSessionId(sessionKey: string): string {
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return "";}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
runs?: Record<string, Record<string, unknown>>;
};
for (const entry of Object.values(raw.runs ?? {})) {
if (entry.childSessionKey !== sessionKey) {continue;}
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
const match = requester.match(/^agent:[^:]+:web:(.+)$/);
return match?.[1] ?? "";
}
} catch {
// ignore
}
return "";
}
function ensureSubagentRegistered(sessionKey: string): boolean {
if (hasActiveSubagent(sessionKey)) {return true;}
const parentWebSessionId = deriveSubagentParentSessionId(sessionKey);
return ensureRegisteredFromDisk(sessionKey, parentWebSessionId);
}
export async function POST(req: Request) {
const {
messages,
sessionId,
}: { messages: UIMessage[]; sessionId?: string } = await req.json();
sessionKey,
}: { messages: UIMessage[]; sessionId?: string; sessionKey?: string } = await req.json();
// Extract the latest user message text
const lastUserMessage = messages.filter((m) => m.role === "user").pop();
@ -35,10 +74,15 @@ export async function POST(req: Request) {
return new Response("No message provided", { status: 400 });
}
const isSubagentSession = typeof sessionKey === "string" && sessionKey.includes(":subagent:");
// Reject if a run is already active for this session.
if (sessionId && hasActiveRun(sessionId)) {
if (!isSubagentSession && sessionId && hasActiveRun(sessionId)) {
return new Response("Active run in progress", { status: 409 });
}
if (isSubagentSession && isSubagentRunning(sessionKey)) {
return new Response("Active subagent run in progress", { status: 409 });
}
// Resolve workspace file paths to be agent-cwd-relative.
let agentMessage = userText;
@ -52,7 +96,15 @@ export async function POST(req: Request) {
// Persist the user message server-side so it survives a page reload
// even if the client never gets a chance to save.
if (sessionId && lastUserMessage) {
if (isSubagentSession && sessionKey && lastUserMessage) {
if (!ensureSubagentRegistered(sessionKey)) {
return new Response("Subagent not found", { status: 404 });
}
persistSubagentUserMessage(sessionKey, {
id: lastUserMessage.id,
text: userText,
});
} else if (sessionId && lastUserMessage) {
persistUserMessage(sessionId, {
id: lastUserMessage.id,
content: userText,
@ -62,7 +114,14 @@ export async function POST(req: Request) {
// Start the agent run (decoupled from this HTTP connection).
// The child process will keep running even if this response is cancelled.
if (sessionId) {
if (isSubagentSession && sessionKey) {
if (!reactivateSubagent(sessionKey)) {
return new Response("Subagent not found", { status: 404 });
}
if (!spawnSubagentMessage(sessionKey, agentMessage)) {
return new Response("Failed to start subagent run", { status: 500 });
}
} else if (sessionId) {
try {
startRun({
sessionId,
@ -84,15 +143,38 @@ export async function POST(req: Request) {
const stream = new ReadableStream({
start(controller) {
if (!sessionId) {
if (!sessionId && !sessionKey) {
// No session — shouldn't happen but close gracefully.
controller.close();
return;
}
unsubscribe = subscribeToRun(
sessionId,
(event: SseEvent | null) => {
unsubscribe = isSubagentSession && sessionKey
? subscribeToSubagent(
sessionKey,
(event: SubagentSseEvent | null) => {
if (closed) {return;}
if (event === null) {
closed = true;
try {
controller.close();
} catch {
/* already closed */
}
return;
}
try {
const json = JSON.stringify(event);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
} catch {
/* ignore enqueue errors on closed stream */
}
},
{ replay: false },
)
: subscribeToRun(
sessionId as string,
(event: ParentSseEvent | null) => {
if (closed) {return;}
if (event === null) {
// Run completed — close the SSE stream.

View File

@ -5,16 +5,54 @@
* The child process is sent SIGTERM and the run transitions to "error" state.
*/
import { abortRun } from "@/lib/active-runs";
import {
abortSubagent,
hasActiveSubagent,
isSubagentRunning,
ensureRegisteredFromDisk,
} from "@/lib/subagent-runs";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { resolveOpenClawStateDir } from "@/lib/workspace";
export const runtime = "nodejs";
function deriveSubagentParentSessionId(sessionKey: string): string {
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
if (!existsSync(registryPath)) {return "";}
try {
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
runs?: Record<string, Record<string, unknown>>;
};
for (const entry of Object.values(raw.runs ?? {})) {
if (entry.childSessionKey !== sessionKey) {continue;}
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
const match = requester.match(/^agent:[^:]+:web:(.+)$/);
return match?.[1] ?? "";
}
} catch {
// ignore
}
return "";
}
export async function POST(req: Request) {
const body: { sessionId?: string } = await req
const body: { sessionId?: string; sessionKey?: string } = await req
.json()
.catch(() => ({}));
const isSubagentSession = typeof body.sessionKey === "string" && body.sessionKey.includes(":subagent:");
if (isSubagentSession && body.sessionKey) {
if (!hasActiveSubagent(body.sessionKey)) {
const parentWebSessionId = deriveSubagentParentSessionId(body.sessionKey);
ensureRegisteredFromDisk(body.sessionKey, parentWebSessionId);
}
const aborted = isSubagentRunning(body.sessionKey) ? abortSubagent(body.sessionKey) : false;
return Response.json({ aborted });
}
if (!body.sessionId) {
return new Response("sessionId required", { status: 400 });
return new Response("sessionId or subagent sessionKey required", { status: 400 });
}
const aborted = abortRun(body.sessionId);