import os from "node:os"; import path from "node:path"; import { createDedupeCache, createPersistentDedupe, readJsonFileWithFallback, } from "../runtime-api.js"; // Persistent TTL: 24 hours — survives restarts & WebSocket reconnects. const DEDUP_TTL_MS = 24 * 60 * 60 * 1000; const MEMORY_MAX_SIZE = 1_000; const FILE_MAX_ENTRIES = 10_000; const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000; const EVENT_MEMORY_MAX_SIZE = 2_000; type PersistentDedupeData = Record; const memoryDedupe = createDedupeCache({ ttlMs: DEDUP_TTL_MS, maxSize: MEMORY_MAX_SIZE }); const processingClaims = createDedupeCache({ ttlMs: EVENT_DEDUP_TTL_MS, maxSize: EVENT_MEMORY_MAX_SIZE, }); function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { const stateOverride = env.OPENCLAW_STATE_DIR?.trim() || env.CLAWDBOT_STATE_DIR?.trim(); if (stateOverride) { return stateOverride; } if (env.VITEST || env.NODE_ENV === "test") { return path.join(os.tmpdir(), ["openclaw-vitest", String(process.pid)].join("-")); } return path.join(os.homedir(), ".openclaw"); } function resolveNamespaceFilePath(namespace: string): string { const safe = namespace.replace(/[^a-zA-Z0-9_-]/g, "_"); return path.join(resolveStateDirFromEnv(), "feishu", "dedup", `${safe}.json`); } const persistentDedupe = createPersistentDedupe({ ttlMs: DEDUP_TTL_MS, memoryMaxSize: MEMORY_MAX_SIZE, fileMaxEntries: FILE_MAX_ENTRIES, resolveFilePath: resolveNamespaceFilePath, }); function resolveEventDedupeKey( namespace: string, messageId: string | undefined | null, ): string | null { const trimmed = messageId?.trim(); if (!trimmed) { return null; } return `${namespace}:${trimmed}`; } function normalizeMessageId(messageId: string | undefined | null): string | null { const trimmed = messageId?.trim(); return trimmed ? trimmed : null; } function resolveMemoryDedupeKey( namespace: string, messageId: string | undefined | null, ): string | null { const trimmed = normalizeMessageId(messageId); if (!trimmed) { return null; } return `${namespace}:${trimmed}`; } export function tryBeginFeishuMessageProcessing( messageId: string | undefined | null, namespace = "global", ): boolean { return !processingClaims.check(resolveEventDedupeKey(namespace, messageId)); } export function releaseFeishuMessageProcessing( messageId: string | undefined | null, namespace = "global", ): void { processingClaims.delete(resolveEventDedupeKey(namespace, messageId)); } export async function finalizeFeishuMessageProcessing(params: { messageId: string | undefined | null; namespace?: string; log?: (...args: unknown[]) => void; claimHeld?: boolean; }): Promise { const { messageId, namespace = "global", log, claimHeld = false } = params; const normalizedMessageId = normalizeMessageId(messageId); const memoryKey = resolveMemoryDedupeKey(namespace, messageId); if (!memoryKey || !normalizedMessageId) { return false; } if (!claimHeld && !tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) { return false; } if (!tryRecordMessage(memoryKey)) { releaseFeishuMessageProcessing(normalizedMessageId, namespace); return false; } if (!(await tryRecordMessagePersistent(normalizedMessageId, namespace, log))) { releaseFeishuMessageProcessing(normalizedMessageId, namespace); return false; } return true; } export async function recordProcessedFeishuMessage( messageId: string | undefined | null, namespace = "global", log?: (...args: unknown[]) => void, ): Promise { const normalizedMessageId = normalizeMessageId(messageId); const memoryKey = resolveMemoryDedupeKey(namespace, messageId); if (!memoryKey || !normalizedMessageId) { return false; } tryRecordMessage(memoryKey); return await tryRecordMessagePersistent(normalizedMessageId, namespace, log); } export async function hasProcessedFeishuMessage( messageId: string | undefined | null, namespace = "global", log?: (...args: unknown[]) => void, ): Promise { const normalizedMessageId = normalizeMessageId(messageId); const memoryKey = resolveMemoryDedupeKey(namespace, messageId); if (!memoryKey || !normalizedMessageId) { return false; } if (hasRecordedMessage(memoryKey)) { return true; } return hasRecordedMessagePersistent(normalizedMessageId, namespace, log); } /** * Synchronous dedup — memory only. * Kept for backward compatibility; prefer {@link tryRecordMessagePersistent}. */ export function tryRecordMessage(messageId: string): boolean { return !memoryDedupe.check(messageId); } export function hasRecordedMessage(messageId: string): boolean { const trimmed = messageId.trim(); if (!trimmed) { return false; } return memoryDedupe.peek(trimmed); } export async function tryRecordMessagePersistent( messageId: string, namespace = "global", log?: (...args: unknown[]) => void, ): Promise { return persistentDedupe.checkAndRecord(messageId, { namespace, onDiskError: (error) => { log?.(`feishu-dedup: disk error, falling back to memory: ${String(error)}`); }, }); } export async function hasRecordedMessagePersistent( messageId: string, namespace = "global", log?: (...args: unknown[]) => void, ): Promise { const trimmed = messageId.trim(); if (!trimmed) { return false; } const now = Date.now(); const filePath = resolveNamespaceFilePath(namespace); try { const { value } = await readJsonFileWithFallback(filePath, {}); const seenAt = value[trimmed]; if (typeof seenAt !== "number" || !Number.isFinite(seenAt)) { return false; } return DEDUP_TTL_MS <= 0 || now - seenAt < DEDUP_TTL_MS; } catch (error) { log?.(`feishu-dedup: persistent peek failed: ${String(error)}`); return false; } } export async function warmupDedupFromDisk( namespace: string, log?: (...args: unknown[]) => void, ): Promise { return persistentDedupe.warmup(namespace, (error) => { log?.(`feishu-dedup: warmup disk error: ${String(error)}`); }); }