From 65594f972c2cbdaa8fa8d1f2606d3ec2825e1ac1 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Thu, 19 Mar 2026 22:09:38 -0400 Subject: [PATCH] Gateway: unify plugin interactive callback state (#50722) Merged via squash. Prepared head SHA: 7a2740b18a336bc3a58c23cff08953a5c06a6078 Co-authored-by: huntharo <5617868+huntharo@users.noreply.github.com> Co-authored-by: huntharo <5617868+huntharo@users.noreply.github.com> Reviewed-by: @huntharo --- CHANGELOG.md | 1 + extensions/telegram/src/bot.test.ts | 6 +- src/plugins/conversation-binding.test.ts | 113 +++++++++++++++++++++++ src/plugins/conversation-binding.ts | 42 +++++---- src/plugins/interactive.test.ts | 68 ++++++++++++++ src/plugins/interactive.ts | 23 ++++- 6 files changed, 227 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cf9b671096..7928c21129d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -168,6 +168,7 @@ Docs: https://docs.openclaw.ai - Onboarding/custom providers: keep Azure AI Foundry `*.services.ai.azure.com` custom endpoints on the selected compatibility path instead of forcing Responses, so chat-completions Foundry models still work after setup. Fixes #50528. (#50535) Thanks @obviyus. - Plugins/update: let `openclaw plugins update ` target tracked npm installs by dist-tag or exact version, and preserve the recorded npm spec for later id-based updates. (#49998) Thanks @huntharo. - Tests/CLI: reduce command-secret gateway test import pressure while keeping the real protocol payload validator in place, so the isolated lane no longer carries the heavier runtime-web and message-channel graphs. (#50663) Thanks @huntharo. +- Gateway/plugins: share plugin interactive callback routing and plugin bind approval state across duplicate module graphs so Telegram Codex picker buttons and plugin bind approvals no longer fall through to normal inbound message routing. (#50722) Thanks @huntharo. ### Breaking diff --git a/extensions/telegram/src/bot.test.ts b/extensions/telegram/src/bot.test.ts index 995fe61ed2a..5fe9ff639f7 100644 --- a/extensions/telegram/src/bot.test.ts +++ b/extensions/telegram/src/bot.test.ts @@ -1382,14 +1382,14 @@ describe("createTelegramBot", () => { expect(replySpy).not.toHaveBeenCalled(); }); - it.skip("routes plugin-owned callback namespaces before synthetic command fallback", async () => { + it("routes plugin-owned callback namespaces before synthetic command fallback", async () => { onSpy.mockClear(); replySpy.mockClear(); editMessageTextSpy.mockClear(); sendMessageSpy.mockClear(); registerPluginInteractiveHandler("codex-plugin", { channel: "telegram", - namespace: "codex", + namespace: "codexapp", handler: async ({ respond, callback }: PluginInteractiveTelegramHandlerContext) => { await respond.editMessage({ text: `Handled ${callback.payload}`, @@ -1416,7 +1416,7 @@ describe("createTelegramBot", () => { await callbackHandler({ callbackQuery: { id: "cbq-codex-1", - data: "codex:resume:thread-1", + data: "codexapp:resume:thread-1", from: { id: 9, first_name: "Ada", username: "ada_bot" }, message: { chat: { id: 1234, type: "private" }, diff --git a/src/plugins/conversation-binding.test.ts b/src/plugins/conversation-binding.test.ts index 81371a7ce3d..3cfc8cc2420 100644 --- a/src/plugins/conversation-binding.test.ts +++ b/src/plugins/conversation-binding.test.ts @@ -109,6 +109,17 @@ const { registerSessionBindingAdapter, unregisterSessionBindingAdapter } = await import("../infra/outbound/session-binding-service.js"); type PluginBindingRequest = Awaited>; +type ConversationBindingModule = typeof import("./conversation-binding.js"); + +const conversationBindingModuleUrl = new URL("./conversation-binding.ts", import.meta.url).href; + +async function importConversationBindingModule( + cacheBust: string, +): Promise { + return (await import( + `${conversationBindingModuleUrl}?t=${cacheBust}` + )) as ConversationBindingModule; +} function createAdapter(channel: string, accountId: string): SessionBindingAdapter { return { @@ -290,6 +301,108 @@ describe("plugin conversation binding approvals", () => { expect(differentAccount.status).toBe("pending"); }); + it("shares pending bind approvals across duplicate module instances", async () => { + const first = await importConversationBindingModule(`first-${Date.now()}`); + const second = await importConversationBindingModule(`second-${Date.now()}`); + + first.__testing.reset(); + + const request = await first.requestPluginConversationBinding({ + pluginId: "codex", + pluginName: "Codex App Server", + pluginRoot: "/plugins/codex-a", + requestedBySenderId: "user-1", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "-10099:topic:77", + parentConversationId: "-10099", + threadId: "77", + }, + binding: { summary: "Bind this conversation to Codex thread abc." }, + }); + + expect(request.status).toBe("pending"); + if (request.status !== "pending") { + throw new Error("expected pending bind request"); + } + + await expect( + second.resolvePluginConversationBindingApproval({ + approvalId: request.approvalId, + decision: "allow-once", + senderId: "user-1", + }), + ).resolves.toMatchObject({ + status: "approved", + binding: expect.objectContaining({ + pluginId: "codex", + pluginRoot: "/plugins/codex-a", + conversationId: "-10099:topic:77", + }), + }); + + second.__testing.reset(); + }); + + it("shares persistent approvals across duplicate module instances", async () => { + const first = await importConversationBindingModule(`first-${Date.now()}`); + const second = await importConversationBindingModule(`second-${Date.now()}`); + + first.__testing.reset(); + + const request = await first.requestPluginConversationBinding({ + pluginId: "codex", + pluginName: "Codex App Server", + pluginRoot: "/plugins/codex-a", + requestedBySenderId: "user-1", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "-10099:topic:77", + parentConversationId: "-10099", + threadId: "77", + }, + binding: { summary: "Bind this conversation to Codex thread abc." }, + }); + + expect(request.status).toBe("pending"); + if (request.status !== "pending") { + throw new Error("expected pending bind request"); + } + + await expect( + second.resolvePluginConversationBindingApproval({ + approvalId: request.approvalId, + decision: "allow-always", + senderId: "user-1", + }), + ).resolves.toMatchObject({ + status: "approved", + decision: "allow-always", + }); + + const rebound = await first.requestPluginConversationBinding({ + pluginId: "codex", + pluginName: "Codex App Server", + pluginRoot: "/plugins/codex-a", + requestedBySenderId: "user-1", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "-10099:topic:78", + parentConversationId: "-10099", + threadId: "78", + }, + binding: { summary: "Bind this conversation to Codex thread def." }, + }); + + expect(rebound.status).toBe("bound"); + + first.__testing.reset(); + fs.rmSync(approvalsPath, { force: true }); + }); + it("does not share persistent approvals across plugin roots even with the same plugin id", async () => { const request = await requestPluginConversationBinding({ pluginId: "codex", diff --git a/src/plugins/conversation-binding.ts b/src/plugins/conversation-binding.ts index aef5ec92b40..10ceeeb9fd5 100644 --- a/src/plugins/conversation-binding.ts +++ b/src/plugins/conversation-binding.ts @@ -11,6 +11,7 @@ import { expandHomePrefix } from "../infra/home-dir.js"; import { writeJsonAtomic } from "../infra/json-files.js"; import { type ConversationRef } from "../infra/outbound/session-binding-service.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { resolveGlobalMap, resolveGlobalSingleton } from "../shared/global-singleton.js"; import { getActivePluginRegistry } from "./runtime.js"; import type { PluginConversationBinding, @@ -104,24 +105,26 @@ type PluginBindingResolveResult = status: "expired"; }; -const pendingRequests = new Map(); +const PLUGIN_BINDING_PENDING_REQUESTS_KEY = Symbol.for("openclaw.pluginBindingPendingRequests"); + +const pendingRequests = resolveGlobalMap( + PLUGIN_BINDING_PENDING_REQUESTS_KEY, +); type PluginBindingGlobalState = { fallbackNoticeBindingIds: Set; + approvalsCache: PluginBindingApprovalsFile | null; + approvalsLoaded: boolean; }; const pluginBindingGlobalStateKey = Symbol.for("openclaw.plugins.binding.global-state"); -let approvalsCache: PluginBindingApprovalsFile | null = null; -let approvalsLoaded = false; - function getPluginBindingGlobalState(): PluginBindingGlobalState { - const globalStore = globalThis as typeof globalThis & { - [pluginBindingGlobalStateKey]?: PluginBindingGlobalState; - }; - return (globalStore[pluginBindingGlobalStateKey] ??= { + return resolveGlobalSingleton(pluginBindingGlobalStateKey, () => ({ fallbackNoticeBindingIds: new Set(), - }); + approvalsCache: null, + approvalsLoaded: false, + })); } function resolveApprovalsPath(): string { @@ -297,8 +300,9 @@ function loadApprovalsFromDisk(): PluginBindingApprovalsFile { async function saveApprovals(file: PluginBindingApprovalsFile): Promise { const filePath = resolveApprovalsPath(); fs.mkdirSync(path.dirname(filePath), { recursive: true }); - approvalsCache = file; - approvalsLoaded = true; + const state = getPluginBindingGlobalState(); + state.approvalsCache = file; + state.approvalsLoaded = true; await writeJsonAtomic(filePath, file, { mode: 0o600, trailingNewline: true, @@ -306,11 +310,12 @@ async function saveApprovals(file: PluginBindingApprovalsFile): Promise { } function getApprovals(): PluginBindingApprovalsFile { - if (!approvalsLoaded || !approvalsCache) { - approvalsCache = loadApprovalsFromDisk(); - approvalsLoaded = true; + const state = getPluginBindingGlobalState(); + if (!state.approvalsLoaded || !state.approvalsCache) { + state.approvalsCache = loadApprovalsFromDisk(); + state.approvalsLoaded = true; } - return approvalsCache; + return state.approvalsCache; } function hasPersistentApproval(params: { @@ -836,8 +841,9 @@ export function buildPluginBindingResolvedText(params: PluginBindingResolveResul export const __testing = { reset() { pendingRequests.clear(); - approvalsCache = null; - approvalsLoaded = false; - getPluginBindingGlobalState().fallbackNoticeBindingIds.clear(); + const state = getPluginBindingGlobalState(); + state.approvalsCache = null; + state.approvalsLoaded = false; + state.fallbackNoticeBindingIds.clear(); }, }; diff --git a/src/plugins/interactive.test.ts b/src/plugins/interactive.test.ts index 2b595e856f8..0cc91e7f04f 100644 --- a/src/plugins/interactive.test.ts +++ b/src/plugins/interactive.test.ts @@ -49,6 +49,14 @@ type InteractiveDispatchParams = respond: PluginInteractiveSlackHandlerContext["respond"]; }; +type InteractiveModule = typeof import("./interactive.js"); + +const interactiveModuleUrl = new URL("./interactive.ts", import.meta.url).href; + +async function importInteractiveModule(cacheBust: string): Promise { + return (await import(`${interactiveModuleUrl}?t=${cacheBust}`)) as InteractiveModule; +} + async function expectDedupedInteractiveDispatch(params: { baseParams: InteractiveDispatchParams; handler: ReturnType; @@ -172,6 +180,66 @@ describe("plugin interactive handlers", () => { }); }); + it("shares interactive handlers across duplicate module instances", async () => { + const first = await importInteractiveModule(`first-${Date.now()}`); + const second = await importInteractiveModule(`second-${Date.now()}`); + const handler = vi.fn(async () => ({ handled: true })); + + first.clearPluginInteractiveHandlers(); + + expect( + first.registerPluginInteractiveHandler("codex-plugin", { + channel: "telegram", + namespace: "codexapp", + handler, + }), + ).toEqual({ ok: true }); + + await expect( + second.dispatchPluginInteractiveHandler({ + channel: "telegram", + data: "codexapp:resume:thread-1", + callbackId: "cb-shared-1", + ctx: { + accountId: "default", + callbackId: "cb-shared-1", + conversationId: "-10099:topic:77", + parentConversationId: "-10099", + senderId: "user-1", + senderUsername: "ada", + threadId: 77, + isGroup: true, + isForum: true, + auth: { isAuthorizedSender: true }, + callbackMessage: { + messageId: 55, + chatId: "-10099", + messageText: "Pick a thread", + }, + }, + respond: { + reply: vi.fn(async () => {}), + editMessage: vi.fn(async () => {}), + editButtons: vi.fn(async () => {}), + clearButtons: vi.fn(async () => {}), + deleteMessage: vi.fn(async () => {}), + }, + }), + ).resolves.toEqual({ matched: true, handled: true, duplicate: false }); + + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + callback: expect.objectContaining({ + namespace: "codexapp", + payload: "resume:thread-1", + }), + }), + ); + + second.clearPluginInteractiveHandlers(); + }); + it("rejects duplicate namespace registrations", () => { const first = registerPluginInteractiveHandler("plugin-a", { channel: "telegram", diff --git a/src/plugins/interactive.ts b/src/plugins/interactive.ts index 04403c80fa2..424a5c5d0af 100644 --- a/src/plugins/interactive.ts +++ b/src/plugins/interactive.ts @@ -1,4 +1,5 @@ import { createDedupeCache } from "../infra/dedupe.js"; +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { dispatchDiscordInteractiveHandler, dispatchSlackInteractiveHandler, @@ -33,11 +34,23 @@ type InteractiveDispatchResult = | { matched: false; handled: false; duplicate: false } | { matched: true; handled: boolean; duplicate: boolean }; -const interactiveHandlers = new Map(); -const callbackDedupe = createDedupeCache({ - ttlMs: 5 * 60_000, - maxSize: 4096, -}); +type InteractiveState = { + interactiveHandlers: Map; + callbackDedupe: ReturnType; +}; + +const PLUGIN_INTERACTIVE_STATE_KEY = Symbol.for("openclaw.pluginInteractiveState"); + +const state = resolveGlobalSingleton(PLUGIN_INTERACTIVE_STATE_KEY, () => ({ + interactiveHandlers: new Map(), + callbackDedupe: createDedupeCache({ + ttlMs: 5 * 60_000, + maxSize: 4096, + }), +})); + +const interactiveHandlers = state.interactiveHandlers; +const callbackDedupe = state.callbackDedupe; function toRegistryKey(channel: string, namespace: string): string { return `${channel.trim().toLowerCase()}:${namespace.trim()}`;