From 7ef75b87797281e77efc3f6bdd0a195c284781f6 Mon Sep 17 00:00:00 2001 From: huntharo Date: Sun, 15 Mar 2026 09:04:40 -0400 Subject: [PATCH] Discord: restore plugin bindings after restart --- .../monitor/message-handler.preflight.test.ts | 87 +++++++++++++++++++ .../src/monitor/message-handler.preflight.ts | 3 +- .../monitor/thread-bindings.lifecycle.test.ts | 41 +++++++++ .../src/monitor/thread-bindings.lifecycle.ts | 7 +- .../reply/dispatch-from-config.test.ts | 33 ++----- src/auto-reply/reply/dispatch-from-config.ts | 19 ---- 6 files changed, 144 insertions(+), 46 deletions(-) diff --git a/extensions/discord/src/monitor/message-handler.preflight.test.ts b/extensions/discord/src/monitor/message-handler.preflight.test.ts index a7a5ff2f6ef..2fb14bafe8e 100644 --- a/extensions/discord/src/monitor/message-handler.preflight.test.ts +++ b/extensions/discord/src/monitor/message-handler.preflight.test.ts @@ -90,6 +90,20 @@ function createThreadClient(params: { threadId: string; parentId: string }): Dis } as unknown as DiscordClient; } +function createDmClient(channelId: string): DiscordClient { + return { + fetchChannel: async (id: string) => { + if (id === channelId) { + return { + id: channelId, + type: ChannelType.DM, + }; + } + return null; + }, + } as unknown as DiscordClient; +} + async function runThreadBoundPreflight(params: { threadId: string; parentId: string; @@ -157,6 +171,25 @@ async function runGuildPreflight(params: { }); } +async function runDmPreflight(params: { + channelId: string; + message: import("@buape/carbon").Message; + discordConfig: DiscordConfig; +}) { + return preflightDiscordMessage({ + ...createPreflightArgs({ + cfg: DEFAULT_PREFLIGHT_CFG, + discordConfig: params.discordConfig, + data: { + channel_id: params.channelId, + author: params.message.author, + message: params.message, + } as DiscordMessageEvent, + client: createDmClient(params.channelId), + }), + }); +} + async function runMentionOnlyBotPreflight(params: { channelId: string; guildId: string; @@ -258,6 +291,60 @@ describe("preflightDiscordMessage", () => { expect(result).toBeNull(); }); + it("restores direct-message bindings by user target instead of DM channel id", async () => { + registerSessionBindingAdapter({ + channel: "discord", + accountId: "default", + listBySession: () => [], + resolveByConversation: (ref) => + ref.conversationId === "user:user-1" + ? createThreadBinding({ + conversation: { + channel: "discord", + accountId: "default", + conversationId: "user:user-1", + }, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginRoot: "/Users/huntharo/github/openclaw-app-server", + }, + }) + : null, + }); + + const result = await runDmPreflight({ + channelId: "dm-channel-1", + message: createDiscordMessage({ + id: "m-dm-1", + channelId: "dm-channel-1", + content: "who are you", + author: { + id: "user-1", + bot: false, + username: "alice", + }, + }), + discordConfig: { + allowBots: true, + dmPolicy: "open", + } as DiscordConfig, + }); + + expect(result).not.toBeNull(); + expect(result?.threadBinding).toMatchObject({ + conversation: { + channel: "discord", + accountId: "default", + conversationId: "user:user-1", + }, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + }, + }); + }); + it("keeps bound-thread regular bot messages flowing when allowBots=true", async () => { const threadBinding = createThreadBinding({ targetKind: "session", diff --git a/extensions/discord/src/monitor/message-handler.preflight.ts b/extensions/discord/src/monitor/message-handler.preflight.ts index fc8c5aa7b6e..bfefb6b17f5 100644 --- a/extensions/discord/src/monitor/message-handler.preflight.ts +++ b/extensions/discord/src/monitor/message-handler.preflight.ts @@ -351,12 +351,13 @@ export async function preflightDiscordMessage( }), parentConversationId: earlyThreadParentId, }); + const bindingConversationId = isDirectMessage ? `user:${author.id}` : messageChannelId; let threadBinding: SessionBindingRecord | undefined; threadBinding = getSessionBindingService().resolveByConversation({ channel: "discord", accountId: params.accountId, - conversationId: messageChannelId, + conversationId: bindingConversationId, parentConversationId: earlyThreadParentId, }) ?? undefined; const configuredRoute = diff --git a/extensions/discord/src/monitor/thread-bindings.lifecycle.test.ts b/extensions/discord/src/monitor/thread-bindings.lifecycle.test.ts index 3efd8c52584..ed221645fcf 100644 --- a/extensions/discord/src/monitor/thread-bindings.lifecycle.test.ts +++ b/extensions/discord/src/monitor/thread-bindings.lifecycle.test.ts @@ -1000,6 +1000,47 @@ describe("thread binding lifecycle", () => { expect(manager.getByThreadId("thread-acp-uncertain")).toBeDefined(); }); + it("does not reconcile plugin-owned direct bindings as stale ACP sessions", async () => { + const manager = createThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }); + + await manager.bindTarget({ + threadId: "user:1177378744822943744", + channelId: "user:1177378744822943744", + targetKind: "acp", + targetSessionKey: "plugin-binding:openclaw-codex-app-server:dm", + agentId: "codex", + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginRoot: "/Users/huntharo/github/openclaw-app-server", + }, + }); + + hoisted.readAcpSessionEntry.mockReturnValue(null); + + const result = await reconcileAcpThreadBindingsOnStartup({ + cfg: {} as OpenClawConfig, + accountId: "default", + }); + + expect(result.checked).toBe(0); + expect(result.removed).toBe(0); + expect(result.staleSessionKeys).toEqual([]); + expect(manager.getByThreadId("user:1177378744822943744")).toMatchObject({ + threadId: "user:1177378744822943744", + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + }, + }); + }); + it("removes ACP bindings when health probe marks running session as stale", async () => { const manager = createThreadBindingManager({ accountId: "default", diff --git a/extensions/discord/src/monitor/thread-bindings.lifecycle.ts b/extensions/discord/src/monitor/thread-bindings.lifecycle.ts index d7389d68439..44373e03860 100644 --- a/extensions/discord/src/monitor/thread-bindings.lifecycle.ts +++ b/extensions/discord/src/monitor/thread-bindings.lifecycle.ts @@ -323,7 +323,12 @@ export async function reconcileAcpThreadBindingsOnStartup(params: { }; } - const acpBindings = manager.listBindings().filter((binding) => binding.targetKind === "acp"); + const acpBindings = manager.listBindings().filter((binding) => { + if (binding.targetKind !== "acp") { + return false; + } + return binding.metadata?.pluginBindingOwner !== "plugin"; + }); const staleBindings: ThreadBindingRecord[] = []; const probeTargets: Array<{ binding: ThreadBindingRecord; diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 94c978a3904..b39e6762b6c 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -1879,7 +1879,7 @@ describe("dispatchReplyFromConfig", () => { ); }); - it("lets a plugin claim inbound traffic before core commands and agent dispatch", async () => { + it("does not broadcast inbound claims without a core-owned plugin binding", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => @@ -1906,31 +1906,12 @@ describe("dispatchReplyFromConfig", () => { MessageSid: "msg-claim-1", SessionKey: "agent:main:telegram:group:-10099:77", }); - const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); + const replyResolver = vi.fn(async () => ({ text: "core reply" }) satisfies ReplyPayload); const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); - expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); - expect(hookMocks.runner.runInboundClaim).toHaveBeenCalledWith( - expect.objectContaining({ - content: "who are you", - channel: "telegram", - accountId: "default", - conversationId: "-10099:topic:77", - parentConversationId: "-10099", - senderId: "user-9", - commandAuthorized: true, - wasMentioned: true, - }), - expect.objectContaining({ - channelId: "telegram", - accountId: "default", - conversationId: "-10099:topic:77", - parentConversationId: "-10099", - senderId: "user-9", - messageId: "msg-claim-1", - }), - ); + expect(result).toEqual({ queuedFinal: true, counts: { tool: 0, block: 0, final: 0 } }); + expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); expect(hookMocks.runner.runMessageReceived).toHaveBeenCalledWith( expect.objectContaining({ from: ctx.From, @@ -1957,8 +1938,10 @@ describe("dispatchReplyFromConfig", () => { sessionKey: "agent:main:telegram:group:-10099:77", }), ); - expect(replyResolver).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "core reply" }), + ); }); it("emits internal message:received hook when a session key is available", async () => { diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 45cb0cc44c1..a22e942a8b9 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -233,19 +233,6 @@ export async function dispatchReplyFromConfig(params: { return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } - let pluginClaimedInbound = false; - if (hookRunner?.hasHooks("inbound_claim")) { - const inboundClaim = await hookRunner.runInboundClaim( - toPluginInboundClaimEvent(hookContext, { - commandAuthorized: - typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined, - wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined, - }), - inboundClaimContext, - ); - pluginClaimedInbound = inboundClaim?.handled === true; - } - // Trigger plugin hooks (fire-and-forget) if (hookRunner?.hasHooks("message_received")) { fireAndForgetHook( @@ -270,12 +257,6 @@ export async function dispatchReplyFromConfig(params: { ); } - if (pluginClaimedInbound) { - markIdle("plugin_claim"); - recordProcessed("completed", { reason: "plugin-claimed" }); - return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; - } - // Check if we should route replies to originating channel instead of dispatcher. // Only route when the originating channel is DIFFERENT from the current surface. // This handles cross-provider routing (e.g., message from Telegram being processed