Commit 24de23e

download size estimate and parallel gzipping
1 parent 5f10fd8 commit 24de23e

File tree

4 files changed: +154 -77 lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -1,5 +1,10 @@
 # Changelog
 
+## 1.2.1
+
+- added parallel gzipping of assemblies in `bactopia-atb-formatter`
+- added size estimation to `bactopia-atb-formatter` output
+
 ## 1.2.0
 
 - added `bactopia-atb-downloader` to download All-the-Bacteria assemblies

bactopia/cli/atb/atb_downloader.py

Lines changed: 112 additions & 76 deletions
@@ -11,7 +11,14 @@
 import bactopia
 from bactopia.atb import parse_atb_file_list
 from bactopia.ncbi import is_biosample, taxid2name
-from bactopia.utils import download_url, execute, file_exists, mkdir, validate_file
+from bactopia.utils import (
+    download_url,
+    execute,
+    file_exists,
+    mkdir,
+    pgzip,
+    validate_file,
+)
 
 # Set up Rich
 stderr = rich.console.Console(stderr=True)
@@ -32,6 +39,7 @@
                "--atb-file-list-url",
                "--dry-run",
                "--progress",
+                "--cpus",
                "--uncompressed",
                "--remove-archives",
            ],
@@ -91,6 +99,11 @@
     is_flag=True,
     help="Show download progress bar",
 )
+@click.option(
+    "--cpus",
+    default=4,
+    help="The total number of cpus to use for downloading and compressing",
+)
 @click.option(
     "--uncompressed",
     "-u",
@@ -125,6 +138,7 @@ def atb_downloader(
     atb_file_list_url,
     dry_run,
     progress,
+    cpus,
     uncompressed,
     remove_archives,
     ncbi_api_key,
@@ -194,102 +208,124 @@ def atb_downloader(
         logging.error(f"Species not found in ATB file list: {query_species}")
         sys.exit(1)
     matched_samples = species[query_species]
+
+    # Estimate total size of downloads
+    total_size = 0
+    for archive, info in archives_to_download.items():
+        total_size += float(info["size"])
+
     logging.info(f"Found {len(matched_samples)} samples to extract")
     logging.debug(f"Samples: {matched_samples}")
-    logging.info(f"Found {len(archives_to_download)} archives to download")
+    logging.info(
+        f"Found {len(archives_to_download)} archives (~{int(total_size):,} MB) to download"
+    )
     if verbose:
         for archive in archives_to_download:
             logging.debug(f"Archive: {archive}")
 
     # Check if archives exist, otherwise download
-    logging.info(f"Downloading archives to: {outdir}/archives")
-    mkdir(f"{outdir}/archives")
-    for archive, info in archives_to_download.items():
-        archive_path = f"{str(outdir)}/archives/{archive}"
-        if file_exists(archive_path):
-            logging.info(f"Using existing archive: {archive_path}")
-            archive_path = validate_file(archive_path)
-        else:
-            logging.info(f"Downloading archive to: {archive_path}")
-            if dry_run:
-                logging.info(f"Would download: {info['url']} to {archive_path}")
+    if not dry_run:
+        logging.info(f"Downloading archives to: {outdir}/archives")
+        mkdir(f"{outdir}/archives")
+        for archive, info in archives_to_download.items():
+            archive_path = f"{str(outdir)}/archives/{archive}"
+            if file_exists(archive_path):
+                logging.info(f"Using existing archive: {archive_path}")
+                archive_path = validate_file(archive_path)
             else:
-                archive_path = download_url(info["url"], archive_path, progress)
+                logging.info(f"Downloading archive to: {archive_path}")
+                if dry_run:
+                    logging.info(f"Would download: {info['url']} to {archive_path}")
+                else:
+                    archive_path = download_url(info["url"], archive_path, progress)
 
-    # Extract each of the archives
-    cleanup = []
-    for archive, info in archives_to_download.items():
-        archive_path = f"{str(outdir)}/archives/{archive}"
-        if dry_run:
-            logging.info(f"Would have extracted: {archive_path}")
-        else:
-            logging.info(f"Extracting: {archive_path}")
-            stdout, stderr = execute(
-                f"tar xf {archive_path} -C {outdir}", capture=True, allow_fail=True
-            )
-            cleanup_dir = f"{outdir}/{archive.replace('.tar.xz', '')}"
-            logging.debug(f"Adding {cleanup_dir} to cleanup list")
-            cleanup.append(f"{outdir}/{archive.replace('.tar.xz', '')}")
+        # Extract each of the archives
+        cleanup = []
+        for archive, info in archives_to_download.items():
+            archive_path = f"{str(outdir)}/archives/{archive}"
+            if dry_run:
+                logging.info(f"Would have extracted: {archive_path}")
+            else:
+                logging.info(f"Extracting: {archive_path}")
+                stdout, stderr = execute(
+                    f"tar xf {archive_path} -C {outdir}", capture=True, allow_fail=True
+                )
+                cleanup_dir = f"{outdir}/{archive.replace('.tar.xz', '')}"
+                logging.debug(f"Adding {cleanup_dir} to cleanup list")
+                cleanup.append(f"{outdir}/{archive.replace('.tar.xz', '')}")
+    else:
+        logging.info("Would have downloaded and extracted archives")
 
     # Move samples into species directories, then compress
     species_dirs = {}
-    logging.info(f"Moving {len(matched_samples)} samples to: {outdir}")
-    for i, sample in enumerate(matched_samples):
-        if uncompressed:
-            logging.info(f"Moving sample {i+1} of {len(matched_samples)}: {sample}")
-        else:
-            logging.info(
-                f"Moving and compressing sample {i+1} of {len(matched_samples)}: {sample}"
-            )
-        info = samples[sample]
-        species = info["species_sylph"].lower().replace(" ", "_")
-        if species not in species_dirs:
-            species_dirs[species] = True
-            mkdir(f"{outdir}/{species}")
-
-        archive_file = f"{outdir}/{info['filename']}"
-        if file_exists(archive_file):
-            sample_filename = info["filename"].split("/")[-1]
-            sample_out = f"{outdir}/{species}/{sample_filename}"
+    needs_compression = []
+    if not dry_run:
+        logging.info(f"Moving {len(matched_samples)} samples to: {outdir}")
+        for i, sample in enumerate(matched_samples):
+            logging.debug(f"Moving sample {i+1} of {len(matched_samples)}: {sample}")
+            info = samples[sample]
+            species = info["species_sylph"].lower().replace(" ", "_")
+            if species not in species_dirs:
+                species_dirs[species] = True
+                mkdir(f"{outdir}/{species}")
 
+            archive_file = f"{outdir}/{info['filename']}"
             if file_exists(archive_file):
-                if (file_exists(sample_out) and not force) or ():
-                    logging.debug(
-                        f"Sample already exists: {sample_out}...skipping unless --force provided"
-                    )
-                elif file_exists(f"{sample_out}.gz") and not force:
-                    logging.debug(
-                        f"Sample already exists: {sample_out}.gz...skipping unless --force provided"
-                    )
-                else:
-                    logging.debug(f"Moving {archive_file} to {sample_out}")
-                    stdout, stderr = execute(
-                        f"mv {archive_file} {sample_out}", capture=True, allow_fail=True
-                    )
+                sample_filename = info["filename"].split("/")[-1]
+                sample_out = f"{outdir}/{species}/{sample_filename}"
 
-                    # Compress unless --uncompressed provided
-                    if not uncompressed:
-                        logging.debug(f"Compressing {sample_out}")
+                if file_exists(archive_file):
+                    if (file_exists(sample_out) and not force) or ():
+                        logging.debug(
+                            f"Sample already exists: {sample_out}...skipping unless --force provided"
+                        )
+
+                        # Compress unless --uncompressed provided
+                        if not uncompressed:
+                            needs_compression.append(sample_out)
+                    elif file_exists(f"{sample_out}.gz") and not force:
+                        logging.debug(
+                            f"Sample already exists: {sample_out}.gz...skipping unless --force provided"
+                        )
+                    else:
+                        logging.debug(f"Moving {archive_file} to {sample_out}")
                         stdout, stderr = execute(
-                            f"gzip --force {sample_out}", capture=True, allow_fail=True
+                            f"mv {archive_file} {sample_out}",
+                            capture=True,
+                            allow_fail=True,
                         )
+
+                        # Compress unless --uncompressed provided
+                        if not uncompressed:
+                            needs_compression.append(sample_out)
+                else:
+                    logging.warning(f"Unable to find {info['filename']}")
             else:
-                logging.warning(f"Unable to find {info['filename']}")
-        else:
-            logging.warning(f"{outdir}/{info['filename']}")
+                logging.warning(f"{outdir}/{info['filename']}")
+
+        # Compress samples
+        if len(needs_compression):
+            logging.info(f"Compressing {len(needs_compression)} samples")
+            pgzip(needs_compression, cpus)
 
-    # Cleanup
-    for archive in cleanup:
-        if file_exists(archive):
-            logging.info(f"Removing extracted files: {archive}")
-            stdout, stderr = execute(f"rm -rf {archive}", capture=True, allow_fail=True)
+        # Cleanup
+        for archive in cleanup:
+            if file_exists(archive):
+                logging.info(f"Removing extracted files: {archive}")
+                stdout, stderr = execute(
+                    f"rm -rf {archive}", capture=True, allow_fail=True
+                )
 
-    if remove_archives:
+        if remove_archives:
+            logging.info(
+                "Provided --remove-archives, removing all downloaded archives in {outdir}/archives"
+            )
+            stdout, stderr = execute(
+                f"rm -rf {outdir}/archives", capture=True, allow_fail=True
+            )
+    else:
         logging.info(
-            "Provided --remove-archives, removing all downloaded archives in {outdir}/archives"
-        )
-        stdout, stderr = execute(
-            f"rm -rf {outdir}/archives", capture=True, allow_fail=True
+            "Would have moved samples to species directories and cleaned up archives"
        )
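
A minimal sketch of the size estimate added above, assuming each entry in archives_to_download carries a "size" field expressed in megabytes, as the new log line suggests (the dictionary contents below are made-up placeholders; real entries are derived from the ATB file list):

    # Placeholder metadata; "size" is assumed to be megabytes
    archives_to_download = {
        "group_a.tar.xz": {"url": "https://example.com/group_a.tar.xz", "size": "1536.2"},
        "group_b.tar.xz": {"url": "https://example.com/group_b.tar.xz", "size": "812.5"},
    }

    # Sum the per-archive sizes before anything is fetched
    total_size = 0
    for archive, info in archives_to_download.items():
        total_size += float(info["size"])

    # Mirrors the new log line: "Found 2 archives (~2,348 MB) to download"
    print(f"Found {len(archives_to_download)} archives (~{int(total_size):,} MB) to download")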

bactopia/utils.py

Lines changed: 36 additions & 0 deletions
@@ -7,6 +7,7 @@
 import requests
 import tqdm
 from executor import ExternalCommand, ExternalCommandFailed
+from tqdm.contrib.concurrent import process_map
 
 NCBI_GENOME_SIZE_URL = (
     "https://ftp.ncbi.nlm.nih.gov/genomes/ASSEMBLY_REPORTS/species_genome_size.txt.gz"
@@ -47,6 +48,41 @@ def execute(
         return None
 
 
+def pgzip(files: list, cpus: int) -> list:
+    """
+    Parallel gzip a list of files
+
+    Args:
+        files (list): A list of files to gzip
+        cpus (int): The number of cpus to use
+
+    Returns:
+        list: A list of gzipped files
+    """
+    return process_map(
+        _gzip,
+        files,
+        max_workers=cpus,
+        chunksize=1,
+        bar_format="{l_bar}{bar:80}{r_bar}{bar:-80b}",
+        desc="Gzipping",
+    )
+
+
+def _gzip(filename: str) -> str:
+    """
+    Gzip a file
+
+    Args:
+        filename (str): The file to gzip
+
+    Returns:
+        str: The path to the gzipped file
+    """
+    stdout, stderr = execute(f"gzip --force {filename}", capture=True, allow_fail=True)
+    return f"{filename}.gz"
+
+
 def get_platform() -> str:
     """
     Get the platform of the executing machine
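
The new pgzip() helper fans individual gzip calls out over a process pool with tqdm's process_map, compressing up to `cpus` files at a time behind a shared progress bar. A hypothetical usage sketch (the file paths are placeholders and must exist on disk, since each worker shells out to `gzip --force`):

    from bactopia.utils import pgzip

    # Placeholder paths to uncompressed assemblies
    fastas = [
        "bactopia-atb/escherichia_coli/SAMN00000001.fa",
        "bactopia-atb/escherichia_coli/SAMN00000002.fa",
    ]

    if __name__ == "__main__":
        # Returns the expected output paths, e.g. ".../SAMN00000001.fa.gz"
        gzipped = pgzip(fastas, 4)

The `if __name__ == "__main__":` guard matters here because process_map spawns worker processes, which re-import the calling script on platforms that use the spawn start method.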

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "bactopia"
-version = "1.2.0"
+version = "1.2.1"
 description = "A Python package for working with Bactopia"
 authors = [
     "Robert A. Petit III <robbie.petit@gmail.com>",
