Welcome to the snowplow-snowflake-loader wiki!
This is a project to load Snowplow enriched events into the Snowflake cloud data warehouse.
This project consists of two independent applications:
- Snowplow Snowflake Transformer, a Spark job which reads Snowplow enriched events from Amazon S3 and writes them back into S3 in a format ready for Snowflake
- Snowplow Snowflake Loader, a Scala app which takes the Snowflake-ready data in S3 and loads it into Snowflake
The Snowplow Snowflake Transformer is a Spark job which reads Snowplow enriched events from Amazon S3 and writes them back into S3 in a format ready for Snowflake.
This Spark job is written in Scala, and makes use of the Snowplow Scala Analytics SDK as well as a Netflix-based S3 staging committer.
The Transformer:
- Reads Snowplow enriched events from S3
- Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSONs
- Writes those enriched event JSONs back to S3 as newline-delimited gzipped files. This step uses a custom staging committer which accelerates writing to S3 from Spark by writing task outputs to a temporary directory on the local FS rather than S3.
- Keeps track of which folders it has processed using the Snowplow Scala Analytics SDK's DynamoDB manifest functionality
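The data flow above can be sketched as a small Spark job. This is a minimal sketch rather than the Transformer's actual code: the S3 paths and app name are placeholders, and the exact return type of the Analytics SDK's EventTransformer.transform differs between SDK versions, so error handling is simplified here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.io.compress.GzipCodec
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

object TransformerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("snowflake-transformer").getOrCreate()
    val sc = spark.sparkContext

    // Read Snowplow enriched events (TSV) from the enriched archive; placeholder path
    val enriched = sc.textFile("s3a://my-enriched-archive/good/run=2017-10-27-23-10-45/")

    // Convert each enriched event into a JSON string, dropping lines that fail to transform
    // (the SDK returns a success-or-failure type; .toOption is assumed here for brevity)
    val jsons = enriched.flatMap(line => EventTransformer.transform(line).toOption)

    // Write newline-delimited, gzipped JSON back to S3, ready for loading into Snowflake
    jsons.saveAsTextFile(
      "s3a://my-snowflake-transformed/run=2017-10-27-23-10-45/",
      classOf[GzipCodec])
  }
}
```

Manifest bookkeeping and the S3 staging committer configuration are omitted from this sketch.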
The Snowplow Snowflake Loader is a Scala app which takes the Snowflake-ready data in S3 and loads it into Snowflake.
The algorithm is as follows:
- Check for new transformed folders in S3, generated by Snowplow Snowflake Transformer, which haven't yet been loaded into Snowflake
- Check for any new JSON properties (unstruct_events, contexts or derived_contexts) in the transformed files which are not yet present in the Snowflake database table
- Add these new JSON properties to the table as large VARCHAR fields (see the sketch after this list)
- Perform the load from S3 into Snowflake, making sure to load all of the existing and new JSON properties
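To make the column-management step concrete, here is a rough sketch of how the Loader might diff the JSON properties found in the transformed data against the columns already on the table, and generate the corresponding ALTER TABLE statements. The helper names and the inventory input are hypothetical, not the Loader's actual API.

```scala
// Hypothetical helpers for the property-diffing step; not the Loader's real API
object ColumnSync {

  // Properties present in the transformed data but missing from the Snowflake table
  def missingColumns(inventory: Set[String], existing: Set[String]): Set[String] =
    inventory -- existing

  // One ALTER TABLE statement per missing property, added as a large VARCHAR column
  def alterStatements(table: String, newColumns: Set[String]): List[String] =
    newColumns.toList.sorted.map { column =>
      s"ALTER TABLE $table ADD COLUMN $column VARCHAR"
    }
}
```

For example, ColumnSync.alterStatements("atomic.events", Set("contexts_com_acme_checkout_1")) would produce a single ALTER TABLE atomic.events ADD COLUMN contexts_com_acme_checkout_1 VARCHAR statement.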
Open questions:
- How do we check whether a given folder in S3 has already been loaded into Snowflake - some kind of manifest in DynamoDB or in Snowflake (ideally in Snowflake, so that it can be written as part of the load transaction)? Maybe some Kinesis stream of commands for the Loader to consume?
- How do we make sure that a new transformed folder in S3 is finalized (both in terms of eventual consistency and checking that the Snowplow Snowflake Transformer is not still running)?
- How should we check for new JSON properties in the transformed files which are not yet present in the Snowflake database table? Perhaps the Snowplow Snowflake Transformer writes out an inventory of all the JSON properties it found, and the Snowplow Snowflake Loader cross-checks this against the existing properties found in the Snowflake enriched events table.
We will need to do an initial port of the Snowplow enriched event to Snowflake, similar to the representation of atomic.events in Redshift.
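As a hedged illustration of that port, the partial DDL below mirrors a handful of well-known atomic.events columns; the full column list and exact types would need to come from the Redshift atomic.events definition.

```scala
// Partial, illustrative DDL for a Snowflake atomic.events table mirroring Redshift;
// most columns are omitted and the types shown are indicative only
val createEventsTable: String =
  """CREATE TABLE IF NOT EXISTS atomic.events (
    |  app_id           VARCHAR(255),
    |  platform         VARCHAR(255),
    |  etl_tstamp       TIMESTAMP,
    |  collector_tstamp TIMESTAMP NOT NULL,
    |  event_id         CHAR(36)  NOT NULL
    |  -- ... remaining enriched event columns, ported from the Redshift atomic.events DDL
    |)""".stripMargin
```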
The loading process is as follows:
- Transform the data and add the folder to the manifest
- Alter the table - add any new columns found in the manifest
- Create a temporary table snowplow_tmp_2017_10_27_23_10_45 with a single enriched_data column of VARIANT type to hold the whole enriched event. From the Snowflake docs: "A temporary table persists only for the duration of the user session in which it was created and is not visible to other users. A temporary table and all its contents are dropped at the end of the session."
- Load data from the stage into the temporary table
- Insert data. This is the primary loading method; the statement looks like the following:
```sql
INSERT INTO atomic.events(app_id,platform,etl_tstamp...) -- Specify order of columns
SELECT enriched_data:app_id::VARCHAR(255), enriched_data:platform::VARCHAR(255), enriched_data:etl_tstamp::TIMESTAMP... -- Query and cast columns
FROM atomic.snowplow_tmp_2017_10_27_23_10_45;
```
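Putting the steps above together, here is a minimal end-to-end sketch of the load sequence issued over the Snowflake JDBC driver. It assumes an external stage (called snowplow_stage here) already points at the transformed S3 folder; the account, warehouse, credentials, and the three-column list are placeholders rather than the Loader's real configuration.

```scala
import java.sql.DriverManager
import java.util.Properties

object LoaderSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings; the Snowflake JDBC driver is discovered automatically
    val props = new Properties()
    props.put("user", sys.env("SNOWFLAKE_USER"))
    props.put("password", sys.env("SNOWFLAKE_PASSWORD"))
    props.put("db", "snowplow")
    props.put("schema", "atomic")
    props.put("warehouse", "snowplow_wh")

    val conn = DriverManager.getConnection(
      "jdbc:snowflake://myaccount.snowflakecomputing.com/", props)
    val stmt = conn.createStatement()

    // 1. Temporary table with a single VARIANT column; dropped automatically at session end
    stmt.execute(
      "CREATE TEMPORARY TABLE atomic.snowplow_tmp_2017_10_27_23_10_45 (enriched_data VARIANT)")

    // 2. Load the transformed JSON files from the (assumed) external stage
    stmt.execute(
      """COPY INTO atomic.snowplow_tmp_2017_10_27_23_10_45
        |FROM @snowplow_stage/run=2017-10-27-23-10-45/
        |FILE_FORMAT = (TYPE = JSON)""".stripMargin)

    // 3. Cast the VARIANT fields into the typed columns of atomic.events
    stmt.execute(
      """INSERT INTO atomic.events (app_id, platform, etl_tstamp)
        |SELECT enriched_data:app_id::VARCHAR(255),
        |       enriched_data:platform::VARCHAR(255),
        |       enriched_data:etl_tstamp::TIMESTAMP
        |FROM atomic.snowplow_tmp_2017_10_27_23_10_45""".stripMargin)

    conn.close()
  }
}
```

Because the staging table is TEMPORARY, it is dropped automatically when the session ends, as described in the Snowflake documentation quoted above.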