2026-01-04 05:47:21 +01:00
import crypto from "node:crypto" ;
2026-01-06 07:24:51 -05:00
import fs from "node:fs" ;
2026-01-04 05:47:21 +01:00
import { lookupContextTokens } from "../../agents/context.js" ;
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js" ;
2026-01-04 18:11:41 +01:00
import { runWithModelFallback } from "../../agents/model-fallback.js" ;
2026-01-04 05:47:21 +01:00
import {
queueEmbeddedPiMessage ,
runEmbeddedPiAgent ,
} from "../../agents/pi-embedded.js" ;
2026-01-06 18:51:45 +00:00
import { hasNonzeroUsage } from "../../agents/usage.js" ;
2026-01-04 05:47:21 +01:00
import {
loadSessionStore ,
2026-01-06 07:24:51 -05:00
resolveSessionTranscriptPath ,
2026-01-04 05:47:21 +01:00
type SessionEntry ,
saveSessionStore ,
} from "../../config/sessions.js" ;
2026-01-07 21:58:54 +00:00
import type { TypingMode } from "../../config/types.js" ;
2026-01-04 05:47:21 +01:00
import { logVerbose } from "../../globals.js" ;
import { registerAgentRunContext } from "../../infra/agent-events.js" ;
import { defaultRuntime } from "../../runtime.js" ;
import { stripHeartbeatToken } from "../heartbeat.js" ;
2026-01-08 00:50:29 +00:00
import type { OriginatingChannelType , TemplateContext } from "../templating.js" ;
2026-01-04 05:47:21 +01:00
import { normalizeVerboseLevel , type VerboseLevel } from "../thinking.js" ;
import { SILENT_REPLY_TOKEN } from "../tokens.js" ;
import type { GetReplyOptions , ReplyPayload } from "../types.js" ;
2026-01-08 03:13:54 +00:00
import { extractAudioTag } from "./audio-tags.js" ;
2026-01-04 05:47:21 +01:00
import { createFollowupRunner } from "./followup-runner.js" ;
import {
enqueueFollowupRun ,
type FollowupRun ,
type QueueSettings ,
scheduleFollowupDrain ,
} from "./queue.js" ;
2026-01-08 01:09:13 +00:00
import {
applyReplyTagsToPayload ,
applyReplyThreading ,
filterMessagingToolDuplicates ,
isRenderablePayload ,
2026-01-08 08:49:16 +01:00
shouldSuppressMessagingToolReplies ,
2026-01-08 01:09:13 +00:00
} from "./reply-payloads.js" ;
2026-01-08 00:50:29 +00:00
import {
createReplyToModeFilter ,
resolveReplyToMode ,
} from "./reply-threading.js" ;
2026-01-06 02:41:48 +01:00
import { incrementCompactionCount } from "./session-updates.js" ;
2026-01-04 05:47:21 +01:00
import type { TypingController } from "./typing.js" ;
2026-01-07 22:18:11 +00:00
import { createTypingSignaler } from "./typing-mode.js" ;
2026-01-04 05:47:21 +01:00
2026-01-06 22:43:29 +01:00
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i ;
const isBunFetchSocketError = ( message? : string ) = >
Boolean ( message && BUN_FETCH_SOCKET_ERROR_RE . test ( message ) ) ;
const formatBunFetchSocketError = ( message : string ) = > {
const trimmed = message . trim ( ) ;
return [
"⚠️ LLM connection failed. This could be due to server issues, network problems, or context length exceeded (e.g., with local LLMs like LM Studio). Original error:" ,
"```" ,
trimmed || "Unknown error" ,
"```" ,
] . join ( "\n" ) ;
} ;
2026-01-04 05:47:21 +01:00
export async function runReplyAgent ( params : {
commandBody : string ;
followupRun : FollowupRun ;
queueKey : string ;
resolvedQueue : QueueSettings ;
shouldSteer : boolean ;
shouldFollowup : boolean ;
isActive : boolean ;
isStreaming : boolean ;
opts? : GetReplyOptions ;
typing : TypingController ;
sessionEntry? : SessionEntry ;
sessionStore? : Record < string , SessionEntry > ;
sessionKey? : string ;
storePath? : string ;
defaultModel : string ;
agentCfgContextTokens? : number ;
resolvedVerboseLevel : VerboseLevel ;
isNewSession : boolean ;
blockStreamingEnabled : boolean ;
blockReplyChunking ? : {
minChars : number ;
maxChars : number ;
breakPreference : "paragraph" | "newline" | "sentence" ;
} ;
resolvedBlockStreamingBreak : "text_end" | "message_end" ;
sessionCtx : TemplateContext ;
shouldInjectGroupIntro : boolean ;
2026-01-07 21:58:54 +00:00
typingMode : TypingMode ;
2026-01-04 05:47:21 +01:00
} ) : Promise < ReplyPayload | ReplyPayload [ ] | undefined > {
const {
commandBody ,
followupRun ,
queueKey ,
resolvedQueue ,
shouldSteer ,
shouldFollowup ,
isActive ,
isStreaming ,
opts ,
typing ,
sessionEntry ,
sessionStore ,
sessionKey ,
storePath ,
defaultModel ,
agentCfgContextTokens ,
resolvedVerboseLevel ,
isNewSession ,
blockStreamingEnabled ,
blockReplyChunking ,
resolvedBlockStreamingBreak ,
sessionCtx ,
shouldInjectGroupIntro ,
2026-01-07 21:58:54 +00:00
typingMode ,
2026-01-04 05:47:21 +01:00
} = params ;
2026-01-05 12:03:36 +13:00
const isHeartbeat = opts ? . isHeartbeat === true ;
2026-01-07 22:18:11 +00:00
const typingSignals = createTypingSignaler ( {
typing ,
mode : typingMode ,
isHeartbeat ,
} ) ;
2026-01-05 12:03:36 +13:00
2026-01-04 05:47:21 +01:00
const shouldEmitToolResult = ( ) = > {
if ( ! sessionKey || ! storePath ) {
return resolvedVerboseLevel === "on" ;
}
try {
const store = loadSessionStore ( storePath ) ;
const entry = store [ sessionKey ] ;
const current = normalizeVerboseLevel ( entry ? . verboseLevel ) ;
if ( current ) return current === "on" ;
} catch {
// ignore store read failures
}
return resolvedVerboseLevel === "on" ;
} ;
const streamedPayloadKeys = new Set < string > ( ) ;
const pendingStreamedPayloadKeys = new Set < string > ( ) ;
const pendingBlockTasks = new Set < Promise < void > > ( ) ;
2026-01-07 00:24:08 +05:30
const pendingToolTasks = new Set < Promise < void > > ( ) ;
2026-01-04 05:47:21 +01:00
let didStreamBlockReply = false ;
const buildPayloadKey = ( payload : ReplyPayload ) = > {
const text = payload . text ? . trim ( ) ? ? "" ;
const mediaList = payload . mediaUrls ? . length
? payload . mediaUrls
: payload . mediaUrl
? [ payload . mediaUrl ]
: [ ] ;
return JSON . stringify ( {
text ,
mediaList ,
replyToId : payload.replyToId ? ? null ,
} ) ;
} ;
2026-01-08 00:50:29 +00:00
const replyToChannel =
sessionCtx . OriginatingChannel ? ?
( ( sessionCtx . Surface ? ? sessionCtx . Provider ) ? . toLowerCase ( ) as
| OriginatingChannelType
| undefined ) ;
const replyToMode = resolveReplyToMode (
followupRun . run . config ,
replyToChannel ,
) ;
const applyReplyToMode = createReplyToModeFilter ( replyToMode ) ;
2026-01-04 05:47:21 +01:00
if ( shouldSteer && isStreaming ) {
const steered = queueEmbeddedPiMessage (
followupRun . run . sessionId ,
followupRun . prompt ,
) ;
if ( steered && ! shouldFollowup ) {
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
if ( storePath ) {
await saveSessionStore ( storePath , sessionStore ) ;
}
}
typing . cleanup ( ) ;
return undefined ;
}
}
if ( isActive && ( shouldFollowup || resolvedQueue . mode === "steer" ) ) {
enqueueFollowupRun ( queueKey , followupRun , resolvedQueue ) ;
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
if ( storePath ) {
await saveSessionStore ( storePath , sessionStore ) ;
}
}
typing . cleanup ( ) ;
return undefined ;
}
const runFollowupTurn = createFollowupRunner ( {
opts ,
typing ,
2026-01-07 21:58:54 +00:00
typingMode ,
2026-01-04 05:47:21 +01:00
sessionEntry ,
sessionStore ,
sessionKey ,
storePath ,
defaultModel ,
agentCfgContextTokens ,
} ) ;
const finalizeWithFollowup = < T > ( value : T ) : T = > {
scheduleFollowupDrain ( queueKey , runFollowupTurn ) ;
return value ;
} ;
let didLogHeartbeatStrip = false ;
2026-01-06 02:41:48 +01:00
let autoCompactionCompleted = false ;
2026-01-04 05:47:21 +01:00
try {
const runId = crypto . randomUUID ( ) ;
if ( sessionKey ) {
registerAgentRunContext ( runId , { sessionKey } ) ;
}
let runResult : Awaited < ReturnType < typeof runEmbeddedPiAgent > > ;
2026-01-04 17:50:55 +01:00
let fallbackProvider = followupRun . run . provider ;
let fallbackModel = followupRun . run . model ;
2026-01-04 05:47:21 +01:00
try {
2026-01-07 11:08:11 +01:00
const allowPartialStream = ! (
followupRun . run . reasoningLevel === "stream" && opts ? . onReasoningStream
) ;
2026-01-04 17:50:55 +01:00
const fallbackResult = await runWithModelFallback ( {
cfg : followupRun.run.config ,
2026-01-04 05:47:21 +01:00
provider : followupRun.run.provider ,
model : followupRun.run.model ,
2026-01-04 17:50:55 +01:00
run : ( provider , model ) = >
runEmbeddedPiAgent ( {
sessionId : followupRun.run.sessionId ,
sessionKey ,
2026-01-06 18:25:37 +00:00
messageProvider :
sessionCtx . Provider ? . trim ( ) . toLowerCase ( ) || undefined ,
2026-01-08 08:49:16 +01:00
agentAccountId : sessionCtx.AccountId ,
2026-01-04 17:50:55 +01:00
sessionFile : followupRun.run.sessionFile ,
workspaceDir : followupRun.run.workspaceDir ,
2026-01-06 18:25:37 +00:00
agentDir : followupRun.run.agentDir ,
2026-01-04 17:50:55 +01:00
config : followupRun.run.config ,
skillsSnapshot : followupRun.run.skillsSnapshot ,
prompt : commandBody ,
extraSystemPrompt : followupRun.run.extraSystemPrompt ,
ownerNumbers : followupRun.run.ownerNumbers ,
enforceFinalTag : followupRun.run.enforceFinalTag ,
provider ,
model ,
2026-01-06 00:56:29 +00:00
authProfileId : followupRun.run.authProfileId ,
2026-01-04 17:50:55 +01:00
thinkLevel : followupRun.run.thinkLevel ,
verboseLevel : followupRun.run.verboseLevel ,
2026-01-07 06:16:38 +01:00
reasoningLevel : followupRun.run.reasoningLevel ,
2026-01-04 17:50:55 +01:00
bashElevated : followupRun.run.bashElevated ,
timeoutMs : followupRun.run.timeoutMs ,
runId ,
blockReplyBreak : resolvedBlockStreamingBreak ,
blockReplyChunking ,
2026-01-07 11:08:11 +01:00
onPartialReply :
opts ? . onPartialReply && allowPartialStream
? async ( payload ) = > {
let text = payload . text ;
if ( ! isHeartbeat && text ? . includes ( "HEARTBEAT_OK" ) ) {
const stripped = stripHeartbeatToken ( text , {
mode : "message" ,
} ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose (
"Stripped stray HEARTBEAT_OK token from reply" ,
) ;
}
if (
stripped . shouldSkip &&
( payload . mediaUrls ? . length ? ? 0 ) === 0
) {
return ;
}
text = stripped . text ;
2026-01-04 17:50:55 +01:00
}
2026-01-07 22:18:11 +00:00
await typingSignals . signalTextDelta ( text ) ;
2026-01-07 11:08:11 +01:00
await opts . onPartialReply ? . ( {
text ,
mediaUrls : payload.mediaUrls ,
} ) ;
2026-01-05 12:03:36 +13:00
}
2026-01-07 11:08:11 +01:00
: undefined ,
2026-01-07 21:58:54 +00:00
onReasoningStream :
2026-01-07 22:18:11 +00:00
typingSignals . shouldStartOnReasoning || opts ? . onReasoningStream
2026-01-07 21:58:54 +00:00
? async ( payload ) = > {
2026-01-07 22:18:11 +00:00
await typingSignals . signalReasoningDelta ( ) ;
2026-01-07 21:58:54 +00:00
await opts ? . onReasoningStream ? . ( {
text : payload.text ,
mediaUrls : payload.mediaUrls ,
} ) ;
}
: undefined ,
2026-01-06 02:41:48 +01:00
onAgentEvent : ( evt ) = > {
2026-01-07 20:34:56 -08:00
// Trigger typing when tools start executing
if ( evt . stream === "tool" ) {
const phase =
typeof evt . data . phase === "string" ? evt . data . phase : "" ;
if ( phase === "start" ) {
void typingSignals . signalToolStart ( ) ;
}
}
// Track auto-compaction completion
if ( evt . stream === "compaction" ) {
const phase =
typeof evt . data . phase === "string" ? evt . data . phase : "" ;
const willRetry = Boolean ( evt . data . willRetry ) ;
if ( phase === "end" && ! willRetry ) {
autoCompactionCompleted = true ;
}
2026-01-06 02:41:48 +01:00
}
} ,
2026-01-04 17:50:55 +01:00
onBlockReply :
blockStreamingEnabled && opts ? . onBlockReply
? async ( payload ) = > {
let text = payload . text ;
2026-01-05 12:03:36 +13:00
if ( ! isHeartbeat && text ? . includes ( "HEARTBEAT_OK" ) ) {
2026-01-04 17:50:55 +01:00
const stripped = stripHeartbeatToken ( text , {
mode : "message" ,
} ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose (
"Stripped stray HEARTBEAT_OK token from reply" ,
) ;
}
const hasMedia = ( payload . mediaUrls ? . length ? ? 0 ) > 0 ;
if ( stripped . shouldSkip && ! hasMedia ) return ;
text = stripped . text ;
}
2026-01-08 01:09:13 +00:00
const taggedPayload = applyReplyTagsToPayload (
{
text ,
mediaUrls : payload.mediaUrls ,
mediaUrl : payload.mediaUrls?. [ 0 ] ,
} ,
2026-01-04 17:50:55 +01:00
sessionCtx . MessageSid ,
) ;
2026-01-08 01:09:13 +00:00
if ( ! isRenderablePayload ( taggedPayload ) ) return ;
2026-01-04 22:15:22 +01:00
const audioTagResult = extractAudioTag ( taggedPayload . text ) ;
const cleaned = audioTagResult . cleaned || undefined ;
2026-01-08 01:09:13 +00:00
const hasMedia =
Boolean ( taggedPayload . mediaUrl ) ||
( taggedPayload . mediaUrls ? . length ? ? 0 ) > 0 ;
2026-01-08 03:13:54 +00:00
if ( ! cleaned && ! hasMedia ) return ;
2026-01-04 22:15:22 +01:00
if ( cleaned ? . trim ( ) === SILENT_REPLY_TOKEN && ! hasMedia )
2026-01-04 17:50:55 +01:00
return ;
2026-01-04 22:15:22 +01:00
const blockPayload : ReplyPayload = applyReplyToMode ( {
. . . taggedPayload ,
text : cleaned ,
audioAsVoice : audioTagResult.audioAsVoice ,
} ) ;
2026-01-04 17:50:55 +01:00
const payloadKey = buildPayloadKey ( blockPayload ) ;
if (
streamedPayloadKeys . has ( payloadKey ) ||
pendingStreamedPayloadKeys . has ( payloadKey )
) {
return ;
}
pendingStreamedPayloadKeys . add ( payloadKey ) ;
const task = ( async ( ) = > {
2026-01-08 01:09:13 +00:00
await typingSignals . signalTextDelta ( taggedPayload . text ) ;
2026-01-04 17:50:55 +01:00
await opts . onBlockReply ? . ( blockPayload ) ;
} ) ( )
. then ( ( ) = > {
streamedPayloadKeys . add ( payloadKey ) ;
didStreamBlockReply = true ;
} )
. catch ( ( err ) = > {
logVerbose (
` block reply delivery failed: ${ String ( err ) } ` ,
) ;
} )
. finally ( ( ) = > {
pendingStreamedPayloadKeys . delete ( payloadKey ) ;
} ) ;
pendingBlockTasks . add ( task ) ;
void task . finally ( ( ) = > pendingBlockTasks . delete ( task ) ) ;
}
: undefined ,
shouldEmitToolResult ,
onToolResult : opts?.onToolResult
2026-01-07 00:24:08 +05:30
? ( payload ) = > {
2026-01-06 18:56:40 +00:00
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
// If a tool callback starts typing after the run finalized, we can end up with
// a typing loop that never sees a matching markRunComplete(). Track and drain.
2026-01-07 00:24:08 +05:30
const task = ( async ( ) = > {
let text = payload . text ;
if ( ! isHeartbeat && text ? . includes ( "HEARTBEAT_OK" ) ) {
const stripped = stripHeartbeatToken ( text , {
mode : "message" ,
} ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose (
"Stripped stray HEARTBEAT_OK token from reply" ,
) ;
}
if (
stripped . shouldSkip &&
( payload . mediaUrls ? . length ? ? 0 ) === 0
) {
return ;
}
text = stripped . text ;
2026-01-04 17:50:55 +01:00
}
2026-01-07 22:18:11 +00:00
await typingSignals . signalTextDelta ( text ) ;
2026-01-07 00:24:08 +05:30
await opts . onToolResult ? . ( {
text ,
mediaUrls : payload.mediaUrls ,
} ) ;
} ) ( )
. catch ( ( err ) = > {
logVerbose ( ` tool result delivery failed: ${ String ( err ) } ` ) ;
} )
. finally ( ( ) = > {
pendingToolTasks . delete ( task ) ;
} ) ;
pendingToolTasks . add ( task ) ;
2026-01-04 05:47:21 +01:00
}
2026-01-04 17:50:55 +01:00
: undefined ,
} ) ,
2026-01-04 05:47:21 +01:00
} ) ;
2026-01-04 17:50:55 +01:00
runResult = fallbackResult . result ;
fallbackProvider = fallbackResult . provider ;
fallbackModel = fallbackResult . model ;
2026-01-04 05:47:21 +01:00
} catch ( err ) {
const message = err instanceof Error ? err.message : String ( err ) ;
const isContextOverflow =
/context.*overflow|too large|context window/i . test ( message ) ;
2026-01-06 07:24:51 -05:00
const isSessionCorruption =
2026-01-06 23:06:01 +01:00
/function call turn comes immediately after/i . test ( message ) ;
2026-01-06 07:24:51 -05:00
// Auto-recover from Gemini session corruption by resetting the session
if ( isSessionCorruption && sessionKey && sessionStore && storePath ) {
const corruptedSessionId = sessionEntry ? . sessionId ;
defaultRuntime . error (
` Session history corrupted (Gemini function call ordering). Resetting session: ${ sessionKey } ` ,
) ;
2026-01-06 23:06:01 +01:00
try {
// Delete transcript file if it exists
if ( corruptedSessionId ) {
const transcriptPath =
resolveSessionTranscriptPath ( corruptedSessionId ) ;
try {
fs . unlinkSync ( transcriptPath ) ;
} catch {
// Ignore if file doesn't exist
}
2026-01-06 07:24:51 -05:00
}
2026-01-06 23:06:01 +01:00
// Remove session entry from store
delete sessionStore [ sessionKey ] ;
await saveSessionStore ( storePath , sessionStore ) ;
} catch ( cleanupErr ) {
defaultRuntime . error (
` Failed to reset corrupted session ${ sessionKey } : ${ String ( cleanupErr ) } ` ,
) ;
}
2026-01-06 07:24:51 -05:00
return finalizeWithFollowup ( {
text : "⚠️ Session history was corrupted. I've reset the conversation - please try again!" ,
} ) ;
}
2026-01-04 05:47:21 +01:00
defaultRuntime . error ( ` Embedded agent failed before reply: ${ message } ` ) ;
return finalizeWithFollowup ( {
text : isContextOverflow
? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
: ` ⚠️ Agent failed before reply: ${ message } . Check gateway logs for details. ` ,
} ) ;
}
if (
shouldInjectGroupIntro &&
sessionEntry &&
sessionStore &&
sessionKey &&
sessionEntry . groupActivationNeedsSystemIntro
) {
sessionEntry . groupActivationNeedsSystemIntro = false ;
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
if ( storePath ) {
await saveSessionStore ( storePath , sessionStore ) ;
}
}
const payloadArray = runResult . payloads ? ? [ ] ;
if ( pendingBlockTasks . size > 0 ) {
await Promise . allSettled ( pendingBlockTasks ) ;
}
2026-01-07 00:24:08 +05:30
if ( pendingToolTasks . size > 0 ) {
await Promise . allSettled ( pendingToolTasks ) ;
}
2026-01-06 18:56:40 +00:00
// Drain any late tool/block deliveries before deciding there's "nothing to send".
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
// keep the typing indicator stuck.
if ( payloadArray . length === 0 ) return finalizeWithFollowup ( undefined ) ;
2026-01-04 05:47:21 +01:00
2026-01-05 12:03:36 +13:00
const sanitizedPayloads = isHeartbeat
2026-01-04 05:47:21 +01:00
? payloadArray
: payloadArray . flatMap ( ( payload ) = > {
2026-01-06 21:17:55 +01:00
let text = payload . text ;
2026-01-06 22:43:29 +01:00
if ( payload . isError && text && isBunFetchSocketError ( text ) ) {
text = formatBunFetchSocketError ( text ) ;
2026-01-06 21:17:55 +01:00
}
if ( ! text || ! text . includes ( "HEARTBEAT_OK" ) )
return [ { . . . payload , text } ] ;
2026-01-04 05:47:21 +01:00
const stripped = stripHeartbeatToken ( text , { mode : "message" } ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose ( "Stripped stray HEARTBEAT_OK token from reply" ) ;
}
const hasMedia =
Boolean ( payload . mediaUrl ) || ( payload . mediaUrls ? . length ? ? 0 ) > 0 ;
if ( stripped . shouldSkip && ! hasMedia ) return [ ] ;
return [ { . . . payload , text : stripped.text } ] ;
} ) ;
2026-01-08 01:09:13 +00:00
const replyTaggedPayloads : ReplyPayload [ ] = applyReplyThreading ( {
payloads : sanitizedPayloads ,
applyReplyToMode ,
currentMessageId : sessionCtx.MessageSid ,
2026-01-04 22:15:22 +01:00
} )
. map ( ( payload ) = > {
const audioTagResult = extractAudioTag ( payload . text ) ;
return {
. . . payload ,
text : audioTagResult.cleaned ? audioTagResult.cleaned : undefined ,
audioAsVoice : audioTagResult.audioAsVoice ,
} ;
} )
. filter ( isRenderablePayload ) ;
2026-01-04 05:47:21 +01:00
2026-01-08 00:50:29 +00:00
// Drop final payloads if block streaming is enabled and we already streamed
// block replies. Tool-sent duplicates are filtered below.
2026-01-04 05:47:21 +01:00
const shouldDropFinalPayloads =
2026-01-08 00:50:29 +00:00
blockStreamingEnabled && didStreamBlockReply ;
const messagingToolSentTexts = runResult . messagingToolSentTexts ? ? [ ] ;
2026-01-08 08:49:16 +01:00
const messagingToolSentTargets = runResult . messagingToolSentTargets ? ? [ ] ;
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies ( {
messageProvider : followupRun.run.messageProvider ,
messagingToolSentTargets ,
originatingTo : sessionCtx.OriginatingTo ? ? sessionCtx . To ,
accountId : sessionCtx.AccountId ,
} ) ;
2026-01-08 01:09:13 +00:00
const dedupedPayloads = filterMessagingToolDuplicates ( {
payloads : replyTaggedPayloads ,
sentTexts : messagingToolSentTexts ,
} ) ;
2026-01-04 05:47:21 +01:00
const filteredPayloads = shouldDropFinalPayloads
? [ ]
: blockStreamingEnabled
2026-01-08 00:50:29 +00:00
? dedupedPayloads . filter (
2026-01-04 05:47:21 +01:00
( payload ) = > ! streamedPayloadKeys . has ( buildPayloadKey ( payload ) ) ,
)
2026-01-08 00:50:29 +00:00
: dedupedPayloads ;
2026-01-08 08:49:16 +01:00
const replyPayloads = suppressMessagingToolReplies ? [ ] : filteredPayloads ;
2026-01-04 05:47:21 +01:00
2026-01-08 08:49:16 +01:00
if ( replyPayloads . length === 0 ) return finalizeWithFollowup ( undefined ) ;
2026-01-04 05:47:21 +01:00
2026-01-08 08:49:16 +01:00
const shouldSignalTyping = replyPayloads . some ( ( payload ) = > {
2026-01-04 05:47:21 +01:00
const trimmed = payload . text ? . trim ( ) ;
if ( trimmed && trimmed !== SILENT_REPLY_TOKEN ) return true ;
if ( payload . mediaUrl ) return true ;
if ( payload . mediaUrls && payload . mediaUrls . length > 0 ) return true ;
return false ;
} ) ;
2026-01-07 22:18:11 +00:00
if ( shouldSignalTyping ) {
await typingSignals . signalRunStart ( ) ;
2026-01-04 05:47:21 +01:00
}
if ( sessionStore && sessionKey ) {
const usage = runResult . meta . agentMeta ? . usage ;
2026-01-04 17:50:55 +01:00
const modelUsed =
runResult . meta . agentMeta ? . model ? ? fallbackModel ? ? defaultModel ;
const providerUsed =
runResult . meta . agentMeta ? . provider ? ?
fallbackProvider ? ?
followupRun . run . provider ;
2026-01-04 05:47:21 +01:00
const contextTokensUsed =
agentCfgContextTokens ? ?
lookupContextTokens ( modelUsed ) ? ?
sessionEntry ? . contextTokens ? ?
DEFAULT_CONTEXT_TOKENS ;
2026-01-06 18:51:45 +00:00
if ( hasNonzeroUsage ( usage ) ) {
2026-01-04 05:47:21 +01:00
const entry = sessionEntry ? ? sessionStore [ sessionKey ] ;
if ( entry ) {
const input = usage . input ? ? 0 ;
const output = usage . output ? ? 0 ;
const promptTokens =
input + ( usage . cacheRead ? ? 0 ) + ( usage . cacheWrite ? ? 0 ) ;
const nextEntry = {
. . . entry ,
inputTokens : input ,
outputTokens : output ,
totalTokens :
promptTokens > 0 ? promptTokens : ( usage . total ? ? input ) ,
2026-01-04 17:50:55 +01:00
modelProvider : providerUsed ,
2026-01-04 05:47:21 +01:00
model : modelUsed ,
contextTokens : contextTokensUsed ? ? entry . contextTokens ,
updatedAt : Date.now ( ) ,
} ;
sessionStore [ sessionKey ] = nextEntry ;
if ( storePath ) {
await saveSessionStore ( storePath , sessionStore ) ;
}
}
} else if ( modelUsed || contextTokensUsed ) {
const entry = sessionEntry ? ? sessionStore [ sessionKey ] ;
if ( entry ) {
sessionStore [ sessionKey ] = {
. . . entry ,
2026-01-04 17:50:55 +01:00
modelProvider : providerUsed ? ? entry . modelProvider ,
2026-01-04 05:47:21 +01:00
model : modelUsed ? ? entry . model ,
contextTokens : contextTokensUsed ? ? entry . contextTokens ,
} ;
if ( storePath ) {
await saveSessionStore ( storePath , sessionStore ) ;
}
}
}
}
// If verbose is enabled and this is a new session, prepend a session hint.
2026-01-08 08:49:16 +01:00
let finalPayloads = replyPayloads ;
2026-01-06 02:41:48 +01:00
if ( autoCompactionCompleted ) {
const count = await incrementCompactionCount ( {
sessionEntry ,
sessionStore ,
sessionKey ,
storePath ,
} ) ;
if ( resolvedVerboseLevel === "on" ) {
const suffix = typeof count === "number" ? ` (count ${ count } ) ` : "" ;
finalPayloads = [
{ text : ` 🧹 Auto-compaction complete ${ suffix } . ` } ,
. . . finalPayloads ,
] ;
}
}
2026-01-04 05:47:21 +01:00
if ( resolvedVerboseLevel === "on" && isNewSession ) {
finalPayloads = [
{ text : ` 🧭 New session: ${ followupRun . run . sessionId } ` } ,
. . . finalPayloads ,
] ;
}
return finalizeWithFollowup (
finalPayloads . length === 1 ? finalPayloads [ 0 ] : finalPayloads ,
) ;
} finally {
2026-01-06 03:05:11 +00:00
typing . markRunComplete ( ) ;
2026-01-04 05:47:21 +01:00
}
}