From f51cac277c87f3409ba550a21aa9bb7082c66835 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 7 Mar 2026 21:13:47 +0000 Subject: [PATCH] fix(discord): make message listener non-blocking (#39154, thanks @yaseenkadlemakki) Co-authored-by: Yaseen Kadlemakki --- CHANGELOG.md | 1 + src/discord/monitor.test.ts | 60 ++++----- src/discord/monitor/listeners.test.ts | 168 +++++++------------------- src/discord/monitor/listeners.ts | 40 ++---- 4 files changed, 81 insertions(+), 188 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 208c93a198a..71e0058cb12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -280,6 +280,7 @@ Docs: https://docs.openclaw.ai - Podman/.env gateway bind precedence: evaluate `OPENCLAW_GATEWAY_BIND` after sourcing `.env` in `run-openclaw-podman.sh` so env-file overrides are honored. (#38785) Thanks @majinyu666. - Models/default alias refresh: bump `gpt` to `openai/gpt-5.4` and Gemini defaults to `gemini-3.1` preview aliases (including normalization/default wiring) to track current model IDs. (#38638) Thanks @ademczuk. - Config/env substitution degraded mode: convert missing `${VAR}` resolution in config reads from hard-fail to warning-backed degraded behavior, while preventing unresolved placeholders from being accepted as gateway credentials. (#39050) Thanks @akz142857. +- Discord inbound listener non-blocking dispatch: make `MESSAGE_CREATE` listener handoff asynchronous (no per-listener queue blocking), so long runs no longer stall unrelated incoming events. (#39154) Thanks @yaseenkadlemakki. ## 2026.3.2 diff --git a/src/discord/monitor.test.ts b/src/discord/monitor.test.ts index 50bb52af18d..10c7dc66747 100644 --- a/src/discord/monitor.test.ts +++ b/src/discord/monitor.test.ts @@ -115,7 +115,7 @@ describe("DiscordMessageListener", () => { expect(handlerResolved).toBe(true); }); - it("queues subsequent events until prior message handling completes", async () => { + it("dispatches subsequent events concurrently without blocking on prior handler", async () => { const first = createDeferred(); const second = createDeferred(); let runCount = 0; @@ -142,12 +142,12 @@ describe("DiscordMessageListener", () => { ), ).resolves.toBeUndefined(); - expect(handler).toHaveBeenCalledTimes(1); - first.resolve(); + // Both handlers are dispatched concurrently (fire-and-forget). await vi.waitFor(() => { expect(handler).toHaveBeenCalledTimes(2); }); + first.resolve(); second.resolve(); await Promise.resolve(); }); @@ -171,42 +171,28 @@ describe("DiscordMessageListener", () => { }); }); - it("logs slow handlers after the threshold", async () => { - vi.useFakeTimers(); - vi.setSystemTime(0); + it("does not apply its own slow-listener logging (owned by inbound worker)", async () => { + const deferred = createDeferred(); + const handler = vi.fn(() => deferred.promise); + const logger = { + warn: vi.fn(), + error: vi.fn(), + } as unknown as ReturnType; + const listener = new DiscordMessageListener(handler, logger); - try { - const deferred = createDeferred(); - const handler = vi.fn(() => deferred.promise); - const logger = { - warn: vi.fn(), - error: vi.fn(), - } as unknown as ReturnType; - const listener = new DiscordMessageListener(handler, logger); + const handlePromise = listener.handle( + {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, + {} as unknown as import("@buape/carbon").Client, + ); + await expect(handlePromise).resolves.toBeUndefined(); - // handle() should release immediately. - const handlePromise = listener.handle( - {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, - {} as unknown as import("@buape/carbon").Client, - ); - await expect(handlePromise).resolves.toBeUndefined(); - expect(logger.warn).not.toHaveBeenCalled(); - - // Advance wall clock past the slow listener threshold. - vi.setSystemTime(31_000); - - // Release the background handler and allow slow-log finalizer to run. - deferred.resolve(); - await vi.waitFor(() => { - expect(logger.warn).toHaveBeenCalled(); - }); - const warnMock = logger.warn as unknown as { mock: { calls: unknown[][] } }; - const [, meta] = warnMock.mock.calls[0] ?? []; - const durationMs = (meta as { durationMs?: number } | undefined)?.durationMs; - expect(durationMs).toBeGreaterThanOrEqual(30_000); - } finally { - vi.useRealTimers(); - } + deferred.resolve(); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledOnce(); + }); + // The listener no longer wraps handlers with slow-listener logging; + // that responsibility moved to the inbound worker. + expect(logger.warn).not.toHaveBeenCalled(); }); }); diff --git a/src/discord/monitor/listeners.test.ts b/src/discord/monitor/listeners.test.ts index d1342b3ddb2..71145396a82 100644 --- a/src/discord/monitor/listeners.test.ts +++ b/src/discord/monitor/listeners.test.ts @@ -25,44 +25,63 @@ describe("DiscordMessageListener", () => { const listener = new DiscordMessageListener(handler as never, logger as never); await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); - expect(handler).toHaveBeenCalledTimes(1); + // Handler was dispatched but may not have been called yet (fire-and-forget). + // Wait for the microtask to flush so the handler starts. + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(1); + }); expect(logger.error).not.toHaveBeenCalled(); resolveHandler?.(); await handlerDone; }); - it("serializes queued handler runs for the same channel", async () => { - let firstResolve: (() => void) | undefined; - let secondResolve: (() => void) | undefined; - const firstDone = new Promise((resolve) => { - firstResolve = resolve; + it("runs handlers for the same channel concurrently (no per-channel serialization)", async () => { + const order: string[] = []; + let resolveA: (() => void) | undefined; + let resolveB: (() => void) | undefined; + const doneA = new Promise((r) => { + resolveA = r; }); - const secondDone = new Promise((resolve) => { - secondResolve = resolve; + const doneB = new Promise((r) => { + resolveB = r; }); - let runCount = 0; + let callCount = 0; const handler = vi.fn(async () => { - runCount += 1; - if (runCount === 1) { - await firstDone; - return; + callCount += 1; + const id = callCount; + order.push(`start:${id}`); + if (id === 1) { + await doneA; + } else { + await doneB; } - await secondDone; + order.push(`end:${id}`); }); const listener = new DiscordMessageListener(handler as never, createLogger() as never); - await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); - await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); + // Both messages target the same channel — previously serialized, now concurrent. + await listener.handle(fakeEvent("ch-1"), {} as never); + await listener.handle(fakeEvent("ch-1"), {} as never); - expect(handler).toHaveBeenCalledTimes(1); - firstResolve?.(); await vi.waitFor(() => { expect(handler).toHaveBeenCalledTimes(2); }); + // Both handlers started without waiting for the first to finish. + expect(order).toContain("start:1"); + expect(order).toContain("start:2"); - secondResolve?.(); - await secondDone; + resolveB?.(); + await vi.waitFor(() => { + expect(order).toContain("end:2"); + }); + // First handler is still running — no serialization. + expect(order).not.toContain("end:1"); + + resolveA?.(); + await vi.waitFor(() => { + expect(order).toContain("end:1"); + }); }); it("runs handlers for different channels in parallel", async () => { @@ -122,109 +141,14 @@ describe("DiscordMessageListener", () => { }); }); - it("continues same-channel processing after handler timeout", async () => { - vi.useFakeTimers(); - try { - const never = new Promise(() => {}); - const handler = vi.fn(async () => { - if (handler.mock.calls.length === 1) { - await never; - return; - } - }); - const logger = createLogger(); - const listener = new DiscordMessageListener(handler as never, logger as never, undefined, { - timeoutMs: 50, - }); + it("calls onEvent callback for each message", async () => { + const handler = vi.fn(async () => {}); + const onEvent = vi.fn(); + const listener = new DiscordMessageListener(handler as never, undefined, onEvent); - await listener.handle(fakeEvent("ch-1"), {} as never); - await listener.handle(fakeEvent("ch-1"), {} as never); - expect(handler).toHaveBeenCalledTimes(1); + await listener.handle(fakeEvent("ch-1"), {} as never); + await listener.handle(fakeEvent("ch-2"), {} as never); - await vi.advanceTimersByTimeAsync(60); - await vi.waitFor(() => { - expect(handler).toHaveBeenCalledTimes(2); - }); - expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after")); - } finally { - vi.useRealTimers(); - } - }); - - it("aborts timed-out handlers and prevents late side effects", async () => { - vi.useFakeTimers(); - try { - let abortReceived = false; - let lateSideEffect = false; - const handler = vi.fn( - async ( - _data: unknown, - _client: unknown, - options?: { - abortSignal?: AbortSignal; - }, - ) => { - await new Promise((resolve) => { - if (options?.abortSignal?.aborted) { - abortReceived = true; - resolve(); - return; - } - options?.abortSignal?.addEventListener( - "abort", - () => { - abortReceived = true; - resolve(); - }, - { once: true }, - ); - }); - if (options?.abortSignal?.aborted) { - return; - } - lateSideEffect = true; - }, - ); - const logger = createLogger(); - const listener = new DiscordMessageListener(handler as never, logger as never, undefined, { - timeoutMs: 50, - }); - - await listener.handle(fakeEvent("ch-1"), {} as never); - await listener.handle(fakeEvent("ch-1"), {} as never); - - await vi.advanceTimersByTimeAsync(60); - await vi.waitFor(() => { - expect(handler).toHaveBeenCalledTimes(2); - }); - expect(abortReceived).toBe(true); - expect(lateSideEffect).toBe(false); - expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after")); - } finally { - vi.useRealTimers(); - } - }); - - it("does not emit slow-listener warnings when timeout already fired", async () => { - vi.useFakeTimers(); - try { - const never = new Promise(() => {}); - const handler = vi.fn(async () => { - await never; - }); - const logger = createLogger(); - const listener = new DiscordMessageListener(handler as never, logger as never, undefined, { - timeoutMs: 31_000, - }); - - await listener.handle(fakeEvent("ch-1"), {} as never); - await vi.advanceTimersByTimeAsync(31_100); - await vi.waitFor(() => { - expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after")); - }); - expect(logger.warn).not.toHaveBeenCalled(); - } finally { - vi.useRealTimers(); - } + expect(onEvent).toHaveBeenCalledTimes(2); }); }); diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 4ca94de098d..056a1ad7116 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -13,7 +13,6 @@ import { danger, logVerbose } from "../../globals.js"; import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; -import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; import { resolveAgentRoute } from "../../routing/resolve-route.js"; import { readStoreAllowFromForDmPolicy, @@ -199,44 +198,27 @@ export function registerDiscordListener(listeners: Array, listener: obje } export class DiscordMessageListener extends MessageCreateListener { - private readonly channelQueue = new KeyedAsyncQueue(); - private readonly listenerTimeoutMs: number; - constructor( private handler: DiscordMessageHandler, private logger?: Logger, private onEvent?: () => void, - options?: { timeoutMs?: number }, + _options?: { timeoutMs?: number }, ) { super(); - this.listenerTimeoutMs = normalizeDiscordListenerTimeoutMs(options?.timeoutMs); } async handle(data: DiscordMessageEvent, client: Client) { this.onEvent?.(); - const channelId = data.channel_id; - const context = { - channelId, - messageId: (data as { message?: { id?: string } }).message?.id, - guildId: (data as { guild_id?: string }).guild_id, - } satisfies Record; - // Serialize messages within the same channel to preserve ordering, - // but allow different channels to proceed in parallel so that - // channel-bound agents are not blocked by each other. - void this.channelQueue.enqueue(channelId, () => - runDiscordListenerWithSlowLog({ - logger: this.logger, - listener: this.constructor.name, - event: this.type, - timeoutMs: this.listenerTimeoutMs, - context, - run: (abortSignal) => this.handler(data, client, { abortSignal }), - onError: (err) => { - const logger = this.logger ?? discordEventQueueLog; - logger.error(danger(`discord handler failed: ${String(err)}`)); - }, - }), - ); + // Fire-and-forget: hand off to the handler without blocking the + // Carbon listener. Per-session ordering and run timeouts are owned + // by the inbound worker queue, so the listener no longer serializes + // or applies its own timeout. + void Promise.resolve() + .then(() => this.handler(data, client)) + .catch((err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); + }); } }