Compare commits

...

2 Commits

Author SHA1 Message Date
Tyler Yust
00b2e23014 fix: stop newline block streaming from sending per paragraph 2026-03-19 05:39:38 -07:00
Tyler Yust
dfbc23496e fix: honor BlueBubbles chunk mode and envelope timezone 2026-03-19 04:37:37 -07:00
10 changed files with 141 additions and 68 deletions

View File

@ -11,20 +11,12 @@ function createFlushOnParagraphChunker(params: { minChars: number; maxChars: num
}); });
} }
function drainChunks(chunker: EmbeddedBlockChunker) { function drainChunks(chunker: EmbeddedBlockChunker, force = false) {
const chunks: string[] = []; const chunks: string[] = [];
chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); chunker.drain({ force, emit: (chunk) => chunks.push(chunk) });
return chunks; return chunks;
} }
function expectFlushAtFirstParagraphBreak(text: string) {
const chunker = createFlushOnParagraphChunker({ minChars: 100, maxChars: 200 });
chunker.append(text);
const chunks = drainChunks(chunker);
expect(chunks).toEqual(["First paragraph."]);
expect(chunker.bufferedText).toBe("Second paragraph.");
}
describe("EmbeddedBlockChunker", () => { describe("EmbeddedBlockChunker", () => {
it("breaks at paragraph boundary right after fence close", () => { it("breaks at paragraph boundary right after fence close", () => {
const chunker = new EmbeddedBlockChunker({ const chunker = new EmbeddedBlockChunker({
@ -54,12 +46,25 @@ describe("EmbeddedBlockChunker", () => {
expect(chunker.bufferedText).toMatch(/^After/); expect(chunker.bufferedText).toMatch(/^After/);
}); });
it("flushes paragraph boundaries before minChars when flushOnParagraph is set", () => { it("waits until minChars before flushing paragraph boundaries when flushOnParagraph is set", () => {
expectFlushAtFirstParagraphBreak("First paragraph.\n\nSecond paragraph."); const chunker = createFlushOnParagraphChunker({ minChars: 30, maxChars: 200 });
chunker.append("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.");
const chunks = drainChunks(chunker);
expect(chunks).toEqual(["First paragraph.\n\nSecond paragraph."]);
expect(chunker.bufferedText).toBe("Third paragraph.");
}); });
it("treats blank lines with whitespace as paragraph boundaries when flushOnParagraph is set", () => { it("still force flushes buffered paragraphs below minChars at the end", () => {
expectFlushAtFirstParagraphBreak("First paragraph.\n \nSecond paragraph."); const chunker = createFlushOnParagraphChunker({ minChars: 100, maxChars: 200 });
chunker.append("First paragraph.\n \nSecond paragraph.");
expect(drainChunks(chunker)).toEqual([]);
expect(drainChunks(chunker, true)).toEqual(["First paragraph.\n \nSecond paragraph."]);
expect(chunker.bufferedText).toBe("");
}); });
it("falls back to maxChars when flushOnParagraph is set and no paragraph break exists", () => { it("falls back to maxChars when flushOnParagraph is set and no paragraph break exists", () => {
@ -97,7 +102,7 @@ describe("EmbeddedBlockChunker", () => {
it("ignores paragraph breaks inside fences when flushOnParagraph is set", () => { it("ignores paragraph breaks inside fences when flushOnParagraph is set", () => {
const chunker = new EmbeddedBlockChunker({ const chunker = new EmbeddedBlockChunker({
minChars: 100, minChars: 10,
maxChars: 200, maxChars: 200,
breakPreference: "paragraph", breakPreference: "paragraph",
flushOnParagraph: true, flushOnParagraph: true,

View File

@ -5,7 +5,7 @@ export type BlockReplyChunking = {
minChars: number; minChars: number;
maxChars: number; maxChars: number;
breakPreference?: "paragraph" | "newline" | "sentence"; breakPreference?: "paragraph" | "newline" | "sentence";
/** When true, flush eagerly on \n\n paragraph boundaries regardless of minChars. */ /** When true, prefer \n\n paragraph boundaries once minChars has been satisfied. */
flushOnParagraph?: boolean; flushOnParagraph?: boolean;
}; };
@ -129,7 +129,7 @@ export class EmbeddedBlockChunker {
const minChars = Math.max(1, Math.floor(this.#chunking.minChars)); const minChars = Math.max(1, Math.floor(this.#chunking.minChars));
const maxChars = Math.max(minChars, Math.floor(this.#chunking.maxChars)); const maxChars = Math.max(minChars, Math.floor(this.#chunking.maxChars));
if (this.#buffer.length < minChars && !force && !this.#chunking.flushOnParagraph) { if (this.#buffer.length < minChars && !force) {
return; return;
} }
@ -150,12 +150,12 @@ export class EmbeddedBlockChunker {
const reopenPrefix = reopenFence ? `${reopenFence.openLine}\n` : ""; const reopenPrefix = reopenFence ? `${reopenFence.openLine}\n` : "";
const remainingLength = reopenPrefix.length + (source.length - start); const remainingLength = reopenPrefix.length + (source.length - start);
if (!force && !this.#chunking.flushOnParagraph && remainingLength < minChars) { if (!force && remainingLength < minChars) {
break; break;
} }
if (this.#chunking.flushOnParagraph && !force) { if (this.#chunking.flushOnParagraph && !force) {
const paragraphBreak = findNextParagraphBreak(source, fenceSpans, start); const paragraphBreak = findNextParagraphBreak(source, fenceSpans, start, minChars);
const paragraphLimit = Math.max(1, maxChars - reopenPrefix.length); const paragraphLimit = Math.max(1, maxChars - reopenPrefix.length);
if (paragraphBreak && paragraphBreak.index - start <= paragraphLimit) { if (paragraphBreak && paragraphBreak.index - start <= paragraphLimit) {
const chunk = `${reopenPrefix}${source.slice(start, paragraphBreak.index)}`; const chunk = `${reopenPrefix}${source.slice(start, paragraphBreak.index)}`;
@ -175,12 +175,7 @@ export class EmbeddedBlockChunker {
const breakResult = const breakResult =
force && remainingLength <= maxChars force && remainingLength <= maxChars
? this.#pickSoftBreakIndex(view, fenceSpans, 1, start) ? this.#pickSoftBreakIndex(view, fenceSpans, 1, start)
: this.#pickBreakIndex( : this.#pickBreakIndex(view, fenceSpans, force ? 1 : undefined, start);
view,
fenceSpans,
force || this.#chunking.flushOnParagraph ? 1 : undefined,
start,
);
if (breakResult.index <= 0) { if (breakResult.index <= 0) {
if (force) { if (force) {
emit(`${reopenPrefix}${source.slice(start)}`); emit(`${reopenPrefix}${source.slice(start)}`);
@ -205,7 +200,7 @@ export class EmbeddedBlockChunker {
const nextLength = const nextLength =
(reopenFence ? `${reopenFence.openLine}\n`.length : 0) + (source.length - start); (reopenFence ? `${reopenFence.openLine}\n`.length : 0) + (source.length - start);
if (nextLength < minChars && !force && !this.#chunking.flushOnParagraph) { if (nextLength < minChars && !force) {
break; break;
} }
if (nextLength < maxChars && !force && !this.#chunking.flushOnParagraph) { if (nextLength < maxChars && !force && !this.#chunking.flushOnParagraph) {
@ -401,6 +396,7 @@ function findNextParagraphBreak(
buffer: string, buffer: string,
fenceSpans: FenceSpan[], fenceSpans: FenceSpan[],
startIndex = 0, startIndex = 0,
minCharsFromStart = 1,
): ParagraphBreak | null { ): ParagraphBreak | null {
if (startIndex < 0) { if (startIndex < 0) {
return null; return null;
@ -413,6 +409,9 @@ function findNextParagraphBreak(
if (index < 0) { if (index < 0) {
continue; continue;
} }
if (index - startIndex < minCharsFromStart) {
continue;
}
if (!isSafeFenceBreak(fenceSpans, index)) { if (!isSafeFenceBreak(fenceSpans, index)) {
continue; continue;
} }

View File

@ -102,7 +102,7 @@ function resolveEnvelopeTimezone(options: NormalizedEnvelopeOptions): ResolvedEn
return explicit ? { mode: "iana", timeZone: explicit } : { mode: "utc" }; return explicit ? { mode: "iana", timeZone: explicit } : { mode: "utc" };
} }
function formatTimestamp( export function formatEnvelopeTimestamp(
ts: number | Date | undefined, ts: number | Date | undefined,
options?: EnvelopeFormatOptions, options?: EnvelopeFormatOptions,
): string | undefined { ): string | undefined {
@ -179,7 +179,7 @@ export function formatAgentEnvelope(params: AgentEnvelopeParams): string {
if (params.ip?.trim()) { if (params.ip?.trim()) {
parts.push(sanitizeEnvelopeHeaderPart(params.ip.trim())); parts.push(sanitizeEnvelopeHeaderPart(params.ip.trim()));
} }
const ts = formatTimestamp(params.timestamp, resolved); const ts = formatEnvelopeTimestamp(params.timestamp, resolved);
if (ts) { if (ts) {
parts.push(ts); parts.push(ts);
} }

View File

@ -89,8 +89,8 @@ export function createBlockReplyCoalescer(params: {
return; return;
} }
// When flushOnEnqueue is set (chunkMode="newline"), each enqueued payload is treated // When flushOnEnqueue is set, treat each enqueued payload as its own outbound block
// as a separate paragraph and flushed immediately so delivery matches streaming boundaries. // and flush immediately instead of waiting for coalescing thresholds.
if (flushOnEnqueue) { if (flushOnEnqueue) {
if (bufferText) { if (bufferText) {
void flush({ force: true }); void flush({ force: true });

View File

@ -44,6 +44,34 @@ describe("resolveEffectiveBlockStreamingConfig", () => {
expect(resolved.coalescing.idleMs).toBe(0); expect(resolved.coalescing.idleMs).toBe(0);
}); });
it("honors newline chunkMode for plugin channels even before the plugin registry is loaded", () => {
const cfg = {
channels: {
bluebubbles: {
chunkMode: "newline",
},
},
agents: {
defaults: {
blockStreamingChunk: {
minChars: 1,
maxChars: 4000,
breakPreference: "paragraph",
},
},
},
} as OpenClawConfig;
const resolved = resolveEffectiveBlockStreamingConfig({
cfg,
provider: "bluebubbles",
});
expect(resolved.chunking.flushOnParagraph).toBe(true);
expect(resolved.coalescing.flushOnEnqueue).toBeUndefined();
expect(resolved.coalescing.joiner).toBe("\n\n");
});
it("allows ACP maxChunkChars overrides above base defaults up to provider text limits", () => { it("allows ACP maxChunkChars overrides above base defaults up to provider text limits", () => {
const cfg = { const cfg = {
channels: { channels: {

View File

@ -3,26 +3,22 @@ import type { OpenClawConfig } from "../../config/config.js";
import type { BlockStreamingCoalesceConfig } from "../../config/types.js"; import type { BlockStreamingCoalesceConfig } from "../../config/types.js";
import { resolveAccountEntry } from "../../routing/account-lookup.js"; import { resolveAccountEntry } from "../../routing/account-lookup.js";
import { normalizeAccountId } from "../../routing/session-key.js"; import { normalizeAccountId } from "../../routing/session-key.js";
import { import { normalizeMessageChannel } from "../../utils/message-channel.js";
INTERNAL_MESSAGE_CHANNEL,
listDeliverableMessageChannels,
} from "../../utils/message-channel.js";
import { resolveChunkMode, resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; import { resolveChunkMode, resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js";
const DEFAULT_BLOCK_STREAM_MIN = 800; const DEFAULT_BLOCK_STREAM_MIN = 800;
const DEFAULT_BLOCK_STREAM_MAX = 1200; const DEFAULT_BLOCK_STREAM_MAX = 1200;
const DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS = 1000; const DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS = 1000;
const getBlockChunkProviders = () =>
new Set<TextChunkProvider>([...listDeliverableMessageChannels(), INTERNAL_MESSAGE_CHANNEL]);
function normalizeChunkProvider(provider?: string): TextChunkProvider | undefined { function normalizeChunkProvider(provider?: string): TextChunkProvider | undefined {
if (!provider) { if (!provider) {
return undefined; return undefined;
} }
const cleaned = provider.trim().toLowerCase(); const normalized = normalizeMessageChannel(provider);
return getBlockChunkProviders().has(cleaned as TextChunkProvider) if (!normalized) {
? (cleaned as TextChunkProvider) return undefined;
: undefined; }
return normalized as TextChunkProvider;
} }
function resolveProviderChunkContext( function resolveProviderChunkContext(
@ -70,7 +66,7 @@ export type BlockStreamingCoalescing = {
maxChars: number; maxChars: number;
idleMs: number; idleMs: number;
joiner: string; joiner: string;
/** When true, the coalescer flushes the buffer on each enqueue (paragraph-boundary flush). */ /** Internal escape hatch for transports that truly need per-enqueue flushing. */
flushOnEnqueue?: boolean; flushOnEnqueue?: boolean;
}; };
@ -151,7 +147,7 @@ export function resolveEffectiveBlockStreamingConfig(params: {
: chunking.breakPreference === "newline" : chunking.breakPreference === "newline"
? "\n" ? "\n"
: "\n\n"), : "\n\n"),
flushOnEnqueue: coalescingDefaults?.flushOnEnqueue ?? chunking.flushOnParagraph === true, ...(coalescingDefaults?.flushOnEnqueue === true ? { flushOnEnqueue: true } : {}),
}; };
return { chunking, coalescing }; return { chunking, coalescing };
@ -165,9 +161,9 @@ export function resolveBlockStreamingChunking(
const { providerKey, textLimit } = resolveProviderChunkContext(cfg, provider, accountId); const { providerKey, textLimit } = resolveProviderChunkContext(cfg, provider, accountId);
const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk; const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk;
// When chunkMode="newline", the outbound delivery splits on paragraph boundaries. // When chunkMode="newline", outbound delivery prefers paragraph boundaries.
// The block chunker should flush eagerly on \n\n boundaries during streaming, // Keep the chunker paragraph-aware during streaming, but still let minChars
// regardless of minChars, so each paragraph is sent as its own message. // control when a buffered paragraph is ready to flush.
const chunkMode = resolveChunkMode(cfg, providerKey, accountId); const chunkMode = resolveChunkMode(cfg, providerKey, accountId);
const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX)); const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX));
@ -196,7 +192,6 @@ export function resolveBlockStreamingCoalescing(
maxChars: number; maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence"; breakPreference: "paragraph" | "newline" | "sentence";
}, },
opts?: { chunkMode?: "length" | "newline" },
): BlockStreamingCoalescing | undefined { ): BlockStreamingCoalescing | undefined {
const { providerKey, providerId, textLimit } = resolveProviderChunkContext( const { providerKey, providerId, textLimit } = resolveProviderChunkContext(
cfg, cfg,
@ -204,9 +199,6 @@ export function resolveBlockStreamingCoalescing(
accountId, accountId,
); );
// Resolve the outbound chunkMode so the coalescer can flush on paragraph boundaries
// when chunkMode="newline", matching the delivery-time splitting behavior.
const chunkMode = opts?.chunkMode ?? resolveChunkMode(cfg, providerKey, accountId);
const providerDefaults = providerId const providerDefaults = providerId
? getChannelPlugin(providerId)?.streaming?.blockStreamingCoalesceDefaults ? getChannelPlugin(providerId)?.streaming?.blockStreamingCoalesceDefaults
: undefined; : undefined;
@ -241,6 +233,5 @@ export function resolveBlockStreamingCoalescing(
maxChars, maxChars,
idleMs, idleMs,
joiner, joiner,
flushOnEnqueue: chunkMode === "newline",
}; };
} }

View File

@ -21,6 +21,7 @@ import { clearCommandLane, getQueueSize } from "../../process/command-queue.js";
import { normalizeMainKey } from "../../routing/session-key.js"; import { normalizeMainKey } from "../../routing/session-key.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { hasControlCommand } from "../command-detection.js"; import { hasControlCommand } from "../command-detection.js";
import { resolveEnvelopeFormatOptions } from "../envelope.js";
import { buildInboundMediaNote } from "../media-note.js"; import { buildInboundMediaNote } from "../media-note.js";
import type { MsgContext, TemplateContext } from "../templating.js"; import type { MsgContext, TemplateContext } from "../templating.js";
import { import {
@ -292,6 +293,7 @@ export async function runPreparedReply(
isNewSession && isNewSession &&
((baseBodyTrimmedRaw.length === 0 && rawBodyTrimmed.length > 0) || isBareNewOrReset); ((baseBodyTrimmedRaw.length === 0 && rawBodyTrimmed.length > 0) || isBareNewOrReset);
const baseBodyFinal = isBareSessionReset ? buildBareSessionResetPrompt(cfg) : baseBody; const baseBodyFinal = isBareSessionReset ? buildBareSessionResetPrompt(cfg) : baseBody;
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
const inboundUserContext = buildInboundUserContextPrefix( const inboundUserContext = buildInboundUserContextPrefix(
isNewSession isNewSession
? { ? {
@ -301,6 +303,7 @@ export async function runPreparedReply(
: {}), : {}),
} }
: { ...sessionCtx, ThreadStarterBody: undefined }, : { ...sessionCtx, ThreadStarterBody: undefined },
envelopeOptions,
); );
const baseBodyForPrompt = isBareSessionReset const baseBodyForPrompt = isBareSessionReset
? baseBodyFinal ? baseBodyFinal

View File

@ -1,4 +1,5 @@
import { describe, expect, it } from "vitest"; import { describe, expect, it } from "vitest";
import { withEnv } from "../../test-utils/env.js";
import type { TemplateContext } from "../templating.js"; import type { TemplateContext } from "../templating.js";
import { buildInboundMetaSystemPrompt, buildInboundUserContextPrefix } from "./inbound-meta.js"; import { buildInboundMetaSystemPrompt, buildInboundUserContextPrefix } from "./inbound-meta.js";
@ -217,6 +218,25 @@ describe("buildInboundUserContextPrefix", () => {
expect(conversationInfo["timestamp"]).toEqual(expect.any(String)); expect(conversationInfo["timestamp"]).toEqual(expect.any(String));
}); });
it("honors envelope user timezone for conversation timestamps", () => {
withEnv({ TZ: "America/Los_Angeles" }, () => {
const text = buildInboundUserContextPrefix(
{
ChatType: "group",
MessageSid: "msg-with-user-tz",
Timestamp: Date.UTC(2026, 2, 19, 0, 0),
} as TemplateContext,
{
timezone: "user",
userTimezone: "Asia/Tokyo",
},
);
const conversationInfo = parseConversationInfoPayload(text);
expect(conversationInfo["timestamp"]).toBe("Thu 2026-03-19 09:00 GMT+9");
});
});
it("omits invalid timestamps instead of throwing", () => { it("omits invalid timestamps instead of throwing", () => {
expect(() => expect(() =>
buildInboundUserContextPrefix({ buildInboundUserContextPrefix({

View File

@ -1,6 +1,7 @@
import { normalizeChatType } from "../../channels/chat-type.js"; import { normalizeChatType } from "../../channels/chat-type.js";
import { resolveSenderLabel } from "../../channels/sender-label.js"; import { resolveSenderLabel } from "../../channels/sender-label.js";
import { formatZonedTimestamp } from "../../infra/format-time/format-datetime.js"; import type { EnvelopeFormatOptions } from "../envelope.js";
import { formatEnvelopeTimestamp } from "../envelope.js";
import type { TemplateContext } from "../templating.js"; import type { TemplateContext } from "../templating.js";
function safeTrim(value: unknown): string | undefined { function safeTrim(value: unknown): string | undefined {
@ -11,24 +12,14 @@ function safeTrim(value: unknown): string | undefined {
return trimmed ? trimmed : undefined; return trimmed ? trimmed : undefined;
} }
function formatConversationTimestamp(value: unknown): string | undefined { function formatConversationTimestamp(
value: unknown,
envelope?: EnvelopeFormatOptions,
): string | undefined {
if (typeof value !== "number" || !Number.isFinite(value)) { if (typeof value !== "number" || !Number.isFinite(value)) {
return undefined; return undefined;
} }
const date = new Date(value); return formatEnvelopeTimestamp(value, envelope);
if (Number.isNaN(date.getTime())) {
return undefined;
}
const formatted = formatZonedTimestamp(date);
if (!formatted) {
return undefined;
}
try {
const weekday = new Intl.DateTimeFormat("en-US", { weekday: "short" }).format(date);
return weekday ? `${weekday} ${formatted}` : formatted;
} catch {
return formatted;
}
} }
function resolveInboundChannel(ctx: TemplateContext): string | undefined { function resolveInboundChannel(ctx: TemplateContext): string | undefined {
@ -81,7 +72,10 @@ export function buildInboundMetaSystemPrompt(ctx: TemplateContext): string {
].join("\n"); ].join("\n");
} }
export function buildInboundUserContextPrefix(ctx: TemplateContext): string { export function buildInboundUserContextPrefix(
ctx: TemplateContext,
envelope?: EnvelopeFormatOptions,
): string {
const blocks: string[] = []; const blocks: string[] = [];
const chatType = normalizeChatType(ctx.ChatType); const chatType = normalizeChatType(ctx.ChatType);
const isDirect = !chatType || chatType === "direct"; const isDirect = !chatType || chatType === "direct";
@ -94,7 +88,7 @@ export function buildInboundUserContextPrefix(ctx: TemplateContext): string {
const messageId = safeTrim(ctx.MessageSid); const messageId = safeTrim(ctx.MessageSid);
const messageIdFull = safeTrim(ctx.MessageSidFull); const messageIdFull = safeTrim(ctx.MessageSidFull);
const resolvedMessageId = messageId ?? messageIdFull; const resolvedMessageId = messageId ?? messageIdFull;
const timestampStr = formatConversationTimestamp(ctx.Timestamp); const timestampStr = formatConversationTimestamp(ctx.Timestamp, envelope);
const conversationInfo = { const conversationInfo = {
message_id: shouldIncludeConversationInfo ? resolvedMessageId : undefined, message_id: shouldIncludeConversationInfo ? resolvedMessageId : undefined,

View File

@ -675,6 +675,39 @@ describe("block reply coalescer", () => {
coalescer.stop(); coalescer.stop();
}); });
it("keeps buffering newline-style chunks until minChars is reached", async () => {
vi.useFakeTimers();
const { flushes, coalescer } = createBlockCoalescerHarness({
minChars: 25,
maxChars: 2000,
idleMs: 50,
joiner: "\n\n",
});
coalescer.enqueue({ text: "First paragraph" });
coalescer.enqueue({ text: "Second paragraph" });
await vi.advanceTimersByTimeAsync(50);
expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]);
coalescer.stop();
});
it("force flushes buffered newline-style chunks even below minChars", async () => {
const { flushes, coalescer } = createBlockCoalescerHarness({
minChars: 100,
maxChars: 2000,
idleMs: 50,
joiner: "\n\n",
});
coalescer.enqueue({ text: "First paragraph" });
coalescer.enqueue({ text: "Second paragraph" });
await coalescer.flush({ force: true });
expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]);
coalescer.stop();
});
it("flushes immediately per enqueue when flushOnEnqueue is set", async () => { it("flushes immediately per enqueue when flushOnEnqueue is set", async () => {
const cases = [ const cases = [
{ {