diff --git a/src/auto-reply/reply/followup-media.test.ts b/src/auto-reply/reply/followup-media.test.ts new file mode 100644 index 00000000000..77996f85606 --- /dev/null +++ b/src/auto-reply/reply/followup-media.test.ts @@ -0,0 +1,121 @@ +import { describe, expect, it } from "vitest"; +import { + _findLastOccurrenceOutsideFileBlocks as findLastOccurrenceOutsideFileBlocks, + _normalizeUpdatedBody as normalizeUpdatedBody, + _rebuildQueuedPromptWithMediaUnderstanding as rebuildQueuedPromptWithMediaUnderstanding, +} from "./followup-media.js"; + +const FILE_BLOCK = '\nPDF content\n'; + +describe("findLastOccurrenceOutsideFileBlocks", () => { + it("returns -1 for empty search", () => { + expect(findLastOccurrenceOutsideFileBlocks("hello", "")).toBe(-1); + }); + + it("finds last occurrence in body region before file blocks", () => { + const value = `hello world hello\n${FILE_BLOCK}`; + // "hello" appears at 0 and 12 — both before the file block + expect(findLastOccurrenceOutsideFileBlocks(value, "hello")).toBe(12); + }); + + it("skips matches inside file block content", () => { + // "PDF content" appears only inside the file block — no valid match outside. + const value = `some text\n${FILE_BLOCK}`; + expect(findLastOccurrenceOutsideFileBlocks(value, "PDF content")).toBe(-1); + }); + + it("finds trailing occurrence outside file block even when also inside one", () => { + const value = `some text\n${FILE_BLOCK}\nPDF content`; + // "PDF content" appears inside the file block AND after it — the function + // should return the trailing occurrence that is outside the block. + const expected = value.lastIndexOf("PDF content"); + expect(findLastOccurrenceOutsideFileBlocks(value, "PDF content")).toBe(expected); + }); + + it("finds occurrence when search itself contains file blocks", () => { + const bodyWithFile = `caption\n${FILE_BLOCK}`; + const value = `previous\n${bodyWithFile}\nlater\n${bodyWithFile}`; + // Should find the *last* (trailing) occurrence + const expected = value.lastIndexOf(bodyWithFile); + expect(findLastOccurrenceOutsideFileBlocks(value, bodyWithFile)).toBe(expected); + expect(expected).toBeGreaterThan(value.indexOf(bodyWithFile)); + }); + + it("returns index when no file blocks exist in value", () => { + expect(findLastOccurrenceOutsideFileBlocks("abc abc", "abc")).toBe(4); + }); + + it("finds body text after thread-history file blocks", () => { + const value = `Thread history\n${FILE_BLOCK}\n\ncheck this out`; + // The body "check this out" appears after a file block from thread history. + // The old truncation approach would miss this; the new approach finds it. + expect(findLastOccurrenceOutsideFileBlocks(value, "check this out")).toBe( + value.lastIndexOf("check this out"), + ); + }); +}); + +describe("normalizeUpdatedBody", () => { + it("returns empty string when updatedBody is empty", () => { + expect(normalizeUpdatedBody({ originalBody: "foo", updatedBody: "" })).toBe(""); + }); + + it("returns updatedBody when originalBody is empty", () => { + expect(normalizeUpdatedBody({ updatedBody: "hello" })).toBe("hello"); + }); + + it("strips directives when updatedBody equals originalBody", () => { + const body = "/think high tell me a joke"; + const result = normalizeUpdatedBody({ originalBody: body, updatedBody: body }); + expect(result).toBe("tell me a joke"); + }); + + it("does not corrupt file block content during directive cleanup", () => { + const originalBody = "/think high tell me about this file"; + // updatedBody has the original body plus a file block appended by media processing + const updatedBody = `${originalBody}\n${FILE_BLOCK}`; + const result = normalizeUpdatedBody({ originalBody, updatedBody }); + // The directive should be stripped from the body portion, file block preserved + expect(result).toContain("tell me about this file"); + expect(result).toContain(FILE_BLOCK); + expect(result).not.toContain("/think"); + }); + + it("replaces in body region, not inside file blocks", () => { + const originalBody = "PDF content"; + const updatedBody = `PDF content\n\nPDF content\n`; + // The replacement should target the body region "PDF content" before the + // file block, not the "PDF content" inside the block. + const result = normalizeUpdatedBody({ originalBody, updatedBody }); + // With no directives to strip, original === cleaned, updatedBody !== originalBody + // because updatedBody has the file block appended. The replacement targets the + // body-region occurrence. + expect(result).toContain('"); + }); +}); + +describe("rebuildQueuedPromptWithMediaUnderstanding", () => { + it("replaces original body with updated body in prompt", () => { + const result = rebuildQueuedPromptWithMediaUnderstanding({ + prompt: "thread context\nhello world", + originalBody: "hello world", + updatedBody: 'hello world\ndata', + }); + expect(result).toContain('data'); + expect(result).toContain("thread context"); + }); + + it("preserves file blocks in thread history when body is replaced", () => { + const prompt = `history\nold\nhello world`; + const result = rebuildQueuedPromptWithMediaUnderstanding({ + prompt, + originalBody: "hello world", + updatedBody: "hello world transcribed", + }); + // The old file block from history should be preserved since updatedBody + // has no file blocks of its own. + expect(result).toContain('old'); + expect(result).toContain("hello world transcribed"); + }); +}); diff --git a/src/auto-reply/reply/followup-media.ts b/src/auto-reply/reply/followup-media.ts new file mode 100644 index 00000000000..19b2c061c8c --- /dev/null +++ b/src/auto-reply/reply/followup-media.ts @@ -0,0 +1,374 @@ +import { logVerbose } from "../../globals.js"; +import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; +import { + normalizeAttachments, + resolveAttachmentKind, +} from "../../media-understanding/attachments.js"; +import { buildInboundMediaNote } from "../media-note.js"; +import type { MsgContext } from "../templating.js"; +import { parseInlineDirectives } from "./directive-handling.js"; +import type { FollowupMediaContext, FollowupRun } from "./queue/types.js"; + +const MEDIA_ONLY_PLACEHOLDER = "[User sent media without caption]"; +const MEDIA_REPLY_HINT_PREFIX = "To send an image back, prefer the message tool"; +const LEADING_MEDIA_ATTACHED_LINE_RE = /^\[media attached(?: \d+\/\d+)?: [^\r\n]*\]$/; +const FILE_BLOCK_RE = /]*>[\s\S]*?<\/file>/i; +const FILE_BLOCK_FULL_RE = /]*>[\s\S]*?<\/file>\n?/gi; + +function stripExistingFileBlocks(text: string): string { + return text.replace(FILE_BLOCK_FULL_RE, "").trim(); +} + +function stripLeadingMediaAttachedLines(prompt: string): string { + const lines = prompt.split("\n"); + let index = 0; + while (index < lines.length) { + const trimmed = lines[index]?.trim() ?? ""; + if (!LEADING_MEDIA_ATTACHED_LINE_RE.test(trimmed)) { + break; + } + index += 1; + } + return lines.slice(index).join("\n").trim(); +} + +function stripLeadingMediaReplyHint(prompt: string): string { + const lines = prompt.split("\n"); + if ((lines[0] ?? "").startsWith(MEDIA_REPLY_HINT_PREFIX)) { + return lines.slice(1).join("\n").trim(); + } + return prompt.trim(); +} + +/** Collect the [start, end) ranges of every `` block in `value`. */ +function collectFileBlockRanges(value: string): Array<[number, number]> { + const ranges: Array<[number, number]> = []; + const re = new RegExp(FILE_BLOCK_FULL_RE.source, FILE_BLOCK_FULL_RE.flags); + let m: RegExpExecArray | null; + while ((m = re.exec(value)) !== null) { + ranges.push([m.index, m.index + m[0].length]); + } + return ranges; +} + +function isInsideFileBlock( + position: number, + length: number, + ranges: Array<[number, number]>, +): boolean { + for (const [start, end] of ranges) { + if (position >= start && position + length <= end) { + return true; + } + } + return false; +} + +/** + * Find the last occurrence of `search` in `value` that is NOT inside a + * `` block. Searches the full string with lastIndexOf, + * then walks backward past any matches that fall inside file blocks. + */ +function findLastOccurrenceOutsideFileBlocks(value: string, search: string): number { + if (!search) { + return -1; + } + const ranges = collectFileBlockRanges(value); + let pos = value.lastIndexOf(search); + while (pos >= 0 && isInsideFileBlock(pos, search.length, ranges)) { + pos = value.lastIndexOf(search, pos - 1); + } + return pos; +} + +function replaceLastOccurrenceOutsideFileBlocks( + value: string, + search: string, + replacement: string, +): string | undefined { + if (!search) { + return undefined; + } + const index = findLastOccurrenceOutsideFileBlocks(value, search); + if (index < 0) { + return undefined; + } + return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`; +} + +function findTrailingReplacementTargetBeforeFileBlocks( + value: string, + targets: string[], +): { index: number; target: string } | undefined { + let bestMatch: { index: number; target: string } | undefined; + for (const target of targets) { + const index = findLastOccurrenceOutsideFileBlocks(value, target); + if (index < 0) { + continue; + } + if (!bestMatch || index > bestMatch.index) { + bestMatch = { index, target }; + } + } + return bestMatch; +} + +function replaceOccurrenceAtIndex( + value: string, + search: string, + replacement: string, + index: number, +): string { + return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`; +} + +function stripInlineDirectives(text: string | undefined): string { + return parseInlineDirectives(text ?? "").cleaned.trim(); +} + +function bodyContainsExtractedFileBlock(text: string | undefined): boolean { + return FILE_BLOCK_BODY_RE.test(text ?? ""); +} + +function normalizeUpdatedBody(params: { originalBody?: string; updatedBody?: string }): string { + const updatedBody = params.updatedBody?.trim(); + if (!updatedBody) { + return ""; + } + const originalBody = params.originalBody?.trim(); + if (!originalBody) { + return updatedBody; + } + + const cleanedOriginalBody = stripInlineDirectives(originalBody); + if (!cleanedOriginalBody) { + return updatedBody; + } + if (updatedBody === originalBody) { + return cleanedOriginalBody; + } + return ( + replaceLastOccurrenceOutsideFileBlocks(updatedBody, originalBody, cleanedOriginalBody) ?? + updatedBody + ).trim(); +} + +function rebuildQueuedPromptWithMediaUnderstanding(params: { + prompt: string; + originalBody?: string; + updatedBody?: string; + mediaNote?: string; +}): string { + let stripped = stripLeadingMediaAttachedLines(params.prompt); + if (!params.mediaNote) { + stripped = stripLeadingMediaReplyHint(stripped); + } + + const replacementTargets = [ + params.originalBody?.trim(), + stripInlineDirectives(params.originalBody), + MEDIA_ONLY_PLACEHOLDER, + ].filter( + (value, index, list): value is string => Boolean(value) && list.indexOf(value) === index, + ); + + // Strip pre-existing file blocks from the body region when the updated body + // contains new file blocks. Mixed messages (audio + PDF) can arrive with + // file extraction already applied in the primary path; without this strip + // the old block stays in the prompt while the updated body adds a new one, + // duplicating potentially large file payloads. + // Scope stripping to the confirmed body segment so quoted/replied text, + // thread history above the body, and prompts whose original body no longer + // appears all retain any legitimate blocks. + if (params.updatedBody && FILE_BLOCK_RE.test(params.updatedBody)) { + const trailingMatch = findTrailingReplacementTargetBeforeFileBlocks( + stripped, + replacementTargets, + ); + if (trailingMatch) { + stripped = + stripped.slice(0, trailingMatch.index) + + stripExistingFileBlocks(stripped.slice(trailingMatch.index)); + } + } + + const updatedBody = normalizeUpdatedBody({ + originalBody: params.originalBody, + updatedBody: params.updatedBody, + }); + if (!updatedBody) { + return [params.mediaNote?.trim(), stripped].filter(Boolean).join("\n").trim(); + } + + let rebuilt = stripped; + const trailingMatch = findTrailingReplacementTargetBeforeFileBlocks(rebuilt, replacementTargets); + if (trailingMatch) { + rebuilt = replaceOccurrenceAtIndex( + rebuilt, + trailingMatch.target, + updatedBody, + trailingMatch.index, + ); + return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); + } + + rebuilt = [rebuilt, updatedBody].filter(Boolean).join("\n\n"); + return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); +} + +function hasMediaAttachments(mediaContext: FollowupMediaContext): boolean { + return Boolean( + mediaContext.MediaPath?.trim() || + mediaContext.MediaUrl?.trim() || + (Array.isArray(mediaContext.MediaPaths) && mediaContext.MediaPaths.length > 0) || + (Array.isArray(mediaContext.MediaUrls) && mediaContext.MediaUrls.length > 0), + ); +} + +function hasOnlyFileLikeAttachments(mediaContext: FollowupMediaContext): boolean { + const attachments = normalizeAttachments(mediaContext as MsgContext); + return ( + attachments.length > 0 && + attachments.every((attachment) => { + const kind = resolveAttachmentKind(attachment); + return kind !== "audio" && kind !== "image" && kind !== "video"; + }) + ); +} + +function hasAnyFileAttachments(mediaContext: FollowupMediaContext): boolean { + return normalizeAttachments(mediaContext as MsgContext).some((attachment) => { + const kind = resolveAttachmentKind(attachment); + return kind !== "audio" && kind !== "image" && kind !== "video"; + }); +} + +function snapshotUpdatedMediaContext(params: { + original: FollowupMediaContext; + mediaCtx: MsgContext; + updatedBody?: string; + appliedFile?: boolean; +}): FollowupMediaContext { + return { + ...params.original, + Body: params.updatedBody ?? params.original.Body, + Transcript: + typeof params.mediaCtx.Transcript === "string" + ? params.mediaCtx.Transcript + : params.original.Transcript, + MediaUnderstanding: Array.isArray(params.mediaCtx.MediaUnderstanding) + ? [...params.mediaCtx.MediaUnderstanding] + : params.original.MediaUnderstanding, + MediaUnderstandingDecisions: Array.isArray(params.mediaCtx.MediaUnderstandingDecisions) + ? [...params.mediaCtx.MediaUnderstandingDecisions] + : params.original.MediaUnderstandingDecisions, + DeferredMediaApplied: true, + DeferredFileBlocksExtracted: + params.original.DeferredFileBlocksExtracted || params.appliedFile || undefined, + }; +} + +// Exported for unit testing — these are pure string helpers with no side effects. +export { + findLastOccurrenceOutsideFileBlocks as _findLastOccurrenceOutsideFileBlocks, + normalizeUpdatedBody as _normalizeUpdatedBody, + rebuildQueuedPromptWithMediaUnderstanding as _rebuildQueuedPromptWithMediaUnderstanding, +}; + +export async function applyDeferredMediaUnderstandingToQueuedRun( + queued: FollowupRun, + params: { logLabel?: string } = {}, +): Promise { + // NOTE: collect-mode and overflow-summary queue drains create synthetic + // followup runs without mediaContext — those paths are not covered here + // and rely on their own prompt-building logic in queue/drain.ts. + const mediaContext = queued.mediaContext; + if (!mediaContext || mediaContext.DeferredMediaApplied) { + return; + } + if (!hasMediaAttachments(mediaContext)) { + mediaContext.DeferredMediaApplied = true; + return; + } + + if (mediaContext.MediaUnderstanding?.length) { + mediaContext.DeferredMediaApplied = true; + return; + } + // Treat followup file extraction as already applied only when we have explicit + // evidence: the queue snapshot already flagged it or Body already contains a + // real extracted ... block. Body/RawBody mismatches are not + // reliable because some channels wrap Body with envelope metadata. + if ( + !mediaContext.DeferredFileBlocksExtracted && + hasAnyFileAttachments(mediaContext) && + bodyContainsExtractedFileBlock(mediaContext.Body) + ) { + mediaContext.DeferredFileBlocksExtracted = true; + } + + if (mediaContext.DeferredFileBlocksExtracted && hasOnlyFileLikeAttachments(mediaContext)) { + mediaContext.DeferredMediaApplied = true; + return; + } + + const resolvedOriginalBody = + mediaContext.CommandBody ?? mediaContext.RawBody ?? mediaContext.Body; + + try { + const mediaCtx = { + ...mediaContext, + Body: resolvedOriginalBody, + Provider: + mediaContext.Provider ?? + queued.run.messageProvider ?? + (typeof mediaContext.OriginatingChannel === "string" + ? mediaContext.OriginatingChannel + : undefined), + Surface: mediaContext.Surface, + } as MsgContext; + + const muResult = await applyMediaUnderstanding({ + ctx: mediaCtx, + cfg: queued.run.config, + agentDir: queued.run.agentDir, + activeModel: { + provider: queued.run.provider, + model: queued.run.model, + }, + }); + + const shouldRebuildPrompt = + muResult.outputs.length > 0 || + muResult.appliedAudio || + muResult.appliedImage || + muResult.appliedVideo || + (muResult.appliedFile && !mediaContext.DeferredFileBlocksExtracted); + + if (shouldRebuildPrompt) { + const newMediaNote = buildInboundMediaNote(mediaCtx); + queued.prompt = rebuildQueuedPromptWithMediaUnderstanding({ + prompt: queued.prompt, + originalBody: resolvedOriginalBody, + updatedBody: mediaCtx.Body, + mediaNote: newMediaNote, + }); + logVerbose( + `${params.logLabel ?? "followup"}: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage}, video=${muResult.appliedVideo}, file=${muResult.appliedFile})`, + ); + } + + queued.mediaContext = snapshotUpdatedMediaContext({ + original: mediaContext, + mediaCtx, + updatedBody: shouldRebuildPrompt ? mediaCtx.Body : undefined, + appliedFile: muResult.appliedFile, + }); + } catch (err) { + mediaContext.DeferredMediaApplied = true; + logVerbose( + `${params.logLabel ?? "followup"}: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`, + ); + } +} diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..f766aa57321 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -3,6 +3,7 @@ import { tmpdir } from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; +import { buildCollectPrompt } from "../../utils/queue-helpers.js"; import type { FollowupRun } from "./queue.js"; import * as sessionRunAccounting from "./session-run-accounting.js"; import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js"; @@ -10,6 +11,7 @@ import { createMockFollowupRun, createMockTypingController } from "./test-helper const runEmbeddedPiAgentMock = vi.fn(); const routeReplyMock = vi.fn(); const isRoutableChannelMock = vi.fn(); +const applyMediaUnderstandingMock = vi.fn(); vi.mock( "../../agents/model-fallback.js", @@ -20,6 +22,10 @@ vi.mock("../../agents/pi-embedded.js", () => ({ runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), })); +vi.mock("../../media-understanding/apply.js", () => ({ + applyMediaUnderstanding: (params: unknown) => applyMediaUnderstandingMock(params), +})); + vi.mock("./route-reply.js", async (importOriginal) => { const actual = await importOriginal(); return { @@ -30,6 +36,10 @@ vi.mock("./route-reply.js", async (importOriginal) => { }); import { createFollowupRunner } from "./followup-runner.js"; +import { + applyDeferredMediaToQueuedRuns, + buildMediaAwareQueueSummaryPrompt, +} from "./queue/drain.js"; const ROUTABLE_TEST_CHANNELS = new Set([ "telegram", @@ -48,13 +58,27 @@ beforeEach(() => { isRoutableChannelMock.mockImplementation((ch: string | undefined) => Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())), ); + applyMediaUnderstandingMock.mockReset(); + applyMediaUnderstandingMock.mockResolvedValue({ + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }); }); const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun => createMockFollowupRun({ run: { messageProvider } }); +const MEDIA_REPLY_HINT = + "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:https://example.com/image.jpg (spaces ok, quote if needed) or a safe relative path like MEDIA:./image.jpg. Avoid absolute paths (MEDIA:/...) and ~ paths — they are blocked for security. Keep caption in the text body."; + function createQueuedRun( - overrides: Partial> & { run?: Partial } = {}, + overrides: Partial> & { + run?: Partial; + } = {}, ): FollowupRun { return createMockFollowupRun(overrides); } @@ -401,7 +425,12 @@ describe("createFollowupRunner messaging tool dedupe", () => { agentResult: { ...makeTextReplyDedupeResult(), messagingToolSentTargets: [ - { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, + { + tool: "telegram", + provider: "telegram", + to: "268300329", + accountId: "work", + }, ], }, queued: { @@ -451,8 +480,13 @@ describe("createFollowupRunner messaging tool dedupe", () => { "sessions.json", ); const sessionKey = "main"; - const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() }; - const sessionStore: Record = { [sessionKey]: sessionEntry }; + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + [sessionKey]: sessionEntry, + }; await saveSessionStore(storePath, sessionStore); const { onBlockReply } = await runMessagingCase({ @@ -704,7 +738,1795 @@ describe("createFollowupRunner agentDir forwarding", () => { }); expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { agentDir?: string }; + const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + agentDir?: string; + }; expect(call?.agentDir).toBe(agentDir); }); }); + +describe("createFollowupRunner media understanding", () => { + it("applies audio transcription when mediaContext has untranscribed audio", async () => { + const transcriptText = "Hello, this is a voice note."; + // The real applyMediaUnderstanding mutates the ctx; the mock must do the same + // so buildInboundMediaNote and queued prompt rebuilding see the transcribed body. + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nUser text:\nsome text\nTranscript:\n${transcriptText}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "Got it!" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + // MediaUnderstanding is empty — transcription not yet applied + }, + }); + await runner(queued); + + // applyMediaUnderstanding should have been called + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + expect(applyMediaUnderstandingMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: queued.run.config, + agentDir: queued.run.agentDir, + }), + ); + + // The prompt passed to the agent should include the transcript, not the + // raw audio attachment line. + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).not.toContain("[media attached: /tmp/voice.ogg"); + + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "Got it!" })); + }); + + it("propagates the queued message provider into deferred media context", async () => { + const transcriptText = "Provider-aware transcript"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + expect(params.ctx.Provider).toBe("telegram"); + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[User sent media without caption]", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + }); + + it("applies media understanding for URL-only attachments", async () => { + const transcriptText = "URL-only transcript"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nUser text:\nsome text\nTranscript:\n${transcriptText}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "Got it!" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[media attached: https://cdn.example.com/voice.ogg (audio/ogg)]\nsome text", + mediaContext: { + Body: "some text", + MediaUrl: "https://cdn.example.com/voice.ogg", + MediaUrls: ["https://cdn.example.com/voice.ogg"], + MediaType: "audio/ogg", + MediaTypes: ["audio/ogg"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + }); + + it("strips the full media line when attachment paths or URLs contain brackets", async () => { + const transcriptText = "Bracket-safe transcript"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: + "[media attached: /tmp/voice[0].ogg (audio/ogg) | https://cdn.example.com/files[0].ogg]\nsome text", + mediaContext: { + Body: "some text", + CommandBody: "some text", + RawBody: "some text", + MediaPaths: ["/tmp/voice[0].ogg"], + MediaUrls: ["https://cdn.example.com/files[0].ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).not.toContain("[media attached:"); + expect(agentCall?.prompt).not.toContain("files[0].ogg]"); + }); + + it("only strips leading synthetic media lines and preserves literal user text later in the prompt", async () => { + const transcriptText = "Transcript with literal token"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = "I literally typed [media attached: keep me] in this message."; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: + "[media attached: /tmp/voice.ogg (audio/ogg)]\nI literally typed [media attached: keep me] in this message.", + mediaContext: { + Body: "I literally typed [media attached: keep me] in this message.", + CommandBody: "I literally typed [media attached: keep me] in this message.", + RawBody: "I literally typed [media attached: keep me] in this message.", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain( + "I literally typed [media attached: keep me] in this message.", + ); + expect(agentCall?.prompt).not.toContain("[media attached: /tmp/voice.ogg (audio/ogg)]"); + }); + + it("skips media understanding when MediaUnderstanding is already populated", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[Audio]\nTranscript:\nAlready transcribed.\n\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + // MediaUnderstanding already populated — transcription was applied in primary path + MediaUnderstanding: [ + { + kind: "audio.transcription", + text: "Already transcribed.", + attachmentIndex: 0, + provider: "whisper", + }, + ], + }, + }); + await runner(queued); + + // Should NOT re-run media understanding + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + + // The original prompt should be passed through unchanged + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Already transcribed."); + }); + + it("skips media understanding when no mediaContext is present", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // No mediaContext (plain text message) + const queued = createQueuedRun({ prompt: "just text" }); + await runner(queued); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + }); + + it("continues with raw prompt when media understanding fails", async () => { + applyMediaUnderstandingMock.mockRejectedValueOnce(new Error("transcription service down")); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "fallback reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const originalPrompt = "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text"; + const queued = createQueuedRun({ + prompt: originalPrompt, + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }); + await runner(queued); + + // Should have attempted media understanding + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + + // Agent should still run with the original prompt + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toBe(originalPrompt); + + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "fallback reply" })); + }); + + it("rebuilds the prompt when file extraction succeeds without media outputs", async () => { + const fileBlock = '\nline one\n'; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = `some text\n\n${fileBlock}`; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "file processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[media attached: /tmp/notes.txt (text/plain)]\nsome text", + mediaContext: { + Body: "some text", + CommandBody: "some text", + RawBody: "some text", + MediaPaths: ["/tmp/notes.txt"], + MediaTypes: ["text/plain"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("[media attached: /tmp/notes.txt (text/plain)]"); + expect(agentCall?.prompt).toContain(fileBlock); + expect(agentCall?.prompt?.match(/ { + const fileBlock = '\nreport content\n'; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = `summarize this\n\n${fileBlock}`; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "file processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[media attached: /tmp/report.pdf]\nLine: Alice\nsummarize this", + mediaContext: { + Body: "Line: Alice\nsummarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Line: Alice"); + expect(agentCall?.prompt).toContain(fileBlock); + expect(agentCall?.prompt?.match(/ { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + // Simulate transcription updating the context + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "voice transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "voice transcript"; + params.ctx.Body = "[Audio]\nUser text:\nsome text\nTranscript:\nvoice transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "voice transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "got both" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: + "[media attached: 2 files]\n[media attached 1/2: /tmp/voice.ogg (audio/ogg)]\n[media attached 2/2: /tmp/photo.jpg (image/jpeg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg", "/tmp/photo.jpg"], + MediaTypes: ["audio/ogg", "image/jpeg"], + }, + }); + await runner(queued); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + // Audio attachment line should be stripped + expect(agentCall?.prompt).not.toContain("voice.ogg"); + // Image attachment line should also be stripped (all media-attached lines are + // removed and replaced by the new buildInboundMediaNote output) + // The transcript should be present + expect(agentCall?.prompt).toContain("voice transcript"); + }); + + it("strips queued media lines when attachment paths or URLs contain a literal closing bracket", async () => { + const transcriptText = "Bracket-safe transcript"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nUser text:\nsome text\nTranscript:\n${transcriptText}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "ok" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: + "[media attached: /tmp/voice[0].ogg (audio/ogg) | https://cdn.example.com/files[0].ogg?sig=abc]123]\n" + + MEDIA_REPLY_HINT + + "\n" + + "some text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice[0].ogg"], + MediaUrls: ["https://cdn.example.com/files[0].ogg?sig=abc]123"], + MediaTypes: ["audio/ogg"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).not.toContain("/tmp/voice[0].ogg"); + expect(agentCall?.prompt).not.toContain("https://cdn.example.com/files[0].ogg?sig=abc]123"); + expect(agentCall?.prompt).not.toContain(MEDIA_REPLY_HINT); + }); + + it("preserves file-only media understanding when outputs are empty", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = + '\nQuarterly report body\n'; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\n[User sent media without caption]`, + mediaContext: { + Body: "", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("[media attached: /tmp/report.pdf (application/pdf)]"); + expect(agentCall?.prompt).toContain(MEDIA_REPLY_HINT); + expect(agentCall?.prompt).toContain(''); + expect(agentCall?.prompt).toContain("Quarterly report body"); + expect(agentCall?.prompt).not.toContain("[User sent media without caption]"); + }); + + it("replaces the queued body when inline directives were already stripped from the prompt", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = + '/think high summarize this\n\n\nreport\n'; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this`, + mediaContext: { + Body: "/think high summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("summarize this"); + expect(agentCall?.prompt).toContain(''); + expect(agentCall?.prompt).not.toContain("summarize this\n\n/think high summarize this"); + expect(agentCall?.prompt).not.toContain("/think high summarize this"); + }); + + it("preserves directive-like tokens inside extracted media content", async () => { + const fileBlock = + '\n/model claude-opus should stay\n/queue followup should stay\n'; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = `/think high summarize this\n\n${fileBlock}`; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/notes.txt]\n${MEDIA_REPLY_HINT}\nsummarize this`, + mediaContext: { + Body: "/think high summarize this", + MediaPaths: ["/tmp/notes.txt"], + MediaTypes: ["text/plain"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("summarize this"); + expect(agentCall?.prompt).not.toContain("/think high summarize this"); + expect(agentCall?.prompt).toContain("/model claude-opus should stay"); + expect(agentCall?.prompt).toContain("/queue followup should stay"); + }); + + it("rebuilds the prompt when image understanding mutates the body without outputs", async () => { + const description = "[Image]\nDescription:\na mountain at sunset"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = description; + return { + outputs: [], + decisions: [], + appliedImage: true, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[media attached: /tmp/photo.jpg (image/jpeg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/photo.jpg"], + MediaTypes: ["image/jpeg"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("a mountain at sunset"); + }); + + it("does not false-positive on user text containing literal ' { + const fileBlock = '\ncol1,col2\n1,2\n'; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = `check my {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // User message contains literal " { + const fileBlock = + '\nRun `/think high` literally in the shell example.\n'; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = `summarize this\n\n${fileBlock}`; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/notes.txt]\n${MEDIA_REPLY_HINT}\nsummarize this`, + mediaContext: { + Body: "/think high summarize this", + CommandBody: "summarize this", + RawBody: "/think high summarize this", + MediaPaths: ["/tmp/notes.txt"], + MediaTypes: ["text/plain"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("summarize this"); + expect(agentCall?.prompt).toContain("Run `/think high` literally in the shell example."); + }); + + it("rebuilds the prompt when image understanding mutates the body alongside existing text", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = "some text\n\n[Image summary]\nA whiteboard with action items."; + return { + outputs: [], + decisions: [], + appliedImage: true, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[media attached: /tmp/board.jpg (image/jpeg)]\nsome text", + mediaContext: { + Body: "some text", + CommandBody: "some text", + RawBody: "some text", + MediaPaths: ["/tmp/board.jpg"], + MediaTypes: ["image/jpeg"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("[Image summary]"); + expect(agentCall?.prompt).toContain("A whiteboard with action items."); + }); + + it("applies media understanding for URL-only deferred attachments", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.Body = "[Audio]\nTranscript:\nremote transcript"; + params.ctx.Transcript = "remote transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "remote transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[User sent media without caption]", + mediaContext: { + Body: "", + MediaUrl: "https://cdn.example.com/audio.ogg", + MediaUrls: ["https://cdn.example.com/audio.ogg"], + MediaType: "audio/ogg", + MediaTypes: ["audio/ogg"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("remote transcript"); + }); + + it("uses resolved body (CommandBody) as originalBody for accurate prompt replacement", async () => { + const fileBlock = '\nreport content\n'; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + // applyMediaUnderstanding mutates the resolved body (which is CommandBody) + params.ctx.Body = `summarize this\n\n${fileBlock}`; + return { + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // Body has directive prefix; CommandBody has the cleaned version. + // The prompt was built from CommandBody, so originalBody should match CommandBody + // for accurate replacement. + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this`, + mediaContext: { + Body: "/think high summarize this", + CommandBody: "summarize this", + RawBody: "/think high summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + // File block should be present (extraction succeeded) + expect(agentCall?.prompt).toContain(fileBlock); + // The body text should appear once, not duplicated + expect(agentCall?.prompt).toContain("summarize this"); + // Should NOT contain the directive prefix + expect(agentCall?.prompt).not.toContain("/think high"); + // The body should not be duplicated (would happen if originalBody didn't match) + const matches = agentCall?.prompt?.match(/summarize this/g); + expect(matches?.length).toBe(1); + }); + + it("does not duplicate file blocks for mixed audio+file messages re-processed in followup", async () => { + const existingFileBlock = + '\nold extracted content\n'; + const newFileBlock = + '\nnew extracted content\n'; + const transcriptText = "Mixed message transcript"; + + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}\n\nanalyze this\n\n${newFileBlock}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // Simulate a mixed message where the primary path already extracted the + // PDF (file block is in the prompt) but audio transcription failed. + await runner( + createQueuedRun({ + prompt: `[media attached 1/2: /tmp/voice.ogg]\n[media attached 2/2: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nanalyze this\n\n${existingFileBlock}`, + mediaContext: { + Body: `analyze this\n\n${existingFileBlock}`, + CommandBody: "analyze this", + RawBody: "analyze this", + MediaPaths: ["/tmp/voice.ogg", "/tmp/report.pdf"], + MediaTypes: ["audio/ogg", "application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + // Should contain the transcript + expect(agentCall?.prompt).toContain(transcriptText); + // Should have exactly one file block (the new one), not two + expect(agentCall?.prompt?.match(/ { + const quotedFileBlock = + '\nquoted thread attachment\n'; + const existingFileBlock = + '\nold extracted content\n'; + const newFileBlock = + '\nnew extracted content\n'; + const transcriptText = "Transcript from deferred audio"; + + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}\n\nsummarize this\n\n${newFileBlock}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached 1/2: /tmp/voice.ogg]\n[media attached 2/2: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nQuoted thread above\n\n${quotedFileBlock}`, + mediaContext: { + Body: `summarize this\n\n${existingFileBlock}`, + CommandBody: "summarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/voice.ogg", "/tmp/report.pdf"], + MediaTypes: ["audio/ogg", "application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Quoted thread above"); + expect(agentCall?.prompt).toContain(quotedFileBlock); + expect(agentCall?.prompt).toContain(newFileBlock); + expect(agentCall?.prompt?.match(/ { + const existingFileBlock = + '\nsummary notes:\nsummarize this\n'; + const transcriptText = "Transcript from deferred audio"; + + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}\n\nsummarize this`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached 1/2: /tmp/voice.ogg]\n[media attached 2/2: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${existingFileBlock}`, + mediaContext: { + Body: `summarize this\n\n${existingFileBlock}`, + CommandBody: "summarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/voice.ogg", "/tmp/report.pdf"], + MediaTypes: ["audio/ogg", "application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + const transcriptBlock = `[Audio]\nTranscript:\n${transcriptText}\n\nsummarize this`; + expect(agentCall?.prompt).toContain(existingFileBlock); + expect(agentCall?.prompt).toContain(transcriptBlock); + expect(agentCall?.prompt?.indexOf(transcriptBlock)).toBeGreaterThan(-1); + expect(agentCall?.prompt?.indexOf(transcriptBlock)).toBeLessThan( + agentCall?.prompt?.indexOf(existingFileBlock) ?? -1, + ); + }); + + it("finds the body after thread-history file blocks when body appears after the first tag", async () => { + const threadFileBlock = + '\nolder thread attachment\n'; + const transcriptText = "Transcript from deferred voice note"; + + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}\n\ncheck this out`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // The prompt has thread history with a file block BEFORE the current + // queued body text. The old truncation approach would miss the body + // because it only searched before the first tag. + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/voice.ogg]\n${MEDIA_REPLY_HINT}\nThread history\n\n${threadFileBlock}\n\ncheck this out`, + mediaContext: { + Body: "check this out", + RawBody: "check this out", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + const transcriptBlock = `[Audio]\nTranscript:\n${transcriptText}\n\ncheck this out`; + // The body should be replaced with the transcript block + expect(agentCall?.prompt).toContain(transcriptBlock); + // Thread history and its file block should be preserved + expect(agentCall?.prompt).toContain("Thread history"); + expect(agentCall?.prompt).toContain(threadFileBlock); + }); + + it("sets DeferredMediaApplied when media understanding throws", async () => { + applyMediaUnderstandingMock.mockRejectedValueOnce( + new Error("transcription service unavailable"), + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "fallback reply" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }); + + await runner(queued); + + // DeferredMediaApplied should be set so re-runs don't retry + expect(queued.mediaContext?.DeferredMediaApplied).toBe(true); + + // The agent should still be called with the raw prompt + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("some text"); + }); + + it("does not re-apply file extraction when the stored media body already has a file block", async () => { + const fileBlock = '\nreport content\n'; + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${fileBlock}`, + mediaContext: { + Body: `summarize this\n\n${fileBlock}`, + CommandBody: "summarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt?.match(/ { + const fileBlock = '\nreport content\n'; + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${fileBlock}`, + mediaContext: { + Body: `summarize this\n\n${fileBlock}`, + CommandBody: "summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(fileBlock); + }); + + it("treats any stored file block as already extracted even when the filename differs from the attachment basename", async () => { + const fileBlock = + '\nreport content\n'; + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: `[media attached: /tmp/upload-8472.bin]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${fileBlock}`, + mediaContext: { + Body: `summarize this\n\n${fileBlock}`, + CommandBody: "summarize this", + MediaPaths: ["/tmp/upload-8472.bin"], + MediaTypes: ["application/pdf"], + }, + }); + + await runner(queued); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + expect(queued.mediaContext?.DeferredFileBlocksExtracted).toBe(true); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt?.match(/ { + const existingFileBlock = + '\nold extracted content\n'; + const newFileBlock = + '\nnew extracted content\n'; + const transcriptText = "Deferred transcript"; + + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}\n\nsummarize this\n\n${newFileBlock}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: true, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: + `[media attached 1/2: /tmp/voice.ogg]\n[media attached 2/2: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nThread history: summarize this\n\n` + + `summarize this\n\n${existingFileBlock}`, + mediaContext: { + Body: `summarize this\n\n${existingFileBlock}`, + CommandBody: "summarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/voice.ogg", "/tmp/report.pdf"], + MediaTypes: ["audio/ogg", "application/pdf"], + }, + }), + ); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Thread history: summarize this"); + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).toContain(newFileBlock); + expect(agentCall?.prompt).not.toContain("old extracted content"); + expect(agentCall?.prompt?.indexOf(newFileBlock)).toBeGreaterThan( + agentCall?.prompt?.lastIndexOf("summarize this") ?? -1, + ); + }); +}); + +describe("followup queue drain deferred media understanding", () => { + it("preprocesses collect batches before synthesizing the followup prompt", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "collect transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "collect transcript"; + params.ctx.Body = "[Audio]\nTranscript:\ncollect transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "collect transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + const items: FollowupRun[] = [ + createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text", + summaryLine: "some text", + originatingChannel: "telegram", + originatingTo: "chat:1", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + createQueuedRun({ + prompt: "second text", + summaryLine: "second text", + originatingChannel: "telegram", + originatingTo: "chat:1", + run: { messageProvider: "telegram" }, + }), + ]; + + await applyDeferredMediaToQueuedRuns(items); + + const prompt = buildCollectPrompt({ + title: "[Queued messages while agent was busy]", + items, + renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), + }); + + expect(prompt).toContain("collect transcript"); + expect(prompt).toContain("Queued #2\nsecond text"); + expect(prompt).not.toContain("[media attached: /tmp/voice.ogg"); + }); + + it("preprocesses queued runs in parallel", async () => { + const resolvers: Array<() => void> = []; + const done = () => ({ + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }); + applyMediaUnderstandingMock.mockImplementation( + async () => + await new Promise((resolve) => { + resolvers.push(() => resolve(done())); + }), + ); + + const items: FollowupRun[] = [ + createQueuedRun({ + prompt: "[media attached: /tmp/voice-1.ogg (audio/ogg)]\nfirst text", + summaryLine: "first text", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "first text", + MediaPaths: ["/tmp/voice-1.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + createQueuedRun({ + prompt: "[media attached: /tmp/voice-2.ogg (audio/ogg)]\nsecond text", + summaryLine: "second text", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "second text", + MediaPaths: ["/tmp/voice-2.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ]; + + const pending = applyDeferredMediaToQueuedRuns(items); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(2); + + for (const resolve of resolvers) { + resolve(); + } + await pending; + }); + + it("preprocesses dropped media items before building overflow summaries", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "overflow transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "overflow transcript"; + params.ctx.Body = "[Audio]\nTranscript:\noverflow transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "overflow transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({ + dropPolicy: "summarize", + droppedCount: 1, + summaryLines: ["[media attached: /tmp/voice.ogg (audio/ogg)]"], + summaryItems: [ + createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]", + summaryLine: "", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ], + noun: "message", + }); + + expect(summaryPrompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + expect(summaryPrompt).toContain("overflow transcript"); + expect(summaryPrompt).not.toContain("[media attached: /tmp/voice.ogg"); + }); +}); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..9a80d4f78ad 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -20,6 +20,7 @@ import type { OriginatingChannelType } from "../templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { resolveRunAuthProfile } from "./agent-runner-utils.js"; +import { applyDeferredMediaUnderstandingToQueuedRun } from "./followup-media.js"; import { resolveOriginAccountId, resolveOriginMessageProvider, @@ -157,6 +158,8 @@ export function createFollowupRunner(params: { let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( activeSessionEntry?.systemPromptReport, ); + await applyDeferredMediaUnderstandingToQueuedRun(queued, { logLabel: "followup" }); + try { const fallbackResult = await runWithModelFallback({ cfg: queued.run.config, diff --git a/src/auto-reply/reply/get-reply-run.media-only.test.ts b/src/auto-reply/reply/get-reply-run.media-only.test.ts index 829b3937009..f519da10082 100644 --- a/src/auto-reply/reply/get-reply-run.media-only.test.ts +++ b/src/auto-reply/reply/get-reply-run.media-only.test.ts @@ -172,6 +172,45 @@ describe("runPreparedReply media-only handling", () => { expect(call?.followupRun.prompt).toContain("[User sent media without caption]"); }); + it("snapshots URL-only attachments into followup mediaContext", async () => { + await runPreparedReply( + baseParams({ + ctx: { + Body: "check this attachment", + RawBody: "check this attachment", + CommandBody: "check this attachment", + ThreadHistoryBody: "Earlier message in this thread", + OriginatingChannel: "slack", + OriginatingTo: "C123", + ChatType: "group", + MediaUrl: "https://cdn.example.com/input.png", + MediaUrls: ["https://cdn.example.com/input.png"], + MediaType: "image/png", + MediaTypes: ["image/png"], + }, + sessionCtx: { + Body: "check this attachment", + BodyStripped: "check this attachment", + ThreadHistoryBody: "Earlier message in this thread", + Provider: "slack", + ChatType: "group", + OriginatingChannel: "slack", + OriginatingTo: "C123", + }, + }), + ); + + const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0]; + expect(call?.followupRun.mediaContext).toEqual( + expect.objectContaining({ + MediaUrl: "https://cdn.example.com/input.png", + MediaUrls: ["https://cdn.example.com/input.png"], + MediaType: "image/png", + MediaTypes: ["image/png"], + }), + ); + }); + it("keeps thread history context on follow-up turns", async () => { const result = await runPreparedReply( baseParams({ @@ -186,6 +225,41 @@ describe("runPreparedReply media-only handling", () => { expect(call?.followupRun.prompt).toContain("Earlier message in this thread"); }); + it("snapshots mediaContext for URL-only deferred attachments", async () => { + await runPreparedReply( + baseParams({ + ctx: { + Body: "", + RawBody: "", + CommandBody: "", + MediaUrl: "https://cdn.example.com/audio.ogg", + MediaUrls: ["https://cdn.example.com/audio.ogg"], + MediaType: "audio/ogg", + MediaTypes: ["audio/ogg"], + ThreadHistoryBody: "Earlier message in this thread", + OriginatingChannel: "slack", + OriginatingTo: "C123", + ChatType: "group", + }, + sessionCtx: { + Body: "", + BodyStripped: "", + ThreadHistoryBody: "Earlier message in this thread", + Provider: "slack", + ChatType: "group", + OriginatingChannel: "slack", + OriginatingTo: "C123", + }, + }), + ); + + const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0]; + expect(call?.followupRun.mediaContext?.MediaUrl).toBe("https://cdn.example.com/audio.ogg"); + expect(call?.followupRun.mediaContext?.MediaUrls).toEqual([ + "https://cdn.example.com/audio.ogg", + ]); + }); + it("returns the empty-body reply when there is no text and no media", async () => { const result = await runPreparedReply( baseParams({ diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index c8451fd88f6..252cffbb364 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -310,7 +310,14 @@ export async function runPreparedReply( : [inboundUserContext, baseBodyFinal].filter(Boolean).join("\n\n"); const baseBodyTrimmed = baseBodyForPrompt.trim(); const hasMediaAttachment = Boolean( - sessionCtx.MediaPath || (sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0), + sessionCtx.MediaPath || + sessionCtx.MediaUrl || + (sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0) || + (sessionCtx.MediaUrls && sessionCtx.MediaUrls.length > 0) || + ctx.MediaPath?.trim() || + ctx.MediaUrl?.trim() || + (Array.isArray(ctx.MediaPaths) && ctx.MediaPaths.length > 0) || + (Array.isArray(ctx.MediaUrls) && ctx.MediaUrls.length > 0), ); if (!baseBodyTrimmed && !hasMediaAttachment) { await typing.onReplyStart(); @@ -387,7 +394,7 @@ export async function runPreparedReply( const mediaReplyHint = mediaNote ? "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:https://example.com/image.jpg (spaces ok, quote if needed) or a safe relative path like MEDIA:./image.jpg. Avoid absolute paths (MEDIA:/...) and ~ paths — they are blocked for security. Keep caption in the text body." : undefined; - let prefixedCommandBody = mediaNote + const prefixedCommandBody = mediaNote ? [mediaNote, mediaReplyHint, prefixedBody ?? ""].filter(Boolean).join("\n").trim() : prefixedBody; if (!resolvedThinkLevel) { @@ -472,11 +479,48 @@ export async function runPreparedReply( isNewSession, }); const authProfileIdSource = sessionEntry?.authProfileOverrideSource; + // Snapshot media-related context for deferred media understanding in the + // followup runner. When MediaUnderstanding is already populated the runner + // knows transcription already succeeded and skips re-application. + const hasMediaAttachments = Boolean( + ctx.MediaPath?.trim() || + ctx.MediaUrl?.trim() || + (Array.isArray(ctx.MediaPaths) && ctx.MediaPaths.length > 0) || + (Array.isArray(ctx.MediaUrls) && ctx.MediaUrls.length > 0), + ); + const mediaContext = hasMediaAttachments + ? { + Body: ctx.Body, + CommandBody: ctx.CommandBody, + RawBody: ctx.RawBody, + Provider: ctx.Provider ?? sessionCtx.Provider, + Surface: ctx.Surface ?? sessionCtx.Surface, + MediaPath: ctx.MediaPath, + MediaUrl: ctx.MediaUrl, + MediaType: ctx.MediaType, + MediaDir: ctx.MediaDir, + MediaPaths: ctx.MediaPaths ? [...ctx.MediaPaths] : undefined, + MediaUrls: ctx.MediaUrls ? [...ctx.MediaUrls] : undefined, + MediaTypes: ctx.MediaTypes ? [...ctx.MediaTypes] : undefined, + MediaRemoteHost: ctx.MediaRemoteHost, + Transcript: ctx.Transcript, + MediaUnderstanding: ctx.MediaUnderstanding ? [...ctx.MediaUnderstanding] : undefined, + MediaUnderstandingDecisions: ctx.MediaUnderstandingDecisions + ? [...ctx.MediaUnderstandingDecisions] + : undefined, + OriginatingChannel: ctx.OriginatingChannel, + OriginatingTo: ctx.OriginatingTo, + AccountId: ctx.AccountId, + MessageThreadId: ctx.MessageThreadId, + } + : undefined; + const followupRun = { prompt: queuedBody, messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), + mediaContext, // Originating channel for reply routing. originatingChannel: ctx.OriginatingChannel, originatingTo: ctx.OriginatingTo, diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..5c3a56f64af 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,6 +1,6 @@ -export { extractQueueDirective } from "./queue/directive.js"; -export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export { extractQueueDirective } from "./queue/directive.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; export { enqueueFollowupRun, @@ -10,6 +10,7 @@ export { export { resolveQueueSettings } from "./queue/settings.js"; export { clearFollowupQueue } from "./queue/state.js"; export type { + FollowupMediaContext, FollowupRun, QueueDedupeMode, QueueDropPolicy, diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 1e2fb33e4e0..27491713848 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -3,15 +3,17 @@ import { resolveGlobalMap } from "../../../shared/global-singleton.js"; import { buildCollectPrompt, beginQueueDrain, + buildQueueSummaryLine, + buildQueueSummaryPrompt, clearQueueSummaryState, drainCollectQueueStep, drainNextQueueItem, hasCrossChannelItems, - previewQueueSummaryPrompt, waitForQueueDebounce, } from "../../../utils/queue-helpers.js"; +import { applyDeferredMediaUnderstandingToQueuedRun } from "../followup-media.js"; import { isRoutableChannel } from "../route-reply.js"; -import { FOLLOWUP_QUEUES } from "./state.js"; +import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js"; import type { FollowupRun } from "./types.js"; // Persists the most recent runFollowup callback per queue key so that @@ -68,6 +70,59 @@ function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string }; } +function clearFollowupQueueSummaryState(queue: FollowupQueueState): void { + clearQueueSummaryState(queue); + queue.summaryItems = []; +} + +export async function applyDeferredMediaToQueuedRuns(items: FollowupRun[]): Promise { + await Promise.allSettled( + items.map((item) => + applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" }), + ), + ); +} + +async function resolveSummaryLines(items: FollowupRun[]): Promise { + // Parallelize the media understanding API calls upfront (same pattern as + // applyDeferredMediaToQueuedRuns), then build summary lines sequentially + // so line order matches the original item order. + await Promise.allSettled( + items.map((item) => + applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" }), + ), + ); + // After deferred media, prefer the updated prompt (which includes transcripts) + // over the original summaryLine (which may just be the caption text). + return items.map((item) => + buildQueueSummaryLine(item.prompt.trim() || item.summaryLine?.trim() || ""), + ); +} + +export async function buildMediaAwareQueueSummaryPrompt(params: { + dropPolicy: FollowupQueueState["dropPolicy"]; + droppedCount: number; + summaryLines: string[]; + summaryItems: FollowupRun[]; + noun: string; +}): Promise { + if (params.dropPolicy !== "summarize" || params.droppedCount <= 0) { + return undefined; + } + const summaryLines = + params.summaryItems.length > 0 + ? await resolveSummaryLines(params.summaryItems) + : params.summaryLines; + return buildQueueSummaryPrompt({ + state: { + dropPolicy: params.dropPolicy, + droppedCount: params.droppedCount, + summaryLines: [...summaryLines], + }, + noun: params.noun, + }); +} + export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, @@ -107,7 +162,14 @@ export function scheduleFollowupDrain( } const items = queue.items.slice(); - const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" }); + await applyDeferredMediaToQueuedRuns(items); + const summary = await buildMediaAwareQueueSummaryPrompt({ + dropPolicy: queue.dropPolicy, + droppedCount: queue.droppedCount, + summaryLines: queue.summaryLines, + summaryItems: queue.summaryItems, + noun: "message", + }); const run = items.at(-1)?.run ?? queue.lastRun; if (!run) { break; @@ -129,12 +191,18 @@ export function scheduleFollowupDrain( }); queue.items.splice(0, items.length); if (summary) { - clearQueueSummaryState(queue); + clearFollowupQueueSummaryState(queue); } continue; } - const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" }); + const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({ + dropPolicy: queue.dropPolicy, + droppedCount: queue.droppedCount, + summaryLines: queue.summaryLines, + summaryItems: queue.summaryItems, + noun: "message", + }); if (summaryPrompt) { const run = queue.lastRun; if (!run) { @@ -155,7 +223,7 @@ export function scheduleFollowupDrain( ) { break; } - clearQueueSummaryState(queue); + clearFollowupQueueSummaryState(queue); continue; } diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 11da0db98fc..e58cc5ffac5 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,8 +1,8 @@ import { createDedupeCache } from "../../../infra/dedupe.js"; import { resolveGlobalSingleton } from "../../../shared/global-singleton.js"; -import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; +import { buildQueueSummaryLine, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; import { kickFollowupDrainIfIdle } from "./drain.js"; -import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; +import { getExistingFollowupQueue, getFollowupQueue, type FollowupQueueState } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; /** @@ -57,6 +57,34 @@ function isRunAlreadyQueued( return items.some((item) => item.prompt === run.prompt && hasSameRouting(item)); } +function applyFollowupQueueDropPolicy(queue: FollowupQueueState): boolean { + const cap = queue.cap; + if (cap <= 0 || queue.items.length < cap) { + return true; + } + if (queue.dropPolicy === "new") { + return false; + } + + const dropCount = queue.items.length - cap + 1; + const dropped = queue.items.splice(0, dropCount); + if (queue.dropPolicy === "summarize") { + for (const item of dropped) { + queue.droppedCount += 1; + queue.summaryItems.push(item); + queue.summaryLines.push( + buildQueueSummaryLine(item.summaryLine?.trim() || item.prompt.trim()), + ); + } + const limit = Math.max(0, cap); + while (queue.summaryLines.length > limit) { + queue.summaryLines.shift(); + queue.summaryItems.shift(); + } + } + return true; +} + export function enqueueFollowupRun( key: string, run: FollowupRun, @@ -83,10 +111,7 @@ export function enqueueFollowupRun( queue.lastEnqueuedAt = Date.now(); queue.lastRun = run.run; - const shouldEnqueue = applyQueueDropPolicy({ - queue, - summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(), - }); + const shouldEnqueue = applyFollowupQueueDropPolicy(queue); if (!shouldEnqueue) { return false; } diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 44208e727dd..94021dd0c4c 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -4,6 +4,7 @@ import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./t export type FollowupQueueState = { items: FollowupRun[]; + summaryItems: FollowupRun[]; draining: boolean; lastEnqueuedAt: number; mode: QueueMode; @@ -47,6 +48,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup const created: FollowupQueueState = { items: [], + summaryItems: [], draining: false, lastEnqueuedAt: 0, mode: settings.mode, @@ -78,6 +80,7 @@ export function clearFollowupQueue(key: string): number { } const cleared = queue.items.length + queue.droppedCount; queue.items.length = 0; + queue.summaryItems.length = 0; queue.droppedCount = 0; queue.summaryLines = []; queue.lastRun = undefined; diff --git a/src/auto-reply/reply/queue/types.ts b/src/auto-reply/reply/queue/types.ts index 507f77d499d..1c2f5e0551a 100644 --- a/src/auto-reply/reply/queue/types.ts +++ b/src/auto-reply/reply/queue/types.ts @@ -2,6 +2,10 @@ import type { ExecToolDefaults } from "../../../agents/bash-tools.js"; import type { SkillSnapshot } from "../../../agents/skills.js"; import type { OpenClawConfig } from "../../../config/config.js"; import type { SessionEntry } from "../../../config/sessions.js"; +import type { + MediaUnderstandingDecision, + MediaUnderstandingOutput, +} from "../../../media-understanding/types.js"; import type { InputProvenance } from "../../../sessions/input-provenance.js"; import type { OriginatingChannelType } from "../../templating.js"; import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../directives.js"; @@ -19,12 +23,55 @@ export type QueueSettings = { export type QueueDedupeMode = "message-id" | "prompt" | "none"; +/** + * Snapshot of media-related context fields carried on a FollowupRun so that + * the followup runner can apply media understanding (e.g. voice-note + * transcription) when it was not applied — or failed — in the primary path. + */ +export type FollowupMediaContext = { + Body?: string; + CommandBody?: string; + RawBody?: string; + Provider?: string; + Surface?: string; + MediaPath?: string; + MediaUrl?: string; + MediaType?: string; + MediaDir?: string; + MediaPaths?: string[]; + MediaUrls?: string[]; + MediaTypes?: string[]; + MediaRemoteHost?: string; + Transcript?: string; + MediaUnderstanding?: MediaUnderstandingOutput[]; + MediaUnderstandingDecisions?: MediaUnderstandingDecision[]; + OriginatingChannel?: OriginatingChannelType; + OriginatingTo?: string; + AccountId?: string; + MessageThreadId?: string | number; + DeferredMediaApplied?: boolean; + /** + * Set when file extraction has already been applied to Body (either in the + * primary path or by a previous deferred-media run). This avoids re-running + * file extraction when Body already contains real extracted `...` + * blocks. + */ + DeferredFileBlocksExtracted?: boolean; +}; + export type FollowupRun = { prompt: string; /** Provider message ID, when available (for deduplication). */ messageId?: string; summaryLine?: string; enqueuedAt: number; + /** + * Media context snapshot from the original inbound message. + * When present and MediaUnderstanding is empty, the followup runner will + * attempt to apply media understanding (audio transcription, etc.) before + * passing the prompt to the agent. + */ + mediaContext?: FollowupMediaContext; /** * Originating channel for reply routing. * When set, replies should be routed back to this provider