From 3bf55561cbb972df5c122da75ce7a26e7b947c8b Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 13:40:52 -0400 Subject: [PATCH] fix: apply media understanding to followup-queued messages (#44682) Voice notes arriving while the agent is mid-turn were queued as followup messages without audio transcription. The followup runner called runEmbeddedPiAgent directly, bypassing applyMediaUnderstanding. This adds a mediaContext field to FollowupRun that snapshots the original message's media fields. Before the agent run, the followup runner checks whether media understanding was applied. If not (empty MediaUnderstanding), it calls applyMediaUnderstanding and rebuilds the prompt with the transcript, matching the primary path's formatting. Co-Authored-By: Claude Opus 4.6 --- src/auto-reply/reply/followup-runner.test.ts | 285 ++++++++++++++++++- src/auto-reply/reply/followup-runner.ts | 65 ++++- src/auto-reply/reply/get-reply-run.ts | 34 ++- src/auto-reply/reply/queue.ts | 5 +- src/auto-reply/reply/queue/types.ts | 37 +++ 5 files changed, 416 insertions(+), 10 deletions(-) diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..f0a060af4ac 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; +import { loadSessionStore, type SessionEntry, saveSessionStore } from "../../config/sessions.js"; import type { FollowupRun } from "./queue.js"; import * as sessionRunAccounting from "./session-run-accounting.js"; import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js"; @@ -10,6 +10,7 @@ import { createMockFollowupRun, createMockTypingController } from "./test-helper const runEmbeddedPiAgentMock = vi.fn(); const routeReplyMock = vi.fn(); const isRoutableChannelMock = vi.fn(); +const applyMediaUnderstandingMock = vi.fn(); vi.mock( "../../agents/model-fallback.js", @@ -20,6 +21,10 @@ vi.mock("../../agents/pi-embedded.js", () => ({ runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), })); +vi.mock("../../media-understanding/apply.js", () => ({ + applyMediaUnderstanding: (params: unknown) => applyMediaUnderstandingMock(params), +})); + vi.mock("./route-reply.js", async (importOriginal) => { const actual = await importOriginal(); return { @@ -48,13 +53,24 @@ beforeEach(() => { isRoutableChannelMock.mockImplementation((ch: string | undefined) => Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())), ); + applyMediaUnderstandingMock.mockReset(); + applyMediaUnderstandingMock.mockResolvedValue({ + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }); }); const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun => createMockFollowupRun({ run: { messageProvider } }); function createQueuedRun( - overrides: Partial> & { run?: Partial } = {}, + overrides: Partial> & { + run?: Partial; + } = {}, ): FollowupRun { return createMockFollowupRun(overrides); } @@ -401,7 +417,12 @@ describe("createFollowupRunner messaging tool dedupe", () => { agentResult: { ...makeTextReplyDedupeResult(), messagingToolSentTargets: [ - { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, + { + tool: "telegram", + provider: "telegram", + to: "268300329", + accountId: "work", + }, ], }, queued: { @@ -451,8 +472,13 @@ describe("createFollowupRunner messaging tool dedupe", () => { "sessions.json", ); const sessionKey = "main"; - const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() }; - const sessionStore: Record = { [sessionKey]: sessionEntry }; + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + [sessionKey]: sessionEntry, + }; await saveSessionStore(storePath, sessionStore); const { onBlockReply } = await runMessagingCase({ @@ -704,7 +730,254 @@ describe("createFollowupRunner agentDir forwarding", () => { }); expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { agentDir?: string }; + const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + agentDir?: string; + }; expect(call?.agentDir).toBe(agentDir); }); }); + +describe("createFollowupRunner media understanding", () => { + it("applies audio transcription when mediaContext has untranscribed audio", async () => { + const transcriptText = "Hello, this is a voice note."; + // The real applyMediaUnderstanding mutates the ctx; the mock must do the same + // so buildInboundMediaNote sees MediaUnderstanding and suppresses the audio line. + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "Got it!" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + // MediaUnderstanding is empty — transcription not yet applied + }, + }); + await runner(queued); + + // applyMediaUnderstanding should have been called + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + expect(applyMediaUnderstandingMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: queued.run.config, + agentDir: queued.run.agentDir, + }), + ); + + // The prompt passed to the agent should include the transcript, not the + // raw audio attachment line. + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).not.toContain("[media attached: /tmp/voice.ogg"); + + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "Got it!" })); + }); + + it("skips media understanding when MediaUnderstanding is already populated", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[Audio]\nTranscript:\nAlready transcribed.\n\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + // MediaUnderstanding already populated — transcription was applied in primary path + MediaUnderstanding: [ + { + kind: "audio.transcription", + text: "Already transcribed.", + attachmentIndex: 0, + provider: "whisper", + }, + ], + }, + }); + await runner(queued); + + // Should NOT re-run media understanding + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + + // The original prompt should be passed through unchanged + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Already transcribed."); + }); + + it("skips media understanding when no mediaContext is present", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // No mediaContext (plain text message) + const queued = createQueuedRun({ prompt: "just text" }); + await runner(queued); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + }); + + it("continues with raw prompt when media understanding fails", async () => { + applyMediaUnderstandingMock.mockRejectedValueOnce(new Error("transcription service down")); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "fallback reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const originalPrompt = "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text"; + const queued = createQueuedRun({ + prompt: originalPrompt, + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }); + await runner(queued); + + // Should have attempted media understanding + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + + // Agent should still run with the original prompt + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toBe(originalPrompt); + + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "fallback reply" })); + }); + + it("preserves non-audio media lines when only audio is transcribed", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + // Simulate transcription updating the context + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "voice transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "voice transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "voice transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "got both" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: + "[media attached: 2 files]\n[media attached 1/2: /tmp/voice.ogg (audio/ogg)]\n[media attached 2/2: /tmp/photo.jpg (image/jpeg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg", "/tmp/photo.jpg"], + MediaTypes: ["audio/ogg", "image/jpeg"], + }, + }); + await runner(queued); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + // Audio attachment line should be stripped + expect(agentCall?.prompt).not.toContain("voice.ogg"); + // Image attachment line should also be stripped (all media-attached lines are + // removed and replaced by the new buildInboundMediaNote output) + // The transcript should be present + expect(agentCall?.prompt).toContain("voice transcript"); + }); +}); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..1e65380a020 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -13,10 +13,13 @@ import type { SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; +import { formatMediaUnderstandingBody } from "../../media-understanding/format.js"; import { defaultRuntime } from "../../runtime.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import { stripHeartbeatToken } from "../heartbeat.js"; -import type { OriginatingChannelType } from "../templating.js"; +import { buildInboundMediaNote } from "../media-note.js"; +import type { MsgContext, OriginatingChannelType } from "../templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { resolveRunAuthProfile } from "./agent-runner-utils.js"; @@ -157,6 +160,66 @@ export function createFollowupRunner(params: { let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( activeSessionEntry?.systemPromptReport, ); + + // Apply media understanding for followup-queued messages when it was + // not applied (or failed) in the primary path. This ensures voice + // notes that arrived while the agent was mid-turn still get transcribed. + if (queued.mediaContext && !queued.mediaContext.MediaUnderstanding?.length) { + const hasMedia = Boolean( + queued.mediaContext.MediaPath?.trim() || + (Array.isArray(queued.mediaContext.MediaPaths) && + queued.mediaContext.MediaPaths.length > 0), + ); + if (hasMedia) { + try { + const mediaCtx = { ...queued.mediaContext } as MsgContext; + const muResult = await applyMediaUnderstanding({ + ctx: mediaCtx, + cfg: queued.run.config, + agentDir: queued.run.agentDir, + activeModel: { + provider: queued.run.provider, + model: queued.run.model, + }, + }); + if (muResult.outputs.length > 0) { + // Rebuild the prompt with media understanding results baked in, + // matching the primary path's formatting. + const newMediaNote = buildInboundMediaNote(mediaCtx); + const transcriptBody = formatMediaUnderstandingBody({ + body: undefined, + outputs: muResult.outputs, + }); + + // Strip existing [media attached ...] lines from the prompt so + // they can be replaced by the updated media note (which excludes + // successfully-understood attachments like transcribed audio). + const stripped = queued.prompt + .replace(/\[media attached: \d+ files\]\n?/g, "") + .replace(/\[media attached[^\]]*\]\n?/g, ""); + + const parts: string[] = []; + if (newMediaNote) { + parts.push(newMediaNote); + } + if (transcriptBody) { + parts.push(transcriptBody); + } + parts.push(stripped.trim()); + queued.prompt = parts.filter(Boolean).join("\n\n"); + + logVerbose( + `followup: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage})`, + ); + } + } catch (err) { + logVerbose( + `followup: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + } + try { const fallbackResult = await runWithModelFallback({ cfg: queued.run.config, diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index c8451fd88f6..fe87e3919d0 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -387,7 +387,7 @@ export async function runPreparedReply( const mediaReplyHint = mediaNote ? "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:https://example.com/image.jpg (spaces ok, quote if needed) or a safe relative path like MEDIA:./image.jpg. Avoid absolute paths (MEDIA:/...) and ~ paths — they are blocked for security. Keep caption in the text body." : undefined; - let prefixedCommandBody = mediaNote + const prefixedCommandBody = mediaNote ? [mediaNote, mediaReplyHint, prefixedBody ?? ""].filter(Boolean).join("\n").trim() : prefixedBody; if (!resolvedThinkLevel) { @@ -472,11 +472,43 @@ export async function runPreparedReply( isNewSession, }); const authProfileIdSource = sessionEntry?.authProfileOverrideSource; + // Snapshot media-related context for deferred media understanding in the + // followup runner. When MediaUnderstanding is already populated the runner + // knows transcription already succeeded and skips re-application. + const hasMediaAttachments = Boolean( + ctx.MediaPath?.trim() || (Array.isArray(ctx.MediaPaths) && ctx.MediaPaths.length > 0), + ); + const mediaContext = hasMediaAttachments + ? { + Body: ctx.Body, + CommandBody: ctx.CommandBody, + RawBody: ctx.RawBody, + MediaPath: ctx.MediaPath, + MediaUrl: ctx.MediaUrl, + MediaType: ctx.MediaType, + MediaDir: ctx.MediaDir, + MediaPaths: ctx.MediaPaths ? [...ctx.MediaPaths] : undefined, + MediaUrls: ctx.MediaUrls ? [...ctx.MediaUrls] : undefined, + MediaTypes: ctx.MediaTypes ? [...ctx.MediaTypes] : undefined, + MediaRemoteHost: ctx.MediaRemoteHost, + Transcript: ctx.Transcript, + MediaUnderstanding: ctx.MediaUnderstanding ? [...ctx.MediaUnderstanding] : undefined, + MediaUnderstandingDecisions: ctx.MediaUnderstandingDecisions + ? [...ctx.MediaUnderstandingDecisions] + : undefined, + OriginatingChannel: ctx.OriginatingChannel, + OriginatingTo: ctx.OriginatingTo, + AccountId: ctx.AccountId, + MessageThreadId: ctx.MessageThreadId, + } + : undefined; + const followupRun = { prompt: queuedBody, messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), + mediaContext, // Originating channel for reply routing. originatingChannel: ctx.OriginatingChannel, originatingTo: ctx.OriginatingTo, diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..5c3a56f64af 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,6 +1,6 @@ -export { extractQueueDirective } from "./queue/directive.js"; -export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export { extractQueueDirective } from "./queue/directive.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; export { enqueueFollowupRun, @@ -10,6 +10,7 @@ export { export { resolveQueueSettings } from "./queue/settings.js"; export { clearFollowupQueue } from "./queue/state.js"; export type { + FollowupMediaContext, FollowupRun, QueueDedupeMode, QueueDropPolicy, diff --git a/src/auto-reply/reply/queue/types.ts b/src/auto-reply/reply/queue/types.ts index 507f77d499d..637ceb0a149 100644 --- a/src/auto-reply/reply/queue/types.ts +++ b/src/auto-reply/reply/queue/types.ts @@ -2,6 +2,10 @@ import type { ExecToolDefaults } from "../../../agents/bash-tools.js"; import type { SkillSnapshot } from "../../../agents/skills.js"; import type { OpenClawConfig } from "../../../config/config.js"; import type { SessionEntry } from "../../../config/sessions.js"; +import type { + MediaUnderstandingDecision, + MediaUnderstandingOutput, +} from "../../../media-understanding/types.js"; import type { InputProvenance } from "../../../sessions/input-provenance.js"; import type { OriginatingChannelType } from "../../templating.js"; import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../directives.js"; @@ -19,12 +23,45 @@ export type QueueSettings = { export type QueueDedupeMode = "message-id" | "prompt" | "none"; +/** + * Snapshot of media-related context fields carried on a FollowupRun so that + * the followup runner can apply media understanding (e.g. voice-note + * transcription) when it was not applied — or failed — in the primary path. + */ +export type FollowupMediaContext = { + Body?: string; + CommandBody?: string; + RawBody?: string; + MediaPath?: string; + MediaUrl?: string; + MediaType?: string; + MediaDir?: string; + MediaPaths?: string[]; + MediaUrls?: string[]; + MediaTypes?: string[]; + MediaRemoteHost?: string; + Transcript?: string; + MediaUnderstanding?: MediaUnderstandingOutput[]; + MediaUnderstandingDecisions?: MediaUnderstandingDecision[]; + OriginatingChannel?: OriginatingChannelType; + OriginatingTo?: string; + AccountId?: string; + MessageThreadId?: string | number; +}; + export type FollowupRun = { prompt: string; /** Provider message ID, when available (for deduplication). */ messageId?: string; summaryLine?: string; enqueuedAt: number; + /** + * Media context snapshot from the original inbound message. + * When present and MediaUnderstanding is empty, the followup runner will + * attempt to apply media understanding (audio transcription, etc.) before + * passing the prompt to the agent. + */ + mediaContext?: FollowupMediaContext; /** * Originating channel for reply routing. * When set, replies should be routed back to this provider