
Commit 6035770 (merge, 2 parents: 935ab5c + fbdddd3)

Commit message: added docs by lorna

File tree: 13 files changed (+542, -30 lines)

ads/feature_store/common/enums.py

Lines changed: 29 additions & 1 deletion

@@ -49,7 +49,20 @@ class DatasetIngestionMode(Enum):
     SQL = "SQL"
 
 
-class IngestionMode(Enum):
+class IngestionType(Enum):
+    """
+    The type of ingestion that can be performed.
+
+    Possible values:
+    * STREAMING: The data is ingested in real time.
+    * BATCH: The data is ingested in batches.
+    """
+
+    STREAMING = "STREAMING"
+    BATCH = "BATCH"
+
+
+class BatchIngestionMode(Enum):
     """
     An enumeration that represents the supported Ingestion Mode in feature store.
 
@@ -68,6 +81,20 @@ class IngestionMode(Enum):
     UPSERT = "UPSERT"
 
 
+class StreamingIngestionMode(Enum):
+    """
+    Enumeration for stream ingestion modes.
+
+    - `COMPLETE`: Represents complete stream ingestion where the entire dataset is replaced.
+    - `APPEND`: Represents appending new data to the existing dataset.
+    - `UPDATE`: Represents updating existing data in the dataset.
+    """
+
+    COMPLETE = "COMPLETE"
+    APPEND = "APPEND"
+    UPDATE = "UPDATE"
+
+
 class JoinType(Enum):
     """Enumeration of supported SQL join types.
 
@@ -214,6 +241,7 @@ class TransformationMode(Enum):
 
     SQL = "sql"
     PANDAS = "pandas"
+    SPARK = "spark"
 
 
 class FilterOperators(Enum):
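
For callers, the practical effect of this split is that batch and streaming writes now draw from separate enums. A minimal sketch of the new surface (assuming only the import path used elsewhere in this commit):

    from ads.feature_store.common.enums import (
        BatchIngestionMode,
        StreamingIngestionMode,
        TransformationMode,
    )

    # Batch writes keep the former IngestionMode members under the new name.
    assert BatchIngestionMode.UPSERT.value == "UPSERT"

    # Streaming writes get their own enum, mirroring Spark's streaming output modes.
    assert StreamingIngestionMode.APPEND.value == "APPEND"

    # Transformations can now also be authored as Spark functions.
    assert TransformationMode.SPARK.value == "spark"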

ads/feature_store/common/utils/transformation_utils.py

Lines changed: 4 additions & 1 deletion

@@ -69,7 +69,10 @@ def apply_transformation(
                 temporary_table_view, **transformation_kwargs_dict
             )
         )
-    elif transformation.transformation_mode == TransformationMode.PANDAS.value:
+    elif transformation.transformation_mode in [
+        TransformationMode.PANDAS.value,
+        TransformationMode.SPARK.value,
+    ]:
         transformed_data = transformation_function_caller(
             dataframe, **transformation_kwargs_dict
         )
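
With this branch change, a SPARK-mode transformation is invoked the same way as a PANDAS one: the user function receives the dataframe itself rather than the name of a temporary SQL view. A sketch of what such a user-defined Spark transformation might look like (the function and column names are illustrative, not part of this commit):

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F

    def scale_fare(df: DataFrame, factor: float = 1.0) -> DataFrame:
        # Keyword arguments arrive via transformation_kwargs_dict.
        return df.withColumn("fare_scaled", F.col("fare") * factor)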

ads/feature_store/dataset.py

Lines changed: 4 additions & 3 deletions

@@ -21,14 +21,15 @@
     ExecutionEngine,
     ExpectationType,
     EntityType,
+    BatchIngestionMode,
 )
 from ads.feature_store.common.exceptions import NotMaterializedError
 from ads.feature_store.common.utils.utility import (
     get_metastore_id,
     validate_delta_format_parameters,
     convert_expectation_suite_to_expectation,
 )
-from ads.feature_store.dataset_job import DatasetJob, IngestionMode
+from ads.feature_store.dataset_job import DatasetJob
 from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
 from ads.feature_store.execution_strategy.execution_strategy_provider import (
     OciExecutionStrategyProvider,

@@ -779,7 +780,7 @@ def delete(self):
         None
         """
         # Create DataSet Job and persist it
-        dataset_job = self._build_dataset_job(IngestionMode.DEFAULT)
+        dataset_job = self._build_dataset_job(BatchIngestionMode.DEFAULT)
 
         # Create the Job
         dataset_job.create()

@@ -874,7 +875,7 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":
 
     def materialise(
         self,
-        ingestion_mode: IngestionMode = IngestionMode.OVERWRITE,
+        ingestion_mode: BatchIngestionMode = BatchIngestionMode.OVERWRITE,
         feature_option_details: FeatureOptionDetails = None,
     ):
         """Creates a dataset job.

ads/feature_store/dataset_job.py

Lines changed: 12 additions & 4 deletions

@@ -5,13 +5,17 @@
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 import logging
 from copy import deepcopy
-from typing import Dict, List, Any
+from typing import Dict, List, Any, Union
 
 import pandas
 
 from ads.common import utils
+from ads.feature_store.common.enums import (
+    JobConfigurationType,
+    BatchIngestionMode,
+    StreamingIngestionMode,
+)
 from ads.feature_store.feature_option_details import FeatureOptionDetails
-from ads.feature_store.common.enums import IngestionMode, JobConfigurationType
 from ads.feature_store.service.oci_dataset_job import OCIDatasetJob
 from ads.jobs.builders.base import Builder

@@ -225,10 +229,14 @@ def ingestion_mode(self) -> str:
         return self.get_spec(self.CONST_INGESTION_MODE)
 
     @ingestion_mode.setter
-    def ingestion_mode(self, ingestion_mode: IngestionMode) -> "DatasetJob":
+    def ingestion_mode(
+        self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
+    ) -> "DatasetJob":
        return self.with_ingestion_mode(ingestion_mode)
 
-    def with_ingestion_mode(self, ingestion_mode: IngestionMode) -> "DatasetJob":
+    def with_ingestion_mode(
+        self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
+    ) -> "DatasetJob":
         """Sets the mode of the dataset ingestion mode.
 
         Parameters
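
The widened Union type lets the same builder serve both job flavours. A minimal sketch (assuming a bare DatasetJob builder; other required specs are omitted for brevity):

    from ads.feature_store.common.enums import (
        BatchIngestionMode,
        StreamingIngestionMode,
    )
    from ads.feature_store.dataset_job import DatasetJob

    batch_job = DatasetJob().with_ingestion_mode(BatchIngestionMode.UPSERT)
    stream_job = DatasetJob().with_ingestion_mode(StreamingIngestionMode.APPEND)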

ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py

Lines changed: 34 additions & 2 deletions

@@ -7,7 +7,7 @@
 import logging
 
 from ads.common.decorator.runtime_dependency import OptionalDependency
-from ads.feature_store.common.enums import IngestionMode
+from ads.feature_store.common.enums import BatchIngestionMode
 from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
 
 try:

@@ -57,9 +57,10 @@ def write_dataframe_to_delta_lake(
         None.
         """
         logger.info(f"target table name {target_table_name}")
+
         if (
             self.spark_engine.is_delta_table_exists(target_table_name)
-            and ingestion_mode.upper() == IngestionMode.UPSERT.value
+            and ingestion_mode.upper() == BatchIngestionMode.UPSERT.value
         ):
             logger.info(f"Upsert ops for target table {target_table_name} begin")

@@ -341,3 +342,34 @@ def __get_insert_update_query_expression(feature_data_source_columns, table_name
 
         logger.info(f"get_insert_update_query_expression {feature_data_update_set}")
         return feature_data_update_set
+
+    def write_stream_dataframe_to_delta_lake(
+        self,
+        stream_dataframe,
+        target_table,
+        output_mode,
+        query_name,
+        await_termination,
+        timeout,
+        checkpoint_dir,
+        feature_option_details,
+    ):
+        if query_name is None:
+            query_name = "insert_stream_" + target_table.split(".")[1]
+
+        query = (
+            stream_dataframe.writeStream.outputMode(output_mode)
+            .format("delta")
+            .option(
+                "checkpointLocation",
+                checkpoint_dir,
+            )
+            .options(**self.get_delta_write_config(feature_option_details))
+            .queryName(query_name)
+            .toTable(target_table)
+        )
+
+        if await_termination:
+            query.awaitTermination(timeout)
+
+        return query
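
The new method is a thin wrapper over a standard Structured Streaming Delta sink; note that the generated default query name assumes a database-qualified target ("db.table"). For reference, the plain-PySpark equivalent of what it builds (a sketch assuming Spark 3.1+, where DataStreamWriter.toTable is available, plus the delta-spark package; the table and path names are illustrative):

    # `stream_df` is any streaming DataFrame, e.g. from spark.readStream.
    query = (
        stream_df.writeStream.outputMode("append")
        .format("delta")
        .option("checkpointLocation", "/tmp/checkpoints/my_table")
        .queryName("insert_stream_my_table")
        .toTable("feature_db.my_table")
    )
    query.awaitTermination(60)  # block for at most 60 seconds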

ads/feature_store/execution_strategy/engine/spark_engine.py

Lines changed: 14 additions & 2 deletions

@@ -186,19 +186,31 @@ def get_tables_from_database(self, database):
 
         return permanent_tables
 
-    def get_columns_from_table(self, table_name: str):
+    def get_output_columns_from_table_or_dataframe(
+        self, table_name: str = None, dataframe=None
+    ):
         """Returns the column(features) along with type from the given table.
 
         Args:
             table_name(str): A string specifying the name of table name for which columns should be returned.
+            dataframe: Dataframe containing the transformed dataframe.
 
         Returns:
             List[{"name": "<feature_name>","featureType": "<feature_type>"}]
             Returns the List of dictionary of column with name and type from the given table.
+
         """
+        if table_name is None and dataframe is None:
+            raise ValueError(
+                "Either 'table_name' or 'dataframe' must be provided to retrieve output columns."
+            )
+
+        if dataframe is not None:
+            feature_data_target = dataframe
+        else:
+            feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1")
 
         target_table_columns = []
-        feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1")
 
         for field in feature_data_target.schema.fields:
             target_table_columns.append(
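
The reworked helper lets schema inference work for streaming sources, which have no materialised table to sample. A usage sketch (the SparkEngine construction is hypothetical; only the method signature comes from this diff):

    engine = SparkEngine(metastore_id)  # hypothetical construction

    # Infer output columns from an existing table...
    cols = engine.get_output_columns_from_table_or_dataframe(
        table_name="feature_db.my_table"
    )

    # ...or directly from a transformed (e.g. streaming) DataFrame.
    cols = engine.get_output_columns_from_table_or_dataframe(dataframe=df)
    # -> [{"name": "<feature_name>", "featureType": "<feature_type>"}, ...]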

ads/feature_store/execution_strategy/execution_strategy.py

Lines changed: 13 additions & 0 deletions

@@ -42,6 +42,19 @@ def ingest_feature_definition(
         """
         pass
 
+    @abstractmethod
+    def ingest_feature_definition_stream(
+        self,
+        feature_group,
+        feature_group_job: FeatureGroupJob,
+        dataframe,
+        query_name,
+        await_termination,
+        timeout,
+        checkpoint_dir,
+    ):
+        pass
+
     @abstractmethod
     def ingest_dataset(self, dataset, dataset_job: DatasetJob):
         """
