92 changes: 71 additions & 21 deletions data/roads/pull_osm_roads.py
100644 → 100755
@@ -16,7 +16,8 @@
from overpy.exception import OverpassGatewayTimeout, OverpassTooManyRequests
from shapely.geometry import LineString

from utils.shared_functions import run_with_mp, setup_mp_file_logger
from src.utils.shared_functions import FIM_Helpers as fh
from src.utils.shared_functions import run_with_mp, setup_mp_file_logger


srcDir = os.getenv('srcDir')
@@ -199,6 +200,8 @@ def pull_roads(HUC_no, huc_geom, file_logger, screen_queue, task_id):
def split_roads(gdf_roads, catchment_path, file_logger, screen_queue, task_id):
huc_number = os.path.basename(os.path.dirname(catchment_path))

# raise Exception(f"testing exception for {task_id}")

if not os.path.exists(catchment_path):
# return the original roads and assign a dummy catchment id of 000
file_logger.info(f"no catchment file for {task_id}")
@@ -237,7 +240,7 @@ def single_huc_job(
HUC_no, huc_boundary_path, split_boundary_path, output_dir, file_logger, screen_queue, task_id
):
# this is basically the task function that is passed into mp run
file_logger.info(f"started the processfor {task_id}")
file_logger.debug(f"started the process for {task_id}")
try:
huc_gpd = gpd.read_file(huc_boundary_path)
huc_gpd_projected = huc_gpd.to_crs(pyproj.CRS.from_string("EPSG:4326"))
@@ -257,27 +260,44 @@
output_path = os.path.join(output_dir, f"roads_{HUC_no}.gpkg")
splitted_osm_roads.to_file(output_path)
else:
file_logger.info(f"No roads within actual boundary of HUC for {task_id}")
file_logger.warning(f"No roads within actual boundary of HUC for {task_id}")
screen_queue.put(f"No roads within actual boundary of HUC for {task_id}")
return True

except Exception as e:
file_logger.error(f"❌ Exception in HUC {HUC_no}: {str(e)}")
file_logger.error(traceback.format_exc()) # Optional: log full traceback
screen_queue.put(f"❌ Exception in HUC {HUC_no}: {str(e)}")
return False


def pull_osm_roads(preclip_dir, output_dir, number_jobs):
def pull_osm_roads(preclip_dir, output_dir, number_jobs, lst_hucs):

# -------------------
# Validation
if number_jobs > 3:
print("Overpy does not seem to like more than 3 jobs. Adjusting job number down to 3.")
number_jobs = 3

if not os.path.exists(preclip_dir):
raise ValueError("preclip directory not found")

start_time = datetime.now(timezone.utc)

# make output directory
os.makedirs(output_dir, exist_ok=True)

# Create the logger
file_dt_string = start_time.strftime("%Y_%m_%d-%H_%M_%S")
file_dt_string = start_time.strftime("%Y%m%d-%H%M")
log_file_path = os.path.join(output_dir, f"pull_osm_roads_{file_dt_string}.log")
file_logger = setup_mp_file_logger(log_file_path)

print("==================================")
print("Starting load of OSM Road data")
msg = f"Start time: {start_time.strftime('%m/%d/%Y %H:%M:%S')}"
file_logger.info(msg)
print(msg)

# === Build job list ===
# get list of HUC8s
huc_numbers = [
@@ -286,6 +306,18 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
if os.path.isdir(os.path.join(preclip_dir, huc)) and len(huc) == 8
]

# If a list of HUCs was provided, filter down to it; otherwise continue with all HUCs.
if lst_hucs != '':
lst_hucs = lst_hucs.strip()
selected_hucs = lst_hucs.split(" ")
huc_numbers = [huc for huc in huc_numbers if huc in selected_hucs]
# print(f"new filtered list is {huc_numbers}")

msg = f"Total number of hucs to be processed after filtering, if applicable, is {len(huc_numbers)}"
file_logger.info(msg)
print(msg)
huc_numbers.sort()

tasks_args_list = []
for HUC_no in huc_numbers:
tasks_args_list.append(
@@ -298,6 +330,8 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
)

# === Run jobs in parallel ===
# Note: Overpass API does not really like more than 4 requests at a time,
# but this can be a bit higher on faster machines with better network speed.
mp_results = run_with_mp(
task_function=single_huc_job,
tasks_args_list=tasks_args_list,
@@ -312,8 +346,8 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
failed_keys = [k for k, v in mp_results.items() if not v]

if not failed_keys:
file_logger.info("✅ All multiprocessing tasks Succeeded")
print("✅ All multiprocessing tasks Succeeded")
file_logger.info("✅ All multiprocessing tasks succeeded")
print("✅ All multiprocessing tasks succeeded")
else:
file_logger.info(f"❌ {len(failed_keys)} failed:")
print(f"❌ {len(failed_keys)} failed:")
@@ -326,26 +360,32 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):

# Record run time
end_time = datetime.now(timezone.utc)
tot_run_time = end_time - start_time

start_formatted = start_time.strftime('%Y-%m-%d %H:%M:%S')
end_formatted = end_time.strftime('%Y-%m-%d %H:%M:%S')

file_logger.info(f'Pulling OSM roads complete. {start_formatted}, {end_formatted}')
print('Pulling OSM roads complete', start_formatted, end_formatted)
file_logger.info(f'TOTAL RUN TIME: {str(tot_run_time)}')
print('TOTAL RUN TIME: ' + str(tot_run_time))
msg = f"Pulling OSM roads complete. {end_time.strftime('%Y-%m-%d %H:%M:%S')}"
file_logger.info(msg)
print(msg)
dur = fh.print_date_time_duration(start_time, end_time, print_dur_msg=False)
file_logger.info(dur)
print(dur)


if __name__ == "__main__":

# Only need to run this code once, since it looks for all hucs (conus or alaska) from preclip folder
# Only need to run this code once, since it looks for all hucs (CONUS, Alaska, Guam, American Samoa) from preclip folder

# sample usage:
# python foss_fim/data/roads/pull_osm_roads.py
# -p data/inputs/pre_clip_huc8/20250218
# -o outputs/roads/test/
# -j 4
# -o outputs/roads/test/20250910 (the FIM Dev convention is to make a folder with the date at the end)

# When using the -lh argument, make sure the HUCs are wrapped in quotes, with spaces between HUC numbers.
# e.g. '21010005 19020302'

# +++++++++++++++++++++++++
# Note: Overpass API has a system at their servers that manages the number of calls coming in from
# all locations, not just this script. At busier times, that threshold can be lower.
# Exact number varies. If you submit more than 3 jobs, the count will be adjusted down, and
# there is also code to do re-tries. It seems to like 3 jobs most of the time.
# +++++++++++++++++++++++++

parser = argparse.ArgumentParser(description='Download OSM roads for all HUCs')

@@ -361,11 +401,21 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs):
parser.add_argument(
'-j',
'--number_jobs',
help='OPTIONAL: Number of (jobs) cores/processes for downloading HUC roads, default is 4. ',
help='OPTIONAL: Number of jobs (cores/processes) for downloading HUC roads; default is 3.',
required=False,
default=4,
default=3,
type=int,
)

parser.add_argument(
'-lh',
'--lst-hucs',
help='OPTIONAL: Space-delimited list of HUCs which can be used to filter OSM road processing.'
' This can be used for quick dev tests.'
' Defaults to all HUC8 folders found in the preclip directory.',
required=False,
default='',
)

args = vars(parser.parse_args())
pull_osm_roads(**args)
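
For readers curious about the retry behavior the comments above mention, here is a minimal sketch of the pattern, assuming only `overpy` and the standard library. The function name `query_with_retries`, the retry count, and the backoff delay are illustrative, not the script's actual implementation:

```python
import time

import overpy
from overpy.exception import OverpassGatewayTimeout, OverpassTooManyRequests


def query_with_retries(query_str, max_retries=5, base_delay_sec=30):
    """Run an Overpass query, backing off whenever the server throttles us."""
    api = overpy.Overpass()
    for attempt in range(1, max_retries + 1):
        try:
            return api.query(query_str)
        except (OverpassTooManyRequests, OverpassGatewayTimeout):
            if attempt == max_retries:
                raise  # out of attempts; surface the error to the caller
            # Wait a little longer after each throttled attempt before retrying.
            time.sleep(base_delay_sec * attempt)
```

A linear backoff keeps the sketch simple; an exponential schedule is a common alternative when the server stays busy for longer stretches.
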
8 changes: 8 additions & 0 deletions docs/CHANGELOG.md
@@ -1,6 +1,14 @@
All notable changes to this project will be documented in this file.
We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.

## v4.8.x.x - 2025-09-08 - [PR#1648](https://github.com/NOAA-OWP/inundation-mapping/pull/1648)

Updated `pull_osm_roads.py` to allow processing a selected set of HUCs for testing.
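
A sample invocation using the new filter (paths illustrative): `python foss_fim/data/roads/pull_osm_roads.py -p data/inputs/pre_clip_huc8/20250218 -o outputs/roads/test/20250910 -lh '21010005 19020302'`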

### Changes
- `data/roads/pull_osm_roads.py`: as described above.
<br/>

## v4.8.10.3 - 2025-08-29 - [PR#1627](https://github.com/NOAA-OWP/inundation-mapping/pull/1627)

Adds gcsfs dependency to allow retrieval of NWM output from the Google Cloud Service.