From b10a60187bfa87e43dc6904f9ad152de3f4a127b Mon Sep 17 00:00:00 2001 From: jdomdev Date: Thu, 19 Jun 2025 16:43:34 +0200 Subject: [PATCH 1/8] feat: integrate Prometheus and Grafana for monitoring ETL pipeline services. --- .gitignore | 18 +++- datagen/Dockerfile | 2 + datagen/kafka_push.py | 35 +++++++- datagen/requirements.txt | Bin 110 -> 148 bytes docker-compose.yml | 24 +++++ monitoring/__init__.py | 1 + monitoring/grafana/README.md | 13 +++ .../grafana/dashboards/pipeline_overview.json | 83 ++++++++++++++++++ monitoring/prometheus/prometheus.yml | 13 +++ requirements.txt | 3 +- src/etl/Dockerfile | 4 +- src/etl/mongo_to_postgres.py | 57 +++++++++--- src/etl/requirements.txt | 1 + src/kafka_consumer/Dockerfile | 2 + src/kafka_consumer/consumer.py | 22 ++++- src/kafka_consumer/requirements.txt | 1 + src/kafka_consumer/storage_mongo.py | 64 ++++++++------ src/monitoring/grafana_dashboards/.gitkeep | 0 18 files changed, 297 insertions(+), 46 deletions(-) create mode 100644 monitoring/__init__.py create mode 100644 monitoring/grafana/README.md create mode 100644 monitoring/grafana/dashboards/pipeline_overview.json create mode 100644 monitoring/prometheus/prometheus.yml delete mode 100644 src/monitoring/grafana_dashboards/.gitkeep diff --git a/.gitignore b/.gitignore index ab42276..c0f0b39 100644 --- a/.gitignore +++ b/.gitignore @@ -158,4 +158,20 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -.idea/ \ No newline at end of file +.idea/ + +# Grafana runtime files +monitoring/grafana/data/ +monitoring/grafana/plugins/ +monitoring/grafana/grafana.db +monitoring/grafana/pdf/ +monitoring/grafana/csv/ +monitoring/grafana/png/ +monitoring/grafana/__pycache__/ +monitoring/grafana/logs/ +monitoring/grafana/conf/ +monitoring/grafana/public/ +monitoring/grafana/node_modules/ +monitoring/grafana/yarn.lock +monitoring/grafana/package-lock.json +monitoring/grafana/package.json \ No newline at end of file diff --git a/datagen/Dockerfile b/datagen/Dockerfile index 7e97977..c37123c 100644 --- a/datagen/Dockerfile +++ b/datagen/Dockerfile @@ -8,6 +8,8 @@ RUN pip install -r requirements.txt CMD [ "python", "./kafka_push.py" ] +EXPOSE 9101 + diff --git a/datagen/kafka_push.py b/datagen/kafka_push.py index 561ae1e..a0b789e 100644 --- a/datagen/kafka_push.py +++ b/datagen/kafka_push.py @@ -1,6 +1,22 @@ from kafka import KafkaProducer, KafkaConsumer from data_generator import RandomDataGenerator import json +from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary +import time + +# Métricas Prometheus ampliadas +MESSAGES_SENT = Counter('kafka_messages_sent_total', 'Total mensajes enviados a Kafka') +SEND_ERRORS = Counter('kafka_send_errors_total', 'Errores al enviar mensajes a Kafka') +GEN_TIME = Histogram('kafka_message_generation_seconds', 'Tiempo en generar un mensaje aleatorio') +SEND_TIME = Histogram('kafka_message_send_seconds', 'Tiempo en enviar un mensaje a Kafka') +MESSAGE_SIZE = Histogram('kafka_message_size_bytes', 'Tamaño de los mensajes enviados a Kafka') +MESSAGES_IN_PROGRESS = Gauge('kafka_messages_in_progress', 'Mensajes en proceso de envío') +LAST_SEND_STATUS = Gauge('kafka_last_send_status', 'Último estado de envío (1=OK, 0=Error)') +SEND_SUMMARY = Summary('kafka_send_summary_seconds', 'Resumen de tiempos de envío') +SERIALIZE_TIME 
= Histogram('kafka_message_serialize_seconds', 'Tiempo en serializar un mensaje a JSON') + +# Inicia el endpoint Prometheus en el puerto 9101 +start_http_server(9101) def make_producer(host): producer = KafkaProducer(bootstrap_servers=[host], value_serializer=lambda x: json.dumps(x).encode("utf-8")) @@ -8,8 +24,23 @@ def make_producer(host): def send_random(producer): for msg in RandomDataGenerator(): - producer.send("probando", msg) - print("Sent") + with GEN_TIME.time(): + try: + with SERIALIZE_TIME.time(): + msg_bytes = json.dumps(msg).encode("utf-8") + MESSAGE_SIZE.observe(len(msg_bytes)) + MESSAGES_IN_PROGRESS.inc() + with SEND_TIME.time(), SEND_SUMMARY.time(): + producer.send("probando", msg) + MESSAGES_SENT.inc() + LAST_SEND_STATUS.set(1) + print("Sent") + except Exception as e: + SEND_ERRORS.inc() + LAST_SEND_STATUS.set(0) + print(f"Error enviando mensaje: {e}") + finally: + MESSAGES_IN_PROGRESS.dec() def main(): producer = make_producer(host="kafka:9092") diff --git a/datagen/requirements.txt b/datagen/requirements.txt index 423ee4075d60adfbf0c580dccb1364e900de194d..0ca32b9382e3800cabe8b2fe814a2d438cefb1ef 100644 GIT binary patch delta 44 vcmd0s!Z;yMt$?A3A)g_aA(f$oAp=O3G88k!GbA(QFk}LGc|cKK1}+8w_|^$& delta 5 McmbQjm^UF000qSY0ssI2 diff --git a/docker-compose.yml b/docker-compose.yml index a14b160..6fef505 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -100,6 +100,30 @@ services: - kafka_network restart: always + prometheus: + image: prom/prometheus:latest + container_name: prometheus + volumes: + - ./monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + command: + - '--config.file=/etc/prometheus/prometheus.yml' + ports: + - "9090:9090" + networks: + - kafka_network + + grafana: + image: grafana/grafana:latest + container_name: grafana + ports: + - "3000:3000" + volumes: + - ./monitoring/grafana:/var/lib/grafana + depends_on: + - prometheus + networks: + - kafka_network + volumes: mongo_data: diff --git a/monitoring/__init__.py b/monitoring/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/monitoring/__init__.py @@ -0,0 +1 @@ + diff --git a/monitoring/grafana/README.md b/monitoring/grafana/README.md new file mode 100644 index 0000000..c7ee029 --- /dev/null +++ b/monitoring/grafana/README.md @@ -0,0 +1,13 @@ +# Grafana + +Esta carpeta se utiliza como volumen persistente para los datos y configuraciones de Grafana. + +## Dashboards + +Puedes importar dashboards de ejemplo desde archivos JSON ubicados en `monitoring/grafana/dashboards/`. + +## Uso + +- Accede a Grafana en http://localhost:3000 (usuario y contraseña por defecto: admin / admin). +- Añade Prometheus como fuente de datos (`http://prometheus:9090`). +- Importa dashboards desde la sección de Dashboards > Import. 
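The producer-side instrumentation added above in datagen/kafka_push.py reduces to one pattern: expose a prometheus_client endpoint on port 9101, then wrap serialization and send in counters, histograms and an in-progress gauge. The following is a condensed standalone sketch of that pattern, not the patch itself — the metric names, the topic "probando" and the port mirror the diff, while RandomDataGenerator is replaced by a trivial stand-in generator and the help strings are given in English.

```python
import json
import time
from prometheus_client import start_http_server, Counter, Histogram, Gauge
from kafka import KafkaProducer

# Metric names mirror the patch; help strings translated to English.
MESSAGES_SENT = Counter('kafka_messages_sent_total', 'Total messages sent to Kafka')
SEND_ERRORS = Counter('kafka_send_errors_total', 'Errors while sending messages to Kafka')
SEND_TIME = Histogram('kafka_message_send_seconds', 'Time spent sending a message to Kafka')
MESSAGE_SIZE = Histogram('kafka_message_size_bytes', 'Size of messages sent to Kafka')
MESSAGES_IN_PROGRESS = Gauge('kafka_messages_in_progress', 'Messages currently being sent')

def fake_messages():
    """Stand-in for RandomDataGenerator: yields simple dict payloads."""
    i = 0
    while True:
        i += 1
        yield {"id": i, "ts": time.time()}

def run(host="kafka:9092", topic="probando"):
    start_http_server(9101)  # /metrics endpoint scraped by Prometheus
    producer = KafkaProducer(
        bootstrap_servers=[host],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
    )
    for msg in fake_messages():
        MESSAGES_IN_PROGRESS.inc()
        try:
            MESSAGE_SIZE.observe(len(json.dumps(msg).encode("utf-8")))
            with SEND_TIME.time():
                producer.send(topic, msg)
            MESSAGES_SENT.inc()
        except Exception as exc:
            SEND_ERRORS.inc()
            print(f"Error sending message: {exc}")
        finally:
            MESSAGES_IN_PROGRESS.dec()

if __name__ == "__main__":
    run()
```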
\ No newline at end of file diff --git a/monitoring/grafana/dashboards/pipeline_overview.json b/monitoring/grafana/dashboards/pipeline_overview.json new file mode 100644 index 0000000..abcf34e --- /dev/null +++ b/monitoring/grafana/dashboards/pipeline_overview.json @@ -0,0 +1,83 @@ +{ + "id": null, + "title": "Data Pipeline Overview", + "tags": ["pipeline", "kafka", "mongo", "supabase"], + "timezone": "browser", + "schemaVersion": 36, + "version": 1, + "refresh": "10s", + "panels": [ + { + "type": "stat", + "title": "Kafka - Mensajes Enviados", + "targets": [{"expr": "kafka_messages_sent_total", "refId": "A"}], + "gridPos": {"x": 0, "y": 0, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "Kafka - Errores de Envío", + "targets": [{"expr": "kafka_send_errors_total", "refId": "A"}], + "gridPos": {"x": 6, "y": 0, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "Kafka - Tamaño Medio de Mensaje", + "targets": [{"expr": "avg(kafka_message_size_bytes)", "refId": "A"}], + "gridPos": {"x": 0, "y": 4, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "Kafka Consumer - Mensajes Consumidos", + "targets": [{"expr": "kafka_messages_consumed_total", "refId": "A"}], + "gridPos": {"x": 6, "y": 4, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "Kafka Consumer - Errores de Consumo", + "targets": [{"expr": "kafka_consume_errors_total", "refId": "A"}], + "gridPos": {"x": 0, "y": 8, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "MongoDB - Documentos Insertados", + "targets": [{"expr": "mongo_inserts_total", "refId": "A"}], + "gridPos": {"x": 6, "y": 8, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "MongoDB - Errores de Inserción", + "targets": [{"expr": "mongo_insert_errors_total", "refId": "A"}], + "gridPos": {"x": 0, "y": 12, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "Supabase - Personas Migradas", + "targets": [{"expr": "supabase_persons_migrated_total", "refId": "A"}], + "gridPos": {"x": 6, "y": 12, "w": 6, "h": 4} + }, + { + "type": "stat", + "title": "Supabase - Errores de Inserción", + "targets": [{"expr": "supabase_insert_errors_total", "refId": "A"}], + "gridPos": {"x": 0, "y": 16, "w": 6, "h": 4} + }, + { + "type": "graph", + "title": "Kafka - Tiempo de Envío de Mensajes", + "targets": [{"expr": "rate(kafka_message_send_seconds_sum[1m]) / rate(kafka_message_send_seconds_count[1m])", "refId": "A"}], + "gridPos": {"x": 6, "y": 16, "w": 12, "h": 8} + }, + { + "type": "graph", + "title": "MongoDB - Tiempo de Inserción", + "targets": [{"expr": "rate(mongo_insert_seconds_sum[1m]) / rate(mongo_insert_seconds_count[1m])", "refId": "A"}], + "gridPos": {"x": 0, "y": 24, "w": 12, "h": 8} + }, + { + "type": "graph", + "title": "Supabase - Tiempo de Inserción", + "targets": [{"expr": "rate(supabase_insert_seconds_sum[1m]) / rate(supabase_insert_seconds_count[1m])", "refId": "A"}], + "gridPos": {"x": 12, "y": 24, "w": 12, "h": 8} + } + ] +} \ No newline at end of file diff --git a/monitoring/prometheus/prometheus.yml b/monitoring/prometheus/prometheus.yml new file mode 100644 index 0000000..7437405 --- /dev/null +++ b/monitoring/prometheus/prometheus.yml @@ -0,0 +1,13 @@ +global: + scrape_interval: 5s + +scrape_configs: + - job_name: 'kafka_push' + static_configs: + - targets: ['random_generator:9101'] + - job_name: 'kafka_consumer' + static_configs: + - targets: ['kafka_consumer:9102'] + - job_name: 'mongo_to_postgres' + static_configs: + - targets: ['mongo_to_postgres:9103'] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 
de2e871..a68bd69 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ kafka-python pymongo python-dotenv -supabase \ No newline at end of file +supabase +prometheus_client \ No newline at end of file diff --git a/src/etl/Dockerfile b/src/etl/Dockerfile index 31e0a0b..e176002 100644 --- a/src/etl/Dockerfile +++ b/src/etl/Dockerfile @@ -20,4 +20,6 @@ ENV PYTHONPATH=/app #RUN chmod +x /app/src/etl/wait-for-it.sh # Ejecutar el script -CMD ["python", "mongo_to_postgres.py"] \ No newline at end of file +CMD ["python", "mongo_to_postgres.py"] + +EXPOSE 9103 \ No newline at end of file diff --git a/src/etl/mongo_to_postgres.py b/src/etl/mongo_to_postgres.py index 8b490e1..37a492a 100644 --- a/src/etl/mongo_to_postgres.py +++ b/src/etl/mongo_to_postgres.py @@ -15,6 +15,7 @@ ) from utils.logg import write_log import json +from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary # Cargar variables de entorno @@ -28,6 +29,13 @@ SUPABASE_URL = os.getenv('SUPABASE_URL') SUPABASE_SERVICE_ROLE_KEY = os.getenv('SUPABASE_SERVICE_ROLE_KEY') +MIGRATED_PERSONS = Counter('supabase_persons_migrated_total', 'Total personas migradas a Supabase') +SUPABASE_INSERT_ERRORS = Counter('supabase_insert_errors_total', 'Errores al insertar en Supabase') +SUPABASE_INSERT_TIME = Histogram('supabase_insert_seconds', 'Tiempo en insertar en Supabase') +SUPABASE_INSERT_IN_PROGRESS = Gauge('supabase_insert_in_progress', 'Inserciones en proceso en Supabase') +SUPABASE_INSERT_SUMMARY = Summary('supabase_insert_summary_seconds', 'Resumen de tiempos de inserción en Supabase') + +start_http_server(9103) # Puerto Prometheus para este servicio def check_mongo_connection(): try: @@ -101,23 +109,27 @@ def insert_location(supabase, person): if not address or not city: return None try: - resp = supabase.table("locations").select("location_id").eq("address", address).eq("city", city).execute() - if resp.data: - location_id = resp.data[0]["location_id"] - else: - ins = supabase.table("locations").insert({"address": address, "city": city}).execute() - location_id = ins.data[0]["location_id"] + with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time(): + SUPABASE_INSERT_IN_PROGRESS.inc() + resp = supabase.table("locations").select("location_id").eq("address", address).eq("city", city).execute() + if resp.data: + location_id = resp.data[0]["location_id"] + else: + ins = supabase.table("locations").insert({"address": address, "city": city}).execute() + location_id = ins.data[0]["location_id"] write_log("INFO", "mongo_to_postgres.py", f"Ubicación insertada/obtenida: {address}, {city} -> {location_id}") return location_id except Exception as e: + SUPABASE_INSERT_ERRORS.inc() write_log("ERROR", "mongo_to_postgres.py", f"Error insertando ubicación: {e}") return None + finally: + SUPABASE_INSERT_IN_PROGRESS.dec() def insert_person(supabase, person, location_id): if not person.get("passport") or not person.get("name") or not person.get("last_name") or not person.get("email"): write_log("WARNING", "mongo_to_postgres.py", f"Datos insuficientes para persona: {person}") return - # Ignorar fullname y address data = { "passport": person.get("passport"), "first_name": person.get("name"), @@ -128,11 +140,17 @@ def insert_person(supabase, person, location_id): "location_id_fk": location_id } try: - supabase.table("persons").upsert(data, on_conflict=["passport"]).execute() + with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time(): + SUPABASE_INSERT_IN_PROGRESS.inc() + 
supabase.table("persons").upsert(data, on_conflict=["passport"]).execute() + MIGRATED_PERSONS.inc() write_log("INFO", "mongo_to_postgres.py", f"Persona insertada/actualizada: {data}") except Exception as e: + SUPABASE_INSERT_ERRORS.inc() print(f"ERROR >> Fallo insertando en persons: {e}") write_log("ERROR", "mongo_to_postgres.py", f"Error insertando persons: {e}") + finally: + SUPABASE_INSERT_IN_PROGRESS.dec() def insert_bank(supabase, person): if not person.get("passport") or not person.get("IBAN"): @@ -143,32 +161,40 @@ def insert_bank(supabase, person): "salary": person.get("salary") } try: - supabase.table("bank_data").upsert(data, on_conflict=["passport_fk"]).execute() + with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time(): + SUPABASE_INSERT_IN_PROGRESS.inc() + supabase.table("bank_data").upsert(data, on_conflict=["passport_fk"]).execute() write_log("INFO", "mongo_to_postgres.py", f"Bank_data insertado/actualizado: {data}") except Exception as e: + SUPABASE_INSERT_ERRORS.inc() print(f"ERROR >> Fallo insertando en bank_data: {e}") write_log("ERROR", "mongo_to_postgres.py", f"Error insertando bank_data: {e}") + finally: + SUPABASE_INSERT_IN_PROGRESS.dec() def insert_network(supabase, person, location_id): if not person.get("passport") or not person.get("IPv4"): return - # Ignorar fullname y address data = { "passport_fk": person.get("passport"), "location_id_fk": location_id, "ip_address": person.get("IPv4") } try: - supabase.table("network_data").upsert(data, on_conflict=["passport_fk"]).execute() + with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time(): + SUPABASE_INSERT_IN_PROGRESS.inc() + supabase.table("network_data").upsert(data, on_conflict=["passport_fk"]).execute() write_log("INFO", "mongo_to_postgres.py", f"Network_data insertado/actualizado: {data}") except Exception as e: + SUPABASE_INSERT_ERRORS.inc() print(f"ERROR >> Fallo insertando en network_data: {e}") write_log("ERROR", "mongo_to_postgres.py", f"Error insertando network_data: {e}") + finally: + SUPABASE_INSERT_IN_PROGRESS.dec() def insert_professional(supabase, person): if not person.get("passport") or not person.get("company"): return - # Ignorar fullname y address data = { "passport_fk": person.get("passport"), "company_name": person.get("company"), @@ -178,11 +204,16 @@ def insert_professional(supabase, person): "job_title": person.get("job") } try: - supabase.table("professional_data").upsert(data, on_conflict=["passport_fk"]).execute() + with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time(): + SUPABASE_INSERT_IN_PROGRESS.inc() + supabase.table("professional_data").upsert(data, on_conflict=["passport_fk"]).execute() write_log("INFO", "mongo_to_postgres.py", f"Professional_data insertado/actualizado: {data}") except Exception as e: + SUPABASE_INSERT_ERRORS.inc() print(f"ERROR >> Fallo insertando en professional_data: {e}") write_log("ERROR", "mongo_to_postgres.py", f"Error insertando professional_data: {e}") + finally: + SUPABASE_INSERT_IN_PROGRESS.dec() def buscar_passport_por_fullname_address(person, personas): """ diff --git a/src/etl/requirements.txt b/src/etl/requirements.txt index f49d666..3676978 100644 --- a/src/etl/requirements.txt +++ b/src/etl/requirements.txt @@ -2,3 +2,4 @@ kafka-python-ng==2.2.2 pymongo supabase python-dotenv +prometheus_client diff --git a/src/kafka_consumer/Dockerfile b/src/kafka_consumer/Dockerfile index f70a70b..acdd23d 100644 --- a/src/kafka_consumer/Dockerfile +++ b/src/kafka_consumer/Dockerfile @@ -20,3 +20,5 @@ RUN pip install --no-cache-dir 
--upgrade pip && \ # Use wait-for-it.sh to wait for Kafka CMD ["python", "consumer.py"] + +EXPOSE 9102 diff --git a/src/kafka_consumer/consumer.py b/src/kafka_consumer/consumer.py index bbe1698..8e01ac5 100644 --- a/src/kafka_consumer/consumer.py +++ b/src/kafka_consumer/consumer.py @@ -1,13 +1,22 @@ from kafka import KafkaConsumer import json from storage_mongo import guardar_en_mongo +from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary + +CONSUMED_MESSAGES = Counter('kafka_messages_consumed_total', 'Total mensajes consumidos de Kafka') +CONSUME_ERRORS = Counter('kafka_consume_errors_total', 'Errores al consumir mensajes de Kafka') +CONSUME_TIME = Histogram('kafka_message_consume_seconds', 'Tiempo en procesar un mensaje de Kafka') +CONSUME_IN_PROGRESS = Gauge('kafka_consume_in_progress', 'Mensajes en proceso de consumo') +CONSUME_SUMMARY = Summary('kafka_consume_summary_seconds', 'Resumen de tiempos de consumo') + +start_http_server(9102) # Puerto Prometheus para el consumidor def main(): consumer = KafkaConsumer( 'probando', # 'probando' es el nombre del topic a consumir bootstrap_servers='kafka:9092', #Dirección del servidor Kafka, en mi docker es kafka:9092 auto_offset_reset='earliest', # Para leer desde el principio del topic (si nunca ha leido), 'latest' para leer solo nuevos mensajes - enable_auto_commit=True, # Kafka guarda el “offset” automáticamente. El offset es un marcador que dice: “ya leí hasta aquí”, se guardará automáticamente qué mensajes ya leí (en el grupo de consumidores) + enable_auto_commit=True, # Kafka guarda el "offset" automáticamente. El offset es un marcador que dice: "ya leí hasta aquí", se guardará automáticamente qué mensajes ya leí (en el grupo de consumidores) group_id='hrpro-consumer-group', # Identificador del grupo de consumidores, si no se especifica, se crea uno por defecto value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserializador para convertir el mensaje de bytes a JSON ) @@ -17,8 +26,15 @@ def main(): #print("Mensaje recibido:") #print(json.dumps(message.value, indent=2)) # bonito para debug - # Guardar en Mongo - guardar_en_mongo(message.value) + with CONSUME_TIME.time(), CONSUME_SUMMARY.time(): + try: + CONSUME_IN_PROGRESS.inc() + guardar_en_mongo(message.value) + CONSUMED_MESSAGES.inc() + except Exception as e: + CONSUME_ERRORS.inc() + finally: + CONSUME_IN_PROGRESS.dec() diff --git a/src/kafka_consumer/requirements.txt b/src/kafka_consumer/requirements.txt index 8c6bfbc..2e5c3e8 100644 --- a/src/kafka_consumer/requirements.txt +++ b/src/kafka_consumer/requirements.txt @@ -1,3 +1,4 @@ pymongo python-dotenv kafka-python-ng==2.2.2 +prometheus_client \ No newline at end of file diff --git a/src/kafka_consumer/storage_mongo.py b/src/kafka_consumer/storage_mongo.py index 8589434..c6a752b 100644 --- a/src/kafka_consumer/storage_mongo.py +++ b/src/kafka_consumer/storage_mongo.py @@ -1,39 +1,53 @@ import os from pymongo import MongoClient +from prometheus_client import Counter, Histogram, Gauge, Summary MONGO_URI = os.getenv("MONGO_URI", "mongodb://admin:adminpassword@mongo:27017/") client = MongoClient(MONGO_URI) db = client["raw_mongo_db"] +MONGO_INSERTS = Counter('mongo_inserts_total', 'Total documentos insertados en MongoDB') +MONGO_ERRORS = Counter('mongo_insert_errors_total', 'Errores al insertar en MongoDB') +MONGO_INSERT_TIME = Histogram('mongo_insert_seconds', 'Tiempo en insertar documento en MongoDB') +MONGO_INSERT_IN_PROGRESS = Gauge('mongo_insert_in_progress', 'Documentos en proceso de 
inserción') +MONGO_INSERT_SUMMARY = Summary('mongo_insert_summary_seconds', 'Resumen de tiempos de inserción') + def guardar_en_mongo(documento): try: - #print("Guardando documento en MongoDB:", documento) - keys = set(k.lower() for k in documento.keys()) - - if "passport" in keys and "name" in keys: - collection = db["personal_data"] - filter_key = {"passport": documento.get("passport")} - elif "fullname" in keys and "city" in keys: - collection = db["location_data"] - filter_key = {"fullname": documento.get("fullname")} - elif "company" in keys or "job" in keys: - collection = db["professional_data"] - filter_key = {"fullname": documento.get("fullname")} - elif "iban" in keys: - collection = db["bank_data"] - filter_key = {"passport": documento.get("passport")} - elif "ipv4" in keys: - collection = db["net_data"] - # usar IPv4 como filtro único - filter_key = {"IPv4": documento.get("IPv4")} - else: - collection = db["unknown_type"] - filter_key = documento # sin filtro único, podría cambiar - - collection.update_one(filter_key, {"$set": documento}, upsert=True) - #print(f"✅ Documento guardado o actualizado en colección: {collection.name}") + with MONGO_INSERT_TIME.time(), MONGO_INSERT_SUMMARY.time(): + MONGO_INSERT_IN_PROGRESS.inc() + #print("Guardando documento en MongoDB:", documento) + keys = set(k.lower() for k in documento.keys()) + + if "passport" in keys and "name" in keys: + collection = db["personal_data"] + filter_key = {"passport": documento.get("passport")} + elif "fullname" in keys and "city" in keys: + collection = db["location_data"] + filter_key = {"fullname": documento.get("fullname")} + elif "company" in keys or "job" in keys: + collection = db["professional_data"] + filter_key = {"fullname": documento.get("fullname")} + elif "iban" in keys: + collection = db["bank_data"] + filter_key = {"passport": documento.get("passport")} + elif "ipv4" in keys: + collection = db["net_data"] + # usar IPv4 como filtro único + filter_key = {"IPv4": documento.get("IPv4")} + else: + collection = db["unknown_type"] + filter_key = documento # sin filtro único, podría cambiar + + collection.update_one(filter_key, {"$set": documento}, upsert=True) + MONGO_INSERTS.inc() + #print(f"✅ Documento guardado o actualizado en colección: {collection.name}") except Exception as e: + MONGO_ERRORS.inc() #print("❌ Error al guardar en MongoDB :", e) print("") + finally: + MONGO_INSERT_IN_PROGRESS.dec() + diff --git a/src/monitoring/grafana_dashboards/.gitkeep b/src/monitoring/grafana_dashboards/.gitkeep deleted file mode 100644 index e69de29..0000000 From dc8c5257e9b45e204021a5393269a53394837d74 Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 11:33:09 +0200 Subject: [PATCH 2/8] refactor(SQL): remove unused functions in mongo_to_supabase file and imports from etl_utils. 
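The same counter/gauge/histogram boilerplate is repeated around every Mongo and Supabase write in this patch (guardar_en_mongo plus the five insert_* functions in mongo_to_postgres.py). The pattern can be captured once in a small context manager; the sketch below is an illustration under the patch's MongoDB metric names, not code taken from the diff, and it re-raises errors whereas the patch logs and swallows them.

```python
from contextlib import contextmanager
from prometheus_client import Counter, Histogram, Gauge, Summary

# Metric names follow the consumer-side of the patch (English help strings).
MONGO_INSERTS = Counter('mongo_inserts_total', 'Documents inserted into MongoDB')
MONGO_ERRORS = Counter('mongo_insert_errors_total', 'Errors while inserting into MongoDB')
MONGO_INSERT_TIME = Histogram('mongo_insert_seconds', 'Time spent inserting a document into MongoDB')
MONGO_INSERT_IN_PROGRESS = Gauge('mongo_insert_in_progress', 'Documents currently being inserted')
MONGO_INSERT_SUMMARY = Summary('mongo_insert_summary_seconds', 'Summary of insert times')

@contextmanager
def tracked_insert(ok_counter, err_counter, hist, gauge, summary):
    """Wrap a storage call with counter/histogram/gauge bookkeeping.

    Errors are counted and re-raised here; the patch instead counts them
    and logs inside each function's own except block.
    """
    gauge.inc()
    try:
        with hist.time(), summary.time():
            yield
        ok_counter.inc()
    except Exception:
        err_counter.inc()
        raise
    finally:
        gauge.dec()

# Usage, e.g. inside guardar_en_mongo():
# with tracked_insert(MONGO_INSERTS, MONGO_ERRORS, MONGO_INSERT_TIME,
#                     MONGO_INSERT_IN_PROGRESS, MONGO_INSERT_SUMMARY):
#     collection.update_one(filter_key, {"$set": documento}, upsert=True)
```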
--- src/etl/mongo_to_postgres.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/src/etl/mongo_to_postgres.py b/src/etl/mongo_to_postgres.py index 37a492a..c41f9d6 100644 --- a/src/etl/mongo_to_postgres.py +++ b/src/etl/mongo_to_postgres.py @@ -4,12 +4,6 @@ from supabase import create_client from etl_utils import ( clean_data, - find_person_key, - register_keys, - identifying_keys, - person_index, - people_data, - group_records, address_match, nombres_en_fullname ) @@ -78,31 +72,6 @@ def get_mongo_collections(): "net_data": db["net_data"] } -def extract_and_group_records(collections): - # Extraer registros de MongoDB y agruparlos - print("🔄 Extrayendo y agrupando registros de MongoDB...") - people_data.clear() - person_index.clear() - - # Extraer y limpiar todos los documentos de todas las colecciones - all_docs = [] - for cname, col in collections.items(): - for doc in col.find(): - doc.pop("_id", None) - clean_doc = clean_data(doc) - all_docs.append(clean_doc) - write_log("INFO", "mongo_to_postgres.py", f"Documento extraído y limpiado de {cname}: {clean_doc}") - - - # Agrupar usando la lógica de etl_utils - group_records(all_docs) - print("DEBUG >> Personas agrupadas (resumen):") - for i, person in enumerate(people_data): - print(f" [{i}] {person}") - print(f"✅ Total de personas agrupadas: {len(people_data)}") - write_log("INFO", "mongo_to_postgres.py", f"Total de personas agrupadas: {len(people_data)}") - return people_data - def insert_location(supabase, person): address = person.get("address") city = person.get("city") From 6925040901205772e4d32f418abbdbb3a67b232b Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 17:35:04 +0200 Subject: [PATCH 3/8] feat: enhance Grafana monitoring with structured dashboards and datasource provisioning. chore: relocate unit test to /tests/unit/. 
--- docker-compose.yml | 1 + .../provisioning/dashboards/dashboards.yml | 10 ++ .../dashboards/full_pipeline_metrics.json | 99 +++++++++++++++++++ .../dashboards/pipeline_overview.json | 0 .../provisioning/datasources/datasource.yml | 7 ++ tests/unit/.gitkeep | 0 {src/etl => tests/unit}/test.py | 0 7 files changed, 117 insertions(+) create mode 100644 monitoring/grafana/provisioning/dashboards/dashboards.yml create mode 100644 monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json rename monitoring/grafana/{ => provisioning}/dashboards/pipeline_overview.json (100%) create mode 100644 monitoring/grafana/provisioning/datasources/datasource.yml delete mode 100644 tests/unit/.gitkeep rename {src/etl => tests/unit}/test.py (100%) diff --git a/docker-compose.yml b/docker-compose.yml index 6fef505..c11d1e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -119,6 +119,7 @@ services: - "3000:3000" volumes: - ./monitoring/grafana:/var/lib/grafana + - ./monitoring/grafana/provisioning:/etc/grafana/provisioning depends_on: - prometheus networks: diff --git a/monitoring/grafana/provisioning/dashboards/dashboards.yml b/monitoring/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000..673349d --- /dev/null +++ b/monitoring/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,10 @@ +apiVersion: 1 +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + updateIntervalSeconds: 10 + options: + path: /etc/grafana/provisioning/dashboards \ No newline at end of file diff --git a/monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json b/monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json new file mode 100644 index 0000000..498b997 --- /dev/null +++ b/monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json @@ -0,0 +1,99 @@ +{ + "id": null, + "title": "Full Pipeline Metrics", + "tags": [], + "timezone": "browser", + "schemaVersion": 36, + "version": 1, + "refresh": "5s", + "panels": [ + { + "type": "stat", + "title": "Kafka Messages Produced", + "targets": [ + { "expr": "kafka_messages_produced_total", "refId": "A" } + ], + "gridPos": { "x": 0, "y": 0, "w": 6, "h": 3 } + }, + { + "type": "stat", + "title": "Kafka Produce Errors", + "targets": [ + { "expr": "kafka_produce_errors_total", "refId": "B" } + ], + "gridPos": { "x": 6, "y": 0, "w": 6, "h": 3 } + }, + { + "type": "stat", + "title": "Kafka Messages Consumed", + "targets": [ + { "expr": "kafka_messages_consumed_total", "refId": "C" } + ], + "gridPos": { "x": 0, "y": 3, "w": 6, "h": 3 } + }, + { + "type": "stat", + "title": "Kafka Consume Errors", + "targets": [ + { "expr": "kafka_consume_errors_total", "refId": "D" } + ], + "gridPos": { "x": 6, "y": 3, "w": 6, "h": 3 } + }, + { + "type": "graph", + "title": "Kafka Message Processing Time", + "targets": [ + { "expr": "kafka_message_process_time_seconds", "refId": "E" } + ], + "gridPos": { "x": 0, "y": 6, "w": 12, "h": 6 } + }, + { + "type": "stat", + "title": "Mongo Insert Errors", + "targets": [ + { "expr": "mongo_insert_errors_total", "refId": "F" } + ], + "gridPos": { "x": 0, "y": 12, "w": 6, "h": 3 } + }, + { + "type": "graph", + "title": "Mongo Insert Time", + "targets": [ + { "expr": "mongo_insert_time_seconds", "refId": "G" } + ], + "gridPos": { "x": 6, "y": 12, "w": 6, "h": 3 } + }, + { + "type": "stat", + "title": "Supabase Insert Errors", + "targets": [ + { "expr": "supabase_insert_errors_total", "refId": "H" } + ], + "gridPos": { "x": 0, "y": 15, "w": 6, 
"h": 3 } + }, + { + "type": "graph", + "title": "Supabase Insert Time", + "targets": [ + { "expr": "supabase_insert_seconds", "refId": "I" } + ], + "gridPos": { "x": 6, "y": 15, "w": 6, "h": 3 } + }, + { + "type": "stat", + "title": "ETL Complete Persons", + "targets": [ + { "expr": "etl_complete_persons_total", "refId": "J" } + ], + "gridPos": { "x": 0, "y": 18, "w": 6, "h": 3 } + }, + { + "type": "stat", + "title": "ETL Incomplete Persons", + "targets": [ + { "expr": "etl_incomplete_persons_total", "refId": "K" } + ], + "gridPos": { "x": 6, "y": 18, "w": 6, "h": 3 } + } + ] +} \ No newline at end of file diff --git a/monitoring/grafana/dashboards/pipeline_overview.json b/monitoring/grafana/provisioning/dashboards/pipeline_overview.json similarity index 100% rename from monitoring/grafana/dashboards/pipeline_overview.json rename to monitoring/grafana/provisioning/dashboards/pipeline_overview.json diff --git a/monitoring/grafana/provisioning/datasources/datasource.yml b/monitoring/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 0000000..8d9f9d8 --- /dev/null +++ b/monitoring/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,7 @@ +apiVersion: 1 +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true \ No newline at end of file diff --git a/tests/unit/.gitkeep b/tests/unit/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/src/etl/test.py b/tests/unit/test.py similarity index 100% rename from src/etl/test.py rename to tests/unit/test.py From a2adbaf210003a24d782c1ea158e6ece6adbfb25 Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 18:04:36 +0200 Subject: [PATCH 4/8] chore: remove database/.gitkeep placeholder file. --- src/database/.gitkeep | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/database/.gitkeep diff --git a/src/database/.gitkeep b/src/database/.gitkeep deleted file mode 100644 index e69de29..0000000 From ef5fce534b8672f495ff5bc912e191ee610b8b3e Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 19:14:20 +0200 Subject: [PATCH 5/8] refactor: remove .gitkeep from docs/ folder. --- docs/.gitkeep | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 docs/.gitkeep diff --git a/docs/.gitkeep b/docs/.gitkeep deleted file mode 100644 index e69de29..0000000 From bb3f2e0f1b947f291a507ca870cba323fac880c4 Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 19:16:18 +0200 Subject: [PATCH 6/8] refactor: remove config/ and data/ folders from root project. --- config/.gitkeep | 0 data/samples/.gitkeep | 0 data/schemas/.gitkeep | 0 3 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 config/.gitkeep delete mode 100644 data/samples/.gitkeep delete mode 100644 data/schemas/.gitkeep diff --git a/config/.gitkeep b/config/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/data/samples/.gitkeep b/data/samples/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/data/schemas/.gitkeep b/data/schemas/.gitkeep deleted file mode 100644 index e69de29..0000000 From 9616bef65c77b271e8c8d7e3fed26e0d51256ed1 Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 19:17:28 +0200 Subject: [PATCH 7/8] refactor: remove scripts/ folder from root project. 
--- scripts/.gitkeep | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 scripts/.gitkeep diff --git a/scripts/.gitkeep b/scripts/.gitkeep deleted file mode 100644 index e69de29..0000000 From 6d60e42b68d8d6959a4fba4a10f815a3954b938c Mon Sep 17 00:00:00 2001 From: jdomdev Date: Fri, 20 Jun 2025 19:31:38 +0200 Subject: [PATCH 8/8] ci: add GitHub Actions workflow for test automation on main and dev branches. --- .github/workflows/.gitkeep | 0 .github/workflows/python-tests.yml | 29 +++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) delete mode 100644 .github/workflows/.gitkeep create mode 100644 .github/workflows/python-tests.yml diff --git a/.github/workflows/.gitkeep b/.github/workflows/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml new file mode 100644 index 0000000..3753000 --- /dev/null +++ b/.github/workflows/python-tests.yml @@ -0,0 +1,29 @@ +name: Python CI + +on: + push: + branches: [ main, dev ] + pull_request: + branches: [ main, dev ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + + - name: Run unit tests + run: | + python -m unittest discover -s tests/unit -p "*.py" -v \ No newline at end of file
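The workflow's test step (python -m unittest discover -s tests/unit -p "*.py" -v) picks up any module under tests/unit that follows the standard unittest layout, including the relocated test.py. A minimal, hypothetical example of such a module is sketched below; the file name test_example.py and the round-trip assertion are placeholders for illustration, not part of the patch.

```python
# tests/unit/test_example.py -- hypothetical placeholder picked up by
# `python -m unittest discover -s tests/unit -p "*.py" -v`
import json
import unittest


class TestPipelinePlaceholder(unittest.TestCase):
    def test_message_roundtrip(self):
        # Placeholder assertion; real tests would exercise datagen / ETL helpers.
        payload = {"passport": "X123", "name": "Ada"}
        self.assertEqual(json.loads(json.dumps(payload)), payload)


if __name__ == "__main__":
    unittest.main()
```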