diff --git a/src/agents/config-utils.ts b/src/agents/config-utils.ts
new file mode 100644
index 00000000000..51692dbbbf6
--- /dev/null
+++ b/src/agents/config-utils.ts
@@ -0,0 +1,106 @@
+import { existsSync, readFileSync } from "node:fs";
+import { join as pathJoin } from "node:path";
+
+export interface ManusilizedConfig {
+  streaming?: {
+    mode?: "standard" | "enhanced" | "ultra";
+    bufferSize?: number;
+    throttleDelay?: number;
+    enableThinkingOutput?: boolean;
+    streamInterval?: number;
+  };
+  context?: {
+    enableMegaContext?: boolean;
+    maxContextWindow?: number;
+    autoDetectContext?: boolean;
+  };
+}
+
+/**
+ * Load Manusilized configuration from config file
+ * @param configPath Path to the OpenClaw config directory
+ * @returns ManusilizedConfig or default config if not found
+ */
+export function loadManusilizedConfig(configPath?: string): ManusilizedConfig {
+  // Try to find config file in common locations
+  const possiblePaths = [
+    configPath ? pathJoin(configPath, "manusilized-streaming.json") : "",
+    pathJoin(process.cwd(), "config", "manusilized-streaming.json"),
+    pathJoin(process.cwd(), "manusilized-streaming.json"),
+    "/etc/openclaw/manusilized-streaming.json",
+  ].filter(Boolean) as string[];
+
+  for (const configFilePath of possiblePaths) {
+    if (existsSync(configFilePath)) {
+      try {
+        const configFile = readFileSync(configFilePath, "utf8");
+        const config = JSON.parse(configFile) as ManusilizedConfig;
+        console.log(`[manusilized] Loaded config from ${configFilePath}`);
+        return config;
+      } catch (err) {
+        console.warn(`[manusilized] Failed to parse config file ${configFilePath}:`, err);
+      }
+    }
+  }
+
+  // Return default config if no file found
+  console.log("[manusilized] Using default configuration");
+  return {
+    streaming: {
+      mode: "standard",
+      bufferSize: 1024,
+      throttleDelay: 10,
+      enableThinkingOutput: false,
+      streamInterval: 50,
+    },
+    context: {
+      enableMegaContext: false,
+      maxContextWindow: 262144,
+      autoDetectContext: true,
+    },
+  };
+}
+
+/**
+ * Apply streaming mode presets to config
+ * @param config Base configuration
+ * @returns Configuration with streaming mode applied
+ */
+export function applyStreamingMode(config: ManusilizedConfig): ManusilizedConfig {
+  const mode = config.streaming?.mode || "standard";
+
+  switch (mode) {
+    case "enhanced":
+      return {
+        ...config,
+        streaming: {
+          ...config.streaming,
+          bufferSize: 2048,
+          throttleDelay: 5,
+          streamInterval: 25,
+        },
+      };
+    case "ultra":
+      return {
+        ...config,
+        streaming: {
+          ...config.streaming,
+          bufferSize: 4096,
+          throttleDelay: 1,
+          streamInterval: 10,
+          enableThinkingOutput: true,
+        },
+      };
+    case "standard":
+    default:
+      return {
+        ...config,
+        streaming: {
+          ...config.streaming,
+          bufferSize: 1024,
+          throttleDelay: 10,
+          streamInterval: 50,
+        },
+      };
+  }
+}
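+
+/*
+ * Usage sketch (editor's illustration; the JSON below is a hypothetical
+ * example file, not a shipped default):
+ *
+ *   // config/manusilized-streaming.json
+ *   // { "streaming": { "mode": "ultra" }, "context": { "enableMegaContext": true } }
+ *
+ *   const config = applyStreamingMode(loadManusilizedConfig());
+ *   config.streaming?.bufferSize;    // 4096 once the "ultra" preset applies
+ *   config.streaming?.throttleDelay; // 1
+ */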
\ No newline at end of file
diff --git a/src/agents/ollama-models.ts b/src/agents/ollama-models.ts
index ee0fcfde447..c061bb5268d 100644
--- a/src/agents/ollama-models.ts
+++ b/src/agents/ollama-models.ts
@@ -2,7 +2,10 @@ import type { ModelDefinitionConfig } from "../config/types.models.js";
 import { OLLAMA_DEFAULT_BASE_URL } from "./ollama-defaults.js";
 
 export const OLLAMA_DEFAULT_CONTEXT_WINDOW = 128000;
+export const OLLAMA_EXTENDED_CONTEXT_WINDOW = 262144; // 256K default for newer models
+export const OLLAMA_MEGA_CONTEXT_WINDOW = 2097152; // 2M context window support
 export const OLLAMA_DEFAULT_MAX_TOKENS = 8192;
+export const OLLAMA_EXTENDED_MAX_TOKENS = 32768; // Extended max tokens for large models
 export const OLLAMA_DEFAULT_COST = {
   input: 0,
   output: 0,
@@ -19,6 +22,7 @@ export type OllamaTagModel = {
   details?: {
     family?: string;
     parameter_size?: string;
+    quantization_level?: string;
   };
 };
 
@@ -28,9 +32,13 @@ export type OllamaTagsResponse = {
 
 export type OllamaModelWithContext = OllamaTagModel & {
   contextWindow?: number;
+  maxTokens?: number;
+  isLocal?: boolean;
 };
 
 const OLLAMA_SHOW_CONCURRENCY = 8;
+const CONTEXT_WINDOW_CACHE = new Map<string, { contextWindow: number; timestamp: number }>();
+const CACHE_TTL = 300000; // 5 minutes cache TTL
 
 /**
  * Derive the Ollama native API base URL from a configured base URL.
@@ -48,76 +56,301 @@ export function resolveOllamaApiBase(configuredBaseUrl?: string): string {
   return trimmed.replace(/\/v1$/i, "");
 }
 
+/**
+ * Enhanced context window detection with caching and fallback strategies
+ */
 export async function queryOllamaContextWindow(
   apiBase: string,
   modelName: string,
 ): Promise<number | undefined> {
+  // Check the cache first
+  const cached = CONTEXT_WINDOW_CACHE.get(modelName);
+  if (cached && Date.now() - cached.timestamp < CACHE_TTL) {
+    return cached.contextWindow;
+  }
+
   try {
+    // Try the modern /api/show endpoint first
     const response = await fetch(`${apiBase}/api/show`, {
       method: "POST",
       headers: { "Content-Type": "application/json" },
       body: JSON.stringify({ name: modelName }),
-      signal: AbortSignal.timeout(3000),
+      signal: AbortSignal.timeout(5000),
     });
+
     if (!response.ok) {
-      return undefined;
+      // Fall back to the older detection method
+      return await queryOllamaContextWindowLegacy(apiBase, modelName);
    }
+
     const data = (await response.json()) as { model_info?: Record<string, unknown> };
     if (!data.model_info) {
-      return undefined;
+      // Fall back to estimation if no model info is present
+      return estimateContextWindow(modelName);
    }
+
+    // Look for context window information in model info
+    let contextWindow: number | undefined;
+
     for (const [key, value] of Object.entries(data.model_info)) {
       if (key.endsWith(".context_length") && typeof value === "number" && Number.isFinite(value)) {
-        const contextWindow = Math.floor(value);
-        if (contextWindow > 0) {
-          return contextWindow;
+        const detectedWindow = Math.floor(value);
+        if (detectedWindow > 0) {
+          contextWindow = detectedWindow;
+          break;
+        }
+      }
+
+      // Also check for BERT-style max_position_embeddings
+      if (key.includes("max_position_embeddings") && typeof value === "number" && Number.isFinite(value)) {
+        const detectedWindow = Math.floor(value);
+        if (detectedWindow > 0) {
+          contextWindow = detectedWindow;
+          break;
         }
       }
     }
-    return undefined;
+
+    // If not found, try to estimate based on model name and parameters
+    if (contextWindow === undefined) {
+      contextWindow = estimateContextWindow(modelName, data.model_info);
+    }
+
+    // Cache the result
+    if (contextWindow !== undefined) {
+      CONTEXT_WINDOW_CACHE.set(modelName, {
+        contextWindow,
+        timestamp: Date.now(),
+      });
+    }
+
+    return contextWindow;
+  } catch {
+    // Fall back to estimation on error
+    const estimated = estimateContextWindow(modelName);
+    if (estimated !== undefined) {
+      CONTEXT_WINDOW_CACHE.set(modelName, {
+        contextWindow: estimated,
+        timestamp: Date.now(),
+      });
+    }
+    return estimated;
+  }
+}
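+
+// Illustrative call sequence (hypothetical model name): a second lookup
+// within the 5-minute TTL is served from CONTEXT_WINDOW_CACHE and does not
+// hit /api/show again.
+//   await queryOllamaContextWindow(apiBase, "qwen3:8b"); // network round-trip
+//   await queryOllamaContextWindow(apiBase, "qwen3:8b"); // cached result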
+
+/**
+ * Legacy method for context window detection
+ */
+async function queryOllamaContextWindowLegacy(
+  apiBase: string,
+  modelName: string,
+): Promise<number | undefined> {
+  try {
+    // Try to get model parameters through config
+    const configResponse = await fetch(`${apiBase}/api/show`, {
+      method: "POST",
+      headers: { "Content-Type": "application/json" },
+      body: JSON.stringify({
+        name: modelName,
+        verbose: true,
+      }),
+      signal: AbortSignal.timeout(3000),
+    });
+
+    if (!configResponse.ok) {
+      return undefined;
+    }
+
+    const configData = await configResponse.json();
+    // Estimate based on model family and size
+    return estimateContextWindow(modelName, configData);
   } catch {
     return undefined;
   }
 }
 
+/**
+ * Intelligent context window estimation based on model name and properties
+ */
+function estimateContextWindow(
+  modelName: string,
+  modelInfo?: Record<string, unknown>,
+): number | undefined {
+  const name = modelName.toLowerCase();
+
+  // Check for an explicit context window in model info
+  if (modelInfo) {
+    for (const [key, value] of Object.entries(modelInfo)) {
+      if (key.includes("context") && typeof value === "number" && value > 0) {
+        return Math.min(Math.floor(value), OLLAMA_MEGA_CONTEXT_WINDOW);
+      }
+    }
+  }
+
+  // Qwen3 series - known for large context support. `name` is already
+  // lowercased, so only lowercase suffixes need checking.
+  if (name.includes("qwen3") || name.includes("qwen-3")) {
+    if (name.includes("128k")) return 131072;
+    if (name.includes("256k")) return 262144;
+    if (name.includes("1m")) return 1048576;
+    if (name.includes("2m")) return OLLAMA_MEGA_CONTEXT_WINDOW;
+    return OLLAMA_EXTENDED_CONTEXT_WINDOW; // Default 256K for Qwen3
+  }
+
+  // GLM series
+  if (name.includes("glm") && (name.includes("5") || name.includes("4"))) {
+    if (name.includes("128k")) return 131072;
+    return OLLAMA_EXTENDED_CONTEXT_WINDOW;
+  }
+
+  // DeepSeek series
+  if (name.includes("deepseek") && name.includes("v3")) {
+    return OLLAMA_MEGA_CONTEXT_WINDOW; // 2M context for DeepSeek V3
+  }
+
+  // Kimi series
+  if (name.includes("kimi") && (name.includes("k2") || name.includes("2.5"))) {
+    return OLLAMA_MEGA_CONTEXT_WINDOW;
+  }
+
+  // Large models with likely extended context
+  if (name.includes("llama3") || name.includes("llama-3")) {
+    if (name.includes("70b")) {
+      return OLLAMA_EXTENDED_CONTEXT_WINDOW;
+    }
+    if (name.includes("405b")) {
+      return OLLAMA_MEGA_CONTEXT_WINDOW;
+    }
+  }
+
+  // Code models often have extended context
+  if (name.includes("code") || name.includes("coder")) {
+    if (name.includes("34b")) {
+      return OLLAMA_EXTENDED_CONTEXT_WINDOW;
+    }
+  }
+
+  // Return undefined (caller falls back to defaults) if nothing matched
+  return undefined;
+}
+
 export async function enrichOllamaModelsWithContext(
   apiBase: string,
   models: OllamaTagModel[],
-  opts?: { concurrency?: number },
+  opts?: { concurrency?: number; enableMegaContext?: boolean },
 ): Promise<OllamaModelWithContext[]> {
   const concurrency = Math.max(1, Math.floor(opts?.concurrency ?? OLLAMA_SHOW_CONCURRENCY));
+  const enableMegaContext = opts?.enableMegaContext ?? false;
+
   const enriched: OllamaModelWithContext[] = [];
   for (let index = 0; index < models.length; index += concurrency) {
     const batch = models.slice(index, index + concurrency);
     const batchResults = await Promise.all(
-      batch.map(async (model) => ({
-        ...model,
-        contextWindow: await queryOllamaContextWindow(apiBase, model.name),
-      })),
+      batch.map(async (model) => {
+        const contextWindow = await queryOllamaContextWindow(apiBase, model.name);
+        const maxTokens = determineMaxTokens(model.name, contextWindow);
+        const isLocal = detectLocalModel(model);
+
+        return {
+          ...model,
+          contextWindow: enableMegaContext && contextWindow
+            ? Math.min(contextWindow * 2, OLLAMA_MEGA_CONTEXT_WINDOW)
+            : contextWindow,
+          maxTokens,
+          isLocal,
+        };
+      }),
     );
     enriched.push(...batchResults);
   }
   return enriched;
 }
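+
+// Sketch (illustrative): enriching the model list returned by /api/tags.
+//   const enriched = await enrichOllamaModelsWithContext(apiBase, tagModels, {
+//     concurrency: 4,
+//     enableMegaContext: true, // doubles each detected window, capped at 2M
+//   });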
 
-/** Heuristic: treat models with "r1", "reasoning", or "think" in the name as reasoning models. */
-export function isReasoningModelHeuristic(modelId: string): boolean {
-  return /r1|reasoning|think|reason/i.test(modelId);
+/**
+ * Determine appropriate max tokens based on context window
+ */
+function determineMaxTokens(modelName: string, contextWindow?: number): number {
+  if (!contextWindow) {
+    return OLLAMA_DEFAULT_MAX_TOKENS;
+  }
+
+  // For very large context windows, allow larger max tokens
+  if (contextWindow >= OLLAMA_MEGA_CONTEXT_WINDOW) {
+    return OLLAMA_EXTENDED_MAX_TOKENS;
+  }
+
+  if (contextWindow >= OLLAMA_EXTENDED_CONTEXT_WINDOW) {
+    return Math.min(OLLAMA_EXTENDED_MAX_TOKENS, Math.floor(contextWindow * 0.25));
+  }
+
+  return OLLAMA_DEFAULT_MAX_TOKENS;
 }
 
-/** Build a ModelDefinitionConfig for an Ollama model with default values. */
+/**
+ * Detect if a model is likely running locally
+ */
+function detectLocalModel(model: OllamaTagModel): boolean {
+  // Check for local indicators in the model name
+  const name = model.name.toLowerCase();
+  // remote_host is not part of the declared OllamaTagModel shape, so read it
+  // defensively rather than assuming the field exists.
+  const remoteHost = (model as { remote_host?: string }).remote_host;
+  return !name.includes("api") &&
+    !name.includes("cloud") &&
+    !name.includes("remote") &&
+    !remoteHost;
+}
+
+/**
+ * Heuristic: treat models with known reasoning-capable name patterns as
+ * reasoning models. The list is intentionally broad to cover the rapidly
+ * evolving open-source landscape (2025-2026).
+ *
+ * Patterns added by manusilized:
+ * - qwen3 (Qwen3 series – all variants have extended thinking)
+ * - qwq (QwQ reasoning model family)
+ * - glm-?5 / glm5 (GLM-5 supports deep reasoning)
+ * - kimi-?k2 / k2.5 (Kimi K2.5 trillion-parameter reasoning)
+ * - deepseek-v3 (DeepSeek V3.2 with integrated thinking mode)
+ * - marco-o1 (Marco-o1 reasoning model)
+ * - skywork-o (Skywork-o series)
+ * - llama.*reason (Llama variants with reasoning capability)
+ * - mistral.*large (Large Mistral models with reasoning)
+ * - yi.*1.5 (Yi 1.5 series reasoning models)
+ * - command.*r (Command R series reasoning models)
+ */
+export function isReasoningModelHeuristic(modelId: string): boolean {
+  const id = modelId.toLowerCase();
+  return /r1|reasoning|think|reason|qwen3|qwq|glm-?5|kimi-?k2|deepseek-v3|marco-o|skywork-o|llama.*reason|mistral.*large|yi.*1\.5|command.*r/i.test(
+    id,
+  );
+}
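+
+// Illustrative matches for the widened heuristic:
+//   isReasoningModelHeuristic("qwen3:32b");   // true  ("qwen3")
+//   isReasoningModelHeuristic("deepseek-v3"); // true  ("deepseek-v3")
+//   isReasoningModelHeuristic("gemma2:9b");   // false (no pattern matches)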
+
+/**
+ * Enhanced model definition with support for mega-context windows
+ */
 export function buildOllamaModelDefinition(
   modelId: string,
   contextWindow?: number,
+  maxTokens?: number,
 ): ModelDefinitionConfig {
+  // Determine if this is a reasoning model
+  const isReasoning = isReasoningModelHeuristic(modelId);
+
+  // Use provided values or defaults
+  const effectiveContextWindow = contextWindow ??
+    (isReasoning ? OLLAMA_EXTENDED_CONTEXT_WINDOW : OLLAMA_DEFAULT_CONTEXT_WINDOW);
+
+  const effectiveMaxTokens = maxTokens ??
+    (effectiveContextWindow >= OLLAMA_MEGA_CONTEXT_WINDOW
+      ? OLLAMA_EXTENDED_MAX_TOKENS
+      : OLLAMA_DEFAULT_MAX_TOKENS);
+
   return {
     id: modelId,
     name: modelId,
-    reasoning: isReasoningModelHeuristic(modelId),
+    reasoning: isReasoning,
     input: ["text"],
     cost: OLLAMA_DEFAULT_COST,
-    contextWindow: contextWindow ?? OLLAMA_DEFAULT_CONTEXT_WINDOW,
-    maxTokens: OLLAMA_DEFAULT_MAX_TOKENS,
+    contextWindow: Math.min(effectiveContextWindow, OLLAMA_MEGA_CONTEXT_WINDOW),
+    maxTokens: Math.min(effectiveMaxTokens,
+      Math.floor(Math.min(effectiveContextWindow, OLLAMA_MEGA_CONTEXT_WINDOW) * 0.5)),
   };
 }
 
@@ -140,3 +373,26 @@ export async function fetchOllamaModels(
     return { reachable: false, models: [] };
   }
 }
+
+/**
+ * Predefined context windows for popular models
+ */
+export const PREDEFINED_CONTEXT_WINDOWS: Record<string, number> = {
+  "qwen3": 262144,
+  "qwen3:4b": 262144,
+  "qwen3:8b": 262144,
+  "qwen3:32b": 262144,
+  "qwen3:72b": 262144,
+  "qwen3:110b": 2097152, // 2M context
+  "glm-5": 262144,
+  "glm-5-air": 262144,
+  "glm-5-pro": 2097152, // 2M context
+  "deepseek-v3": 2097152, // 2M context
+  "kimi-k2.5": 2097152, // 2M context
+  "llama3.1:8b": 131072,
+  "llama3.1:70b": 131072,
+  "llama3.1:405b": 2097152, // 2M context
+  "mistral-large": 131072,
+  "yi-1.5:9b": 32768,
+  "yi-1.5:34b": 262144,
+};
\ No newline at end of file
diff --git a/src/agents/ollama-stream.ts b/src/agents/ollama-stream.ts
index f332ad1fd83..dc816928a0b 100644
--- a/src/agents/ollama-stream.ts
+++ b/src/agents/ollama-stream.ts
@@ -13,10 +13,14 @@ import { isNonSecretApiKeyMarker } from "./model-auth-markers.js";
 import { OLLAMA_DEFAULT_BASE_URL } from "./ollama-defaults.js";
 import {
   buildAssistantMessage as buildStreamAssistantMessage,
+  buildAssistantMessageWithZeroUsage,
   buildStreamErrorAssistantMessage,
   buildUsageWithNoCost,
 } from "./stream-message-shared.js";
 
+// Import config utilities
+import { loadManusilizedConfig, applyStreamingMode, type ManusilizedConfig } from "./config-utils.js";
+
 const log = createSubsystemLogger("ollama-stream");
 
 export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL;
@@ -374,13 +378,162 @@ export function buildAssistantMessage(
   });
 }
 
-// ── NDJSON streaming parser ─────────────────────────────────────────────────
+// ── Markdown tool-call fallback extractor ──────────────────────────────────
+//
+// Some open-source models (e.g. older Llama3, GLM variants) do not emit
+// structured `tool_calls` in the Ollama response. Instead they embed a JSON
+// object inside a fenced code block in the `content` field, e.g.:
+//
+//   ```json
+//   {"name": "bash", "arguments": {"command": "ls"}}
+//   ```
+//
+// `extractMarkdownToolCalls` scans the accumulated content string for these
+// patterns and converts them into proper `OllamaToolCall` objects so the rest
+// of the pipeline can treat them identically to native tool calls.
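+//
+// For the example above the extractor yields (hypothetical "bash" tool):
+//   [{ function: { name: "bash", arguments: { command: "ls" } } }]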
+
+const MARKDOWN_TOOL_CALL_RE =
+  /```(?:json)?\s*\n?\s*\{[\s\S]*?"name"\s*:\s*"([^"]+)"[\s\S]*?\}\s*\n?```/g;
+
+// Additional patterns for better tool call detection. MARKDOWN_TOOL_CALL_RE
+// always runs first, so it is not repeated in this list.
+const ADDITIONAL_TOOL_CALL_PATTERNS = [
+  /```(?:json)?\s*\n?\s*\{[\s\S]*?"function"\s*:\s*\{[\s\S]*?"name"\s*:\s*"([^"]+)"[\s\S]*?\}\s*\n?```/g,
+  /\{[\s\S]*?"name"\s*:\s*"([^"]+)"[\s\S]*?"arguments"\s*:\s*(\{[\s\S]*?\})[\s\S]*?\}/g,
+  /<tool_call>([\s\S]*?)<\/tool_call>/g,
+  /```(?:ya?ml)\s*\n([\s\S]*?name:\s*[^\n]+[\s\S]*?arguments:\s*\{[\s\S]*?\})\s*\n```/g,
+  /(?:^|\n)(name:\s*[^\n]+\narguments:\s*\{[\s\S]*?\})(?=\n|$)/gm,
+];
+
+/**
+ * Parse YAML-style tool call content
+ * @param yamlContent YAML formatted tool call string
+ * @returns Parsed tool call object
+ */
+function parseYamlToolCall(yamlContent: string): Record<string, unknown> {
+  const lines = yamlContent.trim().split("\n");
+  const result: Record<string, unknown> = {};
+
+  for (const line of lines) {
+    const trimmed = line.trim();
+    if (trimmed.startsWith("name:")) {
+      result.name = trimmed.substring(5).trim().replace(/^["']|["']$/g, "");
+    } else if (trimmed.startsWith("arguments:")) {
+      // Extract the JSON portion after "arguments:"
+      const argsMatch = trimmed.match(/arguments:\s*(\{.*\})/);
+      if (argsMatch && argsMatch[1]) {
+        try {
+          result.arguments = JSON.parse(argsMatch[1]);
+        } catch {
+          // If JSON parsing fails, try to extract key-value pairs
+          const argsObj: Record<string, unknown> = {};
+          const argsLines = trimmed.substring(10).trim();
+          if (argsLines.startsWith("{") && argsLines.endsWith("}")) {
+            // Simplified JSON parsing for common cases
+            const keyValuePairs = argsLines.substring(1, argsLines.length - 1).split(",");
+            for (const pair of keyValuePairs) {
+              const [key, value] = pair.split(":").map((s) => s.trim());
+              if (key && value) {
+                // Try to parse the value as JSON, else keep it as a string
+                try {
+                  argsObj[key.replace(/^["']|["']$/g, "")] = JSON.parse(value);
+                } catch {
+                  argsObj[key.replace(/^["']|["']$/g, "")] = value.replace(/^["']|["']$/g, "");
+                }
+              }
+            }
+            result.arguments = argsObj;
+          }
+        }
+      }
+    }
+  }
+
+  return result;
+}
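+
+// Illustrative YAML input accepted by parseYamlToolCall:
+//   name: bash
+//   arguments: {"command": "ls -la"}
+// → { name: "bash", arguments: { command: "ls -la" } }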
+
+export function extractMarkdownToolCalls(content: string): OllamaToolCall[] {
+  const results: OllamaToolCall[] = [];
+
+  // Original pattern
+  let match: RegExpExecArray | null;
+  MARKDOWN_TOOL_CALL_RE.lastIndex = 0;
+  while ((match = MARKDOWN_TOOL_CALL_RE.exec(content)) !== null) {
+    const raw = match[0]
+      .replace(/^```(?:json)?\s*/i, "")
+      .replace(/\s*```$/, "")
+      .trim();
+    try {
+      const parsed = parseJsonPreservingUnsafeIntegers(raw) as Record<string, unknown>;
+      const name = typeof parsed.name === "string" ? parsed.name : undefined;
+      if (!name) {
+        continue;
+      }
+      const args =
+        parsed.arguments != null && typeof parsed.arguments === "object"
+          ? (parsed.arguments as Record<string, unknown>)
+          : parsed.parameters != null && typeof parsed.parameters === "object"
+            ? (parsed.parameters as Record<string, unknown>)
+            : {};
+      results.push({ function: { name, arguments: args } });
+    } catch {
+      log.warn(`[manusilized] Failed to parse Markdown tool call: ${raw.slice(0, 120)}`);
+    }
+  }
+
+  // Additional patterns
+  for (const pattern of ADDITIONAL_TOOL_CALL_PATTERNS) {
+    pattern.lastIndex = 0;
+    while ((match = pattern.exec(content)) !== null) {
+      try {
+        let parsed: Record<string, unknown>;
+        if (match.length >= 3) {
+          // For patterns with separate name and arguments captures
+          const name = match[1];
+          const argsRaw = match[2];
+          parsed = { name, arguments: JSON.parse(argsRaw) };
+        } else if (pattern.source.includes("ya?ml")) {
+          // Handle YAML format
+          const yamlContent = match[1] || match[0];
+          parsed = parseYamlToolCall(yamlContent);
+        } else {
+          // For patterns with a single capture containing the full JSON
+          const raw = match[1] || match[0];
+          parsed = parseJsonPreservingUnsafeIntegers(raw) as Record<string, unknown>;
+        }
+
+        const name = typeof parsed.name === "string" ? parsed.name : undefined;
+        if (!name) {
+          continue;
+        }
+
+        const args =
+          parsed.arguments != null && typeof parsed.arguments === "object"
+            ? (parsed.arguments as Record<string, unknown>)
+            : parsed.parameters != null && typeof parsed.parameters === "object"
+              ? (parsed.parameters as Record<string, unknown>)
+              : {};
+
+        results.push({ function: { name, arguments: args } });
+      } catch {
+        log.warn(`[manusilized] Failed to parse additional tool call pattern: ${match[0].slice(0, 120)}`);
+      }
+    }
+  }
+
+  // De-duplicate: overlapping patterns can match the same embedded call twice.
+  const seen = new Set<string>();
+  return results.filter((call) => {
+    const key = JSON.stringify(call.function);
+    if (seen.has(key)) {
+      return false;
+    }
+    seen.add(key);
+    return true;
+  });
+}
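+
+// Illustrative fallback extraction (hypothetical "web_search" tool):
+//   extractMarkdownToolCalls('<tool_call>{"name":"web_search","arguments":{"q":"ollama"}}</tool_call>');
+//   // → [{ function: { name: "web_search", arguments: { q: "ollama" } } }]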
""; - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed) { - continue; + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + try { + yield parseJsonPreservingUnsafeIntegers(trimmed) as OllamaChatResponse; + } catch { + log.warn(`Skipping malformed NDJSON line: ${trimmed.slice(0, 120)}`); + } } - try { - yield parseJsonPreservingUnsafeIntegers(trimmed) as OllamaChatResponse; - } catch { - log.warn(`Skipping malformed NDJSON line: ${trimmed.slice(0, 120)}`); + + // Reset buffer if we're done + if (done && accumulatedBuffer.trim()) { + try { + yield parseJsonPreservingUnsafeIntegers(accumulatedBuffer.trim()) as OllamaChatResponse; + } catch { + log.warn(`Skipping malformed trailing data: ${accumulatedBuffer.trim().slice(0, 120)}`); + } } } + + buffer = ""; // Reset buffer for next iteration } - if (buffer.trim()) { + // Handle any remaining data + if (accumulatedBuffer.trim()) { try { - yield parseJsonPreservingUnsafeIntegers(buffer.trim()) as OllamaChatResponse; + yield parseJsonPreservingUnsafeIntegers(accumulatedBuffer.trim()) as OllamaChatResponse; } catch { - log.warn(`Skipping malformed trailing data: ${buffer.trim().slice(0, 120)}`); + log.warn(`Skipping malformed trailing data: ${accumulatedBuffer.trim().slice(0, 120)}`); } } } -// ── Main StreamFn factory ─────────────────────────────────────────────────── +// ── Enhanced Stream Configuration ─────────────────────────────────────────── + +interface StreamConfig { + bufferSize?: number; + throttleDelay?: number; + enableThinkingOutput?: boolean; + streamInterval?: number; +} + +// ── Main StreamFn factory with enhanced streaming ─────────────────────────── function resolveOllamaChatUrl(baseUrl: string): string { const trimmed = baseUrl.trim().replace(/\/+$/, ""); @@ -434,8 +615,48 @@ function resolveOllamaModelHeaders(model: { export function createOllamaStreamFn( baseUrl: string, defaultHeaders?: Record, + streamConfig?: StreamConfig, + configPath?: string, ): StreamFn { + // Load configuration from file if available + let baseConfig: ManusilizedConfig = { + streaming: { + mode: "standard", + bufferSize: 1024, + throttleDelay: 10, + enableThinkingOutput: false, + streamInterval: 50, + }, + context: { + enableMegaContext: false, + maxContextWindow: 262144, + autoDetectContext: true, + }, + }; + + // Try to load config from file + try { + if (loadManusilizedConfig) { + const fileConfig = loadManusilizedConfig(configPath); + baseConfig = { ...baseConfig, ...fileConfig }; + } + } catch (err) { + console.warn("[manusilized] Failed to load config file, using defaults:", err); + } + + // Apply streaming mode presets + if (applyStreamingMode) { + baseConfig = applyStreamingMode(baseConfig); + } + const chatUrl = resolveOllamaChatUrl(baseUrl); + const config = { + bufferSize: baseConfig.streaming?.bufferSize || 1024, + throttleDelay: baseConfig.streaming?.throttleDelay || 10, + enableThinkingOutput: baseConfig.streaming?.enableThinkingOutput || false, + streamInterval: baseConfig.streaming?.streamInterval || 50, + ...streamConfig, + }; return (model, context, options) => { const stream = createAssistantMessageEventStream(); @@ -451,7 +672,11 @@ export function createOllamaStreamFn( // Ollama defaults to num_ctx=4096 which is too small for large // system prompts + many tool definitions. Use model's contextWindow. - const ollamaOptions: Record = { num_ctx: model.contextWindow ?? 65536 }; + const ollamaOptions: Record = { + num_ctx: model.contextWindow ?? 
 
   return (model, context, options) => {
     const stream = createAssistantMessageEventStream();
@@ -451,7 +672,11 @@ export function createOllamaStreamFn(
 
     // Ollama defaults to num_ctx=4096 which is too small for large
     // system prompts + many tool definitions. Use model's contextWindow.
-    const ollamaOptions: Record<string, unknown> = { num_ctx: model.contextWindow ?? 65536 };
+    const ollamaOptions: Record<string, unknown> = {
+      num_ctx: model.contextWindow ?? 65536,
+      // Enable thinking output for reasoning models
+      ...(config.enableThinkingOutput ? { thinking: true } : {}),
+    };
     if (typeof options?.temperature === "number") {
       ollamaOptions.temperature = options.temperature;
     }
@@ -499,21 +724,91 @@ export function createOllamaStreamFn(
       let accumulatedContent = "";
       const accumulatedToolCalls: OllamaToolCall[] = [];
       let finalResponse: OllamaChatResponse | undefined;
+      let contentIndex = 0;
+      let lastStreamTime = Date.now();
+      let lastSentLength = 0;
 
-      for await (const chunk of parseNdjsonStream(reader)) {
-        if (chunk.message?.content) {
-          accumulatedContent += chunk.message.content;
-        }
+      // Emit a "start" event so consumers know the assistant has begun.
+      stream.push({
+        type: "start",
+        partial: buildAssistantMessageWithZeroUsage({
+          model,
+          content: [],
+          stopReason: "stop",
+        }),
+      });
 
-        // Ollama sends tool_calls in intermediate (done:false) chunks,
-        // NOT in the final done:true chunk. Collect from all chunks.
-        if (chunk.message?.tool_calls) {
-          accumulatedToolCalls.push(...chunk.message.tool_calls);
-        }
+      // Retry mechanism for stream parsing
+      let retryCount = 0;
+      const maxRetries = 3;
 
-        if (chunk.done) {
-          finalResponse = chunk;
-          break;
+      while (retryCount <= maxRetries) {
+        try {
+          for await (const chunk of parseNdjsonStream(reader, config.bufferSize)) {
+            const currentTime = Date.now();
+
+            // ── Real-time text_delta events (manusilized: incremental streaming) ──
+            // Emit each content fragment immediately so the UI can render a
+            // live typewriter effect instead of waiting for the full response.
+            if (chunk.message?.content) {
+              accumulatedContent += chunk.message.content;
+
+              // Throttle only the UI pushes, coalescing deltas; never skip the
+              // chunk itself, or content, tool_calls and the final done:true
+              // chunk would be lost.
+              if (currentTime - lastStreamTime >= config.throttleDelay) {
+                stream.push({
+                  type: "text_delta",
+                  contentIndex,
+                  delta: accumulatedContent.slice(lastSentLength),
+                  partial: buildAssistantMessageWithZeroUsage({
+                    model,
+                    content: [{ type: "text", text: accumulatedContent }],
+                    stopReason: "stop",
+                  }),
+                });
+                lastSentLength = accumulatedContent.length;
+                lastStreamTime = currentTime;
+              }
+            }
+
+            // Include thinking/reasoning output for enhanced experience
+            if (config.enableThinkingOutput && chunk.message?.thinking) {
+              stream.push({
+                type: "thinking_delta",
+                content: chunk.message.thinking,
+              });
+            }
+
+            // Ollama sends tool_calls in intermediate (done:false) chunks,
+            // NOT in the final done:true chunk. Collect from all chunks.
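+            // A (done:false) chunk carrying a call looks roughly like this
+            // (illustrative shape):
+            //   { "message": { "tool_calls": [{ "function": { "name": "bash",
+            //     "arguments": { "command": "ls" } } }] }, "done": false }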
+            if (chunk.message?.tool_calls) {
+              accumulatedToolCalls.push(...chunk.message.tool_calls);
+            }
+
+            if (chunk.done) {
+              finalResponse = chunk;
+              break;
+            }
+          }
+          break; // Success, exit retry loop
+        } catch (err) {
+          retryCount++;
+          if (retryCount > maxRetries) {
+            throw err; // Re-throw if max retries exceeded
+          }
+
+          log.warn(`[manusilized] Stream parsing failed, retry ${retryCount}/${maxRetries}:`, err);
+
+          // Wait before retrying, with exponential backoff
+          await new Promise((resolve) => setTimeout(resolve, Math.pow(2, retryCount) * 1000));
+
+          // Bail out if the caller aborted. Note: the reader is not re-created
+          // here, so retries only help when the parse error was transient
+          // rather than a dead connection.
+          if (options?.signal?.aborted) {
+            throw new Error("Stream aborted by user");
+          }
+        }
      }
 
@@ -522,10 +817,47 @@
       }
       finalResponse.message.content = accumulatedContent;
+
+      // Flush any delta still held back by the throttle, then signal the end
+      // of text streaming.
+      if (accumulatedContent.length > lastSentLength) {
+        stream.push({
+          type: "text_delta",
+          contentIndex,
+          delta: accumulatedContent.slice(lastSentLength),
+          partial: buildAssistantMessageWithZeroUsage({
+            model,
+            content: [{ type: "text", text: accumulatedContent }],
+            stopReason: "stop",
+          }),
+        });
+        lastSentLength = accumulatedContent.length;
+      }
+      stream.push({
+        type: "text_end",
+        contentIndex,
+        partial: buildAssistantMessageWithZeroUsage({
+          model,
+          content: [{ type: "text", text: accumulatedContent }],
+          stopReason: "stop",
+        }),
+      });
+
+      // ── Markdown tool-call fallback (manusilized: fault-tolerant adapter) ──
+      // If the model produced no native tool_calls but embedded a JSON tool
+      // call inside a fenced code block, extract it as a fallback so that
+      // open-source models that don't support structured output still work.
+      if (accumulatedToolCalls.length === 0 && accumulatedContent) {
+        const markdownCalls = extractMarkdownToolCalls(accumulatedContent);
+        if (markdownCalls.length > 0) {
+          log.debug(
+            `[manusilized] Extracted ${markdownCalls.length} tool call(s) from Markdown fallback`,
+          );
+          accumulatedToolCalls.push(...markdownCalls);
+          // Strip the tool-call JSON blocks from the visible content so the
+          // user doesn't see raw JSON in the chat bubble.
+          finalResponse.message.content = accumulatedContent
+            .replace(MARKDOWN_TOOL_CALL_RE, "")
+            .replace(/```(?:json)?\s*\{[\s\S]*?\}\s*```/g, "")
+            .replace(/<tool_call>[\s\S]*?<\/tool_call>/g, "")
+            .replace(/```(?:ya?ml)\s*\n[\s\S]*?\s*\n```/g, "")
+            .trim();
+        }
+      }
+
       if (accumulatedToolCalls.length > 0) {
         finalResponse.message.tool_calls = accumulatedToolCalls;
       }
 
+      // Increment contentIndex after text is done (mirrors OpenAI WS pattern).
+      contentIndex += 1;
+
       const assistantMessage = buildAssistantMessage(finalResponse, {
         api: model.api,
         provider: model.provider,
@@ -563,6 +895,7 @@
 export function createConfiguredOllamaStreamFn(params: {
   model: { baseUrl?: string; headers?: unknown };
   providerBaseUrl?: string;
+  streamConfig?: StreamConfig;
 }): StreamFn {
   const modelBaseUrl = typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined;
   return createOllamaStreamFn(
@@ -571,5 +904,6 @@ export function createConfiguredOllamaStreamFn(params: {
       providerBaseUrl: params.providerBaseUrl,
     }),
     resolveOllamaModelHeaders(params.model),
+    params.streamConfig,
   );
-}
+}
\ No newline at end of file