Skip to content

Commit 8c8dc05

Browse files
committed
code changes for the partitioning support
1 parent d28c13d commit 8c8dc05

File tree

4 files changed

+94
-44
lines changed

4 files changed

+94
-44
lines changed

ads/feature_store/entity.py

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -281,16 +281,17 @@ def create(self, **kwargs) -> "Entity":
281281
return self
282282

283283
def _build_feature_group(
284-
self,
285-
primary_keys,
286-
input_feature_details,
287-
expectation_suite: ExpectationSuite = None,
288-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
289-
statistics_config: Union[StatisticsConfig, bool] = True,
290-
transformation_id: str = None,
291-
name: str = None,
292-
description: str = None,
293-
compartment_id: str = None,
284+
self,
285+
primary_keys,
286+
partition_keys,
287+
input_feature_details,
288+
expectation_suite: ExpectationSuite = None,
289+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
290+
statistics_config: Union[StatisticsConfig, bool] = True,
291+
transformation_id: str = None,
292+
name: str = None,
293+
description: str = None,
294+
compartment_id: str = None,
294295
):
295296
feature_group_resource = (
296297
FeatureGroup()
@@ -302,6 +303,7 @@ def _build_feature_group(
302303
)
303304
.with_entity_id(self.id)
304305
.with_transformation_id(transformation_id)
306+
.with_partition_keys(partition_keys)
305307
.with_primary_keys(primary_keys)
306308
.with_input_feature_details(input_feature_details)
307309
.with_statistics_config(statistics_config)
@@ -314,24 +316,27 @@ def _build_feature_group(
314316
return feature_group_resource
315317

316318
def create_feature_group(
317-
self,
318-
primary_keys: List[str],
319-
input_feature_details: List[FeatureDetail] = None,
320-
schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
321-
expectation_suite: ExpectationSuite = None,
322-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
323-
statistics_config: Union[StatisticsConfig, bool] = True,
324-
transformation_id: str = None,
325-
name: str = None,
326-
description: str = None,
327-
compartment_id: str = None,
319+
self,
320+
primary_keys: List[str],
321+
partition_keys: List[str] = None,
322+
input_feature_details: List[FeatureDetail] = None,
323+
schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
324+
expectation_suite: ExpectationSuite = None,
325+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
326+
statistics_config: Union[StatisticsConfig, bool] = True,
327+
transformation_id: str = None,
328+
name: str = None,
329+
description: str = None,
330+
compartment_id: str = None,
328331
) -> "FeatureGroup":
329332
"""Creates FeatureGroup resource.
330333
331334
Parameters
332335
----------
333336
primary_keys: List[str]
334337
List of primary keys.
338+
partition_keys: List[str]
339+
List of partition keys used to partition the materialized data.
335340
input_feature_details: List[FeatureDetail]
336341
Raw feature schema for the input features.
337342
schema_details_dataframe: Union[DataFrame, pd.DataFrame]
@@ -377,6 +382,7 @@ def create_feature_group(
377382

378383
self.oci_feature_group = self._build_feature_group(
379384
primary_keys,
385+
partition_keys,
380386
raw_feature_details,
381387
expectation_suite,
382388
expectation_type,
@@ -405,7 +411,7 @@ def delete_feature_group(self):
405411

406412
@classmethod
407413
def list_feature_group(
408-
cls, compartment_id: str = None, **kwargs
414+
cls, compartment_id: str = None, **kwargs
409415
) -> List["FeatureGroup"]:
410416
"""Lists FeatureGroup resources in a given compartment.
411417
@@ -426,7 +432,7 @@ def list_feature_group(
426432

427433
@classmethod
428434
def list_feature_group_df(
429-
cls, compartment_id: str = None, **kwargs
435+
cls, compartment_id: str = None, **kwargs
430436
) -> "pandas.DataFrame":
431437
"""Lists FeatureGroup resources in a given compartment as pandas dataframe.
432438
@@ -446,14 +452,14 @@ def list_feature_group_df(
446452
return FeatureGroup.list_df(compartment_id, **kwargs)
447453

448454
def _build_dataset(
449-
self,
450-
query: str,
451-
name: str = None,
452-
description: str = None,
453-
compartment_id: str = None,
454-
expectation_suite: ExpectationSuite = None,
455-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
456-
statistics_config: Union[StatisticsConfig, bool] = True,
455+
self,
456+
query: str,
457+
name: str = None,
458+
description: str = None,
459+
compartment_id: str = None,
460+
expectation_suite: ExpectationSuite = None,
461+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
462+
statistics_config: Union[StatisticsConfig, bool] = True,
457463
):
458464
dataset_resource = (
459465
Dataset()
@@ -477,14 +483,14 @@ def _build_dataset(
477483
return dataset_resource
478484

479485
def create_dataset(
480-
self,
481-
query: str,
482-
name: str = None,
483-
description: str = None,
484-
compartment_id: str = None,
485-
expectation_suite: ExpectationSuite = None,
486-
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
487-
statistics_config: Union[StatisticsConfig, bool] = True,
486+
self,
487+
query: str,
488+
name: str = None,
489+
description: str = None,
490+
compartment_id: str = None,
491+
expectation_suite: ExpectationSuite = None,
492+
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
493+
statistics_config: Union[StatisticsConfig, bool] = True,
488494
) -> "Dataset":
489495
"""Creates Dataset resource.
490496
@@ -560,7 +566,7 @@ def list_dataset(cls, compartment_id: str = None, **kwargs) -> List["Dataset"]:
560566

561567
@classmethod
562568
def list_dataset_df(
563-
cls, compartment_id: str = None, **kwargs
569+
cls, compartment_id: str = None, **kwargs
564570
) -> "pandas.DataFrame":
565571
"""Lists Dataset resources in a given compartment as pandas dataframe.
566572

ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def write_dataframe_to_delta_lake(
3737
dataflow_output,
3838
target_table_name,
3939
delta_table_primary_key,
40+
partition_keys,
4041
ingestion_mode,
4142
raw_schema,
4243
feature_options=None,
@@ -47,6 +48,7 @@ def write_dataframe_to_delta_lake(
4748
dataflow_output (DataFrame): The data frame that needs to be written to the Delta table.
4849
target_table_name (str): The name of the target Delta table.
4950
delta_table_primary_key (List[dict]): The list of primary keys for the target Delta table.
51+
partition_keys (List[dict]): The list of partition keys.
5052
ingestion_mode (str): The ingestion mode for the data load.
5153
raw_schema (StructType): The schema of the raw data being ingested.
5254
feature_options (Dict[str, Union[str, int, float, bool]]): Optional. The dictionary containing feature options.
@@ -55,6 +57,8 @@ def write_dataframe_to_delta_lake(
5557
None.
5658
"""
5759

60+
print(delta_table_primary_key, partition_keys, "Aaaya Kya")
61+
5862
logger.info(f"target table name {target_table_name}")
5963
if (
6064
self.spark_engine.is_delta_table_exists(target_table_name)
@@ -109,7 +113,7 @@ def write_dataframe_to_delta_lake(
109113
logger.info(f"Upsert ops for target table {target_table_name} ended")
110114
else:
111115
self.save_delta_dataframe(
112-
dataflow_output, ingestion_mode, target_table_name, feature_options
116+
dataflow_output, ingestion_mode, target_table_name, feature_options, partition_keys
113117
)
114118

115119
def __execute_delta_merge_insert_update(
@@ -193,7 +197,7 @@ def __execute_delta_merge_insert_update_all(
193197
)
194198

195199
def save_delta_dataframe(
196-
self, dataframe, dataframe_ingestion_mode, table_name, feature_options=None
200+
self, dataframe, dataframe_ingestion_mode, table_name, feature_options=None, partition_keys=None
197201
):
198202
"""
199203
Saves a DataFrame to a Delta table with the specified options.
@@ -205,6 +209,12 @@ def save_delta_dataframe(
205209
feature_options (dict): Optional feature options to use when saving the DataFrame.
206210
207211
"""
212+
delta_partition_keys = []
213+
214+
partition_keys_items = partition_keys["items"]
215+
if partition_keys_items:
216+
delta_partition_keys = [partition_key.get("name") for partition_key in partition_keys_items]
217+
208218
if feature_options and feature_options.get("featureOptionWriteConfigDetails"):
209219
feature_delta_write_option_config = feature_options.get(
210220
"featureOptionWriteConfigDetails"
@@ -216,9 +226,9 @@ def save_delta_dataframe(
216226

217227
dataframe.write.format("delta").options(
218228
**self.get_delta_write_config(feature_delta_write_option_config)
219-
).mode(dataframe_ingestion_mode).saveAsTable(table_name)
229+
).mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(table_name)
220230
else:
221-
dataframe.write.format("delta").mode(dataframe_ingestion_mode).saveAsTable(
231+
dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(
222232
table_name
223233
)
224234

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ def _save_offline_dataframe(
260260
featured_data,
261261
target_table,
262262
feature_group.primary_keys,
263+
feature_group.partition_keys,
263264
feature_group_job.ingestion_mode,
264265
featured_data.schema,
265266
feature_group_job.feature_option_details,

ads/feature_store/feature_group.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class FeatureGroup(Builder):
127127
CONST_ITEMS = "items"
128128
CONST_PRIMARY_KEY_NAME = "name"
129129
CONST_PRIMARY_KEYS = "primaryKeys"
130+
CONST_PARTITION_KEYS = "partitionKeys"
130131
CONST_EXPECTATION_DETAILS = "expectationDetails"
131132
CONST_INPUT_FEATURE_DETAILS = "inputFeatureDetails"
132133
CONST_OUTPUT_FEATURE_DETAILS = "outputFeatureDetails"
@@ -156,6 +157,7 @@ class FeatureGroup(Builder):
156157
CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details",
157158
CONST_STATISTICS_CONFIG: "statistics_config",
158159
CONST_INFER_SCHEMA: "is_infer_schema",
160+
CONST_PARTITION_KEYS: "partition_keys"
159161
}
160162

161163
def __init__(self, spec: Dict = None, **kwargs) -> None:
@@ -325,6 +327,37 @@ def with_primary_keys(self, primary_keys: List[str]) -> "FeatureGroup":
325327
},
326328
)
327329

330+
@property
331+
def partition_keys(self) -> List[str]:
332+
return self.get_spec(self.CONST_PARTITION_KEYS)
333+
334+
@partition_keys.setter
335+
def partition_keys(self, value: List[str]):
336+
self.with_partition_keys(value)
337+
338+
def with_partition_keys(self, partition_keys: List[str]) -> "FeatureGroup":
339+
"""Sets the partition keys of the feature group.
340+
341+
Parameters
342+
----------
343+
partition_keys: List[str]
344+
The List of partition keys for the feature group.
345+
346+
Returns
347+
-------
348+
FeatureGroup
349+
The FeatureGroup instance (self)
350+
"""
351+
return self.set_spec(
352+
self.CONST_PARTITION_KEYS,
353+
{
354+
self.CONST_ITEMS: [
355+
{self.CONST_PRIMARY_KEY_NAME: partition_key}
356+
for partition_key in partition_keys or []
357+
]
358+
},
359+
)
360+
328361
@property
329362
def feature_store_id(self) -> str:
330363
return self.get_spec(self.CONST_FEATURE_STORE_ID)

0 commit comments

Comments
 (0)