
Commit 6bbb58a

code refactor for streaming dataframe
1 parent dc4c9de

9 files changed: +130 -92 lines changed

ads/feature_store/common/enums.py

Lines changed: 18 additions & 2 deletions

@@ -49,7 +49,20 @@ class DatasetIngestionMode(Enum):
     SQL = "SQL"


-class IngestionMode(Enum):
+class IngestionType(Enum):
+    """
+    The type of ingestion that can be performed.
+
+    Possible values:
+    * STREAMING: The data is ingested in real time.
+    * BATCH: The data is ingested in batches.
+    """
+
+    STREAMING = "STREAMING"
+    BATCH = "BATCH"
+
+
+class BatchIngestionMode(Enum):
     """
     An enumeration that represents the supported Ingestion Mode in feature store.

@@ -67,18 +80,21 @@ class IngestionMode(Enum):
     DEFAULT = "DEFAULT"
     UPSERT = "UPSERT"

-class StreamIngestionMode(Enum):
+
+class StreamingIngestionMode(Enum):
     """
     Enumeration for stream ingestion modes.

     - `COMPLETE`: Represents complete stream ingestion where the entire dataset is replaced.
     - `APPEND`: Represents appending new data to the existing dataset.
     - `UPDATE`: Represents updating existing data in the dataset.
     """
+
     COMPLETE = "COMPLETE"
     APPEND = "APPEND"
     UPDATE = "UPDATE"

+
 class JoinType(Enum):
     """Enumeration of supported SQL join types.
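
This hunk splits the old IngestionMode enum three ways: IngestionType says whether data arrives as a stream or in batches, while BatchIngestionMode and StreamingIngestionMode say how it is written to the target table. A minimal sketch of the resulting surface, using only members visible in this diff (the usage is illustrative, not from the commit):

    from ads.feature_store.common.enums import (
        IngestionType,
        BatchIngestionMode,
        StreamingIngestionMode,
    )

    # The type enum classifies the pipeline...
    assert IngestionType.STREAMING.value == "STREAMING"
    # ...and the mode enums pick the write behaviour for each pipeline kind.
    assert BatchIngestionMode.UPSERT.value == "UPSERT"
    assert StreamingIngestionMode.APPEND.value == "APPEND"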

ads/feature_store/common/utils/transformation_utils.py

Lines changed: 4 additions & 1 deletion

@@ -69,7 +69,10 @@ def apply_transformation(
                    temporary_table_view, **transformation_kwargs_dict
                )
            )
-        elif transformation.transformation_mode in [TransformationMode.PANDAS.value, TransformationMode.SPARK.value]:
+        elif transformation.transformation_mode in [
+            TransformationMode.PANDAS.value,
+            TransformationMode.SPARK.value,
+        ]:
            transformed_data = transformation_function_caller(
                dataframe, **transformation_kwargs_dict
            )
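
For context, the elif reformatted above dispatches on transformation_mode: in PANDAS or SPARK mode the user-supplied transformation function is called directly on the source dataframe with any extra kwargs. A hypothetical transformation of that shape (function name and columns are invented for illustration):

    import pandas as pd

    # Would be invoked as transformation_function_caller(dataframe, **transformation_kwargs_dict)
    # when transformation_mode is PANDAS:
    def keep_adults(df: pd.DataFrame, min_age: int = 18) -> pd.DataFrame:
        return df[df["age"] >= min_age]

    people = pd.DataFrame({"name": ["ann", "bo"], "age": [34, 12]})
    print(keep_adults(people))  # only the "ann" row remains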

ads/feature_store/dataset.py

Lines changed: 4 additions & 3 deletions

@@ -21,14 +21,15 @@
     ExecutionEngine,
     ExpectationType,
     EntityType,
+    BatchIngestionMode,
 )
 from ads.feature_store.common.exceptions import NotMaterializedError
 from ads.feature_store.common.utils.utility import (
     get_metastore_id,
     validate_delta_format_parameters,
     convert_expectation_suite_to_expectation,
 )
-from ads.feature_store.dataset_job import DatasetJob, IngestionMode
+from ads.feature_store.dataset_job import DatasetJob
 from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
 from ads.feature_store.execution_strategy.execution_strategy_provider import (
     OciExecutionStrategyProvider,

@@ -779,7 +780,7 @@ def delete(self):
         None
         """
         # Create DataSet Job and persist it
-        dataset_job = self._build_dataset_job(IngestionMode.DEFAULT)
+        dataset_job = self._build_dataset_job(BatchIngestionMode.DEFAULT)

         # Create the Job
         dataset_job.create()

@@ -874,7 +875,7 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":

     def materialise(
         self,
-        ingestion_mode: IngestionMode = IngestionMode.OVERWRITE,
+        ingestion_mode: BatchIngestionMode = BatchIngestionMode.OVERWRITE,
         feature_option_details: FeatureOptionDetails = None,
     ):
         """Creates a dataset job.

ads/feature_store/dataset_job.py

Lines changed: 12 additions & 4 deletions

@@ -5,13 +5,17 @@
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 import logging
 from copy import deepcopy
-from typing import Dict, List, Any
+from typing import Dict, List, Any, Union

 import pandas

 from ads.common import utils
+from ads.feature_store.common.enums import (
+    JobConfigurationType,
+    BatchIngestionMode,
+    StreamingIngestionMode,
+)
 from ads.feature_store.feature_option_details import FeatureOptionDetails
-from ads.feature_store.common.enums import IngestionMode, JobConfigurationType
 from ads.feature_store.service.oci_dataset_job import OCIDatasetJob
 from ads.jobs.builders.base import Builder

@@ -225,10 +229,14 @@ def ingestion_mode(self) -> str:
         return self.get_spec(self.CONST_INGESTION_MODE)

     @ingestion_mode.setter
-    def ingestion_mode(self, ingestion_mode: IngestionMode) -> "DatasetJob":
+    def ingestion_mode(
+        self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
+    ) -> "DatasetJob":
         return self.with_ingestion_mode(ingestion_mode)

-    def with_ingestion_mode(self, ingestion_mode: IngestionMode) -> "DatasetJob":
+    def with_ingestion_mode(
+        self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
+    ) -> "DatasetJob":
         """Sets the mode of the dataset ingestion mode.

         Parameters
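
The widened Union signature lets the same builder configure batch and streaming jobs. A sketch, assuming DatasetJob builds fluently like other ADS builders:

    from ads.feature_store.common.enums import BatchIngestionMode, StreamingIngestionMode
    from ads.feature_store.dataset_job import DatasetJob

    batch_job = DatasetJob().with_ingestion_mode(BatchIngestionMode.UPSERT)
    stream_job = DatasetJob().with_ingestion_mode(StreamingIngestionMode.APPEND)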

ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py

Lines changed: 7 additions & 17 deletions

@@ -7,7 +7,7 @@
 import logging

 from ads.common.decorator.runtime_dependency import OptionalDependency
-from ads.feature_store.common.enums import IngestionMode
+from ads.feature_store.common.enums import BatchIngestionMode
 from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine

 try:

@@ -57,21 +57,10 @@ def write_dataframe_to_delta_lake(
         None.
         """
         logger.info(f"target table name {target_table_name}")
-        # query = (
-        #     dataflow_output.writeStream.outputMode("append")
-        #     .format("delta")
-        #     .option(
-        #         "checkpointLocation",
-        #         "/Users/yogeshkumawat/Desktop/Github-Oracle/accelerated-data-science/TestYogi/streaming",
-        #     )
-        #     .toTable(target_table_name)
-        # )
-        #
-        # query.awaitTermination()

         if (
             self.spark_engine.is_delta_table_exists(target_table_name)
-            and ingestion_mode.upper() == IngestionMode.UPSERT.value
+            and ingestion_mode.upper() == BatchIngestionMode.UPSERT.value
         ):
             logger.info(f"Upsert ops for target table {target_table_name} begin")

@@ -365,16 +354,17 @@ def write_stream_dataframe_to_delta_lake(
         checkpoint_dir,
         feature_option_details,
     ):
+        if query_name is None:
+            query_name = "insert_stream_" + target_table.split(".")[1]
+
         query = (
-            stream_dataframe
-            .writeStream.
-            outputMode(output_mode)
+            stream_dataframe.writeStream.outputMode(output_mode)
             .format("delta")
             .option(
                 "checkpointLocation",
                 checkpoint_dir,
             )
-            .options(self.get_delta_write_config(feature_option_details))
+            .options(**self.get_delta_write_config(feature_option_details))
             .queryName(query_name)
             .toTable(target_table)
         )
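
Two behavioural fixes ride along with the cleanup here: a streaming query name is now derived from the table name when none is supplied (split(".")[1] assumes database.table naming), and the write config is unpacked with ** because PySpark's DataStreamWriter.options accepts keyword arguments, not a dict. A self-contained sketch of why the unpacking matters:

    def options(**opts):
        """Stand-in for pyspark's DataStreamWriter.options(**options)."""
        return opts

    write_config = {"mergeSchema": "true"}  # assumed shape of get_delta_write_config(...)

    print(options(**write_config))  # OK: {'mergeSchema': 'true'}
    # options(write_config)         # TypeError: takes 0 positional arguments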

ads/feature_store/execution_strategy/engine/spark_engine.py

Lines changed: 14 additions & 11 deletions

@@ -8,7 +8,6 @@
 from datetime import datetime

 from ads.common.decorator.runtime_dependency import OptionalDependency
-from ads.feature_store.common.utils.utility import get_schema_from_spark_dataframe, get_schema_from_spark_df

 try:
     from pyspark.sql import SparkSession

@@ -43,10 +42,10 @@ def __init__(self, metastore_id: str = None, spark_session: SparkSession = None)
         )

     def get_time_version_data(
-            self,
-            delta_table_name: str,
-            version_number: int = None,
-            timestamp: datetime = None,
+        self,
+        delta_table_name: str,
+        version_number: int = None,
+        timestamp: datetime = None,
     ):
         split_db_name = delta_table_name.split(".")

@@ -104,10 +103,10 @@ def _read_delta_table(self, delta_table_path: str, read_options: Dict):
         return df

     def sql(
-            self,
-            query: str,
-            dataframe_type: DataFrameType = DataFrameType.SPARK,
-            is_online: bool = False,
+        self,
+        query: str,
+        dataframe_type: DataFrameType = DataFrameType.SPARK,
+        is_online: bool = False,
     ):
         """Execute SQL command on the offline or online feature store database

@@ -187,7 +186,9 @@ def get_tables_from_database(self, database):

         return permanent_tables

-    def get_output_columns_from_table_or_dataframe(self, table_name: str = None, dataframe=None):
+    def get_output_columns_from_table_or_dataframe(
+        self, table_name: str = None, dataframe=None
+    ):
         """Returns the column(features) along with type from the given table.

         Args:

@@ -200,7 +201,9 @@ def get_output_columns_from_table_or_dataframe(self, table_name: str = None, dataframe=None):

         """
         if table_name is None and dataframe is None:
-            raise ValueError("Either 'table_name' or 'dataframe' must be provided to retrieve output columns.")
+            raise ValueError(
+                "Either 'table_name' or 'dataframe' must be provided to retrieve output columns."
+            )

         if dataframe is not None:
             feature_data_target = dataframe
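
The rest of this file is mechanical black-style reformatting, but the reflowed ValueError spells out the contract: callers must pass a table name or a dataframe, and the dataframe takes precedence when both are given. Illustrative calls (engine construction and df are assumed, not shown in this commit):

    engine = SparkEngine(metastore_id="<metastore_ocid>")
    df = ...  # an existing Spark DataFrame

    cols = engine.get_output_columns_from_table_or_dataframe(table_name="db.table")
    cols = engine.get_output_columns_from_table_or_dataframe(dataframe=df)
    engine.get_output_columns_from_table_or_dataframe()  # raises ValueError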
