diff --git a/data/bridges/make_dem_dif_for_bridges.py b/data/bridges/make_dem_dif_for_bridges.py index f4cfe7c46..6d2b0f500 100644 --- a/data/bridges/make_dem_dif_for_bridges.py +++ b/data/bridges/make_dem_dif_for_bridges.py @@ -20,7 +20,7 @@ from shapely.geometry import Point from data.create_vrt_file import create_vrt_file -from utils.shared_functions import run_with_mp, setup_mp_file_logger +from src.utils.shared_functions import run_with_mp, setup_mp_file_logger """ @@ -165,12 +165,12 @@ def make_one_diff( with rasterio.open(output_diff_path, 'w', **raster_meta) as dst: dst.write(updated_raster, 1) screen_queue.put(f"End of processing {task_id}") - return 0, [True] + return 1, [True] except Exception as e: file_logger.error(f"❌ Exception in HUC {task_id}: {str(e)}") file_logger.error(traceback.format_exc()) - return 1, [False] + return 0, [False] def make_dif_rasters(OSM_bridge_file, dem_dir, lidar_tif_dir, output_dir, number_jobs): @@ -180,17 +180,23 @@ def make_dif_rasters(OSM_bridge_file, dem_dir, lidar_tif_dir, output_dir, number file_dt_string = start_time.strftime("%Y_%m_%d-%H_%M_%S") log_file_path = os.path.join(output_dir, f"DEM_diff_rasters-{file_dt_string}.log") - file_logger = setup_mp_file_logger(log_file_path) + file_logger = setup_mp_file_logger(log_file_path, "DEM_diff_raster") try: print('Reading osm bridge lines...') + if not os.path.isfile(OSM_bridge_file): + raise ValueError(f"Argument -i OSM_bridge_file of {OSM_bridge_file} does not exist.") OSM_bridge_lines_gdf = gpd.read_file(OSM_bridge_file) print('Adding HUC8/6 number and info about existence of lidar raster or not...') OSM_bridge_lines_gdf['huc6'] = OSM_bridge_lines_gdf['huc8'].str[:6] OSM_bridge_lines_gdf = identify_bridges_with_lidar(OSM_bridge_lines_gdf, lidar_tif_dir) + if len(OSM_bridge_lines_gdf) == 0: + raise ValueError("There are no bridges with lidar data, check data and tif folder pathing.") dem_files = list(glob.glob(os.path.join(dem_dir, '*.tif'))) + if len(dem_files) == 0: + raise ValueError("No DEM files were found. 
Please recheck the DEM folder pathing") dem_files.sort() available_dif_files = list(glob.glob(os.path.join(output_dir, '*.tif'))) diff --git a/data/bridges/pull_osm_bridges.py b/data/bridges/pull_osm_bridges.py index a8877d091..ba70b4026 100644 --- a/data/bridges/pull_osm_bridges.py +++ b/data/bridges/pull_osm_bridges.py @@ -313,6 +313,8 @@ def combine_huc_features(output_dir): def process_osm_bridges(wbd_file, output_folder, number_of_jobs, lst_hucs): start_time = dt.datetime.now(dt.timezone.utc) + + # TODO: plug in shared_functions version for logger setup __setup_logger(output_folder) print("==================================") diff --git a/data/nfhl/download_fema_nfhl.py b/data/nfhl/download_fema_nfhl.py index b86d576b2..e153d6040 100644 --- a/data/nfhl/download_fema_nfhl.py +++ b/data/nfhl/download_fema_nfhl.py @@ -282,10 +282,10 @@ def __get_nfhl_flood_hazard_zones( ) if success: # True file_logger.info(f"Completed processing HUC {task_id}") - return 0, [True] + return 1, [True] # screen_queue.put(f"Completed processing HUC {task_id}") else: # False - return 2, [False] + return 0, [False] except Exception as e: file_logger.error(f"Exception in HUC {task_id}: {str(e)}") @@ -325,7 +325,8 @@ def download_nfhl_wrapper(huc_list, output_folder, geometryType='esriGeometryEnv # Set up logger file_dt_string = start_time.strftime("%Y_%m_%d-%H_%M_%S") log_file_path = os.path.join(output_folder, f"nfhl_download-{file_dt_string}.log") - file_logger = setup_mp_file_logger(log_file_path) + # file_logger = setup_mp_file_logger(log_file_path) + file_logger = setup_mp_file_logger(log_file_path, "nfhl_download") try: diff --git a/data/roads/pull_osm_roads.py b/data/roads/pull_osm_roads.py index 29948f82f..e3022233e 100755 --- a/data/roads/pull_osm_roads.py +++ b/data/roads/pull_osm_roads.py @@ -262,12 +262,12 @@ def single_huc_job( else: file_logger.warning(f"No roads within actual boundary of HUC for {task_id}") screen_queue.put(f"No roads within actual boundary of HUC for {task_id}") - return 0, [True] + return 1, [True] except Exception as e: file_logger.error(f"❌ Exception in HUC {HUC_no}: {str(e)}") file_logger.error(traceback.format_exc()) # Optional: log full traceback - return 2, [False] + return 0, [False] def pull_osm_roads(preclip_dir, output_dir, number_jobs, lst_hucs): @@ -289,7 +289,7 @@ def pull_osm_roads(preclip_dir, output_dir, number_jobs, lst_hucs): # Create the logger file_dt_string = start_time.strftime("%Y%m%d-%H%M") log_file_path = os.path.join(output_dir, f"pull_osm_roads_{file_dt_string}.log") - file_logger = setup_mp_file_logger(log_file_path) + file_logger = setup_mp_file_logger(log_file_path, "pull_osm_roads") print("==================================") print("Starting load of OSM Road data") diff --git a/data/usgs/get_usgs_rating_curves.py b/data/usgs/get_usgs_rating_curves.py index 9e17bf1bd..7e75663fa 100755 --- a/data/usgs/get_usgs_rating_curves.py +++ b/data/usgs/get_usgs_rating_curves.py @@ -104,12 +104,12 @@ def __mp_get_flows_for_site( flow_df = flow_df.rename(columns={'discharge': 'discharge_cms'}) site_flows_df = pd.concat([site_flows_df, flow_df], ignore_index=True) - return 0, [site_flows_df] + return 1, site_flows_df except Exception: file_logger.critical(f"❌ Critical error while processing {task_id}") file_logger.critical(traceback.format_exc()) - return 1, [] # shut the program down. + return -1, None # shut the program down. # Generate categorical flows for each category across all sites. 
@@ -131,12 +131,6 @@ def __write_categorical_flow_files( This function uses its own mp log file. This arg allows the mp logs to be appended to the parent when it is done. Note: For now.. any error files created anywhere are not removed. - - Returns - ------- - all_flows_data : DataFrame - A dataframe of categorical flow for every feature ID in the input metadata. - ''' threshold_url = f'{API_BASE_URL}/nws_threshold' @@ -189,7 +183,8 @@ def __write_categorical_flow_files( # Again.. we make a special mp for this set mp_log_file_path = os.path.join(output_dir, f"get_rating_curves-mp-flow-{file_datetime_string}.log") mp_logger = sf.setup_mp_file_logger(mp_log_file_path, logger_name="mp_flows") - list_flow_dfs = sf.run_with_mp( + # We get a list of dictionaries + flow_dfs = sf.run_with_mp( task_function=__mp_get_flows_for_site, tasks_args_list=sorted_tasks_args_list, file_logger=mp_logger, @@ -203,7 +198,14 @@ def __write_categorical_flow_files( # for now.. let's leave the error files alone, even the mp ones # roll up the list of df's into one master df - all_flows_data = pd.concat(list_flow_dfs, ignore_index=True) + # run_with_mp returns a list of dictionaries keyed with a huc. We don't care about the keys, just the values + # which are df's + all_flows_data = pd.DataFrame() + for i, value in enumerate(flow_dfs.values()): + if i == 0: + all_flows_data = value + else: + all_flows_data = pd.concat([all_flows_data, value]) # Write usgs stage discharge data, used by Sierra tests (rating_curve_comparison.py) logging.info("Writing for USGS discharge data for each usgs stage (ie. action, minor, etc)") @@ -211,8 +213,8 @@ def __write_categorical_flow_files( usgs_discharge_file_name = os.path.join(output_dir, 'usgs_stage_discharge_cms.csv') final_data = all_flows_data[['feature_id', 'discharge_cms', 'recurr_interval']] final_data.to_csv(usgs_discharge_file_name, index=False) - - return all_flows_data + else: + logging.info("No flow data was found. Saving of usgs_stage_discharge_cms file skipped") def set_global_env(env_file): @@ -260,7 +262,7 @@ def __mp_get_site_rating_curve( # If no rating curve was returned, skip site. if curve.empty: file_logger.warning(f'{location_id}: Removed because it has no rating curves') - return 2, [] # log and continue the next task + return 0, None # log and continue the next task # If the site is in PR, VI, or HI, keep datum in LMSL (local mean sea level) # because our 3DEP dems are also in LMSL for these areas. @@ -278,7 +280,7 @@ def __mp_get_site_rating_curve( f'VI, or HI but has a datum other than LMSL ({datum_name})' ) file_logger.warning(message) - return 2, [] # log and continue the next task + return 0, None # log and continue the next task # If the state is not PR, VI, or HI, then we want to adjust the datum to NAVD88 if needed. # If the datum is unknown, skip site. @@ -291,7 +293,7 @@ def __mp_get_site_rating_curve( # If datum API failed, print message and skip site. 
if datum_adj_ft is None: file_logger.warning(f'{location_id}: Removed because datum adjustment failed!!') - return 2, [] # log and continue the next task + return 0, None # log and continue the next task # If datum adjustment succeeded, calculate datum in NAVD88 navd88_datum = round(usgs['datum'] + datum_adj_ft, 2) @@ -306,13 +308,13 @@ def __mp_get_site_rating_curve( file_logger.warning( f'{location_id}: Removed because LMSL datum found outside of PR, VI, or HI' ) - return 2, [] # log and continue the next task + return 0, None # log and continue the next task else: # If the site has an unrecognized datum, skip site. datum_name = usgs['vcs'] file_logger.warning(f'{location_id}: Removed due to unknown datum ({datum_name})') - return 2, [] # log and continue the next task + return 0, None # log and continue the next task # Populate rating curve with metadata and use navd88 datum to convert stage to elevation. # If you came in looking for all sites, then "active" will be true. A filtered set, this might be True or False @@ -320,15 +322,15 @@ def __mp_get_site_rating_curve( curve['datum'] = usgs['datum'] curve['datum_vcs'] = usgs['vcs'] curve['navd88_datum'] = navd88_datum - curve['elevation_navd88'] = curve['stage'] + navd88_datum + curve['elevation_navd88'] = round(curve['stage'] + navd88_datum, 2) file_logger.debug(f"Done rating curves for usgs location id of {usgs_site_code}") - return 0, [curve] + return 1, curve except Exception: file_logger.critical(f"❌ Critical error while processing {task_id}") file_logger.critical(traceback.format_exc()) - return 1, [] # shut the program down. + return -1, None # shut the program down. def __get_usgs_metadata(list_of_gage_sites, metadata_url): @@ -535,12 +537,14 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir): sys.exit() else: print(f'Loading environment file: {env_file}') + print("") # Set global variables set_global_env(env_file) if list_of_gage_sites != 'all': print( - "*** You have provide a list of specific usgs site codes to process.\nPlease note that when getting all sites," + "\n" + "*** NOTICE: You have provide a list of specific usgs site codes to process.\nPlease note that when getting all sites," " it filters to only sites that are active. But when using specific codes, it will not use the 'is active' filter.\n\n" "To continue, hit your enter key or CTRL-C to abort" ) @@ -575,9 +579,7 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir): file_datetime_string = overall_start_dt.strftime("%Y%m%d-%H%M") display_dt_string = datetime.now(timezone.utc).strftime("%m/%d/%Y %H:%M:%S") - log_file_name = f"get_rating_curves-{file_datetime_string}.log" - log_file_path = os.path.join(output_dir, log_file_name) - sf.setup_file_logger(log_file_path) + log_file_path = sf.setup_file_logger(output_dir, "get_rating_curves") # file_logger = sf.setup_mp_file_logger(log_file_path) try: @@ -604,7 +606,6 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir): display_dt_string = section_start_dt.strftime("%m/%d/%Y %H:%M:%S") logging.info("=============") logging.info(f"Processing metadata started: {display_dt_string} (UTC)") - all_rating_curves = pd.DataFrame() num_sites = len(metadata_list) logging.info(f"Number of sites to process: {num_sites}") @@ -634,7 +635,8 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir): # parent script. 
mp_log_file_path = os.path.join(output_dir, f"get_rating_curves-mp-{file_datetime_string}.log")
         mp_logger = sf.setup_mp_file_logger(mp_log_file_path, logger_name="mp_rcs")
-        list_rating_curves_dfs = sf.run_with_mp(
+        # We get a dictionary of dataframes back
+        rating_curves_dfs = sf.run_with_mp(
             task_function=__mp_get_site_rating_curve,
             tasks_args_list=sorted_tasks_args_list,
             file_logger=mp_logger,
@@ -648,11 +650,19 @@ def usgs_rating_to_elev(list_of_gage_sites, env_file, num_jobs, output_dir):
         # for now.. let's leave the error files alone, even the mp ones

         # more processing of rating curves
-        if len(list_rating_curves_dfs) == 0:
+        if len(rating_curves_dfs) == 0:
             logging.error("There are no acceptable sites. Stopping program.")
             sys.exit(1)

-        all_rating_curves = pd.concat(list_rating_curves_dfs)
+        # run_with_mp returns a dictionary keyed by task (huc / site). We don't care about the keys,
+        # just the values, which are df's
+        all_rating_curves = pd.DataFrame()
+        for i, value in enumerate(rating_curves_dfs.values()):
+            if i == 0:
+                all_rating_curves = value
+            else:
+                all_rating_curves = pd.concat([all_rating_curves, value])
+
         logging.info(f"Number of sites to process with metadata: {len(all_rating_curves)}")

         display_dt_string = datetime.now(timezone.utc).strftime("%m/%d/%Y %H:%M:%S")

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index ec063590f..9e7397dc4 100755
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -1,6 +1,25 @@
 All notable changes to this project will be documented in this file.
 We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.

+## v4.8.12.3 - 2025-10-10 - [PR#1660](https://github.com/NOAA-OWP/inundation-mapping/pull/1660)
+
+The file shared_functions.py had a bug in it related to an object length check. While fixing that, we also made some enhancements to the logging tools and to run_with_mp. The return code system was changed so that 1 = success, 0 = fail but do not abort, and -1 = fail and abort the run.
+
+This triggered changes to the scripts that use the logging or run_with_mp tools, specifically to the return values from their multi-proc functions. The change affected four data load scripts, and all were retested for correct data loading. A fifth data load script, pull_osm_bridges.py, was updated but only for a comment and was not affected by other changes in this PR.
+
+There was also a change to shared_functions.setup_file_logger. It no longer takes a full path and file name for a log file, but two separate arguments: one for the log folder and one for a log file name prefix. A date will be added automatically to the log file name.
+
+### Changes
+
+- `data`
+  - `bridges\make_dem_dif_for_bridges.py`: Updated call to setup_file_logger plus changes related to run_with_mp. A bit of additional error handling was also added.
+  - `bridges\pull_osm_bridges.py`: Only a comment was added.
+  - `nfhl\download_fema_nfhl.py`: Updated call to setup_file_logger plus changes related to run_with_mp.
+  - `roads\pull_osm_roads.py`: Updated call to setup_file_logger plus changes related to run_with_mp.
+  - `usgs\get_usgs_rating_curves.py`: Updated call to setup_file_logger plus changes related to run_with_mp. A bit of additional error handling was also added, along with a small rounding fix.
+- `src\utils\shared_functions.py`: Updated as described above.
+
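+As an illustration only (this snippet is not part of the PR's code), a multi-proc task function written against the new return code convention might look roughly like the sketch below; `my_task` and the stand-in work inside it are placeholder names:
+
+```python
+import traceback
+
+import pandas as pd
+
+
+def my_task(task_id, screen_queue, file_logger):
+    # Hypothetical task run via shared_functions.run_with_mp.
+    # Return codes: 1 = success, 0 = fail but keep going, -1 = critical fail and abort the run.
+    try:
+        result_df = pd.DataFrame({"task": [task_id]})  # stand-in for the real per-task work
+        if result_df.empty:
+            return 0, None  # nothing usable for this task; the other tasks keep running
+        screen_queue.put(f"End of processing {task_id}")
+        return 1, result_df  # success; run_with_mp collects the value, keyed by task id
+    except Exception:
+        file_logger.critical(traceback.format_exc())
+        return -1, None  # critical failure; run_with_mp raises and shuts the pool down
+```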
+
 ## v4.8.12.2 - 2025-10-10 - [PR#1616](https://github.com/NOAA-OWP/inundation-mapping/pull/1616)

 Fixes a bug that was introduced to flow-based CatFIM in recent changes to the Inundate_gms() function. The bug was fixed by adding multi_process = True as an input to the function.


diff --git a/src/utils/shared_functions.py b/src/utils/shared_functions.py
index dc825589c..314754d23 100755
--- a/src/utils/shared_functions.py
+++ b/src/utils/shared_functions.py
@@ -23,26 +23,26 @@
 from tqdm import tqdm

+# Registry: name -> path
+_LOGGER_REGISTRY = {}
+
 gp.options.io_engine = "pyogrio"


 # #################################
 # log file tools
-def calc_error_log_file_path(log_file_path):
-
-    # takes the log_file_path and creates and adjusted file name to handle errors
-    # ie) log_file_path = /data/outputs/my_log_file.log and comes back with
-    #     /data/outputs/my_log_file-errors.log
-    # This helps with error file rollup into parent logs if desired
-    if not log_file_path.endswith(".log"):
-        raise Exception("log file name must end with .log")
-
-    return log_file_path.replace(".log", "-errors.log")


 # This one is a standard Python logger, NOT MEANT for multi-proc
-def setup_file_logger(log_file_path):
+# def setup_file_logger(log_file_path):
+def setup_file_logger(log_file_dir, log_file_name_prefix):
     """
+
+    This creates the log file name for you. It will take the log_file_name_prefix, then append a dt, then the extension.
+
+    ie) setup_file_logger("/outputs/mylogs", "pull_osm_bridges")
+        The log name becomes "/outputs/mylogs/pull_osm_bridges_20250925_1842.log"
+
     This one is not meant to be used for MP's.

     It prints to file and screen at the same time.
@@ -58,10 +58,20 @@ def setup_file_logger(log_file_path):
     Note: The file is created automatically and if no actual errors are found, that file
     will be empty. In the end, the error log file is not removed if it is empty,
     just watch its file size.
+
+    Returns the name/path of the new log file.
     """

-    if not log_file_path.endswith(".log"):
-        raise Exception("log file name must end with .log")
+    if log_file_dir is None or log_file_dir == "":
+        raise ValueError("log directory path cannot be None or empty")
+
+    if log_file_name_prefix is None or log_file_name_prefix == "":
+        raise ValueError("log file name prefix cannot be None or empty")
+
+    file_dt_string = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M")
+    log_file_name = f"{log_file_name_prefix}_{file_dt_string}.log"
+    log_file_path = os.path.join(log_file_dir, log_file_name)
+    print(f"Logs saved to: {log_file_path}")

     logger = logging.getLogger()
     logger.setLevel(logging.INFO)
@@ -77,7 +87,7 @@ def setup_file_logger(log_file_path):
     formatter = logging.Formatter("%(asctime)s - %(levelname)s : %(message)s")

     # error file handler
-    error_file_name = calc_error_log_file_path(log_file_path)
+    error_file_name = log_file_path.replace(".log", "-errors.log")
     err_file_handler = logging.FileHandler(error_file_name)
     err_file_handler.setLevel(logging.ERROR)
     err_file_handler.setFormatter(formatter)
@@ -93,14 +103,18 @@ def setup_file_logger(log_file_path):
     logger.addHandler(file_handler)
     logger.addHandler(console_handler)

-    # return logger (don't do it :) )
+    return log_file_path


 # This one is more designed to be for multi-proc as it has logger handler names
 # Notice how it does not have a "console / stream handler"? 
hence, a special function for MP -def setup_mp_file_logger(log_file_path, logger_name="custom_logger", level=logging.DEBUG): +def setup_mp_file_logger(log_file_path: str, logger_name: str, level=logging.DEBUG): """ + Create a logger bound by a strict bijection: + - Each logger_name maps to exactly one log_file_path. + - Each log_file_path may only be used by one logger_name. + This version is meant for use in MP as it makes a custom logger and does not use the default logger. However, both can co-exist. @@ -121,10 +135,30 @@ def setup_mp_file_logger(log_file_path, logger_name="custom_logger", level=loggi """ + if not logger_name: + raise ValueError("logger_name can not be None or empty") + if not log_file_path.endswith(".log"): raise Exception("log file name must end with .log") - os.makedirs(os.path.dirname(log_file_path), exist_ok=True) + abs_path = os.path.abspath(log_file_path) + os.makedirs(os.path.dirname(abs_path), exist_ok=True) + + # Check name -> path + if logger_name in _LOGGER_REGISTRY and _LOGGER_REGISTRY[logger_name] != abs_path: + raise ValueError( + f"Logger '{logger_name}' already bound to '{_LOGGER_REGISTRY[logger_name]}', cannot bind to '{abs_path}'." + ) + + # Check path -> name (reverse lookup) + for name, path in _LOGGER_REGISTRY.items(): + if path == abs_path and name != logger_name: + raise ValueError( + f"Path '{abs_path}' is already bound to logger '{name}', cannot assign to '{logger_name}'." + ) + + # Register if new + _LOGGER_REGISTRY.setdefault(logger_name, abs_path) logger = logging.getLogger(logger_name) logger.setLevel(level) @@ -136,7 +170,7 @@ def setup_mp_file_logger(log_file_path, logger_name="custom_logger", level=loggi # Order of adding handlers may be important ??? # error file handler - error_file_name = calc_error_log_file_path(log_file_path) + error_file_name = log_file_path.replace(".log", "-errors.log") err_file_handler = logging.FileHandler(error_file_name) err_file_handler.setLevel(logging.ERROR) err_file_handler.setFormatter(formatter) @@ -198,23 +232,23 @@ def run_with_mp( # In the end, you will have a set of T/F, dictionaries, dataframes, string, etc # - A status code. options are: - # 0: Success and show tqdm or print success line - # 1: Fail and the entire script should be aborted - # 2: Fail but don't shut down, advance the pbar AND show the tqdm / print error or warning message + # 1: Success and show tqdm or print success line + # 0: Fail but don't shut down, advance the pbar AND show the tqdm / print error or warning message + # -1: Critical Fail and the entire script should be aborted # Some examples of usage: - - # Some tools like pull_osm_roads.py want a T/F returned for every mp item, so its mp process + # data\roads\pull_osm_roads.py wants a T/F returned for every mp item, so its mp process # named "single_huc_job" returns: - # 0, [True] (meaning success and add "True" to the run_by_mp result set) - # 2, [False] (meaning fail don't shut down the entire process, add the value of + # 1, [True] (meaning success and add "True" to the run_by_mp result set) + # 0, [False] (meaning fail don't shut down the entire process, add the value of # False to the run_with_mp return results and show the tqdm / print message - # Some tools like get_usgs_rating_curves.py have different needs. Inside its mp function, + # Another example: + # data\usgs\get_usgs_rating_curves.py have different needs. 
Inside its mp function,
     # named "__mp_get_site_rating_curve" could have three scenarios (at a min)
-    # 0, [some dataframe] (success and add the dataframe to the run_by_mp result set)
-    # 1, [] (Catestrophic fail, shut down the entire script)
-    # 2, [] (Fail but there is nothing to add to the run_by_mp result set)
+    # 1, [some dataframe] (success and add the dataframe to the run_by_mp result set)
+    # 0, [None] (Fail but there is nothing to add to the run_by_mp result set)
+    # -1, [None] (Catastrophic fail, shut down the entire script)

     # ++++++++++++++++++++++
@@ -244,7 +278,7 @@ def log_worker(queue):
     #  - can be a system level such as a CTRL-C
     #  - CPU collisions, etc

-    # Becuase there are multiple ways that an MP can crash, it very easy to leave either
+    # Because there are multiple ways that an MP can crash, it is very easy to leave either
     # an orphaned process (memory leak), or a thread that is still forcing the program to stay open.
     # It is not possible to kill an mp function already in progress short of some very, very complex
     # complete operating system process management (extremely not recommended).
@@ -273,6 +307,7 @@ def log_worker(queue):
     results = {}

     with ProcessPoolExecutor(max_workers=max_workers) as executor:
+        future_to_id = {}

         # up to this point, the code is run immediately--submission is done right away. Now we wait for each job to be completed and be processed as below
         pbar = tqdm(
@@ -311,7 +346,7 @@ def log_worker(queue):
             # See notes above about return values.
             rtn_code, rtn_value = future.result()

-            if rtn_code == 0:
+            if rtn_code == 1:  # Positive = good
                 # success and show tqdm or print line
                 if show_progress:
                     tqdm.write(
@@ -321,7 +356,14 @@ def log_worker(queue):
                     print(f"✅ Success for {task_id}")
                 file_logger.info(f"✅ Success for {task_id}")

-            elif rtn_code == 1:
+            elif rtn_code == 0:  # Fail but do not shut down the pool.
+                if show_progress:
+                    tqdm.write(f"❌ Error reported for {task_id}.")
+                else:
+                    print(f"❌ Error reported for {task_id}.")
+                file_logger.info(f"❌ Error reported for {task_id}.")
+
+            else:  # rtn_code == -1, but really any negative int
                 # Catastrophic fails, shut the tool down (and assumes the mp logged the reason why)
                 # throw an exception to shut down and cleanup all objects (pool, tqdm, queue)
                 raise Exception(
@@ -329,32 +371,6 @@ def log_worker(queue):
                     " See exception details in the logs."
                 )

-            elif rtn_code == 2:
-                # Fail but not shut down the pool.
-                if rtn_code == 2:  # show tqdm / print message
-                    if show_progress:
-                        tqdm.write(f"❌ Error or Warning reported for {task_id}.")
-                    else:
-                        print(f"❌ Error or Warning reported for {task_id}.")
-                    file_logger.info(f"❌ Error or Warning reported for {task_id}.")
-                else:
-                    raise Exception("Child mp task returned and invalid status code")
-
-            if len(rtn_value) == 1:
-                # add it to the run_with_mp results
-                # Some mp functions will return an empty list meaning they don't
-                # want to add anything to the run_with_mp return set.
-
-                # IMPORTANT NOTE:
-                # This extracts the first item only.
-                results[task_id] = rtn_value[0]
-            if len(rtn_value > 1):
-                raise Exception(
-                    "Child mp task must return either 0 or 1 list items, and you have more"
-                    " than one item in the return list. Consider a list or dictionary in"
-                    " return list."
-                )
-
             if pbar:
                 # print("task bar being updated")
                 pbar.update(1)  # ✅ Progress update for each completed task