Commit 05aac74

black code formatting
1 parent 10a38fe commit 05aac74

File tree

2 files changed (+353, -66 lines)


gedidb/core/gediprocessor.py

Lines changed: 165 additions & 66 deletions
@@ -6,25 +6,29 @@
 # SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
 #

-import concurrent.futures
 import logging
 import os
 import traceback
 from datetime import datetime
 from pathlib import Path
 from typing import Optional, Union
+import time

 import geopandas as gpd
 import pandas as pd
 import yaml
 from dask.distributed import Client
+import concurrent.futures
+from concurrent.futures import as_completed

 from gedidb.core.gedidatabase import GEDIDatabase
 from gedidb.core.gedigranule import GEDIGranule
 from gedidb.downloader.authentication import EarthDataAuthenticator
 from gedidb.downloader.data_downloader import CMRDataDownloader, H5FileDownloader
 from gedidb.utils.constants import GediProduct
 from gedidb.utils.geo_processing import _temporal_tiling, check_and_format_shape
+from gedidb.utils.progress_ledger import ProgressLedger, Row
+

 # Configure logging
 logging.basicConfig(
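
The `gedidb.utils.progress_ledger` module imported above (likely the commit's second changed file) is not shown in this view. For orientation, here is a minimal sketch of the `ProgressLedger`/`Row` interface the processor relies on, inferred purely from the calls made in this diff; the method bodies and file formats below are illustrative, not the committed implementation.

```python
# Minimal sketch of the ProgressLedger/Row interface assumed by this diff.
# NOT the module shipped in this commit; it only mirrors the calls in gediprocessor.py.
import csv
import time
from dataclasses import asdict, dataclass, fields
from pathlib import Path
from typing import Optional


@dataclass
class Row:
    granule_id: str
    timeframe: str
    submitted_ts: float
    started_ts: float
    finished_ts: float
    duration_s: float
    status: str  # "ok" or "fail"
    n_records: Optional[int] = None
    bytes_downloaded: Optional[int] = None
    products: str = ""
    error_msg: Optional[str] = None


class ProgressLedger:
    def __init__(self, out_dir: str, timeframe: str):
        self.out_dir = Path(out_dir)
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.timeframe = timeframe
        self._submits = {}  # granule_id -> submission timestamp
        self._rows = []

    def note_submit(self, granule_id: str) -> None:
        self._submits[granule_id] = time.time()

    def append(self, row: Row) -> None:
        self._rows.append(row)
        # Append-only CSV so progress survives a crash mid-run.
        path = self.out_dir / "ledger.csv"
        new_file = not path.exists()
        with open(path, "a", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=[f.name for f in fields(Row)])
            if new_file:
                writer.writeheader()
            writer.writerow(asdict(row))

    def write_error(self, granule_id: str, traceback_text: str) -> None:
        (self.out_dir / f"{granule_id}.error.txt").write_text(traceback_text)

    def write_status_md(self) -> None:
        ok = sum(r.status == "ok" for r in self._rows)
        fail = sum(r.status == "fail" for r in self._rows)
        pending = len(self._submits) - len(self._rows)
        text = (
            f"# Progress for {self.timeframe}\n\n"
            f"- ok: {ok}\n- fail: {fail}\n- pending: {pending}\n"
        )
        (self.out_dir / "status.md").write_text(text)

    def write_html(self) -> None:
        body = "".join(
            f"<tr><td>{r.granule_id}</td><td>{r.status}</td>"
            f"<td>{r.duration_s:.1f}</td><td>{r.error_msg or ''}</td></tr>"
            for r in self._rows
        )
        html = (
            "<table><tr><th>granule</th><th>status</th>"
            "<th>duration_s</th><th>error</th></tr>" + body + "</table>"
        )
        (self.out_dir / "status.html").write_text(html)
```
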
@@ -169,6 +173,11 @@ def __init__(
             os.path.join(self.data_info["data_dir"], "download")
         )

+        self.progress_dir = self._ensure_directory(
+            os.path.join(self.data_info["data_dir"], "progress")
+        )
+        self.report_every = int(self.data_info["tiledb"].get("report_every", 25))
+
         # Initialize database writer
         self.database_writer = self._initialize_database_writer(credentials)

@@ -349,46 +358,84 @@ def _process_granules(self, unprocessed_cmr_data: dict):
         """
         Process unprocessed granules in parallel using the selected parallelization engine.
         """
-        # Check the temporal tiling configuration
         temporal_batching = self.data_info["tiledb"].get("temporal_batching", None)
-
-        if temporal_batching == "daily" or temporal_batching == "weekly":
-            # Apply temporal tiling based on the specified configuration
-            unprocessed_temporal_cmr_data = _temporal_tiling(
-                unprocessed_cmr_data, temporal_batching
-            )
+        if temporal_batching in ("daily", "weekly"):
+            batches = _temporal_tiling(unprocessed_cmr_data, temporal_batching)
         elif temporal_batching is None:
-            # No tiling, process all granules as a single batch
-            unprocessed_temporal_cmr_data = {"all": unprocessed_cmr_data}
+            batches = {"all": unprocessed_cmr_data}
         else:
-            # Raise an error for invalid temporal tiling options
-            raise ValueError(
-                f"Invalid temporal batching option: '{temporal_batching}'. "
-                "It must be one of ['daily', 'weekly', None]."
-            )
+            raise ValueError("Invalid temporal batching option.")

         if isinstance(self.parallel_engine, concurrent.futures.Executor):
-            # Create the executor once
             with self.parallel_engine as executor:
-                for (
-                    timeframe,
-                    granules,
-                ) in unprocessed_temporal_cmr_data.items():
-                    futures = [
-                        executor.submit(
+                for timeframe, granules in batches.items():
+                    ledger = ProgressLedger(
+                        os.path.join(self.progress_dir, timeframe), timeframe
+                    )
+                    # submit all
+                    future_map = {}
+                    for gid, pinf in granules.items():
+                        ledger.note_submit(gid)
+                        fut = executor.submit(
                             GEDIProcessor.process_single_granule,
-                            granule_id,
-                            product_info,
+                            gid,
+                            pinf,
                             self.data_info,
                             self.download_path,
                         )
-                        for granule_id, product_info in granules.items()
-                    ]
-                    results = [future.result() for future in futures]
-
-                    # Collect valid data for writing
-                    valid_dataframes = [gdf for _, gdf in results if gdf is not None]
-
+                        future_map[fut] = gid
+
+                    valid_dataframes = []
+                    counter = 0
+                    for fut in as_completed(future_map):
+                        gid = future_map[fut]
+                        started_ts = time.time()
+                        try:
+                            (ids_, gdf, metrics) = fut.result()
+                            finished_ts = time.time()
+                            ok = ids_ is not None
+                            if gdf is not None:
+                                valid_dataframes.append(gdf)
+                            if ok:
+                                self.database_writer.mark_granule_as_processed(ids_)
+                            row = Row(
+                                granule_id=gid,
+                                timeframe=timeframe,
+                                submitted_ts=ledger._submits.get(gid, finished_ts),
+                                started_ts=metrics.get("started_ts", started_ts),
+                                finished_ts=finished_ts,
+                                duration_s=finished_ts
+                                - metrics.get("started_ts", started_ts),
+                                status="ok" if ok else "fail",
+                                n_records=metrics.get("n_records"),
+                                bytes_downloaded=metrics.get("bytes_downloaded"),
+                                products=",".join(metrics.get("products", [])),
+                                error_msg=None,
+                            )
+                            ledger.append(row)
+                        except Exception as e:
+                            finished_ts = time.time()
+                            tb = traceback.format_exc()
+                            ledger.write_error(gid, tb)
+                            row = Row(
+                                granule_id=gid,
+                                timeframe=timeframe,
+                                submitted_ts=ledger._submits.get(gid, finished_ts),
+                                started_ts=started_ts,
+                                finished_ts=finished_ts,
+                                duration_s=finished_ts - started_ts,
+                                status="fail",
+                                error_msg=str(e),
+                            )
+                            ledger.append(row)
+                            logger.error(f"Granule {gid} failed: {e}")
+                        finally:
+                            counter += 1
+                            if counter % self.report_every == 0:
+                                ledger.write_status_md()
+                                ledger.write_html()
+
+                    # write data then finalize report
                     if valid_dataframes:
                         concatenated_df = pd.concat(valid_dataframes, ignore_index=True)
                         quadrants = self.database_writer.spatial_chunking(
@@ -397,47 +444,85 @@ def _process_granules(self, unprocessed_cmr_data: dict):
                         for data in quadrants.values():
                             self.database_writer.write_granule(data)

-                    # Collect processed granules
-                    granule_ids = [ids_ for ids_, _ in results if ids_ is not None]
-                    for granule_id in granule_ids:
-                        self.database_writer.mark_granule_as_processed(granule_id)
+                    ledger.write_status_md()
+                    ledger.write_html()

         elif isinstance(self.parallel_engine, Client):
-            for timeframe, granules in unprocessed_temporal_cmr_data.items():
-
-                # Assume Dask client
-                futures = [
-                    self.parallel_engine.submit(
+            for timeframe, granules in batches.items():
+                ledger = ProgressLedger(
+                    os.path.join(self.progress_dir, timeframe), timeframe
+                )
+                futures = []
+                for gid, pinf in granules.items():
+                    ledger.note_submit(gid)
+                    fut = self.parallel_engine.submit(
                         GEDIProcessor.process_single_granule,
-                        granule_id,
-                        product_info,
+                        gid,
+                        pinf,
                         self.data_info,
                         self.download_path,
                     )
-                    for granule_id, product_info in granules.items()
-                ]
-                results = self.parallel_engine.gather(futures)
-
-                # Collect valid data for writing
-                valid_dataframes = [gdf for _, gdf in results if gdf is not None]
+                    futures.append((gid, fut))
+
+                valid_dataframes = []
+                counter = 0
+                for gid, fut in futures:
+                    started_ts = time.time()
+                    try:
+                        ids_, gdf, metrics = self.parallel_engine.gather(fut)
+                        finished_ts = time.time()
+                        ok = ids_ is not None
+                        if gdf is not None:
+                            valid_dataframes.append(gdf)
+                        if ok:
+                            self.database_writer.mark_granule_as_processed(ids_)
+                        row = Row(
+                            granule_id=gid,
+                            timeframe=timeframe,
+                            submitted_ts=ledger._submits.get(gid, finished_ts),
+                            started_ts=metrics.get("started_ts", started_ts),
+                            finished_ts=finished_ts,
+                            duration_s=finished_ts
+                            - metrics.get("started_ts", started_ts),
+                            status="ok" if ok else "fail",
+                            n_records=metrics.get("n_records"),
+                            bytes_downloaded=metrics.get("bytes_downloaded"),
+                            products=",".join(metrics.get("products", [])),
+                            error_msg=None,
+                        )
+                        ledger.append(row)
+                    except Exception as e:
+                        finished_ts = time.time()
+                        tb = traceback.format_exc()
+                        ledger.write_error(gid, tb)
+                        row = Row(
+                            granule_id=gid,
+                            timeframe=timeframe,
+                            submitted_ts=ledger._submits.get(gid, finished_ts),
+                            started_ts=started_ts,
+                            finished_ts=finished_ts,
+                            duration_s=finished_ts - started_ts,
+                            status="fail",
+                            error_msg=str(e),
+                        )
+                        ledger.append(row)
+                        logger.error(f"Dask task for {gid} failed: {e}")
+                    finally:
+                        counter += 1
+                        if counter % self.report_every == 0:
+                            ledger.write_status_md()
+                            ledger.write_html()

                 if valid_dataframes:
                     concatenated_df = pd.concat(valid_dataframes, ignore_index=True)
-                    quadrants = self.database_writer.spatial_chunking(
-                        concatenated_df,
-                        chunk_size=self.data_info["tiledb"]["chunk_size"],
-                    )
+                    quadrants = self.database_writer.spatial_chunking(concatenated_df)
                     for data in quadrants.values():
                         self.database_writer.write_granule(data)

-                # Collect processed granules
-                granule_ids = [ids_ for ids_, _ in results if ids_ is not None]
-                for granule_id in granule_ids:
-                    self.database_writer.mark_granule_as_processed(granule_id)
+                ledger.write_status_md()
+                ledger.write_html()
         else:
-            raise ValueError(
-                "Unsupported parallel engine. Provide a 'concurrent.futures.Executor' or 'dask.distributed.Client'."
-            )
+            raise ValueError("Unsupported parallel engine.")

     @staticmethod
     def process_single_granule(granule_id, product_info, data_info, download_path):
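
Note that the Dask branch above collects results with `self.parallel_engine.gather(fut)` in submission order, while the executor branch consumes futures as they finish via `concurrent.futures.as_completed`. For reference, `dask.distributed` also provides its own `as_completed` helper; a sketch of a completion-order loop for the Dask case, assuming the same ledger bookkeeping shown above is applied to each finished future:

```python
# Sketch only: completion-order collection for a Dask client, mirroring the
# executor branch. The ledger handling from the diff would wrap each result.
from dask.distributed import as_completed as dask_as_completed

from gedidb.core.gediprocessor import GEDIProcessor


def iter_completed_granules(client, granules, ledger, data_info, download_path):
    """Yield (granule_id, result) pairs as Dask futures finish."""
    future_map = {}
    for gid, pinf in granules.items():
        ledger.note_submit(gid)
        fut = client.submit(
            GEDIProcessor.process_single_granule, gid, pinf, data_info, download_path
        )
        future_map[fut] = gid
    for fut in dask_as_completed(list(future_map)):
        yield future_map[fut], fut.result()  # result is (ids_, gdf, metrics)
```
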
@@ -460,18 +545,32 @@ def process_single_granule(granule_id, product_info, data_info, download_path):
         tuple
             A tuple containing the granule ID and the processed granule data.
         """
-        # Initialize the downloader
+
+        started_ts = time.time()
         downloader = H5FileDownloader(download_path)

-        # Sequentially download each product
+        bytes_dl = 0
+        prods = []
         download_results = []
-        for url, product, _ in product_info:
-            result = downloader.download(granule_id, url, GediProduct(product))
-            download_results.append(result)
+        for url, product, extra in product_info:
+            res = downloader.download(granule_id, url, GediProduct(product))
+            # If your downloader can expose sizes, insert here:
+            if isinstance(res, tuple) and len(res) >= 2 and isinstance(res[1], int):
+                bytes_dl += int(res[1])
+            prods.append(str(product))
+            download_results.append(res)

-        # Process granule
         granule_processor = GEDIGranule(download_path, data_info)
-        return granule_processor.process_granule(download_results)
+        ids_, gdf = granule_processor.process_granule(download_results)
+        n_records = int(gdf.shape[0]) if gdf is not None else None
+
+        metrics = {
+            "started_ts": started_ts,
+            "bytes_downloaded": bytes_dl or None,
+            "products": prods,
+            "n_records": n_records,
+        }
+        return ids_, gdf, metrics

     def close(self):
         """Close the parallelization engine if applicable."""
