Merge fddce3db72f5e22b8d3a1d2f7a12daf15b3e045c into 598f1826d8b2bc969aace2c6459824737667218c

This commit is contained in:
Joseph Krug 2026-03-21 04:28:31 +00:00 committed by GitHub
commit 831ade5573
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 2603 additions and 21 deletions

View File

@ -0,0 +1,121 @@
import { describe, expect, it } from "vitest";
import {
_findLastOccurrenceOutsideFileBlocks as findLastOccurrenceOutsideFileBlocks,
_normalizeUpdatedBody as normalizeUpdatedBody,
_rebuildQueuedPromptWithMediaUnderstanding as rebuildQueuedPromptWithMediaUnderstanding,
} from "./followup-media.js";
const FILE_BLOCK = '<file name="doc.pdf" type="application/pdf">\nPDF content\n</file>';
describe("findLastOccurrenceOutsideFileBlocks", () => {
it("returns -1 for empty search", () => {
expect(findLastOccurrenceOutsideFileBlocks("hello", "")).toBe(-1);
});
it("finds last occurrence in body region before file blocks", () => {
const value = `hello world hello\n${FILE_BLOCK}`;
// "hello" appears at 0 and 12 — both before the file block
expect(findLastOccurrenceOutsideFileBlocks(value, "hello")).toBe(12);
});
it("skips matches inside file block content", () => {
// "PDF content" appears only inside the file block — no valid match outside.
const value = `some text\n${FILE_BLOCK}`;
expect(findLastOccurrenceOutsideFileBlocks(value, "PDF content")).toBe(-1);
});
it("finds trailing occurrence outside file block even when also inside one", () => {
const value = `some text\n${FILE_BLOCK}\nPDF content`;
// "PDF content" appears inside the file block AND after it — the function
// should return the trailing occurrence that is outside the block.
const expected = value.lastIndexOf("PDF content");
expect(findLastOccurrenceOutsideFileBlocks(value, "PDF content")).toBe(expected);
});
it("finds occurrence when search itself contains file blocks", () => {
const bodyWithFile = `caption\n${FILE_BLOCK}`;
const value = `previous\n${bodyWithFile}\nlater\n${bodyWithFile}`;
// Should find the *last* (trailing) occurrence
const expected = value.lastIndexOf(bodyWithFile);
expect(findLastOccurrenceOutsideFileBlocks(value, bodyWithFile)).toBe(expected);
expect(expected).toBeGreaterThan(value.indexOf(bodyWithFile));
});
it("returns index when no file blocks exist in value", () => {
expect(findLastOccurrenceOutsideFileBlocks("abc abc", "abc")).toBe(4);
});
it("finds body text after thread-history file blocks", () => {
const value = `Thread history\n${FILE_BLOCK}\n\ncheck this out`;
// The body "check this out" appears after a file block from thread history.
// The old truncation approach would miss this; the new approach finds it.
expect(findLastOccurrenceOutsideFileBlocks(value, "check this out")).toBe(
value.lastIndexOf("check this out"),
);
});
});
describe("normalizeUpdatedBody", () => {
it("returns empty string when updatedBody is empty", () => {
expect(normalizeUpdatedBody({ originalBody: "foo", updatedBody: "" })).toBe("");
});
it("returns updatedBody when originalBody is empty", () => {
expect(normalizeUpdatedBody({ updatedBody: "hello" })).toBe("hello");
});
it("strips directives when updatedBody equals originalBody", () => {
const body = "/think high tell me a joke";
const result = normalizeUpdatedBody({ originalBody: body, updatedBody: body });
expect(result).toBe("tell me a joke");
});
it("does not corrupt file block content during directive cleanup", () => {
const originalBody = "/think high tell me about this file";
// updatedBody has the original body plus a file block appended by media processing
const updatedBody = `${originalBody}\n${FILE_BLOCK}`;
const result = normalizeUpdatedBody({ originalBody, updatedBody });
// The directive should be stripped from the body portion, file block preserved
expect(result).toContain("tell me about this file");
expect(result).toContain(FILE_BLOCK);
expect(result).not.toContain("/think");
});
it("replaces in body region, not inside file blocks", () => {
const originalBody = "PDF content";
const updatedBody = `PDF content\n<file name="doc.pdf" type="application/pdf">\nPDF content\n</file>`;
// The replacement should target the body region "PDF content" before the
// file block, not the "PDF content" inside the <file> block.
const result = normalizeUpdatedBody({ originalBody, updatedBody });
// With no directives to strip, original === cleaned, updatedBody !== originalBody
// because updatedBody has the file block appended. The replacement targets the
// body-region occurrence.
expect(result).toContain('<file name="doc.pdf"');
expect(result).toContain("PDF content\n</file>");
});
});
describe("rebuildQueuedPromptWithMediaUnderstanding", () => {
it("replaces original body with updated body in prompt", () => {
const result = rebuildQueuedPromptWithMediaUnderstanding({
prompt: "thread context\nhello world",
originalBody: "hello world",
updatedBody: 'hello world\n<file name="a.pdf">data</file>',
});
expect(result).toContain('<file name="a.pdf">data</file>');
expect(result).toContain("thread context");
});
it("preserves file blocks in thread history when body is replaced", () => {
const prompt = `history\n<file name="old.pdf">old</file>\nhello world`;
const result = rebuildQueuedPromptWithMediaUnderstanding({
prompt,
originalBody: "hello world",
updatedBody: "hello world transcribed",
});
// The old file block from history should be preserved since updatedBody
// has no file blocks of its own.
expect(result).toContain('<file name="old.pdf">old</file>');
expect(result).toContain("hello world transcribed");
});
});

View File

@ -0,0 +1,374 @@
import { logVerbose } from "../../globals.js";
import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
import {
normalizeAttachments,
resolveAttachmentKind,
} from "../../media-understanding/attachments.js";
import { buildInboundMediaNote } from "../media-note.js";
import type { MsgContext } from "../templating.js";
import { parseInlineDirectives } from "./directive-handling.js";
import type { FollowupMediaContext, FollowupRun } from "./queue/types.js";
const MEDIA_ONLY_PLACEHOLDER = "[User sent media without caption]";
const MEDIA_REPLY_HINT_PREFIX = "To send an image back, prefer the message tool";
const LEADING_MEDIA_ATTACHED_LINE_RE = /^\[media attached(?: \d+\/\d+)?: [^\r\n]*\]$/;
const FILE_BLOCK_RE = /<file\s+name="/i;
const FILE_BLOCK_BODY_RE = /<file\s+name="[^"]*"[^>]*>[\s\S]*?<\/file>/i;
const FILE_BLOCK_FULL_RE = /<file\s+name="[^"]*"[^>]*>[\s\S]*?<\/file>\n?/gi;
function stripExistingFileBlocks(text: string): string {
return text.replace(FILE_BLOCK_FULL_RE, "").trim();
}
function stripLeadingMediaAttachedLines(prompt: string): string {
const lines = prompt.split("\n");
let index = 0;
while (index < lines.length) {
const trimmed = lines[index]?.trim() ?? "";
if (!LEADING_MEDIA_ATTACHED_LINE_RE.test(trimmed)) {
break;
}
index += 1;
}
return lines.slice(index).join("\n").trim();
}
function stripLeadingMediaReplyHint(prompt: string): string {
const lines = prompt.split("\n");
if ((lines[0] ?? "").startsWith(MEDIA_REPLY_HINT_PREFIX)) {
return lines.slice(1).join("\n").trim();
}
return prompt.trim();
}
/** Collect the [start, end) ranges of every `<file …>…</file>` block in `value`. */
function collectFileBlockRanges(value: string): Array<[number, number]> {
const ranges: Array<[number, number]> = [];
const re = new RegExp(FILE_BLOCK_FULL_RE.source, FILE_BLOCK_FULL_RE.flags);
let m: RegExpExecArray | null;
while ((m = re.exec(value)) !== null) {
ranges.push([m.index, m.index + m[0].length]);
}
return ranges;
}
function isInsideFileBlock(
position: number,
length: number,
ranges: Array<[number, number]>,
): boolean {
for (const [start, end] of ranges) {
if (position >= start && position + length <= end) {
return true;
}
}
return false;
}
/**
* Find the last occurrence of `search` in `value` that is NOT inside a
* `<file …>…</file>` block. Searches the full string with lastIndexOf,
* then walks backward past any matches that fall inside file blocks.
*/
function findLastOccurrenceOutsideFileBlocks(value: string, search: string): number {
if (!search) {
return -1;
}
const ranges = collectFileBlockRanges(value);
let pos = value.lastIndexOf(search);
while (pos >= 0 && isInsideFileBlock(pos, search.length, ranges)) {
pos = value.lastIndexOf(search, pos - 1);
}
return pos;
}
function replaceLastOccurrenceOutsideFileBlocks(
value: string,
search: string,
replacement: string,
): string | undefined {
if (!search) {
return undefined;
}
const index = findLastOccurrenceOutsideFileBlocks(value, search);
if (index < 0) {
return undefined;
}
return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`;
}
function findTrailingReplacementTargetBeforeFileBlocks(
value: string,
targets: string[],
): { index: number; target: string } | undefined {
let bestMatch: { index: number; target: string } | undefined;
for (const target of targets) {
const index = findLastOccurrenceOutsideFileBlocks(value, target);
if (index < 0) {
continue;
}
if (!bestMatch || index > bestMatch.index) {
bestMatch = { index, target };
}
}
return bestMatch;
}
function replaceOccurrenceAtIndex(
value: string,
search: string,
replacement: string,
index: number,
): string {
return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`;
}
function stripInlineDirectives(text: string | undefined): string {
return parseInlineDirectives(text ?? "").cleaned.trim();
}
function bodyContainsExtractedFileBlock(text: string | undefined): boolean {
return FILE_BLOCK_BODY_RE.test(text ?? "");
}
function normalizeUpdatedBody(params: { originalBody?: string; updatedBody?: string }): string {
const updatedBody = params.updatedBody?.trim();
if (!updatedBody) {
return "";
}
const originalBody = params.originalBody?.trim();
if (!originalBody) {
return updatedBody;
}
const cleanedOriginalBody = stripInlineDirectives(originalBody);
if (!cleanedOriginalBody) {
return updatedBody;
}
if (updatedBody === originalBody) {
return cleanedOriginalBody;
}
return (
replaceLastOccurrenceOutsideFileBlocks(updatedBody, originalBody, cleanedOriginalBody) ??
updatedBody
).trim();
}
function rebuildQueuedPromptWithMediaUnderstanding(params: {
prompt: string;
originalBody?: string;
updatedBody?: string;
mediaNote?: string;
}): string {
let stripped = stripLeadingMediaAttachedLines(params.prompt);
if (!params.mediaNote) {
stripped = stripLeadingMediaReplyHint(stripped);
}
const replacementTargets = [
params.originalBody?.trim(),
stripInlineDirectives(params.originalBody),
MEDIA_ONLY_PLACEHOLDER,
].filter(
(value, index, list): value is string => Boolean(value) && list.indexOf(value) === index,
);
// Strip pre-existing file blocks from the body region when the updated body
// contains new file blocks. Mixed messages (audio + PDF) can arrive with
// file extraction already applied in the primary path; without this strip
// the old block stays in the prompt while the updated body adds a new one,
// duplicating potentially large file payloads.
// Scope stripping to the confirmed body segment so quoted/replied text,
// thread history above the body, and prompts whose original body no longer
// appears all retain any legitimate <file> blocks.
if (params.updatedBody && FILE_BLOCK_RE.test(params.updatedBody)) {
const trailingMatch = findTrailingReplacementTargetBeforeFileBlocks(
stripped,
replacementTargets,
);
if (trailingMatch) {
stripped =
stripped.slice(0, trailingMatch.index) +
stripExistingFileBlocks(stripped.slice(trailingMatch.index));
}
}
const updatedBody = normalizeUpdatedBody({
originalBody: params.originalBody,
updatedBody: params.updatedBody,
});
if (!updatedBody) {
return [params.mediaNote?.trim(), stripped].filter(Boolean).join("\n").trim();
}
let rebuilt = stripped;
const trailingMatch = findTrailingReplacementTargetBeforeFileBlocks(rebuilt, replacementTargets);
if (trailingMatch) {
rebuilt = replaceOccurrenceAtIndex(
rebuilt,
trailingMatch.target,
updatedBody,
trailingMatch.index,
);
return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim();
}
rebuilt = [rebuilt, updatedBody].filter(Boolean).join("\n\n");
return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim();
}
function hasMediaAttachments(mediaContext: FollowupMediaContext): boolean {
return Boolean(
mediaContext.MediaPath?.trim() ||
mediaContext.MediaUrl?.trim() ||
(Array.isArray(mediaContext.MediaPaths) && mediaContext.MediaPaths.length > 0) ||
(Array.isArray(mediaContext.MediaUrls) && mediaContext.MediaUrls.length > 0),
);
}
function hasOnlyFileLikeAttachments(mediaContext: FollowupMediaContext): boolean {
const attachments = normalizeAttachments(mediaContext as MsgContext);
return (
attachments.length > 0 &&
attachments.every((attachment) => {
const kind = resolveAttachmentKind(attachment);
return kind !== "audio" && kind !== "image" && kind !== "video";
})
);
}
function hasAnyFileAttachments(mediaContext: FollowupMediaContext): boolean {
return normalizeAttachments(mediaContext as MsgContext).some((attachment) => {
const kind = resolveAttachmentKind(attachment);
return kind !== "audio" && kind !== "image" && kind !== "video";
});
}
function snapshotUpdatedMediaContext(params: {
original: FollowupMediaContext;
mediaCtx: MsgContext;
updatedBody?: string;
appliedFile?: boolean;
}): FollowupMediaContext {
return {
...params.original,
Body: params.updatedBody ?? params.original.Body,
Transcript:
typeof params.mediaCtx.Transcript === "string"
? params.mediaCtx.Transcript
: params.original.Transcript,
MediaUnderstanding: Array.isArray(params.mediaCtx.MediaUnderstanding)
? [...params.mediaCtx.MediaUnderstanding]
: params.original.MediaUnderstanding,
MediaUnderstandingDecisions: Array.isArray(params.mediaCtx.MediaUnderstandingDecisions)
? [...params.mediaCtx.MediaUnderstandingDecisions]
: params.original.MediaUnderstandingDecisions,
DeferredMediaApplied: true,
DeferredFileBlocksExtracted:
params.original.DeferredFileBlocksExtracted || params.appliedFile || undefined,
};
}
// Exported for unit testing — these are pure string helpers with no side effects.
export {
findLastOccurrenceOutsideFileBlocks as _findLastOccurrenceOutsideFileBlocks,
normalizeUpdatedBody as _normalizeUpdatedBody,
rebuildQueuedPromptWithMediaUnderstanding as _rebuildQueuedPromptWithMediaUnderstanding,
};
export async function applyDeferredMediaUnderstandingToQueuedRun(
queued: FollowupRun,
params: { logLabel?: string } = {},
): Promise<void> {
// NOTE: collect-mode and overflow-summary queue drains create synthetic
// followup runs without mediaContext — those paths are not covered here
// and rely on their own prompt-building logic in queue/drain.ts.
const mediaContext = queued.mediaContext;
if (!mediaContext || mediaContext.DeferredMediaApplied) {
return;
}
if (!hasMediaAttachments(mediaContext)) {
mediaContext.DeferredMediaApplied = true;
return;
}
if (mediaContext.MediaUnderstanding?.length) {
mediaContext.DeferredMediaApplied = true;
return;
}
// Treat followup file extraction as already applied only when we have explicit
// evidence: the queue snapshot already flagged it or Body already contains a
// real extracted <file>...</file> block. Body/RawBody mismatches are not
// reliable because some channels wrap Body with envelope metadata.
if (
!mediaContext.DeferredFileBlocksExtracted &&
hasAnyFileAttachments(mediaContext) &&
bodyContainsExtractedFileBlock(mediaContext.Body)
) {
mediaContext.DeferredFileBlocksExtracted = true;
}
if (mediaContext.DeferredFileBlocksExtracted && hasOnlyFileLikeAttachments(mediaContext)) {
mediaContext.DeferredMediaApplied = true;
return;
}
const resolvedOriginalBody =
mediaContext.CommandBody ?? mediaContext.RawBody ?? mediaContext.Body;
try {
const mediaCtx = {
...mediaContext,
Body: resolvedOriginalBody,
Provider:
mediaContext.Provider ??
queued.run.messageProvider ??
(typeof mediaContext.OriginatingChannel === "string"
? mediaContext.OriginatingChannel
: undefined),
Surface: mediaContext.Surface,
} as MsgContext;
const muResult = await applyMediaUnderstanding({
ctx: mediaCtx,
cfg: queued.run.config,
agentDir: queued.run.agentDir,
activeModel: {
provider: queued.run.provider,
model: queued.run.model,
},
});
const shouldRebuildPrompt =
muResult.outputs.length > 0 ||
muResult.appliedAudio ||
muResult.appliedImage ||
muResult.appliedVideo ||
(muResult.appliedFile && !mediaContext.DeferredFileBlocksExtracted);
if (shouldRebuildPrompt) {
const newMediaNote = buildInboundMediaNote(mediaCtx);
queued.prompt = rebuildQueuedPromptWithMediaUnderstanding({
prompt: queued.prompt,
originalBody: resolvedOriginalBody,
updatedBody: mediaCtx.Body,
mediaNote: newMediaNote,
});
logVerbose(
`${params.logLabel ?? "followup"}: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage}, video=${muResult.appliedVideo}, file=${muResult.appliedFile})`,
);
}
queued.mediaContext = snapshotUpdatedMediaContext({
original: mediaContext,
mediaCtx,
updatedBody: shouldRebuildPrompt ? mediaCtx.Body : undefined,
appliedFile: muResult.appliedFile,
});
} catch (err) {
mediaContext.DeferredMediaApplied = true;
logVerbose(
`${params.logLabel ?? "followup"}: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`,
);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,7 @@ import type { OriginatingChannelType } from "../templating.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { resolveRunAuthProfile } from "./agent-runner-utils.js";
import { applyDeferredMediaUnderstandingToQueuedRun } from "./followup-media.js";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
@ -157,6 +158,8 @@ export function createFollowupRunner(params: {
let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
activeSessionEntry?.systemPromptReport,
);
await applyDeferredMediaUnderstandingToQueuedRun(queued, { logLabel: "followup" });
try {
const fallbackResult = await runWithModelFallback({
cfg: queued.run.config,

View File

@ -172,6 +172,45 @@ describe("runPreparedReply media-only handling", () => {
expect(call?.followupRun.prompt).toContain("[User sent media without caption]");
});
it("snapshots URL-only attachments into followup mediaContext", async () => {
await runPreparedReply(
baseParams({
ctx: {
Body: "check this attachment",
RawBody: "check this attachment",
CommandBody: "check this attachment",
ThreadHistoryBody: "Earlier message in this thread",
OriginatingChannel: "slack",
OriginatingTo: "C123",
ChatType: "group",
MediaUrl: "https://cdn.example.com/input.png",
MediaUrls: ["https://cdn.example.com/input.png"],
MediaType: "image/png",
MediaTypes: ["image/png"],
},
sessionCtx: {
Body: "check this attachment",
BodyStripped: "check this attachment",
ThreadHistoryBody: "Earlier message in this thread",
Provider: "slack",
ChatType: "group",
OriginatingChannel: "slack",
OriginatingTo: "C123",
},
}),
);
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
expect(call?.followupRun.mediaContext).toEqual(
expect.objectContaining({
MediaUrl: "https://cdn.example.com/input.png",
MediaUrls: ["https://cdn.example.com/input.png"],
MediaType: "image/png",
MediaTypes: ["image/png"],
}),
);
});
it("keeps thread history context on follow-up turns", async () => {
const result = await runPreparedReply(
baseParams({
@ -186,6 +225,41 @@ describe("runPreparedReply media-only handling", () => {
expect(call?.followupRun.prompt).toContain("Earlier message in this thread");
});
it("snapshots mediaContext for URL-only deferred attachments", async () => {
await runPreparedReply(
baseParams({
ctx: {
Body: "",
RawBody: "",
CommandBody: "",
MediaUrl: "https://cdn.example.com/audio.ogg",
MediaUrls: ["https://cdn.example.com/audio.ogg"],
MediaType: "audio/ogg",
MediaTypes: ["audio/ogg"],
ThreadHistoryBody: "Earlier message in this thread",
OriginatingChannel: "slack",
OriginatingTo: "C123",
ChatType: "group",
},
sessionCtx: {
Body: "",
BodyStripped: "",
ThreadHistoryBody: "Earlier message in this thread",
Provider: "slack",
ChatType: "group",
OriginatingChannel: "slack",
OriginatingTo: "C123",
},
}),
);
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
expect(call?.followupRun.mediaContext?.MediaUrl).toBe("https://cdn.example.com/audio.ogg");
expect(call?.followupRun.mediaContext?.MediaUrls).toEqual([
"https://cdn.example.com/audio.ogg",
]);
});
it("returns the empty-body reply when there is no text and no media", async () => {
const result = await runPreparedReply(
baseParams({

View File

@ -310,7 +310,14 @@ export async function runPreparedReply(
: [inboundUserContext, baseBodyFinal].filter(Boolean).join("\n\n");
const baseBodyTrimmed = baseBodyForPrompt.trim();
const hasMediaAttachment = Boolean(
sessionCtx.MediaPath || (sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0),
sessionCtx.MediaPath ||
sessionCtx.MediaUrl ||
(sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0) ||
(sessionCtx.MediaUrls && sessionCtx.MediaUrls.length > 0) ||
ctx.MediaPath?.trim() ||
ctx.MediaUrl?.trim() ||
(Array.isArray(ctx.MediaPaths) && ctx.MediaPaths.length > 0) ||
(Array.isArray(ctx.MediaUrls) && ctx.MediaUrls.length > 0),
);
if (!baseBodyTrimmed && !hasMediaAttachment) {
await typing.onReplyStart();
@ -387,7 +394,7 @@ export async function runPreparedReply(
const mediaReplyHint = mediaNote
? "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:https://example.com/image.jpg (spaces ok, quote if needed) or a safe relative path like MEDIA:./image.jpg. Avoid absolute paths (MEDIA:/...) and ~ paths — they are blocked for security. Keep caption in the text body."
: undefined;
let prefixedCommandBody = mediaNote
const prefixedCommandBody = mediaNote
? [mediaNote, mediaReplyHint, prefixedBody ?? ""].filter(Boolean).join("\n").trim()
: prefixedBody;
if (!resolvedThinkLevel) {
@ -472,11 +479,48 @@ export async function runPreparedReply(
isNewSession,
});
const authProfileIdSource = sessionEntry?.authProfileOverrideSource;
// Snapshot media-related context for deferred media understanding in the
// followup runner. When MediaUnderstanding is already populated the runner
// knows transcription already succeeded and skips re-application.
const hasMediaAttachments = Boolean(
ctx.MediaPath?.trim() ||
ctx.MediaUrl?.trim() ||
(Array.isArray(ctx.MediaPaths) && ctx.MediaPaths.length > 0) ||
(Array.isArray(ctx.MediaUrls) && ctx.MediaUrls.length > 0),
);
const mediaContext = hasMediaAttachments
? {
Body: ctx.Body,
CommandBody: ctx.CommandBody,
RawBody: ctx.RawBody,
Provider: ctx.Provider ?? sessionCtx.Provider,
Surface: ctx.Surface ?? sessionCtx.Surface,
MediaPath: ctx.MediaPath,
MediaUrl: ctx.MediaUrl,
MediaType: ctx.MediaType,
MediaDir: ctx.MediaDir,
MediaPaths: ctx.MediaPaths ? [...ctx.MediaPaths] : undefined,
MediaUrls: ctx.MediaUrls ? [...ctx.MediaUrls] : undefined,
MediaTypes: ctx.MediaTypes ? [...ctx.MediaTypes] : undefined,
MediaRemoteHost: ctx.MediaRemoteHost,
Transcript: ctx.Transcript,
MediaUnderstanding: ctx.MediaUnderstanding ? [...ctx.MediaUnderstanding] : undefined,
MediaUnderstandingDecisions: ctx.MediaUnderstandingDecisions
? [...ctx.MediaUnderstandingDecisions]
: undefined,
OriginatingChannel: ctx.OriginatingChannel,
OriginatingTo: ctx.OriginatingTo,
AccountId: ctx.AccountId,
MessageThreadId: ctx.MessageThreadId,
}
: undefined;
const followupRun = {
prompt: queuedBody,
messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
summaryLine: baseBodyTrimmedRaw,
enqueuedAt: Date.now(),
mediaContext,
// Originating channel for reply routing.
originatingChannel: ctx.OriginatingChannel,
originatingTo: ctx.OriginatingTo,

View File

@ -1,6 +1,6 @@
export { extractQueueDirective } from "./queue/directive.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export type { ClearSessionQueueResult } from "./queue/cleanup.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export { extractQueueDirective } from "./queue/directive.js";
export { scheduleFollowupDrain } from "./queue/drain.js";
export {
enqueueFollowupRun,
@ -10,6 +10,7 @@ export {
export { resolveQueueSettings } from "./queue/settings.js";
export { clearFollowupQueue } from "./queue/state.js";
export type {
FollowupMediaContext,
FollowupRun,
QueueDedupeMode,
QueueDropPolicy,

View File

@ -3,15 +3,17 @@ import { resolveGlobalMap } from "../../../shared/global-singleton.js";
import {
buildCollectPrompt,
beginQueueDrain,
buildQueueSummaryLine,
buildQueueSummaryPrompt,
clearQueueSummaryState,
drainCollectQueueStep,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { applyDeferredMediaUnderstandingToQueuedRun } from "../followup-media.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js";
import type { FollowupRun } from "./types.js";
// Persists the most recent runFollowup callback per queue key so that
@ -68,6 +70,59 @@ function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string
};
}
function clearFollowupQueueSummaryState(queue: FollowupQueueState): void {
clearQueueSummaryState(queue);
queue.summaryItems = [];
}
export async function applyDeferredMediaToQueuedRuns(items: FollowupRun[]): Promise<void> {
await Promise.allSettled(
items.map((item) =>
applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" }),
),
);
}
async function resolveSummaryLines(items: FollowupRun[]): Promise<string[]> {
// Parallelize the media understanding API calls upfront (same pattern as
// applyDeferredMediaToQueuedRuns), then build summary lines sequentially
// so line order matches the original item order.
await Promise.allSettled(
items.map((item) =>
applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" }),
),
);
// After deferred media, prefer the updated prompt (which includes transcripts)
// over the original summaryLine (which may just be the caption text).
return items.map((item) =>
buildQueueSummaryLine(item.prompt.trim() || item.summaryLine?.trim() || ""),
);
}
export async function buildMediaAwareQueueSummaryPrompt(params: {
dropPolicy: FollowupQueueState["dropPolicy"];
droppedCount: number;
summaryLines: string[];
summaryItems: FollowupRun[];
noun: string;
}): Promise<string | undefined> {
if (params.dropPolicy !== "summarize" || params.droppedCount <= 0) {
return undefined;
}
const summaryLines =
params.summaryItems.length > 0
? await resolveSummaryLines(params.summaryItems)
: params.summaryLines;
return buildQueueSummaryPrompt({
state: {
dropPolicy: params.dropPolicy,
droppedCount: params.droppedCount,
summaryLines: [...summaryLines],
},
noun: params.noun,
});
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@ -107,7 +162,14 @@ export function scheduleFollowupDrain(
}
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
await applyDeferredMediaToQueuedRuns(items);
const summary = await buildMediaAwareQueueSummaryPrompt({
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: queue.summaryLines,
summaryItems: queue.summaryItems,
noun: "message",
});
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
@ -129,12 +191,18 @@ export function scheduleFollowupDrain(
});
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
clearFollowupQueueSummaryState(queue);
}
continue;
}
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" });
const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: queue.summaryLines,
summaryItems: queue.summaryItems,
noun: "message",
});
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) {
@ -155,7 +223,7 @@ export function scheduleFollowupDrain(
) {
break;
}
clearQueueSummaryState(queue);
clearFollowupQueueSummaryState(queue);
continue;
}

View File

@ -1,8 +1,8 @@
import { createDedupeCache } from "../../../infra/dedupe.js";
import { resolveGlobalSingleton } from "../../../shared/global-singleton.js";
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { buildQueueSummaryLine, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { kickFollowupDrainIfIdle } from "./drain.js";
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
import { getExistingFollowupQueue, getFollowupQueue, type FollowupQueueState } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
/**
@ -57,6 +57,34 @@ function isRunAlreadyQueued(
return items.some((item) => item.prompt === run.prompt && hasSameRouting(item));
}
function applyFollowupQueueDropPolicy(queue: FollowupQueueState): boolean {
const cap = queue.cap;
if (cap <= 0 || queue.items.length < cap) {
return true;
}
if (queue.dropPolicy === "new") {
return false;
}
const dropCount = queue.items.length - cap + 1;
const dropped = queue.items.splice(0, dropCount);
if (queue.dropPolicy === "summarize") {
for (const item of dropped) {
queue.droppedCount += 1;
queue.summaryItems.push(item);
queue.summaryLines.push(
buildQueueSummaryLine(item.summaryLine?.trim() || item.prompt.trim()),
);
}
const limit = Math.max(0, cap);
while (queue.summaryLines.length > limit) {
queue.summaryLines.shift();
queue.summaryItems.shift();
}
}
return true;
}
export function enqueueFollowupRun(
key: string,
run: FollowupRun,
@ -83,10 +111,7 @@ export function enqueueFollowupRun(
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
const shouldEnqueue = applyQueueDropPolicy({
queue,
summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(),
});
const shouldEnqueue = applyFollowupQueueDropPolicy(queue);
if (!shouldEnqueue) {
return false;
}

View File

@ -4,6 +4,7 @@ import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./t
export type FollowupQueueState = {
items: FollowupRun[];
summaryItems: FollowupRun[];
draining: boolean;
lastEnqueuedAt: number;
mode: QueueMode;
@ -47,6 +48,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup
const created: FollowupQueueState = {
items: [],
summaryItems: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
@ -78,6 +80,7 @@ export function clearFollowupQueue(key: string): number {
}
const cleared = queue.items.length + queue.droppedCount;
queue.items.length = 0;
queue.summaryItems.length = 0;
queue.droppedCount = 0;
queue.summaryLines = [];
queue.lastRun = undefined;

View File

@ -2,6 +2,10 @@ import type { ExecToolDefaults } from "../../../agents/bash-tools.js";
import type { SkillSnapshot } from "../../../agents/skills.js";
import type { OpenClawConfig } from "../../../config/config.js";
import type { SessionEntry } from "../../../config/sessions.js";
import type {
MediaUnderstandingDecision,
MediaUnderstandingOutput,
} from "../../../media-understanding/types.js";
import type { InputProvenance } from "../../../sessions/input-provenance.js";
import type { OriginatingChannelType } from "../../templating.js";
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../directives.js";
@ -19,12 +23,55 @@ export type QueueSettings = {
export type QueueDedupeMode = "message-id" | "prompt" | "none";
/**
* Snapshot of media-related context fields carried on a FollowupRun so that
* the followup runner can apply media understanding (e.g. voice-note
* transcription) when it was not applied or failed in the primary path.
*/
export type FollowupMediaContext = {
Body?: string;
CommandBody?: string;
RawBody?: string;
Provider?: string;
Surface?: string;
MediaPath?: string;
MediaUrl?: string;
MediaType?: string;
MediaDir?: string;
MediaPaths?: string[];
MediaUrls?: string[];
MediaTypes?: string[];
MediaRemoteHost?: string;
Transcript?: string;
MediaUnderstanding?: MediaUnderstandingOutput[];
MediaUnderstandingDecisions?: MediaUnderstandingDecision[];
OriginatingChannel?: OriginatingChannelType;
OriginatingTo?: string;
AccountId?: string;
MessageThreadId?: string | number;
DeferredMediaApplied?: boolean;
/**
* Set when file extraction has already been applied to Body (either in the
* primary path or by a previous deferred-media run). This avoids re-running
* file extraction when Body already contains real extracted `<file>...</file>`
* blocks.
*/
DeferredFileBlocksExtracted?: boolean;
};
export type FollowupRun = {
prompt: string;
/** Provider message ID, when available (for deduplication). */
messageId?: string;
summaryLine?: string;
enqueuedAt: number;
/**
* Media context snapshot from the original inbound message.
* When present and MediaUnderstanding is empty, the followup runner will
* attempt to apply media understanding (audio transcription, etc.) before
* passing the prompt to the agent.
*/
mediaContext?: FollowupMediaContext;
/**
* Originating channel for reply routing.
* When set, replies should be routed back to this provider