diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f0bcd97486..1ffe236664c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ Docs: https://docs.openclaw.ai - Agents/compaction: extend the enclosing run deadline once while compaction is actively in flight, and abort the underlying SDK compaction on timeout/cancel so large-session compactions stop freezing mid-run. (#46889) Thanks @asyncjason. - Models/openai-completions: default non-native OpenAI-compatible providers to omit tool-definition `strict` fields unless users explicitly opt back in, so tool calling keeps working on providers that reject that option. (#45497) Thanks @sahancava. - WhatsApp/reconnect: restore the append recency filter in the extension inbox monitor and handle protobuf `Long` timestamps correctly, so fresh post-reconnect append messages are processed while stale history sync stays suppressed. (#42588) thanks @MonkeyLeeT. +- WhatsApp/login: wait for pending creds writes before reopening after Baileys `515` pairing restarts in both QR login and `channels login` flows, and keep the restart coverage pinned to the real wrapped error shape plus per-account creds queues. (#27910) Thanks @asyncjason. +- Agents/openai-compatible tool calls: deduplicate repeated tool call ids across live assistant messages and replayed history so OpenAI-compatible backends no longer reject duplicate `tool_call_id` values with HTTP 400. (#40996) Thanks @xaeon2026. ### Fixes @@ -42,6 +44,7 @@ Docs: https://docs.openclaw.ai - Email/webhook wrapping: sanitize sender and subject metadata before external-content wrapping so metadata fields cannot break the wrapper structure. Thanks @vincentkoc. - Node/startup: remove leftover debug `console.log("node host PATH: ...")` that printed the resolved PATH on every `openclaw node run` invocation. (#46411) - Telegram/message send: forward `--force-document` through the `sendPayload` path as well as `sendMedia`, so Telegram payload sends with `channelData` keep uploading images as documents instead of silently falling back to compressed photo sends. (#47119) Thanks @thepagent. +- Telegram/message chunking: preserve spaces, paragraph separators, and word boundaries when HTML overflow rechunking splits formatted replies. (#47274) ## 2026.3.13 diff --git a/extensions/telegram/src/format.ts b/extensions/telegram/src/format.ts index 1ccd8f8299b..0c1bec2a62a 100644 --- a/extensions/telegram/src/format.ts +++ b/extensions/telegram/src/format.ts @@ -512,6 +512,146 @@ function sliceLinkSpans( }); } +function sliceMarkdownIR(ir: MarkdownIR, start: number, end: number): MarkdownIR { + return { + text: ir.text.slice(start, end), + styles: sliceStyleSpans(ir.styles, start, end), + links: sliceLinkSpans(ir.links, start, end), + }; +} + +function mergeAdjacentStyleSpans(styles: MarkdownIR["styles"]): MarkdownIR["styles"] { + const merged: MarkdownIR["styles"] = []; + for (const span of styles) { + const last = merged.at(-1); + if (last && last.style === span.style && span.start <= last.end) { + last.end = Math.max(last.end, span.end); + continue; + } + merged.push({ ...span }); + } + return merged; +} + +function mergeAdjacentLinkSpans(links: MarkdownIR["links"]): MarkdownIR["links"] { + const merged: MarkdownIR["links"] = []; + for (const link of links) { + const last = merged.at(-1); + if (last && last.href === link.href && link.start <= last.end) { + last.end = Math.max(last.end, link.end); + continue; + } + merged.push({ ...link }); + } + return merged; +} + +function mergeMarkdownIRChunks(left: MarkdownIR, right: MarkdownIR): MarkdownIR { + const offset = left.text.length; + return { + text: left.text + right.text, + styles: mergeAdjacentStyleSpans([ + ...left.styles, + ...right.styles.map((span) => ({ + ...span, + start: span.start + offset, + end: span.end + offset, + })), + ]), + links: mergeAdjacentLinkSpans([ + ...left.links, + ...right.links.map((link) => ({ + ...link, + start: link.start + offset, + end: link.end + offset, + })), + ]), + }; +} + +function renderTelegramChunkHtml(ir: MarkdownIR): string { + return wrapFileReferencesInHtml(renderTelegramHtml(ir)); +} + +function findMarkdownIRPreservedSplitIndex(text: string, start: number, limit: number): number { + const maxEnd = Math.min(text.length, start + limit); + if (maxEnd >= text.length) { + return text.length; + } + + let lastOutsideParenNewlineBreak = -1; + let lastOutsideParenWhitespaceBreak = -1; + let lastOutsideParenWhitespaceRunStart = -1; + let lastAnyNewlineBreak = -1; + let lastAnyWhitespaceBreak = -1; + let lastAnyWhitespaceRunStart = -1; + let parenDepth = 0; + let sawNonWhitespace = false; + + for (let index = start; index < maxEnd; index += 1) { + const char = text[index]; + if (char === "(") { + sawNonWhitespace = true; + parenDepth += 1; + continue; + } + if (char === ")" && parenDepth > 0) { + sawNonWhitespace = true; + parenDepth -= 1; + continue; + } + if (!/\s/.test(char)) { + sawNonWhitespace = true; + continue; + } + if (!sawNonWhitespace) { + continue; + } + if (char === "\n") { + lastAnyNewlineBreak = index + 1; + if (parenDepth === 0) { + lastOutsideParenNewlineBreak = index + 1; + } + continue; + } + const whitespaceRunStart = + index === start || !/\s/.test(text[index - 1] ?? "") ? index : lastAnyWhitespaceRunStart; + lastAnyWhitespaceBreak = index + 1; + lastAnyWhitespaceRunStart = whitespaceRunStart; + if (parenDepth === 0) { + lastOutsideParenWhitespaceBreak = index + 1; + lastOutsideParenWhitespaceRunStart = whitespaceRunStart; + } + } + + const resolveWhitespaceBreak = (breakIndex: number, runStart: number): number => { + if (breakIndex <= start) { + return breakIndex; + } + if (runStart <= start) { + return breakIndex; + } + return /\s/.test(text[breakIndex] ?? "") ? runStart : breakIndex; + }; + + if (lastOutsideParenNewlineBreak > start) { + return lastOutsideParenNewlineBreak; + } + if (lastOutsideParenWhitespaceBreak > start) { + return resolveWhitespaceBreak( + lastOutsideParenWhitespaceBreak, + lastOutsideParenWhitespaceRunStart, + ); + } + if (lastAnyNewlineBreak > start) { + return lastAnyNewlineBreak; + } + if (lastAnyWhitespaceBreak > start) { + return resolveWhitespaceBreak(lastAnyWhitespaceBreak, lastAnyWhitespaceRunStart); + } + return maxEnd; +} + function splitMarkdownIRPreserveWhitespace(ir: MarkdownIR, limit: number): MarkdownIR[] { if (!ir.text) { return []; @@ -523,7 +663,7 @@ function splitMarkdownIRPreserveWhitespace(ir: MarkdownIR, limit: number): Markd const chunks: MarkdownIR[] = []; let cursor = 0; while (cursor < ir.text.length) { - const end = Math.min(ir.text.length, cursor + normalizedLimit); + const end = findMarkdownIRPreservedSplitIndex(ir.text, cursor, normalizedLimit); chunks.push({ text: ir.text.slice(cursor, end), styles: sliceStyleSpans(ir.styles, cursor, end), @@ -534,32 +674,98 @@ function splitMarkdownIRPreserveWhitespace(ir: MarkdownIR, limit: number): Markd return chunks; } +function coalesceWhitespaceOnlyMarkdownIRChunks(chunks: MarkdownIR[], limit: number): MarkdownIR[] { + const coalesced: MarkdownIR[] = []; + let index = 0; + + while (index < chunks.length) { + const chunk = chunks[index]; + if (!chunk) { + index += 1; + continue; + } + if (chunk.text.trim().length > 0) { + coalesced.push(chunk); + index += 1; + continue; + } + + const prev = coalesced.at(-1); + const next = chunks[index + 1]; + const chunkLength = chunk.text.length; + + const canMergePrev = (candidate: MarkdownIR) => + renderTelegramChunkHtml(candidate).length <= limit; + const canMergeNext = (candidate: MarkdownIR) => + renderTelegramChunkHtml(candidate).length <= limit; + + if (prev) { + const mergedPrev = mergeMarkdownIRChunks(prev, chunk); + if (canMergePrev(mergedPrev)) { + coalesced[coalesced.length - 1] = mergedPrev; + index += 1; + continue; + } + } + + if (next) { + const mergedNext = mergeMarkdownIRChunks(chunk, next); + if (canMergeNext(mergedNext)) { + chunks[index + 1] = mergedNext; + index += 1; + continue; + } + } + + if (prev && next) { + for (let prefixLength = chunkLength - 1; prefixLength >= 1; prefixLength -= 1) { + const prefix = sliceMarkdownIR(chunk, 0, prefixLength); + const suffix = sliceMarkdownIR(chunk, prefixLength, chunkLength); + const mergedPrev = mergeMarkdownIRChunks(prev, prefix); + const mergedNext = mergeMarkdownIRChunks(suffix, next); + if (canMergePrev(mergedPrev) && canMergeNext(mergedNext)) { + coalesced[coalesced.length - 1] = mergedPrev; + chunks[index + 1] = mergedNext; + break; + } + } + } + + index += 1; + } + + return coalesced; +} + function renderTelegramChunksWithinHtmlLimit( ir: MarkdownIR, limit: number, ): TelegramFormattedChunk[] { const normalizedLimit = Math.max(1, Math.floor(limit)); const pending = chunkMarkdownIR(ir, normalizedLimit); - const rendered: TelegramFormattedChunk[] = []; + const finalized: MarkdownIR[] = []; while (pending.length > 0) { const chunk = pending.shift(); if (!chunk) { continue; } - const html = wrapFileReferencesInHtml(renderTelegramHtml(chunk)); + const html = renderTelegramChunkHtml(chunk); if (html.length <= normalizedLimit || chunk.text.length <= 1) { - rendered.push({ html, text: chunk.text }); + finalized.push(chunk); continue; } const split = splitTelegramChunkByHtmlLimit(chunk, normalizedLimit, html.length); if (split.length <= 1) { // Worst-case safety: avoid retry loops, deliver the chunk as-is. - rendered.push({ html, text: chunk.text }); + finalized.push(chunk); continue; } pending.unshift(...split); } - return rendered; + return coalesceWhitespaceOnlyMarkdownIRChunks(finalized, normalizedLimit).map((chunk) => ({ + html: renderTelegramChunkHtml(chunk), + text: chunk.text, + })); } export function markdownToTelegramChunks( diff --git a/extensions/telegram/src/format.wrap-md.test.ts b/extensions/telegram/src/format.wrap-md.test.ts index 9921b669973..de3cab42056 100644 --- a/extensions/telegram/src/format.wrap-md.test.ts +++ b/extensions/telegram/src/format.wrap-md.test.ts @@ -174,6 +174,35 @@ describe("markdownToTelegramChunks - file reference wrapping", () => { expect(chunks.map((chunk) => chunk.text).join("")).toBe(input); expect(chunks.every((chunk) => chunk.html.length <= 5)).toBe(true); }); + + it("prefers word boundaries when html-limit retry splits formatted prose", () => { + const input = "**Which of these**"; + const chunks = markdownToTelegramChunks(input, 16); + expect(chunks.map((chunk) => chunk.text)).toEqual(["Which of ", "these"]); + expect(chunks.every((chunk) => chunk.html.length <= 16)).toBe(true); + }); + + it("falls back to in-paren word boundaries when the parenthesis is unbalanced", () => { + const input = "**foo (bar baz qux quux**"; + const chunks = markdownToTelegramChunks(input, 20); + expect(chunks.map((chunk) => chunk.text)).toEqual(["foo", "(bar baz qux ", "quux"]); + expect(chunks.every((chunk) => chunk.html.length <= 20)).toBe(true); + }); + + it("does not emit whitespace-only chunks during html-limit retry splitting", () => { + const input = "**ab <<**"; + const chunks = markdownToTelegramChunks(input, 11); + expect(chunks.map((chunk) => chunk.text).join("")).toBe("ab <<"); + expect(chunks.every((chunk) => chunk.text.trim().length > 0)).toBe(true); + expect(chunks.every((chunk) => chunk.html.length <= 11)).toBe(true); + }); + + it("preserves paragraph separators when retry chunking produces whitespace-only spans", () => { + const input = "ab\n\n<<"; + const chunks = markdownToTelegramChunks(input, 6); + expect(chunks.map((chunk) => chunk.text).join("")).toBe(input); + expect(chunks.every((chunk) => chunk.html.length <= 6)).toBe(true); + }); }); describe("edge cases", () => { diff --git a/extensions/whatsapp/src/login-qr.test.ts b/extensions/whatsapp/src/login-qr.test.ts index 4b16a289001..48709ceb484 100644 --- a/extensions/whatsapp/src/login-qr.test.ts +++ b/extensions/whatsapp/src/login-qr.test.ts @@ -1,6 +1,11 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { startWebLoginWithQr, waitForWebLogin } from "./login-qr.js"; -import { createWaSocket, logoutWeb, waitForWaConnection } from "./session.js"; +import { + createWaSocket, + logoutWeb, + waitForCredsSaveQueueWithTimeout, + waitForWaConnection, +} from "./session.js"; vi.mock("./session.js", () => { const createWaSocket = vi.fn( @@ -17,11 +22,13 @@ vi.mock("./session.js", () => { const getStatusCode = vi.fn( (err: unknown) => (err as { output?: { statusCode?: number } })?.output?.statusCode ?? - (err as { status?: number })?.status, + (err as { status?: number })?.status ?? + (err as { error?: { output?: { statusCode?: number } } })?.error?.output?.statusCode, ); const webAuthExists = vi.fn(async () => false); const readWebSelfId = vi.fn(() => ({ e164: null, jid: null })); const logoutWeb = vi.fn(async () => true); + const waitForCredsSaveQueueWithTimeout = vi.fn(async () => {}); return { createWaSocket, waitForWaConnection, @@ -30,6 +37,7 @@ vi.mock("./session.js", () => { webAuthExists, readWebSelfId, logoutWeb, + waitForCredsSaveQueueWithTimeout, }; }); @@ -39,22 +47,43 @@ vi.mock("./qr-image.js", () => ({ const createWaSocketMock = vi.mocked(createWaSocket); const waitForWaConnectionMock = vi.mocked(waitForWaConnection); +const waitForCredsSaveQueueWithTimeoutMock = vi.mocked(waitForCredsSaveQueueWithTimeout); const logoutWebMock = vi.mocked(logoutWeb); +async function flushTasks() { + await Promise.resolve(); + await Promise.resolve(); +} + describe("login-qr", () => { beforeEach(() => { vi.clearAllMocks(); }); it("restarts login once on status 515 and completes", async () => { + let releaseCredsFlush: (() => void) | undefined; + const credsFlushGate = new Promise((resolve) => { + releaseCredsFlush = resolve; + }); waitForWaConnectionMock - .mockRejectedValueOnce({ output: { statusCode: 515 } }) + // Baileys v7 wraps the error: { error: BoomError(515) } + .mockRejectedValueOnce({ error: { output: { statusCode: 515 } } }) .mockResolvedValueOnce(undefined); + waitForCredsSaveQueueWithTimeoutMock.mockReturnValueOnce(credsFlushGate); const start = await startWebLoginWithQr({ timeoutMs: 5000 }); expect(start.qrDataUrl).toBe("data:image/png;base64,base64"); - const result = await waitForWebLogin({ timeoutMs: 5000 }); + const resultPromise = waitForWebLogin({ timeoutMs: 5000 }); + await flushTasks(); + await flushTasks(); + + expect(createWaSocketMock).toHaveBeenCalledTimes(1); + expect(waitForCredsSaveQueueWithTimeoutMock).toHaveBeenCalledOnce(); + expect(waitForCredsSaveQueueWithTimeoutMock).toHaveBeenCalledWith(expect.any(String)); + + releaseCredsFlush?.(); + const result = await resultPromise; expect(result.connected).toBe(true); expect(createWaSocketMock).toHaveBeenCalledTimes(2); diff --git a/extensions/whatsapp/src/login-qr.ts b/extensions/whatsapp/src/login-qr.ts index a54e3fe56b2..3681d646252 100644 --- a/extensions/whatsapp/src/login-qr.ts +++ b/extensions/whatsapp/src/login-qr.ts @@ -12,6 +12,7 @@ import { getStatusCode, logoutWeb, readWebSelfId, + waitForCredsSaveQueueWithTimeout, waitForWaConnection, webAuthExists, } from "./session.js"; @@ -85,9 +86,10 @@ async function restartLoginSocket(login: ActiveLogin, runtime: RuntimeEnv) { } login.restartAttempted = true; runtime.log( - info("WhatsApp asked for a restart after pairing (code 515); retrying connection once…"), + info("WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"), ); closeSocket(login.sock); + await waitForCredsSaveQueueWithTimeout(login.authDir); try { const sock = await createWaSocket(false, login.verbose, { authDir: login.authDir, diff --git a/extensions/whatsapp/src/login.coverage.test.ts b/extensions/whatsapp/src/login.coverage.test.ts index 6306228693a..dda665ccdce 100644 --- a/extensions/whatsapp/src/login.coverage.test.ts +++ b/extensions/whatsapp/src/login.coverage.test.ts @@ -4,7 +4,12 @@ import path from "node:path"; import { DisconnectReason } from "@whiskeysockets/baileys"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { loginWeb } from "./login.js"; -import { createWaSocket, formatError, waitForWaConnection } from "./session.js"; +import { + createWaSocket, + formatError, + waitForCredsSaveQueueWithTimeout, + waitForWaConnection, +} from "./session.js"; const rmMock = vi.spyOn(fs, "rm"); @@ -35,10 +40,19 @@ vi.mock("./session.js", () => { const createWaSocket = vi.fn(async () => (call++ === 0 ? sockA : sockB)); const waitForWaConnection = vi.fn(); const formatError = vi.fn((err: unknown) => `formatted:${String(err)}`); + const getStatusCode = vi.fn( + (err: unknown) => + (err as { output?: { statusCode?: number } })?.output?.statusCode ?? + (err as { status?: number })?.status ?? + (err as { error?: { output?: { statusCode?: number } } })?.error?.output?.statusCode, + ); + const waitForCredsSaveQueueWithTimeout = vi.fn(async () => {}); return { createWaSocket, waitForWaConnection, formatError, + getStatusCode, + waitForCredsSaveQueueWithTimeout, WA_WEB_AUTH_DIR: authDir, logoutWeb: vi.fn(async (params: { authDir?: string }) => { await fs.rm(params.authDir ?? authDir, { @@ -52,8 +66,14 @@ vi.mock("./session.js", () => { const createWaSocketMock = vi.mocked(createWaSocket); const waitForWaConnectionMock = vi.mocked(waitForWaConnection); +const waitForCredsSaveQueueWithTimeoutMock = vi.mocked(waitForCredsSaveQueueWithTimeout); const formatErrorMock = vi.mocked(formatError); +async function flushTasks() { + await Promise.resolve(); + await Promise.resolve(); +} + describe("loginWeb coverage", () => { beforeEach(() => { vi.useFakeTimers(); @@ -65,12 +85,25 @@ describe("loginWeb coverage", () => { }); it("restarts once when WhatsApp requests code 515", async () => { + let releaseCredsFlush: (() => void) | undefined; + const credsFlushGate = new Promise((resolve) => { + releaseCredsFlush = resolve; + }); waitForWaConnectionMock - .mockRejectedValueOnce({ output: { statusCode: 515 } }) + .mockRejectedValueOnce({ error: { output: { statusCode: 515 } } }) .mockResolvedValueOnce(undefined); + waitForCredsSaveQueueWithTimeoutMock.mockReturnValueOnce(credsFlushGate); const runtime = { log: vi.fn(), error: vi.fn() } as never; - await loginWeb(false, waitForWaConnectionMock as never, runtime); + const pendingLogin = loginWeb(false, waitForWaConnectionMock as never, runtime); + await flushTasks(); + + expect(createWaSocketMock).toHaveBeenCalledTimes(1); + expect(waitForCredsSaveQueueWithTimeoutMock).toHaveBeenCalledOnce(); + expect(waitForCredsSaveQueueWithTimeoutMock).toHaveBeenCalledWith(authDir); + + releaseCredsFlush?.(); + await pendingLogin; expect(createWaSocketMock).toHaveBeenCalledTimes(2); const firstSock = await createWaSocketMock.mock.results[0]?.value; diff --git a/extensions/whatsapp/src/login.ts b/extensions/whatsapp/src/login.ts index 3eae0732c5d..0923a38a122 100644 --- a/extensions/whatsapp/src/login.ts +++ b/extensions/whatsapp/src/login.ts @@ -5,7 +5,14 @@ import { danger, info, success } from "../../../src/globals.js"; import { logInfo } from "../../../src/logger.js"; import { defaultRuntime, type RuntimeEnv } from "../../../src/runtime.js"; import { resolveWhatsAppAccount } from "./accounts.js"; -import { createWaSocket, formatError, logoutWeb, waitForWaConnection } from "./session.js"; +import { + createWaSocket, + formatError, + getStatusCode, + logoutWeb, + waitForCredsSaveQueueWithTimeout, + waitForWaConnection, +} from "./session.js"; export async function loginWeb( verbose: boolean, @@ -24,20 +31,17 @@ export async function loginWeb( await wait(sock); console.log(success("✅ Linked! Credentials saved for future sends.")); } catch (err) { - const code = - (err as { error?: { output?: { statusCode?: number } } })?.error?.output?.statusCode ?? - (err as { output?: { statusCode?: number } })?.output?.statusCode; + const code = getStatusCode(err); if (code === 515) { console.log( - info( - "WhatsApp asked for a restart after pairing (code 515); creds are saved. Restarting connection once…", - ), + info("WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"), ); try { sock.ws?.close(); } catch { // ignore } + await waitForCredsSaveQueueWithTimeout(account.authDir); const retry = await createWaSocket(false, verbose, { authDir: account.authDir, }); diff --git a/extensions/whatsapp/src/session.test.ts b/extensions/whatsapp/src/session.test.ts index 177c8c8e5e6..d86de75ffa7 100644 --- a/extensions/whatsapp/src/session.test.ts +++ b/extensions/whatsapp/src/session.test.ts @@ -204,6 +204,62 @@ describe("web session", () => { expect(inFlight).toBe(0); }); + it("lets different authDir queues flush independently", async () => { + let inFlightA = 0; + let inFlightB = 0; + let releaseA: (() => void) | null = null; + let releaseB: (() => void) | null = null; + const gateA = new Promise((resolve) => { + releaseA = resolve; + }); + const gateB = new Promise((resolve) => { + releaseB = resolve; + }); + + const saveCredsA = vi.fn(async () => { + inFlightA += 1; + await gateA; + inFlightA -= 1; + }); + const saveCredsB = vi.fn(async () => { + inFlightB += 1; + await gateB; + inFlightB -= 1; + }); + useMultiFileAuthStateMock + .mockResolvedValueOnce({ + state: { creds: {} as never, keys: {} as never }, + saveCreds: saveCredsA, + }) + .mockResolvedValueOnce({ + state: { creds: {} as never, keys: {} as never }, + saveCreds: saveCredsB, + }); + + await createWaSocket(false, false, { authDir: "/tmp/wa-a" }); + const sockA = getLastSocket(); + await createWaSocket(false, false, { authDir: "/tmp/wa-b" }); + const sockB = getLastSocket(); + + sockA.ev.emit("creds.update", {}); + sockB.ev.emit("creds.update", {}); + + await flushCredsUpdate(); + + expect(saveCredsA).toHaveBeenCalledTimes(1); + expect(saveCredsB).toHaveBeenCalledTimes(1); + expect(inFlightA).toBe(1); + expect(inFlightB).toBe(1); + + (releaseA as (() => void) | null)?.(); + (releaseB as (() => void) | null)?.(); + await flushCredsUpdate(); + await flushCredsUpdate(); + + expect(inFlightA).toBe(0); + expect(inFlightB).toBe(0); + }); + it("rotates creds backup when creds.json is valid JSON", async () => { const creds = mockCredsJsonSpies("{}"); const backupSuffix = path.join( diff --git a/extensions/whatsapp/src/session.ts b/extensions/whatsapp/src/session.ts index db48b49c874..8fc7f9fd1fc 100644 --- a/extensions/whatsapp/src/session.ts +++ b/extensions/whatsapp/src/session.ts @@ -31,17 +31,24 @@ export { webAuthExists, } from "./auth-store.js"; -let credsSaveQueue: Promise = Promise.resolve(); +// Per-authDir queues so multi-account creds saves don't block each other. +const credsSaveQueues = new Map>(); +const CREDS_SAVE_FLUSH_TIMEOUT_MS = 15_000; function enqueueSaveCreds( authDir: string, saveCreds: () => Promise | void, logger: ReturnType, ): void { - credsSaveQueue = credsSaveQueue + const prev = credsSaveQueues.get(authDir) ?? Promise.resolve(); + const next = prev .then(() => safeSaveCreds(authDir, saveCreds, logger)) .catch((err) => { logger.warn({ error: String(err) }, "WhatsApp creds save queue error"); + }) + .finally(() => { + if (credsSaveQueues.get(authDir) === next) credsSaveQueues.delete(authDir); }); + credsSaveQueues.set(authDir, next); } async function safeSaveCreds( @@ -186,10 +193,37 @@ export async function waitForWaConnection(sock: ReturnType) export function getStatusCode(err: unknown) { return ( (err as { output?: { statusCode?: number } })?.output?.statusCode ?? - (err as { status?: number })?.status + (err as { status?: number })?.status ?? + (err as { error?: { output?: { statusCode?: number } } })?.error?.output?.statusCode ); } +/** Await pending credential saves — scoped to one authDir, or all if omitted. */ +export function waitForCredsSaveQueue(authDir?: string): Promise { + if (authDir) { + return credsSaveQueues.get(authDir) ?? Promise.resolve(); + } + return Promise.all(credsSaveQueues.values()).then(() => {}); +} + +/** Await pending credential saves, but don't hang forever on stalled I/O. */ +export async function waitForCredsSaveQueueWithTimeout( + authDir: string, + timeoutMs = CREDS_SAVE_FLUSH_TIMEOUT_MS, +): Promise { + let flushTimeout: ReturnType | undefined; + await Promise.race([ + waitForCredsSaveQueue(authDir), + new Promise((resolve) => { + flushTimeout = setTimeout(resolve, timeoutMs); + }), + ]).finally(() => { + if (flushTimeout) { + clearTimeout(flushTimeout); + } + }); +} + function safeStringify(value: unknown, limit = 800): string { try { const seen = new WeakSet(); diff --git a/src/agents/pi-embedded-runner.sanitize-session-history.test.ts b/src/agents/pi-embedded-runner.sanitize-session-history.test.ts index 2003523e03f..438b46bb971 100644 --- a/src/agents/pi-embedded-runner.sanitize-session-history.test.ts +++ b/src/agents/pi-embedded-runner.sanitize-session-history.test.ts @@ -2,6 +2,7 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage, UserMessage, Usage } from "@mariozechner/pi-ai"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { + expectOpenAIResponsesStrictSanitizeCall, loadSanitizeSessionHistoryWithCleanMocks, makeMockSessionManager, makeInMemorySessionManager, @@ -247,7 +248,24 @@ describe("sanitizeSessionHistory", () => { expect(result).toEqual(mockMessages); }); - it("passes simple user-only history through for openai-completions", async () => { + it("sanitizes tool call ids for OpenAI-compatible responses providers", async () => { + setNonGoogleModelApi(); + + await sanitizeSessionHistory({ + messages: mockMessages, + modelApi: "openai-responses", + provider: "custom", + sessionManager: mockSessionManager, + sessionId: TEST_SESSION_ID, + }); + + expectOpenAIResponsesStrictSanitizeCall( + mockedHelpers.sanitizeSessionMessagesImages, + mockMessages, + ); + }); + + it("sanitizes tool call ids for openai-completions", async () => { setNonGoogleModelApi(); const result = await sanitizeSessionHistory({ diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index ef88e04ef46..1953099cf7b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -702,6 +702,26 @@ describe("wrapStreamFnTrimToolCallNames", () => { expect(finalToolCall.name).toBe("read"); expect(finalToolCall.id).toBe("call_42"); }); + + it("reassigns duplicate tool call ids within a message to unique fallbacks", async () => { + const finalToolCallA = { type: "toolCall", name: " read ", id: " edit:22 " }; + const finalToolCallB = { type: "toolCall", name: " write ", id: "edit:22" }; + const finalMessage = { role: "assistant", content: [finalToolCallA, finalToolCallB] }; + const baseFn = vi.fn(() => + createFakeStream({ + events: [], + resultMessage: finalMessage, + }), + ); + + const stream = await invokeWrappedStream(baseFn); + await stream.result(); + + expect(finalToolCallA.name).toBe("read"); + expect(finalToolCallB.name).toBe("write"); + expect(finalToolCallA.id).toBe("edit:22"); + expect(finalToolCallB.id).toBe("call_auto_1"); + }); }); describe("wrapStreamFnRepairMalformedToolCallArguments", () => { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index f34bd8c3628..07ab0627186 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -668,6 +668,7 @@ function normalizeToolCallIdsInMessage(message: unknown): void { } let fallbackIndex = 1; + const assignedIds = new Set(); for (const block of content) { if (!block || typeof block !== "object") { continue; @@ -679,20 +680,23 @@ function normalizeToolCallIdsInMessage(message: unknown): void { if (typeof typedBlock.id === "string") { const trimmedId = typedBlock.id.trim(); if (trimmedId) { - if (typedBlock.id !== trimmedId) { - typedBlock.id = trimmedId; + if (!assignedIds.has(trimmedId)) { + if (typedBlock.id !== trimmedId) { + typedBlock.id = trimmedId; + } + assignedIds.add(trimmedId); + continue; } - usedIds.add(trimmedId); - continue; } } let fallbackId = ""; - while (!fallbackId || usedIds.has(fallbackId)) { + while (!fallbackId || usedIds.has(fallbackId) || assignedIds.has(fallbackId)) { fallbackId = `call_auto_${fallbackIndex++}`; } typedBlock.id = fallbackId; usedIds.add(fallbackId); + assignedIds.add(fallbackId); } } diff --git a/src/agents/tool-call-id.test.ts b/src/agents/tool-call-id.test.ts index dec3d37e9d8..ced9c7ee8a5 100644 --- a/src/agents/tool-call-id.test.ts +++ b/src/agents/tool-call-id.test.ts @@ -29,6 +29,54 @@ const buildDuplicateIdCollisionInput = () => }, ]); +const buildRepeatedRawIdInput = () => + castAgentMessages([ + { + role: "assistant", + content: [ + { type: "toolCall", id: "edit:22", name: "edit", arguments: {} }, + { type: "toolCall", id: "edit:22", name: "edit", arguments: {} }, + ], + }, + { + role: "toolResult", + toolCallId: "edit:22", + toolName: "edit", + content: [{ type: "text", text: "one" }], + }, + { + role: "toolResult", + toolCallId: "edit:22", + toolName: "edit", + content: [{ type: "text", text: "two" }], + }, + ]); + +const buildRepeatedSharedToolResultIdInput = () => + castAgentMessages([ + { + role: "assistant", + content: [ + { type: "toolCall", id: "edit:22", name: "edit", arguments: {} }, + { type: "toolCall", id: "edit:22", name: "edit", arguments: {} }, + ], + }, + { + role: "toolResult", + toolCallId: "edit:22", + toolUseId: "edit:22", + toolName: "edit", + content: [{ type: "text", text: "one" }], + }, + { + role: "toolResult", + toolCallId: "edit:22", + toolUseId: "edit:22", + toolName: "edit", + content: [{ type: "text", text: "two" }], + }, + ]); + function expectCollisionIdsRemainDistinct( out: AgentMessage[], mode: "strict" | "strict9", @@ -111,6 +159,26 @@ describe("sanitizeToolCallIdsForCloudCodeAssist", () => { expectCollisionIdsRemainDistinct(out, "strict"); }); + it("reuses one rewritten id when a tool result carries matching toolCallId and toolUseId", () => { + const input = buildRepeatedSharedToolResultIdInput(); + + const out = sanitizeToolCallIdsForCloudCodeAssist(input); + expect(out).not.toBe(input); + const { aId, bId } = expectCollisionIdsRemainDistinct(out, "strict"); + const r1 = out[1] as Extract & { toolUseId?: string }; + const r2 = out[2] as Extract & { toolUseId?: string }; + expect(r1.toolUseId).toBe(aId); + expect(r2.toolUseId).toBe(bId); + }); + + it("assigns distinct IDs when identical raw tool call ids repeat", () => { + const input = buildRepeatedRawIdInput(); + + const out = sanitizeToolCallIdsForCloudCodeAssist(input); + expect(out).not.toBe(input); + expectCollisionIdsRemainDistinct(out, "strict"); + }); + it("caps tool call IDs at 40 chars while preserving uniqueness", () => { const longA = `call_${"a".repeat(60)}`; const longB = `call_${"a".repeat(59)}b`; @@ -181,6 +249,16 @@ describe("sanitizeToolCallIdsForCloudCodeAssist", () => { expect(aId).not.toMatch(/[_-]/); expect(bId).not.toMatch(/[_-]/); }); + + it("assigns distinct strict IDs when identical raw tool call ids repeat", () => { + const input = buildRepeatedRawIdInput(); + + const out = sanitizeToolCallIdsForCloudCodeAssist(input, "strict"); + expect(out).not.toBe(input); + const { aId, bId } = expectCollisionIdsRemainDistinct(out, "strict"); + expect(aId).not.toMatch(/[_-]/); + expect(bId).not.toMatch(/[_-]/); + }); }); describe("strict9 mode (Mistral tool call IDs)", () => { @@ -231,5 +309,27 @@ describe("sanitizeToolCallIdsForCloudCodeAssist", () => { expect(aId.length).toBe(9); expect(bId.length).toBe(9); }); + + it("assigns distinct strict9 IDs when identical raw tool call ids repeat", () => { + const input = buildRepeatedRawIdInput(); + + const out = sanitizeToolCallIdsForCloudCodeAssist(input, "strict9"); + expect(out).not.toBe(input); + const { aId, bId } = expectCollisionIdsRemainDistinct(out, "strict9"); + expect(aId.length).toBe(9); + expect(bId.length).toBe(9); + }); + + it("reuses one rewritten strict9 id when a tool result carries matching toolCallId and toolUseId", () => { + const input = buildRepeatedSharedToolResultIdInput(); + + const out = sanitizeToolCallIdsForCloudCodeAssist(input, "strict9"); + expect(out).not.toBe(input); + const { aId, bId } = expectCollisionIdsRemainDistinct(out, "strict9"); + const r1 = out[1] as Extract & { toolUseId?: string }; + const r2 = out[2] as Extract & { toolUseId?: string }; + expect(r1.toolUseId).toBe(aId); + expect(r2.toolUseId).toBe(bId); + }); }); }); diff --git a/src/agents/tool-call-id.ts b/src/agents/tool-call-id.ts index e30236e6e82..c7c68994458 100644 --- a/src/agents/tool-call-id.ts +++ b/src/agents/tool-call-id.ts @@ -144,9 +144,55 @@ function makeUniqueToolId(params: { id: string; used: Set; mode: ToolCal return `${candidate.slice(0, MAX_LEN - ts.length)}${ts}`; } +function createOccurrenceAwareResolver(mode: ToolCallIdMode): { + resolveAssistantId: (id: string) => string; + resolveToolResultId: (id: string) => string; +} { + const used = new Set(); + const assistantOccurrences = new Map(); + const orphanToolResultOccurrences = new Map(); + const pendingByRawId = new Map(); + + const allocate = (seed: string): string => { + const next = makeUniqueToolId({ id: seed, used, mode }); + used.add(next); + return next; + }; + + const resolveAssistantId = (id: string): string => { + const occurrence = (assistantOccurrences.get(id) ?? 0) + 1; + assistantOccurrences.set(id, occurrence); + const next = allocate(occurrence === 1 ? id : `${id}:${occurrence}`); + const pending = pendingByRawId.get(id); + if (pending) { + pending.push(next); + } else { + pendingByRawId.set(id, [next]); + } + return next; + }; + + const resolveToolResultId = (id: string): string => { + const pending = pendingByRawId.get(id); + if (pending && pending.length > 0) { + const next = pending.shift()!; + if (pending.length === 0) { + pendingByRawId.delete(id); + } + return next; + } + + const occurrence = (orphanToolResultOccurrences.get(id) ?? 0) + 1; + orphanToolResultOccurrences.set(id, occurrence); + return allocate(`${id}:tool_result:${occurrence}`); + }; + + return { resolveAssistantId, resolveToolResultId }; +} + function rewriteAssistantToolCallIds(params: { message: Extract; - resolve: (id: string) => string; + resolveId: (id: string) => string; }): Extract { const content = params.message.content; if (!Array.isArray(content)) { @@ -168,7 +214,7 @@ function rewriteAssistantToolCallIds(params: { ) { return block; } - const nextId = params.resolve(id); + const nextId = params.resolveId(id); if (nextId === id) { return block; } @@ -184,7 +230,7 @@ function rewriteAssistantToolCallIds(params: { function rewriteToolResultIds(params: { message: Extract; - resolve: (id: string) => string; + resolveId: (id: string) => string; }): Extract { const toolCallId = typeof params.message.toolCallId === "string" && params.message.toolCallId @@ -192,9 +238,14 @@ function rewriteToolResultIds(params: { : undefined; const toolUseId = (params.message as { toolUseId?: unknown }).toolUseId; const toolUseIdStr = typeof toolUseId === "string" && toolUseId ? toolUseId : undefined; + const sharedRawId = + toolCallId && toolUseIdStr && toolCallId === toolUseIdStr ? toolCallId : undefined; - const nextToolCallId = toolCallId ? params.resolve(toolCallId) : undefined; - const nextToolUseId = toolUseIdStr ? params.resolve(toolUseIdStr) : undefined; + const sharedResolvedId = sharedRawId ? params.resolveId(sharedRawId) : undefined; + const nextToolCallId = + sharedResolvedId ?? (toolCallId ? params.resolveId(toolCallId) : undefined); + const nextToolUseId = + sharedResolvedId ?? (toolUseIdStr ? params.resolveId(toolUseIdStr) : undefined); if (nextToolCallId === toolCallId && nextToolUseId === toolUseIdStr) { return params.message; @@ -219,21 +270,11 @@ export function sanitizeToolCallIdsForCloudCodeAssist( ): AgentMessage[] { // Strict mode: only [a-zA-Z0-9] // Strict9 mode: only [a-zA-Z0-9], length 9 (Mistral tool call requirement) - // Sanitization can introduce collisions (e.g. `a|b` and `a:b` -> `ab`). - // Fix by applying a stable, transcript-wide mapping and de-duping via suffix. - const map = new Map(); - const used = new Set(); - - const resolve = (id: string) => { - const existing = map.get(id); - if (existing) { - return existing; - } - const next = makeUniqueToolId({ id, used, mode }); - map.set(id, next); - used.add(next); - return next; - }; + // Sanitization can introduce collisions, and some providers also reject raw + // duplicate tool-call IDs. Track assistant occurrences in-order so repeated + // raw IDs receive distinct rewritten IDs, while matching tool results consume + // the same rewritten IDs in encounter order. + const { resolveAssistantId, resolveToolResultId } = createOccurrenceAwareResolver(mode); let changed = false; const out = messages.map((msg) => { @@ -244,7 +285,7 @@ export function sanitizeToolCallIdsForCloudCodeAssist( if (role === "assistant") { const next = rewriteAssistantToolCallIds({ message: msg as Extract, - resolve, + resolveId: resolveAssistantId, }); if (next !== msg) { changed = true; @@ -254,7 +295,7 @@ export function sanitizeToolCallIdsForCloudCodeAssist( if (role === "toolResult") { const next = rewriteToolResultIds({ message: msg as Extract, - resolve, + resolveId: resolveToolResultId, }); if (next !== msg) { changed = true; diff --git a/src/agents/transcript-policy.ts b/src/agents/transcript-policy.ts index 46795bad1bc..784770f2e28 100644 --- a/src/agents/transcript-policy.ts +++ b/src/agents/transcript-policy.ts @@ -78,7 +78,10 @@ export function resolveTranscriptPolicy(params: { provider, modelId, }); - const requiresOpenAiCompatibleToolIdSanitization = params.modelApi === "openai-completions"; + const requiresOpenAiCompatibleToolIdSanitization = + params.modelApi === "openai-completions" || + (!isOpenAi && + (params.modelApi === "openai-responses" || params.modelApi === "openai-codex-responses")); // Anthropic Claude endpoints can reject replayed `thinking` blocks unless the // original signatures are preserved byte-for-byte. Drop them at send-time to diff --git a/src/gateway/server/ws-connection/connect-policy.test.ts b/src/gateway/server/ws-connection/connect-policy.test.ts index 670f73637ac..a7baa7f73c1 100644 --- a/src/gateway/server/ws-connection/connect-policy.test.ts +++ b/src/gateway/server/ws-connection/connect-policy.test.ts @@ -226,6 +226,30 @@ describe("ws connect policy", () => { expect(shouldSkipControlUiPairing(strict, "operator", true)).toBe(true); }); + test("auth.mode=none skips pairing for operator control-ui only", () => { + const controlUi = resolveControlUiAuthPolicy({ + isControlUi: true, + controlUiConfig: undefined, + deviceRaw: null, + }); + const nonControlUi = resolveControlUiAuthPolicy({ + isControlUi: false, + controlUiConfig: undefined, + deviceRaw: null, + }); + // Control UI + operator + auth.mode=none: skip pairing (the fix for #42931) + expect(shouldSkipControlUiPairing(controlUi, "operator", false, "none")).toBe(true); + // Control UI + node role + auth.mode=none: still require pairing + expect(shouldSkipControlUiPairing(controlUi, "node", false, "none")).toBe(false); + // Non-Control-UI + operator + auth.mode=none: still require pairing + // (prevents #43478 regression where ALL clients bypassed pairing) + expect(shouldSkipControlUiPairing(nonControlUi, "operator", false, "none")).toBe(false); + // Control UI + operator + auth.mode=shared-key: no change + expect(shouldSkipControlUiPairing(controlUi, "operator", false, "shared-key")).toBe(false); + // Control UI + operator + no authMode: no change + expect(shouldSkipControlUiPairing(controlUi, "operator", false)).toBe(false); + }); + test("trusted-proxy control-ui bypass only applies to operator + trusted-proxy auth", () => { const cases: Array<{ role: "operator" | "node"; diff --git a/src/gateway/server/ws-connection/connect-policy.ts b/src/gateway/server/ws-connection/connect-policy.ts index c5c4c1d0a07..caf4551a714 100644 --- a/src/gateway/server/ws-connection/connect-policy.ts +++ b/src/gateway/server/ws-connection/connect-policy.ts @@ -3,6 +3,7 @@ import type { GatewayRole } from "../../role-policy.js"; import { roleCanSkipDeviceIdentity } from "../../role-policy.js"; export type ControlUiAuthPolicy = { + isControlUi: boolean; allowInsecureAuthConfigured: boolean; dangerouslyDisableDeviceAuth: boolean; allowBypass: boolean; @@ -24,6 +25,7 @@ export function resolveControlUiAuthPolicy(params: { const dangerouslyDisableDeviceAuth = params.isControlUi && params.controlUiConfig?.dangerouslyDisableDeviceAuth === true; return { + isControlUi: params.isControlUi, allowInsecureAuthConfigured, dangerouslyDisableDeviceAuth, // `allowInsecureAuth` must not bypass secure-context/device-auth requirements. @@ -36,10 +38,21 @@ export function shouldSkipControlUiPairing( policy: ControlUiAuthPolicy, role: GatewayRole, trustedProxyAuthOk = false, + authMode?: string, ): boolean { if (trustedProxyAuthOk) { return true; } + // When auth is completely disabled (mode=none), there is no shared secret + // or token to gate pairing. Requiring pairing in this configuration adds + // friction without security value since any client can already connect + // without credentials. Guard with policy.isControlUi because this function + // is called for ALL clients (not just Control UI) at the call site. + // Scope to operator role so node-role sessions still need device identity + // (#43478 was reverted for skipping ALL clients). + if (policy.isControlUi && role === "operator" && authMode === "none") { + return true; + } // dangerouslyDisableDeviceAuth is the break-glass path for Control UI // operators. Keep pairing aligned with the missing-device bypass, including // open-auth deployments where there is no shared token/password to prove. diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index e0116190009..f7eec2153ad 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -681,7 +681,13 @@ export function attachGatewayWsMessageHandler(params: { hasBrowserOriginHeader, sharedAuthOk, authMethod, - }) || shouldSkipControlUiPairing(controlUiAuthPolicy, role, trustedProxyAuthOk); + }) || + shouldSkipControlUiPairing( + controlUiAuthPolicy, + role, + trustedProxyAuthOk, + resolvedAuth.mode, + ); if (device && devicePublicKey && !skipPairing) { const formatAuditList = (items: string[] | undefined): string => { if (!items || items.length === 0) {