diff --git a/src/meteole/forecast.py b/src/meteole/forecast.py index cfb9400..8b66b89 100644 --- a/src/meteole/forecast.py +++ b/src/meteole/forecast.py @@ -2,11 +2,10 @@ import datetime as dt import logging -import os import re +import tempfile from abc import ABC, abstractmethod from functools import reduce -from pathlib import Path from typing import Any from warnings import warn @@ -371,11 +370,42 @@ def _get_coverage_description(self, coverage_id: str) -> dict: response = self._client.get(url, params=params) return xmltodict.parse(response.text) - def _transform_grib_to_df(self) -> pd.DataFrame: - "Transform grib file into pandas dataframe" + def _grib_bytes_to_df(self, grib_str: bytes) -> pd.DataFrame: + """ + Converts GRIB data (in binary format) into a pandas DataFrame. + + This method writes the binary GRIB data to a temporary file, reads it using + the `cfgrib` engine via xarray, and converts the resulting xarray Dataset + into a pandas DataFrame. + + Args: + grib_str (bytes): Binary GRIB data as a byte string. + + Returns: + pd.DataFrame: A pandas DataFrame containing the extracted GRIB data, + with columns like `time`, `latitude`, `longitude`, and any associated + variables from the GRIB file. + + Raises: + ValueError: If the input `grib_str` is not of type `bytes` or `bytearray`. + + Notes: + - The method requires the `cfgrib` engine to be installed. + - The temporary file used for parsing is automatically deleted after use. + - Ensure the input GRIB data is valid and encoded in a binary format. + """ + + with tempfile.NamedTemporaryFile() as temp_file: + # Write the GRIB binary data to the temporary file + temp_file.write(grib_str) + temp_file.flush() # Ensure the data is written to disk + + # Open the GRIB file as an xarray Dataset using the cfgrib engine + ds = xr.open_dataset(temp_file.name, engine="cfgrib") + + # Convert the Dataset to a pandas DataFrame + df = ds.to_dataframe().reset_index() - ds = xr.open_dataset(self.filepath, engine="cfgrib") - df = ds.to_dataframe().reset_index() return df def _get_data_single_forecast( @@ -401,7 +431,7 @@ def _get_data_single_forecast( pd.DataFrame: The forecast for the specified time. """ - filepath = self._get_coverage_file( + grib_binary: bytes = self._get_coverage_file( coverage_id=coverage_id, height=height, pressure=pressure, @@ -410,10 +440,9 @@ def _get_data_single_forecast( long=long, ) - df = self._transform_grib_to_df() - - self._remove_coverage_files(filepath) + df: pd.DataFrame = self._grib_bytes_to_df(grib_binary) + # Drop and rename columns df.drop(columns=["surface", "valid_time"], errors="ignore", inplace=True) df.rename( columns={ @@ -422,7 +451,6 @@ def _get_data_single_forecast( }, inplace=True, ) - known_columns = {"latitude", "longitude", "run", "forecast_horizon", "heightAboveGround", "isobaricInhPa"} indicator_column = (set(df.columns) - known_columns).pop() @@ -449,42 +477,6 @@ def _get_data_single_forecast( return df - def _remove_coverage_files(self, filepath: Path) -> None: - """ - Removes a coverage file and its associated index files (.idx). - - If the parent directory becomes empty after file removal, it deletes the parent directory. - - Args: - filepath (Path): Path to the main coverage file to be removed. - - Raises: - FileNotFoundError: If the specified file does not exist. - PermissionError: If the file or directory cannot be removed due to insufficient permissions. - """ - # Ensure filepath is a Path object - filepath = Path(filepath) - - # remove file - os.remove(str(filepath)) - # Remove the main file - if filepath.exists(): - filepath.unlink() - - # remove potential idx files - idx_files = filepath.parent.glob(f"{filepath.name}.*.idx") - for idx_file in idx_files: - os.remove(idx_file) - - # Remove the parent directory if it's empty - parent_dir = filepath.parent - try: - if not any(parent_dir.iterdir()): # Check if the directory is empty - parent_dir.rmdir() - except OSError as e: - # Handle potential errors (e.g., directory in use or permissions issue) - raise PermissionError(f"Failed to remove directory '{parent_dir}': {e}") from e - def _get_coverage_file( self, coverage_id: str, @@ -493,9 +485,7 @@ def _get_coverage_file( forecast_horizon_in_seconds: int = 0, lat: tuple = (37.5, 55.4), long: tuple = (-12, 16), - file_format: str = "grib", - filepath: Path | None = None, - ) -> Path: + ) -> bytes: """ Retrieves raster data for a specified model prediction and saves it to a file. @@ -528,41 +518,23 @@ def _get_coverage_file( See Also: raster.plot_tiff_file: Method for plotting raster data stored in TIFF format. """ - self.filepath = filepath - - file_extension = "tiff" if file_format == "tiff" else "grib" - - filename = ( - f"{height or '_'}m_{forecast_horizon_in_seconds}Z_{lat[0]}-{lat[1]}_{long[0]}-{long[1]}.{file_extension}" - ) - - if self.filepath is None: - current_working_directory = Path(os.getcwd()) - self.filepath = current_working_directory / coverage_id / filename - self.folderpath = current_working_directory / coverage_id - logger.debug(f"{self.filepath}") - logger.debug("File not found in Cache, fetching data") - url = f"{self._model_base_path}/{self._entry_point}/GetCoverage" - params = { - "service": "WCS", - "version": "2.0.1", - "coverageid": coverage_id, - "format": "application/wmo-grib", - "subset": [ - *([f"pressure({pressure})"] if pressure is not None else []), - *([f"height({height})"] if height is not None else []), - f"time({forecast_horizon_in_seconds})", - f"lat({lat[0]},{lat[1]})", - f"long({long[0]},{long[1]})", - ], - } - response = self._client.get(url, params=params) - - self.filepath.parent.mkdir(parents=True, exist_ok=True) - with open(self.filepath, "wb") as f: - f.write(response.content) + url = f"{self._model_base_path}/{self._entry_point}/GetCoverage" + params = { + "service": "WCS", + "version": "2.0.1", + "coverageid": coverage_id, + "format": "application/wmo-grib", + "subset": [ + *([f"pressure({pressure})"] if pressure is not None else []), + *([f"height({height})"] if height is not None else []), + f"time({forecast_horizon_in_seconds})", + f"lat({lat[0]},{lat[1]})", + f"long({long[0]},{long[1]})", + ], + } + response = self._client.get(url, params=params) - return self.filepath + return response.content @staticmethod def _get_available_feature(grid_axis, feature_name): diff --git a/tests/test_forecasts.py b/tests/test_forecasts.py index 5234440..fbe8f43 100644 --- a/tests/test_forecasts.py +++ b/tests/test_forecasts.py @@ -156,43 +156,10 @@ def test_get_coverage_description(self, mock_get_request, mock_get_capabilities) self.assertIn("wcs:CoverageDescriptions", description) @patch("meteole._arome.AromeForecast.get_capabilities") - @patch("meteole.clients.MeteoFranceClient.get") - def test_get_coverage_file(self, mock_get_request, mock_get_capabilities): - mock_response = MagicMock() - mock_response.content = b"fake_data" - mock_get_request.return_value = mock_response - mock_get_capabilities.return_value = None - - forecast = AromeForecast( - self.client, - precision=self.precision, - territory=self.territory, - ) - - coverage_id = "coverage_1" - path = forecast._get_coverage_file( - coverage_id=coverage_id, - height=2, - forecast_horizon_in_seconds=0, - lat=(37.5, 55.4), - long=(-12, 16), - ) - - expected_path = Path(os.getcwd()) / coverage_id / "2m_0Z_37.5-55.4_-12-16.grib" - self.assertTrue(expected_path.exists()) - self.assertTrue(expected_path == path) - - # remove the folder created in _get_coverage_file - forecast._remove_coverage_files(path) - - @patch("meteole._arome.AromeForecast.get_capabilities") - @patch("meteole._arome.AromeForecast._transform_grib_to_df") + @patch("meteole._arome.AromeForecast._grib_bytes_to_df") @patch("meteole._arome.AromeForecast._get_coverage_file") - @patch("meteole._arome.AromeForecast._remove_coverage_files") - def test_get_data_single_forecast( - self, mock_remove_coverage_files, mock_get_coverage_file, mock_transform_grib_to_df, mock_get_capabilities - ): - mock_transform_grib_to_df.return_value = pd.DataFrame({"data": [1, 2, 3]}) + def test_get_data_single_forecast(self, mock_get_coverage_file, mock_grib_bytes_to_df, mock_get_capabilities): + mock_grib_bytes_to_df.return_value = pd.DataFrame({"data": [1, 2, 3]}) forecast = AromeForecast( self.client, @@ -212,13 +179,13 @@ def test_get_data_single_forecast( self.assertTrue("data" in df.columns) @patch("meteole._arome.AromeForecast.get_capabilities") - @patch("meteole._arome.AromeForecast._transform_grib_to_df") + @patch("meteole._arome.AromeForecast._grib_bytes_to_df") @patch("meteole._arome.AromeForecast._get_coverage_file") - @patch("meteole._arome.AromeForecast._remove_coverage_files") def test_get_data_single_forecast_with_height( - self, mock_remove_coverage_files, mock_get_coverage_file, mock_transform_grib_to_df, mock_get_capabilities + self, mock_get_coverage_file, mock_grib_bytes_to_df, mock_get_capabilities ): - mock_transform_grib_to_df.return_value = pd.DataFrame({"data": [1, 2, 3], "heightAboveGround": ["2", "2", "2"]}) + mock_get_coverage_file.return_value = "" + mock_grib_bytes_to_df.return_value = pd.DataFrame({"data": [1, 2, 3], "heightAboveGround": ["2", "2", "2"]}) forecast = AromeForecast( self.client,