fix(signal): complete bidirectional quote-reply support

Outbound:
- Map [[reply_to:]] tag to signal-cli quote-timestamp + quote-author RPC params
- Validate reply IDs are decimal-only (reject hex/scientific notation)
- Track quote consumption across dispatcher callbacks (turnReplyConsumed flag)
- Strip replyToId from payload after first delivery; prevents duplicate quote bubbles
- Preserve explicit null as reply suppression signal (distinct from unset)
- Materialize resolved replyToId on sendPayload payloads for downstream correctness
- Handle sendPayload partial failure: only consume on valid messageId

Inbound:
- Surface ReplyToId, ReplyToBody, ReplyToSender, ReplyToIsQuote on MsgContext
- Resolve quoted authors from LRU cache (uuid: prefix normalization for signal-cli)
- Preserve quote metadata through debounce coalescing
- Index skipped group messages for later author resolution
- Drop wrong-person fallback for unresolvable explicit reply IDs

Type safety:
- Extend OutboundReplyPayload.replyToId to string | null | undefined
- Fix null propagation in googlechat, irc, nextcloud-talk, imessage, telegram adapters
- Fix Feishu SDK type errors (timeout not in call param types)

Requires patched signal-cli at /opt/signal-cli-0.14.1-patched/ until
upstream publishes lib v140 to JitPack.
This commit is contained in:
Codex CLI Audit 2026-03-06 16:43:15 -04:00 committed by Joey Krug
parent 0a842de354
commit 3b4866f57a
29 changed files with 1942 additions and 75 deletions

View File

@ -368,7 +368,7 @@ async function downloadAttachment(
}
async function deliverGoogleChatReply(params: {
payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string };
payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string | null };
account: ResolvedGoogleChatAccount;
spaceId: string;
runtime: GoogleChatRuntimeEnv;
@ -462,7 +462,7 @@ async function deliverGoogleChatReply(params: {
account,
space: spaceId,
text: caption,
thread: payload.replyToId,
thread: payload.replyToId ?? undefined,
attachments: [
{ attachmentUploadToken: upload.attachmentUploadToken, contentName: loaded.fileName },
],

View File

@ -50,7 +50,7 @@ export async function deliverReplies(params: {
maxBytes,
client,
accountId,
replyToId: payload.replyToId,
replyToId: payload.replyToId ?? undefined,
});
sentMessageCache?.remember(scope, { text: chunk, messageId: sent.messageId });
},
@ -60,7 +60,7 @@ export async function deliverReplies(params: {
maxBytes,
client,
accountId,
replyToId: payload.replyToId,
replyToId: payload.replyToId ?? undefined,
});
sentMessageCache?.remember(scope, {
text: caption || undefined,

View File

@ -70,6 +70,8 @@ async function sendSignalOutbound(params: {
mediaUrl?: string;
mediaLocalRoots?: readonly string[];
accountId?: string;
replyToId?: string | null;
quoteAuthor?: string | null;
deps?: { [channelId: string]: unknown };
}) {
const { send, maxBytes } = resolveSignalSendContext(params);
@ -79,6 +81,8 @@ async function sendSignalOutbound(params: {
...(params.mediaLocalRoots?.length ? { mediaLocalRoots: params.mediaLocalRoots } : {}),
maxBytes,
accountId: params.accountId ?? undefined,
replyTo: params.replyToId ?? undefined,
quoteAuthor: params.quoteAuthor ?? undefined,
});
}
@ -194,6 +198,8 @@ async function sendFormattedSignalText(ctx: {
to: string;
text: string;
accountId?: string | null;
replyToId?: string | null;
quoteAuthor?: string | null;
deps?: { [channelId: string]: unknown };
abortSignal?: AbortSignal;
}) {
@ -218,6 +224,7 @@ async function sendFormattedSignalText(ctx: {
chunks = [{ text: ctx.text, styles: [] }];
}
const results = [];
let first = true;
for (const chunk of chunks) {
ctx.abortSignal?.throwIfAborted();
const result = await send(ctx.to, chunk.text, {
@ -226,8 +233,11 @@ async function sendFormattedSignalText(ctx: {
accountId: ctx.accountId ?? undefined,
textMode: "plain",
textStyles: chunk.styles,
replyTo: first ? (ctx.replyToId ?? undefined) : undefined,
quoteAuthor: first ? (ctx.quoteAuthor ?? undefined) : undefined,
});
results.push(result);
first = false;
}
return attachChannelToResults("signal", results);
}
@ -239,6 +249,8 @@ async function sendFormattedSignalMedia(ctx: {
mediaUrl: string;
mediaLocalRoots?: readonly string[];
accountId?: string | null;
replyToId?: string | null;
quoteAuthor?: string | null;
deps?: { [channelId: string]: unknown };
abortSignal?: AbortSignal;
}) {
@ -267,6 +279,8 @@ async function sendFormattedSignalMedia(ctx: {
accountId: ctx.accountId ?? undefined,
textMode: "plain",
textStyles: formatted.styles,
replyTo: ctx.replyToId ?? undefined,
quoteAuthor: ctx.quoteAuthor ?? undefined,
});
return attachChannelToResult("signal", result);
}
@ -315,12 +329,23 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
chunker: (text, limit) => getSignalRuntime().channel.text.chunkText(text, limit),
chunkerMode: "text",
textChunkLimit: 4000,
sendFormattedText: async ({ cfg, to, text, accountId, deps, abortSignal }) =>
sendFormattedText: async ({
cfg,
to,
text,
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) =>
await sendFormattedSignalText({
cfg,
to,
text,
accountId,
replyToId,
quoteAuthor,
deps,
abortSignal,
}),
@ -333,6 +358,8 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) =>
await sendFormattedSignalMedia({
cfg,
@ -341,20 +368,34 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
mediaUrl,
mediaLocalRoots,
accountId,
replyToId,
quoteAuthor,
deps,
abortSignal,
}),
...createAttachedChannelResultAdapter({
channel: "signal",
sendText: async ({ cfg, to, text, accountId, deps }) =>
sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) =>
await sendSignalOutbound({
cfg,
to,
text,
accountId: accountId ?? undefined,
replyToId,
quoteAuthor,
deps,
}),
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) =>
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
quoteAuthor,
}) =>
await sendSignalOutbound({
cfg,
to,
@ -362,6 +403,8 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount> = {
mediaUrl,
mediaLocalRoots,
accountId: accountId ?? undefined,
replyToId,
quoteAuthor,
deps,
}),
}),

View File

@ -10,10 +10,6 @@ import type { BackoffPolicy } from "openclaw/plugin-sdk/infra-runtime";
import { waitForTransportReady } from "openclaw/plugin-sdk/infra-runtime";
import { saveMediaBuffer } from "openclaw/plugin-sdk/media-runtime";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import {
deliverTextOrMediaReply,
resolveSendableOutboundReplyParts,
} from "openclaw/plugin-sdk/reply-payload";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import {
chunkTextWithMode,
@ -33,6 +29,11 @@ import type {
SignalReactionMessage,
SignalReactionTarget,
} from "./monitor/event-handler.types.js";
import {
markSignalReplyConsumed,
resolveSignalReplyDelivery,
type SignalReplyDeliveryState,
} from "./monitor/reply-delivery.js";
import { sendMessageSignal } from "./send.js";
import { runSignalSseLoop } from "./sse-reconnect.js";
@ -299,36 +300,63 @@ async function deliverReplies(params: {
maxBytes: number;
textLimit: number;
chunkMode: "length" | "newline";
inheritedReplyToId?: string;
replyDeliveryState?: SignalReplyDeliveryState;
resolveQuoteAuthor?: (replyToId: string) => string | undefined;
}) {
const { replies, target, baseUrl, account, accountId, runtime, maxBytes, textLimit, chunkMode } =
params;
for (const payload of replies) {
const reply = resolveSendableOutboundReplyParts(payload);
const delivered = await deliverTextOrMediaReply({
const { payload: resolvedPayload, effectiveReplyTo } = resolveSignalReplyDelivery({
payload,
text: reply.text,
chunkText: (value) => chunkTextWithMode(value, textLimit, chunkMode),
sendText: async (chunk) => {
inheritedReplyToId: params.inheritedReplyToId,
state: params.replyDeliveryState,
});
const effectiveQuoteAuthor = effectiveReplyTo
? params.resolveQuoteAuthor?.(effectiveReplyTo)
: undefined;
const text = resolvedPayload.text ?? "";
const resolvedMediaList =
resolvedPayload.mediaUrls ?? (resolvedPayload.mediaUrl ? [resolvedPayload.mediaUrl] : []);
if (!text && resolvedMediaList.length === 0) {
continue;
}
if (resolvedMediaList.length === 0) {
let first = true;
for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) {
await sendMessageSignal(target, chunk, {
baseUrl,
account,
maxBytes,
accountId,
replyTo: first ? effectiveReplyTo : undefined,
quoteAuthor: first ? effectiveQuoteAuthor : undefined,
});
},
sendMedia: async ({ mediaUrl, caption }) => {
await sendMessageSignal(target, caption ?? "", {
if (first) {
markSignalReplyConsumed(params.replyDeliveryState, effectiveReplyTo);
}
first = false;
}
} else {
let first = true;
for (const url of resolvedMediaList) {
const caption = first ? text : "";
await sendMessageSignal(target, caption, {
baseUrl,
account,
mediaUrl,
maxBytes,
accountId,
replyTo: first ? effectiveReplyTo : undefined,
quoteAuthor: first ? effectiveQuoteAuthor : undefined,
});
},
});
if (delivered !== "empty") {
runtime.log?.(`delivered reply to ${target}`);
if (first) {
markSignalReplyConsumed(params.replyDeliveryState, effectiveReplyTo);
}
first = false;
}
}
runtime.log?.(`delivered reply to ${target}`);
}
}

View File

@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { buildInboundUserContextPrefix } from "../../../../src/auto-reply/reply/inbound-meta.js";
import type { MsgContext } from "../../../../src/auto-reply/templating.js";
import { expectChannelInboundContextContract as expectInboundContextContract } from "../../../../src/channels/plugins/contracts/suites.js";
import { createSignalEventHandler } from "./event-handler.js";
@ -206,6 +207,138 @@ describe("signal createSignalEventHandler inbound context", () => {
expect(capture.ctx?.MediaTypes).toEqual(["image/jpeg", "application/octet-stream"]);
});
it("surfaces quoted reply context in the agent-visible metadata block", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "thanks",
quote: {
id: 1700000000000,
authorNumber: "+15550003333",
text: " sent the details",
mentions: [{ number: "+15550004444", start: 0, length: 1 }],
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.ReplyToId).toBe("1700000000000");
expect(capture.ctx?.ReplyToSender).toBe("+15550003333");
expect(capture.ctx?.ReplyToBody).toBe("@+15550004444 sent the details");
expect(capture.ctx?.ReplyToIsQuote).toBe(true);
expect(String(capture.ctx?.Body ?? "")).toContain("[Quoting +15550003333 id:1700000000000]");
const userContext = buildInboundUserContextPrefix(capture.ctx!);
expect(userContext).toContain("Replied message (untrusted, for context):");
expect(userContext).toContain('"sender_label": "+15550003333"');
expect(userContext).toContain('"body": "@+15550004444 sent the details"');
});
it("keeps quote-only messages when the user sends no new text", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "",
quote: {
id: 1700000000000,
text: "original context",
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.RawBody).toBe("");
expect(capture.ctx?.ReplyToBody).toBe("original context");
expect(String(capture.ctx?.Body ?? "")).toContain('"original context"');
});
it("uses quoted attachment metadata for media-only quoted replies", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "nice one",
quote: {
id: 1700000000000,
attachments: [{ contentType: "image/jpeg", filename: "photo.jpg" }],
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.ReplyToId).toBe("1700000000000");
expect(capture.ctx?.ReplyToBody).toBe("<media:image>");
expect(String(capture.ctx?.Body ?? "")).toContain('"<media:image>"');
});
it("ignores invalid quote ids while preserving the quoted body context", async () => {
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
historyLimit: 0,
}),
);
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "reply",
quote: {
id: "1700000000000abc",
text: "quoted context",
},
attachments: [],
},
}),
);
expect(capture.ctx).toBeTruthy();
expect(capture.ctx?.ReplyToId).toBeUndefined();
expect(capture.ctx?.ReplyToBody).toBe("quoted context");
expect(String(capture.ctx?.Body ?? "")).not.toContain("id:1700000000000abc");
});
it("drops own UUID inbound messages when only accountUuid is configured", async () => {
const ownUuid = "123e4567-e89b-12d3-a456-426614174000";
const handler = createSignalEventHandler(

View File

@ -6,6 +6,7 @@ import {
createBaseSignalEventHandlerDeps,
createSignalReceiveEvent,
} from "./event-handler.test-harness.js";
import type { SignalQuote } from "./event-handler.types.js";
type SignalMsgContext = Pick<MsgContext, "Body" | "WasMentioned"> & {
Body?: string;
@ -32,6 +33,7 @@ type GroupEventOpts = {
message?: string;
attachments?: unknown[];
quoteText?: string;
quote?: SignalQuote;
mentions?: Array<{
uuid?: string;
number?: string;
@ -45,7 +47,7 @@ function makeGroupEvent(opts: GroupEventOpts) {
dataMessage: {
message: opts.message ?? "",
attachments: opts.attachments ?? [],
quote: opts.quoteText ? { text: opts.quoteText } : undefined,
quote: opts.quote ?? (opts.quoteText ? { text: opts.quoteText } : undefined),
mentions: opts.mentions ?? undefined,
groupInfo: { groupId: "g1", groupName: "Test Group" },
},
@ -203,6 +205,33 @@ describe("signal mention gating", () => {
await expectSkippedGroupHistory({ message: "", quoteText: "quoted context" }, "quoted context");
});
it("records quoted media placeholders in pending history for skipped quote-only group messages", async () => {
await expectSkippedGroupHistory(
{
message: "",
quote: {
id: 1700000000000,
attachments: [{ contentType: "image/png", filename: "photo.png" }],
},
},
"<media:image>",
);
});
it("hydrates quote mentions in pending history for skipped quote-only group messages", async () => {
const placeholder = "\uFFFC";
await expectSkippedGroupHistory(
{
message: "",
quote: {
text: `${placeholder} quoted context`,
mentions: [{ uuid: "123e4567", start: 0, length: placeholder.length }],
},
},
"@123e4567 quoted context",
);
});
it("bypasses mention gating for authorized control commands", async () => {
capturedCtx = undefined;
const handler = createMentionHandler({ requireMention: true });

View File

@ -0,0 +1,414 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { buildInboundUserContextPrefix } from "../../../../src/auto-reply/reply/inbound-meta.js";
import type { MsgContext } from "../../../../src/auto-reply/templating.js";
import { createSignalEventHandler } from "./event-handler.js";
import {
createBaseSignalEventHandlerDeps,
createSignalReceiveEvent,
} from "./event-handler.test-harness.js";
import type { SignalEventHandlerDeps } from "./event-handler.types.js";
type CapturedSignalQuoteContext = Pick<
MsgContext,
"Body" | "BodyForAgent" | "ReplyToBody" | "ReplyToId" | "ReplyToIsQuote" | "ReplyToSender"
> & {
Body?: string;
BodyForAgent?: string;
ReplyToBody?: string;
ReplyToId?: string;
ReplyToIsQuote?: boolean;
ReplyToSender?: string;
};
let capturedCtx: CapturedSignalQuoteContext | undefined;
const { dispatchInboundMessageMock } = vi.hoisted(() => ({
dispatchInboundMessageMock: vi.fn(),
}));
function getCapturedCtx() {
return capturedCtx as CapturedSignalQuoteContext;
}
vi.mock("../../../../src/auto-reply/dispatch.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../../../src/auto-reply/dispatch.js")>();
return {
...actual,
dispatchInboundMessage: dispatchInboundMessageMock,
dispatchInboundMessageWithDispatcher: dispatchInboundMessageMock,
dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessageMock,
};
});
vi.mock("../send.js", () => ({
sendMessageSignal: vi.fn(),
sendTypingSignal: vi.fn().mockResolvedValue(true),
sendReadReceiptSignal: vi.fn().mockResolvedValue(true),
}));
vi.mock("../../../../src/pairing/pairing-store.js", () => ({
readChannelAllowFromStore: vi.fn().mockResolvedValue([]),
upsertChannelPairingRequest: vi.fn(),
}));
function createQuoteHandler(overrides: Partial<SignalEventHandlerDeps> = {}) {
return createSignalEventHandler(
createBaseSignalEventHandlerDeps({
// oxlint-disable-next-line typescript/no-explicit-any
cfg: { messages: { inbound: { debounceMs: 0 } } } as any,
historyLimit: 0,
...overrides,
}),
);
}
describe("signal quote reply handling", () => {
beforeEach(() => {
capturedCtx = undefined;
dispatchInboundMessageMock.mockReset();
dispatchInboundMessageMock.mockImplementation(async (params: { ctx: unknown }) => {
capturedCtx = params.ctx as CapturedSignalQuoteContext;
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } };
});
});
it("surfaces quoted text in reply metadata while preserving the new message text", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "Thanks for the info!",
quote: {
id: 1700000000000,
authorNumber: "+15550003333",
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.BodyForAgent).toBe("Thanks for the info!");
expect(ctx?.ReplyToId).toBe("1700000000000");
expect(ctx?.ReplyToBody).toBe("The meeting is at 3pm");
expect(ctx?.ReplyToSender).toBe("+15550003333");
expect(ctx?.ReplyToIsQuote).toBe(true);
expect(String(ctx?.Body ?? "")).toContain("Thanks for the info!");
expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550003333 id:1700000000000]");
});
it("keeps quote-only replies and exposes the replied-message context block", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "",
quote: {
id: 1700000000000,
authorNumber: "+15550002222",
text: "Original message to quote",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx).toBeTruthy();
expect(ctx?.BodyForAgent).toBe("");
expect(ctx?.ReplyToBody).toBe("Original message to quote");
const userContext = buildInboundUserContextPrefix(ctx as MsgContext);
expect(userContext).toContain("Replied message (untrusted, for context):");
expect(userContext).toContain('"body": "Original message to quote"');
expect(userContext).toContain('"sender_label": "+15550002222"');
});
it("hydrates Signal mentions inside quoted text before surfacing reply context", async () => {
const handler = createQuoteHandler();
const placeholder = "\uFFFC";
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Replying now",
quote: {
id: 1700000000000,
text: `${placeholder} can you check this?`,
mentions: [{ uuid: "123e4567", start: 0, length: placeholder.length }],
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToBody).toBe("@123e4567 can you check this?");
expect(String(ctx?.Body ?? "")).toContain('"@123e4567 can you check this?"');
});
it("uses quoted attachment placeholders for media replies without text", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Nice photo!",
quote: {
id: 1700000000000,
authorUuid: "123e4567-e89b-12d3-a456-426614174000",
text: null,
attachments: [{ contentType: "image/jpeg" }],
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToBody).toBe("<media:image>");
expect(ctx?.ReplyToSender).toBe("uuid:123e4567-e89b-12d3-a456-426614174000");
});
it("falls back to a generic quoted body when signal-cli sends an empty quoted text string", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Replying to media",
quote: {
id: 1700000000000,
text: "",
attachments: [],
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToId).toBe("1700000000000");
expect(ctx?.ReplyToBody).toBe("<quoted message>");
});
it("drops invalid quote ids from reply metadata but keeps valid quoted text", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "I saw this",
quote: {
id: "1700000000000abc",
authorNumber: "+15550002222",
text: "Original text",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToId).toBeUndefined();
expect(ctx?.ReplyToBody).toBe("Original text");
expect(String(ctx?.Body ?? "")).not.toContain("id:1700000000000abc");
});
it("does not synthesize quote-only context from invalid negative ids", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "",
quote: {
id: -1,
text: null,
},
},
}),
);
expect(capturedCtx).toBeUndefined();
});
it("passes inherited reply state through deliverReplies for the current Signal message", async () => {
const deliverReplies = vi.fn().mockResolvedValue(undefined);
dispatchInboundMessageMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcher: {
sendToolResult: (payload: { text: string }) => boolean;
sendFinalReply: (payload: { text: string }) => boolean;
markComplete: () => void;
waitForIdle: () => Promise<void>;
};
}) => {
capturedCtx = params.ctx as CapturedSignalQuoteContext;
params.dispatcher.sendToolResult({ text: "First reply" });
params.dispatcher.sendFinalReply({ text: "Second reply" });
params.dispatcher.markComplete();
await params.dispatcher.waitForIdle();
return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } };
},
);
const handler = createQuoteHandler({ deliverReplies });
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Incoming message",
},
}),
);
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(deliverReplies.mock.calls[0]?.[0].replies[0]?.replyToId).toBeUndefined();
expect(deliverReplies.mock.calls[1]?.[0].replies[0]?.replyToId).toBeUndefined();
expect(deliverReplies.mock.calls[0]?.[0].inheritedReplyToId).toBe("1700000000000");
expect(deliverReplies.mock.calls[1]?.[0].inheritedReplyToId).toBe("1700000000000");
expect(deliverReplies.mock.calls[0]?.[0].replyDeliveryState).toBe(
deliverReplies.mock.calls[1]?.[0].replyDeliveryState,
);
});
it("preserves explicit replyToId values on later deliveries in the same turn", async () => {
const deliverReplies = vi.fn().mockResolvedValue(undefined);
dispatchInboundMessageMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcher: {
sendToolResult: (payload: { text: string; replyToId: string }) => boolean;
sendFinalReply: (payload: { text: string; replyToId: string }) => boolean;
markComplete: () => void;
waitForIdle: () => Promise<void>;
};
}) => {
capturedCtx = params.ctx as CapturedSignalQuoteContext;
params.dispatcher.sendToolResult({ text: "First reply", replyToId: "1700000000001" });
params.dispatcher.sendFinalReply({ text: "Second reply", replyToId: "1700000000002" });
params.dispatcher.markComplete();
await params.dispatcher.waitForIdle();
return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } };
},
);
const handler = createQuoteHandler({ deliverReplies });
await handler(
createSignalReceiveEvent({
dataMessage: {
message: "Incoming message",
},
}),
);
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(deliverReplies.mock.calls[0]?.[0].replies[0]?.replyToId).toBe("1700000000001");
expect(deliverReplies.mock.calls[1]?.[0].replies[0]?.replyToId).toBe("1700000000002");
});
it("resolves missing quote authors from previously seen group messages", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "The meeting is at 3pm",
groupInfo: { groupId: "g1", groupName: "Test Group" },
},
}),
);
capturedCtx = undefined;
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550003333",
sourceName: "Alice",
timestamp: 1700000000002,
dataMessage: {
message: "Thanks for the info!",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000001,
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe("+15550002222");
expect(String(ctx?.Body ?? "")).toContain("[Quoting +15550002222 id:1700000000001]");
});
it("resolves cached uuid senders with a uuid: prefix", async () => {
const handler = createQuoteHandler();
const senderUuid = "123e4567-e89b-12d3-a456-426614174000";
await handler(
createSignalReceiveEvent({
sourceNumber: null,
sourceUuid: senderUuid,
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "The meeting is at 3pm",
groupInfo: { groupId: "g1", groupName: "Test Group" },
},
}),
);
capturedCtx = undefined;
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550003333",
sourceName: "Alice",
timestamp: 1700000000002,
dataMessage: {
message: "Thanks for the info!",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000001,
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe(`uuid:${senderUuid}`);
expect(String(ctx?.Body ?? "")).toContain(`[Quoting uuid:${senderUuid} id:1700000000001]`);
});
it("preserves uuid: prefix in quote author normalization", async () => {
const handler = createQuoteHandler();
await handler(
createSignalReceiveEvent({
sourceNumber: "+15550002222",
sourceName: "Bob",
timestamp: 1700000000001,
dataMessage: {
message: "Thanks for the info!",
groupInfo: { groupId: "g1", groupName: "Test Group" },
quote: {
id: 1700000000000,
authorUuid: "uuid:01234567-89ab-cdef-0123-456789abcdef",
text: "The meeting is at 3pm",
},
},
}),
);
const ctx = getCapturedCtx();
expect(ctx?.ReplyToSender).toBe("uuid:01234567-89ab-cdef-0123-456789abcdef");
expect(String(ctx?.Body ?? "")).toContain(
"[Quoting uuid:01234567-89ab-cdef-0123-456789abcdef id:1700000000000]",
);
});
});

View File

@ -41,6 +41,7 @@ import {
formatSignalSenderDisplay,
formatSignalSenderId,
isSignalSenderAllowed,
looksLikeUuid,
normalizeSignalAllowRecipient,
resolveSignalPeerId,
resolveSignalRecipient,
@ -55,8 +56,10 @@ import type {
SignalEventHandlerDeps,
SignalReactionMessage,
SignalReceivePayload,
SignalReplyTarget,
} from "./event-handler.types.js";
import { renderSignalMentions } from "./mentions.js";
import { describeSignalReplyTarget } from "./quote-context.js";
function formatAttachmentKindCount(kind: string, count: number): string {
if (kind === "attachment") {
@ -96,6 +99,66 @@ function resolveSignalInboundRoute(params: {
}
export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
// Best-effort index to map (conversation, messageId) -> senderRecipient for quote-author resolution.
// signal-cli requires quote-author for group quotes.
const messageAuthorIndex = new Map<string, string>();
const MAX_MESSAGE_AUTHOR_INDEX = 5000;
const resolveConversationKey = (entry: {
isGroup: boolean;
groupId?: string;
senderPeerId: string;
}): string =>
entry.isGroup ? `group:${entry.groupId ?? "unknown"}` : `dm:${entry.senderPeerId}`;
const normalizeCachedMessageAuthor = (raw?: string) => {
const trimmed = raw?.trim();
if (!trimmed) {
return undefined;
}
if (trimmed.toLowerCase().startsWith("uuid:")) {
const uuid = trimmed.slice("uuid:".length).trim();
return uuid ? `uuid:${uuid}` : undefined;
}
return looksLikeUuid(trimmed) ? `uuid:${trimmed}` : trimmed;
};
const rememberMessageAuthor = (params: {
conversationKey: string;
messageId?: string;
senderRecipient?: string;
}) => {
const id = params.messageId?.trim();
const senderRecipient = normalizeCachedMessageAuthor(params.senderRecipient);
if (!id || !senderRecipient) {
return;
}
const key = `${params.conversationKey}:${id}`;
// Refresh insertion order for simple LRU-style eviction.
if (messageAuthorIndex.has(key)) {
messageAuthorIndex.delete(key);
}
messageAuthorIndex.set(key, senderRecipient);
while (messageAuthorIndex.size > MAX_MESSAGE_AUTHOR_INDEX) {
const oldest = messageAuthorIndex.keys().next().value;
if (oldest === undefined) {
break;
}
messageAuthorIndex.delete(oldest);
}
};
const resolveQuotedAuthor = (params: {
conversationKey: string;
replyToId?: string;
}): string | undefined => {
const replyToId = params.replyToId?.trim();
if (!replyToId) {
return undefined;
}
return messageAuthorIndex.get(`${params.conversationKey}:${replyToId}`);
};
type SignalInboundEntry = {
senderName: string;
senderDisplay: string;
@ -114,6 +177,8 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
mediaTypes?: string[];
commandAuthorized: boolean;
wasMentioned?: boolean;
// Quote context fields
quoteTarget?: SignalReplyTarget;
};
async function handleSignalInboundMessage(entry: SignalInboundEntry) {
@ -140,11 +205,18 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
storePath,
sessionKey: route.sessionKey,
});
const quoteBlock = entry.quoteTarget
? `[Quoting ${entry.quoteTarget.author ?? "unknown"}${
entry.quoteTarget.id ? ` id:${entry.quoteTarget.id}` : ""
}]\n"${entry.quoteTarget.body}"\n[/Quoting]`
: "";
const bodyWithQuote = [entry.bodyText, quoteBlock].filter(Boolean).join("\n\n");
const body = formatInboundEnvelope({
channel: "Signal",
from: fromLabel,
timestamp: entry.timestamp ?? undefined,
body: entry.bodyText,
body: bodyWithQuote,
chatType: entry.isGroup ? "group" : "direct",
sender: { name: entry.senderName, id: entry.senderDisplay },
previousTimestamp,
@ -205,6 +277,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
Provider: "signal" as const,
Surface: "signal" as const,
MessageSid: entry.messageId,
// Quote/Reply context fields
ReplyToId: entry.quoteTarget?.id,
ReplyToBody: entry.quoteTarget?.body,
ReplyToSender: entry.quoteTarget?.author,
ReplyToIsQuote: entry.quoteTarget ? true : undefined,
Timestamp: entry.timestamp ?? undefined,
MediaPath: entry.mediaPath,
MediaType: entry.mediaType,
@ -286,11 +363,14 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
},
});
const replyDeliveryState = { consumed: false };
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
typingCallbacks,
deliver: async (payload) => {
const conversationKey = resolveConversationKey(entry);
await deps.deliverReplies({
replies: [payload],
target: ctxPayload.To,
@ -300,6 +380,19 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
runtime: deps.runtime,
maxBytes: deps.mediaMaxBytes,
textLimit: deps.textLimit,
inheritedReplyToId: entry.messageId,
replyDeliveryState,
resolveQuoteAuthor: (replyToId) => {
const resolvedAuthor = resolveQuotedAuthor({ conversationKey, replyToId });
if (resolvedAuthor) {
return resolvedAuthor;
}
// Don't pin explicit reply IDs to the current sender. Only fall back to the
// inbound sender when we're still replying to the current Signal message.
return replyToId === entry.messageId
? normalizeCachedMessageAuthor(entry.senderRecipient)
: undefined;
},
});
},
onError: (err, info) => {
@ -371,6 +464,8 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
if (!combinedText.trim()) {
return;
}
// Preserve quoteTarget from the earliest entry that has one
const earliestQuoteTarget = entries.find((entry) => entry.quoteTarget)?.quoteTarget;
await handleSignalInboundMessage({
...last,
bodyText: combinedText,
@ -378,6 +473,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
mediaType: undefined,
mediaPaths: undefined,
mediaTypes: undefined,
quoteTarget: earliestQuoteTarget ?? last.quoteTarget,
});
},
onError: (err) => {
@ -519,10 +615,28 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const rawMessage = dataMessage?.message ?? "";
const normalizedMessage = renderSignalMentions(rawMessage, dataMessage?.mentions);
const messageText = normalizedMessage.trim();
const quoteText = dataMessage?.quote?.text?.trim() ?? "";
const senderPeerId = resolveSignalPeerId(sender);
const groupId = dataMessage?.groupInfo?.groupId ?? undefined;
const groupName = dataMessage?.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
const conversationKey = resolveConversationKey({
isGroup,
groupId,
senderPeerId,
});
// NOTE: Full group-quote author resolution currently depends on the patched
// signal-cli install at /opt/signal-cli-0.14.1-patched/, which removes the
// quote.getAuthor().isValid() filter. Once upstream ships lib v140 on JitPack,
// the stock signal-cli build should provide the same quote metadata.
const quoteTarget = dataMessage
? (describeSignalReplyTarget(dataMessage, {
resolveAuthor: resolveQuotedAuthor,
conversationKey,
}) ?? undefined)
: undefined;
const hasBodyContent =
Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length);
Boolean(messageText || quoteTarget?.body) ||
Boolean(!reaction && dataMessage?.attachments?.length);
const senderDisplay = formatSignalSenderDisplay(sender);
const { resolveAccessDecision, dmAccess, effectiveDmAllow, effectiveGroupAllow } =
await resolveSignalAccessState({
@ -552,15 +666,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
}
const senderRecipient = resolveSignalRecipient(sender);
const senderPeerId = resolveSignalPeerId(sender);
const senderAllowId = formatSignalSenderId(sender);
if (!senderRecipient) {
return;
}
const senderIdLine = formatSignalPairingIdLine(sender);
const groupId = dataMessage.groupInfo?.groupId ?? undefined;
const groupName = dataMessage.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
if (!isGroup) {
const allowedDirectMessage = await handleSignalDirectMessageAccess({
@ -654,6 +764,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
commandAuthorized,
});
const effectiveWasMentioned = mentionGate.effectiveWasMentioned;
if (isGroup && requireMention && canDetectMention && mentionGate.shouldSkip) {
logInboundDrop({
log: logVerbose,
@ -661,7 +772,6 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
reason: "no mention",
target: senderDisplay,
});
const quoteText = dataMessage.quote?.text?.trim() || "";
const pendingPlaceholder = (() => {
if (!dataMessage.attachments?.length) {
return "";
@ -681,7 +791,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const pendingKind = kindFromMime(firstContentType ?? undefined);
return pendingKind ? `<media:${pendingKind}>` : "<media:attachment>";
})();
const pendingBodyText = messageText || pendingPlaceholder || quoteText;
const pendingBodyText = messageText || pendingPlaceholder || quoteTarget?.body || "";
const historyKey = groupId ?? "unknown";
recordPendingHistoryEntryIfEnabled({
historyMap: deps.groupHistories,
@ -695,6 +805,12 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
},
});
// Index the message author even when skipping due to no mention
rememberMessageAuthor({
conversationKey,
messageId: typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
senderRecipient: senderRecipient,
});
return;
}
@ -745,11 +861,13 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
}
}
const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || "";
if (!bodyText) {
// Build body text - don't use quote text as fallback, preserve it for context
const bodyText = messageText || placeholder || "";
// Continue if we have either body text OR quote context (quote-only messages are valid)
if (!bodyText && !quoteTarget) {
return;
}
const receiptTimestamp =
typeof envelope.timestamp === "number"
? envelope.timestamp
@ -778,6 +896,22 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const senderName = envelope.sourceName ?? senderDisplay;
const messageId =
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined;
// Record (conversation, messageId) -> senderRecipient so later explicit reply tags
// ([[reply_to:<id>]]) can resolve a correct quote-author.
rememberMessageAuthor({
conversationKey,
messageId,
senderRecipient,
});
if (quoteTarget?.id && quoteTarget.author) {
rememberMessageAuthor({
conversationKey,
messageId: quoteTarget.id,
senderRecipient: quoteTarget.author,
});
}
await inboundDebouncer.enqueue({
senderName,
senderDisplay,
@ -796,6 +930,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
mediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
commandAuthorized,
wasMentioned: effectiveWasMentioned,
quoteTarget,
});
};
}

View File

@ -8,6 +8,7 @@ import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import type { SignalSender } from "../identity.js";
import type { SignalReplyDeliveryState } from "./reply-delivery.js";
export type SignalEnvelope = {
sourceNumber?: string | null;
@ -28,6 +29,37 @@ export type SignalMention = {
length?: number | null;
};
export type SignalTextStyle = {
start?: number | null;
length?: number | null;
style?: string | null;
};
export type SignalQuotedAttachment = {
contentType?: string | null;
filename?: string | null;
thumbnail?: SignalAttachment | null;
};
export type SignalQuote = {
id?: number | string | null; // signal-cli quote timestamp
author?: string | null; // deprecated legacy identifier from signal-cli
authorNumber?: string | null; // preferred E.164 author when available
authorUuid?: string | null; // preferred UUID author when available
text?: string | null;
mentions?: Array<SignalMention | null> | null;
attachments?: Array<SignalQuotedAttachment | null> | null;
textStyles?: Array<SignalTextStyle | null> | null;
};
export type SignalReplyTarget = {
id?: string; // message id of quoted message
author?: string; // who wrote the quoted message
body: string; // quoted text content
kind: "quote"; // always quote for Signal
mentions?: Array<SignalMention>; // mentions in quoted text
};
export type SignalDataMessage = {
timestamp?: number;
message?: string | null;
@ -37,7 +69,7 @@ export type SignalDataMessage = {
groupId?: string | null;
groupName?: string | null;
} | null;
quote?: { text?: string | null } | null;
quote?: SignalQuote | null;
reaction?: SignalReactionMessage | null;
};
@ -109,6 +141,9 @@ export type SignalEventHandlerDeps = {
runtime: RuntimeEnv;
maxBytes: number;
textLimit: number;
inheritedReplyToId?: string;
replyDeliveryState?: SignalReplyDeliveryState;
resolveQuoteAuthor?: (replyToId: string) => string | undefined;
}) => Promise<void>;
resolveSignalReactionTargets: (reaction: SignalReactionMessage) => SignalReactionTarget[];
isSignalReactionMessage: (

View File

@ -0,0 +1,107 @@
import { kindFromMime } from "../../../../src/media/mime.js";
import { normalizeE164 } from "../../../../src/utils.js";
import { looksLikeUuid } from "../identity.js";
import type {
SignalDataMessage,
SignalMention,
SignalQuote,
SignalQuotedAttachment,
SignalReplyTarget,
} from "./event-handler.types.js";
import { renderSignalMentions } from "./mentions.js";
export type SignalQuotedAuthorResolver = (params: {
conversationKey: string;
replyToId: string;
}) => string | undefined;
function filterMentions(mentions?: Array<SignalMention | null> | null) {
const filtered = mentions?.filter((mention): mention is SignalMention => mention != null);
return filtered && filtered.length > 0 ? filtered : undefined;
}
function normalizeQuoteAuthorValue(raw?: string | null) {
const trimmed = raw?.trim();
if (!trimmed) {
return undefined;
}
const unprefixed = trimmed.replace(/^uuid:/i, "").trim();
if (!unprefixed) {
return undefined;
}
if (looksLikeUuid(unprefixed)) {
// Preserve uuid: prefix for signal-cli compatibility
return `uuid:${unprefixed}`;
}
const digits = unprefixed.replace(/[^\d+]/g, "");
return digits ? normalizeE164(unprefixed) : undefined;
}
function resolveQuotedAuthorFromPayload(quote: SignalQuote) {
return (
normalizeQuoteAuthorValue(quote.authorNumber) ??
normalizeQuoteAuthorValue(quote.authorUuid) ??
normalizeQuoteAuthorValue(quote.author)
);
}
function resolveQuotedAttachmentPlaceholder(
attachments?: Array<SignalQuotedAttachment | null> | null,
) {
const firstContentType = attachments?.find((attachment) => attachment?.contentType)?.contentType;
const kind = kindFromMime(firstContentType ?? undefined);
if (kind) {
return `<media:${kind}>`;
}
return attachments?.length ? "<media:attachment>" : undefined;
}
export function normalizeSignalQuoteId(rawId?: SignalQuote["id"]) {
if (typeof rawId === "number") {
return Number.isInteger(rawId) && rawId > 0 ? String(rawId) : undefined;
}
const trimmed = rawId?.trim();
if (!trimmed) {
return undefined;
}
const numeric = Number(trimmed);
return Number.isInteger(numeric) && numeric > 0 ? String(numeric) : undefined;
}
export function describeSignalReplyTarget(
dataMessage: SignalDataMessage,
opts: {
resolveAuthor?: SignalQuotedAuthorResolver;
conversationKey?: string;
} = {},
): SignalReplyTarget | null {
const quote = dataMessage.quote;
if (!quote) {
return null;
}
const id = normalizeSignalQuoteId(quote.id);
const mentions = filterMentions(quote.mentions);
const renderedText = renderSignalMentions(quote.text ?? "", mentions)?.trim() || "";
const body =
renderedText ||
resolveQuotedAttachmentPlaceholder(quote.attachments) ||
(id ? "<quoted message>" : "");
if (!body) {
return null;
}
const author =
resolveQuotedAuthorFromPayload(quote) ??
(opts.resolveAuthor && opts.conversationKey && id
? opts.resolveAuthor({ conversationKey: opts.conversationKey, replyToId: id })
: undefined);
return {
id,
author,
body,
kind: "quote",
mentions,
};
}

View File

@ -0,0 +1,108 @@
import { describe, expect, it } from "vitest";
import {
markSignalReplyConsumed,
resolveSignalReplyDelivery,
type SignalReplyDeliveryState,
} from "./reply-delivery.js";
describe("resolveSignalReplyDelivery", () => {
it("uses the inherited reply target until the first successful send", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const first = resolveSignalReplyDelivery({
payload: { text: "first" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, first.effectiveReplyTo);
const second = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(first.payload.replyToId).toBeUndefined();
expect(first.effectiveReplyTo).toBe("1700000000000");
expect(second.payload.replyToId).toBeUndefined();
expect(second.effectiveReplyTo).toBeUndefined();
});
it("keeps explicit reply targets after the inherited reply target is consumed", () => {
const state: SignalReplyDeliveryState = { consumed: true };
const explicit = resolveSignalReplyDelivery({
payload: { text: "second", replyToId: "1700000000002" },
inheritedReplyToId: "1700000000000",
state,
});
expect(explicit.payload.replyToId).toBe("1700000000002");
expect(explicit.effectiveReplyTo).toBe("1700000000002");
});
it("preserves explicit null reply suppression without consuming inherited reply state", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const suppressed = resolveSignalReplyDelivery({
payload: { text: "first", replyToId: null },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, suppressed.effectiveReplyTo);
const inherited = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(suppressed.payload.replyToId).toBeUndefined();
expect(suppressed.effectiveReplyTo).toBeUndefined();
expect(inherited.effectiveReplyTo).toBe("1700000000000");
});
it("does not consume inherited reply state for non-decimal reply ids", () => {
const state: SignalReplyDeliveryState = { consumed: false };
// Simulate a malformed reply_to tag that resolved to a non-timestamp string
const malformed = resolveSignalReplyDelivery({
payload: { text: "first", replyToId: "not-a-timestamp" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, malformed.effectiveReplyTo);
// The inherited reply should still be available for the next payload
const next = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(malformed.effectiveReplyTo).toBe("not-a-timestamp");
expect(state.consumed).toBe(false);
expect(next.effectiveReplyTo).toBe("1700000000000");
});
it("does not consume inherited reply state for zero timestamps", () => {
const state: SignalReplyDeliveryState = { consumed: false };
const zero = resolveSignalReplyDelivery({
payload: { text: "first", replyToId: "0" },
inheritedReplyToId: "1700000000000",
state,
});
markSignalReplyConsumed(state, zero.effectiveReplyTo);
const next = resolveSignalReplyDelivery({
payload: { text: "second" },
inheritedReplyToId: "1700000000000",
state,
});
expect(zero.effectiveReplyTo).toBe("0");
expect(state.consumed).toBe(false);
expect(next.effectiveReplyTo).toBe("1700000000000");
});
});

View File

@ -0,0 +1,66 @@
import type { ReplyPayload } from "../../../../src/auto-reply/types.js";
export type SignalReplyDeliveryState = {
consumed: boolean;
};
function normalizeReplyToId(raw?: string | null) {
if (raw == null) {
return raw;
}
const trimmed = raw.trim();
return trimmed ? trimmed : null;
}
export function resolveSignalReplyDelivery(params: {
payload: ReplyPayload;
inheritedReplyToId?: string | null;
state?: SignalReplyDeliveryState;
}): {
payload: ReplyPayload;
effectiveReplyTo?: string;
} {
const explicitReplyTo =
"replyToId" in params.payload ? normalizeReplyToId(params.payload.replyToId) : undefined;
const inheritedReplyTo =
explicitReplyTo === undefined ? normalizeReplyToId(params.inheritedReplyToId) : undefined;
const effectiveReplyTo =
explicitReplyTo != null
? explicitReplyTo
: !params.state?.consumed
? (inheritedReplyTo ?? undefined)
: undefined;
if (explicitReplyTo === undefined) {
return {
payload: params.payload,
effectiveReplyTo,
};
}
return {
payload:
explicitReplyTo === null
? { ...params.payload, replyToId: undefined }
: params.payload.replyToId === explicitReplyTo
? params.payload
: { ...params.payload, replyToId: explicitReplyTo },
effectiveReplyTo,
};
}
/**
* Only consume the inherited reply state when the reply id is a valid Signal
* timestamp (pure decimal string, > 0). Malformed ids (e.g. a raw
* `[[reply_to:...]]` tag that wasn't resolved) would be silently dropped by
* `sendMessageSignal`, so consuming state for them would lose the inherited
* quote for subsequent payloads in the same turn.
*/
export function markSignalReplyConsumed(
state: SignalReplyDeliveryState | undefined,
replyToId?: string,
) {
if (state && replyToId && /^\d+$/.test(replyToId) && parseInt(replyToId, 10) > 0) {
state.consumed = true;
}
}

View File

@ -31,7 +31,16 @@ export const signalOutbound: ChannelOutboundAdapter = {
chunker: (text, _limit) => text.split(/\n{2,}/).flatMap((chunk) => (chunk ? [chunk] : [])),
chunkerMode: "text",
textChunkLimit: 4000,
sendFormattedText: async ({ cfg, to, text, accountId, deps, abortSignal }) => {
sendFormattedText: async ({
cfg,
to,
text,
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) => {
const send = resolveSignalSender(deps);
const maxBytes = resolveSignalMaxBytes({
cfg,
@ -49,6 +58,7 @@ export const signalOutbound: ChannelOutboundAdapter = {
chunks = [{ text, styles: [] }];
}
const results = [];
let first = true;
for (const chunk of chunks) {
abortSignal?.throwIfAborted();
const result = await send(to, chunk.text, {
@ -57,8 +67,11 @@ export const signalOutbound: ChannelOutboundAdapter = {
accountId: accountId ?? undefined,
textMode: "plain",
textStyles: chunk.styles,
replyTo: first ? (replyToId ?? undefined) : undefined,
quoteAuthor: first ? (quoteAuthor ?? undefined) : undefined,
});
results.push(result);
first = false;
}
return attachChannelToResults("signal", results);
},
@ -71,6 +84,8 @@ export const signalOutbound: ChannelOutboundAdapter = {
accountId,
deps,
abortSignal,
replyToId,
quoteAuthor,
}) => {
abortSignal?.throwIfAborted();
const send = resolveSignalSender(deps);
@ -93,12 +108,14 @@ export const signalOutbound: ChannelOutboundAdapter = {
textMode: "plain",
textStyles: formatted.styles,
mediaLocalRoots,
replyTo: replyToId ?? undefined,
quoteAuthor: quoteAuthor ?? undefined,
});
return attachChannelToResult("signal", result);
},
...createAttachedChannelResultAdapter({
channel: "signal",
sendText: async ({ cfg, to, text, accountId, deps }) => {
sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => {
const send = resolveSignalSender(deps);
const maxBytes = resolveSignalMaxBytes({
cfg,
@ -108,9 +125,21 @@ export const signalOutbound: ChannelOutboundAdapter = {
cfg,
maxBytes,
accountId: accountId ?? undefined,
replyTo: replyToId ?? undefined,
quoteAuthor: quoteAuthor ?? undefined,
});
},
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) => {
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
quoteAuthor,
}) => {
const send = resolveSignalSender(deps);
const maxBytes = resolveSignalMaxBytes({
cfg,
@ -122,6 +151,8 @@ export const signalOutbound: ChannelOutboundAdapter = {
maxBytes,
accountId: accountId ?? undefined,
mediaLocalRoots,
replyTo: replyToId ?? undefined,
quoteAuthor: quoteAuthor ?? undefined,
});
},
}),

View File

@ -0,0 +1,69 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { sendMessageSignal } from "./send.js";
const rpcMock = vi.fn();
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
return {
...actual,
loadConfig: () => ({}),
};
});
vi.mock("./accounts.js", () => ({
resolveSignalAccount: () => ({
accountId: "default",
enabled: true,
baseUrl: "http://signal.local",
configured: true,
config: { account: "+15550001111" },
}),
}));
vi.mock("./client.js", () => ({
signalRpcRequest: (...args: unknown[]) => rpcMock(...args),
}));
describe("sendMessageSignal", () => {
beforeEach(() => {
rpcMock.mockReset().mockResolvedValue({ timestamp: 123 });
});
it("sends quote-author for group replies when quoteAuthor is available", async () => {
await sendMessageSignal("group:test-group", "hello", {
textMode: "plain",
replyTo: "1700000000000",
quoteAuthor: "uuid:sender-1",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(rpcMock).toHaveBeenCalledWith("send", expect.any(Object), expect.any(Object));
expect(params.groupId).toBe("test-group");
expect(params["quote-timestamp"]).toBe(1700000000000);
expect(params["quote-author"]).toBe("uuid:sender-1");
});
it("ignores replyTo values with trailing non-numeric characters", async () => {
await sendMessageSignal("+15551230000", "hello", {
textMode: "plain",
replyTo: "1700000000000abc",
quoteAuthor: "uuid:sender-1",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(params["quote-timestamp"]).toBeUndefined();
expect(params["quote-author"]).toBeUndefined();
});
it("skips group quote metadata when quoteAuthor is unavailable", async () => {
await sendMessageSignal("group:test-group", "hello", {
textMode: "plain",
replyTo: "1700000000000",
});
const params = rpcMock.mock.calls[0]?.[1] as Record<string, unknown>;
expect(params["quote-timestamp"]).toBeUndefined();
expect(params["quote-author"]).toBeUndefined();
});
});

View File

@ -18,6 +18,8 @@ export type SignalSendOpts = {
timeoutMs?: number;
textMode?: "markdown" | "plain";
textStyles?: SignalTextStyleRange[];
replyTo?: string;
quoteAuthor?: string;
};
export type SignalSendResult = {
@ -181,6 +183,31 @@ export async function sendMessageSignal(
}
Object.assign(params, targetParams);
// Add quote parameters for reply functionality
const rawReplyTo = opts.replyTo?.trim();
if (rawReplyTo) {
// Only accept pure decimal strings to avoid Number() accepting "1e3" or "0x10"
if (!/^\d+$/.test(rawReplyTo)) {
// Invalid replyTo format, skip quoting but allow message to send
} else {
const quoteTs = Number(rawReplyTo);
const quoteAuthor = opts.quoteAuthor?.trim() || undefined;
const isGroup = target.type === "group";
if (Number.isInteger(quoteTs) && quoteTs > 0) {
// In group chats, signal-cli requires quote-author. If we can't resolve it,
// skip quoting entirely to avoid hard send failures.
if (!isGroup || quoteAuthor) {
params["quote-timestamp"] = quoteTs;
if (quoteAuthor) {
params["quote-author"] = quoteAuthor;
}
}
}
// Note: Invalid replyTo values (non-numeric, negative) are silently ignored
// This prevents signal-cli errors while allowing the message to still send
}
}
const result = await signalRpcRequest<{ timestamp?: number }>("send", params, {
baseUrl,
timeoutMs: opts.timeoutMs,

View File

@ -649,7 +649,9 @@ export async function deliverReplies(params: {
try {
const deliveredCountBeforeReply = progress.deliveredCount;
const replyToId =
params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId);
params.replyToMode === "off"
? undefined
: resolveTelegramReplyId(reply.replyToId ?? undefined);
const telegramData = reply.channelData?.telegram as TelegramReplyChannelData | undefined;
const shouldPinFirstMessage = telegramData?.pin === true;
const replyMarkup = buildInlineKeyboard(telegramData?.buttons);

View File

@ -59,7 +59,7 @@ export type EmbeddedPiRunResult = {
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
replyToId?: string;
replyToId?: string | null;
isError?: boolean;
}>;
meta: EmbeddedPiRunMeta;

View File

@ -4,7 +4,7 @@ export const runEmbeddedPiAgentMock: Mock = vi.fn();
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: (...args: unknown[]) => runEmbeddedPiAgentMock(...args),
runEmbeddedPiAgent: runEmbeddedPiAgentMock,
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),

View File

@ -32,9 +32,12 @@ function resolveReplyThreadingForPayload(params: {
const implicitReplyToId = params.implicitReplyToId?.trim() || undefined;
const currentMessageId = params.currentMessageId?.trim() || undefined;
const hasExplicitReplyToId = params.payload.replyToId !== undefined;
// 1) Apply implicit reply threading first (replyToMode will strip later if needed).
// Treat replyToId=null as an explicit "do not reply" signal (do not override).
let resolved: ReplyPayload =
params.payload.replyToId || params.payload.replyToCurrent === false || !implicitReplyToId
hasExplicitReplyToId || params.payload.replyToCurrent === false || !implicitReplyToId
? params.payload
: { ...params.payload, replyToId: implicitReplyToId };
@ -55,7 +58,7 @@ function resolveReplyThreadingForPayload(params: {
// 3) If replyToCurrent was set out-of-band (e.g. tags already stripped upstream),
// ensure replyToId is set to the current message id when available.
if (resolved.replyToCurrent && !resolved.replyToId && currentMessageId) {
if (resolved.replyToCurrent && resolved.replyToId === undefined && currentMessageId) {
resolved = {
...resolved,
replyToId: currentMessageId,

View File

@ -159,6 +159,17 @@ describe("applyReplyThreading auto-threading", () => {
expect(result[0].replyToId).toBe("42");
});
it("does not overwrite explicit replyToId:null when implicit threading is available", () => {
const result = applyReplyThreading({
payloads: [{ text: "Hello", replyToId: null }],
replyToMode: "all",
currentMessageId: "42",
});
expect(result).toHaveLength(1);
expect(result[0].replyToId).toBeNull();
});
it("threads only first payload when mode is 'first'", () => {
const result = applyReplyThreading({
payloads: [{ text: "A" }, { text: "B" }],

View File

@ -81,7 +81,7 @@ export type ReplyPayload = {
btw?: {
question: string;
};
replyToId?: string;
replyToId?: string | null;
replyToTag?: boolean;
/** True when [[reply_to_current]] was present but not yet mapped to a message id. */
replyToCurrent?: boolean;

View File

@ -15,6 +15,7 @@ type DirectSendOptions = {
cfg: OpenClawConfig;
accountId?: string | null;
replyToId?: string | null;
quoteAuthor?: string | null;
mediaUrl?: string;
mediaLocalRoots?: readonly string[];
maxBytes?: number;
@ -78,6 +79,7 @@ export function createDirectTextMediaOutbound<
accountId?: string | null;
deps?: OutboundSendDeps;
replyToId?: string | null;
quoteAuthor?: string | null;
mediaUrl?: string;
mediaLocalRoots?: readonly string[];
buildOptions: (params: DirectSendOptions) => TOpts;
@ -96,6 +98,7 @@ export function createDirectTextMediaOutbound<
mediaLocalRoots: sendParams.mediaLocalRoots,
accountId: sendParams.accountId,
replyToId: sendParams.replyToId,
quoteAuthor: sendParams.quoteAuthor,
maxBytes,
}),
);
@ -109,7 +112,7 @@ export function createDirectTextMediaOutbound<
textChunkLimit: 4000,
sendPayload: async (ctx) =>
await sendTextMediaPayload({ channel: params.channel, ctx, adapter: outbound }),
sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => {
sendText: async ({ cfg, to, text, accountId, deps, replyToId, quoteAuthor }) => {
return await sendDirect({
cfg,
to,
@ -117,10 +120,21 @@ export function createDirectTextMediaOutbound<
accountId,
deps,
replyToId,
quoteAuthor,
buildOptions: params.buildTextOptions,
});
},
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps, replyToId }) => {
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
quoteAuthor,
}) => {
return await sendDirect({
cfg,
to,
@ -130,6 +144,7 @@ export function createDirectTextMediaOutbound<
accountId,
deps,
replyToId,
quoteAuthor,
buildOptions: params.buildMediaOptions,
});
},

View File

@ -67,4 +67,32 @@ describe("signalOutbound", () => {
);
expect(result).toEqual({ channel: "signal", messageId: "sig-media-1", timestamp: 456 });
});
it("passes replyToId and quoteAuthor through sendText", async () => {
const sendSignal = vi.fn().mockResolvedValue({ messageId: "sig-text-2", timestamp: 789 });
const sendText = signalOutbound.sendText;
expect(sendText).toBeDefined();
const result = await sendText!({
cfg,
to: "group:test-group",
text: "hello",
accountId: "work",
replyToId: "1700000000000",
quoteAuthor: "uuid:sender-1",
deps: { sendSignal },
});
expect(sendSignal).toHaveBeenCalledWith(
"group:test-group",
"hello",
expect.objectContaining({
accountId: "work",
maxBytes: 4 * 1024 * 1024,
replyTo: "1700000000000",
quoteAuthor: "uuid:sender-1",
}),
);
expect(result).toEqual({ channel: "signal", messageId: "sig-text-2", timestamp: 789 });
});
});

View File

@ -135,6 +135,7 @@ export type ChannelOutboundContext = {
/** Send image as document to avoid Telegram compression. */
forceDocument?: boolean;
replyToId?: string | null;
quoteAuthor?: string | null;
threadId?: string | number | null;
accountId?: string | null;
identity?: OutboundIdentity;

View File

@ -0,0 +1,316 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js";
import type { OpenClawConfig } from "../../config/config.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
const { deliverOutboundPayloads } = await import("./deliver.js");
const defaultRegistry = createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]);
describe("deliverOutboundPayloads Greptile fixes", () => {
beforeEach(() => {
setActivePluginRegistry(defaultRegistry);
});
afterEach(() => {
setActivePluginRegistry(createTestRegistry());
});
it("retries replyToId on later non-signal text payloads after a best-effort failure", async () => {
const sendTelegram = vi
.fn()
.mockRejectedValueOnce(new Error("text fail"))
.mockResolvedValueOnce({ messageId: "m2", chatId: "chat-1" });
const onError = vi.fn();
const cfg: OpenClawConfig = {
channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "telegram",
to: "123",
payloads: [{ text: "ab" }, { text: "cd" }],
replyToId: "777",
deps: { sendTelegram },
bestEffort: true,
onError,
skipQueue: true,
});
expect(sendTelegram).toHaveBeenCalledTimes(2);
expect(sendTelegram.mock.calls[0]?.[2]).toEqual(
expect.objectContaining({ replyToMessageId: 777 }),
);
expect(sendTelegram.mock.calls[1]?.[2]).toEqual(
expect.objectContaining({ replyToMessageId: 777 }),
);
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ messageId: "m2", chatId: "chat-1", channel: "telegram" }]);
});
it("retries replyToId on later sendPayload payloads after a best-effort failure", async () => {
const sendPayload = vi
.fn()
.mockRejectedValueOnce(new Error("payload fail"))
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
const onError = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", channelData: { mode: "custom" } },
{ text: "second", channelData: { mode: "custom" } },
],
replyToId: "orig-msg-id",
bestEffort: true,
onError,
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id");
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ channel: "matrix", messageId: "mx-2" }]);
});
it("preserves explicit null reply suppression without consuming the inherited reply", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const payloads: ReplyPayload[] = [
{ text: "first", channelData: { mode: "custom" }, replyToId: null },
{ text: "second", channelData: { mode: "custom" } },
];
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads,
replyToId: "orig-msg-id",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBeUndefined();
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id");
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
]);
});
it("treats replyToId: undefined as inherited reply metadata", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", channelData: { mode: "custom" }, replyToId: undefined },
{ text: "second", channelData: { mode: "custom" } },
],
replyToId: "orig-msg-id",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBeNull();
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
]);
});
it("clears replyToId on later sendPayload payloads after the first successful send", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", channelData: { mode: "custom" } },
{ text: "second", channelData: { mode: "custom" } },
],
replyToId: "orig-msg-id",
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(2);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBeNull();
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
]);
});
it("preserves later explicit replyToId values after the first successful send", async () => {
const sendPayload = vi
.fn()
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-1" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-2" })
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-3" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", replyToId: "reply-1", channelData: { mode: "custom" } },
{ text: "second", replyToId: "reply-2", channelData: { mode: "custom" } },
{ text: "third", replyToId: "reply-3", channelData: { mode: "custom" } },
],
skipQueue: true,
});
expect(sendPayload).toHaveBeenCalledTimes(3);
expect(sendPayload.mock.calls[0]?.[0]?.replyToId).toBe("reply-1");
expect(sendPayload.mock.calls[1]?.[0]?.replyToId).toBe("reply-2");
expect(sendPayload.mock.calls[2]?.[0]?.replyToId).toBe("reply-3");
expect(results).toEqual([
{ channel: "matrix", messageId: "mx-1" },
{ channel: "matrix", messageId: "mx-2" },
{ channel: "matrix", messageId: "mx-3" },
]);
});
it("retries replyToId on later non-signal media payloads after a best-effort failure", async () => {
const sendText = vi.fn();
const sendMedia = vi
.fn()
.mockRejectedValueOnce(new Error("media fail"))
.mockResolvedValueOnce({ channel: "matrix", messageId: "mx-media-2" });
const onError = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText, sendMedia },
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [
{ text: "first", mediaUrl: "https://example.com/1.jpg" },
{ text: "second", mediaUrl: "https://example.com/2.jpg" },
],
replyToId: "orig-msg-id",
bestEffort: true,
onError,
skipQueue: true,
});
expect(sendMedia).toHaveBeenCalledTimes(2);
expect(sendMedia.mock.calls[0]?.[0]?.replyToId).toBe("orig-msg-id");
expect(sendMedia.mock.calls[1]?.[0]?.replyToId).toBe("orig-msg-id");
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ channel: "matrix", messageId: "mx-media-2" }]);
});
});

View File

@ -917,6 +917,55 @@ describe("deliverOutboundPayloads", () => {
expect(sendWhatsApp).not.toHaveBeenCalled();
});
it("does not re-apply Signal reply metadata after partial chunk failure in bestEffort mode", async () => {
const sendSignal = vi
.fn()
.mockResolvedValueOnce({ messageId: "s1", timestamp: 1 })
.mockRejectedValueOnce(new Error("chunk fail"))
.mockResolvedValueOnce({ messageId: "s3", timestamp: 3 });
const onError = vi.fn();
const cfg: OpenClawConfig = {
channels: { signal: { textChunkLimit: 2 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "signal",
to: "+1555",
payloads: [{ text: "abcd" }, { text: "ef" }],
replyToId: "orig-msg-id",
quoteAuthor: "Tester",
deps: { sendSignal },
bestEffort: true,
onError,
});
expect(sendSignal).toHaveBeenCalledTimes(3);
expect(sendSignal).toHaveBeenNthCalledWith(
1,
"+1555",
expect.any(String),
expect.objectContaining({ replyTo: "orig-msg-id", quoteAuthor: "Tester" }),
);
expect(sendSignal).toHaveBeenNthCalledWith(
2,
"+1555",
expect.any(String),
expect.objectContaining({ replyTo: undefined, quoteAuthor: undefined }),
);
expect(sendSignal).toHaveBeenNthCalledWith(
3,
"+1555",
expect.any(String),
expect.objectContaining({ replyTo: undefined, quoteAuthor: undefined }),
);
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([
{ channel: "signal", messageId: "s1", timestamp: 1 },
{ channel: "signal", messageId: "s3", timestamp: 3 },
]);
});
it("passes normalized payload to onError", async () => {
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom"));
const onError = vi.fn();

View File

@ -9,12 +9,14 @@ import {
resolveTextChunkLimit,
} from "../../auto-reply/chunk.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveChannelMediaMaxBytes } from "../../channels/plugins/media-limits.js";
import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js";
import type {
ChannelOutboundAdapter,
ChannelOutboundContext,
} from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import {
appendAssistantMessageToSessionTranscript,
resolveMirroredTranscriptText,
@ -42,6 +44,11 @@ import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js";
import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js";
import type { OutboundSessionContext } from "./session-context.js";
import type { OutboundChannel } from "./targets.js";
import {
markdownToSignalTextChunks,
type SignalTextStyleRange,
} from "../../../extensions/signal/src/format.js";
import { sendMessageSignal } from "../../../extensions/signal/src/send.js";
export type { NormalizedOutboundPayload } from "./payloads.js";
export { normalizeOutboundPayloads } from "./payloads.js";
@ -118,6 +125,7 @@ type ChannelHandlerParams = {
to: string;
accountId?: string;
replyToId?: string | null;
quoteAuthor?: string | null;
threadId?: string | number | null;
identity?: OutboundIdentity;
deps?: OutboundSendDeps;
@ -161,8 +169,9 @@ function createPluginHandler(
threadId?: string | number | null;
}): Omit<ChannelOutboundContext, "text" | "mediaUrl"> => ({
...baseCtx,
replyToId: overrides?.replyToId ?? baseCtx.replyToId,
threadId: overrides?.threadId ?? baseCtx.threadId,
// Preserve explicit null overrides so later payloads can suppress inherited reply/thread metadata.
replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId,
threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId,
});
return {
chunker,
@ -223,6 +232,7 @@ function createPluginHandler(
return sendText({
...resolveCtx(overrides),
text: caption,
mediaUrl,
});
},
};
@ -236,6 +246,7 @@ function createChannelOutboundContextBase(
to: params.to,
accountId: params.accountId,
replyToId: params.replyToId,
quoteAuthor: params.quoteAuthor,
threadId: params.threadId,
identity: params.identity,
gifPlayback: params.gifPlayback,
@ -255,6 +266,7 @@ type DeliverOutboundPayloadsCoreParams = {
accountId?: string;
payloads: ReplyPayload[];
replyToId?: string | null;
quoteAuthor?: string | null;
threadId?: string | number | null;
identity?: OutboundIdentity;
deps?: OutboundSendDeps;
@ -541,6 +553,8 @@ async function deliverOutboundPayloadsCore(
const accountId = params.accountId;
const deps = params.deps;
const abortSignal = params.abortSignal;
const sendSignal =
resolveOutboundSendDep<typeof sendMessageSignal>(params.deps, "signal") ?? sendMessageSignal;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(
cfg,
params.session?.agentId ?? params.mirror?.agentId,
@ -553,6 +567,7 @@ async function deliverOutboundPayloadsCore(
deps,
accountId,
replyToId: params.replyToId,
quoteAuthor: params.quoteAuthor,
threadId: params.threadId,
identity: params.identity,
gifPlayback: params.gifPlayback,
@ -569,6 +584,19 @@ async function deliverOutboundPayloadsCore(
? handler.resolveEffectiveTextChunkLimit(configuredTextLimit)
: configuredTextLimit;
const chunkMode = handler.chunker ? resolveChunkMode(cfg, channel, accountId) : "length";
const isSignalChannel = channel === "signal";
const signalTableMode = isSignalChannel
? resolveMarkdownTableMode({ cfg, channel: "signal", accountId })
: "code";
const signalMaxBytes = isSignalChannel
? resolveChannelMediaMaxBytes({
cfg,
resolveChannelLimitMb: ({ cfg, accountId }) =>
cfg.channels?.signal?.accounts?.[accountId]?.mediaMaxMb ??
cfg.channels?.signal?.mediaMaxMb,
accountId,
})
: undefined;
const sendTextChunks = async (
text: string,
@ -608,6 +636,81 @@ async function deliverOutboundPayloadsCore(
}
};
const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel, handler);
const sendSignalText = async (
text: string,
styles: SignalTextStyleRange[],
replyTo?: string,
quoteAuthor?: string,
) => {
throwIfAborted(abortSignal);
return {
channel: "signal" as const,
...(await sendSignal(to, text, {
cfg,
maxBytes: signalMaxBytes,
accountId: accountId ?? undefined,
textMode: "plain",
textStyles: styles,
replyTo,
quoteAuthor,
})),
};
};
const sendSignalTextChunks = async (text: string, replyTo?: string, quoteAuthor?: string) => {
throwIfAborted(abortSignal);
let signalChunks =
textLimit === undefined
? markdownToSignalTextChunks(text, Number.POSITIVE_INFINITY, {
tableMode: signalTableMode,
})
: markdownToSignalTextChunks(text, textLimit, { tableMode: signalTableMode });
if (signalChunks.length === 0 && text) {
signalChunks = [{ text, styles: [] }];
}
let first = true;
for (const chunk of signalChunks) {
throwIfAborted(abortSignal);
results.push(
await sendSignalText(
chunk.text,
chunk.styles,
first ? replyTo : undefined,
first ? quoteAuthor : undefined,
),
);
first = false;
}
};
const sendSignalMedia = async (
caption: string,
mediaUrl: string,
replyTo?: string,
quoteAuthor?: string,
) => {
throwIfAborted(abortSignal);
const formatted = markdownToSignalTextChunks(caption, Number.POSITIVE_INFINITY, {
tableMode: signalTableMode,
})[0] ?? {
text: caption,
styles: [],
};
return {
channel: "signal" as const,
...(await sendSignal(to, formatted.text, {
cfg,
mediaUrl,
maxBytes: signalMaxBytes,
accountId: accountId ?? undefined,
textMode: "plain",
textStyles: formatted.styles,
mediaLocalRoots,
replyTo,
quoteAuthor,
})),
};
};
const hookRunner = getGlobalHookRunner();
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
const mirrorIsGroup = params.mirror?.isGroup;
@ -632,6 +735,34 @@ async function deliverOutboundPayloadsCore(
},
);
}
let replyConsumed = false;
const markReplyConsumedIfSendSucceeded = (
replyTo: string | undefined,
resultCountBeforeSend: number,
) => {
if (replyTo && results.length > resultCountBeforeSend) {
replyConsumed = true;
}
};
const trackReplyConsumption = async <T>(
replyTo: string | undefined,
send: () => Promise<T>,
sent?: (value: T) => boolean,
): Promise<T> => {
const resultCountBeforeSend = results.length;
try {
const value = await send();
if (replyTo && (sent?.(value) ?? results.length > resultCountBeforeSend)) {
replyConsumed = true;
}
return value;
} catch (err) {
// Best-effort delivery should only consume reply metadata once a quoted send
// actually succeeded, even if a later chunk/send in the same payload fails.
markReplyConsumedIfSendSucceeded(replyTo, resultCountBeforeSend);
throw err;
}
};
for (const payload of normalizedPayloads) {
let payloadSummary = buildPayloadSummary(payload);
try {
@ -654,10 +785,37 @@ async function deliverOutboundPayloadsCore(
payloadSummary = hookResult.payloadSummary;
params.onPayload?.(payloadSummary);
const sendOverrides = {
replyToId: effectivePayload.replyToId ?? params.replyToId ?? undefined,
// Treat null as an explicit "do not reply" override so payload-level suppression
// is preserved instead of falling back to inherited reply metadata.
// NOTE: Many payload builders emit replyToId: undefined. Treat that as "no opinion"
// so the inherited reply metadata can still apply.
const explicitPayloadReplyTo =
"replyToId" in effectivePayload && effectivePayload.replyToId !== undefined
? effectivePayload.replyToId
: undefined;
const inheritedReplyTo =
explicitPayloadReplyTo === undefined ? (params.replyToId ?? undefined) : undefined;
const effectiveReplyTo =
explicitPayloadReplyTo != null
? explicitPayloadReplyTo
: !replyConsumed
? inheritedReplyTo
: undefined;
// NOTE: quoteAuthor is derived from the top-level params, not resolved per-payload.
// The normal Signal inbound→outbound path (monitor.ts:deliverReplies) resolves
// quoteAuthor per-payload via resolveQuoteAuthor(effectiveReplyTo). If a future
// caller uses deliverOutboundPayloads directly with per-payload replyToId overrides,
// it should supply its own quoteAuthor resolution or extend this logic.
const effectiveQuoteAuthor = effectiveReplyTo ? (params.quoteAuthor ?? undefined) : undefined;
const effectiveSendOverrides = {
threadId: params.threadId ?? undefined,
forceDocument: params.forceDocument,
replyToId:
explicitPayloadReplyTo !== undefined // explicit payload override (string or null)
? (explicitPayloadReplyTo ?? undefined) // null → undefined for the wire format
: inheritedReplyTo !== undefined
? (effectiveReplyTo ?? null)
: undefined,
};
if (
handler.sendPayload &&
@ -666,7 +824,29 @@ async function deliverOutboundPayloadsCore(
channelData: effectivePayload.channelData,
})
) {
const delivery = await handler.sendPayload(effectivePayload, sendOverrides);
// Materialize the resolved replyToId on the payload so downstream handlers see
// the correct value:
// - string: reply applied
// - null: reply metadata was inherited but has been consumed (explicitly no reply)
// - undefined: no reply metadata was ever present (no opinion)
const resolvedReplyToId =
effectiveReplyTo !== undefined
? effectiveReplyTo
: inheritedReplyTo !== undefined
? null
: undefined;
const resolvedPayload = { ...effectivePayload, replyToId: resolvedReplyToId };
const delivery = await trackReplyConsumption(
effectiveReplyTo,
() =>
handler.sendPayload!(
resolvedPayload as typeof effectivePayload,
effectiveSendOverrides,
),
// Any non-undefined return counts as "sent" — channels that don't return
// messageId (e.g. matrix) should still consume the reply indicator.
(value) => value !== undefined,
);
results.push(delivery);
emitMessageSent({
success: true,
@ -675,12 +855,19 @@ async function deliverOutboundPayloadsCore(
});
continue;
}
if (payloadSummary.mediaUrls.length === 0) {
const beforeCount = results.length;
if (handler.sendFormattedText) {
results.push(...(await handler.sendFormattedText(payloadSummary.text, sendOverrides)));
if (isSignalChannel) {
await trackReplyConsumption(effectiveReplyTo, () =>
sendSignalTextChunks(payloadSummary.text, effectiveReplyTo, effectiveQuoteAuthor),
);
} else if (handler.sendFormattedText) {
results.push(...(await handler.sendFormattedText(payloadSummary.text, effectiveSendOverrides)));
} else {
await sendTextChunks(payloadSummary.text, sendOverrides);
await trackReplyConsumption(effectiveReplyTo, () =>
sendTextChunks(payloadSummary.text, effectiveSendOverrides),
);
}
const messageId = results.at(-1)?.messageId;
emitMessageSent({
@ -707,7 +894,9 @@ async function deliverOutboundPayloadsCore(
);
}
const beforeCount = results.length;
await sendTextChunks(fallbackText, sendOverrides);
await trackReplyConsumption(effectiveReplyTo, () =>
sendTextChunks(fallbackText, effectiveSendOverrides),
);
const messageId = results.at(-1)?.messageId;
emitMessageSent({
success: results.length > beforeCount,
@ -717,26 +906,46 @@ async function deliverOutboundPayloadsCore(
continue;
}
let first = true;
let lastMessageId: string | undefined;
await sendMediaWithLeadingCaption({
mediaUrls: payloadSummary.mediaUrls,
caption: payloadSummary.text,
send: async ({ mediaUrl, caption }) => {
throwIfAborted(abortSignal);
if (handler.sendFormattedMedia) {
const delivery = await handler.sendFormattedMedia(
await trackReplyConsumption(effectiveReplyTo, async () => {
if (isSignalChannel) {
for (const url of payloadSummary.mediaUrls) {
throwIfAborted(abortSignal);
const caption = first ? payloadSummary.text : "";
const mediaReplyTo = first ? effectiveReplyTo : undefined;
const mediaQuoteAuthor = first ? effectiveQuoteAuthor : undefined;
const delivery = await sendSignalMedia(caption, url, mediaReplyTo, mediaQuoteAuthor);
results.push(delivery);
lastMessageId = delivery.messageId;
first = false;
}
return;
}
await sendMediaWithLeadingCaption({
mediaUrls: payloadSummary.mediaUrls,
caption: payloadSummary.text,
send: async ({ mediaUrl, caption }) => {
throwIfAborted(abortSignal);
if (handler.sendFormattedMedia) {
const delivery = await handler.sendFormattedMedia(
caption ?? "",
mediaUrl,
effectiveSendOverrides,
);
results.push(delivery);
lastMessageId = delivery.messageId;
return;
}
const delivery = await handler.sendMedia(
caption ?? "",
mediaUrl,
sendOverrides,
effectiveSendOverrides,
);
results.push(delivery);
lastMessageId = delivery.messageId;
return;
}
const delivery = await handler.sendMedia(caption ?? "", mediaUrl, sendOverrides);
results.push(delivery);
lastMessageId = delivery.messageId;
},
},
});
});
emitMessageSent({
success: true,

View File

@ -67,6 +67,8 @@ export function normalizeReplyPayloadsForDelivery(
);
const hasMultipleMedia = (explicitMediaUrls?.length ?? 0) > 1;
const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl;
const resolvedReplyToId =
payload.replyToId === null ? null : (payload.replyToId ?? parsed.replyToId);
const next: ReplyPayload = {
...payload,
text:
@ -76,7 +78,7 @@ export function normalizeReplyPayloadsForDelivery(
}) ?? "",
mediaUrls: mergedMedia.length ? mergedMedia : undefined,
mediaUrl: resolvedMediaUrl,
replyToId: payload.replyToId ?? parsed.replyToId,
...(resolvedReplyToId !== undefined ? { replyToId: resolvedReplyToId } : {}),
replyToTag: payload.replyToTag || parsed.replyToTag,
replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent,
audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice),

View File

@ -7,7 +7,8 @@ export type OutboundReplyPayload = {
text?: string;
mediaUrls?: string[];
mediaUrl?: string;
replyToId?: string;
// null explicitly suppresses inherited reply metadata (distinct from undefined = "not set")
replyToId?: string | null;
};
export type SendableOutboundReplyParts = {
@ -38,7 +39,12 @@ export function normalizeOutboundReplyPayload(
)
: undefined;
const mediaUrl = typeof payload.mediaUrl === "string" ? payload.mediaUrl : undefined;
const replyToId = typeof payload.replyToId === "string" ? payload.replyToId : undefined;
const replyToId =
typeof payload.replyToId === "string"
? payload.replyToId
: payload.replyToId === null
? null
: undefined;
return {
text,
mediaUrls,