diff --git a/extensions/signal/src/client-adapter.test.ts b/extensions/signal/src/client-adapter.test.ts index c02b0f2f486..53e3045d26c 100644 --- a/extensions/signal/src/client-adapter.test.ts +++ b/extensions/signal/src/client-adapter.test.ts @@ -224,7 +224,7 @@ describe("streamSignalEvents", () => { }); it("uses native SSE for native mode", async () => { - mockNativeStreamEvents.mockResolvedValue(); + mockNativeStreamEvents.mockResolvedValue(undefined); const onEvent = vi.fn(); await streamSignalEvents({ @@ -244,7 +244,7 @@ describe("streamSignalEvents", () => { it("uses container WebSocket for container mode", async () => { setApiMode("container"); - mockStreamContainerEvents.mockResolvedValue(); + mockStreamContainerEvents.mockResolvedValue(undefined); const onEvent = vi.fn(); await streamSignalEvents({ @@ -298,7 +298,7 @@ describe("streamSignalEvents", () => { }); it("passes abort signal to underlying stream", async () => { - mockNativeStreamEvents.mockResolvedValue(); + mockNativeStreamEvents.mockResolvedValue(undefined); const abortController = new AbortController(); await streamSignalEvents({ diff --git a/extensions/signal/src/monitor/event-handler.ts b/extensions/signal/src/monitor/event-handler.ts index 58ff8d4f8d7..3bfff9a9451 100644 --- a/extensions/signal/src/monitor/event-handler.ts +++ b/extensions/signal/src/monitor/event-handler.ts @@ -1,5 +1,11 @@ -import { resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime"; -import { logTypingFailure } from "openclaw/plugin-sdk/channel-feedback"; +import { resolveAckReaction, resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime"; +import { + createStatusReactionController, + DEFAULT_TIMING, + logTypingFailure, + shouldAckReaction, + type StatusReactionController, +} from "openclaw/plugin-sdk/channel-feedback"; import { buildMentionRegexes, createChannelInboundDebouncer, @@ -48,6 +54,7 @@ import { type SignalSender, } from "../identity.js"; import { normalizeSignalMessagingTarget } from "../runtime-api.js"; +import { removeReactionSignal, sendReactionSignal } from "../send-reactions.js"; import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js"; import { handleSignalDirectMessageAccess, resolveSignalAccessState } from "./access-policy.js"; import type { @@ -307,27 +314,143 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { }, }); - const { queuedFinal } = await dispatchInboundMessage({ - ctx: ctxPayload, - cfg: deps.cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, - onModelSelected, - }, + // Status reactions: show emoji on the inbound message as agent progresses + const statusReactionsConfig = deps.cfg.messages?.statusReactions; + const ackEmoji = resolveAckReaction(deps.cfg, route.agentId, { + channel: "signal", + accountId: deps.accountId, }); - markDispatchIdle(); - if (!queuedFinal) { - if (entry.isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: deps.groupHistories, - historyKey, - limit: deps.historyLimit, - }); + const canAckReact = + Boolean(ackEmoji) && + shouldAckReaction({ + scope: deps.cfg.messages?.ackReactionScope, + isDirect: !entry.isGroup, + isGroup: entry.isGroup, + isMentionableGroup: entry.isGroup, + requireMention: false, + canDetectMention: true, + effectiveWasMentioned: entry.wasMentioned === true, + }); + const statusReactionsEnabled = + statusReactionsConfig?.enabled === true && canAckReact && Boolean(entry.timestamp); + let statusReactions: StatusReactionController | null = null; + let lastReactionEmoji: string | undefined; + if (statusReactionsEnabled && entry.timestamp) { + const msgTimestamp = entry.timestamp; + const reactionTarget = entry.isGroup ? undefined : entry.senderRecipient; + const reactionGroupId = entry.isGroup ? (entry.groupId ?? undefined) : undefined; + const reactionOpts = { + cfg: deps.cfg, + baseUrl: deps.baseUrl, + account: deps.account, + accountId: deps.accountId, + targetAuthor: entry.senderRecipient, + groupId: reactionGroupId, + }; + statusReactions = createStatusReactionController({ + enabled: true, + adapter: { + setReaction: async (emoji: string) => { + try { + if (lastReactionEmoji && lastReactionEmoji !== emoji) { + await removeReactionSignal( + reactionTarget ?? "", + msgTimestamp, + lastReactionEmoji, + reactionOpts, + ); + } + await sendReactionSignal(reactionTarget ?? "", msgTimestamp, emoji, reactionOpts); + lastReactionEmoji = emoji; + } catch (err) { + logVerbose(`signal status-reaction set failed: ${String(err)}`); + } + }, + removeReaction: async (emoji: string) => { + try { + await removeReactionSignal(reactionTarget ?? "", msgTimestamp, emoji, reactionOpts); + if (lastReactionEmoji === emoji) { + lastReactionEmoji = undefined; + } + } catch (err) { + logVerbose(`signal status-reaction remove failed: ${String(err)}`); + } + }, + }, + initialEmoji: ackEmoji, + emojis: deps.cfg.messages?.statusReactions?.emojis, + timing: deps.cfg.messages?.statusReactions?.timing, + onError: (err) => { + logVerbose(`signal status-reaction error: ${String(err)}`); + }, + }); + void statusReactions.setQueued(); + } + + let dispatchError = false; + try { + const { queuedFinal } = await dispatchInboundMessage({ + ctx: ctxPayload, + cfg: deps.cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: + typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, + onModelSelected, + onReasoningStream: statusReactions + ? async () => { + await statusReactions!.setThinking(); + } + : undefined, + onToolStart: statusReactions + ? async (payload) => { + await statusReactions!.setTool(payload.name); + } + : undefined, + onCompactionStart: statusReactions + ? async () => { + await statusReactions!.setCompacting(); + } + : undefined, + onCompactionEnd: statusReactions + ? async () => { + statusReactions!.cancelPending(); + await statusReactions!.setThinking(); + } + : undefined, + }, + }); + markDispatchIdle(); + if (!queuedFinal) { + if (entry.isGroup && historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: deps.groupHistories, + historyKey, + limit: deps.historyLimit, + }); + } + if (statusReactions) { + void statusReactions.restoreInitial(); + } + return; + } + } catch (err) { + dispatchError = true; + throw err; + } finally { + if (statusReactions) { + if (dispatchError) { + await statusReactions.setError(); + } else { + await statusReactions.setDone(); + } + void (async () => { + const holdMs = dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs; + await new Promise((resolve) => setTimeout(resolve, holdMs)); + await statusReactions!.clear(); + })(); } - return; } if (entry.isGroup && historyKey) { clearHistoryEntriesIfEnabled({