From b07c3fba83c2b757134a35ba10d258fd411b96ab Mon Sep 17 00:00:00 2001 From: Ivan Ivanov Date: Fri, 11 Oct 2024 09:29:15 -0700 Subject: [PATCH 1/7] add pyramids on cpu based on Joarao's code --- biahub/cli/pyramid.py | 108 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 biahub/cli/pyramid.py diff --git a/biahub/cli/pyramid.py b/biahub/cli/pyramid.py new file mode 100644 index 00000000..40eae470 --- /dev/null +++ b/biahub/cli/pyramid.py @@ -0,0 +1,108 @@ +import datetime +import itertools +from pathlib import Path +from typing import List, Optional, Protocol, Sequence, Union + +import click +import numpy as np + +from iohub.ngff import Position, open_ome_zarr +from slurmkit import SlurmParams, slurm_function, submit_function + +from skimage.transform import downscale_local_mean + + +class CreatePyramids(Protocol): + def __call__( + self, + fov: Position, + iterator: Sequence[Union[tuple[int, int], int]], + levels: int, + dependencies: Optional[List[Optional[int]]], + ) -> None: + ... + + +def no_pyramids(*args, **kwargs) -> None: + pass + + +@slurm_function +def pyramid(t: int, c: int, fov: Position, levels: int) -> None: + factors = (2,) * (fov.data.ndim - 2) + + # Add click.echo messages + array = fov["0"][t, c] + for level in range(1, levels): + array = downscale_local_mean(array, factors) + fov[str(level)][t, c] = array + + +def create_pyramids( + fov: Position, + iterator: Sequence[Union[tuple[int, int], int]], + levels: int, + dependencies: Optional[List[Optional[int]]], +) -> None: + """Creates additional levels of multi-scales pyramid.""" + + if dependencies is None: + dependencies = [None] * len(iterator) + + elif len(dependencies) != len(iterator): + raise ValueError( + f"Number of dependencies ({len(dependencies)}) must match iterator length ({len(iterator)})." + ) + + params = SlurmParams( + partition="preempted", + cpus_per_task=16, + mem="128G", + time=datetime.timedelta(minutes=30), + output="slurm_output/pyramid-%j.out", + ) + + if "1" not in fov.array_keys(): + fov.initialize_pyramid(levels=levels) + + pyramid_func = pyramid(fov=fov, levels=levels) + try: + for i, (t, c) in enumerate(iterator): # type: ignore[misc] + submit_function( + pyramid_func, params, t=t, c=c, dependencies=dependencies[i] + ) + + except TypeError: + for i, t in enumerate(iterator): + for c in range(fov.data.shape[1]): + submit_function( + pyramid_func, params, t=t, c=c, dependencies=dependencies[i] + ) + + +@click.command("pyramid") +# can't import from parsing due to circular import +@click.argument("paths", type=click.Path(exists=True, path_type=Path), nargs=-1) +@click.option( + "--levels", + "-l", + type=int, + default=4, + show_default=True, + help="Number of down-sampling levels.", +) +def pyramid_cli(paths: Sequence[Path], levels: int) -> None: + """Creates additional levels of multi-scales pyramid.""" + + for path in paths: + fov = open_ome_zarr(path, layout="fov", mode="a") + + iterator = list( + itertools.product(range(fov.data.shape[0]), range(fov.data.shape[1])) + ) + + create_pyramids(fov, iterator, levels, dependencies=None) + + +if __name__ == "__main__": + pyramid_cli() From 8972dd6d02f3e3249f4c464f0a999352675b17af Mon Sep 17 00:00:00 2001 From: Ivan Ivanov Date: Wed, 23 Apr 2025 11:00:18 -0700 Subject: [PATCH 2/7] move pyramid --- biahub/{cli => }/pyramid.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) rename biahub/{cli => }/pyramid.py (86%) diff --git a/biahub/cli/pyramid.py b/biahub/pyramid.py similarity index 86% rename from biahub/cli/pyramid.py rename to biahub/pyramid.py index 40eae470..c6ded469 100644 --- a/biahub/cli/pyramid.py +++ b/biahub/pyramid.py @@ -1,15 +1,14 @@ import datetime import itertools + from pathlib import Path from typing import List, Optional, Protocol, Sequence, Union import click -import numpy as np from iohub.ngff import Position, open_ome_zarr -from slurmkit import SlurmParams, slurm_function, submit_function - from skimage.transform import downscale_local_mean +from slurmkit import SlurmParams, slurm_function, submit_function class CreatePyramids(Protocol): @@ -68,16 +67,12 @@ def create_pyramids( pyramid_func = pyramid(fov=fov, levels=levels) try: for i, (t, c) in enumerate(iterator): # type: ignore[misc] - submit_function( - pyramid_func, params, t=t, c=c, dependencies=dependencies[i] - ) + submit_function(pyramid_func, params, t=t, c=c, dependencies=dependencies[i]) except TypeError: for i, t in enumerate(iterator): for c in range(fov.data.shape[1]): - submit_function( - pyramid_func, params, t=t, c=c, dependencies=dependencies[i] - ) + submit_function(pyramid_func, params, t=t, c=c, dependencies=dependencies[i]) @click.command("pyramid") @@ -97,9 +92,7 @@ def pyramid_cli(paths: Sequence[Path], levels: int) -> None: for path in paths: fov = open_ome_zarr(path, layout="fov", mode="a") - iterator = list( - itertools.product(range(fov.data.shape[0]), range(fov.data.shape[1])) - ) + iterator = list(itertools.product(range(fov.data.shape[0]), range(fov.data.shape[1]))) create_pyramids(fov, iterator, levels, dependencies=None) From d4f04ce979e69911fbb4cb1df077cbf6d0845c05 Mon Sep 17 00:00:00 2001 From: Sricharan Reddy Varra Date: Thu, 16 Oct 2025 23:39:21 -0700 Subject: [PATCH 3/7] added downscaling --- biahub/cli/main.py | 5 ++ biahub/pyramid.py | 186 ++++++++++++++++++++++++++++----------------- 2 files changed, 123 insertions(+), 68 deletions(-) diff --git a/biahub/cli/main.py b/biahub/cli/main.py index 9093f5fe..93f39169 100644 --- a/biahub/cli/main.py +++ b/biahub/cli/main.py @@ -79,6 +79,11 @@ def format_options(self, ctx, formatter): "import_path": "biahub.optimize_registration.optimize_registration_cli", "help": "Optimize transform based on match filtering", }, + { + "name": "pyramid", + "import_path": "biahub.pyramid.pyramid_cli", + "help": "Create pyramid levels for a dataset", + }, { "name": "register", "import_path": "biahub.register.register_cli", diff --git a/biahub/pyramid.py b/biahub/pyramid.py index c6ded469..3d537604 100644 --- a/biahub/pyramid.py +++ b/biahub/pyramid.py @@ -1,100 +1,150 @@ import datetime -import itertools from pathlib import Path -from typing import List, Optional, Protocol, Sequence, Union +from typing import List, Optional import click +import submitit +import tensorstore as ts -from iohub.ngff import Position, open_ome_zarr -from skimage.transform import downscale_local_mean -from slurmkit import SlurmParams, slurm_function, submit_function +from iohub.ngff import open_ome_zarr - -class CreatePyramids(Protocol): - def __call__( - self, - fov: Position, - iterator: Sequence[Union[tuple[int, int], int]], - levels: int, - dependencies: Optional[List[Optional[int]]], - ) -> None: - ... +from biahub.cli.parsing import ( + input_position_dirpaths, + local, + sbatch_filepath, + sbatch_to_submitit, +) -def no_pyramids(*args, **kwargs) -> None: - pass +def pyramid(fov_path: Path, levels: int, method: str) -> None: + """ + Create pyramid levels for a single field of view using tensorstore downsampling. + This function uses cascade downsampling, where each level is downsampled from + the previous level rather than from level 0. This avoids aliasing artifacts + and chunk boundary issues that occur with large downsample factors. -@slurm_function -def pyramid(t: int, c: int, fov: Position, levels: int) -> None: - factors = (2,) * (fov.data.ndim - 2) + Parameters + ---------- + fov_path : Path + Path to the FOV position directory + levels : int + Number of downsampling levels to create + method : str + Downsampling method (e.g., 'mean', 'max', 'min') + """ + with open_ome_zarr(fov_path, mode="r+") as dataset: + dataset.initialize_pyramid(levels=levels) - # Add click.echo messages - array = fov["0"][t, c] - for level in range(1, levels): - array = downscale_local_mean(array, factors) - fov[str(level)][t, c] = array + for level in range(1, levels): + previous_level = dataset[str(level - 1)].tensorstore() + current_scale = dataset.get_effective_scale(str(level)) + previous_scale = dataset.get_effective_scale(str(level - 1)) + downsample_factors = [ + int(round(current_scale[i] / previous_scale[i])) + for i in range(len(current_scale)) + ] -def create_pyramids( - fov: Position, - iterator: Sequence[Union[tuple[int, int], int]], - levels: int, - dependencies: Optional[List[Optional[int]]], -) -> None: - """Creates additional levels of multi-scales pyramid.""" - - if dependencies is None: - dependencies = [None] * len(iterator) - - elif len(dependencies) != len(iterator): - raise ValueError( - f"Number of dependencies ({len(dependencies)}) must match iterator length ({len(iterator)})." - ) - - params = SlurmParams( - partition="preempted", - cpus_per_task=16, - mem="128G", - time=datetime.timedelta(minutes=30), - output="slurm_output/pyramid-%j.out", - ) + click.echo(f" Level {level}: factors {downsample_factors} (from level {level-1})") - if "1" not in fov.array_keys(): - fov.initialize_pyramid(levels=levels) + downsampled = ts.downsample( + previous_level, downsample_factors=downsample_factors, method=method + ) - pyramid_func = pyramid(fov=fov, levels=levels) - try: - for i, (t, c) in enumerate(iterator): # type: ignore[misc] - submit_function(pyramid_func, params, t=t, c=c, dependencies=dependencies[i]) + target_store = dataset[str(level)].tensorstore() + target_store[:].write(downsampled[:].read().result()).result() - except TypeError: - for i, t in enumerate(iterator): - for c in range(fov.data.shape[1]): - submit_function(pyramid_func, params, t=t, c=c, dependencies=dependencies[i]) + click.echo(f"Completed pyramid for FOV: {fov_path}") @click.command("pyramid") -# can't import from parsing due to circular import -@click.argument("paths", type=click.Path(exists=True, path_type=Path), nargs=-1) +@input_position_dirpaths() +@sbatch_filepath() +@local() @click.option( "--levels", - "-l", + "-lv", type=int, default=4, show_default=True, - help="Number of down-sampling levels.", + help="Number of downsampling levels to create.", ) -def pyramid_cli(paths: Sequence[Path], levels: int) -> None: - """Creates additional levels of multi-scales pyramid.""" +@click.option( + "--method", + "-m", + type=click.Choice( + [ + "stride", + "median", + "mode", + "mean", + "min", + "max", + ] + ), + default="mean", + show_default=True, + help="Downsampling method to use.", +) +def pyramid_cli( + input_position_dirpaths: List[Path], + levels: int, + method: str, + sbatch_filepath: Optional[Path], + local: bool, +) -> None: + """ + Creates additional levels of multi-scale pyramids for OME-Zarr datasets. + + Uses efficient downsampling to create pyramid levels + in parallel. Each field-of-view (FOV) is processed as a separate SLURM job, + downsampling all timepoints and channels. The pyramids are created in-place + within the input zarr store using the specified downsampling method (default: 'mean'). + + Example: + biahub pyramid -i ./data.zarr/0/0/0 -lv 4 --method max + biahub pyramid -i ./data.zarr/*/*/* --levels 5 --local + """ + cluster = "local" if local else "slurm" + + slurm_args = { + "slurm_job_name": "pyramid", + "slurm_partition": "preempted", + "slurm_cpus_per_task": 16, + "slurm_mem_per_cpu": "8G", + "slurm_time": 30, + "slurm_array_parallelism": 100, + } + + # Override with sbatch file parameters if provided + if sbatch_filepath: + slurm_args.update(sbatch_to_submitit(sbatch_filepath)) + + slurm_out_path = Path("slurm_output") + slurm_out_path.mkdir(exist_ok=True) + + executor = submitit.AutoExecutor(folder=slurm_out_path, cluster=cluster) + executor.update_parameters(**slurm_args) + + click.echo( + f"Submitting {len(input_position_dirpaths)} pyramid jobs with resources: {slurm_args}" + ) - for path in paths: - fov = open_ome_zarr(path, layout="fov", mode="a") + jobs = [] + with submitit.helpers.clean_env(), executor.batch(): + for fov_path in input_position_dirpaths: + job = executor.submit(pyramid, fov_path=fov_path, levels=levels, method=method) + jobs.append(job) - iterator = list(itertools.product(range(fov.data.shape[0]), range(fov.data.shape[1]))) + job_ids = [job.job_id for job in jobs] + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + log_path = slurm_out_path / f"pyramid-jobs_{timestamp}.log" + with log_path.open("w") as log_file: + log_file.write("\n".join(job_ids)) - create_pyramids(fov, iterator, levels, dependencies=None) + # wait_for_jobs_to_finish(jobs) if __name__ == "__main__": From f5b8a714fe4cf0e23bc95b0500fa689d2830bc81 Mon Sep 17 00:00:00 2001 From: Sricharan Reddy Varra Date: Fri, 17 Oct 2025 10:05:53 -0700 Subject: [PATCH 4/7] removed unnecessary echo --- biahub/pyramid.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/biahub/pyramid.py b/biahub/pyramid.py index 3d537604..8faff3f1 100644 --- a/biahub/pyramid.py +++ b/biahub/pyramid.py @@ -47,8 +47,6 @@ def pyramid(fov_path: Path, levels: int, method: str) -> None: for i in range(len(current_scale)) ] - click.echo(f" Level {level}: factors {downsample_factors} (from level {level-1})") - downsampled = ts.downsample( previous_level, downsample_factors=downsample_factors, method=method ) From 3dc096fc2a58a863be13284d6462da3640642ea1 Mon Sep 17 00:00:00 2001 From: Sricharan Reddy Varra Date: Mon, 20 Oct 2025 13:51:04 -0700 Subject: [PATCH 5/7] batched writing with chunks and transactions --- biahub/pyramid.py | 120 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 97 insertions(+), 23 deletions(-) diff --git a/biahub/pyramid.py b/biahub/pyramid.py index 8faff3f1..c235cb4e 100644 --- a/biahub/pyramid.py +++ b/biahub/pyramid.py @@ -1,13 +1,13 @@ import datetime from pathlib import Path -from typing import List, Optional +from typing import List, Literal, Optional import click import submitit import tensorstore as ts - from iohub.ngff import open_ome_zarr +from itertools import batched from biahub.cli.parsing import ( input_position_dirpaths, @@ -15,9 +15,62 @@ sbatch_filepath, sbatch_to_submitit, ) +from biahub.cli.slurm import wait_for_jobs_to_finish + + +def _write_ts_downsampled( + source_ts: ts.TensorStore, + target_ts: ts.TensorStore, + downsample_factors: list[int], + method: str, + level: int, + batch_size: int, +) -> None: + """ + Tensorstore downsampling with transaction + chunking. + + Parameters + ---------- + source_ts : ts.TensorStore + Source tensorstore Zarr to downsample from + target_ts : ts.TensorStore + Target tensorstore Zarr to write downsampled data to + downsample_factors : list[int] + Downsampling factors for each dimension + method : str + Downsampling method (e.g., 'mean', 'max', 'min') + level : int + Pyramid level being processed + batch_size : int + Number of chunks per batch/transaction + """ + downsampled = ts.downsample( + source_ts, downsample_factors=downsample_factors, method=method + ) + + step = target_ts.chunk_layout.write_chunk.shape[0] + + chunk_starts = range(0, downsampled.shape[0], step) + def write_batch(batch_starts): + """Write a batch of TensorStore chunks in a single transaction.""" + with ts.Transaction() as txn: + target_with_txn = target_ts.with_transaction(txn) -def pyramid(fov_path: Path, levels: int, method: str) -> None: + futures = [] + for start in batch_starts: + stop = min(start + step, downsampled.shape[0]) + future = target_with_txn[start:stop].write(downsampled[start:stop]) + futures.append((start, stop, future)) + + for start, stop, future in futures: + future.result() + + for batch in batched(chunk_starts, batch_size): + write_batch(batch) + + +def pyramid(fov_path: Path, levels: int, method: str, batch_size: int | Literal["auto"] = "auto") -> None: """ Create pyramid levels for a single field of view using tensorstore downsampling. @@ -33,11 +86,18 @@ def pyramid(fov_path: Path, levels: int, method: str) -> None: Number of downsampling levels to create method : str Downsampling method (e.g., 'mean', 'max', 'min') + batch_size : int | Literal["auto"] + Number of chunks per batch/transaction used by tensorstore to write downsampled data. + If "auto", uses the default batch size of 5. """ + # Handle "auto" batch_size + if batch_size == "auto": + batch_size = 5 + with open_ome_zarr(fov_path, mode="r+") as dataset: - dataset.initialize_pyramid(levels=levels) + dataset.initialize_pyramid(levels=levels + 1) - for level in range(1, levels): + for level in range(1, levels + 1): previous_level = dataset[str(level - 1)].tensorstore() current_scale = dataset.get_effective_scale(str(level)) @@ -47,14 +107,17 @@ def pyramid(fov_path: Path, levels: int, method: str) -> None: for i in range(len(current_scale)) ] - downsampled = ts.downsample( - previous_level, downsample_factors=downsample_factors, method=method - ) - target_store = dataset[str(level)].tensorstore() - target_store[:].write(downsampled[:].read().result()).result() + _write_ts_downsampled( + source_ts=previous_level, + target_ts=target_store, + downsample_factors=downsample_factors, + method=method, + level=level, + batch_size=batch_size, + ) - click.echo(f"Completed pyramid for FOV: {fov_path}") + click.echo(f"Completed pyramid for FOV: {fov_path}") @click.command("pyramid") @@ -86,25 +149,36 @@ def pyramid(fov_path: Path, levels: int, method: str) -> None: show_default=True, help="Downsampling method to use.", ) +@click.option( + "--batch-size", + "-bs", + type=int, + default="auto", + show_default=True, + help="Number of TensorStore chunks to write downsampled data in a single transaction.", +) def pyramid_cli( input_position_dirpaths: List[Path], - levels: int, - method: str, - sbatch_filepath: Optional[Path], - local: bool, + levels: int = 3, + method: str = "mean", + batch_size: int | Literal["auto"] = "auto", + sbatch_filepath: Optional[Path] = None, + local: bool = False, ) -> None: """ Creates additional levels of multi-scale pyramids for OME-Zarr datasets. - Uses efficient downsampling to create pyramid levels - in parallel. Each field-of-view (FOV) is processed as a separate SLURM job, - downsampling all timepoints and channels. The pyramids are created in-place - within the input zarr store using the specified downsampling method (default: 'mean'). + Uses tensorstore downsampling to generate progressively downscaled pyramid levels. + Setting levels=0 skips pyramid creation. For levels > 0, creates n additional pyramid + levels with 2^i downsampling (e.g., levels=3 creates 2x, 4x, and 8x downsampled versions). Example: - biahub pyramid -i ./data.zarr/0/0/0 -lv 4 --method max biahub pyramid -i ./data.zarr/*/*/* --levels 5 --local + biahub pyramid -i ./data.zarr/0/0/0 -lv 3 --method max """ + if levels == 0: + click.echo("No pyramid levels to create.") + return cluster = "local" if local else "slurm" slurm_args = { @@ -133,7 +207,9 @@ def pyramid_cli( jobs = [] with submitit.helpers.clean_env(), executor.batch(): for fov_path in input_position_dirpaths: - job = executor.submit(pyramid, fov_path=fov_path, levels=levels, method=method) + job = executor.submit( + pyramid, fov_path=fov_path, levels=levels, method=method, batch_size=batch_size + ) jobs.append(job) job_ids = [job.job_id for job in jobs] @@ -142,8 +218,6 @@ def pyramid_cli( with log_path.open("w") as log_file: log_file.write("\n".join(job_ids)) - # wait_for_jobs_to_finish(jobs) - if __name__ == "__main__": pyramid_cli() From 0daa5e95899586ac593b78f4b6d3088110e40703 Mon Sep 17 00:00:00 2001 From: Sricharan Reddy Varra Date: Mon, 20 Oct 2025 13:51:24 -0700 Subject: [PATCH 6/7] formatting --- biahub/pyramid.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/biahub/pyramid.py b/biahub/pyramid.py index c235cb4e..4b58e503 100644 --- a/biahub/pyramid.py +++ b/biahub/pyramid.py @@ -70,7 +70,9 @@ def write_batch(batch_starts): write_batch(batch) -def pyramid(fov_path: Path, levels: int, method: str, batch_size: int | Literal["auto"] = "auto") -> None: +def pyramid( + fov_path: Path, levels: int, method: str, batch_size: int | Literal["auto"] = "auto" +) -> None: """ Create pyramid levels for a single field of view using tensorstore downsampling. From 81b376888058d93322e2bf588d90a3decad5b9a3 Mon Sep 17 00:00:00 2001 From: Sricharan Reddy Varra Date: Mon, 20 Oct 2025 13:53:59 -0700 Subject: [PATCH 7/7] pre-commit --- biahub/pyramid.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/biahub/pyramid.py b/biahub/pyramid.py index 4b58e503..9f6fdcb8 100644 --- a/biahub/pyramid.py +++ b/biahub/pyramid.py @@ -1,13 +1,14 @@ import datetime +from itertools import batched from pathlib import Path from typing import List, Literal, Optional import click import submitit import tensorstore as ts + from iohub.ngff import open_ome_zarr -from itertools import batched from biahub.cli.parsing import ( input_position_dirpaths, @@ -15,7 +16,6 @@ sbatch_filepath, sbatch_to_submitit, ) -from biahub.cli.slurm import wait_for_jobs_to_finish def _write_ts_downsampled(