Skip to content

Commit 38e19d1

Browse files
committed
partitioning code changes for the dataset
1 parent 8c8dc05 commit 38e19d1

File tree

6 files changed

+114
-65
lines changed

6 files changed

+114
-65
lines changed

ads/feature_store/dataset.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class Dataset(Builder):
105105
CONST_DESCRIPTION = "description"
106106
CONST_FREEFORM_TAG = "freeformTags"
107107
CONST_DEFINED_TAG = "definedTags"
108+
CONST_PARTITION_KEYS = "partitionKeys"
108109
CONST_OUTPUT_FEATURE_DETAILS = "outputFeatureDetails"
109110
CONST_EXPECTATION_DETAILS = "expectationDetails"
110111
CONST_STATISTICS_CONFIG = "statisticsConfig"
@@ -128,6 +129,7 @@ class Dataset(Builder):
128129
CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details",
129130
CONST_LIFECYCLE_STATE: "lifecycle_state",
130131
CONST_MODEL_DETAILS: "model_details",
132+
CONST_PARTITION_KEYS: "partition_keys",
131133
}
132134

133135
def __init__(self, spec: Dict = None, **kwargs) -> None:
@@ -500,6 +502,37 @@ def with_model_details(self, model_details: ModelDetails) -> "Dataset":
500502
)
501503
return self.set_spec(self.CONST_MODEL_DETAILS, model_details.to_dict())
502504

505+
@property
506+
def partition_keys(self) -> List[str]:
507+
return self.get_spec(self.CONST_PARTITION_KEYS)
508+
509+
@partition_keys.setter
510+
def partition_keys(self, value: List[str]):
511+
self.with_partition_keys(value)
512+
513+
def with_partition_keys(self, partition_keys: List[str]) -> "Dataset":
514+
"""Sets the partition keys of the dataset.
515+
516+
Parameters
517+
----------
518+
partition_keys: List[str]
519+
The list of partition keys for the dataset.
520+
521+
Returns
522+
-------
523+
Dataset
524+
The Dataset instance (self)
525+
"""
526+
return self.set_spec(
527+
self.CONST_PARTITION_KEYS,
528+
{
529+
self.CONST_ITEMS: [
530+
{self.CONST_NAME: partition_key}
531+
for partition_key in partition_keys or []
532+
]
533+
},
534+
)
535+
503536
def add_models(self, model_details: ModelDetails) -> "Dataset":
504537
"""Add model details to the dataset, Append to the existing model id list
505538

ads/feature_store/entity.py

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -281,17 +281,17 @@ def create(self, **kwargs) -> "Entity":
281281
return self
282282

283283
def _build_feature_group(
284-
self,
285-
primary_keys,
286-
partition_keys,
287-
input_feature_details,
288-
expectation_suite: ExpectationSuite = None,
289-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
290-
statistics_config: Union[StatisticsConfig, bool] = True,
291-
transformation_id: str = None,
292-
name: str = None,
293-
description: str = None,
294-
compartment_id: str = None,
284+
self,
285+
primary_keys,
286+
partition_keys,
287+
input_feature_details,
288+
expectation_suite: ExpectationSuite = None,
289+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
290+
statistics_config: Union[StatisticsConfig, bool] = True,
291+
transformation_id: str = None,
292+
name: str = None,
293+
description: str = None,
294+
compartment_id: str = None,
295295
):
296296
feature_group_resource = (
297297
FeatureGroup()
@@ -316,18 +316,18 @@ def _build_feature_group(
316316
return feature_group_resource
317317

318318
def create_feature_group(
319-
self,
320-
primary_keys: List[str],
321-
partition_keys: List[str] = None,
322-
input_feature_details: List[FeatureDetail] = None,
323-
schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
324-
expectation_suite: ExpectationSuite = None,
325-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
326-
statistics_config: Union[StatisticsConfig, bool] = True,
327-
transformation_id: str = None,
328-
name: str = None,
329-
description: str = None,
330-
compartment_id: str = None,
319+
self,
320+
primary_keys: List[str],
321+
partition_keys: List[str] = None,
322+
input_feature_details: List[FeatureDetail] = None,
323+
schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
324+
expectation_suite: ExpectationSuite = None,
325+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
326+
statistics_config: Union[StatisticsConfig, bool] = True,
327+
transformation_id: str = None,
328+
name: str = None,
329+
description: str = None,
330+
compartment_id: str = None,
331331
) -> "FeatureGroup":
332332
"""Creates FeatureGroup resource.
333333
@@ -411,7 +411,7 @@ def delete_feature_group(self):
411411

412412
@classmethod
413413
def list_feature_group(
414-
cls, compartment_id: str = None, **kwargs
414+
cls, compartment_id: str = None, **kwargs
415415
) -> List["FeatureGroup"]:
416416
"""Lists FeatureGroup resources in a given compartment.
417417
@@ -432,7 +432,7 @@ def list_feature_group(
432432

433433
@classmethod
434434
def list_feature_group_df(
435-
cls, compartment_id: str = None, **kwargs
435+
cls, compartment_id: str = None, **kwargs
436436
) -> "pandas.DataFrame":
437437
"""Lists FeatureGroup resources in a given compartment as pandas dataframe.
438438
@@ -452,14 +452,15 @@ def list_feature_group_df(
452452
return FeatureGroup.list_df(compartment_id, **kwargs)
453453

454454
def _build_dataset(
455-
self,
456-
query: str,
457-
name: str = None,
458-
description: str = None,
459-
compartment_id: str = None,
460-
expectation_suite: ExpectationSuite = None,
461-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
462-
statistics_config: Union[StatisticsConfig, bool] = True,
455+
self,
456+
query: str,
457+
name: str = None,
458+
description: str = None,
459+
compartment_id: str = None,
460+
expectation_suite: ExpectationSuite = None,
461+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
462+
statistics_config: Union[StatisticsConfig, bool] = True,
463+
partition_keys: List[str] = None,
463464
):
464465
dataset_resource = (
465466
Dataset()
@@ -472,6 +473,7 @@ def _build_dataset(
472473
compartment_id if compartment_id else self.compartment_id
473474
)
474475
.with_statistics_config(statistics_config)
476+
.with_partition_keys(partition_keys)
475477
)
476478

477479
if expectation_suite:
@@ -483,14 +485,15 @@ def _build_dataset(
483485
return dataset_resource
484486

485487
def create_dataset(
486-
self,
487-
query: str,
488-
name: str = None,
489-
description: str = None,
490-
compartment_id: str = None,
491-
expectation_suite: ExpectationSuite = None,
492-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
493-
statistics_config: Union[StatisticsConfig, bool] = True,
488+
self,
489+
query: str,
490+
name: str = None,
491+
description: str = None,
492+
compartment_id: str = None,
493+
expectation_suite: ExpectationSuite = None,
494+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
495+
statistics_config: Union[StatisticsConfig, bool] = True,
496+
partition_keys: List[str] = None,
494497
) -> "Dataset":
495498
"""Creates Dataset resource.
496499
@@ -510,6 +513,8 @@ def create_dataset(
510513
Type of the expectation.
511514
statistics_config: StatisticsConfig = None
512515
Config details for the Statistics.
516+
partition_keys: List[str]
517+
Partition keys for the dataset.
513518
514519
Returns
515520
-------
@@ -529,6 +534,7 @@ def create_dataset(
529534
expectation_suite,
530535
expectation_type,
531536
statistics_config,
537+
partition_keys,
532538
)
533539

534540
return self.oci_fs_dataset.create()
@@ -566,7 +572,7 @@ def list_dataset(cls, compartment_id: str = None, **kwargs) -> List["Dataset"]:
566572

567573
@classmethod
568574
def list_dataset_df(
569-
cls, compartment_id: str = None, **kwargs
575+
cls, compartment_id: str = None, **kwargs
570576
) -> "pandas.DataFrame":
571577
"""Lists Dataset resources in a given compartment as pandas dataframe.
572578

ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ def write_dataframe_to_delta_lake(
5656
Returns:
5757
None.
5858
"""
59-
60-
print(delta_table_primary_key, partition_keys, "Aaaya Kya")
61-
6259
logger.info(f"target table name {target_table_name}")
6360
if (
6461
self.spark_engine.is_delta_table_exists(target_table_name)
@@ -113,7 +110,11 @@ def write_dataframe_to_delta_lake(
113110
logger.info(f"Upsert ops for target table {target_table_name} ended")
114111
else:
115112
self.save_delta_dataframe(
116-
dataflow_output, ingestion_mode, target_table_name, feature_options, partition_keys
113+
dataflow_output,
114+
ingestion_mode,
115+
target_table_name,
116+
feature_options,
117+
partition_keys,
117118
)
118119

119120
def __execute_delta_merge_insert_update(
@@ -197,7 +198,12 @@ def __execute_delta_merge_insert_update_all(
197198
)
198199

199200
def save_delta_dataframe(
200-
self, dataframe, dataframe_ingestion_mode, table_name, feature_options=None, partition_keys=None
201+
self,
202+
dataframe,
203+
dataframe_ingestion_mode,
204+
table_name,
205+
feature_options=None,
206+
partition_keys=None,
201207
):
202208
"""
203209
Saves a DataFrame to a Delta table with the specified options.
@@ -211,9 +217,12 @@ def save_delta_dataframe(
211217
"""
212218
delta_partition_keys = []
213219

214-
partition_keys_items = partition_keys["items"]
215-
if partition_keys_items:
216-
delta_partition_keys = [partition_key.get("name") for partition_key in partition_keys_items]
220+
if partition_keys:
221+
partition_keys_items = partition_keys["items"]
222+
if partition_keys_items:
223+
delta_partition_keys = [
224+
partition_key.get("name") for partition_key in partition_keys_items
225+
]
217226

218227
if feature_options and feature_options.get("featureOptionWriteConfigDetails"):
219228
feature_delta_write_option_config = feature_options.get(
@@ -226,11 +235,15 @@ def save_delta_dataframe(
226235

227236
dataframe.write.format("delta").options(
228237
**self.get_delta_write_config(feature_delta_write_option_config)
229-
).mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(table_name)
230-
else:
231-
dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(
238+
).mode(dataframe_ingestion_mode).partitionBy(
239+
delta_partition_keys
240+
).saveAsTable(
232241
table_name
233242
)
243+
else:
244+
dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy(
245+
delta_partition_keys
246+
).saveAsTable(table_name)
234247

235248
def get_delta_write_config(self, feature_delta_write_option_config):
236249
"""Returns a dictionary containing delta schema configuration options based on a given dictionary of feature

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
398398
dataset_job.ingestion_mode,
399399
target_table,
400400
dataset_job.feature_option_details,
401+
dataset.partition_keys,
401402
)
402403

403404
# Get the output features

ads/feature_store/feature_group.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ class FeatureGroup(Builder):
125125
CONST_FEATURE_STORE_ID = "featureStoreId"
126126
CONST_ENTITY_ID = "entityId"
127127
CONST_ITEMS = "items"
128-
CONST_PRIMARY_KEY_NAME = "name"
129128
CONST_PRIMARY_KEYS = "primaryKeys"
130129
CONST_PARTITION_KEYS = "partitionKeys"
131130
CONST_EXPECTATION_DETAILS = "expectationDetails"
@@ -157,7 +156,7 @@ class FeatureGroup(Builder):
157156
CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details",
158157
CONST_STATISTICS_CONFIG: "statistics_config",
159158
CONST_INFER_SCHEMA: "is_infer_schema",
160-
CONST_PARTITION_KEYS: "partition_keys"
159+
CONST_PARTITION_KEYS: "partition_keys",
161160
}
162161

163162
def __init__(self, spec: Dict = None, **kwargs) -> None:
@@ -321,8 +320,7 @@ def with_primary_keys(self, primary_keys: List[str]) -> "FeatureGroup":
321320
self.CONST_PRIMARY_KEYS,
322321
{
323322
self.CONST_ITEMS: [
324-
{self.CONST_PRIMARY_KEY_NAME: primary_key}
325-
for primary_key in primary_keys
323+
{self.CONST_NAME: primary_key} for primary_key in primary_keys
326324
]
327325
},
328326
)
@@ -352,7 +350,7 @@ def with_partition_keys(self, partition_keys: List[str]) -> "FeatureGroup":
352350
self.CONST_PARTITION_KEYS,
353351
{
354352
self.CONST_ITEMS: [
355-
{self.CONST_PRIMARY_KEY_NAME: partition_key}
353+
{self.CONST_NAME: partition_key}
356354
for partition_key in partition_keys or []
357355
]
358356
},

ads/feature_store/validation_output.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ def to_pandas(self) -> pd.DataFrame:
2323
The validation output information as a pandas DataFrame.
2424
"""
2525
if self.content:
26-
validation_output_json = (
27-
json.loads(self.content)
28-
)
29-
profile_result = pd.json_normalize(validation_output_json.get("results")).transpose()
26+
validation_output_json = json.loads(self.content)
27+
profile_result = pd.json_normalize(
28+
validation_output_json.get("results")
29+
).transpose()
3030
return profile_result
3131

3232
def to_summary(self) -> pd.DataFrame:
@@ -39,9 +39,7 @@ def to_summary(self) -> pd.DataFrame:
3939
The validation output summary information as a pandas DataFrame.
4040
"""
4141
if self.content:
42-
validation_output_json = (
43-
json.loads(self.content)
44-
)
42+
validation_output_json = json.loads(self.content)
4543
profile_result = pd.json_normalize(validation_output_json).transpose()
4644
summary_df = profile_result.drop("results")
4745
return summary_df

0 commit comments

Comments
 (0)