Skip to content

Commit be719f5

Browse files
committed
refactor: consolidate MCP client initialization logic (#334)
- Move session setup, completion, and error callbacks into doInitialize method - Rename variables in McpClientSession for better clarity (sink -> pendingResponseSink/deliveredResponseSink) Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent ba5bc94 commit be719f5

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,12 @@ public Mono<McpSchema.InitializeResult> initialize() {
389389
return withSession("by explicit API call", init -> Mono.just(init.get()));
390390
}
391391

392-
private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession mcpClientSession) {
392+
private Mono<McpSchema.InitializeResult> doInitialize(Initialization initialization) {
393+
394+
initialization.setMcpClientSession(this.sessionSupplier.get());
395+
396+
McpClientSession mcpClientSession = initialization.mcpSession();
397+
393398
String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
394399

395400
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(// @formatter:off
@@ -412,6 +417,9 @@ private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession mcpClient
412417

413418
return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
414419
.thenReturn(initializeResult);
420+
}).doOnNext(initialization::complete).onErrorResume(ex -> {
421+
initialization.error(ex);
422+
return Mono.error(ex);
415423
});
416424
}
417425

@@ -479,15 +487,9 @@ private <T> Mono<T> withSession(String actionName, Function<Initialization, Mono
479487

480488
boolean needsToInitialize = previous == null;
481489
logger.debug(needsToInitialize ? "Initialization process started" : "Joining previous initialization");
482-
if (needsToInitialize) {
483-
newInit.setMcpClientSession(this.sessionSupplier.get());
484-
}
485490

486-
Mono<McpSchema.InitializeResult> initializationJob = needsToInitialize
487-
? doInitialize(newInit.mcpSession()).doOnNext(newInit::complete).onErrorResume(ex -> {
488-
newInit.error(ex);
489-
return Mono.error(ex);
490-
}) : previous.await();
491+
Mono<McpSchema.InitializeResult> initializationJob = needsToInitialize ? doInitialize(newInit)
492+
: previous.await();
491493

492494
return initializationJob.map(initializeResult -> this.initializationRef.get())
493495
.timeout(this.initializationTimeout)

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -229,27 +229,27 @@ private String generateRequestId() {
229229
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
230230
String requestId = this.generateRequestId();
231231

232-
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
232+
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(pendingResponseSink -> {
233233
logger.debug("Sending message for method {}", method);
234-
this.pendingResponses.put(requestId, sink);
234+
this.pendingResponses.put(requestId, pendingResponseSink);
235235
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
236236
requestId, requestParams);
237237
this.transport.sendMessage(jsonrpcRequest).contextWrite(ctx).subscribe(v -> {
238238
}, error -> {
239239
this.pendingResponses.remove(requestId);
240-
sink.error(error);
240+
pendingResponseSink.error(error);
241241
});
242-
})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
242+
})).timeout(this.requestTimeout).handle((jsonRpcResponse, deliveredResponseSink) -> {
243243
if (jsonRpcResponse.error() != null) {
244244
logger.error("Error handling request: {}", jsonRpcResponse.error());
245-
sink.error(new McpError(jsonRpcResponse.error()));
245+
deliveredResponseSink.error(new McpError(jsonRpcResponse.error()));
246246
}
247247
else {
248248
if (typeRef.getType().equals(Void.class)) {
249-
sink.complete();
249+
deliveredResponseSink.complete();
250250
}
251251
else {
252-
sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
252+
deliveredResponseSink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
253253
}
254254
}
255255
});

0 commit comments

Comments
 (0)