
Welcome to the snowplow-snowflake-loader wiki!

Overview

This is a project to load Snowplow enriched events into the Snowflake cloud data warehouse.

Technical architecture

This application consists of two independent apps:

  1. Snowplow Snowflake Transformer, a Spark job which reads Snowplow enriched events from Amazon S3 and writes them back into S3 in a format ready for Snowflake
  2. Snowplow Snowflake Loader, a Scala app which takes the Snowflake-ready data in S3 and loads it into Snowflake

Snowplow Snowflake Transformer

Overview

This is a Spark job which reads Snowplow enriched events from Amazon S3 and writes them back into S3 in a format ready for Snowflake.

This Spark job is written in Scala, and makes use of the Snowplow Scala Analytics SDK as well as a Netflix-based S3 staging committer.

Algorithm

The Transformer:

  • Reads Snowplow enriched events from S3
  • Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSONs
  • Writes those enriched event JSONs back to S3 as newline-delimited gzipped files. This step uses a custom staging committer which accelerates writing to S3 from Spark by writing task outputs to a temporary directory on the local FS rather than S3.
  • Keeps track of which folders it has processed using the Snowplow Scala Analytics SDK's DynamoDB manifest functionality

Snowplow Snowflake Loader

Overview

This is a Scala app which takes the Snowflake-ready data in S3 and loads it into Snowflake.

Algorithm

The algorithm is as follows:

  • Check for new transformed folders in S3, generated by Snowplow Snowflake Transformer, which haven't yet been loaded into Snowflake
  • Check for any new JSON properties (unstruct_event, contexts or derived_contexts) in the transformed files which are not yet present in the Snowflake database table
  • Add these new JSON properties to the table as large VARCHAR fields (see the example after this list)
  • Perform the load from S3 into Snowflake, making sure to load all of the existing and new JSON properties
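For example, a newly observed JSON property could be added to the table with a statement along these lines (the column name is purely illustrative, not taken from a real schema):

-- Hypothetical new derived context column; an unqualified VARCHAR defaults to
-- Snowflake's maximum length, giving the "large VARCHAR" behaviour described above
ALTER TABLE atomic.events
  ADD COLUMN contexts_com_acme_my_context_1 VARCHAR;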

Outstanding questions

  1. How do we check whether a given folder in S3 has been loaded into Snowflake yet - some kind of manifest in DynamoDB or in Snowflake (ideally in Snowflake, so it can be written as part of the load transaction)? Maybe some Kinesis stream of commands for the Loader to consume? (See the sketch after this list.)
  2. How do we make sure that a new transformed folder in S3 is finalized (both in terms of eventual consistency and checking that the Snowplow Snowflake Transformer is not still running)?
  3. How should we check for new JSON properties in the transformed files which are not yet present in the Snowflake database table? Perhaps the Snowplow Snowflake Transformer writes out an inventory of all the JSON properties it found, and the Snowplow Snowflake Loader cross-checks this against the existing properties found in the Snowflake enriched events table.
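To make question 1 concrete, one option would be a small manifest table in Snowflake that the Loader writes to as part of the load; a minimal sketch (all table and column names are illustrative, not an agreed design):

-- Hypothetical manifest table recording which transformed folders have been loaded
CREATE TABLE IF NOT EXISTS atomic.manifest (
  run_id    VARCHAR,       -- the transformed folder in S3, e.g. run=2017-10-27-23-10-45
  loaded_at TIMESTAMP_NTZ  -- when the Loader finished this folder
);
-- Recorded once the corresponding INSERT into atomic.events has succeeded
INSERT INTO atomic.manifest (run_id, loaded_at)
  VALUES ('run=2017-10-27-23-10-45', CURRENT_TIMESTAMP());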

Other resources

We will need to do an initial port of the Snowplow enriched event to Snowflake, similar to the representation of atomic.events in Redshift.
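For illustration, a heavily abbreviated sketch of such a table, using only columns already referenced on this page plus a couple of well-known canonical fields (the real definition would mirror the full enriched event):

-- Abbreviated sketch of atomic.events in Snowflake
CREATE TABLE IF NOT EXISTS atomic.events (
  app_id           VARCHAR(255),
  platform         VARCHAR(255),
  etl_tstamp       TIMESTAMP,
  collector_tstamp TIMESTAMP NOT NULL,
  event_id         CHAR(36)
  -- remaining canonical enriched event columns, plus wide VARCHAR columns
  -- for unstruct_event, contexts and derived_contexts properties
);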

Loading steps

  1. Transform the data and add the processed folder to the manifest.
  2. Alter the table, adding any new columns found in the manifest.
  3. Create a temporary table snowplow_tmp_2017_10_27_23_10_45 with a single enriched_data column of VARIANT type to hold the whole enriched event. From the Snowflake docs: "A temporary table persists only for the duration of the user session in which it was created and is not visible to other users. A temporary table and all its contents are dropped at the end of the session."
  4. Load the data from the S3 stage into the temporary table (steps 3 and 4 are sketched after this list).
  5. Insert the data into atomic.events. This is the primary loading step; the statement looks like the following:
INSERT INTO atomic.events(app_id,platform,etl_tstamp...)                                                                   -- Specify order of columns
  SELECT enriched_data:app_id::VARCHAR(255), enriched_data:platform::VARCHAR(255), enriched_data:etl_tstamp::TIMESTAMP...  -- Query and cast columns
  FROM atomic.snowplow_tmp_2017_10_27_23_10_45;
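Steps 3 and 4 above could look roughly as follows (the stage name and path are illustrative; the actual stage, credentials and file format are configuration details):

-- Step 3: temporary table holding each enriched event as a single VARIANT
CREATE TEMPORARY TABLE atomic.snowplow_tmp_2017_10_27_23_10_45 (
  enriched_data VARIANT
);
-- Step 4: copy the transformed, gzipped newline-delimited JSON from the S3 stage
COPY INTO atomic.snowplow_tmp_2017_10_27_23_10_45
  FROM @snowplow_stage/run=2017-10-27-23-10-45/
  FILE_FORMAT = (TYPE = JSON);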