Merge a7b2e95d4c337ed292b1adcf5a4f9277074f9a7e into 598f1826d8b2bc969aace2c6459824737667218c

This commit is contained in:
Dhananjai Krishnakumar P M 2026-03-20 23:15:55 -04:00 committed by GitHub
commit d675dbbc2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 399 additions and 18 deletions

125
src/agents/mcp-sse.test.ts Normal file
View File

@ -0,0 +1,125 @@
import { describe, expect, it } from "vitest";
import { describeSseMcpServerLaunchConfig, resolveSseMcpServerLaunchConfig } from "./mcp-sse.js";
describe("resolveSseMcpServerLaunchConfig", () => {
it("resolves a valid https URL", () => {
const result = resolveSseMcpServerLaunchConfig({
url: "https://mcp.example.com/sse",
});
expect(result).toEqual({
ok: true,
config: {
url: "https://mcp.example.com/sse",
headers: undefined,
},
});
});
it("resolves a valid http URL", () => {
const result = resolveSseMcpServerLaunchConfig({
url: "http://localhost:3000/sse",
});
expect(result).toEqual({
ok: true,
config: {
url: "http://localhost:3000/sse",
headers: undefined,
},
});
});
it("includes headers when provided", () => {
const result = resolveSseMcpServerLaunchConfig({
url: "https://mcp.example.com/sse",
headers: {
Authorization: "Bearer token123",
"X-Custom": "value",
},
});
expect(result).toEqual({
ok: true,
config: {
url: "https://mcp.example.com/sse",
headers: {
Authorization: "Bearer token123",
"X-Custom": "value",
},
},
});
});
it("coerces numeric and boolean header values to strings", () => {
const result = resolveSseMcpServerLaunchConfig({
url: "https://mcp.example.com/sse",
headers: { "X-Count": 42, "X-Debug": true },
});
expect(result).toEqual({
ok: true,
config: {
url: "https://mcp.example.com/sse",
headers: { "X-Count": "42", "X-Debug": "true" },
},
});
});
it("rejects non-object input", () => {
const result = resolveSseMcpServerLaunchConfig("not-an-object");
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toContain("must be an object");
}
});
it("rejects missing url", () => {
const result = resolveSseMcpServerLaunchConfig({ command: "npx" });
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toContain("url is missing");
}
});
it("rejects empty url", () => {
const result = resolveSseMcpServerLaunchConfig({ url: " " });
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toContain("url is missing");
}
});
it("rejects invalid URL format", () => {
const result = resolveSseMcpServerLaunchConfig({ url: "not-a-url" });
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toContain("not a valid URL");
}
});
it("rejects non-http protocols", () => {
const result = resolveSseMcpServerLaunchConfig({ url: "ftp://example.com/sse" });
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toContain("only http and https");
}
});
it("trims whitespace from url", () => {
const result = resolveSseMcpServerLaunchConfig({
url: " https://mcp.example.com/sse ",
});
expect(result).toEqual({
ok: true,
config: {
url: "https://mcp.example.com/sse",
headers: undefined,
},
});
});
});
describe("describeSseMcpServerLaunchConfig", () => {
it("returns the url", () => {
expect(describeSseMcpServerLaunchConfig({ url: "https://mcp.example.com/sse" })).toBe(
"https://mcp.example.com/sse",
);
});
});

107
src/agents/mcp-sse.ts Normal file
View File

@ -0,0 +1,107 @@
type SseMcpServerLaunchConfig = {
url: string;
headers?: Record<string, string>;
};
type SseMcpServerLaunchResult =
| { ok: true; config: SseMcpServerLaunchConfig }
| { ok: false; reason: string };
function isRecord(value: unknown): value is Record<string, unknown> {
return value !== null && typeof value === "object" && !Array.isArray(value);
}
function toStringRecord(
value: unknown,
warnDropped?: (key: string, entry: unknown) => void,
): Record<string, string> | undefined {
if (!isRecord(value)) {
return undefined;
}
const entries = Object.entries(value)
.map(([key, entry]) => {
if (typeof entry === "string") {
return [key, entry] as const;
}
if (typeof entry === "number" || typeof entry === "boolean") {
return [key, String(entry)] as const;
}
warnDropped?.(key, entry);
return null;
})
.filter((entry): entry is readonly [string, string] => entry !== null);
return entries.length > 0 ? Object.fromEntries(entries) : undefined;
}
export function resolveSseMcpServerLaunchConfig(
raw: unknown,
options?: {
onDroppedHeader?: (key: string, value: unknown) => void;
onMalformedHeaders?: (value: unknown) => void;
},
): SseMcpServerLaunchResult {
if (!isRecord(raw)) {
return { ok: false, reason: "server config must be an object" };
}
if (typeof raw.url !== "string" || raw.url.trim().length === 0) {
return { ok: false, reason: "its url is missing" };
}
const url = raw.url.trim();
let parsed: URL;
try {
parsed = new URL(url);
} catch {
return { ok: false, reason: `its url is not a valid URL: ${url}` };
}
if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
return {
ok: false,
reason: `only http and https URLs are supported, got ${parsed.protocol}`,
};
}
// Warn if headers is present but not an object (e.g. a string or array).
let headers: Record<string, string> | undefined;
if (raw.headers !== undefined && raw.headers !== null) {
if (!isRecord(raw.headers)) {
options?.onMalformedHeaders?.(raw.headers);
} else {
headers = toStringRecord(raw.headers, options?.onDroppedHeader);
}
}
return {
ok: true,
config: {
url,
headers,
},
};
}
export function describeSseMcpServerLaunchConfig(config: SseMcpServerLaunchConfig): string {
try {
const parsed = new URL(config.url);
// Redact embedded credentials and query-token auth from log/description output.
if (parsed.username || parsed.password) {
parsed.username = parsed.username ? "***" : "";
parsed.password = parsed.password ? "***" : "";
}
for (const key of parsed.searchParams.keys()) {
const lower = key.toLowerCase();
if (
lower === "token" ||
lower === "key" ||
lower === "api_key" ||
lower === "apikey" ||
lower === "secret" ||
lower === "access_token"
) {
parsed.searchParams.set(key, "***");
}
}
return parsed.toString();
} catch {
return config.url;
}
}
export type { SseMcpServerLaunchConfig, SseMcpServerLaunchResult };

View File

@ -47,7 +47,7 @@ export function resolveStdioMcpServerLaunchConfig(raw: unknown): StdioMcpServerL
if (typeof raw.url === "string" && raw.url.trim().length > 0) {
return {
ok: false,
reason: "only stdio MCP servers are supported right now",
reason: "not a stdio server (has url)",
};
}
return { ok: false, reason: "its command is missing" };

View File

@ -1,4 +1,5 @@
import fs from "node:fs/promises";
import http from "node:http";
import { createRequire } from "node:module";
import os from "node:os";
import path from "node:path";
@ -8,6 +9,7 @@ import { createBundleMcpToolRuntime } from "./pi-bundle-mcp-tools.js";
const require = createRequire(import.meta.url);
const SDK_SERVER_MCP_PATH = require.resolve("@modelcontextprotocol/sdk/server/mcp.js");
const SDK_SERVER_STDIO_PATH = require.resolve("@modelcontextprotocol/sdk/server/stdio.js");
const SDK_SERVER_SSE_PATH = require.resolve("@modelcontextprotocol/sdk/server/sse.js");
const tempDirs: string[] = [];
@ -181,4 +183,79 @@ describe("createBundleMcpToolRuntime", () => {
await runtime.dispose();
}
});
it("loads configured SSE MCP tools via url", async () => {
// Dynamically import the SSE server transport from the SDK.
const { McpServer } = await import(SDK_SERVER_MCP_PATH);
const { SSEServerTransport } = await import(SDK_SERVER_SSE_PATH);
const mcpServer = new McpServer({ name: "sse-probe", version: "1.0.0" });
mcpServer.tool("sse_probe", "SSE MCP probe", async () => {
return {
content: [{ type: "text", text: "FROM-SSE" }],
};
});
// Start an HTTP server that hosts the SSE MCP transport.
let sseTransport:
| {
handlePostMessage: (req: http.IncomingMessage, res: http.ServerResponse) => Promise<void>;
}
| undefined;
const httpServer = http.createServer(async (req, res) => {
if (req.url === "/sse") {
sseTransport = new SSEServerTransport("/messages", res);
await mcpServer.connect(sseTransport);
} else if (req.url?.startsWith("/messages") && req.method === "POST") {
if (sseTransport) {
await sseTransport.handlePostMessage(req, res);
} else {
res.writeHead(400).end("No SSE session");
}
} else {
res.writeHead(404).end();
}
});
await new Promise<void>((resolve) => {
httpServer.listen(0, "127.0.0.1", resolve);
});
const addr = httpServer.address();
const port = typeof addr === "object" && addr ? addr.port : 0;
try {
const workspaceDir = await makeTempDir("openclaw-bundle-mcp-sse-");
const runtime = await createBundleMcpToolRuntime({
workspaceDir,
cfg: {
mcp: {
servers: {
sseProbe: {
url: `http://127.0.0.1:${port}/sse`,
},
},
},
},
});
try {
expect(runtime.tools.map((tool) => tool.name)).toEqual(["sse_probe"]);
const result = await runtime.tools[0].execute("call-sse-probe", {}, undefined, undefined);
expect(result.content[0]).toMatchObject({
type: "text",
text: "FROM-SSE",
});
expect(result.details).toEqual({
mcpServer: "sseProbe",
mcpTool: "sse_probe",
});
} finally {
await runtime.dispose();
}
} finally {
await new Promise<void>((resolve, reject) =>
httpServer.close((err) => (err ? reject(err) : resolve())),
);
}
});
});

View File

@ -1,10 +1,13 @@
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import type { OpenClawConfig } from "../config/config.js";
import { logDebug, logWarn } from "../logger.js";
import { loadEmbeddedPiMcpConfig } from "./embedded-pi-mcp.js";
import { describeSseMcpServerLaunchConfig, resolveSseMcpServerLaunchConfig } from "./mcp-sse.js";
import {
describeStdioMcpServerLaunchConfig,
resolveStdioMcpServerLaunchConfig,
@ -19,7 +22,7 @@ type BundleMcpToolRuntime = {
type BundleMcpSession = {
serverName: string;
client: Client;
transport: StdioClientTransport;
transport: Transport;
detachStderr?: () => void;
};
@ -119,6 +122,76 @@ async function disposeSession(session: BundleMcpSession) {
await session.transport.close().catch(() => {});
}
/** Try to create a stdio or SSE transport for the given raw server config. */
function resolveTransport(
serverName: string,
rawServer: unknown,
): {
transport: Transport;
description: string;
detachStderr?: () => void;
} | null {
// Try stdio first (command-based servers).
const stdioLaunch = resolveStdioMcpServerLaunchConfig(rawServer);
if (stdioLaunch.ok) {
const transport = new StdioClientTransport({
command: stdioLaunch.config.command,
args: stdioLaunch.config.args,
env: stdioLaunch.config.env,
cwd: stdioLaunch.config.cwd,
stderr: "pipe",
});
return {
transport,
description: describeStdioMcpServerLaunchConfig(stdioLaunch.config),
detachStderr: attachStderrLogging(serverName, transport),
};
}
// Try SSE (url-based servers).
const sseLaunch = resolveSseMcpServerLaunchConfig(rawServer, {
onDroppedHeader: (key) => {
logWarn(
`bundle-mcp: server "${serverName}": header "${key}" has an unsupported value type and was ignored.`,
);
},
onMalformedHeaders: () => {
logWarn(
`bundle-mcp: server "${serverName}": "headers" must be a JSON object; the value was ignored.`,
);
},
});
if (sseLaunch.ok) {
const headers: Record<string, string> = {
...sseLaunch.config.headers,
};
const hasHeaders = Object.keys(headers).length > 0;
const transport = new SSEClientTransport(new URL(sseLaunch.config.url), {
// Apply headers to POST requests (tool calls, listTools, etc.).
requestInit: hasHeaders ? { headers } : undefined,
// Apply headers to the initial SSE GET handshake (required for auth).
eventSourceInit: hasHeaders
? {
fetch: (url, init) =>
fetch(url, {
...init,
headers: { ...headers, ...(init?.headers as Record<string, string>) },
}),
}
: undefined,
});
return {
transport,
description: describeSseMcpServerLaunchConfig(sseLaunch.config),
};
}
logWarn(
`bundle-mcp: skipped server "${serverName}" because ${stdioLaunch.reason} and ${sseLaunch.reason}.`,
);
return null;
}
export async function createBundleMcpToolRuntime(params: {
workspaceDir: string;
cfg?: OpenClawConfig;
@ -144,20 +217,11 @@ export async function createBundleMcpToolRuntime(params: {
try {
for (const [serverName, rawServer] of Object.entries(loaded.mcpServers)) {
const launch = resolveStdioMcpServerLaunchConfig(rawServer);
if (!launch.ok) {
logWarn(`bundle-mcp: skipped server "${serverName}" because ${launch.reason}.`);
const resolved = resolveTransport(serverName, rawServer);
if (!resolved) {
continue;
}
const launchConfig = launch.config;
const transport = new StdioClientTransport({
command: launchConfig.command,
args: launchConfig.args,
env: launchConfig.env,
cwd: launchConfig.cwd,
stderr: "pipe",
});
const client = new Client(
{
name: "openclaw-bundle-mcp",
@ -168,12 +232,12 @@ export async function createBundleMcpToolRuntime(params: {
const session: BundleMcpSession = {
serverName,
client,
transport,
detachStderr: attachStderrLogging(serverName, transport),
transport: resolved.transport,
detachStderr: resolved.detachStderr,
};
try {
await client.connect(transport);
await client.connect(resolved.transport);
const listedTools = await listAllTools(client);
sessions.push(session);
for (const tool of listedTools) {
@ -193,7 +257,7 @@ export async function createBundleMcpToolRuntime(params: {
label: tool.title ?? tool.name,
description:
tool.description?.trim() ||
`Provided by bundle MCP server "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}).`,
`Provided by bundle MCP server "${serverName}" (${resolved.description}).`,
parameters: tool.inputSchema,
execute: async (_toolCallId, input) => {
const result = (await client.callTool({
@ -210,7 +274,7 @@ export async function createBundleMcpToolRuntime(params: {
}
} catch (error) {
logWarn(
`bundle-mcp: failed to start server "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}): ${String(error)}`,
`bundle-mcp: failed to start server "${serverName}" (${resolved.description}): ${String(error)}`,
);
await disposeSession(session);
}

View File

@ -1,10 +1,18 @@
export type McpServerConfig = {
/** Stdio transport: command to spawn. */
command?: string;
/** Stdio transport: arguments for the command. */
args?: string[];
/** Environment variables passed to the server process (stdio only). */
env?: Record<string, string | number | boolean>;
/** Working directory for stdio server. */
cwd?: string;
/** Alias for cwd. */
workingDirectory?: string;
/** SSE transport: URL of the remote MCP server (http or https). */
url?: string;
/** SSE transport: extra HTTP headers sent with every request. */
headers?: Record<string, string>;
[key: string]: unknown;
};