import "./test-helpers.js"; import crypto from "node:crypto"; import fs from "node:fs/promises"; import { beforeAll, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../../src/config/config.js"; import { setLoggerOverride } from "../../../src/logging.js"; import { withEnvAsync } from "../../../src/test-utils/env.js"; import { escapeRegExp, formatEnvelopeTimestamp } from "../../../test/helpers/envelope-timestamp.js"; import { createMockWebListener, createWebListenerFactoryCapture, installWebAutoReplyTestHomeHooks, installWebAutoReplyUnitTestHooks, makeSessionStore, resetLoadConfigMock, setLoadConfigMock, } from "./auto-reply.test-harness.js"; import type { WebInboundMessage } from "./inbound.js"; installWebAutoReplyTestHomeHooks(); function createRuntime() { return { log: vi.fn(), error: vi.fn(), exit: vi.fn(), }; } function startMonitorWebChannel(params: { monitorWebChannelFn: (...args: unknown[]) => Promise; listenerFactory: unknown; sleep: ReturnType; signal?: AbortSignal; heartbeatSeconds?: number; messageTimeoutMs?: number; watchdogCheckMs?: number; reconnect?: { initialMs: number; maxMs: number; maxAttempts: number; factor: number }; }) { const runtime = createRuntime(); const controller = new AbortController(); const run = params.monitorWebChannelFn( false, params.listenerFactory as never, true, async () => ({ text: "ok" }), runtime as never, params.signal ?? controller.signal, { heartbeatSeconds: params.heartbeatSeconds ?? 1, messageTimeoutMs: params.messageTimeoutMs, watchdogCheckMs: params.watchdogCheckMs, reconnect: params.reconnect ?? { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, sleep: params.sleep, }, ); return { runtime, controller, run }; } function makeInboundMessage(params: { body: string; from: string; to: string; id?: string; timestamp?: number; sendComposing: ReturnType; reply: ReturnType; sendMedia: ReturnType; }): WebInboundMessage { return { body: params.body, from: params.from, to: params.to, id: params.id, timestamp: params.timestamp, conversationId: params.from, accountId: "default", chatType: "direct", chatId: params.from, sendComposing: params.sendComposing as unknown as WebInboundMessage["sendComposing"], reply: params.reply as unknown as WebInboundMessage["reply"], sendMedia: params.sendMedia as unknown as WebInboundMessage["sendMedia"], }; } describe("web auto-reply connection", () => { installWebAutoReplyUnitTestHooks(); let monitorWebChannel: typeof import("./auto-reply.js").monitorWebChannel; beforeAll(async () => { ({ monitorWebChannel } = await import("./auto-reply.js")); }); it("handles helper envelope timestamps with trimmed timezones (regression)", () => { const d = new Date("2025-01-01T00:00:00.000Z"); expect(() => formatEnvelopeTimestamp(d, " America/Los_Angeles ")).not.toThrow(); }); it("handles reconnect progress and max-attempt stop behavior", async () => { for (const scenario of [ { reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, expectedCallsAfterFirstClose: 2, closeTwiceAndFinish: false, expectedError: "Retry 1", }, { reconnect: { initialMs: 5, maxMs: 5, maxAttempts: 2, factor: 1.1 }, expectedCallsAfterFirstClose: 2, closeTwiceAndFinish: true, expectedError: "max attempts reached", }, ]) { const closeResolvers: Array<() => void> = []; const sleep = vi.fn(async () => {}); const listenerFactory = vi.fn(async () => { const onClose = new Promise((res) => { closeResolvers.push(res); }); return { close: vi.fn(), onClose }; }); const { runtime, controller, run } = startMonitorWebChannel({ monitorWebChannelFn: monitorWebChannel as never, listenerFactory, sleep, reconnect: scenario.reconnect, }); await Promise.resolve(); expect(listenerFactory).toHaveBeenCalledTimes(1); closeResolvers.shift()?.(); await vi.waitFor( () => { expect(listenerFactory).toHaveBeenCalledTimes(scenario.expectedCallsAfterFirstClose); }, { timeout: 250, interval: 2 }, ); if (scenario.closeTwiceAndFinish) { closeResolvers.shift()?.(); await run; } else { controller.abort(); closeResolvers.shift()?.(); await Promise.resolve(); await run; } expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining(scenario.expectedError)); } }); it("treats status 440 as non-retryable and stops without retrying", async () => { const closeResolvers: Array<(reason?: unknown) => void> = []; const sleep = vi.fn(async () => {}); const listenerFactory = vi.fn(async () => { const onClose = new Promise((res) => { closeResolvers.push(res); }); return { close: vi.fn(), onClose }; }); const { runtime, controller, run } = startMonitorWebChannel({ monitorWebChannelFn: monitorWebChannel as never, listenerFactory, sleep, reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, }); await Promise.resolve(); expect(listenerFactory).toHaveBeenCalledTimes(1); closeResolvers.shift()?.({ status: 440, isLoggedOut: false, error: "Unknown Stream Errored (conflict)", }); const completedQuickly = await Promise.race([ run.then(() => true), new Promise((resolve) => setTimeout(() => resolve(false), 60)), ]); if (!completedQuickly) { await vi.waitFor( () => { expect(listenerFactory).toHaveBeenCalledTimes(2); }, { timeout: 250, interval: 2 }, ); controller.abort(); closeResolvers[1]?.({ status: 499, isLoggedOut: false, error: "aborted" }); await run; } expect(completedQuickly).toBe(true); expect(listenerFactory).toHaveBeenCalledTimes(1); expect(sleep).not.toHaveBeenCalled(); expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("status 440")); expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("session conflict")); expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("Stopping web monitoring")); }); it("forces reconnect when watchdog closes without onClose", async () => { vi.useFakeTimers(); try { const sleep = vi.fn(async () => {}); const closeResolvers: Array<(reason: unknown) => void> = []; let capturedOnMessage: | ((msg: import("./inbound.js").WebInboundMessage) => Promise) | undefined; const listenerFactory = vi.fn( async (opts: { onMessage: (msg: import("./inbound.js").WebInboundMessage) => Promise; }) => { capturedOnMessage = opts.onMessage; let resolveClose: (reason: unknown) => void = () => {}; const onClose = new Promise((res) => { resolveClose = res; closeResolvers.push(res); }); return { close: vi.fn(), onClose, signalClose: (reason?: unknown) => resolveClose(reason), }; }, ); const { controller, run } = startMonitorWebChannel({ monitorWebChannelFn: monitorWebChannel as never, listenerFactory, sleep, heartbeatSeconds: 60, messageTimeoutMs: 30, watchdogCheckMs: 5, }); await Promise.resolve(); expect(listenerFactory).toHaveBeenCalledTimes(1); await vi.waitFor( () => { expect(capturedOnMessage).toBeTypeOf("function"); }, { timeout: 250, interval: 2 }, ); const reply = vi.fn().mockResolvedValue(undefined); const sendComposing = vi.fn(); const sendMedia = vi.fn(); void capturedOnMessage?.( makeInboundMessage({ body: "hi", from: "+1", to: "+2", id: "m1", sendComposing, reply, sendMedia, }), ); await vi.advanceTimersByTimeAsync(200); await Promise.resolve(); await vi.waitFor( () => { expect(listenerFactory).toHaveBeenCalledTimes(2); }, { timeout: 250, interval: 2 }, ); controller.abort(); closeResolvers[1]?.({ status: 499, isLoggedOut: false }); await Promise.resolve(); await run; } finally { vi.useRealTimers(); } }); it("processes inbound messages without batching and preserves timestamps", async () => { await withEnvAsync({ TZ: "Europe/Vienna" }, async () => { const originalMax = process.getMaxListeners(); process.setMaxListeners?.(1); const store = await makeSessionStore({ main: { sessionId: "sid", updatedAt: Date.now() }, }); try { const sendMedia = vi.fn(); const reply = vi.fn().mockResolvedValue(undefined); const sendComposing = vi.fn(); const resolver = vi.fn().mockResolvedValue({ text: "ok" }); const capture = createWebListenerFactoryCapture(); setLoadConfigMock(() => ({ agents: { defaults: { envelopeTimezone: "utc", }, }, session: { store: store.storePath }, })); await monitorWebChannel(false, capture.listenerFactory as never, false, resolver); const capturedOnMessage = capture.getOnMessage(); expect(capturedOnMessage).toBeDefined(); await capturedOnMessage?.( makeInboundMessage({ body: "first", from: "+1", to: "+2", id: "m1", timestamp: 1735689600000, sendComposing, reply, sendMedia, }), ); await capturedOnMessage?.( makeInboundMessage({ body: "second", from: "+1", to: "+2", id: "m2", timestamp: 1735693200000, sendComposing, reply, sendMedia, }), ); expect(resolver).toHaveBeenCalledTimes(2); const firstArgs = resolver.mock.calls[0][0]; const secondArgs = resolver.mock.calls[1][0]; const firstTimestamp = formatEnvelopeTimestamp(new Date("2025-01-01T00:00:00Z")); const secondTimestamp = formatEnvelopeTimestamp(new Date("2025-01-01T01:00:00Z")); const firstPattern = escapeRegExp(firstTimestamp); const secondPattern = escapeRegExp(secondTimestamp); expect(firstArgs.Body).toMatch( new RegExp(`\\[WhatsApp \\+1 (\\+\\d+[smhd] )?${firstPattern}\\] \\[openclaw\\] first`), ); expect(firstArgs.Body).not.toContain("second"); expect(secondArgs.Body).toMatch( new RegExp(`\\[WhatsApp \\+1 (\\+\\d+[smhd] )?${secondPattern}\\] \\[openclaw\\] second`), ); expect(secondArgs.Body).not.toContain("first"); expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50); } finally { process.setMaxListeners?.(originalMax); await store.cleanup(); resetLoadConfigMock(); } }); }); it("emits heartbeat logs with connection metadata", async () => { vi.useFakeTimers(); const logPath = `/tmp/openclaw-heartbeat-${crypto.randomUUID()}.log`; setLoggerOverride({ level: "trace", file: logPath }); const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn(), }; const controller = new AbortController(); const listenerFactory = vi.fn(async () => { const onClose = new Promise(() => { // never resolves; abort will short-circuit }); return { close: vi.fn(), onClose }; }); const run = monitorWebChannel( false, listenerFactory as never, true, async () => ({ text: "ok" }), runtime as never, controller.signal, { heartbeatSeconds: 1, reconnect: { initialMs: 5, maxMs: 5, maxAttempts: 1, factor: 1.1 }, }, ); await vi.advanceTimersByTimeAsync(1_000); controller.abort(); await vi.runAllTimersAsync(); await run.catch(() => {}); const content = await fs.readFile(logPath, "utf-8"); expect(content).toMatch(/web-heartbeat/); expect(content).toMatch(/connectionId/); expect(content).toMatch(/messagesHandled/); }); it("logs outbound replies to file", async () => { const logPath = `/tmp/openclaw-log-test-${crypto.randomUUID()}.log`; setLoggerOverride({ level: "trace", file: logPath }); const capture = createWebListenerFactoryCapture(); const resolver = vi.fn().mockResolvedValue({ text: "auto" }); await monitorWebChannel(false, capture.listenerFactory as never, false, resolver as never); const capturedOnMessage = capture.getOnMessage(); expect(capturedOnMessage).toBeDefined(); await capturedOnMessage?.({ body: "hello", from: "+1", conversationId: "+1", to: "+2", accountId: "default", chatType: "direct", chatId: "+1", id: "msg1", sendComposing: vi.fn(), reply: vi.fn(), sendMedia: vi.fn(), }); const content = await fs.readFile(logPath, "utf-8"); expect(content).toMatch(/web-auto-reply/); expect(content).toMatch(/auto/); }); it("marks dispatch idle after replies flush", async () => { const markDispatchIdle = vi.fn(); const typingMock = { onReplyStart: vi.fn(async () => {}), startTypingLoop: vi.fn(async () => {}), startTypingOnText: vi.fn(async () => {}), refreshTypingTtl: vi.fn(), isActive: vi.fn(() => false), markRunComplete: vi.fn(), markDispatchIdle, cleanup: vi.fn(), }; const reply = vi.fn().mockResolvedValue(undefined); const sendComposing = vi.fn().mockResolvedValue(undefined); const sendMedia = vi.fn().mockResolvedValue(undefined); const replyResolver = vi.fn().mockImplementation(async (_ctx, opts) => { opts?.onTypingController?.(typingMock); return { text: "final reply" }; }); const mockConfig: OpenClawConfig = { channels: { whatsapp: { allowFrom: ["*"] } }, }; setLoadConfigMock(mockConfig); await monitorWebChannel( false, async ({ onMessage }) => { await onMessage({ id: "m1", from: "+1000", conversationId: "+1000", to: "+2000", body: "hello", timestamp: Date.now(), chatType: "direct", chatId: "direct:+1000", accountId: "default", sendComposing, reply, sendMedia, }); return createMockWebListener(); }, false, replyResolver, ); resetLoadConfigMock(); expect(markDispatchIdle).toHaveBeenCalled(); }); });