Skip to content

Commit b8f9c16

Browse files
committed
review comment changes
1 parent 8cce79f commit b8f9c16

File tree

4 files changed

+243
-26
lines changed

4 files changed

+243
-26
lines changed

ads/feature_store/dataset.py

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,33 @@ def materialise(
835835

836836
dataset_execution_strategy.ingest_dataset(self, dataset_job)
837837

838-
@deprecated(details="preview functionality is deprecated. Please use as_of.")
838+
def get_last_job(self) -> "DatasetJob":
    """Return the job recorded or listed as this dataset's last job.

    If a job id is already present on the dataset, that job is loaded with
    ``DatasetJob.from_id``. Otherwise the dataset's jobs are listed (sorted
    by ``timeCreated`` and limited to a single entry), the id of the first
    entry is recorded through ``with_job_id`` and that entry is returned.

    Returns:
        DatasetJob: The job found for this dataset.

    Raises:
        ValueError: If the dataset has no id (it has not been saved yet), or
            if the job listing comes back empty.
    """
    if not self.id:
        raise ValueError(
            "Dataset needs to be saved to the feature store before getting associated jobs."
        )

    # A job id is already known: load that job directly, no listing needed.
    if self.job_id:
        return DatasetJob.from_id(self.job_id)

    listed_jobs = DatasetJob.list(
        dataset_id=self.id,
        compartment_id=self.compartment_id,
        sort_by="timeCreated",
        limit="1",
    )
    if not listed_jobs:
        raise ValueError(
            "Unable to retrieve the associated last job. Please make sure you materialized the data."
        )

    latest_job = listed_jobs[0]
    # Record the id on the dataset; presumably this is what `self.job_id`
    # reads back on the next call.
    self.with_job_id(latest_job.id)
    return latest_job
864+
839865
def preview(
840866
self,
841867
row_count: int = 10,
@@ -990,14 +1016,8 @@ def get_statistics(self, job_id: str = None) -> "Statistics":
9901016
raise ValueError(
9911017
"Dataset needs to be saved to the feature store before retrieving the statistics"
9921018
)
993-
stat_job_id = job_id
994-
if job_id is None:
995-
if self.job_id is None:
996-
raise ValueError(
997-
"Unable to retrieve the last job,please provide the job id,make sure you materialised the data'"
998-
)
999-
else:
1000-
stat_job_id = self.job_id
1019+
1020+
stat_job_id = job_id if job_id is not None else self.get_last_job().id
10011021

10021022
# TODO: take the one in memory or will list down job ids and find the latest
10031023
dataset_job = DatasetJob.from_id(stat_job_id)
@@ -1023,14 +1043,8 @@ def get_validation_output(self, job_id: str = None) -> "ValidationOutput":
10231043
raise ValueError(
10241044
"Dataset needs to be saved to the feature store before retrieving the validation report"
10251045
)
1026-
validation_job_id = job_id
1027-
if job_id is None:
1028-
if self.job_id is None:
1029-
raise ValueError(
1030-
"Unable to retrieve the last job,please provide the job id,make sure you materialised the data'"
1031-
)
1032-
else:
1033-
validation_job_id = self.job_id
1046+
1047+
validation_job_id = job_id if job_id is not None else self.get_last_job().id
10341048

10351049
# retrieve the validation output JSON from data_flow_batch_execution_output
10361050
dataset_job = DatasetJob.from_id(validation_job_id)

ads/feature_store/feature_group.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,7 @@ def get_last_job(self) -> "FeatureGroupJob":
930930
)
931931
if not fg_job:
932932
raise ValueError(
933-
"Associated jobs cannot be retrieved before calling 'materialise' or 'delete'."
933+
"Unable to retrieve the associated last job. Please make sure you materialized the data."
934934
)
935935
self.with_job_id(fg_job[0].id)
936936
return fg_job[0]
@@ -1353,7 +1353,7 @@ def get_statistics(self, job_id: str = None) -> "Statistics":
13531353
"FeatureGroup needs to be saved to the feature store before retrieving the statistics"
13541354
)
13551355

1356-
stat_job_id = self._get_job_id(job_id)
1356+
stat_job_id = job_id if job_id is not None else self.get_last_job().id
13571357

13581358
# TODO: take the one in memory or will list down job ids and find the latest
13591359
fg_job = FeatureGroupJob.from_id(stat_job_id)
@@ -1382,7 +1382,7 @@ def get_validation_output(self, job_id: str = None) -> "ValidationOutput":
13821382
"FeatureGroup needs to be saved to the feature store before retrieving the validation report"
13831383
)
13841384

1385-
validation_job_id = self._get_job_id(job_id)
1385+
validation_job_id = job_id if job_id is not None else self.get_last_job().id
13861386

13871387
# Retrieve the validation output JSON from data_flow_batch_execution_output.
13881388
fg_job = FeatureGroupJob.from_id(validation_job_id)
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*--
3+
4+
# Copyright (c) 2023 Oracle and/or its affiliates.
5+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
6+
7+
import pytest
8+
from ads.feature_store.feature_group_job import FeatureGroupJob
9+
10+
from ads.feature_store.dataset import Dataset
11+
from ads.feature_store.feature_group import FeatureGroup
12+
from tests.integration.feature_store.test_base import FeatureStoreTestCase
13+
14+
15+
class TestFeatureGroupDatasetListing(FeatureStoreTestCase):
    """Contains integration tests for Feature Group and Dataset Listing."""

    def define_feature_group_resource_with_default_config(
        self, entity_id, feature_store_id
    ) -> "FeatureGroup":
        """Build (without creating) a feature group with no explicit statistics config.

        Args:
            entity_id: Id of the entity the feature group is attached to.
            feature_store_id: Id of the owning feature store.

        Returns:
            FeatureGroup: The configured, not yet created, feature group.
        """
        return (
            FeatureGroup()
            .with_description("feature group with default stats config")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(self.get_name("petals3"))
            .with_entity_id(entity_id)
            .with_feature_store_id(feature_store_id)
            .with_primary_keys([])
            .with_input_feature_details(self.INPUT_FEATURE_DETAILS)
        )

    def define_feature_group_resource_with_stats_disabled(
        self, entity_id, feature_store_id
    ) -> "FeatureGroup":
        """Build (without creating) a feature group with ``with_statistics_config(False)``.

        Args:
            entity_id: Id of the entity the feature group is attached to.
            feature_store_id: Id of the owning feature store.

        Returns:
            FeatureGroup: The configured, not yet created, feature group.
        """
        return (
            FeatureGroup()
            .with_description("feature group with statistics disabled")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(self.get_name("petals2"))
            .with_entity_id(entity_id)
            .with_feature_store_id(feature_store_id)
            .with_primary_keys([])
            .with_input_feature_details(self.INPUT_FEATURE_DETAILS)
            .with_statistics_config(False)
        )

    def define_dataset_resource_with_default_config(
        self, entity_id, feature_store_id, feature_group_name
    ) -> "Dataset":
        """Build (without creating) a dataset with no explicit statistics config.

        Args:
            entity_id: Id of the entity; also used as the schema in the query.
            feature_store_id: Id of the owning feature store.
            feature_group_name: Name of the feature group the query selects from.

        Returns:
            Dataset: The configured, not yet created, dataset.
        """
        return (
            Dataset()
            .with_description("dataset with default statistics configuration")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(self.get_name("petals_ds_default_stat"))
            .with_entity_id(entity_id)
            .with_feature_store_id(feature_store_id)
            .with_query(f"SELECT * FROM `{entity_id}`.{feature_group_name}")
        )

    def define_dataset_resource_with_stats_disabled(
        self, entity_id, feature_store_id, feature_group_name
    ) -> "Dataset":
        """Build (without creating) a dataset with ``with_statistics_config(False)``.

        Args:
            entity_id: Id of the entity; also used as the schema in the query.
            feature_store_id: Id of the owning feature store.
            feature_group_name: Name of the feature group the query selects from.

        Returns:
            Dataset: The configured, not yet created, dataset.
        """
        return (
            Dataset()
            .with_description("dataset with statistics disabled")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(self.get_name("petals_ds_stat_disabled"))
            .with_entity_id(entity_id)
            .with_feature_store_id(feature_store_id)
            .with_query(f"SELECT * FROM `{entity_id}`.{feature_group_name}")
            .with_statistics_config(False)
        )

    def test_feature_group_listing_without_limit(self):
        """Tests listing of feature group resources without any limit."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg1 = self.define_feature_group_resource_with_default_config(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()
        assert fg1.oci_feature_group.id
        fg1.materialise(self.data)
        fg1.materialise(self.data2)

        # fg1 was materialised twice, so exactly two jobs are expected for it.
        # The listing is scoped to fg1 so that jobs belonging to other feature
        # groups in the same compartment cannot change the count.
        fg1_job_list = FeatureGroupJob.list(
            compartment_id=self.COMPARTMENT_ID,
            feature_group_id=fg1.id,
        )
        assert fg1_job_list is not None
        assert len(fg1_job_list) == 2

        fg2 = self.define_feature_group_resource_with_stats_disabled(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()
        assert fg2.oci_feature_group.id
        fg2.materialise(self.data3)

        # NOTE(review): this count is compartment-wide; it assumes the test
        # compartment holds no feature groups other than fg1 and fg2.
        fg_list = FeatureGroup.list(compartment_id=self.COMPARTMENT_ID)
        assert fg_list is not None
        assert len(fg_list) == 2

        self.clean_up_feature_group(fg1)
        self.clean_up_feature_group(fg2)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

    def test_feature_group_listing_with_limit(self):
        """Tests listing of feature group resources with user defined limit."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg1 = self.define_feature_group_resource_with_default_config(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()
        assert fg1.oci_feature_group.id
        fg1.materialise(self.data)
        fg1.materialise(self.data2)

        # Two jobs exist for fg1, but the limit caps the listing at one entry.
        fg1_job_list = FeatureGroupJob.list(
            compartment_id=self.COMPARTMENT_ID,
            feature_group_id=fg1.id,
            sort_by="timeCreated",
            limit="1",
        )
        assert fg1_job_list is not None
        assert len(fg1_job_list) == 1

        fg2 = self.define_feature_group_resource_with_stats_disabled(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()
        assert fg2.oci_feature_group.id
        fg2.materialise(self.data3)

        # Two feature groups were created; the limit caps the listing at one.
        fg_list = FeatureGroup.list(
            compartment_id=self.COMPARTMENT_ID,
            sort_by="timeCreated",
            limit="1",
        )
        assert fg_list is not None
        assert len(fg_list) == 1

        self.clean_up_feature_group(fg1)
        self.clean_up_feature_group(fg2)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

    def test_dataset_listing_without_limit(self):
        """Tests listing of dataset resources without any limit."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg = self.define_feature_group_resource(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()
        assert fg.oci_feature_group.id

        fg.materialise(self.data)

        dataset = self.define_dataset_resource(
            entity.oci_fs_entity.id, fs.oci_fs.id, fg.oci_feature_group.name
        ).create()
        assert dataset.oci_dataset.id

        dataset.materialise()
        # NOTE(review): this count is compartment-wide; it assumes the test
        # compartment holds no dataset other than the one created above.
        ds_list = Dataset.list(compartment_id=self.COMPARTMENT_ID)
        assert ds_list is not None
        assert len(ds_list) == 1

        self.clean_up_dataset(dataset)
        self.clean_up_feature_group(fg)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

tests/unitary/with_extras/feature_store/test_dataset.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@
4343
"ingestionMode": "OVERWRITE",
4444
}
4545

46+
# Minimal dataset-job payload used by the tests: it is wrapped as
# {"spec": ...} and passed to DatasetJob.from_dict to build a job object.
DATASET_JOB_RESPONSE_PAYLOAD = {
    "compartmentId": "ocid1.compartment.oc1.iad.xxx",
    "datasetId": "861AA4E9C8E811A79D74C464A01CDF42",
    "id": "d40265b7-d66e-49a3-ae26-699012e0df5d",
    "ingestionMode": "OVERWRITE",
    "lifecycleState": "SUCCEEDED",
}
53+
4654

4755
@pytest.fixture
4856
def dataframe_fixture_basic():
@@ -259,12 +267,12 @@ def test__to_oci_fs_entity(self, mock_load_key_file, mock_config_from_file):
259267
@patch.object(SparkSessionSingleton, "__init__", return_value=None)
260268
@patch.object(SparkSessionSingleton, "get_spark_session")
261269
def test_materialise(self, spark, get_spark_session, mock_update):
262-
with patch.object(DatasetJob, "create") as mock_dataset_job:
263-
with patch.object(FeatureStore, "from_id"):
264-
with patch.object(DatasetJob, "_mark_job_complete"):
265-
mock_dataset_job.return_value = self.mock_dsc_dataset_job
266-
self.mock_dsc_dataset.with_id(DATASET_OCID)
267-
self.mock_dsc_dataset.materialise()
270+
with patch.object(DatasetJob, "create") as mock_dataset_job:
271+
with patch.object(FeatureStore, "from_id"):
272+
with patch.object(DatasetJob, "_mark_job_complete"):
273+
mock_dataset_job.return_value = self.mock_dsc_dataset_job
274+
self.mock_dsc_dataset.with_id(DATASET_OCID)
275+
self.mock_dsc_dataset.materialise()
268276

269277
@patch.object(SparkSessionSingleton, "__init__", return_value=None)
270278
@patch.object(SparkSessionSingleton, "get_spark_session")
@@ -306,3 +314,13 @@ def test_restore(self, spark, get_spark_session, feature_store, mock_update):
306314
self.mock_dsc_dataset.with_id(DATASET_OCID)
307315
self.mock_dsc_dataset.restore(1)
308316
mock_execution_strategy.assert_called_once()
317+
318+
def test_get_last_job(self):
    """Tests getting most recent dataset job for a dataset.

    ``DatasetJob.list`` is patched to return a single job built from
    ``DATASET_JOB_RESPONSE_PAYLOAD``; ``get_last_job`` must hand back that
    very job and must have queried the listing exactly once.
    """
    expected_job = DatasetJob.from_dict({"spec": DATASET_JOB_RESPONSE_PAYLOAD})
    with patch.object(
        DatasetJob, "list", return_value=[expected_job]
    ) as mock_dataset_job_list:
        self.mock_dsc_dataset.with_id(DATASET_OCID)
        ds_job = self.mock_dsc_dataset.get_last_job()

    # A bare "is not None" check cannot fail in a useful way: get_last_job
    # either returns a job or raises. Pin the job and the lookup instead.
    mock_dataset_job_list.assert_called_once()
    assert ds_job is expected_job

0 commit comments

Comments
 (0)