Skip to content

Commit c552745

Browse files
authored
Odsc 45790 support complex queries (#291)
2 parents 440947c + fc749d7 commit c552745

File tree

7 files changed

+220
-42
lines changed

7 files changed

+220
-42
lines changed

ads/feature_store/dataset.py

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8; -*-
3-
import json
43
import logging
54
from copy import deepcopy
65
from datetime import datetime
76
from typing import Dict, List, Union
87

98
import pandas
9+
import pandas as pd
1010
from great_expectations.core import ExpectationSuite
1111

1212
from ads import deprecated
13+
from oci.feature_store.models import (
14+
DatasetFeatureGroupCollection,
15+
DatasetFeatureGroupSummary,
16+
)
17+
1318
from ads.common import utils
1419
from ads.common.oci_mixin import OCIModelMixin
1520
from ads.feature_store.common.enums import (
@@ -29,6 +34,7 @@
2934
OciExecutionStrategyProvider,
3035
)
3136
from ads.feature_store.feature import DatasetFeature
37+
from ads.feature_store.feature_group import FeatureGroup
3238
from ads.feature_store.feature_group_expectation import Expectation
3339
from ads.feature_store.feature_option_details import FeatureOptionDetails
3440
from ads.feature_store.service.oci_dataset import OCIDataset
@@ -116,6 +122,7 @@ class Dataset(Builder):
116122
CONST_ITEMS = "items"
117123
CONST_LAST_JOB_ID = "jobId"
118124
CONST_MODEL_DETAILS = "modelDetails"
125+
CONST_FEATURE_GROUP = "datasetFeatureGroups"
119126

120127
attribute_map = {
121128
CONST_ID: "id",
@@ -133,6 +140,7 @@ class Dataset(Builder):
133140
CONST_LIFECYCLE_STATE: "lifecycle_state",
134141
CONST_MODEL_DETAILS: "model_details",
135142
CONST_PARTITION_KEYS: "partition_keys",
143+
CONST_FEATURE_GROUP: "dataset_feature_groups",
136144
}
137145

138146
def __init__(self, spec: Dict = None, **kwargs) -> None:
@@ -151,6 +159,7 @@ def __init__(self, spec: Dict = None, **kwargs) -> None:
151159
super().__init__(spec=spec, **deepcopy(kwargs))
152160
# Specify oci Dataset instance
153161
self.dataset_job = None
162+
self._is_manual_association: bool = False
154163
self._spark_engine = None
155164
self.oci_dataset = self._to_oci_dataset(**kwargs)
156165
self.lineage = OCILineage(**kwargs)
@@ -183,6 +192,16 @@ def spark_engine(self):
183192
self._spark_engine = SparkEngine(get_metastore_id(self.feature_store_id))
184193
return self._spark_engine
185194

195+
@property
196+
def is_manual_association(self):
197+
collection: DatasetFeatureGroupCollection = self.get_spec(
198+
self.CONST_FEATURE_GROUP
199+
)
200+
if collection and collection.is_manual_association is not None:
201+
return collection.is_manual_association
202+
else:
203+
return self._is_manual_association
204+
186205
@property
187206
def kind(self) -> str:
188207
"""The kind of the object as showing in a YAML."""
@@ -530,6 +549,54 @@ def with_model_details(self, model_details: ModelDetails) -> "Dataset":
530549

531550
return self.set_spec(self.CONST_MODEL_DETAILS, model_details.to_dict())
532551

552+
@property
553+
def feature_groups(self) -> List["FeatureGroup"]:
554+
collection: "DatasetFeatureGroupCollection" = self.get_spec(
555+
self.CONST_FEATURE_GROUP
556+
)
557+
feature_groups: List["FeatureGroup"] = []
558+
if collection and collection.items:
559+
for datasetFGSummary in collection.items:
560+
feature_groups.append(
561+
FeatureGroup.from_id(datasetFGSummary.feature_group_id)
562+
)
563+
564+
return feature_groups
565+
566+
@feature_groups.setter
567+
def feature_groups(self, feature_groups: List["FeatureGroup"]):
568+
self.with_feature_groups(feature_groups)
569+
570+
def with_feature_groups(self, feature_groups: List["FeatureGroup"]) -> "Dataset":
571+
"""Sets the model details for the dataset.
572+
573+
Parameters
574+
----------
575+
feature_groups: List of feature groups
576+
Returns
577+
-------
578+
Dataset
579+
The Dataset instance (self).
580+
581+
"""
582+
collection: List["DatasetFeatureGroupSummary"] = []
583+
for group in feature_groups:
584+
collection.append(DatasetFeatureGroupSummary(feature_group_id=group.id))
585+
586+
self._is_manual_association = True
587+
return self.set_spec(
588+
self.CONST_FEATURE_GROUP,
589+
DatasetFeatureGroupCollection(items=collection, is_manual_association=True),
590+
)
591+
592+
def feature_groups_to_df(self):
593+
return pd.DataFrame.from_records(
594+
[
595+
feature_group.oci_feature_group.to_df_record()
596+
for feature_group in self.feature_groups
597+
]
598+
)
599+
533600
@property
534601
def partition_keys(self) -> List[str]:
535602
return self.get_spec(self.CONST_PARTITION_KEYS)
@@ -641,7 +708,7 @@ def show(self, rankdir: str = GraphOrientation.LEFT_RIGHT) -> None:
641708
f"Can't get lineage information for Feature group id {self.id}"
642709
)
643710

644-
def create(self, **kwargs) -> "Dataset":
711+
def create(self, validate_sql=False, **kwargs) -> "Dataset":
645712
"""Creates dataset resource.
646713
647714
!!! note "Lazy"
@@ -654,6 +721,8 @@ def create(self, **kwargs) -> "Dataset":
654721
kwargs
655722
Additional kwargs arguments.
656723
Can be any attribute that `oci.feature_store.models.Dataset` accepts.
724+
validate_sql:
725+
Boolean value indicating whether to validate sql before creating dataset
657726
658727
Returns
659728
-------
@@ -674,13 +743,17 @@ def create(self, **kwargs) -> "Dataset":
674743
if self.statistics_config is None:
675744
self.statistics_config = StatisticsConfig()
676745

746+
if validate_sql is True:
747+
self.spark_engine.sql(self.get_spec(self.CONST_QUERY))
748+
677749
payload = deepcopy(self._spec)
678750
payload.pop("id", None)
679751
logger.debug(f"Creating a dataset resource with payload {payload}")
680752

681753
# Create dataset
682754
logger.info("Saving dataset.")
683755
self.oci_dataset = self._to_oci_dataset(**kwargs).create()
756+
self._update_from_oci_dataset_model(self.oci_dataset)
684757
self.with_id(self.oci_dataset.id)
685758
return self
686759

@@ -793,8 +866,7 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":
793866

794867
value = {self.CONST_ITEMS: features_list}
795868
else:
796-
value = dataset_details[infra_attr]
797-
869+
value = getattr(self.oci_dataset, dsc_attr)
798870
self.set_spec(infra_attr, value)
799871

800872
return self
@@ -1134,6 +1206,10 @@ def to_dict(self) -> Dict:
11341206
for key, value in spec.items():
11351207
if hasattr(value, "to_dict"):
11361208
value = value.to_dict()
1209+
if hasattr(value, "attribute_map"):
1210+
value = self.oci_dataset.client.base_client.sanitize_for_serialization(
1211+
value
1212+
)
11371213
spec[key] = value
11381214

11391215
return {

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
show_validation_summary,
1717
)
1818
from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
19+
import traceback
1920

2021
try:
2122
from pyspark.sql import DataFrame
@@ -291,8 +292,9 @@ def _save_offline_dataframe(
291292

292293
except Exception as ex:
293294
error_details = str(ex)
295+
tb = traceback.format_exc()
294296
logger.error(
295-
f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex}"
297+
f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex} and stacktrace {tb}",
296298
)
297299

298300
show_ingestion_summary(
@@ -427,8 +429,9 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
427429

428430
except Exception as ex:
429431
error_details = str(ex)
432+
tb = traceback.format_exc()
430433
logger.error(
431-
f"Dataset Materialization Failed with : {type(ex)} with error message: {ex}"
434+
f"Dataset Materialization Failed with : {type(ex)} with error message: {ex} and stacktrace {tb}"
432435
)
433436

434437
show_ingestion_summary(

ads/feature_store/feature_group.py

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def __init__(self, spec: Dict = None, **kwargs) -> None:
180180
# Specify oci FeatureGroup instance
181181
self.feature_group_job = None
182182
self._spark_engine = None
183-
self.oci_feature_group = self._to_oci_feature_group(**kwargs)
183+
self.oci_feature_group: OCIFeatureGroup = self._to_oci_feature_group(**kwargs)
184184
self.dsc_job = OCIFeatureGroupJob()
185185
self.lineage = OCILineage(**kwargs)
186186

@@ -750,8 +750,12 @@ def get_features_df(self) -> "pd.DataFrame":
750750
"""
751751
records = []
752752
for feature in self.features:
753-
records.append({"name": feature.feature_name, "type": feature.feature_type})
754-
753+
records.append(
754+
{
755+
"name": feature.feature_name,
756+
"type": feature.feature_type,
757+
}
758+
)
755759
return pd.DataFrame.from_records(records)
756760

757761
def get_input_features_df(self) -> "pd.DataFrame":
@@ -936,7 +940,7 @@ def get_last_job(self) -> "FeatureGroupJob":
936940
return fg_job[0]
937941
return FeatureGroupJob.from_id(self.job_id)
938942

939-
def select(self, features: Optional[List[str]] = []) -> Query:
943+
def select(self, features: Optional[List[str]] = ()) -> Query:
940944
"""
941945
Selects a subset of features from the feature group and returns a Query object that can be used to view the
942946
resulting dataframe.
@@ -1161,28 +1165,9 @@ def list_df(cls, compartment_id: str = None, **kwargs) -> "pd.DataFrame":
11611165
for oci_feature_group in OCIFeatureGroup.list_resource(
11621166
compartment_id, **kwargs
11631167
):
1164-
records.append(
1165-
{
1166-
"id": oci_feature_group.id,
1167-
"name": oci_feature_group.name,
1168-
"description": oci_feature_group.description,
1169-
"time_created": oci_feature_group.time_created.strftime(
1170-
utils.date_format
1171-
),
1172-
"time_updated": oci_feature_group.time_updated.strftime(
1173-
utils.date_format
1174-
),
1175-
"lifecycle_state": oci_feature_group.lifecycle_state,
1176-
"created_by": f"...{oci_feature_group.created_by[-6:]}",
1177-
"compartment_id": f"...{oci_feature_group.compartment_id[-6:]}",
1178-
"primary_keys": oci_feature_group.primary_keys,
1179-
"feature_store_id": oci_feature_group.feature_store_id,
1180-
"entity_id": oci_feature_group.entity_id,
1181-
"input_feature_details": oci_feature_group.input_feature_details,
1182-
"expectation_details": oci_feature_group.expectation_details,
1183-
"statistics_config": oci_feature_group.statistics_config,
1184-
}
1185-
)
1168+
oci_feature_group: OCIFeatureGroup = oci_feature_group
1169+
records.append(oci_feature_group.to_df_record())
1170+
11861171
return pd.DataFrame.from_records(records)
11871172

11881173
@classmethod

ads/feature_store/service/oci_feature_group.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
66

77
import datetime
8+
import pandas as pd
9+
from ads.common import utils
810

911
import oci
1012
from oci.feature_store.models import (
@@ -64,9 +66,9 @@ class OCIFeatureGroup(OCIFeatureStoreMixin, oci.feature_store.models.FeatureGrou
6466
Gets feature group by OCID.
6567
Examples
6668
--------
67-
>>> oci_feature_group = OCIFeatureGroup.from_id("<feature_group_id>")
68-
>>> oci_feature_group.description = "A brand new description"
69-
>>> oci_feature_group.delete()
69+
>>> self = OCIFeatureGroup.from_id("<feature_group_id>")
70+
>>> self.description = "A brand new description"
71+
>>> self.delete()
7072
"""
7173

7274
def create(self) -> "OCIFeatureGroup":
@@ -122,6 +124,27 @@ def delete(self):
122124
"""
123125
self.client.delete_feature_group(self.id)
124126

127+
def to_df(self):
128+
return pd.DataFrame.from_records([self.to_df_record()])
129+
130+
def to_df_record(self):
131+
return {
132+
"id": self.id,
133+
"name": self.name,
134+
"description": self.description,
135+
"time_created": self.time_created.strftime(utils.date_format),
136+
"time_updated": self.time_updated.strftime(utils.date_format),
137+
"lifecycle_state": self.lifecycle_state,
138+
"created_by": f"...{self.created_by[-6:]}",
139+
"compartment_id": f"...{self.compartment_id[-6:]}",
140+
"primary_keys": self.primary_keys,
141+
"feature_store_id": self.feature_store_id,
142+
"entity_id": self.entity_id,
143+
"input_feature_details": self.input_feature_details,
144+
"expectation_details": self.expectation_details,
145+
"statistics_config": self.statistics_config,
146+
}
147+
125148
@classmethod
126149
def from_id(cls, id: str) -> "OCIFeatureGroup":
127150
"""Gets feature group resource by id.

tests/integration/feature_store/test_base.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import oci
1212
import pandas as pd
13+
from ads.feature_store.entity import Entity
1314
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
1415
import ads
1516
import os
@@ -36,20 +37,26 @@
3637

3738

3839
def transformation_with_kwargs(data_frame, **kwargs):
39-
is_area_enabled = kwargs.get('is_area_enabled')
40+
is_area_enabled = kwargs.get("is_area_enabled")
4041

4142
if is_area_enabled:
4243
# Calculate petal area and sepal area
43-
data_frame["petal_area"] = data_frame["petal_length"] * data_frame["petal_width"]
44-
data_frame["sepal_area"] = data_frame["sepal_length"] * data_frame["sepal_width"]
44+
data_frame["petal_area"] = (
45+
data_frame["petal_length"] * data_frame["petal_width"]
46+
)
47+
data_frame["sepal_area"] = (
48+
data_frame["sepal_length"] * data_frame["sepal_width"]
49+
)
4550

4651
# Return the updated DataFrame
4752
return data_frame
4853

4954

5055
class FeatureStoreTestCase:
5156
# networks compartment in feature store
52-
TIME_NOW = str.format("{}_{}",datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S"),int(random()*1000))
57+
TIME_NOW = str.format(
58+
"{}_{}", datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S"), int(random() * 1000)
59+
)
5360
TENANCY_ID = "ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa"
5461
COMPARTMENT_ID = "ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa"
5562
METASTORE_ID = "ocid1.datacatalogmetastore.oc1.iad.amaaaaaabiudgxyap7tizm4gscwz7amu7dixz7ml3mtesqzzwwg3urvvdgua"
@@ -376,9 +383,11 @@ def create_entity_resource(self, feature_store) -> "Entity":
376383
return entity
377384

378385
def create_transformation_resource(self, feature_store) -> "Transformation":
379-
transformation = feature_store.create_transformation(source_code_func=transformation_with_kwargs,
380-
display_name="transformation_with_kwargs",
381-
transformation_mode=TransformationMode.PANDAS)
386+
transformation = feature_store.create_transformation(
387+
source_code_func=transformation_with_kwargs,
388+
display_name="transformation_with_kwargs",
389+
transformation_mode=TransformationMode.PANDAS,
390+
)
382391
return transformation
383392

384393
def define_feature_group_resource(

0 commit comments

Comments
 (0)