diff --git a/data/roads/pull_osm_roads.py b/data/roads/pull_osm_roads.py
old mode 100644
new mode 100755
index fd63dda9f..983fda942
--- a/data/roads/pull_osm_roads.py
+++ b/data/roads/pull_osm_roads.py
@@ -16,7 +16,8 @@
 from overpy.exception import OverpassGatewayTimeout, OverpassTooManyRequests
 from shapely.geometry import LineString
 
-from utils.shared_functions import run_with_mp, setup_mp_file_logger
+from src.utils.shared_functions import FIM_Helpers as fh
+from src.utils.shared_functions import run_with_mp, setup_mp_file_logger
 
 srcDir = os.getenv('srcDir')
@@ -199,6 +200,8 @@ def pull_roads(HUC_no, huc_geom, file_logger, screen_queue, task_id):
 def split_roads(gdf_roads, catchment_path, file_logger, screen_queue, task_id):
     huc_number = os.path.basename(os.path.dirname(catchment_path))
 
+    # raise Exception(f"testing exception for {task_id}")
+
     if not os.path.exists(catchment_path):
         # return the original roads and assign a dummy catchment id of 000
         file_logger.info(f"no catchment file for {task_id}")
@@ -237,7 +240,7 @@ def single_huc_job(
     HUC_no, huc_boundary_path, split_boundary_path, output_dir, file_logger, screen_queue, task_id
 ):
     # this is basically the task function that is passed into mp run
-    file_logger.info(f"started the processfor {task_id}")
+    file_logger.debug(f"started the process for {task_id}")
     try:
         huc_gpd = gpd.read_file(huc_boundary_path)
         huc_gpd_projected = huc_gpd.to_crs(pyproj.CRS.from_string("EPSG:4326"))
@@ -257,27 +260,44 @@ def single_huc_job(
             output_path = os.path.join(output_dir, f"roads_{HUC_no}.gpkg")
             splitted_osm_roads.to_file(output_path)
         else:
-            file_logger.info(f"No roads within actual boundary of HUC for {task_id}")
+            file_logger.warning(f"No roads within actual boundary of HUC for {task_id}")
             screen_queue.put(f"No roads within actual boundary of HUC for {task_id}")
 
         return True
     except Exception as e:
         file_logger.error(f"❌ Exception in HUC {HUC_no}: {str(e)}")
         file_logger.error(traceback.format_exc())  # Optional: log full traceback
+        screen_queue.put(f"❌ Exception in HUC {HUC_no}: {str(e)}")
         return False
 
 
-def pull_osm_roads(preclip_dir, output_dir, number_jobs):
+def pull_osm_roads(preclip_dir, output_dir, number_jobs, lst_hucs):
+
+    # -------------------
+    # Validation
+    if number_jobs > 3:
+        print("Overpy does not seem to like more than 3 jobs. Adjusting job number down to 3.")
+        number_jobs = 3
+
+    if not os.path.exists(preclip_dir):
+        raise ValueError("preclip directory not found")
+
     start_time = datetime.now(timezone.utc)
 
     # make output directory
     os.makedirs(output_dir, exist_ok=True)
 
     # Create the logger
-    file_dt_string = start_time.strftime("%Y_%m_%d-%H_%M_%S")
+    file_dt_string = start_time.strftime("%Y%m%d-%H%M")
    log_file_path = os.path.join(output_dir, f"pull_osm_roads_{file_dt_string}.log")
     file_logger = setup_mp_file_logger(log_file_path)
 
+    print("==================================")
+    print("Starting load of OSM Road data")
+    msg = f"Start time: {start_time.strftime('%m/%d/%Y %H:%M:%S')}"
+    file_logger.info(msg)
+    print(msg)
+
     # === Build job list ===
     # get list of HUC8s
     huc_numbers = [
@@ -286,6 +306,18 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
         if os.path.isdir(os.path.join(preclip_dir, huc)) and len(huc) == 8
     ]
 
+    # If a list of HUCs was passed in, filter down to those; otherwise continue with all HUCs.
+    if lst_hucs != '':
+        lst_hucs = lst_hucs.strip()
+        selected_hucs = lst_hucs.split(" ")
+        huc_numbers = [huc for huc in huc_numbers if huc in selected_hucs]
+        # print(f"new filtered list is {huc_numbers}")
+
+    msg = f"Total number of hucs to be processed after filtering, if applicable, is {len(huc_numbers)}"
+    file_logger.info(msg)
+    print(msg)
+    huc_numbers.sort()
+
     tasks_args_list = []
     for HUC_no in huc_numbers:
         tasks_args_list.append(
@@ -298,6 +330,8 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
     )
 
     # === Run jobs in parallel ===
+    # Note: the Overpass API does not really like more than 4 requests at a time,
+    # but this can be a bit higher on faster machines with better network speed.
     mp_results = run_with_mp(
         task_function=single_huc_job,
         tasks_args_list=tasks_args_list,
@@ -312,8 +346,8 @@
     failed_keys = [k for k, v in mp_results.items() if not v]
 
     if not failed_keys:
-        file_logger.info("✅ All multiprocessing tasks Succeeded")
-        print("✅ All multiprocessing tasks Succeeded")
+        file_logger.info("✅ All multiprocessing tasks succeeded")
+        print("✅ All multiprocessing tasks succeeded")
     else:
         file_logger.info(f"❌ {len(failed_keys)} failed:")
         print(f"❌ {len(failed_keys)} failed:")
@@ -326,26 +360,32 @@
 
     # Record run time
     end_time = datetime.now(timezone.utc)
-    tot_run_time = end_time - start_time
-
-    start_formatted = start_time.strftime('%Y-%m-%d %H:%M:%S')
-    end_formatted = end_time.strftime('%Y-%m-%d %H:%M:%S')
-
-    file_logger.info(f'Pulling OSM roads complete. {start_formatted}, {end_formatted}')
-    print('Pulling OSM roads complete', start_formatted, end_formatted)
-    file_logger.info(f'TOTAL RUN TIME: {str(tot_run_time)}')
-    print('TOTAL RUN TIME: ' + str(tot_run_time))
+    msg = f"Pulling OSM roads complete. {end_time.strftime('%Y-%m-%d %H:%M:%S')}"
+    file_logger.info(msg)
+    print(msg)
+    dur = fh.print_date_time_duration(start_time, end_time, print_dur_msg=False)
+    file_logger.info(dur)
+    print(dur)
 
 
 if __name__ == "__main__":
-    # Only need to run this code once, since it looks for all hucs (conus or alaska) from preclip folder
+    # Only need to run this code once, since it looks for all hucs (conus, alaska, Guam, Am Samoa) in the preclip folder
 
     # sample usage:
     # python foss_fim/data/roads/pull_osm_roads.py
     # -p data/inputs/pre_clip_huc8/20250218
-    # -o outputs/roads/test/
-    # -j 4
+    # -o outputs/roads/test/20250910  (the FIM Dev convention is to make a folder with the date at the end)
+
+    # When using the -lh argument, make sure the hucs are in quotes, with spaces between the HUC numbers,
+    # e.g. '21010005 19020302'
+
+    # +++++++++++++++++++++++++
+    # Note: the Overpass API has a system at their servers that manages the number of calls coming in from
+    # all locations, not just this script. At busier times, that threshold can be lower; the
+    # exact number varies. If you submit more than 3 jobs, the script adjusts the count down to 3, and
+    # there is also code to do re-tries. The API seems to tolerate 3 jobs most of the time.
+    # +++++++++++++++++++++++++
 
     parser = argparse.ArgumentParser(description='Download OSM roads for all HUCs')
@@ -361,11 +401,21 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
     parser.add_argument(
         '-j',
         '--number_jobs',
-        help='OPTIONAL: Number of (jobs) cores/processes for downloading HUC roads, default is 4. ',
+        help='OPTIONAL: Number of (jobs) cores/processes for downloading HUC roads, default is 3.',
         required=False,
-        default=4,
+        default=3,
         type=int,
     )
+    parser.add_argument(
+        '-lh',
+        '--lst-hucs',
+        help='OPTIONAL: Space-delimited list of HUCs which can be used to filter OSM road processing.'
+        ' This can be used for quick dev tests.'
+        ' Defaults to all HUC8s found in the preclip directory.',
+        required=False,
+        default='',
+    )
+
     args = vars(parser.parse_args())
 
     pull_osm_roads(**args)
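A note on the retry behavior referenced above: the script's comments mention re-tries for Overpass throttling without showing them. Below is a minimal sketch of that kind of backoff loop, using the same overpy exceptions the script already imports. The helper name, retry count, and delays are illustrative assumptions, not the repository's actual implementation.

import time

import overpy
from overpy.exception import OverpassGatewayTimeout, OverpassTooManyRequests


def query_with_retries(query_str, max_tries=5, base_delay_sec=30):
    # Hypothetical helper. Overpass throttles server-wide, not per client,
    # so wait a little longer after each refusal before trying again.
    api = overpy.Overpass()
    for attempt in range(1, max_tries + 1):
        try:
            return api.query(query_str)
        except (OverpassTooManyRequests, OverpassGatewayTimeout):
            if attempt == max_tries:
                raise
            time.sleep(base_delay_sec * attempt)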
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index bd3d3722c..ba9598efd 100755
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -1,6 +1,14 @@
 All notable changes to this project will be documented in this file.
 We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
 
+## v4.8.10.4 - 2025-09-19 - [PR#1648](https://github.com/NOAA-OWP/inundation-mapping/pull/1648)
+
+Updated `pull_osm_roads.py` to allow processing a selected set of HUCs for testing.
+
+### Changes
+- `data/roads/pull_osm_roads.py`: added the optional `-lh` / `--lst-hucs` filter argument, capped the job count at 3 for Overpass API limits, and cleaned up logging.
+
 ## v4.8.10.3 - 2025-08-29 - [PR#1627](https://github.com/NOAA-OWP/inundation-mapping/pull/1627)
 
 Adds gcsfs dependency to allow retrieval of NWM output from the Google Cloud Service.
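For a quick dev test of the new filter, the sketch below shows how an -lh value flows through to pull_osm_roads(). The import path assumes the repo root is on sys.path, the directories are illustrative (reusing the sample-usage paths from the script's comments), and the HUC numbers come from the -lh example there.

# Hypothetical direct call, equivalent to the CLI:
#   python data/roads/pull_osm_roads.py -p data/inputs/pre_clip_huc8/20250218
#       -o outputs/roads/test/20250910 -j 3 -lh '21010005 19020302'
from data.roads.pull_osm_roads import pull_osm_roads

pull_osm_roads(
    preclip_dir='data/inputs/pre_clip_huc8/20250218',
    output_dir='outputs/roads/test/20250910',
    number_jobs=3,
    lst_hucs='21010005 19020302',  # space-delimited string, as the -lh help text requires
)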