Discord: restore plugin bindings after restart

This commit is contained in:
huntharo 2026-03-15 09:04:40 -04:00 committed by Vincent Koc
parent eb4e96573a
commit 7ef75b8779
6 changed files with 144 additions and 46 deletions

View File

@ -90,6 +90,20 @@ function createThreadClient(params: { threadId: string; parentId: string }): Dis
} as unknown as DiscordClient;
}
function createDmClient(channelId: string): DiscordClient {
return {
fetchChannel: async (id: string) => {
if (id === channelId) {
return {
id: channelId,
type: ChannelType.DM,
};
}
return null;
},
} as unknown as DiscordClient;
}
async function runThreadBoundPreflight(params: {
threadId: string;
parentId: string;
@ -157,6 +171,25 @@ async function runGuildPreflight(params: {
});
}
async function runDmPreflight(params: {
channelId: string;
message: import("@buape/carbon").Message;
discordConfig: DiscordConfig;
}) {
return preflightDiscordMessage({
...createPreflightArgs({
cfg: DEFAULT_PREFLIGHT_CFG,
discordConfig: params.discordConfig,
data: {
channel_id: params.channelId,
author: params.message.author,
message: params.message,
} as DiscordMessageEvent,
client: createDmClient(params.channelId),
}),
});
}
async function runMentionOnlyBotPreflight(params: {
channelId: string;
guildId: string;
@ -258,6 +291,60 @@ describe("preflightDiscordMessage", () => {
expect(result).toBeNull();
});
it("restores direct-message bindings by user target instead of DM channel id", async () => {
registerSessionBindingAdapter({
channel: "discord",
accountId: "default",
listBySession: () => [],
resolveByConversation: (ref) =>
ref.conversationId === "user:user-1"
? createThreadBinding({
conversation: {
channel: "discord",
accountId: "default",
conversationId: "user:user-1",
},
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginRoot: "/Users/huntharo/github/openclaw-app-server",
},
})
: null,
});
const result = await runDmPreflight({
channelId: "dm-channel-1",
message: createDiscordMessage({
id: "m-dm-1",
channelId: "dm-channel-1",
content: "who are you",
author: {
id: "user-1",
bot: false,
username: "alice",
},
}),
discordConfig: {
allowBots: true,
dmPolicy: "open",
} as DiscordConfig,
});
expect(result).not.toBeNull();
expect(result?.threadBinding).toMatchObject({
conversation: {
channel: "discord",
accountId: "default",
conversationId: "user:user-1",
},
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
},
});
});
it("keeps bound-thread regular bot messages flowing when allowBots=true", async () => {
const threadBinding = createThreadBinding({
targetKind: "session",

View File

@ -351,12 +351,13 @@ export async function preflightDiscordMessage(
}),
parentConversationId: earlyThreadParentId,
});
const bindingConversationId = isDirectMessage ? `user:${author.id}` : messageChannelId;
let threadBinding: SessionBindingRecord | undefined;
threadBinding =
getSessionBindingService().resolveByConversation({
channel: "discord",
accountId: params.accountId,
conversationId: messageChannelId,
conversationId: bindingConversationId,
parentConversationId: earlyThreadParentId,
}) ?? undefined;
const configuredRoute =

View File

@ -1000,6 +1000,47 @@ describe("thread binding lifecycle", () => {
expect(manager.getByThreadId("thread-acp-uncertain")).toBeDefined();
});
it("does not reconcile plugin-owned direct bindings as stale ACP sessions", async () => {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "user:1177378744822943744",
channelId: "user:1177378744822943744",
targetKind: "acp",
targetSessionKey: "plugin-binding:openclaw-codex-app-server:dm",
agentId: "codex",
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginRoot: "/Users/huntharo/github/openclaw-app-server",
},
});
hoisted.readAcpSessionEntry.mockReturnValue(null);
const result = await reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
});
expect(result.checked).toBe(0);
expect(result.removed).toBe(0);
expect(result.staleSessionKeys).toEqual([]);
expect(manager.getByThreadId("user:1177378744822943744")).toMatchObject({
threadId: "user:1177378744822943744",
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
},
});
});
it("removes ACP bindings when health probe marks running session as stale", async () => {
const manager = createThreadBindingManager({
accountId: "default",

View File

@ -323,7 +323,12 @@ export async function reconcileAcpThreadBindingsOnStartup(params: {
};
}
const acpBindings = manager.listBindings().filter((binding) => binding.targetKind === "acp");
const acpBindings = manager.listBindings().filter((binding) => {
if (binding.targetKind !== "acp") {
return false;
}
return binding.metadata?.pluginBindingOwner !== "plugin";
});
const staleBindings: ThreadBindingRecord[] = [];
const probeTargets: Array<{
binding: ThreadBindingRecord;

View File

@ -1879,7 +1879,7 @@ describe("dispatchReplyFromConfig", () => {
);
});
it("lets a plugin claim inbound traffic before core commands and agent dispatch", async () => {
it("does not broadcast inbound claims without a core-owned plugin binding", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
@ -1906,31 +1906,12 @@ describe("dispatchReplyFromConfig", () => {
MessageSid: "msg-claim-1",
SessionKey: "agent:main:telegram:group:-10099:77",
});
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
const replyResolver = vi.fn(async () => ({ text: "core reply" }) satisfies ReplyPayload);
const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
expect(hookMocks.runner.runInboundClaim).toHaveBeenCalledWith(
expect.objectContaining({
content: "who are you",
channel: "telegram",
accountId: "default",
conversationId: "-10099:topic:77",
parentConversationId: "-10099",
senderId: "user-9",
commandAuthorized: true,
wasMentioned: true,
}),
expect.objectContaining({
channelId: "telegram",
accountId: "default",
conversationId: "-10099:topic:77",
parentConversationId: "-10099",
senderId: "user-9",
messageId: "msg-claim-1",
}),
);
expect(result).toEqual({ queuedFinal: true, counts: { tool: 0, block: 0, final: 0 } });
expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled();
expect(hookMocks.runner.runMessageReceived).toHaveBeenCalledWith(
expect.objectContaining({
from: ctx.From,
@ -1957,8 +1938,10 @@ describe("dispatchReplyFromConfig", () => {
sessionKey: "agent:main:telegram:group:-10099:77",
}),
);
expect(replyResolver).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "core reply" }),
);
});
it("emits internal message:received hook when a session key is available", async () => {

View File

@ -233,19 +233,6 @@ export async function dispatchReplyFromConfig(params: {
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
let pluginClaimedInbound = false;
if (hookRunner?.hasHooks("inbound_claim")) {
const inboundClaim = await hookRunner.runInboundClaim(
toPluginInboundClaimEvent(hookContext, {
commandAuthorized:
typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined,
wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined,
}),
inboundClaimContext,
);
pluginClaimedInbound = inboundClaim?.handled === true;
}
// Trigger plugin hooks (fire-and-forget)
if (hookRunner?.hasHooks("message_received")) {
fireAndForgetHook(
@ -270,12 +257,6 @@ export async function dispatchReplyFromConfig(params: {
);
}
if (pluginClaimedInbound) {
markIdle("plugin_claim");
recordProcessed("completed", { reason: "plugin-claimed" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
// Check if we should route replies to originating channel instead of dispatcher.
// Only route when the originating channel is DIFFERENT from the current surface.
// This handles cross-provider routing (e.g., message from Telegram being processed