|
| 1 | +import json |
| 2 | +import pytest |
| 3 | +from pathlib import Path |
| 4 | +from unittest.mock import patch, MagicMock |
| 5 | +from django.conf import settings |
| 6 | +from django.test import override_settings |
| 7 | +from django.core.serializers import serialize |
| 8 | + |
| 9 | +from apps.etl.etl_tasks.emdat import ext_and_transform_emdat_latest_data |
| 10 | +from apps.etl.models import ExtractionData, Transform, PyStacLoadData |
| 11 | +from pystac_monty.sources.common import MontyDataTransformer |
| 12 | + |
# Override the transformer's base collection URL for this test module with a
# local filesystem path.
# NOTE(review): presumably this makes the transformer resolve its collection
# definitions from the vendored example files instead of a remote URL — confirm.
# This is a module-level side effect: it applies as soon as this file is
# imported and is never restored.
MontyDataTransformer.base_collection_url = "/code/libs/pystac-monty/monty-stac-extension/examples"
| 14 | + |
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@pytest.mark.django_db
def test_handle_extraction_with_mocked_request():
    """
    Test the EM-DAT extraction and transformation process with a mocked request.

    ``requests.get`` is patched to return the JSON fixture stored on disk,
    ``ext_and_transform_emdat_latest_data`` is run, and the number of rows
    stored in ``ExtractionData``, ``Transform`` and ``PyStacLoadData`` is
    asserted. The ten most recent ``PyStacLoadData`` rows are also dumped to
    a JSON file for manual inspection.

    ``CELERY_TASK_ALWAYS_EAGER`` is enabled through the ``override_settings``
    decorator only, so the value is restored when the test ends instead of
    leaking into other tests through a direct mutation of ``settings``.
    """
    json_file_path = Path('/code/apps/etl/Dataset/EM-DAT/EM-DAT.json')

    # Read the mock payload; the encoding is explicit so the fixture is
    # decoded the same way regardless of the platform's default locale.
    with open(json_file_path, 'r', encoding='utf-8') as f:
        mock_data = json.load(f)

    # Patch 'requests.get' used inside 'ext_and_transform_emdat_latest_data'
    with patch('requests.get') as mock_get:
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = mock_data  # Mock .json() response
        # Also mock the raw body (valid JSON as bytes) for code reading .content
        mock_response.content = json.dumps(mock_data).encode('utf-8')
        mock_response.headers = {"Content-Type": "application/json"}
        mock_get.return_value = mock_response

        # Call the function (without parameters) - it will use the patched requests.get
        ext_and_transform_emdat_latest_data()

    # Assertions: check that the data was extracted, transformed and stored
    assert ExtractionData.objects.count() == 25
    assert Transform.objects.count() == 25
    assert PyStacLoadData.objects.count() == 400  # Ensure expected number of records

    # Fetch the last processed data (latest 10 records) and round-trip it
    # through Django's JSON serializer to obtain plain Python structures.
    latest_data = PyStacLoadData.objects.all().order_by('-id')[:10]
    latest_data_json = serialize('json', latest_data)
    latest_data_dict = json.loads(latest_data_json)

    # Save the latest processed data to a JSON file
    output_path = Path('/code/output/output_emdat.json')
    output_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists

    with open(output_path, 'w', encoding='utf-8') as json_file:
        json.dump(latest_data_dict, json_file, ensure_ascii=False, indent=4)

    # Assert JSON file was created
    assert output_path.exists(), f"Expected output JSON file {output_path} was not created."
0 commit comments