diff --git a/.github/workflows/.gitkeep b/.github/workflows/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml
new file mode 100644
index 0000000..3753000
--- /dev/null
+++ b/.github/workflows/python-tests.yml
@@ -0,0 +1,29 @@
+name: Python CI
+
+on:
+  push:
+    branches: [ main, dev ]
+  pull_request:
+    branches: [ main, dev ]
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.10'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+
+      - name: Run unit tests
+        run: |
+          python -m unittest discover -s tests/unit -p "*.py" -v
\ No newline at end of file
diff --git a/config/.gitkeep b/config/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/data/samples/.gitkeep b/data/samples/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/data/schemas/.gitkeep b/data/schemas/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/datagen/Dockerfile b/datagen/Dockerfile
index 7e97977..c37123c 100644
--- a/datagen/Dockerfile
+++ b/datagen/Dockerfile
@@ -8,6 +8,8 @@ RUN pip install -r requirements.txt
 
 CMD [ "python", "./kafka_push.py" ]
 
+EXPOSE 9101
+
diff --git a/datagen/kafka_push.py b/datagen/kafka_push.py
index 561ae1e..a0b789e 100644
--- a/datagen/kafka_push.py
+++ b/datagen/kafka_push.py
@@ -1,6 +1,22 @@
 from kafka import KafkaProducer, KafkaConsumer
 from data_generator import RandomDataGenerator
 import json
+from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary
+import time
+
+# Métricas Prometheus ampliadas
+MESSAGES_SENT = Counter('kafka_messages_sent_total', 'Total mensajes enviados a Kafka')
+SEND_ERRORS = Counter('kafka_send_errors_total', 'Errores al enviar mensajes a Kafka')
+GEN_TIME = Histogram('kafka_message_generation_seconds', 'Tiempo en generar un mensaje aleatorio')
+SEND_TIME = Histogram('kafka_message_send_seconds', 'Tiempo en enviar un mensaje a Kafka')
+MESSAGE_SIZE = Histogram('kafka_message_size_bytes', 'Tamaño de los mensajes enviados a Kafka')
+MESSAGES_IN_PROGRESS = Gauge('kafka_messages_in_progress', 'Mensajes en proceso de envío')
+LAST_SEND_STATUS = Gauge('kafka_last_send_status', 'Último estado de envío (1=OK, 0=Error)')
+SEND_SUMMARY = Summary('kafka_send_summary_seconds', 'Resumen de tiempos de envío')
+SERIALIZE_TIME = Histogram('kafka_message_serialize_seconds', 'Tiempo en serializar un mensaje a JSON')
+
+# Inicia el endpoint Prometheus en el puerto 9101
+start_http_server(9101)
 
 def make_producer(host):
     producer = KafkaProducer(bootstrap_servers=[host], value_serializer=lambda x: json.dumps(x).encode("utf-8"))
@@ -8,8 +24,23 @@ def make_producer(host):
     return producer
 
 def send_random(producer):
     for msg in RandomDataGenerator():
-        producer.send("probando", msg)
-        print("Sent")
+        with GEN_TIME.time():
+            try:
+                with SERIALIZE_TIME.time():
+                    msg_bytes = json.dumps(msg).encode("utf-8")
+                MESSAGE_SIZE.observe(len(msg_bytes))
+                MESSAGES_IN_PROGRESS.inc()
+                with SEND_TIME.time(), SEND_SUMMARY.time():
+                    producer.send("probando", msg)
+                MESSAGES_SENT.inc()
+                LAST_SEND_STATUS.set(1)
+                print("Sent")
+            except Exception as e:
+                SEND_ERRORS.inc()
+                LAST_SEND_STATUS.set(0)
+                print(f"Error enviando mensaje: {e}")
+            finally:
+                MESSAGES_IN_PROGRESS.dec()
 
 def main():
     producer = make_producer(host="kafka:9092")
diff --git a/datagen/requirements.txt b/datagen/requirements.txt
index 423ee40..0ca32b9 100644
Binary files a/datagen/requirements.txt and b/datagen/requirements.txt differ
diff --git a/docker-compose.yml b/docker-compose.yml
index 54dc213..69e908d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -102,6 +102,31 @@ services:
       - kafka_network
     restart: always
 
+  prometheus:
+    image: prom/prometheus:latest
+    container_name: prometheus
+    volumes:
+      - ./monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
+    command:
+      - '--config.file=/etc/prometheus/prometheus.yml'
+    ports:
+      - "9090:9090"
+    networks:
+      - kafka_network
+
+  grafana:
+    image: grafana/grafana:latest
+    container_name: grafana
+    ports:
+      - "3000:3000"
+    volumes:
+      - ./monitoring/grafana:/var/lib/grafana
+      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
+    depends_on:
+      - prometheus
+    networks:
+      - kafka_network
+
   redis:
     image: redis:latest
     container_name: redis_kafka
@@ -111,6 +136,7 @@
       - kafka_network
     restart: always
 
+
 volumes:
   mongo_data:
diff --git a/docs/.gitkeep b/docs/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/monitoring/__init__.py b/monitoring/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/monitoring/__init__.py
@@ -0,0 +1 @@
+
diff --git a/monitoring/grafana/provisioning/dashboards/dashboards.yml b/monitoring/grafana/provisioning/dashboards/dashboards.yml
new file mode 100644
index 0000000..673349d
--- /dev/null
+++ b/monitoring/grafana/provisioning/dashboards/dashboards.yml
@@ -0,0 +1,10 @@
+apiVersion: 1
+providers:
+  - name: 'default'
+    orgId: 1
+    folder: ''
+    type: file
+    disableDeletion: false
+    updateIntervalSeconds: 10
+    options:
+      path: /etc/grafana/provisioning/dashboards
\ No newline at end of file
diff --git a/monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json b/monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json
new file mode 100644
index 0000000..498b997
--- /dev/null
+++ b/monitoring/grafana/provisioning/dashboards/full_pipeline_metrics.json
@@ -0,0 +1,99 @@
+{
+  "id": null,
+  "title": "Full Pipeline Metrics",
+  "tags": [],
+  "timezone": "browser",
+  "schemaVersion": 36,
+  "version": 1,
+  "refresh": "5s",
+  "panels": [
+    {
+      "type": "stat",
+      "title": "Kafka Messages Produced",
+      "targets": [
+        { "expr": "kafka_messages_produced_total", "refId": "A" }
+      ],
+      "gridPos": { "x": 0, "y": 0, "w": 6, "h": 3 }
+    },
+    {
+      "type": "stat",
+      "title": "Kafka Produce Errors",
+      "targets": [
+        { "expr": "kafka_produce_errors_total", "refId": "B" }
+      ],
+      "gridPos": { "x": 6, "y": 0, "w": 6, "h": 3 }
+    },
+    {
+      "type": "stat",
+      "title": "Kafka Messages Consumed",
+      "targets": [
+        { "expr": "kafka_messages_consumed_total", "refId": "C" }
+      ],
+      "gridPos": { "x": 0, "y": 3, "w": 6, "h": 3 }
+    },
+    {
+      "type": "stat",
+      "title": "Kafka Consume Errors",
+      "targets": [
+        { "expr": "kafka_consume_errors_total", "refId": "D" }
+      ],
+      "gridPos": { "x": 6, "y": 3, "w": 6, "h": 3 }
+    },
+    {
+      "type": "graph",
+      "title": "Kafka Message Processing Time",
+      "targets": [
+        { "expr": "kafka_message_process_time_seconds", "refId": "E" }
+      ],
+      "gridPos": { "x": 0, "y": 6, "w": 12, "h": 6 }
+    },
+    {
+      "type": "stat",
+      "title": "Mongo Insert Errors",
+      "targets": [
+        { "expr": "mongo_insert_errors_total", "refId": "F" }
+      ],
+      "gridPos": { "x": 0, "y": 12, "w": 6, "h": 3 }
+    },
+    {
+      "type": "graph",
+      "title": "Mongo Insert Time",
+      "targets": [
+        { "expr": "mongo_insert_time_seconds", "refId": "G" }
+      ],
+      "gridPos": { "x": 6, "y": 12, "w": 6, "h": 3 }
+    },
+    {
+      "type": "stat",
+      "title": "Supabase Insert Errors",
+      "targets": [
+        { "expr": "supabase_insert_errors_total", "refId": "H" }
+      ],
+      "gridPos": { "x": 0, "y": 15, "w": 6, "h": 3 }
+    },
+    {
+      "type": "graph",
+      "title": "Supabase Insert Time",
+      "targets": [
+        { "expr": "supabase_insert_seconds", "refId": "I" }
+      ],
+      "gridPos": { "x": 6, "y": 15, "w": 6, "h": 3 }
+    },
+    {
+      "type": "stat",
+      "title": "ETL Complete Persons",
+      "targets": [
+        { "expr": "etl_complete_persons_total", "refId": "J" }
+      ],
+      "gridPos": { "x": 0, "y": 18, "w": 6, "h": 3 }
+    },
+    {
+      "type": "stat",
+      "title": "ETL Incomplete Persons",
+      "targets": [
+        { "expr": "etl_incomplete_persons_total", "refId": "K" }
+      ],
+      "gridPos": { "x": 6, "y": 18, "w": 6, "h": 3 }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/monitoring/grafana/provisioning/dashboards/pipeline_overview.json b/monitoring/grafana/provisioning/dashboards/pipeline_overview.json
new file mode 100644
index 0000000..abcf34e
--- /dev/null
+++ b/monitoring/grafana/provisioning/dashboards/pipeline_overview.json
@@ -0,0 +1,83 @@
+{
+  "id": null,
+  "title": "Data Pipeline Overview",
+  "tags": ["pipeline", "kafka", "mongo", "supabase"],
+  "timezone": "browser",
+  "schemaVersion": 36,
+  "version": 1,
+  "refresh": "10s",
+  "panels": [
+    {
+      "type": "stat",
+      "title": "Kafka - Mensajes Enviados",
+      "targets": [{"expr": "kafka_messages_sent_total", "refId": "A"}],
+      "gridPos": {"x": 0, "y": 0, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "Kafka - Errores de Envío",
+      "targets": [{"expr": "kafka_send_errors_total", "refId": "A"}],
+      "gridPos": {"x": 6, "y": 0, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "Kafka - Tamaño Medio de Mensaje",
+      "targets": [{"expr": "avg(kafka_message_size_bytes)", "refId": "A"}],
+      "gridPos": {"x": 0, "y": 4, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "Kafka Consumer - Mensajes Consumidos",
+      "targets": [{"expr": "kafka_messages_consumed_total", "refId": "A"}],
+      "gridPos": {"x": 6, "y": 4, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "Kafka Consumer - Errores de Consumo",
+      "targets": [{"expr": "kafka_consume_errors_total", "refId": "A"}],
+      "gridPos": {"x": 0, "y": 8, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "MongoDB - Documentos Insertados",
+      "targets": [{"expr": "mongo_inserts_total", "refId": "A"}],
+      "gridPos": {"x": 6, "y": 8, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "MongoDB - Errores de Inserción",
+      "targets": [{"expr": "mongo_insert_errors_total", "refId": "A"}],
+      "gridPos": {"x": 0, "y": 12, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "Supabase - Personas Migradas",
+      "targets": [{"expr": "supabase_persons_migrated_total", "refId": "A"}],
+      "gridPos": {"x": 6, "y": 12, "w": 6, "h": 4}
+    },
+    {
+      "type": "stat",
+      "title": "Supabase - Errores de Inserción",
+      "targets": [{"expr": "supabase_insert_errors_total", "refId": "A"}],
+      "gridPos": {"x": 0, "y": 16, "w": 6, "h": 4}
+    },
+    {
+      "type": "graph",
+      "title": "Kafka - Tiempo de Envío de Mensajes",
+      "targets": [{"expr": "rate(kafka_message_send_seconds_sum[1m]) / rate(kafka_message_send_seconds_count[1m])", "refId": "A"}],
+      "gridPos": {"x": 6, "y": 16, "w": 12, "h": 8}
+    },
+    {
+      "type": "graph",
+      "title": "MongoDB - Tiempo de Inserción",
+      "targets": [{"expr": "rate(mongo_insert_seconds_sum[1m]) / rate(mongo_insert_seconds_count[1m])", "refId": "A"}],
+      "gridPos": {"x": 0, "y": 24, "w": 12, "h": 8}
+    },
+    {
+      "type": "graph",
+      "title": "Supabase - Tiempo de Inserción",
+      "targets": [{"expr": "rate(supabase_insert_seconds_sum[1m]) / rate(supabase_insert_seconds_count[1m])", "refId": "A"}],
+      "gridPos": {"x": 12, "y": 24, "w": 12, "h": 8}
+    }
+  ]
+}
\ No newline at end of file
diff --git a/monitoring/grafana/provisioning/datasources/datasource.yml b/monitoring/grafana/provisioning/datasources/datasource.yml
new file mode 100644
index 0000000..8d9f9d8
--- /dev/null
+++ b/monitoring/grafana/provisioning/datasources/datasource.yml
@@ -0,0 +1,7 @@
+apiVersion: 1
+datasources:
+  - name: Prometheus
+    type: prometheus
+    access: proxy
+    url: http://prometheus:9090
+    isDefault: true
\ No newline at end of file
diff --git a/monitoring/prometheus/prometheus.yml b/monitoring/prometheus/prometheus.yml
new file mode 100644
index 0000000..7437405
--- /dev/null
+++ b/monitoring/prometheus/prometheus.yml
@@ -0,0 +1,13 @@
+global:
+  scrape_interval: 5s
+
+scrape_configs:
+  - job_name: 'kafka_push'
+    static_configs:
+      - targets: ['random_generator:9101']
+  - job_name: 'kafka_consumer'
+    static_configs:
+      - targets: ['kafka_consumer:9102']
+  - job_name: 'mongo_to_postgres'
+    static_configs:
+      - targets: ['mongo_to_postgres:9103']
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index a458443..a45c141 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,5 @@ kafka-python
 pymongo
 python-dotenv
 supabase
+prometheus_client
 redis
\ No newline at end of file
diff --git a/scripts/.gitkeep b/scripts/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/src/database/.gitkeep b/src/database/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/src/etl/Dockerfile b/src/etl/Dockerfile
index 31e0a0b..e176002 100644
--- a/src/etl/Dockerfile
+++ b/src/etl/Dockerfile
@@ -20,4 +20,6 @@ ENV PYTHONPATH=/app
 #RUN chmod +x /app/src/etl/wait-for-it.sh
 
 # Ejecutar el script
-CMD ["python", "mongo_to_postgres.py"]
\ No newline at end of file
+CMD ["python", "mongo_to_postgres.py"]
+
+EXPOSE 9103
\ No newline at end of file
diff --git a/src/etl/mongo_to_postgres.py b/src/etl/mongo_to_postgres.py
index 8b490e1..c41f9d6 100644
--- a/src/etl/mongo_to_postgres.py
+++ b/src/etl/mongo_to_postgres.py
@@ -4,17 +4,12 @@ from supabase import create_client
 from etl_utils import (
     clean_data,
-    find_person_key,
-    register_keys,
-    identifying_keys,
-    person_index,
-    people_data,
-    group_records,
     address_match,
     nombres_en_fullname
 )
 from utils.logg import write_log
 import json
+from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary
 
 
 # Cargar variables de entorno
@@ -28,6 +23,13 @@ SUPABASE_URL = os.getenv('SUPABASE_URL')
 SUPABASE_SERVICE_ROLE_KEY = os.getenv('SUPABASE_SERVICE_ROLE_KEY')
 
 
+MIGRATED_PERSONS = Counter('supabase_persons_migrated_total', 'Total personas migradas a Supabase')
+SUPABASE_INSERT_ERRORS = Counter('supabase_insert_errors_total', 'Errores al insertar en Supabase')
+SUPABASE_INSERT_TIME = Histogram('supabase_insert_seconds', 'Tiempo en insertar en Supabase')
+SUPABASE_INSERT_IN_PROGRESS = Gauge('supabase_insert_in_progress', 'Inserciones en proceso en Supabase')
+SUPABASE_INSERT_SUMMARY = Summary('supabase_insert_summary_seconds', 'Resumen de tiempos de inserción en Supabase')
+
+start_http_server(9103)  # Puerto Prometheus para este servicio
 
 def check_mongo_connection():
     try:
@@ -70,54 +72,33 @@ def get_mongo_collections():
         "net_data": db["net_data"]
     }
 
-def extract_and_group_records(collections):
-    # Extraer registros de MongoDB y agruparlos
-    print("🔄 Extrayendo y agrupando registros de MongoDB...")
-    people_data.clear()
-    person_index.clear()
-
-    # Extraer y limpiar todos los documentos de todas las colecciones
-    all_docs = []
-    for cname, col in collections.items():
-        for doc in col.find():
-            doc.pop("_id", None)
-            clean_doc = clean_data(doc)
-            all_docs.append(clean_doc)
-            write_log("INFO", "mongo_to_postgres.py", f"Documento extraído y limpiado de {cname}: {clean_doc}")
-
-
-    # Agrupar usando la lógica de etl_utils
-    group_records(all_docs)
-    print("DEBUG >> Personas agrupadas (resumen):")
-    for i, person in enumerate(people_data):
-        print(f" [{i}] {person}")
-    print(f"✅ Total de personas agrupadas: {len(people_data)}")
-    write_log("INFO", "mongo_to_postgres.py", f"Total de personas agrupadas: {len(people_data)}")
-    return people_data
-
 def insert_location(supabase, person):
     address = person.get("address")
     city = person.get("city")
     if not address or not city:
         return None
     try:
-        resp = supabase.table("locations").select("location_id").eq("address", address).eq("city", city).execute()
-        if resp.data:
-            location_id = resp.data[0]["location_id"]
-        else:
-            ins = supabase.table("locations").insert({"address": address, "city": city}).execute()
-            location_id = ins.data[0]["location_id"]
+        with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time():
+            SUPABASE_INSERT_IN_PROGRESS.inc()
+            resp = supabase.table("locations").select("location_id").eq("address", address).eq("city", city).execute()
+            if resp.data:
+                location_id = resp.data[0]["location_id"]
+            else:
+                ins = supabase.table("locations").insert({"address": address, "city": city}).execute()
+                location_id = ins.data[0]["location_id"]
         write_log("INFO", "mongo_to_postgres.py", f"Ubicación insertada/obtenida: {address}, {city} -> {location_id}")
         return location_id
     except Exception as e:
+        SUPABASE_INSERT_ERRORS.inc()
         write_log("ERROR", "mongo_to_postgres.py", f"Error insertando ubicación: {e}")
         return None
+    finally:
+        SUPABASE_INSERT_IN_PROGRESS.dec()
 
 def insert_person(supabase, person, location_id):
     if not person.get("passport") or not person.get("name") or not person.get("last_name") or not person.get("email"):
         write_log("WARNING", "mongo_to_postgres.py", f"Datos insuficientes para persona: {person}")
         return
-    # Ignorar fullname y address
     data = {
         "passport": person.get("passport"),
         "first_name": person.get("name"),
@@ -128,11 +109,17 @@
         "location_id_fk": location_id
     }
     try:
-        supabase.table("persons").upsert(data, on_conflict=["passport"]).execute()
+        with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time():
+            SUPABASE_INSERT_IN_PROGRESS.inc()
+            supabase.table("persons").upsert(data, on_conflict=["passport"]).execute()
+        MIGRATED_PERSONS.inc()
         write_log("INFO", "mongo_to_postgres.py", f"Persona insertada/actualizada: {data}")
     except Exception as e:
+        SUPABASE_INSERT_ERRORS.inc()
         print(f"ERROR >> Fallo insertando en persons: {e}")
         write_log("ERROR", "mongo_to_postgres.py", f"Error insertando persons: {e}")
+    finally:
+        SUPABASE_INSERT_IN_PROGRESS.dec()
 
 def insert_bank(supabase, person):
     if not person.get("passport") or not person.get("IBAN"):
@@ -143,32 +130,40 @@
         "salary": person.get("salary")
     }
     try:
-        supabase.table("bank_data").upsert(data, on_conflict=["passport_fk"]).execute()
+        with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time():
+            SUPABASE_INSERT_IN_PROGRESS.inc()
+            supabase.table("bank_data").upsert(data, on_conflict=["passport_fk"]).execute()
         write_log("INFO", "mongo_to_postgres.py", f"Bank_data insertado/actualizado: {data}")
     except Exception as e:
+        SUPABASE_INSERT_ERRORS.inc()
         print(f"ERROR >> Fallo insertando en bank_data: {e}")
         write_log("ERROR", "mongo_to_postgres.py", f"Error insertando bank_data: {e}")
+    finally:
+        SUPABASE_INSERT_IN_PROGRESS.dec()
 
 def insert_network(supabase, person, location_id):
     if not person.get("passport") or not person.get("IPv4"):
         return
-    # Ignorar fullname y address
     data = {
         "passport_fk": person.get("passport"),
         "location_id_fk": location_id,
         "ip_address": person.get("IPv4")
     }
     try:
-        supabase.table("network_data").upsert(data, on_conflict=["passport_fk"]).execute()
+        with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time():
+            SUPABASE_INSERT_IN_PROGRESS.inc()
+            supabase.table("network_data").upsert(data, on_conflict=["passport_fk"]).execute()
         write_log("INFO", "mongo_to_postgres.py", f"Network_data insertado/actualizado: {data}")
     except Exception as e:
+        SUPABASE_INSERT_ERRORS.inc()
         print(f"ERROR >> Fallo insertando en network_data: {e}")
         write_log("ERROR", "mongo_to_postgres.py", f"Error insertando network_data: {e}")
+    finally:
+        SUPABASE_INSERT_IN_PROGRESS.dec()
 
 def insert_professional(supabase, person):
     if not person.get("passport") or not person.get("company"):
         return
-    # Ignorar fullname y address
     data = {
         "passport_fk": person.get("passport"),
         "company_name": person.get("company"),
@@ -178,11 +173,16 @@
         "job_title": person.get("job")
     }
     try:
-        supabase.table("professional_data").upsert(data, on_conflict=["passport_fk"]).execute()
+        with SUPABASE_INSERT_TIME.time(), SUPABASE_INSERT_SUMMARY.time():
+            SUPABASE_INSERT_IN_PROGRESS.inc()
+            supabase.table("professional_data").upsert(data, on_conflict=["passport_fk"]).execute()
         write_log("INFO", "mongo_to_postgres.py", f"Professional_data insertado/actualizado: {data}")
     except Exception as e:
+        SUPABASE_INSERT_ERRORS.inc()
         print(f"ERROR >> Fallo insertando en professional_data: {e}")
         write_log("ERROR", "mongo_to_postgres.py", f"Error insertando professional_data: {e}")
+    finally:
+        SUPABASE_INSERT_IN_PROGRESS.dec()
 
 def buscar_passport_por_fullname_address(person, personas):
     """
diff --git a/src/etl/requirements.txt b/src/etl/requirements.txt
index 61ac3dc..89a3fca 100644
--- a/src/etl/requirements.txt
+++ b/src/etl/requirements.txt
@@ -2,4 +2,5 @@ kafka-python-ng==2.2.2
 pymongo
 supabase
 python-dotenv
+prometheus_client
 redis
diff --git a/src/kafka_consumer/Dockerfile b/src/kafka_consumer/Dockerfile
index 980104a..a0bb6dd 100644
--- a/src/kafka_consumer/Dockerfile
+++ b/src/kafka_consumer/Dockerfile
@@ -1,15 +1,10 @@
 FROM python:3.10-slim
 
-# Instalar netcat para wait-for-it.sh
-#RUN apt-get update && apt-get install -y netcat-traditional && rm -rf /var/lib/apt/lists/*
-
 # Instalar netcat para wait-for-it.sh
 RUN apt-get update && apt-get install -y netcat-traditional && rm -rf /var/lib/apt/lists/*
 
 WORKDIR /app
 
-
-
 # Copy all files
 COPY requirements.txt .
 
@@ -17,28 +12,16 @@ COPY requirements.txt .
 RUN pip install --no-cache-dir --upgrade pip && \
     pip install --no-cache-dir -r requirements.txt
 
-# Set PYTHONPATH to include src directory
-#ENV PYTHONPATH=/app
-
-# Make wait-for-it.sh executable
-#RUN chmod +x /app/src/kafka_consumer/wait-for-it.sh
-
-
-
 # Copy consumer files
 COPY *.py .
 
 # The src directory will be mounted via docker-compose volumes
 # So we don't need to copy the entire src structure here
 
-
-
-
 # Set working directory to where the code will be mounted
 WORKDIR /app/src/kafka_consumer
 
-
-# Use wait-for-it.sh to wait for Kafka
 # CMD ["python", "consumer.py"]
-
 CMD ["sh", "-c", "while ! nc -z kafka 9092; do echo 'Esperando Kafka...'; sleep 2; done; echo 'Kafka disponible!'; python consumer.py"]
+
+EXPOSE 9102
\ No newline at end of file
diff --git a/src/kafka_consumer/consumer.py b/src/kafka_consumer/consumer.py
index 21607cb..1cc8845 100644
--- a/src/kafka_consumer/consumer.py
+++ b/src/kafka_consumer/consumer.py
@@ -3,13 +3,34 @@
 import redis
 import hashlib
 
 from storage_mongo import guardar_en_mongo
+from prometheus_client import start_http_server, Counter, Histogram, Gauge, Summary
+
+CONSUMED_MESSAGES = Counter('kafka_messages_consumed_total', 'Total mensajes consumidos de Kafka')
+CONSUME_ERRORS = Counter('kafka_consume_errors_total', 'Errores al consumir mensajes de Kafka')
+CONSUME_TIME = Histogram('kafka_message_consume_seconds', 'Tiempo en procesar un mensaje de Kafka')
+CONSUME_IN_PROGRESS = Gauge('kafka_consume_in_progress', 'Mensajes en proceso de consumo')
+CONSUME_SUMMARY = Summary('kafka_consume_summary_seconds', 'Resumen de tiempos de consumo')
+
+start_http_server(9102)  # Puerto Prometheus para el consumidor
+
+def main():
+    consumer = KafkaConsumer(
+        'probando',  # 'probando' es el nombre del topic a consumir
+        bootstrap_servers='kafka:9092',  #Dirección del servidor Kafka, en mi docker es kafka:9092
+        auto_offset_reset='earliest',  # Para leer desde el principio del topic (si nunca ha leido), 'latest' para leer solo nuevos mensajes
+        enable_auto_commit=True,  # Kafka guarda el "offset" automáticamente. El offset es un marcador que dice: "ya leí hasta aquí", se guardará automáticamente qué mensajes ya leí (en el grupo de consumidores)
+        group_id='hrpro-consumer-group',  # Identificador del grupo de consumidores, si no se especifica, se crea uno por defecto
+        value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserializador para convertir el mensaje de bytes a JSON
+    )
+
+    print("Esperando mensajes...")
+
 import sys
 import os
 
 # Resto de tus imports
 from etl.utils.logg import write_log
 
-
 # Configurar Redis (host y puerto pueden venir de variables de entorno si quieres)
 redis_client = redis.Redis(host='redis', port=6379, decode_responses=True)
 
@@ -17,9 +38,8 @@ def fingerprint(message):
     # Creamos hash único para mensaje para deduplicar
     return hashlib.sha256(json.dumps(message, sort_keys=True).encode('utf-8')).hexdigest()
 
-
-# def write_log(level, module, message):
-#     print(f"{level} - {module} - {message}")
+def write_log(level, module, message):
+    print(f"{level} - {module} - {message}")
 
 def main():
     try:
@@ -36,6 +56,7 @@
         write_log("ERROR", "consumer.py", f"Error iniciando Kafka consumer: {e}")
         return
 
+
     for message in consumer:
         try:
             # # --- DEBUG: simular mensaje duplicado ---
@@ -55,9 +76,21 @@
                 write_log("INFO", "consumer.py", f"Mensaje duplicado ignorado: {fp}")
                 continue
 
+
+            with CONSUME_TIME.time(), CONSUME_SUMMARY.time():
+                try:
+                    CONSUME_IN_PROGRESS.inc()
+                    guardar_en_mongo(message.value)
+                    CONSUMED_MESSAGES.inc()
+                except Exception as e:
+                    CONSUME_ERRORS.inc()
+                finally:
+                    CONSUME_IN_PROGRESS.dec()
+
             # Guardar en MongoDB
             guardar_en_mongo(msg)
 
+
             # Guardar fingerprint en Redis con TTL 1 día (86400 segundos)
             redis_client.set(fp, 1, ex=86400)
             write_log("INFO", "consumer.py", f"Mensaje procesado y guardado: {fp}")
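Reviewer note: the producer (kafka_push.py), the consumer (consumer.py) and the ETL (mongo_to_postgres.py) all instrument their hot path the same way in this patch: Counters for totals and errors, a Histogram/Summary timing the operation, a Gauge for work in progress, and start_http_server() exposing /metrics for Prometheus to scrape. The following is a minimal, standalone sketch of that pattern for trying it outside the pipeline; the metric names and port 9200 are illustrative only and are not registered by this patch.

import random
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Illustrative metric names (assumptions, not the ones added by this patch)
PROCESSED = Counter('demo_messages_processed_total', 'Messages processed')
ERRORS = Counter('demo_process_errors_total', 'Processing errors')
LATENCY = Histogram('demo_process_seconds', 'Time spent processing one message')
IN_PROGRESS = Gauge('demo_in_progress', 'Messages currently being processed')


def process(msg):
    # Stand-in for the real handler (e.g. guardar_en_mongo); just sleeps briefly.
    time.sleep(random.uniform(0.01, 0.05))


if __name__ == '__main__':
    start_http_server(9200)  # then check http://localhost:9200/metrics
    for i in range(100):
        with LATENCY.time():  # observes the elapsed time when the block exits
            IN_PROGRESS.inc()
            try:
                process({"n": i})
                PROCESSED.inc()
            except Exception:
                ERRORS.inc()
            finally:
                IN_PROGRESS.dec()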
diff --git a/src/kafka_consumer/requirements.txt b/src/kafka_consumer/requirements.txt
index b1f747d..3ab434d 100644
--- a/src/kafka_consumer/requirements.txt
+++ b/src/kafka_consumer/requirements.txt
@@ -1,4 +1,6 @@
 pymongo
 python-dotenv
 kafka-python-ng==2.2.2
+prometheus_client
 redis
+
diff --git a/src/kafka_consumer/storage_mongo.py b/src/kafka_consumer/storage_mongo.py
index bce0631..2beebcf 100644
--- a/src/kafka_consumer/storage_mongo.py
+++ b/src/kafka_consumer/storage_mongo.py
@@ -1,13 +1,9 @@
 import os
 from pymongo import MongoClient
+from prometheus_client import Counter, Histogram, Gauge, Summary
 import sys
-import os
-
-
 from etl.utils.logg import write_log
-
-
 
 MONGO_URI = os.getenv("MONGO_URI", "mongodb://admin:adminpassword@mongo:27017/")
 client = None
 db = None
@@ -27,35 +23,48 @@ def conectar_mongo():
 # def write_log(level, module, message):
 #     print(f"{level} - {module} - {message}")
 
+MONGO_INSERTS = Counter('mongo_inserts_total', 'Total documentos insertados en MongoDB')
+MONGO_ERRORS = Counter('mongo_insert_errors_total', 'Errores al insertar en MongoDB')
+MONGO_INSERT_TIME = Histogram('mongo_insert_seconds', 'Tiempo en insertar documento en MongoDB')
+MONGO_INSERT_IN_PROGRESS = Gauge('mongo_insert_in_progress', 'Documentos en proceso de inserción')
+MONGO_INSERT_SUMMARY = Summary('mongo_insert_summary_seconds', 'Resumen de tiempos de inserción')
+
 def guardar_en_mongo(documento):
     try:
-        if client is None or db is None:
-            conectar_mongo()
-
-        keys = set(k.lower() for k in documento.keys())
-
-        if "passport" in keys and "name" in keys:
-            collection = db["personal_data"]
-            filter_key = {"passport": documento.get("passport")}
-        elif "fullname" in keys and "city" in keys:
-            collection = db["location_data"]
-            filter_key = {"fullname": documento.get("fullname")}
-        elif "company" in keys or "job" in keys:
-            collection = db["professional_data"]
-            filter_key = {"fullname": documento.get("fullname")}
-        elif "iban" in keys:
-            collection = db["bank_data"]
-            filter_key = {"passport": documento.get("passport")}
-        elif "ipv4" in keys:
-            collection = db["net_data"]
-            filter_key = {"IPv4": documento.get("IPv4")}
-        else:
-            collection = db["unknown_type"]
-            filter_key = documento  # sin filtro único, puede cambiar
-
-        collection.update_one(filter_key, {"$set": documento}, upsert=True)
-        write_log("INFO", "storage_mongo.py", f"Documento guardado o actualizado en colección: {collection.name}")
+        with MONGO_INSERT_TIME.time(), MONGO_INSERT_SUMMARY.time():
+            MONGO_INSERT_IN_PROGRESS.inc()
+            #print("Guardando documento en MongoDB:", documento)
+            keys = set(k.lower() for k in documento.keys())
+
+            if "passport" in keys and "name" in keys:
+                collection = db["personal_data"]
+                filter_key = {"passport": documento.get("passport")}
+            elif "fullname" in keys and "city" in keys:
+                collection = db["location_data"]
+                filter_key = {"fullname": documento.get("fullname")}
+            elif "company" in keys or "job" in keys:
+                collection = db["professional_data"]
+                filter_key = {"fullname": documento.get("fullname")}
+            elif "iban" in keys:
+                collection = db["bank_data"]
+                filter_key = {"passport": documento.get("passport")}
+            elif "ipv4" in keys:
+                collection = db["net_data"]
+                # usar IPv4 como filtro único
+                filter_key = {"IPv4": documento.get("IPv4")}
+            else:
+                collection = db["unknown_type"]
+                filter_key = documento  # sin filtro único, podría cambiar
+
+            collection.update_one(filter_key, {"$set": documento}, upsert=True)
+            MONGO_INSERTS.inc()
+            #print(f"✅ Documento guardado o actualizado en colección: {collection.name}")
     except Exception as e:
-        write_log("ERROR", "storage_mongo.py", f"Error al guardar en MongoDB: {e}")
+        MONGO_ERRORS.inc()
+        #print("❌ Error al guardar en MongoDB :", e)
+        print("")
+
+    finally:
+        MONGO_INSERT_IN_PROGRESS.dec()
diff --git a/src/monitoring/grafana_dashboards/.gitkeep b/src/monitoring/grafana_dashboards/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/unit/.gitkeep b/tests/unit/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/src/etl/test.py b/tests/unit/test.py
similarity index 100%
rename from src/etl/test.py
rename to tests/unit/test.py
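Reviewer note: the new CI job discovers tests with python -m unittest discover -s tests/unit -p "*.py" -v, and src/etl/test.py has been moved to tests/unit/ to match. Below is a sketch of the kind of test that discovery would pick up; it re-implements the fingerprint helper from consumer.py locally (importing consumer.py directly would start the Prometheus HTTP server as a side effect), and the file name tests/unit/test_fingerprint.py is only an assumption.

import hashlib
import json
import unittest


def fingerprint(message):
    # Same hashing scheme consumer.py uses for Redis-based deduplication.
    return hashlib.sha256(json.dumps(message, sort_keys=True).encode('utf-8')).hexdigest()


class FingerprintTest(unittest.TestCase):
    def test_key_order_does_not_change_fingerprint(self):
        a = {"passport": "X123", "name": "Ana"}
        b = {"name": "Ana", "passport": "X123"}
        self.assertEqual(fingerprint(a), fingerprint(b))

    def test_different_payloads_get_different_fingerprints(self):
        self.assertNotEqual(fingerprint({"n": 1}), fingerprint({"n": 2}))


if __name__ == "__main__":
    unittest.main()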