14 changes: 10 additions & 4 deletions data/bridges/make_dem_dif_for_bridges.py
@@ -20,7 +20,7 @@
from shapely.geometry import Point

from data.create_vrt_file import create_vrt_file
from utils.shared_functions import run_with_mp, setup_mp_file_logger
from src.utils.shared_functions import run_with_mp, setup_mp_file_logger


"""
@@ -165,12 +165,12 @@ def make_one_diff(
with rasterio.open(output_diff_path, 'w', **raster_meta) as dst:
dst.write(updated_raster, 1)
screen_queue.put(f"End of processing {task_id}")
return 0, [True]
return 1, [True]

except Exception as e:
file_logger.error(f"❌ Exception in HUC {task_id}: {str(e)}")
file_logger.error(traceback.format_exc())
return 1, [False]
return 0, [False]


def make_dif_rasters(OSM_bridge_file, dem_dir, lidar_tif_dir, output_dir, number_jobs):
@@ -180,17 +180,23 @@ def make_dif_rasters(OSM_bridge_file, dem_dir, lidar_tif_dir, output_dir, number_jobs):

file_dt_string = start_time.strftime("%Y_%m_%d-%H_%M_%S")
log_file_path = os.path.join(output_dir, f"DEM_diff_rasters-{file_dt_string}.log")
file_logger = setup_mp_file_logger(log_file_path)
file_logger = setup_mp_file_logger(log_file_path, "DEM_diff_raster")

try:
print('Reading osm bridge lines...')
if not os.path.isfile(OSM_bridge_file):
raise ValueError(f"Argument -i OSM_bridge_file of {OSM_bridge_file} does not exist.")
OSM_bridge_lines_gdf = gpd.read_file(OSM_bridge_file)

print('Adding HUC8/6 number and info about existence of lidar raster or not...')
OSM_bridge_lines_gdf['huc6'] = OSM_bridge_lines_gdf['huc8'].str[:6]
OSM_bridge_lines_gdf = identify_bridges_with_lidar(OSM_bridge_lines_gdf, lidar_tif_dir)
if len(OSM_bridge_lines_gdf) == 0:
raise ValueError("There are no bridges with lidar data, check data and tif folder pathing.")

dem_files = list(glob.glob(os.path.join(dem_dir, '*.tif')))
if len(dem_files) == 0:
raise ValueError("No DEM files were found. Please recheck the DEM folder pathing")
dem_files.sort()

available_dif_files = list(glob.glob(os.path.join(output_dir, '*.tif')))
2 changes: 2 additions & 0 deletions data/bridges/pull_osm_bridges.py
@@ -313,6 +313,8 @@ def combine_huc_features(output_dir):

def process_osm_bridges(wbd_file, output_folder, number_of_jobs, lst_hucs):
start_time = dt.datetime.now(dt.timezone.utc)

# TODO: plug in shared_functions version for logger setup
__setup_logger(output_folder)

print("==================================")
7 changes: 4 additions & 3 deletions data/nfhl/download_fema_nfhl.py
@@ -282,10 +282,10 @@ def __get_nfhl_flood_hazard_zones(
)
if success: # True
file_logger.info(f"Completed processing HUC {task_id}")
return 0, [True]
return 1, [True]
# screen_queue.put(f"Completed processing HUC {task_id}")
else: # False
return 2, [False]
return 0, [False]

except Exception as e:
file_logger.error(f"Exception in HUC {task_id}: {str(e)}")
@@ -325,7 +325,8 @@ def download_nfhl_wrapper(huc_list, output_folder, geometryType='esriGeometryEnv
# Set up logger
file_dt_string = start_time.strftime("%Y_%m_%d-%H_%M_%S")
log_file_path = os.path.join(output_folder, f"nfhl_download-{file_dt_string}.log")
file_logger = setup_mp_file_logger(log_file_path)
# file_logger = setup_mp_file_logger(log_file_path)
file_logger = setup_mp_file_logger(log_file_path, "nfhl_download")

try:

6 changes: 3 additions & 3 deletions data/roads/pull_osm_roads.py
@@ -262,12 +262,12 @@ def single_huc_job(
else:
file_logger.warning(f"No roads within actual boundary of HUC for {task_id}")
screen_queue.put(f"No roads within actual boundary of HUC for {task_id}")
return 0, [True]
return 1, [True]

except Exception as e:
file_logger.error(f"❌ Exception in HUC {HUC_no}: {str(e)}")
file_logger.error(traceback.format_exc()) # Optional: log full traceback
return 2, [False]
return 0, [False]


def pull_osm_roads(preclip_dir, output_dir, number_jobs, lst_hucs):
@@ -289,7 +289,7 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs, lst_hucs):
# Create the logger
file_dt_string = start_time.strftime("%Y%m%d-%H%M")
log_file_path = os.path.join(output_dir, f"pull_osm_roads_{file_dt_string}.log")
file_logger = setup_mp_file_logger(log_file_path)
file_logger = setup_mp_file_logger(log_file_path, "pull_osm_roads")

print("==================================")
print("Starting load of OSM Road data")
66 changes: 38 additions & 28 deletions data/usgs/get_usgs_rating_curves.py
@@ -104,12 +104,12 @@ def __mp_get_flows_for_site(
flow_df = flow_df.rename(columns={'discharge': 'discharge_cms'})

site_flows_df = pd.concat([site_flows_df, flow_df], ignore_index=True)
return 0, [site_flows_df]
return 1, site_flows_df

except Exception:
file_logger.critical(f"❌ Critical error while processing {task_id}")
file_logger.critical(traceback.format_exc())
return 1, [] # shut the program down.
return -1, None # shut the program down.


# Generate categorical flows for each category across all sites.
@@ -131,12 +131,6 @@ def __write_categorical_flow_files(
This function uses its own mp log file. This arg allows the mp logs to be
appended to the parent when it is done.
Note: For now.. any error files created anywhere are not removed.

Returns
-------
all_flows_data : DataFrame
A dataframe of categorical flow for every feature ID in the input metadata.

'''

threshold_url = f'{API_BASE_URL}/nws_threshold'
@@ -189,7 +183,8 @@ def __write_categorical_flow_files(
# Again.. we make a special mp for this set
mp_log_file_path = os.path.join(output_dir, f"get_rating_curves-mp-flow-{file_datetime_string}.log")
mp_logger = sf.setup_mp_file_logger(mp_log_file_path, logger_name="mp_flows")
list_flow_dfs = sf.run_with_mp(
# We get a list of dictionaries
flow_dfs = sf.run_with_mp(
task_function=__mp_get_flows_for_site,
tasks_args_list=sorted_tasks_args_list,
file_logger=mp_logger,
@@ -203,16 +198,23 @@
# for now.. let's leave the error files alone, even the mp ones

# roll up the list of df's into one master df
all_flows_data = pd.concat(list_flow_dfs, ignore_index=True)
# run_with_mp returns a list of dictionaries keyed with a huc. We don't care about the keys, just the values
# which are df's
all_flows_data = pd.DataFrame()
for i, value in enumerate(flow_dfs.values()):
if i == 0:
all_flows_data = value
else:
all_flows_data = pd.concat([all_flows_data, value])

# Write usgs stage discharge data, used by Sierra tests (rating_curve_comparison.py)
logging.info("Writing for USGS discharge data for each usgs stage (ie. action, minor, etc)")
if not all_flows_data.empty:
usgs_discharge_file_name = os.path.join(output_dir, 'usgs_stage_discharge_cms.csv')
final_data = all_flows_data[['feature_id', 'discharge_cms', 'recurr_interval']]
final_data.to_csv(usgs_discharge_file_name, index=False)

return all_flows_data
else:
logging.info("No flow data was found. Saving of usgs_stage_discharge_cms file skipped")


def set_global_env(env_file):
@@ -260,7 +262,7 @@ def __mp_get_site_rating_curve(
# If no rating curve was returned, skip site.
if curve.empty:
file_logger.warning(f'{location_id}: Removed because it has no rating curves')
return 2, [] # log and continue the next task
return 0, None # log and continue the next task

# If the site is in PR, VI, or HI, keep datum in LMSL (local mean sea level)
# because our 3DEP dems are also in LMSL for these areas.
@@ -278,7 +280,7 @@
f'VI, or HI but has a datum other than LMSL ({datum_name})'
)
file_logger.warning(message)
return 2, [] # log and continue the next task
return 0, None # log and continue the next task

# If the state is not PR, VI, or HI, then we want to adjust the datum to NAVD88 if needed.
# If the datum is unknown, skip site.
@@ -291,7 +293,7 @@
# If datum API failed, print message and skip site.
if datum_adj_ft is None:
file_logger.warning(f'{location_id}: Removed because datum adjustment failed!!')
return 2, [] # log and continue the next task
return 0, None # log and continue the next task

# If datum adjustment succeeded, calculate datum in NAVD88
navd88_datum = round(usgs['datum'] + datum_adj_ft, 2)
@@ -306,29 +308,29 @@
file_logger.warning(
f'{location_id}: Removed because LMSL datum found outside of PR, VI, or HI'
)
return 2, [] # log and continue the next task
return 0, None # log and continue the next task

else:
# If the site has an unrecognized datum, skip site.
datum_name = usgs['vcs']
file_logger.warning(f'{location_id}: Removed due to unknown datum ({datum_name})')
return 2, [] # log and continue the next task
return 0, None # log and continue the next task

# Populate rating curve with metadata and use navd88 datum to convert stage to elevation.
# If you came in looking for all sites, then "active" will be true. A filtered set, this might be True or False
curve['active'] = usgs['active']
curve['datum'] = usgs['datum']
curve['datum_vcs'] = usgs['vcs']
curve['navd88_datum'] = navd88_datum
curve['elevation_navd88'] = curve['stage'] + navd88_datum
curve['elevation_navd88'] = round(curve['stage'] + navd88_datum, 2)

file_logger.debug(f"Done rating curves for usgs location id of {usgs_site_code}")
return 0, [curve]
return 1, curve

except Exception:
file_logger.critical(f"❌ Critical error while processing {task_id}")
file_logger.critical(traceback.format_exc())
return 1, [] # shut the program down.
return -1, None # shut the program down.


def __get_usgs_metadata(list_of_gage_sites, metadata_url):
@@ -535,12 +537,14 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir):
sys.exit()
else:
print(f'Loading environment file: {env_file}')
print("")
# Set global variables
set_global_env(env_file)

if list_of_gage_sites != 'all':
print(
"*** You have provide a list of specific usgs site codes to process.\nPlease note that when getting all sites,"
"\n"
"*** NOTICE: You have provide a list of specific usgs site codes to process.\nPlease note that when getting all sites,"
" it filters to only sites that are active. But when using specific codes, it will not use the 'is active' filter.\n\n"
"To continue, hit your enter key or CTRL-C to abort"
)
@@ -575,9 +579,7 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir):
file_datetime_string = overall_start_dt.strftime("%Y%m%d-%H%M")
display_dt_string = datetime.now(timezone.utc).strftime("%m/%d/%Y %H:%M:%S")

log_file_name = f"get_rating_curves-{file_datetime_string}.log"
log_file_path = os.path.join(output_dir, log_file_name)
sf.setup_file_logger(log_file_path)
log_file_path = sf.setup_file_logger(output_dir, "get_rating_curves")
# file_logger = sf.setup_mp_file_logger(log_file_path)

try:
@@ -604,7 +606,6 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir):
display_dt_string = section_start_dt.strftime("%m/%d/%Y %H:%M:%S")
logging.info("=============")
logging.info(f"Processing metadata started: {display_dt_string} (UTC)")
all_rating_curves = pd.DataFrame()

num_sites = len(metadata_list)
logging.info(f"Number of sites to process: {num_sites}")
@@ -634,7 +635,8 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir):
# parent script.
mp_log_file_path = os.path.join(output_dir, f"get_rating_curves-mp-{file_datetime_string}.log")
mp_logger = sf.setup_mp_file_logger(mp_log_file_path, logger_name="mp_rcs")
list_rating_curves_dfs = sf.run_with_mp(
# We get a list of dictionaries
rating_curves_dfs = sf.run_with_mp(
task_function=__mp_get_site_rating_curve,
tasks_args_list=sorted_tasks_args_list,
file_logger=mp_logger,
@@ -648,11 +650,19 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir):
# for now.. let's leave the error files alone, even the mp ones

# more processing of rating curves
if len(list_rating_curves_dfs) == 0:
if len(rating_curves_dfs) == 0:
logging.error("There are no acceptable sites. Stopping program.")
sys.exit(1)

all_rating_curves = pd.concat(list_rating_curves_dfs)
# run_with_mp returns a list of dictionaries keyed with a huc. We don't care about the keys, just the values
# which are df's
all_rating_curves = pd.DataFrame()
for i, value in enumerate(rating_curves_dfs.values()):
if i == 0:
all_rating_curves = value
else:
all_rating_curves = pd.concat([all_rating_curves, value])

logging.info(f"Number of sites to processes with metadata: {len(all_rating_curves)}")

display_dt_string = datetime.now(timezone.utc).strftime("%m/%d/%Y %H:%M:%S")
19 changes: 19 additions & 0 deletions docs/CHANGELOG.md
@@ -1,6 +1,25 @@
All notable changes to this project will be documented in this file.
We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.

## v4.8.12.3 - 2025-10-10 - [PR#1660](https://github.com/NOAA-OWP/inundation-mapping/pull/1660)

The file shared_functions.py had a bug related to an object length check. While fixing it, we identified some enhancements to the logging tools and run_with_mp. The return code system was changed so that 1 = success, 0 = fail but do not abort, and -1 = fail and abort the run.
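As a rough sketch of the new convention (the function name, signature, and helper below are hypothetical illustrations, not the actual FIM API):

```python
import traceback


def _do_some_work(task_id):
    """Hypothetical stand-in for a real per-task workload."""
    return {"task_id": task_id, "rows": 10}


def example_mp_task(task_id, file_logger, screen_queue):
    """Hypothetical worker showing the new return-code convention:
    1 = success, 0 = fail but keep going, -1 = fail and abort the run."""
    try:
        result = _do_some_work(task_id)
        if not result:
            file_logger.warning(f"No data for {task_id}; skipping")
            return 0, None  # fail, but let the remaining tasks continue
        screen_queue.put(f"Completed {task_id}")
        return 1, result  # success
    except Exception:
        file_logger.critical(f"Critical error while processing {task_id}")
        file_logger.critical(traceback.format_exc())
        return -1, None  # fail and abort the whole run
```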

This triggered changes to scripts whose multi-proc functions return values through the logging or run_with_mp tools. The change affected four data load scripts, all of which were retested for correct data loading. A fifth data load script, pull_osm_bridges.py, received only a comment and was not affected by the other changes in this PR.

There was also a change to shared_functions.setup_file_logger. It no longer takes a full path and file name for the log file; instead it takes two separate arguments, one for the log folder and one for a log file prefix. A date is appended to the log file name automatically.
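A minimal before/after sketch of the call shape, based on the get_usgs_rating_curves.py diff above (the import alias and output folder are assumptions for illustration):

```python
# Assumed import path, following the updated import in make_dem_dif_for_bridges.py above.
from src.utils import shared_functions as sf

output_dir = "/data/outputs/rating_curves"  # example folder only

# Old call shape: the caller assembled the full log file path itself, e.g.
#   log_file_path = os.path.join(output_dir, "get_rating_curves-20251010-1200.log")
#   sf.setup_file_logger(log_file_path)

# New call shape: pass the log folder and a file name prefix; a datetime is
# appended automatically and the resulting log file path is returned.
log_file_path = sf.setup_file_logger(output_dir, "get_rating_curves")
```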

### Changes

- `data`
  - `bridges/make_dem_dif_for_bridges.py`: Updated call to setup_file_logger plus changes related to run_with_mp. A bit of additional error handling was also added.
  - `bridges/pull_osm_bridges.py`: A comment was added only.
  - `nfhl/download_fema_nfhl.py`: Updated call to setup_file_logger plus changes related to run_with_mp.
  - `roads/pull_osm_roads.py`: Updated call to setup_file_logger plus changes related to run_with_mp.
  - `usgs/get_usgs_rating_curves.py`: Updated call to setup_file_logger plus changes related to run_with_mp. A bit of additional error handling was also added, and a small rounding issue was fixed.
- `src`
  - `shared_functions.py`: As described above.
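For reference, a small self-contained sketch of how the updated callers roll the per-task run_with_mp results into one DataFrame (the results mapping below is mocked; in the real scripts it comes back from sf.run_with_mp):

```python
import pandas as pd

# Mocked results object standing in for what the scripts receive back from
# sf.run_with_mp: a mapping of task id (e.g. HUC or site code) -> per-task DataFrame.
rating_curves_dfs = {
    "12345678": pd.DataFrame({"feature_id": [1], "discharge_cms": [2.5]}),
    "87654321": pd.DataFrame({"feature_id": [2], "discharge_cms": [4.1]}),
}

# Roll the per-task frames into one master DataFrame, skipping empty results.
frames = [df for df in rating_curves_dfs.values() if df is not None and not df.empty]
all_rating_curves = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
print(all_rating_curves)
```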
<br/>

## v4.8.12.2 - 2025-10-10 - [PR#1616](https://github.com/NOAA-OWP/inundation-mapping/pull/1616)

Fixes a bug that was introduced to flow-based CatFIM in recent changes to the Inundate_gms() function. The bug was fixed by adding multi_process = True as an input to the function.