
Commit 4f6d353

FIX: various fixes and uniformization for s3 reading/writing
1 parent 76fe7e7 commit 4f6d353
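
The commit renames the S3 options so that the read and write sides use parallel key names. As a hedged orientation aid, and not pyrad's actual config syntax, the uniformized keys introduced in the diffs below could be collected like this (every value is a placeholder):

    # Key names are taken from the diffs below; all values are illustrative only.
    cfg = {
        # reading raw volumes from S3
        "s3EndpointRead": "https://s3.example.com",   # endpoint URL (placeholder)
        "s3BucketRead": "radar-input",                # bucket holding the input data (placeholder)
        "s3PathRead": "rad4alp/",                     # key prefix inside the bucket (placeholder)
        # read credentials come from the S3_KEY_READ / S3_SECRET_READ environment variables
        # writing generated products to S3
        "s3EndpointWrite": "https://s3.example.com",  # placeholder
        "s3BucketWrite": "radar-products",            # placeholder
        "s3PathWrite": "pyrad/",                      # optional key prefix (placeholder)
        "s3AccessPolicy": "public-read",              # optional access policy (placeholder)
    }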

File tree

4 files changed: 211 additions & 169 deletions


src/pyrad_proc/pyrad/flow/flow_aux.py

Lines changed: 52 additions & 30 deletions
@@ -1000,16 +1000,24 @@ def _generate_prod(dataset, cfg, prdname, prdfunc, dsname, voltime, runinfo=None
         filenames = prdfunc(dataset, prdcfg)
         if isinstance(filenames, str):  # convert to list if needed
             filenames = [filenames]
-        if "s3copypath" in prdcfg and filenames is not None:  # copy to S3
-            s3AccessPolicy = (
-                prdcfg["s3AccessPolicy"] if "s3AccessPolicy" in prdcfg else None
-            )
+        if (
+            "s3BucketWrite" in prdcfg
+            and "s3EndpointWrite" in prdcfg
+            and filenames is not None
+        ):  # copy to S3
+            s3AccessPolicy = prdcfg.get("s3AccessPolicy", None)
+            s3path = prdcfg.get("s3PathWrite", None)
             for fname in filenames:
                 if (
                     prdcfg["basepath"] in fname
                 ):  # only products saved to standard basepath
                     write_to_s3(
-                        fname, prdcfg["basepath"], prdcfg["s3copypath"], s3AccessPolicy
+                        fname,
+                        prdcfg["basepath"],
+                        prdcfg["s3EndpointWrite"],
+                        prdcfg["s3BucketWrite"],
+                        s3path,
+                        s3AccessPolicy,
                     )
         return False
     except Exception as inst:
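
write_to_s3 now receives the endpoint, bucket, optional path prefix and access policy as separate arguments; the helper itself is not shown in this excerpt. A minimal boto3 sketch of what a function with that call signature could do (the S3_KEY_WRITE / S3_SECRET_WRITE variable names, the key layout and the use of a canned ACL are assumptions, not taken from this commit):

    import os
    import boto3


    def write_to_s3_sketch(fname, basepath, endpoint, bucket, s3path=None, access_policy=None):
        """Illustrative sketch: upload a product file, mirroring its path below basepath."""
        s3_client = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=os.environ["S3_KEY_WRITE"],         # assumed variable name
            aws_secret_access_key=os.environ["S3_SECRET_WRITE"],  # assumed variable name
            verify=False,
        )
        # object key = optional prefix + product path relative to the standard basepath
        key = os.path.relpath(fname, basepath).replace(os.sep, "/")
        if s3path is not None:
            key = f"{s3path}{key}"
        extra_args = {"ACL": access_policy} if access_policy is not None else None
        s3_client.upload_file(fname, bucket, key, ExtraArgs=extra_args)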
@@ -1221,6 +1229,9 @@ def _create_cfg_dict(cfgfile):
     if "MFScale" not in cfg:
         cfg.update({"MFScale": 0})
 
+    if not cfg["datapath"]:  # empty datapath in case of s3 reading
+        cfg["datapath"] = ["" for rad in range(cfg["NumRadars"])]
+
     # parameters necessary to read correctly MF grid binary files
     if "BinFileParams" not in cfg:
         bin_file_params = {
@@ -1333,35 +1344,35 @@ def _create_datacfg_dict(cfg):
         datacfg.update({"DataTypeIDInFiles": cfg["DataTypeIDInFiles"]})
 
     # s3 buckets
-    if "bucket" in cfg:
+    if "s3BucketRead" in cfg:
         try:
-            datacfg["s3_key"] = os.environ["S3_IN_KEY"]
-            datacfg["s3_secret_key"] = os.environ["S3_IN_SECRET"]
+            datacfg["s3KeyRead"] = os.environ["S3_KEY_READ"]
+            datacfg["s3SecretRead"] = os.environ["S3_SECRET_READ"]
         except KeyError:
             warn(
-                "Define environment variables S3_IN_KEY and S3_IN_SECRET"
+                "Define environment variables S3_KEY_READ and S3_SECRET_READ"
                 " to get input data from S3 buckets."
             )
 
-        if "s3path" in cfg:
-            datacfg.update({"s3path": cfg["s3path"]})
+        if "s3PathRead" in cfg:
+            datacfg.update({"s3PathRead": cfg["s3PathRead"]})
         else:
-            warn("Unable to read data from s3 bucket. Define s3path")
-        if "s3_url" in cfg:
-            datacfg.update({"s3_url": cfg["s3_url"]})
+            warn("Unable to read data from s3 bucket. Define s3PathRead")
+        if "s3EndpointRead" in cfg:
+            datacfg.update({"s3EndpointRead": cfg["s3EndpointRead"]})
         else:
-            warn("Unable to read data from s3 bucket. Define s3_url")
+            warn("Unable to read data from s3 bucket. Define s3EndpointRead")
 
         if "rm_s3_file" in cfg:
            datacfg.update({"rm_s3_file": cfg["rm_s3_file"]})
 
         if (
-            "s3path" in datacfg
-            and "s3_url" in datacfg
-            and "s3_key" in datacfg
-            and "s3_secret_key" in datacfg
+            "s3PathRead" in datacfg
+            and "s3EndpointRead" in datacfg
+            and "s3KeyRead" in datacfg
+            and "s3SecretRead" in datacfg
         ):
-            datacfg.update({"bucket": cfg["bucket"]})
+            datacfg.update({"s3BucketRead": cfg["s3BucketRead"]})
 
     # Modify size of radar or radar spectra object
     datacfg.update({"elmin": cfg.get("elmin", None)})
@@ -1582,11 +1593,14 @@ def _create_prdcfg_dict(cfg, dataset, product, voltime, runinfo=None):
     prdcfg.update({"imgformat": cfg["imgformat"]})
     prdcfg.update({"RadarName": cfg["RadarName"]})
 
-    if "s3copypath" in cfg:
-        prdcfg.update({"s3copypath": cfg["s3copypath"]})
+    if "s3EndpointWrite" in cfg:
+        prdcfg.update({"s3EndpointWrite": cfg["s3EndpointWrite"]})
+    if "s3BucketWrite" in cfg:
+        prdcfg.update({"s3BucketWrite": cfg["s3BucketWrite"]})
+    if "s3PathWrite" in cfg:
+        prdcfg.update({"s3PathWrite": cfg["s3PathWrite"]})
     if "s3AccessPolicy" in cfg:
         prdcfg.update({"s3AccessPolicy": cfg["s3AccessPolicy"]})
-
     if "RadarBeamwidth" in cfg:
         prdcfg.update({"RadarBeamwidth": cfg["RadarBeamwidth"]})
     if "ppiImageConfig" in cfg:
@@ -1641,9 +1655,13 @@ def _get_datatype_list(cfg, radarnr="RADAR001"):
         if "datatype" not in cfg[dataset]:
             continue
         if isinstance(cfg[dataset]["datatype"], str):
-            (radarnr_descr, datagroup, datatype_aux, dataset_save, product_save) = (
-                get_datatype_fields(cfg[dataset]["datatype"])
-            )
+            (
+                radarnr_descr,
+                datagroup,
+                datatype_aux,
+                dataset_save,
+                product_save,
+            ) = get_datatype_fields(cfg[dataset]["datatype"])
             if datagroup != "PROC" and radarnr_descr == radarnr:
                 if (dataset_save is None) and (product_save is None):
                     datatypesdescr.add(
@@ -1667,9 +1685,13 @@ def _get_datatype_list(cfg, radarnr="RADAR001"):
                     )
         else:
             for datatype in cfg[dataset]["datatype"]:
-                (radarnr_descr, datagroup, datatype_aux, dataset_save, product_save) = (
-                    get_datatype_fields(datatype)
-                )
+                (
+                    radarnr_descr,
+                    datagroup,
+                    datatype_aux,
+                    dataset_save,
+                    product_save,
+                ) = get_datatype_fields(datatype)
                 if datagroup != "PROC" and radarnr_descr == radarnr:
                     if dataset_save is None and product_save is None:
                         datatypesdescr.add(
@@ -1799,7 +1821,7 @@ def _get_masterfile_list(datatypesdescr, starttimes, endtimes, datacfg, scan_lis
             )
         return [], None, None
 
-    if "bucket" in datacfg:
+    if "s3BucketRead" in datacfg:
         masterfilelist = get_file_list_s3(
             masterdatatypedescr, starttimes, endtimes, datacfg, scan=masterscan
         )

src/pyrad_proc/pyrad/io/io_aux.py

Lines changed: 19 additions & 19 deletions
@@ -2122,12 +2122,12 @@ def get_scan_files_to_merge_s3(
 
     s3_client = boto3.client(
         "s3",
-        endpoint_url=cfg["s3_url"],
-        aws_access_key_id=cfg["s3_key"],
-        aws_secret_access_key=cfg["s3_secret_key"],
+        endpoint_url=cfg["s3EndpointRead"],
+        aws_access_key_id=cfg["s3KeyRead"],
+        aws_secret_access_key=cfg["s3SecretRead"],
         verify=False,
     )
-    response = s3_client.list_objects_v2(Bucket=cfg["bucket"])
+    response = s3_client.list_objects_v2(Bucket=cfg["s3BucketRead"])
 
     dayinfo = voltime.strftime("%y%j")
     timeinfo = voltime.strftime("%H%M")
@@ -2751,12 +2751,12 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
 
     s3_client = boto3.client(
         "s3",
-        endpoint_url=cfg["s3_url"],
-        aws_access_key_id=cfg["s3_key"],
-        aws_secret_access_key=cfg["s3_secret_key"],
+        endpoint_url=cfg["s3EndpointRead"],
+        aws_access_key_id=cfg["s3KeyRead"],
+        aws_secret_access_key=cfg["s3SecretRead"],
         verify=False,
     )
-    response = s3_client.list_objects_v2(Bucket=cfg["bucket"])
+    response = s3_client.list_objects_v2(Bucket=cfg["s3BucketRead"])
 
     for starttime, endtime in zip(starttimes, endtimes):
         startdate = starttime.replace(hour=0, minute=0, second=0, microsecond=0)
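
A single list_objects_v2 call returns at most 1000 keys, so for large buckets the listing above may need to be paginated. A sketch of a paginator-based helper, assuming a boto3 client built as in the diff (this helper is not part of the commit):

    def list_all_keys(s3_client, bucket, prefix=""):
        """Collect every key under a prefix; one list_objects_v2 response caps at 1000 keys."""
        paginator = s3_client.get_paginator("list_objects_v2")
        keys = []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys

    # e.g. keys = list_all_keys(s3_client, cfg["s3BucketRead"], cfg["s3PathRead"])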
@@ -2775,9 +2775,9 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                 )
                 daydir = (starttime + datetime.timedelta(days=i)).strftime(fpath_strf)
                 if daydir == "":
-                    pattern = f'{cfg["s3path"]}*{scan}*'
+                    pattern = f'{cfg["s3PathRead"]}*{scan}*'
                 else:
-                    pattern = f'{cfg["s3path"]}{daydir}/*{scan}*'
+                    pattern = f'{cfg["s3PathRead"]}{daydir}/*{scan}*'
                 dayfilelist = []
                 for content in response["Contents"]:
                     if fnmatch.fnmatch(content["Key"], pattern):
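
The patterns above are glob expressions built from s3PathRead, an optional day directory and the scan name, and are matched against every key returned by the bucket listing. A small illustration with made-up prefix, date and key names:

    import fnmatch

    # Placeholder values; the real prefix, day directory and scan come from cfg.
    pattern = "rad4alp/2024-06-01/*somescan*"
    keys = [
        "rad4alp/2024-06-01/volume_somescan_0010.nc",  # matches
        "rad4alp/2024-06-02/volume_somescan_0010.nc",  # different day: no match
    ]
    matches = [key for key in keys if fnmatch.fnmatch(key, pattern)]  # -> first key only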
@@ -2805,7 +2805,7 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                     f'M{cfg["RadarRes"][ind_rad]}'
                     f'{cfg["RadarName"][ind_rad]}{dayinfo}'
                 )
-                datapath = f'{cfg["s3path"]}{dayinfo}/{basename}/'
+                datapath = f'{cfg["s3PathRead"]}{dayinfo}/{basename}/'
 
                 # check that M files exist. if not search P files
                 pattern = f"{datapath}{basename}*{scan}*"
@@ -2818,7 +2818,7 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                         f'P{cfg["RadarRes"][ind_rad]}'
                         f'{cfg["RadarName"][ind_rad]}{dayinfo}'
                     )
-                    datapath = f'{cfg["s3path"]}{dayinfo}/{basename}/'
+                    datapath = f'{cfg["s3PathRead"]}{dayinfo}/{basename}/'
                     pattern = f"{datapath}{basename}*{scan}*"
                     for content in response["Contents"]:
                         if fnmatch.fnmatch(content["Key"], pattern):
@@ -2837,9 +2837,9 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                     fpath_strf
                 )
                 if daydir == "":
-                    pattern = f'{cfg["s3path"]}*{scan}*'
+                    pattern = f'{cfg["s3PathRead"]}*{scan}*'
                 else:
-                    pattern = f'{cfg["s3path"]}{daydir}/*{scan}*'
+                    pattern = f'{cfg["s3PathRead"]}{daydir}/*{scan}*'
 
                 dayfilelist = []
                 for content in response["Contents"]:
@@ -2859,9 +2859,9 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                     fpath_strf
                 )
                 if daydir == "":
-                    pattern = f'{cfg["s3path"]}{scan}/*'
+                    pattern = f'{cfg["s3PathRead"]}{scan}/*'
                 else:
-                    pattern = f'{cfg["s3path"]}{scan}/{daydir}/*'
+                    pattern = f'{cfg["s3PathRead"]}{scan}/{daydir}/*'
 
                 dayfilelist = []
                 for content in response["Contents"]:
@@ -2874,7 +2874,7 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                     f'{cfg["RadarName"][ind_rad]}{dayinfo}'
                 )
                 datapath = (
-                    f'{cfg["s3path"]}M{cfg["RadarRes"][ind_rad]}'
+                    f'{cfg["s3PathRead"]}M{cfg["RadarRes"][ind_rad]}'
                     f'{cfg["RadarName"][ind_rad]}'
                 )
 
@@ -2890,7 +2890,7 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                         f'{cfg["RadarName"][ind_rad]}{dayinfo}'
                     )
                     datapath = (
-                        f'{cfg["s3path"]}P{cfg["RadarRes"][ind_rad]}'
+                        f'{cfg["s3PathRead"]}P{cfg["RadarRes"][ind_rad]}'
                         f'{cfg["RadarName"][ind_rad]}'
                     )
                     pattern = f"{datapath}{basename}*{scan}*"
@@ -2910,7 +2910,7 @@ def get_file_list_s3(datadescriptor, starttimes, endtimes, cfg, scan=None):
                 if not os.path.isdir(datapath):
                     os.makedirs(datapath)
                 fname_aux = f"{datapath}{os.path.basename(filename)}"
-                s3_client.download_file(cfg["bucket"], filename, fname_aux)
+                s3_client.download_file(cfg["s3PathRead"], filename, fname_aux)
                 _, tend_sweeps, _, _ = get_sweep_time_coverage(fname_aux)
                 for tend in tend_sweeps:
                     if starttime <= tend <= endtime:
