Skip to content

Commit c711f83

Browse files
tzolovchemicL
andcommitted
feat: implement Streamable HTTP HttpClient transport and improve error handling (#337)
- Add HttpClientStreamableHttpTransport implementing the 2025-03-26 MCP specification - Add ResponseSubscribers utility for handling different HTTP response body types - Refactor HttpClientSseClientTransport to use reactive streams instead of FlowSseClient - Remove FlowSseClient in favor of reactive stream patterns - Use Disposable-based connection management instead of CountDownLatch - Replace message endpoint discovery with Sinks.One approach - Improve error handling by replacing doOnError/onErrorResume with onErrorComplete - Add test coverage for new Streamable HTTP transport - Add resiliency tests for connection handling and session invalidation BREAKING CHANGE: FlowSseClient has been removed in favor of reactive streams Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com> Co-authored-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent b328051 commit c711f83

21 files changed

+1601
-394
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,10 @@ private DefaultMcpTransportSession createTransportSession() {
129129
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
130130
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
131131
httpHeaders.add("mcp-session-id", sessionId);
132-
})
133-
.retrieve()
134-
.toBodilessEntity()
135-
.doOnError(e -> logger.warn("Got error when closing transport", e))
136-
.then();
132+
}).retrieve().toBodilessEntity().onErrorComplete(e -> {
133+
logger.warn("Got error when closing transport", e);
134+
return true;
135+
}).then();
137136
return new DefaultMcpTransportSession(onClose);
138137
}
139138

@@ -305,12 +304,12 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
305304
}
306305
})
307306
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
308-
.onErrorResume(t -> {
307+
.onErrorComplete(t -> {
309308
// handle the error first
310309
this.handleException(t);
311310
// inform the caller of sendMessage
312311
sink.error(t);
313-
return Flux.empty();
312+
return true;
314313
})
315314
.doFinally(s -> {
316315
Disposable ref = disposableRef.getAndSet(null);

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package io.modelcontextprotocol.client;
22

3-
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
4-
import io.modelcontextprotocol.spec.McpClientTransport;
53
import org.junit.jupiter.api.Timeout;
4+
import org.springframework.web.reactive.function.client.WebClient;
65
import org.testcontainers.containers.GenericContainer;
76
import org.testcontainers.containers.wait.strategy.Wait;
87

9-
import org.springframework.web.reactive.function.client.WebClient;
8+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
9+
import io.modelcontextprotocol.spec.McpClientTransport;
1010

1111
@Timeout(15)
1212
public class WebClientStreamableHttpAsyncClientTests extends AbstractMcpAsyncClientTests {

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpSyncClientTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package io.modelcontextprotocol.client;
22

3-
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
4-
import io.modelcontextprotocol.spec.McpClientTransport;
53
import org.junit.jupiter.api.Timeout;
4+
import org.springframework.web.reactive.function.client.WebClient;
65
import org.testcontainers.containers.GenericContainer;
76
import org.testcontainers.containers.wait.strategy.Wait;
87

9-
import org.springframework.web.reactive.function.client.WebClient;
8+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
9+
import io.modelcontextprotocol.spec.McpClientTransport;
1010

1111
@Timeout(15)
1212
public class WebClientStreamableHttpSyncClientTests extends AbstractMcpSyncClientTests {

mcp-spring/mcp-spring-webmvc/src/test/resources/logback.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
</appender>
1010

1111
<!-- Main MCP package -->
12-
<logger name="io.modelcontextprotocol" level="DEBUG"/>
12+
<logger name="io.modelcontextprotocol" level="INFO"/>
1313

1414
<!-- Client packages -->
15-
<logger name="io.modelcontextprotocol.client" level="DEBUG"/>
15+
<logger name="io.modelcontextprotocol.client" level="INFO"/>
1616

1717
<!-- Server transport package -->
18-
<logger name="io.modelcontextprotocol.server.transport" level="DEBUG"/>
18+
<logger name="io.modelcontextprotocol.server.transport" level="INFO"/>
1919

2020
<!-- Spec package -->
21-
<logger name="io.modelcontextprotocol.spec" level="DEBUG"/>
21+
<logger name="io.modelcontextprotocol.spec" level="INFO"/>
2222

2323
<!-- Root logger -->
2424
<root level="INFO">

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public abstract class AbstractMcpAsyncClientResiliencyTests {
7979
host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
8080
}
8181

82-
private static void disconnect() {
82+
static void disconnect() {
8383
long start = System.nanoTime();
8484
try {
8585
// disconnect
@@ -96,7 +96,7 @@ private static void disconnect() {
9696
}
9797
}
9898

99-
private static void reconnect() {
99+
static void reconnect() {
100100
long start = System.nanoTime();
101101
try {
102102
proxy.toxics().get("RESET_UPSTREAM").remove();
@@ -110,7 +110,7 @@ private static void reconnect() {
110110
}
111111
}
112112

113-
private static void restartMcpServer() {
113+
static void restartMcpServer() {
114114
container.stop();
115115
container.start();
116116
}

mcp/pom.xml

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@
141141

142142
<!-- Mockito cannot mock this class: class java.net.http.HttpClient. the bytebuddy helps. -->
143143
<dependency>
144-
<groupId>net.bytebuddy</groupId>
145-
<artifactId>byte-buddy</artifactId>
146-
<version>${byte-buddy.version}</version>
147-
<scope>test</scope>
148-
</dependency>
144+
<groupId>net.bytebuddy</groupId>
145+
<artifactId>byte-buddy</artifactId>
146+
<version>${byte-buddy.version}</version>
147+
<scope>test</scope>
148+
</dependency>
149149
<dependency>
150150
<groupId>io.projectreactor</groupId>
151151
<artifactId>reactor-test</artifactId>
@@ -202,6 +202,14 @@
202202
<scope>test</scope>
203203
</dependency>
204204

205+
<dependency>
206+
<groupId>org.testcontainers</groupId>
207+
<artifactId>toxiproxy</artifactId>
208+
<version>${toxiproxy.version}</version>
209+
<scope>test</scope>
210+
</dependency>
211+
212+
205213
</dependencies>
206214

207215

mcp/src/main/java/io/modelcontextprotocol/client/transport/FlowSseClient.java

Lines changed: 0 additions & 211 deletions
This file was deleted.

0 commit comments

Comments
 (0)