Skip to content

Commit ddb262d

Browse files
authored
ODSC-45790: Improve error reporting. Improve get_feature_df output. H… (#281)
2 parents ba93f40 + 3edfa43 commit ddb262d

File tree

7 files changed

+165
-21
lines changed

7 files changed

+165
-21
lines changed

ads/feature_store/dataset.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88

99
import pandas
1010
from great_expectations.core import ExpectationSuite
11+
from oci.feature_store.models import (
12+
DatasetFeatureGroupCollection,
13+
DatasetFeatureGroupSummary,
14+
)
15+
1116
from ads.common import utils
1217
from ads.common.oci_mixin import OCIModelMixin
1318
from ads.feature_store.common.enums import (
@@ -26,6 +31,7 @@
2631
OciExecutionStrategyProvider,
2732
)
2833
from ads.feature_store.feature import DatasetFeature
34+
from ads.feature_store.feature_group import FeatureGroup
2935
from ads.feature_store.feature_group_expectation import Expectation
3036
from ads.feature_store.feature_option_details import FeatureOptionDetails
3137
from ads.feature_store.service.oci_dataset import OCIDataset
@@ -113,6 +119,7 @@ class Dataset(Builder):
113119
CONST_ITEMS = "items"
114120
CONST_LAST_JOB_ID = "jobId"
115121
CONST_MODEL_DETAILS = "modelDetails"
122+
CONST_FEATURE_GROUP = "datasetFeatureGroups"
116123

117124
attribute_map = {
118125
CONST_ID: "id",
@@ -130,6 +137,7 @@ class Dataset(Builder):
130137
CONST_LIFECYCLE_STATE: "lifecycle_state",
131138
CONST_MODEL_DETAILS: "model_details",
132139
CONST_PARTITION_KEYS: "partition_keys",
140+
CONST_FEATURE_GROUP: "dataset_feature_groups",
133141
}
134142

135143
def __init__(self, spec: Dict = None, **kwargs) -> None:
@@ -503,6 +511,44 @@ def with_model_details(self, model_details: ModelDetails) -> "Dataset":
503511

504512
return self.set_spec(self.CONST_MODEL_DETAILS, model_details.to_dict())
505513

514+
@property
def feature_groups(self) -> List["FeatureGroup"]:
    """Return the feature groups recorded in this dataset's spec.

    Reads the collection stored in the spec under ``CONST_FEATURE_GROUP``
    and loads one ``FeatureGroup`` per entry through
    ``FeatureGroup.from_id``, using the entry's ``feature_group_id``.

    Returns
    -------
    List[FeatureGroup]
        The loaded feature groups, in collection order. Empty when no
        collection is stored or the stored collection has no items.
    """
    stored: "DatasetFeatureGroupCollection" = self.get_spec(
        self.CONST_FEATURE_GROUP
    )
    # Nothing recorded (or an empty collection): no lookups are needed.
    if not stored or not stored.items:
        return []

    return [
        FeatureGroup.from_id(summary.feature_group_id) for summary in stored.items
    ]

@feature_groups.setter
def feature_groups(self, feature_groups: List["FeatureGroup"]):
    """Property-setter form of ``with_feature_groups``."""
    self.with_feature_groups(feature_groups)
531+
532+
def with_feature_groups(self, feature_groups: List["FeatureGroup"]) -> "Dataset":
    """Sets the feature groups associated with the dataset.

    Each feature group is recorded by its id as a
    ``DatasetFeatureGroupSummary``. The summaries are wrapped in a
    ``DatasetFeatureGroupCollection`` and stored in the spec under
    ``CONST_FEATURE_GROUP``.

    Parameters
    ----------
    feature_groups: List[FeatureGroup]
        The feature groups to associate with the dataset. Only the ``id``
        attribute of each feature group is read.

    Returns
    -------
    Dataset
        The Dataset instance (self).
    """
    summaries: List["DatasetFeatureGroupSummary"] = [
        DatasetFeatureGroupSummary(feature_group_id=group.id)
        for group in feature_groups
    ]

    return self.set_spec(
        self.CONST_FEATURE_GROUP, DatasetFeatureGroupCollection(items=summaries)
    )
551+
506552
@property
def partition_keys(self) -> List[str]:
    """Return the value stored in the spec under ``CONST_PARTITION_KEYS``."""
    spec_key = self.CONST_PARTITION_KEYS
    return self.get_spec(spec_key)
@@ -560,7 +606,9 @@ def add_models(self, model_details: ModelDetails) -> "Dataset":
560606
f"Dataset update Failed with : {type(ex)} with error message: {ex}"
561607
)
562608
if existing_model_details:
563-
self.with_model_details(ModelDetails().with_items(existing_model_details["items"]))
609+
self.with_model_details(
610+
ModelDetails().with_items(existing_model_details["items"])
611+
)
564612
else:
565613
self.with_model_details(ModelDetails().with_items([]))
566614
return self
@@ -652,6 +700,7 @@ def create(self, **kwargs) -> "Dataset":
652700
# Create dataset
653701
logger.info("Saving dataset.")
654702
self.oci_dataset = self._to_oci_dataset(**kwargs).create()
703+
self._update_from_oci_dataset_model(self.oci_dataset)
655704
self.with_id(self.oci_dataset.id)
656705
return self
657706

@@ -729,11 +778,10 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":
729778

730779
# Update the main properties
731780
self.oci_dataset = oci_dataset
732-
dataset_details = oci_dataset.to_dict()
733781

734782
for infra_attr, dsc_attr in self.attribute_map.items():
735-
if infra_attr in dataset_details:
736-
self.set_spec(infra_attr, dataset_details[infra_attr])
783+
if dsc_attr in self.oci_dataset.attribute_map:
784+
self.set_spec(infra_attr, getattr(self.oci_dataset, dsc_attr))
737785

738786
return self
739787

@@ -1013,6 +1061,10 @@ def to_dict(self) -> Dict:
10131061
for key, value in spec.items():
10141062
if hasattr(value, "to_dict"):
10151063
value = value.to_dict()
1064+
if hasattr(value, "attribute_map"):
1065+
value = self.oci_dataset.client.base_client.sanitize_for_serialization(
1066+
value
1067+
)
10161068
spec[key] = value
10171069

10181070
return {

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
show_validation_summary,
1717
)
1818
from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
19+
import traceback
1920

2021
try:
2122
from pyspark.sql import DataFrame
@@ -291,8 +292,9 @@ def _save_offline_dataframe(
291292

292293
except Exception as ex:
293294
error_details = str(ex)
295+
tb = traceback.format_exc()
294296
logger.error(
295-
f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex}"
297+
f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex} and stacktrace {tb}",
296298
)
297299

298300
show_ingestion_summary(
@@ -427,8 +429,9 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
427429

428430
except Exception as ex:
429431
error_details = str(ex)
432+
tb = traceback.format_exc()
430433
logger.error(
431-
f"Dataset Materialization Failed with : {type(ex)} with error message: {ex}"
434+
f"Dataset Materialization Failed with : {type(ex)} with error message: {ex} and stacktrace {tb}"
432435
)
433436

434437
show_ingestion_summary(

ads/feature_store/feature_group.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,6 @@ def get_features_df(self) -> "pd.DataFrame":
748748
{
749749
"name": feature.feature_name,
750750
"type": feature.feature_type,
751-
"feature_group_id": feature.feature_group_id,
752751
}
753752
)
754753
return pd.DataFrame.from_records(records)

tests/integration/feature_store/test_base.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import oci
1212
import pandas as pd
13+
from ads.feature_store.entity import Entity
1314
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
1415
import ads
1516
import os
@@ -22,7 +23,7 @@
2223

2324
# Take the service endpoint from the ``service_endpoint`` environment variable
# so the integration tests can be pointed at any deployment; fall back to the
# local development endpoint when the variable is not set.
client_kwargs = dict(
    retry_strategy=oci.retry.NoneRetryStrategy,
    service_endpoint=os.getenv(
        "service_endpoint", "http://127.0.0.1:21000/20230101"
    ),
)
2728
ads.set_auth(client_kwargs=client_kwargs)
2829

@@ -36,20 +37,26 @@
3637

3738

3839
def transformation_with_kwargs(data_frame, **kwargs):
    """Optionally add petal and sepal area columns to ``data_frame``.

    When the ``is_area_enabled`` keyword argument is truthy, the columns
    ``petal_area`` and ``sepal_area`` are written into ``data_frame`` as the
    product of the matching ``*_length`` and ``*_width`` columns. Otherwise
    the frame is left untouched. The same frame object is returned either
    way (it is modified in place, not copied).
    """
    # Guard clause: without a truthy flag there is nothing to compute.
    if not kwargs.get("is_area_enabled"):
        return data_frame

    # (target column, length column, width column), petal first, then sepal.
    area_specs = (
        ("petal_area", "petal_length", "petal_width"),
        ("sepal_area", "sepal_length", "sepal_width"),
    )
    for target, length_col, width_col in area_specs:
        data_frame[target] = data_frame[length_col] * data_frame[width_col]

    return data_frame
4853

4954

5055
class FeatureStoreTestCase:
5156
# networks compartment in feature store
52-
TIME_NOW = str.format("{}_{}",datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S"),int(random()*1000))
57+
TIME_NOW = str.format(
58+
"{}_{}", datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S"), int(random() * 1000)
59+
)
5360
TENANCY_ID = "ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa"
5461
COMPARTMENT_ID = "ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa"
5562
METASTORE_ID = "ocid1.datacatalogmetastore.oc1.iad.amaaaaaabiudgxyap7tizm4gscwz7amu7dixz7ml3mtesqzzwwg3urvvdgua"
@@ -376,9 +383,11 @@ def create_entity_resource(self, feature_store) -> "Entity":
376383
return entity
377384

378385
def create_transformation_resource(self, feature_store) -> "Transformation":
    """Create a pandas-mode transformation in ``feature_store``.

    Calls ``feature_store.create_transformation`` with
    ``transformation_with_kwargs`` as the source function and returns
    whatever that call returns.
    """
    transformation_args = dict(
        source_code_func=transformation_with_kwargs,
        display_name="transformation_with_kwargs",
        transformation_mode=TransformationMode.PANDAS,
    )
    return feature_store.create_transformation(**transformation_args)
383392

384393
def define_feature_group_resource(
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*--
3+
4+
# Copyright (c) 2023 Oracle and/or its affiliates.
5+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
6+
7+
import pytest
8+
import unittest
9+
10+
from ads.feature_store.dataset import Dataset
11+
12+
from ads.feature_store.entity import Entity
13+
14+
from ads.feature_store.feature_store import FeatureStore
15+
16+
from ads.feature_store.feature_group import FeatureGroup
17+
18+
from ads.feature_store.feature_option_details import FeatureOptionDetails
19+
from tests.integration.feature_store.test_base import FeatureStoreTestCase
20+
21+
22+
class TestDatasetComplex(FeatureStoreTestCase):
    """Contains integration tests for datasets with manually supplied feature groups."""

    @pytest.fixture()
    def feature_store(self) -> FeatureStore:
        """Create a feature store and hand it to the test."""
        feature_store = self.define_feature_store_resource().create()
        yield feature_store
        # NOTE(review): cleanup is disabled, so the created feature store
        # outlives the test - confirm this is intentional.
        # self.clean_up_feature_store(feature_store)

    @pytest.fixture()
    def entity(self, feature_store: FeatureStore):
        """Create an entity in ``feature_store`` and hand it to the test."""
        entity = self.create_entity_resource(feature_store)
        yield entity
        # NOTE(review): cleanup is disabled - confirm this is intentional.
        # self.clean_up_entity(entity)

    @pytest.fixture()
    def feature_group(self, entity, feature_store) -> "FeatureGroup":
        """Create a feature group bound to ``entity`` and ``feature_store``."""
        feature_group = self.define_feature_group_resource(
            entity.oci_fs_entity.id, feature_store.oci_fs.id
        ).create()
        yield feature_group
        # NOTE(review): cleanup is disabled - confirm this is intentional.
        # self.clean_up_feature_group(feature_group)

    def test_manual_dataset(
        self,
        feature_store: FeatureStore,
        entity: Entity,
        feature_group: FeatureGroup,
    ):
        """Create a dataset with an explicit feature group and check the association.

        The dataset is built from a ``UNION`` query and is given its feature
        group through ``with_feature_groups``. After creation, the dataset
        must report exactly that feature group, and the stored collection
        must be flagged as a manual association.
        """
        query = """
        (SELECT
        name, games, goals
        FROM tblMadrid WHERE name = 'ronaldo')
        UNION
        (SELECT
        name, games, goals
        FROM tblBarcelona WHERE name = 'messi')
        ORDER BY goals"""
        name = self.get_name("fireside_football_debate")
        dataset_resource = (
            Dataset()
            .with_description("dataset description")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(name)
            .with_entity_id(entity_id=entity.id)
            .with_feature_store_id(feature_store_id=feature_store.id)
            .with_query(query)
            .with_feature_groups([feature_group])
        ).create()
        assert len(dataset_resource.feature_groups) == 1
        assert dataset_resource.feature_groups[0].id == feature_group.id
        assert dataset_resource.get_spec(
            Dataset.CONST_FEATURE_GROUP
        ).is_manual_association
        # No return value: pytest ignores it and warns when a test returns
        # something other than None.

tests/integration/feature_store/test_dataset_delta.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import pytest
88
import unittest
99

10+
from ads.feature_store.dataset import Dataset
11+
1012
from ads.feature_store.feature_option_details import FeatureOptionDetails
1113
from tests.integration.feature_store.test_base import FeatureStoreTestCase
1214

@@ -125,6 +127,9 @@ def test_dataset_materialise_overwrite(self, feature_group, dataset):
125127
dataset.materialise(ingestion_mode=IngestionMode.OVERWRITE)
126128

127129
df = dataset.preview(row_count=50)
130+
assert (
131+
dataset.get_spec(Dataset.CONST_FEATURE_GROUP).is_manual_association == False
132+
)
128133
assert df.count() == 14
129134
assert len(df.columns) == 6
130135

tests/unitary/with_extras/feature_store/test_dataset.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,12 @@ def test__to_oci_fs_entity(self, mock_load_key_file, mock_config_from_file):
259259
@patch.object(SparkSessionSingleton, "__init__", return_value=None)
260260
@patch.object(SparkSessionSingleton, "get_spark_session")
261261
def test_materialise(self, spark, get_spark_session, mock_update):
262-
with patch.object(DatasetJob, "create") as mock_dataset_job:
263-
with patch.object(FeatureStore, "from_id"):
264-
with patch.object(DatasetJob, "_mark_job_complete"):
265-
mock_dataset_job.return_value = self.mock_dsc_dataset_job
266-
self.mock_dsc_dataset.with_id(DATASET_OCID)
267-
self.mock_dsc_dataset.materialise()
262+
with patch.object(DatasetJob, "create") as mock_dataset_job:
263+
with patch.object(FeatureStore, "from_id"):
264+
with patch.object(DatasetJob, "_mark_job_complete"):
265+
mock_dataset_job.return_value = self.mock_dsc_dataset_job
266+
self.mock_dsc_dataset.with_id(DATASET_OCID)
267+
self.mock_dsc_dataset.materialise()
268268

269269
@patch.object(SparkSessionSingleton, "__init__", return_value=None)
270270
@patch.object(SparkSessionSingleton, "get_spark_session")

0 commit comments

Comments
 (0)