@@ -88,12 +88,16 @@ public class McpAsyncServer {
88
88
89
89
private final McpSchema .ServerCapabilities serverCapabilities ;
90
90
91
+ private final boolean isStreamableHttp ;
92
+
91
93
private final McpSchema .Implementation serverInfo ;
92
94
93
95
private final String instructions ;
94
96
95
97
private final CopyOnWriteArrayList <McpServerFeatures .AsyncToolSpecification > tools = new CopyOnWriteArrayList <>();
96
98
99
+ private final CopyOnWriteArrayList <McpServerFeatures .AsyncStreamingToolSpecification > streamTools = new CopyOnWriteArrayList <>();
100
+
97
101
private final CopyOnWriteArrayList <McpSchema .ResourceTemplate > resourceTemplates = new CopyOnWriteArrayList <>();
98
102
99
103
private final ConcurrentHashMap <String , McpServerFeatures .AsyncResourceSpecification > resources = new ConcurrentHashMap <>();
@@ -119,7 +123,7 @@ public class McpAsyncServer {
119
123
*/
120
124
McpAsyncServer (McpServerTransportProvider mcpTransportProvider , ObjectMapper objectMapper ,
121
125
McpServerFeatures .Async features , Duration requestTimeout ,
122
- McpUriTemplateManagerFactory uriTemplateManagerFactory ) {
126
+ McpUriTemplateManagerFactory uriTemplateManagerFactory , boolean isStreamableHttp ) {
123
127
this .mcpTransportProvider = mcpTransportProvider ;
124
128
this .objectMapper = objectMapper ;
125
129
this .serverInfo = features .serverInfo ();
@@ -131,6 +135,7 @@ public class McpAsyncServer {
131
135
this .prompts .putAll (features .prompts ());
132
136
this .completions .putAll (features .completions ());
133
137
this .uriTemplateManagerFactory = uriTemplateManagerFactory ;
138
+ this .isStreamableHttp = isStreamableHttp ;
134
139
135
140
Map <String , McpServerSession .RequestHandler <?>> requestHandlers = new HashMap <>();
136
141
@@ -188,6 +193,13 @@ public class McpAsyncServer {
188
193
notificationHandlers ));
189
194
}
190
195
196
+ // Alternate constructor for HTTP+SSE servers (past spec)
197
+ McpAsyncServer (McpServerTransportProvider mcpTransportProvider , ObjectMapper objectMapper ,
198
+ McpServerFeatures .Async features , Duration requestTimeout ,
199
+ McpUriTemplateManagerFactory uriTemplateManagerFactory ) {
200
+ this (mcpTransportProvider , objectMapper , features , requestTimeout , uriTemplateManagerFactory , false );
201
+ }
202
+
191
203
// ---------------------------------------
192
204
// Lifecycle Management
193
205
// ---------------------------------------
@@ -329,6 +341,69 @@ public Mono<Void> removeTool(String toolName) {
329
341
});
330
342
}
331
343
344
+ /**
345
+ * Add a new tool specification at runtime.
346
+ * @param toolSpecification The tool specification to add
347
+ * @return Mono that completes when clients have been notified of the change
348
+ */
349
+ public Mono <Void > addStreamTool (McpServerFeatures .AsyncStreamingToolSpecification toolSpecification ) {
350
+ if (toolSpecification == null ) {
351
+ return Mono .error (new McpError ("Tool specification must not be null" ));
352
+ }
353
+ if (toolSpecification .tool () == null ) {
354
+ return Mono .error (new McpError ("Tool must not be null" ));
355
+ }
356
+ if (toolSpecification .call () == null ) {
357
+ return Mono .error (new McpError ("Tool call handler must not be null" ));
358
+ }
359
+ if (this .serverCapabilities .tools () == null ) {
360
+ return Mono .error (new McpError ("Server must be configured with tool capabilities" ));
361
+ }
362
+
363
+ return Mono .defer (() -> {
364
+ // Check for duplicate tool names
365
+ if (this .streamTools .stream ().anyMatch (th -> th .tool ().name ().equals (toolSpecification .tool ().name ()))) {
366
+ return Mono
367
+ .error (new McpError ("Tool with name '" + toolSpecification .tool ().name () + "' already exists" ));
368
+ }
369
+
370
+ this .streamTools .add (toolSpecification );
371
+ logger .debug ("Added tool handler: {}" , toolSpecification .tool ().name ());
372
+
373
+ if (this .serverCapabilities .tools ().listChanged ()) {
374
+ return notifyToolsListChanged ();
375
+ }
376
+ return Mono .empty ();
377
+ });
378
+ }
379
+
380
+ /**
381
+ * Remove a tool handler at runtime.
382
+ * @param toolName The name of the tool handler to remove
383
+ * @return Mono that completes when clients have been notified of the change
384
+ */
385
+ public Mono <Void > removeStreamTool (String toolName ) {
386
+ if (toolName == null ) {
387
+ return Mono .error (new McpError ("Tool name must not be null" ));
388
+ }
389
+ if (this .serverCapabilities .tools () == null ) {
390
+ return Mono .error (new McpError ("Server must be configured with tool capabilities" ));
391
+ }
392
+
393
+ return Mono .defer (() -> {
394
+ boolean removed = this .tools
395
+ .removeIf (toolSpecification -> toolSpecification .tool ().name ().equals (toolName ));
396
+ if (removed ) {
397
+ logger .debug ("Removed tool handler: {}" , toolName );
398
+ if (this .serverCapabilities .tools ().listChanged ()) {
399
+ return notifyToolsListChanged ();
400
+ }
401
+ return Mono .empty ();
402
+ }
403
+ return Mono .error (new McpError ("Tool with name '" + toolName + "' not found" ));
404
+ });
405
+ }
406
+
332
407
/**
333
408
* Notifies clients that the list of available tools has changed.
334
409
* @return A Mono that completes when all clients have been notified
@@ -339,29 +414,97 @@ public Mono<Void> notifyToolsListChanged() {
339
414
340
415
private McpServerSession .RequestHandler <McpSchema .ListToolsResult > toolsListRequestHandler () {
341
416
return (exchange , params ) -> {
342
- List <Tool > tools = this .tools .stream ().map (McpServerFeatures .AsyncToolSpecification ::tool ).toList ();
417
+ List <Tool > tools = new ArrayList <>();
418
+ tools .addAll (this .tools .stream ().map (McpServerFeatures .AsyncToolSpecification ::tool ).toList ());
419
+ tools .addAll (
420
+ this .streamTools .stream ().map (McpServerFeatures .AsyncStreamingToolSpecification ::tool ).toList ());
343
421
344
422
return Mono .just (new McpSchema .ListToolsResult (tools , null ));
345
423
};
346
424
}
347
425
348
426
private McpServerSession .RequestHandler <CallToolResult > toolsCallRequestHandler () {
349
- return (exchange , params ) -> {
350
- McpSchema .CallToolRequest callToolRequest = objectMapper .convertValue (params ,
351
- new TypeReference <McpSchema .CallToolRequest >() {
352
- });
427
+ if (isStreamableHttp ) {
428
+ return new McpServerSession .StreamingRequestHandler <CallToolResult >() {
429
+ @ Override
430
+ public Mono <CallToolResult > handle (McpAsyncServerExchange exchange , Object params ) {
431
+ var callToolRequest = objectMapper .convertValue (params , McpSchema .CallToolRequest .class );
432
+
433
+ // Check regular tools first
434
+ var regularTool = tools .stream ()
435
+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
436
+ .findFirst ();
437
+
438
+ if (regularTool .isPresent ()) {
439
+ return regularTool .get ().call ().apply (exchange , callToolRequest .arguments ());
440
+ }
441
+
442
+ // Check streaming tools (take first result)
443
+ var streamingTool = streamTools .stream ()
444
+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
445
+ .findFirst ();
446
+
447
+ if (streamingTool .isPresent ()) {
448
+ return streamingTool .get ().call ().apply (exchange , callToolRequest .arguments ()).next ();
449
+ }
450
+
451
+ return Mono .error (new McpError ("Tool not found: " + callToolRequest .name ()));
452
+ }
353
453
354
- Optional < McpServerFeatures . AsyncToolSpecification > toolSpecification = this . tools . stream ()
355
- . filter ( tr -> callToolRequest . name (). equals ( tr . tool (). name ()))
356
- . findAny ( );
454
+ @ Override
455
+ public Flux < CallToolResult > handleStreaming ( McpAsyncServerExchange exchange , Object params ) {
456
+ var callToolRequest = objectMapper . convertValue ( params , McpSchema . CallToolRequest . class );
357
457
358
- if (toolSpecification .isEmpty ()) {
359
- return Mono .error (new McpError ("Tool not found: " + callToolRequest .name ()));
360
- }
458
+ // Check streaming tools first (preferred for streaming)
459
+ var streamingTool = streamTools .stream ()
460
+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
461
+ .findFirst ();
361
462
362
- return toolSpecification .map (tool -> tool .call ().apply (exchange , callToolRequest .arguments ()))
363
- .orElse (Mono .error (new McpError ("Tool not found: " + callToolRequest .name ())));
364
- };
463
+ if (streamingTool .isPresent ()) {
464
+ return streamingTool .get ().call ().apply (exchange , callToolRequest .arguments ());
465
+ }
466
+
467
+ // Fallback to regular tools (convert Mono to Flux)
468
+ var regularTool = tools .stream ()
469
+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
470
+ .findFirst ();
471
+
472
+ if (regularTool .isPresent ()) {
473
+ return regularTool .get ().call ().apply (exchange , callToolRequest .arguments ()).flux ();
474
+ }
475
+
476
+ return Flux .error (new McpError ("Tool not found: " + callToolRequest .name ()));
477
+ }
478
+ };
479
+ }
480
+ else {
481
+ return (exchange , params ) -> {
482
+ McpSchema .CallToolRequest callToolRequest = objectMapper .convertValue (params ,
483
+ new TypeReference <McpSchema .CallToolRequest >() {
484
+ });
485
+
486
+ // Check regular tools first
487
+ Optional <McpServerFeatures .AsyncToolSpecification > toolSpecification = this .tools .stream ()
488
+ .filter (tr -> callToolRequest .name ().equals (tr .tool ().name ()))
489
+ .findAny ();
490
+
491
+ if (toolSpecification .isPresent ()) {
492
+ return toolSpecification .get ().call ().apply (exchange , callToolRequest .arguments ());
493
+ }
494
+
495
+ // Check streaming tools (take first result)
496
+ Optional <McpServerFeatures .AsyncStreamingToolSpecification > streamToolSpecification = this .streamTools
497
+ .stream ()
498
+ .filter (tr -> callToolRequest .name ().equals (tr .tool ().name ()))
499
+ .findAny ();
500
+
501
+ if (streamToolSpecification .isPresent ()) {
502
+ return streamToolSpecification .get ().call ().apply (exchange , callToolRequest .arguments ()).next ();
503
+ }
504
+
505
+ return Mono .error (new McpError ("Tool not found: " + callToolRequest .name ()));
506
+ };
507
+ }
365
508
}
366
509
367
510
// ---------------------------------------
0 commit comments