test: add Zalo reply-once lifecycle regression

This commit is contained in:
Tak Hoffman 2026-03-19 16:45:35 -05:00
parent 5841e3b493
commit 566e4cf77b
No known key found for this signature in database

View File

@ -0,0 +1,315 @@
import { createServer, type RequestListener } from "node:http";
import type { AddressInfo } from "node:net";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createPluginRuntimeMock } from "../../../test/helpers/extensions/plugin-runtime-mock.js";
import { createEmptyPluginRegistry } from "../../../src/plugins/registry.js";
import { setActivePluginRegistry } from "../../../src/plugins/runtime.js";
import type { OpenClawConfig, PluginRuntime } from "../runtime-api.js";
import {
clearZaloWebhookSecurityStateForTest,
monitorZaloProvider,
} from "./monitor.js";
import type { ResolvedZaloAccount } from "./accounts.js";
const setWebhookMock = vi.hoisted(() => vi.fn(async () => ({ ok: true, result: { url: "" } })));
const deleteWebhookMock = vi.hoisted(() => vi.fn(async () => ({ ok: true, result: { url: "" } })));
const getWebhookInfoMock = vi.hoisted(() => vi.fn(async () => ({ ok: true, result: { url: "" } })));
const getUpdatesMock = vi.hoisted(() => vi.fn(() => new Promise(() => {})));
const sendChatActionMock = vi.hoisted(() => vi.fn(async () => ({ ok: true })));
const sendMessageMock = vi.hoisted(() =>
vi.fn(async () => ({ ok: true, result: { message_id: "reply-zalo-1" } })),
);
const sendPhotoMock = vi.hoisted(() => vi.fn(async () => ({ ok: true })));
const getZaloRuntimeMock = vi.hoisted(() => vi.fn());
vi.mock("./api.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./api.js")>();
return {
...actual,
deleteWebhook: deleteWebhookMock,
getUpdates: getUpdatesMock,
getWebhookInfo: getWebhookInfoMock,
sendChatAction: sendChatActionMock,
sendMessage: sendMessageMock,
sendPhoto: sendPhotoMock,
setWebhook: setWebhookMock,
};
});
vi.mock("./runtime.js", () => ({
getZaloRuntime: getZaloRuntimeMock,
}));
async function withServer(handler: RequestListener, fn: (baseUrl: string) => Promise<void>) {
const server = createServer(handler);
await new Promise<void>((resolve) => {
server.listen(0, "127.0.0.1", () => resolve());
});
const address = server.address() as AddressInfo | null;
if (!address) {
throw new Error("missing server address");
}
try {
await fn(`http://127.0.0.1:${address.port}`);
} finally {
await new Promise<void>((resolve) => server.close(() => resolve()));
}
}
function createLifecycleConfig(): OpenClawConfig {
return {
channels: {
zalo: {
enabled: true,
accounts: {
"acct-zalo-lifecycle": {
enabled: true,
webhookUrl: "https://example.com/hooks/zalo",
webhookSecret: "supersecret", // pragma: allowlist secret
dmPolicy: "open",
},
},
},
},
} as OpenClawConfig;
}
function createLifecycleAccount(): ResolvedZaloAccount {
return {
accountId: "acct-zalo-lifecycle",
enabled: true,
token: "zalo-token",
tokenSource: "config",
config: {
webhookUrl: "https://example.com/hooks/zalo",
webhookSecret: "supersecret", // pragma: allowlist secret
dmPolicy: "open",
},
} as ResolvedZaloAccount;
}
function createRuntimeEnv() {
return {
log: vi.fn<(message: string) => void>(),
error: vi.fn<(message: string) => void>(),
};
}
function createTextUpdate(messageId: string) {
return {
event_name: "message.text.received",
message: {
from: { id: "user-1", name: "User One" },
chat: { id: "dm-chat-1", chat_type: "PRIVATE" as const },
message_id: messageId,
date: Math.floor(Date.now() / 1000),
text: "hello from zalo",
},
};
}
async function settleAsyncWork(): Promise<void> {
for (let i = 0; i < 6; i += 1) {
await Promise.resolve();
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
async function postWebhookUpdate(params: {
baseUrl: string;
path: string;
secret: string;
payload: Record<string, unknown>;
}) {
return await fetch(`${params.baseUrl}${params.path}`, {
method: "POST",
headers: {
"content-type": "application/json",
"x-bot-api-secret-token": params.secret,
},
body: JSON.stringify(params.payload),
});
}
describe("Zalo reply-once lifecycle", () => {
const finalizeInboundContextMock = vi.fn((ctx: Record<string, unknown>) => ctx);
const recordInboundSessionMock = vi.fn(async () => undefined);
const resolveAgentRouteMock = vi.fn(() => ({
agentId: "main",
channel: "zalo",
accountId: "acct-zalo-lifecycle",
sessionKey: "agent:main:zalo:direct:dm-chat-1",
mainSessionKey: "agent:main:main",
matchedBy: "default",
}));
const dispatchReplyWithBufferedBlockDispatcherMock = vi.fn();
beforeEach(() => {
vi.clearAllMocks();
clearZaloWebhookSecurityStateForTest();
getZaloRuntimeMock.mockReturnValue(
createPluginRuntimeMock({
channel: {
routing: {
resolveAgentRoute:
resolveAgentRouteMock as unknown as PluginRuntime["channel"]["routing"]["resolveAgentRoute"],
},
reply: {
finalizeInboundContext:
finalizeInboundContextMock as unknown as PluginRuntime["channel"]["reply"]["finalizeInboundContext"],
dispatchReplyWithBufferedBlockDispatcher:
dispatchReplyWithBufferedBlockDispatcherMock as unknown as PluginRuntime["channel"]["reply"]["dispatchReplyWithBufferedBlockDispatcher"],
},
session: {
recordInboundSession:
recordInboundSessionMock as unknown as PluginRuntime["channel"]["session"]["recordInboundSession"],
},
},
}),
);
});
afterEach(() => {
setActivePluginRegistry(createEmptyPluginRegistry());
});
it("routes one accepted webhook event to one visible reply across duplicate replay", async () => {
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "zalo reply once" });
});
const registry = createEmptyPluginRegistry();
setActivePluginRegistry(registry);
const abort = new AbortController();
const runtime = createRuntimeEnv();
const run = monitorZaloProvider({
token: "zalo-token",
account: createLifecycleAccount(),
config: createLifecycleConfig(),
runtime,
abortSignal: abort.signal,
useWebhook: true,
webhookUrl: "https://example.com/hooks/zalo",
webhookSecret: "supersecret",
});
await vi.waitFor(() => expect(setWebhookMock).toHaveBeenCalledTimes(1));
expect(registry.httpRoutes).toHaveLength(1);
const route = registry.httpRoutes[0];
if (!route) {
throw new Error("missing plugin HTTP route");
}
await withServer((req, res) => route.handler(req, res), async (baseUrl) => {
const payload = createTextUpdate(`zalo-replay-${Date.now()}`);
const first = await postWebhookUpdate({
baseUrl,
path: "/hooks/zalo",
secret: "supersecret",
payload,
});
const second = await postWebhookUpdate({
baseUrl,
path: "/hooks/zalo",
secret: "supersecret",
payload,
});
expect(first.status).toBe(200);
expect(second.status).toBe(200);
await settleAsyncWork();
});
expect(finalizeInboundContextMock).toHaveBeenCalledTimes(1);
expect(finalizeInboundContextMock).toHaveBeenCalledWith(
expect.objectContaining({
AccountId: "acct-zalo-lifecycle",
SessionKey: "agent:main:zalo:direct:dm-chat-1",
MessageSid: expect.stringContaining("zalo-replay-"),
From: "zalo:user-1",
To: "zalo:dm-chat-1",
}),
);
expect(recordInboundSessionMock).toHaveBeenCalledTimes(1);
expect(recordInboundSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:main:zalo:direct:dm-chat-1",
}),
);
expect(sendMessageMock).toHaveBeenCalledTimes(1);
expect(sendMessageMock).toHaveBeenCalledWith(
"zalo-token",
expect.objectContaining({
chat_id: "dm-chat-1",
text: "zalo reply once",
}),
undefined,
);
abort.abort();
await run;
});
it("does not emit a second visible reply when replay arrives after a post-send failure", async () => {
let dispatchAttempts = 0;
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementation(async ({ dispatcherOptions }) => {
dispatchAttempts += 1;
await dispatcherOptions.deliver({ text: "zalo reply after failure" });
if (dispatchAttempts === 1) {
throw new Error("post-send failure");
}
});
const registry = createEmptyPluginRegistry();
setActivePluginRegistry(registry);
const abort = new AbortController();
const runtime = createRuntimeEnv();
const run = monitorZaloProvider({
token: "zalo-token",
account: createLifecycleAccount(),
config: createLifecycleConfig(),
runtime,
abortSignal: abort.signal,
useWebhook: true,
webhookUrl: "https://example.com/hooks/zalo",
webhookSecret: "supersecret",
});
await vi.waitFor(() => expect(setWebhookMock).toHaveBeenCalledTimes(1));
const route = registry.httpRoutes[0];
if (!route) {
throw new Error("missing plugin HTTP route");
}
await withServer((req, res) => route.handler(req, res), async (baseUrl) => {
const payload = createTextUpdate(`zalo-retry-${Date.now()}`);
const first = await postWebhookUpdate({
baseUrl,
path: "/hooks/zalo",
secret: "supersecret",
payload,
});
await settleAsyncWork();
const replay = await postWebhookUpdate({
baseUrl,
path: "/hooks/zalo",
secret: "supersecret",
payload,
});
expect(first.status).toBe(200);
expect(replay.status).toBe(200);
await settleAsyncWork();
});
expect(dispatchReplyWithBufferedBlockDispatcherMock).toHaveBeenCalledTimes(1);
expect(sendMessageMock).toHaveBeenCalledTimes(1);
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("Zalo webhook failed: Error: post-send failure"),
);
abort.abort();
await run;
});
});