diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java index 0895e02b..949aca4f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java @@ -122,7 +122,12 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport, // Observation associated with the individual message - it can be used to // create child Observation and emit it together with the message to the // consumer - this.connection = this.transport.connect(mono -> mono.doOnNext(message -> { + this.connection = this.transport.connect(mono -> mono.doOnNext(message -> handle(message).subscribe())) + .subscribe(); + } + + public Mono handle(McpSchema.JSONRPCMessage message) { + return Mono.defer(() -> { if (message instanceof McpSchema.JSONRPCResponse response) { logger.debug("Received Response: {}", response); var sink = pendingResponses.remove(response.id()); @@ -132,23 +137,27 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport, else { sink.success(response); } + return Mono.empty(); } else if (message instanceof McpSchema.JSONRPCRequest request) { logger.debug("Received request: {}", request); - handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(), - error -> { - var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), - null, new McpSchema.JSONRPCResponse.JSONRPCError( - McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null)); - transport.sendMessage(errorResponse).subscribe(); - }); + return handleIncomingRequest(request).onErrorResume(error -> { + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null, + new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, + error.getMessage(), null)); + return this.transport.sendMessage(errorResponse).then(Mono.empty()); + }).flatMap(this.transport::sendMessage); } else if (message instanceof McpSchema.JSONRPCNotification notification) { logger.debug("Received notification: {}", notification); - handleIncomingNotification(notification).subscribe(null, - error -> logger.error("Error handling notification: {}", error.getMessage())); + return handleIncomingNotification(notification) + .doOnError(error -> logger.error("Error handling notification: {}", error.getMessage())); } - })).subscribe(); + else { + logger.warn("Received unknown message type: {}", message); + return Mono.empty(); + } + }); } /**