diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index 49621420e13..08c8cf12556 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -368,7 +368,7 @@ async function downloadAttachment( } async function deliverGoogleChatReply(params: { - payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string }; + payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string | null }; account: ResolvedGoogleChatAccount; spaceId: string; runtime: GoogleChatRuntimeEnv; @@ -462,7 +462,7 @@ async function deliverGoogleChatReply(params: { account, space: spaceId, text: caption, - thread: payload.replyToId, + thread: payload.replyToId ?? undefined, attachments: [ { attachmentUploadToken: upload.attachmentUploadToken, contentName: loaded.fileName }, ], diff --git a/extensions/imessage/src/monitor/deliver.ts b/extensions/imessage/src/monitor/deliver.ts index 708d319b640..2d021c711ea 100644 --- a/extensions/imessage/src/monitor/deliver.ts +++ b/extensions/imessage/src/monitor/deliver.ts @@ -50,7 +50,7 @@ export async function deliverReplies(params: { maxBytes, client, accountId, - replyToId: payload.replyToId, + replyToId: payload.replyToId ?? undefined, }); sentMessageCache?.remember(scope, { text: chunk, messageId: sent.messageId }); }, @@ -60,7 +60,7 @@ export async function deliverReplies(params: { maxBytes, client, accountId, - replyToId: payload.replyToId, + replyToId: payload.replyToId ?? undefined, }); sentMessageCache?.remember(scope, { text: caption || undefined, diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 958a40de705..03b5c11c9ae 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -159,7 +159,7 @@ function channelChatType(kind: ChatType): "direct" | "group" | "channel" { export function resolveMattermostReplyRootId(params: { threadRootId?: string; - replyToId?: string; + replyToId?: string | null; }): string | undefined { const threadRootId = params.threadRootId?.trim(); if (threadRootId) { diff --git a/extensions/signal/src/channel.ts b/extensions/signal/src/channel.ts index 9612951c3b4..28b52e4aeb5 100644 --- a/extensions/signal/src/channel.ts +++ b/extensions/signal/src/channel.ts @@ -70,6 +70,8 @@ async function sendSignalOutbound(params: { mediaUrl?: string; mediaLocalRoots?: readonly string[]; accountId?: string; + replyToId?: string | null; + quoteAuthor?: string | null; deps?: { [channelId: string]: unknown }; }) { const { send, maxBytes } = resolveSignalSendContext(params); @@ -79,6 +81,8 @@ async function sendSignalOutbound(params: { ...(params.mediaLocalRoots?.length ? { mediaLocalRoots: params.mediaLocalRoots } : {}), maxBytes, accountId: params.accountId ?? undefined, + replyTo: params.replyToId ?? undefined, + quoteAuthor: params.quoteAuthor ?? undefined, }); } @@ -194,6 +198,8 @@ async function sendFormattedSignalText(ctx: { to: string; text: string; accountId?: string | null; + replyToId?: string | null; + quoteAuthor?: string | null; deps?: { [channelId: string]: unknown }; abortSignal?: AbortSignal; }) { @@ -218,6 +224,7 @@ async function sendFormattedSignalText(ctx: { chunks = [{ text: ctx.text, styles: [] }]; } const results = []; + let first = true; for (const chunk of chunks) { ctx.abortSignal?.throwIfAborted(); const result = await send(ctx.to, chunk.text, { @@ -226,8 +233,11 @@ async function sendFormattedSignalText(ctx: { accountId: ctx.accountId ?? undefined, textMode: "plain", textStyles: chunk.styles, + replyTo: first ? (ctx.replyToId ?? undefined) : undefined, + quoteAuthor: first ? (ctx.quoteAuthor ?? undefined) : undefined, }); results.push(result); + first = false; } return attachChannelToResults("signal", results); } @@ -239,6 +249,8 @@ async function sendFormattedSignalMedia(ctx: { mediaUrl: string; mediaLocalRoots?: readonly string[]; accountId?: string | null; + replyToId?: string | null; + quoteAuthor?: string | null; deps?: { [channelId: string]: unknown }; abortSignal?: AbortSignal; }) { @@ -267,6 +279,8 @@ async function sendFormattedSignalMedia(ctx: { accountId: ctx.accountId ?? undefined, textMode: "plain", textStyles: formatted.styles, + replyTo: ctx.replyToId ?? undefined, + quoteAuthor: ctx.quoteAuthor ?? undefined, }); return attachChannelToResult("signal", result); } @@ -315,12 +329,23 @@ export const signalPlugin: ChannelPlugin = { chunker: (text, limit) => getSignalRuntime().channel.text.chunkText(text, limit), chunkerMode: "text", textChunkLimit: 4000, - sendFormattedText: async ({ cfg, to, text, accountId, deps, abortSignal }) => + sendFormattedText: async ({ + cfg, + to, + text, + accountId, + deps, + abortSignal, + replyToId, + quoteAuthor, + }) => await sendFormattedSignalText({ cfg, to, text, accountId, + replyToId, + quoteAuthor, deps, abortSignal, }), @@ -333,6 +358,8 @@ export const signalPlugin: ChannelPlugin = { accountId, deps, abortSignal, + replyToId, + quoteAuthor, }) => await sendFormattedSignalMedia({ cfg, @@ -341,20 +368,34 @@ export const signalPlugin: ChannelPlugin = { mediaUrl, mediaLocalRoots, accountId, + replyToId, + quoteAuthor, deps, abortSignal, }), ...createAttachedChannelResultAdapter({ channel: "signal", - sendText: async ({ cfg, to, text, accountId, deps }) => + sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => await sendSignalOutbound({ cfg, to, text, accountId: accountId ?? undefined, + replyToId, + quoteAuthor, deps, }), - sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) => + sendMedia: async ({ + cfg, + to, + text, + mediaUrl, + mediaLocalRoots, + accountId, + deps, + replyToId, + quoteAuthor, + }) => await sendSignalOutbound({ cfg, to, @@ -362,6 +403,8 @@ export const signalPlugin: ChannelPlugin = { mediaUrl, mediaLocalRoots, accountId: accountId ?? undefined, + replyToId, + quoteAuthor, deps, }), }), diff --git a/extensions/signal/src/identity.test.ts b/extensions/signal/src/identity.test.ts index a09f81910c6..2bdc70622d7 100644 --- a/extensions/signal/src/identity.test.ts +++ b/extensions/signal/src/identity.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "vitest"; import { + isStrictUuid, looksLikeUuid, resolveSignalPeerId, resolveSignalRecipient, @@ -25,6 +26,30 @@ describe("looksLikeUuid", () => { }); }); +describe("isStrictUuid", () => { + it("accepts canonical hyphenated UUIDs", () => { + expect(isStrictUuid("123e4567-e89b-12d3-a456-426614174000")).toBe(true); + }); + + it("accepts compact 32-hex UUIDs", () => { + expect(isStrictUuid("123e4567e89b12d3a456426614174000")).toBe(true); // pragma: allowlist secret + }); + + it("rejects short hex fragments that looksLikeUuid accepts", () => { + expect(isStrictUuid("abcd-1234")).toBe(false); + }); + + it("rejects numeric ids and phone-like values", () => { + expect(isStrictUuid("1234567890")).toBe(false); + expect(isStrictUuid("+15555551212")).toBe(false); + }); + + it("rejects attacker-crafted strings with hex chars", () => { + expect(isStrictUuid("deadbeef")).toBe(false); + expect(isStrictUuid("not-a-uuid-but-has-hex-cafe")).toBe(false); + }); +}); + describe("signal sender identity", () => { it("prefers sourceNumber over sourceUuid", () => { const sender = resolveSignalSender({ diff --git a/extensions/signal/src/identity.ts b/extensions/signal/src/identity.ts index dbd86ca1584..dfe027120c5 100644 --- a/extensions/signal/src/identity.ts +++ b/extensions/signal/src/identity.ts @@ -24,6 +24,11 @@ export function looksLikeUuid(value: string): boolean { return /[a-f]/i.test(compact); } +/** Strict UUID check: only accepts canonical 8-4-4-4-12 or compact 32-hex formats. */ +export function isStrictUuid(value: string): boolean { + return UUID_HYPHENATED_RE.test(value) || UUID_COMPACT_RE.test(value); +} + function stripSignalPrefix(value: string): string { return value.replace(/^signal:/i, "").trim(); } diff --git a/extensions/signal/src/monitor.ts b/extensions/signal/src/monitor.ts index 9aa32731b1d..1143cdb0a8d 100644 --- a/extensions/signal/src/monitor.ts +++ b/extensions/signal/src/monitor.ts @@ -10,10 +10,6 @@ import type { BackoffPolicy } from "openclaw/plugin-sdk/infra-runtime"; import { waitForTransportReady } from "openclaw/plugin-sdk/infra-runtime"; import { saveMediaBuffer } from "openclaw/plugin-sdk/media-runtime"; import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history"; -import { - deliverTextOrMediaReply, - resolveSendableOutboundReplyParts, -} from "openclaw/plugin-sdk/reply-payload"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { chunkTextWithMode, @@ -33,6 +29,12 @@ import type { SignalReactionMessage, SignalReactionTarget, } from "./monitor/event-handler.types.js"; +import { + markSignalReplyConsumed, + resolveSignalReplyDelivery, + type SignalReplyDeliveryState, +} from "./monitor/reply-delivery.js"; +import { isSignalGroupTarget } from "./reply-quote.js"; import { sendMessageSignal } from "./send.js"; import { runSignalSseLoop } from "./sse-reconnect.js"; @@ -299,36 +301,69 @@ async function deliverReplies(params: { maxBytes: number; textLimit: number; chunkMode: "length" | "newline"; + inheritedReplyToId?: string; + replyDeliveryState?: SignalReplyDeliveryState; + resolveQuoteAuthor?: (replyToId: string) => string | undefined; }) { const { replies, target, baseUrl, account, accountId, runtime, maxBytes, textLimit, chunkMode } = params; for (const payload of replies) { - const reply = resolveSendableOutboundReplyParts(payload); - const delivered = await deliverTextOrMediaReply({ + const { payload: resolvedPayload, effectiveReplyTo } = resolveSignalReplyDelivery({ payload, - text: reply.text, - chunkText: (value) => chunkTextWithMode(value, textLimit, chunkMode), - sendText: async (chunk) => { + inheritedReplyToId: params.inheritedReplyToId, + state: params.replyDeliveryState, + }); + const effectiveQuoteAuthor = effectiveReplyTo + ? params.resolveQuoteAuthor?.(effectiveReplyTo) + : undefined; + const text = resolvedPayload.text ?? ""; + const resolvedMediaList = + resolvedPayload.mediaUrls ?? (resolvedPayload.mediaUrl ? [resolvedPayload.mediaUrl] : []); + if (!text && resolvedMediaList.length === 0) { + continue; + } + if (resolvedMediaList.length === 0) { + let first = true; + for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) { await sendMessageSignal(target, chunk, { baseUrl, account, maxBytes, accountId, + replyTo: first ? effectiveReplyTo : undefined, + quoteAuthor: first ? effectiveQuoteAuthor : undefined, }); - }, - sendMedia: async ({ mediaUrl, caption }) => { - await sendMessageSignal(target, caption ?? "", { + if (first) { + markSignalReplyConsumed(params.replyDeliveryState, effectiveReplyTo, { + isGroup: isSignalGroupTarget(target), + quoteAuthor: effectiveQuoteAuthor, + }); + } + first = false; + } + } else { + let first = true; + for (const url of resolvedMediaList) { + const caption = first ? text : ""; + await sendMessageSignal(target, caption, { baseUrl, account, mediaUrl, maxBytes, accountId, + replyTo: first ? effectiveReplyTo : undefined, + quoteAuthor: first ? effectiveQuoteAuthor : undefined, }); - }, - }); - if (delivered !== "empty") { - runtime.log?.(`delivered reply to ${target}`); + if (first) { + markSignalReplyConsumed(params.replyDeliveryState, effectiveReplyTo, { + isGroup: isSignalGroupTarget(target), + quoteAuthor: effectiveQuoteAuthor, + }); + } + first = false; + } } + runtime.log?.(`delivered reply to ${target}`); } } diff --git a/extensions/signal/src/monitor/event-handler.inbound-context.test.ts b/extensions/signal/src/monitor/event-handler.inbound-context.test.ts index 3aafda7fe3d..e7a4f24753c 100644 --- a/extensions/signal/src/monitor/event-handler.inbound-context.test.ts +++ b/extensions/signal/src/monitor/event-handler.inbound-context.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { buildInboundUserContextPrefix } from "../../../../src/auto-reply/reply/inbound-meta.js"; import type { MsgContext } from "../../../../src/auto-reply/templating.js"; import { expectChannelInboundContextContract as expectInboundContextContract } from "../../../../src/channels/plugins/contracts/suites.js"; import { createSignalEventHandler } from "./event-handler.js"; @@ -206,6 +207,138 @@ describe("signal createSignalEventHandler inbound context", () => { expect(capture.ctx?.MediaTypes).toEqual(["image/jpeg", "application/octet-stream"]); }); + it("surfaces quoted reply context in the agent-visible metadata block", async () => { + const handler = createSignalEventHandler( + createBaseSignalEventHandlerDeps({ + cfg: { + messages: { inbound: { debounceMs: 0 } }, + channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } }, + }, + historyLimit: 0, + }), + ); + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "thanks", + quote: { + id: 1700000000000, + authorNumber: "+15550003333", + text: " sent the details", + mentions: [{ number: "+15550004444", start: 0, length: 1 }], + }, + attachments: [], + }, + }), + ); + + expect(capture.ctx).toBeTruthy(); + expect(capture.ctx?.ReplyToId).toBe("1700000000000"); + expect(capture.ctx?.ReplyToSender).toBe("+15550003333"); + expect(capture.ctx?.ReplyToBody).toBe("@+15550004444 sent the details"); + expect(capture.ctx?.ReplyToIsQuote).toBe(true); + expect(String(capture.ctx?.Body ?? "")).toContain("[Quoting +15550003333 id:1700000000000]"); + + const userContext = buildInboundUserContextPrefix(capture.ctx!); + expect(userContext).toContain("Replied message (untrusted, for context):"); + expect(userContext).toContain('"sender_label": "+15550003333"'); + expect(userContext).toContain('"body": "@+15550004444 sent the details"'); + }); + + it("keeps quote-only messages when the user sends no new text", async () => { + const handler = createSignalEventHandler( + createBaseSignalEventHandlerDeps({ + cfg: { + messages: { inbound: { debounceMs: 0 } }, + channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } }, + }, + historyLimit: 0, + }), + ); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "", + quote: { + id: 1700000000000, + text: "original context", + }, + attachments: [], + }, + }), + ); + + expect(capture.ctx).toBeTruthy(); + expect(capture.ctx?.RawBody).toBe(""); + expect(capture.ctx?.ReplyToBody).toBe("original context"); + expect(String(capture.ctx?.Body ?? "")).toContain('"original context"'); + }); + + it("uses quoted attachment metadata for media-only quoted replies", async () => { + const handler = createSignalEventHandler( + createBaseSignalEventHandlerDeps({ + cfg: { + messages: { inbound: { debounceMs: 0 } }, + channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } }, + }, + historyLimit: 0, + }), + ); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "nice one", + quote: { + id: 1700000000000, + attachments: [{ contentType: "image/jpeg", filename: "photo.jpg" }], + }, + attachments: [], + }, + }), + ); + + expect(capture.ctx).toBeTruthy(); + expect(capture.ctx?.ReplyToId).toBe("1700000000000"); + expect(capture.ctx?.ReplyToBody).toBe(""); + expect(String(capture.ctx?.Body ?? "")).toContain('""'); + }); + + it("ignores invalid quote ids while preserving the quoted body context", async () => { + const handler = createSignalEventHandler( + createBaseSignalEventHandlerDeps({ + cfg: { + messages: { inbound: { debounceMs: 0 } }, + channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } }, + }, + historyLimit: 0, + }), + ); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "reply", + quote: { + id: "1700000000000abc", + text: "quoted context", + }, + attachments: [], + }, + }), + ); + + expect(capture.ctx).toBeTruthy(); + expect(capture.ctx?.ReplyToId).toBeUndefined(); + expect(capture.ctx?.ReplyToBody).toBe("quoted context"); + expect(String(capture.ctx?.Body ?? "")).not.toContain("id:1700000000000abc"); + }); + it("drops own UUID inbound messages when only accountUuid is configured", async () => { const ownUuid = "123e4567-e89b-12d3-a456-426614174000"; const handler = createSignalEventHandler( diff --git a/extensions/signal/src/monitor/event-handler.mention-gating.test.ts b/extensions/signal/src/monitor/event-handler.mention-gating.test.ts index ffcdb5baba6..1af44b65069 100644 --- a/extensions/signal/src/monitor/event-handler.mention-gating.test.ts +++ b/extensions/signal/src/monitor/event-handler.mention-gating.test.ts @@ -6,6 +6,7 @@ import { createBaseSignalEventHandlerDeps, createSignalReceiveEvent, } from "./event-handler.test-harness.js"; +import type { SignalQuote } from "./event-handler.types.js"; type SignalMsgContext = Pick & { Body?: string; @@ -32,6 +33,7 @@ type GroupEventOpts = { message?: string; attachments?: unknown[]; quoteText?: string; + quote?: SignalQuote; mentions?: Array<{ uuid?: string; number?: string; @@ -45,7 +47,7 @@ function makeGroupEvent(opts: GroupEventOpts) { dataMessage: { message: opts.message ?? "", attachments: opts.attachments ?? [], - quote: opts.quoteText ? { text: opts.quoteText } : undefined, + quote: opts.quote ?? (opts.quoteText ? { text: opts.quoteText } : undefined), mentions: opts.mentions ?? undefined, groupInfo: { groupId: "g1", groupName: "Test Group" }, }, @@ -203,6 +205,33 @@ describe("signal mention gating", () => { await expectSkippedGroupHistory({ message: "", quoteText: "quoted context" }, "quoted context"); }); + it("records quoted media placeholders in pending history for skipped quote-only group messages", async () => { + await expectSkippedGroupHistory( + { + message: "", + quote: { + id: 1700000000000, + attachments: [{ contentType: "image/png", filename: "photo.png" }], + }, + }, + "", + ); + }); + + it("hydrates quote mentions in pending history for skipped quote-only group messages", async () => { + const placeholder = "\uFFFC"; + await expectSkippedGroupHistory( + { + message: "", + quote: { + text: `${placeholder} quoted context`, + mentions: [{ uuid: "123e4567", start: 0, length: placeholder.length }], + }, + }, + "@123e4567 quoted context", + ); + }); + it("bypasses mention gating for authorized control commands", async () => { capturedCtx = undefined; const handler = createMentionHandler({ requireMention: true }); diff --git a/extensions/signal/src/monitor/event-handler.quote.test.ts b/extensions/signal/src/monitor/event-handler.quote.test.ts new file mode 100644 index 00000000000..81ccdcafd96 --- /dev/null +++ b/extensions/signal/src/monitor/event-handler.quote.test.ts @@ -0,0 +1,515 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { buildInboundUserContextPrefix } from "../../../../src/auto-reply/reply/inbound-meta.js"; +import type { MsgContext } from "../../../../src/auto-reply/templating.js"; +import { createSignalEventHandler } from "./event-handler.js"; +import { + createBaseSignalEventHandlerDeps, + createSignalReceiveEvent, +} from "./event-handler.test-harness.js"; +import type { SignalEventHandlerDeps } from "./event-handler.types.js"; + +type CapturedSignalQuoteContext = Pick< + MsgContext, + "Body" | "BodyForAgent" | "ReplyToBody" | "ReplyToId" | "ReplyToIsQuote" | "ReplyToSender" +> & { + Body?: string; + BodyForAgent?: string; + ReplyToBody?: string; + ReplyToId?: string; + ReplyToIsQuote?: boolean; + ReplyToSender?: string; +}; + +let capturedCtx: CapturedSignalQuoteContext | undefined; + +const { dispatchInboundMessageMock } = vi.hoisted(() => ({ + dispatchInboundMessageMock: vi.fn(), +})); + +function getCapturedCtx() { + return capturedCtx as CapturedSignalQuoteContext; +} + +vi.mock("../../../../src/auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + dispatchInboundMessage: dispatchInboundMessageMock, + dispatchInboundMessageWithDispatcher: dispatchInboundMessageMock, + dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessageMock, + }; +}); + +vi.mock("../send.js", () => ({ + sendMessageSignal: vi.fn(), + sendTypingSignal: vi.fn().mockResolvedValue(true), + sendReadReceiptSignal: vi.fn().mockResolvedValue(true), +})); + +vi.mock("../../../../src/pairing/pairing-store.js", () => ({ + readChannelAllowFromStore: vi.fn().mockResolvedValue([]), + upsertChannelPairingRequest: vi.fn(), +})); + +function createQuoteHandler(overrides: Partial = {}) { + return createSignalEventHandler( + createBaseSignalEventHandlerDeps({ + // oxlint-disable-next-line typescript/no-explicit-any + cfg: { messages: { inbound: { debounceMs: 0 } } } as any, + historyLimit: 0, + ...overrides, + }), + ); +} + +describe("signal quote reply handling", () => { + beforeEach(() => { + capturedCtx = undefined; + dispatchInboundMessageMock.mockReset(); + dispatchInboundMessageMock.mockImplementation(async (params: { ctx: unknown }) => { + capturedCtx = params.ctx as CapturedSignalQuoteContext; + return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }; + }); + }); + + it("surfaces quoted text in reply metadata while preserving the new message text", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "Thanks for the info!", + quote: { + id: 1700000000000, + authorNumber: "+15550003333", + text: "The meeting is at 3pm", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.BodyForAgent).toBe("Thanks for the info!"); + expect(ctx?.ReplyToId).toBe("1700000000000"); + expect(ctx?.ReplyToBody).toBe("The meeting is at 3pm"); + expect(ctx?.ReplyToSender).toBe("+15550003333"); + expect(ctx?.ReplyToIsQuote).toBe(true); + expect(String(ctx?.Body ?? "")).toContain("Thanks for the info!"); + expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550003333 id:1700000000000]"); + }); + + it("uses the latest quote target when debouncing rapid quoted Signal replies", async () => { + vi.useFakeTimers(); + try { + const handler = createQuoteHandler({ + // oxlint-disable-next-line typescript/no-explicit-any + cfg: { messages: { inbound: { debounceMs: 25 } } } as any, + }); + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "First chunk", + quote: { + id: 1700000000000, + authorNumber: "+15550003333", + text: "First quoted message", + }, + }, + }), + ); + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000002, + dataMessage: { + message: "Second chunk", + quote: { + id: 1700000000009, + authorNumber: "+15550004444", + text: "Second quoted message", + }, + }, + }), + ); + + expect(dispatchInboundMessageMock).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(30); + await vi.waitFor(() => { + expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1); + }); + + const ctx = getCapturedCtx(); + expect(ctx?.BodyForAgent).toBe("First chunk\\nSecond chunk"); + expect(ctx?.ReplyToId).toBe("1700000000009"); + expect(ctx?.ReplyToBody).toBe("Second quoted message"); + expect(ctx?.ReplyToSender).toBe("+15550004444"); + expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550004444 id:1700000000009]"); + expect(String(ctx?.Body ?? "")).not.toContain("[Quoting +15550003333 id:1700000000000]"); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps quote-only replies and exposes the replied-message context block", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "", + quote: { + id: 1700000000000, + authorNumber: "+15550002222", + text: "Original message to quote", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx).toBeTruthy(); + expect(ctx?.BodyForAgent).toBe(""); + expect(ctx?.ReplyToBody).toBe("Original message to quote"); + const userContext = buildInboundUserContextPrefix(ctx as MsgContext); + expect(userContext).toContain("Replied message (untrusted, for context):"); + expect(userContext).toContain('"body": "Original message to quote"'); + expect(userContext).toContain('"sender_label": "+15550002222"'); + }); + + it("hydrates Signal mentions inside quoted text before surfacing reply context", async () => { + const handler = createQuoteHandler(); + const placeholder = "\uFFFC"; + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "Replying now", + quote: { + id: 1700000000000, + text: `${placeholder} can you check this?`, + mentions: [{ uuid: "123e4567", start: 0, length: placeholder.length }], + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToBody).toBe("@123e4567 can you check this?"); + expect(String(ctx?.Body ?? "")).toContain('"@123e4567 can you check this?"'); + }); + + it("uses quoted attachment placeholders for media replies without text", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "Nice photo!", + quote: { + id: 1700000000000, + authorUuid: "123e4567-e89b-12d3-a456-426614174000", + text: null, + attachments: [{ contentType: "image/jpeg" }], + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToBody).toBe(""); + expect(ctx?.ReplyToSender).toBe("uuid:123e4567-e89b-12d3-a456-426614174000"); + }); + + it("falls back to a generic quoted body when signal-cli sends an empty quoted text string", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "Replying to media", + quote: { + id: 1700000000000, + text: "", + attachments: [], + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToId).toBe("1700000000000"); + expect(ctx?.ReplyToBody).toBe(""); + }); + + it("drops invalid quote ids from reply metadata but keeps valid quoted text", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "I saw this", + quote: { + id: "1700000000000abc", + authorNumber: "+15550002222", + text: "Original text", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToId).toBeUndefined(); + expect(ctx?.ReplyToBody).toBe("Original text"); + expect(String(ctx?.Body ?? "")).not.toContain("id:1700000000000abc"); + }); + + it("does not synthesize quote-only context from invalid negative ids", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "", + quote: { + id: -1, + text: null, + }, + }, + }), + ); + + expect(capturedCtx).toBeUndefined(); + }); + + it("passes inherited reply state through deliverReplies for the current Signal message", async () => { + const deliverReplies = vi.fn().mockResolvedValue(undefined); + dispatchInboundMessageMock.mockImplementationOnce( + async (params: { + ctx: unknown; + dispatcher: { + sendToolResult: (payload: { text: string }) => boolean; + sendFinalReply: (payload: { text: string }) => boolean; + markComplete: () => void; + waitForIdle: () => Promise; + }; + }) => { + capturedCtx = params.ctx as CapturedSignalQuoteContext; + params.dispatcher.sendToolResult({ text: "First reply" }); + params.dispatcher.sendFinalReply({ text: "Second reply" }); + params.dispatcher.markComplete(); + await params.dispatcher.waitForIdle(); + return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } }; + }, + ); + const handler = createQuoteHandler({ deliverReplies }); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "Incoming message", + }, + }), + ); + + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(deliverReplies.mock.calls[0]?.[0].replies[0]?.replyToId).toBeUndefined(); + expect(deliverReplies.mock.calls[1]?.[0].replies[0]?.replyToId).toBeUndefined(); + expect(deliverReplies.mock.calls[0]?.[0].inheritedReplyToId).toBe("1700000000000"); + expect(deliverReplies.mock.calls[1]?.[0].inheritedReplyToId).toBe("1700000000000"); + expect(deliverReplies.mock.calls[0]?.[0].replyDeliveryState).toBe( + deliverReplies.mock.calls[1]?.[0].replyDeliveryState, + ); + }); + + it("preserves explicit replyToId values on later deliveries in the same turn", async () => { + const deliverReplies = vi.fn().mockResolvedValue(undefined); + dispatchInboundMessageMock.mockImplementationOnce( + async (params: { + ctx: unknown; + dispatcher: { + sendToolResult: (payload: { text: string; replyToId: string }) => boolean; + sendFinalReply: (payload: { text: string; replyToId: string }) => boolean; + markComplete: () => void; + waitForIdle: () => Promise; + }; + }) => { + capturedCtx = params.ctx as CapturedSignalQuoteContext; + params.dispatcher.sendToolResult({ text: "First reply", replyToId: "1700000000001" }); + params.dispatcher.sendFinalReply({ text: "Second reply", replyToId: "1700000000002" }); + params.dispatcher.markComplete(); + await params.dispatcher.waitForIdle(); + return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } }; + }, + ); + const handler = createQuoteHandler({ deliverReplies }); + + await handler( + createSignalReceiveEvent({ + dataMessage: { + message: "Incoming message", + }, + }), + ); + + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(deliverReplies.mock.calls[0]?.[0].replies[0]?.replyToId).toBe("1700000000001"); + expect(deliverReplies.mock.calls[1]?.[0].replies[0]?.replyToId).toBe("1700000000002"); + }); + + it("resolves missing quote authors from previously seen group messages", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "The meeting is at 3pm", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + }, + }), + ); + + capturedCtx = undefined; + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550003333", + sourceName: "Alice", + timestamp: 1700000000002, + dataMessage: { + message: "Thanks for the info!", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + quote: { + id: 1700000000001, + text: "The meeting is at 3pm", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToSender).toBe("+15550002222"); + expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550002222 id:1700000000001]"); + }); + + it("does not poison the quote-author cache from attacker-controlled quote metadata", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "Forwarding this", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + quote: { + id: 1700000000000, + authorNumber: "+15550009999", + text: "Mallory wrote this", + }, + }, + }), + ); + + capturedCtx = undefined; + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550003333", + sourceName: "Alice", + timestamp: 1700000000002, + dataMessage: { + message: "Replying to Bob", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + quote: { + id: 1700000000001, + text: "Forwarding this", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToSender).toBe("+15550002222"); + expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550002222 id:1700000000001]"); + }); + + it("resolves cached uuid senders with a uuid: prefix", async () => { + const handler = createQuoteHandler(); + const senderUuid = "123e4567-e89b-12d3-a456-426614174000"; + + await handler( + createSignalReceiveEvent({ + sourceNumber: null, + sourceUuid: senderUuid, + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "The meeting is at 3pm", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + }, + }), + ); + + capturedCtx = undefined; + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550003333", + sourceName: "Alice", + timestamp: 1700000000002, + dataMessage: { + message: "Thanks for the info!", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + quote: { + id: 1700000000001, + text: "The meeting is at 3pm", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToSender).toBe(`uuid:${senderUuid}`); + expect(String(ctx?.Body ?? "")).toContain(`[Quoting uuid:${senderUuid} id:1700000000001]`); + }); + + it("preserves uuid: prefix in quote author normalization", async () => { + const handler = createQuoteHandler(); + + await handler( + createSignalReceiveEvent({ + sourceNumber: "+15550002222", + sourceName: "Bob", + timestamp: 1700000000001, + dataMessage: { + message: "Thanks for the info!", + groupInfo: { groupId: "g1", groupName: "Test Group" }, + quote: { + id: 1700000000000, + authorUuid: "uuid:01234567-89ab-cdef-0123-456789abcdef", + text: "The meeting is at 3pm", + }, + }, + }), + ); + + const ctx = getCapturedCtx(); + expect(ctx?.ReplyToSender).toBe("uuid:01234567-89ab-cdef-0123-456789abcdef"); + expect(String(ctx?.Body ?? "")).toContain( + "[Quoting uuid:01234567-89ab-cdef-0123-456789abcdef id:1700000000000]", + ); + }); +}); diff --git a/extensions/signal/src/monitor/event-handler.ts b/extensions/signal/src/monitor/event-handler.ts index 58ff8d4f8d7..2867dda019d 100644 --- a/extensions/signal/src/monitor/event-handler.ts +++ b/extensions/signal/src/monitor/event-handler.ts @@ -41,6 +41,7 @@ import { formatSignalSenderDisplay, formatSignalSenderId, isSignalSenderAllowed, + isStrictUuid, normalizeSignalAllowRecipient, resolveSignalPeerId, resolveSignalRecipient, @@ -55,8 +56,10 @@ import type { SignalEventHandlerDeps, SignalReactionMessage, SignalReceivePayload, + SignalReplyTarget, } from "./event-handler.types.js"; import { renderSignalMentions } from "./mentions.js"; +import { describeSignalReplyTarget } from "./quote-context.js"; function formatAttachmentKindCount(kind: string, count: number): string { if (kind === "attachment") { @@ -96,6 +99,78 @@ function resolveSignalInboundRoute(params: { } export function createSignalEventHandler(deps: SignalEventHandlerDeps) { + // Best-effort index to map (conversation, messageId) -> senderRecipient for quote-author resolution. + // signal-cli requires quote-author for group quotes. + const messageAuthorIndex = new Map(); + const MAX_MESSAGE_AUTHOR_INDEX = 5000; + + const resolveConversationKey = (entry: { + isGroup: boolean; + groupId?: string; + senderPeerId: string; + }): string => + entry.isGroup ? `group:${entry.groupId ?? "unknown"}` : `dm:${entry.senderPeerId}`; + + // Strict E.164: + followed by 7–15 digits (ITU-T E.164 range). + const STRICT_E164_RE = /^\+\d{7,15}$/; + + const normalizeCachedMessageAuthor = (raw?: string) => { + const trimmed = raw?.trim(); + if (!trimmed) { + return undefined; + } + // Strip uuid: prefix and validate strictly. + const unprefixed = trimmed.toLowerCase().startsWith("uuid:") + ? trimmed.slice("uuid:".length).trim() + : undefined; + if (unprefixed !== undefined) { + return isStrictUuid(unprefixed) ? `uuid:${unprefixed}` : undefined; + } + // Bare UUID without prefix. + if (isStrictUuid(trimmed)) { + return `uuid:${trimmed}`; + } + // E.164 phone number — normalize then validate. + const normalized = normalizeE164(trimmed); + return STRICT_E164_RE.test(normalized) ? normalized : undefined; + }; + + const rememberMessageAuthor = (params: { + conversationKey: string; + messageId?: string; + senderRecipient?: string; + }) => { + const id = params.messageId?.trim(); + const senderRecipient = normalizeCachedMessageAuthor(params.senderRecipient); + if (!id || !senderRecipient) { + return; + } + const key = `${params.conversationKey}:${id}`; + // Refresh insertion order for simple LRU-style eviction. + if (messageAuthorIndex.has(key)) { + messageAuthorIndex.delete(key); + } + messageAuthorIndex.set(key, senderRecipient); + while (messageAuthorIndex.size > MAX_MESSAGE_AUTHOR_INDEX) { + const oldest = messageAuthorIndex.keys().next().value; + if (oldest === undefined) { + break; + } + messageAuthorIndex.delete(oldest); + } + }; + + const resolveQuotedAuthor = (params: { + conversationKey: string; + replyToId?: string; + }): string | undefined => { + const replyToId = params.replyToId?.trim(); + if (!replyToId) { + return undefined; + } + return messageAuthorIndex.get(`${params.conversationKey}:${replyToId}`); + }; + type SignalInboundEntry = { senderName: string; senderDisplay: string; @@ -114,6 +189,8 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { mediaTypes?: string[]; commandAuthorized: boolean; wasMentioned?: boolean; + // Quote context fields + quoteTarget?: SignalReplyTarget; }; async function handleSignalInboundMessage(entry: SignalInboundEntry) { @@ -140,11 +217,18 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { storePath, sessionKey: route.sessionKey, }); + const quoteBlock = entry.quoteTarget + ? `[Quoting ${entry.quoteTarget.author ?? "unknown"}${ + entry.quoteTarget.id ? ` id:${entry.quoteTarget.id}` : "" + }]\n"${entry.quoteTarget.body}"\n[/Quoting]` + : ""; + const bodyWithQuote = [entry.bodyText, quoteBlock].filter(Boolean).join("\n\n"); + const body = formatInboundEnvelope({ channel: "Signal", from: fromLabel, timestamp: entry.timestamp ?? undefined, - body: entry.bodyText, + body: bodyWithQuote, chatType: entry.isGroup ? "group" : "direct", sender: { name: entry.senderName, id: entry.senderDisplay }, previousTimestamp, @@ -205,6 +289,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { Provider: "signal" as const, Surface: "signal" as const, MessageSid: entry.messageId, + // Quote/Reply context fields + ReplyToId: entry.quoteTarget?.id, + ReplyToBody: entry.quoteTarget?.body, + ReplyToSender: entry.quoteTarget?.author, + ReplyToIsQuote: entry.quoteTarget ? true : undefined, Timestamp: entry.timestamp ?? undefined, MediaPath: entry.mediaPath, MediaType: entry.mediaType, @@ -286,11 +375,14 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { }, }); + const replyDeliveryState = { consumed: false }; + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...replyPipeline, humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId), typingCallbacks, deliver: async (payload) => { + const conversationKey = resolveConversationKey(entry); await deps.deliverReplies({ replies: [payload], target: ctxPayload.To, @@ -300,6 +392,19 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { runtime: deps.runtime, maxBytes: deps.mediaMaxBytes, textLimit: deps.textLimit, + inheritedReplyToId: entry.messageId, + replyDeliveryState, + resolveQuoteAuthor: (replyToId) => { + const resolvedAuthor = resolveQuotedAuthor({ conversationKey, replyToId }); + if (resolvedAuthor) { + return resolvedAuthor; + } + // Don't pin explicit reply IDs to the current sender. Only fall back to the + // inbound sender when we're still replying to the current Signal message. + return replyToId === entry.messageId + ? normalizeCachedMessageAuthor(entry.senderRecipient) + : undefined; + }, }); }, onError: (err, info) => { @@ -371,6 +476,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { if (!combinedText.trim()) { return; } + // Preserve quoteTarget from the latest entry that has one so the reply + // target matches the newest text in the merged body. + const latestQuoteTarget = entries + .toReversed() + .find((entry) => entry.quoteTarget)?.quoteTarget; await handleSignalInboundMessage({ ...last, bodyText: combinedText, @@ -378,6 +488,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { mediaType: undefined, mediaPaths: undefined, mediaTypes: undefined, + quoteTarget: latestQuoteTarget ?? last.quoteTarget, }); }, onError: (err) => { @@ -519,10 +630,28 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { const rawMessage = dataMessage?.message ?? ""; const normalizedMessage = renderSignalMentions(rawMessage, dataMessage?.mentions); const messageText = normalizedMessage.trim(); - - const quoteText = dataMessage?.quote?.text?.trim() ?? ""; + const senderPeerId = resolveSignalPeerId(sender); + const groupId = dataMessage?.groupInfo?.groupId ?? undefined; + const groupName = dataMessage?.groupInfo?.groupName ?? undefined; + const isGroup = Boolean(groupId); + const conversationKey = resolveConversationKey({ + isGroup, + groupId, + senderPeerId, + }); + // NOTE: Full group-quote author resolution currently depends on the patched + // signal-cli install at /opt/signal-cli-0.14.1-patched/, which removes the + // quote.getAuthor().isValid() filter. Once upstream ships lib v140 on JitPack, + // the stock signal-cli build should provide the same quote metadata. + const quoteTarget = dataMessage + ? (describeSignalReplyTarget(dataMessage, { + resolveAuthor: resolveQuotedAuthor, + conversationKey, + }) ?? undefined) + : undefined; const hasBodyContent = - Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length); + Boolean(messageText || quoteTarget?.body) || + Boolean(!reaction && dataMessage?.attachments?.length); const senderDisplay = formatSignalSenderDisplay(sender); const { resolveAccessDecision, dmAccess, effectiveDmAllow, effectiveGroupAllow } = await resolveSignalAccessState({ @@ -552,15 +681,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { } const senderRecipient = resolveSignalRecipient(sender); - const senderPeerId = resolveSignalPeerId(sender); const senderAllowId = formatSignalSenderId(sender); if (!senderRecipient) { return; } const senderIdLine = formatSignalPairingIdLine(sender); - const groupId = dataMessage.groupInfo?.groupId ?? undefined; - const groupName = dataMessage.groupInfo?.groupName ?? undefined; - const isGroup = Boolean(groupId); if (!isGroup) { const allowedDirectMessage = await handleSignalDirectMessageAccess({ @@ -654,6 +779,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { commandAuthorized, }); const effectiveWasMentioned = mentionGate.effectiveWasMentioned; + if (isGroup && requireMention && canDetectMention && mentionGate.shouldSkip) { logInboundDrop({ log: logVerbose, @@ -661,7 +787,6 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { reason: "no mention", target: senderDisplay, }); - const quoteText = dataMessage.quote?.text?.trim() || ""; const pendingPlaceholder = (() => { if (!dataMessage.attachments?.length) { return ""; @@ -681,7 +806,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { const pendingKind = kindFromMime(firstContentType ?? undefined); return pendingKind ? `` : ""; })(); - const pendingBodyText = messageText || pendingPlaceholder || quoteText; + const pendingBodyText = messageText || pendingPlaceholder || quoteTarget?.body || ""; const historyKey = groupId ?? "unknown"; recordPendingHistoryEntryIfEnabled({ historyMap: deps.groupHistories, @@ -695,6 +820,12 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined, }, }); + // Index the message author even when skipping due to no mention + rememberMessageAuthor({ + conversationKey, + messageId: typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined, + senderRecipient: senderRecipient, + }); return; } @@ -745,11 +876,13 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { } } - const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || ""; - if (!bodyText) { + // Build body text - don't use quote text as fallback, preserve it for context + const bodyText = messageText || placeholder || ""; + + // Continue if we have either body text OR quote context (quote-only messages are valid) + if (!bodyText && !quoteTarget) { return; } - const receiptTimestamp = typeof envelope.timestamp === "number" ? envelope.timestamp @@ -778,6 +911,17 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { const senderName = envelope.sourceName ?? senderDisplay; const messageId = typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined; + + // Record (conversation, messageId) -> senderRecipient so later explicit reply tags + // ([[reply_to:]]) can resolve a correct quote-author. + // SECURITY: Only cache from actual envelope senders — never from quote metadata, + // which is attacker-controlled and could poison the author cache. + rememberMessageAuthor({ + conversationKey, + messageId, + senderRecipient, + }); + await inboundDebouncer.enqueue({ senderName, senderDisplay, @@ -796,6 +940,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { mediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined, commandAuthorized, wasMentioned: effectiveWasMentioned, + quoteTarget, }); }; } diff --git a/extensions/signal/src/monitor/event-handler.types.ts b/extensions/signal/src/monitor/event-handler.types.ts index 4ccb85cde5d..f33319b17ba 100644 --- a/extensions/signal/src/monitor/event-handler.types.ts +++ b/extensions/signal/src/monitor/event-handler.types.ts @@ -8,6 +8,7 @@ import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import type { SignalSender } from "../identity.js"; +import type { SignalReplyDeliveryState } from "./reply-delivery.js"; export type SignalEnvelope = { sourceNumber?: string | null; @@ -28,6 +29,37 @@ export type SignalMention = { length?: number | null; }; +export type SignalTextStyle = { + start?: number | null; + length?: number | null; + style?: string | null; +}; + +export type SignalQuotedAttachment = { + contentType?: string | null; + filename?: string | null; + thumbnail?: SignalAttachment | null; +}; + +export type SignalQuote = { + id?: number | string | null; // signal-cli quote timestamp + author?: string | null; // deprecated legacy identifier from signal-cli + authorNumber?: string | null; // preferred E.164 author when available + authorUuid?: string | null; // preferred UUID author when available + text?: string | null; + mentions?: Array | null; + attachments?: Array | null; + textStyles?: Array | null; +}; + +export type SignalReplyTarget = { + id?: string; // message id of quoted message + author?: string; // who wrote the quoted message + body: string; // quoted text content + kind: "quote"; // always quote for Signal + mentions?: Array; // mentions in quoted text +}; + export type SignalDataMessage = { timestamp?: number; message?: string | null; @@ -37,7 +69,7 @@ export type SignalDataMessage = { groupId?: string | null; groupName?: string | null; } | null; - quote?: { text?: string | null } | null; + quote?: SignalQuote | null; reaction?: SignalReactionMessage | null; }; @@ -109,6 +141,9 @@ export type SignalEventHandlerDeps = { runtime: RuntimeEnv; maxBytes: number; textLimit: number; + inheritedReplyToId?: string; + replyDeliveryState?: SignalReplyDeliveryState; + resolveQuoteAuthor?: (replyToId: string) => string | undefined; }) => Promise; resolveSignalReactionTargets: (reaction: SignalReactionMessage) => SignalReactionTarget[]; isSignalReactionMessage: ( diff --git a/extensions/signal/src/monitor/quote-context.ts b/extensions/signal/src/monitor/quote-context.ts new file mode 100644 index 00000000000..5c1b031da43 --- /dev/null +++ b/extensions/signal/src/monitor/quote-context.ts @@ -0,0 +1,112 @@ +import { kindFromMime } from "../../../../src/media/mime.js"; +import { normalizeE164 } from "../../../../src/utils.js"; +import { looksLikeUuid } from "../identity.js"; +import type { + SignalDataMessage, + SignalMention, + SignalQuote, + SignalQuotedAttachment, + SignalReplyTarget, +} from "./event-handler.types.js"; +import { renderSignalMentions } from "./mentions.js"; + +export type SignalQuotedAuthorResolver = (params: { + conversationKey: string; + replyToId: string; +}) => string | undefined; + +function filterMentions(mentions?: Array | null) { + const filtered = mentions?.filter((mention): mention is SignalMention => mention != null); + return filtered && filtered.length > 0 ? filtered : undefined; +} + +function normalizeQuoteAuthorValue(raw?: string | null) { + const trimmed = raw?.trim(); + if (!trimmed) { + return undefined; + } + const unprefixed = trimmed.replace(/^uuid:/i, "").trim(); + if (!unprefixed) { + return undefined; + } + if (looksLikeUuid(unprefixed)) { + // Preserve uuid: prefix for signal-cli compatibility + return `uuid:${unprefixed}`; + } + const digits = unprefixed.replace(/[^\d+]/g, ""); + return digits ? normalizeE164(unprefixed) : undefined; +} + +function resolveQuotedAuthorFromPayload(quote: SignalQuote) { + return ( + normalizeQuoteAuthorValue(quote.authorNumber) ?? + normalizeQuoteAuthorValue(quote.authorUuid) ?? + normalizeQuoteAuthorValue(quote.author) + ); +} + +function resolveQuotedAttachmentPlaceholder( + attachments?: Array | null, +) { + const firstContentType = attachments?.find((attachment) => attachment?.contentType)?.contentType; + const kind = kindFromMime(firstContentType ?? undefined); + if (kind) { + return ``; + } + return attachments?.length ? "" : undefined; +} + +export function normalizeSignalQuoteId(rawId?: SignalQuote["id"]) { + if (typeof rawId === "number") { + return Number.isInteger(rawId) && rawId > 0 ? String(rawId) : undefined; + } + const trimmed = rawId?.trim(); + if (!trimmed) { + return undefined; + } + // Only accept decimal digit strings — reject hex (0x10), scientific (1e3), + // and other Number()-parseable formats that would normalize to a different ID. + if (!/^\d+$/.test(trimmed)) { + return undefined; + } + const numeric = Number(trimmed); + return Number.isInteger(numeric) && numeric > 0 ? String(numeric) : undefined; +} + +export function describeSignalReplyTarget( + dataMessage: SignalDataMessage, + opts: { + resolveAuthor?: SignalQuotedAuthorResolver; + conversationKey?: string; + } = {}, +): SignalReplyTarget | null { + const quote = dataMessage.quote; + if (!quote) { + return null; + } + + const id = normalizeSignalQuoteId(quote.id); + const mentions = filterMentions(quote.mentions); + const renderedText = renderSignalMentions(quote.text ?? "", mentions)?.trim() || ""; + const body = + renderedText || + resolveQuotedAttachmentPlaceholder(quote.attachments) || + (id ? "" : ""); + if (!body) { + return null; + } + + const author = + resolveQuotedAuthorFromPayload(quote) ?? + (opts.resolveAuthor && opts.conversationKey && id + ? opts.resolveAuthor({ conversationKey: opts.conversationKey, replyToId: id }) + : undefined); + + return { + id, + author, + body, + kind: "quote", + mentions, + }; +} diff --git a/extensions/signal/src/monitor/reply-delivery.test.ts b/extensions/signal/src/monitor/reply-delivery.test.ts new file mode 100644 index 00000000000..cf9ba0a2319 --- /dev/null +++ b/extensions/signal/src/monitor/reply-delivery.test.ts @@ -0,0 +1,128 @@ +import { describe, expect, it } from "vitest"; +import { + markSignalReplyConsumed, + resolveSignalReplyDelivery, + type SignalReplyDeliveryState, +} from "./reply-delivery.js"; + +describe("resolveSignalReplyDelivery", () => { + it("uses the inherited reply target until the first successful send", () => { + const state: SignalReplyDeliveryState = { consumed: false }; + + const first = resolveSignalReplyDelivery({ + payload: { text: "first" }, + inheritedReplyToId: "1700000000000", + state, + }); + markSignalReplyConsumed(state, first.effectiveReplyTo); + + const second = resolveSignalReplyDelivery({ + payload: { text: "second" }, + inheritedReplyToId: "1700000000000", + state, + }); + + expect(first.payload.replyToId).toBeUndefined(); + expect(first.effectiveReplyTo).toBe("1700000000000"); + expect(second.payload.replyToId).toBeUndefined(); + expect(second.effectiveReplyTo).toBeUndefined(); + }); + + it("keeps explicit reply targets after the inherited reply target is consumed", () => { + const state: SignalReplyDeliveryState = { consumed: true }; + + const explicit = resolveSignalReplyDelivery({ + payload: { text: "second", replyToId: "1700000000002" }, + inheritedReplyToId: "1700000000000", + state, + }); + + expect(explicit.payload.replyToId).toBe("1700000000002"); + expect(explicit.effectiveReplyTo).toBe("1700000000002"); + }); + + it("preserves explicit null reply suppression without consuming inherited reply state", () => { + const state: SignalReplyDeliveryState = { consumed: false }; + + const suppressed = resolveSignalReplyDelivery({ + payload: { text: "first", replyToId: null }, + inheritedReplyToId: "1700000000000", + state, + }); + markSignalReplyConsumed(state, suppressed.effectiveReplyTo); + + const inherited = resolveSignalReplyDelivery({ + payload: { text: "second" }, + inheritedReplyToId: "1700000000000", + state, + }); + + expect(suppressed.payload.replyToId).toBeUndefined(); + expect(suppressed.effectiveReplyTo).toBeUndefined(); + expect(inherited.effectiveReplyTo).toBe("1700000000000"); + }); + + it("does not consume inherited reply state for non-decimal reply ids", () => { + const state: SignalReplyDeliveryState = { consumed: false }; + + // Simulate a malformed reply_to tag that resolved to a non-timestamp string + const malformed = resolveSignalReplyDelivery({ + payload: { text: "first", replyToId: "not-a-timestamp" }, + inheritedReplyToId: "1700000000000", + state, + }); + markSignalReplyConsumed(state, malformed.effectiveReplyTo); + + // The inherited reply should still be available for the next payload + const next = resolveSignalReplyDelivery({ + payload: { text: "second" }, + inheritedReplyToId: "1700000000000", + state, + }); + + expect(malformed.effectiveReplyTo).toBe("not-a-timestamp"); + expect(state.consumed).toBe(false); + expect(next.effectiveReplyTo).toBe("1700000000000"); + }); + + it("does not consume inherited reply state for zero timestamps", () => { + const state: SignalReplyDeliveryState = { consumed: false }; + + const zero = resolveSignalReplyDelivery({ + payload: { text: "first", replyToId: "0" }, + inheritedReplyToId: "1700000000000", + state, + }); + markSignalReplyConsumed(state, zero.effectiveReplyTo); + + const next = resolveSignalReplyDelivery({ + payload: { text: "second" }, + inheritedReplyToId: "1700000000000", + state, + }); + + expect(zero.effectiveReplyTo).toBe("0"); + expect(state.consumed).toBe(false); + expect(next.effectiveReplyTo).toBe("1700000000000"); + }); + + it("does not consume inherited group reply state when quoteAuthor is unavailable", () => { + const state: SignalReplyDeliveryState = { consumed: false }; + + const first = resolveSignalReplyDelivery({ + payload: { text: "first" }, + inheritedReplyToId: "1700000000000", + state, + }); + markSignalReplyConsumed(state, first.effectiveReplyTo, { isGroup: true }); + + const next = resolveSignalReplyDelivery({ + payload: { text: "second" }, + inheritedReplyToId: "1700000000000", + state, + }); + + expect(state.consumed).toBe(false); + expect(next.effectiveReplyTo).toBe("1700000000000"); + }); +}); diff --git a/extensions/signal/src/monitor/reply-delivery.ts b/extensions/signal/src/monitor/reply-delivery.ts new file mode 100644 index 00000000000..5e3396abea8 --- /dev/null +++ b/extensions/signal/src/monitor/reply-delivery.ts @@ -0,0 +1,75 @@ +import type { ReplyPayload } from "../../../../src/auto-reply/types.js"; +import { resolveSignalQuoteMetadata } from "../reply-quote.js"; + +export type SignalReplyDeliveryState = { + consumed: boolean; +}; + +function normalizeReplyToId(raw?: string | null) { + if (raw == null) { + return raw; + } + const trimmed = raw.trim(); + return trimmed ? trimmed : null; +} + +export function resolveSignalReplyDelivery(params: { + payload: ReplyPayload; + inheritedReplyToId?: string | null; + state?: SignalReplyDeliveryState; +}): { + payload: ReplyPayload; + effectiveReplyTo?: string; +} { + const explicitReplyTo = + "replyToId" in params.payload ? normalizeReplyToId(params.payload.replyToId) : undefined; + const inheritedReplyTo = + explicitReplyTo === undefined ? normalizeReplyToId(params.inheritedReplyToId) : undefined; + const effectiveReplyTo = + explicitReplyTo != null + ? explicitReplyTo + : !params.state?.consumed + ? (inheritedReplyTo ?? undefined) + : undefined; + + if (explicitReplyTo === undefined) { + return { + payload: params.payload, + effectiveReplyTo, + }; + } + + return { + payload: + explicitReplyTo === null + ? { ...params.payload, replyToId: undefined } + : params.payload.replyToId === explicitReplyTo + ? params.payload + : { ...params.payload, replyToId: explicitReplyTo }, + effectiveReplyTo, + }; +} + +/** + * Only consume the inherited reply state when Signal can actually send quote + * metadata for the payload. Malformed ids (e.g. a raw `[[reply_to:...]]` tag) + * and group replies without a resolved quote-author are silently dropped by + * `sendMessageSignal`, so consuming state for them would lose the inherited + * quote for subsequent payloads in the same turn. + */ +export function markSignalReplyConsumed( + state: SignalReplyDeliveryState | undefined, + replyToId?: string, + options: { isGroup?: boolean; quoteAuthor?: string | null } = {}, +) { + if ( + state && + resolveSignalQuoteMetadata({ + replyToId, + quoteAuthor: options.quoteAuthor, + isGroup: options.isGroup, + }).quoteTimestamp !== undefined + ) { + state.consumed = true; + } +} diff --git a/extensions/signal/src/outbound-adapter.ts b/extensions/signal/src/outbound-adapter.ts index 08d54ddd052..b46e053bdb9 100644 --- a/extensions/signal/src/outbound-adapter.ts +++ b/extensions/signal/src/outbound-adapter.ts @@ -31,7 +31,16 @@ export const signalOutbound: ChannelOutboundAdapter = { chunker: (text, _limit) => text.split(/\n{2,}/).flatMap((chunk) => (chunk ? [chunk] : [])), chunkerMode: "text", textChunkLimit: 4000, - sendFormattedText: async ({ cfg, to, text, accountId, deps, abortSignal }) => { + sendFormattedText: async ({ + cfg, + to, + text, + accountId, + deps, + abortSignal, + replyToId, + quoteAuthor, + }) => { const send = resolveSignalSender(deps); const maxBytes = resolveSignalMaxBytes({ cfg, @@ -49,6 +58,7 @@ export const signalOutbound: ChannelOutboundAdapter = { chunks = [{ text, styles: [] }]; } const results = []; + let first = true; for (const chunk of chunks) { abortSignal?.throwIfAborted(); const result = await send(to, chunk.text, { @@ -57,8 +67,11 @@ export const signalOutbound: ChannelOutboundAdapter = { accountId: accountId ?? undefined, textMode: "plain", textStyles: chunk.styles, + replyTo: first ? (replyToId ?? undefined) : undefined, + quoteAuthor: first ? (quoteAuthor ?? undefined) : undefined, }); results.push(result); + first = false; } return attachChannelToResults("signal", results); }, @@ -71,6 +84,8 @@ export const signalOutbound: ChannelOutboundAdapter = { accountId, deps, abortSignal, + replyToId, + quoteAuthor, }) => { abortSignal?.throwIfAborted(); const send = resolveSignalSender(deps); @@ -93,12 +108,14 @@ export const signalOutbound: ChannelOutboundAdapter = { textMode: "plain", textStyles: formatted.styles, mediaLocalRoots, + replyTo: replyToId ?? undefined, + quoteAuthor: quoteAuthor ?? undefined, }); return attachChannelToResult("signal", result); }, ...createAttachedChannelResultAdapter({ channel: "signal", - sendText: async ({ cfg, to, text, accountId, deps }) => { + sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => { const send = resolveSignalSender(deps); const maxBytes = resolveSignalMaxBytes({ cfg, @@ -108,9 +125,21 @@ export const signalOutbound: ChannelOutboundAdapter = { cfg, maxBytes, accountId: accountId ?? undefined, + replyTo: replyToId ?? undefined, + quoteAuthor: quoteAuthor ?? undefined, }); }, - sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) => { + sendMedia: async ({ + cfg, + to, + text, + mediaUrl, + mediaLocalRoots, + accountId, + deps, + replyToId, + quoteAuthor, + }) => { const send = resolveSignalSender(deps); const maxBytes = resolveSignalMaxBytes({ cfg, @@ -122,6 +151,8 @@ export const signalOutbound: ChannelOutboundAdapter = { maxBytes, accountId: accountId ?? undefined, mediaLocalRoots, + replyTo: replyToId ?? undefined, + quoteAuthor: quoteAuthor ?? undefined, }); }, }), diff --git a/extensions/signal/src/reply-quote.ts b/extensions/signal/src/reply-quote.ts new file mode 100644 index 00000000000..e533707131d --- /dev/null +++ b/extensions/signal/src/reply-quote.ts @@ -0,0 +1,40 @@ +const SIGNAL_QUOTE_TIMESTAMP_RE = /^\d+$/; + +export function parseSignalQuoteTimestamp(raw?: string | null): number | undefined { + const trimmed = raw?.trim(); + if (!trimmed || !SIGNAL_QUOTE_TIMESTAMP_RE.test(trimmed)) { + return undefined; + } + + const timestamp = Number(trimmed); + return Number.isInteger(timestamp) && timestamp > 0 ? timestamp : undefined; +} + +export function isSignalGroupTarget(rawTarget: string): boolean { + let value = rawTarget.trim(); + if (value.toLowerCase().startsWith("signal:")) { + value = value.slice("signal:".length).trim(); + } + return value.toLowerCase().startsWith("group:"); +} + +export function resolveSignalQuoteMetadata(params: { + replyToId?: string | null; + quoteAuthor?: string | null; + isGroup?: boolean; +}): { + quoteTimestamp?: number; + quoteAuthor?: string; +} { + const quoteTimestamp = parseSignalQuoteTimestamp(params.replyToId); + if (quoteTimestamp === undefined) { + return {}; + } + + const quoteAuthor = params.quoteAuthor?.trim() || undefined; + if (params.isGroup && !quoteAuthor) { + return {}; + } + + return { quoteTimestamp, quoteAuthor }; +} diff --git a/extensions/signal/src/send.test.ts b/extensions/signal/src/send.test.ts new file mode 100644 index 00000000000..d38e5852dc6 --- /dev/null +++ b/extensions/signal/src/send.test.ts @@ -0,0 +1,80 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { sendMessageSignal } from "./send.js"; + +const rpcMock = vi.fn(); + +vi.mock("../../../src/config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => ({}), + }; +}); + +vi.mock("./accounts.js", () => ({ + resolveSignalAccount: () => ({ + accountId: "default", + enabled: true, + baseUrl: "http://signal.local", + configured: true, + config: { account: "+15550001111" }, + }), +})); + +vi.mock("./client.js", () => ({ + signalRpcRequest: (...args: unknown[]) => rpcMock(...args), +})); + +describe("sendMessageSignal", () => { + beforeEach(() => { + rpcMock.mockReset().mockResolvedValue({ timestamp: 123 }); + }); + + it("sends quote-author for group replies when quoteAuthor is available", async () => { + await sendMessageSignal("group:test-group", "hello", { + textMode: "plain", + replyTo: "1700000000000", + quoteAuthor: "uuid:sender-1", + }); + + const params = rpcMock.mock.calls[0]?.[1] as Record; + expect(rpcMock).toHaveBeenCalledWith("send", expect.any(Object), expect.any(Object)); + expect(params.groupId).toBe("test-group"); + expect(params["quote-timestamp"]).toBe(1700000000000); + expect(params["quote-author"]).toBe("uuid:sender-1"); + }); + + it("sends quote-timestamp for direct replies without quoteAuthor", async () => { + await sendMessageSignal("+15551230000", "hello", { + textMode: "plain", + replyTo: "1700000000000", + }); + + const params = rpcMock.mock.calls[0]?.[1] as Record; + expect(params["quote-timestamp"]).toBe(1700000000000); + expect(params["quote-author"]).toBeUndefined(); + }); + + it("ignores replyTo values with trailing non-numeric characters", async () => { + await sendMessageSignal("+15551230000", "hello", { + textMode: "plain", + replyTo: "1700000000000abc", + quoteAuthor: "uuid:sender-1", + }); + + const params = rpcMock.mock.calls[0]?.[1] as Record; + expect(params["quote-timestamp"]).toBeUndefined(); + expect(params["quote-author"]).toBeUndefined(); + }); + + it("skips group quote metadata when quoteAuthor is unavailable", async () => { + await sendMessageSignal("group:test-group", "hello", { + textMode: "plain", + replyTo: "1700000000000", + }); + + const params = rpcMock.mock.calls[0]?.[1] as Record; + expect(params["quote-timestamp"]).toBeUndefined(); + expect(params["quote-author"]).toBeUndefined(); + }); +}); diff --git a/extensions/signal/src/send.ts b/extensions/signal/src/send.ts index c102624836e..e5207286f0d 100644 --- a/extensions/signal/src/send.ts +++ b/extensions/signal/src/send.ts @@ -5,6 +5,7 @@ import { resolveOutboundAttachmentFromUrl } from "openclaw/plugin-sdk/media-runt import { resolveSignalAccount } from "./accounts.js"; import { signalRpcRequest } from "./client.js"; import { markdownToSignalText, type SignalTextStyleRange } from "./format.js"; +import { resolveSignalQuoteMetadata } from "./reply-quote.js"; import { resolveSignalRpcContext } from "./rpc-context.js"; export type SignalSendOpts = { @@ -18,6 +19,8 @@ export type SignalSendOpts = { timeoutMs?: number; textMode?: "markdown" | "plain"; textStyles?: SignalTextStyleRange[]; + replyTo?: string; + quoteAuthor?: string; }; export type SignalSendResult = { @@ -181,6 +184,19 @@ export async function sendMessageSignal( } Object.assign(params, targetParams); + // Add quote parameters for reply functionality + const { quoteTimestamp, quoteAuthor } = resolveSignalQuoteMetadata({ + replyToId: opts.replyTo, + quoteAuthor: opts.quoteAuthor, + isGroup: target.type === "group", + }); + if (quoteTimestamp !== undefined) { + params["quote-timestamp"] = quoteTimestamp; + if (quoteAuthor) { + params["quote-author"] = quoteAuthor; + } + } + const result = await signalRpcRequest<{ timestamp?: number }>("send", params, { baseUrl, timeoutMs: opts.timeoutMs, diff --git a/extensions/telegram/src/bot/delivery.replies.ts b/extensions/telegram/src/bot/delivery.replies.ts index 6222e913461..888dcb5f7c6 100644 --- a/extensions/telegram/src/bot/delivery.replies.ts +++ b/extensions/telegram/src/bot/delivery.replies.ts @@ -649,7 +649,9 @@ export async function deliverReplies(params: { try { const deliveredCountBeforeReply = progress.deliveredCount; const replyToId = - params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId); + params.replyToMode === "off" + ? undefined + : resolveTelegramReplyId(reply.replyToId ?? undefined); const telegramData = reply.channelData?.telegram as TelegramReplyChannelData | undefined; const shouldPinFirstMessage = telegramData?.pin === true; const replyMarkup = buildInlineKeyboard(telegramData?.buttons); diff --git a/extensions/telegram/src/outbound-adapter.ts b/extensions/telegram/src/outbound-adapter.ts index b500fb870cf..f5430eda679 100644 --- a/extensions/telegram/src/outbound-adapter.ts +++ b/extensions/telegram/src/outbound-adapter.ts @@ -10,6 +10,7 @@ import { sendPayloadMediaSequenceOrFallback, } from "openclaw/plugin-sdk/reply-payload"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; +import { markReplyApplied } from "../../../src/infra/outbound/reply-applied.js"; import type { TelegramInlineButtons } from "./button-types.js"; import { resolveTelegramInlineButtons } from "./button-types.js"; import { markdownToTelegramHtmlChunks } from "./format.js"; @@ -21,6 +22,13 @@ export const TELEGRAM_TEXT_CHUNK_LIMIT = 4000; type TelegramSendFn = typeof sendMessageTelegram; type TelegramSendOpts = Parameters[2]; +function attachReplyAppliedMarker( + result: T, + baseOpts: { replyToMessageId?: number }, +) { + return markReplyApplied(result, baseOpts.replyToMessageId !== undefined); +} + function resolveTelegramSendContext(params: { cfg: NonNullable["cfg"]; deps?: OutboundSendDeps; @@ -116,9 +124,10 @@ export const telegramOutbound: ChannelOutboundAdapter = { replyToId, threadId, }); - return await send(to, text, { + const result = await send(to, text, { ...baseOpts, }); + return attachReplyAppliedMarker(result, baseOpts); }, sendMedia: async ({ cfg, @@ -145,6 +154,7 @@ export const telegramOutbound: ChannelOutboundAdapter = { mediaLocalRoots, forceDocument: forceDocument ?? false, }); + return attachReplyAppliedMarker(result, baseOpts); }, }), sendPayload: async ({ @@ -175,6 +185,6 @@ export const telegramOutbound: ChannelOutboundAdapter = { forceDocument: forceDocument ?? false, }, }); - return attachChannelToResult("telegram", result); + return attachReplyAppliedMarker(attachChannelToResult("telegram", result), baseOpts); }, }; diff --git a/src/agents/pi-embedded-runner/types.ts b/src/agents/pi-embedded-runner/types.ts index 722abbf2a9a..ddc7f0f5406 100644 --- a/src/agents/pi-embedded-runner/types.ts +++ b/src/agents/pi-embedded-runner/types.ts @@ -59,7 +59,7 @@ export type EmbeddedPiRunResult = { text?: string; mediaUrl?: string; mediaUrls?: string[]; - replyToId?: string; + replyToId?: string | null; isError?: boolean; }>; meta: EmbeddedPiRunMeta; diff --git a/src/auto-reply/reply.directive.directive-behavior.e2e-mocks.ts b/src/auto-reply/reply.directive.directive-behavior.e2e-mocks.ts index 5199ba84887..347ea7089e3 100644 --- a/src/auto-reply/reply.directive.directive-behavior.e2e-mocks.ts +++ b/src/auto-reply/reply.directive.directive-behavior.e2e-mocks.ts @@ -4,7 +4,7 @@ export const runEmbeddedPiAgentMock: Mock = vi.fn(); vi.mock("../agents/pi-embedded.js", () => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), - runEmbeddedPiAgent: (...args: unknown[]) => runEmbeddedPiAgentMock(...args), + runEmbeddedPiAgent: runEmbeddedPiAgentMock, queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), diff --git a/src/auto-reply/reply/block-reply-pipeline.test.ts b/src/auto-reply/reply/block-reply-pipeline.test.ts index 92564033df5..dc4070e5052 100644 --- a/src/auto-reply/reply/block-reply-pipeline.test.ts +++ b/src/auto-reply/reply/block-reply-pipeline.test.ts @@ -45,7 +45,7 @@ describe("createBlockReplyContentKey", () => { describe("createBlockReplyPipeline dedup with threading", () => { it("keeps separate deliveries for same text with different replyToId", async () => { - const sent: Array<{ text?: string; replyToId?: string }> = []; + const sent: Array<{ text?: string; replyToId?: string | null }> = []; const pipeline = createBlockReplyPipeline({ onBlockReply: async (payload) => { sent.push({ text: payload.text, replyToId: payload.replyToId }); diff --git a/src/auto-reply/reply/reply-payloads.test.ts b/src/auto-reply/reply/reply-payloads.test.ts index 8664eec5c72..3156b0096ee 100644 --- a/src/auto-reply/reply/reply-payloads.test.ts +++ b/src/auto-reply/reply/reply-payloads.test.ts @@ -2,10 +2,33 @@ import { describe, expect, it } from "vitest"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { createTestRegistry } from "../../test-utils/channel-plugins.js"; import { + applyReplyThreading, filterMessagingToolMediaDuplicates, shouldSuppressMessagingToolReplies, } from "./reply-payloads.js"; +describe("applyReplyThreading", () => { + it("treats whitespace-only replyToId as unset so implicit replies still apply", () => { + const result = applyReplyThreading({ + payloads: [{ text: "hello", replyToId: " \n\t " }], + replyToMode: "all", + currentMessageId: "123", + }); + + expect(result).toEqual([{ text: "hello", replyToId: "123" }]); + }); + + it("preserves explicit null replyToId as do-not-reply", () => { + const result = applyReplyThreading({ + payloads: [{ text: "hello", replyToId: null }], + replyToMode: "all", + currentMessageId: "123", + }); + + expect(result).toEqual([{ text: "hello", replyToId: null }]); + }); +}); + describe("filterMessagingToolMediaDuplicates", () => { it("strips mediaUrl when it matches sentMediaUrls", () => { const result = filterMessagingToolMediaDuplicates({ diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index 1826d1872af..b8be89f84ef 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -31,12 +31,23 @@ function resolveReplyThreadingForPayload(params: { }): ReplyPayload { const implicitReplyToId = params.implicitReplyToId?.trim() || undefined; const currentMessageId = params.currentMessageId?.trim() || undefined; + const normalizedReplyToId = + typeof params.payload.replyToId === "string" + ? params.payload.replyToId.trim() || undefined + : params.payload.replyToId; + const normalizedPayload = + normalizedReplyToId === params.payload.replyToId + ? params.payload + : { ...params.payload, replyToId: normalizedReplyToId }; + + const hasExplicitReplyToId = normalizedReplyToId !== undefined; // 1) Apply implicit reply threading first (replyToMode will strip later if needed). + // Treat replyToId=null as an explicit "do not reply" signal (do not override). let resolved: ReplyPayload = - params.payload.replyToId || params.payload.replyToCurrent === false || !implicitReplyToId - ? params.payload - : { ...params.payload, replyToId: implicitReplyToId }; + hasExplicitReplyToId || normalizedPayload.replyToCurrent === false || !implicitReplyToId + ? normalizedPayload + : { ...normalizedPayload, replyToId: implicitReplyToId }; // 2) Parse explicit reply tags from text (if present) and clean them. if (typeof resolved.text === "string" && resolved.text.includes("[[")) { @@ -55,7 +66,7 @@ function resolveReplyThreadingForPayload(params: { // 3) If replyToCurrent was set out-of-band (e.g. tags already stripped upstream), // ensure replyToId is set to the current message id when available. - if (resolved.replyToCurrent && !resolved.replyToId && currentMessageId) { + if (resolved.replyToCurrent && resolved.replyToId === undefined && currentMessageId) { resolved = { ...resolved, replyToId: currentMessageId, diff --git a/src/auto-reply/reply/reply-plumbing.test.ts b/src/auto-reply/reply/reply-plumbing.test.ts index 6e039333c58..700ef5a06fb 100644 --- a/src/auto-reply/reply/reply-plumbing.test.ts +++ b/src/auto-reply/reply/reply-plumbing.test.ts @@ -159,6 +159,39 @@ describe("applyReplyThreading auto-threading", () => { expect(result[0].replyToId).toBe("42"); }); + it("does not overwrite explicit replyToId:null when implicit threading is available", () => { + const result = applyReplyThreading({ + payloads: [{ text: "Hello", replyToId: null }], + replyToMode: "all", + currentMessageId: "42", + }); + + expect(result).toHaveLength(1); + expect(result[0].replyToId).toBeNull(); + }); + + it("treats blank explicit replyToId as undefined so implicit threading still applies", () => { + const result = applyReplyThreading({ + payloads: [{ text: "Hello", replyToId: " " }], + replyToMode: "all", + currentMessageId: "42", + }); + + expect(result).toHaveLength(1); + expect(result[0].replyToId).toBe("42"); + }); + + it("trims non-empty explicit replyToId before preserving it", () => { + const result = applyReplyThreading({ + payloads: [{ text: "Hello", replyToId: " abc-123 " }], + replyToMode: "all", + currentMessageId: "42", + }); + + expect(result).toHaveLength(1); + expect(result[0].replyToId).toBe("abc-123"); + }); + it("threads only first payload when mode is 'first'", () => { const result = applyReplyThreading({ payloads: [{ text: "A" }, { text: "B" }], diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index c424f43ab92..5d98941aaad 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -81,7 +81,7 @@ export type ReplyPayload = { btw?: { question: string; }; - replyToId?: string; + replyToId?: string | null; replyToTag?: boolean; /** True when [[reply_to_current]] was present but not yet mapped to a message id. */ replyToCurrent?: boolean; diff --git a/src/channels/plugins/outbound/direct-text-media.ts b/src/channels/plugins/outbound/direct-text-media.ts index 80a7178a10e..7cb6d98ebdb 100644 --- a/src/channels/plugins/outbound/direct-text-media.ts +++ b/src/channels/plugins/outbound/direct-text-media.ts @@ -15,6 +15,7 @@ type DirectSendOptions = { cfg: OpenClawConfig; accountId?: string | null; replyToId?: string | null; + quoteAuthor?: string | null; mediaUrl?: string; mediaLocalRoots?: readonly string[]; maxBytes?: number; @@ -78,6 +79,7 @@ export function createDirectTextMediaOutbound< accountId?: string | null; deps?: OutboundSendDeps; replyToId?: string | null; + quoteAuthor?: string | null; mediaUrl?: string; mediaLocalRoots?: readonly string[]; buildOptions: (params: DirectSendOptions) => TOpts; @@ -96,6 +98,7 @@ export function createDirectTextMediaOutbound< mediaLocalRoots: sendParams.mediaLocalRoots, accountId: sendParams.accountId, replyToId: sendParams.replyToId, + quoteAuthor: sendParams.quoteAuthor, maxBytes, }), ); @@ -109,7 +112,7 @@ export function createDirectTextMediaOutbound< textChunkLimit: 4000, sendPayload: async (ctx) => await sendTextMediaPayload({ channel: params.channel, ctx, adapter: outbound }), - sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => { + sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => { return await sendDirect({ cfg, to, @@ -117,10 +120,21 @@ export function createDirectTextMediaOutbound< accountId, deps, replyToId, + quoteAuthor, buildOptions: params.buildTextOptions, }); }, - sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps, replyToId }) => { + sendMedia: async ({ + cfg, + to, + text, + mediaUrl, + mediaLocalRoots, + accountId, + deps, + replyToId, + quoteAuthor, + }) => { return await sendDirect({ cfg, to, @@ -130,6 +144,7 @@ export function createDirectTextMediaOutbound< accountId, deps, replyToId, + quoteAuthor, buildOptions: params.buildMediaOptions, }); }, diff --git a/src/channels/plugins/outbound/signal.test.ts b/src/channels/plugins/outbound/signal.test.ts index 5d28e4aefaf..e18967d4e85 100644 --- a/src/channels/plugins/outbound/signal.test.ts +++ b/src/channels/plugins/outbound/signal.test.ts @@ -67,4 +67,32 @@ describe("signalOutbound", () => { ); expect(result).toEqual({ channel: "signal", messageId: "sig-media-1", timestamp: 456 }); }); + + it("passes replyToId and quoteAuthor through sendText", async () => { + const sendSignal = vi.fn().mockResolvedValue({ messageId: "sig-text-2", timestamp: 789 }); + const sendText = signalOutbound.sendText; + expect(sendText).toBeDefined(); + + const result = await sendText!({ + cfg, + to: "group:test-group", + text: "hello", + accountId: "work", + replyToId: "1700000000000", + quoteAuthor: "uuid:sender-1", + deps: { sendSignal }, + }); + + expect(sendSignal).toHaveBeenCalledWith( + "group:test-group", + "hello", + expect.objectContaining({ + accountId: "work", + maxBytes: 4 * 1024 * 1024, + replyTo: "1700000000000", + quoteAuthor: "uuid:sender-1", + }), + ); + expect(result).toEqual({ channel: "signal", messageId: "sig-text-2", timestamp: 789 }); + }); }); diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index 14a7ab10b8e..050d98931f6 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -135,6 +135,7 @@ export type ChannelOutboundContext = { /** Send image as document to avoid Telegram compression. */ forceDocument?: boolean; replyToId?: string | null; + quoteAuthor?: string | null; threadId?: string | number | null; accountId?: string | null; identity?: OutboundIdentity; diff --git a/src/infra/outbound/deliver.greptile-fixes.test.ts b/src/infra/outbound/deliver.greptile-fixes.test.ts new file mode 100644 index 00000000000..1fec54cfd8d --- /dev/null +++ b/src/infra/outbound/deliver.greptile-fixes.test.ts @@ -0,0 +1,472 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js"; + +const { deliverOutboundPayloads } = await import("./deliver.js"); +const defaultRegistry = createTestRegistry([ + { + pluginId: "telegram", + plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }), + source: "test", + }, +]); + +describe("deliverOutboundPayloads Greptile fixes", () => { + beforeEach(() => { + setActivePluginRegistry(defaultRegistry); + }); + + afterEach(() => { + setActivePluginRegistry(createTestRegistry()); + }); + + it("retries replyToId on later non-signal text payloads after a best-effort failure", async () => { + const sendTelegram = vi + .fn() + .mockRejectedValueOnce(new Error("text fail")) + .mockResolvedValueOnce({ messageId: "m2", chatId: "chat-1" }); + const onError = vi.fn(); + const cfg: OpenClawConfig = { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123", + payloads: [{ text: "ab" }, { text: "cd" }], + replyToId: "777", + deps: { sendTelegram }, + bestEffort: true, + onError, + skipQueue: true, + }); + + expect(sendTelegram).toHaveBeenCalledTimes(2); + expect(sendTelegram.mock.calls[0]?.[2]).toEqual( + expect.objectContaining({ replyToMessageId: 777 }), + ); + expect(sendTelegram.mock.calls[1]?.[2]).toEqual( + expect.objectContaining({ replyToMessageId: 777 }), + ); + expect(onError).toHaveBeenCalledTimes(1); + expect(results).toEqual([{ messageId: "m2", chatId: "chat-1", channel: "telegram" }]); + }); + + it("retries replyToId on later sendPayload payloads after a best-effort failure", async () => { + const sendPayload = vi + .fn() + .mockRejectedValueOnce(new Error("payload fail")) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + const onError = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [ + { text: "first", channelData: { mode: "custom" } }, + { text: "second", channelData: { mode: "custom" } }, + ], + replyToId: "orig-msg-id", + bestEffort: true, + onError, + skipQueue: true, + }); + + expect(sendPayload).toHaveBeenCalledTimes(2); + expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(onError).toHaveBeenCalledTimes(1); + expect(results).toEqual([{ channel: "matrix", messageId: "mx-2" }]); + }); + + it("preserves explicit null reply suppression without consuming the inherited reply", async () => { + const sendPayload = vi + .fn() + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" }) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + const payloads: ReplyPayload[] = [ + { text: "first", channelData: { mode: "custom" }, replyToId: null }, + { text: "second", channelData: { mode: "custom" } }, + ]; + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads, + replyToId: "orig-msg-id", + skipQueue: true, + }); + + expect(sendPayload).toHaveBeenCalledTimes(2); + expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBeUndefined(); + expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(results).toEqual([ + { channel: "matrix", messageId: "mx-1" }, + { channel: "matrix", messageId: "mx-2" }, + ]); + }); + + it("treats replyToId: undefined as inherited reply metadata", async () => { + const sendPayload = vi + .fn() + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" }) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [ + { text: "first", channelData: { mode: "custom" }, replyToId: undefined }, + { text: "second", channelData: { mode: "custom" } }, + ], + replyToId: "orig-msg-id", + skipQueue: true, + }); + + expect(sendPayload).toHaveBeenCalledTimes(2); + expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBeNull(); + expect(results).toEqual([ + { channel: "matrix", messageId: "mx-1" }, + { channel: "matrix", messageId: "mx-2" }, + ]); + }); + + it("clears replyToId on later sendPayload payloads after the first successful send", async () => { + const sendPayload = vi + .fn() + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" }) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [ + { text: "first", channelData: { mode: "custom" } }, + { text: "second", channelData: { mode: "custom" } }, + ], + replyToId: "orig-msg-id", + skipQueue: true, + }); + + expect(sendPayload).toHaveBeenCalledTimes(2); + expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBeNull(); + expect(results).toEqual([ + { channel: "matrix", messageId: "mx-1" }, + { channel: "matrix", messageId: "mx-2" }, + ]); + }); + + it("preserves later explicit replyToId values after the first successful send", async () => { + const sendPayload = vi + .fn() + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" }) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" }) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-3" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [ + { text: "first", replyToId: "reply-1", channelData: { mode: "custom" } }, + { text: "second", replyToId: "reply-2", channelData: { mode: "custom" } }, + { text: "third", replyToId: "reply-3", channelData: { mode: "custom" } }, + ], + skipQueue: true, + }); + + expect(sendPayload).toHaveBeenCalledTimes(3); + expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("reply-1"); + expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("reply-2"); + expect(sendPayload.mock.calls[2]?.[0]?.replyToId).toBe("reply-3"); + expect(results).toEqual([ + { channel: "matrix", messageId: "mx-1" }, + { channel: "matrix", messageId: "mx-2" }, + { channel: "matrix", messageId: "mx-3" }, + ]); + }); + + it("preserves inherited replyToId across all googlechat sendPayload payloads (thread routing)", async () => { + const sendPayload = vi + .fn() + .mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-1" }) + .mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-2" }) + .mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-3" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "googlechat", + source: "test", + plugin: createOutboundTestPlugin({ + id: "googlechat", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "googlechat", + to: "spaces/AAAA", + payloads: [ + { text: "first", channelData: { mode: "custom" } }, + { text: "second", channelData: { mode: "custom" } }, + { text: "third", channelData: { mode: "custom" } }, + ], + replyToId: "spaces/AAAA/threads/BBBB", + skipQueue: true, + }); + + expect(sendPayload).toHaveBeenCalledTimes(3); + // All payloads must retain the thread identifier — consuming it after the + // first send would orphan subsequent payloads to the top level. + expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("spaces/AAAA/threads/BBBB"); + expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("spaces/AAAA/threads/BBBB"); + expect(sendPayload.mock.calls[2]?.[0]?.replyToId).toBe("spaces/AAAA/threads/BBBB"); + expect(results).toEqual([ + { channel: "googlechat", messageId: "gc-1" }, + { channel: "googlechat", messageId: "gc-2" }, + { channel: "googlechat", messageId: "gc-3" }, + ]); + }); + + it("preserves inherited replyToId across googlechat text payloads (sendText path)", async () => { + const sendText = vi + .fn() + .mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-t1", chatId: "spaces/X" }) + .mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-t2", chatId: "spaces/X" }); + const sendMedia = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "googlechat", + source: "test", + plugin: createOutboundTestPlugin({ + id: "googlechat", + outbound: { deliveryMode: "direct", sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "googlechat", + to: "spaces/X", + payloads: [{ text: "chunk one" }, { text: "chunk two" }], + replyToId: "spaces/X/threads/T1", + skipQueue: true, + }); + + expect(sendText).toHaveBeenCalledTimes(2); + // Both text sends should receive replyToId for thread routing + expect(sendText.mock.calls[0]?.[0]?.replyToId).toBe("spaces/X/threads/T1"); + expect(sendText.mock.calls[1]?.[0]?.replyToId).toBe("spaces/X/threads/T1"); + expect(results).toHaveLength(2); + }); + + it("does not consume inherited telegram reply state after an invalid payload-level text override", async () => { + const sendTelegram = vi + .fn() + .mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" }) + .mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" }); + const cfg: OpenClawConfig = { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123", + payloads: [{ text: "first", replyToId: "not-a-number" }, { text: "second" }], + replyToId: "777", + deps: { sendTelegram }, + skipQueue: true, + }); + + const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined; + const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined; + + expect(sendTelegram).toHaveBeenCalledTimes(2); + expect(firstOpts?.replyToMessageId).toBeUndefined(); + expect(secondOpts?.replyToMessageId).toBe(777); + expect(results).toEqual([ + { channel: "telegram", messageId: "tg-1", chatId: "chat-1" }, + { channel: "telegram", messageId: "tg-2", chatId: "chat-1" }, + ]); + }); + + it("does not consume inherited telegram reply state after an invalid payload-level sendPayload override", async () => { + const sendTelegram = vi + .fn() + .mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" }) + .mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" }); + const cfg: OpenClawConfig = { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123", + payloads: [ + { + text: "first", + replyToId: "not-a-number", + channelData: { telegram: { buttons: [] } }, + }, + { + text: "second", + channelData: { telegram: { buttons: [] } }, + }, + ], + replyToId: "777", + deps: { sendTelegram }, + skipQueue: true, + }); + + const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined; + const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined; + + expect(sendTelegram).toHaveBeenCalledTimes(2); + expect(firstOpts?.replyToMessageId).toBeUndefined(); + expect(secondOpts?.replyToMessageId).toBe(777); + expect(results).toEqual([ + { channel: "telegram", messageId: "tg-1", chatId: "chat-1" }, + { channel: "telegram", messageId: "tg-2", chatId: "chat-1" }, + ]); + }); + + it("retries replyToId on later non-signal media payloads after a best-effort failure", async () => { + const sendText = vi.fn(); + const sendMedia = vi + .fn() + .mockRejectedValueOnce(new Error("media fail")) + .mockResolvedValueOnce({ channel: "matrix", messageId: "mx-media-2" }); + const onError = vi.fn(); + + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendText, sendMedia }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [ + { text: "first", mediaUrl: "https://example.com/1.jpg" }, + { text: "second", mediaUrl: "https://example.com/2.jpg" }, + ], + replyToId: "orig-msg-id", + bestEffort: true, + onError, + skipQueue: true, + }); + + expect(sendMedia).toHaveBeenCalledTimes(2); + expect(sendMedia.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(sendMedia.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id"); + expect(onError).toHaveBeenCalledTimes(1); + expect(results).toEqual([{ channel: "matrix", messageId: "mx-media-2" }]); + }); +}); diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 6bf69a519f8..cec3e001fd4 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -3,6 +3,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite import { markdownToSignalTextChunks } from "../../../extensions/signal/src/format.js"; import { signalOutbound, + slackOutbound, telegramOutbound, whatsappOutbound, } from "../../../test/channel-outbounds.js"; @@ -917,6 +918,155 @@ describe("deliverOutboundPayloads", () => { expect(sendWhatsApp).not.toHaveBeenCalled(); }); + it("does not re-apply Signal reply metadata after partial chunk failure in bestEffort mode", async () => { + const sendSignal = vi + .fn() + .mockResolvedValueOnce({ messageId: "s1", timestamp: 1 }) + .mockRejectedValueOnce(new Error("chunk fail")) + .mockResolvedValueOnce({ messageId: "s3", timestamp: 3 }); + const onError = vi.fn(); + const cfg: OpenClawConfig = { + channels: { signal: { textChunkLimit: 2 } }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + channel: "signal", + to: "+1555", + payloads: [{ text: "abcd" }, { text: "ef" }], + replyToId: "1700000000000", + quoteAuthor: "Tester", + deps: { sendSignal }, + bestEffort: true, + onError, + }); + + expect(sendSignal).toHaveBeenCalledTimes(3); + expect(sendSignal).toHaveBeenNthCalledWith( + 1, + "+1555", + expect.any(String), + expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: "Tester" }), + ); + expect(sendSignal).toHaveBeenNthCalledWith( + 2, + "+1555", + expect.any(String), + expect.objectContaining({ replyTo: undefined, quoteAuthor: undefined }), + ); + expect(sendSignal).toHaveBeenNthCalledWith( + 3, + "+1555", + expect.any(String), + expect.objectContaining({ replyTo: undefined, quoteAuthor: undefined }), + ); + expect(onError).toHaveBeenCalledTimes(1); + expect(results).toEqual([ + { channel: "signal", messageId: "s1", timestamp: 1 }, + { channel: "signal", messageId: "s3", timestamp: 3 }, + ]); + }); + + it("preserves inherited Signal reply metadata after an explicit malformed replyToId is skipped", async () => { + const sendSignal = vi + .fn() + .mockResolvedValueOnce({ messageId: "s1", timestamp: 1 }) + .mockResolvedValueOnce({ messageId: "s2", timestamp: 2 }); + + await deliverOutboundPayloads({ + cfg: { channels: { signal: {} } }, + channel: "signal", + to: "+1555", + payloads: [{ text: "first", replyToId: "not-a-timestamp" }, { text: "second" }], + replyToId: "1700000000000", + quoteAuthor: "uuid:sender-1", + deps: { sendSignal }, + }); + + expect(sendSignal).toHaveBeenCalledTimes(2); + expect(sendSignal).toHaveBeenNthCalledWith( + 1, + "+1555", + "first", + expect.objectContaining({ replyTo: "not-a-timestamp", quoteAuthor: "uuid:sender-1" }), + ); + expect(sendSignal).toHaveBeenNthCalledWith( + 2, + "+1555", + "second", + expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: "uuid:sender-1" }), + ); + }); + + it("preserves inherited Signal group reply metadata when quoteAuthor is unavailable", async () => { + const sendSignal = vi + .fn() + .mockResolvedValueOnce({ messageId: "s1", timestamp: 1 }) + .mockResolvedValueOnce({ messageId: "s2", timestamp: 2 }); + + await deliverOutboundPayloads({ + cfg: { channels: { signal: {} } }, + channel: "signal", + to: "group:test-group", + payloads: [{ text: "first" }, { text: "second" }], + replyToId: "1700000000000", + deps: { sendSignal }, + }); + + expect(sendSignal).toHaveBeenCalledTimes(2); + expect(sendSignal).toHaveBeenNthCalledWith( + 1, + "group:test-group", + "first", + expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: undefined }), + ); + expect(sendSignal).toHaveBeenNthCalledWith( + 2, + "group:test-group", + "second", + expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: undefined }), + ); + }); + + it("keeps inherited Slack thread context across all payloads", async () => { + const sendSlack = vi + .fn() + .mockResolvedValueOnce({ messageId: "sl1", channelId: "C123" }) + .mockResolvedValueOnce({ messageId: "sl2", channelId: "C123" }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "slack", + plugin: createOutboundTestPlugin({ id: "slack", outbound: slackOutbound }), + source: "test", + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: { channels: { slack: {} } }, + channel: "slack", + to: "C123", + payloads: [{ text: "first" }, { text: "second" }], + replyToId: "thread-123", + deps: { sendSlack }, + }); + + expect(sendSlack).toHaveBeenCalledTimes(2); + expect(sendSlack).toHaveBeenNthCalledWith( + 1, + "C123", + "first", + expect.objectContaining({ threadTs: "thread-123" }), + ); + expect(sendSlack).toHaveBeenNthCalledWith( + 2, + "C123", + "second", + expect.objectContaining({ threadTs: "thread-123" }), + ); + }); + it("passes normalized payload to onError", async () => { const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom")); const onError = vi.fn(); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index e1be816c910..e648ac70b92 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -2,6 +2,15 @@ import { resolveSendableOutboundReplyParts, sendMediaWithLeadingCaption, } from "openclaw/plugin-sdk/reply-payload"; +import { + markdownToSignalTextChunks, + type SignalTextStyleRange, +} from "../../../extensions/signal/src/format.js"; +import { + isSignalGroupTarget, + resolveSignalQuoteMetadata, +} from "../../../extensions/signal/src/reply-quote.js"; +import { sendMessageSignal } from "../../../extensions/signal/src/send.js"; import { chunkByParagraph, chunkMarkdownTextWithMode, @@ -9,12 +18,14 @@ import { resolveTextChunkLimit, } from "../../auto-reply/chunk.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; +import { resolveChannelMediaMaxBytes } from "../../channels/plugins/media-limits.js"; import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js"; import type { ChannelOutboundAdapter, ChannelOutboundContext, } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; import { appendAssistantMessageToSessionTranscript, resolveMirroredTranscriptText, @@ -38,6 +49,7 @@ import type { OutboundIdentity } from "./identity.js"; import type { DeliveryMirror } from "./mirror.js"; import type { NormalizedOutboundPayload } from "./payloads.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; +import { readReplyApplied } from "./reply-applied.js"; import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js"; import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js"; import type { OutboundSessionContext } from "./session-context.js"; @@ -118,6 +130,7 @@ type ChannelHandlerParams = { to: string; accountId?: string; replyToId?: string | null; + quoteAuthor?: string | null; threadId?: string | number | null; identity?: OutboundIdentity; deps?: OutboundSendDeps; @@ -161,8 +174,9 @@ function createPluginHandler( threadId?: string | number | null; }): Omit => ({ ...baseCtx, - replyToId: overrides?.replyToId ?? baseCtx.replyToId, - threadId: overrides?.threadId ?? baseCtx.threadId, + // Preserve explicit null overrides so later payloads can suppress inherited reply/thread metadata. + replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId, + threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId, }); return { chunker, @@ -223,6 +237,7 @@ function createPluginHandler( return sendText({ ...resolveCtx(overrides), text: caption, + mediaUrl, }); }, }; @@ -236,6 +251,7 @@ function createChannelOutboundContextBase( to: params.to, accountId: params.accountId, replyToId: params.replyToId, + quoteAuthor: params.quoteAuthor, threadId: params.threadId, identity: params.identity, gifPlayback: params.gifPlayback, @@ -255,6 +271,7 @@ type DeliverOutboundPayloadsCoreParams = { accountId?: string; payloads: ReplyPayload[]; replyToId?: string | null; + quoteAuthor?: string | null; threadId?: string | number | null; identity?: OutboundIdentity; deps?: OutboundSendDeps; @@ -541,6 +558,8 @@ async function deliverOutboundPayloadsCore( const accountId = params.accountId; const deps = params.deps; const abortSignal = params.abortSignal; + const sendSignal = + resolveOutboundSendDep(params.deps, "signal") ?? sendMessageSignal; const mediaLocalRoots = getAgentScopedMediaLocalRoots( cfg, params.session?.agentId ?? params.mirror?.agentId, @@ -553,6 +572,7 @@ async function deliverOutboundPayloadsCore( deps, accountId, replyToId: params.replyToId, + quoteAuthor: params.quoteAuthor, threadId: params.threadId, identity: params.identity, gifPlayback: params.gifPlayback, @@ -569,6 +589,20 @@ async function deliverOutboundPayloadsCore( ? handler.resolveEffectiveTextChunkLimit(configuredTextLimit) : configuredTextLimit; const chunkMode = handler.chunker ? resolveChunkMode(cfg, channel, accountId) : "length"; + const isSignalChannel = channel === "signal"; + const signalIsGroupTarget = isSignalChannel && isSignalGroupTarget(to); + const signalTableMode = isSignalChannel + ? resolveMarkdownTableMode({ cfg, channel: "signal", accountId }) + : "code"; + const signalMaxBytes = isSignalChannel + ? resolveChannelMediaMaxBytes({ + cfg, + resolveChannelLimitMb: ({ cfg, accountId }) => + cfg.channels?.signal?.accounts?.[accountId]?.mediaMaxMb ?? + cfg.channels?.signal?.mediaMaxMb, + accountId, + }) + : undefined; const sendTextChunks = async ( text: string, @@ -608,6 +642,81 @@ async function deliverOutboundPayloadsCore( } }; const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel, handler); + const sendSignalText = async ( + text: string, + styles: SignalTextStyleRange[], + replyTo?: string, + quoteAuthor?: string, + ) => { + throwIfAborted(abortSignal); + return { + channel: "signal" as const, + ...(await sendSignal(to, text, { + cfg, + maxBytes: signalMaxBytes, + accountId: accountId ?? undefined, + textMode: "plain", + textStyles: styles, + replyTo, + quoteAuthor, + })), + }; + }; + + const sendSignalTextChunks = async (text: string, replyTo?: string, quoteAuthor?: string) => { + throwIfAborted(abortSignal); + let signalChunks = + textLimit === undefined + ? markdownToSignalTextChunks(text, Number.POSITIVE_INFINITY, { + tableMode: signalTableMode, + }) + : markdownToSignalTextChunks(text, textLimit, { tableMode: signalTableMode }); + if (signalChunks.length === 0 && text) { + signalChunks = [{ text, styles: [] }]; + } + let first = true; + for (const chunk of signalChunks) { + throwIfAborted(abortSignal); + results.push( + await sendSignalText( + chunk.text, + chunk.styles, + first ? replyTo : undefined, + first ? quoteAuthor : undefined, + ), + ); + first = false; + } + }; + + const sendSignalMedia = async ( + caption: string, + mediaUrl: string, + replyTo?: string, + quoteAuthor?: string, + ) => { + throwIfAborted(abortSignal); + const formatted = markdownToSignalTextChunks(caption, Number.POSITIVE_INFINITY, { + tableMode: signalTableMode, + })[0] ?? { + text: caption, + styles: [], + }; + return { + channel: "signal" as const, + ...(await sendSignal(to, formatted.text, { + cfg, + mediaUrl, + maxBytes: signalMaxBytes, + accountId: accountId ?? undefined, + textMode: "plain", + textStyles: formatted.styles, + mediaLocalRoots, + replyTo, + quoteAuthor, + })), + }; + }; const hookRunner = getGlobalHookRunner(); const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key; const mirrorIsGroup = params.mirror?.isGroup; @@ -632,6 +741,84 @@ async function deliverOutboundPayloadsCore( }, ); } + let replyConsumed = false; + // Slack, Mattermost, and Google Chat use replyToId as persistent thread + // context (thread_ts, rootId, and threadName respectively) that must survive + // across all payloads. Never consume inherited reply state for thread-based + // channels. + const isThreadBasedChannel = + channel === "slack" || channel === "mattermost" || channel === "googlechat"; + const shouldConsumeReplyAfterSend = (replyTo: string | undefined) => { + if (!replyTo) { + return false; + } + // Thread-based channels use replyToId for thread routing, not one-shot + // quoting — consuming it would orphan subsequent chunks from the thread. + if (isThreadBasedChannel) { + return false; + } + if (!isSignalChannel) { + return true; + } + // Signal silently drops malformed quote timestamps and group quotes without a + // quote-author, so only consume inherited reply state when the send can carry + // quote metadata on the wire. + return ( + resolveSignalQuoteMetadata({ + replyToId: replyTo, + quoteAuthor: params.quoteAuthor, + isGroup: signalIsGroupTarget, + }).quoteTimestamp !== undefined + ); + }; + const didSendApplyReply = ( + resultCountBeforeSend: number, + value?: T, + sent?: (value: T) => boolean, + ) => { + const valueFlag = value === undefined ? undefined : readReplyApplied(value); + if (valueFlag !== undefined) { + return valueFlag; + } + const resultFlags = results + .slice(resultCountBeforeSend) + .map((result) => readReplyApplied(result)) + .filter((flag): flag is boolean => flag !== undefined); + if (resultFlags.length > 0) { + return resultFlags.some(Boolean); + } + return value !== undefined && sent ? sent(value) : results.length > resultCountBeforeSend; + }; + const markReplyConsumedIfSendSucceeded = ( + replyTo: string | undefined, + resultCountBeforeSend: number, + ) => { + if (shouldConsumeReplyAfterSend(replyTo) && didSendApplyReply(resultCountBeforeSend)) { + replyConsumed = true; + } + }; + const trackReplyConsumption = async ( + replyTo: string | undefined, + send: () => Promise, + sent?: (value: T) => boolean, + ): Promise => { + const resultCountBeforeSend = results.length; + try { + const value = await send(); + if ( + shouldConsumeReplyAfterSend(replyTo) && + didSendApplyReply(resultCountBeforeSend, value, sent) + ) { + replyConsumed = true; + } + return value; + } catch (err) { + // Best-effort delivery should only consume reply metadata once a quoted send + // actually succeeded, even if a later chunk/send in the same payload fails. + markReplyConsumedIfSendSucceeded(replyTo, resultCountBeforeSend); + throw err; + } + }; for (const payload of normalizedPayloads) { let payloadSummary = buildPayloadSummary(payload); try { @@ -654,10 +841,37 @@ async function deliverOutboundPayloadsCore( payloadSummary = hookResult.payloadSummary; params.onPayload?.(payloadSummary); - const sendOverrides = { - replyToId: effectivePayload.replyToId ?? params.replyToId ?? undefined, + // Treat null as an explicit "do not reply" override so payload-level suppression + // is preserved instead of falling back to inherited reply metadata. + // NOTE: Many payload builders emit replyToId: undefined. Treat that as "no opinion" + // so the inherited reply metadata can still apply. + const explicitPayloadReplyTo = + "replyToId" in effectivePayload && effectivePayload.replyToId !== undefined + ? effectivePayload.replyToId + : undefined; + const inheritedReplyTo = + explicitPayloadReplyTo === undefined ? (params.replyToId ?? undefined) : undefined; + const effectiveReplyTo = + explicitPayloadReplyTo != null + ? explicitPayloadReplyTo + : !replyConsumed + ? inheritedReplyTo + : undefined; + // NOTE: quoteAuthor is derived from the top-level params, not resolved per-payload. + // The normal Signal inbound→outbound path (monitor.ts:deliverReplies) resolves + // quoteAuthor per-payload via resolveQuoteAuthor(effectiveReplyTo). If a future + // caller uses deliverOutboundPayloads directly with per-payload replyToId overrides, + // it should supply its own quoteAuthor resolution or extend this logic. + const effectiveQuoteAuthor = effectiveReplyTo ? (params.quoteAuthor ?? undefined) : undefined; + const effectiveSendOverrides = { threadId: params.threadId ?? undefined, forceDocument: params.forceDocument, + replyToId: + explicitPayloadReplyTo !== undefined // explicit payload override (string or null) + ? (explicitPayloadReplyTo ?? undefined) // null → undefined for the wire format + : inheritedReplyTo !== undefined + ? (effectiveReplyTo ?? null) + : undefined, }; if ( handler.sendPayload && @@ -666,7 +880,29 @@ async function deliverOutboundPayloadsCore( channelData: effectivePayload.channelData, }) ) { - const delivery = await handler.sendPayload(effectivePayload, sendOverrides); + // Materialize the resolved replyToId on the payload so downstream handlers see + // the correct value: + // - string: reply applied + // - null: reply metadata was inherited but has been consumed (explicitly no reply) + // - undefined: no reply metadata was ever present (no opinion) + const resolvedReplyToId = + effectiveReplyTo !== undefined + ? effectiveReplyTo + : inheritedReplyTo !== undefined + ? null + : undefined; + const resolvedPayload = { ...effectivePayload, replyToId: resolvedReplyToId }; + const delivery = await trackReplyConsumption( + effectiveReplyTo, + () => + handler.sendPayload!( + resolvedPayload as typeof effectivePayload, + effectiveSendOverrides, + ), + // Any non-undefined return counts as "sent" — channels that don't return + // messageId (e.g. matrix) should still consume the reply indicator. + (value) => value !== undefined, + ); results.push(delivery); emitMessageSent({ success: true, @@ -675,12 +911,19 @@ async function deliverOutboundPayloadsCore( }); continue; } + if (payloadSummary.mediaUrls.length === 0) { const beforeCount = results.length; - if (handler.sendFormattedText) { - results.push(...(await handler.sendFormattedText(payloadSummary.text, sendOverrides))); + if (isSignalChannel) { + await trackReplyConsumption(effectiveReplyTo, () => + sendSignalTextChunks(payloadSummary.text, effectiveReplyTo, effectiveQuoteAuthor), + ); + } else if (handler.sendFormattedText) { + results.push(...(await handler.sendFormattedText(payloadSummary.text, effectiveSendOverrides))); } else { - await sendTextChunks(payloadSummary.text, sendOverrides); + await trackReplyConsumption(effectiveReplyTo, () => + sendTextChunks(payloadSummary.text, effectiveSendOverrides), + ); } const messageId = results.at(-1)?.messageId; emitMessageSent({ @@ -707,7 +950,9 @@ async function deliverOutboundPayloadsCore( ); } const beforeCount = results.length; - await sendTextChunks(fallbackText, sendOverrides); + await trackReplyConsumption(effectiveReplyTo, () => + sendTextChunks(fallbackText, effectiveSendOverrides), + ); const messageId = results.at(-1)?.messageId; emitMessageSent({ success: results.length > beforeCount, @@ -717,26 +962,46 @@ async function deliverOutboundPayloadsCore( continue; } + let first = true; let lastMessageId: string | undefined; - await sendMediaWithLeadingCaption({ - mediaUrls: payloadSummary.mediaUrls, - caption: payloadSummary.text, - send: async ({ mediaUrl, caption }) => { - throwIfAborted(abortSignal); - if (handler.sendFormattedMedia) { - const delivery = await handler.sendFormattedMedia( + await trackReplyConsumption(effectiveReplyTo, async () => { + if (isSignalChannel) { + for (const url of payloadSummary.mediaUrls) { + throwIfAborted(abortSignal); + const caption = first ? payloadSummary.text : ""; + const mediaReplyTo = first ? effectiveReplyTo : undefined; + const mediaQuoteAuthor = first ? effectiveQuoteAuthor : undefined; + const delivery = await sendSignalMedia(caption, url, mediaReplyTo, mediaQuoteAuthor); + results.push(delivery); + lastMessageId = delivery.messageId; + first = false; + } + return; + } + await sendMediaWithLeadingCaption({ + mediaUrls: payloadSummary.mediaUrls, + caption: payloadSummary.text, + send: async ({ mediaUrl, caption }) => { + throwIfAborted(abortSignal); + if (handler.sendFormattedMedia) { + const delivery = await handler.sendFormattedMedia( + caption ?? "", + mediaUrl, + effectiveSendOverrides, + ); + results.push(delivery); + lastMessageId = delivery.messageId; + return; + } + const delivery = await handler.sendMedia( caption ?? "", mediaUrl, - sendOverrides, + effectiveSendOverrides, ); results.push(delivery); lastMessageId = delivery.messageId; - return; - } - const delivery = await handler.sendMedia(caption ?? "", mediaUrl, sendOverrides); - results.push(delivery); - lastMessageId = delivery.messageId; - }, + }, + }); }); emitMessageSent({ success: true, diff --git a/src/infra/outbound/payloads.ts b/src/infra/outbound/payloads.ts index 39da3d2fdcb..9cfc1f63c92 100644 --- a/src/infra/outbound/payloads.ts +++ b/src/infra/outbound/payloads.ts @@ -67,6 +67,8 @@ export function normalizeReplyPayloadsForDelivery( ); const hasMultipleMedia = (explicitMediaUrls?.length ?? 0) > 1; const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl; + const resolvedReplyToId = + payload.replyToId === null ? null : (payload.replyToId ?? parsed.replyToId); const next: ReplyPayload = { ...payload, text: @@ -76,7 +78,7 @@ export function normalizeReplyPayloadsForDelivery( }) ?? "", mediaUrls: mergedMedia.length ? mergedMedia : undefined, mediaUrl: resolvedMediaUrl, - replyToId: payload.replyToId ?? parsed.replyToId, + ...(resolvedReplyToId !== undefined ? { replyToId: resolvedReplyToId } : {}), replyToTag: payload.replyToTag || parsed.replyToTag, replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent, audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice), diff --git a/src/infra/outbound/reply-applied.ts b/src/infra/outbound/reply-applied.ts new file mode 100644 index 00000000000..83193716482 --- /dev/null +++ b/src/infra/outbound/reply-applied.ts @@ -0,0 +1,13 @@ +const replyAppliedMarkers = new WeakMap(); + +export function markReplyApplied(value: T, applied: boolean): T { + replyAppliedMarkers.set(value, applied); + return value; +} + +export function readReplyApplied(value: unknown): boolean | undefined { + if (!value || typeof value !== "object") { + return undefined; + } + return replyAppliedMarkers.get(value); +} diff --git a/src/plugin-sdk/reply-payload.ts b/src/plugin-sdk/reply-payload.ts index 98df862d748..1845419d6e7 100644 --- a/src/plugin-sdk/reply-payload.ts +++ b/src/plugin-sdk/reply-payload.ts @@ -7,7 +7,8 @@ export type OutboundReplyPayload = { text?: string; mediaUrls?: string[]; mediaUrl?: string; - replyToId?: string; + // null explicitly suppresses inherited reply metadata (distinct from undefined = "not set") + replyToId?: string | null; }; export type SendableOutboundReplyParts = { @@ -38,7 +39,12 @@ export function normalizeOutboundReplyPayload( ) : undefined; const mediaUrl = typeof payload.mediaUrl === "string" ? payload.mediaUrl : undefined; - const replyToId = typeof payload.replyToId === "string" ? payload.replyToId : undefined; + const replyToId = + typeof payload.replyToId === "string" + ? payload.replyToId + : payload.replyToId === null + ? null + : undefined; return { text, mediaUrls, diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index 8af6cf927d4..2de2d8eb086 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -1,3 +1,4 @@ +import { execFileSync } from "node:child_process"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; @@ -3262,7 +3263,7 @@ module.exports = { expect(record?.status).toBe("loaded"); }); - it("supports legacy plugins importing monolithic plugin-sdk root", async () => { + it("supports legacy plugins importing monolithic plugin-sdk root", () => { useNoBundledPlugins(); const plugin = writePlugin({ id: "legacy-root-import", diff --git a/tasks/todo.md b/tasks/todo.md new file mode 100644 index 00000000000..1520c28c71d --- /dev/null +++ b/tasks/todo.md @@ -0,0 +1,28 @@ +# Signal quote reply fix + +- [x] Inspect current reply consumption and Signal quote validation flow +- [x] Implement fix so invalid explicit Signal reply ids do not consume inherited reply state +- [x] Add or update regression tests for outbound delivery and Signal send behavior +- [x] Run targeted verification for touched test files and broader related tests if quick +- [x] Review diff, update this file with results, and commit/push changes + +## Notes + +- Status: pushed +- Shared Signal quote metadata validation now drives: + - Signal send param construction + - outbound reply consumption + - monitor reply consumption +- Regression coverage added for: + - malformed explicit Signal `replyToId` preserving inherited reply state + - group replies without `quoteAuthor` preserving inherited reply state + - direct-message valid numeric replies still emitting `quote-timestamp` + - existing partial-chunk failure case using a valid Signal timestamp fixture +- Verification: + - `pnpm exec vitest run --config vitest.unit.config.ts src/infra/outbound/deliver.test.ts src/signal/send.test.ts src/signal/monitor/reply-delivery.test.ts src/signal/monitor/event-handler.quote.test.ts` + - `pnpm exec vitest run --config vitest.unit.config.ts src/signal/*.test.ts src/signal/monitor/*.test.ts` + - `pnpm exec oxfmt --check src/infra/outbound/deliver.ts src/infra/outbound/deliver.test.ts src/signal/send.ts src/signal/send.test.ts src/signal/monitor.ts src/signal/monitor/reply-delivery.ts src/signal/monitor/reply-delivery.test.ts src/signal/reply-quote.ts tasks/todo.md` +- Commit: + - `a1e5c2966` `Signal: preserve inherited quote state` +- Push: + - `git push origin fix/signal-quote-reply`