import * as crypto from "crypto"; import * as http from "http"; import * as Lark from "@larksuiteoapi/node-sdk"; import { type ClawdbotConfig, type RuntimeEnv, type HistoryEntry, installRequestBodyLimitGuard, } from "openclaw/plugin-sdk"; import { resolveFeishuAccount, listEnabledFeishuAccounts } from "./accounts.js"; import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js"; import { createFeishuWSClient, createEventDispatcher } from "./client.js"; import { probeFeishu } from "./probe.js"; import { getMessageFeishu } from "./send.js"; import type { ResolvedFeishuAccount } from "./types.js"; export type MonitorFeishuOpts = { config?: ClawdbotConfig; runtime?: RuntimeEnv; abortSignal?: AbortSignal; accountId?: string; }; // Per-account WebSocket clients, HTTP servers, and bot info const wsClients = new Map(); const httpServers = new Map(); const botOpenIds = new Map(); const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000; const FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS = 60_000; const FEISHU_WEBHOOK_RATE_LIMIT_MAX_REQUESTS = 120; const FEISHU_WEBHOOK_RATE_LIMIT_MAX_TRACKED_KEYS = 4_096; const FEISHU_WEBHOOK_COUNTER_LOG_EVERY = 25; const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500; export type FeishuReactionCreatedEvent = { message_id: string; chat_id?: string; chat_type?: "p2p" | "group"; reaction_type?: { emoji_type?: string }; operator_type?: string; user_id?: { open_id?: string }; action_time?: string; }; type ResolveReactionSyntheticEventParams = { cfg: ClawdbotConfig; accountId: string; event: FeishuReactionCreatedEvent; botOpenId?: string; fetchMessage?: typeof getMessageFeishu; verificationTimeoutMs?: number; logger?: (message: string) => void; uuid?: () => string; }; const feishuWebhookRateLimits = new Map(); const feishuWebhookStatusCounters = new Map(); let lastWebhookRateLimitCleanupMs = 0; function isJsonContentType(value: string | string[] | undefined): boolean { const first = Array.isArray(value) ? value[0] : value; if (!first) { return false; } const mediaType = first.split(";", 1)[0]?.trim().toLowerCase(); return mediaType === "application/json" || Boolean(mediaType?.endsWith("+json")); } function trimWebhookRateLimitState(): void { while (feishuWebhookRateLimits.size > FEISHU_WEBHOOK_RATE_LIMIT_MAX_TRACKED_KEYS) { const oldestKey = feishuWebhookRateLimits.keys().next().value; if (typeof oldestKey !== "string") { break; } feishuWebhookRateLimits.delete(oldestKey); } } function maybePruneWebhookRateLimitState(nowMs: number): void { if ( feishuWebhookRateLimits.size === 0 || nowMs - lastWebhookRateLimitCleanupMs < FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS ) { return; } lastWebhookRateLimitCleanupMs = nowMs; for (const [key, state] of feishuWebhookRateLimits) { if (nowMs - state.windowStartMs >= FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS) { feishuWebhookRateLimits.delete(key); } } } export function clearFeishuWebhookRateLimitStateForTest(): void { feishuWebhookRateLimits.clear(); lastWebhookRateLimitCleanupMs = 0; } export function getFeishuWebhookRateLimitStateSizeForTest(): number { return feishuWebhookRateLimits.size; } export function isWebhookRateLimitedForTest(key: string, nowMs: number): boolean { maybePruneWebhookRateLimitState(nowMs); const state = feishuWebhookRateLimits.get(key); if (!state || nowMs - state.windowStartMs >= FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS) { feishuWebhookRateLimits.set(key, { count: 1, windowStartMs: nowMs }); trimWebhookRateLimitState(); return false; } state.count += 1; if (state.count > FEISHU_WEBHOOK_RATE_LIMIT_MAX_REQUESTS) { return true; } return false; } function isWebhookRateLimited(key: string, nowMs: number): boolean { return isWebhookRateLimitedForTest(key, nowMs); } function recordWebhookStatus( runtime: RuntimeEnv | undefined, accountId: string, path: string, statusCode: number, ): void { if (![400, 401, 408, 413, 415, 429].includes(statusCode)) { return; } const key = `${accountId}:${path}:${statusCode}`; const next = (feishuWebhookStatusCounters.get(key) ?? 0) + 1; feishuWebhookStatusCounters.set(key, next); if (next === 1 || next % FEISHU_WEBHOOK_COUNTER_LOG_EVERY === 0) { const log = runtime?.log ?? console.log; log(`feishu[${accountId}]: webhook anomaly path=${path} status=${statusCode} count=${next}`); } } async function withTimeout(promise: Promise, timeoutMs: number): Promise { let timeoutId: NodeJS.Timeout | undefined; try { return await Promise.race([ promise, new Promise((resolve) => { timeoutId = setTimeout(() => resolve(null), timeoutMs); }), ]); } finally { if (timeoutId) { clearTimeout(timeoutId); } } } export async function resolveReactionSyntheticEvent( params: ResolveReactionSyntheticEventParams, ): Promise { const { cfg, accountId, event, botOpenId, fetchMessage = getMessageFeishu, verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS, logger, uuid = () => crypto.randomUUID(), } = params; const emoji = event.reaction_type?.emoji_type; const messageId = event.message_id; const senderId = event.user_id?.open_id; if (!emoji || !messageId || !senderId) { return null; } const account = resolveFeishuAccount({ cfg, accountId }); const reactionNotifications = account.config.reactionNotifications ?? "own"; if (reactionNotifications === "off") { return null; } // Skip bot self-reactions if (event.operator_type === "app" || senderId === botOpenId) { return null; } // Skip typing indicator emoji if (emoji === "Typing") { return null; } if (reactionNotifications === "own" && !botOpenId) { logger?.( `feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`, ); return null; } const reactedMsg = await withTimeout( fetchMessage({ cfg, messageId, accountId }), verificationTimeoutMs, ).catch(() => null); const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId; if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) { logger?.( `feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` + `(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`, ); return null; } const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId; const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`; const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p"; return { sender: { sender_id: { open_id: senderId }, sender_type: "user", }, message: { message_id: `${messageId}:reaction:${emoji}:${uuid()}`, chat_id: syntheticChatId, chat_type: syntheticChatType, message_type: "text", content: JSON.stringify({ text: `[reacted with ${emoji} to message ${messageId}]`, }), }, }; } async function fetchBotOpenId(account: ResolvedFeishuAccount): Promise { try { const result = await probeFeishu(account); return result.ok ? result.botOpenId : undefined; } catch { return undefined; } } /** * Register common event handlers on an EventDispatcher. * When fireAndForget is true (webhook mode), message handling is not awaited * to avoid blocking the HTTP response (Lark requires <3s response). */ function registerEventHandlers( eventDispatcher: Lark.EventDispatcher, context: { cfg: ClawdbotConfig; accountId: string; runtime?: RuntimeEnv; chatHistories: Map; fireAndForget?: boolean; }, ) { const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; eventDispatcher.register({ "im.message.receive_v1": async (data) => { try { const event = data as unknown as FeishuMessageEvent; const promise = handleFeishuMessage({ cfg, event, botOpenId: botOpenIds.get(accountId), runtime, chatHistories, accountId, }); if (fireAndForget) { promise.catch((err) => { error(`feishu[${accountId}]: error handling message: ${String(err)}`); }); } else { await promise; } } catch (err) { error(`feishu[${accountId}]: error handling message: ${String(err)}`); } }, "im.message.message_read_v1": async () => { // Ignore read receipts }, "im.chat.member.bot.added_v1": async (data) => { try { const event = data as unknown as FeishuBotAddedEvent; log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`); } catch (err) { error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`); } }, "im.chat.member.bot.deleted_v1": async (data) => { try { const event = data as unknown as { chat_id: string }; log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`); } catch (err) { error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`); } }, "im.message.reaction.created_v1": async (data) => { const processReaction = async () => { const event = data as FeishuReactionCreatedEvent; const myBotId = botOpenIds.get(accountId); const syntheticEvent = await resolveReactionSyntheticEvent({ cfg, accountId, event, botOpenId: myBotId, logger: log, }); if (!syntheticEvent) { return; } const promise = handleFeishuMessage({ cfg, event: syntheticEvent, botOpenId: myBotId, runtime, chatHistories, accountId, }); if (fireAndForget) { promise.catch((err) => { error(`feishu[${accountId}]: error handling reaction: ${String(err)}`); }); return; } await promise; }; if (fireAndForget) { void processReaction().catch((err) => { error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`); }); return; } try { await processReaction(); } catch (err) { error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`); } }, "im.message.reaction.deleted_v1": async () => { // Ignore reaction removals }, }); } type MonitorAccountParams = { cfg: ClawdbotConfig; account: ResolvedFeishuAccount; runtime?: RuntimeEnv; abortSignal?: AbortSignal; }; /** * Monitor a single Feishu account. */ async function monitorSingleAccount(params: MonitorAccountParams): Promise { const { cfg, account, runtime, abortSignal } = params; const { accountId } = account; const log = runtime?.log ?? console.log; // Fetch bot open_id const botOpenId = await fetchBotOpenId(account); botOpenIds.set(accountId, botOpenId ?? ""); log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`); const connectionMode = account.config.connectionMode ?? "websocket"; if (connectionMode === "webhook" && !account.verificationToken?.trim()) { throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`); } const eventDispatcher = createEventDispatcher(account); const chatHistories = new Map(); registerEventHandlers(eventDispatcher, { cfg, accountId, runtime, chatHistories, fireAndForget: connectionMode === "webhook", }); if (connectionMode === "webhook") { return monitorWebhook({ params, accountId, eventDispatcher }); } return monitorWebSocket({ params, accountId, eventDispatcher }); } type ConnectionParams = { params: MonitorAccountParams; accountId: string; eventDispatcher: Lark.EventDispatcher; }; async function monitorWebSocket({ params, accountId, eventDispatcher, }: ConnectionParams): Promise { const { account, runtime, abortSignal } = params; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; log(`feishu[${accountId}]: starting WebSocket connection...`); const wsClient = createFeishuWSClient(account); wsClients.set(accountId, wsClient); return new Promise((resolve, reject) => { const cleanup = () => { wsClients.delete(accountId); botOpenIds.delete(accountId); }; const handleAbort = () => { log(`feishu[${accountId}]: abort signal received, stopping`); cleanup(); resolve(); }; if (abortSignal?.aborted) { cleanup(); resolve(); return; } abortSignal?.addEventListener("abort", handleAbort, { once: true }); try { wsClient.start({ eventDispatcher }); log(`feishu[${accountId}]: WebSocket client started`); } catch (err) { cleanup(); abortSignal?.removeEventListener("abort", handleAbort); reject(err); } }); } async function monitorWebhook({ params, accountId, eventDispatcher, }: ConnectionParams): Promise { const { account, runtime, abortSignal } = params; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; const port = account.config.webhookPort ?? 3000; const path = account.config.webhookPath ?? "/feishu/events"; const host = account.config.webhookHost ?? "127.0.0.1"; log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`); const server = http.createServer(); const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true }); server.on("request", (req, res) => { res.on("finish", () => { recordWebhookStatus(runtime, accountId, path, res.statusCode); }); const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`; if (isWebhookRateLimited(rateLimitKey, Date.now())) { res.statusCode = 429; res.end("Too Many Requests"); return; } if (req.method === "POST" && !isJsonContentType(req.headers["content-type"])) { res.statusCode = 415; res.end("Unsupported Media Type"); return; } const guard = installRequestBodyLimitGuard(req, res, { maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES, timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS, responseFormat: "text", }); if (guard.isTripped()) { return; } void Promise.resolve(webhookHandler(req, res)) .catch((err) => { if (!guard.isTripped()) { error(`feishu[${accountId}]: webhook handler error: ${String(err)}`); } }) .finally(() => { guard.dispose(); }); }); httpServers.set(accountId, server); return new Promise((resolve, reject) => { const cleanup = () => { server.close(); httpServers.delete(accountId); botOpenIds.delete(accountId); }; const handleAbort = () => { log(`feishu[${accountId}]: abort signal received, stopping Webhook server`); cleanup(); resolve(); }; if (abortSignal?.aborted) { cleanup(); resolve(); return; } abortSignal?.addEventListener("abort", handleAbort, { once: true }); server.listen(port, host, () => { log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`); }); server.on("error", (err) => { error(`feishu[${accountId}]: Webhook server error: ${err}`); abortSignal?.removeEventListener("abort", handleAbort); reject(err); }); }); } /** * Main entry: start monitoring for all enabled accounts. */ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promise { const cfg = opts.config; if (!cfg) { throw new Error("Config is required for Feishu monitor"); } const log = opts.runtime?.log ?? console.log; // If accountId is specified, only monitor that account if (opts.accountId) { const account = resolveFeishuAccount({ cfg, accountId: opts.accountId }); if (!account.enabled || !account.configured) { throw new Error(`Feishu account "${opts.accountId}" not configured or disabled`); } return monitorSingleAccount({ cfg, account, runtime: opts.runtime, abortSignal: opts.abortSignal, }); } // Otherwise, start all enabled accounts const accounts = listEnabledFeishuAccounts(cfg); if (accounts.length === 0) { throw new Error("No enabled Feishu accounts configured"); } log( `feishu: starting ${accounts.length} account(s): ${accounts.map((a) => a.accountId).join(", ")}`, ); // Start all accounts in parallel await Promise.all( accounts.map((account) => monitorSingleAccount({ cfg, account, runtime: opts.runtime, abortSignal: opts.abortSignal, }), ), ); } /** * Stop monitoring for a specific account or all accounts. */ export function stopFeishuMonitor(accountId?: string): void { if (accountId) { wsClients.delete(accountId); const server = httpServers.get(accountId); if (server) { server.close(); httpServers.delete(accountId); } botOpenIds.delete(accountId); } else { wsClients.clear(); for (const server of httpServers.values()) { server.close(); } httpServers.clear(); botOpenIds.clear(); } }