Merge 7f82d055fcb1994a534026abc42e723debb07bda into 6b4c24c2e55b5b4013277bd799525086f6a0c40f

Da Wei 2026-03-20 21:43:58 -07:00, committed by GitHub
commit d86f22f868
3 changed files with 744 additions and 48 deletions

src/agents/config-utils.ts (new file)

@@ -0,0 +1,106 @@
import { existsSync, readFileSync } from "node:fs";
import { join as pathJoin } from "node:path";
export interface ManusilizedConfig {
streaming?: {
mode?: "standard" | "enhanced" | "ultra";
bufferSize?: number;
throttleDelay?: number;
enableThinkingOutput?: boolean;
streamInterval?: number;
};
context?: {
enableMegaContext?: boolean;
maxContextWindow?: number;
autoDetectContext?: boolean;
};
}
/**
* Load Manusilized configuration from config file
* @param configPath Path to the OpenClaw config directory
* @returns ManusilizedConfig or default config if not found
*/
export function loadManusilizedConfig(configPath?: string): ManusilizedConfig {
// Try to find config file in common locations
const possiblePaths = [
configPath ? pathJoin(configPath, "manusilized-streaming.json") : "",
pathJoin(process.cwd(), "config", "manusilized-streaming.json"),
pathJoin(process.cwd(), "manusilized-streaming.json"),
"/etc/openclaw/manusilized-streaming.json",
].filter(Boolean) as string[];
for (const configFilePath of possiblePaths) {
if (existsSync(configFilePath)) {
try {
const configFile = readFileSync(configFilePath, "utf8");
const config = JSON.parse(configFile) as ManusilizedConfig;
console.log(`[manusilized] Loaded config from ${configFilePath}`);
return config;
} catch (err) {
console.warn(`[manusilized] Failed to parse config file ${configFilePath}:`, err);
}
}
}
// Return default config if no file found
console.log("[manusilized] Using default configuration");
return {
streaming: {
mode: "standard",
bufferSize: 1024,
throttleDelay: 10,
enableThinkingOutput: false,
streamInterval: 50,
},
context: {
enableMegaContext: false,
maxContextWindow: 262144,
autoDetectContext: true,
},
};
}
/**
* Apply streaming mode presets to config
* @param config Base configuration
* @returns Configuration with streaming mode applied
*/
export function applyStreamingMode(config: ManusilizedConfig): ManusilizedConfig {
const mode = config.streaming?.mode || "standard";
switch (mode) {
case "enhanced":
return {
...config,
streaming: {
...config.streaming,
bufferSize: 2048,
throttleDelay: 5,
streamInterval: 25,
},
};
case "ultra":
return {
...config,
streaming: {
...config.streaming,
bufferSize: 4096,
throttleDelay: 1,
streamInterval: 10,
enableThinkingOutput: true,
},
};
case "standard":
default:
return {
...config,
streaming: {
...config.streaming,
bufferSize: 1024,
throttleDelay: 10,
streamInterval: 50,
},
};
}
}
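
Taken together, a caller loads the on-disk config (if any) and then lets the mode preset override the tuning knobs. A minimal usage sketch, with a placeholder config directory:

import { loadManusilizedConfig, applyStreamingMode } from "./config-utils.js";

// "/home/user/.openclaw" is a placeholder; the loader also probes
// ./config, the cwd, and /etc/openclaw for manusilized-streaming.json.
const raw = loadManusilizedConfig("/home/user/.openclaw");
const config = applyStreamingMode(raw);

// With { "streaming": { "mode": "ultra" } } on disk, the preset wins:
console.log(config.streaming?.bufferSize); // 4096
console.log(config.streaming?.throttleDelay); // 1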

View File

@@ -2,7 +2,10 @@ import type { ModelDefinitionConfig } from "../config/types.models.js";
import { OLLAMA_DEFAULT_BASE_URL } from "./ollama-defaults.js";
export const OLLAMA_DEFAULT_CONTEXT_WINDOW = 128000;
export const OLLAMA_EXTENDED_CONTEXT_WINDOW = 262144; // 256K default for newer models
export const OLLAMA_MEGA_CONTEXT_WINDOW = 2097152; // 2M context window support
export const OLLAMA_DEFAULT_MAX_TOKENS = 8192;
export const OLLAMA_EXTENDED_MAX_TOKENS = 32768; // Extended max tokens for large models
export const OLLAMA_DEFAULT_COST = {
input: 0,
output: 0,
@@ -19,6 +22,7 @@ export type OllamaTagModel = {
details?: {
family?: string;
parameter_size?: string;
quantization_level?: string;
};
};
@@ -28,9 +32,13 @@ export type OllamaTagsResponse = {
export type OllamaModelWithContext = OllamaTagModel & {
contextWindow?: number;
maxTokens?: number;
isLocal?: boolean;
};
const OLLAMA_SHOW_CONCURRENCY = 8;
const CONTEXT_WINDOW_CACHE = new Map<string, { contextWindow: number; timestamp: number }>();
const CACHE_TTL = 300000; // 5 minutes cache TTL
/**
* Derive the Ollama native API base URL from a configured base URL.
@@ -48,76 +56,301 @@ export function resolveOllamaApiBase(configuredBaseUrl?: string): string {
return trimmed.replace(/\/v1$/i, "");
}
/**
* Enhanced context window detection with caching and fallback strategies
*/
export async function queryOllamaContextWindow(
apiBase: string,
modelName: string,
): Promise<number | undefined> {
// Check cache first
const cached = CONTEXT_WINDOW_CACHE.get(modelName);
if (cached && Date.now() - cached.timestamp < CACHE_TTL) {
return cached.contextWindow;
}
try {
// Try the modern /api/show endpoint first
const response = await fetch(`${apiBase}/api/show`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ name: modelName }),
signal: AbortSignal.timeout(5000),
});
if (!response.ok) {
// Fallback to older method
return await queryOllamaContextWindowLegacy(apiBase, modelName);
}
const data = (await response.json()) as { model_info?: Record<string, unknown> };
if (!data.model_info) {
// Fallback if no model info
return estimateContextWindow(modelName);
}
// Look for context window information in model info
let contextWindow: number | undefined;
for (const [key, value] of Object.entries(data.model_info)) {
if (key.endsWith(".context_length") && typeof value === "number" && Number.isFinite(value)) {
const detectedWindow = Math.floor(value);
if (detectedWindow > 0) {
contextWindow = detectedWindow;
break;
}
}
// Also check for BERT-style max_position_embeddings
if (key.includes("max_position_embeddings") && typeof value === "number" && Number.isFinite(value)) {
const detectedWindow = Math.floor(value);
if (detectedWindow > 0) {
contextWindow = detectedWindow;
break;
}
}
}
// If not found, try to estimate based on model name and parameters
if (contextWindow === undefined) {
contextWindow = estimateContextWindow(modelName, data.model_info);
}
// Cache the result
if (contextWindow !== undefined) {
CONTEXT_WINDOW_CACHE.set(modelName, {
contextWindow,
timestamp: Date.now()
});
}
return contextWindow;
} catch (error) {
// Fallback to estimation on error
const estimated = estimateContextWindow(modelName);
if (estimated !== undefined) {
CONTEXT_WINDOW_CACHE.set(modelName, {
contextWindow: estimated,
timestamp: Date.now()
});
}
return estimated;
}
}
/**
* Legacy method for context window detection
*/
async function queryOllamaContextWindowLegacy(
apiBase: string,
modelName: string,
): Promise<number | undefined> {
try {
// Try to get model parameters through config
const configResponse = await fetch(`${apiBase}/api/show`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
name: modelName,
verbose: true
}),
signal: AbortSignal.timeout(3000),
});
if (!configResponse.ok) {
return undefined;
}
const configData = await configResponse.json();
// Estimate based on model family and size
return estimateContextWindow(modelName, configData);
} catch {
return undefined;
}
}
/**
* Intelligent context window estimation based on model name and properties
*/
function estimateContextWindow(
modelName: string,
modelInfo?: Record<string, unknown>
): number | undefined {
const name = modelName.toLowerCase();
// Check for explicit context window in model info
if (modelInfo) {
for (const [key, value] of Object.entries(modelInfo)) {
if (key.includes('context') && typeof value === 'number' && value > 0) {
return Math.min(Math.floor(value), OLLAMA_MEGA_CONTEXT_WINDOW);
}
}
}
// Qwen3 series - known for large context support
if (name.includes('qwen3') || name.includes('qwen-3')) {
if (name.includes('128k')) return 131072;
if (name.includes('256k')) return 262144;
if (name.includes('1m')) return 1048576;
if (name.includes('2m')) return OLLAMA_MEGA_CONTEXT_WINDOW;
return OLLAMA_EXTENDED_CONTEXT_WINDOW; // Default 256K for Qwen3
}
// GLM series
if (name.includes('glm') && (name.includes('5') || name.includes('4'))) {
if (name.includes('128k')) return 131072;
return OLLAMA_EXTENDED_CONTEXT_WINDOW;
}
// DeepSeek series
if (name.includes('deepseek') && name.includes('v3')) {
return OLLAMA_MEGA_CONTEXT_WINDOW; // 2M context for DeepSeek V3
}
// Kimi series
if (name.includes('kimi') && (name.includes('k2') || name.includes('2.5'))) {
return OLLAMA_MEGA_CONTEXT_WINDOW;
}
// Large models with likely extended context
if (name.includes('llama3') || name.includes('llama-3')) {
if (name.includes('70b')) {
return OLLAMA_EXTENDED_CONTEXT_WINDOW;
}
if (name.includes('405b')) {
return OLLAMA_MEGA_CONTEXT_WINDOW;
}
}
// Code models often have extended context
if (name.includes('code') || name.includes('coder')) {
if (name.includes('34b')) {
return OLLAMA_EXTENDED_CONTEXT_WINDOW;
}
}
// Return default if no specific match
return undefined;
}
export async function enrichOllamaModelsWithContext(
apiBase: string,
models: OllamaTagModel[],
opts?: { concurrency?: number; enableMegaContext?: boolean },
): Promise<OllamaModelWithContext[]> {
const concurrency = Math.max(1, Math.floor(opts?.concurrency ?? OLLAMA_SHOW_CONCURRENCY));
const enableMegaContext = opts?.enableMegaContext ?? false;
const enriched: OllamaModelWithContext[] = [];
for (let index = 0; index < models.length; index += concurrency) {
const batch = models.slice(index, index + concurrency);
const batchResults = await Promise.all(
batch.map(async (model) => {
const contextWindow = await queryOllamaContextWindow(apiBase, model.name);
const maxTokens = determineMaxTokens(model.name, contextWindow);
const isLocal = detectLocalModel(model);
return {
...model,
contextWindow: enableMegaContext && contextWindow ?
Math.min(contextWindow * 2, OLLAMA_MEGA_CONTEXT_WINDOW) :
contextWindow,
maxTokens,
isLocal,
};
}),
);
enriched.push(...batchResults);
}
return enriched;
}
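
For orientation, a hedged driver sketch, assuming OllamaTagsResponse exposes a models array and an Ollama instance on the default port:

// Illustrative only: list models from a local Ollama and enrich them in batches.
async function listEnrichedModels(): Promise<OllamaModelWithContext[]> {
  const apiBase = resolveOllamaApiBase("http://localhost:11434/v1"); // strips the /v1 suffix
  const res = await fetch(`${apiBase}/api/tags`);
  const { models } = (await res.json()) as OllamaTagsResponse;
  return enrichOllamaModelsWithContext(apiBase, models, {
    concurrency: 4,
    enableMegaContext: true, // doubles detected windows, capped at 2M
  });
}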
/**
* Determine appropriate max tokens based on context window
*/
function determineMaxTokens(modelName: string, contextWindow?: number): number {
if (!contextWindow) {
return OLLAMA_DEFAULT_MAX_TOKENS;
}
// For very large context windows, allow larger max tokens
if (contextWindow >= OLLAMA_MEGA_CONTEXT_WINDOW) {
return OLLAMA_EXTENDED_MAX_TOKENS;
}
if (contextWindow >= OLLAMA_EXTENDED_CONTEXT_WINDOW) {
return Math.min(OLLAMA_EXTENDED_MAX_TOKENS, Math.floor(contextWindow * 0.25));
}
return OLLAMA_DEFAULT_MAX_TOKENS;
}
/**
* Detect if model is likely running locally
*/
function detectLocalModel(model: OllamaTagModel): boolean {
// Check for local indicators in model name
const name = model.name.toLowerCase();
return !name.includes('api') &&
!name.includes('cloud') &&
!name.includes('remote') &&
// remote_host is not part of OllamaTagModel's declared shape; read it loosely
!(model as Record<string, unknown>).remote_host;
}
/**
* Heuristic: treat models with known reasoning-capable name patterns as
* reasoning models. The list is intentionally broad to cover the rapidly
* evolving open-source landscape (2025-2026).
*
* Patterns added by manusilized:
* - qwen3 (Qwen3 series all variants have extended thinking)
* - qwq (QwQ reasoning model family)
* - glm-?5 / glm5 (GLM-5 supports deep reasoning)
* - kimi-?k2 / k2.5 (Kimi K2.5 trillion-parameter reasoning)
* - deepseek-v3 (DeepSeek V3.2 with integrated thinking mode)
* - marco-o1 (Marco-o1 reasoning model)
* - skywork-o (Skywork-o series)
* - llama.*reason (Llama variants with reasoning capability)
* - mistral.*large (Large Mistral models with reasoning)
* - yi.*1.5 (Yi 1.5 series reasoning models)
* - command.*r (Command R series reasoning models)
*/
export function isReasoningModelHeuristic(modelId: string): boolean {
const id = modelId.toLowerCase();
return /r1|reasoning|think|reason|qwen3|qwq|glm-?5|kimi-?k2|deepseek-v3|marco-o|skywork-o|llama.*reason|mistral.*large|yi.*1\.5|command.*r/i.test(
id,
);
}
/**
* Enhanced model definition with support for mega-context windows
*/
export function buildOllamaModelDefinition(
modelId: string,
contextWindow?: number,
maxTokens?: number,
): ModelDefinitionConfig {
// Determine if this is a reasoning model
const isReasoning = isReasoningModelHeuristic(modelId);
// Use provided values or defaults
const effectiveContextWindow = contextWindow ??
(isReasoning ? OLLAMA_EXTENDED_CONTEXT_WINDOW : OLLAMA_DEFAULT_CONTEXT_WINDOW);
const effectiveMaxTokens = maxTokens ??
(effectiveContextWindow >= OLLAMA_MEGA_CONTEXT_WINDOW ?
OLLAMA_EXTENDED_MAX_TOKENS :
OLLAMA_DEFAULT_MAX_TOKENS);
return {
id: modelId,
name: modelId,
reasoning: isReasoning,
input: ["text"],
cost: OLLAMA_DEFAULT_COST,
contextWindow: Math.min(effectiveContextWindow, OLLAMA_MEGA_CONTEXT_WINDOW),
maxTokens: Math.min(effectiveMaxTokens,
Math.floor(Math.min(effectiveContextWindow, OLLAMA_MEGA_CONTEXT_WINDOW) * 0.5)),
};
}
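
To make the sizing arithmetic concrete, a worked example with a hypothetical 256K Qwen3 tag:

// buildOllamaModelDefinition("qwen3:32b", 262144):
// - isReasoningModelHeuristic matches qwen3 → reasoning: true
// - effectiveContextWindow = 262144 (below the 2M mega threshold)
// - effectiveMaxTokens = OLLAMA_DEFAULT_MAX_TOKENS = 8192
// - maxTokens = min(8192, floor(262144 * 0.5)) = 8192
const def = buildOllamaModelDefinition("qwen3:32b", 262144);
console.log(def.reasoning, def.contextWindow, def.maxTokens); // true 262144 8192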
@@ -140,3 +373,26 @@ export async function fetchOllamaModels(
return { reachable: false, models: [] };
}
}
/**
* Predefined context windows for popular models
*/
export const PREDEFINED_CONTEXT_WINDOWS: Record<string, number> = {
'qwen3': 262144,
'qwen3:4b': 262144,
'qwen3:8b': 262144,
'qwen3:32b': 262144,
'qwen3:72b': 262144,
'qwen3:110b': 2097152, // 2M context
'glm-5': 262144,
'glm-5-air': 262144,
'glm-5-pro': 2097152, // 2M context
'deepseek-v3': 2097152, // 2M context
'kimi-k2.5': 2097152, // 2M context
'llama3.1:8b': 131072,
'llama3.1:70b': 131072,
'llama3.1:405b': 2097152, // 2M context
'mistral-large': 131072,
'yi-1.5:9b': 32768,
'yi-1.5:34b': 262144,
};
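
The table is keyed by exact model tag. One way a caller might consult it, falling back to the base name and then the default (this helper is hypothetical, not part of the diff):

function resolveContextWindow(modelName: string): number {
  const name = modelName.toLowerCase();
  // Exact tag first ("qwen3:110b"), then the bare family ("qwen3").
  return (
    PREDEFINED_CONTEXT_WINDOWS[name] ??
    PREDEFINED_CONTEXT_WINDOWS[name.split(":")[0]] ??
    OLLAMA_DEFAULT_CONTEXT_WINDOW
  );
}
// resolveContextWindow("qwen3:110b") → 2097152; resolveContextWindow("phi4") → 128000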

View File

@@ -13,10 +13,14 @@ import { isNonSecretApiKeyMarker } from "./model-auth-markers.js";
import { OLLAMA_DEFAULT_BASE_URL } from "./ollama-defaults.js";
import {
buildAssistantMessage as buildStreamAssistantMessage,
buildAssistantMessageWithZeroUsage,
buildStreamErrorAssistantMessage,
buildUsageWithNoCost,
} from "./stream-message-shared.js";
// Import config utilities
import { loadManusilizedConfig, applyStreamingMode, type ManusilizedConfig } from "./config-utils.js";
const log = createSubsystemLogger("ollama-stream");
export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL;
@@ -374,13 +378,162 @@ export function buildAssistantMessage(
});
}
// ── Markdown tool-call fallback extractor ──────────────────────────────────
//
// Some open-source models (e.g. older Llama3, GLM variants) do not emit
// structured `tool_calls` in the Ollama response. Instead they embed a JSON
// object inside a fenced code block in the `content` field, e.g.:
//
// ```json
// {"name": "bash", "arguments": {"command": "ls"}}
// ```
//
// `extractMarkdownToolCalls` scans the accumulated content string for these
// patterns and converts them into proper `OllamaToolCall` objects so the rest
// of the pipeline can treat them identically to native tool calls.
const MARKDOWN_TOOL_CALL_RE =
/```(?:json)?\s*\n?\s*\{[\s\S]*?"name"\s*:\s*"([^"]+)"[\s\S]*?\}\s*\n?```/g;
// Additional patterns for better tool call detection
const ADDITIONAL_TOOL_CALL_PATTERNS = [
/```(?:json)?\s*\n?\s*\{[\s\S]*?"function"\s*:\s*\{[\s\S]*?"name"\s*:\s*"([^"]+)"[\s\S]*?\}\s*\n?```/g,
/\{[\s\S]*?"name"\s*:\s*"([^"]+)"[\s\S]*?"arguments"\s*:\s*(\{[\s\S]*?\})[\s\S]*?\}/g,
/<tool_call>([\s\S]*?)<\/tool_call>/g,
/```(?:ya?ml)\s*\n([\s\S]*?name:\s*[^\n]+[\s\S]*?arguments:\s*\{[\s\S]*?\})\s*\n```/g,
/(?:^|\n)(name:\s*[^\n]+\narguments:\s*\{[\s\S]*?\})(?=\n|$)/gm,
];
/**
* Parse YAML-style tool call content
* @param yamlContent YAML formatted tool call string
* @returns Parsed tool call object
*/
function parseYamlToolCall(yamlContent: string): Record<string, unknown> {
const lines = yamlContent.trim().split('\n');
const result: Record<string, unknown> = {};
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.startsWith('name:')) {
result.name = trimmed.substring(5).trim().replace(/^["']|["']$/g, '');
} else if (trimmed.startsWith('arguments:')) {
// Extract JSON portion after arguments:
const argsMatch = trimmed.match(/arguments:\s*(\{.*\})/);
if (argsMatch && argsMatch[1]) {
try {
result.arguments = JSON.parse(argsMatch[1]);
} catch {
// If JSON parsing fails, try to extract key-value pairs
const argsObj: Record<string, unknown> = {};
const argsLines = trimmed.substring(10).trim();
if (argsLines.startsWith('{') && argsLines.endsWith('}')) {
// Simplified JSON parsing for common cases
const keyValuePairs = argsLines.substring(1, argsLines.length - 1).split(',');
for (const pair of keyValuePairs) {
const [key, value] = pair.split(':').map(s => s.trim());
if (key && value) {
// Try to parse value as JSON or keep as string
try {
argsObj[key.replace(/^["']|["']$/g, '')] = JSON.parse(value);
} catch {
argsObj[key.replace(/^["']|["']$/g, '')] = value.replace(/^["']|["']$/g, '');
}
}
}
result.arguments = argsObj;
}
}
}
}
}
return result;
}
export function extractMarkdownToolCalls(content: string): OllamaToolCall[] {
const results: OllamaToolCall[] = [];
// Original pattern
let match: RegExpExecArray | null;
MARKDOWN_TOOL_CALL_RE.lastIndex = 0;
while ((match = MARKDOWN_TOOL_CALL_RE.exec(content)) !== null) {
const raw = match[0]
.replace(/^```(?:json)?\s*/i, "")
.replace(/\s*```$/, "")
.trim();
try {
const parsed = parseJsonPreservingUnsafeIntegers(raw) as Record<string, unknown>;
const name = typeof parsed.name === "string" ? parsed.name : undefined;
if (!name) {
continue;
}
const args =
parsed.arguments != null && typeof parsed.arguments === "object"
? (parsed.arguments as Record<string, unknown>)
: parsed.parameters != null && typeof parsed.parameters === "object"
? (parsed.parameters as Record<string, unknown>)
: {};
results.push({ function: { name, arguments: args } });
} catch {
log.warn(`[manusilized] Failed to parse Markdown tool call: ${raw.slice(0, 120)}`);
}
}
// Additional patterns
for (const pattern of ADDITIONAL_TOOL_CALL_PATTERNS) {
pattern.lastIndex = 0;
while ((match = pattern.exec(content)) !== null) {
try {
let parsed: Record<string, unknown>;
if (match.length >= 3) {
// For patterns with separate name and arguments captures
const name = match[1];
const argsRaw = match[2];
parsed = { name, arguments: JSON.parse(argsRaw) };
} else if (pattern.source.includes("ya?ml")) {
// Handle YAML format
const yamlContent = match[1] || match[0];
parsed = parseYamlToolCall(yamlContent);
} else {
// The capture may be name-only or bare YAML; parse the whole match
// with fences/tags stripped, as JSON or YAML, then unwrap envelopes.
const candidate = match[1]?.trim().startsWith("{") ? match[1] : match[0];
const raw = candidate.replace(/^```(?:json)?\s*|\s*```$/gi, "").replace(/^<tool_call>\s*|\s*<\/tool_call>$/g, "").trim();
parsed = raw.startsWith("{") ? (parseJsonPreservingUnsafeIntegers(raw) as Record<string, unknown>) : parseYamlToolCall(raw);
if (parsed.function && typeof parsed.function === "object") {
parsed = parsed.function as Record<string, unknown>;
}
}
const name = typeof parsed.name === "string" ? parsed.name : undefined;
if (!name) {
continue;
}
const args =
parsed.arguments != null && typeof parsed.arguments === "object"
? (parsed.arguments as Record<string, unknown>)
: parsed.parameters != null && typeof parsed.parameters === "object"
? (parsed.parameters as Record<string, unknown>)
: {};
results.push({ function: { name, arguments: args } });
} catch (err) {
log.warn(`[manusilized] Failed to parse additional tool call pattern: ${match[0].slice(0, 120)}`);
}
}
}
// Deduplicate: the looser fallback patterns can re-match calls already
// captured from fenced blocks, which would run the same tool twice.
const seen = new Set<string>();
return results.filter((call) => {
const key = JSON.stringify(call.function);
if (seen.has(key)) {
return false;
}
seen.add(key);
return true;
});
}
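
A round-trip sketch of the fallback path; the reply string is invented for illustration:

// A model reply that embeds its tool call in a fenced block instead of
// emitting structured tool_calls:
const reply = [
  "I'll list the files:",
  "```json",
  '{"name": "bash", "arguments": {"command": "ls -la"}}',
  "```",
].join("\n");
const calls = extractMarkdownToolCalls(reply);
// → [{ function: { name: "bash", arguments: { command: "ls -la" } } }]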
// ── NDJSON streaming parser with enhanced buffering ─────────────────────────
export async function* parseNdjsonStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
bufferSize: number = 1024, // Configurable buffer size for smoother streaming
): AsyncGenerator<OllamaChatResponse> {
const decoder = new TextDecoder();
let buffer = "";
let accumulatedBuffer = "";
while (true) {
const { done, value } = await reader.read();
@@ -388,32 +541,60 @@ export async function* parseNdjsonStream(
break;
}
buffer += decoder.decode(value, { stream: true });
// Accumulate buffer for smoother processing
accumulatedBuffer += buffer;
// Only process when we have enough data or stream is ending
if (accumulatedBuffer.length >= bufferSize || done) {
const lines = accumulatedBuffer.split("\n");
accumulatedBuffer = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
try {
yield parseJsonPreservingUnsafeIntegers(trimmed) as OllamaChatResponse;
} catch {
log.warn(`Skipping malformed NDJSON line: ${trimmed.slice(0, 120)}`);
}
}
// Reset buffer if we're done
if (done && accumulatedBuffer.trim()) {
try {
yield parseJsonPreservingUnsafeIntegers(accumulatedBuffer.trim()) as OllamaChatResponse;
} catch {
log.warn(`Skipping malformed trailing data: ${accumulatedBuffer.trim().slice(0, 120)}`);
}
}
}
buffer = ""; // Reset buffer for next iteration
}
// Handle any remaining data
if (accumulatedBuffer.trim()) {
try {
yield parseJsonPreservingUnsafeIntegers(accumulatedBuffer.trim()) as OllamaChatResponse;
} catch {
log.warn(`Skipping malformed trailing data: ${accumulatedBuffer.trim().slice(0, 120)}`);
}
}
}
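
A minimal consumer sketch, assuming Ollama's native /api/chat NDJSON endpoint; the URL and model name are placeholders:

// Hypothetical driver: POST a streaming chat request and print deltas.
async function demoStream(): Promise<void> {
  const res = await fetch("http://localhost:11434/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ model: "qwen3:8b", messages: [{ role: "user", content: "hi" }], stream: true }),
  });
  const reader = res.body!.getReader();
  for await (const chunk of parseNdjsonStream(reader, 1024)) {
    if (chunk.message?.content) process.stdout.write(chunk.message.content);
    if (chunk.done) break;
  }
}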
// ── Enhanced Stream Configuration ───────────────────────────────────────────
interface StreamConfig {
bufferSize?: number;
throttleDelay?: number;
enableThinkingOutput?: boolean;
streamInterval?: number;
}
// ── Main StreamFn factory with enhanced streaming ───────────────────────────
function resolveOllamaChatUrl(baseUrl: string): string {
const trimmed = baseUrl.trim().replace(/\/+$/, "");
@@ -434,8 +615,48 @@ function resolveOllamaModelHeaders(model: {
export function createOllamaStreamFn(
baseUrl: string,
defaultHeaders?: Record<string, string>,
streamConfig?: StreamConfig,
configPath?: string,
): StreamFn {
// Load configuration from file if available
let baseConfig: ManusilizedConfig = {
streaming: {
mode: "standard",
bufferSize: 1024,
throttleDelay: 10,
enableThinkingOutput: false,
streamInterval: 50,
},
context: {
enableMegaContext: false,
maxContextWindow: 262144,
autoDetectContext: true,
},
};
// Try to load config from file
try {
const fileConfig = loadManusilizedConfig(configPath);
baseConfig = { ...baseConfig, ...fileConfig };
} catch (err) {
console.warn("[manusilized] Failed to load config file, using defaults:", err);
}
// Apply streaming mode presets
baseConfig = applyStreamingMode(baseConfig);
const chatUrl = resolveOllamaChatUrl(baseUrl);
const config = {
bufferSize: baseConfig.streaming?.bufferSize || 1024,
throttleDelay: baseConfig.streaming?.throttleDelay || 10,
enableThinkingOutput: baseConfig.streaming?.enableThinkingOutput || false,
streamInterval: baseConfig.streaming?.streamInterval || 50,
...streamConfig,
};
return (model, context, options) => {
const stream = createAssistantMessageEventStream();
@@ -451,7 +672,11 @@ export function createOllamaStreamFn(
// Ollama defaults to num_ctx=4096 which is too small for large
// system prompts + many tool definitions. Use model's contextWindow.
const ollamaOptions: Record<string, unknown> = {
num_ctx: model.contextWindow ?? 65536,
// Enable thinking output for reasoning models
...(config.enableThinkingOutput ? { thinking: true } : {})
};
if (typeof options?.temperature === "number") {
ollamaOptions.temperature = options.temperature;
}
@@ -499,21 +724,91 @@ export function createOllamaStreamFn(
let accumulatedContent = "";
const accumulatedToolCalls: OllamaToolCall[] = [];
let finalResponse: OllamaChatResponse | undefined;
let contentIndex = 0;
let lastStreamTime = Date.now();
// Emit a "start" event so consumers know the assistant has begun.
stream.push({
type: "start",
partial: buildAssistantMessageWithZeroUsage({
model,
content: [],
stopReason: "stop",
}),
});
// Retry mechanism for stream parsing
let retryCount = 0;
const maxRetries = 3;
while (retryCount <= maxRetries) {
try {
for await (const chunk of parseNdjsonStream(reader, config.bufferSize)) {
const currentTime = Date.now();
// Throttle UI updates without dropping chunks: always accumulate
// content and tool calls, but only emit a text_delta once
// throttleDelay has elapsed since the last emit.
const shouldEmit = currentTime - lastStreamTime >= config.throttleDelay;
// ── Real-time text_delta events (manusilized: incremental streaming) ──
// Emit content fragments as they arrive so the UI can render a
// live typewriter effect instead of waiting for the full response.
if (chunk.message?.content) {
const delta = chunk.message.content;
accumulatedContent += delta;
if (shouldEmit) {
stream.push({
type: "text_delta",
contentIndex,
delta,
partial: buildAssistantMessageWithZeroUsage({
model,
content: [{ type: "text", text: accumulatedContent }],
stopReason: "stop",
}),
});
lastStreamTime = currentTime;
}
}
// Include thinking/reasoning output for enhanced experience
if (config.enableThinkingOutput && chunk.message?.thinking) {
stream.push({
type: "thinking_delta",
content: chunk.message.thinking,
});
}
// Ollama sends tool_calls in intermediate (done:false) chunks,
// NOT in the final done:true chunk. Collect from all chunks.
if (chunk.message?.tool_calls) {
accumulatedToolCalls.push(...chunk.message.tool_calls);
}
if (chunk.done) {
finalResponse = chunk;
break;
}
}
break; // Success, exit retry loop
} catch (err) {
retryCount++;
if (retryCount > maxRetries) {
throw err; // Re-throw if max retries exceeded
}
log.warn(`[manusilized] Stream parsing failed, retry ${retryCount}/${maxRetries}:`, err);
// Wait before retry with exponential backoff
await new Promise(resolve => setTimeout(resolve, Math.pow(2, retryCount) * 1000));
// Re-initialize the stream if possible
if (options?.signal?.aborted) {
throw new Error("Stream aborted by user");
}
}
}
@@ -522,10 +817,47 @@ export function createOllamaStreamFn(
}
finalResponse.message.content = accumulatedContent;
// Emit text_end event to indicate completion of text streaming
stream.push({
type: "text_end",
contentIndex,
partial: buildAssistantMessageWithZeroUsage({
model,
content: [{ type: "text", text: accumulatedContent }],
stopReason: "stop",
}),
});
// ── Markdown tool-call fallback (manusilized: fault-tolerant adapter) ──
// If the model produced no native tool_calls but embedded a JSON tool
// call inside a fenced code block, extract it as a fallback so that
// open-source models that don't support structured output still work.
if (accumulatedToolCalls.length === 0 && accumulatedContent) {
const markdownCalls = extractMarkdownToolCalls(accumulatedContent);
if (markdownCalls.length > 0) {
log.debug(
`[manusilized] Extracted ${markdownCalls.length} tool call(s) from Markdown fallback`,
);
accumulatedToolCalls.push(...markdownCalls);
// Strip the tool-call JSON blocks from the visible content so the
// user doesn't see raw JSON in the chat bubble.
finalResponse.message.content = accumulatedContent
.replace(MARKDOWN_TOOL_CALL_RE, "")
.replace(/```(?:json)?\s*\{[\s\S]*?\}\s*```/g, "")
.replace(/<tool_call>[\s\S]*?<\/tool_call>/g, "")
.replace(/```(?:ya?ml)\s*\n[\s\S]*?\s*\n```/g, "")
.trim();
}
}
if (accumulatedToolCalls.length > 0) {
finalResponse.message.tool_calls = accumulatedToolCalls;
}
// Increment contentIndex after text is done (mirrors OpenAI WS pattern).
contentIndex += 1;
const assistantMessage = buildAssistantMessage(finalResponse, {
api: model.api,
provider: model.provider,
@@ -563,6 +895,7 @@ export function createOllamaStreamFn(
export function createConfiguredOllamaStreamFn(params: {
model: { baseUrl?: string; headers?: unknown };
providerBaseUrl?: string;
streamConfig?: StreamConfig;
}): StreamFn {
const modelBaseUrl = typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined;
return createOllamaStreamFn(
@@ -571,5 +904,6 @@ export function createConfiguredOllamaStreamFn(params: {
providerBaseUrl: params.providerBaseUrl,
}),
resolveOllamaModelHeaders(params.model),
params.streamConfig,
);
}
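
Finally, a hypothetical wiring of the configured factory with an explicit streaming override; the base URL is a placeholder:

const streamFn = createConfiguredOllamaStreamFn({
  model: { baseUrl: "http://localhost:11434", headers: {} },
  providerBaseUrl: OLLAMA_DEFAULT_BASE_URL,
  streamConfig: { bufferSize: 2048, throttleDelay: 5, enableThinkingOutput: true },
});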