|
1 |
| -from pystac_monty.sources.idu import IDUDataSource, IDUTransformer |
| 1 | +from pystac_monty.sources.common import DataType, File |
| 2 | +from pystac_monty.sources.idu import IDUDataSourceV2, IDUTransformer |
2 | 3 |
|
3 | 4 | from apps.etl.transform.sources.handler import BaseTransformerHandler
|
| 5 | +from apps.etl.utils import write_into_temp_file |
4 | 6 | from main.celery import CeleryQueue, app
|
5 | 7 |
|
6 | 8 |
|
7 |
| -class IDUTransformHandler(BaseTransformerHandler[IDUTransformer, IDUDataSource]): |
| 9 | +class IDUTransformHandler(BaseTransformerHandler[IDUTransformer, IDUDataSourceV2]): |
8 | 10 | transformer_class = IDUTransformer
|
9 |
| - transformer_schema = IDUDataSource |
| 11 | + transformer_schema = IDUDataSourceV2 |
10 | 12 |
|
11 | 13 | @classmethod
|
12 | 14 | def get_schema_data(cls, extraction_obj):
|
13 |
| - with extraction_obj.resp_data.open() as file_data: |
14 |
| - data = file_data.read() |
| 15 | + with extraction_obj.resp_data.open("rb") as file_data: |
| 16 | + file_content = file_data.read() |
15 | 17 |
|
16 |
| - return cls.transformer_schema(source_url=extraction_obj.url, data=data) |
| 18 | + data_file = write_into_temp_file(content=file_content) |
| 19 | + |
| 20 | + data_source = {"source_url": extraction_obj.url, "source_data": File(path=data_file.name, data_type=DataType.FILE)} |
| 21 | + |
| 22 | + return cls.transformer_schema(data_source) |
17 | 23 |
|
18 | 24 | @staticmethod
|
19 | 25 | @app.task(queue=CeleryQueue.DEFAULT)
|
|
0 commit comments