Gateway: unify plugin interactive callback state (#50722)
Merged via squash. Prepared head SHA: 7a2740b18a336bc3a58c23cff08953a5c06a6078 Co-authored-by: huntharo <5617868+huntharo@users.noreply.github.com> Co-authored-by: huntharo <5617868+huntharo@users.noreply.github.com> Reviewed-by: @huntharo
This commit is contained in:
parent
61ae7e033b
commit
65594f972c
@ -168,6 +168,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Onboarding/custom providers: keep Azure AI Foundry `*.services.ai.azure.com` custom endpoints on the selected compatibility path instead of forcing Responses, so chat-completions Foundry models still work after setup. Fixes #50528. (#50535) Thanks @obviyus.
|
||||
- Plugins/update: let `openclaw plugins update <npm-spec>` target tracked npm installs by dist-tag or exact version, and preserve the recorded npm spec for later id-based updates. (#49998) Thanks @huntharo.
|
||||
- Tests/CLI: reduce command-secret gateway test import pressure while keeping the real protocol payload validator in place, so the isolated lane no longer carries the heavier runtime-web and message-channel graphs. (#50663) Thanks @huntharo.
|
||||
- Gateway/plugins: share plugin interactive callback routing and plugin bind approval state across duplicate module graphs so Telegram Codex picker buttons and plugin bind approvals no longer fall through to normal inbound message routing. (#50722) Thanks @huntharo.
|
||||
|
||||
### Breaking
|
||||
|
||||
|
||||
@ -1382,14 +1382,14 @@ describe("createTelegramBot", () => {
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.skip("routes plugin-owned callback namespaces before synthetic command fallback", async () => {
|
||||
it("routes plugin-owned callback namespaces before synthetic command fallback", async () => {
|
||||
onSpy.mockClear();
|
||||
replySpy.mockClear();
|
||||
editMessageTextSpy.mockClear();
|
||||
sendMessageSpy.mockClear();
|
||||
registerPluginInteractiveHandler("codex-plugin", {
|
||||
channel: "telegram",
|
||||
namespace: "codex",
|
||||
namespace: "codexapp",
|
||||
handler: async ({ respond, callback }: PluginInteractiveTelegramHandlerContext) => {
|
||||
await respond.editMessage({
|
||||
text: `Handled ${callback.payload}`,
|
||||
@ -1416,7 +1416,7 @@ describe("createTelegramBot", () => {
|
||||
await callbackHandler({
|
||||
callbackQuery: {
|
||||
id: "cbq-codex-1",
|
||||
data: "codex:resume:thread-1",
|
||||
data: "codexapp:resume:thread-1",
|
||||
from: { id: 9, first_name: "Ada", username: "ada_bot" },
|
||||
message: {
|
||||
chat: { id: 1234, type: "private" },
|
||||
|
||||
@ -109,6 +109,17 @@ const { registerSessionBindingAdapter, unregisterSessionBindingAdapter } =
|
||||
await import("../infra/outbound/session-binding-service.js");
|
||||
|
||||
type PluginBindingRequest = Awaited<ReturnType<typeof requestPluginConversationBinding>>;
|
||||
type ConversationBindingModule = typeof import("./conversation-binding.js");
|
||||
|
||||
const conversationBindingModuleUrl = new URL("./conversation-binding.ts", import.meta.url).href;
|
||||
|
||||
async function importConversationBindingModule(
|
||||
cacheBust: string,
|
||||
): Promise<ConversationBindingModule> {
|
||||
return (await import(
|
||||
`${conversationBindingModuleUrl}?t=${cacheBust}`
|
||||
)) as ConversationBindingModule;
|
||||
}
|
||||
|
||||
function createAdapter(channel: string, accountId: string): SessionBindingAdapter {
|
||||
return {
|
||||
@ -290,6 +301,108 @@ describe("plugin conversation binding approvals", () => {
|
||||
expect(differentAccount.status).toBe("pending");
|
||||
});
|
||||
|
||||
it("shares pending bind approvals across duplicate module instances", async () => {
|
||||
const first = await importConversationBindingModule(`first-${Date.now()}`);
|
||||
const second = await importConversationBindingModule(`second-${Date.now()}`);
|
||||
|
||||
first.__testing.reset();
|
||||
|
||||
const request = await first.requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
threadId: "77",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread abc." },
|
||||
});
|
||||
|
||||
expect(request.status).toBe("pending");
|
||||
if (request.status !== "pending") {
|
||||
throw new Error("expected pending bind request");
|
||||
}
|
||||
|
||||
await expect(
|
||||
second.resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-once",
|
||||
senderId: "user-1",
|
||||
}),
|
||||
).resolves.toMatchObject({
|
||||
status: "approved",
|
||||
binding: expect.objectContaining({
|
||||
pluginId: "codex",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversationId: "-10099:topic:77",
|
||||
}),
|
||||
});
|
||||
|
||||
second.__testing.reset();
|
||||
});
|
||||
|
||||
it("shares persistent approvals across duplicate module instances", async () => {
|
||||
const first = await importConversationBindingModule(`first-${Date.now()}`);
|
||||
const second = await importConversationBindingModule(`second-${Date.now()}`);
|
||||
|
||||
first.__testing.reset();
|
||||
|
||||
const request = await first.requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
threadId: "77",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread abc." },
|
||||
});
|
||||
|
||||
expect(request.status).toBe("pending");
|
||||
if (request.status !== "pending") {
|
||||
throw new Error("expected pending bind request");
|
||||
}
|
||||
|
||||
await expect(
|
||||
second.resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-always",
|
||||
senderId: "user-1",
|
||||
}),
|
||||
).resolves.toMatchObject({
|
||||
status: "approved",
|
||||
decision: "allow-always",
|
||||
});
|
||||
|
||||
const rebound = await first.requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:78",
|
||||
parentConversationId: "-10099",
|
||||
threadId: "78",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread def." },
|
||||
});
|
||||
|
||||
expect(rebound.status).toBe("bound");
|
||||
|
||||
first.__testing.reset();
|
||||
fs.rmSync(approvalsPath, { force: true });
|
||||
});
|
||||
|
||||
it("does not share persistent approvals across plugin roots even with the same plugin id", async () => {
|
||||
const request = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
|
||||
@ -11,6 +11,7 @@ import { expandHomePrefix } from "../infra/home-dir.js";
|
||||
import { writeJsonAtomic } from "../infra/json-files.js";
|
||||
import { type ConversationRef } from "../infra/outbound/session-binding-service.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveGlobalMap, resolveGlobalSingleton } from "../shared/global-singleton.js";
|
||||
import { getActivePluginRegistry } from "./runtime.js";
|
||||
import type {
|
||||
PluginConversationBinding,
|
||||
@ -104,24 +105,26 @@ type PluginBindingResolveResult =
|
||||
status: "expired";
|
||||
};
|
||||
|
||||
const pendingRequests = new Map<string, PendingPluginBindingRequest>();
|
||||
const PLUGIN_BINDING_PENDING_REQUESTS_KEY = Symbol.for("openclaw.pluginBindingPendingRequests");
|
||||
|
||||
const pendingRequests = resolveGlobalMap<string, PendingPluginBindingRequest>(
|
||||
PLUGIN_BINDING_PENDING_REQUESTS_KEY,
|
||||
);
|
||||
|
||||
type PluginBindingGlobalState = {
|
||||
fallbackNoticeBindingIds: Set<string>;
|
||||
approvalsCache: PluginBindingApprovalsFile | null;
|
||||
approvalsLoaded: boolean;
|
||||
};
|
||||
|
||||
const pluginBindingGlobalStateKey = Symbol.for("openclaw.plugins.binding.global-state");
|
||||
|
||||
let approvalsCache: PluginBindingApprovalsFile | null = null;
|
||||
let approvalsLoaded = false;
|
||||
|
||||
function getPluginBindingGlobalState(): PluginBindingGlobalState {
|
||||
const globalStore = globalThis as typeof globalThis & {
|
||||
[pluginBindingGlobalStateKey]?: PluginBindingGlobalState;
|
||||
};
|
||||
return (globalStore[pluginBindingGlobalStateKey] ??= {
|
||||
return resolveGlobalSingleton<PluginBindingGlobalState>(pluginBindingGlobalStateKey, () => ({
|
||||
fallbackNoticeBindingIds: new Set<string>(),
|
||||
});
|
||||
approvalsCache: null,
|
||||
approvalsLoaded: false,
|
||||
}));
|
||||
}
|
||||
|
||||
function resolveApprovalsPath(): string {
|
||||
@ -297,8 +300,9 @@ function loadApprovalsFromDisk(): PluginBindingApprovalsFile {
|
||||
async function saveApprovals(file: PluginBindingApprovalsFile): Promise<void> {
|
||||
const filePath = resolveApprovalsPath();
|
||||
fs.mkdirSync(path.dirname(filePath), { recursive: true });
|
||||
approvalsCache = file;
|
||||
approvalsLoaded = true;
|
||||
const state = getPluginBindingGlobalState();
|
||||
state.approvalsCache = file;
|
||||
state.approvalsLoaded = true;
|
||||
await writeJsonAtomic(filePath, file, {
|
||||
mode: 0o600,
|
||||
trailingNewline: true,
|
||||
@ -306,11 +310,12 @@ async function saveApprovals(file: PluginBindingApprovalsFile): Promise<void> {
|
||||
}
|
||||
|
||||
function getApprovals(): PluginBindingApprovalsFile {
|
||||
if (!approvalsLoaded || !approvalsCache) {
|
||||
approvalsCache = loadApprovalsFromDisk();
|
||||
approvalsLoaded = true;
|
||||
const state = getPluginBindingGlobalState();
|
||||
if (!state.approvalsLoaded || !state.approvalsCache) {
|
||||
state.approvalsCache = loadApprovalsFromDisk();
|
||||
state.approvalsLoaded = true;
|
||||
}
|
||||
return approvalsCache;
|
||||
return state.approvalsCache;
|
||||
}
|
||||
|
||||
function hasPersistentApproval(params: {
|
||||
@ -836,8 +841,9 @@ export function buildPluginBindingResolvedText(params: PluginBindingResolveResul
|
||||
export const __testing = {
|
||||
reset() {
|
||||
pendingRequests.clear();
|
||||
approvalsCache = null;
|
||||
approvalsLoaded = false;
|
||||
getPluginBindingGlobalState().fallbackNoticeBindingIds.clear();
|
||||
const state = getPluginBindingGlobalState();
|
||||
state.approvalsCache = null;
|
||||
state.approvalsLoaded = false;
|
||||
state.fallbackNoticeBindingIds.clear();
|
||||
},
|
||||
};
|
||||
|
||||
@ -49,6 +49,14 @@ type InteractiveDispatchParams =
|
||||
respond: PluginInteractiveSlackHandlerContext["respond"];
|
||||
};
|
||||
|
||||
type InteractiveModule = typeof import("./interactive.js");
|
||||
|
||||
const interactiveModuleUrl = new URL("./interactive.ts", import.meta.url).href;
|
||||
|
||||
async function importInteractiveModule(cacheBust: string): Promise<InteractiveModule> {
|
||||
return (await import(`${interactiveModuleUrl}?t=${cacheBust}`)) as InteractiveModule;
|
||||
}
|
||||
|
||||
async function expectDedupedInteractiveDispatch(params: {
|
||||
baseParams: InteractiveDispatchParams;
|
||||
handler: ReturnType<typeof vi.fn>;
|
||||
@ -172,6 +180,66 @@ describe("plugin interactive handlers", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("shares interactive handlers across duplicate module instances", async () => {
|
||||
const first = await importInteractiveModule(`first-${Date.now()}`);
|
||||
const second = await importInteractiveModule(`second-${Date.now()}`);
|
||||
const handler = vi.fn(async () => ({ handled: true }));
|
||||
|
||||
first.clearPluginInteractiveHandlers();
|
||||
|
||||
expect(
|
||||
first.registerPluginInteractiveHandler("codex-plugin", {
|
||||
channel: "telegram",
|
||||
namespace: "codexapp",
|
||||
handler,
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
|
||||
await expect(
|
||||
second.dispatchPluginInteractiveHandler({
|
||||
channel: "telegram",
|
||||
data: "codexapp:resume:thread-1",
|
||||
callbackId: "cb-shared-1",
|
||||
ctx: {
|
||||
accountId: "default",
|
||||
callbackId: "cb-shared-1",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
senderId: "user-1",
|
||||
senderUsername: "ada",
|
||||
threadId: 77,
|
||||
isGroup: true,
|
||||
isForum: true,
|
||||
auth: { isAuthorizedSender: true },
|
||||
callbackMessage: {
|
||||
messageId: 55,
|
||||
chatId: "-10099",
|
||||
messageText: "Pick a thread",
|
||||
},
|
||||
},
|
||||
respond: {
|
||||
reply: vi.fn(async () => {}),
|
||||
editMessage: vi.fn(async () => {}),
|
||||
editButtons: vi.fn(async () => {}),
|
||||
clearButtons: vi.fn(async () => {}),
|
||||
deleteMessage: vi.fn(async () => {}),
|
||||
},
|
||||
}),
|
||||
).resolves.toEqual({ matched: true, handled: true, duplicate: false });
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "telegram",
|
||||
callback: expect.objectContaining({
|
||||
namespace: "codexapp",
|
||||
payload: "resume:thread-1",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
second.clearPluginInteractiveHandlers();
|
||||
});
|
||||
|
||||
it("rejects duplicate namespace registrations", () => {
|
||||
const first = registerPluginInteractiveHandler("plugin-a", {
|
||||
channel: "telegram",
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import { createDedupeCache } from "../infra/dedupe.js";
|
||||
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
|
||||
import {
|
||||
dispatchDiscordInteractiveHandler,
|
||||
dispatchSlackInteractiveHandler,
|
||||
@ -33,11 +34,23 @@ type InteractiveDispatchResult =
|
||||
| { matched: false; handled: false; duplicate: false }
|
||||
| { matched: true; handled: boolean; duplicate: boolean };
|
||||
|
||||
const interactiveHandlers = new Map<string, RegisteredInteractiveHandler>();
|
||||
const callbackDedupe = createDedupeCache({
|
||||
ttlMs: 5 * 60_000,
|
||||
maxSize: 4096,
|
||||
});
|
||||
type InteractiveState = {
|
||||
interactiveHandlers: Map<string, RegisteredInteractiveHandler>;
|
||||
callbackDedupe: ReturnType<typeof createDedupeCache>;
|
||||
};
|
||||
|
||||
const PLUGIN_INTERACTIVE_STATE_KEY = Symbol.for("openclaw.pluginInteractiveState");
|
||||
|
||||
const state = resolveGlobalSingleton<InteractiveState>(PLUGIN_INTERACTIVE_STATE_KEY, () => ({
|
||||
interactiveHandlers: new Map<string, RegisteredInteractiveHandler>(),
|
||||
callbackDedupe: createDedupeCache({
|
||||
ttlMs: 5 * 60_000,
|
||||
maxSize: 4096,
|
||||
}),
|
||||
}));
|
||||
|
||||
const interactiveHandlers = state.interactiveHandlers;
|
||||
const callbackDedupe = state.callbackDedupe;
|
||||
|
||||
function toRegistryKey(channel: string, namespace: string): string {
|
||||
return `${channel.trim().toLowerCase()}:${namespace.trim()}`;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user