Skip to content

Commit 55ee156

Browse files
tzolovwwulfricHarryFQG
committed
feat(webflux): Add configurable SSE endpoints to WebFlux transport (#41, #67)
Enhances WebFlux SSE transport implementation with customizable endpoint paths: - Add configurable SSE endpoint support in both client and server transports - Update tests to verify custom SSE endpoint functionality - Implement builder pattern to support the new configuration options Co-authored-by: haidao <wwulfric@gmail.com> Co-authored-by: Harry <34418180+HarryFQG@users.noreply.github.com> Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 2934635 commit 55ee156

File tree

8 files changed

+210
-19
lines changed

8 files changed

+210
-19
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class WebFluxSseClientTransport implements McpClientTransport {
7979
* Default SSE endpoint path as specified by the MCP transport specification. This
8080
* endpoint is used to establish the SSE connection with the server.
8181
*/
82-
private static final String SSE_ENDPOINT = "/sse";
82+
private static final String DEFAULT_SSE_ENDPOINT = "/sse";
8383

8484
/**
8585
* Type reference for parsing SSE events containing string data.
@@ -117,6 +117,12 @@ public class WebFluxSseClientTransport implements McpClientTransport {
117117
*/
118118
protected final Sinks.One<String> messageEndpointSink = Sinks.one();
119119

120+
/**
121+
* The SSE endpoint URI provided by the server. Used for sending outbound messages via
122+
* HTTP POST requests.
123+
*/
124+
private String sseEndpoint;
125+
120126
/**
121127
* Constructs a new SseClientTransport with the specified WebClient builder. Uses a
122128
* default ObjectMapper instance for JSON processing.
@@ -137,11 +143,27 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
137143
* @throws IllegalArgumentException if either parameter is null
138144
*/
139145
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
146+
this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT);
147+
}
148+
149+
/**
150+
* Constructs a new SseClientTransport with the specified WebClient builder and
151+
* ObjectMapper. Initializes both inbound and outbound message processing pipelines.
152+
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
153+
* instance
154+
* @param objectMapper the ObjectMapper to use for JSON processing
155+
* @param sseEndpoint the SSE endpoint URI to use for establishing the connection
156+
* @throws IllegalArgumentException if either parameter is null
157+
*/
158+
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper,
159+
String sseEndpoint) {
140160
Assert.notNull(objectMapper, "ObjectMapper must not be null");
141161
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
162+
Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty");
142163

143164
this.objectMapper = objectMapper;
144165
this.webClient = webClientBuilder.build();
166+
this.sseEndpoint = sseEndpoint;
145167
}
146168

147169
/**
@@ -254,7 +276,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
254276
protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
255277
return this.webClient
256278
.get()
257-
.uri(SSE_ENDPOINT)
279+
.uri(this.sseEndpoint)
258280
.accept(MediaType.TEXT_EVENT_STREAM)
259281
.retrieve()
260282
.bodyToFlux(SSE_TYPE)
@@ -321,4 +343,66 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
321343
return this.objectMapper.convertValue(data, typeRef);
322344
}
323345

346+
/**
347+
* Creates a new builder for {@link WebFluxSseClientTransport}.
348+
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
349+
* instance
350+
* @return a new builder instance
351+
*/
352+
public static Builder builder(WebClient.Builder webClientBuilder) {
353+
return new Builder(webClientBuilder);
354+
}
355+
356+
/**
357+
* Builder for {@link WebFluxSseClientTransport}.
358+
*/
359+
public static class Builder {
360+
361+
private final WebClient.Builder webClientBuilder;
362+
363+
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
364+
365+
private ObjectMapper objectMapper = new ObjectMapper();
366+
367+
/**
368+
* Creates a new builder with the specified WebClient.Builder.
369+
* @param webClientBuilder the WebClient.Builder to use
370+
*/
371+
public Builder(WebClient.Builder webClientBuilder) {
372+
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
373+
this.webClientBuilder = webClientBuilder;
374+
}
375+
376+
/**
377+
* Sets the SSE endpoint path.
378+
* @param sseEndpoint the SSE endpoint path
379+
* @return this builder
380+
*/
381+
public Builder sseEndpoint(String sseEndpoint) {
382+
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
383+
this.sseEndpoint = sseEndpoint;
384+
return this;
385+
}
386+
387+
/**
388+
* Sets the object mapper for JSON serialization/deserialization.
389+
* @param objectMapper the object mapper
390+
* @return this builder
391+
*/
392+
public Builder objectMapper(ObjectMapper objectMapper) {
393+
Assert.notNull(objectMapper, "objectMapper must not be null");
394+
this.objectMapper = objectMapper;
395+
return this;
396+
}
397+
398+
/**
399+
* Builds a new {@link WebFluxSseClientTransport} instance.
400+
* @return a new transport instance
401+
*/
402+
public WebFluxSseClientTransport build() {
403+
return new WebFluxSseClientTransport(webClientBuilder, objectMapper, sseEndpoint);
404+
}
405+
406+
}
407+
324408
}

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,4 +346,74 @@ public void close() {
346346

347347
}
348348

349+
public static Builder builder() {
350+
return new Builder();
351+
}
352+
353+
/**
354+
* Builder for creating instances of {@link WebFluxSseServerTransportProvider}.
355+
* <p>
356+
* This builder provides a fluent API for configuring and creating instances of
357+
* WebFluxSseServerTransportProvider with custom settings.
358+
*/
359+
public static class Builder {
360+
361+
private ObjectMapper objectMapper;
362+
363+
private String messageEndpoint;
364+
365+
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
366+
367+
/**
368+
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
369+
* messages.
370+
* @param objectMapper The ObjectMapper instance. Must not be null.
371+
* @return this builder instance
372+
* @throws IllegalArgumentException if objectMapper is null
373+
*/
374+
public Builder objectMapper(ObjectMapper objectMapper) {
375+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
376+
this.objectMapper = objectMapper;
377+
return this;
378+
}
379+
380+
/**
381+
* Sets the endpoint URI where clients should send their JSON-RPC messages.
382+
* @param messageEndpoint The message endpoint URI. Must not be null.
383+
* @return this builder instance
384+
* @throws IllegalArgumentException if messageEndpoint is null
385+
*/
386+
public Builder messageEndpoint(String messageEndpoint) {
387+
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
388+
this.messageEndpoint = messageEndpoint;
389+
return this;
390+
}
391+
392+
/**
393+
* Sets the SSE endpoint path.
394+
* @param sseEndpoint The SSE endpoint path. Must not be null.
395+
* @return this builder instance
396+
* @throws IllegalArgumentException if sseEndpoint is null
397+
*/
398+
public Builder sseEndpoint(String sseEndpoint) {
399+
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
400+
this.sseEndpoint = sseEndpoint;
401+
return this;
402+
}
403+
404+
/**
405+
* Builds a new instance of {@link WebFluxSseServerTransportProvider} with the
406+
* configured settings.
407+
* @return A new WebFluxSseServerTransportProvider instance
408+
* @throws IllegalStateException if required parameters are not set
409+
*/
410+
public WebFluxSseServerTransportProvider build() {
411+
Assert.notNull(objectMapper, "ObjectMapper must be set");
412+
Assert.notNull(messageEndpoint, "Message endpoint must be set");
413+
414+
return new WebFluxSseServerTransportProvider(objectMapper, messageEndpoint, sseEndpoint);
415+
}
416+
417+
}
418+
349419
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,17 @@
4646

4747
import static org.assertj.core.api.Assertions.assertThat;
4848
import static org.awaitility.Awaitility.await;
49-
import static org.junit.Assert.assertThat;
5049
import static org.mockito.Mockito.mock;
5150

5251
public class WebFluxSseIntegrationTests {
5352

5453
private static final int PORT = 8182;
5554

56-
private static final String MESSAGE_ENDPOINT = "/mcp/message";
55+
// private static final String MESSAGE_ENDPOINT = "/mcp/message";
56+
57+
private static final String CUSTOM_SSE_ENDPOINT = "/somePath/sse";
58+
59+
private static final String CUSTOM_MESSAGE_ENDPOINT = "/otherPath/mcp/message";
5760

5861
private DisposableServer httpServer;
5962

@@ -64,15 +67,25 @@ public class WebFluxSseIntegrationTests {
6467
@BeforeEach
6568
public void before() {
6669

67-
this.mcpServerTransportProvider = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
70+
this.mcpServerTransportProvider = new WebFluxSseServerTransportProvider.Builder()
71+
.objectMapper(new ObjectMapper())
72+
.messageEndpoint(CUSTOM_MESSAGE_ENDPOINT)
73+
.sseEndpoint(CUSTOM_SSE_ENDPOINT)
74+
.build();
6875

6976
HttpHandler httpHandler = RouterFunctions.toHttpHandler(mcpServerTransportProvider.getRouterFunction());
7077
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
7178
this.httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
7279

73-
clientBulders.put("httpclient", McpClient.sync(new HttpClientSseClientTransport("http://localhost:" + PORT)));
80+
clientBulders.put("httpclient",
81+
McpClient.sync(HttpClientSseClientTransport.builder("http://localhost:" + PORT)
82+
.sseEndpoint(CUSTOM_SSE_ENDPOINT)
83+
.build()));
7484
clientBulders.put("webflux",
75-
McpClient.sync(new WebFluxSseClientTransport(WebClient.builder().baseUrl("http://localhost:" + PORT))));
85+
McpClient
86+
.sync(WebFluxSseClientTransport.builder(WebClient.builder().baseUrl("http://localhost:" + PORT))
87+
.sseEndpoint(CUSTOM_SSE_ENDPOINT)
88+
.build()));
7689

7790
}
7891

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class WebFluxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests {
3333

3434
@Override
3535
protected McpClientTransport createMcpTransport() {
36-
return new WebFluxSseClientTransport(WebClient.builder().baseUrl(host));
36+
return WebFluxSseClientTransport.builder(WebClient.builder().baseUrl(host)).build();
3737
}
3838

3939
@Override

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class WebFluxSseMcpSyncClientTests extends AbstractMcpSyncClientTests {
3333

3434
@Override
3535
protected McpClientTransport createMcpTransport() {
36-
return new WebFluxSseClientTransport(WebClient.builder().baseUrl(host));
36+
return WebFluxSseClientTransport.builder(WebClient.builder().baseUrl(host)).build();
3737
}
3838

3939
@Override

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,6 @@ public TestSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper o
6363
super(webClientBuilder, objectMapper);
6464
}
6565

66-
// @Override
67-
// public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>,
68-
// Mono<McpSchema.JSONRPCMessage>> handler) {
69-
// simulateEndpointEvent("https://localhost:3001");
70-
// return super.connect(handler);
71-
// }
72-
7366
@Override
7467
protected Flux<ServerSentEvent<String>> eventStream() {
7568
return super.eventStream().mergeWith(events.asFlux());
@@ -137,6 +130,33 @@ void constructorValidation() {
137130
.hasMessageContaining("ObjectMapper must not be null");
138131
}
139132

133+
@Test
134+
void testBuilderPattern() {
135+
// Test default builder
136+
WebFluxSseClientTransport transport1 = WebFluxSseClientTransport.builder(webClientBuilder).build();
137+
assertThatCode(() -> transport1.closeGracefully().block()).doesNotThrowAnyException();
138+
139+
// Test builder with custom ObjectMapper
140+
ObjectMapper customMapper = new ObjectMapper();
141+
WebFluxSseClientTransport transport2 = WebFluxSseClientTransport.builder(webClientBuilder)
142+
.objectMapper(customMapper)
143+
.build();
144+
assertThatCode(() -> transport2.closeGracefully().block()).doesNotThrowAnyException();
145+
146+
// Test builder with custom SSE endpoint
147+
WebFluxSseClientTransport transport3 = WebFluxSseClientTransport.builder(webClientBuilder)
148+
.sseEndpoint("/custom-sse")
149+
.build();
150+
assertThatCode(() -> transport3.closeGracefully().block()).doesNotThrowAnyException();
151+
152+
// Test builder with all custom parameters
153+
WebFluxSseClientTransport transport4 = WebFluxSseClientTransport.builder(webClientBuilder)
154+
.objectMapper(customMapper)
155+
.sseEndpoint("/custom-sse")
156+
.build();
157+
assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException();
158+
}
159+
140160
@Test
141161
void testMessageProcessing() {
142162
// Create a test message
@@ -240,7 +260,7 @@ void testRetryBehavior() {
240260
// Create a WebClient that simulates connection failures
241261
WebClient.Builder failingWebClientBuilder = WebClient.builder().baseUrl("http://non-existent-host");
242262

243-
WebFluxSseClientTransport failingTransport = new WebFluxSseClientTransport(failingWebClientBuilder);
263+
WebFluxSseClientTransport failingTransport = WebFluxSseClientTransport.builder(failingWebClientBuilder).build();
244264

245265
// Verify that the transport attempts to reconnect
246266
StepVerifier.create(Mono.delay(Duration.ofSeconds(2))).expectNextCount(1).verifyComplete();

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpAsyncServerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ class WebFluxSseMcpAsyncServerTests extends AbstractMcpAsyncServerTests {
3131

3232
@Override
3333
protected McpServerTransportProvider createMcpTransportProvider() {
34-
var transportProvider = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
34+
var transportProvider = new WebFluxSseServerTransportProvider.Builder().objectMapper(new ObjectMapper())
35+
.messageEndpoint(MESSAGE_ENDPOINT)
36+
.build();
3537

3638
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transportProvider.getRouterFunction());
3739
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpSyncServerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ class WebFluxSseMcpSyncServerTests extends AbstractMcpSyncServerTests {
3333

3434
@Override
3535
protected McpServerTransportProvider createMcpTransportProvider() {
36-
transportProvider = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
36+
transportProvider = new WebFluxSseServerTransportProvider.Builder().objectMapper(new ObjectMapper())
37+
.messageEndpoint(MESSAGE_ENDPOINT)
38+
.build();
3739
return transportProvider;
3840
}
3941

0 commit comments

Comments
 (0)