diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f22ff5026..6c9a3ff9e 100755
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -1,6 +1,24 @@
 All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
+## v4.8.14.0 - 2025-10-10 - [PR#1650](https://github.com/NOAA-OWP/inundation-mapping/pull/1650)
+
+Fixes an error in which Alaska HUCs attempt the Int16 conversion even when there are too many HydroIDs, and adds error handling for when this issue arises.
+
+### Changes
+- src/delineate_hydros_and_produce_HAND.sh: Add a check for the Alaska HUC2 identifier before running the conversion to the Int16 data type.
+- src/process_branch.sh: Add handling for the Int16 conversion failure error code.
+- src/src_adjust_spatial_obs.py: Add a check for the Alaska HUC2 identifier to determine whether the process should run in Int16 or Int32.
+- src/utils/fim_enums.py: Add an error code for the Int16 conversion error.
+- tools/catfim/generate_categorical_fim_mapping.py: Process the REM and HydroIDs in Int16, or in float32 and Int32 respectively in the case of an Alaska HUC.
+- tools/inundation.py: Process the REM and HydroIDs in Int16, or in float32 and Int32 respectively in the case of an Alaska HUC.
+- tools/mosaic_inundation.py: Add rasterio's merge and remove calls to the removed script overlapping_inundation.py.
+- tools/convert_to_int16.py: Raise an error when the catchment raster has too many HydroIDs or HydroIDs with more than 8 digits.
+
+### Removals
+- tools/overlapping_inundation.py
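For context on the failure this release guards against, a minimal sketch (illustrative only, not code from the PR): Int16 reserves its maximum value for nodata, so a catchment raster can hold at most 32766 distinct HydroID suffixes, and the prefix-plus-4-digit-suffix encoding assumes HydroIDs of at most 8 digits.

```python
# Minimal sketch of the Int16 limits behind this fix (illustrative only).
import numpy as np

nodata = np.iinfo(np.int16).max       # 32767 is reserved for nodata
max_unique_suffixes = nodata - 1      # so at most 32766 distinct HydroIDs fit

# The prefix scheme stores prefix * 10000 + a 4-digit suffix, which assumes
# HydroIDs of at most 8 digits; a hypothetical 9-digit ID overflows it.
hydroid = 190123456
print(len(str(hydroid)) > 8)          # True -> the conversion must be skipped
```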
+
 ## v4.8.13.0 - 2025-10-10 - [PR#1673](https://github.com/NOAA-OWP/inundation-mapping/pull/1673)

 This script processes river segment slope data to identify and correct unrealistic values based on user-defined thresholds. It is designed to check against the NWM hydrofab and to fill/extrapolate missing or invalid slope values using valid upstream/downstream values.

diff --git a/src/delineate_hydros_and_produce_HAND.sh b/src/delineate_hydros_and_produce_HAND.sh
index 40f7919cd..dda968540 100755
--- a/src/delineate_hydros_and_produce_HAND.sh
+++ b/src/delineate_hydros_and_produce_HAND.sh
@@ -2,6 +2,7 @@

 ## Level is equal to the parent script: 'unit' or 'branch'
 level=$1
+huc2Identifier=${hucNumber:0:2}

 if [ "$level" = "branch" ]; then
     b_arg=$tempCurrentBranchDataDir/nwm_subset_streams_levelPaths_$current_branch_id.gpkg
@@ -311,8 +312,12 @@ if [ "$current_branch_id" = "$branch_zero_id" ] && [ "$evaluateCrosswalk" = "1"
         -z $current_branch_id
 fi

-## CONVERSION TO INT16 ##
-echo -e $startDiv"Convert GW Catchments and REM to Int16 $hucNumber $current_branch_id"
-python3 $toolsDir/convert_to_int16.py \
-    -b $tempCurrentBranchDataDir
+if [ "$huc2Identifier" -eq 19 ]; then
+    echo -e "Skipping Int16 Conversion for Alaska HUC"
+else
+    ## CONVERSION TO INT16 ##
+    echo -e $startDiv"Convert GW Catchments and REM to Int16 $hucNumber $current_branch_id"
+    python3 $toolsDir/convert_to_int16.py \
+        -b $tempCurrentBranchDataDir
+fi

diff --git a/src/process_branch.sh b/src/process_branch.sh
index 7a85a5627..88e74ac6f 100755
--- a/src/process_branch.sh
+++ b/src/process_branch.sh
@@ -43,6 +43,11 @@ do
         err_exists=1
         echo "***** Branch has no crosswalks *****"
         rm -rf $tempHucDataDir/branches/$branchId/
+    elif [ $code -eq 65 ]; then
+        echo
+        err_exists=1
+        echo "***** Too many HydroIDs, or a HydroID with more than 8 digits, in gw catchments to convert to Int16 *****"
+        rm -rf $tempHucDataDir/branches/$branchId/
     elif [ $code -ne 0 ]; then
         echo
         err_exists=1

diff --git a/src/src_adjust_spatial_obs.py b/src/src_adjust_spatial_obs.py
index e8116e13e..861f6c688 100644
--- a/src/src_adjust_spatial_obs.py
+++ b/src/src_adjust_spatial_obs.py
@@ -86,18 +86,31 @@ def process_points(args):

     water_edge_df = water_edge_df.to_crs(DEFAULT_FIM_PROJECTION_CRS)

-    with open(hydroid_prefixpath, 'r') as file:
-        hydroid_prefix = file.read()
-    int_hid_prefix = int(hydroid_prefix) * 10000
+    process_int16 = True
+    try:
+        with open(hydroid_prefixpath, 'r') as file:
+            hydroid_prefix = file.read()
+        int_hid_prefix = int(hydroid_prefix) * 10000
+    except FileNotFoundError:
+        process_int16 = False
+        int_hid_prefix = 0
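The reconstruction below depends on the HydroID prefix scheme written out by convert_to_int16.py: the prefix file stores the leading digits once, and the Int16 raster keeps only the 4-digit suffix. A worked example with hypothetical values:

```python
# Worked example of the prefix arithmetic above (hypothetical values).
hydroid_prefix = "1902"                         # contents of the branch prefix file
int_hid_prefix = int(hydroid_prefix) * 10000    # 19020000
sampled_suffix = 451                            # Int16 value sampled from the raster
assert int_hid_prefix + sampled_suffix == 19020451  # full HydroID restored
```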
     ## Use point geometry to determine HAND raster pixel values.
     with rasterio.open(hand_path) as hand_src, rasterio.open(catchments_path) as catchments_src:
-        water_edge_df['hand'] = [np.float32(h[0]) / 1000 for h in hand_src.sample(coords)]
+
+        water_edge_df['hand'] = (
+            [np.float32(h[0]) / 1000 for h in hand_src.sample(coords)]
+            if process_int16
+            else [h[0] for h in hand_src.sample(coords)]
+        )
+
         hydroids = []
         for c in catchments_src.sample(coords):
-            hid = int_hid_prefix * -1 + c[0] if c[0] < 0 else int_hid_prefix + c[0]
+            c[0] = c[0] if c[0] >= 0 else c[0] * -1
+            hid = int_hid_prefix + c[0] if process_int16 else c[0]
             hydroids.append(hid)
+
         water_edge_df['hydroid'] = hydroids

     water_edge_df = water_edge_df[

diff --git a/src/utils/fim_enums.py b/src/utils/fim_enums.py
index cee14f63d..2375dc9fe 100644
--- a/src/utils/fim_enums.py
+++ b/src/utils/fim_enums.py
@@ -25,8 +25,13 @@ class FIM_exit_codes(Enum):
     https://docs.python.org/3/library/enum.html
     '''

+    # Codes beginning with 6 are acceptable errors
     UNIT_NO_BRANCHES = 60
     NO_FLOWLINES_EXIST = 61
     EXCESS_UNIT_ERRORS = 62
     NO_BRANCH_LEVELPATHS_EXIST = 63
     NO_VALID_CROSSWALKS = 64
+    CANNOT_CONVERT_HYDROIDS_TO_INT16 = 65
+
+    # @TODO Make system codes that shut down the entire HUC process
+    # Codes beginning with 5 are not acceptable errors and shut down HUC processing

diff --git a/tools/catfim/generate_categorical_fim_mapping.py b/tools/catfim/generate_categorical_fim_mapping.py
index 8d65b7508..d7061a33e 100755
--- a/tools/catfim/generate_categorical_fim_mapping.py
+++ b/tools/catfim/generate_categorical_fim_mapping.py
@@ -73,7 +73,9 @@ def produce_stage_based_lid_tifs(

     # Subtract HAND gage elevation from HAND WSE to get HAND stage.
     hand_stage_m = datum_adj_wse_m - lid_usgs_elev
-    hand_stage = round(hand_stage_m * 1000)  # convert to mm to match HAND
+    hand_stage = (
+        hand_stage_m if str(huc)[:2] == '19' else round(hand_stage_m * 1000)
+    )  # Alaska HAND stays in meters; elsewhere convert to mm to match the Int16 HAND

     # If hand_stage is negative, write message and exit out
     if hand_stage < 0:
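Since non-Alaska HAND grids are Int16 millimeters while Alaska grids stay float32 meters, the stage has to be expressed in the raster's units before comparison; a short sketch with hypothetical values:

```python
# Hypothetical 2.5 m stage expressed in each grid's units (illustrative only).
hand_stage_m = 2.5
huc = '19020302'                      # a hypothetical Alaska HUC8
is_alaska = str(huc)[:2] == '19'

# Alaska: float32 HAND in meters; elsewhere: Int16 HAND in millimeters.
hand_stage = hand_stage_m if is_alaska else round(hand_stage_m * 1000)
print(hand_stage)                     # 2.5 for Alaska, 2500 otherwise
```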
@@ -324,15 +326,20 @@ def produce_inundated_branch_tif(

     # Use numpy.where operation to reclassify rem_path on the condition that the pixel values
     # are <= to hand_stage and the catchments value is in the hydroid_list.
+
+    is_alaska = str(huc)[:2] == '19'
+
+    output_dtype = 'uint8' if is_alaska else 'int16'
+
     reclass_rem_array = np.where((rem_array <= hand_stage) & (rem_array != rem_src.nodata), 1, 0).astype(
-        'int16'
+        output_dtype
     )

     # The catchment_array has HydroIDs that have had the first 4 chars cut off;
     # we need to do the same for the HydroIDs from the hydroid_list
     clipped_hydroid_list = []
     for i in hydroid_list:
-        clipped_str = str(i)[-4:]
+        clipped_str = str(i) if is_alaska else str(i)[-4:]
         clipped_hydroid_list.append(int(clipped_str))

     # Create a mask of the catchments_array where the values are in the clipped_hydroid_list
@@ -341,12 +348,12 @@ def produce_inundated_branch_tif(

     # Create a target array where the catchments_array is not nodata and the hydroid_mask is True
     target_catchments_array = np.where(
         ((hydroid_mask == True) & (catchments_array != catchments_src.nodata)), 1, 0
-    ).astype('int16')
+    ).astype(output_dtype)

     # Mask the reclass_rem_array with the target_catchments_array
     masked_reclass_rem_array = np.where(
         ((reclass_rem_array >= 1) & (target_catchments_array >= 1)), 1, 0
-    ).astype('int16')
+    ).astype(output_dtype)

     ## Debugging:

diff --git a/tools/convert_to_int16.py b/tools/convert_to_int16.py
index a80307ccd..3aa7d8e4a 100644
--- a/tools/convert_to_int16.py
+++ b/tools/convert_to_int16.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
-
 import argparse
 import os
+import sys
 import traceback
 from glob import glob

@@ -9,6 +9,8 @@
 import rioxarray as rxr
 import xarray as xr

+from utils.fim_enums import FIM_exit_codes
+

 def convert_to_int16(branch_dir: str):
     """
@@ -31,23 +33,17 @@ def convert_to_int16(branch_dir: str):

     # Iterate through each pair of gw catchments and rems
     for c, r in zip(catchments, rems):
-        rem = rxr.open_rasterio(r)

-        # Save original as another file to be deleted by deny list or saved
-        rem.rio.to_raster(r.replace('.tif', '_float32.tif'), compress="LZW", tiled=True)
-        nodata, crs = rem.rio.nodata, rem.rio.crs
-
-        # Preserve the second highest possible number for int16, use the highest number for nodata
-        rem.data = xr.where(rem > 32.766, 32.766, rem)
-        rem.data = xr.where(rem >= 0, np.round(rem * 1000), 32767)
-
-        rem = rem.astype(np.int16)
-        rem.rio.write_nodata(32767, inplace=True)
-        rem.rio.write_crs(crs, inplace=True)
+        catchment = rxr.open_rasterio(c)

-        rem.rio.to_raster(r, dtype=np.int16, compress="LZW", tiled=True)
+        # Check whether converting the data is possible
+        if (np.unique(catchment).shape[0] > 32766) or (len(str(int(np.max(catchment)))) > 8):

-        catchment = rxr.open_rasterio(c)
+            print(
+                "Catchment raster has either more than 32766 unique HydroIDs or has HydroIDs with more",
+                "than 8 digits. Please adjust data accordingly before running Int16 data conversion.",
+            )
+            sys.exit(FIM_exit_codes.CANNOT_CONVERT_HYDROIDS_TO_INT16.value)

         if hydroid_prefix is None:
             hydroid_prefix = str(int(np.floor(catchment.max() / 10000)))
@@ -69,6 +65,22 @@ def convert_to_int16(branch_dir: str):
         with open(hydroid_prefix_path, 'w') as file:
             file.write(hydroid_prefix)

+        rem = rxr.open_rasterio(r)
+
+        # Save original as another file to be deleted by deny list or saved
+        rem.rio.to_raster(r.replace('.tif', '_float32.tif'), compress="LZW", tiled=True)
+        nodata, crs = rem.rio.nodata, rem.rio.crs
+
+        # Preserve the second-highest possible number for int16; use the highest number for nodata
+        rem.data = xr.where(rem > 32.766, 32.766, rem)
+        rem.data = xr.where(rem >= 0, np.round(rem * 1000), 32767)
+
+        rem = rem.astype(np.int16)
+        rem.rio.write_nodata(32767, inplace=True)
+        rem.rio.write_crs(crs, inplace=True)
+
+        rem.rio.to_raster(r, dtype=np.int16, compress="LZW", tiled=True)
+

 if __name__ == "__main__":

diff --git a/tools/inundation.py b/tools/inundation.py
index bc74e5a16..9a24588c8 100644
--- a/tools/inundation.py
+++ b/tools/inundation.py
@@ -176,10 +176,15 @@ def inundate(
     if hydro_table is None:
         raise TypeError("Pass hydro table csv")

+    depths_profile = rem.profile
+    inundation_profile = catchments.profile
+
+    int_16 = inundation_profile['dtype'] == 'int16'
+
     # catchment stages dictionary
     if hydro_table is not None:
         catchmentStagesDict, hucSet = __subset_hydroTable_to_forecast(
-            hydro_table, forecast, subset_hucs, precalb_option
+            hydro_table, forecast, subset_hucs, int_16, precalb_option
         )
     else:
         raise TypeError("Pass hydro table csv")

@@ -188,9 +193,6 @@ def inundate(
     if src_table is not None:
         create_src_subset_csv(hydro_table, catchmentStagesDict, src_table)

-    depths_profile = rem.profile
-    inundation_profile = catchments.profile
-
     depths_profile.update(driver='GTiff', blockxsize=256, blockysize=256, tiled=True)
     inundation_profile.update(driver='GTiff', blockxsize=256, blockysize=256, tiled=True, nodata=0)

@@ -202,6 +204,8 @@ def inundate(
         else None
     )

+    nodata = np.int16(inundation_profile['nodata']) if int_16 else np.int32(inundation_profile['nodata'])
+
     # make windows generator
     window_gen = __make_windows_generator(
         rem,
@@ -217,7 +221,8 @@ def inundate(
         windowed=windowed,
         depth_rst=depth_rst,
         inundation_rst=inundation_rst,
-        inundation_nodata=inundation_profile['nodata'],
+        inundation_nodata=nodata,
+        min_value=30 if int_16 else 0.03048,
     )

     inundation_rasters = []
@@ -251,6 +256,7 @@ def __inundate_in_huc(
     quiet: Optional[bool] = False,
     window: Optional[bool] = None,
     inundation_nodata: Optional[int] = None,
+    min_value: Union[int, float] = 30,
 ) -> Tuple[str, str, str]:
     """
     Inundate within the chosen scope
@@ -296,7 +302,8 @@ def __inundate_in_huc(
             catchmentStagesDict,
             rem_array.shape[1],
             rem_array.shape[0],
-            np.int16(inundation_nodata),
+            inundation_nodata,
+            min_value,
         )

         if depths is not None:

@@ -310,7 +317,13 @@ def __inundate_in_huc(

 @njit(nogil=True, fastmath=True, cache=True)
 def __go_fast_mapping(
-    rem: np.ndarray, catchments: np.ndarray, catchment_stages_dict: typed.Dict, x: int, y: int, nodata_c: int
+    rem: np.ndarray,
+    catchments: np.ndarray,
+    catchment_stages_dict: typed.Dict,
+    x: int,
+    y: int,
+    nodata_c: int,
+    min_value: Union[int, float],
 ) -> Tuple[np.ndarray, np.ndarray]:
     """
     Numba optimization for determining flood depth and flood
@@ -349,7 +362,7 @@ def __go_fast_mapping(

                 depth = catchment_stages_dict[catchments[i, j]] - rem[i, j]

                 # Depths shallower than approximately 1/10th of a foot (min_value) are treated as dry
-                if depth < 30:
+                if depth < min_value:
                     catchments[i, j] *= -1  # set HydroIDs to negative
                     rem[i, j] = 0
                 else:
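min_value keeps the roughly 0.1 ft wet/dry cutoff consistent across both encodings: Int16 grids carry depths in millimeters, while the Alaska float32 grids carry meters. A sketch of the equivalence (illustrative only):

```python
# The same ~0.1 ft dry/wet threshold in each encoding (illustrative only).
int16_threshold = 30         # millimeters, for Int16 REM/stage grids
float_threshold = 0.03048    # meters (0.1 ft), for Alaska float32 grids

depth_mm, depth_m = 25, 0.025           # one shallow depth in both unit systems
print(depth_mm < int16_threshold)       # True -> treated as dry
print(depth_m < float_threshold)        # True -> treated as dry
```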
@@ -382,6 +395,7 @@ def __make_windows_generator(
     depth_rst: Optional[str] = None,
     inundation_rst: Optional[str] = None,
     inundation_nodata: Optional[int] = None,
+    min_value: Union[int, float] = 30,
 ):
     """
     Generator to split processing into windows or different masked datasets

@@ -512,6 +526,7 @@ def __return_huc_in_hucSet(hucCode, hucSet):
                     "quiet": quiet,
                     "window": None,
                     "inundation_nodata": inundation_nodata,
+                    "min_value": min_value,
                 }
             else:
                 hucCode = None
@@ -530,6 +545,7 @@ def __return_huc_in_hucSet(hucCode, hucSet):
                         "quiet": quiet,
                         "window": window,
                         "inundation_nodata": inundation_nodata,
+                        "min_value": min_value,
                     }
                 else:
                     yield {
@@ -544,6 +560,7 @@ def __return_huc_in_hucSet(hucCode, hucSet):
                         "quiet": quiet,
                         "window": None,
                         "inundation_nodata": inundation_nodata,
+                        "min_value": min_value,
                     }


@@ -575,6 +592,7 @@ def __subset_hydroTable_to_forecast(
     hydroTable: Union[str, pd.DataFrame],
     forecast: Union[str, pd.DataFrame],
     subset_hucs=None,
+    process_int16: bool = True,
     precalb_option: bool = False,
 ) -> Tuple[typed.Dict, List[str]]:
     """
@@ -588,6 +606,8 @@ def __subset_hydroTable_to_forecast(
         Whether to rename the headers in the forecast file
     subset_hucs: Union[str, list]
         List to subset the hydrotable
+    process_int16: bool, default = True
+        Whether to process inundation with the int16 datatype

     Returns
     -------
@@ -691,12 +711,16 @@ def __subset_hydroTable_to_forecast(
     try:
         hydroTable = hydroTable.join(forecast, on=['feature_id'], how='inner')
     except AttributeError:
-        # print("FORECAST ERROR")
         raise NoForecastFound("No forecast value found for the passed feature_ids in the Hydro-Table")

     else:
+
         # initialize dictionary
-        catchmentStagesDict = typed.Dict.empty(types.int16, types.int16)
+        catchmentStagesDict = (
+            typed.Dict.empty(types.int16, types.int16)
+            if process_int16
+            else typed.Dict.empty(types.int32, types.float32)
+        )

         # interpolate stages
         for hid, sub_table in hydroTable.groupby(level='HydroID'):

@@ -716,8 +740,8 @@ def __subset_hydroTable_to_forecast(

                 # add this interpolated stage to catchment stages dict
                 h = round(interpolated_stage[0], 4)

-                hid = types.int16(np.int16(str(hid)[4:]))
-                h = types.int16(np.round(h * 1000))
+                hid = types.int16(np.int16(str(hid)[4:])) if process_int16 else types.int32(hid)
+                h = types.int16(np.round(h * 1000)) if process_int16 else types.float32(h)

                 catchmentStagesDict[hid] = h

     # huc set
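numba's typed.Dict is fixed to one key/value type, which is why the dictionary is built differently per path. A sketch of how one hydrotable row lands in each variant, using a hypothetical HydroID and stage:

```python
# Illustrative only: encoding one interpolated stage both ways.
import numpy as np
from numba import types

hid, h = 19020451, 3.1416    # hypothetical HydroID and stage in meters

# Int16 path: 4-digit suffix as the key, stage rounded to whole millimeters.
hid16 = types.int16(np.int16(str(hid)[4:]))   # 451
h16 = types.int16(np.round(h * 1000))         # 3142

# Alaska path: full HydroID as int32, stage kept as float32 meters.
hid32 = types.int32(hid)                      # 19020451
h32 = types.float32(h)                        # 3.1416
```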
output """ - # overlap object instance - overlap = OverlapWindowMerge(inundation_maps_list, (30, 30)) if mosaic_output is not None: - if workers > 1: - threaded = True - else: - threaded = False - overlap.merge_rasters(mosaic_output, threaded=threaded, workers=workers, nodata=nodata) + merge(inundation_maps_list, method='max', nodata=nodata, dst_path=mosaic_output) if mask: fh.vprint("Masking ...", verbose) - # print("Masking Begin", time.localtime()) - overlap.mask_mosaic(mosaic_output, mask, outfile=mosaic_output, workers=workers) + mask_mosaic(mosaic_output, mask, outfile=mosaic_output, workers=workers) if remove_inputs: fh.vprint("Removing inputs ...", verbose) @@ -211,6 +219,76 @@ def mosaic_by_unit( return remove_list +def _vprint(message, verbose): + if verbose: + print(message) + + +def mask_mosaic(mosaic, polys, polys_layer=None, outfile=None, workers=4, quiet=True): + + if isinstance(mosaic, str): + with rasterio.open(mosaic, 'r') as rst: + windows = [windows for _, windows in rst.block_windows()] + profile = rst.profile + elif isinstance(mosaic, rasterio.DatasetReader): + pass + else: + raise TypeError("Pass rasterio dataset or filepath for mosaic") + + if isinstance(polys, str): + polys = gpd.read_file(polys, layer=polys_layer) + elif isinstance(polys, gpd.GeoDataFrame): + pass + else: + raise TypeError("Pass geopandas dataset or filepath for catchment polygons") + + mosaic_read = rxr.open_rasterio(mosaic) + mosaic_read = mosaic_read.sel({'band': 1}) + geom = polys['geometry'].values[0] + + def write_window(geom, window, wrst, lock): + mosaic_slice = mosaic.isel( + y=slice(window.row_off, window.row_off + window.height), + x=slice(window.col_off, window.col_off + window.width), + ) + bbox = box(*mosaic_slice.rio.bounds()) + + if geom.intersects(bbox): + + inter = geom.intersection(bbox) + + if inter.area != bbox.area: + gdf_temp = gpd.GeoDataFrame(geometry=[inter], crs=mosaic_slice.rio.crs) + gdf_temp['arb'] = np.int8(1) + temp_rast = make_geocube(vector_data=gdf_temp, measurements=['arb'], like=mosaic_slice) + mosaic_slice.data = xr.where(np.isnan(temp_rast['arb']), 0, mosaic_slice.data) + # with lock: + wrst.write_band(1, mosaic_slice.data.squeeze(), window=window) + + executor = ThreadPoolExecutor(max_workers=workers) + + def __data_generator(windows, mosaic, geom, wrst, lock): + for window in windows: + yield mosaic, geom, window, wrst, lock + + lock = Lock() + + with rasterio.open(outfile, "r+", **profile) as wrst: + dgen = __data_generator(windows, mosaic_read, geom, wrst, lock) + results = {executor.submit(write_window, *wg): 1 for wg in dgen} + + for future in as_completed(results): + try: + future.result() + except Exception as exc: + _vprint("Exception {} for {}".format(exc, results[future]), not quiet) + else: + if results[future] is not None: + _vprint("... {} complete".format(results[future]), not quiet) + else: + _vprint("... complete", not quiet) + + def mosaic_final_inundation_extent_to_poly( inundation_raster: Optional[str] = None, inundation_polygon: Optional[str] = None, @@ -229,14 +307,6 @@ def mosaic_final_inundation_extent_to_poly( File type to output inundation polygon """ - import geopandas as gpd - import numpy as np - import rasterio - from rasterio.features import shapes - from shapely.geometry.multipolygon import MultiPolygon - from shapely.geometry.polygon import Polygon - - gpd.options.io_engine = "pyogrio" with rasterio.open(inundation_raster) as src: # Open inundation_raster using rasterio. 
diff --git a/tools/overlapping_inundation.py b/tools/overlapping_inundation.py deleted file mode 100644 index 4c740404f..000000000 --- a/tools/overlapping_inundation.py +++ /dev/null @@ -1,535 +0,0 @@ -#!/usr/bin/env python -# coding: utf-8 - -import concurrent.futures -import warnings -from concurrent.futures import ThreadPoolExecutor, as_completed -from functools import partial -from threading import Lock - -import geopandas as gpd -import numpy as np -import rasterio -import rioxarray as rxr -import xarray as xr -from affine import Affine -from geocube.api.core import make_geocube -from numba import njit -from rasterio.windows import from_bounds -from scipy.optimize import newton -from shapely.geometry import box - - -gpd.options.io_engine = "pyogrio" - - -def _vprint(message, verbose): - if verbose: - print(message) - - -class OverlapWindowMerge: - def __init__(self, inundation_rsts, num_partitions=None, window_xy_size=None): - """ - Initialize the object - - :param inundation_rsts: list of inundation paths or datasets - :param num_partitions: tuple of integers representing num windows in x and y space - :param window_xy_size: tuple of integers represeting num of pixels in windows in x an y space - """ - - # sort for largest spanning dataset (todo: handle mismatched resolutions) - # size_func = lambda x: np.abs(x.bounds.left - x.bounds.right) * np.abs(x.bounds.top - x.bounds.bottom) - def size_func(x): - return np.abs(x.bounds.left - x.bounds.right) * np.abs(x.bounds.top - x.bounds.bottom) - - # key_sort_func = lambda x: x["size"] - def key_sort_func(x): - return x['size'] - - datasets = [rasterio.open(ds) for ds in inundation_rsts] - ds_dict = [{"dataset": ds, "size": size_func(ds)} for ds in datasets] - ds_dict.sort(key=key_sort_func, reverse=True) - - # load sample overlapping inundation depth rasters - self.depth_rsts = [x["dataset"] for x in ds_dict] - del ds_dict - - self.rst_dims = [[x.height, x.width] for x in self.depth_rsts] - - self.res = self.depth_rsts[0].meta["transform"][0] - self.depth_bounds = ( - np.array( - [[[x.bounds.top, x.bounds.left], [x.bounds.bottom, x.bounds.right]] for x in self.depth_rsts] - ) - / self.res - ) - - # get transform, width, height and bounds - (self.proc_unit_transform, self.proc_unit_width, self.proc_unit_height, final_bounds) = ( - self.get_final_dims() - ) - - self.proc_unit_bounds = np.array( - [[final_bounds["top"], final_bounds["left"]], [final_bounds["bottom"], final_bounds["right"]]] - ) - - self.proc_unit_bounds = self.proc_unit_bounds / self.res - - self.lat_lon_sign = [ - np.sign(self.proc_unit_bounds[1, 0] - self.proc_unit_bounds[0, 0]), - np.sign(self.proc_unit_bounds[1, 1] - self.proc_unit_bounds[0, 1]), - ] - - self.partitions = num_partitions - self.window_sizes = window_xy_size - - @staticmethod - @njit - def get_res_bbox_min(x, v, z, y): - """ - Optimize for bounds that fit the final resolution - - :param x: float of compare - :param v: float representing min bound - :param z: float representing max bound - :param y: float representing resolution - """ - return np.abs(z - x) - np.round(np.abs(z - v) / y) * y - - def get_final_dims(self): - """ - Get transform, width, height, and bbox of final dataset - - :return: Affine transform, int width, int height, dict bounds - """ - - left = np.min([d.bounds.left for d in self.depth_rsts]) - top = np.max([d.bounds.top for d in self.depth_rsts]) - right = np.max([d.bounds.right for d in self.depth_rsts]) - bottom = np.min([d.bounds.bottom for d in self.depth_rsts]) - - left = 
newton(self.get_res_bbox_min, left, args=(left, right, self.res)) - bottom = newton(self.get_res_bbox_min, bottom, args=(bottom, top, self.res)) - - transform = self.depth_rsts[0].meta["transform"] - - width = int(np.abs(right - left) / self.res) - height = int(np.abs(top - bottom) / self.res) - new_transform = Affine(transform[0], transform[1], left, transform[3], transform[4], top) - - return new_transform, width, height, {"left": left, "top": top, "right": right, "bottom": bottom} - - def get_window_coords(self): - """ - Return ul/br bounds of window and its respective window idx - - :param partitions: tuple or list of partition sizes for x and y - :param sizes: tuple or list of pixel sizes for x and y - :return: list of ul/br bounds of window, int of respective window idx - """ - - # Set up desired number of partitions (can also be set pixel size) - if self.partitions is not None: - x_res, y_res = self.partitions - elif self.window_sizes is not None: - x_res, y_res = self.window_sizes - else: - raise ("in bran crunch") - - # Get window widths (both normal and edge windows) - window_width1 = np.repeat(int(self.proc_unit_width / x_res), x_res) * self.lat_lon_sign[1] - window_width2 = window_width1.copy() - window_width2[-1] += self.proc_unit_width - window_width1[0] * x_res * self.lat_lon_sign[1] - - # Get window heights (both normal and edge windows) - window_height1 = np.repeat(int(self.proc_unit_height / y_res), y_res) * self.lat_lon_sign[0] - window_height2 = window_height1.copy() - window_height2[-1] += self.proc_unit_height - window_height1[0] * y_res * self.lat_lon_sign[0] - - # Get window sizes (both normal and edge windows) - window_bounds1 = np.flip( - np.array(np.meshgrid(window_width1, window_height1)).T.reshape(-1, 2), axis=1 - ).astype(int) - window_bounds2 = np.flip( - np.array(np.meshgrid(window_width2, window_height2)).T.reshape(-1, 2), axis=1 - ).astype(int) - - window_idx = np.array(np.unravel_index(np.arange(y_res * x_res), (y_res, x_res), order="F")) - - return [window_bounds1, window_bounds2], window_idx - - def create_lat_lons(self, window_bounds, window_idx): - """ - Return bbox of window and list of latitudes and longitudes - - :param window_bounds: tuple or list of partition sizes for x and y - :param window_idx: int representing index of window - :return: list of float latitudes, list of float longitudes, list of window bbox, - list of ul/br coords for window - """ - - upper_left = window_idx.T * window_bounds[0] - lower_right = upper_left + window_bounds[1] - - # Merge point arrays, convert back to original units, and get drawable path for each window - bbox = np.hstack([upper_left, lower_right]) - scaled_path_points = [ - np.array(np.meshgrid([st[0], st[2]], [st[1], st[3]])).T.reshape(-1, 2) for st in bbox - ] - path_points = (scaled_path_points + self.proc_unit_bounds[0]) * self.res - - # Create a range of latitudes and longitudes and add half of window size - latitudes = np.arange( - self.proc_unit_bounds[0, 0], - self.proc_unit_bounds[1, 0] + self.lat_lon_sign[0], - window_bounds[1][0][0], - )[:-1] + (window_bounds[1][0][0] / 2) - longitudes = np.arange( - self.proc_unit_bounds[0, 1], - self.proc_unit_bounds[1, 1] + self.lat_lon_sign[1], - window_bounds[1][0][1], - )[:-1] + (window_bounds[1][0][1] / 2) - - return latitudes, longitudes, path_points, bbox - - @staticmethod - def get_window_idx(latitudes, longitudes, coords, partitions): - """ - Return raveled window indices - - :param latitudes: list of latitudes within bounds - :param longitudes: list of 
longitudes within bounds - - :return: ndarray of raveled multi indexes - """ - # Get difference of upper-left and lower-right boundaries and computed lat lons - lat_dif = [np.abs(latitudes - coords[0, 0]), np.abs(latitudes - coords[1, 0])] - lon_dif = [np.abs(longitudes - coords[0, 1]), np.abs(longitudes - coords[1, 1])] - - # Create range between the closest idx for both lats and lons - lon_range = np.arange(np.argmin(lon_dif[0]), np.argmin(lon_dif[1]) + 1) - lat_range = np.arange(np.argmin(lat_dif[0]), np.argmin(lat_dif[1]) + 1) - - # Create mesh grid for each possible set of coords and ravel to get window idx - grid = np.array(np.meshgrid(lat_range, lon_range)).T.reshape(-1, 2) - del lon_range, lat_range, lat_dif, lon_dif - return np.ravel_multi_index([grid[:, 0], grid[:, 1]], partitions, order="F") - - def read_rst_data(self, win_idx, datasets, path_points, bbox, meta): - """ - Return data windows and final bounds of window - - :param win_idx: int window index - :param datasets: list of int representing dataset inx - :param path_points: list of bbox for windows - :param bbox: list of ul/br coords of windows - :param meta: metadata for final dataset - - :return: rasterio window object for final window, rasterio window of data window bounds, - data for each raster in window, - """ - # Get window bounding box and get final array output dimensions - window = path_points[win_idx] - window_height, window_width = np.array( - [np.abs(bbox[win_idx][2] - bbox[win_idx][0]), np.abs(bbox[win_idx][3] - bbox[win_idx][1])] - ).astype(int) - - bnds, data, str_dtype = [], [], str(meta['dtype']) - for ds in datasets: - # Get rasterio window for each pair of window bounds and depth dataset - - bnd = from_bounds( - window[0][1], - window[-1][0], - window[-1][1], - window[0][0], - transform=self.depth_rsts[ds].transform, - ) - - bnds.append(bnd) - - # Read raster data with window - read_data = self.depth_rsts[ds].read(1, window=bnd) - - if 'float' in str_dtype: - # Convert all no data to nan values - read_data[read_data == self.depth_rsts[ds].meta["nodata"]] = np.nan - - elif 'int' in str_dtype: - - read_data[read_data == self.depth_rsts[ds].meta["nodata"]] = np.iinfo(str_dtype).min - - else: - raise ValueError('Unsupported dtype') - - data.append(read_data) - del bnd - - final_bnds = from_bounds( - window[0][1], window[-1][0], window[-1][1], window[0][0], transform=meta["transform"] - ) - - return [final_bnds, bnds, data] - - def merge_rasters(self, out_fname, nodata=0, threaded=False, workers=4, quiet=True): - """ - Merge multiple raster datasets - - :param out_fname: str path for final merged dataset - :param nodata: int/float representing no data value - """ - - window_bounds, window_idx = self.get_window_coords() - latitudes, longitudes, path_points, bbox = self.create_lat_lons(window_bounds, window_idx) - - windows = [ - self.get_window_idx(latitudes, longitudes, coords, self.partitions) - for coords in self.depth_bounds - ] - - # Create dict with window idx key and dataset idx vals - data_dict = {} - for idx, win in enumerate(windows): - for win_idx in win: - if win_idx in data_dict: - data_dict[win_idx].append(idx) - else: - data_dict[win_idx] = [idx] - - meta = self.depth_rsts[0].meta - - agg_function = partial(np.max, axis=0) if 'int' in str(meta['dtype']) else partial(np.nanmax, axis=0) - - meta.update( - transform=self.proc_unit_transform, - width=self.proc_unit_width, - height=self.proc_unit_height, - nodata=nodata, - blockxsize=256, - blockysize=256, - tiled=True, - compress="lzw", - ) - - def 
__data_generator(data_dict, path_points, bbox, meta): - for key, val in data_dict.items(): - f_window, window, dat = self.read_rst_data(key, val, path_points, bbox, meta) - yield dat, window, f_window, val - # final_windows.append(f_window) - # data_windows.append(window) - # data.append(dat) - # del f_window, window, dat - - # create data generator - dgen = __data_generator(data_dict, path_points, bbox, meta) - - lock = Lock() - - with rasterio.open(out_fname, "w+", **meta) as rst: - merge_partial = partial( - merge_data, - rst=rst, - lock=lock, - dtype=meta["dtype"], - agg_function=agg_function, - nodata=meta["nodata"], - rst_dims=self.rst_dims, - ) - - if not threaded: - # for d, dw, fw, ddict in zip(data, - # data_windows, - # final_windows, - # data_dict.values()): - for d, dw, fw, ddict in dgen: - merge_partial(d, dw, fw, ddict) - else: - # start up thread pool - executor = ThreadPoolExecutor(max_workers=workers) - results = {executor.submit(merge_partial, *wg): 1 for wg in dgen} - - for future in as_completed(results): - try: - future.result() - except Exception as exc: - _vprint("Exception {} for {}".format(exc, results[future]), not quiet) - else: - if results[future] is not None: - _vprint("... {} complete".format(results[future]), not quiet) - else: - _vprint("... complete", not quiet) - - @staticmethod - def mask_mosaic(mosaic, polys, polys_layer=None, outfile=None, workers=4, quiet=True): - - if isinstance(mosaic, str): - with rasterio.open(mosaic, 'r') as rst: - windows = [windows for _, windows in rst.block_windows()] - profile = rst.profile - elif isinstance(mosaic, rasterio.DatasetReader): - pass - else: - raise TypeError("Pass rasterio dataset or filepath for mosaic") - - if isinstance(polys, str): - polys = gpd.read_file(polys, layer=polys_layer) - elif isinstance(polys, gpd.GeoDataFrame): - pass - else: - raise TypeError("Pass geopandas dataset or filepath for catchment polygons") - - mosaic_read = rxr.open_rasterio(mosaic) - mosaic_read = mosaic_read.sel({'band': 1}) - geom = polys['geometry'].values[0] - - def write_window(geom, window, wrst, lock): - mosaic_slice = mosaic.isel( - y=slice(window.row_off, window.row_off + window.height), - x=slice(window.col_off, window.col_off + window.width), - ) - bbox = box(*mosaic_slice.rio.bounds()) - - if geom.intersects(bbox): - - inter = geom.intersection(bbox) - - if inter.area != bbox.area: - gdf_temp = gpd.GeoDataFrame(geometry=[inter], crs=mosaic_slice.rio.crs) - gdf_temp['arb'] = np.int8(1) - temp_rast = make_geocube(vector_data=gdf_temp, measurements=['arb'], like=mosaic_slice) - mosaic_slice.data = xr.where(np.isnan(temp_rast['arb']), 0, mosaic_slice.data) - # with lock: - wrst.write_band(1, mosaic_slice.data.squeeze(), window=window) - - executor = ThreadPoolExecutor(max_workers=workers) - - def __data_generator(windows, mosaic, geom, wrst, lock): - for window in windows: - yield mosaic, geom, window, wrst, lock - - lock = Lock() - - with rasterio.open(outfile, "r+", **profile) as wrst: - dgen = __data_generator(windows, mosaic_read, geom, wrst, lock) - results = {executor.submit(write_window, *wg): 1 for wg in dgen} - - for future in as_completed(results): - try: - future.result() - except Exception as exc: - _vprint("Exception {} for {}".format(exc, results[future]), not quiet) - else: - if results[future] is not None: - _vprint("... {} complete".format(results[future]), not quiet) - else: - _vprint("... 
complete", not quiet) - - -# Quasi multi write -# Throughput achieved assuming processing time is not identical between windows -# and queued datasets, preferably approx N/2 threads for 9 windows -# @njit -def merge_data( - rst_data, window_bnds, final_window, datasets, dtype, rst, lock, agg_function, nodata, rst_dims -): - """ - Merge data in to final dataset (multi threaded) - - :param rst_data: list of rst data from window - :param window_bnds: list rasterio windows representing data window bounds - :param final_window: rasterio window representing final window bounds - :param datasets: list of int representing dataset idx - :param dtype: data type of final output - :param rst: rasterio writer for final dataset - :param lock: thread concurrency lock - :param agg_function: function to aggregate datasets - :param nodata: nodata of final output - :param rst_dims: dimensions of overlapping rasters - """ - - nan_tile = np.array([nodata]) - window_data = np.tile(nan_tile, [int(final_window.height), int(final_window.width)]) - - for data, bnds, idx in zip(rst_data, window_bnds, datasets): - # Get indices to apply to base - - col_slice = slice( - int(np.max([0, np.ceil(bnds.col_off * -1)])), - int(np.min([bnds.width, rst_dims[idx][1] - bnds.col_off])), - ) - - row_slice = slice( - int(np.max([0, np.ceil(bnds.row_off * -1)])), - int(np.min([bnds.height, rst_dims[idx][0] - bnds.row_off])), - ) - - win_shape = window_data[row_slice, col_slice].shape - - if not np.all(np.sign(np.array(win_shape) - np.array(data.shape)) > 0): - data = data[: win_shape[0], : win_shape[1]] - # Assign the data to the base array with aggregate function - merge = [window_data[row_slice, col_slice], data] - - del data - - with warnings.catch_warnings(): - # This `with` block supresses the RuntimeWarning thrown by numpy when aggregating nan values - warnings.simplefilter("ignore", category=RuntimeWarning) - window_data[row_slice, col_slice] = agg_function(merge) - - # window_data[np.isnan(window_data)] = nodata - del merge - - del rst_data, window_bnds, datasets - - window_data[(window_data == nan_tile)] = nodata - - with lock: - rst.write_band(1, window_data, window=final_window) - del window_data - - -if __name__ == "__main__": - # import tracemalloc - import glob - import time - - # print('start', time.localtime()) - # project_path = r'../documentation/data' - # overlap = OverlapWindowMerge([project_path + '/overlap1.tif', - # project_path + '/overlap2.tif', - # project_path + '/overlap3.tif', - # ], - # (3, 3)) - # overlap.merge_rasters(project_path + '/merged_overlap.tif', nodata=0) - # print('end', time.localtime()) - # tracemalloc.start() - print("start", time.localtime()) - # project_path = r'../documentation/data' - # project_path = '*/mosaicing_data/1_fr_ms_composite' - # overlap = OverlapWindowMerge([project_path + '/inundation_extent_12090301_FR.tif', - # project_path + '/inundation_extent_12090301_MS.tif' - # ], - # (30, 30)) - # overlap.merge_rasters(project_path + '/merged_final5.tif', threaded=True, workers=4, nodata=0) - - # tracemalloc.start() - print("start", time.localtime()) - # project_path = r'../documentation/data' - # project_path = '*/mosaicing_data/2_gms' - # a = glob.glob(project_path + '/inundation*.tif') - # overlap = OverlapWindowMerge(a, - # (30, 30)) - # overlap.merge_rasters(project_path + '/merged_final5.tif', threaded=True, workers=4, nodata=-2e9) - # current, peak = tracemalloc.get_traced_memory() - # print(f"Current memory usage is {current / 10 ** 6}MB; Peak was {peak / 10 ** 6}MB") - 
# tracemalloc.stop() - - project_path = "*" - overlap = OverlapWindowMerge( - [project_path + "/nwm_resampled.tif", project_path + "/rnr_inundation_031403_2020092000.tif"], (1, 1) - ) - overlap.merge_rasters(project_path + "/merged_final5.tif", threaded=False, workers=4) - - print("end", time.localtime())