From 417069c43abe9f748ef55ff9182fd867d297e60f Mon Sep 17 00:00:00 2001
From: Simon Ye
Date: Fri, 1 Mar 2019 16:40:22 -0500
Subject: [PATCH 1/2] Revamp of metagenomics rules

Add support for krakenuniq and kaiju to replace kraken and diamond in the
default pipelines. Both rely on customized krakenuniq/kaiju packages for
extra functionality, such as loading the database from a pipe or printing
out all taxa hits (kaiju).

Krona is fixed to use the hit counts in the report files directly as
scores, instead of counting the individual read classifications (which
takes a long time).
---
 pipes/WDL/workflows/classify_kraken.wdl | 5 +++++
 1 file changed, 5 insertions(+)
 create mode 100644 pipes/WDL/workflows/classify_kraken.wdl

diff --git a/pipes/WDL/workflows/classify_kraken.wdl b/pipes/WDL/workflows/classify_kraken.wdl
new file mode 100644
index 000000000..b4cb62ed9
--- /dev/null
+++ b/pipes/WDL/workflows/classify_kraken.wdl
@@ -0,0 +1,5 @@
+import "tasks_metagenomics.wdl" as metagenomics
+
+workflow classify_kraken {
+    call metagenomics.krakenuniq
+}


From 22378223386e9e7c7de22d2f24e756d9c7dcffc5 Mon Sep 17 00:00:00 2001
From: Simon Ye
Date: Mon, 4 Mar 2019 13:13:39 -0500
Subject: [PATCH 2/2] Kraken2 depletion for human reads as alternative to
 bmtagger

* Add a kraken_unclassified command that combines p-score filtering with
  depletion.
* Less sensitive than bmtagger (tradeoffs) but much faster. Sensitivity can
  be controlled with p; p=0.01-0.05 is close to bmtagger but not as
  aggressive.
* A custom kraken2 package is needed to avoid executable name clashes with
  other kraken* packages.
---
 metagenomics.py                    |  84 +++++++++++++++--
 pipes/WDL/dx-launcher/dx-yml-build |   2 +-
 pipes/rules/common.rules           |   2 +-
 taxon_filter.py                    |  65 +++++++++++++
 test/__init__.py                   |   5 +-
 test/pipelines/snakemake/snake.py  |   2 +-
 test/unit/test_metagenomics.py     | 133 +++++++++++++------------
 test/unit/test_snake.py            |   2 +-
 tools/kraken.py                    | 145 ++++++++++++++++++++++++++++-
 util/file.py                       |   2 +-
 10 files changed, 365 insertions(+), 77 deletions(-)

diff --git a/metagenomics.py b/metagenomics.py
index 2356d078c..a100dfe9b 100755
--- a/metagenomics.py
+++ b/metagenomics.py
@@ -861,6 +861,77 @@ def kaiju(inBam, db, taxDb, outReport, outReads=None, threads=None):
 __commands__.append(('kaiju', parser_kaiju))
 
 
+def extract_kraken_unclassified(in_kraken_reads, in_bam, out_bam, filter_taxids=None, filter_children=None, tax_dir=None, p_threshold=0.05):
+    if filter_children:
+        assert tax_dir
+        db = ncbitax.TaxonomyDb(tax_dir, load_nodes=True, load_merged=True)
+        filter_taxids = collect_children(db.children, set(filter_taxids))
+
+    qnames = set()
+    with gzip.open(in_kraken_reads, 'rt') as f:
+        for line in f:
+            parts = line.split('\t')
+            classified = parts[0] == 'C'
+            qname = parts[1]
+            taxid = parts[2]
+            length = parts[3]
+            # Check if already filtered and P= added
+            if '=' in parts[4]:
+                p = float(parts[4].split('=')[1])
+                kmer_str = parts[5]
+            else:
+                kmer_str = parts[4]
+            kmers = [kmer for kmer in kmer_str.rstrip().split(' ')]
+            kmer_counts = []
+            for kmer in kmers:
+                t = kmer.split(':')
+                # Kraken2 marker for paired read joiner
+                if t[0] == '|':
+                    continue
+                kmer_counts.append((t[0], int(t[1])))
+            n_kmers = 0
+            n_classified_kmers = 0
+            for kmer_taxid, count in kmer_counts:
+                if kmer_taxid != 'A':
+                    n_kmers += count
+                if kmer_taxid not in ('0', 'A'):
+                    if filter_taxids is None:
+                        n_classified_kmers += count
+                    elif int(kmer_taxid) in filter_taxids:
+                        n_classified_kmers += count
+            if n_kmers > 0:
+                p = n_classified_kmers / n_kmers
+            else:
+                p = 0
+
+            if p <= float(p_threshold):
+                if qname.endswith('/1') or qname.endswith('/2'):
+                    qname = qname[:-2]
+                qnames.add(qname)
+
+    in_sam_f = pysam.AlignmentFile(in_bam, 'rb', check_sq=False)
+    out_sam_f = pysam.AlignmentFile(out_bam, 'wb', template=in_sam_f)
+
+    for sam1 in in_sam_f:
+        if sam1.query_name in qnames:
+            out_sam_f.write(sam1)
+
+
+def parser_kraken_unclassified(parser=argparse.ArgumentParser()):
+    parser.add_argument('in_bam', metavar='inBam', help='Input original bam.')
+    parser.add_argument('in_kraken_reads', metavar='inKrakenReads', help='Input kraken reads.')
+    parser.add_argument('out_bam', metavar='outBam', help='Output extracted bam.')
+    parser.add_argument('-p', '--pThreshold', dest='p_threshold', default=0.05, help='Kraken p threshold')
+    parser.add_argument('--filterTaxids', dest='filter_taxids', nargs='+', type=int, help='Taxids to filter/deplete out')
+    parser.add_argument('--filterChildren', dest='filter_children', action='store_true', help='Include filter taxid descendants when filtering (requires --taxDb)')
+    parser.add_argument('--taxDb', dest='tax_dir', help='Directory of the NCBI taxonomy db')
+    util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmp_dir', None)))
+    util.cmd.attach_main(parser, extract_kraken_unclassified, split_args=True)
+    return parser
+
+__commands__.append(('kraken_unclassified', parser_kraken_unclassified))
+
+
 def sam_lca_report(tax_db, bam_aligned, outReport, outReads=None, unique_only=None):
 
     if outReads:
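For reference, the per-read score used by extract_kraken_unclassified above is just the fraction of counted k-mers that Kraken2 assigned to a taxon (optionally restricted to the filter taxids). Below is a minimal standalone sketch of that computation, assuming the standard Kraken2 per-read output layout (status, read name, taxid, length, space-separated taxid:count pairs); the helper name and the example line are illustrative only, not part of the patch.

def classified_fraction(kraken2_line, filter_taxids=None):
    # Fields: C/U, read name, taxid, read length, k-mer LCA mappings.
    parts = kraken2_line.rstrip('\n').split('\t')
    kmer_str = parts[4]
    n_kmers = 0
    n_classified = 0
    for pair in kmer_str.split(' '):
        taxid, _, count = pair.partition(':')
        if taxid == '|':   # paired-read joiner marker
            continue
        if taxid == 'A':   # ambiguous-base runs are not counted
            continue
        count = int(count)
        n_kmers += count
        if taxid != '0' and (filter_taxids is None or int(taxid) in filter_taxids):
            n_classified += count
    return n_classified / n_kmers if n_kmers else 0.0

# Example: 24 of 72 counted k-mers hit taxid 9606, so p = 1/3.
line = "C\tread1\t9606\t100\t9606:24 0:48"
assert abs(classified_fraction(line) - 24 / 72) < 1e-9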
@@ -950,7 +1021,6 @@ def metagenomic_report_merge(metagenomic_reports, out_kraken_summary, kraken_db,
 
 __commands__.append(('report_merge', parser_metagenomic_report_merge))
 
-
 def fasta_library_accessions(library):
     '''Parse accession from ids of fasta files in library directory. '''
     library_accessions = set()
@@ -962,7 +1032,7 @@ def fasta_library_accessions(library):
         for seqr in SeqIO.parse(filepath, 'fasta'):
             name = seqr.name
             # Search for accession
-            mo = re.search('([A-Z]+_?\d+\.\d+)', name)
+            mo = re.search(r'([A-Z]+_?\d+\.\d+)', name)
             if mo:
                 accession = mo.group(1)
                 library_accessions.add(accession)
@@ -973,9 +1043,9 @@ class KrakenUniqBuildError(Exception):
     '''Error while building KrakenUniq database.'''
 
 def parser_filter_bam_to_taxa(parser=argparse.ArgumentParser()):
-    parser.add_argument('in_bam', help='Input bam file.')
-    parser.add_argument('read_IDs_to_tax_IDs', help='TSV file mapping read IDs to taxIDs, Kraken-format by default. Assumes bijective mapping of read ID to tax ID.')
-    parser.add_argument('out_bam', help='Output bam file, filtered to the taxa specified')
+    parser.add_argument('in_bam', metavar='inBam', help='Input bam file.')
+    parser.add_argument('reads_tsv', metavar='readsTsv', help='TSV file mapping read IDs to taxIDs, Kraken-format by default. Assumes bijective mapping of read ID to tax ID.')
+    parser.add_argument('out_bam', metavar='outBam', help='Output bam file, filtered to the taxa specified.')
     parser.add_argument('nodes_dmp', help='nodes.dmp file from ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/')
     parser.add_argument('names_dmp', help='names.dmp file from ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/')
     parser.add_argument('--taxNames', nargs="+", dest="tax_names", help='The taxonomic names to include. More than one can be specified. Mapped to Tax IDs by lowercase exact match only. Ex. "Viruses". This is in addition to any taxonomic IDs provided.')
@@ -992,7 +1062,7 @@ def parser_filter_bam_to_taxa(parser=argparse.ArgumentParser()):
     util.cmd.attach_main(parser, filter_bam_to_taxa, split_args=True)
     return parser
 
-def filter_bam_to_taxa(in_bam, read_IDs_to_tax_IDs, out_bam,
+def filter_bam_to_taxa(in_bam, reads_tsv, out_bam,
                        nodes_dmp, names_dmp,
                        tax_names=None, tax_ids=None,
                        omit_children=False,
@@ -1049,7 +1119,7 @@ def filter_bam_to_taxa(in_bam, read_IDs_to_tax_IDs, out_bam,
     with util.file.tempfname(".txt.gz") as temp_read_list:
         with open_or_gzopen(temp_read_list, "wt") as read_IDs_file:
             read_ids_written = 0
-            for row in util.file.read_tabfile(read_IDs_to_tax_IDs):
+            for row in util.file.read_tabfile(reads_tsv):
                 assert tax_id_col

diff --git a/taxon_filter.py b/taxon_filter.py
index e8b8cd413..ccdf35320 100755
--- a/taxon_filter.py
+++ b/taxon_filter.py
@@ -246,6 +246,71 @@ def parser_filter_lastal_bam(parser=argparse.ArgumentParser()):
 __commands__.append(('filter_lastal_bam', parser_filter_lastal_bam))
 
 
+# ==============================
+# ***    deplete kraken2    ***
+# ==============================
+def parser_deplete_bam_kraken2(parser=argparse.ArgumentParser()):
+    parser.add_argument('in_bam', metavar='inBam', help='Input BAM file.')
+    parser.add_argument('db', help='Kraken2 database directory.')
+    parser.add_argument('out_bam', metavar='outBam', help='Output BAM file.')
+    parser.add_argument('--taxDb', dest='tax_dir', help='Directory of the NCBI taxonomy db')
+    parser.add_argument(
+        '--JVMmemory',
+        default=tools.picard.FilterSamReadsTool.jvmMemDefault,
+        help='JVM virtual memory size (default: %(default)s)'
+    )
+    parser.add_argument('-p', '--pThreshold', dest='p_threshold', default=0.5, help='Kraken p threshold')
+    parser.add_argument('--filterTaxid', dest='filter_taxid', type=int, help='Taxid to filter/deplete out')
+    util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmp_dir', None)))
+    util.cmd.attach_main(parser, main_deplete_bam_kraken2)
+    return parser
+
+def main_deplete_bam_kraken2(args):
+    '''Use kraken2 to deplete reads classified to a given taxon (e.g. human) from an input bam.'''
+
+    kraken2 = tools.kraken.Kraken2()
+    with util.file.tempfname() as k_reads_fn, util.file.tempfname() as k_report_fn:
+        kraken2.classify(args.in_bam, args.db,
+                         out_report=k_report_fn, out_reads=k_reads_fn)
+        metagenomics.extract_kraken_unclassified(
+            k_reads_fn, args.in_bam, args.out_bam,
+            filter_taxids=[args.filter_taxid] if args.filter_taxid is not None else None,
+            tax_dir=args.tax_dir,
+            p_threshold=args.p_threshold)
+
+__commands__.append(('deplete_bam_kraken2', parser_deplete_bam_kraken2))
+
+
+def multi_db_deplete_bam(inBam, refDbs, deplete_method, outBam, **kwargs):
+
+    tmpDb = None
+    if len(refDbs) > 1 and not any(
+            not os.path.exists(db)  # indexed db prefix
+            or os.path.isdir(db)  # indexed db in directory
+            or (os.path.isfile(db) and ('.tar' in db or '.tgz' in db or '.zip' in db))  # packaged indexed db
+            for db in refDbs):
+        # this is a scenario where all refDbs are unbuilt fasta
+        # files. we can simplify and speed up execution by
+        # concatenating them all and running deplete_method
+        # just once
+        tmpDb = mkstempfname('.fasta')
+        merge_compressed_files(refDbs, tmpDb, sep='\n')
+        refDbs = [tmpDb]
+
+    samtools = tools.samtools.SamtoolsTool()
+    tmpBamIn = inBam
+    for db in refDbs:
+        if not samtools.isEmpty(tmpBamIn):
+            tmpBamOut = mkstempfname('.bam')
+            deplete_method(tmpBamIn, db, tmpBamOut, **kwargs)
+            if tmpBamIn != inBam:
+                os.unlink(tmpBamIn)
+            tmpBamIn = tmpBamOut
+    shutil.copyfile(tmpBamIn, outBam)
+
+    if tmpDb:
+        os.unlink(tmpDb)
+
 # ==============================
 # ***  deplete_bmtagger_bam  ***
 # ==============================
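As a usage note, the new deplete_bam_kraken2 command added above can be driven through its argparse parser the same way the unit tests below drive kraken_unclassified. A hypothetical sketch with placeholder file and database names (none of these paths or values come from this patch):

import argparse
import taxon_filter

# Hypothetical example: deplete human-classified reads from a BAM.
# 'kraken2_human_db' and the file names are placeholders.
args = taxon_filter.parser_deplete_bam_kraken2(argparse.ArgumentParser()).parse_args([
    'sample.raw.bam',         # input reads
    'kraken2_human_db',       # Kraken2 database directory (placeholder)
    'sample.depleted.bam',    # output with matching reads removed
    '--filterTaxid', '9606',  # count only k-mers assigned to human (taxid 9606)
    '-p', '0.05',             # per the commit message, 0.01-0.05 approximates bmtagger
])
args.func_main(args)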
diff --git a/test/__init__.py b/test/__init__.py
index 75b09b2fe..df49634e6 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -55,7 +55,10 @@ def assert_equal_bam_reads(testCase, bam_filename1, bam_filename2):
     samtools.view(args=[], inFile=bam_filename2, outFile=sam_two)
 
     try:
-        testCase.assertTrue(filecmp.cmp(sam_one, sam_two, shallow=False))
+        if testCase:
+            testCase.assertTrue(filecmp.cmp(sam_one, sam_two, shallow=False))
+        else:
+            assert filecmp.cmp(sam_one, sam_two, shallow=False)
     finally:
         for fname in [sam_one, sam_two]:
             if os.path.exists(fname):

diff --git a/test/pipelines/snakemake/snake.py b/test/pipelines/snakemake/snake.py
index 042a3eb3f..d5148481b 100755
--- a/test/pipelines/snakemake/snake.py
+++ b/test/pipelines/snakemake/snake.py
@@ -71,7 +71,7 @@ def setup(self):
         os.symlink(self.root, self.bindir)
         os.symlink(join(self.root, 'pipes', 'Snakefile'), join(self.workdir, 'Snakefile'))
         with open(join(self.root, 'pipes', 'config.yaml')) as f:
-            config = yaml.load(f)
+            config = yaml.load(f, Loader=yaml.FullLoader)
         self.config = merge_yaml_dicts(config, {'number_of_threads': _CPUS})
         if self.override_config:
             self.config = merge_yaml_dicts(self.config, self.override_config)
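For context on the yaml.load changes here and in test_snake.py and tools/kraken.py below: PyYAML 5.1+ warns when yaml.load() is called without an explicit Loader (and later releases require one), which is why the config loads now pass Loader=yaml.FullLoader. A minimal illustration, independent of this repo:

import yaml

# FullLoader resolves standard YAML types but will not construct arbitrary Python objects.
config = yaml.load("threads: 8", Loader=yaml.FullLoader)
assert config == {"threads": 8}
# yaml.safe_load("threads: 8") is an equivalent, stricter alternative.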
diff --git a/test/unit/test_metagenomics.py b/test/unit/test_metagenomics.py
index 66f981268..20ed7ce57 100644
--- a/test/unit/test_metagenomics.py
+++ b/test/unit/test_metagenomics.py
@@ -6,6 +6,7 @@
 import copy
 import os.path
 from os.path import join
+import subprocess
 import tempfile
 import textwrap
 import unittest
@@ -26,32 +27,22 @@
 from io import StringIO
 
 
-class TestCommandHelp(unittest.TestCase):
+def test_help_parser_for_each_command():
+    for cmd_name, parser_fun in metagenomics.__commands__:
+        parser = parser_fun(argparse.ArgumentParser())
+        helpstring = parser.format_help()
 
-    def test_help_parser_for_each_command(self):
-        for cmd_name, parser_fun in metagenomics.__commands__:
-            parser = parser_fun(argparse.ArgumentParser())
-            helpstring = parser.format_help()
 
-class TestKronaCalls(TestCaseWithTmp):
-
-    def setUp(self):
-        super().setUp()
-        patcher = patch('tools.krona.Krona', autospec=True)
-        self.addCleanup(patcher.stop)
-        self.mock_krona = patcher.start()
-
-        self.inTsv = util.file.mkstempfname('.tsv')
-        self.db = tempfile.mkdtemp('db')
-
-    def test_krona_import_taxonomy(self):
-        out_html = util.file.mkstempfname('.html')
-        metagenomics.krona(self.inTsv, self.db, out_html, queryColumn=3, taxidColumn=5, scoreColumn=7,
-                           noHits=True, noRank=True, inputType='tsv')
-        self.mock_krona().import_taxonomy.assert_called_once_with(
-            self.db, [self.inTsv], out_html, query_column=3, taxid_column=5, score_column=7,
-            no_hits=True, no_rank=True, magnitude_column=None, root_name=os.path.basename(self.inTsv))
+def test_krona_import_taxonomy(mocker):
+    p = mocker.patch('tools.krona.Krona', autospec=True)
+    out_html = util.file.mkstempfname('.html')
+    in_tsv = util.file.mkstempfname('.tsv')
+    db = tempfile.mkdtemp('db')
+    metagenomics.krona(in_tsv, db, out_html, queryColumn=3, taxidColumn=5, scoreColumn=7,
+                       noHits=True, noRank=True, inputType='tsv')
+    p().import_taxonomy.assert_called_once_with(
+        db, [in_tsv], out_html, query_column=3, taxid_column=5, score_column=7,
+        no_hits=True, no_rank=True, magnitude_column=None, root_name=os.path.basename(in_tsv))
 
 
 @pytest.fixture
@@ -123,8 +114,7 @@ def ranks():
 
 @pytest.fixture
 def simple_m8():
-    test_path = join(util.file.get_test_input_path(),
-                     'TestTaxonomy')
+    test_path = join(util.file.get_test_input_path(), 'TestTaxonomy')
     return open(join(test_path, 'simple.m8'))
 
 
@@ -170,8 +160,7 @@ def test_rank_code():
 
 
 def test_blast_records(simple_m8):
-    test_path = join(util.file.get_test_input_path(),
-                     'TestTaxonomy')
+    test_path = join(util.file.get_test_input_path(), 'TestTaxonomy')
     with simple_m8 as f:
         records = list(metagenomics.blast_records(f))
     assert len(records) == 110
 
 
@@ -180,8 +169,7 @@ def test_blast_records(simple_m8):
 
 
 def test_blast_lca(taxa_db_simple, simple_m8):
-    test_path = join(util.file.get_test_input_path(),
-                     'TestTaxonomy')
+    test_path = join(util.file.get_test_input_path(), 'TestTaxonomy')
     expected = textwrap.dedent("""\
     C\tM04004:13:000000000-AGV3H:1:1101:12068:2105\t2
     C\tM04004:13:000000000-AGV3H:1:1101:13451:2146\t2
@@ -283,45 +271,64 @@ def test_kaiju(mocker):
     p.assert_called_with('db.fmi', 'tax_db', 'input.bam', output_report='output.report', num_threads=mock.ANY, output_reads='output.reads')
 
 
-class TestBamFilter(TestCaseWithTmp):
-    def test_bam_filter_simple(self):
-        temp_dir = tempfile.gettempdir()
-        input_dir = util.file.get_test_input_path(self)
-        taxonomy_dir = os.path.join(util.file.get_test_input_path(),"TestMetagenomicsSimple","db","taxonomy")
+@pytest.fixture
+def taxonomy_dir():
+    return os.path.join(util.file.get_test_input_path(), "TestMetagenomicsSimple", "db", "taxonomy")
+
+
+def test_kraken_unclassified(taxonomy_dir):
+    input_dir = join(util.file.get_test_input_path(), 'TestBamFilter')
+
+    for p, n_leftover in zip([0.05, 0.5], [37, 579]):
         filtered_bam = util.file.mkstempfname('.bam')
-        args = [
-            os.path.join(input_dir,"input.bam"),
-            os.path.join(input_dir,"input.kraken-reads.tsv.gz"),
+        cmd_args = [
+            os.path.join(input_dir, "input.bam"),
+            os.path.join(input_dir, "input.kraken-reads.tsv.gz"),
             filtered_bam,
-            os.path.join(taxonomy_dir,"nodes.dmp"),
-            os.path.join(taxonomy_dir,"names.dmp"),
-            "--taxNames",
-            "Ebolavirus"
+            '-p', str(p)
         ]
-        args = metagenomics.parser_filter_bam_to_taxa(argparse.ArgumentParser()).parse_args(args)
+        args = metagenomics.parser_kraken_unclassified(argparse.ArgumentParser()).parse_args(cmd_args)
         args.func_main(args)
 
-        expected_bam = os.path.join(input_dir,"expected.bam")
-        assert_equal_bam_reads(self, filtered_bam, expected_bam)
+        cmd = ['samtools', 'view', filtered_bam]
+        p = subprocess.run(cmd, stdout=subprocess.PIPE)
+        assert len(p.stdout.decode('utf-8').split('\n')) == n_leftover
 
-    def test_bam_filter_by_tax_id(self):
-        temp_dir = tempfile.gettempdir()
-        input_dir = util.file.get_test_input_path(self)
-        taxonomy_dir = os.path.join(util.file.get_test_input_path(),"TestMetagenomicsSimple","db","taxonomy")
-        filtered_bam = util.file.mkstempfname('.bam')
-        args = [
-            os.path.join(input_dir,"input.bam"),
-            os.path.join(input_dir,"input.kraken-reads.tsv.gz"),
-            filtered_bam,
-            os.path.join(taxonomy_dir,"nodes.dmp"),
-            os.path.join(taxonomy_dir,"names.dmp"),
-            "--taxIDs",
-            "186538"
-        ]
-        args = metagenomics.parser_filter_bam_to_taxa(argparse.ArgumentParser()).parse_args(args)
-        args.func_main(args)
+
+def test_bam_filter_simple(taxonomy_dir):
+    input_dir = join(util.file.get_test_input_path(), 'TestBamFilter')
+
+    filtered_bam = util.file.mkstempfname('.bam')
+    args = [
+        os.path.join(input_dir, "input.bam"),
+        os.path.join(input_dir, "input.kraken-reads.tsv.gz"),
+        filtered_bam,
+        os.path.join(taxonomy_dir, "nodes.dmp"),
+        os.path.join(taxonomy_dir, "names.dmp"),
+        "--taxNames",
+        "Ebolavirus"
+    ]
+    args = metagenomics.parser_filter_bam_to_taxa(argparse.ArgumentParser()).parse_args(args)
+    args.func_main(args)
+
+    expected_bam = os.path.join(input_dir, "expected.bam")
+    assert_equal_bam_reads(None, filtered_bam, expected_bam)
+
+
+def test_bam_filter_by_tax_id(taxonomy_dir):
+    input_dir = join(util.file.get_test_input_path(), 'TestBamFilter')
+    filtered_bam = util.file.mkstempfname('.bam')
+    args = [
+        os.path.join(input_dir, "input.bam"),
+        os.path.join(input_dir, "input.kraken-reads.tsv.gz"),
+        filtered_bam,
+        os.path.join(taxonomy_dir, "nodes.dmp"),
+        os.path.join(taxonomy_dir, "names.dmp"),
+        "--taxIDs",
+        "186538"
+    ]
+    args = metagenomics.parser_filter_bam_to_taxa(argparse.ArgumentParser()).parse_args(args)
+    args.func_main(args)
 
-        expected_bam = os.path.join(input_dir,"expected.bam")
-        assert_equal_bam_reads(self, filtered_bam, expected_bam)
+    expected_bam = os.path.join(input_dir, "expected.bam")
+    assert_equal_bam_reads(None, filtered_bam, expected_bam)

diff --git a/test/unit/test_snake.py b/test/unit/test_snake.py
index 1d6bccfe5..c979e378b 100644
--- a/test/unit/test_snake.py
+++ b/test/unit/test_snake.py
@@ -52,7 +52,7 @@ def setup_dummy_simple(workdir, sample_names=('G1234', 'G5678', 'G3671.1_r1', 'G
     shutil.copy(os.path.join(util.file.get_project_path(), 'pipes', 'Snakefile'), workdir)
 
     with open(os.path.join(util.file.get_project_path(), 'pipes', 'config.yaml')) as f:
-        config = yaml.load(f)
+        config = yaml.load(f, Loader=yaml.FullLoader)
 
     def translate_remote_s3(uri):
         remote_path = uri[5:]
diff --git a/tools/kraken.py b/tools/kraken.py
index c3eef1acc..340ae40fe 100644
--- a/tools/kraken.py
+++ b/tools/kraken.py
@@ -85,7 +85,7 @@ def s3_psub(path):
         path = db[5:]
         bucket_name, db_dir = path.split('/', 1)
         obj = s3.Object(bucket_name, '/'.join([db_dir, 'config.yaml']))
-        db_config = yaml.load(obj.get()['Body'].read())
+        db_config = yaml.load(obj.get()['Body'].read(), Loader=yaml.FullLoader)
 
         db_opts = (' --db-pipe --db-index {index_f} --index-size {index_size} --db-file {db_f} --db-size {db_size} '
                    '--taxonomy {nodes}'.format(
@@ -381,3 +381,146 @@ def read_report(self, report_fn):
             name = parts[8]
             report[tax_id] = (tax_reads, tax_kmers)
         return report
+
+
+
+class Kraken2(tools.Tool):
+
+    BINS = {
+        'classify': 'kraken2',
+        'build': 'kraken2-build'
+    }
+
+    def __init__(self, install_methods=None):
+        self.subtool_name = self.subtool_name if hasattr(self, "subtool_name") else "kraken2"
+        if not install_methods:
+            install_methods = []
+            install_methods.append(tools.CondaPackage('kraken2', executable=self.subtool_name, version=KRAKEN_VERSION, channel='broad-viral'))
+        super(Kraken2, self).__init__(install_methods=install_methods)
+
+    def version(self):
+        return KRAKEN2_VERSION
+
+    @property
+    def libexec(self):
+        if not self.executable_path():
+            self.install_and_get_path()
+        return os.path.dirname(self.executable_path())
+
+    def build(self, db, options=None, option_string=None):
+        '''Create a kraken2 database.
+
+        Args:
+          db: Kraken2 database directory to build. Must have library/ and
+            taxonomy/ subdirectories to build from.
+          options: Dict of command-line options for kraken2-build.
+          option_string: Raw string of additional command-line options.
+        '''
+        options = options or {}
+        options['--threads'] = util.misc.sanitize_thread_count(options.get('--threads'))
+        self.execute(self.BINS['build'], db, db, options=options,
+                     option_string=option_string)
+
+    def pipeline(self, db, in_bams, out_reports=None, out_reads=None,
+                 lock_memory=None, filter_threshold=None, num_threads=None):
+        assert out_reads is not None or out_reports is not None
+
+        try:
+            from itertools import zip_longest
+        except ImportError:  # Python 2 compat
+            from itertools import izip_longest as zip_longest
+        out_reports = out_reports or []
+        out_reads = out_reads or []
+
+        for in_bam, out_read, out_report in zip_longest(in_bams, out_reads, out_reports):
+            self.classify(in_bam, db, out_reads=out_read, out_report=out_report, num_threads=num_threads)
+
+    def classify(self, in_bam, db, out_reads=None, out_report=None, num_threads=None):
+        """Classify input reads (bam)
+
+        Args:
+          in_bam: unaligned reads
+          db: Kraken2 built database directory.
+          out_reads: Output per-read classifications.
+          out_report: Output summary report.
+        """
+        if tools.samtools.SamtoolsTool().isEmpty(in_bam):
+            # kraken cannot deal with empty input
+            with open(out_reads, 'wt') as outf:
+                pass
+            return
+        tmp_fastq1 = util.file.mkstempfname('.1.fastq.gz')
+        tmp_fastq2 = util.file.mkstempfname('.2.fastq.gz')
+        # do not convert this to samtools bam2fq unless we can figure out how to replicate
+        # the clipping functionality of Picard SamToFastq
+        picard = tools.picard.SamToFastqTool()
+        picard_opts = {
+            'CLIPPING_ATTRIBUTE': tools.picard.SamToFastqTool.illumina_clipping_attribute,
+            'CLIPPING_ACTION': 'X'
+        }
+        picard.execute(in_bam, tmp_fastq1, tmp_fastq2,
+                       picardOptions=tools.picard.PicardTools.dict_to_picard_opts(picard_opts),
+                       JVMmemory=picard.jvmMemDefault)
+
+        opts = {
+            '--threads': util.misc.sanitize_thread_count(num_threads),
+            '--fastq-input': None,
+            '--gzip-compressed': None,
+        }
+        if out_report:
+            opts['--report'] = out_report
+        # Detect if input bam was paired by checking fastq 2
+        if os.path.getsize(tmp_fastq2) < 50:
+            res = self.execute('kraken2', db, out_reads, args=[tmp_fastq1], options=opts)
+        else:
+            opts['--paired'] = None
+            res = self.execute('kraken2', db, out_reads, args=[tmp_fastq1, tmp_fastq2], options=opts)
+        os.unlink(tmp_fastq1)
+        os.unlink(tmp_fastq2)
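For orientation, a sketch of how the wrapper above might be called from a pipeline task; the paths are placeholders and the Kraken2 database is assumed to already be built:

import tools.kraken

k2 = tools.kraken.Kraken2()
# Classify a raw BAM against a prebuilt Kraken2 database (all paths are placeholders).
k2.classify('sample.raw.bam', 'kraken2_db',
            out_reads='sample.kraken2-reads.txt',
            out_report='sample.kraken2-report.txt',
            num_threads=8)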
+    def filter(self, inReads, db, outReads, filterThreshold):
+        """Filter Kraken hits
+        """
+        self.execute(self.BINS['filter'], db, outReads, args=[inReads],
+                     options={'--threshold': filterThreshold})
+
+    def report(self, in_reads, db, outReport):
+        """Convert Kraken read-based output to summary reports
+        """
+        self.execute(self.BINS['report'], db, outReport, args=[in_reads])
+
+    def execute(self, command, db, output, args=None, options=None,
+                option_string=None):
+        '''Run a kraken2* command.
+
+        Args:
+          db: Kraken2 database directory.
+          output: Output file of command.
+          args: List of positional args.
+          options: List of keyword options.
+          option_string: Raw string of command line options.
+        '''
+        options = options or {}
+
+        if command == self.BINS['classify']:
+            if output:
+                options['--output'] = output
+        option_string = option_string or ''
+        args = args or []
+
+        cmd = [command, '--db', db]
+        # We need some way to allow empty options args like --build, hence
+        # we filter out on 'x is None'.
+        cmd.extend([str(x) for x in itertools.chain(*options.items())
+                    if x is not None])
+        cmd.extend(shlex.split(option_string))
+        cmd.extend(args)
+        log.debug('Calling %s: %s', command, ' '.join(cmd))
+
+        if command == self.BINS['classify']:
+            subprocess.check_call(cmd)
+        elif command == self.BINS['build']:
+            subprocess.check_call(cmd)
+        else:
+            with util.file.open_or_gzopen(output, 'w') as of:
+                util.misc.run(cmd, stdout=of, stderr=subprocess.PIPE, check=True)

diff --git a/util/file.py b/util/file.py
index 6db6b1296..c5793f4be 100644
--- a/util/file.py
+++ b/util/file.py
@@ -541,7 +541,7 @@ def download_file(uriToGet, dest, destFileName=None):
     req = urlopen(uriToGet)
 
     if not destFileName:
-        m = re.search('filename="(?P<filename>.+)"', req.info()['Content-Disposition'])
+        m = re.search(r'filename="(?P<filename>.+)"', req.info()['Content-Disposition'])
 
         if m:
            destFileName = m.group("filename")
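A quick check of the named capture group used in the download_file() regex above, applied to a typical Content-Disposition header value (the header string here is illustrative):

import re

header = 'attachment; filename="reads.fastq.gz"'
m = re.search(r'filename="(?P<filename>.+)"', header)
assert m and m.group("filename") == "reads.fastq.gz"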