diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..f0a060af4ac 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; +import { loadSessionStore, type SessionEntry, saveSessionStore } from "../../config/sessions.js"; import type { FollowupRun } from "./queue.js"; import * as sessionRunAccounting from "./session-run-accounting.js"; import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js"; @@ -10,6 +10,7 @@ import { createMockFollowupRun, createMockTypingController } from "./test-helper const runEmbeddedPiAgentMock = vi.fn(); const routeReplyMock = vi.fn(); const isRoutableChannelMock = vi.fn(); +const applyMediaUnderstandingMock = vi.fn(); vi.mock( "../../agents/model-fallback.js", @@ -20,6 +21,10 @@ vi.mock("../../agents/pi-embedded.js", () => ({ runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), })); +vi.mock("../../media-understanding/apply.js", () => ({ + applyMediaUnderstanding: (params: unknown) => applyMediaUnderstandingMock(params), +})); + vi.mock("./route-reply.js", async (importOriginal) => { const actual = await importOriginal(); return { @@ -48,13 +53,24 @@ beforeEach(() => { isRoutableChannelMock.mockImplementation((ch: string | undefined) => Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())), ); + applyMediaUnderstandingMock.mockReset(); + applyMediaUnderstandingMock.mockResolvedValue({ + outputs: [], + decisions: [], + appliedImage: false, + appliedAudio: false, + appliedVideo: false, + appliedFile: false, + }); }); const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun => createMockFollowupRun({ run: { messageProvider } }); function createQueuedRun( - overrides: Partial> & { run?: Partial } = {}, + overrides: Partial> & { + run?: Partial; + } = {}, ): FollowupRun { return createMockFollowupRun(overrides); } @@ -401,7 +417,12 @@ describe("createFollowupRunner messaging tool dedupe", () => { agentResult: { ...makeTextReplyDedupeResult(), messagingToolSentTargets: [ - { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" }, + { + tool: "telegram", + provider: "telegram", + to: "268300329", + accountId: "work", + }, ], }, queued: { @@ -451,8 +472,13 @@ describe("createFollowupRunner messaging tool dedupe", () => { "sessions.json", ); const sessionKey = "main"; - const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() }; - const sessionStore: Record = { [sessionKey]: sessionEntry }; + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + [sessionKey]: sessionEntry, + }; await saveSessionStore(storePath, sessionStore); const { onBlockReply } = await runMessagingCase({ @@ -704,7 +730,254 @@ describe("createFollowupRunner agentDir forwarding", () => { }); expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { agentDir?: string }; + const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + agentDir?: string; + }; expect(call?.agentDir).toBe(agentDir); }); }); + +describe("createFollowupRunner media understanding", () => { + it("applies audio transcription when mediaContext has untranscribed audio", async () => { + const transcriptText = "Hello, this is a voice note."; + // The real applyMediaUnderstanding mutates the ctx; the mock must do the same + // so buildInboundMediaNote sees MediaUnderstanding and suppresses the audio line. + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "Got it!" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + // MediaUnderstanding is empty — transcription not yet applied + }, + }); + await runner(queued); + + // applyMediaUnderstanding should have been called + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + expect(applyMediaUnderstandingMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: queued.run.config, + agentDir: queued.run.agentDir, + }), + ); + + // The prompt passed to the agent should include the transcript, not the + // raw audio attachment line. + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + expect(agentCall?.prompt).not.toContain("[media attached: /tmp/voice.ogg"); + + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "Got it!" })); + }); + + it("skips media understanding when MediaUnderstanding is already populated", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: "[Audio]\nTranscript:\nAlready transcribed.\n\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + // MediaUnderstanding already populated — transcription was applied in primary path + MediaUnderstanding: [ + { + kind: "audio.transcription", + text: "Already transcribed.", + attachmentIndex: 0, + provider: "whisper", + }, + ], + }, + }); + await runner(queued); + + // Should NOT re-run media understanding + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + + // The original prompt should be passed through unchanged + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain("Already transcribed."); + }); + + it("skips media understanding when no mediaContext is present", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + // No mediaContext (plain text message) + const queued = createQueuedRun({ prompt: "just text" }); + await runner(queued); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + }); + + it("continues with raw prompt when media understanding fails", async () => { + applyMediaUnderstandingMock.mockRejectedValueOnce(new Error("transcription service down")); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "fallback reply" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const originalPrompt = "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text"; + const queued = createQueuedRun({ + prompt: originalPrompt, + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }); + await runner(queued); + + // Should have attempted media understanding + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + + // Agent should still run with the original prompt + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toBe(originalPrompt); + + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "fallback reply" })); + }); + + it("preserves non-audio media lines when only audio is transcribed", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + // Simulate transcription updating the context + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "voice transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "voice transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "voice transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "got both" }], + meta: {}, + }); + + const onBlockReply = vi.fn(async () => {}); + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + prompt: + "[media attached: 2 files]\n[media attached 1/2: /tmp/voice.ogg (audio/ogg)]\n[media attached 2/2: /tmp/photo.jpg (image/jpeg)]\nsome text", + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg", "/tmp/photo.jpg"], + MediaTypes: ["audio/ogg", "image/jpeg"], + }, + }); + await runner(queued); + + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + // Audio attachment line should be stripped + expect(agentCall?.prompt).not.toContain("voice.ogg"); + // Image attachment line should also be stripped (all media-attached lines are + // removed and replaced by the new buildInboundMediaNote output) + // The transcript should be present + expect(agentCall?.prompt).toContain("voice transcript"); + }); +}); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..1e65380a020 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -13,10 +13,13 @@ import type { SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; +import { formatMediaUnderstandingBody } from "../../media-understanding/format.js"; import { defaultRuntime } from "../../runtime.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import { stripHeartbeatToken } from "../heartbeat.js"; -import type { OriginatingChannelType } from "../templating.js"; +import { buildInboundMediaNote } from "../media-note.js"; +import type { MsgContext, OriginatingChannelType } from "../templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { resolveRunAuthProfile } from "./agent-runner-utils.js"; @@ -157,6 +160,66 @@ export function createFollowupRunner(params: { let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( activeSessionEntry?.systemPromptReport, ); + + // Apply media understanding for followup-queued messages when it was + // not applied (or failed) in the primary path. This ensures voice + // notes that arrived while the agent was mid-turn still get transcribed. + if (queued.mediaContext && !queued.mediaContext.MediaUnderstanding?.length) { + const hasMedia = Boolean( + queued.mediaContext.MediaPath?.trim() || + (Array.isArray(queued.mediaContext.MediaPaths) && + queued.mediaContext.MediaPaths.length > 0), + ); + if (hasMedia) { + try { + const mediaCtx = { ...queued.mediaContext } as MsgContext; + const muResult = await applyMediaUnderstanding({ + ctx: mediaCtx, + cfg: queued.run.config, + agentDir: queued.run.agentDir, + activeModel: { + provider: queued.run.provider, + model: queued.run.model, + }, + }); + if (muResult.outputs.length > 0) { + // Rebuild the prompt with media understanding results baked in, + // matching the primary path's formatting. + const newMediaNote = buildInboundMediaNote(mediaCtx); + const transcriptBody = formatMediaUnderstandingBody({ + body: undefined, + outputs: muResult.outputs, + }); + + // Strip existing [media attached ...] lines from the prompt so + // they can be replaced by the updated media note (which excludes + // successfully-understood attachments like transcribed audio). + const stripped = queued.prompt + .replace(/\[media attached: \d+ files\]\n?/g, "") + .replace(/\[media attached[^\]]*\]\n?/g, ""); + + const parts: string[] = []; + if (newMediaNote) { + parts.push(newMediaNote); + } + if (transcriptBody) { + parts.push(transcriptBody); + } + parts.push(stripped.trim()); + queued.prompt = parts.filter(Boolean).join("\n\n"); + + logVerbose( + `followup: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage})`, + ); + } + } catch (err) { + logVerbose( + `followup: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + } + try { const fallbackResult = await runWithModelFallback({ cfg: queued.run.config, diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index c8451fd88f6..fe87e3919d0 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -387,7 +387,7 @@ export async function runPreparedReply( const mediaReplyHint = mediaNote ? "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:https://example.com/image.jpg (spaces ok, quote if needed) or a safe relative path like MEDIA:./image.jpg. Avoid absolute paths (MEDIA:/...) and ~ paths — they are blocked for security. Keep caption in the text body." : undefined; - let prefixedCommandBody = mediaNote + const prefixedCommandBody = mediaNote ? [mediaNote, mediaReplyHint, prefixedBody ?? ""].filter(Boolean).join("\n").trim() : prefixedBody; if (!resolvedThinkLevel) { @@ -472,11 +472,43 @@ export async function runPreparedReply( isNewSession, }); const authProfileIdSource = sessionEntry?.authProfileOverrideSource; + // Snapshot media-related context for deferred media understanding in the + // followup runner. When MediaUnderstanding is already populated the runner + // knows transcription already succeeded and skips re-application. + const hasMediaAttachments = Boolean( + ctx.MediaPath?.trim() || (Array.isArray(ctx.MediaPaths) && ctx.MediaPaths.length > 0), + ); + const mediaContext = hasMediaAttachments + ? { + Body: ctx.Body, + CommandBody: ctx.CommandBody, + RawBody: ctx.RawBody, + MediaPath: ctx.MediaPath, + MediaUrl: ctx.MediaUrl, + MediaType: ctx.MediaType, + MediaDir: ctx.MediaDir, + MediaPaths: ctx.MediaPaths ? [...ctx.MediaPaths] : undefined, + MediaUrls: ctx.MediaUrls ? [...ctx.MediaUrls] : undefined, + MediaTypes: ctx.MediaTypes ? [...ctx.MediaTypes] : undefined, + MediaRemoteHost: ctx.MediaRemoteHost, + Transcript: ctx.Transcript, + MediaUnderstanding: ctx.MediaUnderstanding ? [...ctx.MediaUnderstanding] : undefined, + MediaUnderstandingDecisions: ctx.MediaUnderstandingDecisions + ? [...ctx.MediaUnderstandingDecisions] + : undefined, + OriginatingChannel: ctx.OriginatingChannel, + OriginatingTo: ctx.OriginatingTo, + AccountId: ctx.AccountId, + MessageThreadId: ctx.MessageThreadId, + } + : undefined; + const followupRun = { prompt: queuedBody, messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), + mediaContext, // Originating channel for reply routing. originatingChannel: ctx.OriginatingChannel, originatingTo: ctx.OriginatingTo, diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..5c3a56f64af 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,6 +1,6 @@ -export { extractQueueDirective } from "./queue/directive.js"; -export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export { extractQueueDirective } from "./queue/directive.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; export { enqueueFollowupRun, @@ -10,6 +10,7 @@ export { export { resolveQueueSettings } from "./queue/settings.js"; export { clearFollowupQueue } from "./queue/state.js"; export type { + FollowupMediaContext, FollowupRun, QueueDedupeMode, QueueDropPolicy, diff --git a/src/auto-reply/reply/queue/types.ts b/src/auto-reply/reply/queue/types.ts index 507f77d499d..637ceb0a149 100644 --- a/src/auto-reply/reply/queue/types.ts +++ b/src/auto-reply/reply/queue/types.ts @@ -2,6 +2,10 @@ import type { ExecToolDefaults } from "../../../agents/bash-tools.js"; import type { SkillSnapshot } from "../../../agents/skills.js"; import type { OpenClawConfig } from "../../../config/config.js"; import type { SessionEntry } from "../../../config/sessions.js"; +import type { + MediaUnderstandingDecision, + MediaUnderstandingOutput, +} from "../../../media-understanding/types.js"; import type { InputProvenance } from "../../../sessions/input-provenance.js"; import type { OriginatingChannelType } from "../../templating.js"; import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../directives.js"; @@ -19,12 +23,45 @@ export type QueueSettings = { export type QueueDedupeMode = "message-id" | "prompt" | "none"; +/** + * Snapshot of media-related context fields carried on a FollowupRun so that + * the followup runner can apply media understanding (e.g. voice-note + * transcription) when it was not applied — or failed — in the primary path. + */ +export type FollowupMediaContext = { + Body?: string; + CommandBody?: string; + RawBody?: string; + MediaPath?: string; + MediaUrl?: string; + MediaType?: string; + MediaDir?: string; + MediaPaths?: string[]; + MediaUrls?: string[]; + MediaTypes?: string[]; + MediaRemoteHost?: string; + Transcript?: string; + MediaUnderstanding?: MediaUnderstandingOutput[]; + MediaUnderstandingDecisions?: MediaUnderstandingDecision[]; + OriginatingChannel?: OriginatingChannelType; + OriginatingTo?: string; + AccountId?: string; + MessageThreadId?: string | number; +}; + export type FollowupRun = { prompt: string; /** Provider message ID, when available (for deduplication). */ messageId?: string; summaryLine?: string; enqueuedAt: number; + /** + * Media context snapshot from the original inbound message. + * When present and MediaUnderstanding is empty, the followup runner will + * attempt to apply media understanding (audio transcription, etc.) before + * passing the prompt to the agent. + */ + mediaContext?: FollowupMediaContext; /** * Originating channel for reply routing. * When set, replies should be routed back to this provider