Merge fa9d223a6d922fc7bfd47300791729904d3977be into 5e417b44e1540f528d2ae63e3e20229a902d1db2

This commit is contained in:
manusjs 2026-03-21 11:01:43 +08:00 committed by GitHub
commit 0392d64524
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 134 additions and 5 deletions

View File

@ -0,0 +1,101 @@
import { describe, expect, it, vi } from "vitest";
import {
createStubSessionHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
/**
* Regression tests for #46002 blockStreamingBreak:"text_end" must deliver
* intermediate text before tool calls, not after all tool calls complete.
*
* Root cause: emitBlockReplySafely used `void Promise.resolve().then(onBlockReply)`
* which deferred the pipeline enqueue to a microtask. When handleToolExecutionStart
* called onBlockReplyFlush(), the pipeline was still empty text arrived only after
* the flush completed, leaving it stuck until the idle-timeout fired.
*
* Fix: call params.onBlockReply synchronously in emitBlockReplySafely so text is
* enqueued to the pipeline before onBlockReplyFlush() drains it.
*/
describe("subscribeEmbeddedPiSession — pre-tool text flush (blockStreamingBreak: text_end)", () => {
it("delivers pre-tool text via onBlockReply before onBlockReplyFlush when text_end fires before tool start", () => {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const onBlockReplyFlush = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run-46002-text-end",
onBlockReply,
onBlockReplyFlush,
blockReplyBreak: "text_end",
});
emit({ type: "message_start", message: { role: "assistant" } });
// Simulate text streaming in before the tool call
emitAssistantTextDelta({ emit, delta: "Thinking out loud before using a tool." });
// text_end fires — with blockStreamingBreak:"text_end", this should immediately
// enqueue the text via onBlockReply so the pipeline has it before the tool flushes
emitAssistantTextEnd({ emit });
// At this point, onBlockReply must have been called synchronously (not deferred)
// so the pipeline already holds the pre-tool text.
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Thinking out loud before using a tool.");
// Now a tool starts — this triggers onBlockReplyFlush to drain the pipeline
emit({
type: "tool_execution_start",
toolName: "bash",
toolCallId: "tool-46002-1",
args: { command: "echo hello" },
});
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
// Critical: onBlockReply (text enqueue) MUST precede onBlockReplyFlush (pipeline drain).
// If onBlockReply was deferred (old bug), it would be called AFTER onBlockReplyFlush,
// meaning the flush saw an empty pipeline and the text was never delivered before the tool.
expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan(
onBlockReplyFlush.mock.invocationCallOrder[0],
);
});
it("delivers pre-tool text via onBlockReply before onBlockReplyFlush across multiple tool calls", () => {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const onBlockReplyFlush = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run-46002-multi-tool",
onBlockReply,
onBlockReplyFlush,
blockReplyBreak: "text_end",
});
emit({ type: "message_start", message: { role: "assistant" } });
emitAssistantTextDelta({ emit, delta: "Pre-tool message." });
emitAssistantTextEnd({ emit });
// Text must be enqueued before first tool flush
expect(onBlockReply).toHaveBeenCalledTimes(1);
emit({
type: "tool_execution_start",
toolName: "bash",
toolCallId: "tool-46002-a",
args: { command: "ls" },
});
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan(
onBlockReplyFlush.mock.invocationCallOrder[0],
);
});
});

View File

@ -107,11 +107,20 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!params.onBlockReply) {
return;
}
void Promise.resolve()
.then(() => params.onBlockReply?.(payload))
.catch((err) => {
log.warn(`block reply callback failed: ${String(err)}`);
});
// Call synchronously so text is enqueued to the pipeline before any subsequent
// onBlockReplyFlush() drains it (e.g. at tool_execution_start). Deferring via
// Promise.resolve().then() caused the flush to see an empty pipeline, leaving
// pre-tool text stuck until the idle-timeout fired (#46002).
try {
const result = params.onBlockReply(payload);
if (result instanceof Promise) {
result.catch((err: unknown) => {
log.warn(`block reply callback failed: ${String(err)}`);
});
}
} catch (err) {
log.warn(`block reply callback failed: ${String(err)}`);
}
};
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {

View File

@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import {
resolveBlockStreamingChunking,
resolveBlockStreamingCoalescing,
resolveEffectiveBlockStreamingConfig,
} from "./block-streaming.js";
@ -93,4 +94,22 @@ describe("resolveEffectiveBlockStreamingConfig", () => {
expect(resolved.chunking.maxChars).toBe(1800);
expect(resolved.chunking.minChars).toBeLessThanOrEqual(resolved.chunking.maxChars);
});
// Regression: #46002 — blockStreamingBreak:"text_end" must flush accumulated text
// before tool_use blocks. The flush path relies on blockChunker/blockBuffer state
// which is driven by the coalescing config. Verify that coalescing config is
// well-formed so that a non-empty blockBuffer can always be detected and flushed
// at tool_execution_start.
it("produces valid coalescing config that allows pre-tool text flush for text_end break mode", () => {
const cfg = {} as OpenClawConfig;
const chunking = resolveBlockStreamingChunking(cfg, "telegram");
const coalescing = resolveBlockStreamingCoalescing(cfg, "telegram", undefined, chunking);
expect(coalescing).toBeDefined();
expect(coalescing!.minChars).toBeGreaterThan(0);
expect(coalescing!.maxChars).toBeGreaterThanOrEqual(coalescing!.minChars);
expect(coalescing!.idleMs).toBeGreaterThanOrEqual(0);
// joiner must be a string so accumulated blocks can be joined and flushed
expect(typeof coalescing!.joiner).toBe("string");
});
});