diff --git a/apps/web/lib/active-runs.ts b/apps/web/lib/active-runs.ts
index a3540addb08..12b104d0be1 100644
--- a/apps/web/lib/active-runs.ts
+++ b/apps/web/lib/active-runs.ts
@@ -30,10 +30,6 @@ import {
   parseErrorFromStderr,
 } from "./agent-runner";
 import {
-  routeRawEvent as routeSubagentEvent,
-  ensureRegisteredFromDisk,
-  hasActiveSubagent as hasSubagentRun,
-  activateGatewayFallback,
   hasRunningSubagentsForParent,
 } from "./subagent-runs";
 
@@ -88,6 +84,23 @@ export type ActiveRun = {
 
 const PERSIST_INTERVAL_MS = 2_000;
 const CLEANUP_GRACE_MS = 30_000;
+
+const SILENT_REPLY_TOKEN = "NO_REPLY";
+// Compiled once at module load instead of on every call (the check runs per text part).
+const SILENT_REPLY_FULL_RE = new RegExp(`^${SILENT_REPLY_TOKEN}\\W*$`);
+
+/**
+ * Report whether a finalized text part consists solely of a leaked silent-reply
+ * token: the full token (optionally followed by non-word characters) or a proper
+ * prefix of it at least two characters long (e.g. "NO"), which streaming can
+ * finalize before the full token is assembled. Matching is case-sensitive and
+ * ignores surrounding whitespace; note that a bare "NO" reply is also dropped.
+ */
+function isLeakedSilentReplyToken(text: string): boolean {
+  const t = text.trim();
+  if (SILENT_REPLY_FULL_RE.test(t)) {return true;}
+  return t.length >= 2 && t.length < SILENT_REPLY_TOKEN.length && SILENT_REPLY_TOKEN.startsWith(t);
+}
 // Evaluated per-call so it tracks profile switches at runtime.
function webChatDir(): string { return resolveWebChatDir(); } function indexFile(): string { return join(webChatDir(), "index.json"); } @@ -477,6 +490,15 @@ function wireChildProcess(run: ActiveRun): void { const closeText = () => { if (textStarted) { + if (accTextIdx >= 0) { + const part = run.accumulated.parts[accTextIdx] as { type: "text"; text: string }; + if (isLeakedSilentReplyToken(part.text)) { + run.accumulated.parts.splice(accTextIdx, 1); + for (const [k, v] of accToolMap) { + if (v > accTextIdx) { accToolMap.set(k, v - 1); } + } + } + } emit({ type: "text-end", id: currentTextId }); textStarted = false; } @@ -514,9 +536,6 @@ function wireChildProcess(run: ActiveRun): void { const rl = createInterface({ input: child.stdout! }); const parentSessionKey = `agent:main:web:${run.sessionId}`; - // Track which subagent session keys we've already attempted to register - const seenSubagentKeys = new Set(); - // Prevent unhandled 'error' events on the readline interface. // When the child process fails to start (e.g. ENOENT — missing script) // the stdout pipe is destroyed and readline re-emits the error. Without @@ -784,28 +803,6 @@ function wireChildProcess(run: ActiveRun): void { return; } - // ── Route non-parent events to SubagentRunManager ── - // The CLI child process receives ALL gateway broadcasts, including - // events from subagent runs. Filter them out of the parent chat - // and route to the SubagentRunManager for separate streaming. - if (ev.sessionKey && ev.sessionKey !== parentSessionKey) { - const childKey = ev.sessionKey; - if (!hasSubagentRun(childKey) && !seenSubagentKeys.has(childKey)) { - if (ensureRegisteredFromDisk(childKey, run.sessionId)) { - seenSubagentKeys.add(childKey); - } - } - routeSubagentEvent(childKey, { - event: ev.event, - stream: ev.stream, - data: ev.data, - globalSeq: typeof (ev as Record).globalSeq === "number" - ? 
(ev as Record).globalSeq as number - : undefined, - }); - return; - } - // Track the global event cursor from the gateway for replay on handoff. const gSeq = typeof (ev as Record).globalSeq === "number" ? (ev as Record).globalSeq as number @@ -865,9 +862,6 @@ function wireChildProcess(run: ActiveRun): void { // keep the SSE stream open and wait for announcement-triggered // parent turns via subscribe-only CLI NDJSON. if (exitedClean && hasRunningSubagents) { - // The parent's NDJSON stream has ended; running subagents lose their - // original event source, so switch each subagent to subscribe children. - activateGatewayFallback(); run.status = "waiting-for-subagents"; openStatusReasoning("Waiting for subagent results..."); @@ -1040,9 +1034,14 @@ function flushPersistence(run: ActiveRun) { return; // Nothing to persist yet. } + // Filter out leaked silent-reply text fragments before persisting. + const cleanParts = parts.filter((p) => + p.type !== "text" || !isLeakedSilentReplyToken((p as { text: string }).text), + ); + // Build content text from text parts for the backwards-compatible // content field (used when parts are not available). - const text = parts + const text = cleanParts .filter((p): p is { type: "text"; text: string } => p.type === "text") .map((p) => p.text) .join(""); @@ -1052,7 +1051,7 @@ function flushPersistence(run: ActiveRun) { id: run.accumulated.id, role: "assistant", content: text, - parts, // Ordered parts — preserves interleaving of reasoning, tools, text + parts: cleanParts, timestamp: new Date().toISOString(), }; if (isStillStreaming) {