diff --git a/src/infra/agent-events.test.ts b/src/infra/agent-events.test.ts index 0079a443c7b..c0a56110b3b 100644 --- a/src/infra/agent-events.test.ts +++ b/src/infra/agent-events.test.ts @@ -8,6 +8,14 @@ import { resetAgentRunContextForTest, } from "./agent-events.js"; +type AgentEventsModule = typeof import("./agent-events.js"); + +const agentEventsModuleUrl = new URL("./agent-events.ts", import.meta.url).href; + +async function importAgentEventsModule(cacheBust: string): Promise { + return (await import(`${agentEventsModuleUrl}?t=${cacheBust}`)) as AgentEventsModule; +} + describe("agent-events sequencing", () => { test("stores and clears run context", async () => { resetAgentRunContextForTest(); @@ -144,4 +152,42 @@ describe("agent-events sequencing", () => { expect(seen).toEqual(["run-safe"]); }); + + test("shares run context, listeners, and sequence state across duplicate module instances", async () => { + const first = await importAgentEventsModule(`first-${Date.now()}`); + const second = await importAgentEventsModule(`second-${Date.now()}`); + + first.resetAgentEventsForTest(); + first.registerAgentRunContext("run-dup", { sessionKey: "session-dup" }); + + const seen: Array<{ seq: number; sessionKey?: string }> = []; + const stop = first.onAgentEvent((evt) => { + if (evt.runId === "run-dup") { + seen.push({ seq: evt.seq, sessionKey: evt.sessionKey }); + } + }); + + second.emitAgentEvent({ + runId: "run-dup", + stream: "assistant", + data: { text: "from second" }, + sessionKey: " ", + }); + first.emitAgentEvent({ + runId: "run-dup", + stream: "assistant", + data: { text: "from first" }, + sessionKey: " ", + }); + + stop(); + + expect(second.getAgentRunContext("run-dup")).toEqual({ sessionKey: "session-dup" }); + expect(seen).toEqual([ + { seq: 1, sessionKey: "session-dup" }, + { seq: 2, sessionKey: "session-dup" }, + ]); + + first.resetAgentEventsForTest(); + }); }); diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 3b2e219574b..2f57087b027 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -1,4 +1,5 @@ import type { VerboseLevel } from "../auto-reply/thinking.js"; +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; export type AgentEventStream = "lifecycle" | "tool" | "assistant" | "error" | (string & {}); @@ -19,18 +20,27 @@ export type AgentRunContext = { isControlUiVisible?: boolean; }; -// Keep per-run counters so streams stay strictly monotonic per runId. -const seqByRun = new Map(); -const listeners = new Set<(evt: AgentEventPayload) => void>(); -const runContextById = new Map(); +type AgentEventState = { + seqByRun: Map; + listeners: Set<(evt: AgentEventPayload) => void>; + runContextById: Map; +}; + +const AGENT_EVENT_STATE_KEY = Symbol.for("openclaw.agentEvents.state"); + +const state = resolveGlobalSingleton(AGENT_EVENT_STATE_KEY, () => ({ + seqByRun: new Map(), + listeners: new Set<(evt: AgentEventPayload) => void>(), + runContextById: new Map(), +})); export function registerAgentRunContext(runId: string, context: AgentRunContext) { if (!runId) { return; } - const existing = runContextById.get(runId); + const existing = state.runContextById.get(runId); if (!existing) { - runContextById.set(runId, { ...context }); + state.runContextById.set(runId, { ...context }); return; } if (context.sessionKey && existing.sessionKey !== context.sessionKey) { @@ -48,21 +58,21 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext) } export function getAgentRunContext(runId: string) { - return runContextById.get(runId); + return state.runContextById.get(runId); } export function clearAgentRunContext(runId: string) { - runContextById.delete(runId); + state.runContextById.delete(runId); } export function resetAgentRunContextForTest() { - runContextById.clear(); + state.runContextById.clear(); } export function emitAgentEvent(event: Omit) { - const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1; - seqByRun.set(event.runId, nextSeq); - const context = runContextById.get(event.runId); + const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1; + state.seqByRun.set(event.runId, nextSeq); + const context = state.runContextById.get(event.runId); const isControlUiVisible = context?.isControlUiVisible ?? true; const eventSessionKey = typeof event.sessionKey === "string" && event.sessionKey.trim() ? event.sessionKey : undefined; @@ -73,7 +83,7 @@ export function emitAgentEvent(event: Omit) { seq: nextSeq, ts: Date.now(), }; - for (const listener of listeners) { + for (const listener of state.listeners) { try { listener(enriched); } catch { @@ -83,6 +93,12 @@ export function emitAgentEvent(event: Omit) { } export function onAgentEvent(listener: (evt: AgentEventPayload) => void) { - listeners.add(listener); - return () => listeners.delete(listener); + state.listeners.add(listener); + return () => state.listeners.delete(listener); +} + +export function resetAgentEventsForTest() { + state.seqByRun.clear(); + state.listeners.clear(); + state.runContextById.clear(); } diff --git a/src/infra/heartbeat-events.test.ts b/src/infra/heartbeat-events.test.ts index d1583f8080a..98bb8f541bb 100644 --- a/src/infra/heartbeat-events.test.ts +++ b/src/infra/heartbeat-events.test.ts @@ -3,9 +3,18 @@ import { emitHeartbeatEvent, getLastHeartbeatEvent, onHeartbeatEvent, + resetHeartbeatEventsForTest, resolveIndicatorType, } from "./heartbeat-events.js"; +type HeartbeatEventsModule = typeof import("./heartbeat-events.js"); + +const heartbeatEventsModuleUrl = new URL("./heartbeat-events.ts", import.meta.url).href; + +async function importHeartbeatEventsModule(cacheBust: string): Promise { + return (await import(`${heartbeatEventsModuleUrl}?t=${cacheBust}`)) as HeartbeatEventsModule; +} + describe("resolveIndicatorType", () => { it("maps heartbeat statuses to indicator types", () => { expect(resolveIndicatorType("ok-empty")).toBe("ok"); @@ -23,6 +32,7 @@ describe("heartbeat events", () => { }); afterEach(() => { + resetHeartbeatEventsForTest(); vi.useRealTimers(); }); @@ -56,4 +66,28 @@ describe("heartbeat events", () => { expect(seen).toEqual(["first:ok-empty", "third:ok-empty"]); }); + + it("shares heartbeat state across duplicate module instances", async () => { + const first = await importHeartbeatEventsModule(`first-${Date.now()}`); + const second = await importHeartbeatEventsModule(`second-${Date.now()}`); + + first.resetHeartbeatEventsForTest(); + + const seen: string[] = []; + const stop = first.onHeartbeatEvent((evt) => { + seen.push(evt.status); + }); + + second.emitHeartbeatEvent({ status: "ok-token", preview: "pong" }); + + expect(first.getLastHeartbeatEvent()).toEqual({ + ts: 1767960000000, + status: "ok-token", + preview: "pong", + }); + expect(seen).toEqual(["ok-token"]); + + stop(); + first.resetHeartbeatEventsForTest(); + }); }); diff --git a/src/infra/heartbeat-events.ts b/src/infra/heartbeat-events.ts index e16860e5d4f..04ea6a4dd30 100644 --- a/src/infra/heartbeat-events.ts +++ b/src/infra/heartbeat-events.ts @@ -1,3 +1,5 @@ +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; + export type HeartbeatIndicatorType = "ok" | "alert" | "error"; export type HeartbeatEventPayload = { @@ -33,13 +35,22 @@ export function resolveIndicatorType( } } -let lastHeartbeat: HeartbeatEventPayload | null = null; -const listeners = new Set<(evt: HeartbeatEventPayload) => void>(); +type HeartbeatEventState = { + lastHeartbeat: HeartbeatEventPayload | null; + listeners: Set<(evt: HeartbeatEventPayload) => void>; +}; + +const HEARTBEAT_EVENT_STATE_KEY = Symbol.for("openclaw.heartbeatEvents.state"); + +const state = resolveGlobalSingleton(HEARTBEAT_EVENT_STATE_KEY, () => ({ + lastHeartbeat: null, + listeners: new Set<(evt: HeartbeatEventPayload) => void>(), +})); export function emitHeartbeatEvent(evt: Omit) { const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt }; - lastHeartbeat = enriched; - for (const listener of listeners) { + state.lastHeartbeat = enriched; + for (const listener of state.listeners) { try { listener(enriched); } catch { @@ -49,10 +60,15 @@ export function emitHeartbeatEvent(evt: Omit) { } export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void { - listeners.add(listener); - return () => listeners.delete(listener); + state.listeners.add(listener); + return () => state.listeners.delete(listener); } export function getLastHeartbeatEvent(): HeartbeatEventPayload | null { - return lastHeartbeat; + return state.lastHeartbeat; +} + +export function resetHeartbeatEventsForTest(): void { + state.lastHeartbeat = null; + state.listeners.clear(); } diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 84fe5708802..994ff427c8a 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -4,10 +4,24 @@ import { getSessionBindingService, isSessionBindingError, registerSessionBindingAdapter, + unregisterSessionBindingAdapter, type SessionBindingBindInput, type SessionBindingRecord, } from "./session-binding-service.js"; +type SessionBindingServiceModule = typeof import("./session-binding-service.js"); + +const sessionBindingServiceModuleUrl = new URL("./session-binding-service.ts", import.meta.url) + .href; + +async function importSessionBindingServiceModule( + cacheBust: string, +): Promise { + return (await import( + `${sessionBindingServiceModuleUrl}?t=${cacheBust}` + )) as SessionBindingServiceModule; +} + function createRecord(input: SessionBindingBindInput): SessionBindingRecord { const conversationId = input.placement === "child" @@ -199,23 +213,150 @@ describe("session binding service", () => { }); }); - it("rejects duplicate adapter registration for the same channel account", () => { + it("treats duplicate adapter registration for the same channel account as idempotent", async () => { + const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + registerSessionBindingAdapter({ channel: "discord", accountId: "default", - bind: async (input) => createRecord(input), + bind: firstBind, listBySession: () => [], resolveByConversation: () => null, }); - expect(() => - registerSessionBindingAdapter({ - channel: "Discord", - accountId: "DEFAULT", - bind: async (input) => createRecord(input), - listBySession: () => [], - resolveByConversation: () => null, + registerSessionBindingAdapter({ + channel: "Discord", + accountId: "DEFAULT", + bind: secondBind, + listBySession: () => [], + resolveByConversation: () => null, + }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-1", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }, }), - ).toThrow("Session binding adapter already registered for discord:default"); + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }), + }); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).not.toHaveBeenCalled(); + + unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-2", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }, + }), + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }), + }); + + unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-3", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-3", + }, + }), + ).rejects.toMatchObject({ + code: "BINDING_ADAPTER_UNAVAILABLE", + }); + }); + + it("shares registered adapters across duplicate module instances", async () => { + const first = await importSessionBindingServiceModule(`first-${Date.now()}`); + const second = await importSessionBindingServiceModule(`second-${Date.now()}`); + const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input)); + + first.__testing.resetSessionBindingAdaptersForTests(); + first.registerSessionBindingAdapter({ + channel: "discord", + accountId: "default", + bind: firstBind, + listBySession: () => [], + resolveByConversation: () => null, + }); + second.registerSessionBindingAdapter({ + channel: "discord", + accountId: "default", + bind: secondBind, + listBySession: () => [], + resolveByConversation: () => null, + }); + + expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]); + + await expect( + second.getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-1", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }, + }), + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }), + }); + expect(firstBind).toHaveBeenCalledTimes(1); + expect(secondBind).not.toHaveBeenCalled(); + + second.unregisterSessionBindingAdapter({ + channel: "discord", + accountId: "default", + }); + + await expect( + first.getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-2", + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }, + }), + ).resolves.toMatchObject({ + conversation: expect.objectContaining({ + channel: "discord", + accountId: "default", + conversationId: "thread-2", + }), + }); + + first.__testing.resetSessionBindingAdaptersForTests(); }); }); diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index c155d3792b8..ccbe54d53a9 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -1,4 +1,5 @@ import { normalizeAccountId } from "../../routing/session-key.js"; +import { resolveGlobalMap } from "../../shared/global-singleton.js"; export type BindingTargetKind = "subagent" | "session"; export type BindingStatus = "active" | "ending" | "ended"; @@ -145,7 +146,17 @@ function resolveAdapterCapabilities( }; } -const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map(); +const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters"); +const SESSION_BINDING_ADAPTER_REF_COUNTS_KEY = Symbol.for( + "openclaw.sessionBinding.adapterRefCounts", +); + +const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( + SESSION_BINDING_ADAPTERS_KEY, +); +const ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT = resolveGlobalMap( + SESSION_BINDING_ADAPTER_REF_COUNTS_KEY, +); export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void { const normalizedAdapter = { @@ -158,19 +169,33 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v accountId: normalizedAdapter.accountId, }); const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key); - if (existing && existing !== adapter) { - throw new Error( - `Session binding adapter already registered for ${normalizedAdapter.channel}:${normalizedAdapter.accountId}`, + if (existing) { + // Duplicate module graphs can legitimately hit registration multiple times + // for the same logical adapter key in one process. Keep the first adapter + // stable and track registrations so later duplicate imports can unregister + // independently without deleting the live shared adapter too early. + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set( + key, + Math.max(1, ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 1) + 1, ); + return; } ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter); + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, 1); } export function unregisterSessionBindingAdapter(params: { channel: string; accountId: string; }): void { - ADAPTERS_BY_CHANNEL_ACCOUNT.delete(toAdapterKey(params)); + const key = toAdapterKey(params); + const currentRefCount = ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 0; + if (currentRefCount > 1) { + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, currentRefCount - 1); + return; + } + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.delete(key); + ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key); } function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null { @@ -325,6 +350,7 @@ export function getSessionBindingService(): SessionBindingService { export const __testing = { resetSessionBindingAdaptersForTests() { ADAPTERS_BY_CHANNEL_ACCOUNT.clear(); + ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.clear(); }, getRegisteredAdapterKeys() { return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()]; diff --git a/src/infra/system-events.test.ts b/src/infra/system-events.test.ts index cf16416e210..c03dbe83725 100644 --- a/src/infra/system-events.test.ts +++ b/src/infra/system-events.test.ts @@ -13,6 +13,14 @@ import { resetSystemEventsForTest, } from "./system-events.js"; +type SystemEventsModule = typeof import("./system-events.js"); + +const systemEventsModuleUrl = new URL("./system-events.ts", import.meta.url).href; + +async function importSystemEventsModule(cacheBust: string): Promise { + return (await import(`${systemEventsModuleUrl}?t=${cacheBust}`)) as SystemEventsModule; +} + const cfg = {} as unknown as OpenClawConfig; const mainKey = resolveMainSessionKey(cfg); @@ -108,6 +116,26 @@ describe("system events (session routing)", () => { ); }); + it("shares queued events across duplicate module instances", async () => { + const first = await importSystemEventsModule(`first-${Date.now()}`); + const second = await importSystemEventsModule(`second-${Date.now()}`); + const key = "agent:main:test-duplicate-module"; + + first.resetSystemEventsForTest(); + second.enqueueSystemEvent("Node connected", { sessionKey: key, contextKey: "build:123" }); + + expect(first.peekSystemEventEntries(key)).toEqual([ + expect.objectContaining({ + text: "Node connected", + contextKey: "build:123", + }), + ]); + expect(first.isSystemEventContextChanged(key, "build:123")).toBe(false); + expect(first.drainSystemEvents(key)).toEqual(["Node connected"]); + + first.resetSystemEventsForTest(); + }); + it("filters heartbeat/noise lines, returning undefined", async () => { const key = "agent:main:test-heartbeat-filter"; enqueueSystemEvent("Read HEARTBEAT.md before continuing", { sessionKey: key }); diff --git a/src/infra/system-events.ts b/src/infra/system-events.ts index 771890bcddd..ce8785d5a22 100644 --- a/src/infra/system-events.ts +++ b/src/infra/system-events.ts @@ -2,6 +2,8 @@ // prefixed to the next prompt. We intentionally avoid persistence to keep // events ephemeral. Events are session-scoped and require an explicit key. +import { resolveGlobalMap } from "../shared/global-singleton.js"; + export type SystemEvent = { text: string; ts: number; contextKey?: string | null }; const MAX_EVENTS = 20; @@ -12,7 +14,9 @@ type SessionQueue = { lastContextKey: string | null; }; -const queues = new Map(); +const SYSTEM_EVENT_QUEUES_KEY = Symbol.for("openclaw.systemEvents.queues"); + +const queues = resolveGlobalMap(SYSTEM_EVENT_QUEUES_KEY); type SystemEventOptions = { sessionKey: string;