- Project Description
- Project Structure
- What This Project Does
- Technologies Used
- What's a DAG?
- Installation and Execution
- Sales ETL Pipeline
- DAG Graph View
- DAG Configuration
- Output Files
- Sample Output
- Troubleshooting
- Notes
- How to Add a DAG to Apache Airflow
- Contributing
- Roadmap
- Questions?
- Author
- Useful Resources
- Coding Standards and Best Practices
- License
- Wiki
This project uses GitHub Actions to automatically run the test suite on every push (CI file: python-tests.yml). This:

- Ensures code quality
- Prevents regressions
This project automates the extraction, transformation, and export of sales data using Apache Airflow. It pulls data from a PostgreSQL database, enriches it with USD to CLP exchange rate information, and exports the final dataset to both a CSV file and a Google Sheet.

The pipeline is designed as a Directed Acyclic Graph (DAG) to manage task dependencies and ensure a reliable, repeatable workflow.
```
dag-first-approach/
├── project_airflow_etl/
│   ├── dags/
│   │   └── etl_sales_report.py        # Airflow DAG definition
│   ├── data/
│   │   ├── dag.csv
│   │   ├── report.csv                 # Final report file
│   │   └── sales.png                  # Yearly sales plot
│   ├── logs/                          # Airflow logs
│   ├── plugins/                       # Custom Airflow plugins
│   ├── src/
│   │   └── etl_modules/               # ETL module scripts
│   │       ├── __init__.py
│   │       ├── connection.py
│   │       ├── enrich.py
│   │       ├── export.py
│   │       ├── extract.py
│   │       ├── generate_sales_plot.py
│   │       ├── google_sheets.py
│   │       └── usd_to_clp.py
│   ├── test/                          # Tests (run in GitHub Actions)
│   │   ├── test_connection.py
│   │   ├── test_enrich.py
│   │   ├── test_export.py
│   │   ├── test_extract.py
│   │   ├── test_generate_sales_plot.py
│   │   ├── test_google_sheets.py
│   │   └── test_usd_to_clp.py
│   ├── airflow.cfg                    # Airflow configuration file
│   ├── airflow.db                     # Airflow database (SQLite for local use)
│   ├── docker-compose.yaml            # Docker setup for Airflow
│   └── requirements.txt               # Python dependencies
└── README.md
```
- Extracts data from a PostgreSQL database using a custom SQL query.
- Fetches the current USD to CLP exchange rate from a public API.
- Enriches the data by converting sales totals from USD to CLP.
- Exports the final dataset:
  - as a CSV file (`report.csv`)
  - to a Google Sheet
- Python
- Apache Airflow
- PostgreSQL
- Google Sheets API
- Docker (via `docker-compose`)
- Pandas, Requests, Matplotlib
A Directed Acyclic Graph (DAG) is a graph where:

- Directed: every edge has a direction (from one node to another)
- Acyclic: no cycles exist; you can't loop back to a previous node

DAGs are used for:

- Task scheduling (e.g., Airflow, build systems like Make)
- Version control systems (e.g., Git)
- Data processing pipelines
- Compilers and expression trees
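To make the "acyclic" constraint concrete, here's a small, self-contained sketch (not part of this project) using Python's standard-library `graphlib` to order tasks the way a scheduler would:

```python
from graphlib import TopologicalSorter, CycleError

# Each task maps to the set of tasks it depends on.
deps = {
    "enrich": {"extract", "fetch_rate"},
    "export": {"enrich"},
}

# A valid execution order always runs dependencies first,
# e.g. ['fetch_rate', 'extract', 'enrich', 'export'].
print(list(TopologicalSorter(deps).static_order()))

# Introducing a cycle makes ordering impossible,
# which is exactly why workflow graphs must be acyclic.
deps["extract"] = {"export"}
try:
    list(TopologicalSorter(deps).static_order())
except CycleError as err:
    print("Cycle detected:", err)
```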
- Clone the repository:

```bash
git clone https://github.com/CamilaJaviera91/dag-first-approach.git
cd dag-first-approach
```

- Create a virtual environment:

```bash
python3 -m venv venv
source venv/bin/activate   # On Windows: venv\Scripts\activate
```

- Install the required dependencies:

```bash
pip install -r requirements.txt
```
- Configure environment variables:

Create a `.env` file in the root directory and add the following (a sketch of how these values might be consumed follows this step):

```env
DB_HOST=your_database_host
DB_PORT=your_database_port
DB_NAME=your_database_name
DB_USER=your_database_user
DB_PASSWORD=your_database_password
DB_SCHEMA=your_database_schema   # optional
GOOGLE_SHEET_ID=your_google_sheet_id
GOOGLE_SERVICE_ACCOUNT_FILE=path/to/your/service_account.json
```
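For reference, here's a minimal sketch of how these variables might be loaded and used to open a database connection, assuming `python-dotenv` and `psycopg2` are installed; the project's actual connection logic lives in `src/etl_modules/connection.py` and may differ:

```python
import os

import psycopg2
from dotenv import load_dotenv

# Read the variables from .env into the process environment.
load_dotenv()

# Build a PostgreSQL connection from those variables.
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT"),
    dbname=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
)
```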
- Initialize the Airflow database:

```bash
airflow db init
```
- Set up the Google Sheets API:

Follow this guide to:

- Create a project in the Google Developers Console.
- Enable the Google Sheets API and the Google Drive API.
- Download the service account JSON credentials.
- Set the path to this file in `GOOGLE_SERVICE_ACCOUNT_FILE`.

Make sure to share your target Google Sheet with the service account email. A minimal authentication sketch is shown below.
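As an illustration, authenticating with the service account and opening the sheet could look like this sketch using the `gspread` library; the project's own implementation is in `src/etl_modules/google_sheets.py` and may use a different client:

```python
import os

import gspread
from dotenv import load_dotenv

load_dotenv()

# Authenticate with the service-account JSON referenced in .env.
client = gspread.service_account(
    filename=os.getenv("GOOGLE_SERVICE_ACCOUNT_FILE")
)

# Open the target spreadsheet by ID and select the worksheet.
spreadsheet = client.open_by_key(os.getenv("GOOGLE_SHEET_ID"))
worksheet = spreadsheet.worksheet("ReportSheet")

# Write a header row; data rows would follow the same pattern.
worksheet.clear()
worksheet.append_row(["year", "store", "total", "total_clp"])
```

If authentication succeeds but writes fail, double-check that the sheet is shared with the service account's email address.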
- Start the PostgreSQL service:

```bash
sudo systemctl start postgresql
```

- Start the Airflow services:

```bash
airflow webserver --port 8080
airflow scheduler
```

- Access the Airflow web interface:

Navigate to http://localhost:8080 in your web browser.
This project defines an Apache Airflow DAG that automates a complete ETL process:

- Extracts sales data from a PostgreSQL database.
- Fetches the current USD to CLP exchange rate.
- Enriches the data by converting USD totals into CLP.
- Generates a sales plot by year.
- Exports the enriched data to a CSV file.
- Sends the data to Google Sheets for easy access.
This is the task flow as represented in Airflow:
| Parameter  | Value         |
|------------|---------------|
| DAG ID     | sales_etl_dag |
| Schedule   | @daily        |
| Catchup    | False         |
| Start Date | 2024-01-01    |
| Owner      | Camila        |
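Wired together, that configuration corresponds to a DAG definition roughly like the sketch below. The `run_*` callables are placeholders for the real functions in `src/etl_modules/`; see `dags/etl_sales_report.py` for the actual definition, and note that parameter names vary slightly across Airflow versions:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder callables; the real ones live in src/etl_modules/.
def run_extract(): ...
def run_fetch_rate(): ...
def run_enrich(): ...
def run_export(): ...

with DAG(
    dag_id="sales_etl_dag",
    schedule_interval="@daily",   # "schedule" in Airflow >= 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"owner": "Camila"},
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=run_extract)
    fetch_rate = PythonOperator(task_id="fetch_rate", python_callable=run_fetch_rate)
    enrich = PythonOperator(task_id="enrich", python_callable=run_enrich)
    export = PythonOperator(task_id="export", python_callable=run_export)

    # Extraction and the rate lookup are independent; both feed enrichment.
    [extract, fetch_rate] >> enrich >> export
```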
- `data/report.csv`
- `data/sales.png`
- Google Spreadsheet: Sales Report → ReportSheet
| year | store          | total        | total_clp     |
|------|----------------|--------------|---------------|
| 2020 | Teno-3         | 1,292,370.99 | 1,219,364,953 |
| 2020 | Cauquenes-5    | 1,298,515.67 | 1,225,162,520 |
| 2020 | Villa Alegre-2 | 1,325,040.86 | 1,250,189,302 |
| 2020 | Longaví-9      | 1,353,795.29 | 1,277,319,394 |
| 2020 | Constitución-4 | 1,353,981.94 | 1,277,495,500 |
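Here, `total` is in USD and `total_clp` is the same amount converted at the fetched exchange rate. As a rough sketch of that enrichment step (the real logic lives in `src/etl_modules/usd_to_clp.py` and `enrich.py`; the API URL below is only an illustrative placeholder):

```python
import pandas as pd
import requests

def fetch_usd_to_clp() -> float:
    # Placeholder endpoint: substitute the public FX API you actually use.
    resp = requests.get("https://api.example.com/usd-clp", timeout=10)
    resp.raise_for_status()
    return float(resp.json()["rate"])

df = pd.read_csv("data/report.csv")
rate = fetch_usd_to_clp()

# CLP has no minor unit, so round converted totals to whole pesos.
df["total_clp"] = (df["total"] * rate).round().astype("int64")
df.to_csv("data/report.csv", index=False)
```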
- Connection errors: check your database credentials and network access.
- Google Sheets permissions: make sure the service account has access to edit the target sheet.
- Missing environment variables: ensure `.env` is properly set and loaded.
- Ensure the database is accessible and the credentials are valid.
- The service account must have permission to edit the target Google Sheet.
- You can customize the SQL query, filenames, and sheet names.
Follow these steps to add your DAG to Apache Airflow and make it visible in the Airflow web interface.
- Place Your DAG in the `dags` Directory

Airflow loads DAGs from a specific folder, typically located at:

```
~/airflow/dags/
```

If you've changed the path in your `airflow.cfg` (`dags_folder`), use that custom directory instead.
- Create Your DAG File

Create a new Python file inside the `dags` folder, for example (a minimal sketch follows):

```
~/airflow/dags/my_example_dag.py
```
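As a minimal sketch of what `my_example_dag.py` could contain (any valid DAG file with a unique `dag_id` works; this one just logs a message once a day):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print("Hello from my_example_dag!")

with DAG(
    dag_id="my_example_dag",
    schedule_interval="@daily",   # "schedule" in Airflow >= 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id="say_hello", python_callable=say_hello)
```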
- Restart Airflow Services

After placing your DAG file, restart the Airflow scheduler and webserver:

```bash
airflow scheduler
airflow webserver
```
- Open the Airflow Web UI

Visit the Airflow UI in your browser:

```
http://localhost:8080
```

You should see your DAG (`my_example_dag`) listed. Enable it and trigger it as needed, either from the UI or via the CLI commands below.
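If you prefer the command line, the same two actions are available as standard Airflow CLI commands:

```bash
airflow dags unpause my_example_dag   # enable the DAG
airflow dags trigger my_example_dag   # start a manual run
```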
If your DAG doesn't appear:

- Ensure the file ends with `.py`
- Make sure the `dag_id` is unique and the syntax is valid
- Confirm the file is located in the correct `dags_folder`
- Check the Airflow scheduler logs for errors:

```bash
airflow scheduler --log-level INFO
```
Contributions are welcome! Please follow these steps:

- Fork the repository.
- Create a new branch: `git checkout -b feature/YourFeatureName`
- Commit your changes: `git commit -m 'Add some feature'`
- Push to the branch: `git push origin feature/YourFeatureName`
- Open a pull request.
- Extraction from PostgreSQL.
- Enrichment with exchange rate.
- Export to CSV and Google Sheets.
- Integration with other data sources (e.g., S3, external APIs).
- Real-time dashboard visualization.
If you get stuck or need help customizing the pipeline, feel free to open an issue or reach out!
Camila Javiera Muñoz Navarro

- LinkedIn
- GitHub
- Modular structure for maintainability
- Separation of concerns in ETL steps
- CI with unit tests using `pytest` (a sample test sketch follows this list)
- Secure use of `.env` for credentials
- Logging and exception handling in DAGs
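For instance, a unit test in `test/` might look like the sketch below; `convert_usd_to_clp` here is a hypothetical helper standing in for logic in `src/etl_modules/usd_to_clp.py` (see `test_usd_to_clp.py` for the real tests):

```python
import pytest

# Hypothetical helper: round a USD total converted at a given rate.
def convert_usd_to_clp(total_usd: float, rate: float) -> int:
    if rate <= 0:
        raise ValueError("exchange rate must be positive")
    return round(total_usd * rate)

def test_conversion_rounds_to_whole_pesos():
    assert convert_usd_to_clp(10.0, 943.5) == 9435

def test_rejects_non_positive_rate():
    with pytest.raises(ValueError):
        convert_usd_to_clp(10.0, 0)
```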
This project is licensed under the MIT License.
> Note: Never commit your `.env` or `service_account.json` files. Use `.gitignore` and GitHub secrets for CI/CD.