Skip to content

Commit a01872d

Browse files
author
Zachary German
committed
Integrated dedicated GET/listening stream and multi-transport management
1 parent 2c07ee8 commit a01872d

File tree

8 files changed

+235
-487
lines changed

8 files changed

+235
-487
lines changed

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/transport/StreamableHttpTransportIntegrationTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ void setUp() {
5858
// Set up session factory with proper server capabilities
5959
McpSchema.ServerCapabilities serverCapabilities = new McpSchema.ServerCapabilities(null, null, null, null, null,
6060
null);
61-
serverTransportProvider.setStreamableHttpSessionFactory(
62-
sessionId -> new io.modelcontextprotocol.spec.McpStreamableHttpServerSession(sessionId,
63-
java.time.Duration.ofSeconds(30),
64-
request -> reactor.core.publisher.Mono.just(new McpSchema.InitializeResult("2025-06-18",
65-
serverCapabilities, new McpSchema.Implementation("Test Server", "1.0.0"), null)),
66-
() -> reactor.core.publisher.Mono.empty(), java.util.Map.of(), java.util.Map.of()));
61+
serverTransportProvider
62+
.setStreamableHttpSessionFactory(sessionId -> new io.modelcontextprotocol.spec.McpServerSession(sessionId,
63+
java.time.Duration.ofSeconds(30),
64+
request -> reactor.core.publisher.Mono.just(new McpSchema.InitializeResult("2025-06-18",
65+
serverCapabilities, new McpSchema.Implementation("Test Server", "1.0.0"), null)),
66+
() -> reactor.core.publisher.Mono.empty(), java.util.Map.of(), java.util.Map.of()));
6767

6868
tomcat = TomcatTestUtil.createTomcatServer("", PORT, serverTransportProvider);
6969
try {
@@ -132,14 +132,14 @@ void shouldCallImmediateToolSuccessfully() {
132132
McpSchema.ServerCapabilities serverCapabilities = new McpSchema.ServerCapabilities(null, null, null, null, null,
133133
new McpSchema.ServerCapabilities.ToolCapabilities(true));
134134
serverTransportProvider
135-
.setStreamableHttpSessionFactory(sessionId -> new io.modelcontextprotocol.spec.McpStreamableHttpServerSession(
136-
sessionId, java.time.Duration.ofSeconds(30),
135+
.setStreamableHttpSessionFactory(sessionId -> new io.modelcontextprotocol.spec.McpServerSession(sessionId,
136+
java.time.Duration.ofSeconds(30),
137137
request -> reactor.core.publisher.Mono.just(new McpSchema.InitializeResult("2025-06-18",
138138
serverCapabilities, new McpSchema.Implementation("Test Server", "1.0.0"), null)),
139139
() -> reactor.core.publisher.Mono.empty(),
140140
java.util.Map.of("tools/call",
141-
(io.modelcontextprotocol.spec.McpStreamableHttpServerSession.RequestHandler<CallToolResult>) (
142-
exchange, params) -> tool.call().apply(exchange, (Map<String, Object>) params)),
141+
(io.modelcontextprotocol.spec.McpServerSession.RequestHandler<CallToolResult>) (exchange,
142+
params) -> tool.call().apply(exchange, (Map<String, Object>) params)),
143143
java.util.Map.of()));
144144

145145
var mcpClient = clientBuilder.build();
@@ -174,12 +174,12 @@ void shouldCallStreamingToolSuccessfully() {
174174
McpSchema.ServerCapabilities serverCapabilities = new McpSchema.ServerCapabilities(null, null, null, null, null,
175175
new McpSchema.ServerCapabilities.ToolCapabilities(true));
176176
serverTransportProvider
177-
.setStreamableHttpSessionFactory(sessionId -> new io.modelcontextprotocol.spec.McpStreamableHttpServerSession(
178-
sessionId, java.time.Duration.ofSeconds(30),
177+
.setStreamableHttpSessionFactory(sessionId -> new io.modelcontextprotocol.spec.McpServerSession(sessionId,
178+
java.time.Duration.ofSeconds(30),
179179
request -> reactor.core.publisher.Mono.just(new McpSchema.InitializeResult("2025-06-18",
180180
serverCapabilities, new McpSchema.Implementation("Test Server", "1.0.0"), null)),
181181
() -> reactor.core.publisher.Mono.empty(), java.util.Map.of("tools/call",
182-
(io.modelcontextprotocol.spec.McpStreamableHttpServerSession.StreamingRequestHandler<CallToolResult>) new io.modelcontextprotocol.spec.McpStreamableHttpServerSession.StreamingRequestHandler<CallToolResult>() {
182+
(io.modelcontextprotocol.spec.McpServerSession.StreamingRequestHandler<CallToolResult>) new io.modelcontextprotocol.spec.McpServerSession.StreamingRequestHandler<CallToolResult>() {
183183
@Override
184184
public Mono<CallToolResult> handle(
185185
io.modelcontextprotocol.server.McpAsyncServerExchange exchange, Object params) {

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,9 @@ public class McpAsyncServer {
183183
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
184184
asyncRootsListChangedNotificationHandler(rootsChangeConsumers));
185185

186-
mcpTransportProvider.setSessionFactory(
187-
transport -> new McpServerSession(UUID.randomUUID().toString(), requestTimeout, transport,
188-
this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));
186+
mcpTransportProvider.setSessionFactory(listeningTransport -> new McpServerSession(UUID.randomUUID().toString(),
187+
requestTimeout, listeningTransport, this::asyncInitializeRequestHandler, Mono::empty, requestHandlers,
188+
notificationHandlers));
189189
}
190190

191191
// ---------------------------------------

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
1414
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
1515
import io.modelcontextprotocol.spec.McpSession;
16-
import io.modelcontextprotocol.spec.McpStreamableHttpServerSession;
1716
import io.modelcontextprotocol.spec.McpServerSession;
1817
import io.modelcontextprotocol.util.Assert;
1918
import reactor.core.publisher.Mono;
@@ -61,20 +60,6 @@ public McpAsyncServerExchange(McpServerSession session, McpSchema.ClientCapabili
6160
this.clientInfo = clientInfo;
6261
}
6362

64-
/**
65-
* Create a new asynchronous exchange with the client.
66-
* @param session The server session representing a 1-1 interaction.
67-
* @param clientCapabilities The client capabilities that define the supported
68-
* features and functionality.
69-
* @param clientInfo The client implementation information.
70-
*/
71-
public McpAsyncServerExchange(McpStreamableHttpServerSession session,
72-
McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo) {
73-
this.session = session;
74-
this.clientCapabilities = clientCapabilities;
75-
this.clientInfo = clientInfo;
76-
}
77-
7863
/**
7964
* Get the client capabilities that define the supported features and functionality.
8065
* @return The client capabilities

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncStreamableHttpServer.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import com.fasterxml.jackson.databind.ObjectMapper;
1616

1717
import io.modelcontextprotocol.server.transport.StreamableHttpServerTransportProvider;
18-
import io.modelcontextprotocol.spec.McpStreamableHttpServerSession;
18+
import io.modelcontextprotocol.spec.McpServerSession;
1919
import io.modelcontextprotocol.spec.McpSchema;
2020
import io.modelcontextprotocol.util.Assert;
2121
import io.modelcontextprotocol.util.DeafaultMcpUriTemplateManagerFactory;
@@ -86,7 +86,7 @@ public class McpAsyncStreamableHttpServer {
8686
* Sets up the request handlers for standard MCP methods.
8787
*/
8888
private void setupRequestHandlers() {
89-
Map<String, McpStreamableHttpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();
89+
Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();
9090

9191
// Ping handler
9292
requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of()));
@@ -123,15 +123,15 @@ private void setupRequestHandlers() {
123123
this.requestHandlers = requestHandlers;
124124
}
125125

126-
private Map<String, McpStreamableHttpServerSession.RequestHandler<?>> requestHandlers;
126+
private Map<String, McpServerSession.RequestHandler<?>> requestHandlers;
127127

128-
private Map<String, McpStreamableHttpServerSession.NotificationHandler> notificationHandlers;
128+
private Map<String, McpServerSession.NotificationHandler> notificationHandlers;
129129

130130
/**
131131
* Sets up notification handlers.
132132
*/
133133
private void setupNotificationHandlers() {
134-
Map<String, McpStreamableHttpServerSession.NotificationHandler> handlers = new HashMap<>();
134+
Map<String, McpServerSession.NotificationHandler> handlers = new HashMap<>();
135135

136136
handlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> {
137137
logger.info("[INIT] Received initialized notification - initialization complete!");
@@ -150,7 +150,7 @@ private void setupNotificationHandlers() {
150150
private void setupSessionFactory() {
151151
setupNotificationHandlers();
152152

153-
httpTransportProvider.setStreamableHttpSessionFactory(sessionId -> new McpStreamableHttpServerSession(sessionId,
153+
httpTransportProvider.setStreamableHttpSessionFactory(sessionId -> new McpServerSession(sessionId,
154154
requestTimeout, this::handleInitializeRequest, Mono::empty, requestHandlers, notificationHandlers));
155155
}
156156

@@ -181,7 +181,7 @@ private Mono<McpSchema.InitializeResult> handleInitializeRequest(McpSchema.Initi
181181
}
182182

183183
// Request handler creation methods
184-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListToolsResult> createToolsListHandler() {
184+
private McpServerSession.RequestHandler<McpSchema.ListToolsResult> createToolsListHandler() {
185185
return (exchange, params) -> {
186186
var regularTools = features.tools().stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();
187187
var streamingTools = features.streamTools()
@@ -194,8 +194,8 @@ private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListToolsResult>
194194
};
195195
}
196196

197-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.CallToolResult> createToolsCallHandler() {
198-
return new McpStreamableHttpServerSession.StreamingRequestHandler<McpSchema.CallToolResult>() {
197+
private McpServerSession.RequestHandler<McpSchema.CallToolResult> createToolsCallHandler() {
198+
return new McpServerSession.StreamingRequestHandler<McpSchema.CallToolResult>() {
199199
@Override
200200
public Mono<McpSchema.CallToolResult> handle(McpAsyncServerExchange exchange, Object params) {
201201
var callToolRequest = objectMapper.convertValue(params, McpSchema.CallToolRequest.class);
@@ -252,7 +252,7 @@ public Flux<McpSchema.CallToolResult> handleStreaming(McpAsyncServerExchange exc
252252
};
253253
}
254254

255-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListResourcesResult> createResourcesListHandler() {
255+
private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> createResourcesListHandler() {
256256
return (exchange, params) -> {
257257
var resources = features.resources()
258258
.values()
@@ -263,7 +263,7 @@ private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListResourcesRes
263263
};
264264
}
265265

266-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.ReadResourceResult> createResourcesReadHandler() {
266+
private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> createResourcesReadHandler() {
267267
return (exchange, params) -> {
268268
var resourceRequest = objectMapper.convertValue(params, McpSchema.ReadResourceRequest.class);
269269
var resourceUri = resourceRequest.uri();
@@ -278,12 +278,12 @@ private McpStreamableHttpServerSession.RequestHandler<McpSchema.ReadResourceResu
278278
};
279279
}
280280

281-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListResourceTemplatesResult> createResourceTemplatesListHandler() {
281+
private McpServerSession.RequestHandler<McpSchema.ListResourceTemplatesResult> createResourceTemplatesListHandler() {
282282
return (exchange, params) -> Mono
283283
.just(new McpSchema.ListResourceTemplatesResult(features.resourceTemplates(), null));
284284
}
285285

286-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListPromptsResult> createPromptsListHandler() {
286+
private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> createPromptsListHandler() {
287287
return (exchange, params) -> {
288288
var prompts = features.prompts()
289289
.values()
@@ -294,7 +294,7 @@ private McpStreamableHttpServerSession.RequestHandler<McpSchema.ListPromptsResul
294294
};
295295
}
296296

297-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.GetPromptResult> createPromptsGetHandler() {
297+
private McpServerSession.RequestHandler<McpSchema.GetPromptResult> createPromptsGetHandler() {
298298
return (exchange, params) -> {
299299
var promptRequest = objectMapper.convertValue(params, McpSchema.GetPromptRequest.class);
300300

@@ -308,15 +308,15 @@ private McpStreamableHttpServerSession.RequestHandler<McpSchema.GetPromptResult>
308308
};
309309
}
310310

311-
private McpStreamableHttpServerSession.RequestHandler<Object> createLoggingSetLevelHandler() {
311+
private McpServerSession.RequestHandler<Object> createLoggingSetLevelHandler() {
312312
return (exchange, params) -> {
313313
var setLevelRequest = objectMapper.convertValue(params, McpSchema.SetLevelRequest.class);
314314
exchange.setMinLoggingLevel(setLevelRequest.level());
315315
return Mono.just(Map.of());
316316
};
317317
}
318318

319-
private McpStreamableHttpServerSession.RequestHandler<McpSchema.CompleteResult> createCompletionCompleteHandler() {
319+
private McpServerSession.RequestHandler<McpSchema.CompleteResult> createCompletionCompleteHandler() {
320320
return (exchange, params) -> {
321321
var completeRequest = objectMapper.convertValue(params, McpSchema.CompleteRequest.class);
322322

@@ -330,7 +330,7 @@ private McpStreamableHttpServerSession.RequestHandler<McpSchema.CompleteResult>
330330
};
331331
}
332332

333-
private McpStreamableHttpServerSession.NotificationHandler createRootsListChangedHandler() {
333+
private McpServerSession.NotificationHandler createRootsListChangedHandler() {
334334
return (exchange, params) -> {
335335
var rootsChangeConsumers = features.rootsChangeConsumers();
336336
if (rootsChangeConsumers.isEmpty()) {
@@ -418,8 +418,8 @@ public static Builder builder() {
418418
* transport.
419419
*
420420
* <p>
421-
* This builder provides a fluent API for configuring Streamable HTTP MCP
422-
* servers with enhanced features:
421+
* This builder provides a fluent API for configuring Streamable HTTP MCP servers with
422+
* enhanced features:
423423
* <ul>
424424
* <li>Single session class managing all transport streams</li>
425425
* <li>Resource management and lifecycle handling</li>

0 commit comments

Comments
 (0)