Skip to content

Commit 52d463f

Browse files
authored
code changes to tie ingestion and validation (#247)
2 parents 09c31ec + db35130 commit 52d463f

File tree

3 files changed

+168
-28
lines changed

3 files changed

+168
-28
lines changed

ads/feature_store/common/utils/utility.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8; -*-
3-
3+
import copy
44
# Copyright (c) 2023 Oracle and/or its affiliates.
55
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
66

@@ -41,6 +41,7 @@
4141
from ads.feature_engineering.feature_type import datetime
4242

4343
logger = logging.getLogger(__name__)
44+
logger.setLevel(logging.INFO)
4445

4546

4647
def get_execution_engine_type(
@@ -117,6 +118,87 @@ def validate_delta_format_parameters(
117118
raise Exception(f"version number cannot be negative")
118119

119120

121+
def show_ingestion_summary(
    entity_id: str,
    entity_type: EntityType = EntityType.FEATURE_GROUP,
    error_details: str = None,
):
    """
    Displays an ingestion summary table with the given entity type and error details.

    The summary is emitted via the module logger as a single `tabulate`
    "fancy_grid" table; ingestion status is derived from `error_details`
    (any non-empty value means the ingestion failed).

    Args:
        entity_id (str): Identifier of the entity being ingested.
        entity_type (EntityType, optional): The type of entity being ingested. Defaults to EntityType.FEATURE_GROUP.
        error_details (str, optional): Details of any errors that occurred during ingestion. Defaults to None.
    """
    # Imported lazily so the module can load even when tabulate is absent.
    from tabulate import tabulate

    table_headers = ["entity_id", "entity_type", "ingestion_status", "error_details"]
    ingestion_status = "Failed" if error_details else "Succeeded"

    table_values = [
        entity_id,
        entity_type.value,
        ingestion_status,
        error_details if error_details else "None",
    ]

    logger.info(
        "Ingestion Summary \n"
        + tabulate(
            [table_values],
            headers=table_headers,
            tablefmt="fancy_grid",
            numalign="center",
            stralign="center",
        )
    )
156+
157+
158+
def show_validation_summary(ingestion_status: str, validation_output, expectation_type):
    """
    Logs summary tables for a validation run: one row of aggregate statistics
    and one table with the per-rule outcomes.

    Args:
        ingestion_status (str): Current ingestion status to display next to the statistics.
        validation_output (dict): Validation result with "statistics" and "results" keys
            (presumably a Great Expectations validation result — TODO confirm with caller).
        expectation_type: The expectation type that was applied (e.g. "STRICT", "LENIENT").
    """
    # Imported lazily so the module can load even when tabulate is absent.
    from tabulate import tabulate

    statistics = validation_output["statistics"]

    table_headers = (
        ["expectation_type"] + list(statistics.keys()) + ["ingestion_status"]
    )
    table_values = [expectation_type] + list(statistics.values()) + [ingestion_status]

    logger.info(
        "Validation Summary \n"
        + tabulate(
            [table_values],
            headers=table_headers,
            tablefmt="fancy_grid",
            numalign="center",
            stralign="center",
        )
    )

    rule_table_headers = ["rule_type", "arguments", "status"]

    rule_table_values = [
        [
            rule_output["expectation_config"].get("expectation_type"),
            # Drop "batch_id": it is internal bookkeeping, not a user-supplied argument.
            {
                key: value
                for key, value in rule_output["expectation_config"]["kwargs"].items()
                if key != "batch_id"
            },
            rule_output.get("success"),
        ]
        for rule_output in validation_output["results"]
    ]

    logger.info(
        "Validations Rules Summary \n"
        + tabulate(
            rule_table_values,
            headers=rule_table_headers,
            tablefmt="fancy_grid",
            numalign="center",
            stralign="center",
        )
    )
200+
201+
120202
def get_features(
121203
output_columns: List[dict],
122204
parent_id: str,

ads/feature_store/data_validation/great_expectation.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def apply_validations(expectation_details, expectation_suite_name, dataframe):
115115
str
116116
A string representation of the validation result.
117117
"""
118-
validation_output = None
118+
expectation_response = None
119119
if (
120120
expectation_details
121121
and expectation_details.get("expectationType")
@@ -126,14 +126,4 @@ def apply_validations(expectation_details, expectation_suite_name, dataframe):
126126
expectation_details, expectation_suite_name, dataframe
127127
)
128128

129-
validation_output = str(expectation_response)
130-
131-
if expectation_details["expectationType"] == ExpectationType.STRICT.value:
132-
if not expectation_response["success"]:
133-
raise Exception(
134-
"Expectation failed with statistics: {0} ... Aborting ingestion.".format(
135-
expectation_response["statistics"]
136-
)
137-
)
138-
139-
return validation_output
129+
return expectation_response

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8; -*-
3+
import json
34

45
# Copyright (c) 2023 Oracle and/or its affiliates.
56
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
@@ -8,7 +9,11 @@
89
import pandas as pd
910

1011
from ads.common.decorator.runtime_dependency import OptionalDependency
11-
from ads.feature_store.common.utils.utility import get_features
12+
from ads.feature_store.common.utils.utility import (
13+
get_features,
14+
show_ingestion_summary,
15+
show_validation_summary,
16+
)
1217
from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
1318

1419
try:
@@ -25,6 +30,7 @@
2530
FeatureStoreJobType,
2631
LifecycleState,
2732
EntityType,
33+
ExpectationType,
2834
)
2935
from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton
3036
from ads.feature_store.common.utils.transformation_utils import TransformationUtils
@@ -145,6 +151,36 @@ def delete_dataset(self, dataset, dataset_job: DatasetJob):
145151
output_details=output_details,
146152
)
147153

154+
@staticmethod
def _validate_expectation(expectation_type, validation_output):
    """
    Validates the expectation based on the given expectation type and the validation output.

    A validation summary is always logged via ``show_validation_summary``,
    regardless of success. Ingestion is aborted only when the validation
    failed AND the expectation type is STRICT; a failing non-strict
    expectation is reported in the summary but does not stop ingestion.

    Args:
        expectation_type (str): The type of expectation to validate (e.g., "STRICT", "LENIENT").
        validation_output (dict): The output of the validation containing success status and statistics.

    Raises:
        Exception: If the expectation fails in strict mode.
    """
    error_message = None
    ingestion_status = "Ingestion in progress"

    if not validation_output["success"]:
        if expectation_type == ExpectationType.STRICT.value:
            # No placeholders needed, so a plain string (was an f-string with
            # nothing to interpolate).
            error_message = "Expectation failed with Insufficient Success Rate, Aborting ingestion"
            ingestion_status = "Insufficient Success Rate, Aborting ingestion"

    # Summary is logged before raising so the failure details are visible.
    show_validation_summary(ingestion_status, validation_output, expectation_type)

    if error_message:
        raise Exception(error_message)
183+
148184
def _save_offline_dataframe(
149185
self, data_frame, feature_group, feature_group_job: FeatureGroupJob
150186
):
@@ -182,12 +218,22 @@ def _save_offline_dataframe(
182218

183219
# TODO: Get event timestamp column and apply filtering basis from and to timestamp
184220

185-
# Apply validations
186-
validation_output = ExpectationService.apply_validations(
187-
expectation_details=feature_group.expectation_details,
188-
expectation_suite_name=feature_group.name,
189-
dataframe=data_frame,
190-
)
221+
if feature_group.expectation_details:
222+
expectation_type = feature_group.expectation_details["expectationType"]
223+
logger.info(f"Validation expectation type: {expectation_type}")
224+
225+
# Apply validations
226+
validation_output = ExpectationService.apply_validations(
227+
expectation_details=feature_group.expectation_details,
228+
expectation_suite_name=feature_group.name,
229+
dataframe=data_frame,
230+
)
231+
232+
if validation_output:
233+
self._validate_expectation(
234+
expectation_type=expectation_type,
235+
validation_output=validation_output,
236+
)
191237

192238
# Apply the transformation
193239
if feature_group.transformation_id:
@@ -238,9 +284,15 @@ def _save_offline_dataframe(
238284
f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex}"
239285
)
240286

287+
show_ingestion_summary(
288+
entity_id=feature_group.id,
289+
entity_type=EntityType.FEATURE_GROUP,
290+
error_details=error_details,
291+
)
292+
241293
output_details = {
242294
"error_details": error_details,
243-
"validation_output": validation_output,
295+
"validation_output": str(validation_output),
244296
"commit_id": "commit_id",
245297
"feature_statistics": feature_statistics,
246298
}
@@ -323,12 +375,22 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
323375
# Execute the SQL query on the spark and load the dataframe.
324376
dataset_dataframe = self.spark_engine.sql(dataset.query)
325377

326-
# Apply validations
327-
validation_output = ExpectationService.apply_validations(
328-
expectation_details=dataset.expectation_details,
329-
expectation_suite_name=dataset.name,
330-
dataframe=dataset_dataframe,
331-
)
378+
if dataset.expectation_details:
379+
expectation_type = dataset.expectation_details["expectationType"]
380+
logger.info(f"Validation expectation type: {expectation_type}")
381+
382+
# Apply validations
383+
validation_output = ExpectationService.apply_validations(
384+
expectation_details=dataset.expectation_details,
385+
expectation_suite_name=dataset.name,
386+
dataframe=dataset_dataframe,
387+
)
388+
389+
if validation_output:
390+
self._validate_expectation(
391+
expectation_type=expectation_type,
392+
validation_output=validation_output,
393+
)
332394

333395
self.delta_lake_service.save_delta_dataframe(
334396
dataset_dataframe,
@@ -357,9 +419,15 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
357419
f"Dataset Materialization Failed with : {type(ex)} with error message: {ex}"
358420
)
359421

422+
show_ingestion_summary(
423+
entity_id=dataset.id,
424+
entity_type=EntityType.DATASET,
425+
error_details=error_details,
426+
)
427+
360428
output_details = {
361429
"error_details": error_details,
362-
"validation_output": validation_output,
430+
"validation_output": str(validation_output),
363431
"commit_id": "commit_id",
364432
"feature_statistics": feature_statistics,
365433
}

0 commit comments

Comments
 (0)