From 5f69406a6a2af58435a51dc2842198d07b01e978 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Mon, 21 Jul 2025 10:43:04 +0200 Subject: [PATCH 1/3] Rename `streamablehttp_client` to `streamable_http_client` --- README.md | 8 +-- .../mcp_simple_auth_client/main.py | 12 ++-- examples/snippets/clients/oauth_client.py | 4 +- examples/snippets/clients/streamable_basic.py | 4 +- src/mcp/client/session_group.py | 6 +- src/mcp/client/streamable_http.py | 23 ++++++- tests/client/test_session_group.py | 4 +- tests/server/fastmcp/test_integration.py | 4 +- tests/shared/test_streamable_http.py | 60 +++++++++---------- 9 files changed, 71 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index 993b6006b..fb29ecd84 100644 --- a/README.md +++ b/README.md @@ -1471,12 +1471,12 @@ Run from the repository root: import asyncio from mcp import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamablehttp_client("http://localhost:8000/mcp") as ( + async with streamable_http_client("http://localhost:8000/mcp") as ( read_stream, write_stream, _, @@ -1604,7 +1604,7 @@ from pydantic import AnyUrl from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -1658,7 +1658,7 @@ async def main(): callback_handler=handle_callback, ) - async with streamablehttp_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 7a9e32279..06d5b1274 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -19,7 +19,7 @@ from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.session import ClientSession from mcp.client.sse import sse_client -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -188,9 +188,7 @@ async def _default_redirect_handler(authorization_url: str) -> None: # Create OAuth authentication handler using the new interface oauth_auth = OAuthClientProvider( server_url=self.server_url.replace("/mcp", ""), - client_metadata=OAuthClientMetadata.model_validate( - client_metadata_dict - ), + client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict), storage=InMemoryTokenStorage(), redirect_handler=_default_redirect_handler, callback_handler=callback_handler, @@ -207,7 +205,7 @@ async def _default_redirect_handler(authorization_url: str) -> None: await self._run_session(read_stream, write_stream, None) else: print("šŸ“” Opening StreamableHTTP transport connection with auth...") - async with streamablehttp_client( + async with streamable_http_client( url=self.server_url, auth=oauth_auth, timeout=timedelta(seconds=60), @@ -322,9 +320,7 @@ async def interactive_loop(self): await self.call_tool(tool_name, arguments) else: - print( - "āŒ Unknown command. Try 'list', 'call ', or 'quit'" - ) + print("āŒ Unknown command. Try 'list', 'call ', or 'quit'") except KeyboardInterrupt: print("\n\nšŸ‘‹ Goodbye!") diff --git a/examples/snippets/clients/oauth_client.py b/examples/snippets/clients/oauth_client.py index 45026590a..38bf7f95f 100644 --- a/examples/snippets/clients/oauth_client.py +++ b/examples/snippets/clients/oauth_client.py @@ -14,7 +14,7 @@ from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -68,7 +68,7 @@ async def main(): callback_handler=handle_callback, ) - async with streamablehttp_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/snippets/clients/streamable_basic.py b/examples/snippets/clients/streamable_basic.py index 108439613..071ea8155 100644 --- a/examples/snippets/clients/streamable_basic.py +++ b/examples/snippets/clients/streamable_basic.py @@ -6,12 +6,12 @@ import asyncio from mcp import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamablehttp_client("http://localhost:8000/mcp") as ( + async with streamable_http_client("http://localhost:8000/mcp") as ( read_stream, write_stream, _, diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index 700b5417f..606091c54 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -23,7 +23,7 @@ from mcp import types from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.exceptions import McpError @@ -44,7 +44,7 @@ class SseServerParameters(BaseModel): class StreamableHttpParameters(BaseModel): - """Parameters for intializing a streamablehttp_client.""" + """Parameters for intializing a streamable_http_client.""" # The endpoint URL. url: str @@ -250,7 +250,7 @@ async def _establish_session( ) read, write = await session_stack.enter_async_context(client) else: - client = streamablehttp_client( + client = streamable_http_client( url=server_params.url, headers=server_params.headers, timeout=server_params.timeout, diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 63b09133f..dfacbf132 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -17,6 +17,7 @@ from anyio.abc import TaskGroup from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse +from typing_extensions import deprecated from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage @@ -438,7 +439,7 @@ def get_session_id(self) -> str | None: @asynccontextmanager -async def streamablehttp_client( +async def streamable_http_client( url: str, headers: dict[str, str] | None = None, timeout: float | timedelta = 30, @@ -507,3 +508,23 @@ def start_get_stream() -> None: finally: await read_stream_writer.aclose() await write_stream.aclose() + + +@deprecated("Use `streamable_http_client` instead.") +@asynccontextmanager +async def streamablehttp_client( + url: str, + headers: dict[str, str] | None = None, + timeout: float | timedelta = 30, + sse_read_timeout: float | timedelta = 60 * 5, + terminate_on_close: bool = True, +) -> AsyncGenerator[ + tuple[ + MemoryObjectReceiveStream[SessionMessage | Exception], + MemoryObjectSendStream[SessionMessage], + GetSessionIdCallback, + ], + None, +]: + async with streamable_http_client(url, headers, timeout, sse_read_timeout, terminate_on_close) as streams: + yield streams diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index 16a887e00..6141a8fd0 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -276,7 +276,7 @@ async def test_disconnect_non_existent_server(self): ( StreamableHttpParameters(url="http://test.com/stream", terminate_on_close=False), "streamablehttp", - "mcp.client.session_group.streamablehttp_client", + "mcp.client.session_group.streamable_http_client", ), # url, headers, timeout, sse_read_timeout, terminate_on_close ], ) @@ -292,7 +292,7 @@ async def test_establish_session_parameterized( mock_read_stream = mock.AsyncMock(name=f"{client_type_name}Read") mock_write_stream = mock.AsyncMock(name=f"{client_type_name}Write") - # streamablehttp_client's __aenter__ returns three values + # streamable_http_client's __aenter__ returns three values if client_type_name == "streamablehttp": mock_extra_stream_val = mock.AsyncMock(name="StreamableExtra") mock_client_cm_instance.__aenter__.return_value = ( diff --git a/tests/server/fastmcp/test_integration.py b/tests/server/fastmcp/test_integration.py index 377e4923b..7f8a48218 100644 --- a/tests/server/fastmcp/test_integration.py +++ b/tests/server/fastmcp/test_integration.py @@ -29,7 +29,7 @@ ) from mcp.client.session import ClientSession from mcp.client.sse import sse_client -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.types import ( CreateMessageResult, ElicitResult, @@ -172,7 +172,7 @@ def create_client_for_transport(transport: str, server_url: str): return sse_client(endpoint) elif transport == "streamable-http": endpoint = f"{server_url}/mcp" - return streamablehttp_client(endpoint) + return streamable_http_client(endpoint) else: raise ValueError(f"Invalid transport: {transport}") diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3fea54f0b..871124ffd 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,7 +23,7 @@ import mcp.types as types from mcp.client.session import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.server import Server from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, @@ -824,7 +824,7 @@ async def http_client(basic_server, basic_server_url): @pytest.fixture async def initialized_client_session(basic_server, basic_server_url): """Create initialized StreamableHTTP client session.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -838,9 +838,9 @@ async def initialized_client_session(basic_server, basic_server_url): @pytest.mark.anyio -async def test_streamablehttp_client_basic_connection(basic_server, basic_server_url): +async def test_streamable_http_client_basic_connection(basic_server, basic_server_url): """Test basic client connection with initialization.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -856,7 +856,7 @@ async def test_streamablehttp_client_basic_connection(basic_server, basic_server @pytest.mark.anyio -async def test_streamablehttp_client_resource_read(initialized_client_session): +async def test_streamable_http_client_resource_read(initialized_client_session): """Test client resource read functionality.""" response = await initialized_client_session.read_resource(uri=AnyUrl("foobar://test-resource")) assert len(response.contents) == 1 @@ -865,7 +865,7 @@ async def test_streamablehttp_client_resource_read(initialized_client_session): @pytest.mark.anyio -async def test_streamablehttp_client_tool_invocation(initialized_client_session): +async def test_streamable_http_client_tool_invocation(initialized_client_session): """Test client tool invocation.""" # First list tools tools = await initialized_client_session.list_tools() @@ -880,7 +880,7 @@ async def test_streamablehttp_client_tool_invocation(initialized_client_session) @pytest.mark.anyio -async def test_streamablehttp_client_error_handling(initialized_client_session): +async def test_streamable_http_client_error_handling(initialized_client_session): """Test error handling in client.""" with pytest.raises(McpError) as exc_info: await initialized_client_session.read_resource(uri=AnyUrl("unknown://test-error")) @@ -889,9 +889,9 @@ async def test_streamablehttp_client_error_handling(initialized_client_session): @pytest.mark.anyio -async def test_streamablehttp_client_session_persistence(basic_server, basic_server_url): +async def test_streamable_http_client_session_persistence(basic_server, basic_server_url): """Test that session ID persists across requests.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -917,9 +917,9 @@ async def test_streamablehttp_client_session_persistence(basic_server, basic_ser @pytest.mark.anyio -async def test_streamablehttp_client_json_response(json_response_server, json_server_url): +async def test_streamable_http_client_json_response(json_response_server, json_server_url): """Test client with JSON response mode.""" - async with streamablehttp_client(f"{json_server_url}/mcp") as ( + async with streamable_http_client(f"{json_server_url}/mcp") as ( read_stream, write_stream, _, @@ -945,7 +945,7 @@ async def test_streamablehttp_client_json_response(json_response_server, json_se @pytest.mark.anyio -async def test_streamablehttp_client_get_stream(basic_server, basic_server_url): +async def test_streamable_http_client_get_stream(basic_server, basic_server_url): """Test GET stream functionality for server-initiated messages.""" import mcp.types as types from mcp.shared.session import RequestResponder @@ -959,7 +959,7 @@ async def message_handler( if isinstance(message, types.ServerNotification): notifications_received.append(message) - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -986,13 +986,13 @@ async def message_handler( @pytest.mark.anyio -async def test_streamablehttp_client_session_termination(basic_server, basic_server_url): +async def test_streamable_http_client_session_termination(basic_server, basic_server_url): """Test client session termination functionality.""" captured_session_id = None - # Create the streamablehttp_client with a custom httpx client to capture headers - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + # Create the streamable_http_client with a custom httpx client to capture headers + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, get_session_id, @@ -1012,7 +1012,7 @@ async def test_streamablehttp_client_session_termination(basic_server, basic_ser if captured_session_id: headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=headers) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as ( read_stream, write_stream, _, @@ -1027,7 +1027,7 @@ async def test_streamablehttp_client_session_termination(basic_server, basic_ser @pytest.mark.anyio -async def test_streamablehttp_client_session_termination_204(basic_server, basic_server_url, monkeypatch): +async def test_streamable_http_client_session_termination_204(basic_server, basic_server_url, monkeypatch): """Test client session termination functionality with a 204 response. This test patches the httpx client to return a 204 response for DELETEs. @@ -1055,8 +1055,8 @@ async def mock_delete(self, *args, **kwargs): captured_session_id = None - # Create the streamablehttp_client with a custom httpx client to capture headers - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + # Create the streamable_http_client with a custom httpx client to capture headers + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, get_session_id, @@ -1076,7 +1076,7 @@ async def mock_delete(self, *args, **kwargs): if captured_session_id: headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=headers) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as ( read_stream, write_stream, _, @@ -1091,7 +1091,7 @@ async def mock_delete(self, *args, **kwargs): @pytest.mark.anyio -async def test_streamablehttp_client_resumption(event_server): +async def test_streamable_http_client_resumption(event_server): """Test client session resumption using sync primitives for reliable coordination.""" _, server_url = event_server @@ -1118,7 +1118,7 @@ async def on_resumption_token_update(token: str) -> None: captured_resumption_token = token # First, start the client session and begin the tool that waits on lock - async with streamablehttp_client(f"{server_url}/mcp", terminate_on_close=False) as ( + async with streamable_http_client(f"{server_url}/mcp", terminate_on_close=False) as ( read_stream, write_stream, get_session_id, @@ -1175,7 +1175,7 @@ async def run_tool(): headers[MCP_SESSION_ID_HEADER] = captured_session_id if captured_protocol_version: headers[MCP_PROTOCOL_VERSION_HEADER] = captured_protocol_version - async with streamablehttp_client(f"{server_url}/mcp", headers=headers) as ( + async with streamable_http_client(f"{server_url}/mcp", headers=headers) as ( read_stream, write_stream, _, @@ -1242,7 +1242,7 @@ async def sampling_callback( ) # Create client with sampling callback - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1403,7 +1403,7 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: "X-Trace-Id": "trace-123", } - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=custom_headers) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", headers=custom_headers) as ( read_stream, write_stream, _, @@ -1440,7 +1440,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No "Authorization": f"Bearer token-{i}", } - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=headers) as (read_stream, write_stream, _): + async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as (read_stream, write_stream, _): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -1464,7 +1464,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No @pytest.mark.anyio async def test_client_includes_protocol_version_header_after_init(context_aware_server, basic_server_url): """Test that client includes mcp-protocol-version header after initialization.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1580,7 +1580,7 @@ async def test_client_crash_handled(basic_server, basic_server_url): # Simulate bad client that crashes after init async def bad_client(): """Client that triggers ClosedResourceError""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1598,7 +1598,7 @@ async def bad_client(): await anyio.sleep(0.1) # Try a good client, it should still be able to connect and list tools - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, From 839f9d14a2795c80c81631c492d859997ba9a6c4 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Mon, 21 Jul 2025 10:44:37 +0200 Subject: [PATCH 2/3] Apply pre-commit --- .../simple-auth-client/mcp_simple_auth_client/main.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 06d5b1274..c207bd4e1 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -188,7 +188,9 @@ async def _default_redirect_handler(authorization_url: str) -> None: # Create OAuth authentication handler using the new interface oauth_auth = OAuthClientProvider( server_url=self.server_url.replace("/mcp", ""), - client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict), + client_metadata=OAuthClientMetadata.model_validate( + client_metadata_dict + ), storage=InMemoryTokenStorage(), redirect_handler=_default_redirect_handler, callback_handler=callback_handler, @@ -320,7 +322,9 @@ async def interactive_loop(self): await self.call_tool(tool_name, arguments) else: - print("āŒ Unknown command. Try 'list', 'call ', or 'quit'") + print( + "āŒ Unknown command. Try 'list', 'call ', or 'quit'" + ) except KeyboardInterrupt: print("\n\nšŸ‘‹ Goodbye!") From 9137b982f04d775b3350ee517292c66c2ff1bc52 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Mon, 21 Jul 2025 12:17:21 +0200 Subject: [PATCH 3/3] Add missing paramters --- src/mcp/client/streamable_http.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index dfacbf132..7b9d7d30d 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -518,6 +518,8 @@ async def streamablehttp_client( timeout: float | timedelta = 30, sse_read_timeout: float | timedelta = 60 * 5, terminate_on_close: bool = True, + httpx_client_factory: McpHttpClientFactory = create_mcp_http_client, + auth: httpx.Auth | None = None, ) -> AsyncGenerator[ tuple[ MemoryObjectReceiveStream[SessionMessage | Exception], @@ -526,5 +528,7 @@ async def streamablehttp_client( ], None, ]: - async with streamable_http_client(url, headers, timeout, sse_read_timeout, terminate_on_close) as streams: + async with streamable_http_client( + url, headers, timeout, sse_read_timeout, terminate_on_close, httpx_client_factory, auth + ) as streams: yield streams