fix: stop newline block streaming from sending per paragraph

This commit is contained in:
Tyler Yust 2026-03-19 05:39:38 -07:00
parent dfbc23496e
commit 00b2e23014
6 changed files with 71 additions and 39 deletions

View File

@ -11,20 +11,12 @@ function createFlushOnParagraphChunker(params: { minChars: number; maxChars: num
});
}
function drainChunks(chunker: EmbeddedBlockChunker) {
function drainChunks(chunker: EmbeddedBlockChunker, force = false) {
const chunks: string[] = [];
chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) });
chunker.drain({ force, emit: (chunk) => chunks.push(chunk) });
return chunks;
}
function expectFlushAtFirstParagraphBreak(text: string) {
const chunker = createFlushOnParagraphChunker({ minChars: 100, maxChars: 200 });
chunker.append(text);
const chunks = drainChunks(chunker);
expect(chunks).toEqual(["First paragraph."]);
expect(chunker.bufferedText).toBe("Second paragraph.");
}
describe("EmbeddedBlockChunker", () => {
it("breaks at paragraph boundary right after fence close", () => {
const chunker = new EmbeddedBlockChunker({
@ -54,12 +46,25 @@ describe("EmbeddedBlockChunker", () => {
expect(chunker.bufferedText).toMatch(/^After/);
});
it("flushes paragraph boundaries before minChars when flushOnParagraph is set", () => {
expectFlushAtFirstParagraphBreak("First paragraph.\n\nSecond paragraph.");
// Regression test: with flushOnParagraph enabled, a paragraph break that falls
// before minChars must not trigger a flush on its own.
it("waits until minChars before flushing paragraph boundaries when flushOnParagraph is set", () => {
const chunker = createFlushOnParagraphChunker({ minChars: 30, maxChars: 200 });
chunker.append("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.");
// Non-forced drain: only chunks the chunker considers ready are emitted.
const chunks = drainChunks(chunker);
// The first blank line sits 16 characters in (< minChars = 30), so the expected
// split is at the second blank line, 35 characters in.
expect(chunks).toEqual(["First paragraph.\n\nSecond paragraph."]);
// The remaining 16-character paragraph is below minChars and stays buffered.
expect(chunker.bufferedText).toBe("Third paragraph.");
});
it("treats blank lines with whitespace as paragraph boundaries when flushOnParagraph is set", () => {
expectFlushAtFirstParagraphBreak("First paragraph.\n \nSecond paragraph.");
// Text shorter than minChars must be held back on a normal drain but must still
// be delivered in full when the caller forces the drain.
it("still force flushes buffered paragraphs below minChars at the end", () => {
const chunker = createFlushOnParagraphChunker({ minChars: 100, maxChars: 200 });
// The separator here is a whitespace-only blank line ("\n \n"), not a bare "\n\n".
chunker.append("First paragraph.\n \nSecond paragraph.");
// 36 buffered characters < minChars = 100: a non-forced drain emits nothing.
expect(drainChunks(chunker)).toEqual([]);
// A forced drain emits the whole buffer as one chunk, separator included.
expect(drainChunks(chunker, true)).toEqual(["First paragraph.\n \nSecond paragraph."]);
expect(chunker.bufferedText).toBe("");
});
it("falls back to maxChars when flushOnParagraph is set and no paragraph break exists", () => {
@ -97,7 +102,7 @@ describe("EmbeddedBlockChunker", () => {
it("ignores paragraph breaks inside fences when flushOnParagraph is set", () => {
const chunker = new EmbeddedBlockChunker({
minChars: 100,
minChars: 10,
maxChars: 200,
breakPreference: "paragraph",
flushOnParagraph: true,

View File

@ -5,7 +5,7 @@ export type BlockReplyChunking = {
minChars: number;
maxChars: number;
breakPreference?: "paragraph" | "newline" | "sentence";
/** When true, flush eagerly on \n\n paragraph boundaries regardless of minChars. */
/** When true, prefer \n\n paragraph boundaries once minChars has been satisfied. */
flushOnParagraph?: boolean;
};
@ -129,7 +129,7 @@ export class EmbeddedBlockChunker {
const minChars = Math.max(1, Math.floor(this.#chunking.minChars));
const maxChars = Math.max(minChars, Math.floor(this.#chunking.maxChars));
if (this.#buffer.length < minChars && !force && !this.#chunking.flushOnParagraph) {
if (this.#buffer.length < minChars && !force) {
return;
}
@ -150,12 +150,12 @@ export class EmbeddedBlockChunker {
const reopenPrefix = reopenFence ? `${reopenFence.openLine}\n` : "";
const remainingLength = reopenPrefix.length + (source.length - start);
if (!force && !this.#chunking.flushOnParagraph && remainingLength < minChars) {
if (!force && remainingLength < minChars) {
break;
}
if (this.#chunking.flushOnParagraph && !force) {
const paragraphBreak = findNextParagraphBreak(source, fenceSpans, start);
const paragraphBreak = findNextParagraphBreak(source, fenceSpans, start, minChars);
const paragraphLimit = Math.max(1, maxChars - reopenPrefix.length);
if (paragraphBreak && paragraphBreak.index - start <= paragraphLimit) {
const chunk = `${reopenPrefix}${source.slice(start, paragraphBreak.index)}`;
@ -175,12 +175,7 @@ export class EmbeddedBlockChunker {
const breakResult =
force && remainingLength <= maxChars
? this.#pickSoftBreakIndex(view, fenceSpans, 1, start)
: this.#pickBreakIndex(
view,
fenceSpans,
force || this.#chunking.flushOnParagraph ? 1 : undefined,
start,
);
: this.#pickBreakIndex(view, fenceSpans, force ? 1 : undefined, start);
if (breakResult.index <= 0) {
if (force) {
emit(`${reopenPrefix}${source.slice(start)}`);
@ -205,7 +200,7 @@ export class EmbeddedBlockChunker {
const nextLength =
(reopenFence ? `${reopenFence.openLine}\n`.length : 0) + (source.length - start);
if (nextLength < minChars && !force && !this.#chunking.flushOnParagraph) {
if (nextLength < minChars && !force) {
break;
}
if (nextLength < maxChars && !force && !this.#chunking.flushOnParagraph) {
@ -401,6 +396,7 @@ function findNextParagraphBreak(
buffer: string,
fenceSpans: FenceSpan[],
startIndex = 0,
minCharsFromStart = 1,
): ParagraphBreak | null {
if (startIndex < 0) {
return null;
@ -413,6 +409,9 @@ function findNextParagraphBreak(
if (index < 0) {
continue;
}
if (index - startIndex < minCharsFromStart) {
continue;
}
if (!isSafeFenceBreak(fenceSpans, index)) {
continue;
}

View File

@ -89,8 +89,8 @@ export function createBlockReplyCoalescer(params: {
return;
}
// When flushOnEnqueue is set (chunkMode="newline"), each enqueued payload is treated
// as a separate paragraph and flushed immediately so delivery matches streaming boundaries.
// When flushOnEnqueue is set, treat each enqueued payload as its own outbound block
// and flush immediately instead of waiting for coalescing thresholds.
if (flushOnEnqueue) {
if (bufferText) {
void flush({ force: true });

View File

@ -68,7 +68,7 @@ describe("resolveEffectiveBlockStreamingConfig", () => {
});
expect(resolved.chunking.flushOnParagraph).toBe(true);
expect(resolved.coalescing.flushOnEnqueue).toBe(true);
expect(resolved.coalescing.flushOnEnqueue).toBeUndefined();
expect(resolved.coalescing.joiner).toBe("\n\n");
});

View File

@ -66,7 +66,7 @@ export type BlockStreamingCoalescing = {
maxChars: number;
idleMs: number;
joiner: string;
/** When true, the coalescer flushes the buffer on each enqueue (paragraph-boundary flush). */
/** Internal escape hatch for transports that truly need per-enqueue flushing. */
flushOnEnqueue?: boolean;
};
@ -147,7 +147,7 @@ export function resolveEffectiveBlockStreamingConfig(params: {
: chunking.breakPreference === "newline"
? "\n"
: "\n\n"),
flushOnEnqueue: coalescingDefaults?.flushOnEnqueue ?? chunking.flushOnParagraph === true,
...(coalescingDefaults?.flushOnEnqueue === true ? { flushOnEnqueue: true } : {}),
};
return { chunking, coalescing };
@ -161,9 +161,9 @@ export function resolveBlockStreamingChunking(
const { providerKey, textLimit } = resolveProviderChunkContext(cfg, provider, accountId);
const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk;
// When chunkMode="newline", the outbound delivery splits on paragraph boundaries.
// The block chunker should flush eagerly on \n\n boundaries during streaming,
// regardless of minChars, so each paragraph is sent as its own message.
// When chunkMode="newline", outbound delivery prefers paragraph boundaries.
// Keep the chunker paragraph-aware during streaming, but still let minChars
// control when a buffered paragraph is ready to flush.
const chunkMode = resolveChunkMode(cfg, providerKey, accountId);
const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX));
@ -192,7 +192,6 @@ export function resolveBlockStreamingCoalescing(
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
},
opts?: { chunkMode?: "length" | "newline" },
): BlockStreamingCoalescing | undefined {
const { providerKey, providerId, textLimit } = resolveProviderChunkContext(
cfg,
@ -200,9 +199,6 @@ export function resolveBlockStreamingCoalescing(
accountId,
);
// Resolve the outbound chunkMode so the coalescer can flush on paragraph boundaries
// when chunkMode="newline", matching the delivery-time splitting behavior.
const chunkMode = opts?.chunkMode ?? resolveChunkMode(cfg, providerKey, accountId);
const providerDefaults = providerId
? getChannelPlugin(providerId)?.streaming?.blockStreamingCoalesceDefaults
: undefined;
@ -237,6 +233,5 @@ export function resolveBlockStreamingCoalescing(
maxChars,
idleMs,
joiner,
flushOnEnqueue: chunkMode === "newline",
};
}

View File

@ -675,6 +675,39 @@ describe("block reply coalescer", () => {
coalescer.stop();
});
// Two short payloads enqueued back to back should be delivered as one combined
// flush, joined with the configured joiner, rather than one flush per payload.
it("keeps buffering newline-style chunks until minChars is reached", async () => {
// NOTE(review): fake timers are enabled here but not restored inside this test;
// presumably a shared afterEach resets them — confirm.
vi.useFakeTimers();
// No flushOnEnqueue is passed, so the harness uses the plain coalescing settings below.
const { flushes, coalescer } = createBlockCoalescerHarness({
minChars: 25,
maxChars: 2000,
idleMs: 50,
joiner: "\n\n",
});
// Each payload alone (15 and 16 characters) is shorter than minChars = 25.
coalescer.enqueue({ text: "First paragraph" });
coalescer.enqueue({ text: "Second paragraph" });
// Advance the fake clock by exactly idleMs.
await vi.advanceTimersByTimeAsync(50);
// Exactly one flush is expected, containing both payloads joined by "\n\n".
expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]);
coalescer.stop();
});
// An explicit forced flush must deliver whatever is buffered, even when the
// combined text is still far below minChars.
it("force flushes buffered newline-style chunks even below minChars", async () => {
const { flushes, coalescer } = createBlockCoalescerHarness({
minChars: 100,
maxChars: 2000,
idleMs: 50,
joiner: "\n\n",
});
// The combined text is 33 characters, well under minChars = 100.
coalescer.enqueue({ text: "First paragraph" });
coalescer.enqueue({ text: "Second paragraph" });
// Force the flush directly instead of advancing any timer.
await coalescer.flush({ force: true });
// Both payloads arrive as a single flush joined by "\n\n".
expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]);
coalescer.stop();
});
it("flushes immediately per enqueue when flushOnEnqueue is set", async () => {
const cases = [
{