From eb59b9c19d426296ef05f36a6b3c8ec10ca135a0 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sun, 15 Mar 2026 17:34:56 -0400 Subject: [PATCH] fix: trailing body match and RawBody-missing extraction detection (#46454) --- src/auto-reply/reply/followup-media.ts | 171 ++++++++++++------- src/auto-reply/reply/followup-runner.test.ts | 158 +++++++++++++++++ src/auto-reply/reply/queue/drain.ts | 9 +- 3 files changed, 272 insertions(+), 66 deletions(-) diff --git a/src/auto-reply/reply/followup-media.ts b/src/auto-reply/reply/followup-media.ts index f40ec91f109..425bdd601ea 100644 --- a/src/auto-reply/reply/followup-media.ts +++ b/src/auto-reply/reply/followup-media.ts @@ -1,3 +1,4 @@ +import path from "node:path"; import { logVerbose } from "../../globals.js"; import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; import { @@ -40,30 +41,6 @@ function stripLeadingMediaReplyHint(prompt: string): string { return prompt.trim(); } -function replaceLastOccurrence( - value: string, - search: string, - replacement: string, -): string | undefined { - if (!search) { - return undefined; - } - const index = value.lastIndexOf(search); - if (index < 0) { - return undefined; - } - return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`; -} - -function findFirstOccurrenceBeforeFileBlocks(value: string, search: string): number { - if (!search) { - return -1; - } - const fileBlockIndex = value.search(FILE_BLOCK_RE); - const bodyRegion = fileBlockIndex >= 0 ? value.slice(0, fileBlockIndex) : value; - return bodyRegion.indexOf(search); -} - function findLastOccurrenceBeforeFileBlocks(value: string, search: string): number { if (!search) { return -1; @@ -98,21 +75,81 @@ function replaceLastOccurrenceBeforeFileBlocks( return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`; } -function replaceFirstOccurrenceBeforeFileBlocks( +function findTrailingReplacementTargetBeforeFileBlocks( + value: string, + targets: string[], +): { index: number; target: string } | undefined { + let bestMatch: { index: number; target: string } | undefined; + for (const target of targets) { + const index = findLastOccurrenceBeforeFileBlocks(value, target); + if (index < 0) { + continue; + } + if (!bestMatch || index > bestMatch.index) { + bestMatch = { index, target }; + } + } + return bestMatch; +} + +function replaceOccurrenceAtIndex( value: string, search: string, replacement: string, -): string | undefined { - if (!search) { - return undefined; - } - const index = findFirstOccurrenceBeforeFileBlocks(value, search); - if (index < 0) { - return undefined; - } + index: number, +): string { return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`; } +function decodeXmlAttr(value: string): string { + return value + .replace(/"/g, '"') + .replace(/'/g, "'") + .replace(/</g, "<") + .replace(/>/g, ">") + .replace(/&/g, "&"); +} + +function extractAttachmentFileName(value?: string): string | undefined { + const trimmed = value?.trim(); + if (!trimmed) { + return undefined; + } + if (/^[a-zA-Z][a-zA-Z\d+.-]*:/.test(trimmed)) { + try { + const pathname = new URL(trimmed).pathname; + const basename = path.posix.basename(pathname); + return basename || undefined; + } catch { + // Fall back to path-style parsing below. + } + } + const normalized = trimmed.replace(/\\/g, "/"); + const basename = path.posix.basename(normalized); + return basename || undefined; +} + +function bodyContainsMatchingFileBlock(mediaContext: FollowupMediaContext): boolean { + const body = mediaContext.Body?.trim(); + if (!body || !FILE_BLOCK_RE.test(body)) { + return false; + } + const bodyFileNames = new Set(); + for (const match of body.matchAll(/]*>/gi)) { + const fileName = match[1]?.trim(); + if (fileName) { + bodyFileNames.add(decodeXmlAttr(fileName)); + } + } + if (bodyFileNames.size === 0) { + return false; + } + return normalizeAttachments(mediaContext as MsgContext).some((attachment) => { + const fileName = extractAttachmentFileName(attachment.path ?? attachment.url); + return Boolean(fileName && bodyFileNames.has(fileName)); + }); +} + function stripInlineDirectives(text: string | undefined): string { return parseInlineDirectives(text ?? "").cleaned.trim(); } @@ -168,12 +205,14 @@ function rebuildQueuedPromptWithMediaUnderstanding(params: { // thread history above the body, and prompts whose original body no longer // appears all retain any legitimate blocks. if (params.updatedBody && FILE_BLOCK_RE.test(params.updatedBody)) { - const bodyIdx = - replacementTargets - .map((target) => findFirstOccurrenceBeforeFileBlocks(stripped, target)) - .find((index) => index >= 0) ?? -1; - if (bodyIdx >= 0) { - stripped = stripped.slice(0, bodyIdx) + stripExistingFileBlocks(stripped.slice(bodyIdx)); + const trailingMatch = findTrailingReplacementTargetBeforeFileBlocks( + stripped, + replacementTargets, + ); + if (trailingMatch) { + stripped = + stripped.slice(0, trailingMatch.index) + + stripExistingFileBlocks(stripped.slice(trailingMatch.index)); } } @@ -186,12 +225,15 @@ function rebuildQueuedPromptWithMediaUnderstanding(params: { } let rebuilt = stripped; - for (const target of replacementTargets) { - const replaced = replaceFirstOccurrenceBeforeFileBlocks(rebuilt, target, updatedBody); - if (replaced !== undefined) { - rebuilt = replaced; - return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); - } + const trailingMatch = findTrailingReplacementTargetBeforeFileBlocks(rebuilt, replacementTargets); + if (trailingMatch) { + rebuilt = replaceOccurrenceAtIndex( + rebuilt, + trailingMatch.target, + updatedBody, + trailingMatch.index, + ); + return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); } rebuilt = [rebuilt, updatedBody].filter(Boolean).join("\n\n"); @@ -268,29 +310,29 @@ export async function applyDeferredMediaUnderstandingToQueuedRun( if (!mediaContext || mediaContext.DeferredMediaApplied) { return; } - if (mediaContext.MediaUnderstanding?.length) { - mediaContext.DeferredMediaApplied = true; - return; - } if (!hasMediaAttachments(mediaContext)) { mediaContext.DeferredMediaApplied = true; return; } - - const resolvedOriginalBody = - mediaContext.CommandBody ?? mediaContext.RawBody ?? mediaContext.Body; - // Detect file extraction from the primary path via body mutation instead of - // scanning for literal ' { }; expect(agentCall?.prompt?.match(/ { + const fileBlock = '\nreport content\n'; + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${fileBlock}`, + mediaContext: { + Body: `summarize this\n\n${fileBlock}`, + CommandBody: "summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(fileBlock); + }); + + it("replaces the trailing repeated body segment instead of the first matching thread text", async () => { + const existingFileBlock = + '\nold extracted content\n'; + const newFileBlock = + '\nnew extracted content\n'; + const transcriptText = "Deferred transcript"; + + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}\n\nsummarize this\n\n${newFileBlock}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: + `[media attached 1/2: /tmp/voice.ogg]\n[media attached 2/2: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nThread history: summarize this\n\n` + + `summarize this\n\n${existingFileBlock}`, + mediaContext: { + Body: `summarize this\n\n${existingFileBlock}`, + CommandBody: "summarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/voice.ogg", "/tmp/report.pdf"], + MediaTypes: ["audio/ogg", "application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Thread history: summarize this"); + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).toContain(newFileBlock); + expect(agentCall?.prompt).not.toContain("old extracted content"); + expect(agentCall?.prompt?.indexOf(newFileBlock)).toBeGreaterThan( + agentCall?.prompt?.lastIndexOf("summarize this") ?? -1, + ); + }); }); describe("followup queue drain deferred media understanding", () => { @@ -2163,6 +2271,56 @@ describe("followup queue drain deferred media understanding", () => { expect(prompt).not.toContain("[media attached: /tmp/voice.ogg"); }); + it("preprocesses queued runs in parallel", async () => { + const resolvers: Array<() => void> = []; + const done = () => ({ + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }); + applyMediaUnderstandingMock.mockImplementation( + async () => + await new Promise((resolve) => { + resolvers.push(() => resolve(done())); + }), + ); + + const items: FollowupRun[] = [ + createQueuedRun({ + prompt: "[media attached: /tmp/voice-1.ogg (audio/ogg)]\nfirst text", + summaryLine: "first text", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "first text", + MediaPaths: ["/tmp/voice-1.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + createQueuedRun({ + prompt: "[media attached: /tmp/voice-2.ogg (audio/ogg)]\nsecond text", + summaryLine: "second text", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "second text", + MediaPaths: ["/tmp/voice-2.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ]; + + const pending = applyDeferredMediaToQueuedRuns(items); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(2); + + for (const resolve of resolvers) { + resolve(); + } + await pending; + }); + it("preprocesses dropped media items before building overflow summaries", async () => { applyMediaUnderstandingMock.mockImplementationOnce( async (params: { ctx: Record }) => { diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 471c94200ba..5d53df34189 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -76,9 +76,12 @@ function clearFollowupQueueSummaryState(queue: FollowupQueueState): void { } export async function applyDeferredMediaToQueuedRuns(items: FollowupRun[]): Promise { - for (const item of items) { - await applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" }); - } + await Promise.allSettled( + items.map( + async (item) => + await applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" }), + ), + ); } async function resolveSummaryLines(items: FollowupRun[]): Promise {