From c102309028bf84866720c4d8f4690d5131fcffe4 Mon Sep 17 00:00:00 2001 From: huntharo Date: Thu, 19 Mar 2026 22:48:15 -0400 Subject: [PATCH 1/3] Infra: unify plugin split runtime state --- src/infra/agent-events.test.ts | 46 +++++ src/infra/agent-events.ts | 46 +++-- src/infra/heartbeat-events.test.ts | 34 ++++ src/infra/heartbeat-events.ts | 30 +++- .../outbound/session-binding-service.test.ts | 161 ++++++++++++++++-- src/infra/outbound/session-binding-service.ts | 36 +++- src/infra/system-events.test.ts | 28 +++ src/infra/system-events.ts | 6 +- 8 files changed, 349 insertions(+), 38 deletions(-) diff --git a/src/infra/agent-events.test.ts b/src/infra/agent-events.test.ts index 0079a443c7b..c0a56110b3b 100644 --- a/src/infra/agent-events.test.ts +++ b/src/infra/agent-events.test.ts @@ -8,6 +8,14 @@ import { resetAgentRunContextForTest, } from "./agent-events.js"; +type AgentEventsModule = typeof import("./agent-events.js"); + +const agentEventsModuleUrl = new URL("./agent-events.ts", import.meta.url).href; + +async function importAgentEventsModule(cacheBust: string): Promise { + return (await import(`${agentEventsModuleUrl}?t=${cacheBust}`)) as AgentEventsModule; +} + describe("agent-events sequencing", () => { test("stores and clears run context", async () => { resetAgentRunContextForTest(); @@ -144,4 +152,42 @@ describe("agent-events sequencing", () => { expect(seen).toEqual(["run-safe"]); }); + + test("shares run context, listeners, and sequence state across duplicate module instances", async () => { + const first = await importAgentEventsModule(`first-${Date.now()}`); + const second = await importAgentEventsModule(`second-${Date.now()}`); + + first.resetAgentEventsForTest(); + first.registerAgentRunContext("run-dup", { sessionKey: "session-dup" }); + + const seen: Array<{ seq: number; sessionKey?: string }> = []; + const stop = first.onAgentEvent((evt) => { + if (evt.runId === "run-dup") { + seen.push({ seq: evt.seq, sessionKey: evt.sessionKey }); + } + }); + + second.emitAgentEvent({ + runId: "run-dup", + stream: "assistant", + data: { text: "from second" }, + sessionKey: " ", + }); + first.emitAgentEvent({ + runId: "run-dup", + stream: "assistant", + data: { text: "from first" }, + sessionKey: " ", + }); + + stop(); + + expect(second.getAgentRunContext("run-dup")).toEqual({ sessionKey: "session-dup" }); + expect(seen).toEqual([ + { seq: 1, sessionKey: "session-dup" }, + { seq: 2, sessionKey: "session-dup" }, + ]); + + first.resetAgentEventsForTest(); + }); }); diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 3b2e219574b..2f57087b027 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -1,4 +1,5 @@ import type { VerboseLevel } from "../auto-reply/thinking.js"; +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; export type AgentEventStream = "lifecycle" | "tool" | "assistant" | "error" | (string & {}); @@ -19,18 +20,27 @@ export type AgentRunContext = { isControlUiVisible?: boolean; }; -// Keep per-run counters so streams stay strictly monotonic per runId. -const seqByRun = new Map(); -const listeners = new Set<(evt: AgentEventPayload) => void>(); -const runContextById = new Map(); +type AgentEventState = { + seqByRun: Map; + listeners: Set<(evt: AgentEventPayload) => void>; + runContextById: Map; +}; + +const AGENT_EVENT_STATE_KEY = Symbol.for("openclaw.agentEvents.state"); + +const state = resolveGlobalSingleton(AGENT_EVENT_STATE_KEY, () => ({ + seqByRun: new Map(), + listeners: new Set<(evt: AgentEventPayload) => void>(), + runContextById: new Map(), +})); export function registerAgentRunContext(runId: string, context: AgentRunContext) { if (!runId) { return; } - const existing = runContextById.get(runId); + const existing = state.runContextById.get(runId); if (!existing) { - runContextById.set(runId, { ...context }); + state.runContextById.set(runId, { ...context }); return; } if (context.sessionKey && existing.sessionKey !== context.sessionKey) { @@ -48,21 +58,21 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext) } export function getAgentRunContext(runId: string) { - return runContextById.get(runId); + return state.runContextById.get(runId); } export function clearAgentRunContext(runId: string) { - runContextById.delete(runId); + state.runContextById.delete(runId); } export function resetAgentRunContextForTest() { - runContextById.clear(); + state.runContextById.clear(); } export function emitAgentEvent(event: Omit) { - const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1; - seqByRun.set(event.runId, nextSeq); - const context = runContextById.get(event.runId); + const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1; + state.seqByRun.set(event.runId, nextSeq); + const context = state.runContextById.get(event.runId); const isControlUiVisible = context?.isControlUiVisible ?? true; const eventSessionKey = typeof event.sessionKey === "string" && event.sessionKey.trim() ? event.sessionKey : undefined; @@ -73,7 +83,7 @@ export function emitAgentEvent(event: Omit) { seq: nextSeq, ts: Date.now(), }; - for (const listener of listeners) { + for (const listener of state.listeners) { try { listener(enriched); } catch { @@ -83,6 +93,12 @@ export function emitAgentEvent(event: Omit) { } export function onAgentEvent(listener: (evt: AgentEventPayload) => void) { - listeners.add(listener); - return () => listeners.delete(listener); + state.listeners.add(listener); + return () => state.listeners.delete(listener); +} + +export function resetAgentEventsForTest() { + state.seqByRun.clear(); + state.listeners.clear(); + state.runContextById.clear(); } diff --git a/src/infra/heartbeat-events.test.ts b/src/infra/heartbeat-events.test.ts index d1583f8080a..98bb8f541bb 100644 --- a/src/infra/heartbeat-events.test.ts +++ b/src/infra/heartbeat-events.test.ts @@ -3,9 +3,18 @@ import { emitHeartbeatEvent, getLastHeartbeatEvent, onHeartbeatEvent, + resetHeartbeatEventsForTest, resolveIndicatorType, } from "./heartbeat-events.js"; +type HeartbeatEventsModule = typeof import("./heartbeat-events.js"); + +const heartbeatEventsModuleUrl = new URL("./heartbeat-events.ts", import.meta.url).href; + +async function importHeartbeatEventsModule(cacheBust: string): Promise { + return (await import(`${heartbeatEventsModuleUrl}?t=${cacheBust}`)) as HeartbeatEventsModule; +} + describe("resolveIndicatorType", () => { it("maps heartbeat statuses to indicator types", () => { expect(resolveIndicatorType("ok-empty")).toBe("ok"); @@ -23,6 +32,7 @@ describe("heartbeat events", () => { }); afterEach(() => { + resetHeartbeatEventsForTest(); vi.useRealTimers(); }); @@ -56,4 +66,28 @@ describe("heartbeat events", () => { expect(seen).toEqual(["first:ok-empty", "third:ok-empty"]); }); + + it("shares heartbeat state across duplicate module instances", async () => { + const first = await importHeartbeatEventsModule(`first-${Date.now()}`); + const second = await importHeartbeatEventsModule(`second-${Date.now()}`); + + first.resetHeartbeatEventsForTest(); + + const seen: string[] = []; + const stop = first.onHeartbeatEvent((evt) => { + seen.push(evt.status); + }); + + second.emitHeartbeatEvent({ status: "ok-token", preview: "pong" }); + + expect(first.getLastHeartbeatEvent()).toEqual({ + ts: 1767960000000, + status: "ok-token", + preview: "pong", + }); + expect(seen).toEqual(["ok-token"]); + + stop(); + first.resetHeartbeatEventsForTest(); + }); }); diff --git a/src/infra/heartbeat-events.ts b/src/infra/heartbeat-events.ts index e16860e5d4f..04ea6a4dd30 100644 --- a/src/infra/heartbeat-events.ts +++ b/src/infra/heartbeat-events.ts @@ -1,3 +1,5 @@ +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; + export type HeartbeatIndicatorType = "ok" | "alert" | "error"; export type HeartbeatEventPayload = { @@ -33,13 +35,22 @@ export function resolveIndicatorType( } } -let lastHeartbeat: HeartbeatEventPayload | null = null; -const listeners = new Set<(evt: HeartbeatEventPayload) => void>(); +type HeartbeatEventState = { + lastHeartbeat: HeartbeatEventPayload | null; + listeners: Set<(evt: HeartbeatEventPayload) => void>; +}; + +const HEARTBEAT_EVENT_STATE_KEY = Symbol.for("openclaw.heartbeatEvents.state"); + +const state = resolveGlobalSingleton(HEARTBEAT_EVENT_STATE_KEY, () => ({ + lastHeartbeat: null, + listeners: new Set<(evt: HeartbeatEventPayload) => void>(), +})); export function emitHeartbeatEvent(evt: Omit) { const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt }; - lastHeartbeat = enriched; - for (const listener of listeners) { + state.lastHeartbeat = enriched; + for (const listener of state.listeners) { try { listener(enriched); } catch { @@ -49,10 +60,15 @@ export function emitHeartbeatEvent(evt: Omit) { } export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void { - listeners.add(listener); - return () => listeners.delete(listener); + state.listeners.add(listener); + return () => state.listeners.delete(listener); } export function getLastHeartbeatEvent(): HeartbeatEventPayload | null { - return lastHeartbeat; + return state.lastHeartbeat; +} + +export function resetHeartbeatEventsForTest(): void { + state.lastHeartbeat = null; + state.listeners.clear(); } diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 84fe5708802..994ff427c8a 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -4,10 +4,24 @@ import { getSessionBindingService, isSessionBindingError, registerSessionBindingAdapter, + unregisterSessionBindingAdapter, type SessionBindingBindInput, type SessionBindingRecord, } from "./session-binding-service.js"; +type SessionBindingServiceModule = typeof import("./session-binding-service.js"); + +const sessionBindingServiceModuleUrl = new URL("./session-binding-service.ts", import.meta.url) + .href; + +async function importSessionBindingServiceModule( + cacheBust: string, +): Promise { + return (await import( + `${sessionBindingServiceModuleUrl}?t=${cacheBust}` + )) as SessionBindingServiceModule; +} + function createRecord(input: SessionBindingBindInput): SessionBindingRecord { const conversationId = input.placement === "child" @@ -199,23 +213,150 @@ describe("session binding service", () => { }); }); - it("rejects duplicate adapter registration for the same channel account", () => { + it("treats duplicate adapter registration for the same channel account as idempotent", async () => { + const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + registerSessionBindingAdapter({ channel: "discord", accountId: "default", - bind: async (input) => createRecord(input), + bind: firstBind, listBySession: () => [], resolveByConversation: () => null, }); - expect(() => - registerSessionBindingAdapter({ - channel: "Discord", - accountId: "DEFAULT", - bind: async (input) => createRecord(input), - listBySession: () => [], - resolveByConversation: () => null, + registerSessionBindingAdapter({ + channel: "Discord", + accountId: "DEFAULT", + bind: secondBind, + listBySession: () => [], + resolveByConversation: () => null, + }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-1", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }, }), - ).toThrow("Session binding adapter already registered for discord:default"); + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }), + }); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).not.toHaveBeenCalled(); + + unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-2", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }, + }), + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }), + }); + + unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-3", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-3", + }, + }), + ).rejects.toMatchObject({ + code: "BINDING_ADAPTER_UNAVAILABLE", + }); + }); + + it("shares registered adapters across duplicate module instances", async () => { + const first = await importSessionBindingServiceModule(`first-${Date.now()}`); + const second = await importSessionBindingServiceModule(`second-${Date.now()}`); + const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + + first.__testing.resetSessionBindingAdaptersForTests(); + first.registerSessionBindingAdapter({ + channel: "discord", + accountId: "default", + bind: firstBind, + listBySession: () => [], + resolveByConversation: () => null, + }); + second.registerSessionBindingAdapter({ + channel: "discord", + accountId: "default", + bind: secondBind, + listBySession: () => [], + resolveByConversation: () => null, + }); + + expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]); + + await expect( + second.getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-1", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }, + }), + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }), + }); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).not.toHaveBeenCalled(); + + second.unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + }); + + await expect( + first.getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-2", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }, + }), + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }), + }); + + first.__testing.resetSessionBindingAdaptersForTests(); }); }); diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index c155d3792b8..ccbe54d53a9 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -1,4 +1,5 @@ import { normalizeAccountId } from "../../routing/session-key.js"; +import { resolveGlobalMap } from "../../shared/global-singleton.js"; export type BindingTargetKind = "subagent" | "session"; export type BindingStatus = "active" | "ending" | "ended"; @@ -145,7 +146,17 @@ function resolveAdapterCapabilities( }; } -const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map(); +const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters"); +const SESSION_BINDING_ADAPTER_REF_COUNTS_KEY = Symbol.for( + "openclaw.sessionBinding.adapterRefCounts", +); + +const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( + SESSION_BINDING_ADAPTERS_KEY, +); +const ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( + SESSION_BINDING_ADAPTER_REF_COUNTS_KEY, +); export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void { const normalizedAdapter = { @@ -158,19 +169,33 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v accountId: normalizedAdapter.accountId, }); const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); - if (existing && existing !== adapter) { - throw new Error( - `Session binding adapter already registered for ${normalizedAdapter.channel}:${normalizedAdapter.accountId}`, + if (existing) { + // Duplicate module graphs can legitimately hit registration multiple times + // for the same logical adapter key in one process. Keep the first adapter + // stable and track registrations so later duplicate imports can unregister + // independently without deleting the live shared adapter too early. + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set( + key, + Math.max(1, ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 1) + 1, ); + return; } ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter); + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, 1); } export function unregisterSessionBindingAdapter(params: { channel: string; accountId: string; }): void { - ADAPTERS_BY_CHANNEL_ACCOUNT.delete(toAdapterKey(params)); + const key = toAdapterKey(params); + const currentRefCount = ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 0; + if (currentRefCount > 1) { + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, currentRefCount - 1); + return; + } + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.delete(key); + ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key); } function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null { @@ -325,6 +350,7 @@ export function getSessionBindingService(): SessionBindingService { export const __testing = { resetSessionBindingAdaptersForTests() { ADAPTERS_BY_CHANNEL_ACCOUNT.clear(); + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.clear(); }, getRegisteredAdapterKeys() { return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()]; diff --git a/src/infra/system-events.test.ts b/src/infra/system-events.test.ts index cf16416e210..c03dbe83725 100644 --- a/src/infra/system-events.test.ts +++ b/src/infra/system-events.test.ts @@ -13,6 +13,14 @@ import { resetSystemEventsForTest, } from "./system-events.js"; +type SystemEventsModule = typeof import("./system-events.js"); + +const systemEventsModuleUrl = new URL("./system-events.ts", import.meta.url).href; + +async function importSystemEventsModule(cacheBust: string): Promise { + return (await import(`${systemEventsModuleUrl}?t=${cacheBust}`)) as SystemEventsModule; +} + const cfg = {} as unknown as OpenClawConfig; const mainKey = resolveMainSessionKey(cfg); @@ -108,6 +116,26 @@ describe("system events (session routing)", () => { ); }); + it("shares queued events across duplicate module instances", async () => { + const first = await importSystemEventsModule(`first-${Date.now()}`); + const second = await importSystemEventsModule(`second-${Date.now()}`); + const key = "agent:main:test-duplicate-module"; + + first.resetSystemEventsForTest(); + second.enqueueSystemEvent("Node connected", { sessionKey: key, contextKey: "build:123" }); + + expect(first.peekSystemEventEntries(key)).toEqual([ + expect.objectContaining({ + text: "Node connected", + contextKey: "build:123", + }), + ]); + expect(first.isSystemEventContextChanged(key, "build:123")).toBe(false); + expect(first.drainSystemEvents(key)).toEqual(["Node connected"]); + + first.resetSystemEventsForTest(); + }); + it("filters heartbeat/noise lines, returning undefined", async () => { const key = "agent:main:test-heartbeat-filter"; enqueueSystemEvent("Read HEARTBEAT.md before continuing", { sessionKey: key }); diff --git a/src/infra/system-events.ts b/src/infra/system-events.ts index 771890bcddd..ce8785d5a22 100644 --- a/src/infra/system-events.ts +++ b/src/infra/system-events.ts @@ -2,6 +2,8 @@ // prefixed to the next prompt. We intentionally avoid persistence to keep // events ephemeral. Events are session-scoped and require an explicit key. +import { resolveGlobalMap } from "../shared/global-singleton.js"; + export type SystemEvent = { text: string; ts: number; contextKey?: string | null }; const MAX_EVENTS = 20; @@ -12,7 +14,9 @@ type SessionQueue = { lastContextKey: string | null; }; -const queues = new Map(); +const SYSTEM_EVENT_QUEUES_KEY = Symbol.for("openclaw.systemEvents.queues"); + +const queues = resolveGlobalMap(SYSTEM_EVENT_QUEUES_KEY); type SystemEventOptions = { sessionKey: string; From f4df5fc5ebaaa0179531a426ef31dbb3b21170ae Mon Sep 17 00:00:00 2001 From: huntharo Date: Thu, 19 Mar 2026 23:43:55 -0400 Subject: [PATCH 2/3] Infra: track session binding adapter owners --- .../src/monitor/thread-bindings.manager.ts | 8 +- extensions/feishu/src/thread-bindings.ts | 13 +++- .../matrix/src/matrix/thread-bindings.ts | 8 +- extensions/telegram/src/thread-bindings.ts | 13 +++- .../outbound/session-binding-service.test.ts | 74 +++++++++++++----- src/infra/outbound/session-binding-service.ts | 78 ++++++++++++------- 6 files changed, 136 insertions(+), 58 deletions(-) diff --git a/extensions/discord/src/monitor/thread-bindings.manager.ts b/extensions/discord/src/monitor/thread-bindings.manager.ts index 5c37ac4bbf0..9e76d01243a 100644 --- a/extensions/discord/src/monitor/thread-bindings.manager.ts +++ b/extensions/discord/src/monitor/thread-bindings.manager.ts @@ -5,6 +5,7 @@ import { registerSessionBindingAdapter, unregisterSessionBindingAdapter, type BindingTargetKind, + type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing"; @@ -556,6 +557,7 @@ export function createThreadBindingManager( unregisterSessionBindingAdapter({ channel: "discord", accountId, + adapter: sessionBindingAdapter, }); forgetThreadBindingToken(accountId); }, @@ -572,7 +574,7 @@ export function createThreadBindingManager( } } - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "discord", accountId, capabilities: { @@ -682,7 +684,9 @@ export function createThreadBindingManager( }); return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : []; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); registerManager(manager); return manager; diff --git a/extensions/feishu/src/thread-bindings.ts b/extensions/feishu/src/thread-bindings.ts index cfae8fb2058..a35fe137b5b 100644 --- a/extensions/feishu/src/thread-bindings.ts +++ b/extensions/feishu/src/thread-bindings.ts @@ -8,6 +8,7 @@ import { registerSessionBindingAdapter, unregisterSessionBindingAdapter, type BindingTargetKind, + type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing"; @@ -231,11 +232,15 @@ export function createFeishuThreadBindingManager(params: { } } MANAGERS_BY_ACCOUNT_ID.delete(accountId); - unregisterSessionBindingAdapter({ channel: "feishu", accountId }); + unregisterSessionBindingAdapter({ + channel: "feishu", + accountId, + adapter: sessionBindingAdapter, + }); }, }; - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "feishu", accountId, capabilities: { @@ -290,7 +295,9 @@ export function createFeishuThreadBindingManager(params: { const removed = manager.unbindConversation(conversationId); return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : []; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); MANAGERS_BY_ACCOUNT_ID.set(accountId, manager); return manager; diff --git a/extensions/matrix/src/matrix/thread-bindings.ts b/extensions/matrix/src/matrix/thread-bindings.ts index edbbde5d000..1bc3945ccc2 100644 --- a/extensions/matrix/src/matrix/thread-bindings.ts +++ b/extensions/matrix/src/matrix/thread-bindings.ts @@ -1,4 +1,5 @@ import path from "node:path"; +import type { SessionBindingAdapter } from "openclaw/plugin-sdk/conversation-runtime"; import { readJsonFileWithFallback, registerSessionBindingAdapter, @@ -367,6 +368,7 @@ export async function createMatrixThreadBindingManager(params: { unregisterSessionBindingAdapter({ channel: "matrix", accountId: params.accountId, + adapter: sessionBindingAdapter, }); if (getMatrixThreadBindingManagerEntry(params.accountId)?.manager === manager) { deleteMatrixThreadBindingManagerEntry(params.accountId); @@ -413,7 +415,7 @@ export async function createMatrixThreadBindingManager(params: { return removed.map((record) => toSessionBindingRecord(record, defaults)); }; - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "matrix", accountId: params.accountId, capabilities: { placements: ["current", "child"], bindSupported: true, unbindSupported: true }, @@ -512,7 +514,9 @@ export async function createMatrixThreadBindingManager(params: { ); return removed; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); if (params.enableSweeper !== false) { sweepTimer = setInterval(() => { diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index aaf13e15561..71c26989e0c 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -7,6 +7,7 @@ import { registerSessionBindingAdapter, unregisterSessionBindingAdapter, type BindingTargetKind, + type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { writeJsonAtomic } from "openclaw/plugin-sdk/infra-runtime"; @@ -505,7 +506,11 @@ export function createTelegramThreadBindingManager( clearInterval(sweepTimer); sweepTimer = null; } - unregisterSessionBindingAdapter({ channel: "telegram", accountId }); + unregisterSessionBindingAdapter({ + channel: "telegram", + accountId, + adapter: sessionBindingAdapter, + }); const existingManager = MANAGERS_BY_ACCOUNT_ID.get(accountId); if (existingManager === manager) { MANAGERS_BY_ACCOUNT_ID.delete(accountId); @@ -513,7 +518,7 @@ export function createTelegramThreadBindingManager( }, }; - registerSessionBindingAdapter({ + const sessionBindingAdapter: SessionBindingAdapter = { channel: "telegram", accountId, capabilities: { @@ -638,7 +643,9 @@ export function createTelegramThreadBindingManager( ] : []; }, - }); + }; + + registerSessionBindingAdapter(sessionBindingAdapter); const sweeperEnabled = params.enableSweeper !== false; if (sweeperEnabled) { diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 994ff427c8a..855dfb54880 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -5,6 +5,7 @@ import { isSessionBindingError, registerSessionBindingAdapter, unregisterSessionBindingAdapter, + type SessionBindingAdapter, type SessionBindingBindInput, type SessionBindingRecord, } from "./session-binding-service.js"; @@ -213,25 +214,27 @@ describe("session binding service", () => { }); }); - it("treats duplicate adapter registration for the same channel account as idempotent", async () => { + it("promotes the remaining adapter when duplicate registrations unregister", async () => { const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); - registerSessionBindingAdapter({ + const firstAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", bind: firstBind, listBySession: () => [], resolveByConversation: () => null, - }); - - registerSessionBindingAdapter({ + }; + const secondAdapter: SessionBindingAdapter = { channel: "Discord", accountId: "DEFAULT", bind: secondBind, listBySession: () => [], resolveByConversation: () => null, - }); + }; + + registerSessionBindingAdapter(firstAdapter); + registerSessionBindingAdapter(secondAdapter); await expect( getSessionBindingService().bind({ @@ -250,10 +253,14 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).toHaveBeenCalledTimes(1); - expect(secondBind).not.toHaveBeenCalled(); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(1); - unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: firstAdapter, + }); await expect( getSessionBindingService().bind({ @@ -272,8 +279,14 @@ describe("session binding service", () => { conversationId: "thread-2", }), }); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(2); - unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: secondAdapter, + }); await expect( getSessionBindingService().bind({ @@ -295,22 +308,24 @@ describe("session binding service", () => { const second = await importSessionBindingServiceModule(`second-${Date.now()}`); const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); - - first.__testing.resetSessionBindingAdaptersForTests(); - first.registerSessionBindingAdapter({ + const firstAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", bind: firstBind, listBySession: () => [], resolveByConversation: () => null, - }); - second.registerSessionBindingAdapter({ + }; + const secondAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", bind: secondBind, listBySession: () => [], resolveByConversation: () => null, - }); + }; + + first.__testing.resetSessionBindingAdaptersForTests(); + first.registerSessionBindingAdapter(firstAdapter); + second.registerSessionBindingAdapter(secondAdapter); expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]); @@ -331,12 +346,13 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).toHaveBeenCalledTimes(1); - expect(secondBind).not.toHaveBeenCalled(); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(1); second.unregisterSessionBindingAdapter({ channel: "discord", accountId: "default", + adapter: secondAdapter, }); await expect( @@ -356,6 +372,28 @@ describe("session binding service", () => { conversationId: "thread-2", }), }); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).toHaveBeenCalledTimes(1); + + first.unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: firstAdapter, + }); + + await expect( + second.getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-3", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-3", + }, + }), + ).rejects.toMatchObject({ + code: "BINDING_ADAPTER_UNAVAILABLE", + }); first.__testing.resetSessionBindingAdaptersForTests(); }); diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index ccbe54d53a9..e95492cfab4 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -147,16 +147,20 @@ function resolveAdapterCapabilities( } const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters"); -const SESSION_BINDING_ADAPTER_REF_COUNTS_KEY = Symbol.for( - "openclaw.sessionBinding.adapterRefCounts", -); -const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( +type SessionBindingAdapterRegistration = { + adapter: SessionBindingAdapter; + normalizedAdapter: SessionBindingAdapter; +}; + +const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( SESSION_BINDING_ADAPTERS_KEY, ); -const ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( - SESSION_BINDING_ADAPTER_REF_COUNTS_KEY, -); + +function getActiveAdapterForKey(key: string): SessionBindingAdapter | null { + const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); + return registrations?.at(-1)?.normalizedAdapter ?? null; +} export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void { const normalizedAdapter = { @@ -169,33 +173,42 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v accountId: normalizedAdapter.accountId, }); const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); - if (existing) { - // Duplicate module graphs can legitimately hit registration multiple times - // for the same logical adapter key in one process. Keep the first adapter - // stable and track registrations so later duplicate imports can unregister - // independently without deleting the live shared adapter too early. - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set( - key, - Math.max(1, ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 1) + 1, - ); - return; - } - ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter); - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, 1); + const registrations = existing ? [...existing] : []; + registrations.push({ + adapter, + normalizedAdapter, + }); + ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, registrations); } export function unregisterSessionBindingAdapter(params: { channel: string; accountId: string; + adapter?: SessionBindingAdapter; }): void { const key = toAdapterKey(params); - const currentRefCount = ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 0; - if (currentRefCount > 1) { - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, currentRefCount - 1); + const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); + if (!registrations || registrations.length === 0) { return; } - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.delete(key); - ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key); + const nextRegistrations = [...registrations]; + if (params.adapter) { + // Remove the matching owner so a surviving duplicate graph can stay active. + const registrationIndex = nextRegistrations.findLastIndex( + (registration) => registration.adapter === params.adapter, + ); + if (registrationIndex < 0) { + return; + } + nextRegistrations.splice(registrationIndex, 1); + } else { + nextRegistrations.pop(); + } + if (nextRegistrations.length === 0) { + ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key); + return; + } + ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, nextRegistrations); } function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null { @@ -213,7 +226,13 @@ function resolveAdapterForChannelAccount(params: { channel: params.channel, accountId: params.accountId, }); - return ADAPTERS_BY_CHANNEL_ACCOUNT.get(key) ?? null; + return getActiveAdapterForKey(key); +} + +function getActiveRegisteredAdapters(): SessionBindingAdapter[] { + return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()] + .map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null) + .filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter)); } function dedupeBindings(records: SessionBindingRecord[]): SessionBindingRecord[] { @@ -297,7 +316,7 @@ function createDefaultSessionBindingService(): SessionBindingService { return []; } const results: SessionBindingRecord[] = []; - for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) { + for (const adapter of getActiveRegisteredAdapters()) { const entries = adapter.listBySession(key); if (entries.length > 0) { results.push(...entries); @@ -321,13 +340,13 @@ function createDefaultSessionBindingService(): SessionBindingService { if (!normalizedBindingId) { return; } - for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) { + for (const adapter of getActiveRegisteredAdapters()) { adapter.touch?.(normalizedBindingId, at); } }, unbind: async (input) => { const removed: SessionBindingRecord[] = []; - for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) { + for (const adapter of getActiveRegisteredAdapters()) { if (!adapter.unbind) { continue; } @@ -350,7 +369,6 @@ export function getSessionBindingService(): SessionBindingService { export const __testing = { resetSessionBindingAdaptersForTests() { ADAPTERS_BY_CHANNEL_ACCOUNT.clear(); - ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.clear(); }, getRegisteredAdapterKeys() { return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()]; From 5cacc8c2f232d22f70d44f7c4977fb5a4084161b Mon Sep 17 00:00:00 2001 From: huntharo Date: Thu, 19 Mar 2026 23:53:20 -0400 Subject: [PATCH 3/3] Infra: preserve active session binding adapter --- .../outbound/session-binding-service.test.ts | 102 ++++++------------ src/infra/outbound/session-binding-service.ts | 4 +- 2 files changed, 33 insertions(+), 73 deletions(-) diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 855dfb54880..849171c91ad 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -214,21 +214,29 @@ describe("session binding service", () => { }); }); - it("promotes the remaining adapter when duplicate registrations unregister", async () => { - const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); - const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); - + it("keeps the first live adapter authoritative until it unregisters", () => { + const firstBinding = { + bindingId: "first-binding", + targetSessionKey: "agent:main", + targetKind: "session" as const, + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }, + status: "active" as const, + boundAt: 1, + }; const firstAdapter: SessionBindingAdapter = { channel: "discord", accountId: "default", - bind: firstBind, - listBySession: () => [], + listBySession: (targetSessionKey) => + targetSessionKey === "agent:main" ? [firstBinding] : [], resolveByConversation: () => null, }; const secondAdapter: SessionBindingAdapter = { channel: "Discord", accountId: "DEFAULT", - bind: secondBind, listBySession: () => [], resolveByConversation: () => null, }; @@ -236,51 +244,7 @@ describe("session binding service", () => { registerSessionBindingAdapter(firstAdapter); registerSessionBindingAdapter(secondAdapter); - await expect( - getSessionBindingService().bind({ - targetSessionKey: "agent:main:subagent:child-1", - targetKind: "subagent", - conversation: { - channel: "discord", - accountId: "default", - conversationId: "thread-1", - }, - }), - ).resolves.toMatchObject({ - conversation: expect.objectContaining({ - channel: "discord", - accountId: "default", - conversationId: "thread-1", - }), - }); - expect(firstBind).not.toHaveBeenCalled(); - expect(secondBind).toHaveBeenCalledTimes(1); - - unregisterSessionBindingAdapter({ - channel: "discord", - accountId: "default", - adapter: firstAdapter, - }); - - await expect( - getSessionBindingService().bind({ - targetSessionKey: "agent:main:subagent:child-2", - targetKind: "subagent", - conversation: { - channel: "discord", - accountId: "default", - conversationId: "thread-2", - }, - }), - ).resolves.toMatchObject({ - conversation: expect.objectContaining({ - channel: "discord", - accountId: "default", - conversationId: "thread-2", - }), - }); - expect(firstBind).not.toHaveBeenCalled(); - expect(secondBind).toHaveBeenCalledTimes(2); + expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]); unregisterSessionBindingAdapter({ channel: "discord", @@ -288,19 +252,15 @@ describe("session binding service", () => { adapter: secondAdapter, }); - await expect( - getSessionBindingService().bind({ - targetSessionKey: "agent:main:subagent:child-3", - targetKind: "subagent", - conversation: { - channel: "discord", - accountId: "default", - conversationId: "thread-3", - }, - }), - ).rejects.toMatchObject({ - code: "BINDING_ADAPTER_UNAVAILABLE", + expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]); + + unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + adapter: firstAdapter, }); + + expect(getSessionBindingService().listBySession("agent:main")).toEqual([]); }); it("shares registered adapters across duplicate module instances", async () => { @@ -346,17 +306,17 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).not.toHaveBeenCalled(); - expect(secondBind).toHaveBeenCalledTimes(1); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).not.toHaveBeenCalled(); - second.unregisterSessionBindingAdapter({ + first.unregisterSessionBindingAdapter({ channel: "discord", accountId: "default", - adapter: secondAdapter, + adapter: firstAdapter, }); await expect( - first.getSessionBindingService().bind({ + second.getSessionBindingService().bind({ targetSessionKey: "agent:main:subagent:child-2", targetKind: "subagent", conversation: { @@ -375,10 +335,10 @@ describe("session binding service", () => { expect(firstBind).toHaveBeenCalledTimes(1); expect(secondBind).toHaveBeenCalledTimes(1); - first.unregisterSessionBindingAdapter({ + second.unregisterSessionBindingAdapter({ channel: "discord", accountId: "default", - adapter: firstAdapter, + adapter: secondAdapter, }); await expect( diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index e95492cfab4..4aab3fd0e7c 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -159,7 +159,7 @@ const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap registrations.at(-1)?.normalizedAdapter ?? null) + .map((registrations) => registrations[0]?.normalizedAdapter ?? null) .filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter)); }