Merge 5cacc8c2f232d22f70d44f7c4977fb5a4084161b into 43513cd1df63af0704dfb351ee7864607f955dcc
This commit is contained in:
commit
8d80b5f2b8
@ -5,6 +5,7 @@ import {
|
||||
resolveThreadBindingConversationIdFromBindingId,
|
||||
unregisterSessionBindingAdapter,
|
||||
type BindingTargetKind,
|
||||
type SessionBindingAdapter,
|
||||
type SessionBindingRecord,
|
||||
} from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing";
|
||||
@ -556,6 +557,7 @@ export function createThreadBindingManager(
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "discord",
|
||||
accountId,
|
||||
adapter: sessionBindingAdapter,
|
||||
});
|
||||
forgetThreadBindingToken(accountId);
|
||||
},
|
||||
@ -572,7 +574,7 @@ export function createThreadBindingManager(
|
||||
}
|
||||
}
|
||||
|
||||
registerSessionBindingAdapter({
|
||||
const sessionBindingAdapter: SessionBindingAdapter = {
|
||||
channel: "discord",
|
||||
accountId,
|
||||
capabilities: {
|
||||
@ -682,7 +684,9 @@ export function createThreadBindingManager(
|
||||
});
|
||||
return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : [];
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter(sessionBindingAdapter);
|
||||
|
||||
registerManager(manager);
|
||||
return manager;
|
||||
|
||||
@ -6,6 +6,7 @@ import {
|
||||
resolveThreadBindingConversationIdFromBindingId,
|
||||
unregisterSessionBindingAdapter,
|
||||
type BindingTargetKind,
|
||||
type SessionBindingAdapter,
|
||||
type SessionBindingRecord,
|
||||
} from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import { normalizeAccountId, resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing";
|
||||
@ -229,11 +230,15 @@ export function createFeishuThreadBindingManager(params: {
|
||||
}
|
||||
}
|
||||
MANAGERS_BY_ACCOUNT_ID.delete(accountId);
|
||||
unregisterSessionBindingAdapter({ channel: "feishu", accountId });
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "feishu",
|
||||
accountId,
|
||||
adapter: sessionBindingAdapter,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter({
|
||||
const sessionBindingAdapter: SessionBindingAdapter = {
|
||||
channel: "feishu",
|
||||
accountId,
|
||||
capabilities: {
|
||||
@ -288,7 +293,9 @@ export function createFeishuThreadBindingManager(params: {
|
||||
const removed = manager.unbindConversation(conversationId);
|
||||
return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : [];
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter(sessionBindingAdapter);
|
||||
|
||||
MANAGERS_BY_ACCOUNT_ID.set(accountId, manager);
|
||||
return manager;
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import path from "node:path";
|
||||
import type { SessionBindingAdapter } from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import {
|
||||
readJsonFileWithFallback,
|
||||
registerSessionBindingAdapter,
|
||||
@ -367,6 +368,7 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "matrix",
|
||||
accountId: params.accountId,
|
||||
adapter: sessionBindingAdapter,
|
||||
});
|
||||
if (getMatrixThreadBindingManagerEntry(params.accountId)?.manager === manager) {
|
||||
deleteMatrixThreadBindingManagerEntry(params.accountId);
|
||||
@ -413,7 +415,7 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
return removed.map((record) => toSessionBindingRecord(record, defaults));
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter({
|
||||
const sessionBindingAdapter: SessionBindingAdapter = {
|
||||
channel: "matrix",
|
||||
accountId: params.accountId,
|
||||
capabilities: { placements: ["current", "child"], bindSupported: true, unbindSupported: true },
|
||||
@ -512,7 +514,9 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
);
|
||||
return removed;
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter(sessionBindingAdapter);
|
||||
|
||||
if (params.enableSweeper !== false) {
|
||||
sweepTimer = setInterval(() => {
|
||||
|
||||
@ -8,6 +8,7 @@ import {
|
||||
resolveThreadBindingEffectiveExpiresAt,
|
||||
unregisterSessionBindingAdapter,
|
||||
type BindingTargetKind,
|
||||
type SessionBindingAdapter,
|
||||
type SessionBindingRecord,
|
||||
} from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import { writeJsonAtomic } from "openclaw/plugin-sdk/infra-runtime";
|
||||
@ -539,7 +540,11 @@ export function createTelegramThreadBindingManager(
|
||||
clearInterval(sweepTimer);
|
||||
sweepTimer = null;
|
||||
}
|
||||
unregisterSessionBindingAdapter({ channel: "telegram", accountId });
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
adapter: sessionBindingAdapter,
|
||||
});
|
||||
const existingManager = MANAGERS_BY_ACCOUNT_ID.get(accountId);
|
||||
if (existingManager === manager) {
|
||||
MANAGERS_BY_ACCOUNT_ID.delete(accountId);
|
||||
@ -547,7 +552,7 @@ export function createTelegramThreadBindingManager(
|
||||
},
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter({
|
||||
const sessionBindingAdapter: SessionBindingAdapter = {
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
capabilities: {
|
||||
@ -684,7 +689,9 @@ export function createTelegramThreadBindingManager(
|
||||
]
|
||||
: [];
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter(sessionBindingAdapter);
|
||||
|
||||
const sweeperEnabled = params.enableSweeper !== false;
|
||||
if (sweeperEnabled) {
|
||||
|
||||
@ -8,6 +8,14 @@ import {
|
||||
resetAgentRunContextForTest,
|
||||
} from "./agent-events.js";
|
||||
|
||||
type AgentEventsModule = typeof import("./agent-events.js");
|
||||
|
||||
const agentEventsModuleUrl = new URL("./agent-events.ts", import.meta.url).href;
|
||||
|
||||
async function importAgentEventsModule(cacheBust: string): Promise<AgentEventsModule> {
|
||||
return (await import(`${agentEventsModuleUrl}?t=${cacheBust}`)) as AgentEventsModule;
|
||||
}
|
||||
|
||||
describe("agent-events sequencing", () => {
|
||||
test("stores and clears run context", async () => {
|
||||
resetAgentRunContextForTest();
|
||||
@ -144,4 +152,42 @@ describe("agent-events sequencing", () => {
|
||||
|
||||
expect(seen).toEqual(["run-safe"]);
|
||||
});
|
||||
|
||||
test("shares run context, listeners, and sequence state across duplicate module instances", async () => {
|
||||
const first = await importAgentEventsModule(`first-${Date.now()}`);
|
||||
const second = await importAgentEventsModule(`second-${Date.now()}`);
|
||||
|
||||
first.resetAgentEventsForTest();
|
||||
first.registerAgentRunContext("run-dup", { sessionKey: "session-dup" });
|
||||
|
||||
const seen: Array<{ seq: number; sessionKey?: string }> = [];
|
||||
const stop = first.onAgentEvent((evt) => {
|
||||
if (evt.runId === "run-dup") {
|
||||
seen.push({ seq: evt.seq, sessionKey: evt.sessionKey });
|
||||
}
|
||||
});
|
||||
|
||||
second.emitAgentEvent({
|
||||
runId: "run-dup",
|
||||
stream: "assistant",
|
||||
data: { text: "from second" },
|
||||
sessionKey: " ",
|
||||
});
|
||||
first.emitAgentEvent({
|
||||
runId: "run-dup",
|
||||
stream: "assistant",
|
||||
data: { text: "from first" },
|
||||
sessionKey: " ",
|
||||
});
|
||||
|
||||
stop();
|
||||
|
||||
expect(second.getAgentRunContext("run-dup")).toEqual({ sessionKey: "session-dup" });
|
||||
expect(seen).toEqual([
|
||||
{ seq: 1, sessionKey: "session-dup" },
|
||||
{ seq: 2, sessionKey: "session-dup" },
|
||||
]);
|
||||
|
||||
first.resetAgentEventsForTest();
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import type { VerboseLevel } from "../auto-reply/thinking.js";
|
||||
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
|
||||
|
||||
export type AgentEventStream = "lifecycle" | "tool" | "assistant" | "error" | (string & {});
|
||||
|
||||
@ -19,18 +20,27 @@ export type AgentRunContext = {
|
||||
isControlUiVisible?: boolean;
|
||||
};
|
||||
|
||||
// Keep per-run counters so streams stay strictly monotonic per runId.
|
||||
const seqByRun = new Map<string, number>();
|
||||
const listeners = new Set<(evt: AgentEventPayload) => void>();
|
||||
const runContextById = new Map<string, AgentRunContext>();
|
||||
type AgentEventState = {
|
||||
seqByRun: Map<string, number>;
|
||||
listeners: Set<(evt: AgentEventPayload) => void>;
|
||||
runContextById: Map<string, AgentRunContext>;
|
||||
};
|
||||
|
||||
const AGENT_EVENT_STATE_KEY = Symbol.for("openclaw.agentEvents.state");
|
||||
|
||||
const state = resolveGlobalSingleton<AgentEventState>(AGENT_EVENT_STATE_KEY, () => ({
|
||||
seqByRun: new Map<string, number>(),
|
||||
listeners: new Set<(evt: AgentEventPayload) => void>(),
|
||||
runContextById: new Map<string, AgentRunContext>(),
|
||||
}));
|
||||
|
||||
export function registerAgentRunContext(runId: string, context: AgentRunContext) {
|
||||
if (!runId) {
|
||||
return;
|
||||
}
|
||||
const existing = runContextById.get(runId);
|
||||
const existing = state.runContextById.get(runId);
|
||||
if (!existing) {
|
||||
runContextById.set(runId, { ...context });
|
||||
state.runContextById.set(runId, { ...context });
|
||||
return;
|
||||
}
|
||||
if (context.sessionKey && existing.sessionKey !== context.sessionKey) {
|
||||
@ -48,21 +58,21 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext)
|
||||
}
|
||||
|
||||
export function getAgentRunContext(runId: string) {
|
||||
return runContextById.get(runId);
|
||||
return state.runContextById.get(runId);
|
||||
}
|
||||
|
||||
export function clearAgentRunContext(runId: string) {
|
||||
runContextById.delete(runId);
|
||||
state.runContextById.delete(runId);
|
||||
}
|
||||
|
||||
export function resetAgentRunContextForTest() {
|
||||
runContextById.clear();
|
||||
state.runContextById.clear();
|
||||
}
|
||||
|
||||
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
||||
const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;
|
||||
seqByRun.set(event.runId, nextSeq);
|
||||
const context = runContextById.get(event.runId);
|
||||
const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1;
|
||||
state.seqByRun.set(event.runId, nextSeq);
|
||||
const context = state.runContextById.get(event.runId);
|
||||
const isControlUiVisible = context?.isControlUiVisible ?? true;
|
||||
const eventSessionKey =
|
||||
typeof event.sessionKey === "string" && event.sessionKey.trim() ? event.sessionKey : undefined;
|
||||
@ -73,7 +83,7 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
||||
seq: nextSeq,
|
||||
ts: Date.now(),
|
||||
};
|
||||
for (const listener of listeners) {
|
||||
for (const listener of state.listeners) {
|
||||
try {
|
||||
listener(enriched);
|
||||
} catch {
|
||||
@ -83,6 +93,12 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
||||
}
|
||||
|
||||
export function onAgentEvent(listener: (evt: AgentEventPayload) => void) {
|
||||
listeners.add(listener);
|
||||
return () => listeners.delete(listener);
|
||||
state.listeners.add(listener);
|
||||
return () => state.listeners.delete(listener);
|
||||
}
|
||||
|
||||
export function resetAgentEventsForTest() {
|
||||
state.seqByRun.clear();
|
||||
state.listeners.clear();
|
||||
state.runContextById.clear();
|
||||
}
|
||||
|
||||
@ -3,9 +3,18 @@ import {
|
||||
emitHeartbeatEvent,
|
||||
getLastHeartbeatEvent,
|
||||
onHeartbeatEvent,
|
||||
resetHeartbeatEventsForTest,
|
||||
resolveIndicatorType,
|
||||
} from "./heartbeat-events.js";
|
||||
|
||||
type HeartbeatEventsModule = typeof import("./heartbeat-events.js");
|
||||
|
||||
const heartbeatEventsModuleUrl = new URL("./heartbeat-events.ts", import.meta.url).href;
|
||||
|
||||
async function importHeartbeatEventsModule(cacheBust: string): Promise<HeartbeatEventsModule> {
|
||||
return (await import(`${heartbeatEventsModuleUrl}?t=${cacheBust}`)) as HeartbeatEventsModule;
|
||||
}
|
||||
|
||||
describe("resolveIndicatorType", () => {
|
||||
it("maps heartbeat statuses to indicator types", () => {
|
||||
expect(resolveIndicatorType("ok-empty")).toBe("ok");
|
||||
@ -23,6 +32,7 @@ describe("heartbeat events", () => {
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
resetHeartbeatEventsForTest();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
@ -56,4 +66,28 @@ describe("heartbeat events", () => {
|
||||
|
||||
expect(seen).toEqual(["first:ok-empty", "third:ok-empty"]);
|
||||
});
|
||||
|
||||
it("shares heartbeat state across duplicate module instances", async () => {
|
||||
const first = await importHeartbeatEventsModule(`first-${Date.now()}`);
|
||||
const second = await importHeartbeatEventsModule(`second-${Date.now()}`);
|
||||
|
||||
first.resetHeartbeatEventsForTest();
|
||||
|
||||
const seen: string[] = [];
|
||||
const stop = first.onHeartbeatEvent((evt) => {
|
||||
seen.push(evt.status);
|
||||
});
|
||||
|
||||
second.emitHeartbeatEvent({ status: "ok-token", preview: "pong" });
|
||||
|
||||
expect(first.getLastHeartbeatEvent()).toEqual({
|
||||
ts: 1767960000000,
|
||||
status: "ok-token",
|
||||
preview: "pong",
|
||||
});
|
||||
expect(seen).toEqual(["ok-token"]);
|
||||
|
||||
stop();
|
||||
first.resetHeartbeatEventsForTest();
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
|
||||
|
||||
export type HeartbeatIndicatorType = "ok" | "alert" | "error";
|
||||
|
||||
export type HeartbeatEventPayload = {
|
||||
@ -33,13 +35,22 @@ export function resolveIndicatorType(
|
||||
}
|
||||
}
|
||||
|
||||
let lastHeartbeat: HeartbeatEventPayload | null = null;
|
||||
const listeners = new Set<(evt: HeartbeatEventPayload) => void>();
|
||||
type HeartbeatEventState = {
|
||||
lastHeartbeat: HeartbeatEventPayload | null;
|
||||
listeners: Set<(evt: HeartbeatEventPayload) => void>;
|
||||
};
|
||||
|
||||
const HEARTBEAT_EVENT_STATE_KEY = Symbol.for("openclaw.heartbeatEvents.state");
|
||||
|
||||
const state = resolveGlobalSingleton<HeartbeatEventState>(HEARTBEAT_EVENT_STATE_KEY, () => ({
|
||||
lastHeartbeat: null,
|
||||
listeners: new Set<(evt: HeartbeatEventPayload) => void>(),
|
||||
}));
|
||||
|
||||
export function emitHeartbeatEvent(evt: Omit<HeartbeatEventPayload, "ts">) {
|
||||
const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt };
|
||||
lastHeartbeat = enriched;
|
||||
for (const listener of listeners) {
|
||||
state.lastHeartbeat = enriched;
|
||||
for (const listener of state.listeners) {
|
||||
try {
|
||||
listener(enriched);
|
||||
} catch {
|
||||
@ -49,10 +60,15 @@ export function emitHeartbeatEvent(evt: Omit<HeartbeatEventPayload, "ts">) {
|
||||
}
|
||||
|
||||
export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void {
|
||||
listeners.add(listener);
|
||||
return () => listeners.delete(listener);
|
||||
state.listeners.add(listener);
|
||||
return () => state.listeners.delete(listener);
|
||||
}
|
||||
|
||||
export function getLastHeartbeatEvent(): HeartbeatEventPayload | null {
|
||||
return lastHeartbeat;
|
||||
return state.lastHeartbeat;
|
||||
}
|
||||
|
||||
export function resetHeartbeatEventsForTest(): void {
|
||||
state.lastHeartbeat = null;
|
||||
state.listeners.clear();
|
||||
}
|
||||
|
||||
@ -4,10 +4,25 @@ import {
|
||||
getSessionBindingService,
|
||||
isSessionBindingError,
|
||||
registerSessionBindingAdapter,
|
||||
unregisterSessionBindingAdapter,
|
||||
type SessionBindingAdapter,
|
||||
type SessionBindingBindInput,
|
||||
type SessionBindingRecord,
|
||||
} from "./session-binding-service.js";
|
||||
|
||||
type SessionBindingServiceModule = typeof import("./session-binding-service.js");
|
||||
|
||||
const sessionBindingServiceModuleUrl = new URL("./session-binding-service.ts", import.meta.url)
|
||||
.href;
|
||||
|
||||
async function importSessionBindingServiceModule(
|
||||
cacheBust: string,
|
||||
): Promise<SessionBindingServiceModule> {
|
||||
return (await import(
|
||||
`${sessionBindingServiceModuleUrl}?t=${cacheBust}`
|
||||
)) as SessionBindingServiceModule;
|
||||
}
|
||||
|
||||
function createRecord(input: SessionBindingBindInput): SessionBindingRecord {
|
||||
const conversationId =
|
||||
input.placement === "child"
|
||||
@ -199,23 +214,147 @@ describe("session binding service", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects duplicate adapter registration for the same channel account", () => {
|
||||
registerSessionBindingAdapter({
|
||||
it("keeps the first live adapter authoritative until it unregisters", () => {
|
||||
const firstBinding = {
|
||||
bindingId: "first-binding",
|
||||
targetSessionKey: "agent:main",
|
||||
targetKind: "session" as const,
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "thread-1",
|
||||
},
|
||||
status: "active" as const,
|
||||
boundAt: 1,
|
||||
};
|
||||
const firstAdapter: SessionBindingAdapter = {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
bind: async (input) => createRecord(input),
|
||||
listBySession: (targetSessionKey) =>
|
||||
targetSessionKey === "agent:main" ? [firstBinding] : [],
|
||||
resolveByConversation: () => null,
|
||||
};
|
||||
const secondAdapter: SessionBindingAdapter = {
|
||||
channel: "Discord",
|
||||
accountId: "DEFAULT",
|
||||
listBySession: () => [],
|
||||
resolveByConversation: () => null,
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter(firstAdapter);
|
||||
registerSessionBindingAdapter(secondAdapter);
|
||||
|
||||
expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]);
|
||||
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
adapter: secondAdapter,
|
||||
});
|
||||
|
||||
expect(() =>
|
||||
registerSessionBindingAdapter({
|
||||
channel: "Discord",
|
||||
accountId: "DEFAULT",
|
||||
bind: async (input) => createRecord(input),
|
||||
listBySession: () => [],
|
||||
resolveByConversation: () => null,
|
||||
expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]);
|
||||
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
adapter: firstAdapter,
|
||||
});
|
||||
|
||||
expect(getSessionBindingService().listBySession("agent:main")).toEqual([]);
|
||||
});
|
||||
|
||||
it("shares registered adapters across duplicate module instances", async () => {
|
||||
const first = await importSessionBindingServiceModule(`first-${Date.now()}`);
|
||||
const second = await importSessionBindingServiceModule(`second-${Date.now()}`);
|
||||
const firstBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
|
||||
const secondBind = vi.fn(async (input: SessionBindingBindInput) => createRecord(input));
|
||||
const firstAdapter: SessionBindingAdapter = {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
bind: firstBind,
|
||||
listBySession: () => [],
|
||||
resolveByConversation: () => null,
|
||||
};
|
||||
const secondAdapter: SessionBindingAdapter = {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
bind: secondBind,
|
||||
listBySession: () => [],
|
||||
resolveByConversation: () => null,
|
||||
};
|
||||
|
||||
first.__testing.resetSessionBindingAdaptersForTests();
|
||||
first.registerSessionBindingAdapter(firstAdapter);
|
||||
second.registerSessionBindingAdapter(secondAdapter);
|
||||
|
||||
expect(second.__testing.getRegisteredAdapterKeys()).toEqual(["discord:default"]);
|
||||
|
||||
await expect(
|
||||
second.getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-1",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "thread-1",
|
||||
},
|
||||
}),
|
||||
).toThrow("Session binding adapter already registered for discord:default");
|
||||
).resolves.toMatchObject({
|
||||
conversation: expect.objectContaining({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "thread-1",
|
||||
}),
|
||||
});
|
||||
expect(firstBind).toHaveBeenCalledTimes(1);
|
||||
expect(secondBind).not.toHaveBeenCalled();
|
||||
|
||||
first.unregisterSessionBindingAdapter({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
adapter: firstAdapter,
|
||||
});
|
||||
|
||||
await expect(
|
||||
second.getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-2",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "thread-2",
|
||||
},
|
||||
}),
|
||||
).resolves.toMatchObject({
|
||||
conversation: expect.objectContaining({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "thread-2",
|
||||
}),
|
||||
});
|
||||
expect(firstBind).toHaveBeenCalledTimes(1);
|
||||
expect(secondBind).toHaveBeenCalledTimes(1);
|
||||
|
||||
second.unregisterSessionBindingAdapter({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
adapter: secondAdapter,
|
||||
});
|
||||
|
||||
await expect(
|
||||
second.getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-3",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "thread-3",
|
||||
},
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
code: "BINDING_ADAPTER_UNAVAILABLE",
|
||||
});
|
||||
|
||||
first.__testing.resetSessionBindingAdaptersForTests();
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import { normalizeAccountId } from "../../routing/session-key.js";
|
||||
import { resolveGlobalMap } from "../../shared/global-singleton.js";
|
||||
|
||||
export type BindingTargetKind = "subagent" | "session";
|
||||
export type BindingStatus = "active" | "ending" | "ended";
|
||||
@ -145,7 +146,21 @@ function resolveAdapterCapabilities(
|
||||
};
|
||||
}
|
||||
|
||||
const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map<string, SessionBindingAdapter>();
|
||||
const SESSION_BINDING_ADAPTERS_KEY = Symbol.for("openclaw.sessionBinding.adapters");
|
||||
|
||||
type SessionBindingAdapterRegistration = {
|
||||
adapter: SessionBindingAdapter;
|
||||
normalizedAdapter: SessionBindingAdapter;
|
||||
};
|
||||
|
||||
const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapterRegistration[]>(
|
||||
SESSION_BINDING_ADAPTERS_KEY,
|
||||
);
|
||||
|
||||
function getActiveAdapterForKey(key: string): SessionBindingAdapter | null {
|
||||
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
|
||||
return registrations?.[0]?.normalizedAdapter ?? null;
|
||||
}
|
||||
|
||||
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
|
||||
const normalizedAdapter = {
|
||||
@ -158,19 +173,42 @@ export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): v
|
||||
accountId: normalizedAdapter.accountId,
|
||||
});
|
||||
const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
|
||||
if (existing && existing !== adapter) {
|
||||
throw new Error(
|
||||
`Session binding adapter already registered for ${normalizedAdapter.channel}:${normalizedAdapter.accountId}`,
|
||||
);
|
||||
}
|
||||
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter);
|
||||
const registrations = existing ? [...existing] : [];
|
||||
registrations.push({
|
||||
adapter,
|
||||
normalizedAdapter,
|
||||
});
|
||||
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, registrations);
|
||||
}
|
||||
|
||||
export function unregisterSessionBindingAdapter(params: {
|
||||
channel: string;
|
||||
accountId: string;
|
||||
adapter?: SessionBindingAdapter;
|
||||
}): void {
|
||||
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(toAdapterKey(params));
|
||||
const key = toAdapterKey(params);
|
||||
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
|
||||
if (!registrations || registrations.length === 0) {
|
||||
return;
|
||||
}
|
||||
const nextRegistrations = [...registrations];
|
||||
if (params.adapter) {
|
||||
// Remove the matching owner so a surviving duplicate graph can stay active.
|
||||
const registrationIndex = nextRegistrations.findLastIndex(
|
||||
(registration) => registration.adapter === params.adapter,
|
||||
);
|
||||
if (registrationIndex < 0) {
|
||||
return;
|
||||
}
|
||||
nextRegistrations.splice(registrationIndex, 1);
|
||||
} else {
|
||||
nextRegistrations.pop();
|
||||
}
|
||||
if (nextRegistrations.length === 0) {
|
||||
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(key);
|
||||
return;
|
||||
}
|
||||
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, nextRegistrations);
|
||||
}
|
||||
|
||||
function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null {
|
||||
@ -188,7 +226,13 @@ function resolveAdapterForChannelAccount(params: {
|
||||
channel: params.channel,
|
||||
accountId: params.accountId,
|
||||
});
|
||||
return ADAPTERS_BY_CHANNEL_ACCOUNT.get(key) ?? null;
|
||||
return getActiveAdapterForKey(key);
|
||||
}
|
||||
|
||||
function getActiveRegisteredAdapters(): SessionBindingAdapter[] {
|
||||
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()]
|
||||
.map((registrations) => registrations[0]?.normalizedAdapter ?? null)
|
||||
.filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter));
|
||||
}
|
||||
|
||||
function dedupeBindings(records: SessionBindingRecord[]): SessionBindingRecord[] {
|
||||
@ -272,7 +316,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
|
||||
return [];
|
||||
}
|
||||
const results: SessionBindingRecord[] = [];
|
||||
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
|
||||
for (const adapter of getActiveRegisteredAdapters()) {
|
||||
const entries = adapter.listBySession(key);
|
||||
if (entries.length > 0) {
|
||||
results.push(...entries);
|
||||
@ -296,13 +340,13 @@ function createDefaultSessionBindingService(): SessionBindingService {
|
||||
if (!normalizedBindingId) {
|
||||
return;
|
||||
}
|
||||
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
|
||||
for (const adapter of getActiveRegisteredAdapters()) {
|
||||
adapter.touch?.(normalizedBindingId, at);
|
||||
}
|
||||
},
|
||||
unbind: async (input) => {
|
||||
const removed: SessionBindingRecord[] = [];
|
||||
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
|
||||
for (const adapter of getActiveRegisteredAdapters()) {
|
||||
if (!adapter.unbind) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -13,6 +13,14 @@ import {
|
||||
resetSystemEventsForTest,
|
||||
} from "./system-events.js";
|
||||
|
||||
type SystemEventsModule = typeof import("./system-events.js");
|
||||
|
||||
const systemEventsModuleUrl = new URL("./system-events.ts", import.meta.url).href;
|
||||
|
||||
async function importSystemEventsModule(cacheBust: string): Promise<SystemEventsModule> {
|
||||
return (await import(`${systemEventsModuleUrl}?t=${cacheBust}`)) as SystemEventsModule;
|
||||
}
|
||||
|
||||
const cfg = {} as unknown as OpenClawConfig;
|
||||
const mainKey = resolveMainSessionKey(cfg);
|
||||
|
||||
@ -108,6 +116,26 @@ describe("system events (session routing)", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("shares queued events across duplicate module instances", async () => {
|
||||
const first = await importSystemEventsModule(`first-${Date.now()}`);
|
||||
const second = await importSystemEventsModule(`second-${Date.now()}`);
|
||||
const key = "agent:main:test-duplicate-module";
|
||||
|
||||
first.resetSystemEventsForTest();
|
||||
second.enqueueSystemEvent("Node connected", { sessionKey: key, contextKey: "build:123" });
|
||||
|
||||
expect(first.peekSystemEventEntries(key)).toEqual([
|
||||
expect.objectContaining({
|
||||
text: "Node connected",
|
||||
contextKey: "build:123",
|
||||
}),
|
||||
]);
|
||||
expect(first.isSystemEventContextChanged(key, "build:123")).toBe(false);
|
||||
expect(first.drainSystemEvents(key)).toEqual(["Node connected"]);
|
||||
|
||||
first.resetSystemEventsForTest();
|
||||
});
|
||||
|
||||
it("filters heartbeat/noise lines, returning undefined", async () => {
|
||||
const key = "agent:main:test-heartbeat-filter";
|
||||
enqueueSystemEvent("Read HEARTBEAT.md before continuing", { sessionKey: key });
|
||||
|
||||
@ -2,6 +2,8 @@
|
||||
// prefixed to the next prompt. We intentionally avoid persistence to keep
|
||||
// events ephemeral. Events are session-scoped and require an explicit key.
|
||||
|
||||
import { resolveGlobalMap } from "../shared/global-singleton.js";
|
||||
|
||||
export type SystemEvent = { text: string; ts: number; contextKey?: string | null };
|
||||
|
||||
const MAX_EVENTS = 20;
|
||||
@ -12,7 +14,9 @@ type SessionQueue = {
|
||||
lastContextKey: string | null;
|
||||
};
|
||||
|
||||
const queues = new Map<string, SessionQueue>();
|
||||
const SYSTEM_EVENT_QUEUES_KEY = Symbol.for("openclaw.systemEvents.queues");
|
||||
|
||||
const queues = resolveGlobalMap<string, SessionQueue>(SYSTEM_EVENT_QUEUES_KEY);
|
||||
|
||||
type SystemEventOptions = {
|
||||
sessionKey: string;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user