openclaw/src/auto-reply/reply/acp-projector.ts

499 lines
14 KiB
TypeScript

import type { AcpRuntimeEvent, AcpSessionUpdateTag } from "../../acp/runtime/types.js";
import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js";
import { formatToolSummary, resolveToolDisplay } from "../../agents/tool-display.js";
import type { OpenClawConfig } from "../../config/config.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import type { ReplyPayload } from "../types.js";
import {
type AcpHiddenBoundarySeparator,
isAcpTagVisible,
resolveAcpProjectionSettings,
resolveAcpStreamingConfig,
} from "./acp-stream-settings.js";
import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
import type { ReplyDispatchKind } from "./reply-dispatcher.js";
const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000;
const ACP_LIVE_IDLE_FLUSH_FLOOR_MS = 750;
const ACP_LIVE_IDLE_MIN_CHARS = 80;
const ACP_LIVE_SOFT_FLUSH_CHARS = 220;
const ACP_LIVE_HARD_FLUSH_CHARS = 480;
const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]);
const HIDDEN_BOUNDARY_TAGS = new Set<AcpSessionUpdateTag>(["tool_call", "tool_call_update"]);
export type AcpProjectedDeliveryMeta = {
tag?: AcpSessionUpdateTag;
toolCallId?: string;
toolStatus?: string;
allowEdit?: boolean;
};
type ToolLifecycleState = {
started: boolean;
terminal: boolean;
lastRenderedHash?: string;
};
type BufferedToolDelivery = {
payload: ReplyPayload;
meta?: AcpProjectedDeliveryMeta;
};
function truncateText(input: string, maxChars: number): string {
if (input.length <= maxChars) {
return input;
}
if (maxChars <= 1) {
return input.slice(0, maxChars);
}
return `${input.slice(0, maxChars - 1)}`;
}
function hashText(text: string): string {
return text.trim();
}
function normalizeToolStatus(status: string | undefined): string | undefined {
if (!status) {
return undefined;
}
const normalized = status.trim().toLowerCase();
return normalized || undefined;
}
function resolveHiddenBoundarySeparatorText(mode: AcpHiddenBoundarySeparator): string {
if (mode === "space") {
return " ";
}
if (mode === "newline") {
return "\n";
}
if (mode === "paragraph") {
return "\n\n";
}
return "";
}
function shouldInsertSeparator(params: {
separator: string;
previousTail: string | undefined;
nextText: string;
}): boolean {
if (!params.separator) {
return false;
}
if (!params.nextText) {
return false;
}
const firstChar = params.nextText[0];
if (typeof firstChar === "string" && /\s/.test(firstChar)) {
return false;
}
const tail = params.previousTail ?? "";
if (!tail) {
return false;
}
if (params.separator === " " && /\s$/.test(tail)) {
return false;
}
if ((params.separator === "\n" || params.separator === "\n\n") && tail.endsWith("\n")) {
return false;
}
return true;
}
function shouldFlushLiveBufferOnBoundary(text: string): boolean {
if (!text) {
return false;
}
if (text.length >= ACP_LIVE_HARD_FLUSH_CHARS) {
return true;
}
if (text.endsWith("\n\n")) {
return true;
}
if (/[.!?][)"'`]*\s$/.test(text)) {
return true;
}
if (text.length >= ACP_LIVE_SOFT_FLUSH_CHARS && /\s$/.test(text)) {
return true;
}
return false;
}
function shouldFlushLiveBufferOnIdle(text: string): boolean {
if (!text) {
return false;
}
if (text.length >= ACP_LIVE_IDLE_MIN_CHARS) {
return true;
}
if (/[.!?][)"'`]*$/.test(text.trimEnd())) {
return true;
}
if (text.includes("\n")) {
return true;
}
return false;
}
function renderToolSummaryText(event: Extract<AcpRuntimeEvent, { type: "tool_call" }>): string {
const detailParts: string[] = [];
const title = event.title?.trim();
if (title) {
detailParts.push(title);
}
const status = event.status?.trim();
if (status) {
detailParts.push(`status=${status}`);
}
const fallback = event.text?.trim();
if (detailParts.length === 0 && fallback) {
detailParts.push(fallback);
}
const display = resolveToolDisplay({
name: "tool_call",
meta: detailParts.join(" · ") || "tool call",
});
return formatToolSummary(display);
}
export type AcpReplyProjector = {
onEvent: (event: AcpRuntimeEvent) => Promise<void>;
flush: (force?: boolean) => Promise<void>;
};
export function createAcpReplyProjector(params: {
cfg: OpenClawConfig;
shouldSendToolSummaries: boolean;
deliver: (
kind: ReplyDispatchKind,
payload: ReplyPayload,
meta?: AcpProjectedDeliveryMeta,
) => Promise<boolean>;
provider?: string;
accountId?: string;
}): AcpReplyProjector {
const settings = resolveAcpProjectionSettings(params.cfg);
const streaming = resolveAcpStreamingConfig({
cfg: params.cfg,
provider: params.provider,
accountId: params.accountId,
deliveryMode: settings.deliveryMode,
});
const createTurnBlockReplyPipeline = () =>
createBlockReplyPipeline({
onBlockReply: async (payload) => {
await params.deliver("block", payload);
},
timeoutMs: ACP_BLOCK_REPLY_TIMEOUT_MS,
coalescing: settings.deliveryMode === "live" ? undefined : streaming.coalescing,
});
let blockReplyPipeline = createTurnBlockReplyPipeline();
const chunker = new EmbeddedBlockChunker(streaming.chunking);
const liveIdleFlushMs = Math.max(streaming.coalescing.idleMs, ACP_LIVE_IDLE_FLUSH_FLOOR_MS);
let emittedOutputChars = 0;
let truncationNoticeEmitted = false;
let lastStatusHash: string | undefined;
let lastToolHash: string | undefined;
let lastUsageTuple: string | undefined;
let lastVisibleOutputTail: string | undefined;
let pendingHiddenBoundary = false;
let liveBufferText = "";
let liveIdleTimer: NodeJS.Timeout | undefined;
const pendingToolDeliveries: BufferedToolDelivery[] = [];
const toolLifecycleById = new Map<string, ToolLifecycleState>();
const clearLiveIdleTimer = () => {
if (!liveIdleTimer) {
return;
}
clearTimeout(liveIdleTimer);
liveIdleTimer = undefined;
};
const drainChunker = (force: boolean) => {
if (settings.deliveryMode === "final_only" && !force) {
return;
}
chunker.drain({
force,
emit: (chunk) => {
blockReplyPipeline.enqueue({ text: chunk });
},
});
};
const flushLiveBuffer = (opts?: { force?: boolean; idle?: boolean }) => {
if (settings.deliveryMode !== "live") {
return;
}
if (!liveBufferText) {
return;
}
if (opts?.idle && !shouldFlushLiveBufferOnIdle(liveBufferText)) {
return;
}
const text = liveBufferText;
liveBufferText = "";
chunker.append(text);
drainChunker(opts?.force === true);
};
const scheduleLiveIdleFlush = () => {
if (settings.deliveryMode !== "live") {
return;
}
if (liveIdleFlushMs <= 0 || !liveBufferText) {
return;
}
clearLiveIdleTimer();
liveIdleTimer = setTimeout(() => {
flushLiveBuffer({ force: true, idle: true });
if (liveBufferText) {
scheduleLiveIdleFlush();
}
}, liveIdleFlushMs);
};
const resetTurnState = () => {
clearLiveIdleTimer();
blockReplyPipeline.stop();
blockReplyPipeline = createTurnBlockReplyPipeline();
emittedOutputChars = 0;
truncationNoticeEmitted = false;
lastStatusHash = undefined;
lastToolHash = undefined;
lastUsageTuple = undefined;
lastVisibleOutputTail = undefined;
pendingHiddenBoundary = false;
liveBufferText = "";
pendingToolDeliveries.length = 0;
toolLifecycleById.clear();
};
const flushBufferedToolDeliveries = async (force: boolean) => {
if (!(settings.deliveryMode === "final_only" && force)) {
return;
}
for (const entry of pendingToolDeliveries.splice(0, pendingToolDeliveries.length)) {
await params.deliver("tool", entry.payload, entry.meta);
}
};
const flush = async (force = false): Promise<void> => {
if (settings.deliveryMode === "live") {
clearLiveIdleTimer();
flushLiveBuffer({ force: true });
}
await flushBufferedToolDeliveries(force);
drainChunker(force);
await blockReplyPipeline.flush({ force });
};
const emitSystemStatus = async (
text: string,
meta?: AcpProjectedDeliveryMeta,
opts?: { dedupe?: boolean },
) => {
if (!params.shouldSendToolSummaries) {
return;
}
const bounded = truncateText(text.trim(), settings.maxSessionUpdateChars);
if (!bounded) {
return;
}
const formatted = prefixSystemMessage(bounded);
const hash = hashText(formatted);
const shouldDedupe = settings.repeatSuppression && opts?.dedupe !== false;
if (shouldDedupe && lastStatusHash === hash) {
return;
}
if (settings.deliveryMode === "final_only") {
pendingToolDeliveries.push({
payload: { text: formatted },
meta,
});
} else {
await flush(true);
await params.deliver("tool", { text: formatted }, meta);
}
lastStatusHash = hash;
};
const emitToolSummary = async (event: Extract<AcpRuntimeEvent, { type: "tool_call" }>) => {
if (!params.shouldSendToolSummaries) {
return;
}
if (!isAcpTagVisible(settings, event.tag)) {
return;
}
const renderedToolSummary = renderToolSummaryText(event);
const toolSummary = truncateText(renderedToolSummary, settings.maxSessionUpdateChars);
const hash = hashText(renderedToolSummary);
const toolCallId = event.toolCallId?.trim() || undefined;
const status = normalizeToolStatus(event.status);
const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false;
const isStart = status === "in_progress" || event.tag === "tool_call";
if (settings.repeatSuppression) {
if (toolCallId) {
const state = toolLifecycleById.get(toolCallId) ?? {
started: false,
terminal: false,
};
if (isTerminal && state.terminal) {
return;
}
if (isStart && state.started) {
return;
}
if (state.lastRenderedHash === hash) {
return;
}
if (isStart) {
state.started = true;
}
if (isTerminal) {
state.terminal = true;
}
state.lastRenderedHash = hash;
toolLifecycleById.set(toolCallId, state);
} else if (lastToolHash === hash) {
return;
}
}
const deliveryMeta: AcpProjectedDeliveryMeta = {
...(event.tag ? { tag: event.tag } : {}),
...(toolCallId ? { toolCallId } : {}),
...(status ? { toolStatus: status } : {}),
allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"),
};
if (settings.deliveryMode === "final_only") {
pendingToolDeliveries.push({
payload: { text: toolSummary },
meta: deliveryMeta,
});
} else {
await flush(true);
await params.deliver("tool", { text: toolSummary }, deliveryMeta);
}
lastToolHash = hash;
};
const emitTruncationNotice = async () => {
if (truncationNoticeEmitted) {
return;
}
truncationNoticeEmitted = true;
await emitSystemStatus(
"output truncated",
{
tag: "session_info_update",
},
{
dedupe: false,
},
);
};
const onEvent = async (event: AcpRuntimeEvent): Promise<void> => {
if (event.type === "text_delta") {
if (event.stream && event.stream !== "output") {
return;
}
if (!isAcpTagVisible(settings, event.tag)) {
return;
}
let text = event.text;
if (!text) {
return;
}
if (
pendingHiddenBoundary &&
shouldInsertSeparator({
separator: resolveHiddenBoundarySeparatorText(settings.hiddenBoundarySeparator),
previousTail: lastVisibleOutputTail,
nextText: text,
})
) {
text = `${resolveHiddenBoundarySeparatorText(settings.hiddenBoundarySeparator)}${text}`;
}
pendingHiddenBoundary = false;
if (emittedOutputChars >= settings.maxOutputChars) {
await emitTruncationNotice();
return;
}
const remaining = settings.maxOutputChars - emittedOutputChars;
const accepted = remaining < text.length ? text.slice(0, remaining) : text;
if (accepted.length > 0) {
emittedOutputChars += accepted.length;
lastVisibleOutputTail = accepted.slice(-1);
if (settings.deliveryMode === "live") {
liveBufferText += accepted;
if (shouldFlushLiveBufferOnBoundary(liveBufferText)) {
clearLiveIdleTimer();
flushLiveBuffer({ force: true });
} else {
scheduleLiveIdleFlush();
}
} else {
chunker.append(accepted);
drainChunker(false);
}
}
if (accepted.length < text.length) {
await emitTruncationNotice();
}
return;
}
if (event.type === "status") {
if (!isAcpTagVisible(settings, event.tag)) {
return;
}
if (event.tag === "usage_update" && settings.repeatSuppression) {
const usageTuple =
typeof event.used === "number" && typeof event.size === "number"
? `${event.used}/${event.size}`
: hashText(event.text);
if (usageTuple === lastUsageTuple) {
return;
}
lastUsageTuple = usageTuple;
}
await emitSystemStatus(event.text, event.tag ? { tag: event.tag } : undefined, {
dedupe: true,
});
return;
}
if (event.type === "tool_call") {
if (!isAcpTagVisible(settings, event.tag)) {
if (event.tag && HIDDEN_BOUNDARY_TAGS.has(event.tag)) {
const status = normalizeToolStatus(event.status);
const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false;
pendingHiddenBoundary = pendingHiddenBoundary || event.tag === "tool_call" || isTerminal;
}
return;
}
await emitToolSummary(event);
return;
}
if (event.type === "done" || event.type === "error") {
await flush(true);
resetTurnState();
}
};
return {
onEvent,
flush,
};
}