Skip to content

Commit 34daee4

Browse files
committed
Implement for client
BREAKING : Client Specification record is braking
1 parent 8fe3cba commit 34daee4

File tree

3 files changed

+78
-3
lines changed

3 files changed

+78
-3
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,16 @@ public class McpAsyncClient {
253253
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE,
254254
asyncLoggingNotificationHandler(loggingConsumersFinal));
255255

256+
// Utility Progress Notification
257+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumersFinal = new ArrayList<>();
258+
progressConsumersFinal
259+
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification)));
260+
if (!Utils.isEmpty(features.progressConsumers())) {
261+
progressConsumersFinal.addAll(features.progressConsumers());
262+
}
263+
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS,
264+
asyncProgressNotificationHandler(progressConsumersFinal));
265+
256266
this.transport.setExceptionHandler(this::handleException);
257267
this.sessionSupplier = () -> new McpClientSession(requestTimeout, transport, requestHandlers,
258268
notificationHandlers);
@@ -917,6 +927,20 @@ private NotificationHandler asyncLoggingNotificationHandler(
917927
};
918928
}
919929

930+
private NotificationHandler asyncProgressNotificationHandler(
931+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers) {
932+
933+
return params -> {
934+
McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params,
935+
new TypeReference<McpSchema.ProgressNotification>() {
936+
});
937+
938+
return Flux.fromIterable(progressConsumers)
939+
.flatMap(consumer -> consumer.apply(progressNotification))
940+
.then();
941+
};
942+
}
943+
920944
/**
921945
* Sets the minimum logging level for messages received from the server. The client
922946
* will only receive log messages at or above the specified severity level.

mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ class SyncSpec {
175175

176176
private final List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers = new ArrayList<>();
177177

178+
private final List<Consumer<McpSchema.ProgressNotification>> progressConsumers = new ArrayList<>();
179+
178180
private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;
179181

180182
private Function<ElicitRequest, ElicitResult> elicitationHandler;
@@ -375,6 +377,36 @@ public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotificat
375377
return this;
376378
}
377379

380+
/**
381+
* Adds a consumer to be notified of progress notifications from the server. This
382+
* allows the client to track long-running operations and provide feedback to
383+
* users.
384+
* @param progressConsumer A consumer that receives progress notifications. Must
385+
* not be null.
386+
* @return This builder instance for method chaining
387+
* @throws IllegalArgumentException if progressConsumer is null
388+
*/
389+
public SyncSpec progressConsumer(Consumer<McpSchema.ProgressNotification> progressConsumer) {
390+
Assert.notNull(progressConsumer, "Progress consumer must not be null");
391+
this.progressConsumers.add(progressConsumer);
392+
return this;
393+
}
394+
395+
/**
396+
* Adds a multiple consumers to be notified of progress notifications from the
397+
* server. This allows the client to track long-running operations and provide
398+
* feedback to users.
399+
* @param progressConsumers A list of consumers that receives progress
400+
* notifications. Must not be null.
401+
* @return This builder instance for method chaining
402+
* @throws IllegalArgumentException if progressConsumer is null
403+
*/
404+
public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>> progressConsumers) {
405+
Assert.notNull(progressConsumers, "Progress consumers must not be null");
406+
this.progressConsumers.addAll(progressConsumers);
407+
return this;
408+
}
409+
378410
/**
379411
* Create an instance of {@link McpSyncClient} with the provided configurations or
380412
* sensible defaults.
@@ -383,7 +415,7 @@ public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotificat
383415
public McpSyncClient build() {
384416
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
385417
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
386-
this.loggingConsumers, this.samplingHandler, this.elicitationHandler);
418+
this.loggingConsumers, this.progressConsumers, this.samplingHandler, this.elicitationHandler);
387419

388420
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
389421

@@ -431,6 +463,8 @@ class AsyncSpec {
431463

432464
private final List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers = new ArrayList<>();
433465

466+
private final List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
467+
434468
private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;
435469

436470
private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;
@@ -642,7 +676,8 @@ public McpAsyncClient build() {
642676
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
643677
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
644678
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
645-
this.loggingConsumers, this.samplingHandler, this.elicitationHandler));
679+
this.loggingConsumers, this.progressConsumers, this.samplingHandler,
680+
this.elicitationHandler));
646681
}
647682

648683
}

mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class McpClientFeatures {
5959
* @param resourcesChangeConsumers the resources change consumers.
6060
* @param promptsChangeConsumers the prompts change consumers.
6161
* @param loggingConsumers the logging consumers.
62+
* @param progressConsumers the progress consumers.
6263
* @param samplingHandler the sampling handler.
6364
* @param elicitationHandler the elicitation handler.
6465
*/
@@ -67,6 +68,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
6768
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
6869
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
6970
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
71+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers,
7072
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
7173
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler) {
7274

@@ -78,6 +80,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
7880
* @param resourcesChangeConsumers the resources change consumers.
7981
* @param promptsChangeConsumers the prompts change consumers.
8082
* @param loggingConsumers the logging consumers.
83+
* @param progressConsumers the progressconsumers.
8184
* @param samplingHandler the sampling handler.
8285
* @param elicitationHandler the elicitation handler.
8386
*/
@@ -87,6 +90,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
8790
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
8891
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
8992
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
93+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers,
9094
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
9195
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler) {
9296

@@ -103,6 +107,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
103107
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
104108
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
105109
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
110+
this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
106111
this.samplingHandler = samplingHandler;
107112
this.elicitationHandler = elicitationHandler;
108113
}
@@ -141,6 +146,12 @@ public static Async fromSync(Sync syncSpec) {
141146
.subscribeOn(Schedulers.boundedElastic()));
142147
}
143148

149+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
150+
for (Consumer<McpSchema.ProgressNotification> consumer : syncSpec.progressConsumers()) {
151+
progressConsumers.add(p -> Mono.<Void>fromRunnable(() -> consumer.accept(p))
152+
.subscribeOn(Schedulers.boundedElastic()));
153+
}
154+
144155
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler = r -> Mono
145156
.fromCallable(() -> syncSpec.samplingHandler().apply(r))
146157
.subscribeOn(Schedulers.boundedElastic());
@@ -151,7 +162,7 @@ public static Async fromSync(Sync syncSpec) {
151162

152163
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
153164
toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers,
154-
samplingHandler, elicitationHandler);
165+
progressConsumers, samplingHandler, elicitationHandler);
155166
}
156167
}
157168

@@ -166,6 +177,7 @@ public static Async fromSync(Sync syncSpec) {
166177
* @param resourcesChangeConsumers the resources change consumers.
167178
* @param promptsChangeConsumers the prompts change consumers.
168179
* @param loggingConsumers the logging consumers.
180+
* @param progressConsumers the progress consumers.
169181
* @param samplingHandler the sampling handler.
170182
* @param elicitationHandler the elicitation handler.
171183
*/
@@ -174,6 +186,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
174186
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
175187
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
176188
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
189+
List<Consumer<McpSchema.ProgressNotification>> progressConsumers,
177190
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
178191
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
179192

@@ -186,6 +199,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
186199
* @param resourcesChangeConsumers the resources change consumers.
187200
* @param promptsChangeConsumers the prompts change consumers.
188201
* @param loggingConsumers the logging consumers.
202+
* @param progressConsumers the progress consumers.
189203
* @param samplingHandler the sampling handler.
190204
* @param elicitationHandler the elicitation handler.
191205
*/
@@ -194,6 +208,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
194208
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
195209
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
196210
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
211+
List<Consumer<McpSchema.ProgressNotification>> progressConsumers,
197212
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
198213
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
199214

@@ -210,6 +225,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
210225
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
211226
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
212227
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
228+
this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
213229
this.samplingHandler = samplingHandler;
214230
this.elicitationHandler = elicitationHandler;
215231
}

0 commit comments

Comments
 (0)