From 0f0a412881014256fa0f0168182497c8436805fe Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 22 Jul 2024 15:49:29 +0200 Subject: [PATCH 1/6] changing transformed to transform, fixing retriving setting outputcolumn names, correcting column names for untransformed features and adding back missed merge changes --- python/hsfs/core/vector_server.py | 243 ++++++++++++++++++++++++++---- python/hsfs/feature_view.py | 12 +- python/hsfs/hopsworks_udf.py | 2 +- 3 files changed, 224 insertions(+), 33 deletions(-) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 1c467028b..9116eb4ea 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -17,6 +17,7 @@ import itertools import logging +import warnings from base64 import b64decode from datetime import datetime, timezone from io import BytesIO @@ -97,6 +98,11 @@ def __init__( ) ] self._untransformed_feature_vector_col_name = [ + feat.name + for feat in features + if not (feat.label or feat.training_helper_column or feat.on_demand_transformation_function) + ] + self._on_demand_feature_vector_col_name = [ feat.name for feat in features if not (feat.label or feat.training_helper_column) @@ -310,7 +316,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed=True, + transform: bool=True, request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], Dict[str, Any]]: """Assembles serving vector from online feature store.""" @@ -343,7 +349,7 @@ def get_feature_vector( vector_db_result=vector_db_features or {}, allow_missing=allow_missing, client=online_client_choice, - transformed=transformed, + transform=transform, request_parameters=request_parameters, ) @@ -352,7 +358,8 @@ def get_feature_vector( batch=False, inference_helper=False, return_type=return_type, - transformed=transformed, + transform=transform, + on_demand_feature=transform ) def get_feature_vectors( @@ -367,7 +374,7 @@ def get_feature_vectors( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed=True, + transform: bool=True, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], List[Dict[str, Any]]]: """Assembles serving vector from online feature store.""" if passed_features is None: @@ -467,7 +474,7 @@ def get_feature_vectors( vector_db_result=vector_db_result, allow_missing=allow_missing, client=online_client_choice, - transformed=transformed, + transform=transform, request_parameters=request_parameter, ) @@ -479,7 +486,8 @@ def get_feature_vectors( batch=True, inference_helper=False, return_type=return_type, - transformed=transformed, + transform=transform, + on_demand_feature=transform ) def assemble_feature_vector( @@ -489,7 +497,7 @@ def assemble_feature_vector( vector_db_result: Optional[Dict[str, Any]], allow_missing: bool, client: Literal["rest", "sql"], - transformed, + transform: bool, request_parameters: Optional[Dict[str, Any]] = None, ) -> Optional[List[Any]]: """Assembles serving vector from online feature store.""" @@ -537,11 +545,11 @@ def assemble_feature_vector( if ( len(self.model_dependent_transformation_functions) > 0 or len(self.on_demand_transformation_functions) > 0 - ) and transformed: + ) and transform: self.apply_transformation(result_dict, request_parameters or {}) _logger.debug("Assembled and transformed dict feature vector: %s", result_dict) - if transformed: + if transform: return [ result_dict.get(fname, None) for fname in self.transformed_feature_vector_col_name @@ -552,21 +560,195 @@ def assemble_feature_vector( for fname in self._untransformed_feature_vector_col_name ] - def transform_feature_vectors(self, batch_features): - return [ - self.apply_transformation(self.get_untransformed_features_map(features)) - for features in batch_features - ] + def _check_feature_vectors_type_and_convert_to_dict( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + on_demand_features : bool = False, + ) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]: + """ + Function that converts an input feature vector into a list of dictionaries. - def get_untransformed_features_map(self, features) -> Dict[str, Any]: - return dict( - [ - (fname, fvalue) - for fname, fvalue in zip( - self._untransformed_feature_vector_col_name, features - ) - ] + # Arguments + feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted. + on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector. + + # Returns + `Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple that contains the feature vector as a dictionary and a string denoting the data type of the input feature vector. + + """ + if isinstance(feature_vectors, pd.DataFrame): + return_type = "pandas" + feature_vectors = feature_vectors.to_dict(orient="records") + + elif isinstance(feature_vectors, pl.DataFrame): + return_type = "polars" + feature_vectors = feature_vectors.to_pandas() + feature_vectors = feature_vectors.to_dict(orient="records") + + elif isinstance(feature_vectors, list) and feature_vectors: + if all( + isinstance(feature_vector, list) for feature_vector in feature_vectors + ): + return_type = "list" + feature_vectors = [ + self.get_untransformed_features_map(feature_vector, on_demand_features=on_demand_features) + for feature_vector in feature_vectors + ] + + else: + return_type = "list" + feature_vectors = [self.get_untransformed_features_map(feature_vectors, on_demand_features=on_demand_features)] + + else: + raise exceptions.FeatureStoreException( + "Unsupported input type for feature vector. Supported types are `List`, `pandas.DataFrame`, `polars.DataFrame`" + ) + return feature_vectors, return_type + + def transform( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + ) -> Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]: + """ + Applies model dependent transformation on the provided feature vector. + + # Arguments + feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be transformed using attached model-dependent transformations. + + # Returns + `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The transformed feature vector. + """ + if not self._model_dependent_transformation_functions: + warnings.warn( + "Feature view does not have any attached model-dependent transformations. Returning input feature vectors.", + stacklevel=0, + ) + return feature_vectors + + feature_vectors, return_type = ( + self._check_feature_vectors_type_and_convert_to_dict(feature_vectors, on_demand_features=True) ) + transformed_feature_vectors = [] + for feature_vector in feature_vectors: + transformed_feature_vector = self.apply_model_dependent_transformations( + feature_vector + ) + transformed_feature_vectors.append( + [ + transformed_feature_vector.get(fname, None) + for fname in self.transformed_feature_vector_col_name + ] + ) + + if len(transformed_feature_vectors) == 1: + batch = False + transformed_feature_vectors = transformed_feature_vectors[0] + else: + batch = True + + return self.handle_feature_vector_return_type( + transformed_feature_vectors, + batch=batch, + inference_helper=False, + return_type=return_type, + transform=True, + ) + + def compute_on_demand_features( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + request_parameters: Union[List[Dict[str, Any]], Dict[str, Any]], + ): + """ + Function computes on-demand features present in the feature view. + + # Arguments + feature_vector: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vector to be transformed. + request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view. + # Returns + `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The feature vector that contains all on-demand features in the feature view. + """ + if not self._on_demand_transformation_functions: + warnings.warn( + "Feature view does not have any on-demand features. Returning input feature vectors.", + stacklevel=1, + ) + return feature_vectors + + request_parameters = {} if not request_parameters else request_parameters + # Convert feature vectors to dictionary + feature_vectors, return_type = ( + self._check_feature_vectors_type_and_convert_to_dict(feature_vectors) + ) + # Check if all request parameters are provided + # If request parameter is a dictionary then copy it to list with the same length as that of entires + request_parameters = ( + [request_parameters] * len(feature_vectors) + if isinstance(request_parameters, dict) + else request_parameters + ) + self.check_missing_request_parameters( + features=feature_vectors[0], request_parameters=request_parameters[0] + ) + on_demand_feature_vectors = [] + for feature_vector, request_parameter in zip( + feature_vectors, request_parameters + ): + on_demand_feature_vector = self.apply_on_demand_transformations( + feature_vector, request_parameter + ) + on_demand_feature_vectors.append( + [ + on_demand_feature_vector.get(fname, None) + for fname in self._on_demand_feature_vector_col_name + ] + ) + + if len(on_demand_feature_vectors) == 1: + batch = False + on_demand_feature_vectors = on_demand_feature_vectors[0] + else: + batch = True + + return self.handle_feature_vector_return_type( + on_demand_feature_vectors, + batch=batch, + inference_helper=False, + return_type=return_type, + transform=False, + on_demand_feature=True + ) + + def get_untransformed_features_map(self, features : List[Any], on_demand_features : bool = False) -> Dict[str, Any]: + """ + Function that accepts a feature vectors as a list and returns the untransformed features as a dict that maps + feature names to their values + + # Arguments + features : `List[Any]`. List of feature vectors. + on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector. + + # Returns + `Dict[str, Any]` : Dictionary mapping features name to values. + """ + if on_demand_features: + return dict( + [ + (fname, fvalue) + for fname, fvalue in zip( + self._on_demand_feature_vector_col_name, features + ) + ] + ) + else: + return dict( + [ + (fname, fvalue) + for fname, fvalue in zip( + self._untransformed_feature_vector_col_name, features + ) + ] + ) def handle_feature_vector_return_type( self, @@ -576,7 +758,8 @@ def handle_feature_vector_return_type( batch: bool, inference_helper: bool, return_type: Union[Literal["list", "dict", "numpy", "pandas", "polars"]], - transformed: bool = False, + transform: bool = False, + on_demand_feature: bool=False ) -> Union[ pd.DataFrame, pl.DataFrame, @@ -586,10 +769,13 @@ def handle_feature_vector_return_type( Dict[str, Any], List[Dict[str, Any]], ]: - if transformed: + if transform: column_names = self.transformed_feature_vector_col_name else: - column_names = self._feature_vector_col_name + if on_demand_feature: + column_names = self._on_demand_feature_vector_col_name + else: + column_names = self._untransformed_feature_vector_col_name # Only get-feature-vector and get-feature-vectors can return list or numpy if return_type.lower() == "list" and not inference_helper: @@ -652,12 +838,16 @@ def get_inference_helper( batch=False, inference_helper=True, return_type=return_type, + transform=False, + on_demand_feature=False ) return self.handle_feature_vector_return_type( self.sql_client.get_inference_helper_vector(entry), batch=False, inference_helper=True, return_type=return_type, + transform=False, + on_demand_feature=False ) def get_inference_helpers( @@ -706,7 +896,8 @@ def get_inference_helpers( batch=True, inference_helper=True, return_type=return_type, - transformed=False, + transform=False, + on_demand_feature=False ) def which_client_and_ensure_initialised( diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index bdcf4b51c..b8b1fddd3 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -511,7 +511,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed: Optional[bool] = True, + transform: Optional[bool] = True, request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[List[Any], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vector from online feature store. @@ -613,7 +613,7 @@ def get_feature_vector( vector_db_features=vector_db_features, force_rest_client=force_rest_client, force_sql_client=force_sql_client, - transformed=transformed, + transform=transform, request_parameters=request_parameters, ) @@ -626,7 +626,7 @@ def get_feature_vectors( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed: Optional[bool] = True, + transform: Optional[bool] = True, request_parameters: Optional[List[Dict[str, Any]]] = None, ) -> Union[List[List[Any]], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vectors in batches from online feature store. @@ -728,7 +728,7 @@ def get_feature_vectors( vector_db_features=vector_db_features, force_rest_client=force_rest_client, force_sql_client=force_sql_client, - transformed=transformed, + transform=transform, request_parameters=request_parameters, ) @@ -3941,7 +3941,7 @@ def transformation_functions( self._transformation_functions = transformation_functions @property - def model_dependent_tranformation_functions(self) -> Dict["str", Callable]: + def model_dependent_transformations(self) -> Dict["str", Callable]: """Get Model-Dependent transformations as a dictionary mapping transformed feature names to transformation function""" return { transformation_function.hopsworks_udf.output_column_names[ @@ -3951,7 +3951,7 @@ def model_dependent_tranformation_functions(self) -> Dict["str", Callable]: } @property - def on_demand_tranformation_functions(self) -> Dict["str", Callable]: + def on_demand_transformations(self) -> Dict["str", Callable]: """Get On-Demand transformations as a dictionary mapping on-demand feature names to transformation function""" return { feature.on_demand_transformation_function.hopsworks_udf.function_name: feature.on_demand_transformation_function.hopsworks_udf.get_udf() diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 01952eb8e..7e16f03c7 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -916,7 +916,7 @@ def transformation_statistics( def output_column_names(self, output_col_names: Union[str, List[str]]) -> None: if not isinstance(output_col_names, List): output_col_names = [output_col_names] - if len(output_col_names) != len(self.return_types): + if not output_col_names and len(output_col_names) != len(self.return_types): raise FeatureStoreException( f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." ) From 0c015113758f3862b97b8b7c459def2323de2d31 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 22 Jul 2024 15:53:58 +0200 Subject: [PATCH 2/6] reformating with ruff --- python/hsfs/core/vector_server.py | 44 ++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 9116eb4ea..fb744398c 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -100,7 +100,11 @@ def __init__( self._untransformed_feature_vector_col_name = [ feat.name for feat in features - if not (feat.label or feat.training_helper_column or feat.on_demand_transformation_function) + if not ( + feat.label + or feat.training_helper_column + or feat.on_demand_transformation_function + ) ] self._on_demand_feature_vector_col_name = [ feat.name @@ -316,7 +320,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transform: bool=True, + transform: bool = True, request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], Dict[str, Any]]: """Assembles serving vector from online feature store.""" @@ -359,7 +363,7 @@ def get_feature_vector( inference_helper=False, return_type=return_type, transform=transform, - on_demand_feature=transform + on_demand_feature=transform, ) def get_feature_vectors( @@ -374,7 +378,7 @@ def get_feature_vectors( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transform: bool=True, + transform: bool = True, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], List[Dict[str, Any]]]: """Assembles serving vector from online feature store.""" if passed_features is None: @@ -487,7 +491,7 @@ def get_feature_vectors( inference_helper=False, return_type=return_type, transform=transform, - on_demand_feature=transform + on_demand_feature=transform, ) def assemble_feature_vector( @@ -563,7 +567,7 @@ def assemble_feature_vector( def _check_feature_vectors_type_and_convert_to_dict( self, feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], - on_demand_features : bool = False, + on_demand_features: bool = False, ) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]: """ Function that converts an input feature vector into a list of dictionaries. @@ -591,13 +595,19 @@ def _check_feature_vectors_type_and_convert_to_dict( ): return_type = "list" feature_vectors = [ - self.get_untransformed_features_map(feature_vector, on_demand_features=on_demand_features) + self.get_untransformed_features_map( + feature_vector, on_demand_features=on_demand_features + ) for feature_vector in feature_vectors ] else: return_type = "list" - feature_vectors = [self.get_untransformed_features_map(feature_vectors, on_demand_features=on_demand_features)] + feature_vectors = [ + self.get_untransformed_features_map( + feature_vectors, on_demand_features=on_demand_features + ) + ] else: raise exceptions.FeatureStoreException( @@ -626,7 +636,9 @@ def transform( return feature_vectors feature_vectors, return_type = ( - self._check_feature_vectors_type_and_convert_to_dict(feature_vectors, on_demand_features=True) + self._check_feature_vectors_type_and_convert_to_dict( + feature_vectors, on_demand_features=True + ) ) transformed_feature_vectors = [] for feature_vector in feature_vectors: @@ -716,10 +728,12 @@ def compute_on_demand_features( inference_helper=False, return_type=return_type, transform=False, - on_demand_feature=True + on_demand_feature=True, ) - def get_untransformed_features_map(self, features : List[Any], on_demand_features : bool = False) -> Dict[str, Any]: + def get_untransformed_features_map( + self, features: List[Any], on_demand_features: bool = False + ) -> Dict[str, Any]: """ Function that accepts a feature vectors as a list and returns the untransformed features as a dict that maps feature names to their values @@ -759,7 +773,7 @@ def handle_feature_vector_return_type( inference_helper: bool, return_type: Union[Literal["list", "dict", "numpy", "pandas", "polars"]], transform: bool = False, - on_demand_feature: bool=False + on_demand_feature: bool = False, ) -> Union[ pd.DataFrame, pl.DataFrame, @@ -839,7 +853,7 @@ def get_inference_helper( inference_helper=True, return_type=return_type, transform=False, - on_demand_feature=False + on_demand_feature=False, ) return self.handle_feature_vector_return_type( self.sql_client.get_inference_helper_vector(entry), @@ -847,7 +861,7 @@ def get_inference_helper( inference_helper=True, return_type=return_type, transform=False, - on_demand_feature=False + on_demand_feature=False, ) def get_inference_helpers( @@ -897,7 +911,7 @@ def get_inference_helpers( inference_helper=True, return_type=return_type, transform=False, - on_demand_feature=False + on_demand_feature=False, ) def which_client_and_ensure_initialised( From 2709d4c6b5e32bad0892d795dc2aeb3d1a8255b3 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Tue, 23 Jul 2024 21:28:55 +0200 Subject: [PATCH 3/6] changing key name --- python/hsfs/training_dataset_feature.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index 3aa3f6a81..922039bab 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -66,7 +66,7 @@ def to_dict(self): "trainingHelperColumn": self._training_helper_column, "featureGroupFeatureName": self._feature_group_feature_name, "featuregroup": self._feature_group, - "transformation_function": self._on_demand_transformation_function, + "transformation_function": self.on_demand_transformation_function, } @classmethod From a97f8155e612a721e6d5596976678fdb156eb066 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Tue, 23 Jul 2024 21:39:23 +0200 Subject: [PATCH 4/6] changing key name in hopsworks_udf --- python/hsfs/hopsworks_udf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 7e16f03c7..36524603d 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -669,7 +669,7 @@ def to_dict(self) -> Dict[str, Any]: "statisticsArgumentNames": self._statistics_argument_names if self.statistics_required else None, - "name": self._function_name, + "name": self.function_name, "featureNamePrefix": self._feature_name_prefix, } From 648b4029d05f328eafb317a1ee863780f2a75c62 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Wed, 24 Jul 2024 09:12:09 +0200 Subject: [PATCH 5/6] changing to camel case in to_dict --- python/hsfs/training_dataset_feature.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index 922039bab..53ed5e2f0 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -66,7 +66,7 @@ def to_dict(self): "trainingHelperColumn": self._training_helper_column, "featureGroupFeatureName": self._feature_group_feature_name, "featuregroup": self._feature_group, - "transformation_function": self.on_demand_transformation_function, + "transformationFunction": self.on_demand_transformation_function, } @classmethod From a0886dafebb73baf8ba5437dac49069d70b6a643 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Wed, 24 Jul 2024 16:35:42 +0200 Subject: [PATCH 6/6] setting transformed_features as a property and considering dropped features in it --- python/hsfs/feature_view.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index b8b1fddd3..e854f4ce4 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -4021,16 +4021,18 @@ def logging_enabled(self) -> bool: def logging_enabled(self, logging_enabled) -> None: self._logging_enabled = logging_enabled + @property def transformed_features(self) -> List[str]: """Name of features of a feature view after transformation functions have been applied""" - transformation_features = set() + dropped_features = set() transformed_column_names = [] for tf in self.transformation_functions: transformed_column_names.extend(tf.output_column_names) - transformation_features.update(tf.hopsworks_udf.transformation_features) + if tf.hopsworks_udf.dropped_features: + dropped_features.update(tf.hopsworks_udf.dropped_features) return [ feature.name for feature in self.features - if feature.name not in transformation_features + if feature.name not in dropped_features ] + transformed_column_names