P2-1 (agent-runner.ts): Restrict the direct completion notice to block-streaming runs. The condition now checks `blockStreamingEnabled` in addition to `opts?.onBlockReply`, preventing duplicate completion notices in non-streaming sessions, where `verboseNotices` already carries the compaction-complete text. P2-2 (agent-runner-execution.ts): Emit the compaction start notice when streaming is off. `blockReplyHandler` is a no-op for non-streaming runs, so add a direct fallback path: when `blockStreamingEnabled` is false and `opts.onBlockReply` is present, send the start notice directly, with `applyReplyToMode` threading applied.
775 lines
26 KiB
TypeScript
775 lines
26 KiB
TypeScript
import fs from "node:fs";
|
|
import { lookupContextTokens } from "../../agents/context.js";
|
|
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
|
|
import { resolveModelAuthMode } from "../../agents/model-auth.js";
|
|
import { isCliProvider } from "../../agents/model-selection.js";
|
|
import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js";
|
|
import { hasNonzeroUsage } from "../../agents/usage.js";
|
|
import {
|
|
resolveAgentIdFromSessionKey,
|
|
resolveSessionFilePath,
|
|
resolveSessionFilePathOptions,
|
|
resolveSessionTranscriptPath,
|
|
type SessionEntry,
|
|
updateSessionStore,
|
|
updateSessionStoreEntry,
|
|
} from "../../config/sessions.js";
|
|
import type { TypingMode } from "../../config/types.js";
|
|
import { emitAgentEvent } from "../../infra/agent-events.js";
|
|
import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
|
|
import { generateSecureUuid } from "../../infra/secure-random.js";
|
|
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
|
import { defaultRuntime } from "../../runtime.js";
|
|
import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js";
|
|
import {
|
|
buildFallbackClearedNotice,
|
|
buildFallbackNotice,
|
|
resolveFallbackTransition,
|
|
} from "../fallback-state.js";
|
|
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
|
|
import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js";
|
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
|
import { runAgentTurnWithFallback } from "./agent-runner-execution.js";
|
|
import {
|
|
createShouldEmitToolOutput,
|
|
createShouldEmitToolResult,
|
|
finalizeWithFollowup,
|
|
isAudioPayload,
|
|
signalTypingIfNeeded,
|
|
} from "./agent-runner-helpers.js";
|
|
import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js";
|
|
import { buildReplyPayloads } from "./agent-runner-payloads.js";
|
|
import {
|
|
appendUnscheduledReminderNote,
|
|
hasSessionRelatedCronJobs,
|
|
hasUnbackedReminderCommitment,
|
|
} from "./agent-runner-reminder-guard.js";
|
|
import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.js";
|
|
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
|
|
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
|
|
import { createFollowupRunner } from "./followup-runner.js";
|
|
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
|
|
import { readPostCompactionContext } from "./post-compaction-context.js";
|
|
import { resolveActiveRunQueueAction } from "./queue-policy.js";
|
|
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
|
|
import { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
|
|
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
|
|
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
|
|
import { createTypingSignaler } from "./typing-mode.js";
|
|
import type { TypingController } from "./typing.js";
|
|
|
|
/**
 * Default time budget, in milliseconds, for delivering a single block reply.
 * Callers may override it per run through `opts.blockReplyTimeoutMs`.
 */
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15 * 1000;
|
|
|
|
/**
 * Runs one reply turn for an inbound message.
 *
 * Depending on queue policy the message is steered into an already-streaming
 * embedded run, enqueued as a followup, dropped, or executed via
 * `runAgentTurnWithFallback`. For executed runs this function also persists
 * session usage / fallback state / compaction bookkeeping and assembles the
 * final reply payloads (verbose notices, reminder guard note, usage line).
 *
 * @returns the payload(s) to deliver, or `undefined` when nothing should be
 *   sent by this call (steered, queued, dropped, or no deliverable payloads).
 * @throws rethrows any unexpected error from the run path, after kicking the
 *   followup queue so it keeps moving.
 */
export async function runReplyAgent(params: {
  commandBody: string;
  followupRun: FollowupRun;
  queueKey: string;
  resolvedQueue: QueueSettings;
  shouldSteer: boolean;
  shouldFollowup: boolean;
  isActive: boolean;
  isStreaming: boolean;
  opts?: GetReplyOptions;
  typing: TypingController;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  defaultModel: string;
  agentCfgContextTokens?: number;
  resolvedVerboseLevel: VerboseLevel;
  isNewSession: boolean;
  blockStreamingEnabled: boolean;
  blockReplyChunking?: {
    minChars: number;
    maxChars: number;
    breakPreference: "paragraph" | "newline" | "sentence";
    flushOnParagraph?: boolean;
  };
  resolvedBlockStreamingBreak: "text_end" | "message_end";
  sessionCtx: TemplateContext;
  shouldInjectGroupIntro: boolean;
  typingMode: TypingMode;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
  const {
    commandBody,
    followupRun,
    queueKey,
    resolvedQueue,
    shouldSteer,
    shouldFollowup,
    isActive,
    isStreaming,
    opts,
    typing,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    isNewSession,
    blockStreamingEnabled,
    blockReplyChunking,
    resolvedBlockStreamingBreak,
    sessionCtx,
    shouldInjectGroupIntro,
    typingMode,
  } = params;

  let activeSessionEntry = sessionEntry;
  const activeSessionStore = sessionStore;
  let activeIsNewSession = isNewSession;

  const isHeartbeat = opts?.isHeartbeat === true;
  const typingSignals = createTypingSignaler({
    typing,
    mode: typingMode,
    isHeartbeat,
  });

  const shouldEmitToolResult = createShouldEmitToolResult({
    sessionKey,
    storePath,
    resolvedVerboseLevel,
  });
  const shouldEmitToolOutput = createShouldEmitToolOutput({
    sessionKey,
    storePath,
    resolvedVerboseLevel,
  });

  const pendingToolTasks = new Set<Promise<void>>();
  const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;

  const replyToChannel = resolveOriginMessageProvider({
    originatingChannel: sessionCtx.OriginatingChannel,
    provider: sessionCtx.Surface ?? sessionCtx.Provider,
  }) as OriginatingChannelType | undefined;
  const replyToMode = resolveReplyToMode(
    followupRun.run.config,
    replyToChannel,
    sessionCtx.AccountId,
    sessionCtx.ChatType,
  );
  const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
  const cfg = followupRun.run.config;
  const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
    cfg,
    sessionKey,
    workspaceDir: followupRun.run.workspaceDir,
  });
  // The block-reply pipeline only exists when block streaming is enabled AND
  // the caller supplied a block sink; everything below keys off that.
  const blockReplyCoalescing =
    blockStreamingEnabled && opts?.onBlockReply
      ? resolveEffectiveBlockStreamingConfig({
          cfg,
          provider: sessionCtx.Provider,
          accountId: sessionCtx.AccountId,
          chunking: blockReplyChunking,
        }).coalescing
      : undefined;
  const blockReplyPipeline =
    blockStreamingEnabled && opts?.onBlockReply
      ? createBlockReplyPipeline({
          onBlockReply: opts.onBlockReply,
          timeoutMs: blockReplyTimeoutMs,
          coalescing: blockReplyCoalescing,
          buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
        })
      : null;

  // Bumps `updatedAt` on the active session (in memory and, when a store path
  // is known, on disk) without running the agent.
  const touchActiveSessionEntry = async () => {
    if (!activeSessionEntry || !activeSessionStore || !sessionKey) {
      return;
    }
    const updatedAt = Date.now();
    activeSessionEntry.updatedAt = updatedAt;
    activeSessionStore[sessionKey] = activeSessionEntry;
    if (storePath) {
      await updateSessionStoreEntry({
        storePath,
        sessionKey,
        update: async () => ({ updatedAt }),
      });
    }
  };

  if (shouldSteer && isStreaming) {
    const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt);
    if (steered && !shouldFollowup) {
      await touchActiveSessionEntry();
      typing.cleanup();
      return undefined;
    }
  }

  const activeRunQueueAction = resolveActiveRunQueueAction({
    isActive,
    isHeartbeat,
    shouldFollowup,
    queueMode: resolvedQueue.mode,
  });

  if (activeRunQueueAction === "drop") {
    typing.cleanup();
    return undefined;
  }

  if (activeRunQueueAction === "enqueue-followup") {
    enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
    await touchActiveSessionEntry();
    typing.cleanup();
    return undefined;
  }

  await typingSignals.signalRunStart();

  activeSessionEntry = await runMemoryFlushIfNeeded({
    cfg,
    followupRun,
    promptForEstimate: followupRun.prompt,
    sessionCtx,
    opts,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
    isHeartbeat,
  });

  const runFollowupTurn = createFollowupRunner({
    opts,
    typing,
    typingMode,
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
  });

  let responseUsageLine: string | undefined;
  type SessionResetOptions = {
    failureLabel: string;
    buildLogMessage: (nextSessionId: string) => string;
    cleanupTranscripts?: boolean;
  };
  // Replaces the current session with a fresh id (clearing model/usage/fallback
  // state), persists it, repoints `followupRun` at the new transcript and, when
  // requested, best-effort deletes the previous transcript files.
  // Returns false when there is no session/store to reset.
  const resetSession = async ({
    failureLabel,
    buildLogMessage,
    cleanupTranscripts,
  }: SessionResetOptions): Promise<boolean> => {
    if (!sessionKey || !activeSessionStore || !storePath) {
      return false;
    }
    const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
    if (!prevEntry) {
      return false;
    }
    const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
    const nextSessionId = generateSecureUuid();
    const nextEntry: SessionEntry = {
      ...prevEntry,
      sessionId: nextSessionId,
      updatedAt: Date.now(),
      systemSent: false,
      abortedLastRun: false,
      modelProvider: undefined,
      model: undefined,
      inputTokens: undefined,
      outputTokens: undefined,
      totalTokens: undefined,
      totalTokensFresh: false,
      estimatedCostUsd: undefined,
      cacheRead: undefined,
      cacheWrite: undefined,
      contextTokens: undefined,
      systemPromptReport: undefined,
      fallbackNoticeSelectedModel: undefined,
      fallbackNoticeActiveModel: undefined,
      fallbackNoticeReason: undefined,
    };
    const agentId = resolveAgentIdFromSessionKey(sessionKey);
    const nextSessionFile = resolveSessionTranscriptPath(
      nextSessionId,
      agentId,
      sessionCtx.MessageThreadId,
    );
    nextEntry.sessionFile = nextSessionFile;
    activeSessionStore[sessionKey] = nextEntry;
    try {
      await updateSessionStore(storePath, (store) => {
        store[sessionKey] = nextEntry;
      });
    } catch (err) {
      defaultRuntime.error(
        `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
      );
    }
    followupRun.run.sessionId = nextSessionId;
    followupRun.run.sessionFile = nextSessionFile;
    activeSessionEntry = nextEntry;
    activeIsNewSession = true;
    defaultRuntime.error(buildLogMessage(nextSessionId));
    if (cleanupTranscripts && prevSessionId) {
      const transcriptCandidates = new Set<string>();
      const resolved = resolveSessionFilePath(
        prevSessionId,
        prevEntry,
        resolveSessionFilePathOptions({ agentId, storePath }),
      );
      if (resolved) {
        transcriptCandidates.add(resolved);
      }
      transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
      for (const candidate of transcriptCandidates) {
        try {
          fs.unlinkSync(candidate);
        } catch {
          // Best-effort cleanup.
        }
      }
    }
    return true;
  };
  const resetSessionAfterCompactionFailure = async (reason: string): Promise<boolean> =>
    resetSession({
      failureLabel: "compaction failure",
      buildLogMessage: (nextSessionId) =>
        `Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`,
    });
  const resetSessionAfterRoleOrderingConflict = async (reason: string): Promise<boolean> =>
    resetSession({
      failureLabel: "role ordering conflict",
      buildLogMessage: (nextSessionId) =>
        `Role ordering conflict (${reason}). Restarting session ${sessionKey} -> ${nextSessionId}.`,
      cleanupTranscripts: true,
    });
  try {
    const runStartedAt = Date.now();
    const runOutcome = await runAgentTurnWithFallback({
      commandBody,
      followupRun,
      sessionCtx,
      opts,
      typingSignals,
      blockReplyPipeline,
      blockStreamingEnabled,
      blockReplyChunking,
      resolvedBlockStreamingBreak,
      applyReplyToMode,
      shouldEmitToolResult,
      shouldEmitToolOutput,
      pendingToolTasks,
      resetSessionAfterCompactionFailure,
      resetSessionAfterRoleOrderingConflict,
      isHeartbeat,
      sessionKey,
      getActiveSessionEntry: () => activeSessionEntry,
      activeSessionStore,
      storePath,
      resolvedVerboseLevel,
    });

    if (runOutcome.kind === "final") {
      return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
    }

    const {
      runId,
      runResult,
      fallbackProvider,
      fallbackModel,
      fallbackAttempts,
      directlySentBlockKeys,
      didLogHeartbeatStrip,
      autoCompactionCount,
    } = runOutcome;
    // Same predicate used for compaction bookkeeping below; there is no
    // separate "completed" flag on the run outcome.
    const autoCompactionCompleted = autoCompactionCount > 0;
    const verboseEnabled = resolvedVerboseLevel !== "off";
    const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;

    if (
      shouldInjectGroupIntro &&
      activeSessionEntry &&
      activeSessionStore &&
      sessionKey &&
      activeSessionEntry.groupActivationNeedsSystemIntro
    ) {
      const updatedAt = Date.now();
      activeSessionEntry.groupActivationNeedsSystemIntro = false;
      activeSessionEntry.updatedAt = updatedAt;
      activeSessionStore[sessionKey] = activeSessionEntry;
      if (storePath) {
        await updateSessionStoreEntry({
          storePath,
          sessionKey,
          update: async () => ({
            groupActivationNeedsSystemIntro: false,
            updatedAt,
          }),
        });
      }
    }

    const payloadArray = runResult.payloads ?? [];

    if (blockReplyPipeline) {
      await blockReplyPipeline.flush({ force: true });
      blockReplyPipeline.stop();
    }

    // Block-streaming runs: send the compaction completion notice *after* the
    // pipeline has flushed and stopped, directly via opts.onBlockReply, so it
    // does not set didStream() = true and cause buildReplyPayloads to discard
    // the real assistant reply. This condition is the exact complement of the
    // `!blockReplyPipeline` branch that pushes the notice into verboseNotices
    // for non-streaming runs, so the notice is never sent twice.
    if (autoCompactionCompleted && blockStreamingEnabled && opts?.onBlockReply) {
      const completionText = verboseEnabled
        ? `🧹 Auto-compaction complete.`
        : `✅ Context compacted.`;
      const noticePayload = applyReplyToMode({
        text: completionText,
        replyToId: currentMessageId,
        replyToCurrent: true,
      });
      // Fire-and-forget with timeout — best-effort delivery; failure must not
      // propagate to the caller. A synchronous throw from the sink is turned
      // into a rejection so it cannot escape, and the timer is cleared once
      // the race settles so it does not linger after delivery.
      let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
      const timeout = new Promise<never>((_, reject) => {
        timeoutHandle = setTimeout(
          () => reject(new Error("compaction notice timeout")),
          blockReplyTimeoutMs,
        );
      });
      let delivery: Promise<unknown>;
      try {
        delivery = Promise.resolve(opts.onBlockReply(noticePayload));
      } catch (err) {
        delivery = Promise.reject(err);
      }
      void Promise.race([delivery, timeout])
        .catch(() => {
          // Intentionally swallowed — the notice is informational only.
        })
        .finally(() => clearTimeout(timeoutHandle));
    }

    // Drain any late tool/block deliveries before deciding there's "nothing to
    // send". Otherwise, a late typing trigger (e.g. from a tool callback) can
    // outlive the run and keep the typing indicator stuck.
    if (pendingToolTasks.size > 0) {
      await Promise.allSettled(pendingToolTasks);
    }

    const usage = runResult.meta?.agentMeta?.usage;
    const promptTokens = runResult.meta?.agentMeta?.promptTokens;
    const modelUsed = runResult.meta?.agentMeta?.model ?? fallbackModel ?? defaultModel;
    const providerUsed =
      runResult.meta?.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
    const selectedProvider = followupRun.run.provider;
    const selectedModel = followupRun.run.model;
    const fallbackStateEntry =
      activeSessionEntry ?? (sessionKey ? activeSessionStore?.[sessionKey] : undefined);
    const fallbackTransition = resolveFallbackTransition({
      selectedProvider,
      selectedModel,
      activeProvider: providerUsed,
      activeModel: modelUsed,
      attempts: fallbackAttempts,
      state: fallbackStateEntry,
    });
    if (fallbackTransition.stateChanged) {
      if (fallbackStateEntry) {
        fallbackStateEntry.fallbackNoticeSelectedModel = fallbackTransition.nextState.selectedModel;
        fallbackStateEntry.fallbackNoticeActiveModel = fallbackTransition.nextState.activeModel;
        fallbackStateEntry.fallbackNoticeReason = fallbackTransition.nextState.reason;
        fallbackStateEntry.updatedAt = Date.now();
        activeSessionEntry = fallbackStateEntry;
      }
      if (sessionKey && fallbackStateEntry && activeSessionStore) {
        activeSessionStore[sessionKey] = fallbackStateEntry;
      }
      if (sessionKey && storePath) {
        await updateSessionStoreEntry({
          storePath,
          sessionKey,
          update: async () => ({
            fallbackNoticeSelectedModel: fallbackTransition.nextState.selectedModel,
            fallbackNoticeActiveModel: fallbackTransition.nextState.activeModel,
            fallbackNoticeReason: fallbackTransition.nextState.reason,
          }),
        });
      }
    }
    const cliSessionId = isCliProvider(providerUsed, cfg)
      ? runResult.meta?.agentMeta?.sessionId?.trim()
      : undefined;
    const contextTokensUsed =
      agentCfgContextTokens ??
      lookupContextTokens(modelUsed) ??
      activeSessionEntry?.contextTokens ??
      DEFAULT_CONTEXT_TOKENS;

    await persistRunSessionUsage({
      storePath,
      sessionKey,
      cfg,
      usage,
      lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
      promptTokens,
      modelUsed,
      providerUsed,
      contextTokensUsed,
      systemPromptReport: runResult.meta?.systemPromptReport,
      cliSessionId,
    });

    // Compaction bookkeeping runs before the "nothing to send" early returns
    // below: the compaction happened whether or not this turn yields a
    // deliverable payload, so the counter and post-compaction context must
    // not depend on that.
    let compactionCount: Awaited<ReturnType<typeof incrementRunCompactionCount>> | undefined;
    if (autoCompactionCompleted) {
      compactionCount = await incrementRunCompactionCount({
        sessionEntry: activeSessionEntry,
        sessionStore: activeSessionStore,
        sessionKey,
        storePath,
        amount: autoCompactionCount,
        lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
        contextTokensUsed,
      });

      // Inject post-compaction workspace context for the next agent turn.
      // Read from the run's workspace (the same directory used for reply media
      // paths); fall back to the process cwd only if none is configured.
      if (sessionKey) {
        const workspaceDir = followupRun.run.workspaceDir ?? process.cwd();
        void readPostCompactionContext(workspaceDir, cfg)
          .then((contextContent) => {
            if (contextContent) {
              enqueueSystemEvent(contextContent, { sessionKey });
            }
          })
          .catch(() => {
            // Silent failure — post-compaction context is best-effort
          });
      }
    }

    if (payloadArray.length === 0) {
      return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    }

    const { replyPayloads } = await buildReplyPayloads({
      payloads: payloadArray,
      isHeartbeat,
      didLogHeartbeatStrip,
      blockStreamingEnabled,
      blockReplyPipeline,
      directlySentBlockKeys,
      replyToMode,
      replyToChannel,
      currentMessageId,
      messageProvider: followupRun.run.messageProvider,
      messagingToolSentTexts: runResult.messagingToolSentTexts,
      messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,
      messagingToolSentTargets: runResult.messagingToolSentTargets,
      originatingChannel: sessionCtx.OriginatingChannel,
      originatingTo: resolveOriginMessageTo({
        originatingTo: sessionCtx.OriginatingTo,
        to: sessionCtx.To,
      }),
      accountId: sessionCtx.AccountId,
      normalizeMediaPaths: normalizeReplyMediaPaths,
    });

    if (replyPayloads.length === 0) {
      return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    }

    const successfulCronAdds = runResult.successfulCronAdds ?? 0;
    const hasReminderCommitment = replyPayloads.some(
      (payload) =>
        !payload.isError &&
        typeof payload.text === "string" &&
        hasUnbackedReminderCommitment(payload.text),
    );
    // Suppress the guard note when an existing cron job (created in a prior
    // turn) already covers the commitment — avoids false positives (#32228).
    const coveredByExistingCron =
      hasReminderCommitment && successfulCronAdds === 0
        ? await hasSessionRelatedCronJobs({
            cronStorePath: cfg.cron?.store,
            sessionKey,
          })
        : false;
    const guardedReplyPayloads =
      hasReminderCommitment && successfulCronAdds === 0 && !coveredByExistingCron
        ? appendUnscheduledReminderNote(replyPayloads)
        : replyPayloads;

    await signalTypingIfNeeded(guardedReplyPayloads, typingSignals);

    if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
      const input = usage.input ?? 0;
      const output = usage.output ?? 0;
      const cacheRead = usage.cacheRead ?? 0;
      const cacheWrite = usage.cacheWrite ?? 0;
      const promptTokens = input + cacheRead + cacheWrite;
      const totalTokens = usage.total ?? promptTokens + output;
      const costConfig = resolveModelCostConfig({
        provider: providerUsed,
        model: modelUsed,
        config: cfg,
      });
      const costUsd = estimateUsageCost({ usage, cost: costConfig });
      emitDiagnosticEvent({
        type: "model.usage",
        sessionKey,
        sessionId: followupRun.run.sessionId,
        channel: replyToChannel,
        provider: providerUsed,
        model: modelUsed,
        usage: {
          input,
          output,
          cacheRead,
          cacheWrite,
          promptTokens,
          total: totalTokens,
        },
        lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
        context: {
          limit: contextTokensUsed,
          used: totalTokens,
        },
        costUsd,
        durationMs: Date.now() - runStartedAt,
      });
    }

    const responseUsageRaw =
      activeSessionEntry?.responseUsage ??
      (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);
    const responseUsageMode = resolveResponseUsageMode(responseUsageRaw);
    if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
      const authMode = resolveModelAuthMode(providerUsed, cfg);
      const showCost = authMode === "api-key";
      const costConfig = showCost
        ? resolveModelCostConfig({
            provider: providerUsed,
            model: modelUsed,
            config: cfg,
          })
        : undefined;
      let formatted = formatResponseUsageLine({
        usage,
        showCost,
        costConfig,
      });
      if (formatted && responseUsageMode === "full" && sessionKey) {
        formatted = `${formatted} · session \`${sessionKey}\``;
      }
      if (formatted) {
        responseUsageLine = formatted;
      }
    }

    // If verbose is enabled, prepend operational run notices.
    let finalPayloads = guardedReplyPayloads;
    const verboseNotices: ReplyPayload[] = [];

    if (verboseEnabled && activeIsNewSession) {
      verboseNotices.push({ text: `🧭 New session: ${followupRun.run.sessionId}` });
    }

    if (fallbackTransition.fallbackTransitioned) {
      emitAgentEvent({
        runId,
        sessionKey,
        stream: "lifecycle",
        data: {
          phase: "fallback",
          selectedProvider,
          selectedModel,
          activeProvider: providerUsed,
          activeModel: modelUsed,
          reasonSummary: fallbackTransition.reasonSummary,
          attemptSummaries: fallbackTransition.attemptSummaries,
          attempts: fallbackAttempts,
        },
      });
      if (verboseEnabled) {
        const fallbackNotice = buildFallbackNotice({
          selectedProvider,
          selectedModel,
          activeProvider: providerUsed,
          activeModel: modelUsed,
          attempts: fallbackAttempts,
        });
        if (fallbackNotice) {
          verboseNotices.push({ text: fallbackNotice });
        }
      }
    }
    if (fallbackTransition.fallbackCleared) {
      emitAgentEvent({
        runId,
        sessionKey,
        stream: "lifecycle",
        data: {
          phase: "fallback_cleared",
          selectedProvider,
          selectedModel,
          activeProvider: providerUsed,
          activeModel: modelUsed,
          previousActiveModel: fallbackTransition.previousState.activeModel,
        },
      });
      if (verboseEnabled) {
        verboseNotices.push({
          text: buildFallbackClearedNotice({
            selectedProvider,
            selectedModel,
            previousActiveModel: fallbackTransition.previousState.activeModel,
          }),
        });
      }
    }

    // Always notify the user when compaction completes — not just in verbose
    // mode — so the "🧹 Compacting context..." start notice is closed for
    // every user. Block-streaming runs already sent it directly (see above);
    // non-streaming runs carry it in the final payload batch.
    if (autoCompactionCompleted && !blockReplyPipeline) {
      const suffix = typeof compactionCount === "number" ? ` (count ${compactionCount})` : "";
      const completionText = verboseEnabled
        ? `🧹 Auto-compaction complete${suffix}.`
        : `✅ Context compacted${suffix}.`;
      verboseNotices.push({ text: completionText });
    }
    if (verboseNotices.length > 0) {
      finalPayloads = [...verboseNotices, ...finalPayloads];
    }
    if (responseUsageLine) {
      finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
    }

    return finalizeWithFollowup(
      finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
      queueKey,
      runFollowupTurn,
    );
  } catch (error) {
    // Keep the followup queue moving even when an unexpected exception escapes
    // the run path; the caller still receives the original error.
    finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    throw error;
  } finally {
    blockReplyPipeline?.stop();
    typing.markRunComplete();
    // Safety net: the dispatcher's onIdle callback normally fires
    // markDispatchIdle(), but if the dispatcher exits early, errors,
    // or the reply path doesn't go through it cleanly, the second
    // signal never fires and the typing keepalive loop runs forever.
    // Calling this twice is harmless — cleanup() is guarded by the
    // `active` flag. Same pattern as the followup runner fix (#26881).
    typing.markDispatchIdle();
  }
}
|