import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import type { OpenClawConfig } from "../../config/config.js";
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import {
  deriveInboundMessageHookContext,
  toInternalMessageReceivedContext,
  toPluginMessageContext,
  toPluginMessageReceivedEvent,
} from "../../hooks/message-hook-mappers.js";
import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import {
  logMessageProcessed,
  logMessageQueued,
  logSessionStateChange,
} from "../../logging/diagnostic.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js";
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
import { getReplyFromConfig } from "../reply.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js";
import { shouldBypassAcpDispatchForCommand, tryDispatchAcpReply } from "./dispatch-acp.js";
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { shouldSuppressReasoningPayload } from "./reply-payloads.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { resolveRunTypingPolicy } from "./typing-policy.js";

/**
 * Matches a body that consists solely of the `<media:audio>` placeholder,
 * optionally followed by a parenthesized note.
 *
 * NOTE(review): the `<media:audio>` literal was restored here; without it the
 * pattern matched any body that was just a parenthesized string (e.g. "(ok)")
 * and the `i` flag had nothing to act on. Confirm the token against whatever
 * produces the placeholder.
 */
const AUDIO_PLACEHOLDER_RE = /^<media:audio>(\s*\([^)]*\))?$/i;

/** Matches a body that begins with an `[Audio` header (case-insensitive). */
const AUDIO_HEADER_RE = /^\[Audio\b/i;

/**
 * Reduces a media type string to its part before the first `;`, trimmed and
 * lower-cased (e.g. `"Audio/OGG; codecs=opus"` becomes `"audio/ogg"`).
 */
const normalizeMediaType = (value: string): string =>
  // `split` always yields at least one element; the fallback only exists so the
  // declared return type also holds under `noUncheckedIndexedAccess`.
  value.split(";")[0]?.trim().toLowerCase() ?? "";

/**
 * Decides whether the inbound message should be treated as audio.
 *
 * Returns `true` when any of `ctx.MediaType` / `ctx.MediaTypes` normalizes to
 * `audio` or `audio/*`. Otherwise falls back to inspecting the first string
 * found among `BodyForCommands`, `CommandBody`, `RawBody` and `Body`: an empty
 * body is not audio, a body matching the audio placeholder or starting with an
 * `[Audio` header is.
 */
const isInboundAudioContext = (ctx: FinalizedMsgContext): boolean => {
  const rawTypes = [
    typeof ctx.MediaType === "string" ? ctx.MediaType : undefined,
    ...(Array.isArray(ctx.MediaTypes) ? ctx.MediaTypes : []),
  ].filter((type): type is string => typeof type === "string" && type.length > 0);
  const types = rawTypes.map((type) => normalizeMediaType(type));
  if (types.some((type) => type === "audio" || type.startsWith("audio/"))) {
    return true;
  }

  const body =
    typeof ctx.BodyForCommands === "string"
      ? ctx.BodyForCommands
      : typeof ctx.CommandBody === "string"
        ? ctx.CommandBody
        : typeof ctx.RawBody === "string"
          ? ctx.RawBody
          : typeof ctx.Body === "string"
            ? ctx.Body
            : "";
  const trimmed = body.trim();
  if (!trimmed) {
    return false;
  }
  if (AUDIO_PLACEHOLDER_RE.test(trimmed)) {
    return true;
  }
  return AUDIO_HEADER_RE.test(trimmed);
};

/**
 * Looks up the session store entry for the message's session.
 *
 * For native commands (`ctx.CommandSource === "native"`) the trimmed
 * `ctx.CommandTargetSessionKey` takes precedence over `ctx.SessionKey`.
 *
 * @returns An empty object when no session key is available; otherwise the
 *   resolved key plus the store entry found under the lower-cased key or,
 *   failing that, the key as given. If loading the store throws, only the key
 *   is returned.
 */
const resolveSessionStoreEntry = (
  ctx: FinalizedMsgContext,
  cfg: OpenClawConfig,
): {
  sessionKey?: string;
  entry?: SessionEntry;
} => {
  const targetSessionKey =
    ctx.CommandSource === "native" ? ctx.CommandTargetSessionKey?.trim() : undefined;
  const sessionKey = (targetSessionKey ?? ctx.SessionKey)?.trim();
  if (!sessionKey) {
    return {};
  }
  const agentId = resolveSessionAgentId({ sessionKey, config: cfg });
  const storePath = resolveStorePath(cfg.session?.store, { agentId });
  try {
    const store = loadSessionStore(storePath);
    return {
      sessionKey,
      entry: store[sessionKey.toLowerCase()] ?? store[sessionKey],
    };
  } catch {
    // Best effort: a store that cannot be loaded must not block dispatch, so
    // callers simply proceed without an entry.
    return {
      sessionKey,
    };
  }
};

/** Outcome of {@link dispatchReplyFromConfig}. */
export type DispatchFromConfigResult = {
  /** Whether at least one final reply was queued on the dispatcher or routed successfully. */
  queuedFinal: boolean;
  /** Dispatcher queue counts per kind; `final` also includes successfully routed final replies. */
  counts: Record<ReplyDispatchKind, number>;
};

/**
 * Produces the reply for an inbound message and delivers it, either through
 * the supplied dispatcher or by routing to the originating channel.
 *
 * Order of operations, as implemented below:
 * 1. Skip duplicate inbound messages.
 * 2. Fire the `message_received` plugin hook and the internal
 *    `message`/`received` hook without awaiting them.
 * 3. Decide whether replies must be routed to the originating channel.
 * 4. Handle fast aborts, then the send policy (`"deny"` stops here unless the
 *    command bypasses ACP dispatch), then ACP dispatch.
 * 5. Run the reply resolver, streaming tool results and block replies through
 *    TTS and on to the dispatcher or the router.
 * 6. Deliver final replies and, when block streaming left no final reply, an
 *    audio-only TTS reply built from the accumulated block text.
 *
 * @param params.ctx - Finalized inbound message context.
 * @param params.cfg - Active configuration.
 * @param params.dispatcher - Dispatcher used when replies are not routed elsewhere.
 * @param params.replyOptions - Options forwarded to the reply resolver;
 *   `onToolResult` and `onBlockReply` are supplied by this function.
 * @param params.replyResolver - Replacement for `getReplyFromConfig`.
 * @returns Whether a final reply was queued, and the per-kind queue counts.
 * @throws Rethrows any error raised after processing started, once the error
 *   outcome and the idle session state have been recorded.
 */
export async function dispatchReplyFromConfig(params: {
  ctx: FinalizedMsgContext;
  cfg: OpenClawConfig;
  dispatcher: ReplyDispatcher;
  replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
  replyResolver?: typeof getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
  const { ctx, cfg, dispatcher } = params;
  const diagnosticsEnabled = isDiagnosticsEnabled(cfg);
  const channel = String(ctx.Surface ?? ctx.Provider ?? "unknown").toLowerCase();
  const chatId = ctx.To ?? ctx.From;
  const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
  const sessionKey = ctx.SessionKey;
  const startTime = diagnosticsEnabled ? Date.now() : 0;
  const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);

  // Emits the "message processed" diagnostic; a no-op when diagnostics are off.
  const recordProcessed = (
    outcome: "completed" | "skipped" | "error",
    opts?: {
      reason?: string;
      error?: string;
    },
  ) => {
    if (!diagnosticsEnabled) {
      return;
    }
    logMessageProcessed({
      channel,
      chatId,
      messageId,
      sessionKey,
      durationMs: Date.now() - startTime,
      outcome,
      reason: opts?.reason,
      error: opts?.error,
    });
  };

  // Logs the queued message and flips the session to "processing".
  const markProcessing = () => {
    if (!canTrackSession || !sessionKey) {
      return;
    }
    logMessageQueued({ sessionKey, channel, source: "dispatch" });
    logSessionStateChange({
      sessionKey,
      state: "processing",
      reason: "message_start",
    });
  };

  // Flips the session back to "idle" with the given reason.
  const markIdle = (reason: string) => {
    if (!canTrackSession || !sessionKey) {
      return;
    }
    logSessionStateChange({
      sessionKey,
      state: "idle",
      reason,
    });
  };

  if (shouldSkipDuplicateInbound(ctx)) {
    recordProcessed("skipped", { reason: "duplicate" });
    return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
  }

  const sessionStoreEntry = resolveSessionStoreEntry(ctx, cfg);
  const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey;
  const inboundAudio = isInboundAudioContext(ctx);
  const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto);
  const hookRunner = getGlobalHookRunner();

  // Extract message context for hooks (plugin and internal)
  const timestamp =
    typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
  const messageIdForHook =
    ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
  const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook });
  const { isGroup, groupId } = hookContext;

  // Trigger plugin hooks (fire-and-forget)
  if (hookRunner?.hasHooks("message_received")) {
    fireAndForgetHook(
      hookRunner.runMessageReceived(
        toPluginMessageReceivedEvent(hookContext),
        toPluginMessageContext(hookContext),
      ),
      "dispatch-from-config: message_received plugin hook failed",
    );
  }

  // Bridge to internal hooks (HOOK.md discovery system) - refs #8807
  if (sessionKey) {
    fireAndForgetHook(
      triggerInternalHook(
        createInternalHookEvent("message", "received", sessionKey, {
          ...toInternalMessageReceivedContext(hookContext),
          timestamp,
        }),
      ),
      "dispatch-from-config: message_received internal hook failed",
    );
  }

  // Check if we should route replies to originating channel instead of dispatcher.
  // Only route when the originating channel is DIFFERENT from the current surface.
  // This handles cross-provider routing (e.g., message from Telegram being processed
  // by a shared session that's currently on Slack) while preserving normal dispatcher
  // flow when the provider handles its own messages.
  //
  // Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
  const originatingChannel = normalizeMessageChannel(ctx.OriginatingChannel);
  const originatingTo = ctx.OriginatingTo;
  const providerChannel = normalizeMessageChannel(ctx.Provider);
  const surfaceChannel = normalizeMessageChannel(ctx.Surface);
  // Prefer provider channel because surface may carry origin metadata in relayed flows.
  const currentSurface = providerChannel ?? surfaceChannel;
  const isInternalWebchatTurn =
    currentSurface === INTERNAL_MESSAGE_CHANNEL &&
    (surfaceChannel === INTERNAL_MESSAGE_CHANNEL || !surfaceChannel);
  const shouldRouteToOriginating = Boolean(
    !isInternalWebchatTurn &&
      isRoutableChannel(originatingChannel) &&
      originatingTo &&
      originatingChannel !== currentSurface,
  );
  const shouldSuppressTyping =
    shouldRouteToOriginating || originatingChannel === INTERNAL_MESSAGE_CHANNEL;
  const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;

  /**
   * Helper to send a payload via route-reply (async).
   * Only used when actually routing to a different provider.
   * Note: Only called when shouldRouteToOriginating is true, so
   * originatingChannel and originatingTo are guaranteed to be defined.
   */
  const sendPayloadAsync = async (
    payload: ReplyPayload,
    abortSignal?: AbortSignal,
    mirror?: boolean,
  ): Promise<void> => {
    // TypeScript doesn't narrow these from the shouldRouteToOriginating check,
    // but they're guaranteed non-null when this function is called.
    if (!originatingChannel || !originatingTo) {
      return;
    }
    if (abortSignal?.aborted) {
      return;
    }
    const result = await routeReply({
      payload,
      channel: originatingChannel,
      to: originatingTo,
      sessionKey: ctx.SessionKey,
      accountId: ctx.AccountId,
      threadId: ctx.MessageThreadId,
      cfg,
      abortSignal,
      mirror,
      isGroup,
      groupId,
    });
    if (!result.ok) {
      logVerbose(`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`);
    }
  };

  markProcessing();

  try {
    const fastAbort = await tryFastAbortFromMessage({ ctx, cfg });
    if (fastAbort.handled) {
      const payload = {
        text: formatAbortReplyText(fastAbort.stoppedSubagents),
      } satisfies ReplyPayload;
      let queuedFinal = false;
      let routedFinalCount = 0;
      if (shouldRouteToOriginating && originatingChannel && originatingTo) {
        const result = await routeReply({
          payload,
          channel: originatingChannel,
          to: originatingTo,
          sessionKey: ctx.SessionKey,
          accountId: ctx.AccountId,
          threadId: ctx.MessageThreadId,
          cfg,
          isGroup,
          groupId,
        });
        queuedFinal = result.ok;
        if (result.ok) {
          routedFinalCount += 1;
        }
        if (!result.ok) {
          logVerbose(
            `dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
          );
        }
      } else {
        queuedFinal = dispatcher.sendFinalReply(payload);
      }
      const counts = dispatcher.getQueuedCounts();
      // Routed replies bypass the dispatcher, so add them to its final count here.
      counts.final += routedFinalCount;
      recordProcessed("completed", { reason: "fast_abort" });
      markIdle("message_completed");
      return { queuedFinal, counts };
    }

    const bypassAcpForCommand = shouldBypassAcpDispatchForCommand(ctx, cfg);

    const sendPolicy = resolveSendPolicy({
      cfg,
      entry: sessionStoreEntry.entry,
      sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
      channel:
        sessionStoreEntry.entry?.channel ??
        ctx.OriginatingChannel ??
        ctx.Surface ??
        ctx.Provider ??
        undefined,
      chatType: sessionStoreEntry.entry?.chatType,
    });
    if (sendPolicy === "deny" && !bypassAcpForCommand) {
      logVerbose(
        `Send blocked by policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"}`,
      );
      const counts = dispatcher.getQueuedCounts();
      recordProcessed("completed", { reason: "send_policy_deny" });
      markIdle("message_completed");
      return { queuedFinal: false, counts };
    }

    const shouldSendToolSummaries = ctx.ChatType !== "group" && ctx.CommandSource !== "native";
    const acpDispatch = await tryDispatchAcpReply({
      ctx,
      cfg,
      dispatcher,
      sessionKey: acpDispatchSessionKey,
      inboundAudio,
      sessionTtsAuto,
      ttsChannel,
      shouldRouteToOriginating,
      originatingChannel,
      originatingTo,
      shouldSendToolSummaries,
      bypassForCommand: bypassAcpForCommand,
      onReplyStart: params.replyOptions?.onReplyStart,
      recordProcessed,
      markIdle,
    });
    if (acpDispatch) {
      return acpDispatch;
    }

    // Track accumulated block text for TTS generation after streaming completes.
    // When block streaming succeeds, there's no final reply, so we need to generate
    // TTS audio separately from the accumulated block content.
    let accumulatedBlockText = "";
    let blockCount = 0;

    const resolveToolDeliveryPayload = (payload: ReplyPayload): ReplyPayload | null => {
      if (shouldSendToolSummaries) {
        return payload;
      }
      // Group/native flows intentionally suppress tool summary text, but media-only
      // tool results (for example TTS audio) must still be delivered.
      const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
      if (!hasMedia) {
        return null;
      }
      return { ...payload, text: undefined };
    };

    const typing = resolveRunTypingPolicy({
      requestedPolicy: params.replyOptions?.typingPolicy,
      suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
      originatingChannel,
      systemEvent: shouldRouteToOriginating,
    });

    const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
      ctx,
      {
        ...params.replyOptions,
        typingPolicy: typing.typingPolicy,
        suppressTyping: typing.suppressTyping,
        onToolResult: (payload: ReplyPayload) => {
          const run = async () => {
            const ttsPayload = await maybeApplyTtsToPayload({
              payload,
              cfg,
              channel: ttsChannel,
              kind: "tool",
              inboundAudio,
              ttsAuto: sessionTtsAuto,
            });
            const deliveryPayload = resolveToolDeliveryPayload(ttsPayload);
            if (!deliveryPayload) {
              return;
            }
            if (shouldRouteToOriginating) {
              await sendPayloadAsync(deliveryPayload, undefined, false);
            } else {
              dispatcher.sendToolResult(deliveryPayload);
            }
          };
          return run();
        },
        onBlockReply: (payload: ReplyPayload, context) => {
          const run = async () => {
            // Suppress reasoning payloads — channels using this generic dispatch
            // path (WhatsApp, web, etc.) do not have a dedicated reasoning lane.
            // Telegram has its own dispatch path that handles reasoning splitting.
            if (shouldSuppressReasoningPayload(payload)) {
              return;
            }
            // Accumulate block text for TTS generation after streaming
            if (payload.text) {
              if (accumulatedBlockText.length > 0) {
                accumulatedBlockText += "\n";
              }
              accumulatedBlockText += payload.text;
              blockCount++;
            }
            const ttsPayload = await maybeApplyTtsToPayload({
              payload,
              cfg,
              channel: ttsChannel,
              kind: "block",
              inboundAudio,
              ttsAuto: sessionTtsAuto,
            });
            if (shouldRouteToOriginating) {
              await sendPayloadAsync(ttsPayload, context?.abortSignal, false);
            } else {
              dispatcher.sendBlockReply(ttsPayload);
            }
          };
          return run();
        },
      },
      cfg,
    );

    if (ctx.AcpDispatchTailAfterReset === true) {
      // Command handling prepared a trailing prompt after ACP in-place reset.
      // Route that tail through ACP now (same turn) instead of embedded dispatch.
      ctx.AcpDispatchTailAfterReset = false;
      const acpTailDispatch = await tryDispatchAcpReply({
        ctx,
        cfg,
        dispatcher,
        sessionKey: acpDispatchSessionKey,
        inboundAudio,
        sessionTtsAuto,
        ttsChannel,
        shouldRouteToOriginating,
        originatingChannel,
        originatingTo,
        shouldSendToolSummaries,
        bypassForCommand: false,
        onReplyStart: params.replyOptions?.onReplyStart,
        recordProcessed,
        markIdle,
      });
      if (acpTailDispatch) {
        return acpTailDispatch;
      }
    }

    const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : [];

    let queuedFinal = false;
    let routedFinalCount = 0;
    for (const reply of replies) {
      // Suppress reasoning payloads from channel delivery — channels using this
      // generic dispatch path do not have a dedicated reasoning lane.
      if (shouldSuppressReasoningPayload(reply)) {
        continue;
      }
      const ttsReply = await maybeApplyTtsToPayload({
        payload: reply,
        cfg,
        channel: ttsChannel,
        kind: "final",
        inboundAudio,
        ttsAuto: sessionTtsAuto,
      });
      if (shouldRouteToOriginating && originatingChannel && originatingTo) {
        // Route final reply to originating channel.
        const result = await routeReply({
          payload: ttsReply,
          channel: originatingChannel,
          to: originatingTo,
          sessionKey: ctx.SessionKey,
          accountId: ctx.AccountId,
          threadId: ctx.MessageThreadId,
          cfg,
          isGroup,
          groupId,
        });
        if (!result.ok) {
          logVerbose(
            `dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`,
          );
        }
        queuedFinal = result.ok || queuedFinal;
        if (result.ok) {
          routedFinalCount += 1;
        }
      } else {
        queuedFinal = dispatcher.sendFinalReply(ttsReply) || queuedFinal;
      }
    }

    const ttsMode = resolveTtsConfig(cfg).mode ?? "final";
    // Generate TTS-only reply after block streaming completes (when there's no final reply).
    // This handles the case where block streaming succeeds and drops final payloads,
    // but we still want TTS audio to be generated from the accumulated block content.
    if (
      ttsMode === "final" &&
      replies.length === 0 &&
      blockCount > 0 &&
      accumulatedBlockText.trim()
    ) {
      try {
        const ttsSyntheticReply = await maybeApplyTtsToPayload({
          payload: { text: accumulatedBlockText },
          cfg,
          channel: ttsChannel,
          kind: "final",
          inboundAudio,
          ttsAuto: sessionTtsAuto,
        });
        // Only send if TTS was actually applied (mediaUrl exists)
        if (ttsSyntheticReply.mediaUrl) {
          // Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content
          const ttsOnlyPayload: ReplyPayload = {
            mediaUrl: ttsSyntheticReply.mediaUrl,
            audioAsVoice: ttsSyntheticReply.audioAsVoice,
          };
          if (shouldRouteToOriginating && originatingChannel && originatingTo) {
            const result = await routeReply({
              payload: ttsOnlyPayload,
              channel: originatingChannel,
              to: originatingTo,
              sessionKey: ctx.SessionKey,
              accountId: ctx.AccountId,
              threadId: ctx.MessageThreadId,
              cfg,
              isGroup,
              groupId,
            });
            queuedFinal = result.ok || queuedFinal;
            if (result.ok) {
              routedFinalCount += 1;
            }
            if (!result.ok) {
              logVerbose(
                `dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`,
              );
            }
          } else {
            const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
            queuedFinal = didQueue || queuedFinal;
          }
        }
      } catch (err) {
        // A failed synthetic TTS reply is logged only; the block text was
        // already handed to the block-reply path above.
        logVerbose(
          `dispatch-from-config: accumulated block TTS failed: ${err instanceof Error ? err.message : String(err)}`,
        );
      }
    }

    const counts = dispatcher.getQueuedCounts();
    // Routed replies bypass the dispatcher, so add them to its final count here.
    counts.final += routedFinalCount;
    recordProcessed("completed");
    markIdle("message_completed");
    return { queuedFinal, counts };
  } catch (err) {
    recordProcessed("error", { error: String(err) });
    markIdle("message_error");
    throw err;
  }
}