Skip to content

Commit 9ba1f32

Browse files
authored
Odsc 45131/ads interfaces (#282)
2 parents 820b5bd + bb94e65 commit 9ba1f32

File tree

7 files changed

+373
-51
lines changed

7 files changed

+373
-51
lines changed

ads/feature_store/dataset.py

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,16 @@ def id(self) -> str:
245245
"""
246246
return self.get_spec(self.CONST_ID)
247247

248+
@property
249+
def features(self) -> List[DatasetFeature]:
250+
return [
251+
DatasetFeature(**feature_dict)
252+
for feature_dict in self.get_spec(self.CONST_OUTPUT_FEATURE_DETAILS)[
253+
self.CONST_ITEMS
254+
]
255+
or []
256+
]
257+
248258
def with_id(self, id: str) -> "Dataset":
249259
return self.set_spec(self.CONST_ID, id)
250260

@@ -709,6 +719,28 @@ def delete(self):
709719

710720
dataset_execution_strategy.delete_dataset(self, dataset_job)
711721

722+
def get_features(self) -> List[DatasetFeature]:
723+
"""
724+
Returns all the features in the dataset.
725+
726+
Returns:
727+
List[DatasetFeature]
728+
"""
729+
730+
return self.features
731+
732+
def get_features_df(self) -> "pandas.DataFrame":
733+
"""
734+
Returns all the features as pandas dataframe.
735+
736+
Returns:
737+
pandas.DataFrame
738+
"""
739+
records = []
740+
for feature in self.features:
741+
records.append({"name": feature.feature_name, "type": feature.feature_type})
742+
return pandas.DataFrame.from_records(records)
743+
712744
def update(self, **kwargs) -> "Dataset":
713745
"""Updates Dataset in the feature store.
714746
@@ -752,7 +784,18 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":
752784

753785
for infra_attr, dsc_attr in self.attribute_map.items():
754786
if infra_attr in dataset_details:
755-
self.set_spec(infra_attr, dataset_details[infra_attr])
787+
if infra_attr == self.CONST_OUTPUT_FEATURE_DETAILS:
788+
# May not need if we fix the backend and add dataset_id to the output_feature
789+
features_list = []
790+
for output_feature in dataset_details[infra_attr]["items"]:
791+
output_feature["datasetId"] = dataset_details[self.CONST_ID]
792+
features_list.append(output_feature)
793+
794+
value = {self.CONST_ITEMS: features_list}
795+
else:
796+
value = dataset_details[infra_attr]
797+
798+
self.set_spec(infra_attr, value)
756799

757800
return self
758801

@@ -792,6 +835,33 @@ def materialise(
792835

793836
dataset_execution_strategy.ingest_dataset(self, dataset_job)
794837

838+
def get_last_job(self) -> "DatasetJob":
839+
"""Gets the Job details for the last running Dataset job.
840+
841+
Returns:
842+
DatasetJob
843+
"""
844+
845+
if not self.id:
846+
raise ValueError(
847+
"Dataset needs to be saved to the feature store before getting associated jobs."
848+
)
849+
850+
if not self.job_id:
851+
ds_job = DatasetJob.list(
852+
dataset_id=self.id,
853+
compartment_id=self.compartment_id,
854+
sort_by="timeCreated",
855+
limit="1",
856+
)
857+
if not ds_job:
858+
raise ValueError(
859+
"Unable to retrieve the associated last job. Please make sure you materialized the data."
860+
)
861+
self.with_job_id(ds_job[0].id)
862+
return ds_job[0]
863+
return DatasetJob.from_id(self.job_id)
864+
795865
@deprecated(details="preview functionality is deprecated. Please use as_of.")
796866
def preview(
797867
self,
@@ -947,14 +1017,8 @@ def get_statistics(self, job_id: str = None) -> "Statistics":
9471017
raise ValueError(
9481018
"Dataset needs to be saved to the feature store before retrieving the statistics"
9491019
)
950-
stat_job_id = job_id
951-
if job_id is None:
952-
if self.job_id is None:
953-
raise ValueError(
954-
"Unable to retrieve the last job,please provide the job id,make sure you materialised the data'"
955-
)
956-
else:
957-
stat_job_id = self.job_id
1020+
1021+
stat_job_id = job_id if job_id is not None else self.get_last_job().id
9581022

9591023
# TODO: take the one in memory or will list down job ids and find the latest
9601024
dataset_job = DatasetJob.from_id(stat_job_id)
@@ -980,14 +1044,8 @@ def get_validation_output(self, job_id: str = None) -> "ValidationOutput":
9801044
raise ValueError(
9811045
"Dataset needs to be saved to the feature store before retrieving the validation report"
9821046
)
983-
validation_job_id = job_id
984-
if job_id is None:
985-
if self.job_id is None:
986-
raise ValueError(
987-
"Unable to retrieve the last job,please provide the job id,make sure you materialised the data'"
988-
)
989-
else:
990-
validation_job_id = self.job_id
1047+
1048+
validation_job_id = job_id if job_id is not None else self.get_last_job().id
9911049

9921050
# retrieve the validation output JSON from data_flow_batch_execution_output
9931051
dataset_job = DatasetJob.from_id(validation_job_id)

ads/feature_store/docs/source/dataset.rst

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -227,23 +227,14 @@ The ``get_statistics()`` method takes the following optional parameter:
227227

228228
Get features
229229
============
230-
You can call the ``get_features_dataframe()`` method of the Dataset instance to fetch features in a dataset.
230+
You can call the ``get_features_df()`` method of the Dataset instance to fetch features in a dataset.
231231

232232
.. code-block:: python3
233233
234234
# Fetch features for a dataset
235-
df = dataset.get_features_dataframe()
235+
df = dataset.get_features_df()
236236
df.show()
237237
238-
Get input schema details
239-
========================
240-
You can call the ``get_input_schema_dataframe()`` method of the Dataset instance to fetch input schema details of a dataset.
241-
242-
.. code-block:: python3
243-
244-
# Fetch input schema details for a dataset
245-
df = dataset.get_input_schema_dataframe()
246-
df.show()
247238
248239
Preview
249240
========

ads/feature_store/feature_group.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -750,13 +750,8 @@ def get_features_df(self) -> "pd.DataFrame":
750750
"""
751751
records = []
752752
for feature in self.features:
753-
records.append(
754-
{
755-
"name": feature.feature_name,
756-
"type": feature.feature_type,
757-
"feature_group_id": feature.feature_group_id,
758-
}
759-
)
753+
records.append({"name": feature.feature_name, "type": feature.feature_type})
754+
760755
return pd.DataFrame.from_records(records)
761756

762757
def get_input_features_df(self) -> "pd.DataFrame":
@@ -927,10 +922,18 @@ def get_last_job(self) -> "FeatureGroupJob":
927922
)
928923

929924
if not self.job_id:
930-
raise ValueError(
931-
"Associated jobs cannot be retrieved before calling 'materialise' or 'delete'."
925+
fg_job = FeatureGroupJob.list(
926+
feature_group_id=self.id,
927+
compartment_id=self.compartment_id,
928+
sort_by="timeCreated",
929+
limit="1",
932930
)
933-
931+
if not fg_job:
932+
raise ValueError(
933+
"Unable to retrieve the associated last job. Please make sure you materialized the data."
934+
)
935+
self.with_job_id(fg_job[0].id)
936+
return fg_job[0]
934937
return FeatureGroupJob.from_id(self.job_id)
935938

936939
def select(self, features: Optional[List[str]] = []) -> Query:
@@ -1350,7 +1353,7 @@ def get_statistics(self, job_id: str = None) -> "Statistics":
13501353
"FeatureGroup needs to be saved to the feature store before retrieving the statistics"
13511354
)
13521355

1353-
stat_job_id = self._get_job_id(job_id)
1356+
stat_job_id = job_id if job_id is not None else self.get_last_job().id
13541357

13551358
# TODO: take the one in memory or will list down job ids and find the latest
13561359
fg_job = FeatureGroupJob.from_id(stat_job_id)
@@ -1379,7 +1382,7 @@ def get_validation_output(self, job_id: str = None) -> "ValidationOutput":
13791382
"FeatureGroup needs to be saved to the feature store before retrieving the validation report"
13801383
)
13811384

1382-
validation_job_id = self._get_job_id(job_id)
1385+
validation_job_id = job_id if job_id is not None else self.get_last_job().id
13831386

13841387
# Retrieve the validation output JSON from data_flow_batch_execution_output.
13851388
fg_job = FeatureGroupJob.from_id(validation_job_id)

ads/feature_store/mixin/oci_feature_store.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8; -*-
3+
from ads.common.decorator.utils import class_or_instance_method
34

45
# Copyright (c) 2023 Oracle and/or its affiliates.
56
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
@@ -77,3 +78,44 @@ def init_client(
7778
@property
7879
def client(self) -> oci.feature_store.feature_store_client.FeatureStoreClient:
7980
return super().client
81+
82+
@class_or_instance_method
83+
def list_resource(
84+
cls, compartment_id: str = None, limit: int = 0, **kwargs
85+
) -> list:
86+
"""Generic method to list OCI resources
87+
88+
Parameters
89+
----------
90+
compartment_id : str
91+
Compartment ID of the OCI resources. Defaults to None.
92+
If compartment_id is not specified,
93+
the value of NB_SESSION_COMPARTMENT_OCID in environment variable will be used.
94+
limit : int
95+
The maximum number of items to return. Defaults to 0, All items will be returned
96+
**kwargs :
97+
Additional keyword arguments to filter the resource.
98+
The kwargs are passed into OCI API.
99+
100+
Returns
101+
-------
102+
list
103+
A list of OCI resources
104+
105+
Raises
106+
------
107+
NotImplementedError
108+
List method is not supported or implemented.
109+
110+
"""
111+
if limit:
112+
items = cls._find_oci_method("list")(
113+
cls.check_compartment_id(compartment_id), limit=limit, **kwargs
114+
).data.items
115+
else:
116+
items = oci.pagination.list_call_get_all_results(
117+
cls._find_oci_method("list"),
118+
cls.check_compartment_id(compartment_id),
119+
**kwargs,
120+
).data
121+
return [cls.from_oci_model(item) for item in items]

0 commit comments

Comments
 (0)