From 566e4cf77b2d44048e9e5d47b1a1a2c263c1ef42 Mon Sep 17 00:00:00 2001 From: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:45:35 -0500 Subject: [PATCH] test: add Zalo reply-once lifecycle regression --- .../src/monitor.reply-once.lifecycle.test.ts | 315 ++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 extensions/zalo/src/monitor.reply-once.lifecycle.test.ts diff --git a/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts b/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts new file mode 100644 index 00000000000..ab4b409fc23 --- /dev/null +++ b/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts @@ -0,0 +1,315 @@ +import { createServer, type RequestListener } from "node:http"; +import type { AddressInfo } from "node:net"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { createPluginRuntimeMock } from "../../../test/helpers/extensions/plugin-runtime-mock.js"; +import { createEmptyPluginRegistry } from "../../../src/plugins/registry.js"; +import { setActivePluginRegistry } from "../../../src/plugins/runtime.js"; +import type { OpenClawConfig, PluginRuntime } from "../runtime-api.js"; +import { + clearZaloWebhookSecurityStateForTest, + monitorZaloProvider, +} from "./monitor.js"; +import type { ResolvedZaloAccount } from "./accounts.js"; + +const setWebhookMock = vi.hoisted(() => vi.fn(async () => ({ ok: true, result: { url: "" } }))); +const deleteWebhookMock = vi.hoisted(() => vi.fn(async () => ({ ok: true, result: { url: "" } }))); +const getWebhookInfoMock = vi.hoisted(() => vi.fn(async () => ({ ok: true, result: { url: "" } }))); +const getUpdatesMock = vi.hoisted(() => vi.fn(() => new Promise(() => {}))); +const sendChatActionMock = vi.hoisted(() => vi.fn(async () => ({ ok: true }))); +const sendMessageMock = vi.hoisted(() => + vi.fn(async () => ({ ok: true, result: { message_id: "reply-zalo-1" } })), +); +const sendPhotoMock = vi.hoisted(() => vi.fn(async () => ({ ok: true }))); +const getZaloRuntimeMock = vi.hoisted(() => vi.fn()); + +vi.mock("./api.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + deleteWebhook: deleteWebhookMock, + getUpdates: getUpdatesMock, + getWebhookInfo: getWebhookInfoMock, + sendChatAction: sendChatActionMock, + sendMessage: sendMessageMock, + sendPhoto: sendPhotoMock, + setWebhook: setWebhookMock, + }; +}); + +vi.mock("./runtime.js", () => ({ + getZaloRuntime: getZaloRuntimeMock, +})); + +async function withServer(handler: RequestListener, fn: (baseUrl: string) => Promise) { + const server = createServer(handler); + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => resolve()); + }); + const address = server.address() as AddressInfo | null; + if (!address) { + throw new Error("missing server address"); + } + try { + await fn(`http://127.0.0.1:${address.port}`); + } finally { + await new Promise((resolve) => server.close(() => resolve())); + } +} + +function createLifecycleConfig(): OpenClawConfig { + return { + channels: { + zalo: { + enabled: true, + accounts: { + "acct-zalo-lifecycle": { + enabled: true, + webhookUrl: "https://example.com/hooks/zalo", + webhookSecret: "supersecret", // pragma: allowlist secret + dmPolicy: "open", + }, + }, + }, + }, + } as OpenClawConfig; +} + +function createLifecycleAccount(): ResolvedZaloAccount { + return { + accountId: "acct-zalo-lifecycle", + enabled: true, + token: "zalo-token", + tokenSource: "config", + config: { + webhookUrl: "https://example.com/hooks/zalo", + webhookSecret: "supersecret", // pragma: allowlist secret + dmPolicy: "open", + }, + } as ResolvedZaloAccount; +} + +function createRuntimeEnv() { + return { + log: vi.fn<(message: string) => void>(), + error: vi.fn<(message: string) => void>(), + }; +} + +function createTextUpdate(messageId: string) { + return { + event_name: "message.text.received", + message: { + from: { id: "user-1", name: "User One" }, + chat: { id: "dm-chat-1", chat_type: "PRIVATE" as const }, + message_id: messageId, + date: Math.floor(Date.now() / 1000), + text: "hello from zalo", + }, + }; +} + +async function settleAsyncWork(): Promise { + for (let i = 0; i < 6; i += 1) { + await Promise.resolve(); + await new Promise((resolve) => setTimeout(resolve, 0)); + } +} + +async function postWebhookUpdate(params: { + baseUrl: string; + path: string; + secret: string; + payload: Record; +}) { + return await fetch(`${params.baseUrl}${params.path}`, { + method: "POST", + headers: { + "content-type": "application/json", + "x-bot-api-secret-token": params.secret, + }, + body: JSON.stringify(params.payload), + }); +} + +describe("Zalo reply-once lifecycle", () => { + const finalizeInboundContextMock = vi.fn((ctx: Record) => ctx); + const recordInboundSessionMock = vi.fn(async () => undefined); + const resolveAgentRouteMock = vi.fn(() => ({ + agentId: "main", + channel: "zalo", + accountId: "acct-zalo-lifecycle", + sessionKey: "agent:main:zalo:direct:dm-chat-1", + mainSessionKey: "agent:main:main", + matchedBy: "default", + })); + const dispatchReplyWithBufferedBlockDispatcherMock = vi.fn(); + + beforeEach(() => { + vi.clearAllMocks(); + clearZaloWebhookSecurityStateForTest(); + + getZaloRuntimeMock.mockReturnValue( + createPluginRuntimeMock({ + channel: { + routing: { + resolveAgentRoute: + resolveAgentRouteMock as unknown as PluginRuntime["channel"]["routing"]["resolveAgentRoute"], + }, + reply: { + finalizeInboundContext: + finalizeInboundContextMock as unknown as PluginRuntime["channel"]["reply"]["finalizeInboundContext"], + dispatchReplyWithBufferedBlockDispatcher: + dispatchReplyWithBufferedBlockDispatcherMock as unknown as PluginRuntime["channel"]["reply"]["dispatchReplyWithBufferedBlockDispatcher"], + }, + session: { + recordInboundSession: + recordInboundSessionMock as unknown as PluginRuntime["channel"]["session"]["recordInboundSession"], + }, + }, + }), + ); + }); + + afterEach(() => { + setActivePluginRegistry(createEmptyPluginRegistry()); + }); + + it("routes one accepted webhook event to one visible reply across duplicate replay", async () => { + dispatchReplyWithBufferedBlockDispatcherMock.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "zalo reply once" }); + }); + + const registry = createEmptyPluginRegistry(); + setActivePluginRegistry(registry); + const abort = new AbortController(); + const runtime = createRuntimeEnv(); + const run = monitorZaloProvider({ + token: "zalo-token", + account: createLifecycleAccount(), + config: createLifecycleConfig(), + runtime, + abortSignal: abort.signal, + useWebhook: true, + webhookUrl: "https://example.com/hooks/zalo", + webhookSecret: "supersecret", + }); + + await vi.waitFor(() => expect(setWebhookMock).toHaveBeenCalledTimes(1)); + expect(registry.httpRoutes).toHaveLength(1); + const route = registry.httpRoutes[0]; + if (!route) { + throw new Error("missing plugin HTTP route"); + } + + await withServer((req, res) => route.handler(req, res), async (baseUrl) => { + const payload = createTextUpdate(`zalo-replay-${Date.now()}`); + const first = await postWebhookUpdate({ + baseUrl, + path: "/hooks/zalo", + secret: "supersecret", + payload, + }); + const second = await postWebhookUpdate({ + baseUrl, + path: "/hooks/zalo", + secret: "supersecret", + payload, + }); + + expect(first.status).toBe(200); + expect(second.status).toBe(200); + await settleAsyncWork(); + }); + + expect(finalizeInboundContextMock).toHaveBeenCalledTimes(1); + expect(finalizeInboundContextMock).toHaveBeenCalledWith( + expect.objectContaining({ + AccountId: "acct-zalo-lifecycle", + SessionKey: "agent:main:zalo:direct:dm-chat-1", + MessageSid: expect.stringContaining("zalo-replay-"), + From: "zalo:user-1", + To: "zalo:dm-chat-1", + }), + ); + expect(recordInboundSessionMock).toHaveBeenCalledTimes(1); + expect(recordInboundSessionMock).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:main:zalo:direct:dm-chat-1", + }), + ); + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(sendMessageMock).toHaveBeenCalledWith( + "zalo-token", + expect.objectContaining({ + chat_id: "dm-chat-1", + text: "zalo reply once", + }), + undefined, + ); + + abort.abort(); + await run; + }); + + it("does not emit a second visible reply when replay arrives after a post-send failure", async () => { + let dispatchAttempts = 0; + dispatchReplyWithBufferedBlockDispatcherMock.mockImplementation(async ({ dispatcherOptions }) => { + dispatchAttempts += 1; + await dispatcherOptions.deliver({ text: "zalo reply after failure" }); + if (dispatchAttempts === 1) { + throw new Error("post-send failure"); + } + }); + + const registry = createEmptyPluginRegistry(); + setActivePluginRegistry(registry); + const abort = new AbortController(); + const runtime = createRuntimeEnv(); + const run = monitorZaloProvider({ + token: "zalo-token", + account: createLifecycleAccount(), + config: createLifecycleConfig(), + runtime, + abortSignal: abort.signal, + useWebhook: true, + webhookUrl: "https://example.com/hooks/zalo", + webhookSecret: "supersecret", + }); + + await vi.waitFor(() => expect(setWebhookMock).toHaveBeenCalledTimes(1)); + const route = registry.httpRoutes[0]; + if (!route) { + throw new Error("missing plugin HTTP route"); + } + + await withServer((req, res) => route.handler(req, res), async (baseUrl) => { + const payload = createTextUpdate(`zalo-retry-${Date.now()}`); + const first = await postWebhookUpdate({ + baseUrl, + path: "/hooks/zalo", + secret: "supersecret", + payload, + }); + await settleAsyncWork(); + const replay = await postWebhookUpdate({ + baseUrl, + path: "/hooks/zalo", + secret: "supersecret", + payload, + }); + + expect(first.status).toBe(200); + expect(replay.status).toBe(200); + await settleAsyncWork(); + }); + + expect(dispatchReplyWithBufferedBlockDispatcherMock).toHaveBeenCalledTimes(1); + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("Zalo webhook failed: Error: post-send failure"), + ); + + abort.abort(); + await run; + }); +});