Feature/prometheus metrics #27

Merged
merged 9 commits on Jun 20, 2025
Empty file removed .github/workflows/.gitkeep
Empty file.
29 changes: 29 additions & 0 deletions .github/workflows/python-tests.yml
@@ -0,0 +1,29 @@
name: Python CI

on:
  push:
    branches: [ main, dev ]
  pull_request:
    branches: [ main, dev ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi

      - name: Run unit tests
        run: |
          python -m unittest discover -s tests/unit -p "*.py" -v
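The workflow above discovers anything matching tests/unit/*.py. As an illustration of the kind of test it would pick up for the new metrics code, the file below is a hypothetical example (tests/unit/test_prometheus_metrics.py is not part of this PR); it registers a counter against an isolated CollectorRegistry so no Kafka broker or HTTP endpoint is needed in CI:

# tests/unit/test_prometheus_metrics.py -- hypothetical example, not included in this PR
import unittest

from prometheus_client import CollectorRegistry, Counter


class TestKafkaCounters(unittest.TestCase):
    def test_sent_counter_increments(self):
        # Isolated registry keeps the test independent of the global REGISTRY
        registry = CollectorRegistry()
        sent = Counter('kafka_messages_sent_total',
                       'Total messages sent to Kafka', registry=registry)
        sent.inc()
        # prometheus_client exposes the counter sample with the _total suffix
        self.assertEqual(
            registry.get_sample_value('kafka_messages_sent_total'), 1.0)


if __name__ == "__main__":
    unittest.main()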
Empty file removed config/.gitkeep
Empty file.
Empty file removed data/samples/.gitkeep
Empty file.
Empty file removed data/schemas/.gitkeep
Empty file.
2 changes: 2 additions & 0 deletions datagen/Dockerfile
@@ -8,6 +8,8 @@ RUN pip install -r requirements.txt

CMD [ "python", "./kafka_push.py" ]

EXPOSE 9101




35 changes: 33 additions & 2 deletions datagen/kafka_push.py
@@ -1,15 +1,46 @@
from kafka import KafkaProducer, KafkaConsumer
from data_generator import RandomDataGenerator
import json
+from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary
+import time

+# Expanded Prometheus metrics
+MESSAGES_SENT = Counter('kafka_messages_sent_total', 'Total messages sent to Kafka')
+SEND_ERRORS = Counter('kafka_send_errors_total', 'Errors sending messages to Kafka')
+GEN_TIME = Histogram('kafka_message_generation_seconds', 'Time to generate a random message')
+SEND_TIME = Histogram('kafka_message_send_seconds', 'Time to send a message to Kafka')
+MESSAGE_SIZE = Histogram('kafka_message_size_bytes', 'Size of messages sent to Kafka')
+MESSAGES_IN_PROGRESS = Gauge('kafka_messages_in_progress', 'Messages currently being sent')
+LAST_SEND_STATUS = Gauge('kafka_last_send_status', 'Last send status (1=OK, 0=Error)')
+SEND_SUMMARY = Summary('kafka_send_summary_seconds', 'Summary of send times')
+SERIALIZE_TIME = Histogram('kafka_message_serialize_seconds', 'Time to serialize a message to JSON')

+# Start the Prometheus endpoint on port 9101
+start_http_server(9101)

def make_producer(host):
    producer = KafkaProducer(bootstrap_servers=[host], value_serializer=lambda x: json.dumps(x).encode("utf-8"))
    return producer

def send_random(producer):
    for msg in RandomDataGenerator():
-        producer.send("probando", msg)
-        print("Sent")
+        with GEN_TIME.time():
+            try:
+                with SERIALIZE_TIME.time():
+                    msg_bytes = json.dumps(msg).encode("utf-8")
+                MESSAGE_SIZE.observe(len(msg_bytes))
+                MESSAGES_IN_PROGRESS.inc()
+                with SEND_TIME.time(), SEND_SUMMARY.time():
+                    producer.send("probando", msg)
+                MESSAGES_SENT.inc()
+                LAST_SEND_STATUS.set(1)
+                print("Sent")
+            except Exception as e:
+                SEND_ERRORS.inc()
+                LAST_SEND_STATUS.set(0)
+                print(f"Error sending message: {e}")
+            finally:
+                MESSAGES_IN_PROGRESS.dec()

def main():
    producer = make_producer(host="kafka:9092")
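A note on the instrumentation above: KafkaProducer.send() in kafka-python is asynchronous and returns a future, so a broker-side failure usually surfaces on that future rather than as an exception at the call site, which means SEND_ERRORS and LAST_SEND_STATUS can stay green even when delivery fails. A minimal sketch of how the same counters could be driven from the future's callbacks instead (this variant is not part of the PR; it reuses the metric objects defined in kafka_push.py above):

# Hypothetical variant, not in this PR: tie the metrics to the send() future so
# broker-side failures are counted even though send() itself rarely raises.
def send_with_callbacks(producer, msg):
    MESSAGES_IN_PROGRESS.inc()
    future = producer.send("probando", msg)

    def on_success(record_metadata):
        MESSAGES_SENT.inc()
        LAST_SEND_STATUS.set(1)
        MESSAGES_IN_PROGRESS.dec()

    def on_error(exc):
        SEND_ERRORS.inc()
        LAST_SEND_STATUS.set(0)
        MESSAGES_IN_PROGRESS.dec()
        print(f"Error sending message: {exc}")

    # kafka-python futures support add_callback / add_errback
    future.add_callback(on_success)
    future.add_errback(on_error)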
Binary file modified datagen/requirements.txt
Binary file not shown.
26 changes: 26 additions & 0 deletions docker-compose.yml
@@ -102,6 +102,31 @@ services:
      - kafka_network
    restart: always

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    volumes:
      - ./monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
    ports:
      - "9090:9090"
    networks:
      - kafka_network

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    volumes:
      - ./monitoring/grafana:/var/lib/grafana
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus
    networks:
      - kafka_network

  redis:
    image: redis:latest
    container_name: redis_kafka
@@ -111,6 +136,7 @@
      - kafka_network
    restart: always


volumes:
  mongo_data:

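With the two services added above, a quick smoke check after docker compose up is to hit the standard Prometheus and Grafana health endpoints on the published ports (9090 and 3000). A small sketch using only the standard library; the script itself is not part of this PR:

# Hypothetical smoke check, not part of this PR: verify Prometheus and Grafana
# respond on the ports published in docker-compose.yml.
import urllib.request

CHECKS = {
    "prometheus": "http://localhost:9090/-/healthy",  # plain-text health endpoint
    "grafana": "http://localhost:3000/api/health",    # JSON health endpoint
}

for name, url in CHECKS.items():
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            print(f"{name}: HTTP {resp.status} {body.strip()[:80]}")
    except Exception as exc:
        print(f"{name}: unreachable ({exc})")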
Empty file removed docs/.gitkeep
Empty file.
1 change: 1 addition & 0 deletions monitoring/__init__.py
@@ -0,0 +1 @@

10 changes: 10 additions & 0 deletions monitoring/grafana/provisioning/dashboards/dashboards.yml
@@ -0,0 +1,10 @@
apiVersion: 1
providers:
  - name: 'default'
    orgId: 1
    folder: ''
    type: file
    disableDeletion: false
    updateIntervalSeconds: 10
    options:
      path: /etc/grafana/provisioning/dashboards
@@ -0,0 +1,99 @@
{
"id": null,
"title": "Full Pipeline Metrics",
"tags": [],
"timezone": "browser",
"schemaVersion": 36,
"version": 1,
"refresh": "5s",
"panels": [
{
"type": "stat",
"title": "Kafka Messages Produced",
"targets": [
{ "expr": "kafka_messages_produced_total", "refId": "A" }
],
"gridPos": { "x": 0, "y": 0, "w": 6, "h": 3 }
},
{
"type": "stat",
"title": "Kafka Produce Errors",
"targets": [
{ "expr": "kafka_produce_errors_total", "refId": "B" }
],
"gridPos": { "x": 6, "y": 0, "w": 6, "h": 3 }
},
{
"type": "stat",
"title": "Kafka Messages Consumed",
"targets": [
{ "expr": "kafka_messages_consumed_total", "refId": "C" }
],
"gridPos": { "x": 0, "y": 3, "w": 6, "h": 3 }
},
{
"type": "stat",
"title": "Kafka Consume Errors",
"targets": [
{ "expr": "kafka_consume_errors_total", "refId": "D" }
],
"gridPos": { "x": 6, "y": 3, "w": 6, "h": 3 }
},
{
"type": "graph",
"title": "Kafka Message Processing Time",
"targets": [
{ "expr": "kafka_message_process_time_seconds", "refId": "E" }
],
"gridPos": { "x": 0, "y": 6, "w": 12, "h": 6 }
},
{
"type": "stat",
"title": "Mongo Insert Errors",
"targets": [
{ "expr": "mongo_insert_errors_total", "refId": "F" }
],
"gridPos": { "x": 0, "y": 12, "w": 6, "h": 3 }
},
{
"type": "graph",
"title": "Mongo Insert Time",
"targets": [
{ "expr": "mongo_insert_time_seconds", "refId": "G" }
],
"gridPos": { "x": 6, "y": 12, "w": 6, "h": 3 }
},
{
"type": "stat",
"title": "Supabase Insert Errors",
"targets": [
{ "expr": "supabase_insert_errors_total", "refId": "H" }
],
"gridPos": { "x": 0, "y": 15, "w": 6, "h": 3 }
},
{
"type": "graph",
"title": "Supabase Insert Time",
"targets": [
{ "expr": "supabase_insert_seconds", "refId": "I" }
],
"gridPos": { "x": 6, "y": 15, "w": 6, "h": 3 }
},
{
"type": "stat",
"title": "ETL Complete Persons",
"targets": [
{ "expr": "etl_complete_persons_total", "refId": "J" }
],
"gridPos": { "x": 0, "y": 18, "w": 6, "h": 3 }
},
{
"type": "stat",
"title": "ETL Incomplete Persons",
"targets": [
{ "expr": "etl_incomplete_persons_total", "refId": "K" }
],
"gridPos": { "x": 6, "y": 18, "w": 6, "h": 3 }
}
]
}
83 changes: 83 additions & 0 deletions monitoring/grafana/provisioning/dashboards/pipeline_overview.json
@@ -0,0 +1,83 @@
{
"id": null,
"title": "Data Pipeline Overview",
"tags": ["pipeline", "kafka", "mongo", "supabase"],
"timezone": "browser",
"schemaVersion": 36,
"version": 1,
"refresh": "10s",
"panels": [
{
"type": "stat",
"title": "Kafka - Mensajes Enviados",
"targets": [{"expr": "kafka_messages_sent_total", "refId": "A"}],
"gridPos": {"x": 0, "y": 0, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "Kafka - Errores de Envío",
"targets": [{"expr": "kafka_send_errors_total", "refId": "A"}],
"gridPos": {"x": 6, "y": 0, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "Kafka - Tamaño Medio de Mensaje",
"targets": [{"expr": "avg(kafka_message_size_bytes)", "refId": "A"}],
"gridPos": {"x": 0, "y": 4, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "Kafka Consumer - Mensajes Consumidos",
"targets": [{"expr": "kafka_messages_consumed_total", "refId": "A"}],
"gridPos": {"x": 6, "y": 4, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "Kafka Consumer - Errores de Consumo",
"targets": [{"expr": "kafka_consume_errors_total", "refId": "A"}],
"gridPos": {"x": 0, "y": 8, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "MongoDB - Documentos Insertados",
"targets": [{"expr": "mongo_inserts_total", "refId": "A"}],
"gridPos": {"x": 6, "y": 8, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "MongoDB - Errores de Inserción",
"targets": [{"expr": "mongo_insert_errors_total", "refId": "A"}],
"gridPos": {"x": 0, "y": 12, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "Supabase - Personas Migradas",
"targets": [{"expr": "supabase_persons_migrated_total", "refId": "A"}],
"gridPos": {"x": 6, "y": 12, "w": 6, "h": 4}
},
{
"type": "stat",
"title": "Supabase - Errores de Inserción",
"targets": [{"expr": "supabase_insert_errors_total", "refId": "A"}],
"gridPos": {"x": 0, "y": 16, "w": 6, "h": 4}
},
{
"type": "graph",
"title": "Kafka - Tiempo de Envío de Mensajes",
"targets": [{"expr": "rate(kafka_message_send_seconds_sum[1m]) / rate(kafka_message_send_seconds_count[1m])", "refId": "A"}],
"gridPos": {"x": 6, "y": 16, "w": 12, "h": 8}
},
{
"type": "graph",
"title": "MongoDB - Tiempo de Inserción",
"targets": [{"expr": "rate(mongo_insert_seconds_sum[1m]) / rate(mongo_insert_seconds_count[1m])", "refId": "A"}],
"gridPos": {"x": 0, "y": 24, "w": 12, "h": 8}
},
{
"type": "graph",
"title": "Supabase - Tiempo de Inserción",
"targets": [{"expr": "rate(supabase_insert_seconds_sum[1m]) / rate(supabase_insert_seconds_count[1m])", "refId": "A"}],
"gridPos": {"x": 12, "y": 24, "w": 12, "h": 8}
}
]
}
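The latency panels in this dashboard divide the rate() of each histogram's _sum by the rate() of its _count. The same expression can be evaluated directly against Prometheus's /api/v1/query endpoint to sanity-check a panel before opening Grafana. A sketch, assuming the 9090:9090 port mapping from docker-compose.yml (not part of this PR):

# Hypothetical check, not part of this PR: run one of the dashboard expressions
# through the Prometheus HTTP query API.
import json
import urllib.parse
import urllib.request

EXPR = ("rate(kafka_message_send_seconds_sum[1m]) / "
        "rate(kafka_message_send_seconds_count[1m])")

url = "http://localhost:9090/api/v1/query?" + urllib.parse.urlencode({"query": EXPR})
with urllib.request.urlopen(url, timeout=5) as resp:
    payload = json.load(resp)

for series in payload["data"]["result"]:
    # each result carries its label set and a [timestamp, "value"] pair
    print(series["metric"], series["value"][1])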
7 changes: 7 additions & 0 deletions monitoring/grafana/provisioning/datasources/datasource.yml
@@ -0,0 +1,7 @@
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
13 changes: 13 additions & 0 deletions monitoring/prometheus/prometheus.yml
@@ -0,0 +1,13 @@
global:
  scrape_interval: 5s

scrape_configs:
  - job_name: 'kafka_push'
    static_configs:
      - targets: ['random_generator:9101']
  - job_name: 'kafka_consumer'
    static_configs:
      - targets: ['kafka_consumer:9102']
  - job_name: 'mongo_to_postgres'
    static_configs:
      - targets: ['mongo_to_postgres:9103']
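Once Prometheus is up with this configuration, the state of the three scrape jobs can be checked programmatically through the standard /api/v1/targets endpoint instead of the web UI. A sketch, again assuming the 9090:9090 mapping from docker-compose.yml (not part of this PR):

# Hypothetical check, not part of this PR: list the health of the scrape targets
# declared in monitoring/prometheus/prometheus.yml.
import json
import urllib.request

with urllib.request.urlopen("http://localhost:9090/api/v1/targets", timeout=5) as resp:
    payload = json.load(resp)

for target in payload["data"]["activeTargets"]:
    job = target["labels"].get("job")
    print(f"{job}: {target['health']} ({target['scrapeUrl']})")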
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,4 +2,5 @@ kafka-python
pymongo
python-dotenv
supabase
prometheus_client
redis
Empty file removed scripts/.gitkeep
Empty file.
Empty file removed src/database/.gitkeep
Empty file.
4 changes: 3 additions & 1 deletion src/etl/Dockerfile
@@ -20,4 +20,6 @@ ENV PYTHONPATH=/app
#RUN chmod +x /app/src/etl/wait-for-it.sh

# Run the script
-CMD ["python", "mongo_to_postgres.py"]
+CMD ["python", "mongo_to_postgres.py"]
+
+EXPOSE 9103
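EXPOSE 9103 matches the mongo_to_postgres job in monitoring/prometheus/prometheus.yml, but the corresponding changes to mongo_to_postgres.py are not visible in this view. As a rough sketch of the shape that instrumentation would take, the snippet below starts an exporter on 9103 and uses metric names taken from the pipeline_overview dashboard; it is an illustration, not the PR's actual ETL code, and the helper insert_document is hypothetical:

# Hypothetical sketch, not the PR's actual mongo_to_postgres.py changes.
from prometheus_client import start_http_server, Counter, Histogram

MONGO_INSERTS = Counter('mongo_inserts_total', 'Documents inserted into MongoDB')
MONGO_INSERT_ERRORS = Counter('mongo_insert_errors_total', 'MongoDB insert errors')
MONGO_INSERT_TIME = Histogram('mongo_insert_seconds', 'Time spent inserting into MongoDB')
SUPABASE_INSERT_ERRORS = Counter('supabase_insert_errors_total', 'Supabase insert errors')

# Port 9103 matches both the EXPOSE above and the mongo_to_postgres scrape job.
start_http_server(9103)

def insert_document(collection, doc):
    """Record latency, successes and failures for a single MongoDB insert."""
    try:
        with MONGO_INSERT_TIME.time():
            collection.insert_one(doc)
        MONGO_INSERTS.inc()
    except Exception:
        MONGO_INSERT_ERRORS.inc()
        raise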