openclaw/src/auto-reply/reply/block-reply-coalescer.ts
Tyler Yust 9ef24fd400
fix: flush block streaming on paragraph boundaries for chunkMode=newline (#7014)
* feat: Implement paragraph boundary flushing in block streaming

- Added `flushOnParagraph` option to `BlockReplyChunking` for immediate flushing on paragraph breaks.
- Updated `EmbeddedBlockChunker` to handle paragraph boundaries during chunking.
- Enhanced `createBlockReplyCoalescer` to support flushing on enqueue.
- Added tests to verify behavior of flushing with and without `flushOnEnqueue` set.
- Updated relevant types and interfaces to include `flushOnParagraph` and `flushOnEnqueue` options.

* fix: Improve streaming behavior and enhance block chunking logic

- Resolved issue with stuck typing indicator after streamed BlueBubbles replies.
- Refactored `EmbeddedBlockChunker` to streamline fence-split handling and ensure maxChars fallback for newline chunking.
- Added tests to validate new chunking behavior, including handling of paragraph breaks and fence scenarios.
- Updated changelog to reflect these changes.

* test: Add test for clamping long paragraphs in EmbeddedBlockChunker

- Introduced a new test case to verify that long paragraphs are correctly clamped to maxChars when flushOnParagraph is enabled.
- Updated logic in EmbeddedBlockChunker to handle cases where the next paragraph break exceeds maxChars, ensuring proper chunking behavior.

* refactor: streamline logging and improve error handling in message processing

- Removed verbose logging statements from the `processMessage` function to reduce clutter.
- Enhanced error handling by using `runtime.error` for typing restart failures.
- Updated the `applySystemPromptOverrideToSession` function to accept a string directly instead of a function, simplifying the prompt application process.
- Adjusted the `runEmbeddedAttempt` function to directly use the system prompt override without invoking it as a function.
2026-02-02 01:22:41 -08:00

148 lines
3.8 KiB
TypeScript

import type { ReplyPayload } from "../types.js";
import type { BlockStreamingCoalescing } from "./block-streaming.js";
export type BlockReplyCoalescer = {
enqueue: (payload: ReplyPayload) => void;
flush: (options?: { force?: boolean }) => Promise<void>;
hasBuffered: () => boolean;
stop: () => void;
};
export function createBlockReplyCoalescer(params: {
config: BlockStreamingCoalescing;
shouldAbort: () => boolean;
onFlush: (payload: ReplyPayload) => Promise<void> | void;
}): BlockReplyCoalescer {
const { config, shouldAbort, onFlush } = params;
const minChars = Math.max(1, Math.floor(config.minChars));
const maxChars = Math.max(minChars, Math.floor(config.maxChars));
const idleMs = Math.max(0, Math.floor(config.idleMs));
const joiner = config.joiner ?? "";
const flushOnEnqueue = config.flushOnEnqueue === true;
let bufferText = "";
let bufferReplyToId: ReplyPayload["replyToId"];
let bufferAudioAsVoice: ReplyPayload["audioAsVoice"];
let idleTimer: NodeJS.Timeout | undefined;
const clearIdleTimer = () => {
if (!idleTimer) {
return;
}
clearTimeout(idleTimer);
idleTimer = undefined;
};
const resetBuffer = () => {
bufferText = "";
bufferReplyToId = undefined;
bufferAudioAsVoice = undefined;
};
const scheduleIdleFlush = () => {
if (idleMs <= 0) {
return;
}
clearIdleTimer();
idleTimer = setTimeout(() => {
void flush({ force: false });
}, idleMs);
};
const flush = async (options?: { force?: boolean }) => {
clearIdleTimer();
if (shouldAbort()) {
resetBuffer();
return;
}
if (!bufferText) {
return;
}
if (!options?.force && !flushOnEnqueue && bufferText.length < minChars) {
scheduleIdleFlush();
return;
}
const payload: ReplyPayload = {
text: bufferText,
replyToId: bufferReplyToId,
audioAsVoice: bufferAudioAsVoice,
};
resetBuffer();
await onFlush(payload);
};
const enqueue = (payload: ReplyPayload) => {
if (shouldAbort()) {
return;
}
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const text = payload.text ?? "";
const hasText = text.trim().length > 0;
if (hasMedia) {
void flush({ force: true });
void onFlush(payload);
return;
}
if (!hasText) {
return;
}
// When flushOnEnqueue is set (chunkMode="newline"), each enqueued payload is treated
// as a separate paragraph and flushed immediately so delivery matches streaming boundaries.
if (flushOnEnqueue) {
if (bufferText) {
void flush({ force: true });
}
bufferReplyToId = payload.replyToId;
bufferAudioAsVoice = payload.audioAsVoice;
bufferText = text;
void flush({ force: true });
return;
}
if (
bufferText &&
(bufferReplyToId !== payload.replyToId || bufferAudioAsVoice !== payload.audioAsVoice)
) {
void flush({ force: true });
}
if (!bufferText) {
bufferReplyToId = payload.replyToId;
bufferAudioAsVoice = payload.audioAsVoice;
}
const nextText = bufferText ? `${bufferText}${joiner}${text}` : text;
if (nextText.length > maxChars) {
if (bufferText) {
void flush({ force: true });
bufferReplyToId = payload.replyToId;
bufferAudioAsVoice = payload.audioAsVoice;
if (text.length >= maxChars) {
void onFlush(payload);
return;
}
bufferText = text;
scheduleIdleFlush();
return;
}
void onFlush(payload);
return;
}
bufferText = nextText;
if (bufferText.length >= maxChars) {
void flush({ force: true });
return;
}
scheduleIdleFlush();
};
return {
enqueue,
flush,
hasBuffered: () => Boolean(bufferText),
stop: () => clearIdleTimer(),
};
}