fix: route compaction completion notice through block reply pipeline

Previously the completion notice bypassed the block-reply pipeline by
calling opts.onBlockReply directly after the pipeline had already been
flushed and stopped. This meant timeout/abort handling and serial
delivery guarantees did not apply to the notice, risking stalls or
out-of-order delivery in streaming/routed runs.

Fix: enqueue the completion notice into blockReplyPipeline *before*
flush so it is delivered through the same path as every other block
reply. The non-streaming fallback (verboseNotices) is preserved for
runs where no pipeline exists.

Also removes the now-unnecessary direct opts.onBlockReply call and
cleans up the redundant suffix in the pre-flush path (count suffix is
still included in the verboseNotices fallback path where count is
available).

Addresses P1 review comment on PR #38805.
This commit is contained in:
zidongdesign 2026-03-08 10:42:45 +08:00 committed by Josh Lehman
parent 8e216cbb4e
commit b9beb6869e
No known key found for this signature in database
GPG Key ID: D141B425AC7F876B

View File

@ -414,6 +414,27 @@ export async function runReplyAgent(params: {
const payloadArray = runResult.payloads ?? [];
// If compaction completed, enqueue the completion notice into the pipeline
// *before* flushing so it benefits from the same timeout/abort/serial-
// delivery guarantees as every other block reply. The count update and
// post-compaction context injection still happen later (after flush) because
// they don't affect the user-visible notice text at this point — we use a
// placeholder suffix here and the full count is logged separately.
if (autoCompactionCompleted && blockReplyPipeline) {
const verboseEnabled = resolvedVerboseLevel !== "off";
const completionText = verboseEnabled
? `🧹 Auto-compaction complete.`
: `✅ Context compacted.`;
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
blockReplyPipeline.enqueue(
applyReplyToMode({
text: completionText,
replyToId: currentMessageId,
replyToCurrent: true,
}),
);
}
if (blockReplyPipeline) {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();
@ -706,21 +727,12 @@ export async function runReplyAgent(params: {
? `🧹 Auto-compaction complete${suffix}.`
: `✅ Context compacted${suffix}.`;
// In block-streaming mode, onBlockReply bypasses buildReplyPayloads, so
// we must deliver the completion notice the same way the start notice was
// sent (via onBlockReply directly). Otherwise the user sees the "🧹
// Compacting context..." start notice but never receives the completion.
// Apply replyToMode so the notice is threaded consistently with normal
// replies when replyToMode=all|first is configured.
if (opts?.onBlockReply) {
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
const noticePayload = applyReplyToMode({
text: completionText,
replyToId: currentMessageId,
replyToCurrent: true,
});
await opts.onBlockReply(noticePayload);
} else {
// In block-streaming mode the completion notice was already enqueued into
// blockReplyPipeline before flush (see above), so it travels through the
// normal timeout/abort/serial-delivery path without bypassing the pipeline.
// Here we only handle the non-streaming fallback: push into verboseNotices
// so it appears as a final payload alongside other verbose output.
if (!blockReplyPipeline) {
verboseNotices.push({ text: completionText });
}
}