Infra: track session binding adapter owners

This commit is contained in:
huntharo 2026-03-19 23:43:55 -04:00
parent c102309028
commit f4df5fc5eb
No known key found for this signature in database
6 changed files with 136 additions and 58 deletions

View File

@ -5,6 +5,7 @@ import {
registerSessionBindingAdapter,
unregisterSessionBindingAdapter,
type BindingTargetKind,
type SessionBindingAdapter,
type SessionBindingRecord,
} from "openclaw/plugin-sdk/conversation-runtime";
import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing";
@ -556,6 +557,7 @@ export function createThreadBindingManager(
unregisterSessionBindingAdapter({
channel: "discord",
accountId,
adapter: sessionBindingAdapter,
});
forgetThreadBindingToken(accountId);
},
@ -572,7 +574,7 @@ export function createThreadBindingManager(
}
}
registerSessionBindingAdapter({
const sessionBindingAdapter: SessionBindingAdapter = {
channel: "discord",
accountId,
capabilities: {
@ -682,7 +684,9 @@ export function createThreadBindingManager(
});
return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : [];
},
});
};
registerSessionBindingAdapter(sessionBindingAdapter);
registerManager(manager);
return manager;

View File

@ -8,6 +8,7 @@ import {
registerSessionBindingAdapter,
unregisterSessionBindingAdapter,
type BindingTargetKind,
type SessionBindingAdapter,
type SessionBindingRecord,
} from "openclaw/plugin-sdk/conversation-runtime";
import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing";
@ -231,11 +232,15 @@ export function createFeishuThreadBindingManager(params: {
}
}
MANAGERS_BY_ACCOUNT_ID.delete(accountId);
unregisterSessionBindingAdapter({ channel: "feishu", accountId });
unregisterSessionBindingAdapter({
channel: "feishu",
accountId,
adapter: sessionBindingAdapter,
});
},
};
registerSessionBindingAdapter({
const sessionBindingAdapter: SessionBindingAdapter = {
channel: "feishu",
accountId,
capabilities: {
@ -290,7 +295,9 @@ export function createFeishuThreadBindingManager(params: {
const removed = manager.unbindConversation(conversationId);
return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : [];
},
});
};
registerSessionBindingAdapter(sessionBindingAdapter);
MANAGERS_BY_ACCOUNT_ID.set(accountId, manager);
return manager;

View File

@ -1,4 +1,5 @@
import path from "node:path";
import type { SessionBindingAdapter } from "openclaw/plugin-sdk/conversation-runtime";
import {
readJsonFileWithFallback,
registerSessionBindingAdapter,
@ -367,6 +368,7 @@ export async function createMatrixThreadBindingManager(params: {
unregisterSessionBindingAdapter({
channel: "matrix",
accountId: params.accountId,
adapter: sessionBindingAdapter,
});
if (getMatrixThreadBindingManagerEntry(params.accountId)?.manager === manager) {
deleteMatrixThreadBindingManagerEntry(params.accountId);
@ -413,7 +415,7 @@ export async function createMatrixThreadBindingManager(params: {
return removed.map((record) => toSessionBindingRecord(record, defaults));
};
registerSessionBindingAdapter({
const sessionBindingAdapter: SessionBindingAdapter = {
channel: "matrix",
accountId: params.accountId,
capabilities: { placements: ["current", "child"], bindSupported: true, unbindSupported: true },
@ -512,7 +514,9 @@ export async function createMatrixThreadBindingManager(params: {
);
return removed;
},
});
};
registerSessionBindingAdapter(sessionBindingAdapter);
if (params.enableSweeper !== false) {
sweepTimer = setInterval(() => {

View File

@ -7,6 +7,7 @@ import {
registerSessionBindingAdapter,
unregisterSessionBindingAdapter,
type BindingTargetKind,
type SessionBindingAdapter,
type SessionBindingRecord,
} from "openclaw/plugin-sdk/conversation-runtime";
import { writeJsonAtomic } from "openclaw/plugin-sdk/infra-runtime";
@ -505,7 +506,11 @@ export function createTelegramThreadBindingManager(
clearInterval(sweepTimer);
sweepTimer = null;
}
unregisterSessionBindingAdapter({ channel: "telegram", accountId });
unregisterSessionBindingAdapter({
channel: "telegram",
accountId,
adapter: sessionBindingAdapter,
});
const existingManager = MANAGERS_BY_ACCOUNT_ID.get(accountId);
if (existingManager === manager) {
MANAGERS_BY_ACCOUNT_ID.delete(accountId);
@ -513,7 +518,7 @@ export function createTelegramThreadBindingManager(
},
};
registerSessionBindingAdapter({
const sessionBindingAdapter: SessionBindingAdapter = {
channel: "telegram",
accountId,
capabilities: {
@ -638,7 +643,9 @@ export function createTelegramThreadBindingManager(
]
: [];
},
});
};
registerSessionBindingAdapter(sessionBindingAdapter);
const sweeperEnabled = params.enableSweeper !== false;
if (sweeperEnabled) {

View File

@ -5,6 +5,7 @@ import {
isSessionBindingError,
registerSessionBindingAdapter,
unregisterSessionBindingAdapter,
type SessionBindingAdapter,
type SessionBindingBindInput,
type SessionBindingRecord,
} from "./session-binding-service.js";
@ -213,25 +214,27 @@ describe("session binding service", () => {
});
});
it("treats duplicate adapter registration for the same channel account as idempotent", async () => {
it("promotes the remaining adapter when duplicate registrations unregister", async () => {
const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
registerSessionBindingAdapter({
const firstAdapter: SessionBindingAdapter = {
channel: "discord",
accountId: "default",
bind: firstBind,
listBySession: () => [],
resolveByConversation: () => null,
});
registerSessionBindingAdapter({
};
const secondAdapter: SessionBindingAdapter = {
channel: "Discord",
accountId: "DEFAULT",
bind: secondBind,
listBySession: () => [],
resolveByConversation: () => null,
});
};
registerSessionBindingAdapter(firstAdapter);
registerSessionBindingAdapter(secondAdapter);
await expect(
getSessionBindingService().bind({
@ -250,10 +253,14 @@ describe("session binding service", () => {
conversationId: "thread-1",
}),
});
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).not.toHaveBeenCalled();
expect(firstBind).not.toHaveBeenCalled();
expect(secondBind).toHaveBeenCalledTimes(1);
unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" });
unregisterSessionBindingAdapter({
channel: "discord",
accountId: "default",
adapter: firstAdapter,
});
await expect(
getSessionBindingService().bind({
@ -272,8 +279,14 @@ describe("session binding service", () => {
conversationId: "thread-2",
}),
});
expect(firstBind).not.toHaveBeenCalled();
expect(secondBind).toHaveBeenCalledTimes(2);
unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" });
unregisterSessionBindingAdapter({
channel: "discord",
accountId: "default",
adapter: secondAdapter,
});
await expect(
getSessionBindingService().bind({
@ -295,22 +308,24 @@ describe("session binding service", () => {
const second = await importSessionBindingServiceModule(`second-${Date.now()}`);
const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
first.__testing.resetSessionBindingAdaptersForTests();
first.registerSessionBindingAdapter({
const firstAdapter: SessionBindingAdapter = {
channel: "discord",
accountId: "default",
bind: firstBind,
listBySession: () => [],
resolveByConversation: () => null,
});
second.registerSessionBindingAdapter({
};
const secondAdapter: SessionBindingAdapter = {
channel: "discord",
accountId: "default",
bind: secondBind,
listBySession: () => [],
resolveByConversation: () => null,
});
};
first.__testing.resetSessionBindingAdaptersForTests();
first.registerSessionBindingAdapter(firstAdapter);
second.registerSessionBindingAdapter(secondAdapter);
expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]);
@ -331,12 +346,13 @@ describe("session binding service", () => {
conversationId: "thread-1",
}),
});
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).not.toHaveBeenCalled();
expect(firstBind).not.toHaveBeenCalled();
expect(secondBind).toHaveBeenCalledTimes(1);
second.unregisterSessionBindingAdapter({
channel: "discord",
accountId: "default",
adapter: secondAdapter,
});
await expect(
@ -356,6 +372,28 @@ describe("session binding service", () => {
conversationId: "thread-2",
}),
});
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).toHaveBeenCalledTimes(1);
first.unregisterSessionBindingAdapter({
channel: "discord",
accountId: "default",
adapter: firstAdapter,
});
await expect(
second.getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-3",
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "thread-3",
},
}),
).rejects.toMatchObject({
code: "BINDING_ADAPTER_UNAVAILABLE",
});
first.__testing.resetSessionBindingAdaptersForTests();
});

View File

@ -147,16 +147,20 @@ function resolveAdapterCapabilities(
}
const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters");
const SESSION_BINDING_ADAPTER_REF_COUNTS_KEY = Symbol.for(
"openclaw.sessionBinding.adapterRefCounts",
);
const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapter>(
type SessionBindingAdapterRegistration = {
adapter: SessionBindingAdapter;
normalizedAdapter: SessionBindingAdapter;
};
const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapterRegistration[]>(
SESSION_BINDING_ADAPTERS_KEY,
);
const ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, number>(
SESSION_BINDING_ADAPTER_REF_COUNTS_KEY,
);
function getActiveAdapterForKey(key: string): SessionBindingAdapter | null {
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
return registrations?.at(-1)?.normalizedAdapter ?? null;
}
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
const normalizedAdapter = {
@ -169,33 +173,42 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v
accountId: normalizedAdapter.accountId,
});
const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
if (existing) {
// Duplicate module graphs can legitimately hit registration multiple times
// for the same logical adapter key in one process. Keep the first adapter
// stable and track registrations so later duplicate imports can unregister
// independently without deleting the live shared adapter too early.
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(
key,
Math.max(1, ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 1) + 1,
);
return;
}
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter);
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, 1);
const registrations = existing ? [...existing] : [];
registrations.push({
adapter,
normalizedAdapter,
});
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, registrations);
}
export function unregisterSessionBindingAdapter(params: {
channel: string;
accountId: string;
adapter?: SessionBindingAdapter;
}): void {
const key = toAdapterKey(params);
const currentRefCount = ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 0;
if (currentRefCount > 1) {
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, currentRefCount - 1);
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
if (!registrations || registrations.length === 0) {
return;
}
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.delete(key);
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key);
const nextRegistrations = [...registrations];
if (params.adapter) {
// Remove the matching owner so a surviving duplicate graph can stay active.
const registrationIndex = nextRegistrations.findLastIndex(
(registration) => registration.adapter === params.adapter,
);
if (registrationIndex < 0) {
return;
}
nextRegistrations.splice(registrationIndex, 1);
} else {
nextRegistrations.pop();
}
if (nextRegistrations.length === 0) {
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key);
return;
}
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, nextRegistrations);
}
function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null {
@ -213,7 +226,13 @@ function resolveAdapterForChannelAccount(params: {
channel: params.channel,
accountId: params.accountId,
});
return ADAPTERS_BY_CHANNEL_ACCOUNT.get(key) ?? null;
return getActiveAdapterForKey(key);
}
function getActiveRegisteredAdapters(): SessionBindingAdapter[] {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()]
.map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null)
.filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter));
}
function dedupeBindings(records: SessionBindingRecord[]): SessionBindingRecord[] {
@ -297,7 +316,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
return [];
}
const results: SessionBindingRecord[] = [];
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
for (const adapter of getActiveRegisteredAdapters()) {
const entries = adapter.listBySession(key);
if (entries.length > 0) {
results.push(...entries);
@ -321,13 +340,13 @@ function createDefaultSessionBindingService(): SessionBindingService {
if (!normalizedBindingId) {
return;
}
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
for (const adapter of getActiveRegisteredAdapters()) {
adapter.touch?.(normalizedBindingId, at);
}
},
unbind: async (input) => {
const removed: SessionBindingRecord[] = [];
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
for (const adapter of getActiveRegisteredAdapters()) {
if (!adapter.unbind) {
continue;
}
@ -350,7 +369,6 @@ export function getSessionBindingService(): SessionBindingService {
export const __testing = {
resetSessionBindingAdaptersForTests() {
ADAPTERS_BY_CHANNEL_ACCOUNT.clear();
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.clear();
},
getRegisteredAdapterKeys() {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()];