A data engineering project for managing, transforming, and analyzing COVID-19 datasets using dbt, Dagster, and Python.
- Data Ingestion: Collects and stores COVID-19 data from various sources.
- Transformation: Uses dbt for data modeling and transformation.
- Orchestration: Leverages Dagster for workflow orchestration and asset management.
- Testing: Includes unit and integration tests for data pipelines.
- **Python**: The primary programming language for data processing, orchestration, and utility scripts.
- **dbt (Data Build Tool)**: Used for data modeling, transformation, and analytics engineering with SQL.
- **Dagster**: Orchestration framework for building, running, and monitoring data pipelines and assets.
- **DuckDB**: Embedded analytical database used for fast, in-process data storage and querying.
- **Pandas**: Python library for data manipulation and analysis.
- **Docker**: Containerization platform to package and run the application in isolated environments.
- **Docker Compose**: Tool for defining and running multi-container Docker applications.
- **pytest**: Testing framework for writing and running unit and integration tests.
```
covid_data/
├── data/
│   └── .keep
├── dbt_covid_data/
│   ├── dbt_project.yml
│   ├── profiles.yml
│   ├── analysis/
│   │   └── top_5_values_analysis.sql
│   ├── logs/
│   │   └── dbt.log
│   ├── macros/
│   │   ├── get_top_5_values_with_frequency.sql
│   │   ├── safe_cast_to_number.sql
│   │   └── table_exists.sql
│   ├── models/
│   │   ├── dim_calendar.sql
│   │   ├── dim_locations.sql
│   │   ├── fct_covid_global_metrics.sql
│   │   ├── fct_covid_us_metrics.sql
│   │   ├── models.yml
│   │   └── sources.yml
│   └── seeds/
│       ├── calendar.csv
│       └── seeds.yml
├── dagster_covid_data/
│   ├── __init__.py
│   ├── assets/
│   │   ├── __init__.py
│   │   ├── analysis.py
│   │   ├── dbt_assets.py
│   │   └── ingestion/
│   │       ├── csse_covid_19_daily_reports.py
│   │       ├── csse_covid_19_time_series.py
│   │       ├── uid_iso_fips_lookup_table.py
│   │       └── who_covid_19_situation_reports.py
│   └── utils/
│       ├── __init__.py
│       ├── common.py
│       ├── data.py
│       └── duckdb.py
├── tests/
│   ├── test_utils_common.py
│   ├── test_utils_data.py
│   └── test_utils_duckdb.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
├── .gitignore
├── settings.py
└── README.md
```
- `requirements.txt`: Lists the Python dependencies required for the project.
- `Dockerfile`: Instructions to build a Docker image for the project.
- `docker-compose.yml`: Defines the multi-container Docker application and service orchestration.
- `.gitignore`: Specifies files and directories to be ignored by git.
- `settings.py`: Central configuration for paths, resources, and global settings.
- `README.md`: Project documentation and usage instructions.
- `.keep`: Placeholder to ensure the `data/` directory is tracked by git.
- `dbt_project.yml`: Main dbt project configuration file.
- `profiles.yml`: dbt profile for connection settings (usually kept in `~/.dbt/`, but included here for reference).
- `top_5_values_analysis.sql`: Ad-hoc analysis SQL file for the top 5 values per column in key tables.
- `dbt.log`: Log file generated by dbt runs.
- `get_top_5_values_with_frequency.sql`: Macro to get the top 5 values and their frequency for each column.
- `safe_cast_to_number.sql`: Macro to safely cast values to numbers.
- `table_exists.sql`: Macro to check whether a table exists.
- `dim_calendar.sql`: Calendar dimension model.
- `dim_locations.sql`: Locations dimension model.
- `fct_covid_global_metrics.sql`: Fact table for global COVID-19 metrics.
- `fct_covid_us_metrics.sql`: Fact table for US COVID-19 metrics.
- `models.yml`: Model-level metadata and configuration.
- `sources.yml`: Source definitions for raw data.
- `calendar.csv`: Calendar seed data.
- `seeds.yml`: Seed metadata and configuration.
- `__init__.py`: Package initializer.
- `__init__.py`: Asset package initializer.
- `analysis.py`: Assets for analytical queries (e.g., top 5 common values).
- `dbt_assets.py`: Asset for running dbt models via Dagster.
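For illustration, here is a minimal sketch of how `dbt_assets.py` could wire the dbt project into Dagster with the `dagster-dbt` integration. The manifest path and asset name are assumptions, not the project's actual code:

```python
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# Assumed location of the manifest produced by compiling the dbt project.
DBT_MANIFEST = Path("dbt_covid_data/target/manifest.json")

@dbt_assets(manifest=DBT_MANIFEST)
def covid_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run `dbt build` and stream events back to Dagster,
    # materializing each dbt model as a Dagster asset.
    yield from dbt.cli(["build"], context=context).stream()
```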
- `csse_covid_19_daily_reports.py`: Ingests daily reports from CSSE.
- `csse_covid_19_time_series.py`: Ingests time series data from CSSE.
- `uid_iso_fips_lookup_table.py`: Ingests the UID/ISO/FIPS lookup table.
- `who_covid_19_situation_reports.py`: Ingests WHO situation reports.
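A rough sketch of what one of these ingestion assets might look like, fetching a single CSSE daily-report CSV with pandas and landing it in DuckDB. The URL pattern, database path, and table name are assumptions for illustration:

```python
import duckdb
import pandas as pd
from dagster import asset

# Assumed raw-file URL pattern for the public CSSE COVID-19 repository.
CSSE_DAILY_URL = (
    "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
    "csse_covid_19_data/csse_covid_19_daily_reports/{date}.csv"
)

@asset
def csse_covid_19_daily_reports() -> None:
    # Fetch one day's report (files are named MM-DD-YYYY.csv); a real asset
    # would likely derive the date from a partition key instead.
    df = pd.read_csv(CSSE_DAILY_URL.format(date="01-01-2021"))
    con = duckdb.connect("data/covid.duckdb")  # assumed database path
    con.register("daily_df", df)
    con.execute(
        "CREATE OR REPLACE TABLE raw_csse_daily_reports AS SELECT * FROM daily_df"
    )
    con.close()
```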
- `__init__.py`: Utility package initializer.
- `common.py`: Common helper functions (e.g., date generation).
- `data.py`: Data cleaning and schema utilities.
- `duckdb.py`: DuckDB database interaction utilities.
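By way of example, the helpers in `duckdb.py` might look roughly like this; the function names and default database path are hypothetical:

```python
from contextlib import contextmanager

import duckdb
import pandas as pd

DATABASE_PATH = "data/covid.duckdb"  # assumed default location

@contextmanager
def duckdb_connection(path: str = DATABASE_PATH):
    """Open a DuckDB connection and always close it afterwards."""
    con = duckdb.connect(path)
    try:
        yield con
    finally:
        con.close()

def write_dataframe(df: pd.DataFrame, table: str, path: str = DATABASE_PATH) -> None:
    """Replace `table` with the contents of `df`."""
    with duckdb_connection(path) as con:
        con.register("incoming_df", df)
        con.execute(f"CREATE OR REPLACE TABLE {table} AS SELECT * FROM incoming_df")
```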
- `test_utils_common.py`: Unit tests for `common.py`.
- `test_utils_data.py`: Unit tests for `data.py`.
- `test_utils_duckdb.py`: Unit tests for `duckdb.py`.
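A test in this suite might look like the following. `generate_date_range` is a hypothetical stand-in for the date-generation helper in `common.py`, defined inline here so the example runs on its own:

```python
from datetime import date, timedelta

# Hypothetical helper, standing in for the date-generation utility in common.py.
def generate_date_range(start: date, end: date) -> list[date]:
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]

def test_generate_date_range_is_inclusive():
    dates = generate_date_range(date(2021, 1, 1), date(2021, 1, 3))
    assert dates == [date(2021, 1, 1), date(2021, 1, 2), date(2021, 1, 3)]

def test_generate_date_range_rejects_reversed_bounds():
    # Assumes the helper treats a reversed range as empty.
    assert generate_date_range(date(2021, 1, 3), date(2021, 1, 1)) == []
```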
- Clone the repository:

  ```bash
  git clone https://github.com/allen-morales/covid_data.git
  cd covid_data
  ```

- Start all services:

  ```bash
  docker compose up
  ```

  This builds and starts the containers defined in `docker-compose.yml`.

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Start the Dagster development server:

  ```bash
  dagster dev
  ```

- Run dbt commands:

  ```bash
  cd dbt_covid_data
  dbt run
  ```
- What are the top 5 most common values in a particular column, and what is their frequency?
- How does a particular metric change over time within the dataset?
- Is there a correlation between two specific columns? Explain your findings.
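Each of these questions can be answered directly against the DuckDB store. A sketch is below, where the table (`fct_covid_global_metrics`) and column names (`confirmed`, `deaths`, `report_date`) are assumptions about the schema:

```python
import duckdb

con = duckdb.connect("data/covid.duckdb")  # assumed database path

# 1. Top 5 most common values in a column, with their frequency.
top5 = con.execute(
    """
    SELECT confirmed AS value, COUNT(*) AS frequency
    FROM fct_covid_global_metrics
    GROUP BY confirmed
    ORDER BY frequency DESC
    LIMIT 5
    """
).fetch_df()

# 2. How a metric changes over time.
trend = con.execute(
    """
    SELECT report_date, SUM(confirmed) AS total_confirmed
    FROM fct_covid_global_metrics
    GROUP BY report_date
    ORDER BY report_date
    """
).fetch_df()

# 3. Correlation between two columns (DuckDB's corr() aggregate).
correlation = con.execute(
    "SELECT corr(confirmed, deaths) FROM fct_covid_global_metrics"
).fetchone()[0]
```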
- **Daily Operational Use**: The pipeline is designed to operate on a daily schedule, ingesting, transforming, and analyzing new COVID-19 data each day. It is assumed that the system will process data partitions corresponding to each calendar day (see the partitioning sketch after this list), and that a comprehensive report will be generated at the end of every day. This ensures that stakeholders have access to up-to-date metrics and insights on a daily basis, supporting timely decision-making and monitoring of trends as they develop. This assumption also helps in isolating data inconsistencies, such as days where the column structure differs from the expected schema.
- **Stable Daily Data Volume**: It is assumed that the volume of data ingested each day will remain relatively stable and will not increase significantly over time, so the pipeline can process daily data efficiently without performance degradation or frequent scaling of resources. If the daily data volume were to grow unexpectedly, adjustments to storage, processing, or infrastructure may be required.
- **Development Mode for Demonstration**: For demonstration purposes, the pipeline is run in development mode (e.g., using Dagster's `dagster dev` server). This mode is intended for testing, exploration, and showcasing the pipeline's capabilities rather than for production workloads. As such, certain configurations (such as logging, error handling, and resource scaling) may be less robust than in a production environment. It is assumed that, for actual deployment, the pipeline will be configured and hardened according to production best practices (e.g., provisioning a SQL database for Dagster's storage, network setup, etc.).
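The daily-partition assumption above maps naturally onto Dagster's partitioning primitives. A minimal sketch follows, assuming a 2020-01-22 start date (the beginning of the CSSE dataset) and an illustrative asset body:

```python
import pandas as pd
from dagster import DailyPartitionsDefinition, asset

# One partition per calendar day; the start date is an assumption.
daily_partitions = DailyPartitionsDefinition(start_date="2020-01-22")

@asset(partitions_def=daily_partitions)
def daily_covid_report(context) -> pd.DataFrame:
    # The partition key identifies the day being processed, e.g. "2021-01-01".
    day = context.partition_key
    context.log.info(f"Building report for {day}")
    # A real implementation would read that day's data and assemble the report.
    return pd.DataFrame({"report_date": [day]})
```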