This project implements a serverless data pipeline that actively monitors the quality of financial market data from an API. It provides a robust framework for automated data ingestion, validation, and real-time monitoring, complete with a live dashboard and alerting system.
The pipeline fetches OHLCV (Open, High, Low, Close, Volume) data for BTC/USDT, validates it against a predefined set of data quality rules, and logs the results. This ensures data integrity and reliability for any downstream applications or analysis.
A live snapshot of the monitoring dashboard can be viewed here:
- Automated Data Ingestion: Periodically fetches OHLCV data from the CryptoCompare API.
- Efficient Change Detection: Uses content hashing (SHA-256) to identify changes in the source data, preventing redundant validations and conserving resources.
- Comprehensive Data Validation: Leverages the Great Expectations framework to perform a suite of data quality checks, ensuring data is accurate, complete, and reliable.
- Real-time Alerting: Automatically sends email notifications via SendGrid when data quality issues or system errors are detected.
- Data Persistence: Stores validation results and summaries in a MongoDB database for historical analysis and trend monitoring.
- Insightful Dashboarding: Results are visualized in a Grafana dashboard, providing at-a-glance insights into data quality metrics, trends, and system health.
- CI/CD and Scheduled Monitoring: Includes GitHub Actions for continuous integration and for triggering the monitoring pipeline on a schedule.
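The change-detection feature above can be sketched in a few lines: hash the raw API payload and skip validation when the digest matches the previous run. The function names and the local state file are illustrative assumptions, not the project's actual implementation (the real pipeline could equally store the last hash in MongoDB alongside the validation results):

```python
import hashlib
import json
from pathlib import Path

# Hypothetical location for the last-seen hash.
STATE_FILE = Path("last_hash.txt")

def content_hash(payload: dict) -> str:
    """Return a stable SHA-256 hex digest of a JSON payload."""
    # sort_keys makes serialization deterministic, so identical data
    # always produces the same digest regardless of key order.
    canonical = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()

def has_changed(payload: dict) -> bool:
    """True if the payload differs from the previously hashed one."""
    new_hash = content_hash(payload)
    old_hash = STATE_FILE.read_text().strip() if STATE_FILE.exists() else None
    if new_hash == old_hash:
        return False  # unchanged: skip the expensive validation run
    STATE_FILE.write_text(new_hash)
    return True
```

Because the serialization is canonical, re-fetching unchanged data is a cheap no-op even if the API reorders keys.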
The system is designed as an event-driven, serverless pipeline. A cron job, managed by GitHub Actions, triggers the process on a recurring schedule. This initiates a request to the monitoring endpoint hosted on Render.
```
┌─────────────────┐       ┌────────────────┐       ┌─────────────────┐
│ GitHub Actions  │──────▶│ Render Service │──────▶│  CryptoCompare  │
│ (Scheduled Cron)│       │ (API Endpoint) │       │ API (OHLCV Data)│
└─────────────────┘       └────────────────┘       └─────────────────┘
                                  │
                                  ▼
                        ┌──────────────────┐
                        │      Great       │
                        │   Expectations   │
                        │    Validation    │
                        └──────────────────┘
                                  │
                        ┌─────────┴─────────┐
                        ▼                   ▼
                ┌──────────────┐    ┌──────────────┐
                │   MongoDB    │    │   SendGrid   │
                │    Atlas     │    │    Alerts    │
                │              │    │   (Email)    │
                └──────────────┘    └──────────────┘
                        │
                        ▼
                ┌──────────────┐
                │   Grafana    │
                │    Cloud     │
                │  Dashboard   │
                └──────────────┘
```
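The flow in the diagram can be sketched as a single orchestration function. All of the names here (`fetch_ohlcv`, `validate`, `persist_results`, `send_alert`, `ValidationReport`) are illustrative stand-ins for the project's actual modules, injected as parameters so the flow itself is testable with stubs:

```python
from dataclasses import dataclass, field

@dataclass
class ValidationReport:
    """Minimal stand-in for a Great Expectations validation result."""
    success: bool
    failures: list = field(default_factory=list)

def run_pipeline(fetch_ohlcv, validate, persist_results, send_alert):
    """One monitoring cycle: fetch -> validate -> persist, alerting on failure.

    In the real service the collaborators would wrap CryptoCompare,
    Great Expectations, MongoDB, and SendGrid respectively.
    """
    data = fetch_ohlcv()             # Render endpoint -> CryptoCompare
    report = validate(data)          # Great Expectations suite
    persist_results(report)          # MongoDB Atlas (feeds Grafana)
    if not report.success:
        send_alert(report.failures)  # SendGrid email
    return report
```

Dependency injection here is a design choice: each stage can be swapped or mocked without touching the control flow.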
- Backend: Python, Flask
- Data Validation: Great Expectations
- Database: MongoDB
- Alerting: SendGrid
- Deployment: Render
- CI/CD & Automation: GitHub Actions
- Data Visualization: Grafana
- Python 3.11+
- MongoDB Atlas account
- SendGrid account
- Render account
- Grafana Cloud account
- Clone the repository:

  ```bash
  git clone https://github.com/your-username/watchdog-data-pipeline.git
  cd watchdog-data-pipeline
  ```

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Configure environment variables: Create a `.env` file in the root directory and populate it with your credentials. You can use `.env.example` as a template:

  ```bash
  cp .env.example .env
  ```

- Run the application locally:

  ```bash
  flask run
  ```

  The application will be available at `http://127.0.0.1:5000`.

- Trigger a validation check: Access `http://127.0.0.1:5000/api/monitor?force=true` in your browser or via `curl`.
This application can be deployed as a Web Service on Render.
- Create a new Web Service on Render and connect it to your forked repository.
- Set the Start Command: `gunicorn app:app`
- Add Environment Variables: In the Render dashboard, add the environment variables defined in your `.env` file.
- Deploy. Render will automatically build and deploy your application.
The monitoring pipeline is triggered by a scheduled workflow in GitHub Actions.
- In your GitHub repository, go to Settings > Secrets and variables > Actions.
- Create a new repository secret named `RENDER_ENDPOINT`.
- Set the value to your Render service URL (e.g., `https://your-app-name.onrender.com`).
- The workflow in `.github/workflows/data-quality-monitor.yml` will now trigger your deployed service hourly.
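A minimal version of such a workflow might look like the following. This is a sketch, not the repository's actual file; the only behaviors the text above requires are the hourly cron and a request to the `RENDER_ENDPOINT` secret:

```yaml
name: Data Quality Monitor

on:
  schedule:
    - cron: '0 * * * *'   # hourly
  workflow_dispatch:       # also allow manual runs

jobs:
  trigger:
    runs-on: ubuntu-latest
    steps:
      - name: Call monitoring endpoint
        run: curl --fail --silent "${{ secrets.RENDER_ENDPOINT }}/api/monitor"
```

Note that GitHub Actions schedules are best-effort; runs may start several minutes after the cron time.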
The project includes a suite of unit tests. To run them:

```bash
pytest -v
```

To run tests with coverage:

```bash
pytest --cov=src
```
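As an illustration of the kind of unit test the suite might contain, here is a self-contained pytest-style check of basic OHLCV sanity (low ≤ open/close ≤ high, non-negative volume). The `row_is_consistent` helper is hypothetical, written here only to show the pattern:

```python
def row_is_consistent(row: dict) -> bool:
    """Basic OHLCV invariants: low <= open/close <= high, volume >= 0."""
    return (
        row["low"] <= min(row["open"], row["close"])
        and row["high"] >= max(row["open"], row["close"])
        and row["low"] <= row["high"]
        and row["volume"] >= 0
    )

def test_consistent_row_passes():
    row = {"open": 100.0, "high": 105.0, "low": 99.0, "close": 104.0, "volume": 12.5}
    assert row_is_consistent(row)

def test_inverted_high_low_fails():
    # high below open/close must be rejected
    row = {"open": 100.0, "high": 98.0, "low": 99.0, "close": 100.0, "volume": 1.0}
    assert not row_is_consistent(row)
```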
- Update the `TARGET_API_URL` environment variable.
- Modify the `ingest_data()` function in `src/validator.py` to correctly parse the new API response.
- Adjust the Great Expectations suite in `src/great_expectations/expectations/ohlcv_suite.json` to match the new data schema and quality requirements.
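For reference, a parser for the current CryptoCompare-style response might look like this. The assumed response shape (a nested `Data.Data` list with `time`/`open`/`high`/`low`/`close`/`volumefrom` fields) follows CryptoCompare's historical-data endpoints, but this is a sketch rather than the project's actual `ingest_data()`; adapt the field mapping for your new API:

```python
def ingest_data(response: dict) -> list[dict]:
    """Flatten a CryptoCompare-style historical response into OHLCV rows.

    Assumed input shape:
    {"Data": {"Data": [{"time": ..., "open": ..., "high": ...,
                        "low": ..., "close": ..., "volumefrom": ...}, ...]}}
    """
    candles = response.get("Data", {}).get("Data", [])
    return [
        {
            "timestamp": c["time"],
            "open": float(c["open"]),
            "high": float(c["high"]),
            "low": float(c["low"]),
            "close": float(c["close"]),
            "volume": float(c["volumefrom"]),  # volume in the base asset
        }
        for c in candles
    ]
```

Coercing every price field through `float()` keeps the downstream expectation suite insensitive to APIs that return numbers as strings.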
You can change the cron schedule in `.github/workflows/data-quality-monitor.yml`:

```yaml
# Every 15 minutes (for demos)
- cron: '*/15 * * * *'

# Every 30 minutes
- cron: '0,30 * * * *'

# Business hours only (09:00-17:00, Mon-Fri)
- cron: '0 9-17 * * 1-5'
```