diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index 57be876132b..22a52c402dc 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -1,4 +1,3 @@ -import { hasOutboundReplyContent } from "openclaw/plugin-sdk/reply-payload"; import type { OpenClawConfig } from "../../config/config.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; @@ -12,6 +11,7 @@ import { routeReply } from "./route-reply.js"; export type AcpDispatchDeliveryMeta = { toolCallId?: string; allowEdit?: boolean; + skipTts?: boolean; }; type ToolMessageHandle = { @@ -128,18 +128,20 @@ export function createAcpDispatchDeliveryCoordinator(params: { state.blockCount += 1; } - if (hasOutboundReplyContent(payload, { trimText: true })) { + if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) { await startReplyLifecycleOnce(); } - const ttsPayload = await maybeApplyTtsToPayload({ - payload, - cfg: params.cfg, - channel: params.ttsChannel, - kind, - inboundAudio: params.inboundAudio, - ttsAuto: params.sessionTtsAuto, - }); + const ttsPayload = meta?.skipTts + ? payload + : await maybeApplyTtsToPayload({ + payload, + cfg: params.cfg, + channel: params.ttsChannel, + kind, + inboundAudio: params.inboundAudio, + ttsAuto: params.sessionTtsAuto, + }); if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) { const toolCallId = meta?.toolCallId?.trim(); diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index b19f2edde09..5f9fb367fa4 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -435,4 +435,91 @@ describe("tryDispatchAcpReply", () => { }), ); }); + + it("delivers accumulated block text as fallback when TTS synthesis returns no media", async () => { + setReadyAcpResolution(); + // Configure TTS mode as "final" but TTS synthesis returns no mediaUrl + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); + // Mock TTS to return no mediaUrl for all calls in this test + ttsMocks.maybeApplyTtsToPayload.mockResolvedValue( + {} as ReturnType, + ); + + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "CODEX_OK", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + + const { dispatcher } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "run acp", + dispatcher, + shouldRouteToOriginating: false, // Use non-routed flow to test fallback logic + }); + + // Should deliver final text as fallback when TTS produced no media. + // In non-routed flow, block delivery is not tracked, so fallback should run. + expect(result?.counts.final).toBe(1); + // Verify final delivery contains the expected text + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ + text: "CODEX_OK", + }), + ); + }); + + it("does not duplicate delivery when blocks were already routed", async () => { + setReadyAcpResolution(); + // Configure TTS mode as "none" - should skip TTS for final delivery + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "none" }); + + // Simulate normal flow where projector routes blocks + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "Task completed", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + + const { dispatcher } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "run acp", + dispatcher, + shouldRouteToOriginating: true, + }); + + // Should NOT deliver duplicate final text when blocks were already routed + // The block delivery should be sufficient + expect(result?.counts.block).toBeGreaterThanOrEqual(1); + expect(result?.counts.final).toBe(0); + // Verify routeReply was called for block, not for duplicate final + expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); + }); + + it("skips fallback when TTS mode is all (blocks already processed with TTS)", async () => { + setReadyAcpResolution(); + // Configure TTS mode as "all" - blocks already went through TTS + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "all" }); + + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "Response", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + + const { dispatcher } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "run acp", + dispatcher, + shouldRouteToOriginating: true, + }); + + // Should NOT trigger fallback for ttsMode="all" to avoid duplicate TTS + expect(result?.counts.final).toBe(0); + // Note: maybeApplyTtsToPayload is called during block delivery, not in the fallback path + // We just verify that no final delivery occurred + }); }); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 8fc7110fc4c..f5b46f077b0 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -314,7 +314,11 @@ export async function tryDispatchAcpReply(params: { await projector.flush(true); const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; const accumulatedBlockText = delivery.getAccumulatedBlockText(); - if (ttsMode === "final" && delivery.getBlockCount() > 0 && accumulatedBlockText.trim()) { + const routedCounts = delivery.getRoutedCounts(); + // Attempt final TTS synthesis for ttsMode="final" (only if we have text to synthesize). + // This ensures routed ACP flows still get final audio even after block delivery. + let ttsSucceeded = false; + if (ttsMode === "final" && accumulatedBlockText.trim()) { try { const ttsSyntheticReply = await maybeApplyTtsToPayload({ payload: { text: accumulatedBlockText }, @@ -325,18 +329,42 @@ export async function tryDispatchAcpReply(params: { ttsAuto: params.sessionTtsAuto, }); if (ttsSyntheticReply.mediaUrl) { + // Use delivery.deliver to ensure proper routing in cross-provider ACP turns. + // Pass audioAsVoice to avoid re-entering TTS synthesis. const delivered = await delivery.deliver("final", { mediaUrl: ttsSyntheticReply.mediaUrl, audioAsVoice: ttsSyntheticReply.audioAsVoice, }); queuedFinal = queuedFinal || delivered; + if (delivered) { + ttsSucceeded = true; // TTS succeeded AND delivered, skip text fallback + } } } catch (err) { logVerbose( `dispatch-acp: accumulated ACP block TTS failed: ${err instanceof Error ? err.message : String(err)}`, ); + // TTS failed, fall through to text fallback } } + // Only attempt text fallback if no delivery has happened yet. + // For routed flows, check routedCounts (block or final) to detect prior successful delivery. + // For non-routed flows, we cannot reliably detect delivery success (blockCount increments + // before send), so we skip the fallback guard to allow recovery when block delivery fails. + // Skip fallback for ttsMode="all" because blocks were already processed with TTS. + const shouldSkipTextFallback = + ttsMode === "all" || + ttsSucceeded || + (params.shouldRouteToOriginating && (routedCounts.block > 0 || routedCounts.final > 0)); + if (!shouldSkipTextFallback && accumulatedBlockText.trim()) { + // Fallback to text-only delivery (no TTS). + // For routed flows, use delivery.deliver with skipTts to bypass TTS re-entry. + // For non-routed flows, use dispatcher directly to bypass TTS. + const delivered = params.shouldRouteToOriginating + ? await delivery.deliver("final", { text: accumulatedBlockText }, { skipTts: true }) + : params.dispatcher.sendFinalReply({ text: accumulatedBlockText }); + queuedFinal = queuedFinal || delivered; + } if (shouldEmitResolvedIdentityNotice) { const currentMeta = readAcpSessionEntry({