> Hey there! I am Shubham Dalvi
「 I am a data engineer with a passion for big data, distributed computing, cloud solutions, and data visualization 」
✌️ Enjoy solving data problems
❤️ Passionate about big data technologies, cloud platforms, and data visualizations
📧 Reach me: shubhamdworkmail@gmail.com
A production-ready data pipeline using Apache Airflow and DBT for processing CFTC reports.
```mermaid
graph LR
    subgraph External_APIs[External APIs & File Formats]
        A1[CFTC Trading Reports API<br/>.json<br/>Weekly Friday 3:30 PM ET]
        A2[EU Agriculture Price API<br/>.xlsx<br/>Weekly Thursday 06:31 GMT]
    end

    subgraph Ingestion_DAGs[Data Ingestion Process]
        subgraph Hooks[Database Connection Layer]
            H[PostgresHook<br/>Connection ID: db<br/>Manages DB Connections]
        end
        B1[CFTC Weekly Position Loader<br/>pandas DataFrame<br/>to_sql replace strategy]
        B2[Eurostat Price Loader<br/>pandas DataFrame<br/>to_sql replace strategy]
    end

    subgraph Raw_Storage[PostgreSQL Raw Schema]
        C1[Raw Trading Positions<br/>05_COT_Legacy_Combined_Report<br/>Weekly Full Refresh]
        C2[Raw Wheat Prices<br/>07_Eurostat_Wheat_Prices<br/>Weekly Full Refresh]
    end

    subgraph DBT_Transformations[DBT Transformation Layer]
        D1[Refined Trading Report<br/>01_Refined_COT_Report.sql<br/>Incremental delete+insert]
        D2[Refined Price Data<br/>02_Refined_Eurostat.sql<br/>Full table refresh]
    end

    subgraph Quality_Checks[Data Quality]
        Q[Data Quality Gates<br/>- Not Null Checks<br/>- Range Validations<br/>- Unique Keys<br/>- Value Lists]
    end

    subgraph Final_Result[Analytics Ready Data]
        F1[Refined CFTC Trading Data<br/>With Position Percentages]
        F2[Refined EU Wheat Prices<br/>Hamburg Exchange Focus]
    end

    A1 --> B1
    A2 --> B2
    B1 --> H
    B2 --> H
    H --> C1
    H --> C2
    C1 --> D1
    C2 --> D2
    D1 --> Q
    D2 --> Q
    Q --> F1
    Q --> F2

    style A1 fill:#f9f,stroke:#333,stroke-width:2px
    style A2 fill:#f9f,stroke:#333,stroke-width:2px
    style B1 fill:#bbf,stroke:#333,stroke-width:2px
    style B2 fill:#bbf,stroke:#333,stroke-width:2px
    style H fill:#faa,stroke:#333,stroke-width:2px
    style C1 fill:#dfd,stroke:#333,stroke-width:2px
    style C2 fill:#dfd,stroke:#333,stroke-width:2px
    style D1 fill:#fdd,stroke:#333,stroke-width:2px
    style D2 fill:#fdd,stroke:#333,stroke-width:2px
    style Q fill:#dff,stroke:#333,stroke-width:2px
    style F1 fill:#ffd,stroke:#333,stroke-width:2px
    style F2 fill:#ffd,stroke:#333,stroke-width:2px

    classDef subgraphStyle fill:#fff,stroke:#333,stroke-width:2px
    class External_APIs,Ingestion_DAGs,Raw_Storage,DBT_Transformations,Quality_Checks,Final_Result,Hooks subgraphStyle
```
This project implements an automated data pipeline that:
- Ingests CFTC (Commodity Futures Trading Commission) report data
- Processes and transforms the data using DBT
- Runs data quality tests
- Manages the entire workflow using Apache Airflow
- Source: CFTC Public API (publicreporting.cftc.gov)
- Data Type: Commitments of Traders (COT) Reports
- Format: JSON via Socrata API
- Update Frequency: Weekly (Every Friday at 3:30 PM Eastern)
- Raw Table: `raw.05_COT_Legacy_Combined_Report`
- Content: Trading positions data including:
- Commercial and non-commercial positions
- Long and short percentages
- Open interest statistics
- Focus on wheat, corn, and soybean markets
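The feed is pulled with the Socrata API client before being loaded (see the ingestion snippet below). A minimal sketch of that fetch step, assuming an unauthenticated client and a placeholder dataset identifier:

```python
# Sketch only: fetching the weekly COT report as JSON records via the Socrata API.
# "xxxx-xxxx" is a placeholder dataset identifier, not the one the loader DAG actually uses.
from sodapy import Socrata

client = Socrata("publicreporting.cftc.gov", None)        # None = no app token (rate-limited)
cftc_api_results = client.get("xxxx-xxxx", limit=50000)   # list of dicts, one per report row
```

The resulting list of records is what the ingestion step converts into a pandas DataFrame.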
- Source: European Commission Agriculture Portal
- Data Type: EU Wheat Price Data
- Format: Excel (.xlsx) file
- Update Frequency: Weekly (Every Thursday at 06:31 London time)
- Raw Table: `raw.07_Eurostat_Wheat_Prices`
- Content: Agricultural commodity prices including:
- Regional wheat prices across EU
- Price points from major trading hubs
- Historical price trends
- Focus on Hamburg exchange prices
- CFTC Trading Positions (`raw.05_COT_Legacy_Combined_Report`):

  ```python
  # Using Airflow's PostgresHook and pandas
  import pandas as pd
  from airflow.providers.postgres.hooks.postgres import PostgresHook

  postgres_hook = PostgresHook(postgres_conn_id="db")
  df = pd.DataFrame.from_records(cftc_api_results)
  df.to_sql(
      '05_COT_Legacy_Combined_Report',
      postgres_hook.get_sqlalchemy_engine(),
      schema='raw',
      if_exists='replace',
      index=False,
  )
  ```

  - Fetched via Socrata API client
  - Transformed to pandas DataFrame
  - Bulk loaded using SQLAlchemy engine
  - Replace strategy for weekly updates
- Eurostat Prices (`raw.07_Eurostat_Wheat_Prices`):

  ```python
  # Using requests and pandas (postgres_hook as defined above)
  from io import BytesIO

  import pandas as pd
  import requests

  response = requests.get(eurostat_url)
  df = pd.read_excel(BytesIO(response.content))
  df.to_sql(
      '07_Eurostat_Wheat_Prices',
      postgres_hook.get_sqlalchemy_engine(),
      schema='raw',
      if_exists='replace',
      index=False,
  )
  ```

  - Downloads Excel file via HTTP
  - Direct Excel to DataFrame conversion
  - Bulk loaded using SQLAlchemy engine
  - Replace strategy for weekly updates
- Refined Trading Report (`public.01_Refined_COT_Report`):

  ```sql
  -- Incremental model with delete+insert strategy
  {{ config(
      materialized='incremental',
      unique_key=['date', 'commodity_name'],
      incremental_strategy='delete+insert'
  ) }}
  ```

  - Implements incremental loading
  - Ensures data uniqueness
  - Handles historical updates
- Refined Price Data (`public.02_Refined_Eurostat`):

  ```sql
  -- Full refresh table materialization
  {{ config(
      materialized='table'
  ) }}
  ```

  - Complete table refresh
  - Filters for specific price points
  - Focuses on Hamburg exchange data
- Not null constraints
- Value range validations (0-100 for percentages)
- Unique composite keys
- Accepted value lists for commodities
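These rules live in the project's DBT test definitions. Purely as an illustration of what they enforce, a rough pandas equivalent might look like the sketch below (column names and accepted values are assumptions, not the actual schema):

```python
# Illustration only: the project enforces these rules as DBT tests, not pandas code.
# Column names (report_date, commodity_name, pct_of_oi_comm_long) are assumed for the sketch.
import pandas as pd


def validate_refined_cot(df: pd.DataFrame) -> None:
    # Not null constraints on the key columns
    assert df["report_date"].notna().all(), "report_date contains nulls"
    assert df["commodity_name"].notna().all(), "commodity_name contains nulls"

    # Value range validation: percentage columns must sit within 0-100
    assert df["pct_of_oi_comm_long"].between(0, 100).all(), "percentage out of range"

    # Unique composite key (date + commodity)
    assert not df.duplicated(subset=["report_date", "commodity_name"]).any(), "duplicate key rows"

    # Accepted value list for commodities
    allowed = {"WHEAT", "CORN", "SOYBEANS"}
    assert df["commodity_name"].isin(allowed).all(), "unexpected commodity value"
```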
- Apache Airflow: Orchestrates the entire data pipeline
- DBT: Handles data transformation and testing
- PostgreSQL: Stores both raw and transformed data
- Docker: Containerizes all services for consistent deployment
- Docker and Docker Compose
- Python 3.10 or higher
- Git
- Clone the repository:

  ```bash
  git clone <repository-url>
  cd Docker_Airflow_Postgres-main
  ```

- Create and configure the `.env` file:

  ```bash
  cp .env.example .env
  # Edit .env with your configurations
  ```

- Start the services:

  ```bash
  docker-compose up -d
  ```

- Access the Airflow UI:
  - URL: http://localhost:8080
  - Default credentials: airflow/airflow
Key environment variables in `.env`:

```
AIRFLOW_UID=50000
AIRFLOW_GID=0
PYTHON_VERSION=3.10
DB_HOST=postgres
DB_USER=airflow
DB_PASSWORD=airflow
DB_PORT=5432
DB_NAME=airflow
DB_SCHEMA=public
```
- Airflow Webserver:
  - Optimized Gunicorn settings
  - Health checks enabled
  - Automatic restart on failure
- DBT Service:
  - Custom Dockerfile with required dependencies
  - Git integration for package management
  - Standardized container naming
- PostgreSQL:
  - Configured for optimal performance
  - Proper connection pooling
  - Health monitoring enabled
```
.
├── Airflow/
│   ├── dags/              # Airflow DAG definitions
│   └── logs/              # Airflow logs
├── dsec_dbt/
│   ├── models/            # DBT transformation models
│   ├── tests/             # Data quality tests
│   └── dbt_project.yml    # DBT configuration
├── docker/                # Docker configuration files
├── docs/                  # Project documentation
├── docker-compose.yml     # Service orchestration
└── README.md              # This file
```
- CFTC Report Loader (`06_CFTC_Rreport_Loader.py`):
  - Ingests CFTC report data
  - Runs DBT transformations
  - Executes data quality tests
  - Scheduled to run weekly
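For orientation, here is a trimmed-down sketch of how such a DAG can be wired together. The task IDs, schedule, and DBT project path are illustrative assumptions, not a copy of `06_CFTC_Rreport_Loader.py`:

```python
# Illustrative DAG wiring: ingest -> dbt run -> dbt test.
# The loader body, schedule, and paths below are assumptions, not the project's actual code.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def load_cftc_report(**context):
    """Fetch the weekly COT report and bulk-load it into the raw schema."""
    ...  # see the ingestion snippet earlier in this README


with DAG(
    dag_id="cftc_report_loader",
    schedule_interval="0 21 * * 5",   # weekly, after the Friday 3:30 PM ET release
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ingest = PythonOperator(
        task_id="load_cftc_report",
        python_callable=load_cftc_report,
    )
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --project-dir /opt/airflow/dsec_dbt",
    )
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="dbt test --project-dir /opt/airflow/dsec_dbt",
    )

    ingest >> dbt_run >> dbt_test
```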
The Airflow logs are organized in a hierarchical structure under the `Airflow/logs` directory:
```
Airflow/logs/
├── dag_id=<dag_name>/                 # Specific DAG logs
│   ├── run_id=<run_identifier>/       # Each DAG run
│   │   └── task_id=<task_name>/       # Individual task logs
│   └── ...
├── scheduler/                         # Scheduler logs
│   ├── YYYY-MM-DD/                    # Date-based logs
│   └── ...
├── dag_processor_manager/             # DAG processing logs
└── ...
```
- DAG Run Logs:
  - Located in `dag_id=<dag_name>/run_id=<run_identifier>/`
  - Run IDs include both manual and scheduled runs
  - Format: `manual__YYYY-MM-DDTHHMMSS+0000` or `scheduled__YYYY-MM-DDTHHMMSS+0000`
  - Contains all task execution logs for that specific run
- Service Logs:
  - Scheduler: Daily logs of task scheduling and DAG processing
  - DAG Processor: Logs related to DAG file processing
  - Worker: Task execution and resource usage (in worker container)
- Log Retention:
  - Logs are retained based on configuration
  - Default retention period: 30 days
  - Configurable via environment variables
- Task Execution Logs:
  - Generated by the Airflow worker when executing tasks
  - Each task instance creates its own log file
  - Captures:
    - Task start/end times
    - Python stdout/stderr output
    - Custom log messages from task code
    - Error tracebacks
    - Operator-specific information (e.g., SQL queries, DBT output)
- Scheduler Logs:
  - Generated by the Airflow scheduler process
  - Created in daily rotating files
  - Records:
    - DAG file processing events
    - Task scheduling decisions
    - Task state transitions
    - Retry attempts
    - Scheduler heartbeat information
- DAG Processor Logs:
  - Generated when Airflow processes DAG files
  - Captures:
    - DAG parsing results
    - Import errors
    - DAG validation issues
    - File processing timing
    - Syntax errors in DAG files
- Worker Logs:
  - Generated by Celery workers (in our case, Docker containers)
  - Contains:
    - Task execution environment details
    - Resource usage (CPU, memory)
    - Container-specific information
    - Connection pool status
Key settings that affect log generation:

```
# Logging level for different components
AIRFLOW__LOGGING__LOGGING_LEVEL=INFO             # General logging level
AIRFLOW__CELERY__WORKER_LOG_SERVER_PORT=8793     # Worker log server port

# Log format customization
AIRFLOW__LOGGING__LOG_FORMAT=[%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
AIRFLOW__LOGGING__SIMPLE_LOG_FORMAT=%%(asctime)s %%(levelname)s - %%(message)s

# Log handlers configuration
AIRFLOW__LOGGING__FAB_LOGGING_LEVEL=WARN         # Flask AppBuilder logging
AIRFLOW__LOGGING__PROCESSOR_LOG_FOLDER=/opt/airflow/logs/dag_processor_manager
```
- Task Handler:

  ```python
  # Example from a DAG
  import logging

  from airflow.decorators import task


  @task(task_id='example_task')
  def example_task(**context):
      # Logs are automatically captured by the task handler
      logging.info("Task started")
      # Task code here
      logging.error("Error occurred")
  ```

- Custom Logging:

  ```python
  # In your DAG file
  from airflow.utils.log.logging_mixin import LoggingMixin


  class CustomOperator(LoggingMixin):
      def execute(self, context):
          self.log.info("Custom message")
          # Your operator code
  ```
- Via the Airflow UI:
  - Navigate to DAG > Graph View
  - Click on task
  - Select "View Log"
  - Use log navigation to view different runs
- Via Docker Commands:

  ```bash
  # View DAG task logs
  docker-compose logs airflow-worker

  # View scheduler logs
  docker-compose logs airflow-scheduler

  # View specific container logs
  docker-compose logs [service-name]
  ```

- Direct File Access:

  ```bash
  # Access logs directory
  docker-compose exec airflow-worker ls -l /opt/airflow/logs/dag_id=<dag_name>

  # View specific run logs
  docker-compose exec airflow-worker ls -l /opt/airflow/logs/dag_id=<dag_name>/run_id=<run_id>
  ```
Key environment variables for logging:

```
AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
AIRFLOW__LOGGING__DAG_FILE_PROCESSOR_LOG_RETENTION_DAYS=30
AIRFLOW__LOGGING__LOGGING_LEVEL=INFO
```
Common issues and solutions are documented in:
- `docs/debugging_steps.md`: Detailed debugging guide
- `docs/docker_commands.md`: Useful Docker commands
All services include health checks:
- Airflow Webserver: Gunicorn process monitoring
- DBT: Git availability check
- PostgreSQL: Connection verification
- Redis: Ping check
- Container Management:
  - Use of health checks
  - Proper restart policies
  - Resource limits configured
- Network Configuration:
  - Standardized network setup
  - Internal service discovery
  - Proper DNS resolution
- Resource Management:
  - Memory limits configured
  - CPU allocation optimized
  - Disk space monitoring
- Error Handling:
  - Comprehensive logging
  - Retry mechanisms
  - Proper cleanup procedures
- Fork the repository
- Create a feature branch
- Submit a pull request
[Your License Here]
For issues and questions:
- Check the debugging guide in `docs/debugging_steps.md`