
This project automates the extraction, transformation, and export of sales data from a PostgreSQL database, enhances the data with exchange rate information, and exports the results in CSV and Google Sheets formats. It uses a Directed Acyclic Graph (DAG) to manage task dependencies and execute them in order.

🧭 Table of Contents


πŸ§ͺ Test Badge

Python Tests


πŸ§ͺ Continuous Integration (CI)

This project uses GitHub Actions to automatically run tests on every push.

CI file: python-tests.yml

βœ… Ensures code quality
βœ… Prevents regressions
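
To run the same test suite locally before pushing (assuming the dependencies from requirements.txt are installed in your environment), you can invoke pytest against the test directory:

pytest project_airflow_etl/test/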


πŸ’‘ DAG-Based ETL Pipeline for Sales Reporting

🧠 Project Description:

This project automates the extraction, transformation, and export of sales data using Apache Airflow. It pulls data from a PostgreSQL database, enriches it with USD to CLP exchange rate information, and exports the final dataset to both a CSV file and a Google Sheet.


The pipeline is designed as a Directed Acyclic Graph (DAG) to manage task dependencies and ensure a reliable and repeatable workflow.


πŸš€ Project Structure:

dag-first-approach/
├── project_airflow_etl/
│   ├── dags/
│   │   └── etl_sales_report.py       # Airflow DAG definition
│   ├── data/
│   │   ├── dag.csv
│   │   ├── report.csv                # Final report file
│   │   └── sales.png                 # Yearly sales plot
│   ├── logs/                         # Airflow logs
│   ├── plugins/                      # Custom Airflow plugins
│   ├── src/
│   │   └── etl_modules/              # ETL module scripts
│   │       ├── __init__.py
│   │       ├── connection.py
│   │       ├── enrich.py
│   │       ├── export.py
│   │       ├── extract.py
│   │       ├── generate_sales_plot.py
│   │       ├── google_sheets.py
│   │       └── usd_to_clp.py
│   ├── test/                         # Unit tests (run in GitHub Actions CI)
│   │   ├── test_connection.py
│   │   ├── test_enrich.py
│   │   ├── test_export.py
│   │   ├── test_extract.py
│   │   ├── test_generate_sales_plot.py
│   │   ├── test_google_sheets.py
│   │   └── test_usd_to_clp.py
│   ├── airflow.cfg                   # Airflow configuration file
│   ├── airflow.db                    # Airflow database (SQLite for local use)
│   ├── docker-compose.yaml           # Docker setup for Airflow
│   └── requirements.txt              # Python dependencies
└── README.md

🧩 What This Project Does:

  • Extracts data from a PostgreSQL database using a custom SQL query.

  • Fetches the current USD to CLP exchange rate from a public API.

  • Enriches the data by converting sales totals from USD to CLP.

  • Exports the final dataset:

    • as a CSV file (report.csv)

    • to a Google Sheet
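
As an illustration of the enrichment step above, a minimal sketch might fetch the exchange rate and add a converted column with pandas. The function names, the API URL, and the response layout below are assumptions for illustration; the actual modules in src/etl_modules/ may differ:

import pandas as pd
import requests

def fetch_usd_to_clp(api_url: str) -> float:
    """Fetch the current USD -> CLP rate (hypothetical API and response shape)."""
    response = requests.get(api_url, timeout=10)
    response.raise_for_status()
    return float(response.json()["rates"]["CLP"])  # assumed response layout

def enrich_with_clp(sales: pd.DataFrame, rate: float) -> pd.DataFrame:
    """Add a total_clp column converted from the USD total column."""
    enriched = sales.copy()
    enriched["total_clp"] = (enriched["total"] * rate).round()
    return enriched

# Illustrative usage:
# rate = fetch_usd_to_clp("https://api.example.com/latest?base=USD")  # placeholder URL
# report = enrich_with_clp(sales_df, rate)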


πŸ› οΈ Technologies Used:

  • Python

  • Apache Airflow

  • PostgreSQL

  • Google Sheets API

  • Docker (via docker-compose)

  • Pandas, Requests, Matplotlib


πŸ—‚οΈ What's DAG?:

A Directed Acyclic Graph (DAG) is a graph where:

  1. Directed: All edges have a direction (from one node to another)

  2. Acyclic: No cycles exist, so you can't loop back to a previous node
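
As a small, library-agnostic illustration, Python's standard-library graphlib can compute a valid execution order for a DAG of tasks; the task names below are placeholders loosely modeled on this pipeline:

from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (directed edges).
dependencies = {
    "enrich": {"extract", "fetch_rate"},
    "export_csv": {"enrich"},
    "export_sheets": {"enrich"},
}

# static_order() yields an execution order that respects every edge
# and raises CycleError if the graph is not acyclic.
print(list(TopologicalSorter(dependencies).static_order()))
# e.g. ['extract', 'fetch_rate', 'enrich', 'export_csv', 'export_sheets']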


πŸ“‹ Common Uses of DAGs:

  • Task scheduling (e.g., Airflow, build systems like Make)

  • Version control systems (e.g., Git)

  • Data processing pipelines

  • Compilers and expression trees


πŸš€ Installation and Execution:

  1. Clone the repository:
git clone https://github.com/CamilaJaviera91/dag-first-approach.git
cd dag-first-approach
  2. Create a Virtual Environment:
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
  3. Install the required dependencies:
pip install -r requirements.txt
  4. Configure Environment Variables:

Create a .env file in the root directory and add the following (a sketch of how these variables might be loaded appears after this list):

DB_HOST=your_database_host
DB_PORT=your_database_port
DB_NAME=your_database_name
DB_USER=your_database_user
DB_PASSWORD=your_database_password
DB_SCHEMA=your_database_schema # optional

GOOGLE_SHEET_ID=your_google_sheet_id
GOOGLE_SERVICE_ACCOUNT_FILE=path/to/your/service_account.json
  5. Initialize the Airflow Database:
airflow db init
  6. Set Up the Google Sheets API

    • Follow this guide to:

      1. Create a project in the Google Developers Console.

      2. Enable the Google Sheets API and the Google Drive API.

      3. Download the service account JSON credentials.

      4. Set the path to this file in GOOGLE_SERVICE_ACCOUNT_FILE.

    • Make sure to share your target Google Sheet with the service account email.

  7. Start the PostgreSQL Service:

sudo systemctl start postgresql
  8. Start the Airflow Services:
airflow webserver --port 8080
airflow scheduler
  9. Access the Airflow Web Interface:

Navigate to http://localhost:8080 in your web browser.
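
For reference, here is a minimal sketch of how the environment variables from step 4 might be loaded to open the PostgreSQL connection, using python-dotenv and psycopg2. This is an assumption for illustration; the project's actual connection.py may be implemented differently:

import os

import psycopg2
from dotenv import load_dotenv

load_dotenv()  # read the .env file from the project root

def get_connection():
    """Open a PostgreSQL connection from the .env settings (illustrative sketch)."""
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        options=f"-c search_path={os.getenv('DB_SCHEMA', 'public')}",  # DB_SCHEMA is optional
    )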


🧩 Sales ETL Pipeline

This project defines an Apache Airflow DAG that automates a complete ETL process:

  • πŸ“₯ Extracts sales data from a PostgreSQL database.

  • πŸ’± Fetches the current USD to CLP exchange rate.

  • πŸ§ͺ Enriches the data by converting USD totals into CLP.

  • πŸ“Š Generates a sales plot by year.

  • πŸ’Ύ Exports the enriched data to a CSV file.

  • ☁️ Sends the data to Google Sheets for easy access.
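
As a rough sketch of the Google Sheets step, the gspread library can push a DataFrame to a worksheet. The worksheet name matches the output described further below, but the actual google_sheets.py module may use a different approach:

import os

import gspread
import pandas as pd

def export_to_google_sheets(report: pd.DataFrame) -> None:
    """Push a DataFrame to the target worksheet (illustrative sketch only)."""
    client = gspread.service_account(filename=os.getenv("GOOGLE_SERVICE_ACCOUNT_FILE"))
    spreadsheet = client.open_by_key(os.getenv("GOOGLE_SHEET_ID"))
    worksheet = spreadsheet.worksheet("ReportSheet")  # worksheet name assumed from the output section
    worksheet.clear()
    worksheet.update([report.columns.tolist()] + report.values.tolist())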


πŸ–ΌοΈ DAG Graph View

This is the task flow as represented in Airflow:

DAG Screenshot


πŸ—“οΈ DAG Configuration

Parameter     Value
DAG ID        sales_etl_dag
Schedule      @daily
Catchup       False
Start Date    2024-01-01
Owner         Camila
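
Expressed as an Airflow DAG definition, these parameters might look like the following minimal sketch. The task callables are placeholders, not the project's actual etl_sales_report.py:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="sales_etl_dag",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"owner": "Camila"},
) as dag:
    # Placeholder callables; the real tasks would call the ETL modules in src/etl_modules/.
    extract = PythonOperator(task_id="extract", python_callable=lambda: None)
    enrich = PythonOperator(task_id="enrich", python_callable=lambda: None)
    export = PythonOperator(task_id="export", python_callable=lambda: None)

    extract >> enrich >> export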

πŸ—‚οΈ Output Files:

  • data/report.csv

  • data/sales.png

  • Google Spreadsheet: Sales Report → ReportSheet
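
A minimal sketch of how the yearly sales plot could be produced with pandas and Matplotlib (the column names match the sample output below; the actual generate_sales_plot.py may differ):

import matplotlib

matplotlib.use("Agg")  # render without a display, e.g. inside an Airflow worker
import matplotlib.pyplot as plt
import pandas as pd

def generate_sales_plot(report: pd.DataFrame, output_path: str = "data/sales.png") -> None:
    """Plot total sales per year and save the figure (illustrative sketch)."""
    yearly = report.groupby("year")["total"].sum()
    ax = yearly.plot(kind="bar", title="Sales by Year")
    ax.set_xlabel("Year")
    ax.set_ylabel("Total sales (USD)")
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()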


βœ… Sample Output:

πŸ“Š sales.png

(yearly sales plot image)

πŸ“‚ report.csv

year   store            total          total_clp
2020   Teno-3           1,292,370.99   1,219,364,953
2020   Cauquenes-5      1,298,515.67   1,225,162,520
2020   Villa Alegre-2   1,325,040.86   1,250,189,302
2020   Longaví-9        1,353,795.29   1,277,319,394
2020   Constitución-4   1,353,981.94   1,277,495,500

πŸ“„ Sales Report (Google Sheets)


❗Troubleshooting:

  • Connection Errors: Check your database credentials and network access.

  • Google Sheets Permissions: Make sure the service account has access to edit the target sheet.

  • Missing Environment Variables: Ensure .env is properly set and loaded.


πŸ“ Notes:

  • Ensure the database is accessible and credentials are valid

  • The service account must have permission to edit the target Google Sheet

  • You can customize the SQL query, filenames, and sheet names


πŸ“˜ How to Add a DAG to Apache Airflow and Display It in the Webserver

Follow these steps to add your DAG to Apache Airflow and make it visible in the Airflow web interface.

  1. πŸ“‚ Place Your DAG in the dags Directory

Airflow loads DAGs from a specific folder, typically located at:

~/airflow/dags/
  • If you've changed the path in your airflow.cfg (dags_folder), use that custom directory instead.
  2. πŸ“ Create Your DAG File

Create a new Python file inside the dags folder. For example:

~/airflow/dags/my_example_dag.py

A minimal example of what this file might contain is shown after this list.
  3. πŸ” Restart Airflow Services

After placing your DAG file, restart the Airflow scheduler and webserver:

airflow scheduler
airflow webserver
  4. 🌐 Open the Airflow Web UI

Visit the Airflow UI in your browser:

http://localhost:8080
  • You should see your DAG (my_example_dag) listed. Enable it and trigger it as needed.
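
For reference, a minimal my_example_dag.py might contain a trivial one-task DAG like the sketch below, just enough to confirm that the webserver picks up new files:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print("Hello from my_example_dag!")

with DAG(
    dag_id="my_example_dag",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id="say_hello", python_callable=say_hello)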

πŸ› οΈ Troubleshooting in how to Add a DAG to Apache Airflow

If your DAG doesn't appear:

  • βœ… Ensure the file ends with .py

  • βœ… Make sure dag_id is unique and the syntax is valid

  • βœ… Confirm it's located in the correct dags_folder

  • βœ… Check the Airflow scheduler logs (for example, in the logs/ directory or the scheduler's console output) for import errors

🀝 Contributing:

Contributions are welcome! Please follow these steps:

  • Fork the repository.

  • Create a new branch: git checkout -b feature/YourFeatureName

  • Commit your changes: git commit -m 'Add some feature'

  • Push to the branch: git push origin feature/YourFeatureName

  • Open a pull request.


πŸ—’οΈ Roadmap

  • Extraction from PostgreSQL.
  • Enrichment with exchange rate.
  • Export to CSV and Google Sheets.
  • Integration with other data sources (e.g., S3, external APIs).
  • Real-time dashboard visualization.

πŸ“§ Questions?:

If you get stuck or need help customizing the pipeline, feel free to open an issue or reach out!


πŸ‘©β€πŸ’» Author:

Camila Javiera MuΓ±oz Navarro
πŸ”— LinkedIn
πŸ™ GitHub


πŸ“š Useful Resources


✨ Coding Standards and Best Practices

  • Modular structure for maintainability
  • Separation of concerns in ETL steps
  • CI with unit tests using pytest
  • Secure use of .env for credentials
  • Logging and exception handling in DAGs
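
As an example of the last point, a task callable might log progress and re-raise failures so Airflow marks the task instance as failed. This is a generic sketch, not the project's actual code:

import logging

logger = logging.getLogger(__name__)

def extract_rows():
    """Placeholder for the real extraction logic (hypothetical)."""
    return [{"year": 2020, "store": "Teno-3", "total": 1_292_370.99}]

def extract_task(**context):
    """Illustrative task callable with logging and exception handling."""
    try:
        logger.info("Starting extraction for run %s", context.get("ds"))
        rows = extract_rows()
        logger.info("Extracted %d rows", len(rows))
        return rows
    except Exception:
        logger.exception("Extraction failed")
        raise  # re-raise so Airflow marks the task instance as failed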

πŸ“„ License:

  • This project is licensed under the MIT License.

  • ⚠️ Note: Never commit your .env or service_account.json file. Use .gitignore and GitHub secrets for CI/CD.

