diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index e6acb1d9af5..7bec12d1c8a 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -414,30 +414,36 @@ export async function runReplyAgent(params: { const payloadArray = runResult.payloads ?? []; - // If compaction completed, enqueue the completion notice into the pipeline - // *before* flushing so it benefits from the same timeout/abort/serial- - // delivery guarantees as every other block reply. The count update and - // post-compaction context injection still happen later (after flush) because - // they don't affect the user-visible notice text at this point โ€” we use a - // placeholder suffix here and the full count is logged separately. - if (autoCompactionCompleted && blockReplyPipeline) { + if (blockReplyPipeline) { + await blockReplyPipeline.flush({ force: true }); + blockReplyPipeline.stop(); + } + + // Send the compaction completion notice *after* the pipeline has flushed and + // stopped, so the enqueue does not set didStream() = true and cause + // buildReplyPayloads to discard the real assistant reply. We still apply a + // timeout so the notice cannot stall the run indefinitely. + if (autoCompactionCompleted && opts?.onBlockReply) { const verboseEnabled = resolvedVerboseLevel !== "off"; const completionText = verboseEnabled ? `๐Ÿงน Auto-compaction complete.` : `โœ… Context compacted.`; const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; - blockReplyPipeline.enqueue( - applyReplyToMode({ - text: completionText, - replyToId: currentMessageId, - replyToCurrent: true, - }), - ); - } - - if (blockReplyPipeline) { - await blockReplyPipeline.flush({ force: true }); - blockReplyPipeline.stop(); + const noticePayload = applyReplyToMode({ + text: completionText, + replyToId: currentMessageId, + replyToCurrent: true, + }); + // Fire-and-forget with timeout โ€” best-effort delivery; failure must not + // propagate to the caller. + void Promise.race([ + opts.onBlockReply(noticePayload), + new Promise((_, reject) => + setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs), + ), + ]).catch(() => { + // Intentionally swallowed โ€” the notice is informational only. + }); } if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); @@ -727,11 +733,12 @@ export async function runReplyAgent(params: { ? `๐Ÿงน Auto-compaction complete${suffix}.` : `โœ… Context compacted${suffix}.`; - // In block-streaming mode the completion notice was already enqueued into - // blockReplyPipeline before flush (see above), so it travels through the - // normal timeout/abort/serial-delivery path without bypassing the pipeline. - // Here we only handle the non-streaming fallback: push into verboseNotices - // so it appears as a final payload alongside other verbose output. + // In block-streaming mode the completion notice is sent above (after the + // pipeline has flushed) via a fire-and-forget call to opts.onBlockReply, + // so that it does not set didStream()=true and cause buildReplyPayloads to + // discard the real assistant reply. + // In non-streaming mode, push into verboseNotices so it is included in + // the final payload batch. if (!blockReplyPipeline) { verboseNotices.push({ text: completionText }); }