From dfbc23496e6d6d8f093f9e5ab96a0d9fca21c3e3 Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Thu, 19 Mar 2026 04:37:37 -0700 Subject: [PATCH 1/2] fix: honor BlueBubbles chunk mode and envelope timezone --- src/auto-reply/envelope.ts | 4 +-- src/auto-reply/reply/block-streaming.test.ts | 28 ++++++++++++++++++ src/auto-reply/reply/block-streaming.ts | 16 ++++------- src/auto-reply/reply/get-reply-run.ts | 3 ++ src/auto-reply/reply/inbound-meta.test.ts | 20 +++++++++++++ src/auto-reply/reply/inbound-meta.ts | 30 ++++++++------------ 6 files changed, 71 insertions(+), 30 deletions(-) diff --git a/src/auto-reply/envelope.ts b/src/auto-reply/envelope.ts index 3a2985419dd..5eedb19dd0c 100644 --- a/src/auto-reply/envelope.ts +++ b/src/auto-reply/envelope.ts @@ -102,7 +102,7 @@ function resolveEnvelopeTimezone(options: NormalizedEnvelopeOptions): ResolvedEn return explicit ? { mode: "iana", timeZone: explicit } : { mode: "utc" }; } -function formatTimestamp( +export function formatEnvelopeTimestamp( ts: number | Date | undefined, options?: EnvelopeFormatOptions, ): string | undefined { @@ -179,7 +179,7 @@ export function formatAgentEnvelope(params: AgentEnvelopeParams): string { if (params.ip?.trim()) { parts.push(sanitizeEnvelopeHeaderPart(params.ip.trim())); } - const ts = formatTimestamp(params.timestamp, resolved); + const ts = formatEnvelopeTimestamp(params.timestamp, resolved); if (ts) { parts.push(ts); } diff --git a/src/auto-reply/reply/block-streaming.test.ts b/src/auto-reply/reply/block-streaming.test.ts index 29264ca99b3..9da4f73a619 100644 --- a/src/auto-reply/reply/block-streaming.test.ts +++ b/src/auto-reply/reply/block-streaming.test.ts @@ -44,6 +44,34 @@ describe("resolveEffectiveBlockStreamingConfig", () => { expect(resolved.coalescing.idleMs).toBe(0); }); + it("honors newline chunkMode for plugin channels even before the plugin registry is loaded", () => { + const cfg = { + channels: { + bluebubbles: { + chunkMode: "newline", + }, + }, + agents: { + defaults: { + blockStreamingChunk: { + minChars: 1, + maxChars: 4000, + breakPreference: "paragraph", + }, + }, + }, + } as OpenClawConfig; + + const resolved = resolveEffectiveBlockStreamingConfig({ + cfg, + provider: "bluebubbles", + }); + + expect(resolved.chunking.flushOnParagraph).toBe(true); + expect(resolved.coalescing.flushOnEnqueue).toBe(true); + expect(resolved.coalescing.joiner).toBe("\n\n"); + }); + it("allows ACP maxChunkChars overrides above base defaults up to provider text limits", () => { const cfg = { channels: { diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 9149f7c8562..8db8170e060 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -3,26 +3,22 @@ import type { OpenClawConfig } from "../../config/config.js"; import type { BlockStreamingCoalesceConfig } from "../../config/types.js"; import { resolveAccountEntry } from "../../routing/account-lookup.js"; import { normalizeAccountId } from "../../routing/session-key.js"; -import { - INTERNAL_MESSAGE_CHANNEL, - listDeliverableMessageChannels, -} from "../../utils/message-channel.js"; +import { normalizeMessageChannel } from "../../utils/message-channel.js"; import { resolveChunkMode, resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; const DEFAULT_BLOCK_STREAM_MIN = 800; const DEFAULT_BLOCK_STREAM_MAX = 1200; const DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS = 1000; -const getBlockChunkProviders = () => - new Set([...listDeliverableMessageChannels(), INTERNAL_MESSAGE_CHANNEL]); function normalizeChunkProvider(provider?: string): TextChunkProvider | undefined { if (!provider) { return undefined; } - const cleaned = provider.trim().toLowerCase(); - return getBlockChunkProviders().has(cleaned as TextChunkProvider) - ? (cleaned as TextChunkProvider) - : undefined; + const normalized = normalizeMessageChannel(provider); + if (!normalized) { + return undefined; + } + return normalized as TextChunkProvider; } function resolveProviderChunkContext( diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 760c42aed1a..c8451fd88f6 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -21,6 +21,7 @@ import { clearCommandLane, getQueueSize } from "../../process/command-queue.js"; import { normalizeMainKey } from "../../routing/session-key.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { hasControlCommand } from "../command-detection.js"; +import { resolveEnvelopeFormatOptions } from "../envelope.js"; import { buildInboundMediaNote } from "../media-note.js"; import type { MsgContext, TemplateContext } from "../templating.js"; import { @@ -292,6 +293,7 @@ export async function runPreparedReply( isNewSession && ((baseBodyTrimmedRaw.length === 0 && rawBodyTrimmed.length > 0) || isBareNewOrReset); const baseBodyFinal = isBareSessionReset ? buildBareSessionResetPrompt(cfg) : baseBody; + const envelopeOptions = resolveEnvelopeFormatOptions(cfg); const inboundUserContext = buildInboundUserContextPrefix( isNewSession ? { @@ -301,6 +303,7 @@ export async function runPreparedReply( : {}), } : { ...sessionCtx, ThreadStarterBody: undefined }, + envelopeOptions, ); const baseBodyForPrompt = isBareSessionReset ? baseBodyFinal diff --git a/src/auto-reply/reply/inbound-meta.test.ts b/src/auto-reply/reply/inbound-meta.test.ts index b39fe5c9805..db964a9db26 100644 --- a/src/auto-reply/reply/inbound-meta.test.ts +++ b/src/auto-reply/reply/inbound-meta.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; +import { withEnv } from "../../test-utils/env.js"; import type { TemplateContext } from "../templating.js"; import { buildInboundMetaSystemPrompt, buildInboundUserContextPrefix } from "./inbound-meta.js"; @@ -217,6 +218,25 @@ describe("buildInboundUserContextPrefix", () => { expect(conversationInfo["timestamp"]).toEqual(expect.any(String)); }); + it("honors envelope user timezone for conversation timestamps", () => { + withEnv({ TZ: "America/Los_Angeles" }, () => { + const text = buildInboundUserContextPrefix( + { + ChatType: "group", + MessageSid: "msg-with-user-tz", + Timestamp: Date.UTC(2026, 2, 19, 0, 0), + } as TemplateContext, + { + timezone: "user", + userTimezone: "Asia/Tokyo", + }, + ); + + const conversationInfo = parseConversationInfoPayload(text); + expect(conversationInfo["timestamp"]).toBe("Thu 2026-03-19 09:00 GMT+9"); + }); + }); + it("omits invalid timestamps instead of throwing", () => { expect(() => buildInboundUserContextPrefix({ diff --git a/src/auto-reply/reply/inbound-meta.ts b/src/auto-reply/reply/inbound-meta.ts index 519414fa109..8aa9973bae0 100644 --- a/src/auto-reply/reply/inbound-meta.ts +++ b/src/auto-reply/reply/inbound-meta.ts @@ -1,6 +1,7 @@ import { normalizeChatType } from "../../channels/chat-type.js"; import { resolveSenderLabel } from "../../channels/sender-label.js"; -import { formatZonedTimestamp } from "../../infra/format-time/format-datetime.js"; +import type { EnvelopeFormatOptions } from "../envelope.js"; +import { formatEnvelopeTimestamp } from "../envelope.js"; import type { TemplateContext } from "../templating.js"; function safeTrim(value: unknown): string | undefined { @@ -11,24 +12,14 @@ function safeTrim(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } -function formatConversationTimestamp(value: unknown): string | undefined { +function formatConversationTimestamp( + value: unknown, + envelope?: EnvelopeFormatOptions, +): string | undefined { if (typeof value !== "number" || !Number.isFinite(value)) { return undefined; } - const date = new Date(value); - if (Number.isNaN(date.getTime())) { - return undefined; - } - const formatted = formatZonedTimestamp(date); - if (!formatted) { - return undefined; - } - try { - const weekday = new Intl.DateTimeFormat("en-US", { weekday: "short" }).format(date); - return weekday ? `${weekday} ${formatted}` : formatted; - } catch { - return formatted; - } + return formatEnvelopeTimestamp(value, envelope); } function resolveInboundChannel(ctx: TemplateContext): string | undefined { @@ -81,7 +72,10 @@ export function buildInboundMetaSystemPrompt(ctx: TemplateContext): string { ].join("\n"); } -export function buildInboundUserContextPrefix(ctx: TemplateContext): string { +export function buildInboundUserContextPrefix( + ctx: TemplateContext, + envelope?: EnvelopeFormatOptions, +): string { const blocks: string[] = []; const chatType = normalizeChatType(ctx.ChatType); const isDirect = !chatType || chatType === "direct"; @@ -94,7 +88,7 @@ export function buildInboundUserContextPrefix(ctx: TemplateContext): string { const messageId = safeTrim(ctx.MessageSid); const messageIdFull = safeTrim(ctx.MessageSidFull); const resolvedMessageId = messageId ?? messageIdFull; - const timestampStr = formatConversationTimestamp(ctx.Timestamp); + const timestampStr = formatConversationTimestamp(ctx.Timestamp, envelope); const conversationInfo = { message_id: shouldIncludeConversationInfo ? resolvedMessageId : undefined, From 00b2e2301496c8f022de880441bac08525da3c40 Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Thu, 19 Mar 2026 05:39:38 -0700 Subject: [PATCH 2/2] fix: stop newline block streaming from sending per paragraph --- src/agents/pi-embedded-block-chunker.test.ts | 35 +++++++++++-------- src/agents/pi-embedded-block-chunker.ts | 21 ++++++----- src/auto-reply/reply/block-reply-coalescer.ts | 4 +-- src/auto-reply/reply/block-streaming.test.ts | 2 +- src/auto-reply/reply/block-streaming.ts | 15 +++----- src/auto-reply/reply/reply-utils.test.ts | 33 +++++++++++++++++ 6 files changed, 71 insertions(+), 39 deletions(-) diff --git a/src/agents/pi-embedded-block-chunker.test.ts b/src/agents/pi-embedded-block-chunker.test.ts index c8b1f5dda55..0766dce9233 100644 --- a/src/agents/pi-embedded-block-chunker.test.ts +++ b/src/agents/pi-embedded-block-chunker.test.ts @@ -11,20 +11,12 @@ function createFlushOnParagraphChunker(params: { minChars: number; maxChars: num }); } -function drainChunks(chunker: EmbeddedBlockChunker) { +function drainChunks(chunker: EmbeddedBlockChunker, force = false) { const chunks: string[] = []; - chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + chunker.drain({ force, emit: (chunk) => chunks.push(chunk) }); return chunks; } -function expectFlushAtFirstParagraphBreak(text: string) { - const chunker = createFlushOnParagraphChunker({ minChars: 100, maxChars: 200 }); - chunker.append(text); - const chunks = drainChunks(chunker); - expect(chunks).toEqual(["First paragraph."]); - expect(chunker.bufferedText).toBe("Second paragraph."); -} - describe("EmbeddedBlockChunker", () => { it("breaks at paragraph boundary right after fence close", () => { const chunker = new EmbeddedBlockChunker({ @@ -54,12 +46,25 @@ describe("EmbeddedBlockChunker", () => { expect(chunker.bufferedText).toMatch(/^After/); }); - it("flushes paragraph boundaries before minChars when flushOnParagraph is set", () => { - expectFlushAtFirstParagraphBreak("First paragraph.\n\nSecond paragraph."); + it("waits until minChars before flushing paragraph boundaries when flushOnParagraph is set", () => { + const chunker = createFlushOnParagraphChunker({ minChars: 30, maxChars: 200 }); + + chunker.append("First paragraph.\n\nSecond paragraph.\n\nThird paragraph."); + + const chunks = drainChunks(chunker); + + expect(chunks).toEqual(["First paragraph.\n\nSecond paragraph."]); + expect(chunker.bufferedText).toBe("Third paragraph."); }); - it("treats blank lines with whitespace as paragraph boundaries when flushOnParagraph is set", () => { - expectFlushAtFirstParagraphBreak("First paragraph.\n \nSecond paragraph."); + it("still force flushes buffered paragraphs below minChars at the end", () => { + const chunker = createFlushOnParagraphChunker({ minChars: 100, maxChars: 200 }); + + chunker.append("First paragraph.\n \nSecond paragraph."); + + expect(drainChunks(chunker)).toEqual([]); + expect(drainChunks(chunker, true)).toEqual(["First paragraph.\n \nSecond paragraph."]); + expect(chunker.bufferedText).toBe(""); }); it("falls back to maxChars when flushOnParagraph is set and no paragraph break exists", () => { @@ -97,7 +102,7 @@ describe("EmbeddedBlockChunker", () => { it("ignores paragraph breaks inside fences when flushOnParagraph is set", () => { const chunker = new EmbeddedBlockChunker({ - minChars: 100, + minChars: 10, maxChars: 200, breakPreference: "paragraph", flushOnParagraph: true, diff --git a/src/agents/pi-embedded-block-chunker.ts b/src/agents/pi-embedded-block-chunker.ts index 11eddc2d190..6abe7b5a7da 100644 --- a/src/agents/pi-embedded-block-chunker.ts +++ b/src/agents/pi-embedded-block-chunker.ts @@ -5,7 +5,7 @@ export type BlockReplyChunking = { minChars: number; maxChars: number; breakPreference?: "paragraph" | "newline" | "sentence"; - /** When true, flush eagerly on \n\n paragraph boundaries regardless of minChars. */ + /** When true, prefer \n\n paragraph boundaries once minChars has been satisfied. */ flushOnParagraph?: boolean; }; @@ -129,7 +129,7 @@ export class EmbeddedBlockChunker { const minChars = Math.max(1, Math.floor(this.#chunking.minChars)); const maxChars = Math.max(minChars, Math.floor(this.#chunking.maxChars)); - if (this.#buffer.length < minChars && !force && !this.#chunking.flushOnParagraph) { + if (this.#buffer.length < minChars && !force) { return; } @@ -150,12 +150,12 @@ export class EmbeddedBlockChunker { const reopenPrefix = reopenFence ? `${reopenFence.openLine}\n` : ""; const remainingLength = reopenPrefix.length + (source.length - start); - if (!force && !this.#chunking.flushOnParagraph && remainingLength < minChars) { + if (!force && remainingLength < minChars) { break; } if (this.#chunking.flushOnParagraph && !force) { - const paragraphBreak = findNextParagraphBreak(source, fenceSpans, start); + const paragraphBreak = findNextParagraphBreak(source, fenceSpans, start, minChars); const paragraphLimit = Math.max(1, maxChars - reopenPrefix.length); if (paragraphBreak && paragraphBreak.index - start <= paragraphLimit) { const chunk = `${reopenPrefix}${source.slice(start, paragraphBreak.index)}`; @@ -175,12 +175,7 @@ export class EmbeddedBlockChunker { const breakResult = force && remainingLength <= maxChars ? this.#pickSoftBreakIndex(view, fenceSpans, 1, start) - : this.#pickBreakIndex( - view, - fenceSpans, - force || this.#chunking.flushOnParagraph ? 1 : undefined, - start, - ); + : this.#pickBreakIndex(view, fenceSpans, force ? 1 : undefined, start); if (breakResult.index <= 0) { if (force) { emit(`${reopenPrefix}${source.slice(start)}`); @@ -205,7 +200,7 @@ export class EmbeddedBlockChunker { const nextLength = (reopenFence ? `${reopenFence.openLine}\n`.length : 0) + (source.length - start); - if (nextLength < minChars && !force && !this.#chunking.flushOnParagraph) { + if (nextLength < minChars && !force) { break; } if (nextLength < maxChars && !force && !this.#chunking.flushOnParagraph) { @@ -401,6 +396,7 @@ function findNextParagraphBreak( buffer: string, fenceSpans: FenceSpan[], startIndex = 0, + minCharsFromStart = 1, ): ParagraphBreak | null { if (startIndex < 0) { return null; @@ -413,6 +409,9 @@ function findNextParagraphBreak( if (index < 0) { continue; } + if (index - startIndex < minCharsFromStart) { + continue; + } if (!isSafeFenceBreak(fenceSpans, index)) { continue; } diff --git a/src/auto-reply/reply/block-reply-coalescer.ts b/src/auto-reply/reply/block-reply-coalescer.ts index c7a6f85c26b..ada535ad7cc 100644 --- a/src/auto-reply/reply/block-reply-coalescer.ts +++ b/src/auto-reply/reply/block-reply-coalescer.ts @@ -89,8 +89,8 @@ export function createBlockReplyCoalescer(params: { return; } - // When flushOnEnqueue is set (chunkMode="newline"), each enqueued payload is treated - // as a separate paragraph and flushed immediately so delivery matches streaming boundaries. + // When flushOnEnqueue is set, treat each enqueued payload as its own outbound block + // and flush immediately instead of waiting for coalescing thresholds. if (flushOnEnqueue) { if (bufferText) { void flush({ force: true }); diff --git a/src/auto-reply/reply/block-streaming.test.ts b/src/auto-reply/reply/block-streaming.test.ts index 9da4f73a619..1850f1521c8 100644 --- a/src/auto-reply/reply/block-streaming.test.ts +++ b/src/auto-reply/reply/block-streaming.test.ts @@ -68,7 +68,7 @@ describe("resolveEffectiveBlockStreamingConfig", () => { }); expect(resolved.chunking.flushOnParagraph).toBe(true); - expect(resolved.coalescing.flushOnEnqueue).toBe(true); + expect(resolved.coalescing.flushOnEnqueue).toBeUndefined(); expect(resolved.coalescing.joiner).toBe("\n\n"); }); diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 8db8170e060..df1582846ff 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -66,7 +66,7 @@ export type BlockStreamingCoalescing = { maxChars: number; idleMs: number; joiner: string; - /** When true, the coalescer flushes the buffer on each enqueue (paragraph-boundary flush). */ + /** Internal escape hatch for transports that truly need per-enqueue flushing. */ flushOnEnqueue?: boolean; }; @@ -147,7 +147,7 @@ export function resolveEffectiveBlockStreamingConfig(params: { : chunking.breakPreference === "newline" ? "\n" : "\n\n"), - flushOnEnqueue: coalescingDefaults?.flushOnEnqueue ?? chunking.flushOnParagraph === true, + ...(coalescingDefaults?.flushOnEnqueue === true ? { flushOnEnqueue: true } : {}), }; return { chunking, coalescing }; @@ -161,9 +161,9 @@ export function resolveBlockStreamingChunking( const { providerKey, textLimit } = resolveProviderChunkContext(cfg, provider, accountId); const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk; - // When chunkMode="newline", the outbound delivery splits on paragraph boundaries. - // The block chunker should flush eagerly on \n\n boundaries during streaming, - // regardless of minChars, so each paragraph is sent as its own message. + // When chunkMode="newline", outbound delivery prefers paragraph boundaries. + // Keep the chunker paragraph-aware during streaming, but still let minChars + // control when a buffered paragraph is ready to flush. const chunkMode = resolveChunkMode(cfg, providerKey, accountId); const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX)); @@ -192,7 +192,6 @@ export function resolveBlockStreamingCoalescing( maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; }, - opts?: { chunkMode?: "length" | "newline" }, ): BlockStreamingCoalescing | undefined { const { providerKey, providerId, textLimit } = resolveProviderChunkContext( cfg, @@ -200,9 +199,6 @@ export function resolveBlockStreamingCoalescing( accountId, ); - // Resolve the outbound chunkMode so the coalescer can flush on paragraph boundaries - // when chunkMode="newline", matching the delivery-time splitting behavior. - const chunkMode = opts?.chunkMode ?? resolveChunkMode(cfg, providerKey, accountId); const providerDefaults = providerId ? getChannelPlugin(providerId)?.streaming?.blockStreamingCoalesceDefaults : undefined; @@ -237,6 +233,5 @@ export function resolveBlockStreamingCoalescing( maxChars, idleMs, joiner, - flushOnEnqueue: chunkMode === "newline", }; } diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index fc499e93676..2055ce54583 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -675,6 +675,39 @@ describe("block reply coalescer", () => { coalescer.stop(); }); + it("keeps buffering newline-style chunks until minChars is reached", async () => { + vi.useFakeTimers(); + const { flushes, coalescer } = createBlockCoalescerHarness({ + minChars: 25, + maxChars: 2000, + idleMs: 50, + joiner: "\n\n", + }); + + coalescer.enqueue({ text: "First paragraph" }); + coalescer.enqueue({ text: "Second paragraph" }); + + await vi.advanceTimersByTimeAsync(50); + expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]); + coalescer.stop(); + }); + + it("force flushes buffered newline-style chunks even below minChars", async () => { + const { flushes, coalescer } = createBlockCoalescerHarness({ + minChars: 100, + maxChars: 2000, + idleMs: 50, + joiner: "\n\n", + }); + + coalescer.enqueue({ text: "First paragraph" }); + coalescer.enqueue({ text: "Second paragraph" }); + await coalescer.flush({ force: true }); + + expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]); + coalescer.stop(); + }); + it("flushes immediately per enqueue when flushOnEnqueue is set", async () => { const cases = [ {