From 485481895e967a2fc8d6a2b2dd770bfffe1b6ad0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 5 Jan 2026 22:33:51 +0100 Subject: [PATCH] style: format PR 223 changes --- src/agents/pi-embedded-helpers.test.ts | 12 +- src/agents/pi-embedded-helpers.ts | 5 +- src/agents/pi-embedded-runner.ts | 586 +++++++++++++------------ src/auto-reply/reply/commands.ts | 12 +- 4 files changed, 313 insertions(+), 302 deletions(-) diff --git a/src/agents/pi-embedded-helpers.test.ts b/src/agents/pi-embedded-helpers.test.ts index 1fb719c2b0c..12496501468 100644 --- a/src/agents/pi-embedded-helpers.test.ts +++ b/src/agents/pi-embedded-helpers.test.ts @@ -1,14 +1,17 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; - +import type { ThinkLevel } from "../auto-reply/thinking.js"; import { isRateLimitAssistantError, pickFallbackThinkingLevel, } from "./pi-embedded-helpers.js"; -import type { ThinkLevel } from "../auto-reply/thinking.js"; const asAssistant = (overrides: Partial) => - ({ role: "assistant", stopReason: "error", ...overrides }) as AssistantMessage; + ({ + role: "assistant", + stopReason: "error", + ...overrides, + }) as AssistantMessage; describe("isRateLimitAssistantError", () => { it("detects 429 rate limit payloads", () => { @@ -57,8 +60,7 @@ describe("pickFallbackThinkingLevel", () => { it("skips already attempted levels", () => { const attempted = new Set(["low", "medium"]); const next = pickFallbackThinkingLevel({ - message: - "Supported values are: 'medium', 'high', and 'xhigh'.", + message: "Supported values are: 'medium', 'high', and 'xhigh'.", attempted, }); expect(next).toBe("high"); diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 4cb5f86a268..7a03e02e8bc 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -6,7 +6,10 @@ import type { AgentToolResult, } from "@mariozechner/pi-agent-core"; import type { AssistantMessage } from "@mariozechner/pi-ai"; -import { normalizeThinkLevel, type ThinkLevel } from "../auto-reply/thinking.js"; +import { + normalizeThinkLevel, + type ThinkLevel, +} from "../auto-reply/thinking.js"; import { sanitizeContentBlocksImages } from "./tool-images.js"; import type { WorkspaceBootstrapFile } from "./workspace.js"; diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 1942bba07c7..849d313437e 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -340,319 +340,325 @@ export async function runEmbeddedPiAgent(params: { let restoreSkillEnv: (() => void) | undefined; process.chdir(resolvedWorkspace); try { - const shouldLoadSkillEntries = - !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; - const skillEntries = shouldLoadSkillEntries - ? loadWorkspaceSkillEntries(resolvedWorkspace) - : []; - const skillsSnapshot = - params.skillsSnapshot ?? - buildWorkspaceSkillSnapshot(resolvedWorkspace, { + const shouldLoadSkillEntries = + !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; + const skillEntries = shouldLoadSkillEntries + ? loadWorkspaceSkillEntries(resolvedWorkspace) + : []; + const skillsSnapshot = + params.skillsSnapshot ?? + buildWorkspaceSkillSnapshot(resolvedWorkspace, { + config: params.config, + entries: skillEntries, + }); + const sandboxSessionKey = + params.sessionKey?.trim() || params.sessionId; + const sandbox = await resolveSandboxContext({ config: params.config, - entries: skillEntries, - }); - const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; - const sandbox = await resolveSandboxContext({ - config: params.config, - sessionKey: sandboxSessionKey, - workspaceDir: resolvedWorkspace, - }); - restoreSkillEnv = params.skillsSnapshot - ? applySkillEnvOverridesFromSnapshot({ - snapshot: params.skillsSnapshot, - config: params.config, - }) - : applySkillEnvOverrides({ - skills: skillEntries ?? [], - config: params.config, - }); - - const bootstrapFiles = - await loadWorkspaceBootstrapFiles(resolvedWorkspace); - const contextFiles = buildBootstrapContextFiles(bootstrapFiles); - const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries); - // Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`). - // `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged. - const tools = createClawdbotCodingTools({ - bash: { - ...params.config?.agent?.bash, - elevated: params.bashElevated, - }, - sandbox, - surface: params.surface, - sessionKey: params.sessionKey ?? params.sessionId, - config: params.config, - }); - const machineName = await getMachineDisplayName(); - const runtimeInfo = { - host: machineName, - os: `${os.type()} ${os.release()}`, - arch: os.arch(), - node: process.version, - model: `${provider}/${modelId}`, - }; - const sandboxInfo = buildEmbeddedSandboxInfo(sandbox); - const reasoningTagHint = provider === "ollama"; - const systemPrompt = buildSystemPrompt({ - appendPrompt: buildAgentSystemPromptAppend({ + sessionKey: sandboxSessionKey, workspaceDir: resolvedWorkspace, - defaultThinkLevel: thinkLevel, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - reasoningTagHint, - runtimeInfo, - sandboxInfo, - toolNames: tools.map((tool) => tool.name), - }), - contextFiles, - skills: promptSkills, - cwd: resolvedWorkspace, - tools, - }); + }); + restoreSkillEnv = params.skillsSnapshot + ? applySkillEnvOverridesFromSnapshot({ + snapshot: params.skillsSnapshot, + config: params.config, + }) + : applySkillEnvOverrides({ + skills: skillEntries ?? [], + config: params.config, + }); - const sessionManager = SessionManager.open(params.sessionFile); - const settingsManager = SettingsManager.create( - resolvedWorkspace, - agentDir, - ); - - const { session } = await createAgentSession({ - cwd: resolvedWorkspace, - agentDir, - authStorage, - modelRegistry, - model, - thinkingLevel, - systemPrompt, - // Custom tool set: extra bash/process + read image sanitization. - tools, - sessionManager, - settingsManager, - skills: promptSkills, - contextFiles, - }); - - const prior = await sanitizeSessionMessagesImages( - session.messages, - "session:history", - ); - if (prior.length > 0) { - session.agent.replaceMessages(prior); - } - let aborted = Boolean(params.abortSignal?.aborted); - const abortRun = () => { - aborted = true; - void session.abort(); - }; - const queueHandle: EmbeddedPiQueueHandle = { - queueMessage: async (text: string) => { - await session.steer(text); - }, - isStreaming: () => session.isStreaming, - abort: abortRun, - }; - ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); - - const { - assistantTexts, - toolMetas, - unsubscribe, - waitForCompactionRetry, - } = subscribeEmbeddedPiSession({ - session, - runId: params.runId, - verboseLevel: params.verboseLevel, - shouldEmitToolResult: params.shouldEmitToolResult, - onToolResult: params.onToolResult, - onBlockReply: params.onBlockReply, - blockReplyBreak: params.blockReplyBreak, - blockReplyChunking: params.blockReplyChunking, - onPartialReply: params.onPartialReply, - onAgentEvent: params.onAgentEvent, - enforceFinalTag: params.enforceFinalTag, - }); - - let abortWarnTimer: NodeJS.Timeout | undefined; - const abortTimer = setTimeout( - () => { - log.warn( - `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`, - ); - abortRun(); - if (!abortWarnTimer) { - abortWarnTimer = setTimeout(() => { - if (!session.isStreaming) return; - log.warn( - `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`, - ); - }, 10_000); - } - }, - Math.max(1, params.timeoutMs), - ); - - let messagesSnapshot: AgentMessage[] = []; - let sessionIdUsed = session.sessionId; - const onAbort = () => { - abortRun(); - }; - if (params.abortSignal) { - if (params.abortSignal.aborted) { - onAbort(); - } else { - params.abortSignal.addEventListener("abort", onAbort, { - once: true, - }); - } - } - let promptError: unknown = null; - try { - const promptStartedAt = Date.now(); - log.debug( - `embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`, + const bootstrapFiles = + await loadWorkspaceBootstrapFiles(resolvedWorkspace); + const contextFiles = buildBootstrapContextFiles(bootstrapFiles); + const promptSkills = resolvePromptSkills( + skillsSnapshot, + skillEntries, ); + // Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`). + // `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged. + const tools = createClawdbotCodingTools({ + bash: { + ...params.config?.agent?.bash, + elevated: params.bashElevated, + }, + sandbox, + surface: params.surface, + sessionKey: params.sessionKey ?? params.sessionId, + config: params.config, + }); + const machineName = await getMachineDisplayName(); + const runtimeInfo = { + host: machineName, + os: `${os.type()} ${os.release()}`, + arch: os.arch(), + node: process.version, + model: `${provider}/${modelId}`, + }; + const sandboxInfo = buildEmbeddedSandboxInfo(sandbox); + const reasoningTagHint = provider === "ollama"; + const systemPrompt = buildSystemPrompt({ + appendPrompt: buildAgentSystemPromptAppend({ + workspaceDir: resolvedWorkspace, + defaultThinkLevel: thinkLevel, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + reasoningTagHint, + runtimeInfo, + sandboxInfo, + toolNames: tools.map((tool) => tool.name), + }), + contextFiles, + skills: promptSkills, + cwd: resolvedWorkspace, + tools, + }); + + const sessionManager = SessionManager.open(params.sessionFile); + const settingsManager = SettingsManager.create( + resolvedWorkspace, + agentDir, + ); + + const { session } = await createAgentSession({ + cwd: resolvedWorkspace, + agentDir, + authStorage, + modelRegistry, + model, + thinkingLevel, + systemPrompt, + // Custom tool set: extra bash/process + read image sanitization. + tools, + sessionManager, + settingsManager, + skills: promptSkills, + contextFiles, + }); + + const prior = await sanitizeSessionMessagesImages( + session.messages, + "session:history", + ); + if (prior.length > 0) { + session.agent.replaceMessages(prior); + } + let aborted = Boolean(params.abortSignal?.aborted); + const abortRun = () => { + aborted = true; + void session.abort(); + }; + const queueHandle: EmbeddedPiQueueHandle = { + queueMessage: async (text: string) => { + await session.steer(text); + }, + isStreaming: () => session.isStreaming, + abort: abortRun, + }; + ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); + + const { + assistantTexts, + toolMetas, + unsubscribe, + waitForCompactionRetry, + } = subscribeEmbeddedPiSession({ + session, + runId: params.runId, + verboseLevel: params.verboseLevel, + shouldEmitToolResult: params.shouldEmitToolResult, + onToolResult: params.onToolResult, + onBlockReply: params.onBlockReply, + blockReplyBreak: params.blockReplyBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: params.onPartialReply, + onAgentEvent: params.onAgentEvent, + enforceFinalTag: params.enforceFinalTag, + }); + + let abortWarnTimer: NodeJS.Timeout | undefined; + const abortTimer = setTimeout( + () => { + log.warn( + `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`, + ); + abortRun(); + if (!abortWarnTimer) { + abortWarnTimer = setTimeout(() => { + if (!session.isStreaming) return; + log.warn( + `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`, + ); + }, 10_000); + } + }, + Math.max(1, params.timeoutMs), + ); + + let messagesSnapshot: AgentMessage[] = []; + let sessionIdUsed = session.sessionId; + const onAbort = () => { + abortRun(); + }; + if (params.abortSignal) { + if (params.abortSignal.aborted) { + onAbort(); + } else { + params.abortSignal.addEventListener("abort", onAbort, { + once: true, + }); + } + } + let promptError: unknown = null; try { - await session.prompt(params.prompt); - } catch (err) { - promptError = err; - } finally { + const promptStartedAt = Date.now(); log.debug( - `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, + `embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`, ); + try { + await session.prompt(params.prompt); + } catch (err) { + promptError = err; + } finally { + log.debug( + `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, + ); + } + await waitForCompactionRetry(); + messagesSnapshot = session.messages.slice(); + sessionIdUsed = session.sessionId; + } finally { + clearTimeout(abortTimer); + if (abortWarnTimer) { + clearTimeout(abortWarnTimer); + abortWarnTimer = undefined; + } + unsubscribe(); + if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); + notifyEmbeddedRunEnded(params.sessionId); + } + session.dispose(); + params.abortSignal?.removeEventListener?.("abort", onAbort); } - await waitForCompactionRetry(); - messagesSnapshot = session.messages.slice(); - sessionIdUsed = session.sessionId; - } finally { - clearTimeout(abortTimer); - if (abortWarnTimer) { - clearTimeout(abortWarnTimer); - abortWarnTimer = undefined; + if (promptError && !aborted) { + const fallbackThinking = pickFallbackThinkingLevel({ + message: + promptError instanceof Error + ? promptError.message + : String(promptError), + attempted: attemptedThinking, + }); + if (fallbackThinking) { + log.warn( + `unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`, + ); + thinkLevel = fallbackThinking; + continue; + } + throw promptError; } - unsubscribe(); - if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { - ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); - notifyEmbeddedRunEnded(params.sessionId); - } - session.dispose(); - params.abortSignal?.removeEventListener?.("abort", onAbort); - } - if (promptError && !aborted) { + + const lastAssistant = messagesSnapshot + .slice() + .reverse() + .find((m) => (m as AgentMessage)?.role === "assistant") as + | AssistantMessage + | undefined; + const fallbackThinking = pickFallbackThinkingLevel({ - message: - promptError instanceof Error - ? promptError.message - : String(promptError), + message: lastAssistant?.errorMessage, attempted: attemptedThinking, }); - if (fallbackThinking) { + if (fallbackThinking && !aborted) { log.warn( `unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`, ); thinkLevel = fallbackThinking; continue; } - throw promptError; - } - const lastAssistant = messagesSnapshot - .slice() - .reverse() - .find((m) => (m as AgentMessage)?.role === "assistant") as - | AssistantMessage - | undefined; - - const fallbackThinking = pickFallbackThinkingLevel({ - message: lastAssistant?.errorMessage, - attempted: attemptedThinking, - }); - if (fallbackThinking && !aborted) { - log.warn( - `unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`, - ); - thinkLevel = fallbackThinking; - continue; - } - - const fallbackConfigured = - (params.config?.agent?.modelFallbacks?.length ?? 0) > 0; - if (fallbackConfigured && isRateLimitAssistantError(lastAssistant)) { - const message = - lastAssistant?.errorMessage?.trim() || - (lastAssistant ? formatAssistantErrorText(lastAssistant) : "") || - "LLM request rate limited."; - throw new Error(message); - } - - const usage = lastAssistant?.usage; - const agentMeta: EmbeddedPiAgentMeta = { - sessionId: sessionIdUsed, - provider: lastAssistant?.provider ?? provider, - model: lastAssistant?.model ?? model.id, - usage: usage - ? { - input: usage.input, - output: usage.output, - cacheRead: usage.cacheRead, - cacheWrite: usage.cacheWrite, - total: usage.totalTokens, - } - : undefined, - }; - - const replyItems: Array<{ text: string; media?: string[] }> = []; - - const errorText = lastAssistant - ? formatAssistantErrorText(lastAssistant) - : undefined; - if (errorText) replyItems.push({ text: errorText }); - - const inlineToolResults = - params.verboseLevel === "on" && - !params.onPartialReply && - !params.onToolResult && - toolMetas.length > 0; - if (inlineToolResults) { - for (const { toolName, meta } of toolMetas) { - const agg = formatToolAggregate(toolName, meta ? [meta] : []); - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg); - if (cleanedText) - replyItems.push({ text: cleanedText, media: mediaUrls }); + const fallbackConfigured = + (params.config?.agent?.modelFallbacks?.length ?? 0) > 0; + if (fallbackConfigured && isRateLimitAssistantError(lastAssistant)) { + const message = + lastAssistant?.errorMessage?.trim() || + (lastAssistant ? formatAssistantErrorText(lastAssistant) : "") || + "LLM request rate limited."; + throw new Error(message); } - } - for (const text of assistantTexts.length - ? assistantTexts - : lastAssistant - ? [extractAssistantText(lastAssistant)] - : []) { - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); - if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue; - replyItems.push({ text: cleanedText, media: mediaUrls }); - } + const usage = lastAssistant?.usage; + const agentMeta: EmbeddedPiAgentMeta = { + sessionId: sessionIdUsed, + provider: lastAssistant?.provider ?? provider, + model: lastAssistant?.model ?? model.id, + usage: usage + ? { + input: usage.input, + output: usage.output, + cacheRead: usage.cacheRead, + cacheWrite: usage.cacheWrite, + total: usage.totalTokens, + } + : undefined, + }; - const payloads = replyItems - .map((item) => ({ - text: item.text?.trim() ? item.text.trim() : undefined, - mediaUrls: item.media?.length ? item.media : undefined, - mediaUrl: item.media?.[0], - })) - .filter( - (p) => - p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), + const replyItems: Array<{ text: string; media?: string[] }> = []; + + const errorText = lastAssistant + ? formatAssistantErrorText(lastAssistant) + : undefined; + if (errorText) replyItems.push({ text: errorText }); + + const inlineToolResults = + params.verboseLevel === "on" && + !params.onPartialReply && + !params.onToolResult && + toolMetas.length > 0; + if (inlineToolResults) { + for (const { toolName, meta } of toolMetas) { + const agg = formatToolAggregate(toolName, meta ? [meta] : []); + const { text: cleanedText, mediaUrls } = + splitMediaFromOutput(agg); + if (cleanedText) + replyItems.push({ text: cleanedText, media: mediaUrls }); + } + } + + for (const text of assistantTexts.length + ? assistantTexts + : lastAssistant + ? [extractAssistantText(lastAssistant)] + : []) { + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); + if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) + continue; + replyItems.push({ text: cleanedText, media: mediaUrls }); + } + + const payloads = replyItems + .map((item) => ({ + text: item.text?.trim() ? item.text.trim() : undefined, + mediaUrls: item.media?.length ? item.media : undefined, + mediaUrl: item.media?.[0], + })) + .filter( + (p) => + p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), + ); + + log.debug( + `embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`, ); - - log.debug( - `embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`, - ); - return { - payloads: payloads.length ? payloads : undefined, - meta: { - durationMs: Date.now() - started, - agentMeta, - aborted, - }, - }; + return { + payloads: payloads.length ? payloads : undefined, + meta: { + durationMs: Date.now() - started, + agentMeta, + aborted, + }, + }; } finally { restoreSkillEnv?.(); process.chdir(prevCwd); diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index e393e8080dd..49bff224386 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -1,6 +1,9 @@ import fs from "node:fs"; - +import { getEnvApiKey } from "@mariozechner/pi-ai"; +import { discoverAuthStorage } from "@mariozechner/pi-coding-agent"; +import { resolveClawdbotAgentDir } from "../../agents/agent-paths.js"; import type { ClawdbotConfig } from "../../config/config.js"; +import { resolveOAuthPath } from "../../config/paths.js"; import { type SessionEntry, type SessionScope, @@ -12,10 +15,6 @@ import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { normalizeE164 } from "../../utils.js"; import { resolveHeartbeatSeconds } from "../../web/reconnect.js"; import { getWebAuthAgeMs, webAuthExists } from "../../web/session.js"; -import { resolveClawdbotAgentDir } from "../../agents/agent-paths.js"; -import { resolveOAuthPath } from "../../config/paths.js"; -import { getEnvApiKey } from "@mariozechner/pi-ai"; -import { discoverAuthStorage } from "@mariozechner/pi-coding-agent"; import { normalizeGroupActivation, parseActivationCommand, @@ -61,7 +60,8 @@ function hasOAuthCredentials(provider: string): boolean { if (!entry) return false; const refresh = entry.refresh ?? entry.refresh_token ?? entry.refreshToken ?? ""; - const access = entry.access ?? entry.access_token ?? entry.accessToken ?? ""; + const access = + entry.access ?? entry.access_token ?? entry.accessToken ?? ""; return Boolean(refresh.trim() && access.trim()); } catch { return false;