Note: This project involves GCP, and as my expertise primarily lies in AWS, my interaction with Google Cloud services may not always be precise.
- dags: data pipelines (DAG definitions and SQL). Also includes package modules with helper scripts.
- data: input data. Note that the DAG loads data directly from the shared Google Drive folder instead of this directory.
- terraform: deployment scripts. Currently covers Google Composer only; can be extended.
- README.md: project description.
- requirements.txt: external Python packages for local use.
The Google Cloud project is here.
I use a free trial to access GCP services. Access is shared with the artem.p@rounds.com user.
Note:
I use Google Composer (full information below), but to save costs the environment has been deleted.
I am not sure you will have sufficient permissions to deploy and run the environment in this project.
If needed, we can have a brief call to deploy Google Composer and run the provided Airflow DAG.
This document details the create_and_load_tables Airflow DAG, a data pipeline designed to load and process data from Google Drive into Google BigQuery. It is a critical part of the ETL process, handling various datasets and generating analytical data in BigQuery.
In Google Cloud Platform (GCP), there are several services to create and manage data pipelines that can run Python and SQL scripts. The choice of service depends on your specific data processing and orchestration needs.
During research, I considered the following services:
Cloud Dataflow
- Description: A fully managed service for stream and batch data processing.
- Use Cases: Suitable for complex, large-scale data processing tasks.
- Languages: Supports Python through Apache Beam; SQL via Beam SQL.
- Link: Cloud Dataflow

Cloud Composer
- Description: Managed Apache Airflow service for workflow orchestration.
- Use Cases: Ideal for orchestrating complex workflows.
- Languages: Native Python support. SQL tasks can be managed using Airflow operators.
- Link: Cloud Composer

BigQuery
- Description: Data warehouse service with capabilities for running SQL queries.
- Use Cases: Best for data analysis and reporting with SQL. Python scripts can interact with BigQuery.
- Languages: SQL for queries; Python for external script interaction.
- Link: BigQuery

Cloud Functions / Cloud Run
- Description: Serverless platforms for deploying small, single-purpose functions.
- Use Cases: Suitable for lightweight, event-driven processes.
- Languages: Python supported. SQL through connections to Cloud SQL or BigQuery.
- Link: Cloud Functions, Cloud Run

Dataproc
- Description: Managed Hadoop and Spark service.
- Use Cases: Good for data processing workflows fitting the Hadoop/Spark model.
- Languages: PySpark for Python; Spark SQL for SQL scripts.
- Link: Dataproc
Based on the workflow's complexity and processing requirements, Google Composer (managed Airflow) was chosen as the orchestration tool for this pipeline due to several key benefits:
- Managed Service: Google Composer is a fully managed service that simplifies the setup and maintenance of Airflow environments. It eliminates the need for manual configuration and scaling of the orchestration infrastructure.
- Integration with GCP Services: As part of the Google Cloud Platform, Composer integrates seamlessly with other GCP services such as BigQuery, Google Drive, and Secret Manager. This integration streamlines data workflows and enhances security through unified access management.
- Scalability: Composer easily scales to handle increases in workflow complexity and data volume. This scalability is essential for growing data pipelines and evolving business requirements.
- Reliability and Security: Being a Google Cloud service, Composer inherits strong security protocols and reliable infrastructure, ensuring data safety and high availability.
- Simplified Workflow Management: Composer's Airflow environment simplifies the process of creating, scheduling, and monitoring workflows. This allows more focus on data pipeline development rather than infrastructure management.
- Customization and Flexibility: Composer supports custom plugins and configurations, offering the flexibility to tailor the environment to specific pipeline needs.
- Community and Support: Airflow has a strong community, ensuring continuous updates and support. Google's support and seamless updates to Composer further enhance this benefit.
By leveraging Google Composer Airflow, the pipeline gains a robust, secure, and scalable orchestration environment that aligns well with the data processing objectives and the broader architecture on the GCP platform.
dag_csv_bq.py - Python script with the DAG implementation. git
The DAG is constructed using the TaskFlow API approach, where the generate_etl_timestamp task is directly linked to every subsequent task that uses etl_timestamp. While this results in a somewhat cluttered graph visualization, the underlying logic is consistent and works as intended.
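Below is a minimal TaskFlow sketch (hypothetical names, not the exact production code) showing why every downstream task ends up linked to generate_etl_timestamp: each one receives its return value.

```python
# Minimal TaskFlow sketch (assumes Airflow 2.4+; simplified from the real DAG).
from datetime import datetime, timezone

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False, tags=["etl"])
def taskflow_timestamp_demo():
    @task
    def generate_etl_timestamp() -> str:
        # One UTC timestamp shared by the whole run for consistent metadata.
        return datetime.now(timezone.utc).isoformat()

    @task
    def load_table(table_name: str, etl_timestamp: str) -> None:
        # Placeholder for the real load logic.
        print(f"Loading {table_name} with etl_timestamp={etl_timestamp}")

    etl_ts = generate_etl_timestamp()
    for table in ["app_ratings", "user_installs"]:
        # Passing etl_ts creates the edge from generate_etl_timestamp to each task.
        load_table.override(task_id=f"load_{table}")(table, etl_ts)


taskflow_timestamp_demo()
```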
- DAG ID: create_and_load_tables
- Owner: Airflow
- Start Date: 1 day ago from the current date
- Retries: 1
- Retry Delay: 1 minute
- Tags: ['etl']
- Project ID: datatobq-405917
- Secret ID for Service Account: csv-unloader-secret
- Google Drive Folder ID: 1-4ElvxoHdIOl3uMjPILdme-YqXpO92Ep (the ID of the shared folder with the input data)
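The following is a hedged sketch of how these settings could appear in the DAG file; the variable names are assumptions, only the values come from the list above.

```python
# Sketch of the configuration listed above (variable names are assumptions).
from datetime import timedelta

from airflow.utils.dates import days_ago

PROJECT_ID = "datatobq-405917"
SECRET_ID = "csv-unloader-secret"                        # service-account key in Secret Manager
DRIVE_FOLDER_ID = "1-4ElvxoHdIOl3uMjPILdme-YqXpO92Ep"    # shared folder with the input data

default_args = {
    "owner": "Airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

dag_kwargs = {
    "dag_id": "create_and_load_tables",
    "default_args": default_args,
    "start_date": days_ago(1),   # "1 day ago from the current date"
    "schedule": None,            # manual runs only for now (Airflow 2.4+ keyword)
    "tags": ["etl"],
    "catchup": False,
}
```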
- generate_etl_timestamp: Generates the current UTC timestamp as the ETL timestamp for data tracking. It is used in every task and in the statistics-gathering mechanism, and is needed for merging statistics data and creating metadata reports.
- create_table_task_factory: Factory function that creates tasks for each table defined in the schemas. These tasks handle the creation of BigQuery tables, including dropping existing tables if necessary (except for the statistics tables).
- drive_to_bq_task_factory: Factory function that creates tasks for loading CSV data from Google Drive into BigQuery. It includes data processing logic such as file downloading, loading into BigQuery, updating the ETL timestamp, and statistics gathering for quality checks and reports. These tasks populate the data dataset.
- analytics_mapping_task_factory: Factory function that creates tasks for running SQL queries for analytics mapping. It processes data in BigQuery to generate mapping tables and gathers statistics about the operation for quality checks and reports. These tasks populate the analytics dataset.
- analytics_task_factory: Similar to the analytics mapping tasks, but focused on creating and loading data into the various analytics tables. It also gathers statistics for quality checks and reports. These tasks populate the analytics dataset.
Statistics (metadata) gathering for quality checks and reports (structure described below) is based on the common useful.build_statistics function. Note: it can be extended and refactored for more flexible use. A sketch of the factory pattern is shown below.
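As an illustration of the factory pattern described above, here is a hedged sketch of a table-creation factory; the function body and SQL are assumptions, not the production code.

```python
# Illustrative task factory (simplified): the real create_table_task_factory
# also carries schema definitions and never drops the statistics tables.
from airflow.decorators import task


def create_table_task_factory(table_name: str, ddl: str, is_statistics: bool = False):
    @task(task_id=f"create_table_{table_name}")
    def create_table(etl_timestamp: str) -> str:
        # Import inside the task so it runs on the Composer workers.
        from google.cloud import bigquery

        client = bigquery.Client(project="datatobq-405917")
        if not is_statistics:
            # Regular tables are recreated on every full-sync run.
            client.query(f"DROP TABLE IF EXISTS `{table_name}`").result()
        client.query(ddl).result()
        return etl_timestamp

    return create_table
```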
- The DAG starts with the start task (an EmptyOperator).
- It then moves to the generate_etl_timestamp task.
- Subsequent dynamic tasks are created using create_table_task_factory for each defined table schema. These tasks depend on generate_etl_timestamp.
- drive_to_bq_task_factory creates tasks for loading data from Google Drive into BigQuery. These tasks depend on the completion of the table creation tasks.
- The data loaded from Drive triggers execution of the analytics mapping tasks created by analytics_mapping_task_factory.
- Finally, analytics_task_factory generates tasks for further processing and loading of analytics data, dependent on the completion of the mapping tasks. A wiring sketch follows this list.
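A self-contained wiring sketch of this flow, using placeholder EmptyOperators and example table names where the real DAG uses the factory-generated tasks:

```python
# Dependency-wiring sketch with placeholder tasks (table names are examples).
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("wiring_sketch", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):
    start = EmptyOperator(task_id="start")
    etl_ts = EmptyOperator(task_id="generate_etl_timestamp")

    creates = [EmptyOperator(task_id=f"create_{t}") for t in ("app_ratings", "user_installs")]
    loads = [EmptyOperator(task_id=f"drive_to_bq_{t}") for t in ("app_ratings", "user_installs")]
    mappings = [EmptyOperator(task_id="app_user_mapping_hash_key")]
    analytics = [EmptyOperator(task_id="merged_app_users")]

    # start -> timestamp -> create tables -> load from Drive -> mappings -> analytics
    start >> etl_ts >> creates
    for create in creates:
        create >> loads
    for load in loads:
        load >> mappings
    for mapping in mappings:
        mapping >> analytics
```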
- The DAG can run on a schedule or be triggered manually. For now, it is configured for manual runs only; the schedule can be updated.
- Initially generates an ETL timestamp used throughout the DAG for data consistency.
- Proceeds to create or replace necessary tables in BigQuery.
- Data from Google Drive is loaded into these tables, where transformation and processing occur.
- Subsequent tasks perform analytics mapping and generate analytical data for business intelligence and analytics processes.
- Uses a service account (with the required permissions) stored in GCP Secret Manager for secure authentication (see the sketch after this list).
- Designed to handle retries and failures, ensuring data integrity and consistency.
- Modular task factory design allows scalability and maintainability.
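For illustration, a service-account key stored in Secret Manager can be turned into credentials roughly as sketched below; the helper name and scopes are assumptions, while the project and secret IDs are those listed in the configuration.

```python
# Sketch: fetch the service-account key from Secret Manager and build credentials
# for the BigQuery / Drive clients (helper name and scopes are illustrative).
import json

from google.cloud import secretmanager
from google.oauth2 import service_account


def get_credentials(project_id: str = "datatobq-405917",
                    secret_id: str = "csv-unloader-secret") -> service_account.Credentials:
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
    payload = client.access_secret_version(request={"name": name}).payload.data.decode("utf-8")
    return service_account.Credentials.from_service_account_info(
        json.loads(payload),
        scopes=[
            "https://www.googleapis.com/auth/bigquery",
            "https://www.googleapis.com/auth/drive.readonly",
        ],
    )
```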
Currently, the solution operates on a full-sync basis: it entirely truncates and reloads data rather than supporting delta loading or incremental ingestion. This approach can be extended to support such functionality in the future.
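A full reload in BigQuery typically uses a truncate-and-write disposition; the sketch below illustrates the idea (file path and table ID are assumptions, not the exact code in the DAG).

```python
# Sketch of a full-sync load: WRITE_TRUNCATE replaces the table contents on every
# run instead of appending deltas (file path and table ID are illustrative).
from google.cloud import bigquery

client = bigquery.Client(project="datatobq-405917")
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

with open("/tmp/user_installs.csv", "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file, "datatobq-405917.data.user_installs", job_config=job_config
    )
load_job.result()  # wait for the load to finish
```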
The solution includes the following tables and datasets:
- app_ratings: Contains data from the app_ratings.csv file.
- config_history: Contains data from the config_history.csv file.
- country_code_to_name: Contains data from the country_code_to_name.csv file.
- country_names_to_code: Contains data from the country_names_to_code.csv file.
- package_engagement_metrics: Contains data from the package_engagement_metrics.csv file.
- ua_campaign_spendings: Contains data from the ua_campaign_spendings.csv file.
- user_installs: Contains data from the user_installs.csv file.
- user_subscription_trials: Contains data from the user_subscription_trials.csv file.
- statistics_drive_to_bq: Contains metadata about the tables loaded into the data dataset. The structure is described in detail under the statistics_analytics table.
- app_user_mapping_hash_key: git
After careful analysis, it was determined that the following fields can be used to merge all existing sources (app and user sources):
- package
- date (DATE type, not TIMESTAMP)
- version
All these fields are present in every source and demonstrated excellent data consistency.
Columns:
- hash_key: Unique hash key for the mapping (primary key).
- package: App package name.
- date: The date of app engagement.
- version: App version.
- etl_timestamp: The timestamp when the ETL process was executed.
- date_campaign_country_mapping_hash_key: git
This mapping helps to normalize the user_installs table and, as a result, the final merged app and user data (the merged_app_users table).
Columns:
- hash_key: Unique hash key for the mapping (primary key).
- date: Date of the campaign.
- campaign: Campaign name.
- country: Country name.
- etl_timestamp: The timestamp when the ETL process was executed.
The app_user_mapping_hash_key and date_campaign_country_mapping_hash_key tables are used as mapping/dictionary tables for normalizing the analytics dataset.
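For illustration, a hash key over these fields could be built in BigQuery roughly as below; the hash function, source table, and etl_timestamp handling are assumptions (the real pipeline injects the DAG-wide etl_timestamp rather than CURRENT_TIMESTAMP()).

```python
# Illustrative hash-key generation for app_user_mapping_hash_key; the actual SQL
# in dags/ may use a different hash function, source table, or column order.
from google.cloud import bigquery

HASH_KEY_SQL = """
CREATE OR REPLACE TABLE `datatobq-405917.analytics.app_user_mapping_hash_key` AS
SELECT DISTINCT
  TO_HEX(MD5(CONCAT(package, '|', CAST(date AS STRING), '|', version))) AS hash_key,
  package,
  date,                           -- DATE, not TIMESTAMP
  version,
  CURRENT_TIMESTAMP() AS etl_timestamp
FROM `datatobq-405917.data.package_engagement_metrics`
"""

bigquery.Client(project="datatobq-405917").query(HASH_KEY_SQL).result()
```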
- app_engagement_metrics: git
Columns:
- hash_key: Hash key identifier (FK) referencing the app_user_mapping_hash_key table.
- country_code: The country code of the app usage.
- sessions: Number of sessions for the app.
- avg_session_duration: Average duration of app sessions.
- dau: Daily Active Users.
- wau: Weekly Active Users.
- mau: Monthly Active Users.
- etl_timestamp: The timestamp when the ETL process was executed.
- app_rating: git
Columns:
- hash_key: Hash key identifier (FK) referencing the app_user_mapping_hash_key table.
- date: The date of the rating.
- app_version_code: The version code of the app.
- star_rating: Star rating given to the app.
- etl_timestamp: The timestamp when the ETL process was executed.
- user_installs: git
Columns:
- hash_key: Hash key identifier (FK) referencing the app_user_mapping_hash_key table.
- user_id: Unique identifier for the user.
- mobile_brand_name, mobile_marketing_name: Mobile brand and marketing names.
- hash_date_campaign_country: Hash key for date, campaign, and country mapping; FK referencing the date_campaign_country_mapping_hash_key table.
- source: Source of the installation.
- etl_timestamp: The timestamp when the ETL process was executed.
- user_subscription_trials: git
Columns:
- hash_key: Hash key identifier (FK) referencing the app_user_mapping_hash_key table.
- user_id: Unique identifier for the user.
- sku: Stock Keeping Unit.
- order_id: Order identifier.
- install_timestamp, purchase_timestamp: Installation and purchase timestamps.
- etl_timestamp: The timestamp when the ETL process was executed.
- ua_campaign_spendings: git
Columns:
- hash_key: Hash key for the campaign; FK referencing the date_campaign_country_mapping_hash_key table.
- cost_ils: Cost in Israeli Shekels.
- etl_timestamp: The timestamp when the ETL process was executed.
- merged_app_users: git
Columns:
- hash_key: Hash key for user-app mapping; FK referencing the app_user_mapping_hash_key table.
- user_id: Unique identifier for the user.
- hash_date_campaign_country: Hash key for date, campaign, and country mapping; FK referencing the date_campaign_country_mapping_hash_key table.
- order_id: Order identifier.
- star_rating: App star rating.
- country_code: Country code.
- sessions: Number of sessions.
- avg_session_duration: Average session duration.
- dau, wau, mau: User activity metrics.
- etl_timestamp: The timestamp when the ETL process was executed.
- avg_session_by_country: git
Columns:
- country: Country name.
- avg_session_duration_by_country: Average session duration in the country.
- avg_session_by_country: Average number of sessions in the country.
- avg_dau_by_country: Average Daily Active Users in the country.
- avg_wau_by_country: Average Weekly Active Users in the country.
- avg_mau_by_country: Average Monthly Active Users in the country.
- etl_timestamp: The timestamp when the ETL process was executed.
- statistics_analytics: Contains metadata about the tables in the analytics dataset. git
The statistics tables (data.statistics_drive_to_bq and analytics.statistics_analytics) in BigQuery are designed to store various metrics related to the data loading processes. Below is a detailed description of each column in these statistics tables.
Columns:
1. table_name
   - Description: The name of the table to which the statistics pertain.
   - Type: String
   - Example: 'user_installs'
2. dag_execution_date
   - Description: The date and time when the DAG (Directed Acyclic Graph) in Airflow was executed.
   - Type: Timestamp
   - Example: '2023-04-01T12:30:00Z'
3. load_to_bq_timestamp
   - Description: The timestamp indicating when the data load to BigQuery was initiated.
   - Type: Timestamp
   - Example: '2023-04-01T12:35:00Z'
4. etl_timestamp
   - Description: The timestamp representing when the ETL process was executed.
   - Type: Timestamp
   - Example: '2023-04-01T12:40:00Z'
5. duration_loading_to_bq_min
   - Description: The total duration, in minutes, taken to load the data into the BigQuery table, rounded to two decimal places.
   - Type: Float
   - Example: 0.25
6. row_loaded_to_bq
   - Description: The total number of rows loaded into the BigQuery table.
   - Type: Integer
   - Example: 1000
7. distinct_row_loaded_to_bq
   - Description: The count of distinct rows in the BigQuery table after the data load, determined by counting distinct combinations of all columns in the table.
   - Type: Integer
   - Example: 950
These columns collectively provide a comprehensive overview of the data loading process, including timing, count of records, and data uniqueness, which is crucial for monitoring and auditing ETL operations.
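A hedged sketch of a helper that gathers these metrics is shown below; the actual useful.build_statistics signature and write path may differ.

```python
# Illustrative statistics gathering; the real useful.build_statistics helper
# may have a different signature. load_start is a timezone-aware UTC datetime
# captured when the load began.
from datetime import datetime, timezone

from google.cloud import bigquery


def build_statistics(client: bigquery.Client, table_id: str, stats_table_id: str,
                     dag_execution_date: str, load_start: datetime, etl_timestamp: str) -> None:
    finished = datetime.now(timezone.utc)
    row_count = next(iter(client.query(
        f"SELECT COUNT(*) AS c FROM `{table_id}`").result())).c
    distinct_count = next(iter(client.query(
        f"SELECT COUNT(*) AS c FROM (SELECT DISTINCT * FROM `{table_id}`)").result())).c

    errors = client.insert_rows_json(stats_table_id, [{
        "table_name": table_id.split(".")[-1],
        "dag_execution_date": dag_execution_date,
        "load_to_bq_timestamp": load_start.isoformat(),
        "etl_timestamp": etl_timestamp,
        "duration_loading_to_bq_min": round((finished - load_start).total_seconds() / 60, 2),
        "row_loaded_to_bq": row_count,
        "distinct_row_loaded_to_bq": distinct_count,
    }])
    if errors:
        raise RuntimeError(f"Failed to insert statistics row: {errors}")
```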
Tested on macOS: deploying takes ~20 min; destroying takes ~6 min.
A Terraform resource was created for deploying Google Composer only. It can be extended to other resources (such as a service account).
Guideline:
- Open the ./terraform directory in a terminal.
- Make the script executable:
chmod +x automation.sh
- Terraform must be installed.
- gsutil (part of the gcloud SDK) must be installed.
- To deploy:
./automation.sh deploy
- To destroy:
./automation.sh destroy
This README provides a comprehensive overview of the DAG's purpose, configuration, and execution flow, as well as the BigQuery table structures and the deployment process, ensuring maintainability and ease of use for future modifications and enhancements.