From fa9d223a6d922fc7bfd47300791729904d3977be Mon Sep 17 00:00:00 2001 From: manusjs Date: Sat, 14 Mar 2026 11:00:36 +0000 Subject: [PATCH] fix(feishu): flush pre-tool text block when blockStreamingBreak is text_end Fixes #46002 --- ....flushes-pre-tool-text-on-text-end.test.ts | 101 ++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 19 +++- src/auto-reply/reply/block-streaming.test.ts | 19 ++++ 3 files changed, 134 insertions(+), 5 deletions(-) create mode 100644 src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.flushes-pre-tool-text-on-text-end.test.ts diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.flushes-pre-tool-text-on-text-end.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.flushes-pre-tool-text-on-text-end.test.ts new file mode 100644 index 00000000000..f68c385a8dc --- /dev/null +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.flushes-pre-tool-text-on-text-end.test.ts @@ -0,0 +1,101 @@ +import { describe, expect, it, vi } from "vitest"; +import { + createStubSessionHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; +import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; + +/** + * Regression tests for #46002 — blockStreamingBreak:"text_end" must deliver + * intermediate text before tool calls, not after all tool calls complete. + * + * Root cause: emitBlockReplySafely used `void Promise.resolve().then(onBlockReply)` + * which deferred the pipeline enqueue to a microtask. When handleToolExecutionStart + * called onBlockReplyFlush(), the pipeline was still empty — text arrived only after + * the flush completed, leaving it stuck until the idle-timeout fired. + * + * Fix: call params.onBlockReply synchronously in emitBlockReplySafely so text is + * enqueued to the pipeline before onBlockReplyFlush() drains it. + */ +describe("subscribeEmbeddedPiSession — pre-tool text flush (blockStreamingBreak: text_end)", () => { + it("delivers pre-tool text via onBlockReply before onBlockReplyFlush when text_end fires before tool start", () => { + const { session, emit } = createStubSessionHarness(); + + const onBlockReply = vi.fn(); + const onBlockReplyFlush = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run-46002-text-end", + onBlockReply, + onBlockReplyFlush, + blockReplyBreak: "text_end", + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + + // Simulate text streaming in before the tool call + emitAssistantTextDelta({ emit, delta: "Thinking out loud before using a tool." }); + + // text_end fires — with blockStreamingBreak:"text_end", this should immediately + // enqueue the text via onBlockReply so the pipeline has it before the tool flushes + emitAssistantTextEnd({ emit }); + + // At this point, onBlockReply must have been called synchronously (not deferred) + // so the pipeline already holds the pre-tool text. + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Thinking out loud before using a tool."); + + // Now a tool starts — this triggers onBlockReplyFlush to drain the pipeline + emit({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-46002-1", + args: { command: "echo hello" }, + }); + + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + + // Critical: onBlockReply (text enqueue) MUST precede onBlockReplyFlush (pipeline drain). + // If onBlockReply was deferred (old bug), it would be called AFTER onBlockReplyFlush, + // meaning the flush saw an empty pipeline and the text was never delivered before the tool. + expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan( + onBlockReplyFlush.mock.invocationCallOrder[0], + ); + }); + + it("delivers pre-tool text via onBlockReply before onBlockReplyFlush across multiple tool calls", () => { + const { session, emit } = createStubSessionHarness(); + + const onBlockReply = vi.fn(); + const onBlockReplyFlush = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run-46002-multi-tool", + onBlockReply, + onBlockReplyFlush, + blockReplyBreak: "text_end", + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "Pre-tool message." }); + emitAssistantTextEnd({ emit }); + + // Text must be enqueued before first tool flush + expect(onBlockReply).toHaveBeenCalledTimes(1); + + emit({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-46002-a", + args: { command: "ls" }, + }); + + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan( + onBlockReplyFlush.mock.invocationCallOrder[0], + ); + }); +}); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 83592372e80..5ee9b33d383 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -107,11 +107,20 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (!params.onBlockReply) { return; } - void Promise.resolve() - .then(() => params.onBlockReply?.(payload)) - .catch((err) => { - log.warn(`block reply callback failed: ${String(err)}`); - }); + // Call synchronously so text is enqueued to the pipeline before any subsequent + // onBlockReplyFlush() drains it (e.g. at tool_execution_start). Deferring via + // Promise.resolve().then() caused the flush to see an empty pipeline, leaving + // pre-tool text stuck until the idle-timeout fired (#46002). + try { + const result = params.onBlockReply(payload); + if (result instanceof Promise) { + result.catch((err: unknown) => { + log.warn(`block reply callback failed: ${String(err)}`); + }); + } + } catch (err) { + log.warn(`block reply callback failed: ${String(err)}`); + } }; const resetAssistantMessageState = (nextAssistantTextBaseline: number) => { diff --git a/src/auto-reply/reply/block-streaming.test.ts b/src/auto-reply/reply/block-streaming.test.ts index 29264ca99b3..a710c36a41d 100644 --- a/src/auto-reply/reply/block-streaming.test.ts +++ b/src/auto-reply/reply/block-streaming.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveBlockStreamingChunking, + resolveBlockStreamingCoalescing, resolveEffectiveBlockStreamingConfig, } from "./block-streaming.js"; @@ -65,4 +66,22 @@ describe("resolveEffectiveBlockStreamingConfig", () => { expect(resolved.chunking.maxChars).toBe(1800); expect(resolved.chunking.minChars).toBeLessThanOrEqual(resolved.chunking.maxChars); }); + + // Regression: #46002 — blockStreamingBreak:"text_end" must flush accumulated text + // before tool_use blocks. The flush path relies on blockChunker/blockBuffer state + // which is driven by the coalescing config. Verify that coalescing config is + // well-formed so that a non-empty blockBuffer can always be detected and flushed + // at tool_execution_start. + it("produces valid coalescing config that allows pre-tool text flush for text_end break mode", () => { + const cfg = {} as OpenClawConfig; + const chunking = resolveBlockStreamingChunking(cfg, "telegram"); + const coalescing = resolveBlockStreamingCoalescing(cfg, "telegram", undefined, chunking); + + expect(coalescing).toBeDefined(); + expect(coalescing!.minChars).toBeGreaterThan(0); + expect(coalescing!.maxChars).toBeGreaterThanOrEqual(coalescing!.minChars); + expect(coalescing!.idleMs).toBeGreaterThanOrEqual(0); + // joiner must be a string so accumulated blocks can be joined and flushed + expect(typeof coalescing!.joiner).toBe("string"); + }); });