import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; import { createStubSessionHarness, createTextEndBlockReplyHarness, emitAssistantTextDelta, emitAssistantTextEnd, } from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; describe("subscribeEmbeddedPiSession", () => { it("does not emit duplicate block replies when text_end repeats", () => { const onBlockReply = vi.fn(); const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); emitAssistantTextDelta({ emit, delta: "Hello block" }); emitAssistantTextEnd({ emit }); emitAssistantTextEnd({ emit }); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Hello block"]); }); it("does not duplicate assistantTexts when message_end repeats", () => { const { session, emit } = createStubSessionHarness(); const subscription = subscribeEmbeddedPiSession({ session, runId: "run", }); const assistantMessage = { role: "assistant", content: [{ type: "text", text: "Hello world" }], } as AssistantMessage; emit({ type: "message_end", message: assistantMessage }); emit({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Hello world"]); }); it("does not duplicate assistantTexts when message_end repeats with trailing whitespace changes", () => { const { session, emit } = createStubSessionHarness(); const subscription = subscribeEmbeddedPiSession({ session, runId: "run", }); const assistantMessageWithNewline = { role: "assistant", content: [{ type: "text", text: "Hello world\n" }], } as AssistantMessage; const assistantMessageTrimmed = { role: "assistant", content: [{ type: "text", text: "Hello world" }], } as AssistantMessage; emit({ type: "message_end", message: assistantMessageWithNewline }); emit({ type: "message_end", message: assistantMessageTrimmed }); expect(subscription.assistantTexts).toEqual(["Hello world"]); }); it("does not duplicate assistantTexts when message_end repeats with reasoning blocks", () => { const { session, emit } = createStubSessionHarness(); const subscription = subscribeEmbeddedPiSession({ session, runId: "run", reasoningMode: "on", }); const assistantMessage = { role: "assistant", content: [ { type: "thinking", thinking: "Because" }, { type: "text", text: "Hello world" }, ], } as AssistantMessage; emit({ type: "message_end", message: assistantMessage }); emit({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Hello world"]); }); it("populates assistantTexts for non-streaming models with chunking enabled", () => { // Non-streaming models (e.g. zai/glm-4.7): no text_delta events; message_end // must still populate assistantTexts so providers can deliver a final reply. const { session, emit } = createStubSessionHarness(); const subscription = subscribeEmbeddedPiSession({ session, runId: "run", blockReplyChunking: { minChars: 50, maxChars: 200 }, // Chunking enabled }); // Simulate non-streaming model: only message_start and message_end, no text_delta emit({ type: "message_start", message: { role: "assistant" } }); const assistantMessage = { role: "assistant", content: [{ type: "text", text: "Response from non-streaming model" }], } as AssistantMessage; emit({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Response from non-streaming model"]); }); });