From f914f1d84fceeaf1bb40a885490e57bcb716d01d Mon Sep 17 00:00:00 2001 From: manu-sj <152865565+manu-sj@users.noreply.github.com> Date: Mon, 3 Feb 2025 17:56:05 +0100 Subject: [PATCH] [FSTORE-1672] Allow multiple on-demand features to be returned from an on-demand transformation function and allow passing of local variables to a transformation function (#452) * working code for many to many transformation and also transformation context * adding comments and tests * update feature group schema based on transformation function only if transform is set to true * throw exception if transformation context is passed to training datatset materialization jobs from the python engiine * adapting check_missing_request_parameters to handle multiple on-demand feature returned from a single transformaiton function * checking excplicitly that the passed feature vector is None so that pandas dataframe can also be passed to get feature vector * adding else to handle usecase in which features have to be overwritten after tranformations * adding null check for entries before trying to add them to request parameters * addressing review comments --------- Co-authored-by: Ralf --- python/hsfs/core/feature_group_engine.py | 44 ++- python/hsfs/core/feature_view_engine.py | 13 +- python/hsfs/core/vector_server.py | 327 +++++++++++++----- python/hsfs/engine/python.py | 26 +- python/hsfs/engine/spark.py | 27 +- python/hsfs/feature_group.py | 45 ++- python/hsfs/feature_group_writer.py | 4 + python/hsfs/feature_view.py | 76 +++- python/hsfs/hopsworks_udf.py | 129 +++++-- python/hsfs/transformation_function.py | 31 +- python/tests/engine/test_python.py | 49 ++- python/tests/engine/test_spark.py | 83 ++++- python/tests/test_feature_group.py | 16 +- python/tests/test_feature_group_writer.py | 2 + .../transformation_test_helper.py | 4 + python/tests/test_hopswork_udf.py | 160 +++++++++ python/tests/test_transformation_function.py | 170 ++++++++- 17 files changed, 1028 insertions(+), 178 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 30d1cbe4b..e77fbb8ba 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -15,7 +15,7 @@ from __future__ import annotations import warnings -from typing import List, Union +from typing import Any, Dict, List, Union from hsfs import engine, feature, util from hsfs import feature_group as fg @@ -49,12 +49,18 @@ def _update_feature_group_schema_on_demand_transformations( transformed_features = [] dropped_features = [] for tf in feature_group.transformation_functions: - transformed_features.append( - feature.Feature( - tf.hopsworks_udf.output_column_names[0], - tf.hopsworks_udf.return_types[0], - on_demand=True, - ) + transformed_features.extend( + [ + feature.Feature( + output_column_name, + return_type, + on_demand=True, + ) + for output_column_name, return_type in zip( + tf.hopsworks_udf.output_column_names, + tf.hopsworks_udf.return_types, + ) + ] ) if tf.hopsworks_udf.dropped_features: dropped_features.extend(tf.hopsworks_udf.dropped_features) @@ -141,6 +147,8 @@ def insert( storage, write_options, validation_options: dict = None, + transformation_context: Dict[str, Any] = None, + transform: bool = True, ): dataframe_features = engine.get_instance().parse_schema_feature_group( feature_dataframe, @@ -152,16 +160,20 @@ def insert( if ( not isinstance(feature_group, fg.ExternalFeatureGroup) and feature_group.transformation_functions + and transform ): feature_dataframe = 
engine.get_instance()._apply_transformation_function( - feature_group.transformation_functions, feature_dataframe + feature_group.transformation_functions, + feature_dataframe, + transformation_context=transformation_context, ) - dataframe_features = ( - self._update_feature_group_schema_on_demand_transformations( - feature_group=feature_group, features=dataframe_features + dataframe_features = ( + self._update_feature_group_schema_on_demand_transformations( + feature_group=feature_group, features=dataframe_features + ) ) - ) + util.validate_embedding_feature_type( feature_group.embedding_index, dataframe_features ) @@ -361,6 +373,8 @@ def insert_stream( timeout, checkpoint_dir, write_options, + transformation_context: Dict[str, Any] = None, + transform: bool = True, ): if not feature_group.online_enabled and not feature_group.stream: raise exceptions.FeatureStoreException( @@ -377,9 +391,11 @@ def insert_stream( ) ) - if feature_group.transformation_functions: + if feature_group.transformation_functions and transform: dataframe = engine.get_instance()._apply_transformation_function( - feature_group.transformation_functions, dataframe + feature_group.transformation_functions, + dataframe, + transformation_context=transformation_context, ) util.validate_embedding_feature_type( diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index be284f752..592cc2acd 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -392,6 +392,7 @@ def create_training_dataset( primary_keys=False, event_time=False, training_helper_columns=False, + transformation_context: Dict[str, Any] = None, ): self._set_event_time(feature_view_obj, training_dataset_obj) updated_instance = self._create_training_data_metadata( @@ -405,6 +406,7 @@ def create_training_dataset( primary_keys=primary_keys, event_time=event_time, training_helper_columns=training_helper_columns, + transformation_context=transformation_context, ) return updated_instance, td_job @@ -420,6 +422,7 @@ def get_training_data( event_time=False, training_helper_columns=False, dataframe_type="default", + transformation_context: Dict[str, Any] = None, ): # check if provided td version has already existed. 
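With the schema update above, each output column declared by an on-demand transformation function is now registered as its own on-demand feature on the feature group. A minimal sketch of such a multi-output UDF, assuming the `udf` decorator from `hsfs.hopsworks_udf`; the feature and column names are illustrative, and the final output column names are generated by `hopsworks_udf.output_column_names` rather than taken from the returned dataframe:

    import pandas as pd
    from hsfs.hopsworks_udf import udf

    # One UDF declaring two return types, so the feature group schema
    # gains two on-demand features instead of one.
    @udf([float, float])
    def amount_bounds(amount: pd.Series) -> pd.DataFrame:
        # Scale the amount against illustrative lower/upper bounds.
        return pd.DataFrame({
            "lower": amount / 1000.0,
            "upper": amount / 10000.0,
        })

    # Attached like any other on-demand transformation; `fs` is an existing
    # feature store handle and the column names are invented.
    fg = fs.create_feature_group(
        name="transactions",
        version=1,
        primary_key=["id"],
        transformation_functions=[amount_bounds("amount")],
    )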
if training_dataset_version: @@ -497,6 +500,7 @@ def get_training_data( read_options, dataframe_type, training_dataset_version, + transformation_context=transformation_context, ) self.compute_training_dataset_statistics( feature_view_obj, td_updated, split_df @@ -581,6 +585,7 @@ def recreate_training_dataset( statistics_config, user_write_options, spine=None, + transformation_context: Dict[str, Any] = None, ): training_dataset_obj = self._get_training_dataset_metadata( feature_view_obj, training_dataset_version @@ -597,6 +602,7 @@ def recreate_training_dataset( user_write_options, training_dataset_obj=training_dataset_obj, spine=spine, + transformation_context=transformation_context, ) # Set training dataset schema after training dataset has been generated training_dataset_obj.schema = self.get_training_dataset_schema( @@ -757,6 +763,7 @@ def compute_training_dataset( primary_keys=False, event_time=False, training_helper_columns=False, + transformation_context: Dict[str, Any] = None, ): if training_dataset_obj: pass @@ -791,6 +798,7 @@ def compute_training_dataset( user_write_options, self._OVERWRITE, feature_view_obj=feature_view_obj, + transformation_context=transformation_context, ) # Set training dataset schema after training dataset has been generated @@ -913,6 +921,7 @@ def get_batch_data( inference_helper_columns=False, dataframe_type="default", transformed=True, + transformation_context: Dict[str, Any] = None, ): self._check_feature_group_accessibility(feature_view_obj) @@ -936,7 +945,9 @@ def get_batch_data( ).read(read_options=read_options, dataframe_type=dataframe_type) if transformation_functions and transformed: return engine.get_instance()._apply_transformation_function( - transformation_functions, dataset=feature_dataframe + transformation_functions, + dataset=feature_dataframe, + transformation_context=transformation_context, ) else: return feature_dataframe diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index d354a5400..9f0f6c158 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -224,15 +224,30 @@ def init_transformation( entity: Union[feature_view.FeatureView], ): # attach transformation functions - self._model_dependent_transformation_functions = tf_engine_mod.TransformationFunctionEngine.get_ready_to_use_transformation_fns( + model_dependent_transformations = tf_engine_mod.TransformationFunctionEngine.get_ready_to_use_transformation_fns( entity, self._training_dataset_version, ) - self._on_demand_transformation_functions = [ - feature.on_demand_transformation_function - for feature in entity.features - if feature.on_demand_transformation_function + + # Filter out model-dependent transformation functions that use label features. Only the first label feature is checked since a transformation function using label feature can only contain label features. 
+ self._model_dependent_transformation_functions = [ + tf + for tf in model_dependent_transformations + if tf.hopsworks_udf.transformation_features[0] not in entity.labels ] + + self._on_demand_transformation_functions = [] + + for feature in entity.features: + if ( + feature.on_demand_transformation_function + and feature.on_demand_transformation_function + not in self._on_demand_transformation_functions + ): + self._on_demand_transformation_functions.append( + feature.on_demand_transformation_function + ) + self._on_demand_feature_names = [ feature.name for feature in entity.features @@ -294,21 +309,39 @@ def check_missing_request_parameters( available_parameters = set((features | request_parameters).keys()) missing_request_parameters_features = {} - for on_demand_feature, on_demand_transformation in zip( - self._on_demand_feature_names, self._on_demand_transformation_functions - ): - missing_request_parameter = ( - set(on_demand_transformation.hopsworks_udf.transformation_features) - - available_parameters + for on_demand_transformation in self._on_demand_transformation_functions: + feature_name_prefix = ( + on_demand_transformation.hopsworks_udf.feature_name_prefix + ) + + transformation_features = set( + on_demand_transformation.hopsworks_udf.transformation_features ) + unprefixed_features = set( + on_demand_transformation.hopsworks_udf.unprefixed_transformation_features + ) + # Prefixed feature names for request parameters that are not available in their unprefixed form as request-parameters. + unprefixed_missing_features = { + feature_name_prefix + feature_name + if feature_name_prefix + else feature_name + for feature_name in unprefixed_features - available_parameters + } + + # Prefixed feature names for features that are not available in the in their unprefixed form as request-parameters. + prefixed_missing_features = transformation_features - available_parameters + + # Get Missing request parameters: These are will include request parameters that are not provided in their unprefixed or prefixed form. + missing_request_parameter = prefixed_missing_features.intersection( + unprefixed_missing_features + ) + if missing_request_parameter: - missing_request_parameters_features[on_demand_feature] = sorted( - list( - set( - on_demand_transformation.hopsworks_udf.transformation_features - ) - - available_parameters - ) + missing_request_parameters_features.update( + { + on_demand_feature: sorted(missing_request_parameter) + for on_demand_feature in on_demand_transformation.hopsworks_udf.output_column_names + } ) if missing_request_parameters_features: @@ -340,11 +373,18 @@ def get_feature_vector( force_sql_client: bool = False, transform: bool = True, request_parameters: Optional[Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], Dict[str, Any]]: """Assembles serving vector from online feature store.""" online_client_choice = self.which_client_and_ensure_initialised( force_rest_client=force_rest_client, force_sql_client=force_sql_client ) + + # Adding values in entry to request_parameters if it is not explicitly mentioned so that on-demand feature can be computed using the values in entry if they are not present in retrieved feature vector. This happens when no features can be retrieved from the feature view since the serving key is not yet there. 
+ if request_parameters and entry: + for key, value in entry.items(): + request_parameters.setdefault(key, value) + rondb_entry = self.validate_entry( entry=entry, allow_missing=allow_missing, @@ -373,6 +413,7 @@ def get_feature_vector( client=online_client_choice, transform=transform, request_parameters=request_parameters, + transformation_context=transformation_context, ) return self.handle_feature_vector_return_type( @@ -397,6 +438,7 @@ def get_feature_vectors( force_rest_client: bool = False, force_sql_client: bool = False, transform: bool = True, + transformation_context: Dict[str, Any] = None, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], List[Dict[str, Any]]]: """Assembles serving vector from online feature store.""" if passed_features is None: @@ -420,6 +462,18 @@ def get_feature_vectors( or len(request_parameters) == len(entries) ), "Request Parameters should be a Dictionary, None, empty or have the same length as the entries if they are not None or empty." + # Adding values in entry to request_parameters if it is not explicitly mentioned so that on-demand feature can be computed using the values in entry if they are not present in retrieved feature vector. + if request_parameters and entries: + if isinstance(request_parameters, list) and len(entries) == len( + request_parameters + ): + for idx, entry in enumerate(entries): + for key, value in entry.items(): + request_parameters[idx].setdefault(key, value) + elif isinstance(request_parameters, dict) and len(entries) == 1: + for key, value in entries[0].items(): + request_parameters.setdefault(key, value) + online_client_choice = self.which_client_and_ensure_initialised( force_rest_client=force_rest_client, force_sql_client=force_sql_client ) @@ -507,6 +561,7 @@ def get_feature_vectors( client=online_client_choice, transform=transform, request_parameters=request_parameter, + transformation_context=transformation_context, ) if vector is not None: @@ -530,6 +585,7 @@ def assemble_feature_vector( client: Literal["rest", "sql"], transform: bool, request_parameters: Optional[Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, ) -> Optional[List[Any]]: """Assembles serving vector from online feature store.""" # Errors in batch requests are returned as None values @@ -549,10 +605,10 @@ def assemble_feature_vector( .difference(result_dict.keys()) .difference(self._on_demand_feature_names) ) - - self.check_missing_request_parameters( - features=result_dict, request_parameters=request_parameters - ) + if transform: + self.check_missing_request_parameters( + features=result_dict, request_parameters=request_parameters + ) # for backward compatibility, before 3.4, if result is empty, # instead of throwing error, it skips the result @@ -580,7 +636,9 @@ def assemble_feature_vector( len(self.model_dependent_transformation_functions) > 0 or len(self.on_demand_transformation_functions) > 0 ) and transform: - self.apply_transformation(result_dict, request_parameters or {}) + self.apply_transformation( + result_dict, request_parameters or {}, transformation_context + ) _logger.debug("Assembled and transformed dict feature vector: %s", result_dict) if transform: @@ -594,6 +652,53 @@ def assemble_feature_vector( for fname in self._untransformed_feature_vector_col_name ] + def _validate_input_features( + self, + feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + on_demand_features: bool = False, + ) -> None: + """ + Validate if an feature-vector provided contain all required features. 
+ + # Arguments + feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted. + on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector. + + """ + + required_features = ( + set(self._untransformed_feature_vector_col_name) + if not on_demand_features + else set(self._on_demand_feature_vector_col_name) + ) + + if isinstance( + feature_vectors, pd.DataFrame or isinstance(feature_vectors, pl.DataFrame) + ): + missing_features = required_features - set(feature_vectors.columns) + if missing_features: + raise exceptions.FeatureStoreException( + f"The input feature vector is missing the following required features: `{'`, `'.join(missing_features)}`. Please include these features in the feature vector." + ) + else: + if isinstance(feature_vectors, list): + if feature_vectors and all( + isinstance(feature_vector, list) + for feature_vector in feature_vectors + ): + if any( + len(feature_vector) != len(required_features) + for feature_vector in feature_vectors + ): + raise exceptions.FeatureStoreException( + f"Input feature vector is missing required features. Please ensure the following features are included: `{'`, `'.join(required_features)}`." + ) + else: + if len(feature_vectors) != len(required_features): + raise exceptions.FeatureStoreException( + f"Input feature vector is missing required features. Please ensure the following features are included: '{', '.join(required_features)}'." + ) + def _check_feature_vectors_type_and_convert_to_dict( self, feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], @@ -610,17 +715,17 @@ def _check_feature_vectors_type_and_convert_to_dict( `Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple that contains the feature vector as a dictionary and a string denoting the data type of the input feature vector. """ + if isinstance(feature_vectors, pd.DataFrame): return_type = "pandas" feature_vectors = feature_vectors.to_dict(orient="records") elif HAS_POLARS and isinstance(feature_vectors, pl.DataFrame): return_type = "polars" - feature_vectors = feature_vectors.to_pandas() - feature_vectors = feature_vectors.to_dict(orient="records") + feature_vectors = feature_vectors.to_pandas().to_dict(orient="records") - elif isinstance(feature_vectors, list) and feature_vectors: - if all( + elif isinstance(feature_vectors, list): + if feature_vectors and all( isinstance(feature_vector, list) for feature_vector in feature_vectors ): return_type = "list" @@ -648,13 +753,17 @@ def _check_feature_vectors_type_and_convert_to_dict( def transform( self, feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + transformation_context: Dict[str, Any] = None, + return_type: Union[Literal["list", "numpy", "pandas", "polars"]] = None, ) -> Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]: """ Applies model dependent transformation on the provided feature vector. # Arguments feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be transformed using attached model-dependent transformations. - + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. 
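On the UDF side, a transformation function might consume such a context dictionary roughly as follows. This sketch assumes the dictionary is surfaced through a parameter named `context` (the exact mechanism lives in the `hopsworks_udf.py` changes, which are not shown in this hunk), and the feature and key names are invented:

    import pandas as pd
    from hsfs.hopsworks_udf import udf

    # Assumption: a parameter named `context` receives the transformation_context
    # dict at runtime and is not treated as an input feature column.
    @udf(float)
    def scale_amount(amount: pd.Series, context) -> pd.Series:
        return amount * context["scaling_factor"]

    # The dict itself is supplied at the call site, e.g.:
    # feature_view.get_feature_vector(entry, transformation_context={"scaling_factor": 2.0})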
+ return_type: `"list"`, `"pandas"`, `"polars"` or `"numpy"`. Defaults to the same type as the input feature vector. # Returns `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The transformed feature vector. """ @@ -665,15 +774,18 @@ def transform( ) return feature_vectors - feature_vectors, return_type = ( + feature_vectors, default_return_type = ( self._check_feature_vectors_type_and_convert_to_dict( feature_vectors, on_demand_features=True ) ) + + return_type = return_type if return_type else default_return_type + transformed_feature_vectors = [] for feature_vector in feature_vectors: transformed_feature_vector = self.apply_model_dependent_transformations( - feature_vector + feature_vector, transformation_context=transformation_context ) transformed_feature_vectors.append( [ @@ -698,8 +810,12 @@ def transform( def compute_on_demand_features( self, - feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], - request_parameters: Union[List[Dict[str, Any]], Dict[str, Any]], + feature_vectors: Optional[ + Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame] + ] = None, + request_parameters: Union[List[Dict[str, Any]], Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, + return_type: Union[Literal["list", "numpy", "pandas", "polars"]] = None, ): """ Function computes on-demand features present in the feature view. @@ -707,6 +823,9 @@ def compute_on_demand_features( # Arguments feature_vector: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vector to be transformed. request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + return_type: `"list"`, `"pandas"`, `"polars"` or `"numpy"`. Defaults to the same type as the input feature vector. # Returns `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The feature vector that contains all on-demand features in the feature view. 
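Taken together, the intended serving-side flow is to compute on-demand features from the (possibly partial) feature vector plus request parameters first, and then apply the model-dependent transformations. A hedged sketch against the signatures above, where `vector_server` stands for an already-initialized serving instance and all feature, parameter, and context names are invented:

    untransformed = [[25.0, "stockholm"]]  # retrieved or client-supplied features
    request_params = {"transaction_time": "2025-02-03 12:00:00"}

    with_on_demand = vector_server.compute_on_demand_features(
        feature_vectors=untransformed,
        request_parameters=request_params,
        transformation_context={"now": "2025-02-03 12:05:00"},
        return_type="pandas",
    )
    encoded = vector_server.transform(
        with_on_demand,
        transformation_context={"now": "2025-02-03 12:05:00"},
        return_type="pandas",
    )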
""" @@ -717,11 +836,16 @@ def compute_on_demand_features( ) return feature_vectors + feature_vectors = [] if feature_vectors is None else feature_vectors + request_parameters = {} if not request_parameters else request_parameters # Convert feature vectors to dictionary - feature_vectors, return_type = ( + feature_vectors, default_return_type = ( self._check_feature_vectors_type_and_convert_to_dict(feature_vectors) ) + + return_type = return_type if return_type else default_return_type + # Check if all request parameters are provided # If request parameter is a dictionary then copy it to list with the same length as that of entires request_parameters = ( @@ -737,7 +861,9 @@ def compute_on_demand_features( feature_vectors, request_parameters ): on_demand_feature_vector = self.apply_on_demand_transformations( - feature_vector, request_parameter + feature_vector, + request_parameter, + transformation_context=transformation_context, ) on_demand_feature_vectors.append( [ @@ -1010,51 +1136,68 @@ def _set_default_client( self._init_sql_client = True def apply_on_demand_transformations( - self, rows: Union[dict, pd.DataFrame], request_parameter: Dict[str, Any] + self, + rows: Union[dict, pd.DataFrame], + request_parameter: Dict[str, Any], + transformation_context: Dict[str, Any] = None, ) -> dict: _logger.debug("Applying On-Demand transformation functions.") for tf in self._on_demand_transformation_functions: - if ( - tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True) - == UDFExecutionMode.PANDAS - ): - # Check if feature provided as request parameter if not get it from retrieved feature vector. - features = [ - pd.Series(request_parameter[feature]) - if feature in request_parameter.keys() - else ( - pd.Series( - rows[feature] - if (not isinstance(rows[feature], pd.Series)) - else rows[feature] - ) + # Setting transformation function context variables. + tf.hopsworks_udf.transformation_context = transformation_context + + # Check if feature provided as request parameter in prefixed or unprefixed format if not get it from retrieved feature vector. + features = [] + for ( + unprefixed_feature + ) in tf.hopsworks_udf.unprefixed_transformation_features: + # Check if the on-demand feature has a prefix. If it does, compute the prefixed feature name. + if tf.hopsworks_udf.feature_name_prefix: + prefixed_feature = ( + tf.hopsworks_udf.feature_name_prefix + unprefixed_feature ) - for feature in tf.hopsworks_udf.transformation_features - ] - else: - # No need to cast to pandas Series for Python UDF's - features = [ - request_parameter[feature] - if feature in request_parameter.keys() - else rows[feature] - for feature in tf.hopsworks_udf.transformation_features - ] + else: + prefixed_feature = unprefixed_feature + + # Check if the prefixed feature name is provided as a request parameter, if so then use it. Otherwise if the unprefixed feature name is provided as a request parameter and use it. 
Else fetch the feature from the retrieved feature vector + feature_value = request_parameter.get( + prefixed_feature, + request_parameter.get( + unprefixed_feature, rows.get(prefixed_feature) + ), + ) + + if ( + tf.hopsworks_udf.execution_mode.get_current_execution_mode( + online=True + ) + == UDFExecutionMode.PANDAS + ): + features.append( + pd.Series(feature_value) + if (not isinstance(feature_value, pd.Series)) + else feature_value + ) + else: + # No need to cast to pandas Series for Python UDF's + features.append(feature_value) on_demand_feature = tf.hopsworks_udf.get_udf(online=True)( *features ) # Get only python compatible UDF irrespective of engine - if ( - tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True) - == UDFExecutionMode.PANDAS - ): - rows[on_demand_feature.name] = on_demand_feature.values[0] - else: - rows[tf.output_column_names[0]] = on_demand_feature + + rows.update(self.parse_transformed_result(on_demand_feature, tf)) return rows - def apply_model_dependent_transformations(self, rows: Union[dict, pd.DataFrame]): + def apply_model_dependent_transformations( + self, + rows: Union[dict, pd.DataFrame], + transformation_context: Dict[str, Any] = None, + ): _logger.debug("Applying Model-Dependent transformation functions.") for tf in self.model_dependent_transformation_functions: + # Setting transformation function context variables. + tf.hopsworks_udf.transformation_context = transformation_context if ( tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True) == UDFExecutionMode.PANDAS @@ -1076,32 +1219,50 @@ def apply_model_dependent_transformations(self, rows: Union[dict, pd.DataFrame]) *features ) # Get only python compatible UDF irrespective of engine - if ( - tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True) - == UDFExecutionMode.PANDAS + rows.update(self.parse_transformed_result(transformed_result, tf)) + return rows + + def parse_transformed_result(self, transformed_results, transformation_function): + rows = {} + if ( + transformation_function.hopsworks_udf.execution_mode.get_current_execution_mode( + online=True + ) + == UDFExecutionMode.PANDAS + ): + if isinstance(transformed_results, pd.Series): + rows[transformed_results.name] = transformed_results.values[0] + else: + for col in transformed_results: + rows[col] = transformed_results[col].values[0] + else: + if isinstance(transformed_results, tuple) or isinstance( + transformed_results, list ): - if isinstance(transformed_result, pd.Series): - rows[transformed_result.name] = transformed_result.values[0] - else: - for col in transformed_result: - rows[col] = transformed_result[col].values[0] + for index, result in enumerate(transformed_results): + rows[transformation_function.output_column_names[index]] = result else: - if isinstance(transformed_result, tuple): - for index, result in enumerate(transformed_result): - rows[tf.output_column_names[index]] = result - else: - rows[tf.output_column_names[0]] = transformed_result + rows[transformation_function.output_column_names[0]] = ( + transformed_results + ) return rows def apply_transformation( - self, row_dict: Union[dict, pd.DataFrame], request_parameter: Dict[str, Any] + self, + row_dict: Union[dict, pd.DataFrame], + request_parameter: Dict[str, Any], + transformation_context: Dict[str, Any] = None, ): """ Function that applies both on-demand and model dependent transformation to the input dictonary """ - feature_dict = self.apply_on_demand_transformations(row_dict, request_parameter) + feature_dict = 
self.apply_on_demand_transformations( + row_dict, request_parameter, transformation_context + ) - encoded_feature_dict = self.apply_model_dependent_transformations(feature_dict) + encoded_feature_dict = self.apply_model_dependent_transformations( + feature_dict, transformation_context + ) return encoded_feature_dict def apply_return_value_handlers( diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index d5e16fe93..460217080 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -869,6 +869,7 @@ def get_training_data( read_options: Dict[str, Any], dataframe_type: str, training_dataset_version: int = None, + transformation_context: Dict[str, Any] = None, ) -> Union[pd.DataFrame, pl.DataFrame]: """ Function that creates or retrieves already created the training dataset. @@ -880,6 +881,8 @@ def get_training_data( read_options `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data. dataframe_type `str`: The type of dataframe returned. training_dataset_version `int`: Version of training data to be retrieved. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Raises `ValueError`: If the training dataset statistics could not be retrieved. """ @@ -897,6 +900,7 @@ def get_training_data( read_options, dataframe_type, training_dataset_version, + transformation_context=transformation_context, ) else: df = query_obj.read( @@ -911,7 +915,9 @@ def get_training_data( # training_dataset_obj, feature_view_obj, training_dataset_version # ) return self._apply_transformation_function( - feature_view_obj.transformation_functions, df + feature_view_obj.transformation_functions, + df, + transformation_context=transformation_context, ) def split_labels( @@ -945,6 +951,7 @@ def _prepare_transform_split_df( read_option: Dict[str, Any], dataframe_type: str, training_dataset_version: int = None, + transformation_context: Dict[str, Any] = None, ) -> Dict[str, Union[pd.DataFrame, pl.DataFrame]]: """ Split a df into slices defined by `splits`. `splits` is a `dict(str, int)` which keys are name of split @@ -957,6 +964,8 @@ def _prepare_transform_split_df( read_options `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data. dataframe_type `str`: The type of dataframe returned. training_dataset_version `int`: Version of training data to be retrieved. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Raises `ValueError`: If the training dataset statistics could not be retrieved. 
""" @@ -1005,6 +1014,7 @@ def _prepare_transform_split_df( result_dfs[split_name] = self._apply_transformation_function( feature_view_obj.transformation_functions, result_dfs.get(split_name), + transformation_context=transformation_context, ) return result_dfs @@ -1085,6 +1095,7 @@ def write_training_dataset( save_mode: str, feature_view_obj: Optional[feature_view.FeatureView] = None, to_df: bool = False, + transformation_context: Dict[str, Any] = None, ) -> Union["job.Job", Any]: if not feature_view_obj and not isinstance(dataset, query.Query): raise Exception( @@ -1105,6 +1116,7 @@ def write_training_dataset( and feature_view_obj and len(feature_view_obj.transformation_functions) == 0 and training_dataset.data_format == "parquet" + and not transformation_context ): query_obj, _ = dataset._prep_read(False, user_write_options) response = util.run_with_loading_animation( @@ -1121,6 +1133,13 @@ def write_training_dataset( # As for creating a feature group, users have the possibility of passing # a spark_job_configuration object as part of the user_write_options with the key "spark" spark_job_configuration = user_write_options.pop("spark", None) + + # Pass transformation context to the training dataset job + if transformation_context: + raise FeatureStoreException( + "Cannot pass transformation context to training dataset materialization job from the Python Kernel. Please use the Spark Kernel." + ) + td_app_conf = training_dataset_job_conf.TrainingDatasetJobConf( query=dataset, overwrite=(save_mode == "overwrite"), @@ -1257,6 +1276,7 @@ def _apply_transformation_function( transformation_functions: List[transformation_function.TransformationFunction], dataset: Union[pd.DataFrame, pl.DataFrame], online_inference: bool = False, + transformation_context: Dict[str, Any] = None, ) -> Union[pd.DataFrame, pl.DataFrame]: """ Apply transformation function to the dataframe. @@ -1285,6 +1305,10 @@ def _apply_transformation_function( for tf in transformation_functions: hopsworks_udf = tf.hopsworks_udf + + # Setting transformation function context variables. + hopsworks_udf.transformation_context = transformation_context + missing_features = set(hopsworks_udf.transformation_features) - set( dataset.columns ) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 10d3a9cb1..661c894a6 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -197,7 +197,9 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.storage_connector._get_path(external_fg.path), # cant rely on location since this method can be used before FG is saved + external_fg.storage_connector._get_path( + external_fg.path + ), # cant rely on location since this method can be used before FG is saved ) else: external_dataset = external_fg.dataframe @@ -635,6 +637,7 @@ def get_training_data( read_options: Dict[str, Any], dataframe_type: str, training_dataset_version: int = None, + transformation_context: Dict[str, Any] = None, ): """ Function that creates or retrieves already created the training dataset. @@ -646,6 +649,8 @@ def get_training_data( read_options `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data. dataframe_type `str`: The type of dataframe returned. training_dataset_version `int`: Version of training data to be retrieved. 
+ transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Raises `ValueError`: If the training dataset statistics could not be retrieved. """ @@ -658,6 +663,7 @@ def get_training_data( to_df=True, feature_view_obj=feature_view_obj, training_dataset_version=training_dataset_version, + transformation_context=transformation_context, ) def split_labels(self, df, labels, dataframe_type): @@ -688,6 +694,7 @@ def write_training_dataset( feature_view_obj: feature_view.FeatureView = None, to_df: bool = False, training_dataset_version: Optional[int] = None, + transformation_context: Dict[str, Any] = None, ): """ Function that creates or retrieves already created the training dataset. @@ -701,6 +708,8 @@ def write_training_dataset( feature_view_obj `FeatureView`: The feature view object for the which the training data is being created. to_df `bool`: Return dataframe instead of writing the data. training_dataset_version `Optional[int]`: Version of training data to be retrieved. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Raises `ValueError`: If the training dataset statistics could not be retrieved. """ @@ -739,6 +748,7 @@ def write_training_dataset( save_mode, path, to_df=to_df, + transformation_context=transformation_context, ) else: split_dataset = self._split_df( @@ -766,6 +776,7 @@ def write_training_dataset( save_mode, to_df=to_df, transformation_functions=feature_view_obj.transformation_functions, + transformation_context=transformation_context, ) def _split_df(self, query_obj, training_dataset, read_options=None): @@ -920,6 +931,7 @@ def _write_training_dataset_splits( transformation_functions: List[ transformation_function.TransformationFunction ] = None, + transformation_context: Dict[str, Any] = None, ): for split_name, feature_dataframe in feature_dataframes.items(): split_path = training_dataset.location + "/" + str(split_name) @@ -932,6 +944,7 @@ def _write_training_dataset_splits( save_mode, split_path, to_df=to_df, + transformation_context=transformation_context, ) if to_df: @@ -947,10 +960,13 @@ def _write_training_dataset_single( save_mode, path, to_df=False, + transformation_context: Dict[str, Any] = None, ): # apply transformation functions (they are applied separately to each split) feature_dataframe = self._apply_transformation_function( - transformation_functions, dataset=feature_dataframe + transformation_functions, + dataset=feature_dataframe, + transformation_context=transformation_context, ) if to_df: return feature_dataframe @@ -1348,6 +1364,7 @@ def _apply_transformation_function( self, transformation_functions: List[transformation_function.TransformationFunction], dataset: DataFrame, + transformation_context: Dict[str, Any] = None, ): """ Apply transformation function to the dataframe. 
@@ -1355,6 +1372,8 @@ def _apply_transformation_function( # Arguments transformation_functions `List[TransformationFunction]` : List of transformation functions. dataset `Union[DataFrame]`: A spark dataframe. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns `DataFrame`: A spark dataframe with the transformed data. # Raises @@ -1367,6 +1386,10 @@ def _apply_transformation_function( explode_name = [] for tf in transformation_functions: hopsworks_udf = tf.hopsworks_udf + + # Setting transformation function context variables. + hopsworks_udf.transformation_context = transformation_context + missing_features = set(hopsworks_udf.transformation_features) - set( dataset.columns ) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 06ced9ee6..95c0fbfe2 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2065,7 +2065,7 @@ def storage_connector(self) -> "sc.StorageConnector": def prepare_spark_location(self) -> str: location = self.location - if (self.storage_connector is not None): + if self.storage_connector is not None: location = self.storage_connector.prepare_spark(location) return location @@ -2727,10 +2727,14 @@ def save( # Raises `hsfs.client.exceptions.RestAPIError`. Unable to create feature group. """ - if (features is None and len(self._features) > 0) or ( - isinstance(features, List) - and len(features) > 0 - and all([isinstance(f, feature.Feature) for f in features]) + if ( + (features is None and len(self._features) > 0) + or ( + isinstance(features, List) + and len(features) > 0 + and all([isinstance(f, feature.Feature) for f in features]) + ) + or (not features and len(self.transformation_functions) > 0) ): # This is done for compatibility. Users can specify the feature list in the # (get_or_)create_feature_group. Users can also provide the feature list in the save(). @@ -2739,7 +2743,13 @@ def save( # and in the `save()` call, then the (get_or_)create_feature_group wins. # This is consistent with the behavior of the insert method where the feature list wins over the # dataframe structure - self._features = self._features if len(self._features) > 0 else features + self._features = ( + self._features + if len(self._features) > 0 + else features + if features + else [] + ) self._features = self._feature_group_engine._update_feature_group_schema_on_demand_transformations( self, self._features @@ -2804,6 +2814,8 @@ def insert( write_options: Optional[Dict[str, Any]] = None, validation_options: Optional[Dict[str, Any]] = None, wait: bool = False, + transformation_context: Dict[str, Any] = None, + transform: bool = True, ) -> Tuple[Optional[Job], Optional[ValidationReport]]: """Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group. @@ -2910,6 +2922,9 @@ def insert( suite of the feature group should be fetched before every insert. wait: Wait for job to finish before returning, defaults to `False`. Shortcut for read_options `{"wait_for_job": False}`. 
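The two arguments documented below cover the common cases of supplying runtime context to on-demand transformations and of skipping them when the on-demand features are already materialized in the dataframe. An illustrative sketch (the feature group handle, dataframes, and context keys are invented):

    # Context is forwarded to every attached on-demand UDF during insertion.
    fg.insert(df, transformation_context={"exchange_rates": rates})

    # The dataframe already contains the pre-computed on-demand columns,
    # so on-demand transformations are skipped on write.
    fg.insert(df_with_on_demand_columns, transform=False)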
+ transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + transform: `bool`. When set to `False`, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe. Defaults to `True`. # Returns (`Job`, `ValidationReport`) A tuple with job information if python engine is used and the validation report if validation is enabled. @@ -2945,6 +2960,8 @@ def insert( storage=storage.lower() if storage is not None else None, write_options=write_options, validation_options={"save_report": True, **validation_options}, + transformation_context=transformation_context, + transform=transform, ) if engine.get_type().startswith("spark") and not self.stream: @@ -2974,6 +2991,8 @@ def multi_part_insert( storage: Optional[str] = None, write_options: Optional[Dict[str, Any]] = None, validation_options: Optional[Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, + transform: bool = True, ) -> Union[ Tuple[Optional[Job], Optional[ValidationReport]], feature_group_writer.FeatureGroupWriter, @@ -3072,6 +3091,9 @@ def multi_part_insert( * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations. * key `fetch_expectation_suite` a boolean value, by default `False` for multi part inserts, to control whether the expectation suite of the feature group should be fetched before every insert. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + transform: `bool`. When set to `False`, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe. Defaults to `True`. # Returns (`Job`, `ValidationReport`) A tuple with job information if python engine is used and the validation report if validation is enabled. @@ -3090,6 +3112,8 @@ def multi_part_insert( storage, write_options or {}, validation_options or {}, + transformation_context, + transform=transform, ) def finalize_multi_part_insert(self) -> None: @@ -3132,6 +3156,8 @@ def insert_stream( timeout: Optional[int] = None, checkpoint_dir: Optional[str] = None, write_options: Optional[Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, + transform: bool = True, ) -> TypeVar("StreamingQuery"): """Ingest a Spark Structured Streaming Dataframe to the online feature store. @@ -3185,6 +3211,9 @@ def insert_stream( "insert_stream_" + online_topic_name. Defaults to `None`. write_options: Additional write options for Spark as key-value pairs. Defaults to `{}`. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. 
If no context variables are provided, this parameter defaults to `None`. + transform: `bool`. When set to `False`, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe. Defaults to `True`. # Returns `StreamingQuery`: Spark Structured Streaming Query object. @@ -3219,6 +3248,8 @@ def insert_stream( timeout, checkpoint_dir, write_options or {}, + transformation_context=transformation_context, + transform=transform, ) def commit_details( @@ -3277,7 +3308,7 @@ def delta_vacuum( self, retention_hours: int = None, ) -> None: - """ Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold. + """Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold. This method can only be used on feature groups stored as DELTA. !!! example diff --git a/python/hsfs/feature_group_writer.py b/python/hsfs/feature_group_writer.py index de63bd5a4..ea1c16e82 100644 --- a/python/hsfs/feature_group_writer.py +++ b/python/hsfs/feature_group_writer.py @@ -48,6 +48,8 @@ def insert( storage: Optional[str] = None, write_options: Optional[Dict[str, Any]] = None, validation_options: Optional[Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, + transform: bool = True, ) -> Tuple[Optional[Job], Optional[ValidationReport]]: if validation_options is None: validation_options = {} @@ -60,6 +62,8 @@ def insert( storage=storage, write_options={"start_offline_materialization": False, **write_options}, validation_options={"fetch_expectation_suite": False, **validation_options}, + transformation_context=transformation_context, + transform=transform, ) def __exit__(self, exc_type, exc_value, exc_tb): diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index e30002775..1b325a50e 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -529,6 +529,7 @@ def get_feature_vector( force_sql_client: bool = False, transform: Optional[bool] = True, request_parameters: Optional[Dict[str, Any]] = None, + transformation_context: Dict[str, Any] = None, ) -> Union[List[Any], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vector from online feature store. Call [`feature_view.init_serving`](#init_serving) before this method if the following configurations are needed. @@ -604,6 +605,8 @@ def get_feature_vector( allow_missing: Setting to `True` returns feature vectors with missing values. transformed: Setting to `False` returns the untransformed feature vectors. request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. 
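Combined with the request-parameter handling added in vector_server above (entry values are reused as request parameters when not given explicitly), a single online lookup can now look roughly like this; the serving key, feature, and context names are illustrative:

    vector = feature_view.get_feature_vector(
        entry={"customer_id": 1},
        request_parameters={"transaction_amount": 49.9},
        transformation_context={"fraud_threshold": 0.8},
        return_type="pandas",
    )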
# Returns `list`, `pd.DataFrame`, `polars.DataFrame` or `np.ndarray` if `return type` is set to `"list"`, `"pandas"`, `"polars"` or `"numpy"` @@ -631,6 +634,7 @@ def get_feature_vector( force_sql_client=force_sql_client, transform=transform, request_parameters=request_parameters, + transformation_context=transformation_context, ) def get_feature_vectors( @@ -644,6 +648,7 @@ def get_feature_vectors( force_sql_client: bool = False, transform: Optional[bool] = True, request_parameters: Optional[List[Dict[str, Any]]] = None, + transformation_context: Dict[str, Any] = None, ) -> Union[List[List[Any]], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vectors in batches from online feature store. Call [`feature_view.init_serving`](#init_serving) before this method if the following configurations are needed. @@ -717,6 +722,8 @@ def get_feature_vectors( allow_missing: Setting to `True` returns feature vectors with missing values. transformed: Setting to `False` returns the untransformed feature vectors. request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns `List[list]`, `pd.DataFrame`, `polars.DataFrame` or `np.ndarray` if `return type` is set to `"list", `"pandas"`,`"polars"` or `"numpy"` @@ -747,6 +754,7 @@ def get_feature_vectors( force_sql_client=force_sql_client, transform=transform, request_parameters=request_parameters, + transformation_context=transformation_context, ) def get_inference_helper( @@ -999,6 +1007,7 @@ def get_batch_data( inference_helper_columns: bool = False, dataframe_type: Optional[str] = "default", transformed: Optional[bool] = True, + transformation_context: Dict[str, Any] = None, **kwargs, ) -> TrainingDatasetDataFrameTypes: """Get a batch of data from an event time interval from the offline feature store. @@ -1055,6 +1064,8 @@ def get_batch_data( Possible values are `"default"`, `"spark"`,`"pandas"`, `"polars"`, `"numpy"` or `"python"`. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. transformed: Setting to `False` returns the untransformed feature vectors. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns `DataFrame`: The spark dataframe containing the feature data. 
@@ -1080,6 +1091,7 @@ def get_batch_data( inference_helper_columns, dataframe_type, transformed=transformed, + transformation_context=transformation_context, ) def add_tag(self, name: str, value: Any) -> None: @@ -1272,6 +1284,7 @@ def create_training_data( primary_key: bool = False, event_time: bool = False, training_helper_columns: bool = False, + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1444,6 +1457,8 @@ def create_training_data( model schema itself but can be used during training as a helper for extra information. If training helper columns were not defined in the feature view then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper columns. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -1474,6 +1489,7 @@ def create_training_data( primary_keys=kwargs.get("primary_keys") or primary_key, event_time=event_time, training_helper_columns=training_helper_columns, + transformation_context=transformation_context, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1505,6 +1521,7 @@ def create_train_test_split( primary_key: bool = False, event_time: bool = False, training_helper_columns: bool = False, + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1724,6 +1741,8 @@ def create_train_test_split( extra information. If training helper columns were not defined in the feature view then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper columns. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -1762,6 +1781,7 @@ def create_train_test_split( primary_keys=kwargs.get("primary_keys") or primary_key, event_time=event_time, training_helper_columns=training_helper_columns, + transformation_context=transformation_context, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1795,6 +1815,7 @@ def create_train_validation_test_split( primary_key: bool = False, event_time: bool = False, training_helper_columns: bool = False, + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -2000,6 +2021,8 @@ def create_train_validation_test_split( extra information. 
If training helper columns were not defined in the feature view then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper columns. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -2046,6 +2069,7 @@ def create_train_validation_test_split( primary_keys=kwargs.get("primary_keys") or primary_key, event_time=event_time, training_helper_columns=training_helper_columns, + transformation_context=transformation_context, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -2063,6 +2087,7 @@ def recreate_training_dataset( statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, + transformation_context: Dict[str, Any] = None, ) -> job.Job: """ Recreate a training dataset. @@ -2115,6 +2140,8 @@ def recreate_training_dataset( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns `Job`: When using the `python` engine, it returns the Hopsworks Job @@ -2126,6 +2153,7 @@ def recreate_training_dataset( statistics_config=statistics_config, user_write_options=write_options or {}, spine=spine, + transformation_context=transformation_context, ) self.update_last_accessed_training_dataset(td.version) @@ -2145,6 +2173,7 @@ def training_data( event_time: bool = False, training_helper_columns: bool = False, dataframe_type: Optional[str] = "default", + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[ TrainingDatasetDataFrameTypes, @@ -2243,6 +2272,8 @@ def training_data( dataframe_type: str, optional. The type of the returned dataframe. Possible values are `"default"`, `"spark"`,`"pandas"`, `"polars"`, `"numpy"` or `"python"`. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns (X, y): Tuple of dataframe of features and labels. If there are no labels, y returns `None`. 
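The same context dictionary flows through the training-data paths, so transformations can be parameterized consistently at training and serving time. A sketch with invented context keys:

    # In-memory training data: the context is applied directly by the engine.
    X, y = feature_view.training_data(
        transformation_context={"current_date": "2025-02-03"},
    )

    # Materialized training dataset: per the check added in python.py above, passing
    # a context from the Python kernel raises a FeatureStoreException; run this from
    # a Spark kernel instead.
    version, job = feature_view.create_training_data(
        transformation_context={"current_date": "2025-02-03"},
    )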
""" @@ -2270,6 +2301,7 @@ def training_data( event_time=event_time, training_helper_columns=training_helper_columns, dataframe_type=dataframe_type, + transformation_context=transformation_context, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -2296,6 +2328,7 @@ def train_test_split( event_time: bool = False, training_helper_columns: bool = False, dataframe_type: Optional[str] = "default", + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[ TrainingDatasetDataFrameTypes, @@ -2406,6 +2439,8 @@ def train_test_split( dataframe_type: str, optional. The type of the returned dataframe. Possible values are `"default"`, `"spark"`,`"pandas"`, `"polars"`, `"numpy"` or `"python"`. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns (X_train, X_test, y_train, y_test): Tuple of dataframe of features and labels @@ -2442,6 +2477,7 @@ def train_test_split( event_time=event_time, training_helper_columns=training_helper_columns, dataframe_type=dataframe_type, + transformation_context=transformation_context, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -2484,6 +2520,7 @@ def train_validation_test_split( event_time: bool = False, training_helper_columns: bool = False, dataframe_type: Optional[str] = "default", + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[ TrainingDatasetDataFrameTypes, @@ -2609,6 +2646,8 @@ def train_validation_test_split( dataframe_type: str, optional. The type of the returned dataframe. Possible values are `"default"`, `"spark"`,`"pandas"`, `"polars"`, `"numpy"` or `"python"`. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. 
# Returns (X_train, X_val, X_test, y_train, y_val, y_test): Tuple of dataframe of features and labels @@ -2658,6 +2697,7 @@ def train_validation_test_split( event_time=event_time, training_helper_columns=training_helper_columns, dataframe_type=dataframe_type, + transformation_context=transformation_context, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -2697,6 +2737,7 @@ def get_training_data( event_time: bool = False, training_helper_columns: bool = False, dataframe_type: Optional[str] = "default", + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[ TrainingDatasetDataFrameTypes, @@ -2754,6 +2795,7 @@ def get_training_data( event_time=event_time, training_helper_columns=training_helper_columns, dataframe_type=dataframe_type, + transformation_context=transformation_context, ) self.update_last_accessed_training_dataset(td.version) return df @@ -2767,6 +2809,7 @@ def get_train_test_split( event_time: bool = False, training_helper_columns: bool = False, dataframe_type: Optional[str] = "default", + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[ TrainingDatasetDataFrameTypes, @@ -2810,6 +2853,8 @@ def get_train_test_split( dataframe_type: str, optional. The type of the returned dataframe. Possible values are `"default"`, `"spark"`,`"pandas"`, `"polars"`, `"numpy"` or `"python"`. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. # Returns (X_train, X_test, y_train, y_test): Tuple of dataframe of features and labels @@ -2823,6 +2868,7 @@ def get_train_test_split( event_time=event_time, training_helper_columns=training_helper_columns, dataframe_type=dataframe_type, + transformation_context=transformation_context, ) self.update_last_accessed_training_dataset(td.version) return df @@ -2836,6 +2882,7 @@ def get_train_validation_test_split( event_time: bool = False, training_helper_columns: bool = False, dataframe_type: str = "default", + transformation_context: Dict[str, Any] = None, **kwargs, ) -> Tuple[ TrainingDatasetDataFrameTypes, @@ -2881,6 +2928,8 @@ def get_train_validation_test_split( dataframe_type: str, optional. The type of the returned dataframe. Possible values are `"default"`, `"spark"`,`"pandas"`, `"polars"`, `"numpy"` or `"python"`. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. 
# Returns (X_train, X_val, X_test, y_train, y_val, y_test): Tuple of dataframe of features and labels @@ -2898,6 +2947,7 @@ def get_train_validation_test_split( event_time=event_time, training_helper_columns=training_helper_columns, dataframe_type=dataframe_type, + transformation_context=transformation_context, ) self.update_last_accessed_training_dataset(td.version) return df @@ -3532,11 +3582,14 @@ def update_from_response_json(self, json_dict: Dict[str, Any]) -> "FeatureView": def compute_on_demand_features( self, - feature_vector: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], + feature_vector: Optional[ + Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame] + ] = None, request_parameters: Optional[ Union[List[Dict[str, Any]], Dict[str, Any]] ] = None, - external: Optional[bool] = None, + transformation_context: Dict[str, Any] = None, + return_type: Union[Literal["list", "numpy", "pandas", "polars"]] = None, ): """ Function computes on-demand features present in the feature view. @@ -3544,18 +3597,26 @@ def compute_on_demand_features( # Arguments feature_vector: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vector to be transformed. request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + return_type: `"list"`, `"pandas"`, `"polars"` or `"numpy"`. Defaults to the same type as the input feature vector. # Returns `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The feature vector that contains all on-demand features in the feature view. """ return self._vector_server.compute_on_demand_features( - feature_vectors=feature_vector, request_parameters=request_parameters + feature_vectors=feature_vector, + request_parameters=request_parameters, + transformation_context=transformation_context, + return_type=return_type, ) def transform( self, feature_vector: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame], external: Optional[bool] = None, + transformation_context: Dict[str, Any] = None, + return_type: Union[Literal["list", "numpy", "pandas", "polars"]] = None, ): """ Transform the input feature vector by applying Model-dependent transformations attached to the feature view. @@ -3571,13 +3632,20 @@ def transform( If set to False, the online feature store storage connector is used which relies on the private IP. Defaults to True if connection to Hopsworks is established from external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. + transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. + These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + return_type: `"list"`, `"pandas"`, `"polars"` or `"numpy"`. Defaults to the same type as the input feature vector. 
        # Returns
            `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The transformed feature vector obtained by applying Model-Dependent Transformations.
        """
        if not self._vector_server._serving_initialized:
            self.init_serving(external=external)
-        return self._vector_server.transform(feature_vectors=feature_vector)
+        return self._vector_server.transform(
+            feature_vectors=feature_vector,
+            transformation_context=transformation_context,
+            return_type=return_type,
+        )
 
     def enable_logging(self) -> None:
         """Enable feature logging for the current feature view.
diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py
index d71191dcf..0010958fe 100644
--- a/python/hsfs/hopsworks_udf.py
+++ b/python/hsfs/hopsworks_udf.py
@@ -61,6 +61,15 @@ def from_string(execution_mode: str):
             ) from e
 
 
+class UDFKeyWords(Enum):
+    """
+    Class that stores the keywords used as arguments in UDFs.
+    """
+
+    STATISTICS = "statistics"
+    CONTEXT = "context"
+
+
 def udf(
     return_type: Union[List[type], type],
     drop: Optional[Union[str, List[str]]] = None,
@@ -86,8 +95,9 @@ def add_one(data1):
     ```
 
     # Arguments
-        return_type: The output types of the defined UDF
-        drop: The features to be dropped after application of transformation functions
+        return_type: `Union[List[type], type]`. The output types of the defined UDF.
+        drop: `Optional[Union[str, List[str]]]`. The features to be dropped after application of the transformation functions. Defaults to `None`.
+        mode: `Literal["default", "python", "pandas"]`. The execution mode of the UDF. Defaults to `"default"`.
 
     # Returns
         `HopsworksUdf`: The metadata object for hopsworks UDF's.
@@ -144,7 +154,7 @@ class HopsworksUdf:
         return_types : `Union[List[type], type, List[str], str]`. A python type or a list of python types that denotes the data types of the columns output from the transformation functions.
         name : `Optional[str]`. Name of the transformation function.
         transformation_features : `Optional[List[TransformationFeature]]`. A list of objects of `TransformationFeature` that maps the feature used for transformation to their corresponding statistics argument names if any
-        transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function.
+        transformation_function_argument_names : `Optional[List[str]]`. The argument names of the transformation function.
         dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the finial DataFrame after the transformation functions are applied.
         dropped_feature_names : `Optional[List[str]]`. The feature name corresponding to the arguments names that are dropped
         feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view.
@@ -170,9 +180,7 @@ def __init__(
         execution_mode: UDFExecutionMode,
         name: Optional[str] = None,
         transformation_features: Optional[List[TransformationFeature]] = None,
-        transformation_function_argument_names: Optional[
-            List[TransformationFeature]
-        ] = None,
+        transformation_function_argument_names: Optional[List[str]] = None,
         dropped_argument_names: Optional[List[str]] = None,
         dropped_feature_names: Optional[List[str]] = None,
         feature_name_prefix: Optional[str] = None,
@@ -244,6 +252,8 @@ def __init__(
 
         self._statistics: Optional[TransformationStatistics] = None
 
+        self._transformation_context: Dict[str, Any] = {}
+
         # Denote if the output feature names have to be generated.
        # Set to `False` if the output column names are saved in the backend and the udf is constructed from it using `from_response_json` function or if user has specified the output feature names using the `alias`` function.
        self._generate_output_col_name: bool = generate_output_col_names
@@ -418,8 +428,11 @@ def _parse_function_signature(source_code: str) -> Tuple[List[str], str, int, in
             for arg in arg_list
             if not arg.strip() == ""
         ]
-        if "statistics" in arg_list:
-            arg_list.remove("statistics")
+
+        # Extracting keywords like `context` and `statistics` from the function signature.
+        keyword_to_remove = [keyword.value for keyword in UDFKeyWords]
+        arg_list = [arg for arg in arg_list if arg not in keyword_to_remove]
+
         return arg_list, signature, signature_start_line, signature_end_line
 
     @staticmethod
@@ -439,10 +452,13 @@ def _extract_function_arguments(function: Callable) -> List[TransformationFeatur
             raise FeatureStoreException(
                 "No arguments present in the provided user defined function. Please provide at least one argument in the defined user defined function."
             )
+
+        # Extracting keywords like `context` and `statistics` from the function signature.
         for arg in inspect.signature(function).parameters.values():
-            if arg.name == "statistics":
+            # Fetching default value for statistics parameter to find argument names that require statistics.
+            if arg.name == UDFKeyWords.STATISTICS.value:
                 statistics = arg.default
-            else:
+            elif arg.name != UDFKeyWords.CONTEXT.value:
                 arg_list.append(arg.name)
 
         if statistics:
@@ -508,6 +524,28 @@ def _create_pandas_udf_return_schema_from_list(self) -> str:
         else:
             return self.return_types[0]
 
+    def _prepare_transformation_function_scope(self, **kwargs) -> Dict[str, Any]:
+        """
+        Function that prepares the scope in which the transformation function is executed. The scope includes any variables that need to be injected into the transformation function.
+
+        By default the output column names, transformation statistics and transformation context are injected into the scope if they are required by the transformation function. Any additional variables can be passed as kwargs.
+        """
+        # Shallow copy of scope performed because updating the statistics argument of the scope must not affect other instances.
+        scope = __import__("__main__").__dict__.copy()
+
+        # Adding variables required to be injected into the scope.
+        variables_to_inject = {
+            UDFKeyWords.STATISTICS.value: self.transformation_statistics,
+            UDFKeyWords.CONTEXT.value: self.transformation_context,
+            "_output_col_names": self.output_column_names,
+        }
+        variables_to_inject.update(**kwargs)
+
+        # Injecting variables that have a value into the scope.
+        scope.update({k: v for k, v in variables_to_inject.items() if v is not None})
+
+        return scope
+
     def python_udf_wrapper(self, rename_outputs) -> Callable:
         """
         Function that creates a dynamic wrapper function for the defined udf. The wrapper function would be used to specify column names, in spark engines and to localize timezones.
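As a reading aid for this hunk, here is a minimal usage sketch of the new `context` keyword (not part of the patch; the function name and the `exchange_rate` key are illustrative). A UDF parameter named `context` is excluded from the transformation features and instead receives the dictionary passed as `transformation_context`, which `_prepare_transformation_function_scope` above injects into the execution scope:

```python
import pandas as pd

from hsfs.hopsworks_udf import udf


@udf(float)
def scale_amount(amount: pd.Series, context):
    # `context` is not read from a feature column; it is the dictionary passed
    # as `transformation_context` when the data is materialized.
    return amount * context["exchange_rate"]


# Illustrative call site: the context values are supplied at materialization time, e.g.
# feature_view.training_data(transformation_context={"exchange_rate": 1.08})
```
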
@@ -525,9 +563,9 @@ def python_udf_wrapper(self, rename_outputs) -> Callable:
 
         # Function that converts the timestamp to localized timezone
         convert_timstamp_function = (
-            "from datetime import datetime, timezone\n"
-            + "import tzlocal\n"
-            + "def convert_timezone(date_time_obj : datetime):\n"
+            "def convert_timezone(date_time_obj : datetime):\n"
+            + "    from datetime import datetime, timezone\n"
+            + "    import tzlocal\n"
             + "    current_timezone = tzlocal.get_localzone()\n"
             + "    if date_time_obj and isinstance(date_time_obj, datetime):\n"
             + "        if date_time_obj.tzinfo is None:\n"
@@ -555,7 +593,7 @@ def python_udf_wrapper(self, rename_outputs) -> Callable:
             code += (
                 "    transformed_features = list(transformed_features)\n"
                 "    for index in _date_time_output_index:\n"
-                + "        transformed_features[index] = convert_timezone(transformed_features[index])"
+                + "        transformed_features[index] = convert_timezone(transformed_features[index])\n"
             )
         if rename_outputs:
             # Use a dictionary to rename output to correct column names. This must be for the udf's to be executable in spark.
@@ -570,11 +608,9 @@ def python_udf_wrapper(self, rename_outputs) -> Callable:
             code += "    return transformed_features"
 
         # Inject required parameter to scope
-        scope = __import__("__main__").__dict__.copy()
-        if self.transformation_statistics is not None:
-            scope.update({"statistics": self.transformation_statistics})
-        scope.update({"_output_col_names": self.output_column_names})
-        scope.update({"_date_time_output_index": date_time_output_index})
+        scope = self._prepare_transformation_function_scope(
+            _date_time_output_index=date_time_output_index
+        )
 
         # executing code
         exec(code, scope)
@@ -646,14 +682,11 @@ def renaming_wrapper(*args):
                 return df"""
             )
 
-        # injecting variables into scope used to execute wrapper function.
+        # Inject required parameter to scope
+        scope = self._prepare_transformation_function_scope(
+            _date_time_output_columns=date_time_output_columns
+        )
-
-        # Shallow copy of scope performed because updating statistics argument of scope must not affect other instances.
-        scope = __import__("__main__").__dict__.copy()
-        if self.transformation_statistics is not None:
-            scope.update({"statistics": self.transformation_statistics})
-        scope.update({"_output_col_names": self.output_column_names})
-        scope.update({"_date_time_output_columns": date_time_output_columns})
 
         # executing code
         exec(code, scope)
@@ -749,6 +782,19 @@ def _validate_output_col_name(self, output_col_names):
                 f"The number of output feature names provided does not match the number of features returned by the transformation function '{repr(self)}'. Pease provide exactly {len(self.return_types)} feature name(s) to match the output."
             )
 
+    def _validate_transformation_context(self, transformation_context: Dict[str, Any]):
+        """
+        Function that checks if the context variables provided to the transformation function are valid.
+
+        It checks that the context variables are passed as a dictionary.
+        """
+        if not isinstance(transformation_context, dict):
+            raise FeatureStoreException(
+                "Transformation context variable must be passed as dictionary."
+            )
+
+        return transformation_context
+
     def update_return_type_one_hot(self):
         self._return_types = [
             self._return_types[0]
@@ -995,6 +1041,21 @@ def transformation_features(self) -> List[str]:
             for transformation_feature in self._transformation_features
         ]
 
+    @property
+    def unprefixed_transformation_features(self) -> List[str]:
+        """
+        List of feature names used in the transformation function without the feature name prefix.
+        """
+        return [
+            transformation_feature.feature_name
+            for transformation_feature in self._transformation_features
+        ]
+
+    @property
+    def feature_name_prefix(self) -> Optional[str]:
+        """The feature name prefix that needs to be added to the feature names"""
+        return self._feature_name_prefix
+
     @property
     def statistics_features(self) -> List[str]:
         """
@@ -1044,6 +1105,22 @@ def dropped_features(self) -> List[str]:
     def execution_mode(self) -> UDFExecutionMode:
         return self._execution_mode
 
+    @property
+    def transformation_context(self) -> Dict[str, Any]:
+        """
+        Dictionary that contains the context variables required by the UDF.
+        These context variables are passed to the UDF during execution.
+        """
+        return self._transformation_context if self._transformation_context else {}
+
+    @transformation_context.setter
+    def transformation_context(self, context_variables: Dict[str, Any]) -> None:
+        self._transformation_context = (
+            self._validate_transformation_context(context_variables)
+            if context_variables
+            else {}
+        )
+
     @dropped_features.setter
     def dropped_features(self, features: List[str]) -> None:
         self._dropped_features = HopsworksUdf._validate_and_convert_drop_features(
diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py
index a480ee521..0c6383cfd 100644
--- a/python/hsfs/transformation_function.py
+++ b/python/hsfs/transformation_function.py
@@ -235,6 +235,7 @@ def to_dict(self) -> Dict[str, Any]:
             "version": self._version,
             "featurestoreId": self._featurestore_id,
             "hopsworksUdf": self.hopsworks_udf.to_dict(),
+            "transformationType": self.transformation_type.value,
         }
 
     def alias(self, *args: str):
@@ -270,18 +271,24 @@ def _get_output_column_names(self) -> str:
             )
         ):
             output_col_names = [self.__hopsworks_udf.function_name]
+        else:
+            if self.transformation_type == TransformationType.MODEL_DEPENDENT:
+                _BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_'
+            elif self.transformation_type == TransformationType.ON_DEMAND:
+                _BASE_COLUMN_NAME = (
+                    self.__hopsworks_udf.function_name
+                    if len(self.__hopsworks_udf.return_types) == 1
+                    else f"{self.__hopsworks_udf.function_name}_"
+                )
 
-        if self.transformation_type == TransformationType.MODEL_DEPENDENT:
-            _BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_'
-            if len(self.__hopsworks_udf.return_types) > 1:
-                output_col_names = [
+            output_col_names = (
+                [
                     f"{_BASE_COLUMN_NAME}{i}"
                     for i in range(len(self.__hopsworks_udf.return_types))
                 ]
-            else:
-                output_col_names = [f"{_BASE_COLUMN_NAME}"]
-        elif self.transformation_type == TransformationType.ON_DEMAND:
-            output_col_names = [self.__hopsworks_udf.function_name]
+                if len(self.__hopsworks_udf.return_types) > 1
+                else [_BASE_COLUMN_NAME]
+            )
 
         if any(
             len(output_col_name) > FEATURES.MAX_LENGTH_NAME
@@ -315,11 +322,6 @@ def _validate_transformation_type(
         """
 
         if transformation_type == TransformationType.ON_DEMAND:
-            if len(hopsworks_udf.return_types) > 1:
-                raise FeatureStoreException(
-                    "On-Demand Transformation functions can only return one column as output"
-                )
-
             if hopsworks_udf.statistics_required:
                 raise FeatureStoreException(
                     "On-Demand Transformation functions cannot use statistics, please remove statistics parameters from the functions"
@@ -395,3 +397,6 @@ def __repr__(self):
             return f"On-Demand Transformation Function : {repr(self.__hopsworks_udf)}"
         else:
             return f"Transformation Function : {repr(self.__hopsworks_udf)}"
+
+    def __eq__(self,
other): + return self.to_dict() == other.to_dict() diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 84e2ca10a..91d6780f8 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -2676,6 +2676,51 @@ def plus_one(col1): assert result["plus_one_tf_name_"][0] == 2 assert result["plus_one_tf_name_"][1] == 3 + @pytest.mark.parametrize("execution_mode", ["default", "pandas", "python"]) + def test_apply_transformation_function_udf_transformation_context( + self, mocker, execution_mode + ): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + hopsworks_common.connection._hsfs_engine_type = "python" + python_engine = python.Engine() + + @udf(int, mode=execution_mode) + def plus_one(col1, context): + return col1 + context["test"] + + fg = feature_group.FeatureGroup( + name="test1", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=[feature.Feature("id"), feature.Feature("tf_name")], + id=11, + stream=False, + ) + + fv = feature_view.FeatureView( + name="fv_name", + query=fg.select_all(), + featurestore_id=99, + transformation_functions=[plus_one("tf_name")], + ) + + df = pd.DataFrame(data={"tf_name": [1, 2]}) + + # Act + result = python_engine._apply_transformation_function( + transformation_functions=fv.transformation_functions, + dataset=df, + transformation_context={"test": 10}, + ) + + # Assert + assert len(result["plus_one_tf_name_"]) == 2 + assert result["plus_one_tf_name_"][0] == 11 + assert result["plus_one_tf_name_"][1] == 12 + def test_apply_transformation_function_multiple_output_udf_default_mode( self, mocker ): @@ -3441,7 +3486,7 @@ def add_one(col1): python_engine._apply_transformation_function( transformation_functions=fg.transformation_functions, dataset=df ) - print(str(exception.value)) + assert ( str(exception.value) == "The following feature(s): `missing_col1`, specified in the on-demand transformation function 'add_one' are not present in the dataframe being inserted into the feature group. " @@ -3485,7 +3530,7 @@ def add_one(col1): python_engine._apply_transformation_function( transformation_functions=fv.transformation_functions, dataset=df ) - print(str(exception.value)) + assert ( str(exception.value) == "The following feature(s): `missing_col1`, specified in the model-dependent transformation function 'add_one' are not present in the feature view. " diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index f74aaf36f..391de6b0f 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -4414,7 +4414,7 @@ def plus_one(col1): "col_2": [True, False], "plus_one_col_0_": [2, 3], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -4475,7 +4475,7 @@ def plus_one(col1): "col_2": [True, False], "plus_one_col_0_": [2, 3], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -4536,7 +4536,71 @@ def plus_one(col1): "col_2": [True, False], "plus_one_col_0_": [2, 3], } - ) # todo why it doesnt return int? 
+ ) + + expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) + + # Act + result = spark_engine._apply_transformation_function( + transformation_functions=fv.transformation_functions, + dataset=spark_df, + ) + # Assert + assert result.schema == expected_spark_df.schema + assert result.collect() == expected_spark_df.collect() + + @pytest.mark.parametrize("execution_mode", ["default", "pandas", "python"]) + def test_apply_transformation_function_single_output_udf_transformation_context( + self, mocker, execution_mode + ): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + hopsworks_common.connection._hsfs_engine_type = "spark" + spark_engine = spark.Engine() + + @udf(int, drop=["col1"], mode=execution_mode) + def plus_one(col1, context): + return col1 + context["test"] + + tf = transformation_function.TransformationFunction( + 99, + hopsworks_udf=plus_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + f = feature.Feature(name="col_0", type=IntegerType(), index=0) + f1 = feature.Feature(name="col_1", type=StringType(), index=1) + f2 = feature.Feature(name="col_2", type=BooleanType(), index=1) + features = [f, f1, f2] + fg1 = feature_group.FeatureGroup( + name="test1", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=features, + id=11, + stream=False, + ) + fv = feature_view.FeatureView( + name="test", + featurestore_id=99, + query=fg1.select_all(), + transformation_functions=[tf("col_0")], + ) + + d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"], "col_2": [True, False]} + df = pd.DataFrame(data=d) + + spark_df = spark_engine._spark_session.createDataFrame(df) + + expected_df = pd.DataFrame( + data={ + "col_1": ["test_1", "test_2"], + "col_2": [True, False], + "plus_one_col_0_": [21, 22], + } + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -4544,6 +4608,7 @@ def plus_one(col1): result = spark_engine._apply_transformation_function( transformation_functions=fv.transformation_functions, dataset=spark_df, + transformation_context={"test": 20}, ) # Assert assert result.schema == expected_spark_df.schema @@ -4600,7 +4665,7 @@ def plus_two(col1): "plus_two_col_0_0": [2, 3], "plus_two_col_0_1": [3, 4], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -4664,7 +4729,7 @@ def plus_two(col1): "plus_two_col_0_0": [2, 3], "plus_two_col_0_1": [3, 4], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -4728,7 +4793,7 @@ def plus_two(col1): "plus_two_col_0_0": [2, 3], "plus_two_col_0_1": [3, 4], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -5178,7 +5243,7 @@ def test(col1, col2): "test_col_0_col_2_0": [2, 3], "test_col_0_col_2_1": [12, 13], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -5241,7 +5306,7 @@ def test(col1, col2): "test_col_0_col_2_0": [2, 3], "test_col_0_col_2_1": [12, 13], } - ) # todo why it doesnt return int? + ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) @@ -5304,7 +5369,7 @@ def test(col1, col2): "test_col_0_col_2_0": [2, 3], "test_col_0_col_2_1": [12, 13], } - ) # todo why it doesnt return int? 
+ ) expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df) diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index 5e01b5a10..1e4795ceb 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -624,6 +624,8 @@ def test_save_report_true_default(self, mocker, dataframe_fixture_basic): storage=None, write_options={"wait_for_job": False}, validation_options={"save_report": True}, + transformation_context=None, + transform=True, ) def test_save_report_default_overwritable(self, mocker, dataframe_fixture_basic): @@ -658,6 +660,8 @@ def test_save_report_default_overwritable(self, mocker, dataframe_fixture_basic) storage=None, write_options={"wait_for_job": False}, validation_options={"save_report": False}, + transformation_context=None, + transform=True, ) @@ -931,7 +935,9 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) + fg._storage_connector = storage_connector.S3Connector( + id=1, name="s3_conn", featurestore_id=fg.feature_store_id + ) # Act path = fg.prepare_spark_location() @@ -940,14 +946,18 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures assert fg.location == path engine_instance.assert_called_once() - def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures): + def test_prepare_spark_location_with_s3_connector_python( + self, mocker, backend_fixtures + ): # Arrange engine = python.Engine() engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) + fg._storage_connector = storage_connector.S3Connector( + id=1, name="s3_conn", featurestore_id=fg.feature_store_id + ) # Act with pytest.raises(AttributeError): diff --git a/python/tests/test_feature_group_writer.py b/python/tests/test_feature_group_writer.py index 409c14ede..b2f577575 100644 --- a/python/tests/test_feature_group_writer.py +++ b/python/tests/test_feature_group_writer.py @@ -43,6 +43,8 @@ def test_fg_writer_context_manager(self, mocker, dataframe_fixture_basic): storage=None, write_options={"start_offline_materialization": False}, validation_options={"fetch_expectation_suite": False}, + transformation_context=None, + transform=True, ) assert fg._multi_part_insert is False diff --git a/python/tests/test_helpers/transformation_test_helper.py b/python/tests/test_helpers/transformation_test_helper.py index 99967c864..9415d44c8 100644 --- a/python/tests/test_helpers/transformation_test_helper.py +++ b/python/tests/test_helpers/transformation_test_helper.py @@ -107,3 +107,7 @@ def test_function_transformation_statistics_as_default_multiple_line_return_type def test_function_statistics_invalid(arg1: pd.Series, statistics=stats_arg3): pass + + +def test_function_context_variables(arg1: pd.Series, context): + pass diff --git a/python/tests/test_hopswork_udf.py b/python/tests/test_hopswork_udf.py index 44de7cced..08c1fc28b 100644 --- a/python/tests/test_hopswork_udf.py 
+++ b/python/tests/test_hopswork_udf.py @@ -337,6 +337,21 @@ def test_extract_function_arguments_multiple_argumen_all_parameter_multiline_wit TransformationFeature(feature_name="arg3", statistic_argument_name="arg3"), ] + def test_extract_function_arguments_context_variable( + self, + ): + from .test_helpers.transformation_test_helper import ( + test_function_context_variables, + ) + + function_argument = HopsworksUdf._extract_function_arguments( + test_function_context_variables + ) + + assert function_argument == [ + TransformationFeature(feature_name="arg1", statistic_argument_name=None), + ] + def test_extract_function_arguments_statistics_invalid(self): from .test_helpers.transformation_test_helper import ( test_function_statistics_invalid, @@ -681,6 +696,27 @@ def test_format_source_code_transformation_statistics_as_default_multiple_line_r \t pass""" ) + def test_format_source_code_context_variable( + self, + ): + from .test_helpers.transformation_test_helper import ( + test_function_context_variables, + ) + + function_source = HopsworksUdf._extract_source_code( + test_function_context_variables + ) + + formated_source, module_imports = HopsworksUdf._format_source_code( + function_source + ) + + assert ( + formated_source.strip() + == """def test_function_context_variables(arg1): +\t pass""" + ) + def test_drop_features_one_element(self): @udf([int, float, int], drop="col1") def test_func(col1, col2, col3): @@ -790,6 +826,22 @@ def test_func(col1): assert result.name == "test_func_col1_" assert result.values.tolist() == [2, 3, 4, 5] + def test_pandas_udf_wrapper_context_variables(self): + test_dataframe = pd.DataFrame({"col1": [1, 2, 3, 4]}) + + @udf(int) + def test_func(col1, context): + return col1 + context["test_value"] + + test_func.output_column_names = ["test_func_col1_"] + test_func.transformation_context = {"test_value": 200} + renaming_wrapper_function = test_func.pandas_udf_wrapper() + + result = renaming_wrapper_function(test_dataframe["col1"]) + + assert result.name == "test_func_col1_" + assert result.values.tolist() == [201, 202, 203, 204] + def test_python_udf_wrapper_single_output(self): test_dataframe = pd.DataFrame({"col1": [1, 2, 3, 4]}) @@ -806,6 +858,23 @@ def test_func(col1): assert result.values.tolist() == [2, 3, 4, 5] + def test_python_udf_wrapper_context_variables(self): + test_dataframe = pd.DataFrame({"col1": [1, 2, 3, 4]}) + + @udf(int) + def test_func(col1, context): + return col1 + context["test_value"] + + test_func.transformation_context = {"test_value": 100} + test_func.output_column_names = ["test_func_col1_"] + wrapper_function = test_func.python_udf_wrapper(rename_outputs=False) + + result = test_dataframe.apply( + lambda x: wrapper_function(x["col1"]), axis=1, result_type="expand" + ) + + assert result.values.tolist() == [101, 102, 103, 104] + def test_pandas_udf_wrapper_multiple_output(self): @udf([int, float]) def test_func(col1, col2): @@ -1336,3 +1405,94 @@ def add_and_sub(feature): str(exp.value) == "Invalid output feature names specified for the transformation function 'add_and_sub(feature)'. Please provide names shorter than 63 characters." 
        )
+
+    def test_invalid_transformation_context(self):
+        @udf(int)
+        def test_func(feature, context):
+            return feature + context["test_value"]
+
+        with pytest.raises(FeatureStoreException) as exp:
+            test_func.transformation_context = "invalid_context"
+
+        exp.match("Transformation context variable must be passed as dictionary.")
+
+    def test_prepare_transformation_function_scope_no_kwargs_no_statistics_no_context(
+        self,
+    ):
+        @udf(int)
+        def test_func(feature):
+            return feature + 1
+
+        # Setting the output column names to test the scope as they would always be set before scope is prepared.
+        test_func._output_column_names = ["test_func_feature_"]
+
+        scope = test_func._prepare_transformation_function_scope()
+
+        assert scope["_output_col_names"] == ["test_func_feature_"]
+        assert "_output_col_names" in scope.keys()
+
+    def test_prepare_transformation_function_scope_no_kwargs_no_statistics_context(
+        self,
+    ):
+        @udf(int)
+        def test_func(feature, context):
+            return feature + 1
+
+        # Setting the output column names and transformation context to test the scope as they would always be set before scope is prepared.
+        test_func._output_column_names = ["test_func_feature_"]
+        test_func.transformation_context = {"test_value": 100}
+
+        scope = test_func._prepare_transformation_function_scope()
+
+        assert scope["_output_col_names"] == ["test_func_feature_"]
+        assert scope["context"] == {"test_value": 100}
+        assert all(
+            value in scope.keys()
+            for value in {
+                "_output_col_names",
+                "context",
+            }
+        )
+
+    def test_prepare_transformation_function_scope_no_kwargs_statistics_context(self):
+        @udf(int)
+        def test_func(feature, context):
+            return feature + 1
+
+        # Setting the output column names, transformation context and statistics to test the scope as they would always be set before scope is prepared.
+        test_func._output_column_names = ["test_func_feature_"]
+        test_func.transformation_context = {"test_value": 100}
+        test_func._statistics = 10
+
+        scope = test_func._prepare_transformation_function_scope()
+
+        assert scope["_output_col_names"] == ["test_func_feature_"]
+        assert scope["context"] == {"test_value": 100}
+        assert scope["statistics"] == 10
+        assert all(
+            value in scope.keys()
+            for value in {"_output_col_names", "context", "statistics"}
+        )
+
+    def test_prepare_transformation_function_scope_kwargs_statistics_context(self):
+        @udf(int)
+        def test_func(feature, context):
+            return feature + 1
+
+        # Setting the output column names, transformation context and statistics to test the scope as they would always be set before scope is prepared.
+ test_func._output_column_names = ["test_func_feature_"] + test_func.transformation_context = {"test_value": 100} + test_func._statistics = 10 + + scope = test_func._prepare_transformation_function_scope(test="values") + + assert scope["_output_col_names"] == ["test_func_feature_"] + assert scope["context"] == {"test_value": 100} + assert scope["statistics"] == 10 + assert scope["test"] == "values" + assert all( + value in scope.keys() + for value in {"_output_col_names", "context", "statistics", "test"} + ) diff --git a/python/tests/test_transformation_function.py b/python/tests/test_transformation_function.py index 2f10d9924..bf2f37631 100644 --- a/python/tests/test_transformation_function.py +++ b/python/tests/test_transformation_function.py @@ -381,6 +381,27 @@ def test_func(col1): "test_func_col1_2", ] + def test_generate_output_column_names_single_argument_multiple_output_type_odt( + self, + ): + @udf([int, float, int]) + def test_func(col1): + return pd.DataFrame( + {"col1": [col1 + 1], "col2": [col1 + 1], "col3": [col1 + 1]} + ) + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test_func, + transformation_type=TransformationType.ON_DEMAND, + ) + assert odt._get_output_column_names() == [ + "test_func_0", + "test_func_1", + "test_func_2", + ] + assert odt.output_column_names == ["test_func_0", "test_func_1", "test_func_2"] + def test_generate_output_column_names_single_argument_multiple_output_type_prefix_mdt( self, ): @@ -408,6 +429,34 @@ def test_func(col1): "prefix_test_func_prefix_col1_2", ] + def test_generate_output_column_names_single_argument_multiple_output_type_prefix_odt( + self, + ): + @udf([int, float, int]) + def test_func(col1): + return pd.DataFrame( + {"col1": [col1 + 1], "col2": [col1 + 1], "col3": [col1 + 1]} + ) + + test_func._feature_name_prefix = "prefix_" + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test_func, + transformation_type=TransformationType.ON_DEMAND, + ) + + assert odt._get_output_column_names() == [ + "test_func_0", + "test_func_1", + "test_func_2", + ] + assert odt.output_column_names == [ + "prefix_test_func_0", + "prefix_test_func_1", + "prefix_test_func_2", + ] + def test_generate_output_column_names_multiple_argument_multiple_output_type_mdt( self, ): @@ -427,6 +476,33 @@ def test_func(col1, col2, col3): "test_func_col1_col2_col3_1", "test_func_col1_col2_col3_2", ] + assert mdt.output_column_names == [ + "test_func_col1_col2_col3_0", + "test_func_col1_col2_col3_1", + "test_func_col1_col2_col3_2", + ] + + def test_generate_output_column_names_multiple_argument_multiple_output_type_odt( + self, + ): + @udf([int, float, int]) + def test_func(col1, col2, col3): + return pd.DataFrame( + {"col1": [col1 + 1], "col2": [col2 + 1], "col3": [col3 + 1]} + ) + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test_func, + transformation_type=TransformationType.ON_DEMAND, + ) + + assert odt._get_output_column_names() == [ + "test_func_0", + "test_func_1", + "test_func_2", + ] + assert odt.output_column_names == ["test_func_0", "test_func_1", "test_func_2"] def test_generate_output_column_names_multiple_argument_multiple_output_type_prefix_mdt( self, @@ -455,23 +531,34 @@ def test_func(col1, col2, col3): "prefix_test_func_prefix_col1_prefix_col2_prefix_col3_2", ] - def test_validate_udf_type_on_demand_multiple_output(self): - @udf([int, float]) - def test_func(col1, col2): - return pd.DataFrame({"out1": col1 + 1, "out2": col2 + 2}) - - with pytest.raises(FeatureStoreException) as exe: - 
TransformationFunction( - featurestore_id=10, - hopsworks_udf=test_func, - transformation_type=TransformationType.ON_DEMAND, + def test_generate_output_column_names_multiple_argument_multiple_output_type_prefix_odt( + self, + ): + @udf([int, float, int]) + def test_func(col1, col2, col3): + return pd.DataFrame( + {"col1": [col1 + 1], "col2": [col2 + 1], "col3": [col3 + 1]} ) - assert ( - str(exe.value) - == "On-Demand Transformation functions can only return one column as output" + test_func._feature_name_prefix = "prefix_" + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test_func, + transformation_type=TransformationType.ON_DEMAND, ) + assert odt._get_output_column_names() == [ + "test_func_0", + "test_func_1", + "test_func_2", + ] + assert odt.output_column_names == [ + "prefix_test_func_0", + "prefix_test_func_1", + "prefix_test_func_2", + ] + def test_validate_udf_type_on_demand_statistics(self): from hsfs.transformation_statistics import TransformationStatistics @@ -868,3 +955,60 @@ def really_long_function_name_that_exceed_63_characters_causing_invalid_name_for assert mdt.hopsworks_udf.output_column_names == [ "really_long_function_name_that_exceed_63_characters_causing_inv" ] + + def test_equality_mdt(self): + @udf([int]) + def add_one(feature): + return feature + 1 + + mdt1 = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt2 = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + assert mdt1 == mdt2 + + def test_equality_odt(self): + @udf([int]) + def add_one(feature): + return feature + 1 + + odt1 = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt2 = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + assert odt1 == odt2 + + def test_inequality(self): + @udf([int]) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + assert mdt != odt
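To close, here is a hedged sketch (not part of the patch; `amount_features`, `threshold`, `fs`, and `df` are illustrative) combining the two behaviours the patch enables: an on-demand transformation function that returns multiple features from a single UDF and consumes a transformation context supplied at insert time:

```python
import pandas as pd

from hsfs.hopsworks_udf import udf


@udf([int, float], drop=["amount"])
def amount_features(amount: pd.Series, context):
    # Two on-demand features are returned from one UDF; the threshold comes from the
    # transformation context rather than from a feature column.
    return pd.DataFrame(
        {
            "is_large": (amount > context["threshold"]).astype(int),
            "amount_scaled": amount / context["threshold"],
        }
    )


# Illustrative usage: attach the UDF to a feature group as an on-demand transformation
# and pass the context when inserting data.
# fg = fs.get_or_create_feature_group(..., transformation_functions=[amount_features])
# fg.insert(df, transformation_context={"threshold": 100.0})
```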