Feature use geocoding #242

Status: Draft · wants to merge 2 commits into base: project/historical-gdacs-pdc
apps/etl/etl_tasks/pdc.py (5 changes: 3 additions & 2 deletions)

@@ -13,13 +13,14 @@


 @shared_task
-def extract_and_transform_pdc_data():
+def ext_and_transform_pdc_latest_data():
+    # TODO Fix according to latest extraction logic that accepts Metadata
     data_url = f"{etl_config.PDC_SENTRY_BASE_URL}/hp_srv/services/hazards/t/json/get_active_hazards"
     PDCExtraction.task.s(data_url).apply_async()


 @shared_task(queue="extraction")
-def extract_and_transform_historical_pdc_data():
+def ext_and_transform_pdc_historical_data():
     pdc_start_date = datetime.strptime(str(etl_config.PDC_START_DATE), "%Y-%m-%d")
     pdc_interval_years = etl_config.PDC_EXTRACTION_INTERVAL_YEARS
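The body of ext_and_transform_pdc_historical_data is collapsed in this diff, but the visible pdc_start_date / pdc_interval_years setup suggests the backfill walks fixed-size year windows. A sketch of that windowing, assuming only what the two visible lines imply (the helper name and loop shape here are hypothetical, not taken from the PR):

    from datetime import datetime

    def iter_year_windows(start: datetime, end: datetime, interval_years: int):
        # Hypothetical helper: yield (window_start, window_end) pairs covering
        # start..end in interval_years-sized chunks, clamping the last window.
        cursor = start
        while cursor < end:
            window_end = min(cursor.replace(year=cursor.year + interval_years), end)
            yield cursor, window_end
            cursor = window_end

    # e.g. PDC_START_DATE="2010-01-01" with PDC_EXTRACTION_INTERVAL_YEARS=5 would
    # give 2010-2015, 2015-2020, 2020-now, one extraction per window.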
apps/etl/management/commands/extract_pdc_data.py (4 changes: 2 additions & 2 deletions)

@@ -2,7 +2,7 @@

 from django.core.management.base import BaseCommand
 
-from apps.etl.etl_tasks.pdc import extract_and_transform_historical_pdc_data
+from apps.etl.etl_tasks.pdc import ext_and_transform_pdc_historical_data
 
 logger = logging.getLogger(__name__)
 
@@ -11,4 +11,4 @@ class Command(BaseCommand):
     help = "Import data from pdc api"
 
     def handle(self, *args, **options):
-        extract_and_transform_historical_pdc_data.delay()
+        ext_and_transform_pdc_historical_data.delay()
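The command body is otherwise untouched, so the manual entry point still works the same way; running it simply enqueues the renamed Celery task:

    from django.core.management import call_command

    # Equivalent to running `python manage.py extract_pdc_data`: executes
    # Command.handle(), which enqueues ext_and_transform_pdc_historical_data
    # via .delay() (a reachable Celery broker is assumed).
    call_command("extract_pdc_data")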
apps/etl/tasks.py (15 changes: 0 additions & 15 deletions)

@@ -2,21 +2,6 @@
 from django.core.management import call_command
 
 
-@shared_task
-def extract_pdc_data():
-    call_command("extract_pdc_data")
-
-
-@shared_task
-def extract_gidd_data():
-    call_command("extract_gidd_data")
-
-
-@shared_task
-def extract_usgs_data():
-    call_command("extract_usgs_data")
-
-
 @shared_task
 def load_data():
     call_command("load_data_to_stac")
apps/etl/transform/sources/pdc.py (86 changes: 57 additions & 29 deletions)

@@ -2,10 +2,13 @@
 import logging
 import tempfile
 
+from pystac_monty.geocoding import TheirGeocoder
 from pystac_monty.sources.pdc import PDCDataSource, PDCTransformer
 
-from apps.etl.models import ExtractionData
+from apps.etl.models import ExtractionData, Transform, get_trace_id
 from main.celery import app
+from main.configs import etl_config
+from main.logging import log_extra
 
 from .handler import BaseTransformerHandler

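TheirGeocoder is new here; it is constructed further down from etl_config.GEOCODER_URL. A minimal standalone construction, assuming only the constructor call visible in this diff (the URL below is a placeholder):

    from pystac_monty.geocoding import TheirGeocoder

    # Placeholder endpoint; the deployment reads the real one from
    # etl_config.GEOCODER_URL, as handle_transformation below shows.
    geocoder = TheirGeocoder("https://geocoder.example.org")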
@@ -17,49 +20,74 @@ class PDCTransformHandler(BaseTransformerHandler[PDCTransformer, PDCDataSource]):
     transformer_schema = PDCDataSource
 
     @classmethod
-    def get_schema_data(cls, extraction_obj: ExtractionData):
-        metadata: dict | None = extraction_obj.metadata
-        if not metadata:
-            raise Exception("Metadata is not defined")
-        input_metadata = metadata.get("input", {})
-
-        from apps.etl.extraction.sources.pdc.extract import PdcExposureInputMetadata
-
-        input_metadata = PdcExposureInputMetadata(**input_metadata)
-
-        geo_json_obj = ExtractionData.objects.get(id=input_metadata.geojson_id)
-
-        with geo_json_obj.resp_data.open("rb") as f:
-            file_content = f.read()
-            # FIXME: Why do we have delete=False? We need to delete this in post action
-            tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
-            tmp_geojson_file.write(file_content)
-            tmp_geojson_file.close()
+    def get_schema_data(cls, extraction_obj: ExtractionData, geo_json_obj: ExtractionData):  # type: ignore[reportIncompatibleMethodOverride]
+        source_url = extraction_obj.url
 
         with extraction_obj.parent.resp_data.open("rb") as f:
             file_content = f.read()
             # FIXME: Why do we have delete=False? We need to delete this in post action
             tmp_hazard_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
             tmp_hazard_file.write(file_content)
             tmp_hazard_file.close()
 
         with extraction_obj.resp_data.open("rb") as f:
             file_content = f.read()
             # FIXME: Why do we have delete=False? We need to delete this in post action
             tmp_exposure_detail_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
             tmp_exposure_detail_file.write(file_content)
             tmp_exposure_detail_file.close()
 
+        with geo_json_obj.resp_data.open("rb") as f:
+            file_content = f.read()
+            tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
+            tmp_geojson_file.write(file_content)
+
         data = {
             "hazards_file_path": tmp_hazard_file.name,
-            "exposure_timestamp": input_metadata.exposure_id,
-            "uuid": input_metadata.hazard_uuid,
+            "exposure_timestamp": extraction_obj.metadata["exposure_id"],
+            "uuid": extraction_obj.metadata["uuid"],
             "exposure_detail_file_path": tmp_exposure_detail_file.name,
             "geojson_file_path": tmp_geojson_file.name,
         }
-        return cls.transformer_schema(source_url=extraction_obj.parent.url, data=json.dumps(data))
+
+        return cls.transformer_schema(source_url=source_url, data=json.dumps(data))
+
+    @classmethod
+    def handle_transformation(cls, extraction_id, geo_json_id):  # type: ignore[reportIncompatibleMethodOverride]
+        logger.info("Transformation started")
+        extraction_obj = ExtractionData.objects.get(id=extraction_id)
+        geo_json_obj = ExtractionData.objects.get(id=geo_json_id)
+        if not extraction_obj.resp_data:
+            logger.info("Transformation ended due to no data")
+            return
+
+        transform_obj = Transform.objects.create(
+            extraction=extraction_obj,
+            trace_id=get_trace_id(extraction_obj),
+        )
+
+        transform_obj.mark_as_started()
+        geocoder = TheirGeocoder(etl_config.GEOCODER_URL)
+
+        try:
+            schema = cls.get_schema_data(extraction_obj, geo_json_obj)
+            transformer = cls.transformer_class(schema, geocoder)
+            transformed_items = transformer.make_items()
+
+            cls.load_stac_item_to_queue(transform_obj, transformed_items)
+
+            summary = transformer.transform_summary
+            transform_obj.metadata["summary"] = {
+                "failed_rows": summary.failed_rows,
+                "total_rows": summary.total_rows,
+            }
+            transform_obj.mark_as_ended(Transform.Status.SUCCESS, update_fields=["metadata"])
+
+            logger.info("Transformation ended")
+
+        except Exception as e:
+            logger.error("Transformation failed", exc_info=True, extra=log_extra({"extraction_id": extraction_obj.id}))
+            transform_obj.mark_as_ended(Transform.Status.FAILED)
+            # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
+            raise e
 
     @staticmethod
-    @app.task(queue="transform")
-    def task(extraction_id):
-        return PDCTransformHandler().handle_transformation(extraction_id)
+    @app.task
+    def task(extraction_id, geo_json_id):  # type: ignore[reportIncompatibleMethodOverride]
+        return PDCTransformHandler().handle_transformation(extraction_id, geo_json_id)
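The three FIXME comments about delete=False remain open in this change: the temp files are never unlinked. One possible shape for the post-action cleanup they ask for, sketched here rather than taken from the PR (helper names are hypothetical):

    import os
    import tempfile

    def write_temp_json(payload: bytes) -> str:
        # Write payload to a named .json temp file and return its path;
        # the caller deletes it once the transformer is done with it.
        tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
        try:
            tmp.write(payload)
        finally:
            tmp.close()
        return tmp.name

    def cleanup_paths(*paths: str) -> None:
        # Best-effort removal, e.g. in a finally block around make_items().
        for path in paths:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass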
main/cronjobs.py (4 changes: 2 additions & 2 deletions)

@@ -50,15 +50,15 @@ class CronJob(typing.NamedTuple):
         schedule=crontab(minute=0, hour=17),
     ),
     "import_gidd_data": CronJob(
-        task="apps.etl.tasks.extract_gidd_data",
+        task="apps.etl.etl_tasks.gidd.ext_and_transform_gidd_latest_data",
         schedule=crontab(minute=0, hour=18),
     ),
     "import_usgs_data": CronJob(
         task="apps.etl.etl_tasks.usgs.ext_and_transform_usgs_latest_data",
         schedule=crontab(minute=0, hour=19),
     ),
     "import_pdc_data": CronJob(
-        task="apps.etl.tasks.extract_pdc_data",
+        task="apps.etl.etl_tasks.pdc.ext_and_transform_pdc_latest_data",
         schedule=crontab(minute=30, hour=20),
     ),
     "load_data_to_stac": CronJob(