diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 1c467028b..fb744398c 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -17,6 +17,7 @@ import itertools import logging +import warnings from base64 import b64decode from datetime import datetime, timezone from io import BytesIO @@ -97,6 +98,15 @@ def __init__( ) ] self._untransformed_feature_vector_col_name = [ + feat.name + for feat in features + if not ( + feat.label + or feat.training_helper_column + or feat.on_demand_transformation_function + ) + ] + self._on_demand_feature_vector_col_name = [ feat.name for feat in features if not (feat.label or feat.training_helper_column) @@ -310,7 +320,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed=True, + transform: bool = True, request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], Dict[str, Any]]: """Assembles serving vector from online feature store.""" @@ -343,7 +353,7 @@ def get_feature_vector( vector_db_result=vector_db_features or {}, allow_missing=allow_missing, client=online_client_choice, - transformed=transformed, + transform=transform, request_parameters=request_parameters, ) @@ -352,7 +362,8 @@ def get_feature_vector( batch=False, inference_helper=False, return_type=return_type, - transformed=transformed, + transform=transform, + on_demand_feature=transform, ) def get_feature_vectors( @@ -367,7 +378,7 @@ def get_feature_vectors( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed=True, + transform: bool = True, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], List[Dict[str, Any]]]: """Assembles serving vector from online feature store.""" if passed_features is None: @@ -467,7 +478,7 @@ def get_feature_vectors( vector_db_result=vector_db_result, allow_missing=allow_missing, client=online_client_choice, - transformed=transformed, + transform=transform, request_parameters=request_parameter, ) @@ -479,7 +490,8 @@ def get_feature_vectors( batch=True, inference_helper=False, return_type=return_type, - transformed=transformed, + transform=transform, + on_demand_feature=transform, ) def assemble_feature_vector( @@ -489,7 +501,7 @@ def assemble_feature_vector( vector_db_result: Optional[Dict[str, Any]], allow_missing: bool, client: Literal["rest", "sql"], - transformed, + transform: bool, request_parameters: Optional[Dict[str, Any]] = None, ) -> Optional[List[Any]]: """Assembles serving vector from online feature store.""" @@ -537,11 +549,11 @@ def assemble_feature_vector( if ( len(self.model_dependent_transformation_functions) > 0 or len(self.on_demand_transformation_functions) > 0 - ) and transformed: + ) and transform: self.apply_transformation(result_dict, request_parameters or {}) _logger.debug("Assembled and transformed dict feature vector: %s", result_dict) - if transformed: + if transform: return [ result_dict.get(fname, None) for fname in self.transformed_feature_vector_col_name @@ -552,22 +564,206 @@ def assemble_feature_vector( for fname in self._untransformed_feature_vector_col_name ] - def transform_feature_vectors(self, batch_features): - return [ - self.apply_transformation(self.get_untransformed_features_map(features)) - for features in batch_features - ] + def _check_feature_vectors_type_and_convert_to_dict( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + on_demand_features: bool = False, + ) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]: + """ + Function that converts an input feature vector into a list of dictionaries. - def get_untransformed_features_map(self, features) -> Dict[str, Any]: - return dict( - [ - (fname, fvalue) - for fname, fvalue in zip( - self._untransformed_feature_vector_col_name, features - ) - ] + # Arguments + feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted. + on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector. + + # Returns + `Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple that contains the feature vector as a dictionary and a string denoting the data type of the input feature vector. + + """ + if isinstance(feature_vectors, pd.DataFrame): + return_type = "pandas" + feature_vectors = feature_vectors.to_dict(orient="records") + + elif isinstance(feature_vectors, pl.DataFrame): + return_type = "polars" + feature_vectors = feature_vectors.to_pandas() + feature_vectors = feature_vectors.to_dict(orient="records") + + elif isinstance(feature_vectors, list) and feature_vectors: + if all( + isinstance(feature_vector, list) for feature_vector in feature_vectors + ): + return_type = "list" + feature_vectors = [ + self.get_untransformed_features_map( + feature_vector, on_demand_features=on_demand_features + ) + for feature_vector in feature_vectors + ] + + else: + return_type = "list" + feature_vectors = [ + self.get_untransformed_features_map( + feature_vectors, on_demand_features=on_demand_features + ) + ] + + else: + raise exceptions.FeatureStoreException( + "Unsupported input type for feature vector. Supported types are `List`, `pandas.DataFrame`, `polars.DataFrame`" + ) + return feature_vectors, return_type + + def transform( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + ) -> Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]: + """ + Applies model dependent transformation on the provided feature vector. + + # Arguments + feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be transformed using attached model-dependent transformations. + + # Returns + `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The transformed feature vector. + """ + if not self._model_dependent_transformation_functions: + warnings.warn( + "Feature view does not have any attached model-dependent transformations. Returning input feature vectors.", + stacklevel=0, + ) + return feature_vectors + + feature_vectors, return_type = ( + self._check_feature_vectors_type_and_convert_to_dict( + feature_vectors, on_demand_features=True + ) + ) + transformed_feature_vectors = [] + for feature_vector in feature_vectors: + transformed_feature_vector = self.apply_model_dependent_transformations( + feature_vector + ) + transformed_feature_vectors.append( + [ + transformed_feature_vector.get(fname, None) + for fname in self.transformed_feature_vector_col_name + ] + ) + + if len(transformed_feature_vectors) == 1: + batch = False + transformed_feature_vectors = transformed_feature_vectors[0] + else: + batch = True + + return self.handle_feature_vector_return_type( + transformed_feature_vectors, + batch=batch, + inference_helper=False, + return_type=return_type, + transform=True, ) + def compute_on_demand_features( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + request_parameters: Union[List[Dict[str, Any]], Dict[str, Any]], + ): + """ + Function computes on-demand features present in the feature view. + + # Arguments + feature_vector: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vector to be transformed. + request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view. + # Returns + `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The feature vector that contains all on-demand features in the feature view. + """ + if not self._on_demand_transformation_functions: + warnings.warn( + "Feature view does not have any on-demand features. Returning input feature vectors.", + stacklevel=1, + ) + return feature_vectors + + request_parameters = {} if not request_parameters else request_parameters + # Convert feature vectors to dictionary + feature_vectors, return_type = ( + self._check_feature_vectors_type_and_convert_to_dict(feature_vectors) + ) + # Check if all request parameters are provided + # If request parameter is a dictionary then copy it to list with the same length as that of entires + request_parameters = ( + [request_parameters] * len(feature_vectors) + if isinstance(request_parameters, dict) + else request_parameters + ) + self.check_missing_request_parameters( + features=feature_vectors[0], request_parameters=request_parameters[0] + ) + on_demand_feature_vectors = [] + for feature_vector, request_parameter in zip( + feature_vectors, request_parameters + ): + on_demand_feature_vector = self.apply_on_demand_transformations( + feature_vector, request_parameter + ) + on_demand_feature_vectors.append( + [ + on_demand_feature_vector.get(fname, None) + for fname in self._on_demand_feature_vector_col_name + ] + ) + + if len(on_demand_feature_vectors) == 1: + batch = False + on_demand_feature_vectors = on_demand_feature_vectors[0] + else: + batch = True + + return self.handle_feature_vector_return_type( + on_demand_feature_vectors, + batch=batch, + inference_helper=False, + return_type=return_type, + transform=False, + on_demand_feature=True, + ) + + def get_untransformed_features_map( + self, features: List[Any], on_demand_features: bool = False + ) -> Dict[str, Any]: + """ + Function that accepts a feature vectors as a list and returns the untransformed features as a dict that maps + feature names to their values + + # Arguments + features : `List[Any]`. List of feature vectors. + on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector. + + # Returns + `Dict[str, Any]` : Dictionary mapping features name to values. + """ + if on_demand_features: + return dict( + [ + (fname, fvalue) + for fname, fvalue in zip( + self._on_demand_feature_vector_col_name, features + ) + ] + ) + else: + return dict( + [ + (fname, fvalue) + for fname, fvalue in zip( + self._untransformed_feature_vector_col_name, features + ) + ] + ) + def handle_feature_vector_return_type( self, feature_vectorz: Union[ @@ -576,7 +772,8 @@ def handle_feature_vector_return_type( batch: bool, inference_helper: bool, return_type: Union[Literal["list", "dict", "numpy", "pandas", "polars"]], - transformed: bool = False, + transform: bool = False, + on_demand_feature: bool = False, ) -> Union[ pd.DataFrame, pl.DataFrame, @@ -586,10 +783,13 @@ def handle_feature_vector_return_type( Dict[str, Any], List[Dict[str, Any]], ]: - if transformed: + if transform: column_names = self.transformed_feature_vector_col_name else: - column_names = self._feature_vector_col_name + if on_demand_feature: + column_names = self._on_demand_feature_vector_col_name + else: + column_names = self._untransformed_feature_vector_col_name # Only get-feature-vector and get-feature-vectors can return list or numpy if return_type.lower() == "list" and not inference_helper: @@ -652,12 +852,16 @@ def get_inference_helper( batch=False, inference_helper=True, return_type=return_type, + transform=False, + on_demand_feature=False, ) return self.handle_feature_vector_return_type( self.sql_client.get_inference_helper_vector(entry), batch=False, inference_helper=True, return_type=return_type, + transform=False, + on_demand_feature=False, ) def get_inference_helpers( @@ -706,7 +910,8 @@ def get_inference_helpers( batch=True, inference_helper=True, return_type=return_type, - transformed=False, + transform=False, + on_demand_feature=False, ) def which_client_and_ensure_initialised( diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index bdcf4b51c..e854f4ce4 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -511,7 +511,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed: Optional[bool] = True, + transform: Optional[bool] = True, request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[List[Any], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vector from online feature store. @@ -613,7 +613,7 @@ def get_feature_vector( vector_db_features=vector_db_features, force_rest_client=force_rest_client, force_sql_client=force_sql_client, - transformed=transformed, + transform=transform, request_parameters=request_parameters, ) @@ -626,7 +626,7 @@ def get_feature_vectors( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, - transformed: Optional[bool] = True, + transform: Optional[bool] = True, request_parameters: Optional[List[Dict[str, Any]]] = None, ) -> Union[List[List[Any]], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vectors in batches from online feature store. @@ -728,7 +728,7 @@ def get_feature_vectors( vector_db_features=vector_db_features, force_rest_client=force_rest_client, force_sql_client=force_sql_client, - transformed=transformed, + transform=transform, request_parameters=request_parameters, ) @@ -3941,7 +3941,7 @@ def transformation_functions( self._transformation_functions = transformation_functions @property - def model_dependent_tranformation_functions(self) -> Dict["str", Callable]: + def model_dependent_transformations(self) -> Dict["str", Callable]: """Get Model-Dependent transformations as a dictionary mapping transformed feature names to transformation function""" return { transformation_function.hopsworks_udf.output_column_names[ @@ -3951,7 +3951,7 @@ def model_dependent_tranformation_functions(self) -> Dict["str", Callable]: } @property - def on_demand_tranformation_functions(self) -> Dict["str", Callable]: + def on_demand_transformations(self) -> Dict["str", Callable]: """Get On-Demand transformations as a dictionary mapping on-demand feature names to transformation function""" return { feature.on_demand_transformation_function.hopsworks_udf.function_name: feature.on_demand_transformation_function.hopsworks_udf.get_udf() @@ -4021,16 +4021,18 @@ def logging_enabled(self) -> bool: def logging_enabled(self, logging_enabled) -> None: self._logging_enabled = logging_enabled + @property def transformed_features(self) -> List[str]: """Name of features of a feature view after transformation functions have been applied""" - transformation_features = set() + dropped_features = set() transformed_column_names = [] for tf in self.transformation_functions: transformed_column_names.extend(tf.output_column_names) - transformation_features.update(tf.hopsworks_udf.transformation_features) + if tf.hopsworks_udf.dropped_features: + dropped_features.update(tf.hopsworks_udf.dropped_features) return [ feature.name for feature in self.features - if feature.name not in transformation_features + if feature.name not in dropped_features ] + transformed_column_names diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 01952eb8e..36524603d 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -669,7 +669,7 @@ def to_dict(self) -> Dict[str, Any]: "statisticsArgumentNames": self._statistics_argument_names if self.statistics_required else None, - "name": self._function_name, + "name": self.function_name, "featureNamePrefix": self._feature_name_prefix, } @@ -916,7 +916,7 @@ def transformation_statistics( def output_column_names(self, output_col_names: Union[str, List[str]]) -> None: if not isinstance(output_col_names, List): output_col_names = [output_col_names] - if len(output_col_names) != len(self.return_types): + if not output_col_names and len(output_col_names) != len(self.return_types): raise FeatureStoreException( f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." ) diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index 3aa3f6a81..53ed5e2f0 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -66,7 +66,7 @@ def to_dict(self): "trainingHelperColumn": self._training_helper_column, "featureGroupFeatureName": self._feature_group_feature_name, "featuregroup": self._feature_group, - "transformation_function": self._on_demand_transformation_function, + "transformationFunction": self.on_demand_transformation_function, } @classmethod