Skip to content

Commit a58f74a

Browse files
committed
code changes to tie ingestion and validation
1 parent 432955f commit a58f74a

File tree

3 files changed

+137
-27
lines changed

3 files changed

+137
-27
lines changed

ads/feature_store/common/utils/utility.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from ads.feature_engineering.feature_type import datetime
4141

4242
logger = logging.getLogger(__name__)
43+
logger.setLevel(logging.INFO)
4344

4445

4546
def get_execution_engine_type(
@@ -116,6 +117,60 @@ def validate_delta_format_parameters(
116117
raise Exception(f"version number cannot be negative")
117118

118119

120+
def show_ingestion_summary(
    entity_type: EntityType = EntityType.FEATURE_GROUP, error_details: str = None
):
    """
    Displays an ingestion summary table for the given entity type.

    Args:
        entity_type (EntityType, optional): The type of entity being ingested.
            Defaults to EntityType.FEATURE_GROUP.
        error_details (str, optional): Details of any errors that occurred
            during ingestion. Defaults to None; when None the ingestion is
            reported as "Succeeded".
    """
    # Imported lazily so tabulate is only required when a summary is rendered.
    from tabulate import tabulate

    table_headers = ["entity_type", "ingestion_status", "error_details"]
    # Any non-empty error details mean the ingestion as a whole failed.
    ingestion_status = "Failed" if error_details else "Succeeded"

    table_values = [
        entity_type.value,
        ingestion_status,
        error_details or "None",
    ]

    logger.info(
        "Ingestion Summary \n"
        + tabulate(
            [table_values],
            headers=table_headers,
            tablefmt="fancy_grid",
            numalign="center",
            stralign="center",
        )
    )
151+
152+
153+
def show_validation_summary(ingestion_status: str, statistics, expectation_type):
    """
    Displays a validation summary table for an expectation run.

    Args:
        ingestion_status (str): Human-readable status of the ingestion after
            validation (e.g. "Ingestion in progress").
        statistics: Mapping of validation statistic names to their values;
            each key becomes a column in the summary table.
        expectation_type: The expectation type that was evaluated; rendered
            in the first column. Presumably a string such as "STRICT" or
            "LENIENT" — confirm against callers.
    """
    # Imported lazily so tabulate is only required when a summary is rendered.
    from tabulate import tabulate

    # Column layout: expectation type first, then one column per statistic,
    # then the resulting ingestion status.
    table_headers = (
        ["expectation_type"] + list(statistics.keys()) + ["ingestion_status"]
    )

    table_values = [expectation_type] + list(statistics.values()) + [ingestion_status]

    logger.info(
        "Validation Summary \n"
        + tabulate(
            [table_values],
            headers=table_headers,
            tablefmt="fancy_grid",
            numalign="center",
            stralign="center",
        )
    )
172+
173+
119174
def get_features(
120175
output_columns: List[dict],
121176
parent_id: str,

ads/feature_store/data_validation/great_expectation.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def apply_validations(expectation_details, expectation_suite_name, dataframe):
115115
str
116116
A string representation of the validation result.
117117
"""
118-
validation_output = None
118+
expectation_response = None
119119
if (
120120
expectation_details
121121
and expectation_details.get("expectationType")
@@ -126,14 +126,4 @@ def apply_validations(expectation_details, expectation_suite_name, dataframe):
126126
expectation_details, expectation_suite_name, dataframe
127127
)
128128

129-
validation_output = str(expectation_response)
130-
131-
if expectation_details["expectationType"] == ExpectationType.STRICT.value:
132-
if not expectation_response["success"]:
133-
raise Exception(
134-
"Expectation failed with statistics: {0} ... Aborting ingestion.".format(
135-
expectation_response["statistics"]
136-
)
137-
)
138-
139-
return validation_output
129+
return expectation_response

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8; -*-
3+
import json
34

45
# Copyright (c) 2023 Oracle and/or its affiliates.
56
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
@@ -8,7 +9,11 @@
89
import pandas as pd
910

1011
from ads.common.decorator.runtime_dependency import OptionalDependency
11-
from ads.feature_store.common.utils.utility import get_features
12+
from ads.feature_store.common.utils.utility import (
13+
get_features,
14+
show_ingestion_summary,
15+
show_validation_summary,
16+
)
1217
from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
1318

1419
try:
@@ -25,6 +30,7 @@
2530
FeatureStoreJobType,
2631
LifecycleState,
2732
EntityType,
33+
ExpectationType,
2834
)
2935
from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton
3036
from ads.feature_store.common.utils.feature_schema_mapper import (
@@ -147,6 +153,37 @@ def delete_dataset(self, dataset, dataset_job: DatasetJob):
147153
output_details=output_details,
148154
)
149155

156+
@staticmethod
def _validate_expectation(expectation_type, validation_output):
    """
    Validates the expectation based on the given expectation type and the
    validation output, logging a validation summary on failure.

    Args:
        expectation_type (str): The type of expectation to validate
            (e.g., "STRICT", "LENIENT").
        validation_output (dict): The output of the validation containing
            success status ("success") and statistics ("statistics").

    Raises:
        Exception: If the expectation fails in strict mode; lenient-mode
            failures only emit the validation summary and do not abort.
    """
    error_message = None
    ingestion_status = "Ingestion in progress"

    if not validation_output["success"]:
        statistics = validation_output["statistics"]
        if expectation_type == ExpectationType.STRICT.value:
            # Strict mode: a failed expectation must abort the ingestion.
            error_message = (
                "Expectation failed with Insufficient Success Rate, Aborting ingestion"
            )
            ingestion_status = "Insufficient Success Rate, Aborting ingestion"

        # Summarize the failed validation regardless of expectation type.
        show_validation_summary(ingestion_status, statistics, expectation_type)

    # Raise only after the summary has been logged, so the operator always
    # sees the statistics that caused the abort.
    if error_message:
        raise Exception(error_message)
186+
150187
def _save_offline_dataframe(
151188
self, data_frame, feature_group, feature_group_job: FeatureGroupJob
152189
):
@@ -185,12 +222,22 @@ def _save_offline_dataframe(
185222

186223
# TODO: Get event timestamp column and apply filtering basis from and to timestamp
187224

188-
# Apply validations
189-
validation_output = ExpectationService.apply_validations(
190-
expectation_details=feature_group.expectation_details,
191-
expectation_suite_name=feature_group.name,
192-
dataframe=data_frame,
193-
)
225+
if feature_group.expectation_details:
226+
expectation_type = feature_group.expectation_details["expectationType"]
227+
logger.info(f"Validation expectation type: {expectation_type}")
228+
229+
# Apply validations
230+
validation_output = ExpectationService.apply_validations(
231+
expectation_details=feature_group.expectation_details,
232+
expectation_suite_name=feature_group.name,
233+
dataframe=data_frame,
234+
)
235+
236+
if validation_output:
237+
self._validate_expectation(
238+
expectation_type=expectation_type,
239+
validation_output=validation_output,
240+
)
194241

195242
# Apply the transformation
196243
if feature_group.transformation_id:
@@ -241,9 +288,13 @@ def _save_offline_dataframe(
241288
f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex}"
242289
)
243290

291+
show_ingestion_summary(
292+
entity_type=EntityType.FEATURE_GROUP, error_details=error_details
293+
)
294+
244295
output_details = {
245296
"error_details": error_details,
246-
"validation_output": validation_output,
297+
"validation_output": str(validation_output),
247298
"commit_id": "commit_id",
248299
"feature_statistics": feature_statistics,
249300
}
@@ -326,12 +377,22 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
326377
# Execute the SQL query on the spark and load the dataframe.
327378
dataset_dataframe = self.spark_engine.sql(dataset.query)
328379

329-
# Apply validations
330-
validation_output = ExpectationService.apply_validations(
331-
expectation_details=dataset.expectation_details,
332-
expectation_suite_name=dataset.name,
333-
dataframe=dataset_dataframe,
334-
)
380+
if dataset.expectation_details:
381+
expectation_type = dataset.expectation_details["expectationType"]
382+
logger.info(f"Validation expectation type: {expectation_type}")
383+
384+
# Apply validations
385+
validation_output = ExpectationService.apply_validations(
386+
expectation_details=dataset.expectation_details,
387+
expectation_suite_name=dataset.name,
388+
dataframe=dataset_dataframe,
389+
)
390+
391+
if validation_output:
392+
self._validate_expectation(
393+
expectation_type=expectation_type,
394+
validation_output=validation_output,
395+
)
335396

336397
self.delta_lake_service.save_delta_dataframe(
337398
dataset_dataframe,
@@ -360,9 +421,13 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
360421
f"Dataset Materialization Failed with : {type(ex)} with error message: {ex}"
361422
)
362423

424+
show_ingestion_summary(
425+
entity_type=EntityType.DATASET, error_details=error_details
426+
)
427+
363428
output_details = {
364429
"error_details": error_details,
365-
"validation_output": validation_output,
430+
"validation_output": str(validation_output),
366431
"commit_id": "commit_id",
367432
"feature_statistics": feature_statistics,
368433
}

0 commit comments

Comments
 (0)