22
33import datetime as dt
44import logging
5- import os
65import re
6+ import tempfile
77from abc import ABC , abstractmethod
88from functools import reduce
9- from pathlib import Path
109from typing import Any
1110from warnings import warn
1211
@@ -417,16 +416,42 @@ def _get_coverage_description(self, coverage_id: str) -> dict[Any, Any]:
417416 response = self ._client .get (url , params = params )
418417 return xmltodict .parse (response .text )
419418
420- def _transform_grib_to_df (self ) -> pd .DataFrame :
419+ def _grib_bytes_to_df (self , grib_str : bytes ) -> pd .DataFrame :
421420 """(Protected)
422- Transform grib file into pandas dataframe.
421+ Converts GRIB data (in binary format) into a pandas DataFrame.
422+
423+ This method writes the binary GRIB data to a temporary file, reads it using
424+ the `cfgrib` engine via xarray, and converts the resulting xarray Dataset
425+ into a pandas DataFrame.
426+
427+ Args:
428+ grib_str (bytes): Binary GRIB data as a byte string.
423429
424430 Returns:
425- DataFrame of the read data.
431+ pd.DataFrame: A pandas DataFrame containing the extracted GRIB data,
432+ with columns like `time`, `latitude`, `longitude`, and any associated
433+ variables from the GRIB file.
434+
435+ Raises:
436+ ValueError: If the input `grib_str` is not of type `bytes` or `bytearray`.
437+
438+ Notes:
439+ - The method requires the `cfgrib` engine to be installed.
440+ - The temporary file used for parsing is automatically deleted after use.
441+ - Ensure the input GRIB data is valid and encoded in a binary format.
426442 """
427443
428- ds = xr .open_dataset (self .filepath , engine = "cfgrib" )
429- df = ds .to_dataframe ().reset_index ()
444+ with tempfile .NamedTemporaryFile () as temp_file :
445+ # Write the GRIB binary data to the temporary file
446+ temp_file .write (grib_str )
447+ temp_file .flush () # Ensure the data is written to disk
448+
449+ # Open the GRIB file as an xarray Dataset using the cfgrib engine
450+ ds = xr .open_dataset (temp_file .name , engine = "cfgrib" )
451+
452+ # Convert the Dataset to a pandas DataFrame
453+ df = ds .to_dataframe ().reset_index ()
454+
430455 return df
431456
432457 def _get_data_single_forecast (
@@ -453,7 +478,7 @@ def _get_data_single_forecast(
453478 pd.DataFrame: The forecast for the specified time.
454479 """
455480
456- filepath = self ._get_coverage_file (
481+ grib_binary : bytes = self ._get_coverage_file (
457482 coverage_id = coverage_id ,
458483 height = height ,
459484 pressure = pressure ,
@@ -462,10 +487,9 @@ def _get_data_single_forecast(
462487 long = long ,
463488 )
464489
465- df = self ._transform_grib_to_df ()
466-
467- self ._remove_coverage_files (filepath )
490+ df : pd .DataFrame = self ._grib_bytes_to_df (grib_binary )
468491
492+ # Drop and rename columns
469493 df .drop (columns = ["surface" , "valid_time" ], errors = "ignore" , inplace = True )
470494 df .rename (
471495 columns = {
@@ -474,7 +498,6 @@ def _get_data_single_forecast(
474498 },
475499 inplace = True ,
476500 )
477-
478501 known_columns = {"latitude" , "longitude" , "run" , "forecast_horizon" , "heightAboveGround" , "isobaricInhPa" }
479502 indicator_column = (set (df .columns ) - known_columns ).pop ()
480503
@@ -501,42 +524,6 @@ def _get_data_single_forecast(
501524
502525 return df
503526
504- def _remove_coverage_files (self , filepath : Path ) -> None :
505- """
506- Removes a coverage file and its associated index files (.idx).
507-
508- If the parent directory becomes empty after file removal, it deletes the parent directory.
509-
510- Args:
511- filepath (Path): Path to the main coverage file to be removed.
512-
513- Raises:
514- FileNotFoundError: If the specified file does not exist.
515- PermissionError: If the file or directory cannot be removed due to insufficient permissions.
516- """
517- # Ensure filepath is a Path object
518- filepath = Path (filepath )
519-
520- # remove file
521- os .remove (str (filepath ))
522- # Remove the main file
523- if filepath .exists ():
524- filepath .unlink ()
525-
526- # remove potential idx files
527- idx_files = filepath .parent .glob (f"{ filepath .name } .*.idx" )
528- for idx_file in idx_files :
529- os .remove (idx_file )
530-
531- # Remove the parent directory if it's empty
532- parent_dir = filepath .parent
533- try :
534- if not any (parent_dir .iterdir ()): # Check if the directory is empty
535- parent_dir .rmdir ()
536- except OSError as e :
537- # Handle potential errors (e.g., directory in use or permissions issue)
538- raise PermissionError (f"Failed to remove directory '{ parent_dir } ': { e } " ) from e
539-
540527 def _get_coverage_file (
541528 self ,
542529 coverage_id : str ,
@@ -545,11 +532,9 @@ def _get_coverage_file(
545532 forecast_horizon_in_seconds : int = 0 ,
546533 lat : tuple = (37.5 , 55.4 ),
547534 long : tuple = (- 12 , 16 ),
548- file_format : str = "grib" ,
549- filepath : Path | None = None ,
550- ) -> Path :
535+ ) -> bytes :
551536 """(Protected)
552- Retrieve raster data for a specified model prediction and saves it to a file.
537+ Retrieves raster data for a specified model prediction and saves it to a file.
553538
554539 If no `filepath` is provided, the file is saved to a default cache directory under
555540 the current working directory.
@@ -580,41 +565,23 @@ def _get_coverage_file(
580565 See Also:
581566 raster.plot_tiff_file: Method for plotting raster data stored in TIFF format.
582567 """
583- self .filepath = filepath
584-
585- file_extension = "tiff" if file_format == "tiff" else "grib"
586-
587- filename = (
588- f"{ height or '_' } m_{ forecast_horizon_in_seconds } Z_{ lat [0 ]} -{ lat [1 ]} _{ long [0 ]} -{ long [1 ]} .{ file_extension } "
589- )
590-
591- if self .filepath is None :
592- current_working_directory = Path (os .getcwd ())
593- self .filepath = current_working_directory / coverage_id / filename
594- self .folderpath = current_working_directory / coverage_id
595- logger .debug (f"{ self .filepath } " )
596- logger .debug ("File not found in Cache, fetching data" )
597- url = f"{ self ._model_base_path } /{ self ._entry_point } /GetCoverage"
598- params = {
599- "service" : "WCS" ,
600- "version" : "2.0.1" ,
601- "coverageid" : coverage_id ,
602- "format" : "application/wmo-grib" ,
603- "subset" : [
604- * ([f"pressure({ pressure } )" ] if pressure is not None else []),
605- * ([f"height({ height } )" ] if height is not None else []),
606- f"time({ forecast_horizon_in_seconds } )" ,
607- f"lat({ lat [0 ]} ,{ lat [1 ]} )" ,
608- f"long({ long [0 ]} ,{ long [1 ]} )" ,
609- ],
610- }
611- response = self ._client .get (url , params = params )
612-
613- self .filepath .parent .mkdir (parents = True , exist_ok = True )
614- with open (self .filepath , "wb" ) as f :
615- f .write (response .content )
568+ url = f"{ self ._model_base_path } /{ self ._entry_point } /GetCoverage"
569+ params = {
570+ "service" : "WCS" ,
571+ "version" : "2.0.1" ,
572+ "coverageid" : coverage_id ,
573+ "format" : "application/wmo-grib" ,
574+ "subset" : [
575+ * ([f"pressure({ pressure } )" ] if pressure is not None else []),
576+ * ([f"height({ height } )" ] if height is not None else []),
577+ f"time({ forecast_horizon_in_seconds } )" ,
578+ f"lat({ lat [0 ]} ,{ lat [1 ]} )" ,
579+ f"long({ long [0 ]} ,{ long [1 ]} )" ,
580+ ],
581+ }
582+ response = self ._client .get (url , params = params )
616583
617- return self . filepath
584+ return response . content
618585
619586 @staticmethod
620587 def _get_available_feature (grid_axis : list [dict [str , Any ]], feature_name : str ) -> list [int ]:
0 commit comments