
Commit ce05d89

review comments, new integration test for schema mismatch, doc update
1 parent ad65400 commit ce05d89

4 files changed: +160 −8 lines

ads/feature_store/common/utils/utility.py

Lines changed: 7 additions & 5 deletions
@@ -264,7 +264,7 @@ def largest_matching_subset_of_primary_keys(left_feature_group, right_feature_gr
 
 def convert_pandas_datatype_with_schema(
     raw_feature_details: List[dict], input_df: pd.DataFrame
-):
+) -> pd.DataFrame:
     feature_detail_map = {}
     columns_to_remove = []
     for feature_details in raw_feature_details:
@@ -280,19 +280,21 @@ def convert_pandas_datatype_with_schema(
                 .where(pd.notnull(input_df[column]), None)
             )
         else:
-            logger.warning("column" + column + "doesnt exist in the input feature details")
+            logger.warning("column" + column + "doesn't exist in the input feature details")
             columns_to_remove.append(column)
     return input_df.drop(columns = columns_to_remove)
 
 
-def validate_spark_dataframe_schema(raw_feature_details: List[dict], input_df: DataFrame):
+def convert_spark_dataframe_with_schema(
+    raw_feature_details: List[dict], input_df: DataFrame
+) -> DataFrame:
     feature_detail_map = {}
     columns_to_remove = []
     for feature_details in raw_feature_details:
         feature_detail_map[feature_details.get("name")] = feature_details
     for column in input_df.columns:
         if column not in feature_detail_map.keys():
-            logger.warning("column" + column + "doesnt exist in the input feature details")
+            logger.warning("column" + column + "doesn't exist in the input feature details")
             columns_to_remove.append(column)
 
     return input_df.drop(*columns_to_remove)
@@ -301,4 +303,4 @@ def validate_spark_dataframe_schema(raw_feature_details: List[dict], input_df: D
 def validate_input_feature_details(input_feature_details, data_frame):
     if isinstance(data_frame, pd.DataFrame):
         return convert_pandas_datatype_with_schema(input_feature_details, data_frame)
-    return validate_spark_dataframe_schema(input_feature_details, data_frame)
+    return convert_spark_dataframe_with_schema(input_feature_details, data_frame)
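
For reviewers who want to see the dispatch target's behavior in isolation, here is a minimal, self-contained sketch of the same column-pruning pattern (pandas branch only, so it runs without a Spark session). The function name `prune_to_feature_details` is illustrative, not part of the ADS API; the `[{"name": ...}]` payload shape follows the diff above.

```python
import pandas as pd

def prune_to_feature_details(raw_feature_details, input_df):
    # Build the set of registered feature names, mirroring feature_detail_map.
    known = {d.get("name") for d in raw_feature_details}
    # Columns not present in the feature details are dropped, with a warning.
    to_remove = [c for c in input_df.columns if c not in known]
    for c in to_remove:
        print(f"column {c} doesn't exist in the input feature details")
    return input_df.drop(columns=to_remove)

df = pd.DataFrame({"A": ["x", "y"], "B": [1, 2], "C": [3, 4]})
pruned = prune_to_feature_details([{"name": "A"}, {"name": "B"}], df)
assert list(pruned.columns) == ["A", "B"]  # "C" was pruned
```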

ads/feature_store/docs/source/feature_group.rst

Lines changed: 3 additions & 2 deletions
@@ -173,7 +173,7 @@ The ``.save_expectation()`` method takes the following optional parameter:
 
 Statistics Results
 ==================
-You can call the ``get_statistics()`` method of the FeatureGroup instance to fetch validation results for a specific ingestion job.
+You can call the ``get_statistics()`` method of the FeatureGroup instance to fetch statistics for a specific ingestion job.
 
 .. code-block:: python3
 
@@ -295,7 +295,8 @@ The data will be stored in a data type native to each store. There is an option
 
 Offline data types
 ###################
-Please refer to the following mapping when registering a Spark DataFrame, or a Pandas DataFrame.
+Please refer to the following mapping when registering a Spark DataFrame or a Pandas DataFrame. For Spark DataFrames all data types are supported;
+any type not listed in the following table is mapped to the offline feature type COMPLEX.
 
 .. list-table::
    :widths: 20 25 25 40
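
The new COMPLEX mapping note is easiest to see with a column whose Spark type has no scalar counterpart. A hedged illustration follows: it only builds a DataFrame with a nested `ArrayType` column, the kind of type the updated doc says falls through to COMPLEX; the feature-store registration itself is omitted, and the local-session setup is an assumption for the sketch.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

spark = SparkSession.builder.master("local[1]").getOrCreate()
schema = StructType([
    StructField("id", StringType(), True),
    # Nested type with no entry in the mapping table -> offline type COMPLEX,
    # per the doc change above.
    StructField("scores", ArrayType(IntegerType()), True),
])
df = spark.createDataFrame([("u1", [1, 2, 3])], schema)
df.printSchema()
```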

ads/feature_store/docs/source/release_notes.rst

Lines changed: 27 additions & 1 deletion
@@ -3,7 +3,7 @@
 =============
 Release Notes
 =============
-1.0
+1.1
 ---
 
 .. note::
@@ -25,6 +25,32 @@ Release Notes
       - Par link expires Jan 5, 2026
 
 
+Release notes: July 5, 2023
+
+* [FEATURE] Supporting Offline Feature Type COMPLEX
+* [DOCS] Data Type update for Offline Feature Type COMPLEX
+
+1.0
+---
+
+.. note::
+
+  .. list-table::
+    :header-rows: 1
+
+    * - Package Name
+      - Latest Version
+      - Notes
+    * - Conda pack
+      - `https://objectstorage.us-ashburn-1.oraclecloud.com/n/bigdatadatasciencelarge/b/service-conda-packs-fs/o/service_pack/cpu/PySpark_3.2_and_Feature_Store/1.0/fspyspark32_p38_cpu_v1#conda`
+      -
+    * - SERVICE_VERSION
+      - 0.1.209.master
+      -
+    * - Terraform Stack
+      - `link <https://objectstorage.us-ashburn-1.oraclecloud.com/p/vZogtXWwHqbkGLeqyKiqBmVxdbR4MK4nyOBqDsJNVE4sHGUY5KFi4T3mOFGA3FOy/n/idogsu2ylimg/b/oci-feature-store/o/beta/terraform/feature-store-terraform.zip>`__
+      - Par link expires Jan 5, 2026
+
 Release notes: June 15, 2023
 
 * [FEATURE] Included ``FeatureStore``, ``FeatureGroup``, ``Dataset``, ``Entity`` and ``Transformation`` concepts for feature store.
Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
+from pyspark.sql.types import StructType, ShortType, IntegerType, LongType, FloatType, DoubleType, \
+    BooleanType, StringType, StructField, ByteType, BinaryType, DecimalType
+from tests.integration.feature_store.test_base import FeatureStoreTestCase
+from ads.feature_store.input_feature_detail import FeatureDetail, FeatureType
+from ads.feature_store.feature_group import FeatureGroup
+from ads.feature_store.feature_group_job import FeatureGroupJob
+import pandas as pd
+import numpy as np
+import pytest
+from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton
+
+
+class TestInputSchema(FeatureStoreTestCase):
+    input_feature_details = [
+        FeatureDetail("A").with_feature_type(FeatureType.STRING).with_order_number(1),
+        FeatureDetail("B").with_feature_type(FeatureType.INTEGER).with_order_number(2)
+    ]
+
+    a = ["value1", "value2"]
+    b = [25, 60]
+    c = [30, 50]
+    pandas_basic_df = pd.DataFrame(
+        {
+            "A": a,
+            "B": b,
+            "C": c
+        }
+    )
+
+    schema = StructType(
+        [StructField("string_col", StringType(), True),
+         StructField("int_col", IntegerType(), True),
+         StructField("long_col", LongType(), True)]
+    )
+
+    input_feature_details_spark = [
+        FeatureDetail("string_col").with_feature_type(FeatureType.STRING).with_order_number(1),
+        FeatureDetail("int_col").with_feature_type(FeatureType.INTEGER).with_order_number(2),
+        FeatureDetail("C").with_feature_type(FeatureType.INTEGER).with_order_number(2),
+        FeatureDetail("B").with_feature_type(FeatureType.INTEGER).with_order_number(2),
+    ]
+
+    data = [
+        ("value1", 100, 1000),
+        ("value2", 200, 2000)
+    ]
+    spark = SparkSessionSingleton(FeatureStoreTestCase.METASTORE_ID).get_spark_session()
+    basic_df = spark.createDataFrame(data, schema)
+
+    def define_feature_group_resource_with_pandas_schema(
+        self, entity_id, feature_store_id
+    ) -> "FeatureGroup":
+        feature_group_pandas_array = (
+            FeatureGroup()
+            .with_description("feature group resource for pandas array types")
+            .with_compartment_id(self.COMPARTMENT_ID)
+            .with_name(self.get_name("feature_group_pandas_array"))
+            .with_entity_id(entity_id)
+            .with_feature_store_id(feature_store_id)
+            .with_primary_keys([])
+            .with_input_feature_details(self.input_feature_details)
+        )
+        return feature_group_pandas_array
+
+    def define_feature_group_resource_with_spark_schema(
+        self, entity_id, feature_store_id
+    ) -> "FeatureGroup":
+        feature_group_spark_schema = (
+            FeatureGroup()
+            .with_description("feature group resource for spark schema types")
+            .with_compartment_id(self.COMPARTMENT_ID)
+            .with_name(self.get_name("feature_group_spark_schema"))
+            .with_entity_id(entity_id)
+            .with_feature_store_id(feature_store_id)
+            .with_primary_keys([])
+            .with_input_feature_details(self.input_feature_details_spark)
+        )
+        return feature_group_spark_schema
+
+    def test_feature_group_pandas_schema_mismatch(self):
+        """Tests schema mismatch for a pandas DataFrame."""
+        fs = self.define_feature_store_resource().create()
+        assert fs.oci_fs.id
+
+        entity = self.create_entity_resource(fs)
+        assert entity.oci_fs_entity.id
+
+        feature_group = self.define_feature_group_resource_with_pandas_schema(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        )
+        feature_group.create()
+        feature_group.materialise(self.pandas_basic_df)
+
+
+        df = feature_group.select().read()
+        assert len(df.columns) == 2
+
+        self.clean_up_feature_group(feature_group)
+        self.clean_up_entity(entity)
+        self.clean_up_feature_store(fs)
+
+    def test_feature_group_spark_schema_mismatch(self):
+        """Tests schema mismatch for a Spark DataFrame."""
+        fs = self.define_feature_store_resource().create()
+        assert fs.oci_fs.id
+
+        entity = self.create_entity_resource(fs)
+        assert entity.oci_fs_entity.id
+
+        feature_group = self.define_feature_group_resource_with_spark_schema(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        )
+        feature_group.create()
+        feature_group.materialise(self.basic_df)
+
+        df = feature_group.select().read()
+        assert len(df.columns) == 2
+
+        self.clean_up_feature_group(feature_group)
+        self.clean_up_entity(entity)
+        self.clean_up_feature_store(fs)
+
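
Both tests assert the same invariant from opposite ends: columns present in the input DataFrame but absent from the registered feature details (`C` in the pandas case, `long_col` in the Spark case) are dropped during materialisation, leaving two columns. A quick way to sanity-check those expected column sets without a feature store (pure pandas-free Python, illustrative only):

```python
# Mirror the test fixtures: input columns vs. registered feature names.
pandas_cols = ["A", "B", "C"]
registered = ["A", "B"]
surviving = [c for c in pandas_cols if c in registered]
assert len(surviving) == 2  # matches `assert len(df.columns) == 2` in the test

spark_cols = ["string_col", "int_col", "long_col"]
registered_spark = ["string_col", "int_col", "C", "B"]
surviving_spark = [c for c in spark_cols if c in registered_spark]
assert len(surviving_spark) == 2  # long_col is dropped
```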
