web: merge subagent-stream into unified stream route
This commit is contained in:
parent
e267fe7df4
commit
54d048bfb9
@ -10,21 +10,112 @@
|
||||
import {
|
||||
getActiveRun,
|
||||
subscribeToRun,
|
||||
type SseEvent,
|
||||
type SseEvent as ParentSseEvent,
|
||||
} from "@/lib/active-runs";
|
||||
import {
|
||||
subscribeToSubagent,
|
||||
hasActiveSubagent,
|
||||
isSubagentRunning,
|
||||
ensureRegisteredFromDisk,
|
||||
ensureSubagentStreamable,
|
||||
type SseEvent as SubagentSseEvent,
|
||||
} from "@/lib/subagent-runs";
|
||||
import { existsSync, readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { resolveOpenClawStateDir } from "@/lib/workspace";
|
||||
|
||||
export const runtime = "nodejs";
|
||||
export const maxDuration = 600;
|
||||
|
||||
function deriveSubagentParentSessionId(sessionKey: string): string {
|
||||
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
|
||||
if (!existsSync(registryPath)) {return "";}
|
||||
try {
|
||||
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as {
|
||||
runs?: Record<string, Record<string, unknown>>;
|
||||
};
|
||||
for (const entry of Object.values(raw.runs ?? {})) {
|
||||
if (entry.childSessionKey !== sessionKey) {continue;}
|
||||
const requester = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
|
||||
const match = requester.match(/^agent:[^:]+:web:(.+)$/);
|
||||
return match?.[1] ?? "";
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
export async function GET(req: Request) {
|
||||
const url = new URL(req.url);
|
||||
const sessionId = url.searchParams.get("sessionId");
|
||||
const sessionKey = url.searchParams.get("sessionKey");
|
||||
const isSubagentSession = typeof sessionKey === "string" && sessionKey.includes(":subagent:");
|
||||
|
||||
if (!sessionId) {
|
||||
return new Response("sessionId required", { status: 400 });
|
||||
if (!sessionId && !sessionKey) {
|
||||
return new Response("sessionId or subagent sessionKey required", { status: 400 });
|
||||
}
|
||||
|
||||
const run = getActiveRun(sessionId);
|
||||
if (isSubagentSession && sessionKey) {
|
||||
if (!hasActiveSubagent(sessionKey)) {
|
||||
const parentWebSessionId = deriveSubagentParentSessionId(sessionKey);
|
||||
const registered = ensureRegisteredFromDisk(sessionKey, parentWebSessionId);
|
||||
if (!registered && !hasActiveSubagent(sessionKey)) {
|
||||
return Response.json({ active: false }, { status: 404 });
|
||||
}
|
||||
}
|
||||
ensureSubagentStreamable(sessionKey);
|
||||
const isActive = isSubagentRunning(sessionKey);
|
||||
const encoder = new TextEncoder();
|
||||
let closed = false;
|
||||
let unsubscribe: (() => void) | null = null;
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
unsubscribe = subscribeToSubagent(
|
||||
sessionKey,
|
||||
(event: SubagentSseEvent | null) => {
|
||||
if (closed) {return;}
|
||||
if (event === null) {
|
||||
closed = true;
|
||||
try {
|
||||
controller.close();
|
||||
} catch {
|
||||
/* already closed */
|
||||
}
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const json = JSON.stringify(event);
|
||||
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
|
||||
} catch {
|
||||
/* ignore enqueue errors on closed stream */
|
||||
}
|
||||
},
|
||||
{ replay: true },
|
||||
);
|
||||
|
||||
if (!unsubscribe) {
|
||||
closed = true;
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
closed = true;
|
||||
unsubscribe?.();
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Run-Active": isActive ? "true" : "false",
|
||||
},
|
||||
});
|
||||
}
|
||||
const run = getActiveRun(sessionId as string);
|
||||
if (!run) {
|
||||
return Response.json({ active: false }, { status: 404 });
|
||||
}
|
||||
@ -49,8 +140,8 @@ export async function GET(req: Request) {
|
||||
// subscribeToRun with replay=true replays the full event buffer
|
||||
// synchronously, then subscribes for live events.
|
||||
unsubscribe = subscribeToRun(
|
||||
sessionId,
|
||||
(event: SseEvent | null) => {
|
||||
sessionId as string,
|
||||
(event: ParentSseEvent | null) => {
|
||||
if (closed) {return;}
|
||||
if (event === null) {
|
||||
// Run completed — close the SSE stream.
|
||||
|
||||
@ -1,101 +0,0 @@
|
||||
import { subscribeToSubagent, hasActiveSubagent, isSubagentRunning, ensureRegisteredFromDisk, ensureSubagentStreamable } from "@/lib/subagent-runs";
|
||||
import type { SseEvent } from "@/lib/subagent-runs";
|
||||
import { existsSync, readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { resolveOpenClawStateDir } from "@/lib/workspace";
|
||||
|
||||
export const runtime = "nodejs";
|
||||
export const maxDuration = 600;
|
||||
|
||||
/**
|
||||
* Ensure the subagent is registered in the in-memory SubagentRunManager.
|
||||
* Tries the shared ensureRegisteredFromDisk helper first, which reads the
|
||||
* on-disk registry (~/.openclaw/subagents/runs.json).
|
||||
*/
|
||||
function ensureRegistered(sessionKey: string): boolean {
|
||||
if (hasActiveSubagent(sessionKey)) {return true;}
|
||||
|
||||
// Look up the parent web session ID from the on-disk registry
|
||||
const registryPath = join(resolveOpenClawStateDir(), "subagents", "runs.json");
|
||||
if (!existsSync(registryPath)) {return false;}
|
||||
|
||||
try {
|
||||
const raw = JSON.parse(readFileSync(registryPath, "utf-8")) as { runs?: Record<string, Record<string, unknown>> };
|
||||
const runs = raw?.runs;
|
||||
if (!runs) {return false;}
|
||||
|
||||
for (const entry of Object.values(runs)) {
|
||||
if (entry.childSessionKey === sessionKey) {
|
||||
const rsk = typeof entry.requesterSessionKey === "string" ? entry.requesterSessionKey : "";
|
||||
const webIdMatch = rsk.match(/^agent:[^:]+:web:(.+)$/);
|
||||
const parentWebSessionId = webIdMatch?.[1] ?? "";
|
||||
return ensureRegisteredFromDisk(sessionKey, parentWebSessionId);
|
||||
}
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export async function GET(req: Request) {
|
||||
const url = new URL(req.url);
|
||||
const sessionKey = url.searchParams.get("sessionKey");
|
||||
|
||||
if (!sessionKey) {
|
||||
return new Response("sessionKey required", { status: 400 });
|
||||
}
|
||||
|
||||
// Lazily register the subagent so events get buffered
|
||||
const registered = ensureRegistered(sessionKey);
|
||||
if (!registered && !hasActiveSubagent(sessionKey)) {
|
||||
return new Response("Subagent not found", { status: 404 });
|
||||
}
|
||||
|
||||
// For still-running subagents rehydrated from disk, activate the gateway
|
||||
// WebSocket subscription so new events arrive in real time.
|
||||
ensureSubagentStreamable(sessionKey);
|
||||
|
||||
const isActive = isSubagentRunning(sessionKey);
|
||||
const encoder = new TextEncoder();
|
||||
let closed = false;
|
||||
let unsubscribe: (() => void) | null = null;
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
unsubscribe = subscribeToSubagent(
|
||||
sessionKey,
|
||||
(event: SseEvent | null) => {
|
||||
if (closed) {return;}
|
||||
if (event === null) {
|
||||
closed = true;
|
||||
try { controller.close(); } catch { /* already closed */ }
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const json = JSON.stringify(event);
|
||||
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
|
||||
} catch { /* ignore */ }
|
||||
},
|
||||
{ replay: true },
|
||||
);
|
||||
|
||||
if (!unsubscribe) {
|
||||
closed = true;
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
closed = true;
|
||||
unsubscribe?.();
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Run-Active": isActive ? "true" : "false",
|
||||
},
|
||||
});
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user