diff --git a/v03_pipeline/lib/misc/validation.py b/v03_pipeline/lib/misc/validation.py
index 113e79b53..5d23397be 100644
--- a/v03_pipeline/lib/misc/validation.py
+++ b/v03_pipeline/lib/misc/validation.py
@@ -151,10 +151,16 @@ def validate_imputed_sex_ploidy(
             )
         ),
     )
-    discrepant_rate = mt.aggregate_cols(hl.agg.fraction(mt.discrepant))
-    if discrepant_rate:
-        msg = f'{discrepant_rate:.2%} of samples have misaligned ploidy with their provided imputed sex.'
-        raise SeqrValidationError(msg)
+    discrepant_samples = mt.aggregate_cols(
+        hl.agg.filter(mt.discrepant, hl.agg.collect_as_set(mt.s)),
+    )
+    if discrepant_samples:
+        sorted_discrepant_samples = sorted(discrepant_samples)
+        msg = f'Found samples with misaligned ploidy with their provided imputed sex (first 10, if applicable): {sorted_discrepant_samples[:10]}'
+        raise SeqrValidationError(
+            msg,
+            {'imputed_sex_ploidy_failures': sorted_discrepant_samples},
+        )


 def validate_sample_type(
diff --git a/v03_pipeline/lib/misc/validation_test.py b/v03_pipeline/lib/misc/validation_test.py
index f32900a99..b1057e1e2 100644
--- a/v03_pipeline/lib/misc/validation_test.py
+++ b/v03_pipeline/lib/misc/validation_test.py
@@ -18,60 +18,68 @@


 def _mt_from_contigs(contigs):
-    return hl.MatrixTable.from_parts(
-        rows={
-            'locus': [
-                hl.Locus(
-                    contig=contig,
-                    position=1,
-                    reference_genome='GRCh38',
-                )
-                for contig in contigs
-            ],
-        },
-        cols={'s': ['sample_1']},
-        entries={'HL': [[0.0] for _ in range(len(contigs))]},
-    ).key_rows_by('locus')
-
-
-class ValidationTest(unittest.TestCase):
-    def test_validate_allele_type(self) -> None:
-        mt = hl.MatrixTable.from_parts(
+    return (
+        hl.MatrixTable.from_parts(
             rows={
                 'locus': [
                     hl.Locus(
-                        contig='chr1',
+                        contig=contig,
                         position=1,
                         reference_genome='GRCh38',
-                    ),
-                    hl.Locus(
-                        contig='chr1',
-                        position=2,
-                        reference_genome='GRCh38',
-                    ),
-                    hl.Locus(
-                        contig='chr1',
-                        position=3,
-                        reference_genome='GRCh38',
-                    ),
-                    hl.Locus(
-                        contig='chr1',
-                        position=4,
-                        reference_genome='GRCh38',
-                    ),
-                ],
-                'alleles': [
-                    ['A', 'T'],
-                    # NB: star alleles should pass through this validation just fine,
-                    # but are eventually filtered out upstream.
-                    ['A', '*'],
-                    ['A', '-'],
-                    ['A', ''],
+                    )
+                    for contig in contigs
                 ],
             },
             cols={'s': ['sample_1']},
-            entries={'HL': [[0.0], [0.0], [0.0], [0.0]]},
-        ).key_rows_by('locus', 'alleles')
+            entries={'HL': [[0.0] for _ in range(len(contigs))]},
+        )
+        .key_rows_by('locus')
+        .key_cols_by('s')
+    )
+
+
+class ValidationTest(unittest.TestCase):
+    def test_validate_allele_type(self) -> None:
+        mt = (
+            hl.MatrixTable.from_parts(
+                rows={
+                    'locus': [
+                        hl.Locus(
+                            contig='chr1',
+                            position=1,
+                            reference_genome='GRCh38',
+                        ),
+                        hl.Locus(
+                            contig='chr1',
+                            position=2,
+                            reference_genome='GRCh38',
+                        ),
+                        hl.Locus(
+                            contig='chr1',
+                            position=3,
+                            reference_genome='GRCh38',
+                        ),
+                        hl.Locus(
+                            contig='chr1',
+                            position=4,
+                            reference_genome='GRCh38',
+                        ),
+                    ],
+                    'alleles': [
+                        ['A', 'T'],
+                        # NB: star alleles should pass through this validation just fine,
+                        # but are eventually filtered out upstream.
+                        ['A', '*'],
+                        ['A', '-'],
+                        ['A', ''],
+                    ],
+                },
+                cols={'s': ['sample_1']},
+                entries={'HL': [[0.0], [0.0], [0.0], [0.0]]},
+            )
+            .key_rows_by('locus', 'alleles')
+            .key_cols_by('s')
+        )
         self.assertRaisesRegex(
             SeqrValidationError,
             "Alleles with invalid AlleleType are present in the callset: \\[\\('A', '-'\\), \\('A', ''\\)\\]",
@@ -80,28 +88,32 @@ def test_validate_allele_type(self) -> None:
             DatasetType.SNV_INDEL,
         )

-        mt = hl.MatrixTable.from_parts(
-            rows={
-                'locus': [
-                    hl.Locus(
-                        contig='chr1',
-                        position=1,
-                        reference_genome='GRCh38',
-                    ),
-                    hl.Locus(
-                        contig='chr1',
-                        position=2,
-                        reference_genome='GRCh38',
-                    ),
-                ],
-                'alleles': [
-                    ['C', ''],
-                    ['A', ''],
-                ],
-            },
-            cols={'s': ['sample_1']},
-            entries={'HL': [[0.0], [0.0]]},
-        ).key_rows_by('locus', 'alleles')
+        mt = (
+            hl.MatrixTable.from_parts(
+                rows={
+                    'locus': [
+                        hl.Locus(
+                            contig='chr1',
+                            position=1,
+                            reference_genome='GRCh38',
+                        ),
+                        hl.Locus(
+                            contig='chr1',
+                            position=2,
+                            reference_genome='GRCh38',
+                        ),
+                    ],
+                    'alleles': [
+                        ['C', ''],
+                        ['A', ''],
+                    ],
+                },
+                cols={'s': ['sample_1']},
+                entries={'HL': [[0.0], [0.0]]},
+            )
+            .key_rows_by('locus', 'alleles')
+            .key_cols_by('s')
+        )
         self.assertRaisesRegex(
             SeqrValidationError,
             'Alleles with invalid allele are present in the callset. This appears to be a GVCF containing records for sites with no variants.',
@@ -122,116 +134,128 @@ def test_validate_imputed_sex_ploidy(self) -> None:
         sex_check_ht = hl.read_table(TEST_SEX_CHECK_1)

         # All calls on X chromosome are valid
-        mt = hl.MatrixTable.from_parts(
-            rows={
-                'locus': [
-                    hl.Locus(
-                        contig='chrX',
-                        position=1,
-                        reference_genome='GRCh38',
-                    ),
-                ],
-            },
-            cols={
-                's': [
-                    female_sample,
-                    male_sample_1,
-                    x0_sample,
-                    xxy_sample,
-                    xyy_sample,
-                    xxx_sample,
-                ],
-            },
-            entries={
-                'GT': [
-                    [
-                        hl.Call(alleles=[0, 0], phased=False),
-                        hl.Call(alleles=[0], phased=False),
-                        hl.Call(alleles=[0, 0], phased=False),  # X0
-                        hl.Call(alleles=[0, 0], phased=False),  # XXY
-                        hl.Call(alleles=[0, 0], phased=False),  # XYY
-                        hl.Call(alleles=[0, 0], phased=False),  # XXX
+        mt = (
+            hl.MatrixTable.from_parts(
+                rows={
+                    'locus': [
+                        hl.Locus(
+                            contig='chrX',
+                            position=1,
+                            reference_genome='GRCh38',
+                        ),
                     ],
-                ],
-            },
-        ).key_rows_by('locus')
+                },
+                cols={
+                    's': [
+                        female_sample,
+                        male_sample_1,
+                        x0_sample,
+                        xxy_sample,
+                        xyy_sample,
+                        xxx_sample,
+                    ],
+                },
+                entries={
+                    'GT': [
+                        [
+                            hl.Call(alleles=[0, 0], phased=False),
+                            hl.Call(alleles=[0], phased=False),
+                            hl.Call(alleles=[0, 0], phased=False),  # X0
+                            hl.Call(alleles=[0, 0], phased=False),  # XXY
+                            hl.Call(alleles=[0, 0], phased=False),  # XYY
+                            hl.Call(alleles=[0, 0], phased=False),  # XXX
+                        ],
+                    ],
+                },
+            )
+            .key_rows_by('locus')
+            .key_cols_by('s')
+        )
         validate_imputed_sex_ploidy(mt, sex_check_ht)

         # All calls on Y chromosome are valid
-        mt = hl.MatrixTable.from_parts(
-            rows={
-                'locus': [
-                    hl.Locus(
-                        contig='chrY',
-                        position=1,
-                        reference_genome='GRCh38',
-                    ),
-                ],
-            },
-            cols={
-                's': [
-                    female_sample,
-                    male_sample_1,
-                    x0_sample,
-                    xxy_sample,
-                    xyy_sample,
-                    xxx_sample,
-                ],
-            },
-            entries={
-                'GT': [
-                    [
-                        hl.missing(hl.tcall),
-                        hl.Call(alleles=[0], phased=False),
-                        hl.missing(hl.tcall),  # X0
-                        hl.Call(alleles=[0, 0], phased=False),  # XXY
-                        hl.Call(alleles=[0, 0], phased=False),  # XYY
-                        hl.missing(hl.tcall),  # XXX
+        mt = (
+            hl.MatrixTable.from_parts(
+                rows={
+                    'locus': [
+                        hl.Locus(
+                            contig='chrY',
+                            position=1,
+                            reference_genome='GRCh38',
+                        ),
                     ],
-                ],
-            },
-        ).key_rows_by('locus')
+                },
+                cols={
+                    's': [
+                        female_sample,
+                        male_sample_1,
+                        x0_sample,
+                        xxy_sample,
+                        xyy_sample,
+                        xxx_sample,
+                    ],
+                },
+                entries={
+                    'GT': [
+                        [
+                            hl.missing(hl.tcall),
+                            hl.Call(alleles=[0], phased=False),
+                            hl.missing(hl.tcall),  # X0
+                            hl.Call(alleles=[0, 0], phased=False),  # XXY
+                            hl.Call(alleles=[0, 0], phased=False),  # XYY
+                            hl.missing(hl.tcall),  # XXX
+                        ],
+                    ],
+                },
+            )
+            .key_rows_by('locus')
+            .key_cols_by('s')
+        )
         validate_imputed_sex_ploidy(mt, sex_check_ht)

         # Invalid X chromosome case
-        mt = hl.MatrixTable.from_parts(
-            rows={
-                'locus': [
-                    hl.Locus(
-                        contig='chrX',
-                        position=1,
-                        reference_genome='GRCh38',
-                    ),
-                ],
-            },
-            cols={
-                's': [
-                    female_sample,
-                    male_sample_1,
-                    male_sample_2,
-                    x0_sample,
-                    xxy_sample,
-                    xyy_sample,
-                    xxx_sample,
-                ],
-            },
-            entries={
-                'GT': [
-                    [
-                        hl.Call(alleles=[0], phased=False),  # invalid Female call
-                        hl.Call(alleles=[0], phased=False),  # valid Male call
-                        hl.missing(hl.tcall),  # invalid Male call
-                        hl.Call(alleles=[0], phased=False),  # invalid X0 call
-                        hl.Call(alleles=[0], phased=False),  # invalid XXY call
-                        hl.missing(hl.tcall),  # valid XYY call
-                        hl.Call(alleles=[0, 0], phased=False),  # valid XXX call
+        mt = (
+            hl.MatrixTable.from_parts(
+                rows={
+                    'locus': [
+                        hl.Locus(
+                            contig='chrX',
+                            position=1,
+                            reference_genome='GRCh38',
+                        ),
                     ],
-                ],
-            },
-        ).key_rows_by('locus')
+                },
+                cols={
+                    's': [
+                        female_sample,
+                        male_sample_1,
+                        male_sample_2,
+                        x0_sample,
+                        xxy_sample,
+                        xyy_sample,
+                        xxx_sample,
+                    ],
+                },
+                entries={
+                    'GT': [
+                        [
+                            hl.Call(alleles=[0], phased=False),  # invalid Female call
+                            hl.Call(alleles=[0], phased=False),  # valid Male call
+                            hl.missing(hl.tcall),  # invalid Male call
+                            hl.Call(alleles=[0], phased=False),  # invalid X0 call
+                            hl.Call(alleles=[0], phased=False),  # invalid XXY call
+                            hl.missing(hl.tcall),  # valid XYY call
+                            hl.Call(alleles=[0, 0], phased=False),  # valid XXX call
+                        ],
+                    ],
+                },
+            )
+            .key_rows_by('locus')
+            .key_cols_by('s')
+        )
         self.assertRaisesRegex(
             SeqrValidationError,
-            '57.14% of samples have misaligned ploidy',
+            "Found samples with misaligned ploidy with their provided imputed sex \\(first 10, if applicable\\): \\['HG00731_1', 'HG00732_1', 'NA20889_1', 'NA20899_1'\\].*",
             validate_imputed_sex_ploidy,
             mt,
             sex_check_ht,
@@ -253,34 +277,38 @@ def test_validate_imported_field_types(self) -> None:
         )

     def test_validate_no_duplicate_variants(self) -> None:
-        mt = hl.MatrixTable.from_parts(
-            rows={
-                'locus': [
-                    hl.Locus(
-                        contig='chr1',
-                        position=1,
-                        reference_genome='GRCh38',
-                    ),
-                    hl.Locus(
-                        contig='chr1',
-                        position=2,
-                        reference_genome='GRCh38',
-                    ),
-                    hl.Locus(
-                        contig='chr1',
-                        position=2,
-                        reference_genome='GRCh38',
-                    ),
-                ],
-                'alleles': [
-                    ['A', 'C'],
-                    ['A', 'C'],
-                    ['A', 'C'],
-                ],
-            },
-            cols={'s': ['sample_1']},
-            entries={'HL': [[0.0], [0.0], [0.0]]},
-        ).key_rows_by('locus', 'alleles')
+        mt = (
+            hl.MatrixTable.from_parts(
+                rows={
+                    'locus': [
+                        hl.Locus(
+                            contig='chr1',
+                            position=1,
+                            reference_genome='GRCh38',
+                        ),
+                        hl.Locus(
+                            contig='chr1',
+                            position=2,
+                            reference_genome='GRCh38',
+                        ),
+                        hl.Locus(
+                            contig='chr1',
+                            position=2,
+                            reference_genome='GRCh38',
+                        ),
+                    ],
+                    'alleles': [
+                        ['A', 'C'],
+                        ['A', 'C'],
+                        ['A', 'C'],
+                    ],
+                },
+                cols={'s': ['sample_1']},
+                entries={'HL': [[0.0], [0.0], [0.0]]},
+            )
+            .key_rows_by('locus', 'alleles')
+            .key_cols_by('s')
+        )
         self.assertRaisesRegex(
             SeqrValidationError,
             "Variants are present multiple times in the callset: \\['1-2-A-C'\\]",
diff --git a/v03_pipeline/lib/model/environment.py b/v03_pipeline/lib/model/environment.py
index 42ae4c658..72c8c5a45 100644
--- a/v03_pipeline/lib/model/environment.py
+++ b/v03_pipeline/lib/model/environment.py
@@ -50,6 +50,8 @@
 GCLOUD_ZONE = os.environ.get('GCLOUD_ZONE')
 GCLOUD_REGION = os.environ.get('GCLOUD_REGION')
 PIPELINE_RUNNER_APP_VERSION = os.environ.get('PIPELINE_RUNNER_APP_VERSION', 'latest')
+SEQR_APP_HAIL_SEARCH_DATA_DIR = os.environ.get('SEQR_APP_HAIL_SEARCH_DATA_DIR')
+SEQR_APP_REFERENCE_DATASETS_DIR = os.environ.get('SEQR_APP_REFERENCE_DATASETS_DIR')


 @dataclass
@@ -71,4 +73,6 @@ class Env:
     PIPELINE_RUNNER_APP_VERSION: str = PIPELINE_RUNNER_APP_VERSION
     PRIVATE_REFERENCE_DATASETS_DIR: str = PRIVATE_REFERENCE_DATASETS_DIR
     REFERENCE_DATASETS_DIR: str = REFERENCE_DATASETS_DIR
+    SEQR_APP_HAIL_SEARCH_DATA_DIR: str | None = SEQR_APP_HAIL_SEARCH_DATA_DIR
+    SEQR_APP_REFERENCE_DATASETS_DIR: str | None = SEQR_APP_REFERENCE_DATASETS_DIR
     VEP_REFERENCE_DATASETS_DIR: str = VEP_REFERENCE_DATASETS_DIR
diff --git a/v03_pipeline/lib/model/feature_flag.py b/v03_pipeline/lib/model/feature_flag.py
index 0b57e8643..8a033f60f 100644
--- a/v03_pipeline/lib/model/feature_flag.py
+++ b/v03_pipeline/lib/model/feature_flag.py
@@ -11,6 +11,7 @@
 INCLUDE_PIPELINE_VERSION_IN_PREFIX = (
     os.environ.get('INCLUDE_PIPELINE_VERSION_IN_PREFIX') == '1'
 )
+RUN_PIPELINE_ON_DATAPROC = os.environ.get('RUN_PIPELINE_ON_DATAPROC') == '1'
 SHOULD_TRIGGER_HAIL_BACKEND_RELOAD = (
     os.environ.get('SHOULD_TRIGGER_HAIL_BACKEND_RELOAD') == '1'
 )
@@ -23,4 +24,5 @@ class FeatureFlag:
     EXPECT_TDR_METRICS: bool = EXPECT_TDR_METRICS
     EXPECT_WES_FILTERS: bool = EXPECT_WES_FILTERS
     INCLUDE_PIPELINE_VERSION_IN_PREFIX: bool = INCLUDE_PIPELINE_VERSION_IN_PREFIX
+    RUN_PIPELINE_ON_DATAPROC: bool = RUN_PIPELINE_ON_DATAPROC
     SHOULD_TRIGGER_HAIL_BACKEND_RELOAD: bool = SHOULD_TRIGGER_HAIL_BACKEND_RELOAD
diff --git a/v03_pipeline/lib/paths.py b/v03_pipeline/lib/paths.py
index 2942b1dae..87aa024cf 100644
--- a/v03_pipeline/lib/paths.py
+++ b/v03_pipeline/lib/paths.py
@@ -17,7 +17,7 @@
 )


-def _pipeline_prefix(
+def pipeline_prefix(
     root: str,
     reference_genome: ReferenceGenome,
     dataset_type: DatasetType,
@@ -36,38 +36,15 @@ def _pipeline_prefix(
     )


-def _v03_reference_data_prefix(
-    access_control: AccessControl,
-    reference_genome: ReferenceGenome,
-    dataset_type: DatasetType,
-) -> str:
-    root = (
-        Env.PRIVATE_REFERENCE_DATASETS_DIR
-        if access_control == AccessControl.PRIVATE
-        else Env.REFERENCE_DATASETS_DIR
-    )
-    if FeatureFlag.INCLUDE_PIPELINE_VERSION_IN_PREFIX:
-        return os.path.join(
-            root,
-            PipelineVersion.V03.value,
-            reference_genome.value,
-            dataset_type.value,
-        )
-    return os.path.join(
-        root,
-        reference_genome.value,
-        dataset_type.value,
-    )
-
-
 def _v03_reference_dataset_prefix(
+    root: str,
     access_control: AccessControl,
     reference_genome: ReferenceGenome,
 ) -> str:
     root = (
         Env.PRIVATE_REFERENCE_DATASETS_DIR
         if access_control == AccessControl.PRIVATE
-        else Env.REFERENCE_DATASETS_DIR
+        else root
     )
     if FeatureFlag.INCLUDE_PIPELINE_VERSION_IN_PREFIX:
         return os.path.join(
@@ -88,7 +65,7 @@ def family_table_path(
     family_guid: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.HAIL_SEARCH_DATA_DIR,
             reference_genome,
             dataset_type,
@@ -104,7 +81,7 @@ def tdr_metrics_dir(
     dataset_type: DatasetType,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -130,7 +107,7 @@ def imported_callset_path(
     callset_path: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -177,7 +154,7 @@ def project_table_path(
     project_guid: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.HAIL_SEARCH_DATA_DIR,
             reference_genome,
             dataset_type,
@@ -194,7 +171,7 @@ def relatedness_check_table_path(
     callset_path: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -210,7 +187,7 @@ def relatedness_check_tsv_path(
     callset_path: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -227,7 +204,7 @@ def remapped_and_subsetted_callset_path(
     project_guid: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -243,7 +220,7 @@ def lookup_table_path(
     dataset_type: DatasetType,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.HAIL_SEARCH_DATA_DIR,
             reference_genome,
             dataset_type,
@@ -257,7 +234,7 @@ def runs_path(
     dataset_type: DatasetType,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.HAIL_SEARCH_DATA_DIR,
             reference_genome,
             dataset_type,
@@ -272,7 +249,7 @@ def sex_check_table_path(
     callset_path: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -306,6 +283,7 @@ def valid_reference_dataset_path(
 ) -> str | None:
     return os.path.join(
         _v03_reference_dataset_prefix(
+            Env.REFERENCE_DATASETS_DIR,
             reference_dataset.access_control,
             reference_genome,
         ),
@@ -318,9 +296,13 @@ def valid_reference_dataset_query_path(
     reference_genome: ReferenceGenome,
     dataset_type: DatasetType,
     reference_dataset_query: ReferenceDatasetQuery,
+    root=None,
 ) -> str | None:
+    if not root:
+        root = Env.REFERENCE_DATASETS_DIR
     return os.path.join(
         _v03_reference_dataset_prefix(
+            root,
             reference_dataset_query.access_control,
             reference_genome,
         ),
@@ -334,7 +316,7 @@ def variant_annotations_table_path(
     dataset_type: DatasetType,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.HAIL_SEARCH_DATA_DIR,
             reference_genome,
             dataset_type,
@@ -348,7 +330,7 @@ def variant_annotations_vcf_path(
     dataset_type: DatasetType,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.HAIL_SEARCH_DATA_DIR,
             reference_genome,
             dataset_type,
@@ -386,7 +368,7 @@ def project_remap_path(
     project_guid: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
@@ -404,7 +386,7 @@ def project_pedigree_path(
     project_guid: str,
 ) -> str:
     return os.path.join(
-        _pipeline_prefix(
+        pipeline_prefix(
             Env.LOADING_DATASETS_DIR,
             reference_genome,
             dataset_type,
diff --git a/v03_pipeline/lib/reference_datasets/reference_dataset.py b/v03_pipeline/lib/reference_datasets/reference_dataset.py
index 445e191aa..0ca80f8cb 100644
--- a/v03_pipeline/lib/reference_datasets/reference_dataset.py
+++ b/v03_pipeline/lib/reference_datasets/reference_dataset.py
@@ -166,6 +166,21 @@ class ReferenceDatasetQuery(BaseReferenceDataset, str, Enum):
     clinvar_path_variants = 'clinvar_path_variants'
     high_af_variants = 'high_af_variants'

+    @classmethod
+    def for_reference_genome_dataset_type(
+        cls,
+        reference_genome: ReferenceGenome,
+        dataset_type: DatasetType,
+    ) -> set['ReferenceDatasetQuery']:
+        return {
+            dataset
+            for dataset in super().for_reference_genome_dataset_type(
+                reference_genome,
+                dataset_type,
+            )
+            if isinstance(dataset, cls)
+        }
+
     @property
     def requires(self) -> ReferenceDataset:
         return {
diff --git a/v03_pipeline/lib/tasks/__init__.py b/v03_pipeline/lib/tasks/__init__.py
index 1b986f668..76832c5c5 100644
--- a/v03_pipeline/lib/tasks/__init__.py
+++ b/v03_pipeline/lib/tasks/__init__.py
@@ -11,6 +11,7 @@
 from v03_pipeline.lib.tasks.reference_data.update_variant_annotations_table_with_updated_reference_dataset import (
     UpdateVariantAnnotationsTableWithUpdatedReferenceDataset,
 )
+from v03_pipeline.lib.tasks.run_pipeline import RunPipelineTask
 from v03_pipeline.lib.tasks.update_lookup_table import (
     UpdateLookupTableTask,
 )
@@ -46,6 +47,7 @@
     'DeleteProjectTablesTask',
     'MigrateAllLookupTablesTask',
     'MigrateAllVariantAnnotationsTablesTask',
+    'RunPipelineTask',
     'UpdateProjectTableTask',
     'UpdateProjectTablesWithDeletedFamiliesTask',
     'UpdateLookupTableTask',
diff --git a/v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py b/v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py
index 1094d651e..b8162392e 100644
--- a/v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py
+++ b/v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py
@@ -33,12 +33,12 @@ def __init__(self, *args, **kwargs):
         )

     @property
-    def task_name(self):
-        return self.get_task_family().split('.')[-1]
+    def task(self):
+        raise NotImplementedError

     @property
     def job_id(self):
-        return f'{self.task_name}-{self.run_id}'
+        return f'{self.task.task_family}-{self.run_id}'

     def requires(self) -> [luigi.Task]:
         return [self.clone(CreateDataprocClusterTask)]
@@ -58,7 +58,7 @@ def complete(self) -> bool:
         except google.api_core.exceptions.NotFound:
             return False
         if job.status.state == ERROR_STATE:
-            msg = f'Job {self.task_name}-{self.run_id} entered ERROR state'
+            msg = f'Job {self.task.task_family}-{self.run_id} entered ERROR state'
             logger.error(msg)
             logger.error(job.status.details)
         return job.status.state == DONE_STATE
@@ -81,7 +81,7 @@ def run(self):
             'pyspark_job': {
                 'main_python_file_uri': f'{SEQR_PIPELINE_RUNNER_BUILD}/bin/run_task.py',
                 'args': [
-                    self.task_name,
+                    self.task.task_family,
                     '--local-scheduler',
                     *to_kebab_str_args(self),
                 ],
diff --git a/v03_pipeline/lib/tasks/dataproc/misc_test.py b/v03_pipeline/lib/tasks/dataproc/misc_test.py
index 335cacbf7..22729bedc 100644
--- a/v03_pipeline/lib/tasks/dataproc/misc_test.py
+++ b/v03_pipeline/lib/tasks/dataproc/misc_test.py
@@ -3,8 +3,8 @@

 from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
 from v03_pipeline.lib.tasks.dataproc.misc import to_kebab_str_args
-from v03_pipeline.lib.tasks.dataproc.write_success_file_on_dataproc import (
-    WriteSuccessFileOnDataprocTask,
+from v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs import (
+    RsyncToSeqrAppDirsTask,
 )


@@ -13,7 +13,7 @@
 )
 class MiscTest(unittest.TestCase):
     def test_to_kebab_str_args(self, _: Mock):
-        t = WriteSuccessFileOnDataprocTask(
+        t = RsyncToSeqrAppDirsTask(
             reference_genome=ReferenceGenome.GRCh38,
             dataset_type=DatasetType.SNV_INDEL,
             sample_type=SampleType.WGS,
diff --git a/v03_pipeline/lib/tasks/dataproc/rsync_to_seqr_app_dirs.py b/v03_pipeline/lib/tasks/dataproc/rsync_to_seqr_app_dirs.py
new file mode 100644
index 000000000..194f27611
--- /dev/null
+++ b/v03_pipeline/lib/tasks/dataproc/rsync_to_seqr_app_dirs.py
@@ -0,0 +1,101 @@
+import os
+import subprocess
+
+import luigi
+
+from v03_pipeline.lib.model import Env
+from v03_pipeline.lib.paths import pipeline_prefix, valid_reference_dataset_query_path
+from v03_pipeline.lib.reference_datasets.reference_dataset import ReferenceDatasetQuery
+from v03_pipeline.lib.tasks.base.base_loading_run_params import (
+    BaseLoadingRunParams,
+)
+from v03_pipeline.lib.tasks.dataproc.run_pipeline_on_dataproc import (
+    RunPipelineOnDataprocTask,
+)
+
+
+def hail_search_value(value: str) -> str:
+    return value.replace('SV', 'SV_WGS').replace(
+        'GCNV',
+        'SV_WES',
+    )
+
+
+def rsync_command(src_path: str, dst_path: str) -> list[str]:
+    return [
+        '/bin/bash',
+        '-cx',
+        f'mkdir -p {dst_path} && gsutil -qm rsync -rd -x .*runs.* {src_path} {dst_path} && sync {dst_path}',
+    ]
+
+
+@luigi.util.inherits(BaseLoadingRunParams)
+class RsyncToSeqrAppDirsTask(luigi.Task):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.done = False
+
+    def output(self) -> None:
+        return None
+
+    def complete(self) -> bool:
+        return self.done
+
+    def requires(self) -> luigi.Task:
+        return self.clone(RunPipelineOnDataprocTask)
+
+    def run(self) -> None:
+        if not (
+            Env.SEQR_APP_HAIL_SEARCH_DATA_DIR and Env.SEQR_APP_REFERENCE_DATASETS_DIR
+        ):
+            self.done = True
+            return
+
+        if not (
+            Env.HAIL_SEARCH_DATA_DIR.startswith('gs://')
+            and Env.REFERENCE_DATASETS_DIR.startswith('gs://')
+        ):
+            msg = 'Overridden HAIL_SEARCH_DATA_DIR and REFERENCE_DATASETS_DIR must be Google Cloud buckets.'
+            raise RuntimeError(msg)
+
+        # Sync Pipeline Tables
+        src_path = pipeline_prefix(
+            Env.HAIL_SEARCH_DATA_DIR,
+            self.reference_genome,
+            self.dataset_type,
+        )
+        dst_path = hail_search_value(
+            pipeline_prefix(
+                Env.SEQR_APP_HAIL_SEARCH_DATA_DIR,
+                self.reference_genome,
+                self.dataset_type,
+            ),
+        )
+        subprocess.call(
+            rsync_command(src_path, dst_path),  # noqa: S603
+        )
+
+        # Sync RDQs
+        for query in ReferenceDatasetQuery.for_reference_genome_dataset_type(
+            self.reference_genome,
+            self.dataset_type,
+        ):
+            src_path = valid_reference_dataset_query_path(
+                self.reference_genome,
+                self.dataset_type,
+                query,
+            )
+            dst_path = os.path.join(
+                hail_search_value(
+                    valid_reference_dataset_query_path(
+                        self.reference_genome,
+                        self.dataset_type,
+                        query,
+                        Env.SEQR_APP_REFERENCE_DATASETS_DIR,
+                    ),
+                ),
+            )
+            subprocess.call(
+                rsync_command(src_path, dst_path),  # noqa: S603
+            )
+        self.done = True
diff --git a/v03_pipeline/lib/tasks/dataproc/rsync_to_seqr_app_dirs_test.py b/v03_pipeline/lib/tasks/dataproc/rsync_to_seqr_app_dirs_test.py
new file mode 100644
index 000000000..e4894e1db
--- /dev/null
+++ b/v03_pipeline/lib/tasks/dataproc/rsync_to_seqr_app_dirs_test.py
@@ -0,0 +1,112 @@
+import unittest
+from unittest.mock import Mock, call, patch
+
+import luigi
+
+from v03_pipeline.lib.model import (
+    DatasetType,
+    Env,
+    FeatureFlag,
+    ReferenceGenome,
+    SampleType,
+)
+from v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs import (
+    RsyncToSeqrAppDirsTask,
+)
+from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask
+
+
+class RsyncToSeqrAppDirsTaskTest(unittest.TestCase):
+    @patch(
+        'v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs.RunPipelineOnDataprocTask',
+    )
+    @patch('v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs.subprocess')
+    def test_rsync_to_seqr_app_dirs_no_sync(
+        self,
+        mock_subprocess: Mock,
+        mock_run_pipeline_task: Mock,
+    ) -> None:
+        mock_run_pipeline_task.return_value = MockCompleteTask()
+        worker = luigi.worker.Worker()
+        task = RsyncToSeqrAppDirsTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            sample_type=SampleType.WGS,
+            callset_path='test_callset',
+            project_guids=['R0113_test_project'],
+            project_remap_paths=['test_remap'],
+            project_pedigree_paths=['test_pedigree'],
+            run_id='manual__2024-04-01',
+        )
+        worker.add(task)
+        worker.run()
+        self.assertTrue(task.complete())
+        mock_subprocess.call.assert_not_called()
+
+    @patch(
+        'v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs.RunPipelineOnDataprocTask',
+    )
+    @patch('v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs.subprocess')
+    @patch.object(Env, 'HAIL_SEARCH_DATA_DIR', 'gs://test-hail-search-dir')
+    @patch.object(Env, 'REFERENCE_DATASETS_DIR', 'gs://test-reference-data-dir')
+    @patch.object(
+        Env,
+        'SEQR_APP_HAIL_SEARCH_DATA_DIR',
+        '/var/seqr/seqr-hail-search-data',
+    )
+    @patch.object(
+        Env,
+        'SEQR_APP_REFERENCE_DATASETS_DIR',
+        '/var/seqr/seqr-reference-data',
+    )
+    @patch.object(
+        FeatureFlag,
+        'INCLUDE_PIPELINE_VERSION_IN_PREFIX',
+        False,
+    )
+    def test_rsync_to_seqr_app_dirs_sync(
+        self,
+        mock_subprocess: Mock,
+        mock_run_pipeline_task: Mock,
+    ) -> None:
+        mock_run_pipeline_task.return_value = MockCompleteTask()
+        worker = luigi.worker.Worker()
+        task = RsyncToSeqrAppDirsTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            sample_type=SampleType.WGS,
+            callset_path='test_callset',
+            project_guids=['R0113_test_project'],
+            project_remap_paths=['test_remap'],
+            project_pedigree_paths=['test_pedigree'],
+            run_id='manual__2024-04-02',
+        )
+        worker.add(task)
+        worker.run()
+        self.assertTrue(task.complete())
+        mock_subprocess.call.assert_has_calls(
+            [
+                call(
+                    [
+                        '/bin/bash',
+                        '-cx',
+                        'mkdir -p /var/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL && gsutil -qm rsync -rd -x .*runs.* gs://test-hail-search-dir/GRCh38/SNV_INDEL /var/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL && sync /var/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL',
+                    ],
+                ),
+                call(
+                    [
+                        '/bin/bash',
+                        '-cx',
+                        'mkdir -p /var/seqr/seqr-reference-data/GRCh38/SNV_INDEL/high_af_variants.ht && gsutil -qm rsync -rd -x .*runs.* gs://test-reference-data-dir/GRCh38/SNV_INDEL/high_af_variants.ht /var/seqr/seqr-reference-data/GRCh38/SNV_INDEL/high_af_variants.ht && sync /var/seqr/seqr-reference-data/GRCh38/SNV_INDEL/high_af_variants.ht',
+                    ],
+                ),
+                call(
+                    [
+                        '/bin/bash',
+                        '-cx',
+                        'mkdir -p /var/seqr/seqr-reference-data/GRCh38/SNV_INDEL/clinvar_path_variants.ht && gsutil -qm rsync -rd -x .*runs.* gs://test-reference-data-dir/GRCh38/SNV_INDEL/clinvar_path_variants.ht /var/seqr/seqr-reference-data/GRCh38/SNV_INDEL/clinvar_path_variants.ht && sync /var/seqr/seqr-reference-data/GRCh38/SNV_INDEL/clinvar_path_variants.ht',
+                    ],
+                ),
+            ],
+            any_order=True,
+        )
diff --git a/v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc.py b/v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc.py
new file mode 100644
index 000000000..15ee74fb4
--- /dev/null
+++ b/v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc.py
@@ -0,0 +1,16 @@
+import luigi
+
+from v03_pipeline.lib.tasks.base.base_loading_run_params import (
+    BaseLoadingRunParams,
+)
+from v03_pipeline.lib.tasks.dataproc.base_run_job_on_dataproc import (
+    BaseRunJobOnDataprocTask,
+)
+from v03_pipeline.lib.tasks.run_pipeline import RunPipelineTask
+
+
+@luigi.util.inherits(BaseLoadingRunParams)
+class RunPipelineOnDataprocTask(BaseRunJobOnDataprocTask):
+    @property
+    def task(self) -> luigi.Task:
+        return RunPipelineTask
diff --git a/v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc_test.py b/v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc_test.py
similarity index 91%
rename from v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc_test.py
rename to v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc_test.py
index 62cb4d678..abde6833f 100644
--- a/v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc_test.py
+++ b/v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc_test.py
@@ -6,8 +6,8 @@
 import luigi

 from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
-from v03_pipeline.lib.tasks.dataproc.write_success_file_on_dataproc import (
-    WriteSuccessFileOnDataprocTask,
+from v03_pipeline.lib.tasks.dataproc.run_pipeline_on_dataproc import (
+    RunPipelineOnDataprocTask,
 )
 from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask

@@ -38,7 +38,7 @@ def test_job_already_exists_failed(
             google.api_core.exceptions.AlreadyExists('job exists')
         )
         worker = luigi.worker.Worker()
-        task = WriteSuccessFileOnDataprocTask(
+        task = RunPipelineOnDataprocTask(
             reference_genome=ReferenceGenome.GRCh38,
             dataset_type=DatasetType.SNV_INDEL,
             sample_type=SampleType.WGS,
@@ -54,7 +54,7 @@ def test_job_already_exists_failed(
         mock_logger.error.assert_has_calls(
             [
                 call(
-                    'Job WriteSuccessFileOnDataprocTask-manual__2024-04-03 entered ERROR state',
+                    'Job RunPipelineTask-manual__2024-04-03 entered ERROR state',
                ),
             ],
         )
@@ -70,7 +70,7 @@ def test_job_already_exists_success(
             status=SimpleNamespace(state='DONE'),
         )
         worker = luigi.worker.Worker()
-        task = WriteSuccessFileOnDataprocTask(
+        task = RunPipelineOnDataprocTask(
             reference_genome=ReferenceGenome.GRCh38,
             dataset_type=DatasetType.SNV_INDEL,
             sample_type=SampleType.WGS,
@@ -102,7 +102,7 @@ def test_job_failed(
             'FailedPrecondition: 400 Job failed with message',
         )
         worker = luigi.worker.Worker()
-        task = WriteSuccessFileOnDataprocTask(
+        task = RunPipelineOnDataprocTask(
             reference_genome=ReferenceGenome.GRCh38,
             dataset_type=DatasetType.SNV_INDEL,
             sample_type=SampleType.WGS,
@@ -118,7 +118,7 @@ def test_job_failed(
         mock_logger.info.assert_has_calls(
             [
                 call(
-                    'Waiting for job completion WriteSuccessFileOnDataprocTask-manual__2024-04-05',
+                    'Waiting for job completion RunPipelineTask-manual__2024-04-05',
                 ),
             ],
        )
@@ -141,7 +141,7 @@ def test_job_success(
         operation = mock_client.submit_job_as_operation.return_value
         operation.done.side_effect = [False, True]
         worker = luigi.worker.Worker()
-        task = WriteSuccessFileOnDataprocTask(
+        task = RunPipelineOnDataprocTask(
             reference_genome=ReferenceGenome.GRCh38,
             dataset_type=DatasetType.SNV_INDEL,
             sample_type=SampleType.WGS,
diff --git a/v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc.py b/v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc.py
deleted file mode 100644
index 0c48c344f..000000000
--- a/v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import luigi
-
-from v03_pipeline.lib.paths import pipeline_run_success_file_path
-from v03_pipeline.lib.tasks.base.base_loading_run_params import (
-    BaseLoadingRunParams,
-)
-from v03_pipeline.lib.tasks.dataproc.base_run_job_on_dataproc import (
-    BaseRunJobOnDataprocTask,
-)
-from v03_pipeline.lib.tasks.files import GCSorLocalTarget
-
-
-@luigi.util.inherits(BaseLoadingRunParams)
-class WriteSuccessFileOnDataprocTask(BaseRunJobOnDataprocTask):
-    def output(self) -> luigi.Target:
-        return GCSorLocalTarget(
-            pipeline_run_success_file_path(
-                self.reference_genome,
-                self.dataset_type,
-                self.run_id,
-            ),
-        )
diff --git a/v03_pipeline/lib/tasks/run_pipeline.py b/v03_pipeline/lib/tasks/run_pipeline.py
new file mode 100644
index 000000000..1711143d2
--- /dev/null
+++ b/v03_pipeline/lib/tasks/run_pipeline.py
@@ -0,0 +1,32 @@
+import luigi
+import luigi.util
+
+from v03_pipeline.lib.tasks.base.base_loading_run_params import (
+    BaseLoadingRunParams,
+)
+from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import (
+    UpdateVariantAnnotationsTableWithNewSamplesTask,
+)
+from v03_pipeline.lib.tasks.write_metadata_for_run import WriteMetadataForRunTask
+from v03_pipeline.lib.tasks.write_project_family_tables import (
+    WriteProjectFamilyTablesTask,
+)
+
+
+@luigi.util.inherits(BaseLoadingRunParams)
+class RunPipelineTask(luigi.WrapperTask):
+    def requires(self):
+        requirements = [
+            self.clone(WriteMetadataForRunTask),
+            self.clone(UpdateVariantAnnotationsTableWithNewSamplesTask),
+        ]
+        return [
+            *requirements,
+            *[
+                self.clone(
+                    WriteProjectFamilyTablesTask,
+                    project_i=i,
+                )
+                for i in range(len(self.project_guids))
+            ],
+        ]
diff --git a/v03_pipeline/lib/tasks/validate_callset_test.py b/v03_pipeline/lib/tasks/validate_callset_test.py
index a6d2b0377..8f3638376 100644
--- a/v03_pipeline/lib/tasks/validate_callset_test.py
+++ b/v03_pipeline/lib/tasks/validate_callset_test.py
@@ -83,6 +83,5 @@ def test_validate_callset_multiple_exceptions(
                     'Missing the following expected contigs:chr10, chr11, chr12, chr13, chr14, chr15, chr16, chr17, chr18, chr19, chr2, chr20, chr21, chr22, chr3, chr4, chr5, chr6, chr7, chr8, chr9, chrX',
                     'Sample type validation error: dataset sample-type is specified as WES but appears to be WGS because it contains many common non-coding variants',
                 ],
-                'failed_family_samples': {},
             },
         )
diff --git a/v03_pipeline/lib/tasks/write_success_file.py b/v03_pipeline/lib/tasks/write_success_file.py
index 60c90f81d..2428c6ceb 100644
--- a/v03_pipeline/lib/tasks/write_success_file.py
+++ b/v03_pipeline/lib/tasks/write_success_file.py
@@ -1,16 +1,16 @@
 import luigi
 import luigi.util

+from v03_pipeline.lib.model.feature_flag import FeatureFlag
 from v03_pipeline.lib.paths import pipeline_run_success_file_path
-from v03_pipeline.lib.tasks import WriteProjectFamilyTablesTask
 from v03_pipeline.lib.tasks.base.base_loading_run_params import (
     BaseLoadingRunParams,
 )
-from v03_pipeline.lib.tasks.files import GCSorLocalTarget
-from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import (
-    UpdateVariantAnnotationsTableWithNewSamplesTask,
+from v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs import (
+    RsyncToSeqrAppDirsTask,
 )
-from v03_pipeline.lib.tasks.write_metadata_for_run import WriteMetadataForRunTask
+from v03_pipeline.lib.tasks.files import GCSorLocalTarget
+from v03_pipeline.lib.tasks.run_pipeline import RunPipelineTask


 @luigi.util.inherits(BaseLoadingRunParams)
@@ -24,21 +24,12 @@ def output(self) -> luigi.Target:
             ),
         )

-    def requires(self):
-        requirements = [
-            self.clone(WriteMetadataForRunTask),
-            self.clone(UpdateVariantAnnotationsTableWithNewSamplesTask),
-        ]
-        return [
-            *requirements,
-            *[
-                self.clone(
-                    WriteProjectFamilyTablesTask,
-                    project_i=i,
-                )
-                for i in range(len(self.project_guids))
-            ],
-        ]
+    def requires(self) -> luigi.Task:
+        return (
+            self.clone(RsyncToSeqrAppDirsTask)
+            if FeatureFlag.RUN_PIPELINE_ON_DATAPROC
+            else self.clone(RunPipelineTask)
+        )

     def run(self):
         with self.output().open('w') as f:
diff --git a/v03_pipeline/lib/tasks/write_success_file_test.py b/v03_pipeline/lib/tasks/write_success_file_test.py
index 18df69e86..4949481a6 100644
--- a/v03_pipeline/lib/tasks/write_success_file_test.py
+++ b/v03_pipeline/lib/tasks/write_success_file_test.py
@@ -10,24 +10,13 @@

 class WriteSuccessFileTaskTest(MockedDatarootTestCase):
     @mock.patch(
-        'v03_pipeline.lib.tasks.write_success_file.WriteMetadataForRunTask',
-    )
-    @mock.patch(
-        'v03_pipeline.lib.tasks.write_success_file.WriteProjectFamilyTablesTask',
-    )
-    @mock.patch(
-        'v03_pipeline.lib.tasks.write_success_file.UpdateVariantAnnotationsTableWithNewSamplesTask',
+        'v03_pipeline.lib.tasks.write_success_file.RunPipelineTask',
     )
     def test_write_success_file_task(
         self,
-        mock_update_variant_annotations_task,
-        mock_write_project_fam_tables,
-        mock_write_metadata_for_run_task,
+        mock_run_pipeline_task: mock.Mock,
     ) -> None:
-        mock_write_metadata_for_run_task.return_value = MockCompleteTask()
-        mock_update_variant_annotations_task.return_value = MockCompleteTask()
-        mock_write_project_fam_tables.return_value = MockCompleteTask()
-
+        mock_run_pipeline_task.return_value = MockCompleteTask()
         worker = luigi.worker.Worker()
         write_success_file = WriteSuccessFileTask(
             reference_genome=ReferenceGenome.GRCh38,
diff --git a/v03_pipeline/lib/tasks/write_validation_errors_for_run.py b/v03_pipeline/lib/tasks/write_validation_errors_for_run.py
index d99800f6f..c59fce7e0 100644
--- a/v03_pipeline/lib/tasks/write_validation_errors_for_run.py
+++ b/v03_pipeline/lib/tasks/write_validation_errors_for_run.py
@@ -15,7 +15,7 @@
 class WriteValidationErrorsForRunTask(luigi.Task):
     project_guids = luigi.ListParameter()
     error_messages = luigi.ListParameter(default=[])
-    failed_family_samples = luigi.DictParameter(default={})
+    error_body = luigi.DictParameter(default={})

     def to_single_error_message(self) -> str:
         with self.output().open('r') as f:
@@ -37,8 +37,8 @@ def run(self) -> None:
         validation_errors_json = {
             'project_guids': self.project_guids,
             'error_messages': self.error_messages,
-            'failed_family_samples': luigi.freezing.recursively_unfreeze(
-                self.failed_family_samples,
+            **luigi.freezing.recursively_unfreeze(
+                self.error_body,
             ),
         }
         with self.output().open('w') as f:
@@ -57,7 +57,7 @@ def wrapper(self: luigi.Task):
             write_validation_errors_for_run_task = self.clone(
                 WriteValidationErrorsForRunTask,
                 error_messages=[str(e.args[0])],
-                failed_family_samples=e.args[1]['failed_family_samples'],
+                error_body=e.args[1],
             )
         else:
             write_validation_errors_for_run_task = self.clone(
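
For reference, a minimal usage sketch of how the rsync helpers added in rsync_to_seqr_app_dirs.py compose. This is illustrative only and not part of the diff; the paths are hypothetical, borrowed from the test fixtures above.

from v03_pipeline.lib.tasks.dataproc.rsync_to_seqr_app_dirs import (
    hail_search_value,
    rsync_command,
)

# Hypothetical source/destination prefixes, mirroring the test fixtures.
src = 'gs://test-hail-search-dir/GRCh38/SNV_INDEL'
dst = hail_search_value('/var/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL')

# hail_search_value() leaves SNV_INDEL untouched; it maps 'SV' -> 'SV_WGS'
# and 'GCNV' -> 'SV_WES' so destinations match hail-search directory naming.
assert dst == '/var/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL'

# rsync_command() wraps the whole sync in one bash invocation: create the
# destination, mirror the GCS prefix (excluding any .*runs.* paths with -x),
# then flush filesystem buffers with sync.
print(rsync_command(src, dst))
# ['/bin/bash', '-cx', 'mkdir -p /var/seqr/... && gsutil -qm rsync -rd
#  -x .*runs.* gs://test-hail-search-dir/GRCh38/SNV_INDEL /var/seqr/...
#  && sync /var/seqr/...']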