From 4cc0bb07c150001c180df354740bddf054a3050b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 19 Mar 2026 00:30:55 +0000 Subject: [PATCH] refactor: unify plugin sdk pairing flows --- .../src/matrix/monitor/access-policy.test.ts | 32 +++++++++++ .../src/matrix/monitor/access-policy.ts | 19 +++---- .../matrix/src/matrix/monitor/handler.ts | 57 +++++++++---------- .../signal/src/monitor/access-policy.test.ts | 43 ++++++++++++++ .../signal/src/monitor/access-policy.ts | 11 ++-- .../signal/src/monitor/event-handler.ts | 46 +++++++-------- src/plugin-sdk/channel-pairing.test.ts | 30 +++++++++- src/plugin-sdk/channel-pairing.ts | 27 +++++++-- src/plugin-sdk/matrix.ts | 6 +- 9 files changed, 192 insertions(+), 79 deletions(-) create mode 100644 extensions/matrix/src/matrix/monitor/access-policy.test.ts create mode 100644 extensions/signal/src/monitor/access-policy.test.ts diff --git a/extensions/matrix/src/matrix/monitor/access-policy.test.ts b/extensions/matrix/src/matrix/monitor/access-policy.test.ts new file mode 100644 index 00000000000..c4fe597b0ee --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/access-policy.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it, vi } from "vitest"; +import { enforceMatrixDirectMessageAccess } from "./access-policy.js"; + +describe("enforceMatrixDirectMessageAccess", () => { + it("issues pairing through the injected channel pairing challenge", async () => { + const issuePairingChallenge = vi.fn(async () => ({ created: true, code: "123456" })); + const sendPairingReply = vi.fn(async () => {}); + + await expect( + enforceMatrixDirectMessageAccess({ + dmEnabled: true, + dmPolicy: "pairing", + accessDecision: "pairing", + senderId: "@alice:example.com", + senderName: "Alice", + effectiveAllowFrom: [], + issuePairingChallenge, + sendPairingReply, + logVerboseMessage: () => {}, + }), + ).resolves.toBe(false); + + expect(issuePairingChallenge).toHaveBeenCalledTimes(1); + expect(issuePairingChallenge).toHaveBeenCalledWith( + expect.objectContaining({ + senderId: "@alice:example.com", + meta: { name: "Alice" }, + sendPairingReply, + }), + ); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/access-policy.ts b/extensions/matrix/src/matrix/monitor/access-policy.ts index 8553b38c131..249051fbdc6 100644 --- a/extensions/matrix/src/matrix/monitor/access-policy.ts +++ b/extensions/matrix/src/matrix/monitor/access-policy.ts @@ -1,6 +1,5 @@ import { formatAllowlistMatchMeta, - issuePairingChallenge, readStoreAllowFromForDmPolicy, resolveDmGroupAccessWithLists, resolveSenderScopedGroupPolicy, @@ -68,13 +67,15 @@ export async function enforceMatrixDirectMessageAccess(params: { senderId: string; senderName: string; effectiveAllowFrom: string[]; - upsertPairingRequest: (input: { - id: string; + issuePairingChallenge: (params: { + senderId: string; + senderIdLine: string; meta?: Record; - }) => Promise<{ - code: string; - created: boolean; - }>; + buildReplyText: (params: { code: string }) => string; + sendPairingReply: (text: string) => Promise; + onCreated: () => void; + onReplyError: (err: unknown) => void; + }) => Promise<{ created: boolean; code?: string }>; sendPairingReply: (text: string) => Promise; logVerboseMessage: (message: string) => void; }): Promise { @@ -90,12 +91,10 @@ export async function enforceMatrixDirectMessageAccess(params: { }); const allowMatchMeta = formatAllowlistMatchMeta(allowMatch); if (params.accessDecision === "pairing") { - await issuePairingChallenge({ - channel: "matrix", + await params.issuePairingChallenge({ senderId: params.senderId, senderIdLine: `Matrix user id: ${params.senderId}`, meta: { name: params.senderName }, - upsertPairingRequest: params.upsertPairingRequest, buildReplyText: ({ code }) => [ "OpenClaw: access not configured.", diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index ddd8232280a..a0cd8148765 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,9 +1,8 @@ import type { LocationMessageEventContent, MatrixClient } from "@vector-im/matrix-bot-sdk"; import { DEFAULT_ACCOUNT_ID, - createScopedPairingAccess, - createReplyPrefixOptions, - createTypingCallbacks, + createChannelPairingController, + createChannelReplyPipeline, dispatchReplyFromConfigWithSettledDispatcher, evaluateGroupRouteAccessForPolicy, formatAllowlistMatchMeta, @@ -153,7 +152,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam accountId, } = params; const resolvedAccountId = accountId?.trim() || DEFAULT_ACCOUNT_ID; - const pairing = createScopedPairingAccess({ + const pairing = createChannelPairingController({ core, channel: "matrix", accountId: resolvedAccountId, @@ -322,7 +321,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam senderId, senderName, effectiveAllowFrom, - upsertPairingRequest: pairing.upsertPairingRequest, + issuePairingChallenge: pairing.issueChallenge, sendPairingReply: async (text) => { await sendMessageMatrix(`room:${roomId}`, text, { client }); }, @@ -680,38 +679,38 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam channel: "matrix", accountId: route.accountId, }); - const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ + const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({ cfg, agentId: route.agentId, channel: "matrix", accountId: route.accountId, + typing: { + start: () => sendTypingMatrix(roomId, true, undefined, client), + stop: () => sendTypingMatrix(roomId, false, undefined, client), + onStartError: (err) => { + logTypingFailure({ + log: logVerboseMessage, + channel: "matrix", + action: "start", + target: roomId, + error: err, + }); + }, + onStopError: (err) => { + logTypingFailure({ + log: logVerboseMessage, + channel: "matrix", + action: "stop", + target: roomId, + error: err, + }); + }, + }, }); const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId); - const typingCallbacks = createTypingCallbacks({ - start: () => sendTypingMatrix(roomId, true, undefined, client), - stop: () => sendTypingMatrix(roomId, false, undefined, client), - onStartError: (err) => { - logTypingFailure({ - log: logVerboseMessage, - channel: "matrix", - action: "start", - target: roomId, - error: err, - }); - }, - onStopError: (err) => { - logTypingFailure({ - log: logVerboseMessage, - channel: "matrix", - action: "stop", - target: roomId, - error: err, - }); - }, - }); const { dispatcher, replyOptions, markDispatchIdle } = core.channel.reply.createReplyDispatcherWithTyping({ - ...prefixOptions, + ...replyPipeline, humanDelay, typingCallbacks, deliver: async (payload) => { diff --git a/extensions/signal/src/monitor/access-policy.test.ts b/extensions/signal/src/monitor/access-policy.test.ts new file mode 100644 index 00000000000..f057f4cdf05 --- /dev/null +++ b/extensions/signal/src/monitor/access-policy.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, it, vi } from "vitest"; +import { handleSignalDirectMessageAccess } from "./access-policy.js"; + +describe("handleSignalDirectMessageAccess", () => { + it("returns true for already-allowed direct messages", async () => { + await expect( + handleSignalDirectMessageAccess({ + dmPolicy: "open", + dmAccessDecision: "allow", + senderId: "+15551230000", + senderIdLine: "Signal number: +15551230000", + senderDisplay: "Alice", + accountId: "default", + sendPairingReply: async () => {}, + log: () => {}, + }), + ).resolves.toBe(true); + }); + + it("issues a pairing challenge for pairing-gated senders", async () => { + const replies: string[] = []; + const sendPairingReply = vi.fn(async (text: string) => { + replies.push(text); + }); + + await expect( + handleSignalDirectMessageAccess({ + dmPolicy: "pairing", + dmAccessDecision: "pairing", + senderId: "+15551230000", + senderIdLine: "Signal number: +15551230000", + senderDisplay: "Alice", + senderName: "Alice", + accountId: "default", + sendPairingReply, + log: () => {}, + }), + ).resolves.toBe(false); + + expect(sendPairingReply).toHaveBeenCalledTimes(1); + expect(replies[0]).toContain("Pairing code:"); + }); +}); diff --git a/extensions/signal/src/monitor/access-policy.ts b/extensions/signal/src/monitor/access-policy.ts index de083efd9fd..cf1aff2cbe4 100644 --- a/extensions/signal/src/monitor/access-policy.ts +++ b/extensions/signal/src/monitor/access-policy.ts @@ -1,4 +1,4 @@ -import { issuePairingChallenge } from "openclaw/plugin-sdk/conversation-runtime"; +import { createChannelPairingChallengeIssuer } from "openclaw/plugin-sdk/channel-pairing"; import { upsertChannelPairingRequest } from "openclaw/plugin-sdk/conversation-runtime"; import { readStoreAllowFromForDmPolicy, @@ -62,11 +62,8 @@ export async function handleSignalDirectMessageAccess(params: { return false; } if (params.dmPolicy === "pairing") { - await issuePairingChallenge({ + await createChannelPairingChallengeIssuer({ channel: "signal", - senderId: params.senderId, - senderIdLine: params.senderIdLine, - meta: { name: params.senderName }, upsertPairingRequest: async ({ id, meta }) => await upsertChannelPairingRequest({ channel: "signal", @@ -74,6 +71,10 @@ export async function handleSignalDirectMessageAccess(params: { accountId: params.accountId, meta, }), + })({ + senderId: params.senderId, + senderIdLine: params.senderIdLine, + meta: { name: params.senderName }, sendPairingReply: params.sendPairingReply, onCreated: () => { params.log(`signal pairing request sender=${params.senderId}`); diff --git a/extensions/signal/src/monitor/event-handler.ts b/extensions/signal/src/monitor/event-handler.ts index c8f9da661a0..23eb676ae82 100644 --- a/extensions/signal/src/monitor/event-handler.ts +++ b/extensions/signal/src/monitor/event-handler.ts @@ -1,4 +1,5 @@ import { resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime"; +import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { resolveControlCommandGate } from "openclaw/plugin-sdk/channel-runtime"; import { createChannelInboundDebouncer, @@ -7,9 +8,7 @@ import { import { logInboundDrop, logTypingFailure } from "openclaw/plugin-sdk/channel-runtime"; import { resolveMentionGatingWithBypass } from "openclaw/plugin-sdk/channel-runtime"; import { normalizeSignalMessagingTarget } from "openclaw/plugin-sdk/channel-runtime"; -import { createReplyPrefixOptions } from "openclaw/plugin-sdk/channel-runtime"; import { recordInboundSession } from "openclaw/plugin-sdk/channel-runtime"; -import { createTypingCallbacks } from "openclaw/plugin-sdk/channel-runtime"; import { resolveChannelGroupRequireMention } from "openclaw/plugin-sdk/config-runtime"; import { readSessionUpdatedAt, resolveStorePath } from "openclaw/plugin-sdk/config-runtime"; import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime"; @@ -258,36 +257,35 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`); } - const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ + const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({ cfg: deps.cfg, agentId: route.agentId, channel: "signal", accountId: route.accountId, - }); - - const typingCallbacks = createTypingCallbacks({ - start: async () => { - if (!ctxPayload.To) { - return; - } - await sendTypingSignal(ctxPayload.To, { - baseUrl: deps.baseUrl, - account: deps.account, - accountId: deps.accountId, - }); - }, - onStartError: (err) => { - logTypingFailure({ - log: logVerbose, - channel: "signal", - target: ctxPayload.To ?? undefined, - error: err, - }); + typing: { + start: async () => { + if (!ctxPayload.To) { + return; + } + await sendTypingSignal(ctxPayload.To, { + baseUrl: deps.baseUrl, + account: deps.account, + accountId: deps.accountId, + }); + }, + onStartError: (err) => { + logTypingFailure({ + log: logVerbose, + channel: "signal", + target: ctxPayload.To ?? undefined, + error: err, + }); + }, }, }); const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ - ...prefixOptions, + ...replyPipeline, humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId), typingCallbacks, deliver: async (payload) => { diff --git a/src/plugin-sdk/channel-pairing.test.ts b/src/plugin-sdk/channel-pairing.test.ts index 7caac389c9b..1638561749a 100644 --- a/src/plugin-sdk/channel-pairing.test.ts +++ b/src/plugin-sdk/channel-pairing.test.ts @@ -1,6 +1,9 @@ import { describe, expect, it, vi } from "vitest"; import type { PluginRuntime } from "../plugins/runtime/types.js"; -import { createChannelPairingController } from "./channel-pairing.js"; +import { + createChannelPairingChallengeIssuer, + createChannelPairingController, +} from "./channel-pairing.js"; describe("createChannelPairingController", () => { it("scopes store access and issues pairing challenges through the scoped store", async () => { @@ -46,3 +49,28 @@ describe("createChannelPairingController", () => { expect(replies[0]).toContain("123456"); }); }); + +describe("createChannelPairingChallengeIssuer", () => { + it("binds a channel and scoped pairing store to challenge issuance", async () => { + const upsertPairingRequest = vi.fn(async () => ({ code: "654321", created: true })); + const replies: string[] = []; + const issueChallenge = createChannelPairingChallengeIssuer({ + channel: "signal", + upsertPairingRequest, + }); + + await issueChallenge({ + senderId: "user-2", + senderIdLine: "Your id: user-2", + sendPairingReply: async (text: string) => { + replies.push(text); + }, + }); + + expect(upsertPairingRequest).toHaveBeenCalledWith({ + id: "user-2", + meta: undefined, + }); + expect(replies[0]).toContain("654321"); + }); +}); diff --git a/src/plugin-sdk/channel-pairing.ts b/src/plugin-sdk/channel-pairing.ts index 2628eebfde8..1d8a1ce3b05 100644 --- a/src/plugin-sdk/channel-pairing.ts +++ b/src/plugin-sdk/channel-pairing.ts @@ -13,6 +13,23 @@ export type ChannelPairingController = ScopedPairingAccess & { ) => ReturnType; }; +export function createChannelPairingChallengeIssuer(params: { + channel: ChannelId; + upsertPairingRequest: Parameters[0]["upsertPairingRequest"]; +}) { + return ( + challenge: Omit< + Parameters[0], + "channel" | "upsertPairingRequest" + >, + ) => + issuePairingChallenge({ + channel: params.channel, + upsertPairingRequest: params.upsertPairingRequest, + ...challenge, + }); +} + export function createChannelPairingController(params: { core: PluginRuntime; channel: ChannelId; @@ -21,11 +38,9 @@ export function createChannelPairingController(params: { const access = createScopedPairingAccess(params); return { ...access, - issueChallenge: (challenge) => - issuePairingChallenge({ - channel: params.channel, - upsertPairingRequest: access.upsertPairingRequest, - ...challenge, - }), + issueChallenge: createChannelPairingChallengeIssuer({ + channel: params.channel, + upsertPairingRequest: access.upsertPairingRequest, + }), }; } diff --git a/src/plugin-sdk/matrix.ts b/src/plugin-sdk/matrix.ts index 92785e4d97b..710bfb5eb40 100644 --- a/src/plugin-sdk/matrix.ts +++ b/src/plugin-sdk/matrix.ts @@ -57,8 +57,7 @@ export type { ChannelToolSend, } from "../channels/plugins/types.js"; export type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; -export { createChannelReplyPipeline, createReplyPrefixOptions } from "./channel-reply-pipeline.js"; -export { createTypingCallbacks } from "./channel-reply-pipeline.js"; +export { createChannelReplyPipeline } from "./channel-reply-pipeline.js"; export type { OpenClawConfig } from "../config/config.js"; export { GROUP_POLICY_BLOCKED_LABEL, @@ -82,7 +81,6 @@ export { export { ToolPolicySchema } from "../config/zod-schema.agent-runtime.js"; export { MarkdownConfigSchema } from "../config/zod-schema.core.js"; export { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js"; -export { issuePairingChallenge } from "../pairing/pairing-challenge.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; export type { PluginRuntime, RuntimeLogger } from "../plugins/runtime/types.js"; export type { OpenClawPluginApi } from "../plugins/types.js"; @@ -100,7 +98,7 @@ export { evaluateGroupRouteAccessForPolicy, resolveSenderScopedGroupPolicy, } from "./group-access.js"; -export { createChannelPairingController, createScopedPairingAccess } from "./channel-pairing.js"; +export { createChannelPairingController } from "./channel-pairing.js"; export { formatResolvedUnresolvedNote } from "./resolution-notes.js"; export { runPluginCommandWithTimeout } from "./run-command.js"; export { dispatchReplyFromConfigWithSettledDispatcher } from "./inbound-reply-dispatch.js";