import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/types.js"; import { createNoopThreadBindingManager } from "./thread-bindings.js"; const preflightDiscordMessageMock = vi.hoisted(() => vi.fn()); const processDiscordMessageMock = vi.hoisted(() => vi.fn()); vi.mock("./message-handler.preflight.js", () => ({ preflightDiscordMessage: preflightDiscordMessageMock, })); vi.mock("./message-handler.process.js", () => ({ processDiscordMessage: processDiscordMessageMock, })); const { createDiscordMessageHandler } = await import("./message-handler.js"); function createDeferred() { let resolve: (value: T | PromiseLike) => void = () => {}; const promise = new Promise((innerResolve) => { resolve = innerResolve; }); return { promise, resolve }; } function createHandlerParams(overrides?: { setStatus?: (patch: Record) => void; abortSignal?: AbortSignal; }) { const cfg: OpenClawConfig = { channels: { discord: { enabled: true, token: "test-token", groupPolicy: "allowlist", }, }, messages: { inbound: { debounceMs: 0, }, }, }; return { cfg, discordConfig: cfg.channels?.discord, accountId: "default", token: "test-token", runtime: { log: vi.fn(), error: vi.fn(), exit: (code: number): never => { throw new Error(`exit ${code}`); }, }, botUserId: "bot-123", guildHistories: new Map(), historyLimit: 0, mediaMaxBytes: 10_000, textLimit: 2_000, replyToMode: "off" as const, dmEnabled: true, groupDmEnabled: false, threadBindings: createNoopThreadBindingManager("default"), setStatus: overrides?.setStatus, abortSignal: overrides?.abortSignal, }; } function createMessageData(messageId: string, channelId = "ch-1") { return { channel_id: channelId, author: { id: "user-1" }, message: { id: messageId, author: { id: "user-1", bot: false }, content: "hello", channel_id: channelId, attachments: [{ id: `att-${messageId}` }], }, }; } function createPreflightContext(channelId = "ch-1") { return { route: { sessionKey: `agent:main:discord:channel:${channelId}`, }, baseSessionKey: `agent:main:discord:channel:${channelId}`, messageChannelId: channelId, }; } describe("createDiscordMessageHandler queue behavior", () => { it("resets busy counters when the handler is created", () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const setStatus = vi.fn(); createDiscordMessageHandler(createHandlerParams({ setStatus })); expect(setStatus).toHaveBeenCalledWith( expect.objectContaining({ activeRuns: 0, busy: false, }), ); }); it("returns immediately and tracks busy status while queued runs execute", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const firstRun = createDeferred(); const secondRun = createDeferred(); processDiscordMessageMock .mockImplementationOnce(async () => { await firstRun.promise; }) .mockImplementationOnce(async () => { await secondRun.promise; }); preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); const setStatus = vi.fn(); const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); expect(setStatus).toHaveBeenCalledWith( expect.objectContaining({ activeRuns: 1, busy: true, }), ); await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(2); }); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); firstRun.resolve(); await firstRun.promise; await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); }); secondRun.resolve(); await secondRun.promise; await vi.waitFor(() => { expect(setStatus).toHaveBeenLastCalledWith( expect.objectContaining({ activeRuns: 0, busy: false, }), ); }); }); it("refreshes run activity while active runs are in progress", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const runInFlight = createDeferred(); processDiscordMessageMock.mockImplementation(async () => { await runInFlight.promise; }); preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); let heartbeatTick: () => void = () => {}; let capturedHeartbeat = false; const setIntervalSpy = vi .spyOn(globalThis, "setInterval") .mockImplementation((callback: TimerHandler) => { if (typeof callback === "function") { heartbeatTick = () => { callback(); }; capturedHeartbeat = true; } return 1 as unknown as ReturnType; }); const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval"); try { const setStatus = vi.fn(); const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); await expect( handler(createMessageData("m-1") as never, {} as never), ).resolves.toBeUndefined(); await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); expect(capturedHeartbeat).toBe(true); const busyCallsBefore = setStatus.mock.calls.filter( ([patch]) => (patch as { busy?: boolean }).busy === true, ).length; heartbeatTick(); const busyCallsAfter = setStatus.mock.calls.filter( ([patch]) => (patch as { busy?: boolean }).busy === true, ).length; expect(busyCallsAfter).toBeGreaterThan(busyCallsBefore); runInFlight.resolve(); await runInFlight.promise; await vi.waitFor(() => { expect(clearIntervalSpy).toHaveBeenCalled(); }); } finally { setIntervalSpy.mockRestore(); clearIntervalSpy.mockRestore(); } }); it("stops status publishing after lifecycle abort", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const runInFlight = createDeferred(); processDiscordMessageMock.mockImplementation(async () => { await runInFlight.promise; }); preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); const setStatus = vi.fn(); const abortController = new AbortController(); const handler = createDiscordMessageHandler( createHandlerParams({ setStatus, abortSignal: abortController.signal }), ); await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); const callsBeforeAbort = setStatus.mock.calls.length; abortController.abort(); runInFlight.resolve(); await runInFlight.promise; await Promise.resolve(); expect(setStatus.mock.calls.length).toBe(callsBeforeAbort); }); it("stops status publishing after handler deactivation", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const runInFlight = createDeferred(); processDiscordMessageMock.mockImplementation(async () => { await runInFlight.promise; }); preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); const setStatus = vi.fn(); const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); const callsBeforeDeactivate = setStatus.mock.calls.length; handler.deactivate(); runInFlight.resolve(); await runInFlight.promise; await Promise.resolve(); expect(setStatus.mock.calls.length).toBe(callsBeforeDeactivate); }); it("skips queued runs that have not started yet after deactivation", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const firstRun = createDeferred(); processDiscordMessageMock .mockImplementationOnce(async () => { await firstRun.promise; }) .mockImplementationOnce(async () => undefined); preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); const handler = createDiscordMessageHandler(createHandlerParams()); await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); handler.deactivate(); firstRun.resolve(); await firstRun.promise; await Promise.resolve(); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); it("preserves non-debounced message ordering by awaiting debouncer enqueue", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const firstPreflight = createDeferred(); const processedMessageIds: string[] = []; preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string; message?: { id?: string } } }) => { const messageId = params.data.message?.id ?? "unknown"; if (messageId === "m-1") { await firstPreflight.promise; } return { ...createPreflightContext(params.data.channel_id), messageId, }; }, ); processDiscordMessageMock.mockImplementation(async (ctx: { messageId?: string }) => { processedMessageIds.push(ctx.messageId ?? "unknown"); }); const handler = createDiscordMessageHandler(createHandlerParams()); const sequentialDispatch = (async () => { await handler(createMessageData("m-1") as never, {} as never); await handler(createMessageData("m-2") as never, {} as never); })(); await vi.waitFor(() => { expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1); }); await Promise.resolve(); expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1); firstPreflight.resolve(); await sequentialDispatch; await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); }); expect(processedMessageIds).toEqual(["m-1", "m-2"]); }); it("recovers queue progress after a run failure without leaving busy state stuck", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); const firstRun = createDeferred(); processDiscordMessageMock .mockImplementationOnce(async () => { await firstRun.promise; throw new Error("simulated run failure"); }) .mockImplementationOnce(async () => undefined); preflightDiscordMessageMock.mockImplementation( async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); const setStatus = vi.fn(); const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); firstRun.resolve(); await firstRun.promise.catch(() => undefined); await vi.waitFor(() => { expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); }); await vi.waitFor(() => { expect(setStatus).toHaveBeenCalledWith( expect.objectContaining({ activeRuns: 0, busy: false }), ); }); }); });