- Python >=3.8
- Apache Airflow 2.10
- Google Cloud Platform account with Cloud Storage and BigQuery enabled
This project implements an ETL (Extract, Transform, Load) pipeline using Apache Airflow to process data from the DummyJSON API. The pipeline extracts user, product, and cart data, transforms it according to specified schemas, and loads it into Google Cloud Storage and BigQuery for analysis.
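The wiring of `dags/etl_dag.py` is not reproduced here, but a minimal sketch of a DAG along these lines might look as follows. The task callables, schedule, and task ids are illustrative assumptions; only the DAG id `dummy_pipeline` comes from the project.

```python
# Illustrative sketch only -- the real DAG lives in dags/etl_dag.py.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_data(**_):
    """Placeholder for the real extract logic (scripts/helpers.py)."""


def transform_data(**_):
    """Placeholder for the real transform logic."""


def load_data(**_):
    """Placeholder for the real load logic."""


with DAG(
    dag_id="dummy_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # assumed schedule
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_data)

    extract >> transform >> load
```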
Project structure:

```
project_root/
├── dags/
│   └── etl_dag.py
├── schemas/
│   ├── users.py
│   ├── products.py
│   └── carts.py
├── scripts/
│   ├── helpers.py
│   └── bg_sql_scripts.py
├── config/
│   └── config.py
├── tests/
│   └── test_schemas.py
├── requirements.txt
└── README.md
```
- Clone the repository:

  ```bash
  git clone <repository-url>
  ```

- Create and activate a virtual environment:

  ```bash
  python -m venv venv
  # Linux/macOS
  source venv/bin/activate
  # Windows
  .\venv\Scripts\activate
  ```

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```
The extract task (see the sketch after this list):
- Fetches data from the DummyJSON API endpoints (users, products, carts)
- Handles pagination
- Stores the raw data locally
- Implements error handling and retries
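A minimal sketch of this step, assuming the `requests` library and DummyJSON's `limit`/`skip` pagination. The helper names and local paths are illustrative, not the project's actual `scripts/helpers.py`.

```python
# Illustrative extraction sketch -- the project's real helpers live in scripts/helpers.py.
import json
import time
from pathlib import Path
from typing import List

import requests

BASE_URL = "https://dummyjson.com"


def fetch_all(resource: str, page_size: int = 100, max_retries: int = 3) -> List[dict]:
    """Fetch every record for a resource (users/products/carts),
    following DummyJSON's limit/skip pagination and retrying transient failures."""
    records, skip = [], 0
    while True:
        for attempt in range(max_retries):
            try:
                resp = requests.get(
                    f"{BASE_URL}/{resource}",
                    params={"limit": page_size, "skip": skip},
                    timeout=10,
                )
                resp.raise_for_status()
                break
            except requests.RequestException:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # simple exponential backoff
        payload = resp.json()
        records.extend(payload[resource])  # e.g. payload["users"]
        skip += page_size
        if skip >= payload["total"]:
            return records


def save_raw(resource: str, records: List[dict], base_path: str = "data/raw") -> Path:
    """Store the raw API payload locally before any transformation (path is an assumption)."""
    out = Path(base_path) / f"{resource}.json"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(records))
    return out
```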
The transform task (see the sketch after this list):
- Validates and cleans data using Pydantic models
- Flattens nested structures
- Filters products against a price threshold (< 50)
- Calculates cart values
- Performs data quality checks
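A minimal sketch of the validation and derivation logic, assuming Pydantic models with illustrative field names (the real models live in `schemas/`). Keeping products priced under 50 is an assumed reading of the threshold.

```python
# Illustrative transform sketch -- real models live in schemas/, real transforms in scripts/helpers.py.
from typing import List

from pydantic import BaseModel

PRICE_THRESHOLD = 50  # assumed: products priced below this are kept


class Product(BaseModel):
    id: int
    title: str
    price: float
    category: str  # extra API fields are ignored by default


class CartItem(BaseModel):
    id: int
    price: float
    quantity: int


def clean_products(raw_products: List[dict]) -> List[dict]:
    """Validate raw API records with Pydantic and apply the price filter."""
    validated = [Product(**p) for p in raw_products]  # raises on malformed records
    return [p.dict() for p in validated if p.price < PRICE_THRESHOLD]  # .model_dump() in Pydantic v2


def cart_value(items: List[CartItem]) -> float:
    """Derive the total value of a cart from its line items."""
    return sum(item.price * item.quantity for item in items)
```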
The load task (see the sketch after this list):
- Uploads raw and cleaned data to Google Cloud Storage
- Creates BigQuery tables
- Loads transformed data into BigQuery
- Validates data loads
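A hedged sketch using the Google provider's transfer operators. The bucket, object paths, and destination table are placeholders standing in for the `.env` settings described below, and the task ids are assumptions.

```python
# Illustrative load sketch -- the real task definitions live in dags/etl_dag.py.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

with DAG(dag_id="load_sketch", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    upload_products = LocalFilesystemToGCSOperator(
        task_id="upload_products_to_gcs",
        src="data/clean/products.json",        # assumed local output of the transform step
        dst="clean/products.json",             # object name inside the bucket
        bucket="your-bucket-name",             # BUCKET_NAME from .env
        gcp_conn_id="google_cloud_default",    # GCP_CONNECTION_ID from .env
    )

    load_products = GCSToBigQueryOperator(
        task_id="load_products_to_bigquery",
        bucket="your-bucket-name",
        source_objects=["clean/products.json"],
        destination_project_dataset_table="your-project.your_dataset.products",  # BIGQUERY_PROJECT_DATASET
        source_format="NEWLINE_DELIMITED_JSON",
        write_disposition="WRITE_TRUNCATE",
        autodetect=True,
        gcp_conn_id="google_cloud_default",
    )

    upload_products >> load_products
```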
Analysis queries (an illustrative example follows this list):
- User purchase summary query
- Category sales analysis query
- Detailed cart information query
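As one illustration, a user purchase summary along these lines could be defined in `scripts/bg_sql_scripts.py`. The table and column names below are hypothetical.

```python
# Hypothetical user purchase summary -- the real statements live in scripts/bg_sql_scripts.py.
USER_PURCHASE_SUMMARY = """
SELECT
    u.id         AS user_id,
    u.first_name,
    u.last_name,
    COUNT(c.id)  AS cart_count,
    SUM(c.total) AS total_spent
FROM `your-project.your_dataset.users` AS u
LEFT JOIN `your-project.your_dataset.carts` AS c
    ON c.user_id = u.id
GROUP BY u.id, u.first_name, u.last_name
ORDER BY total_spent DESC
"""
```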
Create a `.env` file with:

```
BUCKET_NAME=
BUCKET_PATH=
BIGQUERY_PROJECT_DATASET=
BASE_FILE_PATH=
GCP_CONNECTION_ID=
```
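A sketch of how `config/config.py` might read these values, assuming `python-dotenv`; the actual project may load them differently.

```python
# Sketch of config/config.py -- assumes python-dotenv is available.
import os

from dotenv import load_dotenv

load_dotenv()  # read the .env file in the project root

BUCKET_NAME = os.getenv("BUCKET_NAME")
BUCKET_PATH = os.getenv("BUCKET_PATH")
BIGQUERY_PROJECT_DATASET = os.getenv("BIGQUERY_PROJECT_DATASET")
BASE_FILE_PATH = os.getenv("BASE_FILE_PATH")
GCP_CONNECTION_ID = os.getenv("GCP_CONNECTION_ID", "google_cloud_default")
```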
Required permissions for GCP service account:
- Storage Object Admin
- BigQuery Data Editor
- BigQuery Job User
Place your GCP service account key in `config/gcp.json`.
Run tests using pytest:

```bash
pytest tests/
```
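For illustration, a schema test could look like the following. The `Product` model and its fields are assumptions, not the project's actual schema.

```python
# Illustrative schema test -- the real tests live in tests/test_schemas.py.
import pytest
from pydantic import BaseModel, ValidationError


class Product(BaseModel):
    id: int
    title: str
    price: float
    category: str


def test_valid_product_parses():
    product = Product(id=1, title="Pen", price=1.99, category="stationery")
    assert product.price == pytest.approx(1.99)


def test_invalid_price_is_rejected():
    with pytest.raises(ValidationError):
        Product(id=1, title="Pen", price="not-a-number", category="stationery")
```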
- Ensure all configuration is set up correctly
- Start the Airflow webserver and scheduler:

  ```bash
  airflow webserver
  airflow scheduler
  ```

- Access the Airflow UI at http://localhost:8080
- Trigger the DAG named `dummy_pipeline` through the Airflow UI