[FSTORE-1470] Documentation and Guides for Snowflake Schema #398

Merged
merged 3 commits into from
Oct 4, 2024
244 changes: 163 additions & 81 deletions docs/user_guides/fs/feature_view/query.md
Expand Up @@ -8,36 +8,49 @@ The joining functionality is heavily inspired by the APIs used by Pandas to merg

=== "Python"
```python
fs = ...
credit_card_transactions_fg = fs.get_feature_group(name="credit_card_transactions", version=1)
account_details_fg = fs.get_feature_group(name="account_details", version=1)
merchant_details_fg = fs.get_feature_group(name="merchant_details", version=1)

# create a query
feature_join = rain_fg.select_all() \
.join(temperature_fg.select_all(), on=["date", "location_id"]) \
.join(location_fg.select_all())
selected_features = credit_card_transactions_fg.select_all() \
.join(account_details_fg.select_all(), on=["cc_num"]) \
.join(merchant_details_fg.select_all())

# save the query to feature view
feature_view = fs.create_feature_view(
name='rain_dataset',
query=feature_join
version=1,
name='credit_card_fraud',
Contributor

Add an explicit version to the feature view?
Labels in the feature view?

labels=["is_fraud"],
query=selected_features
)

# retrieve the query back from the feature view
feature_view = fs.get_feature_view("rain_dataset", version=1)
feature_view = fs.get_feature_view("credit_card_fraud", version=1)
query = feature_view.query
```

=== "Scala"
```scala

val fs = ...
val creditCardTransactionsFg = fs.getFeatureGroup("credit_card_transactions", 1)
val accountDetailsFg = fs.getFeatureGroup("account_details", 1)
val merchantDetailsFg = fs.getFeatureGroup("merchant_details", 1)

// create a query
val featureJoin = (rainFg.selectAll()
.join(temperatureFg.selectAll(), on=Seq("date", "location_id"))
.join(locationFg.selectAll()))
val selectedFeatures = (creditCardTransactionsFg.selectAll()
.join(accountDetailsFg.selectAll(), on=Seq("cc_num"))
.join(merchantDetailsFg.selectAll()))

val featureView = fs.createFeatureView()
.name("rain_dataset")
.query(featureJoin)
.name("credit_card_fraud")
.query(selectedFeatures)
.build();

// retrieve the query back from the feature view
val featureView = fs.getFeatureView("rain_dataset", 1)
val featureView = fs.getFeatureView("credit_card_fraud", 1)
val query = featureView.getQuery()
```

Expand All @@ -53,18 +66,18 @@ Selecting features from a feature group is a lazy operation, returning a query w

=== "Python"
```python
rain_fg = fs.get_feature_group("rain_fg")
credit_card_transactions_fg = fs.get_feature_group("credit_card_transactions")

# Returns Query
feature_join = rain_fg.select(["location_id", "weekly_rainfall"])
selected_features = credit_card_transactions_fg.select(["amount", "latitude", "longitude"])
```

=== "Scala"
```Scala
val rainFg = fs.getFeatureGroup("rain_fg")
val creditCardTransactionsFg = fs.getFeatureGroup("credit_card_transactions")

// Returns Query
val featureJoin = rainFg.select(Seq("location_id", "weekly_rainfall"))
val selectedFeatures = creditCardTransactionsFg.select(Seq("amount", "latitude", "longitude"))
```

#### Join
Expand All @@ -75,35 +88,103 @@ By default, Hopsworks will use the maximal matching subset of the primary keys o
=== "Python"
```python
# Returns Query
feature_join = rain_fg.join(temperature_fg)
selected_features = credit_card_transactions_fg.join(account_details_fg)
```

=== "Scala"
```Scala
// Returns Query
val featureJoin = rainFg.join(temperatureFg)
val selectedFeatures = creditCardTransactionsFg.join(accountDetailsFg)
```
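
One plausible reading of the default join key is "the primary-key names that both feature groups have in common". The sketch below illustrates that reading only; `default_join_keys` and the key names are hypothetical and are not part of the Hopsworks API.

```python
def default_join_keys(left_pk, right_pk):
    """Return the primary-key names shared by both sides, in left-side order.

    Illustrative assumption: this mirrors the idea of joining on the largest
    matching subset of primary keys, not the actual Hopsworks implementation.
    """
    right = set(right_pk)
    return [key for key in left_pk if key in right]


# Hypothetical primary keys for two feature groups.
fg_a_primary_key = ["cc_num", "event_day"]
fg_b_primary_key = ["cc_num"]

join_on = default_join_keys(fg_a_primary_key, fg_b_primary_key)
print(join_on)
```

If the two sides share no primary-key name, this sketch returns an empty list, which is the case where an explicit `on`, or `left_on` and `right_on`, is needed.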
More complex joins are possible by selecting subsets of features from the joined feature groups and by specifying a join key and type.
Possible join types are "inner", "left" or "right". Furthermore, it is possible to specify different features for the join key of the left and right feature group.
The join key lists should contain the names of the features to join on.
Possible join types are "inner", "left" or "right". By default, `join_type` is `"left"`. Furthermore, it is possible to specify different
features for the join key of the left and right feature group. The join key lists should contain the names of the features to join on.

=== "Python"
```python
feature_join = rain_fg.select_all() \
Contributor

OK to keep join_type here.

.join(temperature_fg.select_all(), on=["date", "location_id"]) \
.join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
selected_features = credit_card_transactions_fg.select_all() \
.join(account_details_fg.select_all(), on=["cc_num"]) \
.join(merchant_details_fg.select_all(), left_on=["merchant_id"], right_on=["id"], join_type="inner")
```

=== "Scala"
```scala
val featureJoin = (rainFg.selectAll()
.join(temperatureFg.selectAll(), Seq("date", "location_id"))
.join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left"))
val selectedFeatures = (creditCardTransactionsFg.selectAll()
.join(accountDetailsFg.selectAll(), Seq("cc_num"))
.join(merchantDetailsFg.selectAll(), Seq("merchant_id"), Seq("id"), "inner"))
```
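
To make the difference between `"left"` and `"inner"` concrete, here is a small pure-Python sketch of the join semantics on lists of dictionaries. It is not Hopsworks code: `join_rows`, the sample rows and the `merchant_name` column are made up for illustration, and the `"right"` join type is omitted for brevity.

```python
def join_rows(left, right, left_on, right_on, join_type="left"):
    """Join two lists of dicts on one key per side ('left' or 'inner' only)."""
    if join_type not in ("left", "inner"):
        raise ValueError(f"unsupported join_type in this sketch: {join_type}")

    # Index the right side by its join key.
    index = {}
    for row in right:
        index.setdefault(row[right_on], []).append(row)

    # Columns contributed by the right side (the join key itself is dropped).
    right_cols = [c for c in right[0] if c != right_on] if right else []

    result = []
    for row in left:
        matches = index.get(row[left_on], [])
        for match in matches:
            merged = dict(row)
            merged.update({c: match[c] for c in right_cols})
            result.append(merged)
        if not matches and join_type == "left":
            # A left join keeps unmatched left rows and fills the gaps with None.
            merged = dict(row)
            merged.update({c: None for c in right_cols})
            result.append(merged)
    return result


transactions = [
    {"cc_num": "1111", "merchant_id": 1, "amount": 25.0},
    {"cc_num": "2222", "merchant_id": 9, "amount": 40.0},
]
merchants = [{"id": 1, "merchant_name": "Grocer"}]

left_rows = join_rows(transactions, merchants, "merchant_id", "id", join_type="left")
inner_rows = join_rows(transactions, merchants, "merchant_id", "id", join_type="inner")
print(len(left_rows), len(inner_rows))
```

With a left join the transaction for the unknown merchant is kept with a missing `merchant_name`; with an inner join it is dropped.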

### Data modeling in Hopsworks

Since v4.0, the Hopsworks feature selection API supports both the Star and Snowflake Schema data models.

#### Star schema data model

With the Star Schema data model, all tables are children of the parent (the leftmost) feature group, which holds the
foreign keys for all of its child feature groups.

<p align="center">
<figure>
<img src="../../../../assets/images/guides/fs/feature_view/star.png" alt="Star schema data model">
<figcaption>Star schema data model</figcaption>
</figure>
</p>

=== "Python"
```python
selected_features = credit_card_transactions.select_all() \
.join(aggregated_cc_transactions.select_all()) \
.join(account_details.select_all()) \
.join(merchant_details.select_all()) \
.join(cc_issuer_details.select_all())
```

!!! error "Nested Joins"
The API currently does not support nested joins. That is joins of joins.
You can fall back to Spark DataFrames to cover these cases. However, if you have to use joins of joins, most likely there is potential to optimise your feature group structure.
In online inference, when you want to retrieve features in your online model, you have to provide all foreign key values,
known as the serving_keys, from the parent feature group to retrieve your precomputed feature values using the feature view.

=== "Python"
```python
feature_vector = feature_view.get_feature_vector({
"cc_num": "1234 5555 3333 8888",
"issuer_id": 20440455,
"merchant_id": 44208484,
"account_id": 84403331
})
```

#### Snowflake schema
Hopsworks also lets you define a feature view that consists of a nested tree of children (up to a depth of 20)
from the root (leftmost) feature group. This is called the Snowflake Schema data model: you build nested tables (subtrees) using joins, and then join the
subtrees to their parents iteratively until you reach the root node (the leftmost feature group in the feature selection):

<p align="center">
<figure>
<img src="../../../../assets/images/guides/fs/feature_view/snowflake.png" alt="Snowflake schema data model">
<figcaption>Snowflake schema data model</figcaption>
</figure>
</p>

=== "Python"
```python
nested_selection = aggregated_cc_transactions.select_all() \
.join(account_details.select_all()) \
.join(cc_issuer_details.select_all())

selected_features = credit_card_transactions.select_all() \
.join(nested_selection) \
.join(merchant_details.select_all())
```

Now, you have the benefit that in online inference you only need to pass two serving key values (the foreign keys of the leftmost feature group)
to retrieve the precomputed features:

=== "Python"
```python
feature_vector = feature_view.get_feature_vector({
"cc_num": "1234 5555 3333 8888",
"merchant_id": 44208484,
})
```
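
Why two serving keys are enough can be pictured with a toy in-memory lookup. This is a conceptual sketch only, not how Hopsworks retrieves features internally; the table contents, the feature names (`avg_amount_7d`, `account_age_days`, `issuer_country`, `merchant_category`) and the function `lookup_snowflake` are all invented for illustration.

```python
# Toy "online store": each table maps a primary-key value to a row.
aggregated_cc_transactions = {
    "1234 5555 3333 8888": {
        "avg_amount_7d": 31.5,
        # Foreign keys to the nested children live in the subtree's parent.
        "account_id": 84403331,
        "issuer_id": 20440455,
    }
}
account_details = {84403331: {"account_age_days": 912}}
cc_issuer_details = {20440455: {"issuer_country": "SE"}}
merchant_details = {44208484: {"merchant_category": "Grocery"}}


def lookup_snowflake(serving_keys):
    """Resolve a feature vector from only the root's foreign keys."""
    agg = aggregated_cc_transactions[serving_keys["cc_num"]]
    vector = {"avg_amount_7d": agg["avg_amount_7d"]}
    # The nested children are reached through keys stored in the subtree,
    # so the caller does not have to supply account_id or issuer_id.
    vector.update(account_details[agg["account_id"]])
    vector.update(cc_issuer_details[agg["issuer_id"]])
    vector.update(merchant_details[serving_keys["merchant_id"]])
    return vector


vector = lookup_snowflake({"cc_num": "1234 5555 3333 8888", "merchant_id": 44208484})
print(vector)
```

In the star schema variant of this toy, `account_id` and `issuer_id` would have to be passed in by the caller, because no intermediate table carries them.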

#### Filter

Expand All @@ -114,48 +195,48 @@ For the Scala part of the API, equivalent methods are available in the `Feature`

=== "Python"
```python
filtered_rain = rain_fg.filter(rain_fg.location_id == 10)
filtered_credit_card_transactions = credit_card_transactions_fg.filter(credit_card_transactions_fg.category == "Grocery")
```

=== "Scala"
```scala
val filteredRain = rainFg.filter(rainFg.getFeature("location_id").eq(10))
val filteredCreditCardTransactions = creditCardTransactionsFg.filter(creditCardTransactionsFg.getFeature("category").eq("Grocery"))
```

Filters are fully compatible with joins:

=== "Python"
```python
feature_join = rain_fg.select_all() \
.join(temperature_fg.select_all(), on=["date", "location_id"]) \
.join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], join_type="left") \
.filter((rain_fg.location_id == 10) | (rain_fg.location_id == 20))
selected_features = credit_card_transactions_fg.select_all() \
.join(account_details_fg.select_all(), on=["cc_num"]) \
Contributor

cc_num -> merchant_id

Contributor Author

This is account_details_fg: cc_num is the foreign key here in credit_card_transactions_fg, and id or transaction_id will be the primary key.

.join(merchant_details_fg.select_all(), left_on=["merchant_id"], right_on=["id"]) \
.filter((credit_card_transactions_fg.category == "Grocery") | (credit_card_transactions_fg.category == "Restaurant/Cafeteria"))
```

=== "Scala"
```scala
val featureJoin = (rainFg.selectAll()
.join(temperatureFg.selectAll(), Seq("date", "location_id"))
.join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left")
.filter(rainFg.getFeature("location_id").eq(10).or(rainFg.getFeature("location_id").eq(20))))
val selectedFeatures = (creditCardTransactionsFg.selectAll()
.join(accountDetailsFg.selectAll(), Seq("cc_num"))
.join(merchantDetailsFg.selectAll(), Seq("merchant_id"), Seq("id"), "left")
.filter(creditCardTransactionsFg.getFeature("category").eq("Grocery").or(creditCardTransactionsFg.getFeature("category").eq("Restaurant/Cafeteria"))))
```

The filters can be applied at any point of the query:

=== "Python"
```python
feature_join = rain_fg.select_all() \
.join(temperature_fg.select_all().filter(temperature_fg.avg_temp >= 22), on=["date", "location_id"]) \
.join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], join_type="left") \
.filter(rain_fg.location_id == 10)
selected_features = credit_card_transactions_fg.select_all() \
.join(account_details_fg.select_all().filter(account_details_fg.avg_temp >= 22), on=["cc_num"]) \
.join(merchant_details_fg.select_all(), left_on=["merchant_id"], right_on=["id"]) \
.filter(credit_card_transactions_fg.category == "Grocery")
```

=== "Scala"
```scala
val featureJoin = (rainFg.selectAll()
.join(temperatureFg.selectAll().filter(temperatureFg.getFeature("avg_temp").ge(22)), Seq("date", "location_id"))
.join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left")
.filter(rainFg.getFeature("location_id").eq(10)))
val selectedFeatures = (creditCardTransactionsFg.selectAll()
.join(accountDetailsFg.selectAll().filter(accountDetailsFg.getFeature("avg_temp").ge(22)), Seq("cc_num"))
.join(merchantDetailsFg.selectAll(), Seq("merchant_id"), Seq("id"), "left")
.filter(creditCardTransactionsFg.getFeature("category").eq("Grocery")))
```
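
The filter expressions above rely on comparison operators that return a filter object instead of a boolean. The following toy classes sketch that pattern in plain Python; `ToyFeature` and `ToyFilter` are hypothetical stand-ins written for this example and are not the Hopsworks `Feature` and `Filter` classes.

```python
class ToyFilter:
    """A predicate over a row (dict) plus a readable description."""

    def __init__(self, predicate, description):
        self.predicate = predicate
        self.description = description

    def __or__(self, other):
        return ToyFilter(
            lambda row: self.predicate(row) or other.predicate(row),
            f"({self.description} OR {other.description})",
        )

    def __and__(self, other):
        return ToyFilter(
            lambda row: self.predicate(row) and other.predicate(row),
            f"({self.description} AND {other.description})",
        )

    def apply(self, rows):
        return [row for row in rows if self.predicate(row)]


class ToyFeature:
    """Comparisons build ToyFilter objects; nothing is evaluated yet."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, value):
        return ToyFilter(lambda row: row[self.name] == value, f"{self.name} == {value!r}")

    def __ge__(self, value):
        return ToyFilter(lambda row: row[self.name] >= value, f"{self.name} >= {value!r}")


category = ToyFeature("category")
grocery_or_cafe = (category == "Grocery") | (category == "Restaurant/Cafeteria")

rows = [
    {"category": "Grocery", "amount": 12.0},
    {"category": "Cash Withdrawal", "amount": 100.0},
    {"category": "Restaurant/Cafeteria", "amount": 30.0},
]
kept = grocery_or_cafe.apply(rows)
print(grocery_or_cafe.description, len(kept))
```

The parentheses around each comparison matter: in Python `|` binds more tightly than `==`, so leaving them out changes how the expression is parsed.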

#### Joins and/or Filters on feature view query
Expand All @@ -166,23 +247,23 @@ However, this operation will not update the metadata and persist the updated que
=== "Python"
```python
fs = ...
wind_speed_fg = fs.get_feature_group(name="wind_speed_fg", version=1)
rain_fg = fs.get_feature_group(name="rain_fg", version=1)
feature_view = fs.get_feature_view("rain_dataset", version=1)
merchant_details_fg = fs.get_feature_group(name="merchant_details", version=1)
credit_card_transactions_fg = fs.get_feature_group(name="credit_card_transactions", version=1)
feature_view = fs.get_feature_view("credit_card_fraud", version=1)
feature_view.query \
.join(wind_speed_fg.select_all()) \
.filter(rain_fg.location_id == 54)
.join(merchant_details_fg.select_all()) \
.filter(credit_card_transactions_fg.category == "Cash Withdrawal")
```

=== "Scala"
```scala
val fs = ...
val windSpeedFg = fs.getFeatureGroup("wind_speed_fg", 1)
val rainFg = fs.getFeatureGroup("rain_fg", 1)
val featureView = fs.getFeatureView("rain_dataset", 1)
val merchantDetailsFg = fs.getFeatureGroup("merchant_details", 1)
val creditCardTransactionsFg = fs.getFeatureGroup("credit_card_transactions", 1)
val featureView = fs.getFeatureView("credit_card_fraud", 1)
featureView.getQuery()
.join(windSpeedFg.selectAll())
.filter(rainFg.getFeature("location_id").eq(54))
.join(merchantDetailsFg.selectAll())
.filter(creditCardTransactionsFg.getFeature("category").eq("Cash Withdrawal"))
```

!!! warning
Expand All @@ -192,45 +273,46 @@ However, this operation will not update the metadata and persist the updated que
=== "Python"
```python
fs = ...
wind_speed_fg = fs.get_feature_group(name="wind_speed_fg", version=1)
solar_irradiance_fg = fs.get_feature_group(name="solar_irradiance_fg", version=1)
rain_fg = fs.get_feature_group(name="rain_fg", version=1)

merchant_details_fg = fs.get_feature_group(name="merchant_details", version=1)
account_details_fg = fs.get_feature_group(name="account_details", version=1)
credit_card_transactions_fg = fs.get_feature_group(name="credit_card_transactions", version=1)

# fetch new feature view and its query instance
feature_view = fs.get_feature_view("rain_dataset", version=1)
feature_view = fs.get_feature_view("credit_card_fraud", version=1)

# apply join/filter logic based on location and wind speed
feature_view.query.join(wind_speed_fg.select_all()) \
.filter(rain_fg.location_id == 54)
# apply join/filter logic based on purchase type
feature_view.query.join(merchant_details_fg.select_all()) \
.filter(credit_card_transactions_fg.category == "Cash Withdrawal")

# to apply new logic independent of location and wind speed from above
# to apply new logic independent of purchase type from above
# re-fetch new feature view and its query instance
feature_view = fs.get_feature_view("rain_dataset", version=1)
feature_view = fs.get_feature_view("credit_card_fraud", version=1)

# apply new join/filter logic based on solar irradiance
feature_view.query.join(solar_irradiance_fg.select_all()) \
.filter(solar_irradiance_fg.location_id == 28)
# apply new join/filter logic based on account details
feature_view.query.join(merchant_details_fg.select_all()) \
.filter(account_details_fg.gender == "F")
```

=== "Scala"
```scala
val fs = ...
val windSpeedFg = fs.getFeatureGroup("wind_speed_fg", 1)
val solarIrradianceFg = fs.getFeatureGroup("solar_irradiance_fg", 1)
val rainFg = fs.getFeatureGroup("rain_fg", 1)
val merchantDetailsFg = fs.getFeatureGroup("merchant_details", 1)
val accountDetailsFg = fs.getFeatureGroup("account_details", 1)
val creditCardTransactionsFg = fs.getFeatureGroup("credit_card_transactions", 1)

// fetch new feature view and its query instance
val featureView = fs.getFeatureView("rain_dataset", 1)
val featureView = fs.getFeatureView("credit_card_fraud", 1)

// apply join/filter logic based on location and wind speed
featureView.getQuery.join(windSpeedFg.selectAll())
.filter(rainFg.getFeature("location_id").eq(54))
// apply join/filter logic based on purchase type
featureView.getQuery.join(merchantDetailsFg.selectAll())
.filter(creditCardTransactionsFg.getFeature("category").eq("Cash Withdrawal"))

// to apply new logic independent of location and wind speed from above
// to apply new logic independent of purchase type from above
// re-fetch new feature view and its query instance
val featureView = fs.getFeatureView("rain_dataset", 1)
val featureView = fs.getFeatureView("credit_card_fraud", 1)

// apply new join/filter logic based on solar irradiance
featureView.getQuery.join(solarIrradianceFg.selectAll())
.filter(solarIrradianceFg.getFeature("location_id").eq(28))
// apply new join/filter logic based on account details
featureView.getQuery.join(merchantDetailsFg.selectAll())
.filter(accountDetailsFg.getFeature("gender").eq("F"))
```
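
The re-fetch step in the example above makes sense if `join` and `filter` modify the query object they are called on, which is what the comments suggest. The toy model below works under that assumption; `ToyQuery`, `SAVED_QUERY` and `get_toy_feature_view_query` are invented names, not Hopsworks APIs.

```python
class ToyQuery:
    """Minimal stand-in for a query whose join/filter mutate it in place."""

    def __init__(self, joins, filters):
        self.joins = list(joins)
        self.filters = list(filters)

    def join(self, feature_group_name):
        self.joins.append(feature_group_name)
        return self

    def filter(self, condition):
        self.filters.append(condition)
        return self


# Stand-in for the query persisted with the feature view's metadata.
SAVED_QUERY = {"joins": ["account_details"], "filters": []}


def get_toy_feature_view_query():
    """Each fetch builds a fresh query from the saved definition."""
    return ToyQuery(SAVED_QUERY["joins"], SAVED_QUERY["filters"])


# First piece of logic, applied to one fetched instance.
query = get_toy_feature_view_query()
query.join("merchant_details").filter("category == 'Cash Withdrawal'")

# Reusing the same instance stacks the new filter on top of the old one.
query.filter("gender == 'F'")

# Re-fetching gives an instance without the earlier extras.
fresh_query = get_toy_feature_view_query()
fresh_query.filter("gender == 'F'")

print(query.filters)
print(fresh_query.filters)
```

The saved definition is untouched in both cases, which matches the statement that these operations do not update the persisted feature view metadata.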