From c3972982b5d47e8250ca2b1a64a25b373d9c1f2f Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Fri, 20 Mar 2026 15:03:30 -0700 Subject: [PATCH] fix: sanitize malformed replay tool calls (#50005) Merged via squash. Prepared head SHA: 64ad5563f7ae321b749d5a52bc0b477d666dc6be Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + .../pi-embedded-runner/run/attempt.test.ts | 547 ++++++++++++++++++ src/agents/pi-embedded-runner/run/attempt.ts | 237 ++++++++ src/agents/session-transcript-repair.ts | 50 +- .../bundled-web-search-registry.ts | 16 +- src/plugins/bundled-web-search.ts | 2 +- src/plugins/contracts/registry.ts | 2 +- 7 files changed, 830 insertions(+), 25 deletions(-) rename src/{plugins => }/bundled-web-search-registry.ts (56%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f533794769..210ce179a32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -187,6 +187,7 @@ Docs: https://docs.openclaw.ai - Agents/compaction: add an opt-in post-compaction session JSONL truncation step that drops summarized transcript entries while preserving the retained branch tail and live session metadata. (#41021) thanks @thirumaleshp. - Telegram/routing: fail loud when `message send` targets an unknown non-default Telegram `accountId`, instead of silently falling back to the channel-level bot token and sending through the wrong bot. (#50853) Thanks @hclsys. - Web search: align onboarding, configure, and finalize with plugin-owned provider contracts, including disabled-provider recovery, config-aware credential hooks, and runtime-visible summaries. (#50935) Thanks @gumadeiras. +- Agents/replay: sanitize malformed assistant tool-call replay blocks before provider replay so follow-up Anthropic requests do not inherit the downstream `replace` crash. (#50005) Thanks @jalehman. 
### Breaking diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 20bf752587b..39b2abe4da7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -16,6 +16,7 @@ import { decodeHtmlEntitiesInObject, wrapOllamaCompatNumCtx, wrapStreamFnRepairMalformedToolCallArguments, + wrapStreamFnSanitizeMalformedToolCalls, wrapStreamFnTrimToolCallNames, } from "./attempt.js"; @@ -779,6 +780,552 @@ describe("wrapStreamFnTrimToolCallNames", () => { }); }); +describe("wrapStreamFnSanitizeMalformedToolCalls", () => { + it("drops malformed assistant tool calls from outbound context before provider replay", async () => { + const messages = [ + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolCall", name: "read", arguments: {} }], + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + expect(seenContext.messages).not.toBe(messages); + }); + + it("preserves outbound context when all assistant tool calls are valid", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + 
const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toBe(messages); + }); + + it("preserves sessions_spawn attachment payloads on replay", async () => { + const attachmentContent = "INLINE_ATTACHMENT_PAYLOAD"; + const messages = [ + { + role: "assistant", + content: [ + { + type: "toolUse", + id: "call_1", + name: " SESSIONS_SPAWN ", + input: { + task: "inspect attachment", + attachments: [{ name: "snapshot.txt", content: attachmentContent }], + }, + }, + ], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls( + baseFn as never, + new Set(["sessions_spawn"]), + ); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ content?: Array> }>; + }; + const toolCall = seenContext.messages[0]?.content?.[0] as { + name?: string; + input?: { attachments?: Array<{ content?: string }> }; + }; + expect(toolCall.name).toBe("sessions_spawn"); + expect(toolCall.input?.attachments?.[0]?.content).toBe(attachmentContent); + }); + + it("preserves allowlisted tool names that contain punctuation", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolUse", id: "call_1", name: "admin.export", input: { scope: "all" } }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + 
const wrapped = wrapStreamFnSanitizeMalformedToolCalls( + baseFn as never, + new Set(["admin.export"]), + ); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toBe(messages); + }); + + it("normalizes provider-prefixed replayed tool names before provider replay", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolUse", id: "call_1", name: "functions.read", input: { path: "." } }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ content?: Array<{ name?: string }> }>; + }; + expect(seenContext.messages[0]?.content?.[0]?.name).toBe("read"); + }); + + it("canonicalizes mixed-case allowlisted tool names on replay", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "readfile", arguments: {} }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["ReadFile"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: 
Array<{ content?: Array<{ name?: string }> }>; + }; + expect(seenContext.messages[0]?.content?.[0]?.name).toBe("ReadFile"); + }); + + it("recovers blank replayed tool names from their ids", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "functionswrite4", name: " ", arguments: {} }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["write"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ content?: Array<{ name?: string }> }>; + }; + expect(seenContext.messages[0]?.content?.[0]?.name).toBe("write"); + }); + + it("recovers mangled replayed tool names before dropping the call", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "functionsread3", arguments: {} }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ content?: Array<{ name?: string }> }>; + }; + expect(seenContext.messages[0]?.content?.[0]?.name).toBe("read"); + }); + + it("drops orphaned tool results after replay sanitization removes a tool-call turn", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", name: "read", 
arguments: {} }], + stopReason: "error", + }, + { + role: "toolResult", + toolCallId: "call_missing", + toolName: "read", + content: [{ type: "text", text: "stale result" }], + isError: false, + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + }); + + it("drops replayed tool calls that are no longer allowlisted", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "write", arguments: {} }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "write", + content: [{ type: "text", text: "stale result" }], + isError: false, + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + }); + 
it("drops replayed tool names that are no longer allowlisted", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolUse", id: "call_1", name: "unknown_tool", input: { path: "." } }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "unknown_tool", + content: [{ type: "text", text: "stale result" }], + isError: false, + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([]); + }); + + it("drops ambiguous mangled replay names instead of guessing a tool", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "functions.exec2", arguments: {} }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls( + baseFn as never, + new Set(["exec", "exec2"]), + ); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([]); + }); + + it("preserves matching tool results for retained errored assistant turns", async () => { + const messages = [ + { + role: "assistant", + stopReason: "error", + content: [ + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { type: "toolCall", name: 
"read", arguments: {} }, + ], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "kept result" }], + isError: false, + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([ + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "kept result" }], + isError: false, + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + }); + + it("revalidates turn ordering after dropping an assistant replay turn", async () => { + const messages = [ + { + role: "user", + content: [{ type: "text", text: "first" }], + }, + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolCall", name: "read", arguments: {} }], + }, + { + role: "user", + content: [{ type: "text", text: "second" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]), { + validateGeminiTurns: false, + validateAnthropicTurns: true, + }); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + 
expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string; content?: unknown[] }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [ + { type: "text", text: "first" }, + { type: "text", text: "second" }, + ], + }, + ]); + }); + + it("drops orphaned Anthropic user tool_result blocks after replay sanitization", async () => { + const messages = [ + { + role: "assistant", + content: [ + { type: "text", text: "partial response" }, + { type: "toolUse", name: "read", input: { path: "." } }, + ], + }, + { + role: "user", + content: [ + { type: "toolResult", toolUseId: "call_1", content: [{ type: "text", text: "stale" }] }, + { type: "text", text: "retry" }, + ], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]), { + validateGeminiTurns: false, + validateAnthropicTurns: true, + }); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string; content?: unknown[] }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "assistant", + content: [{ type: "text", text: "partial response" }], + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + }); + + it("drops orphaned Anthropic user tool_result blocks after dropping an assistant replay turn", async () => { + const messages = [ + { + role: "user", + content: [{ type: "text", text: "first" }], + }, + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolUse", name: "read", input: { path: "." 
} }], + }, + { + role: "user", + content: [ + { type: "toolResult", toolUseId: "call_1", content: [{ type: "text", text: "stale" }] }, + { type: "text", text: "second" }, + ], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]), { + validateGeminiTurns: false, + validateAnthropicTurns: true, + }); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string; content?: unknown[] }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [ + { type: "text", text: "first" }, + { type: "text", text: "second" }, + ], + }, + ]); + }); +}); + describe("wrapStreamFnRepairMalformedToolCallArguments", () => { async function invokeWrappedStream(baseFn: (...args: never[]) => unknown) { return await invokeWrappedTestStream( diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index c7c7a728ae7..0ef91481415 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -97,6 +97,7 @@ import { buildSystemPromptReport } from "../../system-prompt-report.js"; import { sanitizeToolCallIdsForCloudCodeAssist } from "../../tool-call-id.js"; import { resolveEffectiveToolFsWorkspaceOnly } from "../../tool-fs-policy.js"; import { normalizeToolName } from "../../tool-policy.js"; +import type { TranscriptPolicy } from "../../transcript-policy.js"; import { resolveTranscriptPolicy } from "../../transcript-policy.js"; import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; import { isRunnerAbortError } from "../abort.js"; @@ -648,6 +649,200 @@ function 
isToolCallBlockType(type: unknown): boolean { return type === "toolCall" || type === "toolUse" || type === "functionCall"; } +const REPLAY_TOOL_CALL_NAME_MAX_CHARS = 64; + +type ReplayToolCallBlock = { + type?: unknown; + id?: unknown; + name?: unknown; + input?: unknown; + arguments?: unknown; +}; + +type ReplayToolCallSanitizeReport = { + messages: AgentMessage[]; + droppedAssistantMessages: number; +}; + +type AnthropicToolResultContentBlock = { + type?: unknown; + toolUseId?: unknown; +}; + +function isReplayToolCallBlock(block: unknown): block is ReplayToolCallBlock { + if (!block || typeof block !== "object") { + return false; + } + return isToolCallBlockType((block as { type?: unknown }).type); +} + +function replayToolCallHasInput(block: ReplayToolCallBlock): boolean { + const hasInput = "input" in block ? block.input !== undefined && block.input !== null : false; + const hasArguments = + "arguments" in block ? block.arguments !== undefined && block.arguments !== null : false; + return hasInput || hasArguments; +} + +function replayToolCallNonEmptyString(value: unknown): value is string { + return typeof value === "string" && value.trim().length > 0; +} + +function resolveReplayToolCallName( + rawName: string, + rawId: string, + allowedToolNames?: Set<string>, +): string | null { + if (rawName.length > REPLAY_TOOL_CALL_NAME_MAX_CHARS * 2) { + return null; + } + const normalized = normalizeToolCallNameForDispatch(rawName, allowedToolNames, rawId); + const trimmed = normalized.trim(); + if (!trimmed || trimmed.length > REPLAY_TOOL_CALL_NAME_MAX_CHARS || /\s/.test(trimmed)) { + return null; + } + if (!allowedToolNames || allowedToolNames.size === 0) { + return trimmed; + } + return resolveExactAllowedToolName(trimmed, allowedToolNames); +} + +function sanitizeReplayToolCallInputs( + messages: AgentMessage[], + allowedToolNames?: Set<string>, +): ReplayToolCallSanitizeReport { + let changed = false; + let droppedAssistantMessages = 0; + const out: AgentMessage[] = []; + + for 
(const message of messages) { + if (!message || typeof message !== "object" || message.role !== "assistant") { + out.push(message); + continue; + } + if (!Array.isArray(message.content)) { + out.push(message); + continue; + } + + const nextContent: typeof message.content = []; + let messageChanged = false; + + for (const block of message.content) { + if (!isReplayToolCallBlock(block)) { + nextContent.push(block); + continue; + } + const replayBlock = block as ReplayToolCallBlock; + + if (!replayToolCallHasInput(replayBlock) || !replayToolCallNonEmptyString(replayBlock.id)) { + changed = true; + messageChanged = true; + continue; + } + + const rawName = typeof replayBlock.name === "string" ? replayBlock.name : ""; + const resolvedName = resolveReplayToolCallName(rawName, replayBlock.id, allowedToolNames); + if (!resolvedName) { + changed = true; + messageChanged = true; + continue; + } + + if (replayBlock.name !== resolvedName) { + nextContent.push({ ...(block as object), name: resolvedName } as typeof block); + changed = true; + messageChanged = true; + continue; + } + nextContent.push(block); + } + + if (messageChanged) { + changed = true; + if (nextContent.length > 0) { + out.push({ ...message, content: nextContent }); + } else { + droppedAssistantMessages += 1; + } + continue; + } + + out.push(message); + } + + return { + messages: changed ? 
out : messages, + droppedAssistantMessages, + }; +} + +function sanitizeAnthropicReplayToolResults(messages: AgentMessage[]): AgentMessage[] { + let changed = false; + const out: AgentMessage[] = []; + + for (let index = 0; index < messages.length; index += 1) { + const message = messages[index]; + if (!message || typeof message !== "object" || message.role !== "user") { + out.push(message); + continue; + } + if (!Array.isArray(message.content)) { + out.push(message); + continue; + } + + const previous = messages[index - 1]; + const validToolUseIds = new Set<string>(); + if (previous && typeof previous === "object" && previous.role === "assistant") { + const previousContent = (previous as { content?: unknown }).content; + if (Array.isArray(previousContent)) { + for (const block of previousContent) { + if (!block || typeof block !== "object") { + continue; + } + const typedBlock = block as { type?: unknown; id?: unknown }; + if (typedBlock.type !== "toolUse" || typeof typedBlock.id !== "string") { + continue; + } + const trimmedId = typedBlock.id.trim(); + if (trimmedId) { + validToolUseIds.add(trimmedId); + } + } + } + } + + const nextContent = message.content.filter((block) => { + if (!block || typeof block !== "object") { + return true; + } + const typedBlock = block as AnthropicToolResultContentBlock; + if (typedBlock.type !== "toolResult" || typeof typedBlock.toolUseId !== "string") { + return true; + } + return validToolUseIds.size > 0 && validToolUseIds.has(typedBlock.toolUseId); + }); + + if (nextContent.length === message.content.length) { + out.push(message); + continue; + } + + changed = true; + if (nextContent.length > 0) { + out.push({ ...message, content: nextContent }); + continue; + } + + out.push({ + ...message, + content: [{ type: "text", text: "[tool results omitted]" }], + } as AgentMessage); + } + + return changed ? 
out : messages; +} + function normalizeToolCallIdsInMessage(message: unknown): void { if (!message || typeof message !== "object") { return; @@ -796,6 +991,43 @@ export function wrapStreamFnTrimToolCallNames( }; } +export function wrapStreamFnSanitizeMalformedToolCalls( + baseFn: StreamFn, + allowedToolNames?: Set<string>, + transcriptPolicy?: Pick<TranscriptPolicy, "validateGeminiTurns" | "validateAnthropicTurns">, +): StreamFn { + return (model, context, options) => { + const ctx = context as unknown as { messages?: unknown }; + const messages = ctx?.messages; + if (!Array.isArray(messages)) { + return baseFn(model, context, options); + } + const sanitized = sanitizeReplayToolCallInputs(messages as AgentMessage[], allowedToolNames); + if (sanitized.messages === messages) { + return baseFn(model, context, options); + } + let nextMessages = sanitizeToolUseResultPairing(sanitized.messages, { + preserveErroredAssistantResults: true, + }); + if (transcriptPolicy?.validateAnthropicTurns) { + nextMessages = sanitizeAnthropicReplayToolResults(nextMessages); + } + if (sanitized.droppedAssistantMessages > 0 || transcriptPolicy?.validateAnthropicTurns) { + if (transcriptPolicy?.validateGeminiTurns) { + nextMessages = validateGeminiTurns(nextMessages); + } + if (transcriptPolicy?.validateAnthropicTurns) { + nextMessages = validateAnthropicTurns(nextMessages); + } + } + const nextContext = { + ...(context as unknown as Record<string, unknown>), + messages: nextMessages, + } as unknown; + return baseFn(model, nextContext as typeof context, options); + }; +} + function extractBalancedJsonPrefix(raw: string): string | null { let start = 0; while (start < raw.length && /\s/.test(raw[start] ?? "")) { @@ -2100,6 +2332,11 @@ export async function runEmbeddedAttempt( // Some models emit tool names with surrounding whitespace (e.g. " read "). // pi-agent-core dispatches tool calls with exact string matching, so normalize // names on the live response stream before tool execution. 
+ activeSession.agent.streamFn = wrapStreamFnSanitizeMalformedToolCalls( + activeSession.agent.streamFn, + allowedToolNames, + transcriptPolicy, + ); activeSession.agent.streamFn = wrapStreamFnTrimToolCallNames( activeSession.agent.streamFn, allowedToolNames, diff --git a/src/agents/session-transcript-repair.ts b/src/agents/session-transcript-repair.ts index e7ab7db94b3..9455837d930 100644 --- a/src/agents/session-transcript-repair.ts +++ b/src/agents/session-transcript-repair.ts @@ -195,6 +195,10 @@ export type ToolCallInputRepairOptions = { allowedToolNames?: Iterable<string>; }; +export type ToolUseResultPairingOptions = { + preserveErroredAssistantResults?: boolean; +}; + export function stripToolResultDetails(messages: AgentMessage[]): AgentMessage[] { let touched = false; const out: AgentMessage[] = []; @@ -327,8 +331,11 @@ export function sanitizeToolCallInputs( return repairToolCallInputs(messages, options).messages; } -export function sanitizeToolUseResultPairing(messages: AgentMessage[]): AgentMessage[] { - return repairToolUseResultPairing(messages).messages; +export function sanitizeToolUseResultPairing( + messages: AgentMessage[], + options?: ToolUseResultPairingOptions, +): AgentMessage[] { + return repairToolUseResultPairing(messages, options).messages; } export type ToolUseRepairReport = { @@ -339,7 +346,10 @@ export type ToolUseRepairReport = { moved: boolean; }; -export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRepairReport { +export function repairToolUseResultPairing( + messages: AgentMessage[], + options?: ToolUseResultPairingOptions, +): ToolUseRepairReport { // Anthropic (and Cloud Code Assist) reject transcripts where assistant tool calls are not // immediately followed by matching tool results. Session files can end up with results // displaced (e.g. after user turns) or duplicated. 
Repair by: @@ -390,18 +400,6 @@ export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRep const assistant = msg as Extract<AgentMessage, { role: "assistant" }>; - // Skip tool call extraction for aborted or errored assistant messages. - // When stopReason is "error" or "aborted", the tool_use blocks may be incomplete - // (e.g., partialJson: true) and should not have synthetic tool_results created. - // Creating synthetic results for incomplete tool calls causes API 400 errors: - // "unexpected tool_use_id found in tool_result blocks" - // See: https://github.com/openclaw/openclaw/issues/4597 - const stopReason = (assistant as { stopReason?: string }).stopReason; - if (stopReason === "error" || stopReason === "aborted") { - out.push(msg); - continue; - } - const toolCalls = extractToolCallsFromAssistant(assistant); if (toolCalls.length === 0) { out.push(msg); @@ -459,6 +457,28 @@ export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRep } } + // Aborted/errored assistant turns should never synthesize missing tool results, but + // the replay sanitizer can still legitimately retain real tool results for surviving + // tool calls in the same turn after malformed siblings are dropped. 
+ const stopReason = (assistant as { stopReason?: string }).stopReason; + if (stopReason === "error" || stopReason === "aborted") { + out.push(msg); + if (options?.preserveErroredAssistantResults) { + for (const toolCall of toolCalls) { + const result = spanResultsById.get(toolCall.id); + if (!result) { + continue; + } + pushToolResult(result); + } + } + for (const rem of remainder) { + out.push(rem); + } + i = j - 1; + continue; + } + out.push(msg); if (spanResultsById.size > 0 && remainder.length > 0) { diff --git a/src/plugins/bundled-web-search-registry.ts b/src/bundled-web-search-registry.ts similarity index 56% rename from src/plugins/bundled-web-search-registry.ts rename to src/bundled-web-search-registry.ts index 15c04dd2935..c1f24639556 100644 --- a/src/plugins/bundled-web-search-registry.ts +++ b/src/bundled-web-search-registry.ts @@ -1,11 +1,11 @@ -import bravePlugin from "../../extensions/brave/index.js"; -import firecrawlPlugin from "../../extensions/firecrawl/index.js"; -import googlePlugin from "../../extensions/google/index.js"; -import moonshotPlugin from "../../extensions/moonshot/index.js"; -import perplexityPlugin from "../../extensions/perplexity/index.js"; -import tavilyPlugin from "../../extensions/tavily/index.js"; -import xaiPlugin from "../../extensions/xai/index.js"; -import type { OpenClawPluginApi } from "./types.js"; +import bravePlugin from "../extensions/brave/index.js"; +import firecrawlPlugin from "../extensions/firecrawl/index.js"; +import googlePlugin from "../extensions/google/index.js"; +import moonshotPlugin from "../extensions/moonshot/index.js"; +import perplexityPlugin from "../extensions/perplexity/index.js"; +import tavilyPlugin from "../extensions/tavily/index.js"; +import xaiPlugin from "../extensions/xai/index.js"; +import type { OpenClawPluginApi } from "./plugins/types.js"; type RegistrablePlugin = { id: string; diff --git a/src/plugins/bundled-web-search.ts b/src/plugins/bundled-web-search.ts index 
5b709aa00ee..6eb87f431fa 100644 --- a/src/plugins/bundled-web-search.ts +++ b/src/plugins/bundled-web-search.ts @@ -1,4 +1,4 @@ -import { bundledWebSearchPluginRegistrations } from "./bundled-web-search-registry.js"; +import { bundledWebSearchPluginRegistrations } from "../bundled-web-search-registry.js"; import { capturePluginRegistration } from "./captured-registration.js"; import type { PluginLoadOptions } from "./loader.js"; import { loadPluginManifestRegistry } from "./manifest-registry.js"; diff --git a/src/plugins/contracts/registry.ts b/src/plugins/contracts/registry.ts index 98cefe7820c..0a419efebe1 100644 --- a/src/plugins/contracts/registry.ts +++ b/src/plugins/contracts/registry.ts @@ -34,7 +34,7 @@ import volcenginePlugin from "../../../extensions/volcengine/index.js"; import xaiPlugin from "../../../extensions/xai/index.js"; import xiaomiPlugin from "../../../extensions/xiaomi/index.js"; import zaiPlugin from "../../../extensions/zai/index.js"; -import { bundledWebSearchPluginRegistrations } from "../bundled-web-search-registry.js"; +import { bundledWebSearchPluginRegistrations } from "../../bundled-web-search-registry.js"; import { createCapturedPluginRegistration } from "../captured-registration.js"; import { resolvePluginProviders } from "../providers.js"; import type {