diff --git a/src/agents/mcp-sse.test.ts b/src/agents/mcp-sse.test.ts new file mode 100644 index 00000000000..10c79876a69 --- /dev/null +++ b/src/agents/mcp-sse.test.ts @@ -0,0 +1,125 @@ +import { describe, expect, it } from "vitest"; +import { describeSseMcpServerLaunchConfig, resolveSseMcpServerLaunchConfig } from "./mcp-sse.js"; + +describe("resolveSseMcpServerLaunchConfig", () => { + it("resolves a valid https URL", () => { + const result = resolveSseMcpServerLaunchConfig({ + url: "https://mcp.example.com/sse", + }); + expect(result).toEqual({ + ok: true, + config: { + url: "https://mcp.example.com/sse", + headers: undefined, + }, + }); + }); + + it("resolves a valid http URL", () => { + const result = resolveSseMcpServerLaunchConfig({ + url: "http://localhost:3000/sse", + }); + expect(result).toEqual({ + ok: true, + config: { + url: "http://localhost:3000/sse", + headers: undefined, + }, + }); + }); + + it("includes headers when provided", () => { + const result = resolveSseMcpServerLaunchConfig({ + url: "https://mcp.example.com/sse", + headers: { + Authorization: "Bearer token123", + "X-Custom": "value", + }, + }); + expect(result).toEqual({ + ok: true, + config: { + url: "https://mcp.example.com/sse", + headers: { + Authorization: "Bearer token123", + "X-Custom": "value", + }, + }, + }); + }); + + it("coerces numeric and boolean header values to strings", () => { + const result = resolveSseMcpServerLaunchConfig({ + url: "https://mcp.example.com/sse", + headers: { "X-Count": 42, "X-Debug": true }, + }); + expect(result).toEqual({ + ok: true, + config: { + url: "https://mcp.example.com/sse", + headers: { "X-Count": "42", "X-Debug": "true" }, + }, + }); + }); + + it("rejects non-object input", () => { + const result = resolveSseMcpServerLaunchConfig("not-an-object"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toContain("must be an object"); + } + }); + + it("rejects missing url", () => { + const result = resolveSseMcpServerLaunchConfig({ command: "npx" }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toContain("url is missing"); + } + }); + + it("rejects empty url", () => { + const result = resolveSseMcpServerLaunchConfig({ url: " " }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toContain("url is missing"); + } + }); + + it("rejects invalid URL format", () => { + const result = resolveSseMcpServerLaunchConfig({ url: "not-a-url" }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toContain("not a valid URL"); + } + }); + + it("rejects non-http protocols", () => { + const result = resolveSseMcpServerLaunchConfig({ url: "ftp://example.com/sse" }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toContain("only http and https"); + } + }); + + it("trims whitespace from url", () => { + const result = resolveSseMcpServerLaunchConfig({ + url: " https://mcp.example.com/sse ", + }); + expect(result).toEqual({ + ok: true, + config: { + url: "https://mcp.example.com/sse", + headers: undefined, + }, + }); + }); +}); + +describe("describeSseMcpServerLaunchConfig", () => { + it("returns the url", () => { + expect(describeSseMcpServerLaunchConfig({ url: "https://mcp.example.com/sse" })).toBe( + "https://mcp.example.com/sse", + ); + }); +}); diff --git a/src/agents/mcp-sse.ts b/src/agents/mcp-sse.ts new file mode 100644 index 00000000000..ec8936dcffa --- /dev/null +++ b/src/agents/mcp-sse.ts @@ -0,0 +1,107 @@ +type SseMcpServerLaunchConfig = { + url: string; + headers?: Record; +}; + +type SseMcpServerLaunchResult = + | { ok: true; config: SseMcpServerLaunchConfig } + | { ok: false; reason: string }; + +function isRecord(value: unknown): value is Record { + return value !== null && typeof value === "object" && !Array.isArray(value); +} + +function toStringRecord( + value: unknown, + warnDropped?: (key: string, entry: unknown) => void, +): Record | undefined { + if (!isRecord(value)) { + return undefined; + } + const entries = Object.entries(value) + .map(([key, entry]) => { + if (typeof entry === "string") { + return [key, entry] as const; + } + if (typeof entry === "number" || typeof entry === "boolean") { + return [key, String(entry)] as const; + } + warnDropped?.(key, entry); + return null; + }) + .filter((entry): entry is readonly [string, string] => entry !== null); + return entries.length > 0 ? Object.fromEntries(entries) : undefined; +} + +export function resolveSseMcpServerLaunchConfig( + raw: unknown, + options?: { + onDroppedHeader?: (key: string, value: unknown) => void; + onMalformedHeaders?: (value: unknown) => void; + }, +): SseMcpServerLaunchResult { + if (!isRecord(raw)) { + return { ok: false, reason: "server config must be an object" }; + } + if (typeof raw.url !== "string" || raw.url.trim().length === 0) { + return { ok: false, reason: "its url is missing" }; + } + const url = raw.url.trim(); + let parsed: URL; + try { + parsed = new URL(url); + } catch { + return { ok: false, reason: `its url is not a valid URL: ${url}` }; + } + if (parsed.protocol !== "http:" && parsed.protocol !== "https:") { + return { + ok: false, + reason: `only http and https URLs are supported, got ${parsed.protocol}`, + }; + } + // Warn if headers is present but not an object (e.g. a string or array). + let headers: Record | undefined; + if (raw.headers !== undefined && raw.headers !== null) { + if (!isRecord(raw.headers)) { + options?.onMalformedHeaders?.(raw.headers); + } else { + headers = toStringRecord(raw.headers, options?.onDroppedHeader); + } + } + return { + ok: true, + config: { + url, + headers, + }, + }; +} + +export function describeSseMcpServerLaunchConfig(config: SseMcpServerLaunchConfig): string { + try { + const parsed = new URL(config.url); + // Redact embedded credentials and query-token auth from log/description output. + if (parsed.username || parsed.password) { + parsed.username = parsed.username ? "***" : ""; + parsed.password = parsed.password ? "***" : ""; + } + for (const key of parsed.searchParams.keys()) { + const lower = key.toLowerCase(); + if ( + lower === "token" || + lower === "key" || + lower === "api_key" || + lower === "apikey" || + lower === "secret" || + lower === "access_token" + ) { + parsed.searchParams.set(key, "***"); + } + } + return parsed.toString(); + } catch { + return config.url; + } +} + +export type { SseMcpServerLaunchConfig, SseMcpServerLaunchResult }; diff --git a/src/agents/mcp-stdio.ts b/src/agents/mcp-stdio.ts index 77ab6171ca7..55b2877bd66 100644 --- a/src/agents/mcp-stdio.ts +++ b/src/agents/mcp-stdio.ts @@ -47,7 +47,7 @@ export function resolveStdioMcpServerLaunchConfig(raw: unknown): StdioMcpServerL if (typeof raw.url === "string" && raw.url.trim().length > 0) { return { ok: false, - reason: "only stdio MCP servers are supported right now", + reason: "not a stdio server (has url)", }; } return { ok: false, reason: "its command is missing" }; diff --git a/src/agents/pi-bundle-mcp-tools.test.ts b/src/agents/pi-bundle-mcp-tools.test.ts index 69b2839eb94..9e2ecd5c5bb 100644 --- a/src/agents/pi-bundle-mcp-tools.test.ts +++ b/src/agents/pi-bundle-mcp-tools.test.ts @@ -1,4 +1,5 @@ import fs from "node:fs/promises"; +import http from "node:http"; import { createRequire } from "node:module"; import os from "node:os"; import path from "node:path"; @@ -8,6 +9,7 @@ import { createBundleMcpToolRuntime } from "./pi-bundle-mcp-tools.js"; const require = createRequire(import.meta.url); const SDK_SERVER_MCP_PATH = require.resolve("@modelcontextprotocol/sdk/server/mcp.js"); const SDK_SERVER_STDIO_PATH = require.resolve("@modelcontextprotocol/sdk/server/stdio.js"); +const SDK_SERVER_SSE_PATH = require.resolve("@modelcontextprotocol/sdk/server/sse.js"); const tempDirs: string[] = []; @@ -181,4 +183,79 @@ describe("createBundleMcpToolRuntime", () => { await runtime.dispose(); } }); + + it("loads configured SSE MCP tools via url", async () => { + // Dynamically import the SSE server transport from the SDK. + const { McpServer } = await import(SDK_SERVER_MCP_PATH); + const { SSEServerTransport } = await import(SDK_SERVER_SSE_PATH); + + const mcpServer = new McpServer({ name: "sse-probe", version: "1.0.0" }); + mcpServer.tool("sse_probe", "SSE MCP probe", async () => { + return { + content: [{ type: "text", text: "FROM-SSE" }], + }; + }); + + // Start an HTTP server that hosts the SSE MCP transport. + let sseTransport: + | { + handlePostMessage: (req: http.IncomingMessage, res: http.ServerResponse) => Promise; + } + | undefined; + const httpServer = http.createServer(async (req, res) => { + if (req.url === "/sse") { + sseTransport = new SSEServerTransport("/messages", res); + await mcpServer.connect(sseTransport); + } else if (req.url?.startsWith("/messages") && req.method === "POST") { + if (sseTransport) { + await sseTransport.handlePostMessage(req, res); + } else { + res.writeHead(400).end("No SSE session"); + } + } else { + res.writeHead(404).end(); + } + }); + + await new Promise((resolve) => { + httpServer.listen(0, "127.0.0.1", resolve); + }); + const addr = httpServer.address(); + const port = typeof addr === "object" && addr ? addr.port : 0; + + try { + const workspaceDir = await makeTempDir("openclaw-bundle-mcp-sse-"); + const runtime = await createBundleMcpToolRuntime({ + workspaceDir, + cfg: { + mcp: { + servers: { + sseProbe: { + url: `http://127.0.0.1:${port}/sse`, + }, + }, + }, + }, + }); + + try { + expect(runtime.tools.map((tool) => tool.name)).toEqual(["sse_probe"]); + const result = await runtime.tools[0].execute("call-sse-probe", {}, undefined, undefined); + expect(result.content[0]).toMatchObject({ + type: "text", + text: "FROM-SSE", + }); + expect(result.details).toEqual({ + mcpServer: "sseProbe", + mcpTool: "sse_probe", + }); + } finally { + await runtime.dispose(); + } + } finally { + await new Promise((resolve, reject) => + httpServer.close((err) => (err ? reject(err) : resolve())), + ); + } + }); }); diff --git a/src/agents/pi-bundle-mcp-tools.ts b/src/agents/pi-bundle-mcp-tools.ts index bbe3aa200ae..bae9160fcbc 100644 --- a/src/agents/pi-bundle-mcp-tools.ts +++ b/src/agents/pi-bundle-mcp-tools.ts @@ -1,10 +1,13 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; +import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; import type { OpenClawConfig } from "../config/config.js"; import { logDebug, logWarn } from "../logger.js"; import { loadEmbeddedPiMcpConfig } from "./embedded-pi-mcp.js"; +import { describeSseMcpServerLaunchConfig, resolveSseMcpServerLaunchConfig } from "./mcp-sse.js"; import { describeStdioMcpServerLaunchConfig, resolveStdioMcpServerLaunchConfig, @@ -19,7 +22,7 @@ type BundleMcpToolRuntime = { type BundleMcpSession = { serverName: string; client: Client; - transport: StdioClientTransport; + transport: Transport; detachStderr?: () => void; }; @@ -119,6 +122,76 @@ async function disposeSession(session: BundleMcpSession) { await session.transport.close().catch(() => {}); } +/** Try to create a stdio or SSE transport for the given raw server config. */ +function resolveTransport( + serverName: string, + rawServer: unknown, +): { + transport: Transport; + description: string; + detachStderr?: () => void; +} | null { + // Try stdio first (command-based servers). + const stdioLaunch = resolveStdioMcpServerLaunchConfig(rawServer); + if (stdioLaunch.ok) { + const transport = new StdioClientTransport({ + command: stdioLaunch.config.command, + args: stdioLaunch.config.args, + env: stdioLaunch.config.env, + cwd: stdioLaunch.config.cwd, + stderr: "pipe", + }); + return { + transport, + description: describeStdioMcpServerLaunchConfig(stdioLaunch.config), + detachStderr: attachStderrLogging(serverName, transport), + }; + } + + // Try SSE (url-based servers). + const sseLaunch = resolveSseMcpServerLaunchConfig(rawServer, { + onDroppedHeader: (key) => { + logWarn( + `bundle-mcp: server "${serverName}": header "${key}" has an unsupported value type and was ignored.`, + ); + }, + onMalformedHeaders: () => { + logWarn( + `bundle-mcp: server "${serverName}": "headers" must be a JSON object; the value was ignored.`, + ); + }, + }); + if (sseLaunch.ok) { + const headers: Record = { + ...sseLaunch.config.headers, + }; + const hasHeaders = Object.keys(headers).length > 0; + const transport = new SSEClientTransport(new URL(sseLaunch.config.url), { + // Apply headers to POST requests (tool calls, listTools, etc.). + requestInit: hasHeaders ? { headers } : undefined, + // Apply headers to the initial SSE GET handshake (required for auth). + eventSourceInit: hasHeaders + ? { + fetch: (url, init) => + fetch(url, { + ...init, + headers: { ...headers, ...(init?.headers as Record) }, + }), + } + : undefined, + }); + return { + transport, + description: describeSseMcpServerLaunchConfig(sseLaunch.config), + }; + } + + logWarn( + `bundle-mcp: skipped server "${serverName}" because ${stdioLaunch.reason} and ${sseLaunch.reason}.`, + ); + return null; +} + export async function createBundleMcpToolRuntime(params: { workspaceDir: string; cfg?: OpenClawConfig; @@ -144,20 +217,11 @@ export async function createBundleMcpToolRuntime(params: { try { for (const [serverName, rawServer] of Object.entries(loaded.mcpServers)) { - const launch = resolveStdioMcpServerLaunchConfig(rawServer); - if (!launch.ok) { - logWarn(`bundle-mcp: skipped server "${serverName}" because ${launch.reason}.`); + const resolved = resolveTransport(serverName, rawServer); + if (!resolved) { continue; } - const launchConfig = launch.config; - const transport = new StdioClientTransport({ - command: launchConfig.command, - args: launchConfig.args, - env: launchConfig.env, - cwd: launchConfig.cwd, - stderr: "pipe", - }); const client = new Client( { name: "openclaw-bundle-mcp", @@ -168,12 +232,12 @@ export async function createBundleMcpToolRuntime(params: { const session: BundleMcpSession = { serverName, client, - transport, - detachStderr: attachStderrLogging(serverName, transport), + transport: resolved.transport, + detachStderr: resolved.detachStderr, }; try { - await client.connect(transport); + await client.connect(resolved.transport); const listedTools = await listAllTools(client); sessions.push(session); for (const tool of listedTools) { @@ -193,7 +257,7 @@ export async function createBundleMcpToolRuntime(params: { label: tool.title ?? tool.name, description: tool.description?.trim() || - `Provided by bundle MCP server "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}).`, + `Provided by bundle MCP server "${serverName}" (${resolved.description}).`, parameters: tool.inputSchema, execute: async (_toolCallId, input) => { const result = (await client.callTool({ @@ -210,7 +274,7 @@ export async function createBundleMcpToolRuntime(params: { } } catch (error) { logWarn( - `bundle-mcp: failed to start server "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}): ${String(error)}`, + `bundle-mcp: failed to start server "${serverName}" (${resolved.description}): ${String(error)}`, ); await disposeSession(session); } diff --git a/src/config/types.mcp.ts b/src/config/types.mcp.ts index 9d6b5e5a1d6..fcc5297434e 100644 --- a/src/config/types.mcp.ts +++ b/src/config/types.mcp.ts @@ -1,10 +1,18 @@ export type McpServerConfig = { + /** Stdio transport: command to spawn. */ command?: string; + /** Stdio transport: arguments for the command. */ args?: string[]; + /** Environment variables passed to the server process (stdio only). */ env?: Record; + /** Working directory for stdio server. */ cwd?: string; + /** Alias for cwd. */ workingDirectory?: string; + /** SSE transport: URL of the remote MCP server (http or https). */ url?: string; + /** SSE transport: extra HTTP headers sent with every request. */ + headers?: Record; [key: string]: unknown; };