diff --git a/apps/etl/etl_tasks/pdc.py b/apps/etl/etl_tasks/pdc.py
index 78eaae15..41af0ded 100644
--- a/apps/etl/etl_tasks/pdc.py
+++ b/apps/etl/etl_tasks/pdc.py
@@ -13,13 +13,14 @@
 
 
 @shared_task
-def extract_and_transform_pdc_data():
+def ext_and_transform_pdc_latest_data():
+    # TODO: Fix according to the latest extraction logic that accepts Metadata
     data_url = f"{etl_config.PDC_SENTRY_BASE_URL}/hp_srv/services/hazards/t/json/get_active_hazards"
     PDCExtraction.task.s(data_url).apply_async()
 
 
 @shared_task(queue="extraction")
-def extract_and_transform_historical_pdc_data():
+def ext_and_transform_pdc_historical_data():
     pdc_start_date = datetime.strptime(str(etl_config.PDC_START_DATE), "%Y-%m-%d")
     pdc_interval_years = etl_config.PDC_EXTRACTION_INTERVAL_YEARS
 
diff --git a/apps/etl/management/commands/extract_pdc_data.py b/apps/etl/management/commands/extract_pdc_data.py
index a3adcf4d..b7a7cb57 100644
--- a/apps/etl/management/commands/extract_pdc_data.py
+++ b/apps/etl/management/commands/extract_pdc_data.py
@@ -2,7 +2,7 @@
 
 from django.core.management.base import BaseCommand
 
-from apps.etl.etl_tasks.pdc import extract_and_transform_historical_pdc_data
+from apps.etl.etl_tasks.pdc import ext_and_transform_pdc_historical_data
 
 logger = logging.getLogger(__name__)
 
@@ -11,4 +11,4 @@ class Command(BaseCommand):
     help = "Import data from pdc api"
 
     def handle(self, *args, **options):
-        extract_and_transform_historical_pdc_data.delay()
+        ext_and_transform_pdc_historical_data.delay()
diff --git a/apps/etl/tasks.py b/apps/etl/tasks.py
index 8389563b..e83a66c3 100644
--- a/apps/etl/tasks.py
+++ b/apps/etl/tasks.py
@@ -2,21 +2,6 @@
 
 from django.core.management import call_command
 
-@shared_task
-def extract_pdc_data():
-    call_command("extract_pdc_data")
-
-
-@shared_task
-def extract_gidd_data():
-    call_command("extract_gidd_data")
-
-
-@shared_task
-def extract_usgs_data():
-    call_command("extract_usgs_data")
-
-
 @shared_task
 def load_data():
     call_command("load_data_to_stac")
diff --git a/apps/etl/transform/sources/pdc.py b/apps/etl/transform/sources/pdc.py
index ad432d8f..661e0584 100644
--- a/apps/etl/transform/sources/pdc.py
+++ b/apps/etl/transform/sources/pdc.py
@@ -2,10 +2,13 @@
 import logging
 import tempfile
 
+from pystac_monty.geocoding import TheirGeocoder
 from pystac_monty.sources.pdc import PDCDataSource, PDCTransformer
 
-from apps.etl.models import ExtractionData
+from apps.etl.models import ExtractionData, Transform, get_trace_id
 from main.celery import app
+from main.configs import etl_config
+from main.logging import log_extra
 
 from .handler import BaseTransformerHandler
 
@@ -17,49 +20,74 @@ class PDCTransformHandler(BaseTransformerHandler[PDCTransformer, PDCDataSource])
     transformer_schema = PDCDataSource
 
     @classmethod
-    def get_schema_data(cls, extraction_obj: ExtractionData):
-        metadata: dict | None = extraction_obj.metadata
-        if not metadata:
-            raise Exception("Metadata is not defined")
-        input_metadata = metadata.get("input", {})
-
-        from apps.etl.extraction.sources.pdc.extract import PdcExposureInputMetadata
-
-        input_metadata = PdcExposureInputMetadata(**input_metadata)
-
-        geo_json_obj = ExtractionData.objects.get(id=input_metadata.geojson_id)
-
-        with geo_json_obj.resp_data.open("rb") as f:
-            file_content = f.read()
-            # FIXME: Why do we have delete=False? We need to delete this in post action
-            tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
-            tmp_geojson_file.write(file_content)
-            tmp_geojson_file.close()
+    def get_schema_data(cls, extraction_obj: ExtractionData, geo_json_obj: ExtractionData):  # type: ignore[reportIncompatibleMethodOverride]
+        source_url = extraction_obj.url
 
         with extraction_obj.parent.resp_data.open("rb") as f:
             file_content = f.read()
-            # FIXME: Why do we have delete=False? We need to delete this in post action
             tmp_hazard_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
             tmp_hazard_file.write(file_content)
-            tmp_hazard_file.close()
 
         with extraction_obj.resp_data.open("rb") as f:
             file_content = f.read()
-            # FIXME: Why do we have delete=False? We need to delete this in post action
             tmp_exposure_detail_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
             tmp_exposure_detail_file.write(file_content)
-            tmp_exposure_detail_file.close()
+
+        with geo_json_obj.resp_data.open("rb") as f:
+            file_content = f.read()
+            tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
+            tmp_geojson_file.write(file_content)
 
         data = {
             "hazards_file_path": tmp_hazard_file.name,
-            "exposure_timestamp": input_metadata.exposure_id,
-            "uuid": input_metadata.hazard_uuid,
+            "exposure_timestamp": extraction_obj.metadata["exposure_id"],
+            "uuid": extraction_obj.metadata["uuid"],
             "exposure_detail_file_path": tmp_exposure_detail_file.name,
             "geojson_file_path": tmp_geojson_file.name,
         }
-        return cls.transformer_schema(source_url=extraction_obj.parent.url, data=json.dumps(data))
+
+        return cls.transformer_schema(source_url=source_url, data=json.dumps(data))
+
+    @classmethod
+    def handle_transformation(cls, extraction_id, geo_json_id):  # type: ignore[reportIncompatibleMethodOverride]
+        logger.info("Transformation started")
+        extraction_obj = ExtractionData.objects.get(id=extraction_id)
+        geo_json_obj = ExtractionData.objects.get(id=geo_json_id)
+        if not extraction_obj.resp_data:
+            logger.info("Transformation ended due to no data")
+            return
+
+        transform_obj = Transform.objects.create(
+            extraction=extraction_obj,
+            trace_id=get_trace_id(extraction_obj),
+        )
+
+        transform_obj.mark_as_started()
+        geocoder = TheirGeocoder(etl_config.GEOCODER_URL)
+
+        try:
+            schema = cls.get_schema_data(extraction_obj, geo_json_obj)
+            transformer = cls.transformer_class(schema, geocoder)
+            transformed_items = transformer.make_items()
+
+            cls.load_stac_item_to_queue(transform_obj, transformed_items)
+
+            summary = transformer.transform_summary
+            transform_obj.metadata["summary"] = {
+                "failed_rows": summary.failed_rows,
+                "total_rows": summary.total_rows,
+            }
+            transform_obj.mark_as_ended(Transform.Status.SUCCESS, update_fields=["metadata"])
+
+            logger.info("Transformation ended")
+
+        except Exception as e:
+            logger.error("Transformation failed", exc_info=True, extra=log_extra({"extraction_id": extraction_obj.id}))
+            transform_obj.mark_as_ended(Transform.Status.FAILED)
+            # FIXME: Check whether this creates a duplicate entry in Sentry; if so, remove this.
+            raise e
 
     @staticmethod
-    @app.task(queue="transform")
-    def task(extraction_id):
-        return PDCTransformHandler().handle_transformation(extraction_id)
+    @app.task
+    def task(extraction_id, geo_json_id):  # type: ignore[reportIncompatibleMethodOverride]
+        return PDCTransformHandler().handle_transformation(extraction_id, geo_json_id)
diff --git a/main/cronjobs.py b/main/cronjobs.py
index f3088c91..57194ba0 100644
--- a/main/cronjobs.py
+++ b/main/cronjobs.py
@@ -50,7 +50,7 @@ class CronJob(typing.NamedTuple):
         schedule=crontab(minute=0, hour=17),
     ),
     "import_gidd_data": CronJob(
-        task="apps.etl.tasks.extract_gidd_data",
+        task="apps.etl.etl_tasks.gidd.ext_and_transform_gidd_latest_data",
         schedule=crontab(minute=0, hour=18),
     ),
     "import_usgs_data": CronJob(
@@ -58,7 +58,7 @@
         schedule=crontab(minute=0, hour=19),
     ),
     "import_pdc_data": CronJob(
-        task="apps.etl.tasks.extract_pdc_data",
+        task="apps.etl.etl_tasks.pdc.ext_and_transform_pdc_latest_data",
        schedule=crontab(minute=30, hour=20),
     ),
     "load_data_to_stac": CronJob(
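
Review note on the dropped delete=False FIXMEs: the rewritten get_schema_data still creates each NamedTemporaryFile with delete=False, and it no longer calls close(), so the handles are never explicitly flushed and the files are never removed from disk. A minimal sketch of one way a follow-up could handle this; the write_temp_json and cleanup_temp_files helpers are hypothetical and not part of this diff:

    import os
    import tempfile

    def write_temp_json(content: bytes) -> str:
        # delete=False keeps the file on disk after close so the transformer
        # can read it by path later; the caller is responsible for removing it.
        with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp:
            tmp.write(content)  # exiting the context manager closes and flushes
        return tmp.name

    def cleanup_temp_files(*paths: str) -> None:
        # Best-effort removal once the transformer has consumed the files.
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                pass

handle_transformation could then remove the three files in a finally block once transformer.make_items() has run.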
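Review note on the new task signature: PDCTransformHandler.task now takes both the exposure-detail extraction id and the geojson extraction id, which is what the type: ignore[reportIncompatibleMethodOverride] markers are acknowledging. A rough invocation sketch, assuming both ExtractionData rows already exist (the ids are illustrative):

    from apps.etl.transform.sources.pdc import PDCTransformHandler

    # Queue the transform with both records; .delay() works the same way.
    PDCTransformHandler.task.s(extraction_id=42, geo_json_id=43).apply_async()

Also worth noting: the @app.task decorator dropped queue="transform", so this task now routes to the default queue unless routing is configured elsewhere.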