Signal: add status reaction support (thinking/tool/done emoji)
Hook into the shared StatusReactionController so Signal shows lifecycle emoji on inbound messages as the agent progresses: queued → thinking → tool → compacting → done/error. Works for both native and container modes via client-adapter. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
parent
24e394534e
commit
56c978cbc5
@ -224,7 +224,7 @@ describe("streamSignalEvents", () => {
|
||||
});
|
||||
|
||||
it("uses native SSE for native mode", async () => {
|
||||
mockNativeStreamEvents.mockResolvedValue();
|
||||
mockNativeStreamEvents.mockResolvedValue(undefined);
|
||||
|
||||
const onEvent = vi.fn();
|
||||
await streamSignalEvents({
|
||||
@ -244,7 +244,7 @@ describe("streamSignalEvents", () => {
|
||||
|
||||
it("uses container WebSocket for container mode", async () => {
|
||||
setApiMode("container");
|
||||
mockStreamContainerEvents.mockResolvedValue();
|
||||
mockStreamContainerEvents.mockResolvedValue(undefined);
|
||||
|
||||
const onEvent = vi.fn();
|
||||
await streamSignalEvents({
|
||||
@ -298,7 +298,7 @@ describe("streamSignalEvents", () => {
|
||||
});
|
||||
|
||||
it("passes abort signal to underlying stream", async () => {
|
||||
mockNativeStreamEvents.mockResolvedValue();
|
||||
mockNativeStreamEvents.mockResolvedValue(undefined);
|
||||
|
||||
const abortController = new AbortController();
|
||||
await streamSignalEvents({
|
||||
|
||||
@ -1,5 +1,11 @@
|
||||
import { resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime";
|
||||
import { logTypingFailure } from "openclaw/plugin-sdk/channel-feedback";
|
||||
import { resolveAckReaction, resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime";
|
||||
import {
|
||||
createStatusReactionController,
|
||||
DEFAULT_TIMING,
|
||||
logTypingFailure,
|
||||
shouldAckReaction,
|
||||
type StatusReactionController,
|
||||
} from "openclaw/plugin-sdk/channel-feedback";
|
||||
import {
|
||||
buildMentionRegexes,
|
||||
createChannelInboundDebouncer,
|
||||
@ -48,6 +54,7 @@ import {
|
||||
type SignalSender,
|
||||
} from "../identity.js";
|
||||
import { normalizeSignalMessagingTarget } from "../runtime-api.js";
|
||||
import { removeReactionSignal, sendReactionSignal } from "../send-reactions.js";
|
||||
import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js";
|
||||
import { handleSignalDirectMessageAccess, resolveSignalAccessState } from "./access-policy.js";
|
||||
import type {
|
||||
@ -307,27 +314,143 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
|
||||
},
|
||||
});
|
||||
|
||||
const { queuedFinal } = await dispatchInboundMessage({
|
||||
ctx: ctxPayload,
|
||||
cfg: deps.cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
disableBlockStreaming:
|
||||
typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
|
||||
onModelSelected,
|
||||
},
|
||||
// Status reactions: show emoji on the inbound message as agent progresses
|
||||
const statusReactionsConfig = deps.cfg.messages?.statusReactions;
|
||||
const ackEmoji = resolveAckReaction(deps.cfg, route.agentId, {
|
||||
channel: "signal",
|
||||
accountId: deps.accountId,
|
||||
});
|
||||
markDispatchIdle();
|
||||
if (!queuedFinal) {
|
||||
if (entry.isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: deps.groupHistories,
|
||||
historyKey,
|
||||
limit: deps.historyLimit,
|
||||
});
|
||||
const canAckReact =
|
||||
Boolean(ackEmoji) &&
|
||||
shouldAckReaction({
|
||||
scope: deps.cfg.messages?.ackReactionScope,
|
||||
isDirect: !entry.isGroup,
|
||||
isGroup: entry.isGroup,
|
||||
isMentionableGroup: entry.isGroup,
|
||||
requireMention: false,
|
||||
canDetectMention: true,
|
||||
effectiveWasMentioned: entry.wasMentioned === true,
|
||||
});
|
||||
const statusReactionsEnabled =
|
||||
statusReactionsConfig?.enabled === true && canAckReact && Boolean(entry.timestamp);
|
||||
let statusReactions: StatusReactionController | null = null;
|
||||
let lastReactionEmoji: string | undefined;
|
||||
if (statusReactionsEnabled && entry.timestamp) {
|
||||
const msgTimestamp = entry.timestamp;
|
||||
const reactionTarget = entry.isGroup ? undefined : entry.senderRecipient;
|
||||
const reactionGroupId = entry.isGroup ? (entry.groupId ?? undefined) : undefined;
|
||||
const reactionOpts = {
|
||||
cfg: deps.cfg,
|
||||
baseUrl: deps.baseUrl,
|
||||
account: deps.account,
|
||||
accountId: deps.accountId,
|
||||
targetAuthor: entry.senderRecipient,
|
||||
groupId: reactionGroupId,
|
||||
};
|
||||
statusReactions = createStatusReactionController({
|
||||
enabled: true,
|
||||
adapter: {
|
||||
setReaction: async (emoji: string) => {
|
||||
try {
|
||||
if (lastReactionEmoji && lastReactionEmoji !== emoji) {
|
||||
await removeReactionSignal(
|
||||
reactionTarget ?? "",
|
||||
msgTimestamp,
|
||||
lastReactionEmoji,
|
||||
reactionOpts,
|
||||
);
|
||||
}
|
||||
await sendReactionSignal(reactionTarget ?? "", msgTimestamp, emoji, reactionOpts);
|
||||
lastReactionEmoji = emoji;
|
||||
} catch (err) {
|
||||
logVerbose(`signal status-reaction set failed: ${String(err)}`);
|
||||
}
|
||||
},
|
||||
removeReaction: async (emoji: string) => {
|
||||
try {
|
||||
await removeReactionSignal(reactionTarget ?? "", msgTimestamp, emoji, reactionOpts);
|
||||
if (lastReactionEmoji === emoji) {
|
||||
lastReactionEmoji = undefined;
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(`signal status-reaction remove failed: ${String(err)}`);
|
||||
}
|
||||
},
|
||||
},
|
||||
initialEmoji: ackEmoji,
|
||||
emojis: deps.cfg.messages?.statusReactions?.emojis,
|
||||
timing: deps.cfg.messages?.statusReactions?.timing,
|
||||
onError: (err) => {
|
||||
logVerbose(`signal status-reaction error: ${String(err)}`);
|
||||
},
|
||||
});
|
||||
void statusReactions.setQueued();
|
||||
}
|
||||
|
||||
let dispatchError = false;
|
||||
try {
|
||||
const { queuedFinal } = await dispatchInboundMessage({
|
||||
ctx: ctxPayload,
|
||||
cfg: deps.cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
disableBlockStreaming:
|
||||
typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
|
||||
onModelSelected,
|
||||
onReasoningStream: statusReactions
|
||||
? async () => {
|
||||
await statusReactions!.setThinking();
|
||||
}
|
||||
: undefined,
|
||||
onToolStart: statusReactions
|
||||
? async (payload) => {
|
||||
await statusReactions!.setTool(payload.name);
|
||||
}
|
||||
: undefined,
|
||||
onCompactionStart: statusReactions
|
||||
? async () => {
|
||||
await statusReactions!.setCompacting();
|
||||
}
|
||||
: undefined,
|
||||
onCompactionEnd: statusReactions
|
||||
? async () => {
|
||||
statusReactions!.cancelPending();
|
||||
await statusReactions!.setThinking();
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
});
|
||||
markDispatchIdle();
|
||||
if (!queuedFinal) {
|
||||
if (entry.isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: deps.groupHistories,
|
||||
historyKey,
|
||||
limit: deps.historyLimit,
|
||||
});
|
||||
}
|
||||
if (statusReactions) {
|
||||
void statusReactions.restoreInitial();
|
||||
}
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
dispatchError = true;
|
||||
throw err;
|
||||
} finally {
|
||||
if (statusReactions) {
|
||||
if (dispatchError) {
|
||||
await statusReactions.setError();
|
||||
} else {
|
||||
await statusReactions.setDone();
|
||||
}
|
||||
void (async () => {
|
||||
const holdMs = dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs;
|
||||
await new Promise((resolve) => setTimeout(resolve, holdMs));
|
||||
await statusReactions!.clear();
|
||||
})();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (entry.isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user