Skip to content

Commit cf24b43

Browse files
committed
code changes for streaming integration
1 parent 3ec41ae commit cf24b43

File tree

4 files changed

+198
-1
lines changed

4 files changed

+198
-1
lines changed

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def ingest_feature_definition_stream(
9898
checkpoint_dir,
9999
):
100100
try:
101-
self._save_offline_dataframe_stream(
101+
return self._save_offline_dataframe_stream(
102102
dataframe,
103103
feature_group,
104104
feature_group_job,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
user_id,date,credit_score
2+
c123006815,01/01/22,568
3+
c123006815,01/01/22,568
4+
c123006850,05/02/22,740
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
user_id,date,credit_score
2+
c123006818,04/01/22,571
3+
c123006847,02/02/22,800
4+
c123006820,06/01/22,573
5+
c123006857,12/02/22,850
6+
c123006822,08/01/22,575
7+
c123006823,09/01/22,300
8+
c123006824,10/01/22,577
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import time
2+
3+
from delta import configure_spark_with_delta_pip
4+
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
5+
from pyspark.sql import SparkSession
6+
from pyspark.sql.types import StructType
7+
8+
from ads.feature_store.common.enums import TransformationMode, ExpectationType
9+
from ads.feature_store.statistics_config import StatisticsConfig
10+
from tests.integration.feature_store.test_base import FeatureStoreTestCase
11+
12+
13+
def get_streaming_df():
    """Return a streaming DataFrame that tails CSV files under ``test_data/``.

    Builds (or reuses) a Delta-enabled Spark session, then opens a
    ``readStream`` over the credit-score CSV directory with an explicit
    schema so Spark does not need to infer column types.
    """
    builder = (
        SparkSession.builder.appName("FeatureStore")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .enableHiveSupport()
    )
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    # Schema of the incoming credit-score CSV records; all columns are read
    # as strings, downstream transformations cast as needed.
    schema = (
        StructType()
        .add("user_id", "string")
        .add("date", "string")
        .add("credit_score", "string")
    )

    return (
        spark.readStream.option("sep", ",")
        .option("header", "true")
        .schema(schema)
        .csv("test_data/")
    )
41+
42+
43+
def credit_score_transformation(credit_score):
    """Binarise the ``credit_score`` column of *credit_score*.

    Returns a new DataFrame with ``user_id`` and ``date`` unchanged and
    ``credit_score`` replaced by 1 when the (int-cast) score exceeds 500,
    otherwise 0.
    """
    import pyspark.sql.functions as F

    # Scores above 500 map to 1, everything else (including nulls that
    # fail the comparison) maps to 0.
    binarised = F.when(F.col("credit_score").cast("int") > 500, 1).otherwise(0)
    return credit_score.select(
        "user_id",
        "date",
        binarised.alias("credit_score"),
    )
55+
56+
57+
class TestFeatureGroupWithStreamingDataFrame(FeatureStoreTestCase):
    """Integration tests for feature group materialisation from a streaming data frame."""

    # How long (seconds) each streaming query is allowed to run before being
    # stopped; long enough for the small test_data/ CSVs to be ingested.
    STREAM_RUN_SECONDS = 10
    # Total transformed rows expected across the test_data/ CSV files.
    EXPECTED_ROW_COUNT = 10

    def create_transformation_resource_stream(self, feature_store) -> "Transformation":
        """Register the credit-score Spark transformation on *feature_store*."""
        transformation = feature_store.create_transformation(
            source_code_func=credit_score_transformation,
            display_name="credit_score_transformation",
            transformation_mode=TransformationMode.SPARK,
        )
        return transformation

    def _setup_streaming_resources(self):
        """Create feature store, entity, transformation and streaming df.

        Returns the tuple ``(fs, entity, transformation, streaming_df)``.
        """
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        transformation = self.create_transformation_resource_stream(fs)
        streaming_df = get_streaming_df()
        return fs, entity, transformation, streaming_df

    def _materialise_and_stop(self, fg, streaming_df):
        """Start streaming materialisation for *fg*, let it run briefly, stop it."""
        query = fg.materialise_stream(
            input_dataframe=streaming_df,
            checkpoint_dir=f"test_data/checkpoint/{fg.name}",
        )
        assert query
        time.sleep(self.STREAM_RUN_SECONDS)
        query.stop()

    def _clean_up_all(self, fg, transformation, entity, fs):
        """Tear down created resources in reverse dependency order."""
        self.clean_up_feature_group(fg)
        self.clean_up_transformation(transformation)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

    def test_feature_group_materialization_with_streaming_data_frame(self):
        """Streaming ingestion with statistics disabled materialises all rows."""
        fs, entity, transformation, streaming_df = self._setup_streaming_resources()

        stats_config = StatisticsConfig().with_is_enabled(False)
        # NOTE(review): primary key "User_id" differs in case from the CSV
        # column "user_id" — presumably matching is case-insensitive; confirm.
        fg = entity.create_feature_group(
            primary_keys=["User_id"],
            schema_details_dataframe=streaming_df,
            statistics_config=stats_config,
            name=self.get_name("streaming_fg_1"),
            transformation_id=transformation.id,
        )
        assert fg.oci_feature_group.id

        self._materialise_and_stop(fg, streaming_df)

        assert fg.select().read().count() == self.EXPECTED_ROW_COUNT

        self._clean_up_all(fg, transformation, entity, fs)

    def test_feature_group_materialization_with_streaming_data_frame_and_expectation(self):
        """Streaming ingestion with a lenient expectation suite produces no validation output."""
        fs, entity, transformation, streaming_df = self._setup_streaming_resources()

        stats_config = StatisticsConfig().with_is_enabled(False)
        # Two deliberately contradictory expectations on "date"; LENIENT mode
        # must not block ingestion.
        expectation_suite_trans = ExpectationSuite(
            expectation_suite_name="feature_definition"
        )
        expectation_suite_trans.add_expectation(
            ExpectationConfiguration(
                expectation_type="EXPECT_COLUMN_VALUES_TO_BE_NULL",
                kwargs={"column": "date"},
            )
        )
        expectation_suite_trans.add_expectation(
            ExpectationConfiguration(
                expectation_type="EXPECT_COLUMN_VALUES_TO_NOT_BE_NULL",
                kwargs={"column": "date"},
            )
        )

        fg = entity.create_feature_group(
            primary_keys=["User_id"],
            schema_details_dataframe=streaming_df,
            statistics_config=stats_config,
            expectation_suite=expectation_suite_trans,
            expectation_type=ExpectationType.LENIENT,
            name=self.get_name("streaming_fg_2"),
            transformation_id=transformation.id,
        )
        assert fg.oci_feature_group.id

        self._materialise_and_stop(fg, streaming_df)

        assert fg.select().read().count() == self.EXPECTED_ROW_COUNT
        # Validation is not run for streaming ingestion, so no output exists.
        assert fg.get_validation_output().to_pandas() is None

        self._clean_up_all(fg, transformation, entity, fs)

    def test_feature_group_materialization_with_streaming_data_frame_and_stats(self):
        """Streaming ingestion with default config computes no statistics."""
        fs, entity, transformation, streaming_df = self._setup_streaming_resources()

        fg = entity.create_feature_group(
            primary_keys=["User_id"],
            schema_details_dataframe=streaming_df,
            name=self.get_name("streaming_fg_3"),
            transformation_id=transformation.id,
        )
        assert fg.oci_feature_group.id

        self._materialise_and_stop(fg, streaming_df)

        assert fg.select().read().count() == self.EXPECTED_ROW_COUNT
        # Statistics are not computed for streaming ingestion.
        assert fg.get_statistics().to_pandas() is None

        self._clean_up_all(fg, transformation, entity, fs)

0 commit comments

Comments
 (0)