From 197ef0a1f510dd50618084dfb1ff78e6f2090d5e Mon Sep 17 00:00:00 2001 From: zidongdesign Date: Sun, 8 Mar 2026 11:39:06 +0800 Subject: [PATCH] fix: send compaction completion notice after pipeline stop to preserve didStream Enqueueing the completion notice into blockReplyPipeline before flush caused didStream() to return true even when no assistant content was streamed. buildReplyPayloads drops all finalPayloads when didStream() is true, so the real assistant reply could be silently discarded on non-streaming model paths (e.g. pi-embedded-subscribe) that fill assistantTexts without emitting block replies. Fix: move the completion notice send to *after* pipeline flush+stop, using a fire-and-forget Promise.race with blockReplyTimeoutMs. This keeps the timeout guarantee (satisfying the previous P1) while not touching didStream() at all. Non-streaming fallback (verboseNotices) is unchanged. Addresses P1 review comment on PR #38805. --- src/auto-reply/reply/agent-runner.ts | 55 ++++++++++++++++------------ 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index e6acb1d9af5..7bec12d1c8a 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -414,30 +414,36 @@ export async function runReplyAgent(params: { const payloadArray = runResult.payloads ?? []; - // If compaction completed, enqueue the completion notice into the pipeline - // *before* flushing so it benefits from the same timeout/abort/serial- - // delivery guarantees as every other block reply. The count update and - // post-compaction context injection still happen later (after flush) because - // they don't affect the user-visible notice text at this point โ€” we use a - // placeholder suffix here and the full count is logged separately. - if (autoCompactionCompleted && blockReplyPipeline) { + if (blockReplyPipeline) { + await blockReplyPipeline.flush({ force: true }); + blockReplyPipeline.stop(); + } + + // Send the compaction completion notice *after* the pipeline has flushed and + // stopped, so the enqueue does not set didStream() = true and cause + // buildReplyPayloads to discard the real assistant reply. We still apply a + // timeout so the notice cannot stall the run indefinitely. + if (autoCompactionCompleted && opts?.onBlockReply) { const verboseEnabled = resolvedVerboseLevel !== "off"; const completionText = verboseEnabled ? `๐Ÿงน Auto-compaction complete.` : `โœ… Context compacted.`; const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; - blockReplyPipeline.enqueue( - applyReplyToMode({ - text: completionText, - replyToId: currentMessageId, - replyToCurrent: true, - }), - ); - } - - if (blockReplyPipeline) { - await blockReplyPipeline.flush({ force: true }); - blockReplyPipeline.stop(); + const noticePayload = applyReplyToMode({ + text: completionText, + replyToId: currentMessageId, + replyToCurrent: true, + }); + // Fire-and-forget with timeout โ€” best-effort delivery; failure must not + // propagate to the caller. + void Promise.race([ + opts.onBlockReply(noticePayload), + new Promise((_, reject) => + setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs), + ), + ]).catch(() => { + // Intentionally swallowed โ€” the notice is informational only. + }); } if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); @@ -727,11 +733,12 @@ export async function runReplyAgent(params: { ? `๐Ÿงน Auto-compaction complete${suffix}.` : `โœ… Context compacted${suffix}.`; - // In block-streaming mode the completion notice was already enqueued into - // blockReplyPipeline before flush (see above), so it travels through the - // normal timeout/abort/serial-delivery path without bypassing the pipeline. - // Here we only handle the non-streaming fallback: push into verboseNotices - // so it appears as a final payload alongside other verbose output. + // In block-streaming mode the completion notice is sent above (after the + // pipeline has flushed) via a fire-and-forget call to opts.onBlockReply, + // so that it does not set didStream()=true and cause buildReplyPayloads to + // discard the real assistant reply. + // In non-streaming mode, push into verboseNotices so it is included in + // the final payload batch. if (!blockReplyPipeline) { verboseNotices.push({ text: completionText }); }