web: filter silent-reply tokens and remove subagent event routing from parent stream

This commit is contained in:
kumarabhirup 2026-02-21 12:32:16 -08:00
parent f3ae8127da
commit 3ee2528b75
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -30,10 +30,6 @@ import {
parseErrorFromStderr,
} from "./agent-runner";
import {
routeRawEvent as routeSubagentEvent,
ensureRegisteredFromDisk,
hasActiveSubagent as hasSubagentRun,
activateGatewayFallback,
hasRunningSubagentsForParent,
} from "./subagent-runs";
@ -88,6 +84,23 @@ export type ActiveRun = {
const PERSIST_INTERVAL_MS = 2_000;
const CLEANUP_GRACE_MS = 30_000;
const SILENT_REPLY_TOKEN = "NO_REPLY";
/**
* Detect leaked silent-reply fragments in finalized text parts.
* The agent runner suppresses full "NO_REPLY" tokens, but during streaming
* the model may emit a partial prefix (e.g. "NO") before the full token is
* assembled and caught. This catches both the full token and known partial
* prefixes so they don't leak into persisted/displayed messages.
*/
function isLeakedSilentReplyToken(text: string): boolean {
const t = text.trim();
if (!t) {return false;}
if (new RegExp(`^${SILENT_REPLY_TOKEN}\\W*$`).test(t)) {return true;}
if (SILENT_REPLY_TOKEN.startsWith(t) && t.length >= 2 && t.length < SILENT_REPLY_TOKEN.length) {return true;}
return false;
}
// Evaluated per-call so it tracks profile switches at runtime.
function webChatDir(): string { return resolveWebChatDir(); }
function indexFile(): string { return join(webChatDir(), "index.json"); }
@ -477,6 +490,15 @@ function wireChildProcess(run: ActiveRun): void {
const closeText = () => {
if (textStarted) {
if (accTextIdx >= 0) {
const part = run.accumulated.parts[accTextIdx] as { type: "text"; text: string };
if (isLeakedSilentReplyToken(part.text)) {
run.accumulated.parts.splice(accTextIdx, 1);
for (const [k, v] of accToolMap) {
if (v > accTextIdx) { accToolMap.set(k, v - 1); }
}
}
}
emit({ type: "text-end", id: currentTextId });
textStarted = false;
}
@ -514,9 +536,6 @@ function wireChildProcess(run: ActiveRun): void {
const rl = createInterface({ input: child.stdout! });
const parentSessionKey = `agent:main:web:${run.sessionId}`;
// Track which subagent session keys we've already attempted to register
const seenSubagentKeys = new Set<string>();
// Prevent unhandled 'error' events on the readline interface.
// When the child process fails to start (e.g. ENOENT — missing script)
// the stdout pipe is destroyed and readline re-emits the error. Without
@ -784,28 +803,6 @@ function wireChildProcess(run: ActiveRun): void {
return;
}
// ── Route non-parent events to SubagentRunManager ──
// The CLI child process receives ALL gateway broadcasts, including
// events from subagent runs. Filter them out of the parent chat
// and route to the SubagentRunManager for separate streaming.
if (ev.sessionKey && ev.sessionKey !== parentSessionKey) {
const childKey = ev.sessionKey;
if (!hasSubagentRun(childKey) && !seenSubagentKeys.has(childKey)) {
if (ensureRegisteredFromDisk(childKey, run.sessionId)) {
seenSubagentKeys.add(childKey);
}
}
routeSubagentEvent(childKey, {
event: ev.event,
stream: ev.stream,
data: ev.data,
globalSeq: typeof (ev as Record<string, unknown>).globalSeq === "number"
? (ev as Record<string, unknown>).globalSeq as number
: undefined,
});
return;
}
// Track the global event cursor from the gateway for replay on handoff.
const gSeq = typeof (ev as Record<string, unknown>).globalSeq === "number"
? (ev as Record<string, unknown>).globalSeq as number
@ -865,9 +862,6 @@ function wireChildProcess(run: ActiveRun): void {
// keep the SSE stream open and wait for announcement-triggered
// parent turns via subscribe-only CLI NDJSON.
if (exitedClean && hasRunningSubagents) {
// The parent's NDJSON stream has ended; running subagents lose their
// original event source, so switch each subagent to subscribe children.
activateGatewayFallback();
run.status = "waiting-for-subagents";
openStatusReasoning("Waiting for subagent results...");
@ -1040,9 +1034,14 @@ function flushPersistence(run: ActiveRun) {
return; // Nothing to persist yet.
}
// Filter out leaked silent-reply text fragments before persisting.
const cleanParts = parts.filter((p) =>
p.type !== "text" || !isLeakedSilentReplyToken((p as { text: string }).text),
);
// Build content text from text parts for the backwards-compatible
// content field (used when parts are not available).
const text = parts
const text = cleanParts
.filter((p): p is { type: "text"; text: string } => p.type === "text")
.map((p) => p.text)
.join("");
@ -1052,7 +1051,7 @@ function flushPersistence(run: ActiveRun) {
id: run.accumulated.id,
role: "assistant",
content: text,
parts, // Ordered parts — preserves interleaving of reasoning, tools, text
parts: cleanParts,
timestamp: new Date().toISOString(),
};
if (isStillStreaming) {