Skip to content

Commit a88c241

Browse files
committed
Implement for client
BREAKING : Client Specification record is braking
1 parent aa95c0d commit a88c241

File tree

3 files changed

+79
-4
lines changed

3 files changed

+79
-4
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,16 @@ public class McpAsyncClient {
267267
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE,
268268
asyncLoggingNotificationHandler(loggingConsumersFinal));
269269

270+
// Utility Progress Notification
271+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumersFinal = new ArrayList<>();
272+
progressConsumersFinal
273+
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification)));
274+
if (!Utils.isEmpty(features.progressConsumers())) {
275+
progressConsumersFinal.addAll(features.progressConsumers());
276+
}
277+
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS,
278+
asyncProgressNotificationHandler(progressConsumersFinal));
279+
270280
this.transport.setExceptionHandler(this::handleException);
271281
this.sessionSupplier = () -> new McpClientSession(requestTimeout, transport, requestHandlers,
272282
notificationHandlers);
@@ -985,6 +995,20 @@ private NotificationHandler asyncLoggingNotificationHandler(
985995
};
986996
}
987997

998+
private NotificationHandler asyncProgressNotificationHandler(
999+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers) {
1000+
1001+
return params -> {
1002+
McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params,
1003+
new TypeReference<McpSchema.ProgressNotification>() {
1004+
});
1005+
1006+
return Flux.fromIterable(progressConsumers)
1007+
.flatMap(consumer -> consumer.apply(progressNotification))
1008+
.then();
1009+
};
1010+
}
1011+
9881012
/**
9891013
* Sets the minimum logging level for messages received from the server. The client
9901014
* will only receive log messages at or above the specified severity level.

mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ class SyncSpec {
177177

178178
private final List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers = new ArrayList<>();
179179

180+
private final List<Consumer<McpSchema.ProgressNotification>> progressConsumers = new ArrayList<>();
181+
180182
private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;
181183

182184
private Function<ElicitRequest, ElicitResult> elicitationHandler;
@@ -377,6 +379,36 @@ public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotificat
377379
return this;
378380
}
379381

382+
/**
383+
* Adds a consumer to be notified of progress notifications from the server. This
384+
* allows the client to track long-running operations and provide feedback to
385+
* users.
386+
* @param progressConsumer A consumer that receives progress notifications. Must
387+
* not be null.
388+
* @return This builder instance for method chaining
389+
* @throws IllegalArgumentException if progressConsumer is null
390+
*/
391+
public SyncSpec progressConsumer(Consumer<McpSchema.ProgressNotification> progressConsumer) {
392+
Assert.notNull(progressConsumer, "Progress consumer must not be null");
393+
this.progressConsumers.add(progressConsumer);
394+
return this;
395+
}
396+
397+
/**
398+
* Adds a multiple consumers to be notified of progress notifications from the
399+
* server. This allows the client to track long-running operations and provide
400+
* feedback to users.
401+
* @param progressConsumers A list of consumers that receives progress
402+
* notifications. Must not be null.
403+
* @return This builder instance for method chaining
404+
* @throws IllegalArgumentException if progressConsumer is null
405+
*/
406+
public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>> progressConsumers) {
407+
Assert.notNull(progressConsumers, "Progress consumers must not be null");
408+
this.progressConsumers.addAll(progressConsumers);
409+
return this;
410+
}
411+
380412
/**
381413
* Create an instance of {@link McpSyncClient} with the provided configurations or
382414
* sensible defaults.
@@ -385,7 +417,8 @@ public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotificat
385417
public McpSyncClient build() {
386418
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
387419
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
388-
this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, this.elicitationHandler);
420+
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
421+
this.elicitationHandler);
389422

390423
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
391424

@@ -435,6 +468,8 @@ class AsyncSpec {
435468

436469
private final List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers = new ArrayList<>();
437470

471+
private final List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
472+
438473
private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;
439474

440475
private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;
@@ -663,8 +698,8 @@ public McpAsyncClient build() {
663698
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
664699
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
665700
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
666-
this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler,
667-
this.elicitationHandler));
701+
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
702+
this.samplingHandler, this.elicitationHandler));
668703
}
669704

670705
}

mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class McpClientFeatures {
5959
* @param resourcesChangeConsumers the resources change consumers.
6060
* @param promptsChangeConsumers the prompts change consumers.
6161
* @param loggingConsumers the logging consumers.
62+
* @param progressConsumers the progress consumers.
6263
* @param samplingHandler the sampling handler.
6364
* @param elicitationHandler the elicitation handler.
6465
*/
@@ -68,6 +69,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
6869
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
6970
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
7071
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
72+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers,
7173
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
7274
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler) {
7375

@@ -79,6 +81,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
7981
* @param resourcesChangeConsumers the resources change consumers.
8082
* @param promptsChangeConsumers the prompts change consumers.
8183
* @param loggingConsumers the logging consumers.
84+
* @param progressConsumers the progressconsumers.
8285
* @param samplingHandler the sampling handler.
8386
* @param elicitationHandler the elicitation handler.
8487
*/
@@ -89,6 +92,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
8992
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
9093
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
9194
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
95+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers,
9296
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
9397
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler) {
9498

@@ -106,6 +110,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
106110
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
107111
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
108112
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
113+
this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
109114
this.samplingHandler = samplingHandler;
110115
this.elicitationHandler = elicitationHandler;
111116
}
@@ -149,6 +154,12 @@ public static Async fromSync(Sync syncSpec) {
149154
.subscribeOn(Schedulers.boundedElastic()));
150155
}
151156

157+
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
158+
for (Consumer<McpSchema.ProgressNotification> consumer : syncSpec.progressConsumers()) {
159+
progressConsumers.add(p -> Mono.<Void>fromRunnable(() -> consumer.accept(p))
160+
.subscribeOn(Schedulers.boundedElastic()));
161+
}
162+
152163
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler = r -> Mono
153164
.fromCallable(() -> syncSpec.samplingHandler().apply(r))
154165
.subscribeOn(Schedulers.boundedElastic());
@@ -159,7 +170,7 @@ public static Async fromSync(Sync syncSpec) {
159170

160171
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
161172
toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
162-
loggingConsumers, samplingHandler, elicitationHandler);
173+
loggingConsumers, progressConsumers, samplingHandler, elicitationHandler);
163174
}
164175
}
165176

@@ -174,6 +185,7 @@ public static Async fromSync(Sync syncSpec) {
174185
* @param resourcesChangeConsumers the resources change consumers.
175186
* @param promptsChangeConsumers the prompts change consumers.
176187
* @param loggingConsumers the logging consumers.
188+
* @param progressConsumers the progress consumers.
177189
* @param samplingHandler the sampling handler.
178190
* @param elicitationHandler the elicitation handler.
179191
*/
@@ -183,6 +195,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
183195
List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers,
184196
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
185197
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
198+
List<Consumer<McpSchema.ProgressNotification>> progressConsumers,
186199
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
187200
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
188201

@@ -196,6 +209,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
196209
* @param resourcesUpdateConsumers the resource update consumers.
197210
* @param promptsChangeConsumers the prompts change consumers.
198211
* @param loggingConsumers the logging consumers.
212+
* @param progressConsumers the progress consumers.
199213
* @param samplingHandler the sampling handler.
200214
* @param elicitationHandler the elicitation handler.
201215
*/
@@ -205,6 +219,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
205219
List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers,
206220
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
207221
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
222+
List<Consumer<McpSchema.ProgressNotification>> progressConsumers,
208223
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
209224
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
210225

@@ -222,6 +237,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
222237
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
223238
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
224239
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
240+
this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
225241
this.samplingHandler = samplingHandler;
226242
this.elicitationHandler = elicitationHandler;
227243
}

0 commit comments

Comments
 (0)