Skip to content

Commit 47c11ec

Browse files
tnagorraRup-Narayan-Rajbanshi
authored andcommitted
Use new geocoder
- Split EMDAT historical data extraction - Update helm chart
1 parent ae78590 commit 47c11ec

File tree

1 file changed

+57
-29
lines changed
  • apps/etl/transform/sources

1 file changed

+57
-29
lines changed

apps/etl/transform/sources/pdc.py

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
import logging
33
import tempfile
44

5+
from pystac_monty.geocoding import TheirGeocoder
56
from pystac_monty.sources.pdc import PDCDataSource, PDCTransformer
67

7-
from apps.etl.models import ExtractionData
8+
from apps.etl.models import ExtractionData, Transform, get_trace_id
89
from main.celery import app
10+
from main.configs import etl_config
11+
from main.logging import log_extra
912

1013
from .handler import BaseTransformerHandler
1114

@@ -17,49 +20,74 @@ class PDCTransformHandler(BaseTransformerHandler[PDCTransformer, PDCDataSource])
1720
transformer_schema = PDCDataSource
1821

1922
@classmethod
20-
def get_schema_data(cls, extraction_obj: ExtractionData):
21-
metadata: dict | None = extraction_obj.metadata
22-
if not metadata:
23-
raise Exception("Metadata is not defined")
24-
input_metadata = metadata.get("input", {})
25-
26-
from apps.etl.extraction.sources.pdc.extract import PdcExposureInputMetadata
27-
28-
input_metadata = PdcExposureInputMetadata(**input_metadata)
29-
30-
geo_json_obj = ExtractionData.objects.get(id=input_metadata.geojson_id)
31-
32-
with geo_json_obj.resp_data.open("rb") as f:
33-
file_content = f.read()
34-
# FIXME: Why do we have delete=False? We need to delete this in post action
35-
tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
36-
tmp_geojson_file.write(file_content)
37-
tmp_geojson_file.close()
23+
def get_schema_data(cls, extraction_obj: ExtractionData, geo_json_obj: ExtractionData): # type: ignore[reportIncompatibleMethodOverride]
24+
source_url = extraction_obj.url
3825

3926
with extraction_obj.parent.resp_data.open("rb") as f:
4027
file_content = f.read()
41-
# FIXME: Why do we have delete=False? We need to delete this in post action
4228
tmp_hazard_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
4329
tmp_hazard_file.write(file_content)
44-
tmp_hazard_file.close()
4530

4631
with extraction_obj.resp_data.open("rb") as f:
4732
file_content = f.read()
48-
# FIXME: Why do we have delete=False? We need to delete this in post action
4933
tmp_exposure_detail_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
5034
tmp_exposure_detail_file.write(file_content)
51-
tmp_exposure_detail_file.close()
35+
36+
with geo_json_obj.resp_data.open("rb") as f:
37+
file_content = f.read()
38+
tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
39+
tmp_geojson_file.write(file_content)
5240

5341
data = {
5442
"hazards_file_path": tmp_hazard_file.name,
55-
"exposure_timestamp": input_metadata.exposure_id,
56-
"uuid": input_metadata.hazard_uuid,
43+
"exposure_timestamp": extraction_obj.metadata["exposure_id"],
44+
"uuid": extraction_obj.metadata["uuid"],
5745
"exposure_detail_file_path": tmp_exposure_detail_file.name,
5846
"geojson_file_path": tmp_geojson_file.name,
5947
}
60-
return cls.transformer_schema(source_url=extraction_obj.parent.url, data=json.dumps(data))
48+
49+
return cls.transformer_schema(source_url=source_url, data=json.dumps(data))
50+
51+
@classmethod
52+
def handle_transformation(cls, extraction_id, geo_json_id): # type: ignore[reportIncompatibleMethodOverride]
53+
logger.info("Transformation started")
54+
extraction_obj = ExtractionData.objects.get(id=extraction_id)
55+
geo_json_obj = ExtractionData.objects.get(id=geo_json_id)
56+
if not extraction_obj.resp_data:
57+
logger.info("Transformation ended due to no data")
58+
return
59+
60+
transform_obj = Transform.objects.create(
61+
extraction=extraction_obj,
62+
trace_id=get_trace_id(extraction_obj),
63+
)
64+
65+
transform_obj.mark_as_started()
66+
geocoder = TheirGeocoder(etl_config.GEOCODER_URL)
67+
68+
try:
69+
schema = cls.get_schema_data(extraction_obj, geo_json_obj)
70+
transformer = cls.transformer_class(schema, geocoder)
71+
transformed_items = transformer.make_items()
72+
73+
cls.load_stac_item_to_queue(transform_obj, transformed_items)
74+
75+
summary = transformer.transform_summary
76+
transform_obj.metadata["summary"] = {
77+
"failed_rows": summary.failed_rows,
78+
"total_rows": summary.total_rows,
79+
}
80+
transform_obj.mark_as_ended(Transform.Status.SUCCESS, update_fields=["metadata"])
81+
82+
logger.info("Transformation ended")
83+
84+
except Exception as e:
85+
logger.error("Transformation failed", exc_info=True, extra=log_extra({"extraction_id": extraction_obj.id}))
86+
transform_obj.mark_as_ended(Transform.Status.FAILED)
87+
# FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
88+
raise e
6189

6290
@staticmethod
63-
@app.task(queue="transform")
64-
def task(extraction_id):
65-
return PDCTransformHandler().handle_transformation(extraction_id)
91+
@app.task
92+
def task(extraction_id, geo_json_id): # type: ignore[reportIncompatibleMethodOverride]
93+
return PDCTransformHandler().handle_transformation(extraction_id, geo_json_id)

0 commit comments

Comments
 (0)