Skip to content

Add importer for DWD radar data products and reprojection onto a regular grid as part of #464 #483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 385 additions & 0 deletions pysteps/io/importers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@
import_odim_hdf5
import_opera_hdf5
import_saf_crri
import_dwd_hdf5
import_dwd_radolan
"""

import gzip
import os
import array
import datetime
from functools import partial

import numpy as np
Expand Down Expand Up @@ -1617,3 +1621,384 @@ def _import_saf_crri_geodata(filename):
ds_rainfall.close()

return geodata


@postprocess_import()
def import_dwd_hdf5(filename, qty="RATE", **kwargs):
    """
    Import a DWD precipitation product field (and optionally the quality
    field) from a HDF5 file conforming to the ODIM specification.

    Parameters
    ----------
    filename: str
        Name of the file to import.
    qty: {'RATE', 'ACRR', 'DBZH'}
        The quantity to read from the file. The currently supported identifiers
        are: 'RATE'=instantaneous rain rate (mm/h), 'ACRR'=hourly rainfall
        accumulation (mm) and 'DBZH'=max-reflectivity (dBZ). The default value
        is 'RATE'.

    {extra_kwargs_doc}

    Returns
    -------
    out: tuple
        A three-element tuple containing the DWD precipitation product with the
        requested quantity and the associated quality field and metadata. The
        quality field is read from the file if it contains a dataset whose
        quantity identifier is 'QIND'.

    Raises
    ------
    IOError
        If the requested quantity is not found in the file.
    """
    if not H5PY_IMPORTED:
        raise MissingOptionalDependency(
            "h5py package is required to import "
            "radar reflectivity composites using ODIM HDF5 specification "
            "but it is not installed"
        )

    if not PYPROJ_IMPORTED:
        raise MissingOptionalDependency(
            "pyproj package is required to import "
            "DWD's radar reflectivity composite "
            "but it is not installed"
        )

    if qty not in ["ACRR", "DBZH", "RATE"]:
        # interpolate the offending quantity into the message (was missing)
        raise ValueError(
            "unknown quantity %s: the available options are 'ACRR', 'DBZH' and 'RATE'"
            % qty
        )

    f = h5py.File(filename, "r")

    precip = None
    quality = None

    # read the whole HDF5 tree into a nested dictionary, then close the file
    file_content = {}
    _read_hdf5_cont(f, file_content)
    f.close()

    # locate the 'what' group holding gain/offset/nodata/undetect/quantity
    data_prop = {}
    _get_whatgrp(file_content, data_prop)

    arr = file_content["dataset1"]["data1"]["data"]
    mask_n = arr == data_prop["nodata"]
    mask_u = arr == data_prop["undetect"]
    mask = np.logical_and(~mask_u, ~mask_n)

    if data_prop["quantity"] == qty:
        # decode the raw counts to physical values via gain/offset
        precip = np.empty(arr.shape)
        precip[mask] = arr[mask] * data_prop["gain"] + data_prop["offset"]
        if qty != "DBZH":
            precip[mask_u] = data_prop["offset"]
        else:
            # no-echo reflectivity value
            precip[mask_u] = -32.5
        precip[mask_n] = np.nan
    elif data_prop["quantity"] == "QIND":
        quality = np.empty(arr.shape, dtype=float)
        quality[mask] = arr[mask]
        quality[~mask] = np.nan

    if precip is None:
        raise IOError("requested quantity %s not found" % qty)

    pr = pyproj.Proj(file_content["where"]["projdef"])

    ll_x, ll_y = pr(file_content["where"]["LL_lon"], file_content["where"]["LL_lat"])
    ur_x, ur_y = pr(file_content["where"]["UR_lon"], file_content["where"]["UR_lat"])

    # when all four corners are present, take the extreme coordinates so the
    # bounding box encloses the (possibly rotated) projected grid
    if len([k for k in file_content["where"].keys() if "_lat" in k]) == 4:
        lr_x, lr_y = pr(
            file_content["where"]["LR_lon"], file_content["where"]["LR_lat"]
        )
        ul_x, ul_y = pr(
            file_content["where"]["UL_lon"], file_content["where"]["UL_lat"]
        )
        x1 = min(ll_x, ul_x)
        y1 = min(ll_y, lr_y)
        x2 = max(lr_x, ur_x)
        y2 = max(ul_y, ur_y)
    else:
        x1 = ll_x
        y1 = ll_y
        x2 = ur_x
        y2 = ur_y

    # pixel sizes may be stored on the dataset or at the file level
    if (
        "where" in file_content["dataset1"].keys()
        and "xscale" in file_content["dataset1"]["where"].keys()
    ):
        xpixelsize = file_content["dataset1"]["where"]["xscale"]
        ypixelsize = file_content["dataset1"]["where"]["yscale"]
    elif "xscale" in file_content["where"].keys():
        xpixelsize = file_content["where"]["xscale"]
        ypixelsize = file_content["where"]["yscale"]
    else:
        xpixelsize = None
        ypixelsize = None

    if qty == "ACRR":
        unit = "mm"
        transform = None
    elif qty == "DBZH":
        unit = "dBZ"
        transform = "dB"
    else:
        unit = "mm/h"
        transform = None

    # accumulation time in minutes from the dataset start/end timestamps
    startdate = datetime.datetime.strptime(
        file_content["dataset1"]["what"]["startdate"]
        + file_content["dataset1"]["what"]["starttime"],
        "%Y%m%d%H%M%S",
    )
    enddate = datetime.datetime.strptime(
        file_content["dataset1"]["what"]["enddate"]
        + file_content["dataset1"]["what"]["endtime"],
        "%Y%m%d%H%M%S",
    )
    accutime = (enddate - startdate).total_seconds() / 60.0

    metadata = {
        "projection": file_content["where"]["projdef"],
        "ll_lon": file_content["where"]["LL_lon"],
        "ll_lat": file_content["where"]["LL_lat"],
        "ur_lon": file_content["where"]["UR_lon"],
        "ur_lat": file_content["where"]["UR_lat"],
        "x1": x1,
        "y1": y1,
        "x2": x2,
        "y2": y2,
        "xpixelsize": xpixelsize,
        "ypixelsize": ypixelsize,
        "cartesian_unit": "m",
        "yorigin": "upper",
        "institution": file_content["what"]["source"],
        "accutime": accutime,
        "unit": unit,
        "transform": transform,
        "zerovalue": np.nanmin(precip),
        "threshold": _get_threshold_value(precip),
    }

    metadata.update(kwargs)

    # NOTE: the file was already closed right after reading its content;
    # the original closed it a second time here, which was redundant
    return precip, quality, metadata


def _read_hdf5_cont(f, d):
    """
    Recursively read the content of an HDF5 group into a nested dictionary.

    Parameters
    ----------
    f: h5py.File or h5py.Group
        Open HDF5 file or group to traverse.
    d: dict
        Dictionary filled in place: subgroups with children become nested
        dicts, empty (leaf) groups are mapped to their attributes (with byte
        strings decoded to str), and datasets are converted to numpy arrays.
    """

    # simple type test for HDF5 groups
    group_type = h5py._hl.group.Group

    for key, value in f.items():

        if isinstance(value, group_type):

            d[key] = {}

            if len(list(value.items())) > 0:

                # non-empty group: descend into its children
                _read_hdf5_cont(value, d[key])

            else:

                # leaf group: store its attributes, decoding byte strings
                d[key] = {
                    attr: v.decode() if isinstance(v, np.bytes_) else v
                    for attr, v in value.attrs.items()
                }

        else:

            try:

                d[key] = np.array(value)

            except TypeError:

                # dtype not directly convertible: cast to float first
                d[key] = np.array(value.astype(float))

            except Exception:

                # last resort: legacy h5py Dataset.value accessor
                d[key] = value.value

    return


def _get_whatgrp(d, g):
"""
Recursively get the what group containing the data field properties.
"""
if "what" in d.keys():
if "gain" in d["what"].keys():
g.update(d["what"])
else:
k = [k for k in d.keys() if "data" in k][0]
_get_whatgrp(d[k], g)
else:
raise DataModelError(
"Non ODIM compliant file: "
"no what group found from {} "
"or its subgroups".format(d.keys()[0])
)
return


@postprocess_import()
def import_dwd_radolan(filename, product):
    """
    Import a RADOLAN precipitation product from a binary file.

    Parameters
    ----------
    filename: str
        Name of the file to import.
    product: {'WX','RX','EX','RY','RW','AY','RS','YW','WN'}
        The specific product to read from the file. Please see
        https://www.dwd.de/DE/leistungen/radolan/radolan_info/
        radolan_radvor_op_komposit_format_pdf.pdf
        for a detailed description.

    {extra_kwargs_doc}

    Returns
    -------
    out: tuple
        A three-element tuple containing the precipitation field imported
        from a DWD RADOLAN binary file and the associated quality field and
        metadata. The quality field is currently set to None.

    Raises
    ------
    IOError
        If the product identifier in the file header does not match the
        requested product.
    """
    # The binary payload is a multiple of 1000 bytes; everything before it is
    # the ASCII header. NOTE(review): this assumes the header is shorter than
    # 500 bytes — confirm against the RADOLAN format specification.
    size_file = os.path.getsize(filename)
    size_data = np.round(size_file, -3)
    size_header = size_file - size_data

    # use a context manager so the file is closed even if parsing fails
    with open(filename, "rb") as f:
        header = f.read(size_header).decode("utf-8")

        prod = header[:2]

        # raise instead of assert: asserts are stripped under "python -O"
        if prod != product:
            raise IOError(
                "requested product %s not found in file (found %s)"
                % (product, prod)
            )

        # category 1 products store 1-byte unsigned samples, the rest 2-byte
        prod_cat1 = np.array(["WX", "RX"])
        prod_cat2 = np.array(["RY", "RW", "YW"])

        nbyte = 1 if prod in prod_cat1 else 2
        signed = "B" if prod in prod_cat1 else "H"

        # decoding factor (10^-fac) and grid dimensions from the header
        fac = int(header.split("E-")[1].split("INT")[0])
        dimsplit = header.split("x")
        dims = np.array((dimsplit[0][-4:], dimsplit[1][:4]), dtype=int)[::-1]

        data = array.array(signed)
        data.fromfile(f, size_data // nbyte)

    data = np.array(np.reshape(data, dims, order="F"), dtype=float).T

    # offset added after decoding; encodes the "no echo" value per product
    if prod == "SF":
        no_echo_value = 0.0
    elif prod in prod_cat2:
        no_echo_value = -0.01
    else:
        no_echo_value = -32.5

    if prod in prod_cat1:
        # 1-byte products: values >= 249 are flags, resolution is 0.5 dBZ
        data[data >= 249] = np.nan
        data = data / 2.0 + no_echo_value
    elif prod in prod_cat2:
        data, no_data_mask = _identify_info_bits(data)
        if prod == "AY":
            data = (10 ** (fac * -1)) * data / 2.0 + no_echo_value
        else:
            data = (10 ** (fac * -1)) * data + no_echo_value
    else:
        data, no_data_mask = _identify_info_bits(data)
        data = (10 ** (fac * -1)) * data / 2.0 + no_echo_value

    data[no_data_mask] = np.nan

    geodata = _import_dwd_geodata(product, dims)
    metadata = geodata

    return data, None, metadata


def _identify_info_bits(data):

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add some docstrings here?

# clutter
clutter_mask = data - 2**15 >= 0.0
data[clutter_mask] = 0

# negative values
mask = data - 2**14 >= 0.0
data[mask] -= 2**14

# no data
no_data_mask = data - 2**13 == 2500.0

if np.sum(no_data_mask) == 0.0:

no_data_mask = data - 2**13 == 0.0

data[no_data_mask] = 0.0

# secondary data
data[data - 2**12 > 0.0] -= 2**12

data[mask] *= -1

return data, no_data_mask


def _import_dwd_geodata(product, dims):
    """
    Geodata for RADOLAN products. Hard-coded here, because there is only
    basic information about the projection in the header of the binary
    RADOLAN files.

    Parameters
    ----------
    product: {'RX', 'RY', 'RW', 'WN', 'WX', 'YW'}
        RADOLAN product identifier; selects the grid reference point.
    dims: array-like
        Grid dimensions used to derive the bounding box.

    Returns
    -------
    geodata: dict
        Dictionary with the projection definition, pixel sizes, axis
        orientation and bounding box coordinates (x1, y1, x2, y2).

    Raises
    ------
    ValueError
        If no reference coordinates are defined for the given product.
    """

    geodata = {}

    # polar stereographic projection used by the RADOLAN composites
    projdef = "+a=6378137.0 +b=6356752.0 +proj=stere +lat_ts=60.0 +lat_0=90.0 +lon_0=10.0 +x_0=0 +y_0=0"
    geodata["projection"] = projdef

    geodata["xpixelsize"] = 1000.0
    geodata["ypixelsize"] = 1000.0
    geodata["cartesian_unit"] = "m"
    geodata["yorigin"] = "upper"

    prod_cat1 = ["RX", "RY", "RW"]  # 900x900
    prod_cat2 = ["WN"]  # 1200x1100
    prod_cat3 = ["WX", "YW"]  # 1100x900

    # reference coordinates hard-coded per product family; an elif chain
    # (plus explicit error) avoids a NameError for unsupported products
    if product in prod_cat1:  # lower left
        lon = 3.604382995
        lat = 46.95361536
    elif product in prod_cat2:  # lower left
        lon = 3.566994635
        lat = 45.69642538
    elif product in prod_cat3:  # center
        lon = 9.0
        lat = 51.0
    else:
        raise ValueError(
            "no geodata defined for RADOLAN product %s" % product
        )

    pr = pyproj.Proj(projdef)
    x1, y1 = pr(lon, lat)
    if product in prod_cat3:
        # reference point is the grid center: shift to the lower-left corner.
        # NOTE(review): the 80 km northward shift looks product-specific —
        # confirm against the RADOLAN grid specification.
        x1 -= dims[0] * 1000 // 2
        y1 -= dims[1] * 1000 // 2 - 80000

    x2 = x1 + dims[0] * 1000
    y2 = y1 + dims[1] * 1000

    geodata["x1"] = x1
    geodata["y1"] = y1
    geodata["x2"] = x2
    geodata["y2"] = y2

    return geodata
Loading