Skip to content

Commit cd4e436

Browse files
gdacs: pass file as input in transformer
1 parent 5e47bc8 commit cd4e436

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

apps/etl/extraction/sources/gdacs/extract.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def handle_extract(self):
147147
@app.task(
148148
bind=True,
149149
base=RetryableTask,
150-
rate_limit="100/m",
150+
rate_limit="50/m",
151151
)
152152
def task(celery_task, extraction_id) -> int:
153153
return GdacsExtraction(celery_task, extraction_id).handle()

apps/etl/transform/sources/gdacs.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import logging
32

43
from pystac_monty.sources.gdacs import (
@@ -8,6 +7,7 @@
87
)
98

109
from apps.etl.transform.sources.handler import BaseTransformerHandler
10+
from apps.etl.utils import write_into_temp_file
1111
from main.celery import app
1212

1313
logger = logging.getLogger(__name__)
@@ -21,23 +21,34 @@ class GDACSTransformHandler(BaseTransformerHandler[GDACSTransformer, GDACSDataSo
2121

2222
@classmethod
2323
def get_schema_data(cls, extraction_object):
24-
data = extraction_object.resp_data.read()
24+
with extraction_object.resp_data.open("rb") as f:
25+
file_content = f.read()
26+
data_file = write_into_temp_file(file_content)
27+
2528
episodes = []
2629
event_objects = extraction_object.child_extractions.all()
2730
for episode_obj in event_objects:
2831
episodes_data_dict = {}
29-
event_episode_data = episode_obj.resp_data.read()
30-
episodes_data_dict[GDACSDataSourceType.EVENT] = (episode_obj.url, json.loads(event_episode_data))
32+
33+
with episode_obj.resp_data.open("rb") as f:
34+
file_content = f.read()
35+
episode_data_temp_file = write_into_temp_file(file_content)
36+
37+
episodes_data_dict[GDACSDataSourceType.EVENT] = (episode_obj.url, episode_data_temp_file.name)
3138
geometry_objects = episode_obj.child_extractions.all()
3239

3340
for geometry_detail in geometry_objects:
34-
geometry_episode_data = geometry_detail.resp_data.read()
35-
episodes_data_dict[GDACSDataSourceType.GEOMETRY] = (geometry_detail.url, json.loads(geometry_episode_data))
41+
with geometry_detail.resp_data.open("rb") as f:
42+
file_content = f.read()
43+
geometry_detail_temp_file = write_into_temp_file(file_content)
44+
45+
episodes_data_dict[GDACSDataSourceType.GEOMETRY] = (geometry_detail.url, geometry_detail_temp_file.name)
3646

3747
episodes.append(episodes_data_dict)
38-
return cls.transformer_schema(source_url=extraction_object.url, data=json.loads(data), episodes=episodes)
48+
49+
return cls.transformer_schema(source_url=extraction_object.url, data=data_file.name, episodes=episodes)
3950

4051
@staticmethod
41-
@app.task
52+
@app.task(rate_limit="50/m")
4253
def task(extraction_id):
4354
GDACSTransformHandler().handle_transformation(extraction_id)

0 commit comments

Comments
 (0)