fix: send compaction completion notice after pipeline stop to preserve didStream
Enqueueing the completion notice into blockReplyPipeline before flush caused didStream() to return true even when no assistant content was streamed. buildReplyPayloads drops all finalPayloads when didStream() is true, so the real assistant reply could be silently discarded on non-streaming model paths (e.g. pi-embedded-subscribe) that fill assistantTexts without emitting block replies. Fix: move the completion notice send to *after* pipeline flush+stop, using a fire-and-forget Promise.race with blockReplyTimeoutMs. This keeps the timeout guarantee (satisfying the previous P1) while not touching didStream() at all. Non-streaming fallback (verboseNotices) is unchanged. Addresses P1 review comment on PR #38805.
This commit is contained in:
parent
b9beb6869e
commit
197ef0a1f5
@ -414,30 +414,36 @@ export async function runReplyAgent(params: {
|
||||
|
||||
const payloadArray = runResult.payloads ?? [];
|
||||
|
||||
// If compaction completed, enqueue the completion notice into the pipeline
|
||||
// *before* flushing so it benefits from the same timeout/abort/serial-
|
||||
// delivery guarantees as every other block reply. The count update and
|
||||
// post-compaction context injection still happen later (after flush) because
|
||||
// they don't affect the user-visible notice text at this point — we use a
|
||||
// placeholder suffix here and the full count is logged separately.
|
||||
if (autoCompactionCompleted && blockReplyPipeline) {
|
||||
if (blockReplyPipeline) {
|
||||
await blockReplyPipeline.flush({ force: true });
|
||||
blockReplyPipeline.stop();
|
||||
}
|
||||
|
||||
// Send the compaction completion notice *after* the pipeline has flushed and
|
||||
// stopped, so the enqueue does not set didStream() = true and cause
|
||||
// buildReplyPayloads to discard the real assistant reply. We still apply a
|
||||
// timeout so the notice cannot stall the run indefinitely.
|
||||
if (autoCompactionCompleted && opts?.onBlockReply) {
|
||||
const verboseEnabled = resolvedVerboseLevel !== "off";
|
||||
const completionText = verboseEnabled
|
||||
? `🧹 Auto-compaction complete.`
|
||||
: `✅ Context compacted.`;
|
||||
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
|
||||
blockReplyPipeline.enqueue(
|
||||
applyReplyToMode({
|
||||
text: completionText,
|
||||
replyToId: currentMessageId,
|
||||
replyToCurrent: true,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
if (blockReplyPipeline) {
|
||||
await blockReplyPipeline.flush({ force: true });
|
||||
blockReplyPipeline.stop();
|
||||
const noticePayload = applyReplyToMode({
|
||||
text: completionText,
|
||||
replyToId: currentMessageId,
|
||||
replyToCurrent: true,
|
||||
});
|
||||
// Fire-and-forget with timeout — best-effort delivery; failure must not
|
||||
// propagate to the caller.
|
||||
void Promise.race([
|
||||
opts.onBlockReply(noticePayload),
|
||||
new Promise<void>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs),
|
||||
),
|
||||
]).catch(() => {
|
||||
// Intentionally swallowed — the notice is informational only.
|
||||
});
|
||||
}
|
||||
if (pendingToolTasks.size > 0) {
|
||||
await Promise.allSettled(pendingToolTasks);
|
||||
@ -727,11 +733,12 @@ export async function runReplyAgent(params: {
|
||||
? `🧹 Auto-compaction complete${suffix}.`
|
||||
: `✅ Context compacted${suffix}.`;
|
||||
|
||||
// In block-streaming mode the completion notice was already enqueued into
|
||||
// blockReplyPipeline before flush (see above), so it travels through the
|
||||
// normal timeout/abort/serial-delivery path without bypassing the pipeline.
|
||||
// Here we only handle the non-streaming fallback: push into verboseNotices
|
||||
// so it appears as a final payload alongside other verbose output.
|
||||
// In block-streaming mode the completion notice is sent above (after the
|
||||
// pipeline has flushed) via a fire-and-forget call to opts.onBlockReply,
|
||||
// so that it does not set didStream()=true and cause buildReplyPayloads to
|
||||
// discard the real assistant reply.
|
||||
// In non-streaming mode, push into verboseNotices so it is included in
|
||||
// the final payload batch.
|
||||
if (!blockReplyPipeline) {
|
||||
verboseNotices.push({ text: completionText });
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user