From a8f12ea2025c948887db476d71052a386419cc1e Mon Sep 17 00:00:00 2001 From: rup Date: Mon, 19 May 2025 18:20:22 +0545 Subject: [PATCH 01/21] Add file as input in place of data in transformer --- apps/etl/transform/sources/emdat.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/etl/transform/sources/emdat.py b/apps/etl/transform/sources/emdat.py index 2655a39d..2571b11e 100644 --- a/apps/etl/transform/sources/emdat.py +++ b/apps/etl/transform/sources/emdat.py @@ -1,10 +1,10 @@ -import json import logging from pystac_monty.sources.emdat import EMDATDataSource, EMDATTransformer from apps.etl.models import ExtractionData from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import app logger = logging.getLogger(__name__) @@ -16,12 +16,13 @@ class EMDATTransformHandler(BaseTransformerHandler[EMDATTransformer, EMDATDataSo @classmethod def get_schema_data(cls, extraction_obj: ExtractionData): - with extraction_obj.resp_data.open() as file_data: - data = json.loads(file_data.read()) + with extraction_obj.resp_data.open("rb") as f: + data = f.read() + data_file = write_into_temp_file(data) return cls.transformer_schema( source_url=extraction_obj.url, - data=data, + data=data_file.name, ) @staticmethod From 7fb0bb613124da9e26d6f63d3b7d744371596150 Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Wed, 21 May 2025 17:02:47 +0545 Subject: [PATCH 02/21] Change input data format for emdat transformer to accept file --- apps/etl/transform/sources/emdat.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/etl/transform/sources/emdat.py b/apps/etl/transform/sources/emdat.py index 2571b11e..49bd5b1b 100644 --- a/apps/etl/transform/sources/emdat.py +++ b/apps/etl/transform/sources/emdat.py @@ -1,5 +1,6 @@ import logging +from pystac_monty.sources.common import DataType, File from pystac_monty.sources.emdat import EMDATDataSource, EMDATTransformer from apps.etl.models import ExtractionData @@ -20,10 +21,9 @@ def get_schema_data(cls, extraction_obj: ExtractionData): data = f.read() data_file = write_into_temp_file(data) - return cls.transformer_schema( - source_url=extraction_obj.url, - data=data_file.name, - ) + data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} + + return cls.transformer_schema(data_source) @staticmethod @app.task From e6164c15fd44bff920b0540089e6d94cd9a1e1d2 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Thu, 22 May 2025 12:08:47 +0545 Subject: [PATCH 03/21] Add ijson library and jq tool --- Dockerfile | 2 ++ pyproject.toml | 1 + uv.lock | 45 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/Dockerfile b/Dockerfile index 099cfc87..6533af33 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,8 @@ WORKDIR /code COPY libs /code/libs +RUN apt-get update && apt-get install -y jq + RUN --mount=type=cache,target=/root/.cache/uv \ --mount=type=bind,source=uv.lock,target=uv.lock \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ diff --git a/pyproject.toml b/pyproject.toml index aced086a..bb060f51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "colorlog", "requests_cache", "termcolor", + "ijson>=3.4.0", ] [tool.uv.sources] diff --git a/uv.lock b/uv.lock index dba78180..e5edeed3 100644 --- a/uv.lock +++ b/uv.lock @@ -810,6 +810,47 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442 }, ] +[[package]] +name = "ijson" +version = "3.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a3/4f/1cfeada63f5fce87536651268ddf5cca79b8b4bbb457aee4e45777964a0a/ijson-3.4.0.tar.gz", hash = "sha256:5f74dcbad9d592c428d3ca3957f7115a42689ee7ee941458860900236ae9bb13", size = 65782 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f8/ec/317ee5b2d13e50448833ead3aa906659a32b376191f6abc2a7c6112d2b27/ijson-3.4.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:956b148f88259a80a9027ffbe2d91705fae0c004fbfba3e5a24028fbe72311a9", size = 87212 }, + { url = "https://files.pythonhosted.org/packages/f8/43/b06c96ced30cacecc5d518f89b0fd1c98c294a30ff88848b70ed7b7f72a1/ijson-3.4.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:06b89960f5c721106394c7fba5760b3f67c515b8eb7d80f612388f5eca2f4621", size = 59175 }, + { url = "https://files.pythonhosted.org/packages/e9/df/b4aeafb7ecde463130840ee9be36130823ec94a00525049bf700883378b8/ijson-3.4.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:9a0bb591cf250dd7e9dfab69d634745a7f3272d31cfe879f9156e0a081fd97ee", size = 59011 }, + { url = "https://files.pythonhosted.org/packages/e3/7c/a80b8e361641609507f62022089626d4b8067f0826f51e1c09e4ba86eba8/ijson-3.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:72e92de999977f4c6b660ffcf2b8d59604ccd531edcbfde05b642baf283e0de8", size = 146094 }, + { url = "https://files.pythonhosted.org/packages/01/44/fa416347b9a802e3646c6ff377fc3278bd7d6106e17beb339514b6a3184e/ijson-3.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9e9602157a5b869d44b6896e64f502c712a312fcde044c2e586fccb85d3e316e", size = 137903 }, + { url = "https://files.pythonhosted.org/packages/24/c6/41a9ad4d42df50ff6e70fdce79b034f09b914802737ebbdc141153d8d791/ijson-3.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b1e83660edb931a425b7ff662eb49db1f10d30ca6d4d350e5630edbed098bc01", size = 148339 }, + { url = "https://files.pythonhosted.org/packages/5f/6f/7d01efda415b8502dce67e067ed9e8a124f53e763002c02207e542e1a2f1/ijson-3.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:49bf8eac1c7b7913073865a859c215488461f7591b4fa6a33c14b51cb73659d0", size = 149383 }, + { url = "https://files.pythonhosted.org/packages/95/6c/0d67024b9ecb57916c5e5ab0350251c9fe2f86dc9c8ca2b605c194bdad6a/ijson-3.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:160b09273cb42019f1811469508b0a057d19f26434d44752bde6f281da6d3f32", size = 141580 }, + { url = "https://files.pythonhosted.org/packages/06/43/e10edcc1c6a3b619294de835e7678bfb3a1b8a75955f3689fd66a1e9e7b4/ijson-3.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2019ff4e6f354aa00c76c8591bd450899111c61f2354ad55cc127e2ce2492c44", size = 150280 }, + { url = "https://files.pythonhosted.org/packages/07/84/1cbeee8e8190a1ebe6926569a92cf1fa80ddb380c129beb6f86559e1bb24/ijson-3.4.0-cp312-cp312-win32.whl", hash = "sha256:931c007bf6bb8330705429989b2deed6838c22b63358a330bf362b6e458ba0bf", size = 51512 }, + { url = "https://files.pythonhosted.org/packages/66/13/530802bc391c95be6fe9f96e9aa427d94067e7c0b7da7a9092344dc44c4b/ijson-3.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:71523f2b64cb856a820223e94d23e88369f193017ecc789bb4de198cc9d349eb", 
size = 54081 }, + { url = "https://files.pythonhosted.org/packages/77/b3/b1d2eb2745e5204ec7a25365a6deb7868576214feb5e109bce368fb692c9/ijson-3.4.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:e8d96f88d75196a61c9d9443de2b72c2d4a7ba9456ff117b57ae3bba23a54256", size = 87216 }, + { url = "https://files.pythonhosted.org/packages/b1/cd/cd6d340087617f8cc9bedbb21d974542fe2f160ed0126b8288d3499a469b/ijson-3.4.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:c45906ce2c1d3b62f15645476fc3a6ca279549127f01662a39ca5ed334a00cf9", size = 59170 }, + { url = "https://files.pythonhosted.org/packages/3e/4d/32d3a9903b488d3306e3c8288f6ee4217d2eea82728261db03a1045eb5d1/ijson-3.4.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:4ab4bc2119b35c4363ea49f29563612237cae9413d2fbe54b223be098b97bc9e", size = 59013 }, + { url = "https://files.pythonhosted.org/packages/d5/c8/db15465ab4b0b477cee5964c8bfc94bf8c45af8e27a23e1ad78d1926e587/ijson-3.4.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:97b0a9b5a15e61dfb1f14921ea4e0dba39f3a650df6d8f444ddbc2b19b479ff1", size = 146564 }, + { url = "https://files.pythonhosted.org/packages/c4/d8/0755545bc122473a9a434ab90e0f378780e603d75495b1ca3872de757873/ijson-3.4.0-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e3047bb994dabedf11de11076ed1147a307924b6e5e2df6784fb2599c4ad8c60", size = 137917 }, + { url = "https://files.pythonhosted.org/packages/d0/c6/aeb89c8939ebe3f534af26c8c88000c5e870dbb6ae33644c21a4531f87d2/ijson-3.4.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68c83161b052e9f5dc8191acbc862bb1e63f8a35344cb5cd0db1afd3afd487a6", size = 148897 }, + { url = "https://files.pythonhosted.org/packages/be/0e/7ef6e9b372106f2682a4a32b3c65bf86bb471a1670e4dac242faee4a7d3f/ijson-3.4.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:1eebd9b6c20eb1dffde0ae1f0fbb4aeacec2eb7b89adb5c7c0449fc9fd742760", size = 149711 }, + { url = "https://files.pythonhosted.org/packages/d1/5d/9841c3ed75bcdabf19b3202de5f862a9c9c86ce5c7c9d95fa32347fdbf5f/ijson-3.4.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:13fb6d5c35192c541421f3ee81239d91fc15a8d8f26c869250f941f4b346a86c", size = 141691 }, + { url = "https://files.pythonhosted.org/packages/d5/d2/ce74e17218dba292e9be10a44ed0c75439f7958cdd263adb0b5b92d012d5/ijson-3.4.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:28b7196ff7b37c4897c547a28fa4876919696739fc91c1f347651c9736877c69", size = 150738 }, + { url = "https://files.pythonhosted.org/packages/4e/43/dcc480f94453b1075c9911d4755b823f3ace275761bb37b40139f22109ca/ijson-3.4.0-cp313-cp313-win32.whl", hash = "sha256:3c2691d2da42629522140f77b99587d6f5010440d58d36616f33bc7bdc830cc3", size = 51512 }, + { url = "https://files.pythonhosted.org/packages/35/dd/d8c5f15efd85ba51e6e11451ebe23d779361a9ec0d192064c2a8c3cdfcb8/ijson-3.4.0-cp313-cp313-win_amd64.whl", hash = "sha256:c4554718c275a044c47eb3874f78f2c939f300215d9031e785a6711cc51b83fc", size = 54074 }, + { url = "https://files.pythonhosted.org/packages/79/73/24ad8cd106203419c4d22bed627e02e281d66b83e91bc206a371893d0486/ijson-3.4.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:915a65e3f3c0eee2ea937bc62aaedb6c14cc1e8f0bb9f3f4fb5a9e2bbfa4b480", size = 91694 }, + { url = "https://files.pythonhosted.org/packages/17/2d/f7f680984bcb7324a46a4c2df3bd73cf70faef0acfeb85a3f811abdfd590/ijson-3.4.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:afbe9748707684b6c5adc295c4fdcf27765b300aec4d484e14a13dca4e5c0afa", size = 61390 }, + { url = 
"https://files.pythonhosted.org/packages/09/a1/f3ca7bab86f95bdb82494739e71d271410dfefce4590785d511669127145/ijson-3.4.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:d823f8f321b4d8d5fa020d0a84f089fec5d52b7c0762430476d9f8bf95bbc1a9", size = 61140 }, + { url = "https://files.pythonhosted.org/packages/51/79/dd340df3d4fc7771c95df29997956b92ed0570fe7b616d1792fea9ad93f2/ijson-3.4.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b8a0a2c54f3becf76881188beefd98b484b1d3bd005769a740d5b433b089fa23", size = 214739 }, + { url = "https://files.pythonhosted.org/packages/59/f0/85380b7f51d1f5fb7065d76a7b623e02feca920cc678d329b2eccc0011e0/ijson-3.4.0-cp313-cp313t-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ced19a83ab09afa16257a0b15bc1aa888dbc555cb754be09d375c7f8d41051f2", size = 198338 }, + { url = "https://files.pythonhosted.org/packages/a5/cd/313264cf2ec42e0f01d198c49deb7b6fadeb793b3685e20e738eb6b3fa13/ijson-3.4.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8100f9885eff1f38d35cef80ef759a1bbf5fc946349afa681bd7d0e681b7f1a0", size = 207515 }, + { url = "https://files.pythonhosted.org/packages/12/94/bf14457aa87ea32641f2db577c9188ef4e4ae373478afef422b31fc7f309/ijson-3.4.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:d7bcc3f7f21b0f703031ecd15209b1284ea51b2a329d66074b5261de3916c1eb", size = 210081 }, + { url = "https://files.pythonhosted.org/packages/7d/b4/eaee39e290e40e52d665db9bd1492cfdce86bd1e47948e0440db209c6023/ijson-3.4.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:2dcb190227b09dd171bdcbfe4720fddd574933c66314818dfb3960c8a6246a77", size = 199253 }, + { url = "https://files.pythonhosted.org/packages/c5/9c/e09c7b9ac720a703ab115b221b819f149ed54c974edfff623c1e925e57da/ijson-3.4.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:eda4cfb1d49c6073a901735aaa62e39cb7ab47f3ad7bb184862562f776f1fa8a", size = 203816 }, + { url = "https://files.pythonhosted.org/packages/7c/14/acd304f412e32d16a2c12182b9d78206bb0ae35354d35664f45db05c1b3b/ijson-3.4.0-cp313-cp313t-win32.whl", hash = "sha256:0772638efa1f3b72b51736833404f1cbd2f5beeb9c1a3d392e7d385b9160cba7", size = 53760 }, + { url = "https://files.pythonhosted.org/packages/2f/24/93dd0a467191590a5ed1fc2b35842bca9d09900d001e00b0b497c0208ef6/ijson-3.4.0-cp313-cp313t-win_amd64.whl", hash = "sha256:3d8a0d67f36e4fb97c61a724456ef0791504b16ce6f74917a31c2e92309bbeb9", size = 56948 }, +] + [[package]] name = "ipython" version = "9.2.0" @@ -975,6 +1016,7 @@ dependencies = [ { name = "fiona" }, { name = "flower" }, { name = "geopandas" }, + { name = "ijson" }, { name = "ipython" }, { name = "lxml" }, { name = "pandas" }, @@ -1006,6 +1048,7 @@ requires-dist = [ { name = "fiona", specifier = ">=1.10.1" }, { name = "flower" }, { name = "geopandas" }, + { name = "ijson", specifier = ">=3.4.0" }, { name = "ipython" }, { name = "lxml", specifier = ">=5.3.0,<6" }, { name = "pandas", specifier = ">=2.2.3,<3" }, @@ -1449,6 +1492,7 @@ name = "pystac-monty" source = { editable = "libs/pystac-monty" } dependencies = [ { name = "geojson" }, + { name = "ijson" }, { name = "lxml" }, { name = "markdownify" }, { name = "pandas" }, @@ -1461,6 +1505,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "geojson", specifier = ">=2.5.0" }, + { name = "ijson", specifier = ">=3.4.0" }, { name = "lxml", specifier = ">=5.3.0" }, { name = "markdownify", specifier = ">=0.14.1" }, { name = "pandas", specifier = ">=2.2.0" }, From 82eaecc8f87de7d3d5c67a1194bc3386f9ac56bb Mon Sep 17 00:00:00 
2001 From: Ranjan Shrestha Date: Thu, 22 May 2025 14:43:44 +0545 Subject: [PATCH 04/21] Refactor Data source for the IDUDataSourceV2 Fix lint --- apps/etl/transform/sources/idu.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/etl/transform/sources/idu.py b/apps/etl/transform/sources/idu.py index 112b8c1e..bc0f50db 100644 --- a/apps/etl/transform/sources/idu.py +++ b/apps/etl/transform/sources/idu.py @@ -1,19 +1,25 @@ -from pystac_monty.sources.idu import IDUDataSource, IDUTransformer +from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.idu import IDUDataSourceV2, IDUTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import CeleryQueue, app -class IDUTransformHandler(BaseTransformerHandler[IDUTransformer, IDUDataSource]): +class IDUTransformHandler(BaseTransformerHandler[IDUTransformer, IDUDataSourceV2]): transformer_class = IDUTransformer - transformer_schema = IDUDataSource + transformer_schema = IDUDataSourceV2 @classmethod def get_schema_data(cls, extraction_obj): - with extraction_obj.resp_data.open() as file_data: - data = file_data.read() + with extraction_obj.resp_data.open("rb") as file_data: + file_content = file_data.read() - return cls.transformer_schema(source_url=extraction_obj.url, data=data) + data_file = write_into_temp_file(content=file_content) + + data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} + + return cls.transformer_schema(data_source) @staticmethod @app.task(queue=CeleryQueue.DEFAULT) From 6fb4fe3ff4c644f7935b0b972d26c8e83da3e19e Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Tue, 20 May 2025 17:12:15 +0545 Subject: [PATCH 05/21] Send file as input for glide transformer. - Change data schema format for glide transformer. --- apps/etl/transform/sources/glide.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/etl/transform/sources/glide.py b/apps/etl/transform/sources/glide.py index a45ccafe..3f7328f0 100644 --- a/apps/etl/transform/sources/glide.py +++ b/apps/etl/transform/sources/glide.py @@ -1,9 +1,11 @@ import json +from pystac_monty.sources.common import DataType, File from pystac_monty.sources.glide import GlideDataSource, GlideTransformer from apps.etl.models import ExtractionData from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import app @@ -13,10 +15,12 @@ class GlideTransformHandler(BaseTransformerHandler[GlideTransformer, GlideDataSo @classmethod def get_schema_data(cls, extraction_obj): - with extraction_obj.resp_data.open() as file_data: - data = file_data.read() + with extraction_obj.resp_data.open("rb") as f: + data = f.read() + data_file = write_into_temp_file(data) + data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} - return cls.transformer_schema(source_url=extraction_obj.url, data=data) + return cls.transformer_schema(data_source) @staticmethod @app.task From d342d5bd8bcaa24f4ec6b3ec370f9a72dfd7d685 Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Tue, 20 May 2025 17:12:15 +0545 Subject: [PATCH 06/21] Send file as input for gdacs transformer. - Change data schema format for gdacs transformer. 
--- apps/etl/transform/sources/gdacs.py | 56 +++++++++++++++++++++-------- libs/pystac-monty | 2 +- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/apps/etl/transform/sources/gdacs.py b/apps/etl/transform/sources/gdacs.py index 6ae5da42..fe51dda5 100644 --- a/apps/etl/transform/sources/gdacs.py +++ b/apps/etl/transform/sources/gdacs.py @@ -1,13 +1,14 @@ -import json import logging +from pystac_monty.sources.common import DataType, File, GdacsDataSourceType, GdacsEpisodes, GenericDataSource from pystac_monty.sources.gdacs import ( - GDACSDataSource, GDACSDataSourceType, + GDACSDataSourceV3, GDACSTransformer, ) from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import app logger = logging.getLogger(__name__) @@ -15,29 +16,54 @@ # FIXME: start_end_handler base zzz -class GDACSTransformHandler(BaseTransformerHandler[GDACSTransformer, GDACSDataSource]): +class GDACSTransformHandler(BaseTransformerHandler[GDACSTransformer, GDACSDataSourceV3]): transformer_class = GDACSTransformer - transformer_schema = GDACSDataSource + transformer_schema = GDACSDataSourceV3 @classmethod def get_schema_data(cls, extraction_object): - data = extraction_object.resp_data.read() + with extraction_object.resp_data.open("rb") as f: + file_content = f.read() + data_file = write_into_temp_file(file_content) + episodes = [] event_objects = extraction_object.child_extractions.all() for episode_obj in event_objects: - episodes_data_dict = {} - event_episode_data = episode_obj.resp_data.read() - episodes_data_dict[GDACSDataSourceType.EVENT] = (episode_obj.url, json.loads(event_episode_data)) - geometry_objects = episode_obj.child_extractions.all() + with episode_obj.resp_data.open("rb") as f: + file_content = f.read() + episode_data_temp_file = write_into_temp_file(file_content) + + event_episode_data = GdacsEpisodes( + type=GDACSDataSourceType.EVENT, + data=GenericDataSource( + source_url=episode_obj.url, data_source=File(path=episode_data_temp_file.name, data_type=DataType.FILE) + ), + ) + geometry_object = episode_obj.child_extractions.all().first() + + with geometry_object.resp_data.open("rb") as f: + file_content = f.read() + geometry_detail_temp_file = write_into_temp_file(file_content) + geometry_episode_data = GdacsEpisodes( + type=GDACSDataSourceType.GEOMETRY, + data=GenericDataSource( + source_url=geometry_object.url, + data_source=File(path=geometry_detail_temp_file.name, data_type=DataType.FILE), + ), + ) - for geometry_detail in geometry_objects: - geometry_episode_data = geometry_detail.resp_data.read() - episodes_data_dict[GDACSDataSourceType.GEOMETRY] = (geometry_detail.url, json.loads(geometry_episode_data)) + episode_data_tuple = (event_episode_data, geometry_episode_data) + episodes.append(episode_data_tuple) - episodes.append(episodes_data_dict) - return cls.transformer_schema(source_url=extraction_object.url, data=json.loads(data), episodes=episodes) + return cls.transformer_schema( + data=GdacsDataSourceType( + source_url=extraction_object.url, + event_data=File(path=data_file.name, data_type=DataType.FILE), + episodes=episodes, + ) + ) @staticmethod - @app.task + @app.task(rate_limit="50/m") def task(extraction_id): GDACSTransformHandler().handle_transformation(extraction_id) diff --git a/libs/pystac-monty b/libs/pystac-monty index 833f2a74..80c96aa9 160000 --- a/libs/pystac-monty +++ b/libs/pystac-monty @@ -1 +1 @@ -Subproject commit 833f2a74bf38504de1b67d7abe3fc134e6fb59b3 +Subproject commit 
80c96aa95cec3689fb7594a75a996e4ce599d22b From b7a4945fc2fc4598ca0b5ac4e539552d68425b74 Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Thu, 29 May 2025 12:00:17 +0545 Subject: [PATCH 07/21] Change data format for glide to accept file and memory data. --- apps/etl/transform/sources/glide.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/etl/transform/sources/glide.py b/apps/etl/transform/sources/glide.py index 3f7328f0..b8a17386 100644 --- a/apps/etl/transform/sources/glide.py +++ b/apps/etl/transform/sources/glide.py @@ -1,6 +1,6 @@ import json -from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.common import DataType, File, GenericDataSource from pystac_monty.sources.glide import GlideDataSource, GlideTransformer from apps.etl.models import ExtractionData @@ -18,9 +18,13 @@ def get_schema_data(cls, extraction_obj): with extraction_obj.resp_data.open("rb") as f: data = f.read() data_file = write_into_temp_file(data) - data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} - return cls.transformer_schema(data_source) + return cls.transformer_schema( + data=GenericDataSource( + source_url=extraction_obj.url, + data_source=File(path=data_file.name, data_type=DataType.FILE), + ) + ) @staticmethod @app.task From babd3a650daa5c4890e58f67c2afbe5b51cd386d Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Mon, 26 May 2025 17:00:36 +0545 Subject: [PATCH 08/21] Update the schema to handle the GIDD data --- apps/etl/transform/sources/gidd.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/apps/etl/transform/sources/gidd.py b/apps/etl/transform/sources/gidd.py index 931f1e99..1d2e61ce 100644 --- a/apps/etl/transform/sources/gidd.py +++ b/apps/etl/transform/sources/gidd.py @@ -1,19 +1,25 @@ -from pystac_monty.sources.gidd import GIDDDataSource, GIDDTransformer +from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.gidd import GIDDDataSourceV2, GIDDTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import CeleryQueue, app -class GIDDTransformHandler(BaseTransformerHandler[GIDDTransformer, GIDDDataSource]): +class GIDDTransformHandler(BaseTransformerHandler[GIDDTransformer, GIDDDataSourceV2]): transformer_class = GIDDTransformer - transformer_schema = GIDDDataSource + transformer_schema = GIDDDataSourceV2 @classmethod def get_schema_data(cls, extraction_obj): with extraction_obj.resp_data.open() as file_data: - data = file_data.read() + file_content = file_data.read() - return cls.transformer_schema(source_url=extraction_obj.url, data=data) + data_file = write_into_temp_file(content=file_content) + + data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} + + return cls.transformer_schema(data_source) @staticmethod @app.task(queue=CeleryQueue.DEFAULT) From 5a700d91aa9b90a397dffa4559ac74a987784b22 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Fri, 30 May 2025 12:23:15 +0545 Subject: [PATCH 09/21] Update the data based on generic class Fix the lint --- apps/etl/transform/sources/gdacs.py | 4 ++-- apps/etl/transform/sources/gidd.py | 6 ++++-- apps/etl/transform/sources/glide.py | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/etl/transform/sources/gdacs.py b/apps/etl/transform/sources/gdacs.py index fe51dda5..6f3f85a5 
100644 --- a/apps/etl/transform/sources/gdacs.py +++ b/apps/etl/transform/sources/gdacs.py @@ -36,7 +36,7 @@ def get_schema_data(cls, extraction_object): event_episode_data = GdacsEpisodes( type=GDACSDataSourceType.EVENT, data=GenericDataSource( - source_url=episode_obj.url, data_source=File(path=episode_data_temp_file.name, data_type=DataType.FILE) + source_url=episode_obj.url, input_data=File(path=episode_data_temp_file.name, data_type=DataType.FILE) ), ) geometry_object = episode_obj.child_extractions.all().first() @@ -48,7 +48,7 @@ def get_schema_data(cls, extraction_object): type=GDACSDataSourceType.GEOMETRY, data=GenericDataSource( source_url=geometry_object.url, - data_source=File(path=geometry_detail_temp_file.name, data_type=DataType.FILE), + input_data=File(path=geometry_detail_temp_file.name, data_type=DataType.FILE), ), ) diff --git a/apps/etl/transform/sources/gidd.py b/apps/etl/transform/sources/gidd.py index 1d2e61ce..1e645640 100644 --- a/apps/etl/transform/sources/gidd.py +++ b/apps/etl/transform/sources/gidd.py @@ -1,4 +1,4 @@ -from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.common import DataType, File, GenericDataSource from pystac_monty.sources.gidd import GIDDDataSourceV2, GIDDTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler @@ -17,7 +17,9 @@ def get_schema_data(cls, extraction_obj): data_file = write_into_temp_file(content=file_content) - data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} + data_source = GenericDataSource( + source_url=extraction_obj.url, data_source=File(path=data_file.name, data_type=DataType.FILE) + ) return cls.transformer_schema(data_source) diff --git a/apps/etl/transform/sources/glide.py b/apps/etl/transform/sources/glide.py index b8a17386..01a9a509 100644 --- a/apps/etl/transform/sources/glide.py +++ b/apps/etl/transform/sources/glide.py @@ -22,7 +22,7 @@ def get_schema_data(cls, extraction_obj): return cls.transformer_schema( data=GenericDataSource( source_url=extraction_obj.url, - data_source=File(path=data_file.name, data_type=DataType.FILE), + input_data=File(path=data_file.name, data_type=DataType.FILE), ) ) From 1a08ab7da9877b0e5d50714e3b69c30c22846516 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Fri, 30 May 2025 12:29:34 +0545 Subject: [PATCH 10/21] Update the class for GIDD source --- apps/etl/transform/sources/gidd.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/etl/transform/sources/gidd.py b/apps/etl/transform/sources/gidd.py index 1e645640..cf677c72 100644 --- a/apps/etl/transform/sources/gidd.py +++ b/apps/etl/transform/sources/gidd.py @@ -1,14 +1,14 @@ from pystac_monty.sources.common import DataType, File, GenericDataSource -from pystac_monty.sources.gidd import GIDDDataSourceV2, GIDDTransformer +from pystac_monty.sources.gidd import GIDDDataSource, GIDDTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler from apps.etl.utils import write_into_temp_file from main.celery import CeleryQueue, app -class GIDDTransformHandler(BaseTransformerHandler[GIDDTransformer, GIDDDataSourceV2]): +class GIDDTransformHandler(BaseTransformerHandler[GIDDTransformer, GIDDDataSource]): transformer_class = GIDDTransformer - transformer_schema = GIDDDataSourceV2 + transformer_schema = GIDDDataSource @classmethod def get_schema_data(cls, extraction_obj): @@ -18,7 +18,7 @@ def get_schema_data(cls, extraction_obj): data_file = 
write_into_temp_file(content=file_content) data_source = GenericDataSource( - source_url=extraction_obj.url, data_source=File(path=data_file.name, data_type=DataType.FILE) + source_url=extraction_obj.url, input_data=File(path=data_file.name, data_type=DataType.FILE) ) return cls.transformer_schema(data_source) From 770bdb103e7ff9b8cb35ac13c0495afec0d96d89 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Fri, 30 May 2025 13:41:51 +0545 Subject: [PATCH 11/21] Update the class --- apps/etl/transform/sources/gdacs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/etl/transform/sources/gdacs.py b/apps/etl/transform/sources/gdacs.py index 6f3f85a5..aa8d1b61 100644 --- a/apps/etl/transform/sources/gdacs.py +++ b/apps/etl/transform/sources/gdacs.py @@ -2,8 +2,8 @@ from pystac_monty.sources.common import DataType, File, GdacsDataSourceType, GdacsEpisodes, GenericDataSource from pystac_monty.sources.gdacs import ( + GDACSDataSource, GDACSDataSourceType, - GDACSDataSourceV3, GDACSTransformer, ) @@ -16,9 +16,9 @@ # FIXME: start_end_handler base zzz -class GDACSTransformHandler(BaseTransformerHandler[GDACSTransformer, GDACSDataSourceV3]): +class GDACSTransformHandler(BaseTransformerHandler[GDACSTransformer, GDACSDataSource]): transformer_class = GDACSTransformer - transformer_schema = GDACSDataSourceV3 + transformer_schema = GDACSDataSource @classmethod def get_schema_data(cls, extraction_object): From cb07296235e0b8e61ae14c62e1549d4f95b9a457 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Fri, 30 May 2025 14:11:07 +0545 Subject: [PATCH 12/21] Use the updated file/memory schema --- apps/etl/transform/sources/idu.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/etl/transform/sources/idu.py b/apps/etl/transform/sources/idu.py index bc0f50db..4f2fe5c7 100644 --- a/apps/etl/transform/sources/idu.py +++ b/apps/etl/transform/sources/idu.py @@ -1,14 +1,14 @@ -from pystac_monty.sources.common import DataType, File -from pystac_monty.sources.idu import IDUDataSourceV2, IDUTransformer +from pystac_monty.sources.common import DataType, File, GenericDataSource +from pystac_monty.sources.idu import IDUDataSource, IDUTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler from apps.etl.utils import write_into_temp_file from main.celery import CeleryQueue, app -class IDUTransformHandler(BaseTransformerHandler[IDUTransformer, IDUDataSourceV2]): +class IDUTransformHandler(BaseTransformerHandler[IDUTransformer, IDUDataSource]): transformer_class = IDUTransformer - transformer_schema = IDUDataSourceV2 + transformer_schema = IDUDataSource @classmethod def get_schema_data(cls, extraction_obj): @@ -17,8 +17,9 @@ def get_schema_data(cls, extraction_obj): data_file = write_into_temp_file(content=file_content) - data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} - + data_source = GenericDataSource( + source_url=extraction_obj.url, input_data=File(path=data_file.name, data_type=DataType.FILE) + ) return cls.transformer_schema(data_source) @staticmethod From 534a318991861acb7fb5872b4abd284df34fcb7a Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Thu, 29 May 2025 15:36:40 +0545 Subject: [PATCH 13/21] change data format for emdat transformer schema to accept file. 
--- apps/etl/transform/sources/emdat.py | 11 +++++++---- libs/pystac-monty | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/apps/etl/transform/sources/emdat.py b/apps/etl/transform/sources/emdat.py index 49bd5b1b..2f69e7db 100644 --- a/apps/etl/transform/sources/emdat.py +++ b/apps/etl/transform/sources/emdat.py @@ -1,6 +1,6 @@ import logging -from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.common import DataType, File, GenericDataSource from pystac_monty.sources.emdat import EMDATDataSource, EMDATTransformer from apps.etl.models import ExtractionData @@ -21,9 +21,12 @@ def get_schema_data(cls, extraction_obj: ExtractionData): data = f.read() data_file = write_into_temp_file(data) - data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} - - return cls.transformer_schema(data_source) + return cls.transformer_schema( + data=GenericDataSource( + source_url=extraction_obj.url, + input_data=File(path=data_file.name, data_type=DataType.FILE), + ) + ) @staticmethod @app.task diff --git a/libs/pystac-monty b/libs/pystac-monty index 80c96aa9..4e87377b 160000 --- a/libs/pystac-monty +++ b/libs/pystac-monty @@ -1 +1 @@ -Subproject commit 80c96aa95cec3689fb7594a75a996e4ce599d22b +Subproject commit 4e87377b624234d292d32f9154b2b51d934e84b1 From 2f31c051cc8eb584f656e3b3ab5f6c9601f7ebf9 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Fri, 30 May 2025 15:44:24 +0545 Subject: [PATCH 14/21] Update the file / data schema for GFD source --- apps/etl/transform/sources/gfd.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/apps/etl/transform/sources/gfd.py b/apps/etl/transform/sources/gfd.py index b33c2b90..25148939 100644 --- a/apps/etl/transform/sources/gfd.py +++ b/apps/etl/transform/sources/gfd.py @@ -1,7 +1,9 @@ +from pystac_monty.sources.common import DataType, File, GenericDataSource from pystac_monty.sources.gfd import GFDDataSource, GFDTransformer from apps.etl.models import ExtractionData from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import CeleryQueue, app @@ -11,11 +13,15 @@ class GFDTransformHandler(BaseTransformerHandler[GFDTransformer, GFDDataSource]) @classmethod def get_schema_data(cls, extraction_obj: ExtractionData): - with extraction_obj.resp_data.open() as file_data: - data = file_data.read() - data = data.decode("utf-8") + with extraction_obj.resp_data.open("rb") as file_data: + file_content = file_data.read() - return cls.transformer_schema(source_url=extraction_obj.url, data=data) + data_file = write_into_temp_file(content=file_content) + + data_source = GenericDataSource( + source_url=extraction_obj.url, input_data=File(path=data_file.name, data_type=DataType.FILE) + ) + return cls.transformer_schema(data_source) @staticmethod @app.task(queue=CeleryQueue.DEFAULT) From 0cad2893fe02d7fed80360ff648e4ae03fd0692c Mon Sep 17 00:00:00 2001 From: Sudan Bhandari Date: Fri, 23 May 2025 16:15:05 +0545 Subject: [PATCH 15/21] Add file handling in ibtracs --- apps/etl/transform/sources/noaa_ibtracs.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/etl/transform/sources/noaa_ibtracs.py b/apps/etl/transform/sources/noaa_ibtracs.py index 7dc94098..0dfc9ec0 100644 --- a/apps/etl/transform/sources/noaa_ibtracs.py +++ b/apps/etl/transform/sources/noaa_ibtracs.py @@ -1,8 +1,10 @@ import logging +from pystac_monty.sources.common import 
DataType, File from pystac_monty.sources.ibtracs import IBTrACSDataSource, IBTrACSTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import app logger = logging.getLogger(__name__) @@ -16,11 +18,10 @@ class IbtracsTransformHandler(BaseTransformerHandler[IBTrACSTransformer, IBTrACS def get_schema_data(cls, extraction_obj): with extraction_obj.resp_data.open() as file_data: data = file_data.read() + data_file = write_into_temp_file(data) - return cls.transformer_schema( - source_url=extraction_obj.url, - data=data.decode("utf-8"), - ) + data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} + return cls.transformer_schema(data_source) @staticmethod @app.task From d9df983efa59b75eac93e41a53e3534d2b6b1c14 Mon Sep 17 00:00:00 2001 From: Sudan Bhandari Date: Tue, 3 Jun 2025 14:30:04 +0545 Subject: [PATCH 16/21] fixup! Add file handling in ibtracs --- apps/etl/transform/sources/noaa_ibtracs.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/etl/transform/sources/noaa_ibtracs.py b/apps/etl/transform/sources/noaa_ibtracs.py index 0dfc9ec0..2bca3aac 100644 --- a/apps/etl/transform/sources/noaa_ibtracs.py +++ b/apps/etl/transform/sources/noaa_ibtracs.py @@ -1,6 +1,6 @@ import logging -from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.common import DataType, File, GenericDataSource from pystac_monty.sources.ibtracs import IBTrACSDataSource, IBTrACSTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler @@ -20,8 +20,12 @@ def get_schema_data(cls, extraction_obj): data = file_data.read() data_file = write_into_temp_file(data) - data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} - return cls.transformer_schema(data_source) + data_source = GenericDataSource( + source_url=extraction_obj.url, + input_data=File(path=data_file.name, data_type=DataType.FILE), + ) + + return cls.transformer_schema(data=data_source) @staticmethod @app.task From aa565050eb5107aaf607b5eef3eed0a0ffb0be21 Mon Sep 17 00:00:00 2001 From: Sudan Bhandari Date: Thu, 29 May 2025 00:28:42 +0545 Subject: [PATCH 17/21] File streaming for ifrc dref --- apps/etl/transform/sources/ifrc_event.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/apps/etl/transform/sources/ifrc_event.py b/apps/etl/transform/sources/ifrc_event.py index 8537e21c..7417a018 100644 --- a/apps/etl/transform/sources/ifrc_event.py +++ b/apps/etl/transform/sources/ifrc_event.py @@ -1,9 +1,11 @@ import json import logging +from pystac_monty.sources.common import DataType, File, GenericDataSource from pystac_monty.sources.ifrc_events import IFRCEventDataSource, IFRCEventTransformer from apps.etl.transform.sources.handler import BaseTransformerHandler +from apps.etl.utils import write_into_temp_file from main.celery import CeleryQueue, app logger = logging.getLogger(__name__) @@ -17,10 +19,15 @@ class IFRCEventTransformHandler(BaseTransformerHandler[IFRCEventTransformer, IFR def get_schema_data(cls, extraction_obj): with extraction_obj.resp_data.open() as file_data: data = json.loads(file_data.read()) - return cls.transformer_schema( - source_url=extraction_obj.url, - data=json.dumps(data["results"]), + + data_file = write_into_temp_file(json.dumps(data["results"]).encode("utf-8")) + + data_source = IFRCEventDataSource( + 
data=GenericDataSource( + source_url=extraction_obj.url, data_source=File(path=data_file.name, data_type=DataType.FILE) + ) ) + return cls.transformer_schema(data_source) @staticmethod @app.task(queue=CeleryQueue.TRANSFORM) From a9439ee231f80b5638bfcd6aaa79aa9b65c798e3 Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Tue, 3 Jun 2025 14:26:09 +0545 Subject: [PATCH 18/21] Change data schema for ifrc transformer. --- apps/etl/etl_tasks/ifrc_event.py | 2 +- apps/etl/transform/sources/ifrc_event.py | 8 +++----- libs/pystac-monty | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/apps/etl/etl_tasks/ifrc_event.py b/apps/etl/etl_tasks/ifrc_event.py index af457b8e..8b03ea12 100644 --- a/apps/etl/etl_tasks/ifrc_event.py +++ b/apps/etl/etl_tasks/ifrc_event.py @@ -28,7 +28,7 @@ def ext_and_transform_ifrcevent_latest_data(): if ext_object: start_date = ext_object.created_at.date() else: - start_date = etl_config.GLIDE_START_DATE + start_date = etl_config.IFRC_EVENT_START_DATE params = IfrcEventExtractionInputMetadata( disaster_start_date__gte=str(start_date), diff --git a/apps/etl/transform/sources/ifrc_event.py b/apps/etl/transform/sources/ifrc_event.py index 7417a018..eba9f857 100644 --- a/apps/etl/transform/sources/ifrc_event.py +++ b/apps/etl/transform/sources/ifrc_event.py @@ -21,12 +21,10 @@ def get_schema_data(cls, extraction_obj): data = json.loads(file_data.read()) data_file = write_into_temp_file(json.dumps(data["results"]).encode("utf-8")) - - data_source = IFRCEventDataSource( - data=GenericDataSource( - source_url=extraction_obj.url, data_source=File(path=data_file.name, data_type=DataType.FILE) - ) + data_source = GenericDataSource( + source_url=extraction_obj.url, input_data=File(path=data_file.name, data_type=DataType.FILE) ) + return cls.transformer_schema(data_source) @staticmethod diff --git a/libs/pystac-monty b/libs/pystac-monty index 4e87377b..6e51e2a6 160000 --- a/libs/pystac-monty +++ b/libs/pystac-monty @@ -1 +1 @@ -Subproject commit 4e87377b624234d292d32f9154b2b51d934e84b1 +Subproject commit 6e51e2a6475d7300fa726b948393842e080d93c8 From 408185615acd28af05b8a5a927ecbe5eda613ea9 Mon Sep 17 00:00:00 2001 From: Sudan Bhandari Date: Thu, 8 May 2025 11:43:20 +0545 Subject: [PATCH 19/21] Stream usgs file into transform --- apps/etl/transform/sources/usgs.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/apps/etl/transform/sources/usgs.py b/apps/etl/transform/sources/usgs.py index feea729b..9271b3ad 100644 --- a/apps/etl/transform/sources/usgs.py +++ b/apps/etl/transform/sources/usgs.py @@ -1,4 +1,5 @@ import json +import tempfile from pystac_monty.sources.usgs import USGSDataSource, USGSTransformer @@ -11,6 +12,7 @@ class USGSTransformHandler(BaseTransformerHandler[USGSTransformer, USGSDataSourc transformer_class = USGSTransformer transformer_schema = USGSDataSource + @classmethod @classmethod def get_schema_data(cls, extraction_obj): losses_data_qs = ExtractionData.objects.filter(parent=extraction_obj) @@ -23,15 +25,21 @@ def get_schema_data(cls, extraction_obj): data = json.loads(file_data.read()) losses_data.append(data) + # Write losses_data (which is JSON) to a temp file in text mode + with tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") as tmp_file: + json.dump(losses_data, tmp_file) + losses_data_path = tmp_file.name + + # Read the main extraction object data (as bytes) with extraction_obj.resp_data.open() as file_data: data = file_data.read() - losses_data = 
json.dumps(losses_data) - # FIXME: Why are we setting lossed_data to None? - if not losses_data: - losses_data = None + # Write raw bytes to a temp file + with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp_data_file: + tmp_data_file.write(data) + data_path = tmp_data_file.name - return cls.transformer_schema(source_url=extraction_obj.url, data=data, losses_data=losses_data) + return cls.transformer_schema(source_url=extraction_obj.url, data=data_path, losses_data=losses_data_path) @staticmethod @app.task(queue=CeleryQueue.TRANSFORM) From 3bb2d4ead61e452cdcb759d43af952024dcbcfd9 Mon Sep 17 00:00:00 2001 From: Sudan Bhandari Date: Wed, 4 Jun 2025 10:01:34 +0545 Subject: [PATCH 20/21] Update schema --- .dockerignore | 1 + apps/etl/transform/sources/usgs.py | 12 +++++++++--- libs/pystac-monty | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.dockerignore b/.dockerignore index 24b67423..3eea377e 100644 --- a/.dockerignore +++ b/.dockerignore @@ -50,6 +50,7 @@ coverage.xml *.py,cover .hypothesis/ .pytest_cache/ +assets/ # Translations *.mo diff --git a/apps/etl/transform/sources/usgs.py b/apps/etl/transform/sources/usgs.py index 9271b3ad..7c3bbd9a 100644 --- a/apps/etl/transform/sources/usgs.py +++ b/apps/etl/transform/sources/usgs.py @@ -1,7 +1,8 @@ import json import tempfile -from pystac_monty.sources.usgs import USGSDataSource, USGSTransformer +from pystac_monty.sources.common import DataType, File +from pystac_monty.sources.usgs import USGSDataSource, USGSDataSourceType, USGSTransformer from apps.etl.models import ExtractionData from apps.etl.transform.sources.handler import BaseTransformerHandler @@ -12,7 +13,6 @@ class USGSTransformHandler(BaseTransformerHandler[USGSTransformer, USGSDataSourc transformer_class = USGSTransformer transformer_schema = USGSDataSource - @classmethod @classmethod def get_schema_data(cls, extraction_obj): losses_data_qs = ExtractionData.objects.filter(parent=extraction_obj) @@ -39,7 +39,13 @@ def get_schema_data(cls, extraction_obj): tmp_data_file.write(data) data_path = tmp_data_file.name - return cls.transformer_schema(source_url=extraction_obj.url, data=data_path, losses_data=losses_data_path) + return cls.transformer_schema( + USGSDataSourceType( + source_url=extraction_obj.url, + event_data=File(path=data_path, data_type=DataType.FILE), + loss_data=File(path=losses_data_path, data_type=DataType.FILE), + ) + ) @staticmethod @app.task(queue=CeleryQueue.TRANSFORM) diff --git a/libs/pystac-monty b/libs/pystac-monty index 6e51e2a6..5c5777d3 160000 --- a/libs/pystac-monty +++ b/libs/pystac-monty @@ -1 +1 @@ -Subproject commit 6e51e2a6475d7300fa726b948393842e080d93c8 +Subproject commit 5c5777d33f328742d6530c22cf9813d7778e9472 From 47b3b73b0c6fc7e11c46bb48ac08854539459da3 Mon Sep 17 00:00:00 2001 From: Ranjan Shrestha Date: Wed, 4 Jun 2025 11:19:53 +0545 Subject: [PATCH 21/21] Update class schema for desinventar source --- apps/etl/transform/sources/desinventar.py | 8 +++++--- libs/pystac-monty | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/etl/transform/sources/desinventar.py b/apps/etl/transform/sources/desinventar.py index c358bd22..9bad5e1e 100644 --- a/apps/etl/transform/sources/desinventar.py +++ b/apps/etl/transform/sources/desinventar.py @@ -2,6 +2,7 @@ import tempfile from pystac_monty.geocoding import TheirGeocoder +from pystac_monty.sources.common import DataType, DesinventarDataSourceType, File from pystac_monty.sources.desinventar import ( DesinventarDataSource, DesinventarTransformer, 
@@ -29,12 +30,13 @@ def get_schema_data(cls, extraction_obj: ExtractionData, country_code: str, iso3 tmp_zip_file = tempfile.NamedTemporaryFile(suffix=".zip", delete=False) tmp_zip_file.write(file_content) - return cls.transformer_schema( - tmp_zip_file=tmp_zip_file, + data_source = DesinventarDataSourceType( + tmp_zip_file=File(path=tmp_zip_file.name, data_type=DataType.FILE), source_url=f"https://www.desinventar.net/DesInventar/download/DI_export_{country_code}.zip", - country_code=country_code, iso3=iso3, + country_code=country_code, ) + return cls.transformer_schema(data_source) @classmethod def handle_transformation(cls, extraction_id: int): # type: ignore[reportIncompatibleMethodOverride] diff --git a/libs/pystac-monty b/libs/pystac-monty index 5c5777d3..0446399c 160000 --- a/libs/pystac-monty +++ b/libs/pystac-monty @@ -1 +1 @@ -Subproject commit 5c5777d33f328742d6530c22cf9813d7778e9472 +Subproject commit 0446399c922456430fa24c3ef55d950d9af62279
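
Note on the helper used throughout this series: every handler imports write_into_temp_file from apps.etl.utils, but no patch here shows its definition. A minimal sketch of what such a helper might look like, assuming it takes raw bytes and returns the open NamedTemporaryFile whose .name the callers read as a path; the actual implementation in apps/etl/utils.py may differ:

import tempfile

def write_into_temp_file(content: bytes):
    # Persist raw bytes to a named temporary file so transformers can
    # read the payload by file path instead of holding it in memory.
    # delete=False keeps the file on disk after the handle is closed;
    # cleanup after transformation is assumed to happen elsewhere.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.write(content)
    temp_file.flush()
    return temp_file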