Skip to content

[FSTORE-1411-APPEND] On-Demand Transformation Functions #236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 231 additions & 26 deletions python/hsfs/core/vector_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import itertools
import logging
import warnings
from base64 import b64decode
from datetime import datetime, timezone
from io import BytesIO
Expand Down Expand Up @@ -97,6 +98,15 @@ def __init__(
)
]
self._untransformed_feature_vector_col_name = [
feat.name
for feat in features
if not (
feat.label
or feat.training_helper_column
or feat.on_demand_transformation_function
)
]
self._on_demand_feature_vector_col_name = [
feat.name
for feat in features
if not (feat.label or feat.training_helper_column)
Expand Down Expand Up @@ -310,7 +320,7 @@ def get_feature_vector(
allow_missing: bool = False,
force_rest_client: bool = False,
force_sql_client: bool = False,
transformed=True,
transform: bool = True,
request_parameters: Optional[Dict[str, Any]] = None,
) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], Dict[str, Any]]:
"""Assembles serving vector from online feature store."""
Expand Down Expand Up @@ -343,7 +353,7 @@ def get_feature_vector(
vector_db_result=vector_db_features or {},
allow_missing=allow_missing,
client=online_client_choice,
transformed=transformed,
transform=transform,
request_parameters=request_parameters,
)

Expand All @@ -352,7 +362,8 @@ def get_feature_vector(
batch=False,
inference_helper=False,
return_type=return_type,
transformed=transformed,
transform=transform,
on_demand_feature=transform,
)

def get_feature_vectors(
Expand All @@ -367,7 +378,7 @@ def get_feature_vectors(
allow_missing: bool = False,
force_rest_client: bool = False,
force_sql_client: bool = False,
transformed=True,
transform: bool = True,
) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], List[Dict[str, Any]]]:
"""Assembles serving vector from online feature store."""
if passed_features is None:
Expand Down Expand Up @@ -467,7 +478,7 @@ def get_feature_vectors(
vector_db_result=vector_db_result,
allow_missing=allow_missing,
client=online_client_choice,
transformed=transformed,
transform=transform,
request_parameters=request_parameter,
)

Expand All @@ -479,7 +490,8 @@ def get_feature_vectors(
batch=True,
inference_helper=False,
return_type=return_type,
transformed=transformed,
transform=transform,
on_demand_feature=transform,
)

def assemble_feature_vector(
Expand All @@ -489,7 +501,7 @@ def assemble_feature_vector(
vector_db_result: Optional[Dict[str, Any]],
allow_missing: bool,
client: Literal["rest", "sql"],
transformed,
transform: bool,
request_parameters: Optional[Dict[str, Any]] = None,
) -> Optional[List[Any]]:
"""Assembles serving vector from online feature store."""
Expand Down Expand Up @@ -537,11 +549,11 @@ def assemble_feature_vector(
if (
len(self.model_dependent_transformation_functions) > 0
or len(self.on_demand_transformation_functions) > 0
) and transformed:
) and transform:
self.apply_transformation(result_dict, request_parameters or {})

_logger.debug("Assembled and transformed dict feature vector: %s", result_dict)
if transformed:
if transform:
return [
result_dict.get(fname, None)
for fname in self.transformed_feature_vector_col_name
Expand All @@ -552,22 +564,206 @@ def assemble_feature_vector(
for fname in self._untransformed_feature_vector_col_name
]

def transform_feature_vectors(self, batch_features):
return [
self.apply_transformation(self.get_untransformed_features_map(features))
for features in batch_features
]
def _check_feature_vectors_type_and_convert_to_dict(
self,
feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
on_demand_features: bool = False,
) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]:
"""
Function that converts an input feature vector into a list of dictionaries.

def get_untransformed_features_map(self, features) -> Dict[str, Any]:
return dict(
[
(fname, fvalue)
for fname, fvalue in zip(
self._untransformed_feature_vector_col_name, features
)
]
# Arguments
feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted.
on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector.

# Returns
`Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple that contains the feature vector as a dictionary and a string denoting the data type of the input feature vector.

"""
if isinstance(feature_vectors, pd.DataFrame):
return_type = "pandas"
feature_vectors = feature_vectors.to_dict(orient="records")

elif isinstance(feature_vectors, pl.DataFrame):
return_type = "polars"
feature_vectors = feature_vectors.to_pandas()
feature_vectors = feature_vectors.to_dict(orient="records")

elif isinstance(feature_vectors, list) and feature_vectors:
if all(
isinstance(feature_vector, list) for feature_vector in feature_vectors
):
return_type = "list"
feature_vectors = [
self.get_untransformed_features_map(
feature_vector, on_demand_features=on_demand_features
)
for feature_vector in feature_vectors
]

else:
return_type = "list"
feature_vectors = [
self.get_untransformed_features_map(
feature_vectors, on_demand_features=on_demand_features
)
]

else:
raise exceptions.FeatureStoreException(
"Unsupported input type for feature vector. Supported types are `List`, `pandas.DataFrame`, `polars.DataFrame`"
)
return feature_vectors, return_type

def transform(
self,
feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
) -> Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]:
"""
Applies model dependent transformation on the provided feature vector.

# Arguments
feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be transformed using attached model-dependent transformations.

# Returns
`Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The transformed feature vector.
"""
if not self._model_dependent_transformation_functions:
warnings.warn(
"Feature view does not have any attached model-dependent transformations. Returning input feature vectors.",
stacklevel=0,
)
return feature_vectors

feature_vectors, return_type = (
self._check_feature_vectors_type_and_convert_to_dict(
feature_vectors, on_demand_features=True
)
)
transformed_feature_vectors = []
for feature_vector in feature_vectors:
transformed_feature_vector = self.apply_model_dependent_transformations(
feature_vector
)
transformed_feature_vectors.append(
[
transformed_feature_vector.get(fname, None)
for fname in self.transformed_feature_vector_col_name
]
)

if len(transformed_feature_vectors) == 1:
batch = False
transformed_feature_vectors = transformed_feature_vectors[0]
else:
batch = True

return self.handle_feature_vector_return_type(
transformed_feature_vectors,
batch=batch,
inference_helper=False,
return_type=return_type,
transform=True,
)

def compute_on_demand_features(
self,
feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
request_parameters: Union[List[Dict[str, Any]], Dict[str, Any]],
):
"""
Function computes on-demand features present in the feature view.

# Arguments
feature_vector: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vector to be transformed.
request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view.
# Returns
`Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The feature vector that contains all on-demand features in the feature view.
"""
if not self._on_demand_transformation_functions:
warnings.warn(
"Feature view does not have any on-demand features. Returning input feature vectors.",
stacklevel=1,
)
return feature_vectors

request_parameters = {} if not request_parameters else request_parameters
# Convert feature vectors to dictionary
feature_vectors, return_type = (
self._check_feature_vectors_type_and_convert_to_dict(feature_vectors)
)
# Check if all request parameters are provided
# If request parameter is a dictionary then copy it to list with the same length as that of entires
request_parameters = (
[request_parameters] * len(feature_vectors)
if isinstance(request_parameters, dict)
else request_parameters
)
self.check_missing_request_parameters(
features=feature_vectors[0], request_parameters=request_parameters[0]
)
on_demand_feature_vectors = []
for feature_vector, request_parameter in zip(
feature_vectors, request_parameters
):
on_demand_feature_vector = self.apply_on_demand_transformations(
feature_vector, request_parameter
)
on_demand_feature_vectors.append(
[
on_demand_feature_vector.get(fname, None)
for fname in self._on_demand_feature_vector_col_name
]
)

if len(on_demand_feature_vectors) == 1:
batch = False
on_demand_feature_vectors = on_demand_feature_vectors[0]
else:
batch = True

return self.handle_feature_vector_return_type(
on_demand_feature_vectors,
batch=batch,
inference_helper=False,
return_type=return_type,
transform=False,
on_demand_feature=True,
)

def get_untransformed_features_map(
self, features: List[Any], on_demand_features: bool = False
) -> Dict[str, Any]:
"""
Function that accepts a feature vectors as a list and returns the untransformed features as a dict that maps
feature names to their values

# Arguments
features : `List[Any]`. List of feature vectors.
on_demand_features : `bool`. Specify if on-demand features provided in the input feature vector.

# Returns
`Dict[str, Any]` : Dictionary mapping features name to values.
"""
if on_demand_features:
return dict(
[
(fname, fvalue)
for fname, fvalue in zip(
self._on_demand_feature_vector_col_name, features
)
]
)
else:
return dict(
[
(fname, fvalue)
for fname, fvalue in zip(
self._untransformed_feature_vector_col_name, features
)
]
)

def handle_feature_vector_return_type(
self,
feature_vectorz: Union[
Expand All @@ -576,7 +772,8 @@ def handle_feature_vector_return_type(
batch: bool,
inference_helper: bool,
return_type: Union[Literal["list", "dict", "numpy", "pandas", "polars"]],
transformed: bool = False,
transform: bool = False,
on_demand_feature: bool = False,
) -> Union[
pd.DataFrame,
pl.DataFrame,
Expand All @@ -586,10 +783,13 @@ def handle_feature_vector_return_type(
Dict[str, Any],
List[Dict[str, Any]],
]:
if transformed:
if transform:
column_names = self.transformed_feature_vector_col_name
else:
column_names = self._feature_vector_col_name
if on_demand_feature:
column_names = self._on_demand_feature_vector_col_name
else:
column_names = self._untransformed_feature_vector_col_name

# Only get-feature-vector and get-feature-vectors can return list or numpy
if return_type.lower() == "list" and not inference_helper:
Expand Down Expand Up @@ -652,12 +852,16 @@ def get_inference_helper(
batch=False,
inference_helper=True,
return_type=return_type,
transform=False,
on_demand_feature=False,
)
return self.handle_feature_vector_return_type(
self.sql_client.get_inference_helper_vector(entry),
batch=False,
inference_helper=True,
return_type=return_type,
transform=False,
on_demand_feature=False,
)

def get_inference_helpers(
Expand Down Expand Up @@ -706,7 +910,8 @@ def get_inference_helpers(
batch=True,
inference_helper=True,
return_type=return_type,
transformed=False,
transform=False,
on_demand_feature=False,
)

def which_client_and_ensure_initialised(
Expand Down
Loading
Loading