Infra: unify plugin split runtime state

This commit is contained in:
huntharo 2026-03-19 22:48:15 -04:00
parent de9f2dc227
commit c102309028
No known key found for this signature in database
8 changed files with 349 additions and 38 deletions

View File

@ -8,6 +8,14 @@ import {
resetAgentRunContextForTest,
} from "./agent-events.js";
type AgentEventsModule = typeof import("./agent-events.js");
const agentEventsModuleUrl = new URL("./agent-events.ts", import.meta.url).href;
async function importAgentEventsModule(cacheBust: string): Promise<AgentEventsModule> {
return (await import(`${agentEventsModuleUrl}?t=${cacheBust}`)) as AgentEventsModule;
}
describe("agent-events sequencing", () => {
test("stores and clears run context", async () => {
resetAgentRunContextForTest();
@ -144,4 +152,42 @@ describe("agent-events sequencing", () => {
expect(seen).toEqual(["run-safe"]);
});
test("shares run context, listeners, and sequence state across duplicate module instances", async () => {
const first = await importAgentEventsModule(`first-${Date.now()}`);
const second = await importAgentEventsModule(`second-${Date.now()}`);
first.resetAgentEventsForTest();
first.registerAgentRunContext("run-dup", { sessionKey: "session-dup" });
const seen: Array<{ seq: number; sessionKey?: string }> = [];
const stop = first.onAgentEvent((evt) => {
if (evt.runId === "run-dup") {
seen.push({ seq: evt.seq, sessionKey: evt.sessionKey });
}
});
second.emitAgentEvent({
runId: "run-dup",
stream: "assistant",
data: { text: "from second" },
sessionKey: " ",
});
first.emitAgentEvent({
runId: "run-dup",
stream: "assistant",
data: { text: "from first" },
sessionKey: " ",
});
stop();
expect(second.getAgentRunContext("run-dup")).toEqual({ sessionKey: "session-dup" });
expect(seen).toEqual([
{ seq: 1, sessionKey: "session-dup" },
{ seq: 2, sessionKey: "session-dup" },
]);
first.resetAgentEventsForTest();
});
});

View File

@ -1,4 +1,5 @@
import type { VerboseLevel } from "../auto-reply/thinking.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
export type AgentEventStream = "lifecycle" | "tool" | "assistant" | "error" | (string & {});
@ -19,18 +20,27 @@ export type AgentRunContext = {
isControlUiVisible?: boolean;
};
// Keep per-run counters so streams stay strictly monotonic per runId.
const seqByRun = new Map<string, number>();
const listeners = new Set<(evt: AgentEventPayload) => void>();
const runContextById = new Map<string, AgentRunContext>();
type AgentEventState = {
seqByRun: Map<string, number>;
listeners: Set<(evt: AgentEventPayload) => void>;
runContextById: Map<string, AgentRunContext>;
};
const AGENT_EVENT_STATE_KEY = Symbol.for("openclaw.agentEvents.state");
const state = resolveGlobalSingleton<AgentEventState>(AGENT_EVENT_STATE_KEY, () => ({
seqByRun: new Map<string, number>(),
listeners: new Set<(evt: AgentEventPayload) => void>(),
runContextById: new Map<string, AgentRunContext>(),
}));
export function registerAgentRunContext(runId: string, context: AgentRunContext) {
if (!runId) {
return;
}
const existing = runContextById.get(runId);
const existing = state.runContextById.get(runId);
if (!existing) {
runContextById.set(runId, { ...context });
state.runContextById.set(runId, { ...context });
return;
}
if (context.sessionKey && existing.sessionKey !== context.sessionKey) {
@ -48,21 +58,21 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext)
}
export function getAgentRunContext(runId: string) {
return runContextById.get(runId);
return state.runContextById.get(runId);
}
export function clearAgentRunContext(runId: string) {
runContextById.delete(runId);
state.runContextById.delete(runId);
}
export function resetAgentRunContextForTest() {
runContextById.clear();
state.runContextById.clear();
}
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;
seqByRun.set(event.runId, nextSeq);
const context = runContextById.get(event.runId);
const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1;
state.seqByRun.set(event.runId, nextSeq);
const context = state.runContextById.get(event.runId);
const isControlUiVisible = context?.isControlUiVisible ?? true;
const eventSessionKey =
typeof event.sessionKey === "string" && event.sessionKey.trim() ? event.sessionKey : undefined;
@ -73,7 +83,7 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
seq: nextSeq,
ts: Date.now(),
};
for (const listener of listeners) {
for (const listener of state.listeners) {
try {
listener(enriched);
} catch {
@ -83,6 +93,12 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
}
export function onAgentEvent(listener: (evt: AgentEventPayload) => void) {
listeners.add(listener);
return () => listeners.delete(listener);
state.listeners.add(listener);
return () => state.listeners.delete(listener);
}
export function resetAgentEventsForTest() {
state.seqByRun.clear();
state.listeners.clear();
state.runContextById.clear();
}

View File

@ -3,9 +3,18 @@ import {
emitHeartbeatEvent,
getLastHeartbeatEvent,
onHeartbeatEvent,
resetHeartbeatEventsForTest,
resolveIndicatorType,
} from "./heartbeat-events.js";
type HeartbeatEventsModule = typeof import("./heartbeat-events.js");
const heartbeatEventsModuleUrl = new URL("./heartbeat-events.ts", import.meta.url).href;
async function importHeartbeatEventsModule(cacheBust: string): Promise<HeartbeatEventsModule> {
return (await import(`${heartbeatEventsModuleUrl}?t=${cacheBust}`)) as HeartbeatEventsModule;
}
describe("resolveIndicatorType", () => {
it("maps heartbeat statuses to indicator types", () => {
expect(resolveIndicatorType("ok-empty")).toBe("ok");
@ -23,6 +32,7 @@ describe("heartbeat events", () => {
});
afterEach(() => {
resetHeartbeatEventsForTest();
vi.useRealTimers();
});
@ -56,4 +66,28 @@ describe("heartbeat events", () => {
expect(seen).toEqual(["first:ok-empty", "third:ok-empty"]);
});
it("shares heartbeat state across duplicate module instances", async () => {
const first = await importHeartbeatEventsModule(`first-${Date.now()}`);
const second = await importHeartbeatEventsModule(`second-${Date.now()}`);
first.resetHeartbeatEventsForTest();
const seen: string[] = [];
const stop = first.onHeartbeatEvent((evt) => {
seen.push(evt.status);
});
second.emitHeartbeatEvent({ status: "ok-token", preview: "pong" });
expect(first.getLastHeartbeatEvent()).toEqual({
ts: 1767960000000,
status: "ok-token",
preview: "pong",
});
expect(seen).toEqual(["ok-token"]);
stop();
first.resetHeartbeatEventsForTest();
});
});

View File

@ -1,3 +1,5 @@
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
export type HeartbeatIndicatorType = "ok" | "alert" | "error";
export type HeartbeatEventPayload = {
@ -33,13 +35,22 @@ export function resolveIndicatorType(
}
}
let lastHeartbeat: HeartbeatEventPayload | null = null;
const listeners = new Set<(evt: HeartbeatEventPayload) => void>();
type HeartbeatEventState = {
lastHeartbeat: HeartbeatEventPayload | null;
listeners: Set<(evt: HeartbeatEventPayload) => void>;
};
const HEARTBEAT_EVENT_STATE_KEY = Symbol.for("openclaw.heartbeatEvents.state");
const state = resolveGlobalSingleton<HeartbeatEventState>(HEARTBEAT_EVENT_STATE_KEY, () => ({
lastHeartbeat: null,
listeners: new Set<(evt: HeartbeatEventPayload) => void>(),
}));
export function emitHeartbeatEvent(evt: Omit<HeartbeatEventPayload, "ts">) {
const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt };
lastHeartbeat = enriched;
for (const listener of listeners) {
state.lastHeartbeat = enriched;
for (const listener of state.listeners) {
try {
listener(enriched);
} catch {
@ -49,10 +60,15 @@ export function emitHeartbeatEvent(evt: Omit<HeartbeatEventPayload, "ts">) {
}
export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void {
listeners.add(listener);
return () => listeners.delete(listener);
state.listeners.add(listener);
return () => state.listeners.delete(listener);
}
export function getLastHeartbeatEvent(): HeartbeatEventPayload | null {
return lastHeartbeat;
return state.lastHeartbeat;
}
export function resetHeartbeatEventsForTest(): void {
state.lastHeartbeat = null;
state.listeners.clear();
}

View File

@ -4,10 +4,24 @@ import {
getSessionBindingService,
isSessionBindingError,
registerSessionBindingAdapter,
unregisterSessionBindingAdapter,
type SessionBindingBindInput,
type SessionBindingRecord,
} from "./session-binding-service.js";
type SessionBindingServiceModule = typeof import("./session-binding-service.js");
const sessionBindingServiceModuleUrl = new URL("./session-binding-service.ts", import.meta.url)
.href;
async function importSessionBindingServiceModule(
cacheBust: string,
): Promise<SessionBindingServiceModule> {
return (await import(
`${sessionBindingServiceModuleUrl}?t=${cacheBust}`
)) as SessionBindingServiceModule;
}
function createRecord(input: SessionBindingBindInput): SessionBindingRecord {
const conversationId =
input.placement === "child"
@ -199,23 +213,150 @@ describe("session binding service", () => {
});
});
it("rejects duplicate adapter registration for the same channel account", () => {
it("treats duplicate adapter registration for the same channel account as idempotent", async () => {
const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
registerSessionBindingAdapter({
channel: "discord",
accountId: "default",
bind: async (input) => createRecord(input),
bind: firstBind,
listBySession: () => [],
resolveByConversation: () => null,
});
expect(() =>
registerSessionBindingAdapter({
channel: "Discord",
accountId: "DEFAULT",
bind: async (input) => createRecord(input),
listBySession: () => [],
resolveByConversation: () => null,
registerSessionBindingAdapter({
channel: "Discord",
accountId: "DEFAULT",
bind: secondBind,
listBySession: () => [],
resolveByConversation: () => null,
});
await expect(
getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-1",
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "thread-1",
},
}),
).toThrow("Session binding adapter already registered for discord:default");
).resolves.toMatchObject({
conversation: expect.objectContaining({
channel: "discord",
accountId: "default",
conversationId: "thread-1",
}),
});
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).not.toHaveBeenCalled();
unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" });
await expect(
getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-2",
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "thread-2",
},
}),
).resolves.toMatchObject({
conversation: expect.objectContaining({
channel: "discord",
accountId: "default",
conversationId: "thread-2",
}),
});
unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" });
await expect(
getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-3",
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "thread-3",
},
}),
).rejects.toMatchObject({
code: "BINDING_ADAPTER_UNAVAILABLE",
});
});
it("shares registered adapters across duplicate module instances", async () => {
const first = await importSessionBindingServiceModule(`first-${Date.now()}`);
const second = await importSessionBindingServiceModule(`second-${Date.now()}`);
const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
first.__testing.resetSessionBindingAdaptersForTests();
first.registerSessionBindingAdapter({
channel: "discord",
accountId: "default",
bind: firstBind,
listBySession: () => [],
resolveByConversation: () => null,
});
second.registerSessionBindingAdapter({
channel: "discord",
accountId: "default",
bind: secondBind,
listBySession: () => [],
resolveByConversation: () => null,
});
expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]);
await expect(
second.getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-1",
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "thread-1",
},
}),
).resolves.toMatchObject({
conversation: expect.objectContaining({
channel: "discord",
accountId: "default",
conversationId: "thread-1",
}),
});
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).not.toHaveBeenCalled();
second.unregisterSessionBindingAdapter({
channel: "discord",
accountId: "default",
});
await expect(
first.getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-2",
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "thread-2",
},
}),
).resolves.toMatchObject({
conversation: expect.objectContaining({
channel: "discord",
accountId: "default",
conversationId: "thread-2",
}),
});
first.__testing.resetSessionBindingAdaptersForTests();
});
});

View File

@ -1,4 +1,5 @@
import { normalizeAccountId } from "../../routing/session-key.js";
import { resolveGlobalMap } from "../../shared/global-singleton.js";
export type BindingTargetKind = "subagent" | "session";
export type BindingStatus = "active" | "ending" | "ended";
@ -145,7 +146,17 @@ function resolveAdapterCapabilities(
};
}
const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map<string, SessionBindingAdapter>();
const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters");
const SESSION_BINDING_ADAPTER_REF_COUNTS_KEY = Symbol.for(
"openclaw.sessionBinding.adapterRefCounts",
);
const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapter>(
SESSION_BINDING_ADAPTERS_KEY,
);
const ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, number>(
SESSION_BINDING_ADAPTER_REF_COUNTS_KEY,
);
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
const normalizedAdapter = {
@ -158,19 +169,33 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v
accountId: normalizedAdapter.accountId,
});
const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
if (existing && existing !== adapter) {
throw new Error(
`Session binding adapter already registered for ${normalizedAdapter.channel}:${normalizedAdapter.accountId}`,
if (existing) {
// Duplicate module graphs can legitimately hit registration multiple times
// for the same logical adapter key in one process. Keep the first adapter
// stable and track registrations so later duplicate imports can unregister
// independently without deleting the live shared adapter too early.
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(
key,
Math.max(1, ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 1) + 1,
);
return;
}
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter);
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, 1);
}
export function unregisterSessionBindingAdapter(params: {
channel: string;
accountId: string;
}): void {
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(toAdapterKey(params));
const key = toAdapterKey(params);
const currentRefCount = ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.get(key) ?? 0;
if (currentRefCount > 1) {
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.set(key, currentRefCount - 1);
return;
}
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.delete(key);
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key);
}
function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null {
@ -325,6 +350,7 @@ export function getSessionBindingService(): SessionBindingService {
export const __testing = {
resetSessionBindingAdaptersForTests() {
ADAPTERS_BY_CHANNEL_ACCOUNT.clear();
ADAPTER_REF_COUNTS_BY_CHANNEL_ACCOUNT.clear();
},
getRegisteredAdapterKeys() {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()];

View File

@ -13,6 +13,14 @@ import {
resetSystemEventsForTest,
} from "./system-events.js";
type SystemEventsModule = typeof import("./system-events.js");
const systemEventsModuleUrl = new URL("./system-events.ts", import.meta.url).href;
async function importSystemEventsModule(cacheBust: string): Promise<SystemEventsModule> {
return (await import(`${systemEventsModuleUrl}?t=${cacheBust}`)) as SystemEventsModule;
}
const cfg = {} as unknown as OpenClawConfig;
const mainKey = resolveMainSessionKey(cfg);
@ -108,6 +116,26 @@ describe("system events (session routing)", () => {
);
});
it("shares queued events across duplicate module instances", async () => {
const first = await importSystemEventsModule(`first-${Date.now()}`);
const second = await importSystemEventsModule(`second-${Date.now()}`);
const key = "agent:main:test-duplicate-module";
first.resetSystemEventsForTest();
second.enqueueSystemEvent("Node connected", { sessionKey: key, contextKey: "build:123" });
expect(first.peekSystemEventEntries(key)).toEqual([
expect.objectContaining({
text: "Node connected",
contextKey: "build:123",
}),
]);
expect(first.isSystemEventContextChanged(key, "build:123")).toBe(false);
expect(first.drainSystemEvents(key)).toEqual(["Node connected"]);
first.resetSystemEventsForTest();
});
it("filters heartbeat/noise lines, returning undefined", async () => {
const key = "agent:main:test-heartbeat-filter";
enqueueSystemEvent("Read HEARTBEAT.md before continuing", { sessionKey: key });

View File

@ -2,6 +2,8 @@
// prefixed to the next prompt. We intentionally avoid persistence to keep
// events ephemeral. Events are session-scoped and require an explicit key.
import { resolveGlobalMap } from "../shared/global-singleton.js";
export type SystemEvent = { text: string; ts: number; contextKey?: string | null };
const MAX_EVENTS = 20;
@ -12,7 +14,9 @@ type SessionQueue = {
lastContextKey: string | null;
};
const queues = new Map<string, SessionQueue>();
const SYSTEM_EVENT_QUEUES_KEY = Symbol.for("openclaw.systemEvents.queues");
const queues = resolveGlobalMap<string, SessionQueue>(SYSTEM_EVENT_QUEUES_KEY);
type SystemEventOptions = {
sessionKey: string;