From f7d88b6c7ec86063092fe320f95564d6fd5f5984 Mon Sep 17 00:00:00 2001 From: Guillaume Poirier-Morency Date: Tue, 8 Feb 2022 16:18:09 -0800 Subject: [PATCH] Reuse Bioluigi definitions for RSEM tasks (fix #18) --- rnaseq_pipeline/targets.py | 13 ---- rnaseq_pipeline/tasks.py | 126 +++++++++++-------------------------- tests/test_tasks.py | 5 +- 3 files changed, 41 insertions(+), 103 deletions(-) diff --git a/rnaseq_pipeline/targets.py b/rnaseq_pipeline/targets.py index 74027af5..85f897ec 100644 --- a/rnaseq_pipeline/targets.py +++ b/rnaseq_pipeline/targets.py @@ -6,19 +6,6 @@ from requests.auth import HTTPBasicAuth from .gemma import GemmaApi -class RsemReference(luigi.Target): - """ - Represents the target of rsem-prepare-reference script. - """ - def __init__(self, prefix, taxon): - self.prefix = prefix - self.taxon = taxon - - def exists(self): - exts = ['grp', 'ti', 'seq', 'chrlist'] - return all(exists(join(self.prefix, '{}_0.{}'.format(self.taxon, ext))) - for ext in exts) - class GemmaDatasetPlatform(luigi.Target): """ Represents a platform associated to a Gemma dataset. diff --git a/rnaseq_pipeline/tasks.py b/rnaseq_pipeline/tasks.py index b765083c..b63c8ab0 100755 --- a/rnaseq_pipeline/tasks.py +++ b/rnaseq_pipeline/tasks.py @@ -11,7 +11,7 @@ import requests import yaml from bioluigi.scheduled_external_program import ScheduledExternalProgramTask -from bioluigi.tasks import fastqc, multiqc +from bioluigi.tasks import fastqc, multiqc, rsem from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, TaskWithOutputMixin, DynamicWrapperTask from luigi.task import flatten, flatten_output, WrapperTask from luigi.util import requires @@ -22,7 +22,7 @@ from .sources.geo import DownloadGeoSample, DownloadGeoSeries, ExtractGeoSeriesBatchInfo from .sources.local import DownloadLocalSample, DownloadLocalExperiment from .sources.sra import DownloadSraProject, DownloadSraExperiment, ExtractSraProjectBatchInfo -from .targets import GemmaDatasetPlatform, GemmaDatasetFactor, RsemReference +from .targets import GemmaDatasetPlatform, GemmaDatasetFactor from .utils import no_retry, IlluminaFastqHeader, TaskWithPriorityMixin, RerunnableTaskMixin, remove_task_output from .gemma import GemmaTask @@ -179,110 +179,58 @@ def run(self): for dst in download_sample_tasks] @no_retry -class PrepareReference(ScheduledExternalProgramTask): - """ - Prepare a STAR/RSEM reference. - - :param taxon: Taxon - :param reference_id: Reference annotation build to use (i.e. ensembl98, hg38_ncbi) +@requires(TrimSample) +class AlignSample(luigi.Task): """ - taxon = luigi.Parameter(default='human') - reference_id = luigi.Parameter(default='hg38_ncbi') - - cpus = 16 - memory = 32 - - def input(self): - genome_dir = join(cfg.OUTPUT_DIR, cfg.GENOMES, self.reference_id) - gtf_files = glob(join(genome_dir, '*.gtf')) - fasta_files = glob(join(genome_dir, '*.fa')) - if len(gtf_files) != 1: - raise ValueError('Only one GTF file is expected in {}.'.format(genome_dir)) - return [luigi.LocalTarget(gtf_files[0]), - [luigi.LocalTarget(f) for f in fasta_files]] - - def program_args(self): - gtf, genome_fasta = self.input() - args = [join(cfg.RSEM_DIR, 'rsem-prepare-reference')] - - args.extend(['--gtf', gtf.path]) + Prepare a STAR/RSEM reference and align the sample against it. - args.extend([ - '--star', - '-p', self.cpus]) - - args.extend([t.path for t in genome_fasta]) - - args.append(join(self.output().prefix)) - - return args - - def run(self): - os.makedirs(os.path.dirname(self.output().prefix), exist_ok=True) - return super().run() - - def output(self): - return RsemReference(join(cfg.OUTPUT_DIR, cfg.REFERENCES, self.reference_id), self.taxon) - -@no_retry -@requires(TrimSample, PrepareReference) -class AlignSample(ScheduledExternalProgramTask): - """ The output of the task is a pair of isoform and gene quantification results processed by STAR and RSEM. - - :attr strand_specific: Indicate if the RNA-Seq data is stranded """ + taxon = luigi.Parameter(default='human',positional=False, description='Taxon') + reference_id = luigi.Parameter(default='hg38_ncbi', positional=False, description='Reference annotation build to use (i.e. ensembl98, hg38_ncbi)') + # TODO: handle strand-specific reads - strand_specific = luigi.BoolParameter(default=False, positional=False) + strand_specific = luigi.BoolParameter(default=False, positional=False, description='Indicate if the RNA-Seq data is strand-specific.') scope = luigi.Parameter(default='genes', positional=False) - cpus = 8 - memory = 32 - walltime = datetime.timedelta(days=1) - - # cleanup unused shared memory objects before and after the task is run - scheduler_extra_args = ['--task-prolog', abspath(cfg.STAR_CLEANUP_SCRIPT), '--task-epilog', abspath(cfg.STAR_CLEANUP_SCRIPT)] - - def run(self): - self.output().makedirs() - return super().run() - def _get_output_prefix(self): return join(cfg.OUTPUT_DIR, cfg.ALIGNDIR, self.reference_id, self.experiment_id, self.sample_id) - def program_args(self): - args = [join(cfg.RSEM_DIR, 'rsem-calculate-expression'), '-p', self.cpus] - - args.extend([ - '--time', - '--star', - '--star-gzipped-read-file', - '--no-bam-output']) - - if self.strand_specific: - args.append('--strand-specific') - - sample_run, reference = self.input() - + def run(self): + sample_run = self.input() + genome_dir = join(cfg.OUTPUT_DIR, cfg.GENOMES, self.reference_id) + gtf_files = glob(join(genome_dir, '*.gtf')) + fasta_files = glob(join(genome_dir, '*.fna')) + reference_name = join(cfg.OUTPUT_DIR, cfg.REFERENCES, self.reference_id, f'{self.taxon}_0') fastqs = [mate.path for mate in sample_run] + sample_name = self._get_output_prefix() + extra_args = ['--time', '--no-bam-output'] + if self.strand_specific: + extra_args.append('--strand-specific') - if len(fastqs) == 1: - args.append(fastqs[0]) - elif len(fastqs) == 2: - args.append('--paired-end') - args.extend(fastqs) - else: - raise NotImplementedError('Alignment of more than two input FASTQs is not supported.') + if len(fasta_files) < 1: + raise ValueError('At least one FASTA file is expected in {}.'.format(genome_dir)) - # reference for alignments and quantifications - args.append(join(reference.prefix, '{}_0'.format(self.taxon))) + if len(gtf_files) != 1: + raise ValueError('Exactly one GTF file is expected in {}.'.format(genome_dir)) - # output prefix - args.append(self._get_output_prefix()) + self.output().makedirs() - return args + yield rsem.CalculateExpression( + gtf_files[0], + fasta_files, + reference_name, + fastqs, + sample_name, + aligner='star', + extra_args=extra_args, + cpus=8, + memory=32, + walltime=datetime.timedelta(days=1), + # cleanup unused shared memory objects before and after the task is run + scheduler_extra_args=['--task-prolog', abspath(cfg.STAR_CLEANUP_SCRIPT), '--task-epilog', abspath(cfg.STAR_CLEANUP_SCRIPT)]) def output(self): return luigi.LocalTarget(self._get_output_prefix() + f'.{self.scope}.results') diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 0802befd..f517d99a 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -40,7 +40,10 @@ def test_platform_retrieval_by_name_when_unknown_instrument(): def test_align_sample_task(): task = AlignSample('GSE', 'GSM', reference_id='hg38_ncbi', scope='genes') assert task.output().path == join(cfg.OUTPUT_DIR, cfg.ALIGNDIR, 'hg38_ncbi', 'GSE', 'GSM.genes.results') - assert task.walltime == datetime.timedelta(days=1) + yielded_calculate_expression_task = next(task.run()) + assert yielded_calculate_expression_task.cpus == 8 + assert yielded_calculate_expression_task.memory == 32 + assert yielded_calculate_expression_task.walltime == datetime.timedelta(days=1) def test_gemma_task(): gemma_task = GemmaTask('GSE110256')