Merge 8cbd9f425f3ea0a9a914d296bcf2f843e0b9ab26 into 9fb78453e088cd7b553d7779faa0de5c83708e70

This commit is contained in:
Joseph Krug 2026-03-21 05:19:49 +00:00 committed by GitHub
commit 25aa73c53c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 2589 additions and 82 deletions

View File

@ -368,7 +368,7 @@ async function downloadAttachment(
}
async function deliverGoogleChatReply(params: {
payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string };
payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string | null };
account: ResolvedGoogleChatAccount;
spaceId: string;
runtime: GoogleChatRuntimeEnv;
@ -462,7 +462,7 @@ async function deliverGoogleChatReply(params: {
account,
space: spaceId,
text: caption,
thread: payload.replyToId,
thread: payload.replyToId ?? undefined,
attachments: [
{ attachmentUploadToken: upload.attachmentUploadToken, contentName: loaded.fileName },
],

View File

@ -50,7 +50,7 @@ export async function deliverReplies(params: {
maxBytes,
client,
accountId,
replyToId: payload.replyToId,
replyToId: payload.replyToId ?? undefined,
});
sentMessageCache?.remember(scope, { text: chunk, messageId: sent.messageId });
},
@ -60,7 +60,7 @@ export async function deliverReplies(params: {
maxBytes,
client,
accountId,
replyToId: payload.replyToId,
replyToId: payload.replyToId ?? undefined,
});
sentMessageCache?.remember(scope, {
text: caption || undefined,

View File

@ -159,7 +159,7 @@ function channelChatType(kind: ChatType): "direct" | "group" | "channel" {
export function resolveMattermostReplyRootId(params: {
threadRootId?: string;
replyToId?: string;
replyToId?: string | null;
}): string | undefined {
const threadRootId = params.threadRootId?.trim();
if (threadRootId) {

View File

@ -70,6 +70,8 @@ async function sendSignalOutbound(params: {
mediaUrl?: string;
mediaLocalRoots?: readonly string[];
accountId?: string;
replyToId?: string | null;
quoteAuthor?: string | null;
deps?: { [channelId: string]: unknown };
}) {
const { send, maxBytes } = resolveSignalSendContext(params);
@ -79,6 +81,8 @@ async function sendSignalOutbound(params: {
...(params.mediaLocalRoots?.length ? { mediaLocalRoots: params.mediaLocalRoots } : {}),
maxBytes,
accountId: params.accountId ?? undefined,
replyTo: params.replyToId ?? undefined,
quoteAuthor: params.quoteAuthor ?? undefined,
});
}
@ -194,6 +198,8 @@ async function sendFormattedSignalText(ctx: {
to: string;
text: string;
accountId?: string | null;
replyToId?: string | null;
quoteAuthor?: string | null;
deps?: { [channelId: string]: unknown };
abortSignal?: AbortSignal;
}) {
@ -218,6 +224,7 @@ async function sendFormattedSignalText(ctx: {
chunks = [{ text: ctx.text, styles: [] }];
}
const results = [];
let first = true;
for (const chunk of chunks) {
ctx.abortSignal?.throwIfAborted();
const result = await send(ctx.to, chunk.text, {
@ -226,8 +233,11 @@ async function sendFormattedSignalText(ctx: {
accountId: ctx.accountId ?? undefined,
textMode: "plain",
textStyles: chunk.styles,
replyTo: first ? (ctx.replyToId ?? undefined) : undefined,
quoteAuthor: first ? (ctx.quoteAuthor ?? undefined) : undefined,
});
results.push(result);
first = false;
}
return attachChannelToResults("signal", results);
}
@ -239,6 +249,8 @@ async function sendFormattedSignalMedia(ctx: {
mediaUrl: string;
mediaLocalRoots?: readonly string[];
accountId?: string | null;
replyToId?: string | null;
quoteAuthor?: string | null;
deps?: { [channelId: string]: unknown };
abortSignal?: AbortSignal;
}) {
@ -267,6 +279,8 @@ async function sendFormattedSignalMedia(ctx: {
accountId: ctx.accountId ?? undefined,
textMode: "plain",
textStyles: formatted.styles,
replyTo: ctx.replyToId ?? undefined,
quoteAuthor: ctx.quoteAuthor ?? undefined,
});
return attachChannelToResult("signal", result);
}
@ -315,12 +329,23 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
chunker: (text, limit) => getSignalRuntime().channel.text.chunkText(text, limit),
chunkerMode: "text",
textChunkLimit: 4000,
sendFormattedText: async ({ cfg, to, text, accountId, deps, abortSignal }) =>
sendFormattedText: async ({
cfg,
to,
text,
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) =>
await sendFormattedSignalText({
cfg,
to,
text,
accountId,
replyToId,
quoteAuthor,
deps,
abortSignal,
}),
@ -333,6 +358,8 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) =>
await sendFormattedSignalMedia({
cfg,
@ -341,20 +368,34 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
mediaUrl,
mediaLocalRoots,
accountId,
replyToId,
quoteAuthor,
deps,
abortSignal,
}),
...createAttachedChannelResultAdapter({
channel: "signal",
sendText: async ({ cfg, to, text, accountId, deps }) =>
sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) =>
await sendSignalOutbound({
cfg,
to,
text,
accountId: accountId ?? undefined,
replyToId,
quoteAuthor,
deps,
}),
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) =>
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
quoteAuthor,
}) =>
await sendSignalOutbound({
cfg,
to,
@ -362,6 +403,8 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
mediaUrl,
mediaLocalRoots,
accountId: accountId ?? undefined,
replyToId,
quoteAuthor,
deps,
}),
}),

View File

@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import {
isStrictUuid,
looksLikeUuid,
resolveSignalPeerId,
resolveSignalRecipient,
@ -25,6 +26,30 @@ describe("looksLikeUuid", () => {
});
});
describe("isStrictUuid", () => {
it("accepts canonical hyphenated UUIDs", () => {
expect(isStrictUuid("123e4567-e89b-12d3-a456-426614174000")).toBe(true);
});
it("accepts compact 32-hex UUIDs", () => {
expect(isStrictUuid("123e4567e89b12d3a456426614174000")).toBe(true); // pragma: allowlist secret
});
it("rejects short hex fragments that looksLikeUuid accepts", () => {
expect(isStrictUuid("abcd-1234")).toBe(false);
});
it("rejects numeric ids and phone-like values", () => {
expect(isStrictUuid("1234567890")).toBe(false);
expect(isStrictUuid("+15555551212")).toBe(false);
});
it("rejects attacker-crafted strings with hex chars", () => {
expect(isStrictUuid("deadbeef")).toBe(false);
expect(isStrictUuid("not-a-uuid-but-has-hex-cafe")).toBe(false);
});
});
describe("signal sender identity", () => {
it("prefers sourceNumber over sourceUuid", () => {
const sender = resolveSignalSender({

View File

@ -24,6 +24,11 @@ export function looksLikeUuid(value: string): boolean {
return /[a-f]/i.test(compact);
}
/** Strict UUID check: only accepts canonical 8-4-4-4-12 or compact 32-hex formats. */
export function isStrictUuid(value: string): boolean {
return UUID_HYPHENATED_RE.test(value) || UUID_COMPACT_RE.test(value);
}
function stripSignalPrefix(value: string): string {
return value.replace(/^signal:/i, "").trim();
}

View File

@ -10,10 +10,6 @@ import type { BackoffPolicy } from "openclaw/plugin-sdk/infra-runtime";
import { waitForTransportReady } from "openclaw/plugin-sdk/infra-runtime";
import { saveMediaBuffer } from "openclaw/plugin-sdk/media-runtime";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import {
deliverTextOrMediaReply,
resolveSendableOutboundReplyParts,
} from "openclaw/plugin-sdk/reply-payload";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import {
chunkTextWithMode,
@ -33,6 +29,12 @@ import type {
SignalReactionMessage,
SignalReactionTarget,
} from "./monitor/event-handler.types.js";
import {
markSignalReplyConsumed,
resolveSignalReplyDelivery,
type SignalReplyDeliveryState,
} from "./monitor/reply-delivery.js";
import { isSignalGroupTarget } from "./reply-quote.js";
import { sendMessageSignal } from "./send.js";
import { runSignalSseLoop } from "./sse-reconnect.js";
@ -299,36 +301,69 @@ async function deliverReplies(params: {
maxBytes: number;
textLimit: number;
chunkMode: "length" | "newline";
inheritedReplyToId?: string;
replyDeliveryState?: SignalReplyDeliveryState;
resolveQuoteAuthor?: (replyToId: string) => string | undefined;
}) {
const { replies, target, baseUrl, account, accountId, runtime, maxBytes, textLimit, chunkMode } =
params;
for (const payload of replies) {
const reply = resolveSendableOutboundReplyParts(payload);
const delivered = await deliverTextOrMediaReply({
const { payload: resolvedPayload, effectiveReplyTo } = resolveSignalReplyDelivery({
payload,
text: reply.text,
chunkText: (value) => chunkTextWithMode(value, textLimit, chunkMode),
sendText: async (chunk) => {
inheritedReplyToId: params.inheritedReplyToId,
state: params.replyDeliveryState,
});
const effectiveQuoteAuthor = effectiveReplyTo
? params.resolveQuoteAuthor?.(effectiveReplyTo)
: undefined;
const text = resolvedPayload.text ?? "";
const resolvedMediaList =
resolvedPayload.mediaUrls ?? (resolvedPayload.mediaUrl ? [resolvedPayload.mediaUrl] : []);
if (!text && resolvedMediaList.length === 0) {
continue;
}
if (resolvedMediaList.length === 0) {
let first = true;
for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) {
await sendMessageSignal(target, chunk, {
baseUrl,
account,
maxBytes,
accountId,
replyTo: first ? effectiveReplyTo : undefined,
quoteAuthor: first ? effectiveQuoteAuthor : undefined,
});
},
sendMedia: async ({ mediaUrl, caption }) => {
await sendMessageSignal(target, caption ?? "", {
if (first) {
markSignalReplyConsumed(params.replyDeliveryState, effectiveReplyTo, {
isGroup: isSignalGroupTarget(target),
quoteAuthor: effectiveQuoteAuthor,
});
}
first = false;
}
} else {
let first = true;
for (const url of resolvedMediaList) {
const caption = first ? text : "";
await sendMessageSignal(target, caption, {
baseUrl,
account,
mediaUrl,
maxBytes,
accountId,
replyTo: first ? effectiveReplyTo : undefined,
quoteAuthor: first ? effectiveQuoteAuthor : undefined,
});
},
});
if (delivered !== "empty") {
runtime.log?.(`delivered reply to ${target}`);
if (first) {
markSignalReplyConsumed(params.replyDeliveryState, effectiveReplyTo, {
isGroup: isSignalGroupTarget(target),
quoteAuthor: effectiveQuoteAuthor,
});
}
first = false;
}
}
runtime.log?.(`delivered reply to ${target}`);
}
}

View File

@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { buildInboundUserContextPrefix } from "../../../../src/auto-reply/reply/inbound-meta.js";
import type { MsgContext } from "../../../../src/auto-reply/templating.js";
import { expectChannelInboundContextContract as expectInboundContextContract } from "../../../../src/channels/plugins/contracts/suites.js";
import { createSignalEventHandler } from "./event-handler.js";
@ -206,6 +207,138 @@ describe("signal createSignalEventHandler inbound context", () => {
expect(capture.ctx?.MediaTypes).toEqual(["image/jpeg", "application/octet-stream"]);
});
it("surfaces quoted reply context in the agent-visible metadata block", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "thanks",
quote: {
id: 1700000000000,
authorNumber: "+15550003333",
text: " sent the details",
mentions: [{ number: "+15550004444", start: 0, length: 1 }],
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.ReplyToId).toBe("1700000000000");
expect(capture.ctx?.ReplyToSender).toBe("+15550003333");
expect(capture.ctx?.ReplyToBody).toBe("@+15550004444 sent the details");
expect(capture.ctx?.ReplyToIsQuote).toBe(true);
expect(String(capture.ctx?.Body ?? "")).toContain("[Quoting +15550003333 id:1700000000000]");
const userContext = buildInboundUserContextPrefix(capture.ctx!);
expect(userContext).toContain("Replied message (untrusted, for context):");
expect(userContext).toContain('"sender_label": "+15550003333"');
expect(userContext).toContain('"body": "@+15550004444 sent the details"');
});
it("keeps quote-only messages when the user sends no new text", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "",
quote: {
id: 1700000000000,
text: "original context",
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.RawBody).toBe("");
expect(capture.ctx?.ReplyToBody).toBe("original context");
expect(String(capture.ctx?.Body ?? "")).toContain('"original context"');
});
it("uses quoted attachment metadata for media-only quoted replies", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "nice one",
quote: {
id: 1700000000000,
attachments: [{ contentType: "image/jpeg", filename: "photo.jpg" }],
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.ReplyToId).toBe("1700000000000");
expect(capture.ctx?.ReplyToBody).toBe("<media:image>");
expect(String(capture.ctx?.Body ?? "")).toContain('"<media:image>"');
});
it("ignores invalid quote ids while preserving the quoted body context", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "reply",
quote: {
id: "1700000000000abc",
text: "quoted context",
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.ReplyToId).toBeUndefined();
expect(capture.ctx?.ReplyToBody).toBe("quoted context");
expect(String(capture.ctx?.Body ?? "")).not.toContain("id:1700000000000abc");
});
it("drops own UUID inbound messages when only accountUuid is configured", async () => {
const ownUuid = "123e4567-e89b-12d3-a456-426614174000";
const handler = createSignalEventHandler(

View File

@ -6,6 +6,7 @@ import {
createBaseSignalEventHandlerDeps,
createSignalReceiveEvent,
} from "./event-handler.test-harness.js";
import type { SignalQuote } from "./event-handler.types.js";
type SignalMsgContext = Pick<MsgContext, "Body" | "WasMentioned"> & {
Body?: string;
@ -32,6 +33,7 @@ type GroupEventOpts = {
message?: string;
attachments?: unknown[];
quoteText?: string;
quote?: SignalQuote;
mentions?: Array<{
uuid?: string;
number?: string;
@ -45,7 +47,7 @@ function makeGroupEvent(opts: GroupEventOpts) {
dataMessage: {
message: opts.message ?? "",
attachments: opts.attachments ?? [],
quote: opts.quoteText ? { text: opts.quoteText } : undefined,
quote: opts.quote ?? (opts.quoteText ? { text: opts.quoteText } : undefined),
mentions: opts.mentions ?? undefined,
groupInfo: { groupId: "g1", groupName: "Test Group" },
},
@ -203,6 +205,33 @@ describe("signal mention gating", () => {
await expectSkippedGroupHistory({ message: "", quoteText: "quoted context" }, "quoted context");
});
it("records quoted media placeholders in pending history for skipped quote-only group messages", async () => {
await expectSkippedGroupHistory(
{
message: "",
quote: {
id: 1700000000000,
attachments: [{ contentType: "image/png", filename: "photo.png" }],
},
},
"<media:image>",
);
});
it("hydrates quote mentions in pending history for skipped quote-only group messages", async () => {
const placeholder = "\uFFFC";
await expectSkippedGroupHistory(
{
message: "",
quote: {
text: `${placeholder} quoted context`,
mentions: [{ uuid: "123e4567", start: 0, length: placeholder.length }],
},
},
"@123e4567 quoted context",
);
});
it("bypasses mention gating for authorized control commands", async () => {
capturedCtx = undefined;
const handler = createMentionHandler({ requireMention: true });

View File

@ -0,0 +1,515 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { buildInboundUserContextPrefix } from "../../../../src/auto-reply/reply/inbound-meta.js";
import type { MsgContext } from "../../../../src/auto-reply/templating.js";
import { createSignalEventHandler } from "./event-handler.js";
import {
createBaseSignalEventHandlerDeps,
createSignalReceiveEvent,
} from "./event-handler.test-harness.js";
import type { SignalEventHandlerDeps } from "./event-handler.types.js";
type CapturedSignalQuoteContext = Pick<
MsgContext,
"Body" | "BodyForAgent" | "ReplyToBody" | "ReplyToId" | "ReplyToIsQuote" | "ReplyToSender"
> & {
Body?: string;
BodyForAgent?: string;
ReplyToBody?: string;
ReplyToId?: string;
ReplyToIsQuote?: boolean;
ReplyToSender?: string;
};
let capturedCtx: CapturedSignalQuoteContext | undefined;
const { dispatchInboundMessageMock } = vi.hoisted(() => ({
dispatchInboundMessageMock: vi.fn(),
}));
function getCapturedCtx() {
return capturedCtx as CapturedSignalQuoteContext;
}
vi.mock("../../../../src/auto-reply/dispatch.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../../../src/auto-reply/dispatch.js")>();
return {
...actual,
dispatchInboundMessage: dispatchInboundMessageMock,
dispatchInboundMessageWithDispatcher: dispatchInboundMessageMock,
dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessageMock,
};
});
vi.mock("../send.js", () => ({
sendMessageSignal: vi.fn(),
sendTypingSignal: vi.fn().mockResolvedValue(true),
sendReadReceiptSignal: vi.fn().mockResolvedValue(true),
}));
vi.mock("../../../../src/pairing/pairing-store.js", () => ({
readChannelAllowFromStore: vi.fn().mockResolvedValue([]),
upsertChannelPairingRequest: vi.fn(),
}));
function createQuoteHandler(overrides: Partial<SignalEventHandlerDeps> = {}) {
return createSignalEventHandler(
createBaseSignalEventHandlerDeps({
// oxlint-disable-next-line typescript/no-explicit-any
cfg: { messages: { inbound: { debounceMs: 0 } } } as any,
historyLimit: 0,
...overrides,
}),
);
}
describe("signal quote reply handling", () => {
beforeEach(() => {
capturedCtx = undefined;
dispatchInboundMessageMock.mockReset();
dispatchInboundMessageMock.mockImplementation(async (params: { ctx: unknown }) => {
capturedCtx = params.ctx as CapturedSignalQuoteContext;
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } };
});
});
it("surfaces quoted text in reply metadata while preserving the new message text", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "Thanks for the info!",
quote: {
id: 1700000000000,
authorNumber: "+15550003333",
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.BodyForAgent).toBe("Thanks for the info!");
expect(ctx?.ReplyToId).toBe("1700000000000");
expect(ctx?.ReplyToBody).toBe("The meeting is at 3pm");
expect(ctx?.ReplyToSender).toBe("+15550003333");
expect(ctx?.ReplyToIsQuote).toBe(true);
expect(String(ctx?.Body ?? "")).toContain("Thanks for the info!");
expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550003333 id:1700000000000]");
});
it("uses the latest quote target when debouncing rapid quoted Signal replies", async () => {
vi.useFakeTimers();
try {
const handler = createQuoteHandler({
// oxlint-disable-next-line typescript/no-explicit-any
cfg: { messages: { inbound: { debounceMs: 25 } } } as any,
});
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "First chunk",
quote: {
id: 1700000000000,
authorNumber: "+15550003333",
text: "First quoted message",
},
},
}),
);
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000002,
dataMessage: {
message: "Second chunk",
quote: {
id: 1700000000009,
authorNumber: "+15550004444",
text: "Second quoted message",
},
},
}),
);
expect(dispatchInboundMessageMock).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(30);
await vi.waitFor(() => {
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1);
});
const ctx = getCapturedCtx();
expect(ctx?.BodyForAgent).toBe("First chunk\\nSecond chunk");
expect(ctx?.ReplyToId).toBe("1700000000009");
expect(ctx?.ReplyToBody).toBe("Second quoted message");
expect(ctx?.ReplyToSender).toBe("+15550004444");
expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550004444 id:1700000000009]");
expect(String(ctx?.Body ?? "")).not.toContain("[Quoting +15550003333 id:1700000000000]");
} finally {
vi.useRealTimers();
}
});
it("keeps quote-only replies and exposes the replied-message context block", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "",
quote: {
id: 1700000000000,
authorNumber: "+15550002222",
text: "Original message to quote",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx).toBeTruthy();
expect(ctx?.BodyForAgent).toBe("");
expect(ctx?.ReplyToBody).toBe("Original message to quote");
const userContext = buildInboundUserContextPrefix(ctx as MsgContext);
expect(userContext).toContain("Replied message (untrusted, for context):");
expect(userContext).toContain('"body": "Original message to quote"');
expect(userContext).toContain('"sender_label": "+15550002222"');
});
it("hydrates Signal mentions inside quoted text before surfacing reply context", async () => {
const handler = createQuoteHandler();
const placeholder = "\uFFFC";
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Replying now",
quote: {
id: 1700000000000,
text: `${placeholder} can you check this?`,
mentions: [{ uuid: "123e4567", start: 0, length: placeholder.length }],
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToBody).toBe("@123e4567 can you check this?");
expect(String(ctx?.Body ?? "")).toContain('"@123e4567 can you check this?"');
});
it("uses quoted attachment placeholders for media replies without text", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Nice photo!",
quote: {
id: 1700000000000,
authorUuid: "123e4567-e89b-12d3-a456-426614174000",
text: null,
attachments: [{ contentType: "image/jpeg" }],
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToBody).toBe("<media:image>");
expect(ctx?.ReplyToSender).toBe("uuid:123e4567-e89b-12d3-a456-426614174000");
});
it("falls back to a generic quoted body when signal-cli sends an empty quoted text string", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Replying to media",
quote: {
id: 1700000000000,
text: "",
attachments: [],
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToId).toBe("1700000000000");
expect(ctx?.ReplyToBody).toBe("<quoted message>");
});
it("drops invalid quote ids from reply metadata but keeps valid quoted text", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "I saw this",
quote: {
id: "1700000000000abc",
authorNumber: "+15550002222",
text: "Original text",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToId).toBeUndefined();
expect(ctx?.ReplyToBody).toBe("Original text");
expect(String(ctx?.Body ?? "")).not.toContain("id:1700000000000abc");
});
it("does not synthesize quote-only context from invalid negative ids", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "",
quote: {
id: -1,
text: null,
},
},
}),
);
expect(capturedCtx).toBeUndefined();
});
it("passes inherited reply state through deliverReplies for the current Signal message", async () => {
const deliverReplies = vi.fn().mockResolvedValue(undefined);
dispatchInboundMessageMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcher: {
sendToolResult: (payload: { text: string }) => boolean;
sendFinalReply: (payload: { text: string }) => boolean;
markComplete: () => void;
waitForIdle: () => Promise<void>;
};
}) => {
capturedCtx = params.ctx as CapturedSignalQuoteContext;
params.dispatcher.sendToolResult({ text: "First reply" });
params.dispatcher.sendFinalReply({ text: "Second reply" });
params.dispatcher.markComplete();
await params.dispatcher.waitForIdle();
return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } };
},
);
const handler = createQuoteHandler({ deliverReplies });
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Incoming message",
},
}),
);
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(deliverReplies.mock.calls[0]?.[0].replies[0]?.replyToId).toBeUndefined();
expect(deliverReplies.mock.calls[1]?.[0].replies[0]?.replyToId).toBeUndefined();
expect(deliverReplies.mock.calls[0]?.[0].inheritedReplyToId).toBe("1700000000000");
expect(deliverReplies.mock.calls[1]?.[0].inheritedReplyToId).toBe("1700000000000");
expect(deliverReplies.mock.calls[0]?.[0].replyDeliveryState).toBe(
deliverReplies.mock.calls[1]?.[0].replyDeliveryState,
);
});
it("preserves explicit replyToId values on later deliveries in the same turn", async () => {
const deliverReplies = vi.fn().mockResolvedValue(undefined);
dispatchInboundMessageMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcher: {
sendToolResult: (payload: { text: string; replyToId: string }) => boolean;
sendFinalReply: (payload: { text: string; replyToId: string }) => boolean;
markComplete: () => void;
waitForIdle: () => Promise<void>;
};
}) => {
capturedCtx = params.ctx as CapturedSignalQuoteContext;
params.dispatcher.sendToolResult({ text: "First reply", replyToId: "1700000000001" });
params.dispatcher.sendFinalReply({ text: "Second reply", replyToId: "1700000000002" });
params.dispatcher.markComplete();
await params.dispatcher.waitForIdle();
return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } };
},
);
const handler = createQuoteHandler({ deliverReplies });
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Incoming message",
},
}),
);
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(deliverReplies.mock.calls[0]?.[0].replies[0]?.replyToId).toBe("1700000000001");
expect(deliverReplies.mock.calls[1]?.[0].replies[0]?.replyToId).toBe("1700000000002");
});
it("resolves missing quote authors from previously seen group messages", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "The meeting is at 3pm",
groupInfo: { groupId: "g1", groupName: "Test Group" },
},
}),
);
capturedCtx = undefined;
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550003333",
sourceName: "Alice",
timestamp: 1700000000002,
dataMessage: {
message: "Thanks for the info!",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000001,
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe("+15550002222");
expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550002222 id:1700000000001]");
});
it("does not poison the quote-author cache from attacker-controlled quote metadata", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "Forwarding this",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000000,
authorNumber: "+15550009999",
text: "Mallory wrote this",
},
},
}),
);
capturedCtx = undefined;
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550003333",
sourceName: "Alice",
timestamp: 1700000000002,
dataMessage: {
message: "Replying to Bob",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000001,
text: "Forwarding this",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe("+15550002222");
expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550002222 id:1700000000001]");
});
it("resolves cached uuid senders with a uuid: prefix", async () => {
const handler = createQuoteHandler();
const senderUuid = "123e4567-e89b-12d3-a456-426614174000";
await handler(
createSignalReceiveEvent({
sourceNumber: null,
sourceUuid: senderUuid,
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "The meeting is at 3pm",
groupInfo: { groupId: "g1", groupName: "Test Group" },
},
}),
);
capturedCtx = undefined;
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550003333",
sourceName: "Alice",
timestamp: 1700000000002,
dataMessage: {
message: "Thanks for the info!",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000001,
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe(`uuid:${senderUuid}`);
expect(String(ctx?.Body ?? "")).toContain(`[Quoting uuid:${senderUuid} id:1700000000001]`);
});
it("preserves uuid: prefix in quote author normalization", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "Thanks for the info!",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000000,
authorUuid: "uuid:01234567-89ab-cdef-0123-456789abcdef",
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe("uuid:01234567-89ab-cdef-0123-456789abcdef");
expect(String(ctx?.Body ?? "")).toContain(
"[Quoting uuid:01234567-89ab-cdef-0123-456789abcdef id:1700000000000]",
);
});
});

View File

@ -41,6 +41,7 @@ import {
formatSignalSenderDisplay,
formatSignalSenderId,
isSignalSenderAllowed,
isStrictUuid,
normalizeSignalAllowRecipient,
resolveSignalPeerId,
resolveSignalRecipient,
@ -55,8 +56,10 @@ import type {
SignalEventHandlerDeps,
SignalReactionMessage,
SignalReceivePayload,
SignalReplyTarget,
} from "./event-handler.types.js";
import { renderSignalMentions } from "./mentions.js";
import { describeSignalReplyTarget } from "./quote-context.js";
function formatAttachmentKindCount(kind: string, count: number): string {
if (kind === "attachment") {
@ -96,6 +99,78 @@ function resolveSignalInboundRoute(params: {
}
export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
// Best-effort index to map (conversation, messageId) -> senderRecipient for quote-author resolution.
// signal-cli requires quote-author for group quotes.
const messageAuthorIndex = new Map<string, string>();
const MAX_MESSAGE_AUTHOR_INDEX = 5000;
const resolveConversationKey = (entry: {
isGroup: boolean;
groupId?: string;
senderPeerId: string;
}): string =>
entry.isGroup ? `group:${entry.groupId ?? "unknown"}` : `dm:${entry.senderPeerId}`;
// Strict E.164: + followed by 715 digits (ITU-T E.164 range).
const STRICT_E164_RE = /^\+\d{7,15}$/;
const normalizeCachedMessageAuthor = (raw?: string) => {
const trimmed = raw?.trim();
if (!trimmed) {
return undefined;
}
// Strip uuid: prefix and validate strictly.
const unprefixed = trimmed.toLowerCase().startsWith("uuid:")
? trimmed.slice("uuid:".length).trim()
: undefined;
if (unprefixed !== undefined) {
return isStrictUuid(unprefixed) ? `uuid:${unprefixed}` : undefined;
}
// Bare UUID without prefix.
if (isStrictUuid(trimmed)) {
return `uuid:${trimmed}`;
}
// E.164 phone number — normalize then validate.
const normalized = normalizeE164(trimmed);
return STRICT_E164_RE.test(normalized) ? normalized : undefined;
};
const rememberMessageAuthor = (params: {
conversationKey: string;
messageId?: string;
senderRecipient?: string;
}) => {
const id = params.messageId?.trim();
const senderRecipient = normalizeCachedMessageAuthor(params.senderRecipient);
if (!id || !senderRecipient) {
return;
}
const key = `${params.conversationKey}:${id}`;
// Refresh insertion order for simple LRU-style eviction.
if (messageAuthorIndex.has(key)) {
messageAuthorIndex.delete(key);
}
messageAuthorIndex.set(key, senderRecipient);
while (messageAuthorIndex.size > MAX_MESSAGE_AUTHOR_INDEX) {
const oldest = messageAuthorIndex.keys().next().value;
if (oldest === undefined) {
break;
}
messageAuthorIndex.delete(oldest);
}
};
const resolveQuotedAuthor = (params: {
conversationKey: string;
replyToId?: string;
}): string | undefined => {
const replyToId = params.replyToId?.trim();
if (!replyToId) {
return undefined;
}
return messageAuthorIndex.get(`${params.conversationKey}:${replyToId}`);
};
type SignalInboundEntry = {
senderName: string;
senderDisplay: string;
@ -114,6 +189,8 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
mediaTypes?: string[];
commandAuthorized: boolean;
wasMentioned?: boolean;
// Quote context fields
quoteTarget?: SignalReplyTarget;
};
async function handleSignalInboundMessage(entry: SignalInboundEntry) {
@ -140,11 +217,18 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
storePath,
sessionKey: route.sessionKey,
});
const quoteBlock = entry.quoteTarget
? `[Quoting ${entry.quoteTarget.author ?? "unknown"}${
entry.quoteTarget.id ? ` id:${entry.quoteTarget.id}` : ""
}]\n"${entry.quoteTarget.body}"\n[/Quoting]`
: "";
const bodyWithQuote = [entry.bodyText, quoteBlock].filter(Boolean).join("\n\n");
const body = formatInboundEnvelope({
channel: "Signal",
from: fromLabel,
timestamp: entry.timestamp ?? undefined,
body: entry.bodyText,
body: bodyWithQuote,
chatType: entry.isGroup ? "group" : "direct",
sender: { name: entry.senderName, id: entry.senderDisplay },
previousTimestamp,
@ -205,6 +289,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
Provider: "signal" as const,
Surface: "signal" as const,
MessageSid: entry.messageId,
// Quote/Reply context fields
ReplyToId: entry.quoteTarget?.id,
ReplyToBody: entry.quoteTarget?.body,
ReplyToSender: entry.quoteTarget?.author,
ReplyToIsQuote: entry.quoteTarget ? true : undefined,
Timestamp: entry.timestamp ?? undefined,
MediaPath: entry.mediaPath,
MediaType: entry.mediaType,
@ -286,11 +375,14 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
},
});
const replyDeliveryState = { consumed: false };
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
typingCallbacks,
deliver: async (payload) => {
const conversationKey = resolveConversationKey(entry);
await deps.deliverReplies({
replies: [payload],
target: ctxPayload.To,
@ -300,6 +392,19 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
runtime: deps.runtime,
maxBytes: deps.mediaMaxBytes,
textLimit: deps.textLimit,
inheritedReplyToId: entry.messageId,
replyDeliveryState,
resolveQuoteAuthor: (replyToId) => {
const resolvedAuthor = resolveQuotedAuthor({ conversationKey, replyToId });
if (resolvedAuthor) {
return resolvedAuthor;
}
// Don't pin explicit reply IDs to the current sender. Only fall back to the
// inbound sender when we're still replying to the current Signal message.
return replyToId === entry.messageId
? normalizeCachedMessageAuthor(entry.senderRecipient)
: undefined;
},
});
},
onError: (err, info) => {
@ -371,6 +476,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
if (!combinedText.trim()) {
return;
}
// Preserve quoteTarget from the latest entry that has one so the reply
// target matches the newest text in the merged body.
const latestQuoteTarget = entries
.toReversed()
.find((entry) => entry.quoteTarget)?.quoteTarget;
await handleSignalInboundMessage({
...last,
bodyText: combinedText,
@ -378,6 +488,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
mediaType: undefined,
mediaPaths: undefined,
mediaTypes: undefined,
quoteTarget: latestQuoteTarget ?? last.quoteTarget,
});
},
onError: (err) => {
@ -519,10 +630,28 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const rawMessage = dataMessage?.message ?? "";
const normalizedMessage = renderSignalMentions(rawMessage, dataMessage?.mentions);
const messageText = normalizedMessage.trim();
const quoteText = dataMessage?.quote?.text?.trim() ?? "";
const senderPeerId = resolveSignalPeerId(sender);
const groupId = dataMessage?.groupInfo?.groupId ?? undefined;
const groupName = dataMessage?.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
const conversationKey = resolveConversationKey({
isGroup,
groupId,
senderPeerId,
});
// NOTE: Full group-quote author resolution currently depends on the patched
// signal-cli install at /opt/signal-cli-0.14.1-patched/, which removes the
// quote.getAuthor().isValid() filter. Once upstream ships lib v140 on JitPack,
// the stock signal-cli build should provide the same quote metadata.
const quoteTarget = dataMessage
? (describeSignalReplyTarget(dataMessage, {
resolveAuthor: resolveQuotedAuthor,
conversationKey,
}) ?? undefined)
: undefined;
const hasBodyContent =
Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length);
Boolean(messageText || quoteTarget?.body) ||
Boolean(!reaction && dataMessage?.attachments?.length);
const senderDisplay = formatSignalSenderDisplay(sender);
const { resolveAccessDecision, dmAccess, effectiveDmAllow, effectiveGroupAllow } =
await resolveSignalAccessState({
@ -552,15 +681,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
}
const senderRecipient = resolveSignalRecipient(sender);
const senderPeerId = resolveSignalPeerId(sender);
const senderAllowId = formatSignalSenderId(sender);
if (!senderRecipient) {
return;
}
const senderIdLine = formatSignalPairingIdLine(sender);
const groupId = dataMessage.groupInfo?.groupId ?? undefined;
const groupName = dataMessage.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
if (!isGroup) {
const allowedDirectMessage = await handleSignalDirectMessageAccess({
@ -654,6 +779,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
commandAuthorized,
});
const effectiveWasMentioned = mentionGate.effectiveWasMentioned;
if (isGroup && requireMention && canDetectMention && mentionGate.shouldSkip) {
logInboundDrop({
log: logVerbose,
@ -661,7 +787,6 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
reason: "no mention",
target: senderDisplay,
});
const quoteText = dataMessage.quote?.text?.trim() || "";
const pendingPlaceholder = (() => {
if (!dataMessage.attachments?.length) {
return "";
@ -681,7 +806,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const pendingKind = kindFromMime(firstContentType ?? undefined);
return pendingKind ? `<media:${pendingKind}>` : "<media:attachment>";
})();
const pendingBodyText = messageText || pendingPlaceholder || quoteText;
const pendingBodyText = messageText || pendingPlaceholder || quoteTarget?.body || "";
const historyKey = groupId ?? "unknown";
recordPendingHistoryEntryIfEnabled({
historyMap: deps.groupHistories,
@ -695,6 +820,12 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
},
});
// Index the message author even when skipping due to no mention
rememberMessageAuthor({
conversationKey,
messageId: typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
senderRecipient: senderRecipient,
});
return;
}
@ -745,11 +876,13 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
}
}
const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || "";
if (!bodyText) {
// Build body text - don't use quote text as fallback, preserve it for context
const bodyText = messageText || placeholder || "";
// Continue if we have either body text OR quote context (quote-only messages are valid)
if (!bodyText && !quoteTarget) {
return;
}
const receiptTimestamp =
typeof envelope.timestamp === "number"
? envelope.timestamp
@ -778,6 +911,17 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const senderName = envelope.sourceName ?? senderDisplay;
const messageId =
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined;
// Record (conversation, messageId) -> senderRecipient so later explicit reply tags
// ([[reply_to:<id>]]) can resolve a correct quote-author.
// SECURITY: Only cache from actual envelope senders — never from quote metadata,
// which is attacker-controlled and could poison the author cache.
rememberMessageAuthor({
conversationKey,
messageId,
senderRecipient,
});
await inboundDebouncer.enqueue({
senderName,
senderDisplay,
@ -796,6 +940,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
mediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
commandAuthorized,
wasMentioned: effectiveWasMentioned,
quoteTarget,
});
};
}

View File

@ -8,6 +8,7 @@ import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import type { SignalSender } from "../identity.js";
import type { SignalReplyDeliveryState } from "./reply-delivery.js";
export type SignalEnvelope = {
sourceNumber?: string | null;
@ -28,6 +29,37 @@ export type SignalMention = {
length?: number | null;
};
export type SignalTextStyle = {
start?: number | null;
length?: number | null;
style?: string | null;
};
export type SignalQuotedAttachment = {
contentType?: string | null;
filename?: string | null;
thumbnail?: SignalAttachment | null;
};
export type SignalQuote = {
id?: number | string | null; // signal-cli quote timestamp
author?: string | null; // deprecated legacy identifier from signal-cli
authorNumber?: string | null; // preferred E.164 author when available
authorUuid?: string | null; // preferred UUID author when available
text?: string | null;
mentions?: Array<SignalMention | null> | null;
attachments?: Array<SignalQuotedAttachment | null> | null;
textStyles?: Array<SignalTextStyle | null> | null;
};
export type SignalReplyTarget = {
id?: string; // message id of quoted message
author?: string; // who wrote the quoted message
body: string; // quoted text content
kind: "quote"; // always quote for Signal
mentions?: Array<SignalMention>; // mentions in quoted text
};
export type SignalDataMessage = {
timestamp?: number;
message?: string | null;
@ -37,7 +69,7 @@ export type SignalDataMessage = {
groupId?: string | null;
groupName?: string | null;
} | null;
quote?: { text?: string | null } | null;
quote?: SignalQuote | null;
reaction?: SignalReactionMessage | null;
};
@ -109,6 +141,9 @@ export type SignalEventHandlerDeps = {
runtime: RuntimeEnv;
maxBytes: number;
textLimit: number;
inheritedReplyToId?: string;
replyDeliveryState?: SignalReplyDeliveryState;
resolveQuoteAuthor?: (replyToId: string) => string | undefined;
}) => Promise<void>;
resolveSignalReactionTargets: (reaction: SignalReactionMessage) => SignalReactionTarget[];
isSignalReactionMessage: (

View File

@ -0,0 +1,112 @@
import { kindFromMime } from "../../../../src/media/mime.js";
import { normalizeE164 } from "../../../../src/utils.js";
import { looksLikeUuid } from "../identity.js";
import type {
SignalDataMessage,
SignalMention,
SignalQuote,
SignalQuotedAttachment,
SignalReplyTarget,
} from "./event-handler.types.js";
import { renderSignalMentions } from "./mentions.js";
export type SignalQuotedAuthorResolver = (params: {
conversationKey: string;
replyToId: string;
}) => string | undefined;
function filterMentions(mentions?: Array<SignalMention | null> | null) {
const filtered = mentions?.filter((mention): mention is SignalMention => mention != null);
return filtered && filtered.length > 0 ? filtered : undefined;
}
function normalizeQuoteAuthorValue(raw?: string | null) {
const trimmed = raw?.trim();
if (!trimmed) {
return undefined;
}
const unprefixed = trimmed.replace(/^uuid:/i, "").trim();
if (!unprefixed) {
return undefined;
}
if (looksLikeUuid(unprefixed)) {
// Preserve uuid: prefix for signal-cli compatibility
return `uuid:${unprefixed}`;
}
const digits = unprefixed.replace(/[^\d+]/g, "");
return digits ? normalizeE164(unprefixed) : undefined;
}
function resolveQuotedAuthorFromPayload(quote: SignalQuote) {
return (
normalizeQuoteAuthorValue(quote.authorNumber) ??
normalizeQuoteAuthorValue(quote.authorUuid) ??
normalizeQuoteAuthorValue(quote.author)
);
}
function resolveQuotedAttachmentPlaceholder(
attachments?: Array<SignalQuotedAttachment | null> | null,
) {
const firstContentType = attachments?.find((attachment) => attachment?.contentType)?.contentType;
const kind = kindFromMime(firstContentType ?? undefined);
if (kind) {
return `<media:${kind}>`;
}
return attachments?.length ? "<media:attachment>" : undefined;
}
export function normalizeSignalQuoteId(rawId?: SignalQuote["id"]) {
if (typeof rawId === "number") {
return Number.isInteger(rawId) && rawId > 0 ? String(rawId) : undefined;
}
const trimmed = rawId?.trim();
if (!trimmed) {
return undefined;
}
// Only accept decimal digit strings — reject hex (0x10), scientific (1e3),
// and other Number()-parseable formats that would normalize to a different ID.
if (!/^\d+$/.test(trimmed)) {
return undefined;
}
const numeric = Number(trimmed);
return Number.isInteger(numeric) && numeric > 0 ? String(numeric) : undefined;
}
export function describeSignalReplyTarget(
dataMessage: SignalDataMessage,
opts: {
resolveAuthor?: SignalQuotedAuthorResolver;
conversationKey?: string;
} = {},
): SignalReplyTarget | null {
const quote = dataMessage.quote;
if (!quote) {
return null;
}
const id = normalizeSignalQuoteId(quote.id);
const mentions = filterMentions(quote.mentions);
const renderedText = renderSignalMentions(quote.text ?? "", mentions)?.trim() || "";
const body =
renderedText ||
resolveQuotedAttachmentPlaceholder(quote.attachments) ||
(id ? "<quoted message>" : "");
if (!body) {
return null;
}
const author =
resolveQuotedAuthorFromPayload(quote) ??
(opts.resolveAuthor && opts.conversationKey && id
? opts.resolveAuthor({ conversationKey: opts.conversationKey, replyToId: id })
: undefined);
return {
id,
author,
body,
kind: "quote",
mentions,
};
}

View File

@ -0,0 +1,128 @@
import { describe, expect, it } from "vitest";
import {
markSignalReplyConsumed,
resolveSignalReplyDelivery,
type SignalReplyDeliveryState,
} from "./reply-delivery.js";
describe("resolveSignalReplyDelivery", () => {
it("uses the inherited reply target until the first successful send", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const first = resolveSignalReplyDelivery({
payload: { text: "first" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, first.effectiveReplyTo);
const second = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(first.payload.replyToId).toBeUndefined();
expect(first.effectiveReplyTo).toBe("1700000000000");
expect(second.payload.replyToId).toBeUndefined();
expect(second.effectiveReplyTo).toBeUndefined();
});
it("keeps explicit reply targets after the inherited reply target is consumed", () => {
const state: SignalReplyDeliveryState = { consumed: true };
const explicit = resolveSignalReplyDelivery({
payload: { text: "second", replyToId: "1700000000002" },
inheritedReplyToId: "1700000000000",
state,
});
expect(explicit.payload.replyToId).toBe("1700000000002");
expect(explicit.effectiveReplyTo).toBe("1700000000002");
});
it("preserves explicit null reply suppression without consuming inherited reply state", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const suppressed = resolveSignalReplyDelivery({
payload: { text: "first", replyToId: null },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, suppressed.effectiveReplyTo);
const inherited = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(suppressed.payload.replyToId).toBeUndefined();
expect(suppressed.effectiveReplyTo).toBeUndefined();
expect(inherited.effectiveReplyTo).toBe("1700000000000");
});
it("does not consume inherited reply state for non-decimal reply ids", () => {
const state: SignalReplyDeliveryState = { consumed: false };
// Simulate a malformed reply_to tag that resolved to a non-timestamp string
const malformed = resolveSignalReplyDelivery({
payload: { text: "first", replyToId: "not-a-timestamp" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, malformed.effectiveReplyTo);
// The inherited reply should still be available for the next payload
const next = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(malformed.effectiveReplyTo).toBe("not-a-timestamp");
expect(state.consumed).toBe(false);
expect(next.effectiveReplyTo).toBe("1700000000000");
});
it("does not consume inherited reply state for zero timestamps", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const zero = resolveSignalReplyDelivery({
payload: { text: "first", replyToId: "0" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, zero.effectiveReplyTo);
const next = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(zero.effectiveReplyTo).toBe("0");
expect(state.consumed).toBe(false);
expect(next.effectiveReplyTo).toBe("1700000000000");
});
it("does not consume inherited group reply state when quoteAuthor is unavailable", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const first = resolveSignalReplyDelivery({
payload: { text: "first" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, first.effectiveReplyTo, { isGroup: true });
const next = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(state.consumed).toBe(false);
expect(next.effectiveReplyTo).toBe("1700000000000");
});
});

View File

@ -0,0 +1,75 @@
import type { ReplyPayload } from "../../../../src/auto-reply/types.js";
import { resolveSignalQuoteMetadata } from "../reply-quote.js";
export type SignalReplyDeliveryState = {
consumed: boolean;
};
function normalizeReplyToId(raw?: string | null) {
if (raw == null) {
return raw;
}
const trimmed = raw.trim();
return trimmed ? trimmed : null;
}
export function resolveSignalReplyDelivery(params: {
payload: ReplyPayload;
inheritedReplyToId?: string | null;
state?: SignalReplyDeliveryState;
}): {
payload: ReplyPayload;
effectiveReplyTo?: string;
} {
const explicitReplyTo =
"replyToId" in params.payload ? normalizeReplyToId(params.payload.replyToId) : undefined;
const inheritedReplyTo =
explicitReplyTo === undefined ? normalizeReplyToId(params.inheritedReplyToId) : undefined;
const effectiveReplyTo =
explicitReplyTo != null
? explicitReplyTo
: !params.state?.consumed
? (inheritedReplyTo ?? undefined)
: undefined;
if (explicitReplyTo === undefined) {
return {
payload: params.payload,
effectiveReplyTo,
};
}
return {
payload:
explicitReplyTo === null
? { ...params.payload, replyToId: undefined }
: params.payload.replyToId === explicitReplyTo
? params.payload
: { ...params.payload, replyToId: explicitReplyTo },
effectiveReplyTo,
};
}
/**
* Only consume the inherited reply state when Signal can actually send quote
* metadata for the payload. Malformed ids (e.g. a raw `[[reply_to:...]]` tag)
* and group replies without a resolved quote-author are silently dropped by
* `sendMessageSignal`, so consuming state for them would lose the inherited
* quote for subsequent payloads in the same turn.
*/
export function markSignalReplyConsumed(
state: SignalReplyDeliveryState | undefined,
replyToId?: string,
options: { isGroup?: boolean; quoteAuthor?: string | null } = {},
) {
if (
state &&
resolveSignalQuoteMetadata({
replyToId,
quoteAuthor: options.quoteAuthor,
isGroup: options.isGroup,
}).quoteTimestamp !== undefined
) {
state.consumed = true;
}
}

View File

@ -31,7 +31,16 @@ export const signalOutbound: ChannelOutboundAdapter = {
chunker: (text, _limit) => text.split(/\n{2,}/).flatMap((chunk) => (chunk ? [chunk] : [])),
chunkerMode: "text",
textChunkLimit: 4000,
sendFormattedText: async ({ cfg, to, text, accountId, deps, abortSignal }) => {
sendFormattedText: async ({
cfg,
to,
text,
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) => {
const send = resolveSignalSender(deps);
const maxBytes = resolveSignalMaxBytes({
cfg,
@ -49,6 +58,7 @@ export const signalOutbound: ChannelOutboundAdapter = {
chunks = [{ text, styles: [] }];
}
const results = [];
let first = true;
for (const chunk of chunks) {
abortSignal?.throwIfAborted();
const result = await send(to, chunk.text, {
@ -57,8 +67,11 @@ export const signalOutbound: ChannelOutboundAdapter = {
accountId: accountId ?? undefined,
textMode: "plain",
textStyles: chunk.styles,
replyTo: first ? (replyToId ?? undefined) : undefined,
quoteAuthor: first ? (quoteAuthor ?? undefined) : undefined,
});
results.push(result);
first = false;
}
return attachChannelToResults("signal", results);
},
@ -71,6 +84,8 @@ export const signalOutbound: ChannelOutboundAdapter = {
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) => {
abortSignal?.throwIfAborted();
const send = resolveSignalSender(deps);
@ -93,12 +108,14 @@ export const signalOutbound: ChannelOutboundAdapter = {
textMode: "plain",
textStyles: formatted.styles,
mediaLocalRoots,
replyTo: replyToId ?? undefined,
quoteAuthor: quoteAuthor ?? undefined,
});
return attachChannelToResult("signal", result);
},
...createAttachedChannelResultAdapter({
channel: "signal",
sendText: async ({ cfg, to, text, accountId, deps }) => {
sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => {
const send = resolveSignalSender(deps);
const maxBytes = resolveSignalMaxBytes({
cfg,
@ -108,9 +125,21 @@ export const signalOutbound: ChannelOutboundAdapter = {
cfg,
maxBytes,
accountId: accountId ?? undefined,
replyTo: replyToId ?? undefined,
quoteAuthor: quoteAuthor ?? undefined,
});
},
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) => {
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
quoteAuthor,
}) => {
const send = resolveSignalSender(deps);
const maxBytes = resolveSignalMaxBytes({
cfg,
@ -122,6 +151,8 @@ export const signalOutbound: ChannelOutboundAdapter = {
maxBytes,
accountId: accountId ?? undefined,
mediaLocalRoots,
replyTo: replyToId ?? undefined,
quoteAuthor: quoteAuthor ?? undefined,
});
},
}),

View File

@ -0,0 +1,40 @@
const SIGNAL_QUOTE_TIMESTAMP_RE = /^\d+$/;
export function parseSignalQuoteTimestamp(raw?: string | null): number | undefined {
const trimmed = raw?.trim();
if (!trimmed || !SIGNAL_QUOTE_TIMESTAMP_RE.test(trimmed)) {
return undefined;
}
const timestamp = Number(trimmed);
return Number.isInteger(timestamp) && timestamp > 0 ? timestamp : undefined;
}
export function isSignalGroupTarget(rawTarget: string): boolean {
let value = rawTarget.trim();
if (value.toLowerCase().startsWith("signal:")) {
value = value.slice("signal:".length).trim();
}
return value.toLowerCase().startsWith("group:");
}
export function resolveSignalQuoteMetadata(params: {
replyToId?: string | null;
quoteAuthor?: string | null;
isGroup?: boolean;
}): {
quoteTimestamp?: number;
quoteAuthor?: string;
} {
const quoteTimestamp = parseSignalQuoteTimestamp(params.replyToId);
if (quoteTimestamp === undefined) {
return {};
}
const quoteAuthor = params.quoteAuthor?.trim() || undefined;
if (params.isGroup && !quoteAuthor) {
return {};
}
return { quoteTimestamp, quoteAuthor };
}

View File

@ -0,0 +1,80 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { sendMessageSignal } from "./send.js";
const rpcMock = vi.fn();
vi.mock("../../../src/config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../../src/config/config.js")>();
return {
...actual,
loadConfig: () => ({}),
};
});
vi.mock("./accounts.js", () => ({
resolveSignalAccount: () => ({
accountId: "default",
enabled: true,
baseUrl: "http://signal.local",
configured: true,
config: { account: "+15550001111" },
}),
}));
vi.mock("./client.js", () => ({
signalRpcRequest: (...args: unknown[]) => rpcMock(...args),
}));
describe("sendMessageSignal", () => {
beforeEach(() => {
rpcMock.mockReset().mockResolvedValue({ timestamp: 123 });
});
it("sends quote-author for group replies when quoteAuthor is available", async () => {
await sendMessageSignal("group:test-group", "hello", {
textMode: "plain",
replyTo: "1700000000000",
quoteAuthor: "uuid:sender-1",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(rpcMock).toHaveBeenCalledWith("send", expect.any(Object), expect.any(Object));
expect(params.groupId).toBe("test-group");
expect(params["quote-timestamp"]).toBe(1700000000000);
expect(params["quote-author"]).toBe("uuid:sender-1");
});
it("sends quote-timestamp for direct replies without quoteAuthor", async () => {
await sendMessageSignal("+15551230000", "hello", {
textMode: "plain",
replyTo: "1700000000000",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(params["quote-timestamp"]).toBe(1700000000000);
expect(params["quote-author"]).toBeUndefined();
});
it("ignores replyTo values with trailing non-numeric characters", async () => {
await sendMessageSignal("+15551230000", "hello", {
textMode: "plain",
replyTo: "1700000000000abc",
quoteAuthor: "uuid:sender-1",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(params["quote-timestamp"]).toBeUndefined();
expect(params["quote-author"]).toBeUndefined();
});
it("skips group quote metadata when quoteAuthor is unavailable", async () => {
await sendMessageSignal("group:test-group", "hello", {
textMode: "plain",
replyTo: "1700000000000",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(params["quote-timestamp"]).toBeUndefined();
expect(params["quote-author"]).toBeUndefined();
});
});

View File

@ -5,6 +5,7 @@ import { resolveOutboundAttachmentFromUrl } from "openclaw/plugin-sdk/media-runt
import { resolveSignalAccount } from "./accounts.js";
import { signalRpcRequest } from "./client.js";
import { markdownToSignalText, type SignalTextStyleRange } from "./format.js";
import { resolveSignalQuoteMetadata } from "./reply-quote.js";
import { resolveSignalRpcContext } from "./rpc-context.js";
export type SignalSendOpts = {
@ -18,6 +19,8 @@ export type SignalSendOpts = {
timeoutMs?: number;
textMode?: "markdown" | "plain";
textStyles?: SignalTextStyleRange[];
replyTo?: string;
quoteAuthor?: string;
};
export type SignalSendResult = {
@ -181,6 +184,19 @@ export async function sendMessageSignal(
}
Object.assign(params, targetParams);
// Add quote parameters for reply functionality
const { quoteTimestamp, quoteAuthor } = resolveSignalQuoteMetadata({
replyToId: opts.replyTo,
quoteAuthor: opts.quoteAuthor,
isGroup: target.type === "group",
});
if (quoteTimestamp !== undefined) {
params["quote-timestamp"] = quoteTimestamp;
if (quoteAuthor) {
params["quote-author"] = quoteAuthor;
}
}
const result = await signalRpcRequest<{ timestamp?: number }>("send", params, {
baseUrl,
timeoutMs: opts.timeoutMs,

View File

@ -649,7 +649,9 @@ export async function deliverReplies(params: {
try {
const deliveredCountBeforeReply = progress.deliveredCount;
const replyToId =
params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId);
params.replyToMode === "off"
? undefined
: resolveTelegramReplyId(reply.replyToId ?? undefined);
const telegramData = reply.channelData?.telegram as TelegramReplyChannelData | undefined;
const shouldPinFirstMessage = telegramData?.pin === true;
const replyMarkup = buildInlineKeyboard(telegramData?.buttons);

View File

@ -10,6 +10,7 @@ import {
sendPayloadMediaSequenceOrFallback,
} from "openclaw/plugin-sdk/reply-payload";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { markReplyApplied } from "../../../src/infra/outbound/reply-applied.js";
import type { TelegramInlineButtons } from "./button-types.js";
import { resolveTelegramInlineButtons } from "./button-types.js";
import { markdownToTelegramHtmlChunks } from "./format.js";
@ -21,6 +22,13 @@ export const TELEGRAM_TEXT_CHUNK_LIMIT = 4000;
type TelegramSendFn = typeof sendMessageTelegram;
type TelegramSendOpts = Parameters<TelegramSendFn>[2];
function attachReplyAppliedMarker<T extends object>(
result: T,
baseOpts: { replyToMessageId?: number },
) {
return markReplyApplied(result, baseOpts.replyToMessageId !== undefined);
}
function resolveTelegramSendContext(params: {
cfg: NonNullable<TelegramSendOpts>["cfg"];
deps?: OutboundSendDeps;
@ -116,9 +124,10 @@ export const telegramOutbound: ChannelOutboundAdapter = {
replyToId,
threadId,
});
return await send(to, text, {
const result = await send(to, text, {
...baseOpts,
});
return attachReplyAppliedMarker(result, baseOpts);
},
sendMedia: async ({
cfg,
@ -145,6 +154,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
mediaLocalRoots,
forceDocument: forceDocument ?? false,
});
return attachReplyAppliedMarker(result, baseOpts);
},
}),
sendPayload: async ({
@ -175,6 +185,6 @@ export const telegramOutbound: ChannelOutboundAdapter = {
forceDocument: forceDocument ?? false,
},
});
return attachChannelToResult("telegram", result);
return attachReplyAppliedMarker(attachChannelToResult("telegram", result), baseOpts);
},
};

View File

@ -59,7 +59,7 @@ export type EmbeddedPiRunResult = {
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
replyToId?: string;
replyToId?: string | null;
isError?: boolean;
}>;
meta: EmbeddedPiRunMeta;

View File

@ -4,7 +4,7 @@ export const runEmbeddedPiAgentMock: Mock = vi.fn();
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: (...args: unknown[]) => runEmbeddedPiAgentMock(...args),
runEmbeddedPiAgent: runEmbeddedPiAgentMock,
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),

View File

@ -45,7 +45,7 @@ describe("createBlockReplyContentKey", () => {
describe("createBlockReplyPipeline dedup with threading", () => {
it("keeps separate deliveries for same text with different replyToId", async () => {
const sent: Array<{ text?: string; replyToId?: string }> = [];
const sent: Array<{ text?: string; replyToId?: string | null }> = [];
const pipeline = createBlockReplyPipeline({
onBlockReply: async (payload) => {
sent.push({ text: payload.text, replyToId: payload.replyToId });

View File

@ -2,10 +2,33 @@ import { describe, expect, it } from "vitest";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
import {
applyReplyThreading,
filterMessagingToolMediaDuplicates,
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
describe("applyReplyThreading", () => {
it("treats whitespace-only replyToId as unset so implicit replies still apply", () => {
const result = applyReplyThreading({
payloads: [{ text: "hello", replyToId: " \n\t " }],
replyToMode: "all",
currentMessageId: "123",
});
expect(result).toEqual([{ text: "hello", replyToId: "123" }]);
});
it("preserves explicit null replyToId as do-not-reply", () => {
const result = applyReplyThreading({
payloads: [{ text: "hello", replyToId: null }],
replyToMode: "all",
currentMessageId: "123",
});
expect(result).toEqual([{ text: "hello", replyToId: null }]);
});
});
describe("filterMessagingToolMediaDuplicates", () => {
it("strips mediaUrl when it matches sentMediaUrls", () => {
const result = filterMessagingToolMediaDuplicates({

View File

@ -31,12 +31,23 @@ function resolveReplyThreadingForPayload(params: {
}): ReplyPayload {
const implicitReplyToId = params.implicitReplyToId?.trim() || undefined;
const currentMessageId = params.currentMessageId?.trim() || undefined;
const normalizedReplyToId =
typeof params.payload.replyToId === "string"
? params.payload.replyToId.trim() || undefined
: params.payload.replyToId;
const normalizedPayload =
normalizedReplyToId === params.payload.replyToId
? params.payload
: { ...params.payload, replyToId: normalizedReplyToId };
const hasExplicitReplyToId = normalizedReplyToId !== undefined;
// 1) Apply implicit reply threading first (replyToMode will strip later if needed).
// Treat replyToId=null as an explicit "do not reply" signal (do not override).
let resolved: ReplyPayload =
params.payload.replyToId || params.payload.replyToCurrent === false || !implicitReplyToId
? params.payload
: { ...params.payload, replyToId: implicitReplyToId };
hasExplicitReplyToId || normalizedPayload.replyToCurrent === false || !implicitReplyToId
? normalizedPayload
: { ...normalizedPayload, replyToId: implicitReplyToId };
// 2) Parse explicit reply tags from text (if present) and clean them.
if (typeof resolved.text === "string" && resolved.text.includes("[[")) {
@ -55,7 +66,7 @@ function resolveReplyThreadingForPayload(params: {
// 3) If replyToCurrent was set out-of-band (e.g. tags already stripped upstream),
// ensure replyToId is set to the current message id when available.
if (resolved.replyToCurrent && !resolved.replyToId && currentMessageId) {
if (resolved.replyToCurrent && resolved.replyToId === undefined && currentMessageId) {
resolved = {
...resolved,
replyToId: currentMessageId,

View File

@ -159,6 +159,39 @@ describe("applyReplyThreading auto-threading", () => {
expect(result[0].replyToId).toBe("42");
});
it("does not overwrite explicit replyToId:null when implicit threading is available", () => {
const result = applyReplyThreading({
payloads: [{ text: "Hello", replyToId: null }],
replyToMode: "all",
currentMessageId: "42",
});
expect(result).toHaveLength(1);
expect(result[0].replyToId).toBeNull();
});
it("treats blank explicit replyToId as undefined so implicit threading still applies", () => {
const result = applyReplyThreading({
payloads: [{ text: "Hello", replyToId: " " }],
replyToMode: "all",
currentMessageId: "42",
});
expect(result).toHaveLength(1);
expect(result[0].replyToId).toBe("42");
});
it("trims non-empty explicit replyToId before preserving it", () => {
const result = applyReplyThreading({
payloads: [{ text: "Hello", replyToId: " abc-123 " }],
replyToMode: "all",
currentMessageId: "42",
});
expect(result).toHaveLength(1);
expect(result[0].replyToId).toBe("abc-123");
});
it("threads only first payload when mode is 'first'", () => {
const result = applyReplyThreading({
payloads: [{ text: "A" }, { text: "B" }],

View File

@ -81,7 +81,7 @@ export type ReplyPayload = {
btw?: {
question: string;
};
replyToId?: string;
replyToId?: string | null;
replyToTag?: boolean;
/** True when [[reply_to_current]] was present but not yet mapped to a message id. */
replyToCurrent?: boolean;

View File

@ -15,6 +15,7 @@ type DirectSendOptions = {
cfg: OpenClawConfig;
accountId?: string | null;
replyToId?: string | null;
quoteAuthor?: string | null;
mediaUrl?: string;
mediaLocalRoots?: readonly string[];
maxBytes?: number;
@ -78,6 +79,7 @@ export function createDirectTextMediaOutbound<
accountId?: string | null;
deps?: OutboundSendDeps;
replyToId?: string | null;
quoteAuthor?: string | null;
mediaUrl?: string;
mediaLocalRoots?: readonly string[];
buildOptions: (params: DirectSendOptions) => TOpts;
@ -96,6 +98,7 @@ export function createDirectTextMediaOutbound<
mediaLocalRoots: sendParams.mediaLocalRoots,
accountId: sendParams.accountId,
replyToId: sendParams.replyToId,
quoteAuthor: sendParams.quoteAuthor,
maxBytes,
}),
);
@ -109,7 +112,7 @@ export function createDirectTextMediaOutbound<
textChunkLimit: 4000,
sendPayload: async (ctx) =>
await sendTextMediaPayload({ channel: params.channel, ctx, adapter: outbound }),
sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => {
sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => {
return await sendDirect({
cfg,
to,
@ -117,10 +120,21 @@ export function createDirectTextMediaOutbound<
accountId,
deps,
replyToId,
quoteAuthor,
buildOptions: params.buildTextOptions,
});
},
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps, replyToId }) => {
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
quoteAuthor,
}) => {
return await sendDirect({
cfg,
to,
@ -130,6 +144,7 @@ export function createDirectTextMediaOutbound<
accountId,
deps,
replyToId,
quoteAuthor,
buildOptions: params.buildMediaOptions,
});
},

View File

@ -67,4 +67,32 @@ describe("signalOutbound", () => {
);
expect(result).toEqual({ channel: "signal", messageId: "sig-media-1", timestamp: 456 });
});
it("passes replyToId and quoteAuthor through sendText", async () => {
const sendSignal = vi.fn().mockResolvedValue({ messageId: "sig-text-2", timestamp: 789 });
const sendText = signalOutbound.sendText;
expect(sendText).toBeDefined();
const result = await sendText!({
cfg,
to: "group:test-group",
text: "hello",
accountId: "work",
replyToId: "1700000000000",
quoteAuthor: "uuid:sender-1",
deps: { sendSignal },
});
expect(sendSignal).toHaveBeenCalledWith(
"group:test-group",
"hello",
expect.objectContaining({
accountId: "work",
maxBytes: 4 * 1024 * 1024,
replyTo: "1700000000000",
quoteAuthor: "uuid:sender-1",
}),
);
expect(result).toEqual({ channel: "signal", messageId: "sig-text-2", timestamp: 789 });
});
});

View File

@ -135,6 +135,7 @@ export type ChannelOutboundContext = {
/** Send image as document to avoid Telegram compression. */
forceDocument?: boolean;
replyToId?: string | null;
quoteAuthor?: string | null;
threadId?: string | number | null;
accountId?: string | null;
identity?: OutboundIdentity;

View File

@ -0,0 +1,472 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js";
import type { OpenClawConfig } from "../../config/config.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
const { deliverOutboundPayloads } = await import("./deliver.js");
const defaultRegistry = createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]);
describe("deliverOutboundPayloads Greptile fixes", () => {
beforeEach(() => {
setActivePluginRegistry(defaultRegistry);
});
afterEach(() => {
setActivePluginRegistry(createTestRegistry());
});
it("retries replyToId on later non-signal text payloads after a best-effort failure", async () => {
const sendTelegram = vi
.fn()
.mockRejectedValueOnce(new Error("text fail"))
.mockResolvedValueOnce({ messageId: "m2", chatId: "chat-1" });
const onError = vi.fn();
const cfg: OpenClawConfig = {
channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "telegram",
to: "123",
payloads: [{ text: "ab" }, { text: "cd" }],
replyToId: "777",
deps: { sendTelegram },
bestEffort: true,
onError,
skipQueue: true,
});
expect(sendTelegram).toHaveBeenCalledTimes(2);
expect(sendTelegram.mock.calls[0]?.[2]).toEqual(
expect.objectContaining({ replyToMessageId: 777 }),
);
expect(sendTelegram.mock.calls[1]?.[2]).toEqual(
expect.objectContaining({ replyToMessageId: 777 }),
);
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ messageId: "m2", chatId: "chat-1", channel: "telegram" }]);
});
it("retries replyToId on later sendPayload payloads after a best-effort failure", async () => {
const sendPayload = vi
.fn()
.mockRejectedValueOnce(new Error("payload fail"))
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
const onError = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", channelData: { mode: "custom" } },
{ text: "second", channelData: { mode: "custom" } },
],
replyToId: "orig-msg-id",
bestEffort: true,
onError,
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id");
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ channel: "matrix", messageId: "mx-2" }]);
});
it("preserves explicit null reply suppression without consuming the inherited reply", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const payloads: ReplyPayload[] = [
{ text: "first", channelData: { mode: "custom" }, replyToId: null },
{ text: "second", channelData: { mode: "custom" } },
];
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads,
replyToId: "orig-msg-id",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBeUndefined();
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id");
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
]);
});
it("treats replyToId: undefined as inherited reply metadata", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", channelData: { mode: "custom" }, replyToId: undefined },
{ text: "second", channelData: { mode: "custom" } },
],
replyToId: "orig-msg-id",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBeNull();
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
]);
});
it("clears replyToId on later sendPayload payloads after the first successful send", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", channelData: { mode: "custom" } },
{ text: "second", channelData: { mode: "custom" } },
],
replyToId: "orig-msg-id",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBeNull();
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
]);
});
it("preserves later explicit replyToId values after the first successful send", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-3" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", replyToId: "reply-1", channelData: { mode: "custom" } },
{ text: "second", replyToId: "reply-2", channelData: { mode: "custom" } },
{ text: "third", replyToId: "reply-3", channelData: { mode: "custom" } },
],
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(3);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("reply-1");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("reply-2");
expect(sendPayload.mock.calls[2]?.[0]?.replyToId).toBe("reply-3");
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
{ channel: "matrix", messageId: "mx-3" },
]);
});
it("preserves inherited replyToId across all googlechat sendPayload payloads (thread routing)", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-1" })
.mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-2" })
.mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-3" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "googlechat",
source: "test",
plugin: createOutboundTestPlugin({
id: "googlechat",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "googlechat",
to: "spaces/AAAA",
payloads: [
{ text: "first", channelData: { mode: "custom" } },
{ text: "second", channelData: { mode: "custom" } },
{ text: "third", channelData: { mode: "custom" } },
],
replyToId: "spaces/AAAA/threads/BBBB",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(3);
// All payloads must retain the thread identifier — consuming it after the
// first send would orphan subsequent payloads to the top level.
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("spaces/AAAA/threads/BBBB");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("spaces/AAAA/threads/BBBB");
expect(sendPayload.mock.calls[2]?.[0]?.replyToId).toBe("spaces/AAAA/threads/BBBB");
expect(results).toEqual([
{ channel: "googlechat", messageId: "gc-1" },
{ channel: "googlechat", messageId: "gc-2" },
{ channel: "googlechat", messageId: "gc-3" },
]);
});
it("preserves inherited replyToId across googlechat text payloads (sendText path)", async () => {
const sendText = vi
.fn()
.mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-t1", chatId: "spaces/X" })
.mockResolvedValueOnce({ channel: "googlechat", messageId: "gc-t2", chatId: "spaces/X" });
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "googlechat",
source: "test",
plugin: createOutboundTestPlugin({
id: "googlechat",
outbound: { deliveryMode: "direct", sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "googlechat",
to: "spaces/X",
payloads: [{ text: "chunk one" }, { text: "chunk two" }],
replyToId: "spaces/X/threads/T1",
skipQueue: true,
});
expect(sendText).toHaveBeenCalledTimes(2);
// Both text sends should receive replyToId for thread routing
expect(sendText.mock.calls[0]?.[0]?.replyToId).toBe("spaces/X/threads/T1");
expect(sendText.mock.calls[1]?.[0]?.replyToId).toBe("spaces/X/threads/T1");
expect(results).toHaveLength(2);
});
it("does not consume inherited telegram reply state after an invalid payload-level text override", async () => {
const sendTelegram = vi
.fn()
.mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" })
.mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" });
const cfg: OpenClawConfig = {
channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "telegram",
to: "123",
payloads: [{ text: "first", replyToId: "not-a-number" }, { text: "second" }],
replyToId: "777",
deps: { sendTelegram },
skipQueue: true,
});
const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined;
const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined;
expect(sendTelegram).toHaveBeenCalledTimes(2);
expect(firstOpts?.replyToMessageId).toBeUndefined();
expect(secondOpts?.replyToMessageId).toBe(777);
expect(results).toEqual([
{ channel: "telegram", messageId: "tg-1", chatId: "chat-1" },
{ channel: "telegram", messageId: "tg-2", chatId: "chat-1" },
]);
});
it("does not consume inherited telegram reply state after an invalid payload-level sendPayload override", async () => {
const sendTelegram = vi
.fn()
.mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" })
.mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" });
const cfg: OpenClawConfig = {
channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "telegram",
to: "123",
payloads: [
{
text: "first",
replyToId: "not-a-number",
channelData: { telegram: { buttons: [] } },
},
{
text: "second",
channelData: { telegram: { buttons: [] } },
},
],
replyToId: "777",
deps: { sendTelegram },
skipQueue: true,
});
const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined;
const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined;
expect(sendTelegram).toHaveBeenCalledTimes(2);
expect(firstOpts?.replyToMessageId).toBeUndefined();
expect(secondOpts?.replyToMessageId).toBe(777);
expect(results).toEqual([
{ channel: "telegram", messageId: "tg-1", chatId: "chat-1" },
{ channel: "telegram", messageId: "tg-2", chatId: "chat-1" },
]);
});
it("retries replyToId on later non-signal media payloads after a best-effort failure", async () => {
const sendText = vi.fn();
const sendMedia = vi
.fn()
.mockRejectedValueOnce(new Error("media fail"))
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-media-2" });
const onError = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", mediaUrl: "https://example.com/1.jpg" },
{ text: "second", mediaUrl: "https://example.com/2.jpg" },
],
replyToId: "orig-msg-id",
bestEffort: true,
onError,
skipQueue: true,
});
expect(sendMedia).toHaveBeenCalledTimes(2);
expect(sendMedia.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendMedia.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id");
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ channel: "matrix", messageId: "mx-media-2" }]);
});
});

View File

@ -3,6 +3,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite
import { markdownToSignalTextChunks } from "../../../extensions/signal/src/format.js";
import {
signalOutbound,
slackOutbound,
telegramOutbound,
whatsappOutbound,
} from "../../../test/channel-outbounds.js";
@ -917,6 +918,155 @@ describe("deliverOutboundPayloads", () => {
expect(sendWhatsApp).not.toHaveBeenCalled();
});
it("does not re-apply Signal reply metadata after partial chunk failure in bestEffort mode", async () => {
const sendSignal = vi
.fn()
.mockResolvedValueOnce({ messageId: "s1", timestamp: 1 })
.mockRejectedValueOnce(new Error("chunk fail"))
.mockResolvedValueOnce({ messageId: "s3", timestamp: 3 });
const onError = vi.fn();
const cfg: OpenClawConfig = {
channels: { signal: { textChunkLimit: 2 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "signal",
to: "+1555",
payloads: [{ text: "abcd" }, { text: "ef" }],
replyToId: "1700000000000",
quoteAuthor: "Tester",
deps: { sendSignal },
bestEffort: true,
onError,
});
expect(sendSignal).toHaveBeenCalledTimes(3);
expect(sendSignal).toHaveBeenNthCalledWith(
1,
"+1555",
expect.any(String),
expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: "Tester" }),
);
expect(sendSignal).toHaveBeenNthCalledWith(
2,
"+1555",
expect.any(String),
expect.objectContaining({ replyTo: undefined, quoteAuthor: undefined }),
);
expect(sendSignal).toHaveBeenNthCalledWith(
3,
"+1555",
expect.any(String),
expect.objectContaining({ replyTo: undefined, quoteAuthor: undefined }),
);
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([
{ channel: "signal", messageId: "s1", timestamp: 1 },
{ channel: "signal", messageId: "s3", timestamp: 3 },
]);
});
it("preserves inherited Signal reply metadata after an explicit malformed replyToId is skipped", async () => {
const sendSignal = vi
.fn()
.mockResolvedValueOnce({ messageId: "s1", timestamp: 1 })
.mockResolvedValueOnce({ messageId: "s2", timestamp: 2 });
await deliverOutboundPayloads({
cfg: { channels: { signal: {} } },
channel: "signal",
to: "+1555",
payloads: [{ text: "first", replyToId: "not-a-timestamp" }, { text: "second" }],
replyToId: "1700000000000",
quoteAuthor: "uuid:sender-1",
deps: { sendSignal },
});
expect(sendSignal).toHaveBeenCalledTimes(2);
expect(sendSignal).toHaveBeenNthCalledWith(
1,
"+1555",
"first",
expect.objectContaining({ replyTo: "not-a-timestamp", quoteAuthor: "uuid:sender-1" }),
);
expect(sendSignal).toHaveBeenNthCalledWith(
2,
"+1555",
"second",
expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: "uuid:sender-1" }),
);
});
it("preserves inherited Signal group reply metadata when quoteAuthor is unavailable", async () => {
const sendSignal = vi
.fn()
.mockResolvedValueOnce({ messageId: "s1", timestamp: 1 })
.mockResolvedValueOnce({ messageId: "s2", timestamp: 2 });
await deliverOutboundPayloads({
cfg: { channels: { signal: {} } },
channel: "signal",
to: "group:test-group",
payloads: [{ text: "first" }, { text: "second" }],
replyToId: "1700000000000",
deps: { sendSignal },
});
expect(sendSignal).toHaveBeenCalledTimes(2);
expect(sendSignal).toHaveBeenNthCalledWith(
1,
"group:test-group",
"first",
expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: undefined }),
);
expect(sendSignal).toHaveBeenNthCalledWith(
2,
"group:test-group",
"second",
expect.objectContaining({ replyTo: "1700000000000", quoteAuthor: undefined }),
);
});
it("keeps inherited Slack thread context across all payloads", async () => {
const sendSlack = vi
.fn()
.mockResolvedValueOnce({ messageId: "sl1", channelId: "C123" })
.mockResolvedValueOnce({ messageId: "sl2", channelId: "C123" });
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "slack",
plugin: createOutboundTestPlugin({ id: "slack", outbound: slackOutbound }),
source: "test",
},
]),
);
await deliverOutboundPayloads({
cfg: { channels: { slack: {} } },
channel: "slack",
to: "C123",
payloads: [{ text: "first" }, { text: "second" }],
replyToId: "thread-123",
deps: { sendSlack },
});
expect(sendSlack).toHaveBeenCalledTimes(2);
expect(sendSlack).toHaveBeenNthCalledWith(
1,
"C123",
"first",
expect.objectContaining({ threadTs: "thread-123" }),
);
expect(sendSlack).toHaveBeenNthCalledWith(
2,
"C123",
"second",
expect.objectContaining({ threadTs: "thread-123" }),
);
});
it("passes normalized payload to onError", async () => {
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom"));
const onError = vi.fn();

View File

@ -2,6 +2,15 @@ import {
resolveSendableOutboundReplyParts,
sendMediaWithLeadingCaption,
} from "openclaw/plugin-sdk/reply-payload";
import {
markdownToSignalTextChunks,
type SignalTextStyleRange,
} from "../../../extensions/signal/src/format.js";
import {
isSignalGroupTarget,
resolveSignalQuoteMetadata,
} from "../../../extensions/signal/src/reply-quote.js";
import { sendMessageSignal } from "../../../extensions/signal/src/send.js";
import {
chunkByParagraph,
chunkMarkdownTextWithMode,
@ -9,12 +18,14 @@ import {
resolveTextChunkLimit,
} from "../../auto-reply/chunk.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveChannelMediaMaxBytes } from "../../channels/plugins/media-limits.js";
import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js";
import type {
ChannelOutboundAdapter,
ChannelOutboundContext,
} from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import {
appendAssistantMessageToSessionTranscript,
resolveMirroredTranscriptText,
@ -38,6 +49,7 @@ import type { OutboundIdentity } from "./identity.js";
import type { DeliveryMirror } from "./mirror.js";
import type { NormalizedOutboundPayload } from "./payloads.js";
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
import { readReplyApplied } from "./reply-applied.js";
import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js";
import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js";
import type { OutboundSessionContext } from "./session-context.js";
@ -118,6 +130,7 @@ type ChannelHandlerParams = {
to: string;
accountId?: string;
replyToId?: string | null;
quoteAuthor?: string | null;
threadId?: string | number | null;
identity?: OutboundIdentity;
deps?: OutboundSendDeps;
@ -161,8 +174,9 @@ function createPluginHandler(
threadId?: string | number | null;
}): Omit<ChannelOutboundContext, "text" | "mediaUrl"> => ({
...baseCtx,
replyToId: overrides?.replyToId ?? baseCtx.replyToId,
threadId: overrides?.threadId ?? baseCtx.threadId,
// Preserve explicit null overrides so later payloads can suppress inherited reply/thread metadata.
replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId,
threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId,
});
return {
chunker,
@ -223,6 +237,7 @@ function createPluginHandler(
return sendText({
...resolveCtx(overrides),
text: caption,
mediaUrl,
});
},
};
@ -236,6 +251,7 @@ function createChannelOutboundContextBase(
to: params.to,
accountId: params.accountId,
replyToId: params.replyToId,
quoteAuthor: params.quoteAuthor,
threadId: params.threadId,
identity: params.identity,
gifPlayback: params.gifPlayback,
@ -255,6 +271,7 @@ type DeliverOutboundPayloadsCoreParams = {
accountId?: string;
payloads: ReplyPayload[];
replyToId?: string | null;
quoteAuthor?: string | null;
threadId?: string | number | null;
identity?: OutboundIdentity;
deps?: OutboundSendDeps;
@ -541,6 +558,8 @@ async function deliverOutboundPayloadsCore(
const accountId = params.accountId;
const deps = params.deps;
const abortSignal = params.abortSignal;
const sendSignal =
resolveOutboundSendDep<typeof sendMessageSignal>(params.deps, "signal") ?? sendMessageSignal;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(
cfg,
params.session?.agentId ?? params.mirror?.agentId,
@ -553,6 +572,7 @@ async function deliverOutboundPayloadsCore(
deps,
accountId,
replyToId: params.replyToId,
quoteAuthor: params.quoteAuthor,
threadId: params.threadId,
identity: params.identity,
gifPlayback: params.gifPlayback,
@ -569,6 +589,20 @@ async function deliverOutboundPayloadsCore(
? handler.resolveEffectiveTextChunkLimit(configuredTextLimit)
: configuredTextLimit;
const chunkMode = handler.chunker ? resolveChunkMode(cfg, channel, accountId) : "length";
const isSignalChannel = channel === "signal";
const signalIsGroupTarget = isSignalChannel && isSignalGroupTarget(to);
const signalTableMode = isSignalChannel
? resolveMarkdownTableMode({ cfg, channel: "signal", accountId })
: "code";
const signalMaxBytes = isSignalChannel
? resolveChannelMediaMaxBytes({
cfg,
resolveChannelLimitMb: ({ cfg, accountId }) =>
cfg.channels?.signal?.accounts?.[accountId]?.mediaMaxMb ??
cfg.channels?.signal?.mediaMaxMb,
accountId,
})
: undefined;
const sendTextChunks = async (
text: string,
@ -608,6 +642,81 @@ async function deliverOutboundPayloadsCore(
}
};
const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel, handler);
const sendSignalText = async (
text: string,
styles: SignalTextStyleRange[],
replyTo?: string,
quoteAuthor?: string,
) => {
throwIfAborted(abortSignal);
return {
channel: "signal" as const,
...(await sendSignal(to, text, {
cfg,
maxBytes: signalMaxBytes,
accountId: accountId ?? undefined,
textMode: "plain",
textStyles: styles,
replyTo,
quoteAuthor,
})),
};
};
const sendSignalTextChunks = async (text: string, replyTo?: string, quoteAuthor?: string) => {
throwIfAborted(abortSignal);
let signalChunks =
textLimit === undefined
? markdownToSignalTextChunks(text, Number.POSITIVE_INFINITY, {
tableMode: signalTableMode,
})
: markdownToSignalTextChunks(text, textLimit, { tableMode: signalTableMode });
if (signalChunks.length === 0 && text) {
signalChunks = [{ text, styles: [] }];
}
let first = true;
for (const chunk of signalChunks) {
throwIfAborted(abortSignal);
results.push(
await sendSignalText(
chunk.text,
chunk.styles,
first ? replyTo : undefined,
first ? quoteAuthor : undefined,
),
);
first = false;
}
};
const sendSignalMedia = async (
caption: string,
mediaUrl: string,
replyTo?: string,
quoteAuthor?: string,
) => {
throwIfAborted(abortSignal);
const formatted = markdownToSignalTextChunks(caption, Number.POSITIVE_INFINITY, {
tableMode: signalTableMode,
})[0] ?? {
text: caption,
styles: [],
};
return {
channel: "signal" as const,
...(await sendSignal(to, formatted.text, {
cfg,
mediaUrl,
maxBytes: signalMaxBytes,
accountId: accountId ?? undefined,
textMode: "plain",
textStyles: formatted.styles,
mediaLocalRoots,
replyTo,
quoteAuthor,
})),
};
};
const hookRunner = getGlobalHookRunner();
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
const mirrorIsGroup = params.mirror?.isGroup;
@ -632,6 +741,84 @@ async function deliverOutboundPayloadsCore(
},
);
}
let replyConsumed = false;
// Slack, Mattermost, and Google Chat use replyToId as persistent thread
// context (thread_ts, rootId, and threadName respectively) that must survive
// across all payloads. Never consume inherited reply state for thread-based
// channels.
const isThreadBasedChannel =
channel === "slack" || channel === "mattermost" || channel === "googlechat";
const shouldConsumeReplyAfterSend = (replyTo: string | undefined) => {
if (!replyTo) {
return false;
}
// Thread-based channels use replyToId for thread routing, not one-shot
// quoting — consuming it would orphan subsequent chunks from the thread.
if (isThreadBasedChannel) {
return false;
}
if (!isSignalChannel) {
return true;
}
// Signal silently drops malformed quote timestamps and group quotes without a
// quote-author, so only consume inherited reply state when the send can carry
// quote metadata on the wire.
return (
resolveSignalQuoteMetadata({
replyToId: replyTo,
quoteAuthor: params.quoteAuthor,
isGroup: signalIsGroupTarget,
}).quoteTimestamp !== undefined
);
};
const didSendApplyReply = <T>(
resultCountBeforeSend: number,
value?: T,
sent?: (value: T) => boolean,
) => {
const valueFlag = value === undefined ? undefined : readReplyApplied(value);
if (valueFlag !== undefined) {
return valueFlag;
}
const resultFlags = results
.slice(resultCountBeforeSend)
.map((result) => readReplyApplied(result))
.filter((flag): flag is boolean => flag !== undefined);
if (resultFlags.length > 0) {
return resultFlags.some(Boolean);
}
return value !== undefined && sent ? sent(value) : results.length > resultCountBeforeSend;
};
const markReplyConsumedIfSendSucceeded = (
replyTo: string | undefined,
resultCountBeforeSend: number,
) => {
if (shouldConsumeReplyAfterSend(replyTo) && didSendApplyReply(resultCountBeforeSend)) {
replyConsumed = true;
}
};
const trackReplyConsumption = async <T>(
replyTo: string | undefined,
send: () => Promise<T>,
sent?: (value: T) => boolean,
): Promise<T> => {
const resultCountBeforeSend = results.length;
try {
const value = await send();
if (
shouldConsumeReplyAfterSend(replyTo) &&
didSendApplyReply(resultCountBeforeSend, value, sent)
) {
replyConsumed = true;
}
return value;
} catch (err) {
// Best-effort delivery should only consume reply metadata once a quoted send
// actually succeeded, even if a later chunk/send in the same payload fails.
markReplyConsumedIfSendSucceeded(replyTo, resultCountBeforeSend);
throw err;
}
};
for (const payload of normalizedPayloads) {
let payloadSummary = buildPayloadSummary(payload);
try {
@ -654,10 +841,37 @@ async function deliverOutboundPayloadsCore(
payloadSummary = hookResult.payloadSummary;
params.onPayload?.(payloadSummary);
const sendOverrides = {
replyToId: effectivePayload.replyToId ?? params.replyToId ?? undefined,
// Treat null as an explicit "do not reply" override so payload-level suppression
// is preserved instead of falling back to inherited reply metadata.
// NOTE: Many payload builders emit replyToId: undefined. Treat that as "no opinion"
// so the inherited reply metadata can still apply.
const explicitPayloadReplyTo =
"replyToId" in effectivePayload && effectivePayload.replyToId !== undefined
? effectivePayload.replyToId
: undefined;
const inheritedReplyTo =
explicitPayloadReplyTo === undefined ? (params.replyToId ?? undefined) : undefined;
const effectiveReplyTo =
explicitPayloadReplyTo != null
? explicitPayloadReplyTo
: !replyConsumed
? inheritedReplyTo
: undefined;
// NOTE: quoteAuthor is derived from the top-level params, not resolved per-payload.
// The normal Signal inbound→outbound path (monitor.ts:deliverReplies) resolves
// quoteAuthor per-payload via resolveQuoteAuthor(effectiveReplyTo). If a future
// caller uses deliverOutboundPayloads directly with per-payload replyToId overrides,
// it should supply its own quoteAuthor resolution or extend this logic.
const effectiveQuoteAuthor = effectiveReplyTo ? (params.quoteAuthor ?? undefined) : undefined;
const effectiveSendOverrides = {
threadId: params.threadId ?? undefined,
forceDocument: params.forceDocument,
replyToId:
explicitPayloadReplyTo !== undefined // explicit payload override (string or null)
? (explicitPayloadReplyTo ?? undefined) // null → undefined for the wire format
: inheritedReplyTo !== undefined
? (effectiveReplyTo ?? null)
: undefined,
};
if (
handler.sendPayload &&
@ -666,7 +880,29 @@ async function deliverOutboundPayloadsCore(
channelData: effectivePayload.channelData,
})
) {
const delivery = await handler.sendPayload(effectivePayload, sendOverrides);
// Materialize the resolved replyToId on the payload so downstream handlers see
// the correct value:
// - string: reply applied
// - null: reply metadata was inherited but has been consumed (explicitly no reply)
// - undefined: no reply metadata was ever present (no opinion)
const resolvedReplyToId =
effectiveReplyTo !== undefined
? effectiveReplyTo
: inheritedReplyTo !== undefined
? null
: undefined;
const resolvedPayload = { ...effectivePayload, replyToId: resolvedReplyToId };
const delivery = await trackReplyConsumption(
effectiveReplyTo,
() =>
handler.sendPayload!(
resolvedPayload as typeof effectivePayload,
effectiveSendOverrides,
),
// Any non-undefined return counts as "sent" — channels that don't return
// messageId (e.g. matrix) should still consume the reply indicator.
(value) => value !== undefined,
);
results.push(delivery);
emitMessageSent({
success: true,
@ -675,12 +911,19 @@ async function deliverOutboundPayloadsCore(
});
continue;
}
if (payloadSummary.mediaUrls.length === 0) {
const beforeCount = results.length;
if (handler.sendFormattedText) {
results.push(...(await handler.sendFormattedText(payloadSummary.text, sendOverrides)));
if (isSignalChannel) {
await trackReplyConsumption(effectiveReplyTo, () =>
sendSignalTextChunks(payloadSummary.text, effectiveReplyTo, effectiveQuoteAuthor),
);
} else if (handler.sendFormattedText) {
results.push(...(await handler.sendFormattedText(payloadSummary.text, effectiveSendOverrides)));
} else {
await sendTextChunks(payloadSummary.text, sendOverrides);
await trackReplyConsumption(effectiveReplyTo, () =>
sendTextChunks(payloadSummary.text, effectiveSendOverrides),
);
}
const messageId = results.at(-1)?.messageId;
emitMessageSent({
@ -707,7 +950,9 @@ async function deliverOutboundPayloadsCore(
);
}
const beforeCount = results.length;
await sendTextChunks(fallbackText, sendOverrides);
await trackReplyConsumption(effectiveReplyTo, () =>
sendTextChunks(fallbackText, effectiveSendOverrides),
);
const messageId = results.at(-1)?.messageId;
emitMessageSent({
success: results.length > beforeCount,
@ -717,26 +962,46 @@ async function deliverOutboundPayloadsCore(
continue;
}
let first = true;
let lastMessageId: string | undefined;
await sendMediaWithLeadingCaption({
mediaUrls: payloadSummary.mediaUrls,
caption: payloadSummary.text,
send: async ({ mediaUrl, caption }) => {
throwIfAborted(abortSignal);
if (handler.sendFormattedMedia) {
const delivery = await handler.sendFormattedMedia(
await trackReplyConsumption(effectiveReplyTo, async () => {
if (isSignalChannel) {
for (const url of payloadSummary.mediaUrls) {
throwIfAborted(abortSignal);
const caption = first ? payloadSummary.text : "";
const mediaReplyTo = first ? effectiveReplyTo : undefined;
const mediaQuoteAuthor = first ? effectiveQuoteAuthor : undefined;
const delivery = await sendSignalMedia(caption, url, mediaReplyTo, mediaQuoteAuthor);
results.push(delivery);
lastMessageId = delivery.messageId;
first = false;
}
return;
}
await sendMediaWithLeadingCaption({
mediaUrls: payloadSummary.mediaUrls,
caption: payloadSummary.text,
send: async ({ mediaUrl, caption }) => {
throwIfAborted(abortSignal);
if (handler.sendFormattedMedia) {
const delivery = await handler.sendFormattedMedia(
caption ?? "",
mediaUrl,
effectiveSendOverrides,
);
results.push(delivery);
lastMessageId = delivery.messageId;
return;
}
const delivery = await handler.sendMedia(
caption ?? "",
mediaUrl,
sendOverrides,
effectiveSendOverrides,
);
results.push(delivery);
lastMessageId = delivery.messageId;
return;
}
const delivery = await handler.sendMedia(caption ?? "", mediaUrl, sendOverrides);
results.push(delivery);
lastMessageId = delivery.messageId;
},
},
});
});
emitMessageSent({
success: true,

View File

@ -67,6 +67,8 @@ export function normalizeReplyPayloadsForDelivery(
);
const hasMultipleMedia = (explicitMediaUrls?.length ?? 0) > 1;
const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl;
const resolvedReplyToId =
payload.replyToId === null ? null : (payload.replyToId ?? parsed.replyToId);
const next: ReplyPayload = {
...payload,
text:
@ -76,7 +78,7 @@ export function normalizeReplyPayloadsForDelivery(
}) ?? "",
mediaUrls: mergedMedia.length ? mergedMedia : undefined,
mediaUrl: resolvedMediaUrl,
replyToId: payload.replyToId ?? parsed.replyToId,
...(resolvedReplyToId !== undefined ? { replyToId: resolvedReplyToId } : {}),
replyToTag: payload.replyToTag || parsed.replyToTag,
replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent,
audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice),

View File

@ -0,0 +1,13 @@
const replyAppliedMarkers = new WeakMap<object, boolean>();
export function markReplyApplied<T extends object>(value: T, applied: boolean): T {
replyAppliedMarkers.set(value, applied);
return value;
}
export function readReplyApplied(value: unknown): boolean | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
return replyAppliedMarkers.get(value);
}

View File

@ -7,7 +7,8 @@ export type OutboundReplyPayload = {
text?: string;
mediaUrls?: string[];
mediaUrl?: string;
replyToId?: string;
// null explicitly suppresses inherited reply metadata (distinct from undefined = "not set")
replyToId?: string | null;
};
export type SendableOutboundReplyParts = {
@ -38,7 +39,12 @@ export function normalizeOutboundReplyPayload(
)
: undefined;
const mediaUrl = typeof payload.mediaUrl === "string" ? payload.mediaUrl : undefined;
const replyToId = typeof payload.replyToId === "string" ? payload.replyToId : undefined;
const replyToId =
typeof payload.replyToId === "string"
? payload.replyToId
: payload.replyToId === null
? null
: undefined;
return {
text,
mediaUrls,

View File

@ -1,3 +1,4 @@
import { execFileSync } from "node:child_process";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
@ -3262,7 +3263,7 @@ module.exports = {
expect(record?.status).toBe("loaded");
});
it("supports legacy plugins importing monolithic plugin-sdk root", async () => {
it("supports legacy plugins importing monolithic plugin-sdk root", () => {
useNoBundledPlugins();
const plugin = writePlugin({
id: "legacy-root-import",

28
tasks/todo.md Normal file
View File

@ -0,0 +1,28 @@
# Signal quote reply fix
- [x] Inspect current reply consumption and Signal quote validation flow
- [x] Implement fix so invalid explicit Signal reply ids do not consume inherited reply state
- [x] Add or update regression tests for outbound delivery and Signal send behavior
- [x] Run targeted verification for touched test files and broader related tests if quick
- [x] Review diff, update this file with results, and commit/push changes
## Notes
- Status: pushed
- Shared Signal quote metadata validation now drives:
- Signal send param construction
- outbound reply consumption
- monitor reply consumption
- Regression coverage added for:
- malformed explicit Signal `replyToId` preserving inherited reply state
- group replies without `quoteAuthor` preserving inherited reply state
- direct-message valid numeric replies still emitting `quote-timestamp`
- existing partial-chunk failure case using a valid Signal timestamp fixture
- Verification:
- `pnpm exec vitest run --config vitest.unit.config.ts src/infra/outbound/deliver.test.ts src/signal/send.test.ts src/signal/monitor/reply-delivery.test.ts src/signal/monitor/event-handler.quote.test.ts`
- `pnpm exec vitest run --config vitest.unit.config.ts src/signal/*.test.ts src/signal/monitor/*.test.ts`
- `pnpm exec oxfmt --check src/infra/outbound/deliver.ts src/infra/outbound/deliver.test.ts src/signal/send.ts src/signal/send.test.ts src/signal/monitor.ts src/signal/monitor/reply-delivery.ts src/signal/monitor/reply-delivery.test.ts src/signal/reply-quote.ts tasks/todo.md`
- Commit:
- `a1e5c2966` `Signal: preserve inherited quote state`
- Push:
- `git push origin fix/signal-quote-reply`