From 43c57005a654a966d88bb8e8a8f8fb0c3f246659 Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 15:21:21 +0100 Subject: [PATCH] ACP: start typing lifecycle at turn start and harden delivery --- src/auto-reply/reply/acp-projector.test.ts | 4 +- .../reply/acp-stream-settings.test.ts | 2 +- src/auto-reply/reply/acp-stream-settings.ts | 2 +- .../reply/dispatch-acp-delivery.test.ts | 93 +++++++++++++++++++ src/auto-reply/reply/dispatch-acp-delivery.ts | 6 +- src/auto-reply/reply/dispatch-acp.test.ts | 83 ++++++++++++++++- src/auto-reply/reply/dispatch-acp.ts | 8 ++ src/config/schema.help.ts | 2 +- 8 files changed, 189 insertions(+), 11 deletions(-) create mode 100644 src/auto-reply/reply/dispatch-acp-delivery.test.ts diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index 0c58a5ce34d..40bcf84e974 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -574,7 +574,7 @@ describe("createAcpReplyProjector", () => { expect(deliveries[0]?.text).toContain("Tool Call"); }); - it("inserts a newline boundary before visible text after hidden tool updates by default", async () => { + it("inserts a paragraph boundary before visible text after hidden tool updates by default", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ cfg: createCfg({ @@ -610,7 +610,7 @@ describe("createAcpReplyProjector", () => { .filter((entry) => entry.kind === "block") .map((entry) => entry.text ?? "") .join(""); - expect(combinedText).toBe("fallback.\nI don't"); + expect(combinedText).toBe("fallback.\n\nI don't"); }); it("supports hiddenBoundarySeparator=space", async () => { diff --git a/src/auto-reply/reply/acp-stream-settings.test.ts b/src/auto-reply/reply/acp-stream-settings.test.ts index 2cb6207d57e..ef35508db1c 100644 --- a/src/auto-reply/reply/acp-stream-settings.test.ts +++ b/src/auto-reply/reply/acp-stream-settings.test.ts @@ -10,7 +10,7 @@ describe("acp stream settings", () => { it("resolves stable defaults", () => { const settings = resolveAcpProjectionSettings(createAcpTestConfig()); expect(settings.deliveryMode).toBe("final_only"); - expect(settings.hiddenBoundarySeparator).toBe("newline"); + expect(settings.hiddenBoundarySeparator).toBe("paragraph"); expect(settings.repeatSuppression).toBe(true); expect(settings.maxTurnChars).toBe(24_000); expect(settings.maxMetaEventsPerTurn).toBe(64); diff --git a/src/auto-reply/reply/acp-stream-settings.ts b/src/auto-reply/reply/acp-stream-settings.ts index bbe51ae2f14..c0d32f1e75b 100644 --- a/src/auto-reply/reply/acp-stream-settings.ts +++ b/src/auto-reply/reply/acp-stream-settings.ts @@ -6,7 +6,7 @@ const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350; const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800; const DEFAULT_ACP_REPEAT_SUPPRESSION = true; const DEFAULT_ACP_DELIVERY_MODE = "final_only"; -const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "newline"; +const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "paragraph"; const DEFAULT_ACP_MAX_TURN_CHARS = 24_000; const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320; const DEFAULT_ACP_MAX_STATUS_CHARS = 320; diff --git a/src/auto-reply/reply/dispatch-acp-delivery.test.ts b/src/auto-reply/reply/dispatch-acp-delivery.test.ts new file mode 100644 index 00000000000..26733136ad0 --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-delivery.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it, vi } from "vitest"; +import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js"; +import type { ReplyDispatcher } from "./reply-dispatcher.js"; +import { buildTestCtx } from "./test-ctx.js"; +import { createAcpTestConfig } from "./test-fixtures/acp-runtime.js"; + +const ttsMocks = vi.hoisted(() => ({ + maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: unknown }; + return params.payload; + }), +})); + +vi.mock("../../tts/tts.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), +})); + +function createDispatcher(): ReplyDispatcher { + return { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => true), + sendFinalReply: vi.fn(() => true), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), + }; +} + +describe("createAcpDispatchDeliveryCoordinator", () => { + it("starts reply lifecycle only once when called directly and through deliver", async () => { + const onReplyStart = vi.fn(async () => {}); + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher: createDispatcher(), + inboundAudio: false, + shouldRouteToOriginating: false, + onReplyStart, + }); + + await coordinator.startReplyLifecycle(); + await coordinator.deliver("final", { text: "hello" }); + await coordinator.startReplyLifecycle(); + await coordinator.deliver("block", { text: "world" }); + + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); + + it("starts reply lifecycle once when deliver triggers first", async () => { + const onReplyStart = vi.fn(async () => {}); + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher: createDispatcher(), + inboundAudio: false, + shouldRouteToOriginating: false, + onReplyStart, + }); + + await coordinator.deliver("final", { text: "hello" }); + await coordinator.startReplyLifecycle(); + + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); + + it("does not start reply lifecycle for empty payload delivery", async () => { + const onReplyStart = vi.fn(async () => {}); + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher: createDispatcher(), + inboundAudio: false, + shouldRouteToOriginating: false, + onReplyStart, + }); + + await coordinator.deliver("final", {}); + + expect(onReplyStart).not.toHaveBeenCalled(); + }); +}); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index 84d8f33a505..6624f9868a2 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -30,6 +30,7 @@ type AcpDispatchDeliveryState = { }; export type AcpDispatchDeliveryCoordinator = { + startReplyLifecycle: () => Promise; deliver: ( kind: ReplyDispatchKind, payload: ReplyPayload, @@ -65,7 +66,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { toolMessageByCallId: new Map(), }; - const ensureReplyLifecycleStarted = async () => { + const startReplyLifecycleOnce = async () => { if (state.startedReplyLifecycle) { return; } @@ -127,7 +128,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { } if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) { - await ensureReplyLifecycleStarted(); + await startReplyLifecycleOnce(); } const ttsPayload = await maybeApplyTtsToPayload({ @@ -186,6 +187,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { }; return { + startReplyLifecycle: startReplyLifecycleOnce, deliver, getBlockCount: () => state.blockCount, getAccumulatedBlockText: () => state.accumulatedBlockText, diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 92342d503c6..e573d6e0e89 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -103,6 +103,20 @@ function setReadyAcpResolution() { }); } +function createAcpConfigWithVisibleToolTags(): OpenClawConfig { + return createAcpTestConfig({ + acp: { + enabled: true, + stream: { + tagVisibility: { + tool_call: true, + tool_call_update: true, + }, + }, + }, + }); +} + describe("tryDispatchAcpReply", () => { beforeEach(() => { managerMocks.resolveSession.mockReset(); @@ -202,7 +216,7 @@ describe("tryDispatchAcpReply", () => { SessionKey: "agent:codex-acp:session-1", BodyForAgent: "run tool", }), - cfg: createAcpTestConfig(), + cfg: createAcpConfigWithVisibleToolTags(), dispatcher, sessionKey: "agent:codex-acp:session-1", inboundAudio: false, @@ -262,7 +276,7 @@ describe("tryDispatchAcpReply", () => { SessionKey: "agent:codex-acp:session-1", BodyForAgent: "run tool", }), - cfg: createAcpTestConfig(), + cfg: createAcpConfigWithVisibleToolTags(), dispatcher, sessionKey: "agent:codex-acp:session-1", inboundAudio: false, @@ -279,7 +293,7 @@ describe("tryDispatchAcpReply", () => { expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); }); - it("starts reply lifecycle only when visible projected output is emitted", async () => { + it("starts reply lifecycle when ACP turn starts, including hidden-only turns", async () => { setReadyAcpResolution(); const onReplyStart = vi.fn(); const { dispatcher } = createDispatcher(); @@ -314,7 +328,7 @@ describe("tryDispatchAcpReply", () => { recordProcessed: vi.fn(), markIdle: vi.fn(), }); - expect(onReplyStart).not.toHaveBeenCalled(); + expect(onReplyStart).toHaveBeenCalledTimes(1); managerMocks.runTurn.mockImplementationOnce( async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { @@ -340,9 +354,70 @@ describe("tryDispatchAcpReply", () => { recordProcessed: vi.fn(), markIdle: vi.fn(), }); + expect(onReplyStart).toHaveBeenCalledTimes(2); + }); + + it("starts reply lifecycle once per turn when output is delivered", async () => { + setReadyAcpResolution(); + const onReplyStart = vi.fn(); + + managerMocks.runTurn.mockImplementationOnce( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "visible", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "visible", + }), + cfg: createAcpTestConfig(), + dispatcher: createDispatcher().dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: false, + shouldSendToolSummaries: true, + bypassForCommand: false, + onReplyStart, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + expect(onReplyStart).toHaveBeenCalledTimes(1); }); + it("does not start reply lifecycle for empty ACP prompt", async () => { + setReadyAcpResolution(); + const onReplyStart = vi.fn(); + const { dispatcher } = createDispatcher(); + + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: " ", + }), + cfg: createAcpTestConfig(), + dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: false, + shouldSendToolSummaries: true, + bypassForCommand: false, + onReplyStart, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + + expect(managerMocks.runTurn).not.toHaveBeenCalled(); + expect(onReplyStart).not.toHaveBeenCalled(); + }); + it("surfaces ACP policy errors as final error replies", async () => { setReadyAcpResolution(); policyMocks.resolveAcpDispatchPolicyError.mockReturnValue( diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index c40965a763c..33990cb20d6 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -239,6 +239,14 @@ export async function tryDispatchAcpReply(params: { throw agentPolicyError; } + try { + await delivery.startReplyLifecycle(); + } catch (error) { + logVerbose( + `dispatch-acp: start reply lifecycle failed: ${error instanceof Error ? error.message : String(error)}`, + ); + } + await acpManager.runTurn({ cfg: params.cfg, sessionKey, diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 447230c96b0..ae544623779 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -177,7 +177,7 @@ export const FIELD_HELP: Record = { "acp.stream.deliveryMode": "ACP delivery style: live streams projected output incrementally, final_only buffers all projected ACP output until terminal turn events.", "acp.stream.hiddenBoundarySeparator": - "Separator inserted before next visible assistant text when hidden ACP tool lifecycle events occurred (none|space|newline|paragraph).", + "Separator inserted before next visible assistant text when hidden ACP tool lifecycle events occurred (none|space|newline|paragraph). Default: paragraph.", "acp.stream.maxTurnChars": "Maximum assistant text characters projected per ACP turn before truncation notice is emitted.", "acp.stream.maxToolSummaryChars":