taw0002 738d6b6b52 fix: strip thinking/redacted_thinking blocks from non-latest assistant messages for Anthropic
Fixes #39887

When compaction or session serialization modifies thinking/redacted_thinking
content blocks in assistant messages, the Anthropic API rejects the next
request because these blocks must remain byte-identical to the original
response in the latest assistant message.

Changes:

1. **dropThinkingBlocks**: Now strips both `type: "thinking"` and
   `type: "redacted_thinking"` blocks (previously only handled
   `thinking`).

2. **stripThinkingFromNonLatestAssistant** (new): Strips thinking and
   redacted_thinking blocks from all assistant messages EXCEPT the latest
   one. Anthropic allows omitting these blocks from non-latest messages
   but requires the latest message's blocks to be unmodified. This
   prevents compaction-corrupted blocks from reaching the API.

3. **sanitizeSessionHistory**: For Anthropic providers, applies
   stripThinkingFromNonLatestAssistant before other sanitization steps.
   This ensures that even if compaction modifies thinking blocks in older
   messages, they are cleanly removed before the API call.

The fix is defensive: rather than trying to preserve exact byte
representation through compaction (which crosses the pi-agent-core SDK
boundary), we strip blocks that Anthropic doesn't require from older
messages and preserve only the latest assistant's blocks which haven't
been through compaction yet.
2026-03-08 10:08:12 -04:00

626 lines
21 KiB
TypeScript

import { EventEmitter } from "node:events";
import type { AgentMessage, AgentTool } from "@mariozechner/pi-agent-core";
import type { SessionManager } from "@mariozechner/pi-coding-agent";
import type { TSchema } from "@sinclair/typebox";
import type { OpenClawConfig } from "../../config/config.js";
import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js";
import {
hasInterSessionUserProvenance,
normalizeInputProvenance,
} from "../../sessions/input-provenance.js";
import { resolveImageSanitizationLimits } from "../image-sanitization.js";
import {
downgradeOpenAIFunctionCallReasoningPairs,
downgradeOpenAIReasoningBlocks,
isCompactionFailureError,
isGoogleModelApi,
sanitizeGoogleTurnOrdering,
sanitizeSessionMessagesImages,
} from "../pi-embedded-helpers.js";
import { cleanToolSchemaForGemini } from "../pi-tools.schema.js";
import {
sanitizeToolCallInputs,
stripToolResultDetails,
sanitizeToolUseResultPairing,
} from "../session-transcript-repair.js";
import type { TranscriptPolicy } from "../transcript-policy.js";
import { resolveTranscriptPolicy } from "../transcript-policy.js";
import {
makeZeroUsageSnapshot,
normalizeUsage,
type AssistantUsageSnapshot,
type UsageLike,
} from "../usage.js";
import { log } from "./logger.js";
import { dropThinkingBlocks, stripThinkingFromNonLatestAssistant } from "./thinking.js";
import { describeUnknownError } from "./utils.js";
// customType tag recorded on a session once the google turn-ordering
// bootstrap fixup has run, so its warning is only logged one time.
const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap";
// JSON Schema keywords rejected by Google's Cloud Code Assist tool-schema
// validation; findUnsupportedSchemaKeywords flags any of these in a schema.
const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([
"patternProperties",
"additionalProperties",
"$schema",
"$id",
"$ref",
"$defs",
"definitions",
"examples",
"minLength",
"maxLength",
"minimum",
"maximum",
"multipleOf",
"pattern",
"format",
"minItems",
"maxItems",
"uniqueItems",
"minProperties",
"maxProperties",
]);
// Marker prepended to user messages that were forwarded from another session.
const INTER_SESSION_PREFIX_BASE = "[Inter-session message]";
/**
 * Builds the "[Inter-session message]" marker for a forwarded user message,
 * appending source-session details when provenance metadata is available.
 */
function buildInterSessionPrefix(message: AgentMessage): string {
  const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance);
  if (!provenance) {
    return INTER_SESSION_PREFIX_BASE;
  }
  const details: string[] = [];
  if (provenance.sourceSessionKey) {
    details.push(`sourceSession=${provenance.sourceSessionKey}`);
  }
  if (provenance.sourceChannel) {
    details.push(`sourceChannel=${provenance.sourceChannel}`);
  }
  if (provenance.sourceTool) {
    details.push(`sourceTool=${provenance.sourceTool}`);
  }
  return details.length > 0
    ? `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`
    : INTER_SESSION_PREFIX_BASE;
}
/**
 * Prepends the inter-session marker (see buildInterSessionPrefix) to user
 * messages carrying inter-session provenance. Idempotent: messages whose text
 * already starts with the computed prefix are left untouched, and the original
 * array is returned unchanged when no message was modified.
 */
function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] {
let touched = false;
const out: AgentMessage[] = [];
for (const msg of messages) {
// Only user messages with inter-session provenance get annotated.
if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) {
out.push(msg);
continue;
}
const prefix = buildInterSessionPrefix(msg);
const user = msg as Extract<AgentMessage, { role: "user" }>;
// Case 1: plain-string content — prefix the whole string.
if (typeof user.content === "string") {
if (user.content.startsWith(prefix)) {
out.push(msg);
continue;
}
touched = true;
out.push({
...(msg as unknown as Record<string, unknown>),
content: `${prefix}\n${user.content}`,
} as AgentMessage);
continue;
}
// Content that is neither string nor block-array has no place for the
// marker; pass it through untouched.
if (!Array.isArray(user.content)) {
out.push(msg);
continue;
}
// Case 2: block-array content — prefix the first text block, if any.
const textIndex = user.content.findIndex(
(block) =>
block &&
typeof block === "object" &&
(block as { type?: unknown }).type === "text" &&
typeof (block as { text?: unknown }).text === "string",
);
if (textIndex >= 0) {
const existing = user.content[textIndex] as { type: "text"; text: string };
if (existing.text.startsWith(prefix)) {
out.push(msg);
continue;
}
const nextContent = [...user.content];
nextContent[textIndex] = {
...existing,
text: `${prefix}\n${existing.text}`,
};
touched = true;
out.push({
...(msg as unknown as Record<string, unknown>),
content: nextContent,
} as AgentMessage);
continue;
}
// Case 3: no text block at all — insert a standalone marker block up front.
touched = true;
out.push({
...(msg as unknown as Record<string, unknown>),
content: [{ type: "text", text: prefix }, ...user.content],
} as AgentMessage);
}
return touched ? out : messages;
}
/**
 * Coerces a message timestamp into epoch milliseconds.
 * Accepts a finite number as-is or a Date.parse-able string; anything else
 * (including NaN/Infinity and unparseable strings) yields null.
 */
function parseMessageTimestamp(value: unknown): number | null {
  switch (typeof value) {
    case "number":
      return Number.isFinite(value) ? value : null;
    case "string": {
      const ms = Date.parse(value);
      return Number.isFinite(ms) ? ms : null;
    }
    default:
      return null;
  }
}
/**
 * Zeroes out usage snapshots on assistant messages that predate the most
 * recent compaction summary — stale by timestamp when both timestamps are
 * available, otherwise stale by array position (legacy entries). Usage is
 * replaced with a structurally valid zero snapshot rather than removed.
 * Returns the original array when nothing changed.
 */
function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[]): AgentMessage[] {
let latestCompactionSummaryIndex = -1;
let latestCompactionTimestamp: number | null = null;
// Pass 1: locate the last compaction summary and parse its timestamp.
for (let i = 0; i < messages.length; i += 1) {
const entry = messages[i];
if (entry?.role !== "compactionSummary") {
continue;
}
latestCompactionSummaryIndex = i;
latestCompactionTimestamp = parseMessageTimestamp(
(entry as { timestamp?: unknown }).timestamp ?? null,
);
}
if (latestCompactionSummaryIndex === -1) {
return messages;
}
// Pass 2: rewrite stale assistant usage in a copy of the array.
const out = [...messages];
let touched = false;
for (let i = 0; i < out.length; i += 1) {
const candidate = out[i] as
| (AgentMessage & { usage?: unknown; timestamp?: unknown })
| undefined;
if (!candidate || candidate.role !== "assistant") {
continue;
}
if (!candidate.usage || typeof candidate.usage !== "object") {
continue;
}
// Prefer timestamp comparison; fall back to array ordering when either
// side has no parseable timestamp.
const messageTimestamp = parseMessageTimestamp(candidate.timestamp);
const staleByTimestamp =
latestCompactionTimestamp !== null &&
messageTimestamp !== null &&
messageTimestamp <= latestCompactionTimestamp;
const staleByLegacyOrdering = i < latestCompactionSummaryIndex;
if (!staleByTimestamp && !staleByLegacyOrdering) {
continue;
}
// pi-coding-agent expects assistant usage to always be present during context
// accounting. Keep stale snapshots structurally valid, but zeroed out.
const candidateRecord = candidate as unknown as Record<string, unknown>;
out[i] = {
...candidateRecord,
usage: makeZeroUsageSnapshot(),
} as unknown as AgentMessage;
touched = true;
}
return touched ? out : messages;
}
/**
 * Converts arbitrary usage metadata into the snapshot shape pi-coding-agent
 * expects on assistant messages. Falls back to an all-zero snapshot when the
 * input cannot be normalized; derives totalTokens from the components when
 * no explicit total is present; attaches cost only when one was recoverable.
 */
function normalizeAssistantUsageSnapshot(usage: unknown) {
  const normalized = normalizeUsage((usage ?? undefined) as UsageLike | undefined);
  if (!normalized) {
    return makeZeroUsageSnapshot();
  }
  const cost = normalizeAssistantUsageCost(usage);
  const input = normalized.input ?? 0;
  const output = normalized.output ?? 0;
  const cacheRead = normalized.cacheRead ?? 0;
  const cacheWrite = normalized.cacheWrite ?? 0;
  return {
    input,
    output,
    cacheRead,
    cacheWrite,
    totalTokens: normalized.total ?? input + output + cacheRead + cacheWrite,
    ...(cost ? { cost } : {}),
  };
}
/**
 * Extracts and completes the optional cost sub-object of a usage record.
 * Returns undefined when the record carries no finite cost figure at all;
 * otherwise fills missing components from the zero snapshot's cost and
 * derives total from the components when absent.
 */
function normalizeAssistantUsageCost(usage: unknown): AssistantUsageSnapshot["cost"] | undefined {
  const base = makeZeroUsageSnapshot().cost;
  if (!usage || typeof usage !== "object") {
    return undefined;
  }
  const rawCost = (usage as { cost?: unknown }).cost;
  if (!rawCost || typeof rawCost !== "object") {
    return undefined;
  }
  const record = rawCost as Record<string, unknown>;
  const parts = {
    input: toFiniteCostNumber(record.input),
    output: toFiniteCostNumber(record.output),
    cacheRead: toFiniteCostNumber(record.cacheRead),
    cacheWrite: toFiniteCostNumber(record.cacheWrite),
    total: toFiniteCostNumber(record.total),
  };
  if (Object.values(parts).every((part) => part === undefined)) {
    return undefined;
  }
  const input = parts.input ?? base.input;
  const output = parts.output ?? base.output;
  const cacheRead = parts.cacheRead ?? base.cacheRead;
  const cacheWrite = parts.cacheWrite ?? base.cacheWrite;
  const total = parts.total ?? input + output + cacheRead + cacheWrite;
  return { input, output, cacheRead, cacheWrite, total };
}
// Returns the value when it is a finite number; undefined for every other
// input (non-numbers, NaN, and the infinities).
function toFiniteCostNumber(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  return Number.isFinite(value) ? value : undefined;
}
/**
 * Normalizes every assistant message's usage into the canonical snapshot
 * shape (input/output/cacheRead/cacheWrite/totalTokens, optional cost).
 * Messages whose usage already matches the normalized snapshot field-for-field
 * are left alone so the original array can be returned when nothing differs.
 */
function ensureAssistantUsageSnapshots(messages: AgentMessage[]): AgentMessage[] {
if (messages.length === 0) {
return messages;
}
let touched = false;
const out = [...messages];
for (let i = 0; i < out.length; i += 1) {
const message = out[i] as (AgentMessage & { role?: unknown; usage?: unknown }) | undefined;
if (!message || message.role !== "assistant") {
continue;
}
const normalizedUsage = normalizeAssistantUsageSnapshot(message.usage);
const usageCost =
message.usage && typeof message.usage === "object"
? (message.usage as { cost?: unknown }).cost
: undefined;
const normalizedCost = normalizedUsage.cost;
// Field-by-field equality check (including the optional cost sub-object):
// skip the rewrite when the existing usage already equals the normalized
// snapshot exactly, preserving referential identity of the message.
if (
message.usage &&
typeof message.usage === "object" &&
(message.usage as { input?: unknown }).input === normalizedUsage.input &&
(message.usage as { output?: unknown }).output === normalizedUsage.output &&
(message.usage as { cacheRead?: unknown }).cacheRead === normalizedUsage.cacheRead &&
(message.usage as { cacheWrite?: unknown }).cacheWrite === normalizedUsage.cacheWrite &&
(message.usage as { totalTokens?: unknown }).totalTokens === normalizedUsage.totalTokens &&
((normalizedCost &&
usageCost &&
typeof usageCost === "object" &&
(usageCost as { input?: unknown }).input === normalizedCost.input &&
(usageCost as { output?: unknown }).output === normalizedCost.output &&
(usageCost as { cacheRead?: unknown }).cacheRead === normalizedCost.cacheRead &&
(usageCost as { cacheWrite?: unknown }).cacheWrite === normalizedCost.cacheWrite &&
(usageCost as { total?: unknown }).total === normalizedCost.total) ||
(!normalizedCost && usageCost === undefined))
) {
continue;
}
out[i] = {
...(message as unknown as Record<string, unknown>),
usage: normalizedUsage,
} as AgentMessage;
touched = true;
}
return touched ? out : messages;
}
/**
 * Recursively walks a JSON-Schema-like value and returns the dotted paths of
 * every keyword Google's Cloud Code Assist API rejects (see
 * GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS). Keys under `properties` are treated as
 * user-defined field names, not keywords, so a field literally named
 * "pattern" is never flagged — only its own nested keywords are.
 */
export function findUnsupportedSchemaKeywords(schema: unknown, path: string): string[] {
  if (!schema || typeof schema !== "object") {
    return [];
  }
  if (Array.isArray(schema)) {
    return schema.flatMap((item, index) =>
      findUnsupportedSchemaKeywords(item, `${path}[${index}]`),
    );
  }
  const record = schema as Record<string, unknown>;
  const violations: string[] = [];
  const rawProperties = record.properties;
  if (rawProperties && typeof rawProperties === "object" && !Array.isArray(rawProperties)) {
    // Recurse into property values only; the property names themselves are data.
    for (const [name, child] of Object.entries(rawProperties as Record<string, unknown>)) {
      violations.push(...findUnsupportedSchemaKeywords(child, `${path}.properties.${name}`));
    }
  }
  for (const [key, value] of Object.entries(record)) {
    if (key === "properties") {
      continue; // handled above
    }
    if (GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS.has(key)) {
      violations.push(`${path}.${key}`);
    }
    if (value && typeof value === "object") {
      violations.push(...findUnsupportedSchemaKeywords(value, `${path}.${key}`));
    }
  }
  return violations;
}
/**
 * Rewrites tool parameter schemas for the google-gemini-cli provider.
 *
 * Cloud Code Assist consumes the OpenAPI `parameters` field for both Gemini
 * AND Claude models, and that field does not support JSON Schema keywords such
 * as patternProperties, additionalProperties, $ref, etc., so every tool routed
 * through this provider must have its schema cleaned. All other providers get
 * their tools back untouched.
 */
export function sanitizeToolsForGoogle<
  TSchemaType extends TSchema = TSchema,
  TResult = unknown,
>(params: {
  tools: AgentTool<TSchemaType, TResult>[];
  provider: string;
}): AgentTool<TSchemaType, TResult>[] {
  if (params.provider !== "google-gemini-cli") {
    return params.tools;
  }
  return params.tools.map((tool) => {
    const parameters = tool.parameters;
    if (!parameters || typeof parameters !== "object") {
      return tool;
    }
    const cleaned = cleanToolSchemaForGemini(parameters as Record<string, unknown>);
    return { ...tool, parameters: cleaned as TSchemaType };
  });
}
/**
 * Emits a diagnostic snapshot of the (sanitized) tool schemas sent to the
 * google-gemini-cli provider and warns about any JSON Schema keywords the API
 * does not support. No-op for every other provider.
 */
export function logToolSchemasForGoogle(params: { tools: AgentTool[]; provider: string }) {
  if (params.provider !== "google-gemini-cli") {
    return;
  }
  const sanitized = sanitizeToolsForGoogle(params);
  log.info("google tool schema snapshot", {
    provider: params.provider,
    toolCount: sanitized.length,
    // Names are taken from the original tool list, tagged with their index.
    tools: params.tools.map((tool, index) => `${index}:${tool.name}`),
  });
  sanitized.forEach((tool, index) => {
    const violations = findUnsupportedSchemaKeywords(tool.parameters, `${tool.name}.parameters`);
    if (violations.length === 0) {
      return;
    }
    log.warn("google tool schema has unsupported keywords", {
      index,
      tool: tool.name,
      violations: violations.slice(0, 12), // cap the payload size
      violationCount: violations.length,
    });
  });
}
// Event emitter for unhandled compaction failures that escape try-catch blocks.
// Listeners can use this to trigger session recovery with retry.
const compactionFailureEmitter = new EventEmitter();
export type CompactionFailureListener = (reason: string) => void;
/**
 * Register a listener for unhandled compaction failures.
 * Called when auto-compaction fails in a way that escapes the normal try-catch,
 * e.g., when the summarization request itself exceeds the model's token limit.
 * Returns an unsubscribe function.
 */
export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void {
compactionFailureEmitter.on("failure", cb);
return () => compactionFailureEmitter.off("failure", cb);
}
// Module-level hook: claim unhandled rejections that look like compaction
// failures (returning true marks them as handled) and fan them out to the
// listeners registered above.
registerUnhandledRejectionHandler((reason) => {
const message = describeUnknownError(reason);
if (!isCompactionFailureError(message)) {
return false;
}
log.error(`Auto-compaction failed (unhandled): ${message}`);
compactionFailureEmitter.emit("failure", message);
return true;
});
// Loose shape of a persisted custom session entry; fields are unknown because
// entries are read back from serialized session storage.
type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown };
// Snapshot of the model configuration active for a session at a point in time.
type ModelSnapshotEntry = {
timestamp: number;
provider?: string;
modelApi?: string | null;
modelId?: string;
};
// customType tag used to persist and read model snapshots in session entries.
const MODEL_SNAPSHOT_CUSTOM_TYPE = "model-snapshot";
/**
 * Scans session entries newest-to-oldest for the most recent "model-snapshot"
 * custom entry. Returns null when none exists or when reading the session
 * entries throws.
 */
function readLastModelSnapshot(sessionManager: SessionManager): ModelSnapshotEntry | null {
  try {
    const entries = sessionManager.getEntries();
    for (let index = entries.length - 1; index >= 0; index -= 1) {
      const candidate = entries[index] as CustomEntryLike;
      if (candidate?.type !== "custom" || candidate?.customType !== MODEL_SNAPSHOT_CUSTOM_TYPE) {
        continue;
      }
      const payload = candidate?.data as ModelSnapshotEntry | undefined;
      if (payload && typeof payload === "object") {
        return payload;
      }
    }
  } catch {
    return null;
  }
  return null;
}
/**
 * Best-effort persistence of a model snapshot as a custom session entry;
 * write failures are deliberately swallowed so they never break the session.
 */
function appendModelSnapshot(sessionManager: SessionManager, snapshot: ModelSnapshotEntry): void {
  try {
    sessionManager.appendCustomEntry(MODEL_SNAPSHOT_CUSTOM_TYPE, snapshot);
  } catch {
    // ignore persistence failures
  }
}
/**
 * Compares two snapshots by provider, modelApi, and modelId, treating null
 * and undefined as the empty string. Timestamps are intentionally ignored.
 */
function isSameModelSnapshot(a: ModelSnapshotEntry, b: ModelSnapshotEntry): boolean {
  const keys = ["provider", "modelApi", "modelId"] as const;
  return keys.every((key) => (a[key] ?? "") === (b[key] ?? ""));
}
/**
 * True when this session already recorded the google turn-ordering bootstrap
 * marker; any failure to read session entries counts as "no marker".
 */
function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean {
  try {
    for (const entry of sessionManager.getEntries()) {
      const candidate = entry as CustomEntryLike;
      if (
        candidate?.type === "custom" &&
        candidate?.customType === GOOGLE_TURN_ORDERING_CUSTOM_TYPE
      ) {
        return true;
      }
    }
    return false;
  } catch {
    return false;
  }
}
/**
 * Records the google turn-ordering bootstrap marker on the session so the
 * fixup warning is emitted only once. Persistence failures are ignored.
 */
function markGoogleTurnOrderingMarker(sessionManager: SessionManager): void {
  const payload = { timestamp: Date.now() };
  try {
    sessionManager.appendCustomEntry(GOOGLE_TURN_ORDERING_CUSTOM_TYPE, payload);
  } catch {
    // marker persistence is best-effort
  }
}
/**
 * Prepends a synthetic user bootstrap turn when a Google-model conversation
 * would otherwise start with an assistant message. The first time the fixup
 * actually prepends anything for a session, it warns (via params.warn or the
 * module logger) and stores a session marker so the warning is not repeated.
 */
export function applyGoogleTurnOrderingFix(params: {
  messages: AgentMessage[];
  modelApi?: string | null;
  sessionManager: SessionManager;
  sessionId: string;
  warn?: (message: string) => void;
}): { messages: AgentMessage[]; didPrepend: boolean } {
  const unchanged = { messages: params.messages, didPrepend: false };
  if (!isGoogleModelApi(params.modelApi)) {
    return unchanged;
  }
  const head = params.messages[0] as { role?: unknown; content?: unknown } | undefined;
  if (head?.role !== "assistant") {
    return unchanged;
  }
  const sanitized = sanitizeGoogleTurnOrdering(params.messages);
  // sanitizeGoogleTurnOrdering returns the same array when no fix was needed.
  const didPrepend = sanitized !== params.messages;
  if (didPrepend && !hasGoogleTurnOrderingMarker(params.sessionManager)) {
    const warn = params.warn ?? ((message: string) => log.warn(message));
    warn(`google turn ordering fixup: prepended user bootstrap (sessionId=${params.sessionId})`);
    markGoogleTurnOrderingMarker(params.sessionManager);
  }
  return { messages: sanitized, didPrepend };
}
/**
 * Runs the full transcript-hygiene pipeline over a session's message history
 * before a model request. Each stage returns the original array when it makes
 * no change, so the input is never mutated. Stage order matters: thinking
 * blocks are stripped before tool-call repair, and usage normalization runs
 * before the OpenAI reasoning downgrades.
 */
export async function sanitizeSessionHistory(params: {
messages: AgentMessage[];
modelApi?: string | null;
modelId?: string;
provider?: string;
allowedToolNames?: Iterable<string>;
config?: OpenClawConfig;
sessionManager: SessionManager;
sessionId: string;
policy?: TranscriptPolicy;
}): Promise<AgentMessage[]> {
// Keep docs/reference/transcript-hygiene.md in sync with any logic changes here.
const policy =
params.policy ??
resolveTranscriptPolicy({
modelApi: params.modelApi,
provider: params.provider,
modelId: params.modelId,
});
// Stage 1: mark user messages forwarded from other sessions.
const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages);
// Stage 2: image sanitization plus policy-driven tool-call-id / signature
// handling.
const sanitizedImages = await sanitizeSessionMessagesImages(
withInterSessionMarkers,
"session:history",
{
sanitizeMode: policy.sanitizeMode,
sanitizeToolCallIds: policy.sanitizeToolCallIds,
toolCallIdMode: policy.toolCallIdMode,
preserveSignatures: policy.preserveSignatures,
sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures,
...resolveImageSanitizationLimits(params.config),
},
);
// For Anthropic models, strip thinking/redacted_thinking blocks from all
// non-latest assistant messages. Anthropic requires these blocks to be
// byte-identical to the original response in the latest assistant message,
// but allows omitting them from older messages. Compaction and session
// serialization can corrupt these blocks, causing API rejections.
const isAnthropicProvider =
params.modelApi === "anthropic-messages" ||
params.modelApi === "bedrock-converse-stream" ||
(params.provider ?? "").toLowerCase() === "anthropic" ||
(params.provider ?? "").toLowerCase() === "amazon-bedrock";
const strippedNonLatestThinking = isAnthropicProvider
? stripThinkingFromNonLatestAssistant(sanitizedImages)
: sanitizedImages;
// Stage 3 (policy-gated): drop remaining thinking blocks entirely.
const droppedThinking = policy.dropThinkingBlocks
? dropThinkingBlocks(strippedNonLatestThinking)
: strippedNonLatestThinking;
// Stage 4: tool-call hygiene — sanitize inputs, optionally repair
// tool-use/result pairing, and strip verbose result details.
const sanitizedToolCalls = sanitizeToolCallInputs(droppedThinking, {
allowedToolNames: params.allowedToolNames,
});
const repairedTools = policy.repairToolUseResultPairing
? sanitizeToolUseResultPairing(sanitizedToolCalls)
: sanitizedToolCalls;
const sanitizedToolResults = stripToolResultDetails(repairedTools);
// Stage 5: zero stale pre-compaction usage, then normalize all assistant
// usage into canonical snapshots.
const sanitizedCompactionUsage = ensureAssistantUsageSnapshots(
stripStaleAssistantUsageBeforeLatestCompaction(sanitizedToolResults),
);
const isOpenAIResponsesApi =
params.modelApi === "openai-responses" || params.modelApi === "openai-codex-responses";
// Stage 6: detect whether the provider/model changed since the last persisted
// model snapshot (or no snapshot exists yet).
const hasSnapshot = Boolean(params.provider || params.modelApi || params.modelId);
const priorSnapshot = hasSnapshot ? readLastModelSnapshot(params.sessionManager) : null;
const modelChanged = priorSnapshot
? !isSameModelSnapshot(priorSnapshot, {
timestamp: 0,
provider: params.provider,
modelApi: params.modelApi,
modelId: params.modelId,
})
: false;
// Stage 7: downgrade OpenAI Responses reasoning blocks for compatibility.
const sanitizedOpenAI = isOpenAIResponsesApi
? downgradeOpenAIFunctionCallReasoningPairs(
downgradeOpenAIReasoningBlocks(sanitizedCompactionUsage),
)
: sanitizedCompactionUsage;
// Persist a fresh model snapshot when one is missing or the model changed.
if (hasSnapshot && (!priorSnapshot || modelChanged)) {
appendModelSnapshot(params.sessionManager, {
timestamp: Date.now(),
provider: params.provider,
modelApi: params.modelApi,
modelId: params.modelId,
});
}
// Stage 8: turn-ordering fixes (conversations must not start with an
// assistant turn for some providers).
if (!policy.applyGoogleTurnOrdering) {
return sanitizedOpenAI;
}
// Google models use the full wrapper with logging and session markers.
if (isGoogleModelApi(params.modelApi)) {
return applyGoogleTurnOrderingFix({
messages: sanitizedOpenAI,
modelApi: params.modelApi,
sessionManager: params.sessionManager,
sessionId: params.sessionId,
}).messages;
}
// Strict OpenAI-compatible providers (vLLM, Gemma, etc.) also reject
// conversations that start with an assistant turn (e.g. delivery-mirror
// messages after /new). Apply the same ordering fix without the
// Google-specific session markers. See #38962.
return sanitizeGoogleTurnOrdering(sanitizedOpenAI);
}