Skip to content

Commit fbdddd3

Browse files
authored
Streaming dataframe support (#354)
2 parents 1ec5922 + 399b1a7 commit fbdddd3

20 files changed

+585
-37
lines changed

ads/feature_store/common/enums.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,20 @@ class DatasetIngestionMode(Enum):
4949
SQL = "SQL"
5050

5151

52-
class IngestionMode(Enum):
52+
class IngestionType(Enum):
53+
"""
54+
The type of ingestion that can be performed.
55+
56+
Possible values:
57+
* STREAMING: The data is ingested in real time.
58+
* BATCH: The data is ingested in batches.
59+
"""
60+
61+
STREAMING = "STREAMING"
62+
BATCH = "BATCH"
63+
64+
65+
class BatchIngestionMode(Enum):
5366
"""
5467
An enumeration that represents the supported Ingestion Mode in feature store.
5568
@@ -68,6 +81,20 @@ class IngestionMode(Enum):
6881
UPSERT = "UPSERT"
6982

7083

84+
class StreamingIngestionMode(Enum):
85+
"""
86+
Enumeration for stream ingestion modes.
87+
88+
- `COMPLETE`: Represents complete stream ingestion where the entire dataset is replaced.
89+
- `APPEND`: Represents appending new data to the existing dataset.
90+
- `UPDATE`: Represents updating existing data in the dataset.
91+
"""
92+
93+
COMPLETE = "COMPLETE"
94+
APPEND = "APPEND"
95+
UPDATE = "UPDATE"
96+
97+
7198
class JoinType(Enum):
7299
"""Enumeration of supported SQL join types.
73100
@@ -214,6 +241,7 @@ class TransformationMode(Enum):
214241

215242
SQL = "sql"
216243
PANDAS = "pandas"
244+
SPARK = "spark"
217245

218246

219247
class FilterOperators(Enum):

ads/feature_store/common/utils/transformation_utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ def apply_transformation(
6969
temporary_table_view, **transformation_kwargs_dict
7070
)
7171
)
72-
elif transformation.transformation_mode == TransformationMode.PANDAS.value:
72+
elif transformation.transformation_mode in [
73+
TransformationMode.PANDAS.value,
74+
TransformationMode.SPARK.value,
75+
]:
7376
transformed_data = transformation_function_caller(
7477
dataframe, **transformation_kwargs_dict
7578
)

ads/feature_store/dataset.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
ExecutionEngine,
2222
ExpectationType,
2323
EntityType,
24+
BatchIngestionMode,
2425
)
2526
from ads.feature_store.common.exceptions import NotMaterializedError
2627
from ads.feature_store.common.utils.utility import (
2728
get_metastore_id,
2829
validate_delta_format_parameters,
2930
convert_expectation_suite_to_expectation,
3031
)
31-
from ads.feature_store.dataset_job import DatasetJob, IngestionMode
32+
from ads.feature_store.dataset_job import DatasetJob
3233
from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine
3334
from ads.feature_store.execution_strategy.execution_strategy_provider import (
3435
OciExecutionStrategyProvider,
@@ -779,7 +780,7 @@ def delete(self):
779780
None
780781
"""
781782
# Create DataSet Job and persist it
782-
dataset_job = self._build_dataset_job(IngestionMode.DEFAULT)
783+
dataset_job = self._build_dataset_job(BatchIngestionMode.DEFAULT)
783784

784785
# Create the Job
785786
dataset_job.create()
@@ -874,7 +875,7 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":
874875

875876
def materialise(
876877
self,
877-
ingestion_mode: IngestionMode = IngestionMode.OVERWRITE,
878+
ingestion_mode: BatchIngestionMode = BatchIngestionMode.OVERWRITE,
878879
feature_option_details: FeatureOptionDetails = None,
879880
):
880881
"""Creates a dataset job.

ads/feature_store/dataset_job.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@
55
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
66
import logging
77
from copy import deepcopy
8-
from typing import Dict, List, Any
8+
from typing import Dict, List, Any, Union
99

1010
import pandas
1111

1212
from ads.common import utils
13+
from ads.feature_store.common.enums import (
14+
JobConfigurationType,
15+
BatchIngestionMode,
16+
StreamingIngestionMode,
17+
)
1318
from ads.feature_store.feature_option_details import FeatureOptionDetails
14-
from ads.feature_store.common.enums import IngestionMode, JobConfigurationType
1519
from ads.feature_store.service.oci_dataset_job import OCIDatasetJob
1620
from ads.jobs.builders.base import Builder
1721

@@ -225,10 +229,14 @@ def ingestion_mode(self) -> str:
225229
return self.get_spec(self.CONST_INGESTION_MODE)
226230

227231
@ingestion_mode.setter
228-
def ingestion_mode(self, ingestion_mode: IngestionMode) -> "DatasetJob":
232+
def ingestion_mode(
233+
self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
234+
) -> "DatasetJob":
229235
return self.with_ingestion_mode(ingestion_mode)
230236

231-
def with_ingestion_mode(self, ingestion_mode: IngestionMode) -> "DatasetJob":
237+
def with_ingestion_mode(
238+
self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
239+
) -> "DatasetJob":
232240
"""Sets the mode of the dataset ingestion mode.
233241
234242
Parameters

ads/feature_store/docs/source/dataset.rst

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ Use the ``from_id()`` method from the ``Dataset`` class to load an existing data
7474
7575
from ads.feature_store.dataset import Dataset
7676
77-
dataset = Dataset.from_id("ocid1.dataset..<unique_id>")
77+
dataset = Dataset.from_id("<unique_id>")
7878
7979
Materialise
8080
===========
@@ -138,6 +138,10 @@ Feature store allows you to define expectations on data being materialized into
138138

139139
.. code-block:: python3
140140
141+
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
142+
from ads.feature_store.common.enums import TransformationMode, ExpectationType
143+
from ads.feature_store.feature_group import FeatureGroup
144+
141145
expectation_suite = ExpectationSuite(
142146
expectation_suite_name="expectation_suite_name"
143147
)
@@ -186,6 +190,7 @@ dataset or it can be updated later as well.
186190
.. code-block:: python3
187191
188192
# Define statistics configuration for selected features
193+
from ads.feature_store.statistics_config import StatisticsConfig
189194
stats_config = StatisticsConfig().with_is_enabled(True).with_columns(["column1", "column2"])
190195
191196
@@ -194,6 +199,7 @@ This can be used with dataset instance.
194199
.. code-block:: python3
195200
196201
from ads.feature_store.dataset import Dataset
202+
from ads.feature_store.statistics_config import StatisticsConfig
197203
198204
dataset = (
199205
Dataset

ads/feature_store/docs/source/dataset_job.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ Use the ``from_id()`` method from the ``DatasetJob`` class to load an existing d
6767
6868
from ads.feature_store.dataset_job import DatasetJob
6969
70-
dataset_job = DatasetJob.from_id("ocid1.dataset_job..<unique_id>")
70+
dataset_job = DatasetJob.from_id("<unique_id>")
7171
7272
Delete
7373
======

ads/feature_store/docs/source/entity.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Use the ``from_id()`` method from the ``Entity`` class to load an existing entit
6666
6767
from ads.feature_store.entity import Entity
6868
69-
entity = Entity.from_id("ocid1.entity..<unique_id>")
69+
entity = Entity.from_id("<unique_id>")
7070
7171
Delete
7272
======

ads/feature_store/docs/source/feature_group.rst

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Use the ``from_id()`` method from the ``FeatureGroup`` class to load an existing
9393
9494
from ads.feature_store.feature_group import FeatureGroup
9595
96-
feature_group = FeatureGroup.from_id("ocid1.feature_group..<unique_id>")
96+
feature_group = FeatureGroup.from_id("<unique_id>")
9797
9898
9999
Materialise
@@ -122,6 +122,32 @@ The ``.materialise()`` method takes the following parameter:
122122
.. seealso::
123123
Refer :ref:`Data types` supported by feature store
124124

125+
126+
Materialise Stream
127+
==================
128+
You can call the ``materialise_stream() -> FeatureGroupJob`` method of the ``FeatureGroup`` instance to load streaming data into the feature group. To persist the feature group and save its data along with the metadata in the feature store, call the ``materialise_stream()`` method.
129+
130+
The ``.materialise_stream()`` method takes the following parameter:
131+
- ``input_dataframe``: Features in Streaming Dataframe to be saved.
132+
- ``query_name``: It is possible to optionally specify a name for the query to make it easier to recognise in the Spark UI. Defaults to ``None``.
133+
- ``ingestion_mode``: Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
134+
- ``append``: Only the new rows in the streaming DataFrame/Dataset will be written to the sink. If the query doesn’t contain aggregations, it will be equivalent to append mode. Defaults to ``"append"``.
135+
- ``complete``: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there is some update.
136+
- ``update``: Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.
137+
- ``await_termination``: Waits for the termination of this query, either by ``query.stop()`` or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds. Defaults to ``False``.
138+
- ``timeout``: Only relevant in combination with ``await_termination=True``.
139+
- Defaults to ``None``.
140+
- ``checkpoint_dir``: Checkpoint directory location. This will be used as a reference from where to resume the streaming job. Defaults to ``None``.
141+
- ``write_options``: Additional write options for Spark as key-value pairs.
142+
- Defaults to ``{}``.
143+
144+
.. seealso::
145+
:ref:`Feature Group Job`
146+
147+
.. seealso::
148+
Refer :ref:`Data types` supported by feature store
149+
150+
125151
Delete
126152
======
127153

@@ -173,6 +199,9 @@ With a ``FeatureGroup`` instance, You can save the expectation details using ``w
173199
.. image:: figures/validation.png
174200

175201
.. code-block:: python3
202+
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
203+
from ads.feature_store.common.enums import TransformationMode, ExpectationType
204+
from ads.feature_store.feature_group import FeatureGroup
176205
177206
expectation_suite = ExpectationSuite(
178207
expectation_suite_name="expectation_suite_name"
@@ -221,6 +250,7 @@ feature group or it can be updated later as well.
221250
.. code-block:: python3
222251
223252
# Define statistics configuration for selected features
253+
from ads.feature_store.statistics_config import StatisticsConfig
224254
stats_config = StatisticsConfig().with_is_enabled(True).with_columns(["column1", "column2"])
225255
226256

ads/feature_store/docs/source/feature_group_job.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ Use the ``from_id()`` method from the ``FeatureGroupJob`` class to load an exist
6767
6868
from ads.feature_store.feature_group_job import FeatureGroupJob
6969
70-
feature_group_job = FeatureGroupJob.from_id("ocid1.feature_group_job..<unique_id>")
70+
feature_group_job = FeatureGroupJob.from_id("<unique_id>")
7171
7272
Delete
7373
======

ads/feature_store/docs/source/feature_store.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ Use the ``from_id()`` method from the ``FeatureStore`` class to load an existing
6767
6868
from ads.feature_store.feature_store import FeatureStore
6969
70-
feature_store = FeatureStore.from_id("ocid1.feature_store..<unique_id>")
70+
feature_store = FeatureStore.from_id("<unique_id>")
7171
7272
Delete
7373
======

0 commit comments

Comments
 (0)