/*
 * openclaw/src/signal/monitor/event-handler.ts
 * Snapshot: 2026-02-26 21:57:52 +01:00 (735 lines, 25 KiB, TypeScript)
 *
 * Warning carried over from the hosting file viewer: this file contains
 * invisible Unicode characters that are indistinguishable to humans but may
 * be processed differently by a computer. If this is intentional, the warning
 * can be safely ignored; otherwise reveal the characters in the viewer
 * (Escape button) and review them.
 */

import { resolveHumanDelayConfig } from "../../agents/identity.js";
import { hasControlCommand } from "../../auto-reply/command-detection.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
formatInboundEnvelope,
formatInboundFromLabel,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "../../auto-reply/inbound-debounce.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
recordPendingHistoryEntryIfEnabled,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import { resolveControlCommandGate } from "../../channels/command-gating.js";
import { logInboundDrop, logTypingFailure } from "../../channels/logging.js";
import { resolveMentionGatingWithBypass } from "../../channels/mention-gating.js";
import { normalizeSignalMessagingTarget } from "../../channels/plugins/normalize/signal.js";
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { recordInboundSession } from "../../channels/session.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { resolveChannelGroupRequireMention } from "../../config/group-policy.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import { upsertChannelPairingRequest } from "../../pairing/pairing-store.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import {
DM_GROUP_ACCESS_REASON,
readStoreAllowFromForDmPolicy,
resolveDmGroupAccessWithLists,
} from "../../security/dm-policy-shared.js";
import { normalizeE164 } from "../../utils.js";
import {
formatSignalPairingIdLine,
formatSignalSenderDisplay,
formatSignalSenderId,
isSignalSenderAllowed,
resolveSignalPeerId,
resolveSignalRecipient,
resolveSignalSender,
type SignalSender,
} from "../identity.js";
import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js";
import type {
SignalEnvelope,
SignalEventHandlerDeps,
SignalReactionMessage,
SignalReceivePayload,
} from "./event-handler.types.js";
import { renderSignalMentions } from "./mentions.js";
export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
// Debounce window for inbound Signal messages, resolved once per handler from config.
const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" });
/**
 * One inbound Signal message after access/mention gating, in the shape that is
 * queued on the inbound debouncer and later passed to handleSignalInboundMessage.
 */
type SignalInboundEntry = {
// Human-readable sender name: envelope.sourceName, falling back to senderDisplay.
senderName: string;
// Sender identifier as produced by formatSignalSenderDisplay.
senderDisplay: string;
// Reply address for the sender (resolveSignalRecipient); used as `signal:<recipient>`.
senderRecipient: string;
// Peer id used for agent routing and for the debounce key (resolveSignalPeerId).
senderPeerId: string;
// Group id/name from dataMessage.groupInfo; absent for direct messages.
groupId?: string;
groupName?: string;
// True when the message carried a group id.
isGroup: boolean;
// Message text, or a `<media:...>` placeholder, or the quoted text (first non-empty).
bodyText: string;
// Envelope timestamp, when present.
timestamp?: number;
// Stringified envelope timestamp, when the timestamp is a number.
messageId?: string;
// Local path and content type of the first attachment, when it was fetched.
mediaPath?: string;
mediaType?: string;
// Result of the control-command gate for this sender.
commandAuthorized: boolean;
// Effective mention flag from the mention gate (only meaningful for groups).
wasMentioned?: boolean;
};
/**
 * Dispatches one inbound Signal entry (a single message or a debounce-merged
 * batch) to the agent.
 *
 * Steps, in order:
 *  1. Resolve the agent route and session store for the conversation.
 *  2. Wrap the body in the inbound envelope and, for groups, prepend the
 *     pending group history.
 *  3. Build the inbound context and record the session (direct chats also
 *     update the "last route").
 *  4. Create the typing callbacks and reply dispatcher, then dispatch.
 *  5. For groups, clear the pending history that was just handed to the agent.
 *
 * @param entry The gated inbound message to dispatch.
 */
async function handleSignalInboundMessage(entry: SignalInboundEntry) {
  const fromLabel = formatInboundFromLabel({
    isGroup: entry.isGroup,
    groupLabel: entry.groupName ?? undefined,
    groupId: entry.groupId ?? "unknown",
    groupFallback: "Group",
    directLabel: entry.senderName,
    directId: entry.senderDisplay,
  });
  const route = resolveAgentRoute({
    cfg: deps.cfg,
    channel: "signal",
    accountId: deps.accountId,
    peer: {
      kind: entry.isGroup ? "group" : "direct",
      id: entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId,
    },
  });
  const storePath = resolveStorePath(deps.cfg.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = resolveEnvelopeFormatOptions(deps.cfg);
  const previousTimestamp = readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });
  const body = formatInboundEnvelope({
    channel: "Signal",
    from: fromLabel,
    timestamp: entry.timestamp ?? undefined,
    body: entry.bodyText,
    chatType: entry.isGroup ? "group" : "direct",
    sender: { name: entry.senderName, id: entry.senderDisplay },
    previousTimestamp,
    envelope: envelopeOptions,
  });

  // Groups: prepend the messages that were recorded while the bot was not addressed.
  let combinedBody = body;
  const historyKey = entry.isGroup ? String(entry.groupId ?? "unknown") : undefined;
  if (entry.isGroup && historyKey) {
    combinedBody = buildPendingHistoryContextFromMap({
      historyMap: deps.groupHistories,
      historyKey,
      limit: deps.historyLimit,
      currentMessage: combinedBody,
      formatEntry: (historyEntry) =>
        formatInboundEnvelope({
          channel: "Signal",
          from: fromLabel,
          timestamp: historyEntry.timestamp,
          body: `${historyEntry.body}${
            historyEntry.messageId ? ` [id:${historyEntry.messageId}]` : ""
          }`,
          chatType: "group",
          senderLabel: historyEntry.sender,
          envelope: envelopeOptions,
        }),
    });
  }

  const signalToRaw = entry.isGroup
    ? `group:${entry.groupId}`
    : `signal:${entry.senderRecipient}`;
  // Fall back to the raw target when normalization yields nothing.
  const signalTo = normalizeSignalMessagingTarget(signalToRaw) ?? signalToRaw;
  const inboundHistory =
    entry.isGroup && historyKey && deps.historyLimit > 0
      ? (deps.groupHistories.get(historyKey) ?? []).map((historyEntry) => ({
          sender: historyEntry.sender,
          body: historyEntry.body,
          timestamp: historyEntry.timestamp,
        }))
      : undefined;
  const ctxPayload = finalizeInboundContext({
    Body: combinedBody,
    BodyForAgent: entry.bodyText,
    InboundHistory: inboundHistory,
    RawBody: entry.bodyText,
    CommandBody: entry.bodyText,
    From: entry.isGroup
      ? `group:${entry.groupId ?? "unknown"}`
      : `signal:${entry.senderRecipient}`,
    To: signalTo,
    SessionKey: route.sessionKey,
    AccountId: route.accountId,
    ChatType: entry.isGroup ? "group" : "direct",
    ConversationLabel: fromLabel,
    GroupSubject: entry.isGroup ? (entry.groupName ?? undefined) : undefined,
    SenderName: entry.senderName,
    SenderId: entry.senderDisplay,
    Provider: "signal" as const,
    Surface: "signal" as const,
    MessageSid: entry.messageId,
    Timestamp: entry.timestamp ?? undefined,
    MediaPath: entry.mediaPath,
    MediaType: entry.mediaType,
    MediaUrl: entry.mediaPath,
    WasMentioned: entry.isGroup ? entry.wasMentioned === true : undefined,
    CommandAuthorized: entry.commandAuthorized,
    OriginatingChannel: "signal" as const,
    OriginatingTo: signalTo,
  });
  await recordInboundSession({
    storePath,
    sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
    ctx: ctxPayload,
    // Only direct chats update the main session's last route.
    updateLastRoute: !entry.isGroup
      ? {
          sessionKey: route.mainSessionKey,
          channel: "signal",
          to: entry.senderRecipient,
          accountId: route.accountId,
        }
      : undefined,
    onRecordError: (err) => {
      logVerbose(`signal: failed updating session meta: ${String(err)}`);
    },
  });
  if (shouldLogVerbose()) {
    // Flatten real line breaks to a visible "\n" so the preview stays on one log line.
    const preview = body.slice(0, 200).replace(/\n/g, "\\n");
    logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`);
  }
  const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
    cfg: deps.cfg,
    agentId: route.agentId,
    channel: "signal",
    accountId: route.accountId,
  });
  const typingCallbacks = createTypingCallbacks({
    start: async () => {
      if (!ctxPayload.To) {
        return;
      }
      await sendTypingSignal(ctxPayload.To, {
        baseUrl: deps.baseUrl,
        account: deps.account,
        accountId: deps.accountId,
      });
    },
    onStartError: (err) => {
      logTypingFailure({
        log: logVerbose,
        channel: "signal",
        target: ctxPayload.To ?? undefined,
        error: err,
      });
    },
  });
  const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
    ...prefixOptions,
    humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
    typingCallbacks,
    deliver: async (payload) => {
      await deps.deliverReplies({
        replies: [payload],
        target: ctxPayload.To,
        baseUrl: deps.baseUrl,
        account: deps.account,
        accountId: deps.accountId,
        runtime: deps.runtime,
        maxBytes: deps.mediaMaxBytes,
        textLimit: deps.textLimit,
      });
    },
    onError: (err, info) => {
      deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`));
    },
  });
  try {
    await dispatchInboundMessage({
      ctx: ctxPayload,
      cfg: deps.cfg,
      dispatcher,
      replyOptions: {
        ...replyOptions,
        // Config expresses "block streaming enabled"; the reply option is its negation.
        disableBlockStreaming:
          typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
        onModelSelected,
      },
    });
  } finally {
    // Always release the dispatcher's idle state, even when dispatch throws.
    // NOTE(review): presumably this also stops the typing indicator — confirm.
    markDispatchIdle();
  }
  // The pending history was handed to the agent above, so it is cleared whether
  // or not a final reply was queued (a failed dispatch leaves it untouched).
  if (entry.isGroup && historyKey) {
    clearHistoryEntriesIfEnabled({
      historyMap: deps.groupHistories,
      historyKey,
      limit: deps.historyLimit,
    });
  }
}
/**
 * Debouncer that merges rapid consecutive text messages from the same sender
 * in the same conversation into a single dispatch.
 *
 * - Key: account + conversation (group id or sender peer id) + sender peer id.
 * - Not debounced: blank bodies, anything carrying media, and control commands.
 * - Flush: a single entry is dispatched as-is; several entries are merged into
 *   the last entry with their bodies joined by line breaks and media dropped.
 */
const inboundDebouncer = createInboundDebouncer<SignalInboundEntry>({
  debounceMs: inboundDebounceMs,
  buildKey: (entry) => {
    const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId;
    if (!conversationId || !entry.senderPeerId) {
      return null;
    }
    return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`;
  },
  shouldDebounce: (entry) => {
    if (!entry.bodyText.trim()) {
      return false;
    }
    if (entry.mediaPath || entry.mediaType) {
      return false;
    }
    return !hasControlCommand(entry.bodyText, deps.cfg);
  },
  onFlush: async (entries) => {
    const last = entries.at(-1);
    if (!last) {
      return;
    }
    if (entries.length === 1) {
      await handleSignalInboundMessage(last);
      return;
    }
    // Join with a real line break; an escaped "\\n" would put a literal
    // backslash + "n" into the text the agent sees.
    const combinedText = entries
      .map((entry) => entry.bodyText)
      .filter(Boolean)
      .join("\n");
    if (!combinedText.trim()) {
      return;
    }
    // Metadata (sender, ids, timestamps, authorization) comes from the newest entry.
    await handleSignalInboundMessage({
      ...last,
      bodyText: combinedText,
      mediaPath: undefined,
      mediaType: undefined,
    });
  },
  onError: (err) => {
    deps.runtime.error?.(`signal debounce flush failed: ${String(err)}`);
  },
});
/**
 * Processes an envelope whose only payload is a reaction.
 *
 * @returns `false` when the message also has body content (the caller keeps
 *   processing it as a normal message); `true` when the reaction was consumed
 *   here — ignored, blocked, filtered out, or enqueued as a system event.
 */
function handleReactionOnlyInbound(params: {
  envelope: SignalEnvelope;
  sender: SignalSender;
  senderDisplay: string;
  reaction: SignalReactionMessage;
  hasBodyContent: boolean;
  resolveAccessDecision: (isGroup: boolean) => {
    decision: "allow" | "block" | "pairing";
    reason: string;
  };
}): boolean {
  const { envelope, sender, senderDisplay, reaction, hasBodyContent } = params;

  // A reaction attached to real content is handled by the regular message path.
  if (hasBodyContent) {
    return false;
  }
  // Reaction removals are ignored.
  if (reaction.isRemove) {
    return true;
  }

  const emoji = reaction.emoji?.trim() || "emoji";
  const actor = envelope.sourceName ?? senderDisplay;
  logVerbose(`signal reaction: ${emoji} from ${actor}`);

  const reactionGroupId = reaction.groupInfo?.groupId ?? undefined;
  const reactionGroupName = reaction.groupInfo?.groupName ?? undefined;
  const inGroup = Boolean(reactionGroupId);

  // Same DM/group access policy as regular messages.
  const access = params.resolveAccessDecision(inGroup);
  if (access.decision !== "allow") {
    logVerbose(`Blocked signal reaction sender ${senderDisplay} (${access.reason})`);
    return true;
  }

  const reactionTargets = deps.resolveSignalReactionTargets(reaction);
  const notify = deps.shouldEmitSignalReactionNotification({
    mode: deps.reactionMode,
    account: deps.account,
    targets: reactionTargets,
    sender,
    allowlist: deps.reactionAllowlist,
  });
  if (!notify) {
    return true;
  }

  const peerId = resolveSignalPeerId(sender);
  const reactionRoute = resolveAgentRoute({
    cfg: deps.cfg,
    channel: "signal",
    accountId: deps.accountId,
    peer: inGroup
      ? { kind: "group", id: reactionGroupId ?? "unknown" }
      : { kind: "direct", id: peerId },
  });

  let groupLabel: string | undefined;
  if (inGroup) {
    groupLabel = `${reactionGroupName ?? "Signal Group"} id:${reactionGroupId}`;
  }
  // The reacted-to message is identified by its sent timestamp.
  let messageId = "unknown";
  if (reaction.targetSentTimestamp) {
    messageId = String(reaction.targetSentTimestamp);
  }

  const eventText = deps.buildSignalReactionSystemEventText({
    emojiLabel: emoji,
    actorLabel: actor,
    messageId,
    targetLabel: reactionTargets[0]?.display,
    groupLabel,
  });

  // Context key built from message, sender, emoji and group; empty parts are dropped.
  const actorId = formatSignalSenderId(sender);
  const keyParts = ["signal", "reaction", "added", messageId, actorId, emoji, reactionGroupId ?? ""];
  const contextKey = keyParts.filter(Boolean).join(":");

  enqueueSystemEvent(eventText, { sessionKey: reactionRoute.sessionKey, contextKey });
  return true;
}
// Receive-stream handler returned by the factory.
//
// Pipeline for each "receive" event:
//   parse -> drop sync/self messages -> reaction-only handling ->
//   DM / group access policy (incl. pairing) -> control-command gate ->
//   group mention gate (skipped messages go to pending history) ->
//   attachment fetch -> read receipt -> enqueue on the inbound debouncer.
return async (event: { event?: string; data?: string }) => {
  if (event.event !== "receive" || !event.data) {
    return;
  }
  let payload: SignalReceivePayload | null = null;
  try {
    payload = JSON.parse(event.data) as SignalReceivePayload;
  } catch (err) {
    deps.runtime.error?.(`failed to parse event: ${String(err)}`);
    return;
  }
  // A receive exception is reported but does not stop processing of the envelope.
  if (payload?.exception?.message) {
    deps.runtime.error?.(`receive exception: ${payload.exception.message}`);
  }
  const envelope = payload?.envelope;
  if (!envelope) {
    return;
  }
  // Sync messages are ignored.
  if (envelope.syncMessage) {
    return;
  }
  const sender = resolveSignalSender(envelope);
  if (!sender) {
    return;
  }
  // Ignore messages sent from the bot's own number.
  if (deps.account && sender.kind === "phone") {
    if (sender.e164 === normalizeE164(deps.account)) {
      return;
    }
  }
  // Edited messages carry their content under editMessage.dataMessage.
  const dataMessage = envelope.dataMessage ?? envelope.editMessage?.dataMessage;
  const reaction = deps.isSignalReactionMessage(envelope.reactionMessage)
    ? envelope.reactionMessage
    : deps.isSignalReactionMessage(dataMessage?.reaction)
      ? dataMessage?.reaction
      : null;
  // Signal encodes each mention as U+FFFC (object replacement character) in the
  // message text; hydrate those placeholders to @uuid / @phone from the mentions
  // metadata before any further text processing.
  const rawMessage = dataMessage?.message ?? "";
  const normalizedMessage = renderSignalMentions(rawMessage, dataMessage?.mentions);
  const messageText = normalizedMessage.trim();
  const quoteText = dataMessage?.quote?.text?.trim() ?? "";
  // Attachments only count as body content when the message is not a reaction.
  const hasBodyContent =
    Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length);
  const senderDisplay = formatSignalSenderDisplay(sender);
  const storeAllowFrom = await readStoreAllowFromForDmPolicy({
    provider: "signal",
    accountId: deps.accountId,
    dmPolicy: deps.dmPolicy,
  });
  const resolveAccessDecision = (isGroup: boolean) =>
    resolveDmGroupAccessWithLists({
      isGroup,
      dmPolicy: deps.dmPolicy,
      groupPolicy: deps.groupPolicy,
      allowFrom: deps.allowFrom,
      groupAllowFrom: deps.groupAllowFrom,
      storeAllowFrom,
      isSenderAllowed: (allowEntries) => isSignalSenderAllowed(sender, allowEntries),
    });
  // The DM decision also supplies the effective allowlists used by the command gate below.
  const dmAccess = resolveAccessDecision(false);
  const effectiveDmAllow = dmAccess.effectiveAllowFrom;
  const effectiveGroupAllow = dmAccess.effectiveGroupAllowFrom;
  if (
    reaction &&
    handleReactionOnlyInbound({
      envelope,
      sender,
      senderDisplay,
      reaction,
      hasBodyContent,
      resolveAccessDecision,
    })
  ) {
    return;
  }
  if (!dataMessage) {
    return;
  }
  const senderRecipient = resolveSignalRecipient(sender);
  const senderPeerId = resolveSignalPeerId(sender);
  const senderAllowId = formatSignalSenderId(sender);
  if (!senderRecipient) {
    return;
  }
  const senderIdLine = formatSignalPairingIdLine(sender);
  const groupId = dataMessage.groupInfo?.groupId ?? undefined;
  const groupName = dataMessage.groupInfo?.groupName ?? undefined;
  const isGroup = Boolean(groupId);

  // --- Direct-message access policy -------------------------------------
  if (!isGroup) {
    if (dmAccess.decision === "block") {
      if (deps.dmPolicy !== "disabled") {
        logVerbose(`Blocked signal sender ${senderDisplay} (dmPolicy=${deps.dmPolicy})`);
      }
      return;
    }
    if (dmAccess.decision === "pairing") {
      if (deps.dmPolicy === "pairing") {
        const senderId = senderAllowId;
        const { code, created } = await upsertChannelPairingRequest({
          channel: "signal",
          id: senderId,
          accountId: deps.accountId,
          meta: { name: envelope.sourceName ?? undefined },
        });
        // Reply with the pairing code only when the request was newly created.
        if (created) {
          logVerbose(`signal pairing request sender=${senderId}`);
          try {
            await sendMessageSignal(
              `signal:${senderRecipient}`,
              buildPairingReply({
                channel: "signal",
                idLine: senderIdLine,
                code,
              }),
              {
                baseUrl: deps.baseUrl,
                account: deps.account,
                maxBytes: deps.mediaMaxBytes,
                accountId: deps.accountId,
              },
            );
          } catch (err) {
            logVerbose(`signal pairing reply failed for ${senderId}: ${String(err)}`);
          }
        }
      }
      return;
    }
  }

  // --- Group access policy ----------------------------------------------
  if (isGroup) {
    const groupAccess = resolveAccessDecision(true);
    if (groupAccess.decision !== "allow") {
      if (groupAccess.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_DISABLED) {
        logVerbose("Blocked signal group message (groupPolicy: disabled)");
      } else if (groupAccess.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) {
        logVerbose("Blocked signal group message (groupPolicy: allowlist, no groupAllowFrom)");
      } else {
        logVerbose(`Blocked signal group sender ${senderDisplay} (not in groupAllowFrom)`);
      }
      return;
    }
  }

  // --- Control-command authorization ------------------------------------
  const useAccessGroups = deps.cfg.commands?.useAccessGroups !== false;
  // In groups the owner list is the configured allowFrom; in DMs it is the effective DM list.
  const commandDmAllow = isGroup ? deps.allowFrom : effectiveDmAllow;
  const ownerAllowedForCommands = isSignalSenderAllowed(sender, commandDmAllow);
  const groupAllowedForCommands = isSignalSenderAllowed(sender, effectiveGroupAllow);
  const hasControlCommandInMessage = hasControlCommand(messageText, deps.cfg);
  const commandGate = resolveControlCommandGate({
    useAccessGroups,
    authorizers: [
      { configured: commandDmAllow.length > 0, allowed: ownerAllowedForCommands },
      { configured: effectiveGroupAllow.length > 0, allowed: groupAllowedForCommands },
    ],
    allowTextCommands: true,
    hasControlCommand: hasControlCommandInMessage,
  });
  const commandAuthorized = commandGate.commandAuthorized;
  // Unauthorized control commands are dropped outright in groups only.
  if (isGroup && commandGate.shouldBlock) {
    logInboundDrop({
      log: logVerbose,
      channel: "signal",
      reason: "control command (unauthorized)",
      target: senderDisplay,
    });
    return;
  }

  // --- Group mention gate -----------------------------------------------
  const route = resolveAgentRoute({
    cfg: deps.cfg,
    channel: "signal",
    accountId: deps.accountId,
    peer: {
      kind: isGroup ? "group" : "direct",
      id: isGroup ? (groupId ?? "unknown") : senderPeerId,
    },
  });
  const mentionRegexes = buildMentionRegexes(deps.cfg, route.agentId);
  const wasMentioned = isGroup && matchesMentionPatterns(messageText, mentionRegexes);
  const requireMention =
    isGroup &&
    resolveChannelGroupRequireMention({
      cfg: deps.cfg,
      channel: "signal",
      groupId,
      accountId: deps.accountId,
    });
  const canDetectMention = mentionRegexes.length > 0;
  const mentionGate = resolveMentionGatingWithBypass({
    isGroup,
    requireMention: Boolean(requireMention),
    canDetectMention,
    wasMentioned,
    implicitMention: false,
    hasAnyMention: false,
    allowTextCommands: true,
    hasControlCommand: hasControlCommandInMessage,
    commandAuthorized,
  });
  const effectiveWasMentioned = mentionGate.effectiveWasMentioned;
  if (isGroup && requireMention && canDetectMention && mentionGate.shouldSkip) {
    logInboundDrop({
      log: logVerbose,
      channel: "signal",
      reason: "no mention",
      target: senderDisplay,
    });
    const pendingPlaceholder = (() => {
      if (!dataMessage.attachments?.length) {
        return "";
      }
      // When we're skipping a message we intentionally avoid downloading attachments.
      // Still record a useful placeholder for pending-history context.
      if (deps.ignoreAttachments) {
        return "<media:attachment>";
      }
      const firstContentType = dataMessage.attachments?.[0]?.contentType;
      const pendingKind = mediaKindFromMime(firstContentType ?? undefined);
      return pendingKind ? `<media:${pendingKind}>` : "<media:attachment>";
    })();
    const pendingBodyText = messageText || pendingPlaceholder || quoteText;
    // Mirror the delivery path, which drops messages without any body: an empty
    // entry would only add a blank line to the pending group history.
    if (pendingBodyText) {
      const historyKey = groupId ?? "unknown";
      recordPendingHistoryEntryIfEnabled({
        historyMap: deps.groupHistories,
        historyKey,
        limit: deps.historyLimit,
        entry: {
          sender: envelope.sourceName ?? senderDisplay,
          body: pendingBodyText,
          timestamp: envelope.timestamp ?? undefined,
          messageId:
            typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
        },
      });
    }
    return;
  }

  // --- Attachment (first one only) --------------------------------------
  let mediaPath: string | undefined;
  let mediaType: string | undefined;
  let placeholder = "";
  const firstAttachment = dataMessage.attachments?.[0];
  if (firstAttachment?.id && !deps.ignoreAttachments) {
    try {
      const fetched = await deps.fetchAttachment({
        baseUrl: deps.baseUrl,
        account: deps.account,
        attachment: firstAttachment,
        sender: senderRecipient,
        groupId,
        maxBytes: deps.mediaMaxBytes,
      });
      if (fetched) {
        mediaPath = fetched.path;
        mediaType = fetched.contentType ?? firstAttachment.contentType ?? undefined;
      }
    } catch (err) {
      // A failed download is logged; the message continues with a generic placeholder.
      deps.runtime.error?.(danger(`attachment fetch failed: ${String(err)}`));
    }
  }
  const kind = mediaKindFromMime(mediaType ?? undefined);
  if (kind) {
    placeholder = `<media:${kind}>`;
  } else if (dataMessage.attachments?.length) {
    placeholder = "<media:attachment>";
  }
  // Body preference: message text, then media placeholder, then quoted text.
  const bodyText = messageText || placeholder || quoteText;
  if (!bodyText) {
    return;
  }

  // --- Read receipt (direct chats only, unless the daemon sends them) ----
  const receiptTimestamp =
    typeof envelope.timestamp === "number"
      ? envelope.timestamp
      : typeof dataMessage.timestamp === "number"
        ? dataMessage.timestamp
        : undefined;
  if (deps.sendReadReceipts && !deps.readReceiptsViaDaemon && !isGroup) {
    if (receiptTimestamp) {
      try {
        await sendReadReceiptSignal(`signal:${senderRecipient}`, receiptTimestamp, {
          baseUrl: deps.baseUrl,
          account: deps.account,
          accountId: deps.accountId,
        });
      } catch (err) {
        logVerbose(`signal read receipt failed for ${senderDisplay}: ${String(err)}`);
      }
    } else {
      logVerbose(`signal read receipt skipped (missing timestamp) for ${senderDisplay}`);
    }
  }

  // --- Hand off to the debouncer ------------------------------------------
  const senderName = envelope.sourceName ?? senderDisplay;
  const messageId =
    typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined;
  await inboundDebouncer.enqueue({
    senderName,
    senderDisplay,
    senderRecipient,
    senderPeerId,
    groupId,
    groupName,
    isGroup,
    bodyText,
    timestamp: envelope.timestamp ?? undefined,
    messageId,
    mediaPath,
    mediaType,
    commandAuthorized,
    wasMentioned: effectiveWasMentioned,
  });
};
}