diff --git a/extensions/discord/src/monitor/thread-bindings.manager.ts b/extensions/discord/src/monitor/thread-bindings.manager.ts index 5c37ac4bbf0..9e76d01243a 100644 --- a/extensions/discord/src/monitor/thread-bindings.manager.ts +++ b/extensions/discord/src/monitor/thread-bindings.manager.ts @@ -5,6 +5,7 @@ import { registerSessionBindingAdapter, unregisterSessionBindingAdapter, type BindingTargetKind, + type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing"; @@ -556,6 +557,7 @@ export function createThreadBindingManager( unregisterSessionBindingAdapter({ channel: "discord", accountId, + adapter: sessionBindingAdapter, }); forgetThreadBindingToken(accountId); }, @@ -572,7 +574,7 @@ export function createThreadBindingManager( } } - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "discord", accountId, capabilities: { @@ -682,7 +684,9 @@ export function createThreadBindingManager( }); return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : []; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); registerManager(manager); return manager; diff --git a/extensions/feishu/src/thread-bindings.ts b/extensions/feishu/src/thread-bindings.ts index cfae8fb2058..a35fe137b5b 100644 --- a/extensions/feishu/src/thread-bindings.ts +++ b/extensions/feishu/src/thread-bindings.ts @@ -8,6 +8,7 @@ import { registerSessionBindingAdapter, unregisterSessionBindingAdapter, type BindingTargetKind, + type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing"; @@ -231,11 +232,15 @@ export function createFeishuThreadBindingManager(params: { } } MANAGERS_BY_ACCOUNT_ID.delete(accountId); - unregisterSessionBindingAdapter({ channel: "feishu", accountId }); + unregisterSessionBindingAdapter({ + channel: "feishu", + accountId, + adapter: sessionBindingAdapter, + }); }, }; - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "feishu", accountId, capabilities: { @@ -290,7 +295,9 @@ export function createFeishuThreadBindingManager(params: { const removed = manager.unbindConversation(conversationId); return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : []; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); MANAGERS_BY_ACCOUNT_ID.set(accountId, manager); return manager; diff --git a/extensions/matrix/src/matrix/thread-bindings.ts b/extensions/matrix/src/matrix/thread-bindings.ts index edbbde5d000..1bc3945ccc2 100644 --- a/extensions/matrix/src/matrix/thread-bindings.ts +++ b/extensions/matrix/src/matrix/thread-bindings.ts @@ -1,4 +1,5 @@ import path from "node:path"; +import type { SessionBindingAdapter } from "openclaw/plugin-sdk/conversation-runtime"; import { readJsonFileWithFallback, registerSessionBindingAdapter, @@ -367,6 +368,7 @@ export async function createMatrixThreadBindingManager(params: { unregisterSessionBindingAdapter({ channel: "matrix", accountId: params.accountId, + adapter: sessionBindingAdapter, }); if (getMatrixThreadBindingManagerEntry(params.accountId)?.manager === manager) { deleteMatrixThreadBindingManagerEntry(params.accountId); @@ -413,7 +415,7 @@ export async function createMatrixThreadBindingManager(params: { return removed.map((record) => toSessionBindingRecord(record, defaults)); }; - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "matrix", accountId: params.accountId, capabilities: { placements: ["current", "child"], bindSupported: true, unbindSupported: true }, @@ -512,7 +514,9 @@ export async function createMatrixThreadBindingManager(params: { ); return removed; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); if (params.enableSweeper !== false) { sweepTimer = setInterval(() => { diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index aaf13e15561..71c26989e0c 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -7,6 +7,7 @@ import { registerSessionBindingAdapter, unregisterSessionBindingAdapter, type BindingTargetKind, + type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { writeJsonAtomic } from "openclaw/plugin-sdk/infra-runtime"; @@ -505,7 +506,11 @@ export function createTelegramThreadBindingManager( clearInterval(sweepTimer); sweepTimer = null; } - unregisterSessionBindingAdapter({ channel: "telegram", accountId }); + unregisterSessionBindingAdapter({ + channel: "telegram", + accountId, + adapter: sessionBindingAdapter, + }); const existingManager = MANAGERS_BY_ACCOUNT_ID.get(accountId); if (existingManager === manager) { MANAGERS_BY_ACCOUNT_ID.delete(accountId); @@ -513,7 +518,7 @@ export function createTelegramThreadBindingManager( }, }; - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "telegram", accountId, capabilities: { @@ -638,7 +643,9 @@ export function createTelegramThreadBindingManager( ] : []; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); const sweeperEnabled = params.enableSweeper !== false; if (sweeperEnabled) { diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 994ff427c8a..855dfb54880 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -5,6 +5,7 @@ import { isSessionBindingError, registerSessionBindingAdapter, unregisterSessionBindingAdapter, + type SessionBindingAdapter, type SessionBindingBindInput, type SessionBindingRecord, } from "./session-binding-service.js"; @@ -213,25 +214,27 @@ describe("session binding service", () => { }); }); - it("treats duplicate adapter registration for the same channel account as idempotent", async () => { + it("promotes the remaining adapter when duplicate registrations unregister", async () => { const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); - registerSessionBindingAdapter({ + const firstAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", bind: firstBind, listBySession: () => [], resolveByConversation: () => null, - }); - - registerSessionBindingAdapter({ + }; + const secondAdapter: SessionBindingAdapter = { channel: "Discord", accountId: "DEFAULT", bind: secondBind, listBySession: () => [], resolveByConversation: () => null, - }); + }; + + registerSessionBindingAdapter(firstAdapter); + registerSessionBindingAdapter(secondAdapter); await expect( getSessionBindingService().bind({ @@ -250,10 +253,14 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).toHaveBeenCalledTimes(1); - expect(secondBind).not.toHaveBeenCalled(); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(1); - unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: firstAdapter, + }); await expect( getSessionBindingService().bind({ @@ -272,8 +279,14 @@ describe("session binding service", () => { conversationId: "thread-2", }), }); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(2); - unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: secondAdapter, + }); await expect( getSessionBindingService().bind({ @@ -295,22 +308,24 @@ describe("session binding service", () => { const second = await importSessionBindingServiceModule(`second-${Date.now()}`); const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); - - first.__testing.resetSessionBindingAdaptersForTests(); - first.registerSessionBindingAdapter({ + const firstAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", bind: firstBind, listBySession: () => [], resolveByConversation: () => null, - }); - second.registerSessionBindingAdapter({ + }; + const secondAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", bind: secondBind, listBySession: () => [], resolveByConversation: () => null, - }); + }; + + first.__testing.resetSessionBindingAdaptersForTests(); + first.registerSessionBindingAdapter(firstAdapter); + second.registerSessionBindingAdapter(secondAdapter); expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]); @@ -331,12 +346,13 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).toHaveBeenCalledTimes(1); - expect(secondBind).not.toHaveBeenCalled(); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(1); second.unregisterSessionBindingAdapter({ channel: "discord", accountId: "default", + adapter: secondAdapter, }); await expect( @@ -356,6 +372,28 @@ describe("session binding service", () => { conversationId: "thread-2", }), }); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).toHaveBeenCalledTimes(1); + + first.unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: firstAdapter, + }); + + await expect( + second.getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-3", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-3", + }, + }), + ).rejects.toMatchObject({ + code: "BINDING_ADAPTER_UNAVAILABLE", + }); first.__testing.resetSessionBindingAdaptersForTests(); }); diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index ccbe54d53a9..e95492cfab4 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -147,16 +147,20 @@ function resolveAdapterCapabilities( } const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters"); -const SESSION_BINDING_ADAPTER_REF_COUNTS_KEY = Symbol.for( - "openclaw.sessionBinding.adapterRefCounts", -); -const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( +type SessionBindingAdapterRegistration = { + adapter: SessionBindingAdapter; + normalizedAdapter: SessionBindingAdapter; +}; + +const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( SESSION_BINDING_ADAPTERS_KEY, ); -const ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( - SESSION_BINDING_ADAPTER_REF_COUNTS_KEY, -); + +function getActiveAdapterForKey(key: string): SessionBindingAdapter | null { + const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); + return registrations?.at(-1)?.normalizedAdapter ?? null; +} export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void { const normalizedAdapter = { @@ -169,33 +173,42 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v accountId: normalizedAdapter.accountId, }); const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); - if (existing) { - // Duplicate module graphs can legitimately hit registration multiple times - // for the same logical adapter key in one process. Keep the first adapter - // stable and track registrations so later duplicate imports can unregister - // independently without deleting the live shared adapter too early. - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set( - key, - Math.max(1, ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 1) + 1, - ); - return; - } - ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter); - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, 1); + const registrations = existing ? [...existing] : []; + registrations.push({ + adapter, + normalizedAdapter, + }); + ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, registrations); } export function unregisterSessionBindingAdapter(params: { channel: string; accountId: string; + adapter?: SessionBindingAdapter; }): void { const key = toAdapterKey(params); - const currentRefCount = ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 0; - if (currentRefCount > 1) { - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, currentRefCount - 1); + const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); + if (!registrations || registrations.length === 0) { return; } - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.delete(key); - ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key); + const nextRegistrations = [...registrations]; + if (params.adapter) { + // Remove the matching owner so a surviving duplicate graph can stay active. + const registrationIndex = nextRegistrations.findLastIndex( + (registration) => registration.adapter === params.adapter, + ); + if (registrationIndex < 0) { + return; + } + nextRegistrations.splice(registrationIndex, 1); + } else { + nextRegistrations.pop(); + } + if (nextRegistrations.length === 0) { + ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key); + return; + } + ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, nextRegistrations); } function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null { @@ -213,7 +226,13 @@ function resolveAdapterForChannelAccount(params: { channel: params.channel, accountId: params.accountId, }); - return ADAPTERS_BY_CHANNEL_ACCOUNT.get(key) ?? null; + return getActiveAdapterForKey(key); +} + +function getActiveRegisteredAdapters(): SessionBindingAdapter[] { + return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()] + .map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null) + .filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter)); } function dedupeBindings(records: SessionBindingRecord[]): SessionBindingRecord[] { @@ -297,7 +316,7 @@ function createDefaultSessionBindingService(): SessionBindingService { return []; } const results: SessionBindingRecord[] = []; - for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) { + for (const adapter of getActiveRegisteredAdapters()) { const entries = adapter.listBySession(key); if (entries.length > 0) { results.push(...entries); @@ -321,13 +340,13 @@ function createDefaultSessionBindingService(): SessionBindingService { if (!normalizedBindingId) { return; } - for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) { + for (const adapter of getActiveRegisteredAdapters()) { adapter.touch?.(normalizedBindingId, at); } }, unbind: async (input) => { const removed: SessionBindingRecord[] = []; - for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) { + for (const adapter of getActiveRegisteredAdapters()) { if (!adapter.unbind) { continue; } @@ -350,7 +369,6 @@ export function getSessionBindingService(): SessionBindingService { export const __testing = { resetSessionBindingAdaptersForTests() { ADAPTERS_BY_CHANNEL_ACCOUNT.clear(); - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.clear(); }, getRegisteredAdapterKeys() { return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()];