|
1 | 1 | """
|
2 | 2 | MCP Client Manager
|
3 | 3 |
|
4 |
| -This class is responsible for managing MCP SSE clients. |
| 4 | +This class is responsible for managing MCP clients with support for both SSE and HTTP streamable transports. |
5 | 5 |
|
6 | 6 | This is a Proxy
|
7 | 7 | """
|
|
25 | 25 | MCPTransport,
|
26 | 26 | MCPTransportType,
|
27 | 27 | )
|
| 28 | + |
| 29 | +try: |
| 30 | + from mcp.client.streamable_http import streamablehttp_client |
| 31 | +except ImportError: |
| 32 | + streamablehttp_client = None # type: ignore |
| 33 | + |
28 | 34 | from litellm.types.mcp_server.mcp_server_manager import MCPInfo, MCPServer
|
29 | 35 |
|
30 | 36 |
|
@@ -169,16 +175,43 @@ async def _get_tools_from_server(self, server: MCPServer) -> List[MCPTool]:
|
169 | 175 |
|
170 | 176 | # Update tool to server mapping
|
171 | 177 | for tool in tools_result.tools:
|
172 |
| - self.tool_name_to_mcp_server_name_mapping[ |
173 |
| - tool.name |
174 |
| - ] = server.name |
| 178 | + self.tool_name_to_mcp_server_name_mapping[tool.name] = ( |
| 179 | + server.name |
| 180 | + ) |
175 | 181 |
|
176 | 182 | return tools_result.tools
|
177 | 183 | elif server.transport == MCPTransport.http:
|
178 |
| - # TODO: implement http transport |
179 |
| - return [] |
| 184 | + if streamablehttp_client is None: |
| 185 | + verbose_logger.error( |
| 186 | + "streamablehttp_client not available - install mcp with HTTP support" |
| 187 | + ) |
| 188 | + raise ValueError( |
| 189 | + "streamablehttp_client not available - please run `pip install mcp -U`" |
| 190 | + ) |
| 191 | + verbose_logger.debug(f"Using HTTP streamable transport for {server.url}") |
| 192 | + async with streamablehttp_client( |
| 193 | + url=server.url, |
| 194 | + ) as (read_stream, write_stream, get_session_id): |
| 195 | + async with ClientSession(read_stream, write_stream) as session: |
| 196 | + await session.initialize() |
| 197 | + |
| 198 | + if get_session_id is not None: |
| 199 | + session_id = get_session_id() |
| 200 | + if session_id: |
| 201 | + verbose_logger.debug(f"HTTP session ID: {session_id}") |
| 202 | + |
| 203 | + tools_result = await session.list_tools() |
| 204 | + verbose_logger.debug(f"Tools from {server.name}: {tools_result}") |
| 205 | + |
| 206 | + # Update tool to server mapping |
| 207 | + for tool in tools_result.tools: |
| 208 | + self.tool_name_to_mcp_server_name_mapping[tool.name] = ( |
| 209 | + server.name |
| 210 | + ) |
| 211 | + |
| 212 | + return tools_result.tools |
180 | 213 | else:
|
181 |
| - # TODO: throw error on transport found or skip |
| 214 | + verbose_logger.warning(f"Unsupported transport type: {server.transport}") |
182 | 215 | return []
|
183 | 216 |
|
184 | 217 | def initialize_tool_name_to_mcp_server_name_mapping(self):
|
@@ -217,8 +250,30 @@ async def call_tool(self, name: str, arguments: Dict[str, Any]):
|
217 | 250 | await session.initialize()
|
218 | 251 | return await session.call_tool(name, arguments)
|
219 | 252 | elif mcp_server.transport == MCPTransport.http:
|
220 |
| - # TODO: implement http transport |
221 |
| - raise NotImplementedError("HTTP transport is not implemented yet") |
| 253 | + if streamablehttp_client is None: |
| 254 | + verbose_logger.error( |
| 255 | + "streamablehttp_client not available - install mcp with HTTP support" |
| 256 | + ) |
| 257 | + raise ValueError( |
| 258 | + "streamablehttp_client not available - please run `pip install mcp -U`" |
| 259 | + ) |
| 260 | + verbose_logger.debug( |
| 261 | + f"Using HTTP streamable transport for tool call: {name}" |
| 262 | + ) |
| 263 | + async with streamablehttp_client( |
| 264 | + url=mcp_server.url, |
| 265 | + ) as (read_stream, write_stream, get_session_id): |
| 266 | + async with ClientSession(read_stream, write_stream) as session: |
| 267 | + await session.initialize() |
| 268 | + |
| 269 | + if get_session_id is not None: |
| 270 | + session_id = get_session_id() |
| 271 | + if session_id: |
| 272 | + verbose_logger.debug( |
| 273 | + f"HTTP session ID for tool call: {session_id}" |
| 274 | + ) |
| 275 | + |
| 276 | + return await session.call_tool(name, arguments) |
222 | 277 | else:
|
223 | 278 | return CallToolResult(content=[], isError=True)
|
224 | 279 |
|
|
0 commit comments