Merge eb90f612e69550a59d9e64b9b988d6f34a871958 into 5e417b44e1540f528d2ae63e3e20229a902d1db2

This commit is contained in:
zidongdesign 2026-03-21 02:53:32 +00:00 committed by GitHub
commit db318454ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 121 additions and 23 deletions

View File

@ -199,6 +199,23 @@ export async function runAgentTurnWithFallback(params: {
return text;
};
const blockReplyPipeline = params.blockReplyPipeline;
// Build the delivery handler once so both onAgentEvent (compaction start
// notice) and the onBlockReply field share the same instance. This
// ensures replyToId threading (replyToMode=all|first) is applied to
// compaction notices just like every other block reply.
const blockReplyHandler = params.opts?.onBlockReply
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId: params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: normalizeReplyMediaPaths,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
})
: undefined;
const onToolResult = params.opts?.onToolResult;
const fallbackResult = await runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run),
@ -394,11 +411,34 @@ export async function runAgentTurnWithFallback(params: {
await params.opts?.onToolStart?.({ name, phase });
}
}
// Track auto-compaction completion and notify UI layer.
// Track auto-compaction and notify higher layers.
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
await params.opts?.onCompactionStart?.();
if (params.opts?.onCompactionStart) {
await params.opts.onCompactionStart();
} else if (params.opts?.onBlockReply) {
// Send directly via opts.onBlockReply (bypassing the
// pipeline) so the notice does not cause final payloads
// to be discarded on non-streaming model paths.
const currentMessageId =
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
const noticePayload = params.applyReplyToMode({
text: "🧹 Compacting context...",
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
});
try {
await params.opts.onBlockReply(noticePayload);
} catch (err) {
// Non-critical notice delivery failure should not
// bubble out of the fire-and-forget event handler.
logVerbose(
`compaction start notice delivery failed (non-fatal): ${String(err)}`,
);
}
}
}
const completed = evt.data?.completed === true;
if (phase === "end" && completed) {
@ -410,20 +450,7 @@ export async function runAgentTurnWithFallback(params: {
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId:
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: normalizeReplyMediaPaths,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
})
: undefined,
onBlockReply: blockReplyHandler,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {

View File

@ -418,6 +418,11 @@ export async function runReplyAgent(params: {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();
}
// NOTE: The compaction completion notice for block-streaming mode is sent
// further below — after incrementRunCompactionCount — so it can include
// the `(count N)` suffix. Sending it here (before the count is known)
// would omit that information.
if (pendingToolTasks.size > 0) {
await Promise.allSettled(pendingToolTasks);
}
@ -697,9 +702,48 @@ export async function runReplyAgent(params: {
});
}
if (verboseEnabled) {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
verboseNotices.push({ text: `🧹 Auto-compaction complete${suffix}.` });
// Always notify the user when compaction completes — not just in verbose
// mode. The "🧹 Compacting context..." notice was already sent at start,
// so the completion message closes the loop for every user regardless of
// their verbose setting.
const suffix = typeof count === "number" ? ` (count ${count})` : "";
const completionText = verboseEnabled
? `🧹 Auto-compaction complete${suffix}.`
: `✅ Context compacted${suffix}.`;
if (blockReplyPipeline && opts?.onBlockReply) {
// In block-streaming mode, send the completion notice via
// fire-and-forget *after* the pipeline has flushed (so it does not set
// didStream()=true and cause buildReplyPayloads to discard the real
// assistant reply). Now that the count is known we can include it.
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
const noticePayload = applyReplyToMode({
text: completionText,
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
});
void Promise.race([
opts.onBlockReply(noticePayload),
new Promise<void>((_, reject) =>
setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs),
),
]).catch(() => {
// Intentionally swallowed — the notice is informational only.
});
} else {
// Non-streaming: push into verboseNotices with full compaction metadata
// so threading exemptions apply and replyToMode=first does not thread
// the notice instead of the real assistant reply.
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
verboseNotices.push(
applyReplyToMode({
text: completionText,
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
}),
);
}
}
if (verboseNotices.length > 0) {

View File

@ -582,8 +582,10 @@ export async function dispatchReplyFromConfig(params: {
if (shouldSuppressReasoningPayload(payload)) {
return;
}
// Accumulate block text for TTS generation after streaming
if (payload.text) {
// Accumulate block text for TTS generation after streaming.
// Exclude compaction status notices — they are informational UI
// signals and must not be synthesised into the spoken reply.
if (payload.text && !payload.isCompactionNotice) {
if (accumulatedBlockText.length > 0) {
accumulatedBlockText += "\n";
}

View File

@ -33,7 +33,12 @@ export function createReplyToModeFilter(
}
if (mode === "off") {
const isExplicit = Boolean(payload.replyToTag) || Boolean(payload.replyToCurrent);
if (opts.allowExplicitReplyTagsWhenOff && isExplicit) {
// Compaction notices must never be threaded when replyToMode=off — even
// if they carry explicit reply tags (replyToCurrent). Honouring the
// explicit tag here would make status notices appear in-thread while
// normal assistant replies stay off-thread, contradicting the off-mode
// expectation. Strip replyToId unconditionally for compaction payloads.
if (opts.allowExplicitReplyTagsWhenOff && isExplicit && !payload.isCompactionNotice) {
return payload;
}
return { ...payload, replyToId: undefined };
@ -42,9 +47,21 @@ export function createReplyToModeFilter(
return payload;
}
if (hasThreaded) {
// Compaction notices are transient status messages that should always
// appear in-thread, even after the first assistant block has already
// consumed the "first" slot. Let them keep their replyToId.
if (payload.isCompactionNotice) {
return payload;
}
return { ...payload, replyToId: undefined };
}
hasThreaded = true;
// Compaction notices are transient status messages — they should be
// threaded (so they appear in-context), but they must not consume the
// "first" slot of the replyToMode=first filter. Skip advancing
// hasThreaded so the real assistant reply still gets replyToId.
if (!payload.isCompactionNotice) {
hasThreaded = true;
}
return payload;
};
}

View File

@ -91,6 +91,10 @@ export type ReplyPayload = {
/** Marks this payload as a reasoning/thinking block. Channels that do not
* have a dedicated reasoning lane (e.g. WhatsApp, web) should suppress it. */
isReasoning?: boolean;
/** Marks this payload as a compaction status notice (start/end).
* Should be excluded from TTS transcript accumulation so compaction
* status lines are not synthesised into the spoken assistant reply. */
isCompactionNotice?: boolean;
/** Channel-specific payload data (per-channel envelope). */
channelData?: Record<string, unknown>;
};

View File

@ -825,6 +825,10 @@ export async function maybeApplyTtsToPayload(params: {
inboundAudio?: boolean;
ttsAuto?: string;
}): Promise<ReplyPayload> {
// Compaction notices are informational UI signals — never synthesise them as speech.
if (params.payload.isCompactionNotice) {
return params.payload;
}
const config = resolveTtsConfig(params.cfg);
const prefsPath = resolveTtsPrefsPath(config);
const autoMode = resolveTtsAutoMode({