2026-01-04 05:47:21 +01:00
import crypto from "node:crypto" ;
2026-01-06 07:24:51 -05:00
import fs from "node:fs" ;
2026-01-09 04:18:21 +00:00
import { runClaudeCliAgent } from "../../agents/claude-cli-runner.js" ;
2026-01-04 05:47:21 +01:00
import { lookupContextTokens } from "../../agents/context.js" ;
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js" ;
2026-01-09 02:21:17 +00:00
import { resolveModelAuthMode } from "../../agents/model-auth.js" ;
2026-01-04 18:11:41 +01:00
import { runWithModelFallback } from "../../agents/model-fallback.js" ;
2026-01-04 05:47:21 +01:00
import {
queueEmbeddedPiMessage ,
runEmbeddedPiAgent ,
} from "../../agents/pi-embedded.js" ;
2026-01-09 02:21:17 +00:00
import { hasNonzeroUsage , type NormalizedUsage } from "../../agents/usage.js" ;
2026-01-04 05:47:21 +01:00
import {
loadSessionStore ,
2026-01-06 07:24:51 -05:00
resolveSessionTranscriptPath ,
2026-01-04 05:47:21 +01:00
type SessionEntry ,
saveSessionStore ,
} from "../../config/sessions.js" ;
2026-01-07 21:58:54 +00:00
import type { TypingMode } from "../../config/types.js" ;
2026-01-04 05:47:21 +01:00
import { logVerbose } from "../../globals.js" ;
2026-01-09 05:27:54 +00:00
import {
emitAgentEvent ,
registerAgentRunContext ,
} from "../../infra/agent-events.js" ;
2026-01-10 02:40:41 +01:00
import { isAudioFileName } from "../../media/mime.js" ;
2026-01-04 05:47:21 +01:00
import { defaultRuntime } from "../../runtime.js" ;
2026-01-09 02:21:17 +00:00
import {
estimateUsageCost ,
formatTokenCount ,
formatUsd ,
resolveModelCostConfig ,
} from "../../utils/usage-format.js" ;
2026-01-04 05:47:21 +01:00
import { stripHeartbeatToken } from "../heartbeat.js" ;
2026-01-08 00:50:29 +00:00
import type { OriginatingChannelType , TemplateContext } from "../templating.js" ;
2026-01-04 05:47:21 +01:00
import { normalizeVerboseLevel , type VerboseLevel } from "../thinking.js" ;
2026-01-10 03:01:04 +01:00
import { SILENT_REPLY_TOKEN } from "../tokens.js" ;
2026-01-04 05:47:21 +01:00
import type { GetReplyOptions , ReplyPayload } from "../types.js" ;
2026-01-10 02:40:41 +01:00
import {
createAudioAsVoiceBuffer ,
createBlockReplyPipeline ,
} from "./block-reply-pipeline.js" ;
2026-01-09 18:19:55 +00:00
import { resolveBlockStreamingCoalescing } from "./block-streaming.js" ;
2026-01-04 05:47:21 +01:00
import { createFollowupRunner } from "./followup-runner.js" ;
import {
enqueueFollowupRun ,
type FollowupRun ,
type QueueSettings ,
scheduleFollowupDrain ,
} from "./queue.js" ;
2026-01-10 03:01:04 +01:00
import { parseReplyDirectives } from "./reply-directives.js" ;
2026-01-08 01:09:13 +00:00
import {
applyReplyTagsToPayload ,
applyReplyThreading ,
filterMessagingToolDuplicates ,
isRenderablePayload ,
2026-01-08 08:49:16 +01:00
shouldSuppressMessagingToolReplies ,
2026-01-08 01:09:13 +00:00
} from "./reply-payloads.js" ;
2026-01-08 00:50:29 +00:00
import {
2026-01-09 16:01:47 +00:00
createReplyToModeFilterForChannel ,
2026-01-08 00:50:29 +00:00
resolveReplyToMode ,
} from "./reply-threading.js" ;
2026-01-06 02:41:48 +01:00
import { incrementCompactionCount } from "./session-updates.js" ;
2026-01-04 05:47:21 +01:00
import type { TypingController } from "./typing.js" ;
2026-01-07 22:18:11 +00:00
import { createTypingSignaler } from "./typing-mode.js" ;
2026-01-04 05:47:21 +01:00
2026-01-06 22:43:29 +01:00
/**
 * Case-insensitive match for the "socket connection was closed unexpectedly"
 * transport error (named for Bun's fetch implementation).
 */
const BUN_FETCH_SOCKET_ERROR_RE = new RegExp(
  "socket connection was closed unexpectedly",
  "i",
);

/** Default timeout (ms) handed to the block-reply pipeline when opts gives none. */
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15 * 1000;
/**
 * Build Slack-specific threading context that is spread into the embedded
 * agent run for tool auto-injection.
 *
 * Every field is `undefined` unless `sessionCtx.Provider` equals "slack"
 * (case-insensitive). For Slack sessions:
 * - `currentChannelId` is the id following a "channel:" prefix on
 *   `sessionCtx.To` (undefined for any other target form);
 * - `currentThreadTs` is `sessionCtx.ReplyToId`;
 * - `replyToMode` is forced to "all" whenever `sessionCtx.ThreadLabel` is
 *   set, so tool calls never move replies out of an existing thread;
 *   otherwise it is `config.slack.replyToMode`, defaulting to "off";
 * - `hasRepliedRef` is passed through unchanged.
 */
function buildSlackThreadingContext(params: {
  sessionCtx: TemplateContext;
  config: { slack?: { replyToMode?: "off" | "first" | "all" } } | undefined;
  hasRepliedRef: { value: boolean } | undefined;
}): {
  currentChannelId: string | undefined;
  currentThreadTs: string | undefined;
  replyToMode: "off" | "first" | "all" | undefined;
  hasRepliedRef: { value: boolean } | undefined;
} {
  const { sessionCtx, config, hasRepliedRef } = params;
  const provider = sessionCtx.Provider?.toLowerCase();
  if (provider !== "slack") {
    return {
      currentChannelId: undefined,
      currentThreadTs: undefined,
      replyToMode: undefined,
      hasRepliedRef: undefined,
    };
  }

  // Targets of the form "channel:C123" carry the Slack channel id.
  const channelPrefix = "channel:";
  const target = sessionCtx.To;
  const currentChannelId = target?.startsWith(channelPrefix)
    ? target.slice(channelPrefix.length)
    : undefined;

  // Already inside a thread: keep replies there even when the configured
  // mode is "off"/"first", matching the auto-reply path.
  let replyToMode: "off" | "first" | "all" =
    config?.slack?.replyToMode ?? "off";
  if (sessionCtx.ThreadLabel) {
    replyToMode = "all";
  }

  return {
    currentChannelId,
    currentThreadTs: sessionCtx.ReplyToId,
    replyToMode,
    hasRepliedRef,
  };
}
/** True when `message` is non-empty and matches BUN_FETCH_SOCKET_ERROR_RE. */
const isBunFetchSocketError = (message?: string): boolean => {
  if (!message) {
    return false;
  }
  return BUN_FETCH_SOCKET_ERROR_RE.test(message);
};
/**
 * Render a socket-closed LLM transport error as a user-facing warning, with
 * the trimmed original error text inside a fenced code block. A blank
 * message is shown as "Unknown error".
 */
const formatBunFetchSocketError = (message: string): string => {
  const header =
    "⚠️ LLM connection failed. This could be due to server issues, network problems, or context length exceeded (e.g., with local LLMs like LM Studio). Original error:";
  const fence = "```";
  const detail = message.trim() || "Unknown error";
  return `${header}\n${fence}\n${detail}\n${fence}`;
};
/**
 * Build the optional "Usage: … in / … out" footer for a reply.
 *
 * Returns null when there is no usage object, or when neither the input nor
 * the output token count is a number. A missing side is rendered as "?".
 * When `showCost` is set, the result of `formatUsd` is appended as
 * " · est <cost>". The cost is estimated from `costConfig` only when both
 * counts are numbers.
 */
const formatResponseUsageLine = (params: {
  usage?: NormalizedUsage;
  showCost: boolean;
  costConfig?: {
    input: number;
    output: number;
    cacheRead: number;
    cacheWrite: number;
  };
}): string | null => {
  const { usage, showCost, costConfig } = params;
  if (!usage) return null;

  const { input, output, cacheRead, cacheWrite } = usage;
  if (typeof input !== "number" && typeof output !== "number") return null;

  const inputLabel = typeof input === "number" ? formatTokenCount(input) : "?";
  const outputLabel =
    typeof output === "number" ? formatTokenCount(output) : "?";

  let suffix = "";
  if (showCost) {
    const cost =
      typeof input === "number" && typeof output === "number"
        ? estimateUsageCost({
            usage: { input, output, cacheRead, cacheWrite },
            cost: costConfig,
          })
        : undefined;
    const costLabel = formatUsd(cost);
    if (costLabel) {
      suffix = ` · est ${costLabel}`;
    }
  }

  return `Usage: ${inputLabel} in / ${outputLabel} out${suffix}`;
};
/**
 * Append `line` to the last payload that has non-empty text, inserting a
 * newline unless that text already ends with one. When no payload has
 * text, a new `{ text: line }` payload is added at the end. The input array
 * and its payload objects are not mutated.
 */
const appendUsageLine = (
  payloads: ReplyPayload[],
  line: string,
): ReplyPayload[] => {
  // Scan backwards for the last payload with text.
  let target = payloads.length - 1;
  while (target >= 0 && !payloads[target]?.text) {
    target -= 1;
  }
  if (target < 0) {
    return [...payloads, { text: line }];
  }

  const current = payloads[target];
  const currentText = current.text ?? "";
  const joiner = currentText.endsWith("\n") ? "" : "\n";
  return payloads.map((payload, i) =>
    i === target
      ? { ...current, text: `${currentText}${joiner}${line}` }
      : payload,
  );
};
/**
 * Run one agent turn for an inbound message and produce the reply payload(s).
 *
 * Flow:
 * 1. If steering is requested while a run is streaming, try to push the
 *    prompt into the active embedded run. On success (and no followup
 *    requested), touch the session entry and return `undefined`.
 * 2. If a run is already active and followup/steer queueing applies,
 *    enqueue a followup run, touch the session entry, and return `undefined`.
 * 3. Otherwise run the agent through `runWithModelFallback`. Provider
 *    "claude-cli" goes through `runClaudeCliAgent` (with lifecycle events);
 *    everything else goes through `runEmbeddedPiAgent`. Partial, block,
 *    reasoning, and tool output is forwarded through the callbacks in `opts`.
 * 4. Post-process the final payloads: strip stray HEARTBEAT_OK tokens, apply
 *    reply threading and reply directives, and drop duplicates of
 *    messaging-tool sends and already-streamed blocks. Then persist
 *    usage/model info to the session store and optionally add verbose
 *    hints and a usage footer.
 *
 * Every path that reaches step 3 schedules a followup-queue drain via
 * `finalizeWithFollowup`. The `finally` waits for pending tool-result
 * deliveries, stops the block pipeline, and marks the typing run complete.
 *
 * @returns A single payload, an array of payloads, or `undefined` when
 *   there is nothing to send.
 */
export async function runReplyAgent(params: {
  commandBody: string;
  followupRun: FollowupRun;
  queueKey: string;
  resolvedQueue: QueueSettings;
  shouldSteer: boolean;
  shouldFollowup: boolean;
  isActive: boolean;
  isStreaming: boolean;
  opts?: GetReplyOptions;
  typing: TypingController;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  defaultModel: string;
  agentCfgContextTokens?: number;
  resolvedVerboseLevel: VerboseLevel;
  isNewSession: boolean;
  blockStreamingEnabled: boolean;
  blockReplyChunking?: {
    minChars: number;
    maxChars: number;
    breakPreference: "paragraph" | "newline" | "sentence";
  };
  resolvedBlockStreamingBreak: "text_end" | "message_end";
  sessionCtx: TemplateContext;
  shouldInjectGroupIntro: boolean;
  typingMode: TypingMode;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
  const {
    commandBody,
    followupRun,
    queueKey,
    resolvedQueue,
    shouldSteer,
    shouldFollowup,
    isActive,
    isStreaming,
    opts,
    typing,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    isNewSession,
    blockStreamingEnabled,
    blockReplyChunking,
    resolvedBlockStreamingBreak,
    sessionCtx,
    shouldInjectGroupIntro,
    typingMode,
  } = params;

  const isHeartbeat = opts?.isHeartbeat === true;
  const typingSignals = createTypingSignaler({
    typing,
    mode: typingMode,
    isHeartbeat,
  });

  // Reads the session's verbose level from the store on every call, falling
  // back to the resolved level when the store is unavailable, unreadable,
  // or has no usable value.
  const shouldEmitToolResult = () => {
    if (!sessionKey || !storePath) {
      return resolvedVerboseLevel === "on";
    }
    try {
      const store = loadSessionStore(storePath);
      const entry = store[sessionKey];
      const current = normalizeVerboseLevel(entry?.verboseLevel);
      if (current) return current === "on";
    } catch {
      // ignore store read failures
    }
    return resolvedVerboseLevel === "on";
  };
  // In-flight tool-result deliveries. Each task removes itself when it
  // settles, so the set can be drained with Promise.allSettled.
  const pendingToolTasks = new Set<Promise<void>>();
  const blockReplyTimeoutMs =
    opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
  const hasAudioMedia = (urls?: string[]): boolean =>
    Boolean(urls?.some((u) => isAudioFileName(u)));
  const isAudioPayload = (payload: ReplyPayload) =>
    hasAudioMedia(
      payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined),
    );
  const replyToChannel =
    sessionCtx.OriginatingChannel ??
    ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
      | OriginatingChannelType
      | undefined);
  const replyToMode = resolveReplyToMode(
    followupRun.run.config,
    replyToChannel,
  );
  const applyReplyToMode = createReplyToModeFilterForChannel(
    replyToMode,
    replyToChannel,
  );
  const cfg = followupRun.run.config;

  if (shouldSteer && isStreaming) {
    const steered = queueEmbeddedPiMessage(
      followupRun.run.sessionId,
      followupRun.prompt,
    );
    if (steered && !shouldFollowup) {
      if (sessionEntry && sessionStore && sessionKey) {
        sessionEntry.updatedAt = Date.now();
        sessionStore[sessionKey] = sessionEntry;
        if (storePath) {
          await saveSessionStore(storePath, sessionStore);
        }
      }
      typing.cleanup();
      return undefined;
    }
  }
  if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
    enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
    if (sessionEntry && sessionStore && sessionKey) {
      sessionEntry.updatedAt = Date.now();
      sessionStore[sessionKey] = sessionEntry;
      if (storePath) {
        await saveSessionStore(storePath, sessionStore);
      }
    }
    typing.cleanup();
    return undefined;
  }

  // Build the block-reply pipeline only once we know this call runs the
  // agent. The steer/enqueue paths above return before the try/finally that
  // stops it, so a pipeline created earlier would never be stopped there.
  const blockReplyCoalescing =
    blockStreamingEnabled && opts?.onBlockReply
      ? resolveBlockStreamingCoalescing(
          cfg,
          sessionCtx.Provider,
          sessionCtx.AccountId,
          blockReplyChunking,
        )
      : undefined;
  const blockReplyPipeline =
    blockStreamingEnabled && opts?.onBlockReply
      ? createBlockReplyPipeline({
          onBlockReply: opts.onBlockReply,
          timeoutMs: blockReplyTimeoutMs,
          coalescing: blockReplyCoalescing,
          buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
        })
      : null;

  const runFollowupTurn = createFollowupRunner({
    opts,
    typing,
    typingMode,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
  });
  // Every agent-path return goes through here so queued followups get drained.
  const finalizeWithFollowup = <T>(value: T): T => {
    scheduleFollowupDrain(queueKey, runFollowupTurn);
    return value;
  };
  let didLogHeartbeatStrip = false;

  /**
   * Remove a stray HEARTBEAT_OK token from non-heartbeat reply text.
   * Returns `skip: true` when the stripped text should be dropped and there
   * is no media to send; otherwise returns the (possibly stripped) text.
   * Text without the token, and heartbeat runs, pass through unchanged.
   */
  const stripStrayHeartbeat = (
    text: string | undefined,
    hasMedia: boolean,
  ): { skip: boolean; text: string | undefined } => {
    if (isHeartbeat || !text || !text.includes("HEARTBEAT_OK")) {
      return { skip: false, text };
    }
    const stripped = stripHeartbeatToken(text, { mode: "message" });
    if (stripped.didStrip && !didLogHeartbeatStrip) {
      didLogHeartbeatStrip = true;
      logVerbose("Stripped stray HEARTBEAT_OK token from reply");
    }
    if (stripped.shouldSkip && !hasMedia) {
      return { skip: true, text };
    }
    return { skip: false, text: stripped.text };
  };

  let autoCompactionCompleted = false;
  let responseUsageLine: string | undefined;
  try {
    const runId = crypto.randomUUID();
    if (sessionKey) {
      registerAgentRunContext(runId, {
        sessionKey,
        verboseLevel: resolvedVerboseLevel,
      });
    }
    let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
    let fallbackProvider = followupRun.run.provider;
    let fallbackModel = followupRun.run.model;
    try {
      // Partial text streaming is disabled while reasoning is streamed to the caller.
      const allowPartialStream = !(
        followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream
      );
      const fallbackResult = await runWithModelFallback({
        cfg: followupRun.run.config,
        provider: followupRun.run.provider,
        model: followupRun.run.model,
        run: (provider, model) => {
          if (provider === "claude-cli") {
            // The CLI runner does not emit lifecycle events itself, so emit
            // start/end/error around it here.
            const startedAt = Date.now();
            emitAgentEvent({
              runId,
              stream: "lifecycle",
              data: {
                phase: "start",
                startedAt,
              },
            });
            return runClaudeCliAgent({
              sessionId: followupRun.run.sessionId,
              sessionKey,
              sessionFile: followupRun.run.sessionFile,
              workspaceDir: followupRun.run.workspaceDir,
              config: followupRun.run.config,
              prompt: commandBody,
              provider,
              model,
              thinkLevel: followupRun.run.thinkLevel,
              timeoutMs: followupRun.run.timeoutMs,
              runId,
              extraSystemPrompt: followupRun.run.extraSystemPrompt,
              ownerNumbers: followupRun.run.ownerNumbers,
              claudeSessionId:
                sessionEntry?.claudeCliSessionId?.trim() || undefined,
            })
              .then((result) => {
                emitAgentEvent({
                  runId,
                  stream: "lifecycle",
                  data: {
                    phase: "end",
                    startedAt,
                    endedAt: Date.now(),
                  },
                });
                return result;
              })
              .catch((err) => {
                emitAgentEvent({
                  runId,
                  stream: "lifecycle",
                  data: {
                    phase: "error",
                    startedAt,
                    endedAt: Date.now(),
                    error: err instanceof Error ? err.message : String(err),
                  },
                });
                throw err;
              });
          }
          return runEmbeddedPiAgent({
            sessionId: followupRun.run.sessionId,
            sessionKey,
            messageProvider:
              sessionCtx.Provider?.trim().toLowerCase() || undefined,
            agentAccountId: sessionCtx.AccountId,
            // Slack threading context for tool auto-injection
            ...buildSlackThreadingContext({
              sessionCtx,
              config: followupRun.run.config,
              hasRepliedRef: opts?.hasRepliedRef,
            }),
            sessionFile: followupRun.run.sessionFile,
            workspaceDir: followupRun.run.workspaceDir,
            agentDir: followupRun.run.agentDir,
            config: followupRun.run.config,
            skillsSnapshot: followupRun.run.skillsSnapshot,
            prompt: commandBody,
            extraSystemPrompt: followupRun.run.extraSystemPrompt,
            ownerNumbers: followupRun.run.ownerNumbers,
            enforceFinalTag: followupRun.run.enforceFinalTag,
            provider,
            model,
            authProfileId: followupRun.run.authProfileId,
            thinkLevel: followupRun.run.thinkLevel,
            verboseLevel: followupRun.run.verboseLevel,
            reasoningLevel: followupRun.run.reasoningLevel,
            bashElevated: followupRun.run.bashElevated,
            timeoutMs: followupRun.run.timeoutMs,
            runId,
            blockReplyBreak: resolvedBlockStreamingBreak,
            blockReplyChunking,
            onPartialReply:
              opts?.onPartialReply && allowPartialStream
                ? async (payload) => {
                    const { skip, text } = stripStrayHeartbeat(
                      payload.text,
                      (payload.mediaUrls?.length ?? 0) > 0,
                    );
                    if (skip) return;
                    await typingSignals.signalTextDelta(text);
                    await opts.onPartialReply?.({
                      text,
                      mediaUrls: payload.mediaUrls,
                    });
                  }
                : undefined,
            onReasoningStream:
              typingSignals.shouldStartOnReasoning || opts?.onReasoningStream
                ? async (payload) => {
                    await typingSignals.signalReasoningDelta();
                    await opts?.onReasoningStream?.({
                      text: payload.text,
                      mediaUrls: payload.mediaUrls,
                    });
                  }
                : undefined,
            onAgentEvent: (evt) => {
              // Trigger typing when tools start executing
              if (evt.stream === "tool") {
                const phase =
                  typeof evt.data.phase === "string" ? evt.data.phase : "";
                if (phase === "start") {
                  void typingSignals.signalToolStart();
                }
              }
              // Track auto-compaction completion (ignore ends that will retry)
              if (evt.stream === "compaction") {
                const phase =
                  typeof evt.data.phase === "string" ? evt.data.phase : "";
                const willRetry = Boolean(evt.data.willRetry);
                if (phase === "end" && !willRetry) {
                  autoCompactionCompleted = true;
                }
              }
            },
            onBlockReply:
              blockStreamingEnabled && opts?.onBlockReply
                ? async (payload) => {
                    const { skip, text } = stripStrayHeartbeat(
                      payload.text,
                      (payload.mediaUrls?.length ?? 0) > 0,
                    );
                    if (skip) return;
                    const taggedPayload = applyReplyTagsToPayload(
                      {
                        text,
                        mediaUrls: payload.mediaUrls,
                        mediaUrl: payload.mediaUrls?.[0],
                      },
                      sessionCtx.MessageSid,
                    );
                    // Let through payloads with audioAsVoice flag even if empty (need to track it)
                    if (
                      !isRenderablePayload(taggedPayload) &&
                      !payload.audioAsVoice
                    ) {
                      return;
                    }
                    const parsed = parseReplyDirectives(
                      taggedPayload.text ?? "",
                      {
                        currentMessageId: sessionCtx.MessageSid,
                        silentToken: SILENT_REPLY_TOKEN,
                      },
                    );
                    const cleaned = parsed.text || undefined;
                    const hasMedia =
                      Boolean(taggedPayload.mediaUrl) ||
                      (taggedPayload.mediaUrls?.length ?? 0) > 0;
                    // Skip empty payloads unless they have audioAsVoice flag (need to track it)
                    if (
                      !cleaned &&
                      !hasMedia &&
                      !payload.audioAsVoice &&
                      !parsed.audioAsVoice
                    ) {
                      return;
                    }
                    if (parsed.isSilent && !hasMedia) return;

                    const blockPayload: ReplyPayload = applyReplyToMode({
                      ...taggedPayload,
                      text: cleaned,
                      audioAsVoice: Boolean(
                        parsed.audioAsVoice || payload.audioAsVoice,
                      ),
                      replyToId: taggedPayload.replyToId ?? parsed.replyToId,
                      replyToTag: taggedPayload.replyToTag || parsed.replyToTag,
                      replyToCurrent:
                        taggedPayload.replyToCurrent || parsed.replyToCurrent,
                    });

                    // Fire-and-forget: a typing failure must not block delivery.
                    void typingSignals
                      .signalTextDelta(cleaned ?? taggedPayload.text)
                      .catch((err) => {
                        logVerbose(
                          `block reply typing signal failed: ${String(err)}`,
                        );
                      });

                    blockReplyPipeline?.enqueue(blockPayload);
                  }
                : undefined,
            shouldEmitToolResult,
            onToolResult: opts?.onToolResult
              ? (payload) => {
                  // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
                  // If a tool callback starts typing after the run finalized, we can end up with
                  // a typing loop that never sees a matching markRunComplete(). Track and drain.
                  const task = (async () => {
                    const { skip, text } = stripStrayHeartbeat(
                      payload.text,
                      (payload.mediaUrls?.length ?? 0) > 0,
                    );
                    if (skip) return;
                    await typingSignals.signalTextDelta(text);
                    await opts.onToolResult?.({
                      text,
                      mediaUrls: payload.mediaUrls,
                    });
                  })()
                    .catch((err) => {
                      logVerbose(`tool result delivery failed: ${String(err)}`);
                    })
                    .finally(() => {
                      pendingToolTasks.delete(task);
                    });
                  pendingToolTasks.add(task);
                }
              : undefined,
          });
        },
      });
      runResult = fallbackResult.result;
      fallbackProvider = fallbackResult.provider;
      fallbackModel = fallbackResult.model;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      const isContextOverflow =
        /context.*overflow|too large|context window/i.test(message);
      const isSessionCorruption =
        /function call turn comes immediately after/i.test(message);
      // Auto-recover from Gemini session corruption by resetting the session
      if (isSessionCorruption && sessionKey && sessionStore && storePath) {
        const corruptedSessionId = sessionEntry?.sessionId;
        defaultRuntime.error(
          `Session history corrupted (Gemini function call ordering). Resetting session: ${sessionKey}`,
        );
        try {
          // Delete the transcript file, if any
          if (corruptedSessionId) {
            const transcriptPath =
              resolveSessionTranscriptPath(corruptedSessionId);
            try {
              fs.unlinkSync(transcriptPath);
            } catch {
              // Best-effort: ignore any unlink failure (e.g. file does not exist)
            }
          }
          // Remove session entry from store
          delete sessionStore[sessionKey];
          await saveSessionStore(storePath, sessionStore);
        } catch (cleanupErr) {
          defaultRuntime.error(
            `Failed to reset corrupted session ${sessionKey}: ${String(cleanupErr)}`,
          );
        }
        return finalizeWithFollowup({
          text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!",
        });
      }
      defaultRuntime.error(`Embedded agent failed before reply: ${message}`);
      return finalizeWithFollowup({
        text: isContextOverflow
          ? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
          : `⚠️ Agent failed before reply: ${message}. Check gateway logs for details.`,
      });
    }
    if (
      shouldInjectGroupIntro &&
      sessionEntry &&
      sessionStore &&
      sessionKey &&
      sessionEntry.groupActivationNeedsSystemIntro
    ) {
      sessionEntry.groupActivationNeedsSystemIntro = false;
      sessionEntry.updatedAt = Date.now();
      sessionStore[sessionKey] = sessionEntry;
      if (storePath) {
        await saveSessionStore(storePath, sessionStore);
      }
    }
    const payloadArray = runResult.payloads ?? [];

    // Drain any late tool/block deliveries before deciding there's "nothing to send".
    // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
    // keep the typing indicator stuck.
    if (blockReplyPipeline) {
      await blockReplyPipeline.flush({ force: true });
      blockReplyPipeline.stop();
    }
    if (pendingToolTasks.size > 0) {
      await Promise.allSettled(pendingToolTasks);
    }

    if (payloadArray.length === 0) return finalizeWithFollowup(undefined);

    const sanitizedPayloads = isHeartbeat
      ? payloadArray
      : payloadArray.flatMap((payload) => {
          let text = payload.text;
          if (payload.isError && text && isBunFetchSocketError(text)) {
            text = formatBunFetchSocketError(text);
          }
          const hasMedia =
            Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
          const stripped = stripStrayHeartbeat(text, hasMedia);
          if (stripped.skip) return [];
          return [{ ...payload, text: stripped.text }];
        });
    const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
      payloads: sanitizedPayloads,
      replyToMode,
      replyToChannel,
      currentMessageId: sessionCtx.MessageSid,
    })
      .map((payload) => {
        const parsed = parseReplyDirectives(payload.text ?? "", {
          currentMessageId: sessionCtx.MessageSid,
          silentToken: SILENT_REPLY_TOKEN,
        });
        const mediaUrls = payload.mediaUrls ?? parsed.mediaUrls;
        const mediaUrl = payload.mediaUrl ?? parsed.mediaUrl ?? mediaUrls?.[0];
        return {
          ...payload,
          text: parsed.text ? parsed.text : undefined,
          mediaUrls,
          mediaUrl,
          replyToId: payload.replyToId ?? parsed.replyToId,
          replyToTag: payload.replyToTag || parsed.replyToTag,
          replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent,
          audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice),
        };
      })
      .filter(isRenderablePayload);

    // Drop final payloads only when block streaming succeeded end-to-end.
    // If streaming aborted (e.g., timeout), fall back to final payloads.
    const shouldDropFinalPayloads =
      blockStreamingEnabled &&
      Boolean(blockReplyPipeline?.didStream()) &&
      !blockReplyPipeline?.isAborted();
    const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
    const messagingToolSentTargets = runResult.messagingToolSentTargets ?? [];
    const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
      messageProvider: followupRun.run.messageProvider,
      messagingToolSentTargets,
      originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To,
      accountId: sessionCtx.AccountId,
    });
    const dedupedPayloads = filterMessagingToolDuplicates({
      payloads: replyTaggedPayloads,
      sentTexts: messagingToolSentTexts,
    });
    const filteredPayloads = shouldDropFinalPayloads
      ? []
      : blockStreamingEnabled
        ? dedupedPayloads.filter(
            (payload) => !blockReplyPipeline?.hasSentPayload(payload),
          )
        : dedupedPayloads;
    const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads;

    if (replyPayloads.length === 0) return finalizeWithFollowup(undefined);

    const shouldSignalTyping = replyPayloads.some((payload) => {
      const trimmed = payload.text?.trim();
      if (trimmed) return true;
      if (payload.mediaUrl) return true;
      if (payload.mediaUrls && payload.mediaUrls.length > 0) return true;
      return false;
    });
    if (shouldSignalTyping) {
      await typingSignals.signalRunStart();
    }
    const usage = runResult.meta.agentMeta?.usage;
    const modelUsed =
      runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
    const providerUsed =
      runResult.meta.agentMeta?.provider ??
      fallbackProvider ??
      followupRun.run.provider;
    const cliSessionId =
      providerUsed === "claude-cli"
        ? runResult.meta.agentMeta?.sessionId?.trim()
        : undefined;
    const contextTokensUsed =
      agentCfgContextTokens ??
      lookupContextTokens(modelUsed) ??
      sessionEntry?.contextTokens ??
      DEFAULT_CONTEXT_TOKENS;

    if (sessionStore && sessionKey) {
      if (hasNonzeroUsage(usage)) {
        const entry = sessionEntry ?? sessionStore[sessionKey];
        if (entry) {
          const input = usage.input ?? 0;
          const output = usage.output ?? 0;
          const promptTokens =
            input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
          const nextEntry = {
            ...entry,
            inputTokens: input,
            outputTokens: output,
            totalTokens:
              promptTokens > 0 ? promptTokens : (usage.total ?? input),
            modelProvider: providerUsed,
            model: modelUsed,
            contextTokens: contextTokensUsed ?? entry.contextTokens,
            updatedAt: Date.now(),
          };
          if (cliSessionId) {
            nextEntry.claudeCliSessionId = cliSessionId;
          }
          sessionStore[sessionKey] = nextEntry;
          if (storePath) {
            await saveSessionStore(storePath, sessionStore);
          }
        }
      } else if (modelUsed || contextTokensUsed) {
        const entry = sessionEntry ?? sessionStore[sessionKey];
        if (entry) {
          sessionStore[sessionKey] = {
            ...entry,
            modelProvider: providerUsed ?? entry.modelProvider,
            model: modelUsed ?? entry.model,
            contextTokens: contextTokensUsed ?? entry.contextTokens,
            claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId,
          };
          if (storePath) {
            await saveSessionStore(storePath, sessionStore);
          }
        }
      }
    }
    const responseUsageEnabled =
      (sessionEntry?.responseUsage ??
        (sessionKey
          ? sessionStore?.[sessionKey]?.responseUsage
          : undefined)) === "on";
    if (responseUsageEnabled && hasNonzeroUsage(usage)) {
      // Cost estimates are only shown for API-key auth.
      const authMode = resolveModelAuthMode(providerUsed, cfg);
      const showCost = authMode === "api-key";
      const costConfig = showCost
        ? resolveModelCostConfig({
            provider: providerUsed,
            model: modelUsed,
            config: cfg,
          })
        : undefined;
      const formatted = formatResponseUsageLine({
        usage,
        showCost,
        costConfig,
      });
      if (formatted) responseUsageLine = formatted;
    }
    let finalPayloads = replyPayloads;
    if (autoCompactionCompleted) {
      const count = await incrementCompactionCount({
        sessionEntry,
        sessionStore,
        sessionKey,
        storePath,
      });
      if (resolvedVerboseLevel === "on") {
        const suffix = typeof count === "number" ? ` (count ${count})` : "";
        finalPayloads = [
          { text: `🧹 Auto-compaction complete${suffix}.` },
          ...finalPayloads,
        ];
      }
    }
    // If verbose is enabled and this is a new session, prepend a session hint.
    if (resolvedVerboseLevel === "on" && isNewSession) {
      finalPayloads = [
        { text: `🧭 New session: ${followupRun.run.sessionId}` },
        ...finalPayloads,
      ];
    }
    if (responseUsageLine) {
      finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
    }

    return finalizeWithFollowup(
      finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
    );
  } finally {
    // Error/reset paths return before the success-path drain above, so wait
    // for tool-result deliveries here too; otherwise a late typing signal can
    // outlive markRunComplete(). Only awaits when something is pending, so the
    // common path still completes synchronously.
    if (pendingToolTasks.size > 0) {
      await Promise.allSettled(pendingToolTasks);
    }
    blockReplyPipeline?.stop();
    typing.markRunComplete();
  }
}