refactor: unify plugin sdk pairing flows

This commit is contained in:
Peter Steinberger 2026-03-19 00:30:55 +00:00
parent b736a92e19
commit 4cc0bb07c1
9 changed files with 192 additions and 79 deletions

View File

@ -0,0 +1,32 @@
import { describe, expect, it, vi } from "vitest";
import { enforceMatrixDirectMessageAccess } from "./access-policy.js";
describe("enforceMatrixDirectMessageAccess", () => {
it("issues pairing through the injected channel pairing challenge", async () => {
const issuePairingChallenge = vi.fn(async () => ({ created: true, code: "123456" }));
const sendPairingReply = vi.fn(async () => {});
await expect(
enforceMatrixDirectMessageAccess({
dmEnabled: true,
dmPolicy: "pairing",
accessDecision: "pairing",
senderId: "@alice:example.com",
senderName: "Alice",
effectiveAllowFrom: [],
issuePairingChallenge,
sendPairingReply,
logVerboseMessage: () => {},
}),
).resolves.toBe(false);
expect(issuePairingChallenge).toHaveBeenCalledTimes(1);
expect(issuePairingChallenge).toHaveBeenCalledWith(
expect.objectContaining({
senderId: "@alice:example.com",
meta: { name: "Alice" },
sendPairingReply,
}),
);
});
});

View File

@ -1,6 +1,5 @@
import {
formatAllowlistMatchMeta,
issuePairingChallenge,
readStoreAllowFromForDmPolicy,
resolveDmGroupAccessWithLists,
resolveSenderScopedGroupPolicy,
@ -68,13 +67,15 @@ export async function enforceMatrixDirectMessageAccess(params: {
senderId: string;
senderName: string;
effectiveAllowFrom: string[];
upsertPairingRequest: (input: {
id: string;
issuePairingChallenge: (params: {
senderId: string;
senderIdLine: string;
meta?: Record<string, string | undefined>;
}) => Promise<{
code: string;
created: boolean;
}>;
buildReplyText: (params: { code: string }) => string;
sendPairingReply: (text: string) => Promise<void>;
onCreated: () => void;
onReplyError: (err: unknown) => void;
}) => Promise<{ created: boolean; code?: string }>;
sendPairingReply: (text: string) => Promise<void>;
logVerboseMessage: (message: string) => void;
}): Promise<boolean> {
@ -90,12 +91,10 @@ export async function enforceMatrixDirectMessageAccess(params: {
});
const allowMatchMeta = formatAllowlistMatchMeta(allowMatch);
if (params.accessDecision === "pairing") {
await issuePairingChallenge({
channel: "matrix",
await params.issuePairingChallenge({
senderId: params.senderId,
senderIdLine: `Matrix user id: ${params.senderId}`,
meta: { name: params.senderName },
upsertPairingRequest: params.upsertPairingRequest,
buildReplyText: ({ code }) =>
[
"OpenClaw: access not configured.",

View File

@ -1,9 +1,8 @@
import type { LocationMessageEventContent, MatrixClient } from "@vector-im/matrix-bot-sdk";
import {
DEFAULT_ACCOUNT_ID,
createScopedPairingAccess,
createReplyPrefixOptions,
createTypingCallbacks,
createChannelPairingController,
createChannelReplyPipeline,
dispatchReplyFromConfigWithSettledDispatcher,
evaluateGroupRouteAccessForPolicy,
formatAllowlistMatchMeta,
@ -153,7 +152,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
accountId,
} = params;
const resolvedAccountId = accountId?.trim() || DEFAULT_ACCOUNT_ID;
const pairing = createScopedPairingAccess({
const pairing = createChannelPairingController({
core,
channel: "matrix",
accountId: resolvedAccountId,
@ -322,7 +321,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
senderId,
senderName,
effectiveAllowFrom,
upsertPairingRequest: pairing.upsertPairingRequest,
issuePairingChallenge: pairing.issueChallenge,
sendPairingReply: async (text) => {
await sendMessageMatrix(`room:${roomId}`, text, { client });
},
@ -680,38 +679,38 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
channel: "matrix",
accountId: route.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "matrix",
accountId: route.accountId,
typing: {
start: () => sendTypingMatrix(roomId, true, undefined, client),
stop: () => sendTypingMatrix(roomId, false, undefined, client),
onStartError: (err) => {
logTypingFailure({
log: logVerboseMessage,
channel: "matrix",
action: "start",
target: roomId,
error: err,
});
},
onStopError: (err) => {
logTypingFailure({
log: logVerboseMessage,
channel: "matrix",
action: "stop",
target: roomId,
error: err,
});
},
},
});
const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId);
const typingCallbacks = createTypingCallbacks({
start: () => sendTypingMatrix(roomId, true, undefined, client),
stop: () => sendTypingMatrix(roomId, false, undefined, client),
onStartError: (err) => {
logTypingFailure({
log: logVerboseMessage,
channel: "matrix",
action: "start",
target: roomId,
error: err,
});
},
onStopError: (err) => {
logTypingFailure({
log: logVerboseMessage,
channel: "matrix",
action: "stop",
target: roomId,
error: err,
});
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
humanDelay,
typingCallbacks,
deliver: async (payload) => {

View File

@ -0,0 +1,43 @@
import { describe, expect, it, vi } from "vitest";
import { handleSignalDirectMessageAccess } from "./access-policy.js";
describe("handleSignalDirectMessageAccess", () => {
it("returns true for already-allowed direct messages", async () => {
await expect(
handleSignalDirectMessageAccess({
dmPolicy: "open",
dmAccessDecision: "allow",
senderId: "+15551230000",
senderIdLine: "Signal number: +15551230000",
senderDisplay: "Alice",
accountId: "default",
sendPairingReply: async () => {},
log: () => {},
}),
).resolves.toBe(true);
});
it("issues a pairing challenge for pairing-gated senders", async () => {
const replies: string[] = [];
const sendPairingReply = vi.fn(async (text: string) => {
replies.push(text);
});
await expect(
handleSignalDirectMessageAccess({
dmPolicy: "pairing",
dmAccessDecision: "pairing",
senderId: "+15551230000",
senderIdLine: "Signal number: +15551230000",
senderDisplay: "Alice",
senderName: "Alice",
accountId: "default",
sendPairingReply,
log: () => {},
}),
).resolves.toBe(false);
expect(sendPairingReply).toHaveBeenCalledTimes(1);
expect(replies[0]).toContain("Pairing code:");
});
});

View File

@ -1,4 +1,4 @@
import { issuePairingChallenge } from "openclaw/plugin-sdk/conversation-runtime";
import { createChannelPairingChallengeIssuer } from "openclaw/plugin-sdk/channel-pairing";
import { upsertChannelPairingRequest } from "openclaw/plugin-sdk/conversation-runtime";
import {
readStoreAllowFromForDmPolicy,
@ -62,11 +62,8 @@ export async function handleSignalDirectMessageAccess(params: {
return false;
}
if (params.dmPolicy === "pairing") {
await issuePairingChallenge({
await createChannelPairingChallengeIssuer({
channel: "signal",
senderId: params.senderId,
senderIdLine: params.senderIdLine,
meta: { name: params.senderName },
upsertPairingRequest: async ({ id, meta }) =>
await upsertChannelPairingRequest({
channel: "signal",
@ -74,6 +71,10 @@ export async function handleSignalDirectMessageAccess(params: {
accountId: params.accountId,
meta,
}),
})({
senderId: params.senderId,
senderIdLine: params.senderIdLine,
meta: { name: params.senderName },
sendPairingReply: params.sendPairingReply,
onCreated: () => {
params.log(`signal pairing request sender=${params.senderId}`);

View File

@ -1,4 +1,5 @@
import { resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import { resolveControlCommandGate } from "openclaw/plugin-sdk/channel-runtime";
import {
createChannelInboundDebouncer,
@ -7,9 +8,7 @@ import {
import { logInboundDrop, logTypingFailure } from "openclaw/plugin-sdk/channel-runtime";
import { resolveMentionGatingWithBypass } from "openclaw/plugin-sdk/channel-runtime";
import { normalizeSignalMessagingTarget } from "openclaw/plugin-sdk/channel-runtime";
import { createReplyPrefixOptions } from "openclaw/plugin-sdk/channel-runtime";
import { recordInboundSession } from "openclaw/plugin-sdk/channel-runtime";
import { createTypingCallbacks } from "openclaw/plugin-sdk/channel-runtime";
import { resolveChannelGroupRequireMention } from "openclaw/plugin-sdk/config-runtime";
import { readSessionUpdatedAt, resolveStorePath } from "openclaw/plugin-sdk/config-runtime";
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
@ -258,36 +257,35 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`);
}
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg: deps.cfg,
agentId: route.agentId,
channel: "signal",
accountId: route.accountId,
});
const typingCallbacks = createTypingCallbacks({
start: async () => {
if (!ctxPayload.To) {
return;
}
await sendTypingSignal(ctxPayload.To, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
},
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "signal",
target: ctxPayload.To ?? undefined,
error: err,
});
typing: {
start: async () => {
if (!ctxPayload.To) {
return;
}
await sendTypingSignal(ctxPayload.To, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
},
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "signal",
target: ctxPayload.To ?? undefined,
error: err,
});
},
},
});
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
typingCallbacks,
deliver: async (payload) => {

View File

@ -1,6 +1,9 @@
import { describe, expect, it, vi } from "vitest";
import type { PluginRuntime } from "../plugins/runtime/types.js";
import { createChannelPairingController } from "./channel-pairing.js";
import {
createChannelPairingChallengeIssuer,
createChannelPairingController,
} from "./channel-pairing.js";
describe("createChannelPairingController", () => {
it("scopes store access and issues pairing challenges through the scoped store", async () => {
@ -46,3 +49,28 @@ describe("createChannelPairingController", () => {
expect(replies[0]).toContain("123456");
});
});
describe("createChannelPairingChallengeIssuer", () => {
it("binds a channel and scoped pairing store to challenge issuance", async () => {
const upsertPairingRequest = vi.fn(async () => ({ code: "654321", created: true }));
const replies: string[] = [];
const issueChallenge = createChannelPairingChallengeIssuer({
channel: "signal",
upsertPairingRequest,
});
await issueChallenge({
senderId: "user-2",
senderIdLine: "Your id: user-2",
sendPairingReply: async (text: string) => {
replies.push(text);
},
});
expect(upsertPairingRequest).toHaveBeenCalledWith({
id: "user-2",
meta: undefined,
});
expect(replies[0]).toContain("654321");
});
});

View File

@ -13,6 +13,23 @@ export type ChannelPairingController = ScopedPairingAccess & {
) => ReturnType<typeof issuePairingChallenge>;
};
export function createChannelPairingChallengeIssuer(params: {
channel: ChannelId;
upsertPairingRequest: Parameters<typeof issuePairingChallenge>[0]["upsertPairingRequest"];
}) {
return (
challenge: Omit<
Parameters<typeof issuePairingChallenge>[0],
"channel" | "upsertPairingRequest"
>,
) =>
issuePairingChallenge({
channel: params.channel,
upsertPairingRequest: params.upsertPairingRequest,
...challenge,
});
}
export function createChannelPairingController(params: {
core: PluginRuntime;
channel: ChannelId;
@ -21,11 +38,9 @@ export function createChannelPairingController(params: {
const access = createScopedPairingAccess(params);
return {
...access,
issueChallenge: (challenge) =>
issuePairingChallenge({
channel: params.channel,
upsertPairingRequest: access.upsertPairingRequest,
...challenge,
}),
issueChallenge: createChannelPairingChallengeIssuer({
channel: params.channel,
upsertPairingRequest: access.upsertPairingRequest,
}),
};
}

View File

@ -57,8 +57,7 @@ export type {
ChannelToolSend,
} from "../channels/plugins/types.js";
export type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
export { createChannelReplyPipeline, createReplyPrefixOptions } from "./channel-reply-pipeline.js";
export { createTypingCallbacks } from "./channel-reply-pipeline.js";
export { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
export type { OpenClawConfig } from "../config/config.js";
export {
GROUP_POLICY_BLOCKED_LABEL,
@ -82,7 +81,6 @@ export {
export { ToolPolicySchema } from "../config/zod-schema.agent-runtime.js";
export { MarkdownConfigSchema } from "../config/zod-schema.core.js";
export { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js";
export { issuePairingChallenge } from "../pairing/pairing-challenge.js";
export { emptyPluginConfigSchema } from "../plugins/config-schema.js";
export type { PluginRuntime, RuntimeLogger } from "../plugins/runtime/types.js";
export type { OpenClawPluginApi } from "../plugins/types.js";
@ -100,7 +98,7 @@ export {
evaluateGroupRouteAccessForPolicy,
resolveSenderScopedGroupPolicy,
} from "./group-access.js";
export { createChannelPairingController, createScopedPairingAccess } from "./channel-pairing.js";
export { createChannelPairingController } from "./channel-pairing.js";
export { formatResolvedUnresolvedNote } from "./resolution-notes.js";
export { runPluginCommandWithTimeout } from "./run-command.js";
export { dispatchReplyFromConfigWithSettledDispatcher } from "./inbound-reply-dispatch.js";