Plugins: harden bound conversation routing

This commit is contained in:
huntharo 2026-03-15 16:07:56 -04:00 committed by Vincent Koc
parent 4adddbdab3
commit f554b736f5
10 changed files with 775 additions and 99 deletions

View File

@ -323,12 +323,12 @@ export async function reconcileAcpThreadBindingsOnStartup(params: {
}; };
} }
const acpBindings = manager.listBindings().filter((binding) => { const acpBindings = manager
if (binding.targetKind !== "acp") { .listBindings()
return false; .filter(
} (binding) =>
return binding.metadata?.pluginBindingOwner !== "plugin"; binding.targetKind === "acp" && binding.metadata?.pluginBindingOwner !== "plugin",
}); );
const staleBindings: ThreadBindingRecord[] = []; const staleBindings: ThreadBindingRecord[] = [];
const probeTargets: Array<{ const probeTargets: Array<{
binding: ThreadBindingRecord; binding: ThreadBindingRecord;

View File

@ -23,10 +23,17 @@ const diagnosticMocks = vi.hoisted(() => ({
logSessionStateChange: vi.fn(), logSessionStateChange: vi.fn(),
})); }));
const hookMocks = vi.hoisted(() => ({ const hookMocks = vi.hoisted(() => ({
registry: {
plugins: [] as Array<{
id: string;
status: "loaded" | "disabled" | "error";
}>,
},
runner: { runner: {
hasHooks: vi.fn(() => false), hasHooks: vi.fn(() => false),
runInboundClaim: vi.fn(async () => undefined), runInboundClaim: vi.fn(async () => undefined),
runInboundClaimForPlugin: vi.fn(async () => undefined), runInboundClaimForPlugin: vi.fn(async () => undefined),
runInboundClaimForPluginOutcome: vi.fn(async () => ({ status: "no_handler" as const })),
runMessageReceived: vi.fn(async () => {}), runMessageReceived: vi.fn(async () => {}),
}, },
})); }));
@ -136,6 +143,7 @@ vi.mock("../../config/sessions.js", async (importOriginal) => {
vi.mock("../../plugins/hook-runner-global.js", () => ({ vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => hookMocks.runner, getGlobalHookRunner: () => hookMocks.runner,
getGlobalPluginRegistry: () => hookMocks.registry,
})); }));
vi.mock("../../hooks/internal-hooks.js", () => ({ vi.mock("../../hooks/internal-hooks.js", () => ({
createInternalHookEvent: internalHookMocks.createInternalHookEvent, createInternalHookEvent: internalHookMocks.createInternalHookEvent,
@ -181,6 +189,7 @@ vi.mock("../../tts/tts.js", () => ({
const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js");
const { resetInboundDedupe } = await import("./inbound-dedupe.js"); const { resetInboundDedupe } = await import("./inbound-dedupe.js");
const { __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js"); const { __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js");
const { __testing: pluginBindingTesting } = await import("../../plugins/conversation-binding.js");
const noAbortResult = { handled: false, aborted: false } as const; const noAbortResult = { handled: false, aborted: false } as const;
const emptyConfig = {} as OpenClawConfig; const emptyConfig = {} as OpenClawConfig;
@ -254,7 +263,12 @@ describe("dispatchReplyFromConfig", () => {
hookMocks.runner.runInboundClaim.mockResolvedValue(undefined); hookMocks.runner.runInboundClaim.mockResolvedValue(undefined);
hookMocks.runner.runInboundClaimForPlugin.mockClear(); hookMocks.runner.runInboundClaimForPlugin.mockClear();
hookMocks.runner.runInboundClaimForPlugin.mockResolvedValue(undefined); hookMocks.runner.runInboundClaimForPlugin.mockResolvedValue(undefined);
hookMocks.runner.runInboundClaimForPluginOutcome.mockClear();
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "no_handler",
});
hookMocks.runner.runMessageReceived.mockClear(); hookMocks.runner.runMessageReceived.mockClear();
hookMocks.registry.plugins = [];
internalHookMocks.createInternalHookEvent.mockClear(); internalHookMocks.createInternalHookEvent.mockClear();
internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload);
internalHookMocks.triggerInternalHook.mockClear(); internalHookMocks.triggerInternalHook.mockClear();
@ -265,13 +279,14 @@ describe("dispatchReplyFromConfig", () => {
acpMocks.requireAcpRuntimeBackend.mockReset(); acpMocks.requireAcpRuntimeBackend.mockReset();
sessionBindingMocks.listBySession.mockReset(); sessionBindingMocks.listBySession.mockReset();
sessionBindingMocks.listBySession.mockReturnValue([]); sessionBindingMocks.listBySession.mockReturnValue([]);
pluginBindingTesting.reset();
sessionBindingMocks.resolveByConversation.mockReset();
sessionBindingMocks.resolveByConversation.mockReturnValue(null);
sessionBindingMocks.touch.mockReset();
sessionStoreMocks.currentEntry = undefined; sessionStoreMocks.currentEntry = undefined;
sessionStoreMocks.loadSessionStore.mockClear(); sessionStoreMocks.loadSessionStore.mockClear();
sessionStoreMocks.resolveStorePath.mockClear(); sessionStoreMocks.resolveStorePath.mockClear();
sessionStoreMocks.resolveSessionStoreEntry.mockClear(); sessionStoreMocks.resolveSessionStoreEntry.mockClear();
sessionBindingMocks.resolveByConversation.mockReset();
sessionBindingMocks.resolveByConversation.mockReturnValue(null);
sessionBindingMocks.touch.mockReset();
ttsMocks.state.synthesizeFinalAudio = false; ttsMocks.state.synthesizeFinalAudio = false;
ttsMocks.maybeApplyTtsToPayload.mockClear(); ttsMocks.maybeApplyTtsToPayload.mockClear();
ttsMocks.normalizeTtsAutoMode.mockClear(); ttsMocks.normalizeTtsAutoMode.mockClear();
@ -2033,6 +2048,11 @@ describe("dispatchReplyFromConfig", () => {
((hookName?: string) => ((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean, hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
); );
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "handled",
result: { handled: true },
});
sessionBindingMocks.resolveByConversation.mockReturnValue({ sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-1", bindingId: "binding-1",
targetSessionKey: "plugin-binding:codex:abc123", targetSessionKey: "plugin-binding:codex:abc123",
@ -2075,7 +2095,7 @@ describe("dispatchReplyFromConfig", () => {
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-1"); expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-1");
expect(hookMocks.runner.runInboundClaimForPlugin).toHaveBeenCalledWith( expect(hookMocks.runner.runInboundClaimForPluginOutcome).toHaveBeenCalledWith(
"openclaw-codex-app-server", "openclaw-codex-app-server",
expect.objectContaining({ expect.objectContaining({
channel: "discord", channel: "discord",
@ -2099,6 +2119,11 @@ describe("dispatchReplyFromConfig", () => {
((hookName?: string) => ((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean, hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
); );
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "handled",
result: { handled: true },
});
sessionBindingMocks.resolveByConversation.mockReturnValue({ sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-dm-1", bindingId: "binding-dm-1",
targetSessionKey: "plugin-binding:codex:dm123", targetSessionKey: "plugin-binding:codex:dm123",
@ -2142,7 +2167,7 @@ describe("dispatchReplyFromConfig", () => {
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-dm-1"); expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-dm-1");
expect(hookMocks.runner.runInboundClaimForPlugin).toHaveBeenCalledWith( expect(hookMocks.runner.runInboundClaimForPluginOutcome).toHaveBeenCalledWith(
"openclaw-codex-app-server", "openclaw-codex-app-server",
expect.objectContaining({ expect.objectContaining({
channel: "discord", channel: "discord",
@ -2160,6 +2185,268 @@ describe("dispatchReplyFromConfig", () => {
expect(replyResolver).not.toHaveBeenCalled(); expect(replyResolver).not.toHaveBeenCalled();
}); });
it("falls back to OpenClaw once per startup when a bound plugin is missing", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "missing_plugin",
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-missing-1",
targetSessionKey: "plugin-binding:codex:missing123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:missing-plugin",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginName: "Codex App Server",
pluginRoot: "/Users/huntharo/github/openclaw-app-server",
detachHint: "/codex_detach",
},
} satisfies SessionBindingRecord);
const replyResolver = vi.fn(async () => ({ text: "openclaw fallback" }) satisfies ReplyPayload);
const firstDispatcher = createDispatcher();
await dispatchReplyFromConfig({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:missing-plugin",
To: "discord:channel:missing-plugin",
AccountId: "default",
MessageSid: "msg-missing-plugin-1",
SessionKey: "agent:main:discord:channel:missing-plugin",
CommandBody: "hello",
RawBody: "hello",
Body: "hello",
}),
cfg: emptyConfig,
dispatcher: firstDispatcher,
replyResolver,
});
const firstNotice = (firstDispatcher.sendToolResult as ReturnType<typeof vi.fn>).mock
.calls[0]?.[0] as ReplyPayload | undefined;
expect(firstNotice?.text).toContain("Routing this message to OpenClaw instead.");
expect(firstNotice?.text).toContain("/codex_detach");
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled();
replyResolver.mockClear();
hookMocks.runner.runInboundClaim.mockClear();
const secondDispatcher = createDispatcher();
await dispatchReplyFromConfig({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:missing-plugin",
To: "discord:channel:missing-plugin",
AccountId: "default",
MessageSid: "msg-missing-plugin-2",
SessionKey: "agent:main:discord:channel:missing-plugin",
CommandBody: "still there?",
RawBody: "still there?",
Body: "still there?",
}),
cfg: emptyConfig,
dispatcher: secondDispatcher,
replyResolver,
});
expect(secondDispatcher.sendToolResult).not.toHaveBeenCalled();
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled();
});
it("falls back to OpenClaw when the bound plugin is loaded but has no inbound_claim handler", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "no_handler",
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-no-handler-1",
targetSessionKey: "plugin-binding:codex:nohandler123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:no-handler",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginName: "Codex App Server",
pluginRoot: "/Users/huntharo/github/openclaw-app-server",
},
} satisfies SessionBindingRecord);
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "openclaw fallback" }) satisfies ReplyPayload);
await dispatchReplyFromConfig({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:no-handler",
To: "discord:channel:no-handler",
AccountId: "default",
MessageSid: "msg-no-handler-1",
SessionKey: "agent:main:discord:channel:no-handler",
CommandBody: "hello",
RawBody: "hello",
Body: "hello",
}),
cfg: emptyConfig,
dispatcher,
replyResolver,
});
const notice = (dispatcher.sendToolResult as ReturnType<typeof vi.fn>).mock.calls[0]?.[0] as
| ReplyPayload
| undefined;
expect(notice?.text).toContain("Routing this message to OpenClaw instead.");
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled();
});
it("notifies the user when a bound plugin declines the turn and keeps the binding attached", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "declined",
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-declined-1",
targetSessionKey: "plugin-binding:codex:declined123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:declined",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginName: "Codex App Server",
pluginRoot: "/Users/huntharo/github/openclaw-app-server",
detachHint: "/codex_detach",
},
} satisfies SessionBindingRecord);
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
await dispatchReplyFromConfig({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:declined",
To: "discord:channel:declined",
AccountId: "default",
MessageSid: "msg-declined-1",
SessionKey: "agent:main:discord:channel:declined",
CommandBody: "hello",
RawBody: "hello",
Body: "hello",
}),
cfg: emptyConfig,
dispatcher,
replyResolver,
});
const finalNotice = (dispatcher.sendFinalReply as ReturnType<typeof vi.fn>).mock
.calls[0]?.[0] as ReplyPayload | undefined;
expect(finalNotice?.text).toContain("did not handle this message");
expect(finalNotice?.text).toContain("/codex_detach");
expect(replyResolver).not.toHaveBeenCalled();
expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled();
});
it("notifies the user when a bound plugin errors and keeps raw details out of the reply", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "error",
error: "boom",
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-error-1",
targetSessionKey: "plugin-binding:codex:error123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:error",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginName: "Codex App Server",
pluginRoot: "/Users/huntharo/github/openclaw-app-server",
},
} satisfies SessionBindingRecord);
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
await dispatchReplyFromConfig({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:error",
To: "discord:channel:error",
AccountId: "default",
MessageSid: "msg-error-1",
SessionKey: "agent:main:discord:channel:error",
CommandBody: "hello",
RawBody: "hello",
Body: "hello",
}),
cfg: emptyConfig,
dispatcher,
replyResolver,
});
const finalNotice = (dispatcher.sendFinalReply as ReturnType<typeof vi.fn>).mock
.calls[0]?.[0] as ReplyPayload | undefined;
expect(finalNotice?.text).toContain("hit an error handling this message");
expect(finalNotice?.text).not.toContain("boom");
expect(replyResolver).not.toHaveBeenCalled();
expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled();
});
it("marks diagnostics skipped for duplicate inbound messages", async () => { it("marks diagnostics skipped for duplicate inbound messages", async () => {
setNoAbort(); setNoAbort();
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;

View File

@ -27,10 +27,15 @@ import {
logSessionStateChange, logSessionStateChange,
} from "../../logging/diagnostic.js"; } from "../../logging/diagnostic.js";
import { import {
buildPluginBindingDeclinedText,
buildPluginBindingErrorText,
buildPluginBindingUnavailableText,
hasShownPluginBindingFallbackNotice,
isPluginOwnedSessionBindingRecord, isPluginOwnedSessionBindingRecord,
markPluginBindingFallbackNoticeShown,
toPluginConversationBinding, toPluginConversationBinding,
} from "../../plugins/conversation-binding.js"; } from "../../plugins/conversation-binding.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { getGlobalHookRunner, getGlobalPluginRegistry } from "../../plugins/hook-runner-global.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js"; import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js";
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js"; import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
@ -198,64 +203,11 @@ export async function dispatchReplyFromConfig(params: {
const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook }); const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook });
const { isGroup, groupId } = hookContext; const { isGroup, groupId } = hookContext;
const inboundClaimContext = toPluginInboundClaimContext(hookContext); const inboundClaimContext = toPluginInboundClaimContext(hookContext);
const inboundClaimEvent = toPluginInboundClaimEvent(hookContext, {
const pluginOwnedBindingRecord = commandAuthorized:
inboundClaimContext.conversationId && inboundClaimContext.channelId typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined,
? getSessionBindingService().resolveByConversation({ wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined,
channel: inboundClaimContext.channelId, });
accountId: inboundClaimContext.accountId ?? "default",
conversationId: inboundClaimContext.conversationId,
parentConversationId: inboundClaimContext.parentConversationId,
})
: null;
const pluginOwnedBinding = isPluginOwnedSessionBindingRecord(pluginOwnedBindingRecord)
? toPluginConversationBinding(pluginOwnedBindingRecord)
: null;
if (pluginOwnedBinding) {
getSessionBindingService().touch(pluginOwnedBinding.bindingId);
logVerbose(
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
);
if (hookRunner?.hasHooks("inbound_claim")) {
await hookRunner.runInboundClaimForPlugin(
pluginOwnedBinding.pluginId,
toPluginInboundClaimEvent(hookContext, {
commandAuthorized:
typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined,
wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined,
}),
inboundClaimContext,
);
}
markIdle("plugin_binding_dispatch");
recordProcessed("completed", { reason: "plugin-bound" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
// Trigger plugin hooks (fire-and-forget)
if (hookRunner?.hasHooks("message_received")) {
fireAndForgetHook(
hookRunner.runMessageReceived(
toPluginMessageReceivedEvent(hookContext),
toPluginMessageContext(hookContext),
),
"dispatch-from-config: message_received plugin hook failed",
);
}
// Bridge to internal hooks (HOOK.md discovery system) - refs #8807
if (sessionKey) {
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent("message", "received", sessionKey, {
...toInternalMessageReceivedContext(hookContext),
timestamp,
}),
),
"dispatch-from-config: message_received internal hook failed",
);
}
// Check if we should route replies to originating channel instead of dispatcher. // Check if we should route replies to originating channel instead of dispatcher.
// Only route when the originating channel is DIFFERENT from the current surface. // Only route when the originating channel is DIFFERENT from the current surface.
@ -321,6 +273,144 @@ export async function dispatchReplyFromConfig(params: {
} }
}; };
const sendBindingNotice = async (
payload: ReplyPayload,
mode: "additive" | "terminal",
): Promise<boolean> => {
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
const result = await routeReply({
payload,
channel: originatingChannel,
to: originatingTo,
sessionKey: ctx.SessionKey,
accountId: ctx.AccountId,
threadId: routeThreadId,
cfg,
isGroup,
groupId,
});
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (plugin binding notice) failed: ${result.error ?? "unknown error"}`,
);
}
return result.ok;
}
return mode === "additive"
? dispatcher.sendToolResult(payload)
: dispatcher.sendFinalReply(payload);
};
const pluginOwnedBindingRecord =
inboundClaimContext.conversationId && inboundClaimContext.channelId
? getSessionBindingService().resolveByConversation({
channel: inboundClaimContext.channelId,
accountId: inboundClaimContext.accountId ?? "default",
conversationId: inboundClaimContext.conversationId,
parentConversationId: inboundClaimContext.parentConversationId,
})
: null;
const pluginOwnedBinding = isPluginOwnedSessionBindingRecord(pluginOwnedBindingRecord)
? toPluginConversationBinding(pluginOwnedBindingRecord)
: null;
let pluginFallbackReason:
| "plugin-bound-fallback-missing-plugin"
| "plugin-bound-fallback-no-handler"
| undefined;
if (pluginOwnedBinding) {
getSessionBindingService().touch(pluginOwnedBinding.bindingId);
logVerbose(
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
);
const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome
? await hookRunner.runInboundClaimForPluginOutcome(
pluginOwnedBinding.pluginId,
inboundClaimEvent,
inboundClaimContext,
)
: (() => {
const pluginLoaded =
getGlobalPluginRegistry()?.plugins.some(
(plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded",
) ?? false;
return pluginLoaded
? ({ status: "no_handler" } as const)
: ({ status: "missing_plugin" } as const);
})();
switch (targetedClaimOutcome.status) {
case "handled": {
markIdle("plugin_binding_dispatch");
recordProcessed("completed", { reason: "plugin-bound-handled" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
case "missing_plugin":
case "no_handler": {
pluginFallbackReason =
targetedClaimOutcome.status === "missing_plugin"
? "plugin-bound-fallback-missing-plugin"
: "plugin-bound-fallback-no-handler";
if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) {
const didSendNotice = await sendBindingNotice(
{ text: buildPluginBindingUnavailableText(pluginOwnedBinding) },
"additive",
);
if (didSendNotice) {
markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId);
}
}
break;
}
case "declined": {
await sendBindingNotice(
{ text: buildPluginBindingDeclinedText(pluginOwnedBinding) },
"terminal",
);
markIdle("plugin_binding_declined");
recordProcessed("completed", { reason: "plugin-bound-declined" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
case "error": {
logVerbose(
`plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`,
);
await sendBindingNotice(
{ text: buildPluginBindingErrorText(pluginOwnedBinding) },
"terminal",
);
markIdle("plugin_binding_error");
recordProcessed("completed", { reason: "plugin-bound-error" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
}
}
// Trigger plugin hooks (fire-and-forget)
if (hookRunner?.hasHooks("message_received")) {
fireAndForgetHook(
hookRunner.runMessageReceived(
toPluginMessageReceivedEvent(hookContext),
toPluginMessageContext(hookContext),
),
"dispatch-from-config: message_received plugin hook failed",
);
}
// Bridge to internal hooks (HOOK.md discovery system) - refs #8807
if (sessionKey) {
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent("message", "received", sessionKey, {
...toInternalMessageReceivedContext(hookContext),
timestamp,
}),
),
"dispatch-from-config: message_received internal hook failed",
);
}
markProcessing(); markProcessing();
try { try {
@ -648,7 +738,10 @@ export async function dispatchReplyFromConfig(params: {
const counts = dispatcher.getQueuedCounts(); const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount; counts.final += routedFinalCount;
recordProcessed("completed"); recordProcessed(
"completed",
pluginFallbackReason ? { reason: pluginFallbackReason } : undefined,
);
markIdle("message_completed"); markIdle("message_completed");
return { queuedFinal, counts }; return { queuedFinal, counts };
} catch (err) { } catch (err) {

View File

@ -139,26 +139,31 @@ describe("registerPluginCommand", () => {
}); });
it("does not expose binding APIs to plugin commands on unsupported channels", async () => { it("does not expose binding APIs to plugin commands on unsupported channels", async () => {
const handler = async (ctx: {
requestConversationBinding: (params: { summary: string }) => Promise<unknown>;
getCurrentConversationBinding: () => Promise<unknown>;
detachConversationBinding: () => Promise<unknown>;
}) => {
const requested = await ctx.requestConversationBinding({
summary: "Bind this conversation.",
});
const current = await ctx.getCurrentConversationBinding();
const detached = await ctx.detachConversationBinding();
return {
text: JSON.stringify({
requested,
current,
detached,
}),
};
};
registerPluginCommand( registerPluginCommand(
"demo-plugin", "demo-plugin",
{ {
name: "bindcheck", name: "bindcheck",
description: "Demo command", description: "Demo command",
acceptsArgs: false, acceptsArgs: false,
handler: async (ctx) => { handler,
const requested = await ctx.requestConversationBinding({
summary: "Bind this conversation.",
});
const current = await ctx.getCurrentConversationBinding();
const detached = await ctx.detachConversationBinding();
return {
text: JSON.stringify({
requested,
current,
detached,
}),
};
},
}, },
{ pluginRoot: "/plugins/demo-plugin" }, { pluginRoot: "/plugins/demo-plugin" },
); );
@ -168,20 +173,7 @@ describe("registerPluginCommand", () => {
name: "bindcheck", name: "bindcheck",
description: "Demo command", description: "Demo command",
acceptsArgs: false, acceptsArgs: false,
handler: async (ctx) => { handler,
const requested = await ctx.requestConversationBinding({
summary: "Bind this conversation.",
});
const current = await ctx.getCurrentConversationBinding();
const detached = await ctx.detachConversationBinding();
return {
text: JSON.stringify({
requested,
current,
detached,
}),
};
},
pluginId: "demo-plugin", pluginId: "demo-plugin",
pluginRoot: "/plugins/demo-plugin", pluginRoot: "/plugins/demo-plugin",
}, },

View File

@ -294,6 +294,54 @@ describe("plugin conversation binding approvals", () => {
expect(samePluginNewPath.status).toBe("pending"); expect(samePluginNewPath.status).toBe("pending");
}); });
it("persists detachHint on approved plugin bindings", async () => {
const request = await requestPluginConversationBinding({
pluginId: "codex",
pluginName: "Codex App Server",
pluginRoot: "/plugins/codex-a",
requestedBySenderId: "user-1",
conversation: {
channel: "discord",
accountId: "isolated",
conversationId: "channel:detach-hint",
},
binding: {
summary: "Bind this conversation to Codex thread 999.",
detachHint: "/codex_detach",
},
});
expect(["pending", "bound"]).toContain(request.status);
if (request.status === "pending") {
const approved = await resolvePluginConversationBindingApproval({
approvalId: request.approvalId,
decision: "allow-once",
senderId: "user-1",
});
expect(approved.status).toBe("approved");
if (approved.status !== "approved") {
throw new Error("expected approved bind request");
}
expect(approved.binding.detachHint).toBe("/codex_detach");
} else {
expect(request.binding.detachHint).toBe("/codex_detach");
}
const currentBinding = await getCurrentPluginConversationBinding({
pluginRoot: "/plugins/codex-a",
conversation: {
channel: "discord",
accountId: "isolated",
conversationId: "channel:detach-hint",
},
});
expect(currentBinding?.detachHint).toBe("/codex_detach");
});
it("returns and detaches only bindings owned by the requesting plugin root", async () => { it("returns and detaches only bindings owned by the requesting plugin root", async () => {
const request = await requestPluginConversationBinding({ const request = await requestPluginConversationBinding({
pluginId: "codex", pluginId: "codex",

View File

@ -61,6 +61,7 @@ type PendingPluginBindingRequest = {
requestedAt: number; requestedAt: number;
requestedBySenderId?: string; requestedBySenderId?: string;
summary?: string; summary?: string;
detachHint?: string;
}; };
type PluginBindingApprovalAction = { type PluginBindingApprovalAction = {
@ -80,6 +81,7 @@ type PluginBindingMetadata = {
pluginName?: string; pluginName?: string;
pluginRoot: string; pluginRoot: string;
summary?: string; summary?: string;
detachHint?: string;
}; };
type PluginBindingResolveResult = type PluginBindingResolveResult =
@ -99,9 +101,24 @@ type PluginBindingResolveResult =
const pendingRequests = new Map<string, PendingPluginBindingRequest>(); const pendingRequests = new Map<string, PendingPluginBindingRequest>();
type PluginBindingGlobalState = {
fallbackNoticeBindingIds: Set<string>;
};
const pluginBindingGlobalStateKey = Symbol.for("openclaw.plugins.binding.global-state");
let approvalsCache: PluginBindingApprovalsFile | null = null; let approvalsCache: PluginBindingApprovalsFile | null = null;
let approvalsLoaded = false; let approvalsLoaded = false;
function getPluginBindingGlobalState(): PluginBindingGlobalState {
const globalStore = globalThis as typeof globalThis & {
[pluginBindingGlobalStateKey]?: PluginBindingGlobalState;
};
return (globalStore[pluginBindingGlobalStateKey] ??= {
fallbackNoticeBindingIds: new Set<string>(),
});
}
class PluginBindingApprovalButton extends Button { class PluginBindingApprovalButton extends Button {
customId: string; customId: string;
label: string; label: string;
@ -369,6 +386,7 @@ function buildBindingMetadata(params: {
pluginName?: string; pluginName?: string;
pluginRoot: string; pluginRoot: string;
summary?: string; summary?: string;
detachHint?: string;
}): PluginBindingMetadata { }): PluginBindingMetadata {
return { return {
pluginBindingOwner: PLUGIN_BINDING_OWNER, pluginBindingOwner: PLUGIN_BINDING_OWNER,
@ -376,6 +394,7 @@ function buildBindingMetadata(params: {
pluginName: params.pluginName, pluginName: params.pluginName,
pluginRoot: params.pluginRoot, pluginRoot: params.pluginRoot,
summary: params.summary?.trim() || undefined, summary: params.summary?.trim() || undefined,
detachHint: params.detachHint?.trim() || undefined,
}; };
} }
@ -428,6 +447,7 @@ export function toPluginConversationBinding(
parentConversationId: record.conversation.parentConversationId, parentConversationId: record.conversation.parentConversationId,
boundAt: record.boundAt, boundAt: record.boundAt,
summary: metadata.summary, summary: metadata.summary,
detachHint: metadata.detachHint,
}; };
} }
@ -435,6 +455,7 @@ async function bindConversationNow(params: {
identity: PluginBindingIdentity; identity: PluginBindingIdentity;
conversation: PluginBindingConversation; conversation: PluginBindingConversation;
summary?: string; summary?: string;
detachHint?: string;
}): Promise<PluginConversationBinding> { }): Promise<PluginConversationBinding> {
const ref = toConversationRef(params.conversation); const ref = toConversationRef(params.conversation);
const targetSessionKey = buildPluginBindingSessionKey({ const targetSessionKey = buildPluginBindingSessionKey({
@ -453,6 +474,7 @@ async function bindConversationNow(params: {
pluginName: params.identity.pluginName, pluginName: params.identity.pluginName,
pluginRoot: params.identity.pluginRoot, pluginRoot: params.identity.pluginRoot,
summary: params.summary, summary: params.summary,
detachHint: params.detachHint,
}), }),
}); });
const binding = toPluginConversationBinding(record); const binding = toPluginConversationBinding(record);
@ -482,6 +504,46 @@ function buildApprovalMessage(request: PendingPluginBindingRequest): string {
return lines.join("\n"); return lines.join("\n");
} }
function resolvePluginBindingDisplayName(binding: {
pluginId: string;
pluginName?: string;
}): string {
return binding.pluginName?.trim() || binding.pluginId;
}
function buildDetachHintSuffix(detachHint?: string): string {
const trimmed = detachHint?.trim();
return trimmed ? ` To detach this conversation, use ${trimmed}.` : "";
}
export function buildPluginBindingUnavailableText(binding: PluginConversationBinding): string {
return `The bound plugin ${resolvePluginBindingDisplayName(binding)} is not currently loaded. Routing this message to OpenClaw instead.${buildDetachHintSuffix(binding.detachHint)}`;
}
export function buildPluginBindingDeclinedText(binding: PluginConversationBinding): string {
return `The bound plugin ${resolvePluginBindingDisplayName(binding)} did not handle this message. This conversation is still bound to that plugin.${buildDetachHintSuffix(binding.detachHint)}`;
}
export function buildPluginBindingErrorText(binding: PluginConversationBinding): string {
return `The bound plugin ${resolvePluginBindingDisplayName(binding)} hit an error handling this message. This conversation is still bound to that plugin.${buildDetachHintSuffix(binding.detachHint)}`;
}
export function hasShownPluginBindingFallbackNotice(bindingId: string): boolean {
const normalized = bindingId.trim();
if (!normalized) {
return false;
}
return getPluginBindingGlobalState().fallbackNoticeBindingIds.has(normalized);
}
export function markPluginBindingFallbackNoticeShown(bindingId: string): void {
const normalized = bindingId.trim();
if (!normalized) {
return;
}
getPluginBindingGlobalState().fallbackNoticeBindingIds.add(normalized);
}
function buildPendingReply(request: PendingPluginBindingRequest): ReplyPayload { function buildPendingReply(request: PendingPluginBindingRequest): ReplyPayload {
return { return {
text: buildApprovalMessage(request), text: buildApprovalMessage(request),
@ -594,6 +656,7 @@ export async function requestPluginConversationBinding(params: {
}, },
conversation, conversation,
summary: params.binding?.summary, summary: params.binding?.summary,
detachHint: params.binding?.detachHint,
}); });
log.info( log.info(
`plugin binding auto-refresh plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`, `plugin binding auto-refresh plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`,
@ -616,6 +679,7 @@ export async function requestPluginConversationBinding(params: {
}, },
conversation, conversation,
summary: params.binding?.summary, summary: params.binding?.summary,
detachHint: params.binding?.detachHint,
}); });
log.info( log.info(
`plugin binding auto-approved plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`, `plugin binding auto-approved plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`,
@ -632,6 +696,7 @@ export async function requestPluginConversationBinding(params: {
requestedAt: Date.now(), requestedAt: Date.now(),
requestedBySenderId: params.requestedBySenderId?.trim() || undefined, requestedBySenderId: params.requestedBySenderId?.trim() || undefined,
summary: params.binding?.summary?.trim() || undefined, summary: params.binding?.summary?.trim() || undefined,
detachHint: params.binding?.detachHint?.trim() || undefined,
}; };
pendingRequests.set(request.id, request); pendingRequests.set(request.id, request);
log.info( log.info(
@ -723,6 +788,7 @@ export async function resolvePluginConversationBindingApproval(params: {
}, },
conversation: request.conversation, conversation: request.conversation,
summary: request.summary, summary: request.summary,
detachHint: request.detachHint,
}); });
log.info( log.info(
`plugin binding approved plugin=${request.pluginId} root=${request.pluginRoot} decision=${params.decision} channel=${request.conversation.channel} account=${request.conversation.accountId} conversation=${request.conversation.conversationId}`, `plugin binding approved plugin=${request.pluginId} root=${request.pluginRoot} decision=${params.decision} channel=${request.conversation.channel} account=${request.conversation.accountId} conversation=${request.conversation.conversationId}`,
@ -754,5 +820,6 @@ export const __testing = {
pendingRequests.clear(); pendingRequests.clear();
approvalsCache = null; approvalsCache = null;
approvalsLoaded = false; approvalsLoaded = false;
getPluginBindingGlobalState().fallbackNoticeBindingIds.clear();
}, },
}; };

View File

@ -5,6 +5,27 @@ export function createMockPluginRegistry(
hooks: Array<{ hookName: string; handler: (...args: unknown[]) => unknown }>, hooks: Array<{ hookName: string; handler: (...args: unknown[]) => unknown }>,
): PluginRegistry { ): PluginRegistry {
return { return {
plugins: [
{
id: "test-plugin",
name: "Test Plugin",
source: "test",
origin: "workspace",
enabled: true,
status: "loaded",
toolNames: [],
hookNames: [],
channelIds: [],
providerIds: [],
gatewayMethods: [],
cliCommands: [],
services: [],
commands: [],
httpRoutes: 0,
hookCount: hooks.length,
configSchema: false,
},
],
hooks: hooks as never[], hooks: hooks as never[],
typedHooks: hooks.map((h) => ({ typedHooks: hooks.map((h) => ({
pluginId: "test-plugin", pluginId: "test-plugin",

View File

@ -114,6 +114,25 @@ export type HookRunnerOptions = {
catchErrors?: boolean; catchErrors?: boolean;
}; };
export type PluginTargetedInboundClaimOutcome =
| {
status: "handled";
result: PluginHookInboundClaimResult;
}
| {
status: "missing_plugin";
}
| {
status: "no_handler";
}
| {
status: "declined";
}
| {
status: "error";
error: string;
};
/** /**
* Get hooks for a specific hook name, sorted by priority (higher first). * Get hooks for a specific hook name, sorted by priority (higher first).
*/ */
@ -210,6 +229,12 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
throw new Error(msg, { cause: params.error }); throw new Error(msg, { cause: params.error });
}; };
const sanitizeHookError = (error: unknown): string => {
const raw = error instanceof Error ? error.message : String(error);
const firstLine = raw.split("\n")[0]?.trim();
return firstLine || "unknown error";
};
/** /**
* Run a hook that doesn't return a value (fire-and-forget style). * Run a hook that doesn't return a value (fire-and-forget style).
* All handlers are executed in parallel for performance. * All handlers are executed in parallel for performance.
@ -342,6 +367,58 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
return undefined; return undefined;
} }
async function runClaimingHookForPluginOutcome<
K extends PluginHookName,
TResult extends { handled: boolean },
>(
hookName: K,
pluginId: string,
event: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[0],
ctx: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[1],
): Promise<
| { status: "handled"; result: TResult }
| { status: "missing_plugin" }
| { status: "no_handler" }
| { status: "declined" }
| { status: "error"; error: string }
> {
const pluginLoaded = registry.plugins.some(
(plugin) => plugin.id === pluginId && plugin.status === "loaded",
);
if (!pluginLoaded) {
return { status: "missing_plugin" };
}
const hooks = getHooksForNameAndPlugin(registry, hookName, pluginId);
if (hooks.length === 0) {
return { status: "no_handler" };
}
logger?.debug?.(
`[hooks] running ${hookName} for ${pluginId} (${hooks.length} handlers, targeted outcome)`,
);
let firstError: string | null = null;
for (const hook of hooks) {
try {
const handlerResult = await (
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
)(event, ctx);
if (handlerResult?.handled) {
return { status: "handled", result: handlerResult };
}
} catch (err) {
firstError ??= sanitizeHookError(err);
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
}
}
if (firstError) {
return { status: "error", error: firstError };
}
return { status: "declined" };
}
// ========================================================================= // =========================================================================
// Agent Hooks // Agent Hooks
// ========================================================================= // =========================================================================
@ -491,6 +568,19 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
); );
} }
async function runInboundClaimForPluginOutcome(
pluginId: string,
event: PluginHookInboundClaimEvent,
ctx: PluginHookInboundClaimContext,
): Promise<PluginTargetedInboundClaimOutcome> {
return runClaimingHookForPluginOutcome<"inbound_claim", PluginHookInboundClaimResult>(
"inbound_claim",
pluginId,
event,
ctx,
);
}
/** /**
* Run message_received hook. * Run message_received hook.
* Runs in parallel (fire-and-forget). * Runs in parallel (fire-and-forget).
@ -843,6 +933,7 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
// Message hooks // Message hooks
runInboundClaim, runInboundClaim,
runInboundClaimForPlugin, runInboundClaimForPlugin,
runInboundClaimForPluginOutcome,
runMessageReceived, runMessageReceived,
runMessageSending, runMessageSending,
runMessageSent, runMessageSent,

View File

@ -279,6 +279,7 @@ export type PluginCommandContext = {
export type PluginConversationBindingRequestParams = { export type PluginConversationBindingRequestParams = {
summary?: string; summary?: string;
detachHint?: string;
}; };
export type PluginConversationBinding = { export type PluginConversationBinding = {
@ -293,6 +294,7 @@ export type PluginConversationBinding = {
threadId?: string | number; threadId?: string | number;
boundAt: number; boundAt: number;
summary?: string; summary?: string;
detachHint?: string;
}; };
export type PluginConversationBindingRequestResult = export type PluginConversationBindingRequestResult =

View File

@ -97,4 +97,79 @@ describe("inbound_claim hook runner", () => {
expect(first).toHaveBeenCalledTimes(1); expect(first).toHaveBeenCalledTimes(1);
expect(second).not.toHaveBeenCalled(); expect(second).not.toHaveBeenCalled();
}); });
it("reports missing_plugin when the bound plugin is not loaded", async () => {
const registry = createMockPluginRegistry([]);
registry.plugins = [];
const runner = createHookRunner(registry);
const result = await runner.runInboundClaimForPluginOutcome(
"missing-plugin",
{
content: "who are you",
channel: "discord",
accountId: "default",
conversationId: "channel:1",
isGroup: true,
},
{
channelId: "discord",
accountId: "default",
conversationId: "channel:1",
},
);
expect(result).toEqual({ status: "missing_plugin" });
});
it("reports no_handler when the plugin is loaded but has no targeted hooks", async () => {
const registry = createMockPluginRegistry([]);
const runner = createHookRunner(registry);
const result = await runner.runInboundClaimForPluginOutcome(
"test-plugin",
{
content: "who are you",
channel: "discord",
accountId: "default",
conversationId: "channel:1",
isGroup: true,
},
{
channelId: "discord",
accountId: "default",
conversationId: "channel:1",
},
);
expect(result).toEqual({ status: "no_handler" });
});
it("reports error when a targeted handler throws and none claim the event", async () => {
const logger = {
warn: vi.fn(),
error: vi.fn(),
};
const failing = vi.fn().mockRejectedValue(new Error("boom"));
const registry = createMockPluginRegistry([{ hookName: "inbound_claim", handler: failing }]);
const runner = createHookRunner(registry, { logger });
const result = await runner.runInboundClaimForPluginOutcome(
"test-plugin",
{
content: "who are you",
channel: "discord",
accountId: "default",
conversationId: "channel:1",
isGroup: true,
},
{
channelId: "discord",
accountId: "default",
conversationId: "channel:1",
},
);
expect(result).toEqual({ status: "error", error: "boom" });
});
}); });