diff --git a/extensions/discord/src/accounts.ts b/extensions/discord/src/accounts.ts index a323120a787..56a2a46a53f 100644 --- a/extensions/discord/src/accounts.ts +++ b/extensions/discord/src/accounts.ts @@ -20,6 +20,39 @@ const { listAccountIds, resolveDefaultAccountId } = createAccountListHelpers("di export const listDiscordAccountIds = listAccountIds; export const resolveDefaultDiscordAccountId = resolveDefaultAccountId; +const managedDiscordAccountIdByBotUserId = new Map(); + +export function rememberDiscordManagedBotIdentity(params: { + botUserId: string; + accountId: string; +}): void { + managedDiscordAccountIdByBotUserId.set(params.botUserId, params.accountId); +} + +export function forgetDiscordManagedBotIdentity(params: { + botUserId?: string | null; + accountId?: string | null; +}): void { + if (!params.botUserId) { + return; + } + if ( + !params.accountId || + managedDiscordAccountIdByBotUserId.get(params.botUserId) === params.accountId + ) { + managedDiscordAccountIdByBotUserId.delete(params.botUserId); + } +} + +export function resolveDiscordManagedAccountIdByBotUserId( + botUserId?: string | null, +): string | undefined { + if (!botUserId) { + return undefined; + } + return managedDiscordAccountIdByBotUserId.get(botUserId); +} + export function resolveDiscordAccountConfig( cfg: OpenClawConfig, accountId: string, diff --git a/extensions/discord/src/monitor/inbound-worker.ts b/extensions/discord/src/monitor/inbound-worker.ts index c00b7dc1c1d..413de3d4b7c 100644 --- a/extensions/discord/src/monitor/inbound-worker.ts +++ b/extensions/discord/src/monitor/inbound-worker.ts @@ -18,6 +18,7 @@ type DiscordInboundWorkerParams = { export type DiscordInboundWorker = { enqueue: (job: DiscordInboundJob) => void; deactivate: () => void; + waitForIdle: () => Promise; }; function formatDiscordRunContextSuffix(job: DiscordInboundJob): string { @@ -101,5 +102,8 @@ export function createDiscordInboundWorker( }); }, deactivate: runState.deactivate, + waitForIdle: async () => { + await runQueue.waitForIdle(); + }, }; } diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index b381013349e..a6ba89402d6 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -36,7 +36,10 @@ import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtim import { convertMarkdownTables } from "openclaw/plugin-sdk/text-runtime"; import { stripReasoningTagsFromText } from "openclaw/plugin-sdk/text-runtime"; import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-runtime"; -import { resolveDiscordMaxLinesPerMessage } from "../accounts.js"; +import { + resolveDiscordManagedAccountIdByBotUserId, + resolveDiscordMaxLinesPerMessage, +} from "../accounts.js"; import { chunkDiscordTextWithMode } from "../chunk.js"; import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js"; import { createDiscordDraftStream } from "../draft-stream.js"; @@ -224,6 +227,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) ? (sender.tag ?? sender.name ?? author.username) : author.username; const senderTag = sender.tag; + const senderManagedAccountId = resolveDiscordManagedAccountIdByBotUserId(sender.id); const { groupSystemPrompt, ownerAllowFrom, untrustedContext } = buildDiscordInboundAccessContext({ channelConfig, guildInfo, @@ -364,6 +368,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) ChatType: isDirectMessage ? "direct" : "channel", ConversationLabel: fromLabel, SenderName: senderName, + SenderManagedAccountId: senderManagedAccountId, SenderId: sender.id, SenderUsername: senderUsername, SenderTag: senderTag, diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index e17dcc906af..31342b9b56d 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -28,6 +28,7 @@ type DiscordMessageHandlerParams = Omit< export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & { deactivate: () => void; + waitForIdle: () => Promise; }; export function createDiscordMessageHandler( @@ -181,6 +182,7 @@ export function createDiscordMessageHandler( }; handler.deactivate = inboundWorker.deactivate; + handler.waitForIdle = inboundWorker.waitForIdle; return handler; } diff --git a/extensions/discord/src/monitor/provider.test.ts b/extensions/discord/src/monitor/provider.test.ts index ff6fb310464..9bdb4331d07 100644 --- a/extensions/discord/src/monitor/provider.test.ts +++ b/extensions/discord/src/monitor/provider.test.ts @@ -7,6 +7,7 @@ import { baseRuntime, getFirstDiscordMessageHandlerParams, getProviderMonitorTestMocks, + mockResolvedDiscordAccountConfig, resetDiscordProviderMonitorMocks, } from "../../../../test/helpers/extensions/discord-provider.test-support.js"; @@ -28,29 +29,16 @@ const { listSkillCommandsForAgentsMock, monitorLifecycleMock, reconcileAcpThreadBindingsOnStartupMock, + rememberDiscordManagedBotIdentityMock, resolveDiscordAllowlistConfigMock, resolveDiscordAccountMock, resolveNativeCommandsEnabledMock, resolveNativeSkillsEnabledMock, shouldLogVerboseMock, + forgetDiscordManagedBotIdentityMock, voiceRuntimeModuleLoadedMock, } = getProviderMonitorTestMocks(); -function createConfigWithDiscordAccount(overrides: Record = {}): OpenClawConfig { - return { - channels: { - discord: { - accounts: { - default: { - token: "MTIz.abc.def", - ...overrides, - }, - }, - }, - }, - } as OpenClawConfig; -} - vi.mock("openclaw/plugin-sdk/plugin-runtime", async () => { const actual = await vi.importActual( "openclaw/plugin-sdk/plugin-runtime", @@ -61,6 +49,23 @@ vi.mock("openclaw/plugin-sdk/plugin-runtime", async () => { }; }); +vi.mock("../accounts.js", () => ({ + forgetDiscordManagedBotIdentity: forgetDiscordManagedBotIdentityMock, + rememberDiscordManagedBotIdentity: rememberDiscordManagedBotIdentityMock, + resolveDiscordAccount: resolveDiscordAccountMock, +})); + +vi.mock("../probe.js", () => ({ + fetchDiscordApplicationId: async () => "app-1", +})); + +vi.mock("../token.js", () => ({ + normalizeDiscordToken: (value?: string) => value, +})); + +vi.mock("../voice/command.js", () => ({ + createDiscordVoiceCommand: () => ({ name: "voice-command" }), +})); vi.mock("../voice/manager.runtime.js", () => { voiceRuntimeModuleLoadedMock(); return { @@ -115,18 +120,7 @@ describe("monitorDiscordProvider", () => { }; beforeEach(() => { - vi.resetModules(); resetDiscordProviderMonitorMocks(); - vi.doMock("../accounts.js", () => ({ - resolveDiscordAccount: (...args: Parameters) => - resolveDiscordAccountMock(...args), - })); - vi.doMock("../probe.js", () => ({ - fetchDiscordApplicationId: async () => "app-1", - })); - vi.doMock("../token.js", () => ({ - normalizeDiscordToken: (value?: string) => value, - })); }); it("stops thread bindings when startup fails before lifecycle begins", async () => { @@ -175,7 +169,7 @@ describe("monitorDiscordProvider", () => { it("loads the Discord voice runtime only when voice is enabled", async () => { resolveDiscordAccountMock.mockReturnValue({ accountId: "default", - token: "MTIz.abc.def", + token: "cfg-token", config: { commands: { native: true, nativeSkills: false }, voice: { enabled: true }, @@ -392,19 +386,12 @@ describe("monitorDiscordProvider", () => { }); it("forwards custom eventQueue config from discord config to Carbon Client", async () => { - resolveDiscordAccountMock.mockReturnValue({ - accountId: "default", - token: "MTIz.abc.def", - config: { - commands: { native: true, nativeSkills: false }, - voice: { enabled: false }, - agentComponents: { enabled: false }, - execApprovals: { enabled: false }, - eventQueue: { listenerTimeout: 300_000 }, - }, - }); const { monitorDiscordProvider } = await import("./provider.js"); + mockResolvedDiscordAccountConfig({ + eventQueue: { listenerTimeout: 300_000 }, + }); + await monitorDiscordProvider({ config: baseConfig(), runtime: baseRuntime(), @@ -417,10 +404,12 @@ describe("monitorDiscordProvider", () => { it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => { const { monitorDiscordProvider } = await import("./provider.js"); + mockResolvedDiscordAccountConfig({ + eventQueue: { listenerTimeout: 50_000 }, + }); + await monitorDiscordProvider({ - config: createConfigWithDiscordAccount({ - eventQueue: { listenerTimeout: 50_000 }, - }), + config: baseConfig(), runtime: baseRuntime(), }); @@ -433,19 +422,12 @@ describe("monitorDiscordProvider", () => { }); it("forwards inbound worker timeout config to the Discord message handler", async () => { - resolveDiscordAccountMock.mockReturnValue({ - accountId: "default", - token: "MTIz.abc.def", - config: { - commands: { native: true, nativeSkills: false }, - voice: { enabled: false }, - agentComponents: { enabled: false }, - execApprovals: { enabled: false }, - inboundWorker: { runTimeoutMs: 300_000 }, - }, - }); const { monitorDiscordProvider } = await import("./provider.js"); + mockResolvedDiscordAccountConfig({ + inboundWorker: { runTimeoutMs: 300_000 }, + }); + await monitorDiscordProvider({ config: baseConfig(), runtime: baseRuntime(), @@ -479,43 +461,6 @@ describe("monitorDiscordProvider", () => { expect(commandNames).toContain("cron_jobs"); }); - it("registers plugin commands from the real registry as native Discord commands", async () => { - const { clearPluginCommands, getPluginCommandSpecs, registerPluginCommand } = - await import("../../../../src/plugins/commands.js"); - clearPluginCommands(); - const { monitorDiscordProvider } = await import("./provider.js"); - listNativeCommandSpecsForConfigMock.mockReturnValue([ - { name: "status", description: "Status", acceptsArgs: false }, - ]); - getPluginCommandSpecsMock.mockImplementation((provider?: string) => - getPluginCommandSpecs(provider), - ); - - expect( - registerPluginCommand("demo-plugin", { - name: "pair", - description: "Pair device", - acceptsArgs: true, - requireAuth: false, - handler: async ({ args }) => ({ text: `paired:${args ?? ""}` }), - }), - ).toEqual({ ok: true }); - - await monitorDiscordProvider({ - config: baseConfig(), - runtime: baseRuntime(), - }); - - const commandNames = (createDiscordNativeCommandMock.mock.calls as Array) - .map((call) => (call[0] as { command?: { name?: string } } | undefined)?.command?.name) - .filter((value): value is string => typeof value === "string"); - - expect(commandNames).toContain("status"); - expect(commandNames).toContain("pair"); - expect(clientHandleDeployRequestMock).toHaveBeenCalledTimes(1); - expect(monitorLifecycleMock).toHaveBeenCalledTimes(1); - }); - it("continues startup when Discord daily slash-command create quota is exhausted", async () => { const { RateLimitError } = await import("@buape/carbon"); const { monitorDiscordProvider } = await import("./provider.js"); @@ -550,6 +495,85 @@ describe("monitorDiscordProvider", () => { ); }); + it("waits for inbound handler idle before forgetting managed bot identity", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + const deactivate = vi.fn(); + const waitForIdle = vi.fn(async () => undefined); + createDiscordMessageHandlerMock.mockImplementation(() => + Object.assign( + vi.fn(async () => undefined), + { + deactivate, + waitForIdle, + }, + ), + ); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + expect(rememberDiscordManagedBotIdentityMock).toHaveBeenCalledWith({ + botUserId: "bot-1", + accountId: "default", + }); + expect(deactivate).toHaveBeenCalledTimes(1); + expect(waitForIdle).toHaveBeenCalledTimes(1); + expect(forgetDiscordManagedBotIdentityMock).toHaveBeenCalledWith({ + botUserId: "bot-1", + accountId: "default", + }); + expect(deactivate.mock.invocationCallOrder[0]).toBeLessThan( + waitForIdle.mock.invocationCallOrder[0], + ); + expect(waitForIdle.mock.invocationCallOrder[0]).toBeLessThan( + forgetDiscordManagedBotIdentityMock.mock.invocationCallOrder[0], + ); + }); + + it("bounds inbound handler idle wait during teardown", async () => { + vi.useFakeTimers(); + try { + const { monitorDiscordProvider } = await import("./provider.js"); + const deactivate = vi.fn(); + const waitForIdle = vi.fn(() => new Promise(() => undefined)); + createDiscordMessageHandlerMock.mockImplementation(() => + Object.assign( + vi.fn(async () => undefined), + { + deactivate, + waitForIdle, + }, + ), + ); + mockResolvedDiscordAccountConfig({ + inboundWorker: { runTimeoutMs: 5_000 }, + }); + const runtime = baseRuntime(); + + const monitorPromise = monitorDiscordProvider({ + config: baseConfig(), + runtime, + }); + + await vi.advanceTimersByTimeAsync(5_100); + await expect(monitorPromise).resolves.toBeUndefined(); + + expect(deactivate).toHaveBeenCalledTimes(1); + expect(waitForIdle).toHaveBeenCalledTimes(1); + expect(forgetDiscordManagedBotIdentityMock).toHaveBeenCalledWith({ + botUserId: "bot-1", + accountId: "default", + }); + expect(runtime.log).toHaveBeenCalledWith( + expect.stringContaining("inbound handler did not drain within 5000ms during teardown"), + ); + } finally { + vi.useRealTimers(); + } + }); + it("configures Carbon native deploy by default", async () => { const { monitorDiscordProvider } = await import("./provider.js"); @@ -581,50 +605,4 @@ describe("monitorDiscordProvider", () => { expect(connectedTrue).toBeDefined(); expect(connectedFalse).toBeDefined(); }); - - it("logs Discord startup phases and early gateway debug events", async () => { - const { monitorDiscordProvider } = await import("./provider.js"); - const runtime = baseRuntime(); - const emitter = new EventEmitter(); - const gateway = { emitter, isConnected: true, reconnectAttempts: 0 }; - clientGetPluginMock.mockImplementation((name: string) => - name === "gateway" ? gateway : undefined, - ); - clientFetchUserMock.mockImplementationOnce(async () => { - emitter.emit("debug", "WebSocket connection opened"); - return { id: "bot-1", username: "Molty" }; - }); - isVerboseMock.mockReturnValue(true); - - await monitorDiscordProvider({ - config: baseConfig(), - runtime, - }); - - const messages = vi.mocked(runtime.log).mock.calls.map((call) => String(call[0])); - expect(messages.some((msg) => msg.includes("fetch-application-id:start"))).toBe(true); - expect(messages.some((msg) => msg.includes("fetch-application-id:done"))).toBe(true); - expect(messages.some((msg) => msg.includes("deploy-commands:start"))).toBe(true); - expect(messages.some((msg) => msg.includes("deploy-commands:done"))).toBe(true); - expect(messages.some((msg) => msg.includes("fetch-bot-identity:start"))).toBe(true); - expect(messages.some((msg) => msg.includes("fetch-bot-identity:done"))).toBe(true); - expect( - messages.some( - (msg) => msg.includes("gateway-debug") && msg.includes("WebSocket connection opened"), - ), - ).toBe(true); - }); - - it("keeps Discord startup chatter quiet by default", async () => { - const { monitorDiscordProvider } = await import("./provider.js"); - const runtime = baseRuntime(); - - await monitorDiscordProvider({ - config: baseConfig(), - runtime, - }); - - const messages = vi.mocked(runtime.log).mock.calls.map((call) => String(call[0])); - expect(messages.some((msg) => msg.includes("discord startup ["))).toBe(false); - }); }); diff --git a/extensions/discord/src/monitor/provider.ts b/extensions/discord/src/monitor/provider.ts index 523f7c54c36..5b0a078e854 100644 --- a/extensions/discord/src/monitor/provider.ts +++ b/extensions/discord/src/monitor/provider.ts @@ -52,7 +52,11 @@ import { import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env"; import { createNonExitingRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { summarizeStringEntries } from "openclaw/plugin-sdk/text-runtime"; -import { resolveDiscordAccount } from "../accounts.js"; +import { + forgetDiscordManagedBotIdentity, + rememberDiscordManagedBotIdentity, + resolveDiscordAccount, +} from "../accounts.js"; import { getDiscordGatewayEmitter } from "../monitor.gateway.js"; import { fetchDiscordApplicationId } from "../probe.js"; import { normalizeDiscordToken } from "../token.js"; @@ -99,6 +103,7 @@ import { reconcileAcpThreadBindingsOnStartup, } from "./thread-bindings.js"; import { formatThreadBindingDurationLabel } from "./thread-bindings.messages.js"; +import { normalizeDiscordInboundWorkerTimeoutMs } from "./timeouts.js"; export type MonitorDiscordOpts = { token?: string; @@ -161,6 +166,30 @@ function appendPluginCommandSpecs(params: { const DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS = 8_000; const DISCORD_ACP_STALE_RUNNING_ACTIVITY_MS = 2 * 60 * 1000; +function withTimeout(promise: Promise, timeoutMs: number): Promise { + if (!timeoutMs || timeoutMs <= 0) { + return promise; + } + let timer: NodeJS.Timeout | null = null; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error("timeout")), timeoutMs); + }); + return Promise.race([promise, timeout]).finally(() => { + if (timer) { + clearTimeout(timer); + } + }); +} + +const DISCORD_MESSAGE_HANDLER_IDLE_TIMEOUT_CAP_MS = 15_000; + +function resolveDiscordMessageHandlerIdleTimeoutMs(raw: number | undefined): number { + const normalized = normalizeDiscordInboundWorkerTimeoutMs(raw); + if (!normalized) { + return DISCORD_MESSAGE_HANDLER_IDLE_TIMEOUT_CAP_MS; + } + return Math.min(normalized, DISCORD_MESSAGE_HANDLER_IDLE_TIMEOUT_CAP_MS); +} function isLegacyMissingSessionError(message: string): boolean { return ( @@ -631,9 +660,11 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { let lifecycleStarted = false; let releaseEarlyGatewayErrorGuard = () => {}; let deactivateMessageHandler: (() => void) | undefined; + let waitForMessageHandlerIdle: (() => Promise) | undefined; let autoPresenceController: ReturnType | null = null; let earlyGatewayEmitter: ReturnType | undefined; let onEarlyGatewayDebug: ((msg: unknown) => void) | undefined; + let botUserId: string | undefined; try { const commands: BaseCommand[] = commandSpecs.map((spec) => createDiscordNativeCommand({ @@ -827,7 +858,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const logger = createSubsystemLogger("discord/monitor"); const guildHistories = new Map(); - let botUserId: string | undefined; let botUserName: string | undefined; let voiceManager: DiscordVoiceManager | null = null; @@ -864,6 +894,12 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const botUser = await client.fetchUser("@me"); botUserId = botUser?.id; botUserName = botUser?.username?.trim() || botUser?.globalName?.trim() || undefined; + if (botUserId) { + rememberDiscordManagedBotIdentity({ + botUserId, + accountId: account.accountId, + }); + } logDiscordStartupPhase({ runtime, accountId: account.accountId, @@ -922,6 +958,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { discordRestFetch, }); deactivateMessageHandler = messageHandler.deactivate; + waitForMessageHandlerIdle = messageHandler.waitForIdle; const trackInboundEvent = opts.setStatus ? () => { const at = Date.now(); @@ -996,6 +1033,24 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { }); } finally { deactivateMessageHandler?.(); + if (waitForMessageHandlerIdle) { + const idleTimeoutMs = resolveDiscordMessageHandlerIdleTimeoutMs( + discordCfg.inboundWorker?.runTimeoutMs, + ); + try { + await withTimeout(waitForMessageHandlerIdle(), idleTimeoutMs); + } catch (error) { + const message = + error instanceof Error && error.message === "timeout" + ? `discord: inbound handler did not drain within ${idleTimeoutMs}ms during teardown; continuing shutdown` + : `discord: inbound handler idle wait failed during teardown: ${formatErrorMessage(error)}`; + runtime.log?.(warn(message)); + } + } + forgetDiscordManagedBotIdentity({ + botUserId, + accountId: account.accountId, + }); autoPresenceController?.stop(); opts.setStatus?.({ connected: false }); if (onEarlyGatewayDebug) { diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 601fa6891bf..e1823c9ed46 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -123,6 +123,8 @@ export type MsgContext = { /** Explicit owner allowlist overrides (trusted, configuration-derived). */ OwnerAllowFrom?: Array; SenderName?: string; + /** Provider-managed account id when the inbound sender is one of our configured bot accounts. */ + SenderManagedAccountId?: string; SenderId?: string; SenderUsername?: string; SenderTag?: string; diff --git a/src/gateway/server-methods/config.ts b/src/gateway/server-methods/config.ts index 977a59f00b5..63b35726493 100644 --- a/src/gateway/server-methods/config.ts +++ b/src/gateway/server-methods/config.ts @@ -252,6 +252,7 @@ function loadSchemaWithPlugins(): ConfigSchemaResponse { runtimeOptions: { allowGatewaySubagentBinding: true, }, + inheritSharedRuntimeOptions: true, logger: { info: () => {}, warn: () => {}, diff --git a/src/hooks/message-hook-mappers.test.ts b/src/hooks/message-hook-mappers.test.ts index 53660054a15..2e26ff02584 100644 --- a/src/hooks/message-hook-mappers.test.ts +++ b/src/hooks/message-hook-mappers.test.ts @@ -171,6 +171,9 @@ describe("message hook mappers", () => { channelId: "telegram", accountId: "acc-1", messageId: "out-1", + threadId: "thread-1", + sessionKey: "agent:agent-1:telegram:chat:456", + agentId: "agent-1", isGroup: true, groupId: "telegram:chat:456", }); @@ -185,6 +188,17 @@ describe("message hook mappers", () => { content: "reply", success: false, error: "network error", + metadata: { + channel: "telegram", + accountId: "acc-1", + conversationId: "telegram:chat:456", + messageId: "out-1", + threadId: "thread-1", + sessionKey: "agent:agent-1:telegram:chat:456", + agentId: "agent-1", + isGroup: true, + groupId: "telegram:chat:456", + }, }); expect(toInternalMessageSentContext(canonical)).toEqual({ to: "telegram:chat:456", diff --git a/src/hooks/message-hook-mappers.ts b/src/hooks/message-hook-mappers.ts index 968a4d50719..680470c3d71 100644 --- a/src/hooks/message-hook-mappers.ts +++ b/src/hooks/message-hook-mappers.ts @@ -28,6 +28,7 @@ export type CanonicalInboundMessageHookContext = { messageId?: string; senderId?: string; senderName?: string; + senderManagedAccountId?: string; senderUsername?: string; senderE164?: string; provider?: string; @@ -52,6 +53,9 @@ export type CanonicalSentMessageHookContext = { accountId?: string; conversationId?: string; messageId?: string; + threadId?: string | number; + sessionKey?: string; + agentId?: string; isGroup?: boolean; groupId?: string; }; @@ -97,6 +101,7 @@ export function deriveInboundMessageHookContext( ctx.MessageSidLast, senderId: ctx.SenderId, senderName: ctx.SenderName, + senderManagedAccountId: ctx.SenderManagedAccountId, senderUsername: ctx.SenderUsername, senderE164: ctx.SenderE164, provider: ctx.Provider, @@ -122,6 +127,9 @@ export function buildCanonicalSentMessageHookContext(params: { accountId?: string; conversationId?: string; messageId?: string; + threadId?: string | number; + sessionKey?: string; + agentId?: string; isGroup?: boolean; groupId?: string; }): CanonicalSentMessageHookContext { @@ -134,6 +142,9 @@ export function buildCanonicalSentMessageHookContext(params: { accountId: params.accountId, conversationId: params.conversationId ?? params.to, messageId: params.messageId, + threadId: params.threadId, + sessionKey: params.sessionKey, + agentId: params.agentId, isGroup: params.isGroup, groupId: params.groupId, }; @@ -296,6 +307,7 @@ export function toPluginMessageReceivedEvent( messageId: canonical.messageId, senderId: canonical.senderId, senderName: canonical.senderName, + senderManagedAccountId: canonical.senderManagedAccountId, senderUsername: canonical.senderUsername, senderE164: canonical.senderE164, guildId: canonical.guildId, @@ -312,6 +324,17 @@ export function toPluginMessageSentEvent( content: canonical.content, success: canonical.success, ...(canonical.error ? { error: canonical.error } : {}), + metadata: { + channel: canonical.channelId, + accountId: canonical.accountId, + conversationId: canonical.conversationId, + messageId: canonical.messageId, + threadId: canonical.threadId, + sessionKey: canonical.sessionKey, + agentId: canonical.agentId, + isGroup: canonical.isGroup, + groupId: canonical.groupId, + }, }; } @@ -333,6 +356,7 @@ export function toInternalMessageReceivedContext( threadId: canonical.threadId, senderId: canonical.senderId, senderName: canonical.senderName, + senderManagedAccountId: canonical.senderManagedAccountId, senderUsername: canonical.senderUsername, senderE164: canonical.senderE164, guildId: canonical.guildId, diff --git a/src/infra/outbound/channel-resolution.test.ts b/src/infra/outbound/channel-resolution.test.ts index 30480fd0046..40f1cf13ee0 100644 --- a/src/infra/outbound/channel-resolution.test.ts +++ b/src/infra/outbound/channel-resolution.test.ts @@ -126,6 +126,7 @@ describe("outbound channel resolution", () => { runtimeOptions: { allowGatewaySubagentBinding: true, }, + inheritSharedRuntimeOptions: true, }); getChannelPluginMock.mockReturnValue(undefined); @@ -140,6 +141,7 @@ describe("outbound channel resolution", () => { runtimeOptions: { allowGatewaySubagentBinding: true, }, + inheritSharedRuntimeOptions: true, }); }); diff --git a/src/infra/outbound/channel-resolution.ts b/src/infra/outbound/channel-resolution.ts index 15372daa2a1..4b5cb48fcf6 100644 --- a/src/infra/outbound/channel-resolution.ts +++ b/src/infra/outbound/channel-resolution.ts @@ -57,6 +57,7 @@ function maybeBootstrapChannelPlugin(params: { runtimeOptions: { allowGatewaySubagentBinding: true, }, + inheritSharedRuntimeOptions: true, }); } catch { // Allow a follow-up resolution attempt if bootstrap failed transiently. diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 6bf69a519f8..9148d4b4713 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -20,7 +20,8 @@ const mocks = vi.hoisted(() => ({ })); const hookMocks = vi.hoisted(() => ({ runner: { - hasHooks: vi.fn(() => false), + hasHooks: vi.fn<(name: string) => boolean>((_name) => false), + runMessageSending: vi.fn(async () => undefined), runMessageSent: vi.fn(async () => {}), }, })); @@ -210,6 +211,8 @@ describe("deliverOutboundPayloads", () => { mocks.appendAssistantMessageToSessionTranscript.mockClear(); hookMocks.runner.hasHooks.mockClear(); hookMocks.runner.hasHooks.mockReturnValue(false); + hookMocks.runner.runMessageSending.mockClear(); + hookMocks.runner.runMessageSending.mockResolvedValue(undefined); hookMocks.runner.runMessageSent.mockClear(); hookMocks.runner.runMessageSent.mockResolvedValue(undefined); internalHookMocks.createInternalHookEvent.mockClear(); @@ -983,6 +986,167 @@ describe("deliverOutboundPayloads", () => { ); }); + it("uses the same resolved sessionKey in message_sending and message_sent", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (name: string) => name === "message_sending" || name === "message_sent", + ); + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + session: { key: "agent:sender:session", agentId: "sender-agent" }, + mirror: { sessionKey: "agent:mirror:session", agentId: "mirror-agent" }, + }); + + expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + sessionKey: "agent:mirror:session", + agentId: "mirror-agent", + }), + }), + expect.anything(), + ); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + sessionKey: "agent:mirror:session", + agentId: "mirror-agent", + }), + }), + expect.anything(), + ); + }); + + it("uses replyToId as hook threadId for Slack threaded sends", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (name: string) => name === "message_sending" || name === "message_sent", + ); + const sendText = vi.fn().mockResolvedValue({ channel: "slack", messageId: "slack-1" }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "slack", + source: "test", + plugin: createOutboundTestPlugin({ + id: "slack", + outbound: { deliveryMode: "direct", sendText }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "slack", + to: "C123", + payloads: [{ text: "hello" }], + replyToId: "1741719181.247349", + threadId: null, + session: { key: "agent:sender:session", agentId: "sender-agent" }, + }); + + expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + threadId: "1741719181.247349", + sessionKey: "agent:sender:session", + agentId: "sender-agent", + }), + }), + expect.anything(), + ); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + threadId: "1741719181.247349", + sessionKey: "agent:sender:session", + agentId: "sender-agent", + }), + }), + expect.anything(), + ); + }); + + it("uses payload-level replyToId as hook threadId for Slack threaded sends", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (name: string) => name === "message_sending" || name === "message_sent", + ); + const sendText = vi.fn().mockResolvedValue({ channel: "slack", messageId: "slack-2" }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "slack", + source: "test", + plugin: createOutboundTestPlugin({ + id: "slack", + outbound: { deliveryMode: "direct", sendText }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "slack", + to: "C123", + payloads: [{ text: "hello", replyToId: "1741719181.999999" }], + replyToId: undefined, + threadId: null, + session: { key: "agent:sender:session", agentId: "sender-agent" }, + }); + + expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + threadId: "1741719181.999999", + sessionKey: "agent:sender:session", + agentId: "sender-agent", + }), + }), + expect.anything(), + ); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + threadId: "1741719181.999999", + sessionKey: "agent:sender:session", + agentId: "sender-agent", + }), + }), + expect.anything(), + ); + }); + + it("falls back to mirror metadata for message_sending when session is absent", async () => { + hookMocks.runner.hasHooks.mockImplementation((name: string) => name === "message_sending"); + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + mirror: { sessionKey: "agent:mirror:session", agentId: "mirror-agent" }, + skipQueue: true, + }); + + expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + sessionKey: "agent:mirror:session", + agentId: "mirror-agent", + }), + }), + expect.anything(), + ); + }); + it("emits message_sent success for sendPayload deliveries", async () => { hookMocks.runner.hasHooks.mockReturnValue(true); const sendPayload = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" }); @@ -1199,6 +1363,47 @@ describe("deliverOutboundPayloads", () => { expect.objectContaining({ channelId: "whatsapp" }), ); }); + + it("emits payload-level threadId in message_sent failure metadata for Slack threaded sends", async () => { + hookMocks.runner.hasHooks.mockReturnValue(true); + const sendText = vi.fn().mockRejectedValue(new Error("downstream failed")); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "slack", + source: "test", + plugin: createOutboundTestPlugin({ + id: "slack", + outbound: { deliveryMode: "direct", sendText }, + }), + }, + ]), + ); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "slack", + to: "C123", + payloads: [{ text: "hi", replyToId: "1741719181.424242" }], + threadId: null, + session: { key: "agent:sender:session", agentId: "sender-agent" }, + }), + ).rejects.toThrow("downstream failed"); + + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + threadId: "1741719181.424242", + sessionKey: "agent:sender:session", + agentId: "sender-agent", + }), + success: false, + error: "downstream failed", + }), + expect.anything(), + ); + }); }); const emptyRegistry = createTestRegistry([]); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index e1be816c910..d1ff7dcb067 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -345,13 +345,23 @@ function createMessageSentEmitter(params: { channel: Exclude; to: string; accountId?: string; + conversationId?: string; + agentId?: string; + sessionKey?: string; sessionKeyForInternalHooks?: string; mirrorIsGroup?: boolean; mirrorGroupId?: string; -}): { emitMessageSent: (event: MessageSentEvent) => void; hasMessageSentHooks: boolean } { +}): { + emitMessageSent: ( + event: MessageSentEvent & { threadId?: string | number; conversationId?: string }, + ) => void; + hasMessageSentHooks: boolean; +} { const hasMessageSentHooks = params.hookRunner?.hasHooks("message_sent") ?? false; const canEmitInternalHook = Boolean(params.sessionKeyForInternalHooks); - const emitMessageSent = (event: MessageSentEvent) => { + const emitMessageSent = ( + event: MessageSentEvent & { threadId?: string | number; conversationId?: string }, + ) => { if (!hasMessageSentHooks && !canEmitInternalHook) { return; } @@ -362,8 +372,11 @@ function createMessageSentEmitter(params: { error: event.error, channelId: params.channel, accountId: params.accountId ?? undefined, - conversationId: params.to, + conversationId: event.conversationId ?? params.conversationId ?? params.to, messageId: event.messageId, + threadId: event.threadId, + sessionKey: params.sessionKey, + agentId: params.agentId, isGroup: params.mirrorIsGroup, groupId: params.mirrorGroupId, }); @@ -400,6 +413,31 @@ function createMessageSentEmitter(params: { return { emitMessageSent, hasMessageSentHooks }; } +function resolveOutboundHookMetadata(params: { + channel: Exclude; + to: string; + conversationId?: string; + replyToId?: string | null; + threadId?: string | number | null; + session?: OutboundSessionContext; + mirror?: DeliverOutboundPayloadsCoreParams["mirror"]; +}): { + conversationId: string; + threadId?: string | number; + sessionKey?: string; + agentId?: string; +} { + const resolvedThreadId = + params.threadId ?? + (params.channel === "slack" && params.replyToId ? params.replyToId : undefined); + return { + conversationId: params.conversationId ?? params.to, + threadId: resolvedThreadId, + sessionKey: params.mirror?.sessionKey ?? params.session?.key, + agentId: params.mirror?.agentId ?? params.session?.agentId, + }; +} + async function applyMessageSendingHook(params: { hookRunner: ReturnType; enabled: boolean; @@ -408,6 +446,10 @@ async function applyMessageSendingHook(params: { to: string; channel: Exclude; accountId?: string; + conversationId?: string; + threadId?: string | number | null; + sessionKey?: string; + agentId?: string; }): Promise<{ cancelled: boolean; payload: ReplyPayload; @@ -428,12 +470,17 @@ async function applyMessageSendingHook(params: { metadata: { channel: params.channel, accountId: params.accountId, + conversationId: params.conversationId ?? params.to, + threadId: params.threadId ?? undefined, + sessionKey: params.sessionKey, + agentId: params.agentId, mediaUrls: params.payloadSummary.mediaUrls, }, }, { channelId: params.channel, accountId: params.accountId ?? undefined, + conversationId: params.conversationId ?? params.to, }, ); if (sendingResult?.cancel) { @@ -609,7 +656,16 @@ async function deliverOutboundPayloadsCore( }; const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel, handler); const hookRunner = getGlobalHookRunner(); - const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key; + const hookMetadata = resolveOutboundHookMetadata({ + channel, + to, + conversationId: to, + replyToId: params.replyToId, + threadId: params.threadId, + session: params.session, + mirror: params.mirror, + }); + const sessionKeyForInternalHooks = hookMetadata.sessionKey; const mirrorIsGroup = params.mirror?.isGroup; const mirrorGroupId = params.mirror?.groupId; const { emitMessageSent, hasMessageSentHooks } = createMessageSentEmitter({ @@ -617,23 +673,35 @@ async function deliverOutboundPayloadsCore( channel, to, accountId, + conversationId: hookMetadata.conversationId, + agentId: hookMetadata.agentId, + sessionKey: hookMetadata.sessionKey, sessionKeyForInternalHooks, mirrorIsGroup, mirrorGroupId, }); const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false; - if (hasMessageSentHooks && params.session?.agentId && !sessionKeyForInternalHooks) { + if (hasMessageSentHooks && hookMetadata.agentId && !sessionKeyForInternalHooks) { log.warn( "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped", { channel, to, - agentId: params.session.agentId, + agentId: hookMetadata.agentId, }, ); } for (const payload of normalizedPayloads) { let payloadSummary = buildPayloadSummary(payload); + let payloadHookMetadata = resolveOutboundHookMetadata({ + channel, + to, + conversationId: to, + replyToId: payload.replyToId ?? params.replyToId, + threadId: params.threadId, + session: params.session, + mirror: params.mirror, + }); try { throwIfAborted(abortSignal); @@ -646,6 +714,10 @@ async function deliverOutboundPayloadsCore( to, channel, accountId, + conversationId: payloadHookMetadata.conversationId, + threadId: payloadHookMetadata.threadId, + sessionKey: payloadHookMetadata.sessionKey, + agentId: payloadHookMetadata.agentId, }); if (hookResult.cancelled) { continue; @@ -659,6 +731,15 @@ async function deliverOutboundPayloadsCore( threadId: params.threadId ?? undefined, forceDocument: params.forceDocument, }; + payloadHookMetadata = resolveOutboundHookMetadata({ + channel, + to, + conversationId: to, + replyToId: sendOverrides.replyToId, + threadId: sendOverrides.threadId, + session: params.session, + mirror: params.mirror, + }); if ( handler.sendPayload && hasReplyPayloadContent({ @@ -672,6 +753,8 @@ async function deliverOutboundPayloadsCore( success: true, content: payloadSummary.text, messageId: delivery.messageId, + conversationId: payloadHookMetadata.conversationId, + threadId: payloadHookMetadata.threadId, }); continue; } @@ -687,6 +770,8 @@ async function deliverOutboundPayloadsCore( success: results.length > beforeCount, content: payloadSummary.text, messageId, + conversationId: payloadHookMetadata.conversationId, + threadId: payloadHookMetadata.threadId, }); continue; } @@ -713,6 +798,8 @@ async function deliverOutboundPayloadsCore( success: results.length > beforeCount, content: payloadSummary.text, messageId, + conversationId: payloadHookMetadata.conversationId, + threadId: payloadHookMetadata.threadId, }); continue; } @@ -742,12 +829,16 @@ async function deliverOutboundPayloadsCore( success: true, content: payloadSummary.text, messageId: lastMessageId, + conversationId: payloadHookMetadata.conversationId, + threadId: payloadHookMetadata.threadId, }); } catch (err) { emitMessageSent({ success: false, content: payloadSummary.text, error: err instanceof Error ? err.message : String(err), + conversationId: payloadHookMetadata.conversationId, + threadId: payloadHookMetadata.threadId, }); if (!params.bestEffort) { throw err; diff --git a/src/infra/run-node.test.ts b/src/infra/run-node.test.ts index 9b6c871379b..3b1d12d61c0 100644 --- a/src/infra/run-node.test.ts +++ b/src/infra/run-node.test.ts @@ -34,7 +34,12 @@ async function writeRuntimePostBuildScaffold(tmp: string): Promise { await fs.utimes(pluginSdkAliasPath, baselineTime, baselineTime); } -function expectedBuildSpawn() { +async function readJsonFile(filePath: string): Promise { + return JSON.parse(await fs.readFile(filePath, "utf-8")); +} + +function expectedBuildSpawn(platform: NodeJS.Platform = process.platform) { + void platform; return [process.execPath, "scripts/tsdown-build.mjs", "--no-clean"]; } @@ -153,10 +158,11 @@ describe("run-node script", () => { fs.readFile(path.join(tmp, "dist", "plugin-sdk", "root-alias.cjs"), "utf-8"), ).resolves.toContain("module.exports = {};"); await expect( - fs - .readFile(path.join(tmp, "dist", "extensions", "demo", "openclaw.plugin.json"), "utf-8") - .then((raw) => JSON.parse(raw)), - ).resolves.toMatchObject({ id: "demo" }); + readJsonFile(path.join(tmp, "dist", "extensions", "demo", "openclaw.plugin.json")), + ).resolves.toMatchObject({ + id: "demo", + configSchema: { type: "object" }, + }); await expect( fs.readFile(path.join(tmp, "dist", "extensions", "demo", "package.json"), "utf-8"), ).resolves.toContain( @@ -499,10 +505,9 @@ describe("run-node script", () => { expect(exitCode).toBe(0); expect(spawnCalls).toEqual([[process.execPath, "openclaw.mjs", "status"]]); - await expect( - fs.readFile(distManifestPath, "utf-8").then((raw) => JSON.parse(raw)), - ).resolves.toMatchObject({ + await expect(readJsonFile(distManifestPath)).resolves.toMatchObject({ id: "demo", + configSchema: { type: "object" }, }); }); }); @@ -568,10 +573,9 @@ describe("run-node script", () => { expect(exitCode).toBe(0); expect(spawnCalls).toEqual([[process.execPath, "openclaw.mjs", "status"]]); - await expect( - fs.readFile(distManifestPath, "utf-8").then((raw) => JSON.parse(raw)), - ).resolves.toMatchObject({ + await expect(readJsonFile(distManifestPath)).resolves.toMatchObject({ id: "demo", + configSchema: { type: "object" }, }); }); }); diff --git a/src/plugin-sdk/keyed-async-queue.test.ts b/src/plugin-sdk/keyed-async-queue.test.ts index 50038f5bc93..b266cdb2406 100644 --- a/src/plugin-sdk/keyed-async-queue.test.ts +++ b/src/plugin-sdk/keyed-async-queue.test.ts @@ -105,4 +105,42 @@ describe("KeyedAsyncQueue", () => { await Promise.resolve(); expect(queue.getTailMapForTesting().has("actor")).toBe(false); }); + + it("waits for tasks enqueued while draining", async () => { + const queue = new KeyedAsyncQueue(); + const firstGate = deferred(); + const secondGate = deferred(); + let secondStarted = false; + + void queue.enqueue("actor", async () => { + await firstGate.promise; + }); + + const idlePromise = queue.waitForIdle(); + + await vi.waitFor(() => { + expect(queue.getTailMapForTesting().has("actor")).toBe(true); + }); + + void queue.enqueue("actor", async () => { + secondStarted = true; + await secondGate.promise; + }); + + firstGate.resolve(); + await vi.waitFor(() => { + expect(secondStarted).toBe(true); + }); + + let idleResolved = false; + void idlePromise.then(() => { + idleResolved = true; + }); + await Promise.resolve(); + expect(idleResolved).toBe(false); + + secondGate.resolve(); + await idlePromise; + expect(queue.getTailMapForTesting().size).toBe(0); + }); }); diff --git a/src/plugin-sdk/keyed-async-queue.ts b/src/plugin-sdk/keyed-async-queue.ts index 0f07f3c8462..81e4103ca8f 100644 --- a/src/plugin-sdk/keyed-async-queue.ts +++ b/src/plugin-sdk/keyed-async-queue.ts @@ -46,4 +46,10 @@ export class KeyedAsyncQueue { ...(hooks ? { hooks } : {}), }); } + + async waitForIdle(): Promise { + while (this.tails.size > 0) { + await Promise.allSettled(this.tails.values()); + } + } } diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 6f5900f8334..f02ef977f35 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -61,6 +61,7 @@ export type PluginLoadOptions = { logger?: PluginLogger; coreGatewayHandlers?: Record; runtimeOptions?: CreatePluginRuntimeOptions; + inheritSharedRuntimeOptions?: boolean; cache?: boolean; mode?: "full" | "validate"; onlyPluginIds?: string[]; diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 343a338c4f8..e204c55c12a 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -1670,6 +1670,7 @@ export type PluginHookMessageSentEvent = { content: string; success: boolean; error?: string; + metadata?: Record; }; // Tool context diff --git a/test/helpers/extensions/discord-provider.test-support.ts b/test/helpers/extensions/discord-provider.test-support.ts index 538e00ae9fa..854b0cd869c 100644 --- a/test/helpers/extensions/discord-provider.test-support.ts +++ b/test/helpers/extensions/discord-provider.test-support.ts @@ -38,8 +38,10 @@ type ProviderMonitorTestMocks = { listNativeCommandSpecsForConfigMock: Mock<() => NativeCommandSpecMock[]>; listSkillCommandsForAgentsMock: Mock<() => unknown[]>; monitorLifecycleMock: Mock<(params: { threadBindings: { stop: () => void } }) => Promise>; + rememberDiscordManagedBotIdentityMock: Mock<() => void>; resolveDiscordAccountMock: Mock<() => unknown>; resolveDiscordAllowlistConfigMock: Mock<() => Promise>; + forgetDiscordManagedBotIdentityMock: Mock<() => void>; resolveNativeCommandsEnabledMock: Mock<() => boolean>; resolveNativeSkillsEnabledMock: Mock<() => boolean>; isVerboseMock: Mock<() => boolean>; @@ -81,6 +83,7 @@ const providerMonitorTestMocks: ProviderMonitorTestMocks = vi.hoisted(() => { vi.fn(async () => undefined), { deactivate: vi.fn(), + waitForIdle: vi.fn(async () => undefined), }, ), ), @@ -113,6 +116,7 @@ const providerMonitorTestMocks: ProviderMonitorTestMocks = vi.hoisted(() => { monitorLifecycleMock: vi.fn(async (params: { threadBindings: { stop: () => void } }) => { params.threadBindings.stop(); }), + rememberDiscordManagedBotIdentityMock: vi.fn(), resolveDiscordAccountMock: vi.fn(() => ({ accountId: "default", token: "cfg-token", @@ -122,6 +126,7 @@ const providerMonitorTestMocks: ProviderMonitorTestMocks = vi.hoisted(() => { guildEntries: undefined, allowFrom: undefined, })), + forgetDiscordManagedBotIdentityMock: vi.fn(), resolveNativeCommandsEnabledMock: vi.fn(() => true), resolveNativeSkillsEnabledMock: vi.fn(() => false), isVerboseMock, @@ -147,8 +152,10 @@ const { listNativeCommandSpecsForConfigMock, listSkillCommandsForAgentsMock, monitorLifecycleMock, + rememberDiscordManagedBotIdentityMock, resolveDiscordAccountMock, resolveDiscordAllowlistConfigMock, + forgetDiscordManagedBotIdentityMock, resolveNativeCommandsEnabledMock, resolveNativeSkillsEnabledMock, isVerboseMock, @@ -199,6 +206,7 @@ export function resetDiscordProviderMonitorMocks(params?: { vi.fn(async () => undefined), { deactivate: vi.fn(), + waitForIdle: vi.fn(async () => undefined), }, ), ); @@ -221,6 +229,7 @@ export function resetDiscordProviderMonitorMocks(params?: { monitorLifecycleMock.mockClear().mockImplementation(async (monitorParams) => { monitorParams.threadBindings.stop(); }); + rememberDiscordManagedBotIdentityMock.mockClear(); resolveDiscordAccountMock.mockClear().mockReturnValue({ accountId: "default", token: "cfg-token", @@ -230,6 +239,7 @@ export function resetDiscordProviderMonitorMocks(params?: { guildEntries: undefined, allowFrom: undefined, }); + forgetDiscordManagedBotIdentityMock.mockClear(); resolveNativeCommandsEnabledMock.mockClear().mockReturnValue(true); resolveNativeSkillsEnabledMock.mockClear().mockReturnValue(false); isVerboseMock.mockClear().mockReturnValue(false); @@ -248,16 +258,34 @@ export const baseConfig = (): OpenClawConfig => channels: { discord: { accounts: { - default: { - token: "MTIz.abc.def", - }, + default: {}, }, }, }, }) as OpenClawConfig; -vi.mock("@buape/carbon", async (importOriginal) => { - const actual = await importOriginal(); +vi.mock("@buape/carbon", () => { + class Button {} + class ChannelSelectMenu {} + class Command {} + class CommandWithSubcommands {} + class Container { + constructor( + _components?: unknown, + _options?: { + accentColor?: string; + spoiler?: boolean; + }, + ) {} + } + class MentionableSelectMenu {} + class Message {} + class MessageCreateListener {} + class MessageReactionAddListener {} + class MessageReactionRemoveListener {} + class Modal {} + class PresenceUpdateListener {} + class ReadyListener {} class RateLimitError extends Error { status = 429; discordCode?: number; @@ -294,16 +322,91 @@ vi.mock("@buape/carbon", async (importOriginal) => { return clientGetPluginMock(name); } } - return { ...actual, Client, RateLimitError }; + class RequestClient {} + class Row {} + class RoleSelectMenu {} + class Separator {} + class StringSelectMenu {} + class TextDisplay {} + class ThreadUpdateListener {} + class UserSelectMenu {} + class Embed {} + const ChannelType = { + GuildText: 0, + DM: 1, + GuildVoice: 2, + GroupDM: 3, + GuildCategory: 4, + GuildAnnouncement: 5, + AnnouncementThread: 10, + PublicThread: 11, + PrivateThread: 12, + }; + const MessageType = { + Default: 0, + Reply: 19, + }; + return { + Client, + Button, + ChannelSelectMenu, + ChannelType, + Command, + CommandWithSubcommands, + Container, + Embed, + MentionableSelectMenu, + Message, + MessageCreateListener, + MessageType, + MessageReactionAddListener, + MessageReactionRemoveListener, + Modal, + PresenceUpdateListener, + RateLimitError, + ReadyListener, + RequestClient, + Row, + RoleSelectMenu, + Separator, + serializePayload: (payload: unknown) => payload, + StringSelectMenu, + TextDisplay, + ThreadUpdateListener, + UserSelectMenu, + }; }); -vi.mock("@buape/carbon/gateway", () => ({ - GatewayCloseCodes: { DisallowedIntents: 4014 }, -})); +vi.mock("@buape/carbon/gateway", () => { + class GatewayPlugin { + gatewayInfo?: unknown; + constructor(_options?: unknown) {} + async registerClient(_client: unknown) { + return undefined; + } + } + return { + GatewayCloseCodes: { DisallowedIntents: 4014 }, + GatewayIntents: { + Guilds: 1 << 0, + GuildMessages: 1 << 9, + MessageContent: 1 << 15, + DirectMessages: 1 << 12, + GuildMessageReactions: 1 << 10, + DirectMessageReactions: 1 << 13, + GuildVoiceStates: 1 << 7, + GuildPresences: 1 << 8, + GuildMembers: 1 << 1, + }, + GatewayPlugin, + }; +}); -vi.mock("@buape/carbon/voice", () => ({ - VoicePlugin: class VoicePlugin {}, -})); +vi.mock("@buape/carbon/voice", () => { + return { + VoicePlugin: class VoicePlugin {}, + }; +}); vi.mock("openclaw/plugin-sdk/acp-runtime", async () => { const actual = await vi.importActual( @@ -388,6 +491,8 @@ vi.mock("openclaw/plugin-sdk/infra-runtime", async () => { }); vi.mock("../../../extensions/discord/src/accounts.js", () => ({ + forgetDiscordManagedBotIdentity: forgetDiscordManagedBotIdentityMock, + rememberDiscordManagedBotIdentity: rememberDiscordManagedBotIdentityMock, resolveDiscordAccount: resolveDiscordAccountMock, })); @@ -472,9 +577,7 @@ vi.mock("../../../extensions/discord/src/monitor/provider.lifecycle.js", () => ( })); vi.mock("../../../extensions/discord/src/monitor/rest-fetch.js", () => ({ - resolveDiscordRestFetch: () => async () => { - throw new Error("offline"); - }, + resolveDiscordRestFetch: () => async () => undefined, })); vi.mock("../../../extensions/discord/src/monitor/thread-bindings.js", () => ({