diff --git a/v03_pipeline/lib/misc/callsets.py b/v03_pipeline/lib/misc/callsets.py index e65e9bc61..34ac4f8a7 100644 --- a/v03_pipeline/lib/misc/callsets.py +++ b/v03_pipeline/lib/misc/callsets.py @@ -6,14 +6,11 @@ from v03_pipeline.lib.paths import remapped_and_subsetted_callset_path -def get_callset_ht( # noqa: PLR0913 +def get_callset_ht( reference_genome: ReferenceGenome, dataset_type: DatasetType, - callset_paths: list[str], + callset_path: str, project_guids: list[str], - project_remap_paths: list[str], - project_pedigree_paths: list[str], - imputed_sex_paths: list[str] | None, ): callset_hts = [ hl.read_matrix_table( @@ -24,58 +21,10 @@ def get_callset_ht( # noqa: PLR0913 project_guid, ), ).rows() - for (callset_path, project_guid, _, _, _) in callset_project_pairs( - callset_paths, - project_guids, - project_remap_paths, - project_pedigree_paths, - imputed_sex_paths, - ) + for project_guid in project_guids ] callset_ht = functools.reduce( (lambda ht1, ht2: ht1.union(ht2, unify=True)), callset_hts, ) return callset_ht.distinct() - - -def callset_project_pairs( - callset_paths: list[str], - project_guids: list[str], - project_remap_paths: list[str], - project_pedigree_paths: list[str], - imputed_sex_paths: list[str] | None, -): - if len(callset_paths) == len(project_guids): - return zip( - callset_paths, - project_guids, - project_remap_paths, - project_pedigree_paths, - imputed_sex_paths - if imputed_sex_paths is not None - else [None] * len(callset_paths), - strict=True, - ) - return ( - ( - callset_path, - project_guid, - project_remap_path, - project_pedigree_path, - imputed_sex_path, - ) - for callset_path, imputed_sex_path in zip( - callset_paths, - imputed_sex_paths - if imputed_sex_paths is not None - else [None] * len(callset_paths), - strict=False, - ) - for (project_guid, project_remap_path, project_pedigree_path) in zip( - project_guids, - project_remap_paths, - project_pedigree_paths, - strict=True, - ) - ) diff --git a/v03_pipeline/lib/misc/io.py b/v03_pipeline/lib/misc/io.py index 91d07e851..992e2585c 100644 --- a/v03_pipeline/lib/misc/io.py +++ b/v03_pipeline/lib/misc/io.py @@ -121,7 +121,6 @@ def import_callset( callset_path: str, reference_genome: ReferenceGenome, dataset_type: DatasetType, - filters_path: str | None = None, ) -> hl.MatrixTable: if dataset_type == DatasetType.GCNV: mt = import_gcnv_bed_file(callset_path) @@ -131,9 +130,6 @@ def import_callset( mt = hl.read_matrix_table(callset_path) if dataset_type == DatasetType.SV: mt = mt.annotate_rows(variant_id=mt.rsid) - if filters_path: - filters_ht = import_vcf(filters_path, reference_genome).rows() - mt = mt.annotate_rows(filters=filters_ht[mt.row_key].filters) return mt.key_rows_by(*dataset_type.table_key_type(reference_genome).fields) diff --git a/v03_pipeline/lib/model/dataset_type.py b/v03_pipeline/lib/model/dataset_type.py index b376a6ae8..e7af6983f 100644 --- a/v03_pipeline/lib/model/dataset_type.py +++ b/v03_pipeline/lib/model/dataset_type.py @@ -4,7 +4,7 @@ import hail as hl from v03_pipeline.lib.annotations import gcnv, mito, shared, snv_indel, sv -from v03_pipeline.lib.model.definitions import ReferenceGenome +from v03_pipeline.lib.model.definitions import ReferenceGenome, SampleType MITO_MIN_HOM_THRESHOLD = 0.95 ZERO = 0.0 @@ -155,6 +155,12 @@ def has_gencode_ensembl_to_refseq_id_mapping( self == DatasetType.SNV_INDEL and reference_genome == ReferenceGenome.GRCh38 ) + def expect_filters( + self, + sample_type: SampleType, + ) -> bool: + return self == DatasetType.SNV_INDEL and sample_type == SampleType.WES + @property def has_gencode_gene_symbol_to_gene_id_mapping(self) -> bool: return self == DatasetType.SV diff --git a/v03_pipeline/lib/model/environment.py b/v03_pipeline/lib/model/environment.py index d89567d8b..4681939ca 100644 --- a/v03_pipeline/lib/model/environment.py +++ b/v03_pipeline/lib/model/environment.py @@ -2,12 +2,12 @@ from dataclasses import dataclass # NB: using os.environ.get inside the dataclass defaults gives a lint error. -ACCESS_PRIVATE_REFERENCE_DATASETS = ( - os.environ.get('ACCESS_PRIVATE_REFERENCE_DATASETS') == '1' -) -REFERENCE_DATA_AUTO_UPDATE = os.environ.get('REFERENCE_DATA_AUTO_UPDATE') == '1' HAIL_TMPDIR = os.environ.get('HAIL_TMPDIR', '/tmp') # noqa: S108 HAIL_SEARCH_DATA = os.environ.get('HAIL_SEARCH_DATA', '/hail-search-data') +LIFTOVER_REF_PATH = os.environ.get( + 'LIFTOVER_REF_PATH', + 'gs://hail-common/references/grch38_to_grch37.over.chain.gz', +) LOADING_DATASETS = os.environ.get('LOADING_DATASETS', '/seqr-loading-temp') PRIVATE_REFERENCE_DATASETS = os.environ.get( 'PRIVATE_REFERENCE_DATASETS', @@ -19,21 +19,34 @@ ) VEP_CONFIG_PATH = os.environ.get('VEP_CONFIG_PATH', None) VEP_CONFIG_URI = os.environ.get('VEP_CONFIG_URI', None) -SHOULD_REGISTER_ALLELES = os.environ.get('SHOULD_REGISTER_ALLELES') == '1' + +# Allele registry secrets :/ ALLELE_REGISTRY_SECRET_NAME = os.environ.get('ALLELE_REGISTRY_SECRET_NAME', None) PROJECT_ID = os.environ.get('PROJECT_ID', None) +# Feature Flags +ACCESS_PRIVATE_REFERENCE_DATASETS = ( + os.environ.get('ACCESS_PRIVATE_REFERENCE_DATASETS') == '1' +) +CHECK_SEX_AND_RELATEDNESS = os.environ.get('CHECK_SEX_AND_RELATEDNESS') == '1' +EXPECT_WES_FILTERS = os.environ.get('EXPECT_WES_FILTERS') == '1' +REFERENCE_DATA_AUTO_UPDATE = os.environ.get('REFERENCE_DATA_AUTO_UPDATE') == '1' +SHOULD_REGISTER_ALLELES = os.environ.get('SHOULD_REGISTER_ALLELES') == '1' + @dataclass class Env: ACCESS_PRIVATE_REFERENCE_DATASETS: bool = ACCESS_PRIVATE_REFERENCE_DATASETS ALLELE_REGISTRY_SECRET_NAME: str | None = ALLELE_REGISTRY_SECRET_NAME - REFERENCE_DATA_AUTO_UPDATE: bool = REFERENCE_DATA_AUTO_UPDATE + CHECK_SEX_AND_RELATEDNESS: bool = CHECK_SEX_AND_RELATEDNESS + EXPECT_WES_FILTERS: bool = EXPECT_WES_FILTERS HAIL_TMPDIR: str = HAIL_TMPDIR HAIL_SEARCH_DATA: str = HAIL_SEARCH_DATA + LIFTOVER_REF_PATH: str = LIFTOVER_REF_PATH LOADING_DATASETS: str = LOADING_DATASETS PRIVATE_REFERENCE_DATASETS: str = PRIVATE_REFERENCE_DATASETS PROJECT_ID: str | None = PROJECT_ID + REFERENCE_DATA_AUTO_UPDATE: bool = REFERENCE_DATA_AUTO_UPDATE REFERENCE_DATASETS: str = REFERENCE_DATASETS SHOULD_REGISTER_ALLELES: bool = SHOULD_REGISTER_ALLELES VEP_CONFIG_PATH: str | None = VEP_CONFIG_PATH diff --git a/v03_pipeline/lib/paths.py b/v03_pipeline/lib/paths.py index 14482d831..3ab830e5f 100644 --- a/v03_pipeline/lib/paths.py +++ b/v03_pipeline/lib/paths.py @@ -1,5 +1,6 @@ import hashlib import os +import re from v03_pipeline.lib.model import ( AccessControl, @@ -9,6 +10,7 @@ PipelineVersion, ReferenceDatasetCollection, ReferenceGenome, + SampleType, ) @@ -73,6 +75,22 @@ def family_table_path( ) +def imputed_sex_path( + reference_genome: ReferenceGenome, + dataset_type: DatasetType, + callset_path: str, +) -> str: + return os.path.join( + _v03_pipeline_prefix( + Env.LOADING_DATASETS, + reference_genome, + dataset_type, + ), + 'imputed_sex', + f'{hashlib.sha256(callset_path.encode("utf8")).hexdigest()}.tsv', + ) + + def imported_callset_path( reference_genome: ReferenceGenome, dataset_type: DatasetType, @@ -198,6 +216,24 @@ def sex_check_table_path( ) +def valid_filters_path( + dataset_type: DatasetType, + sample_type: SampleType, + callset_path: str, +) -> str | None: + if ( + not Env.EXPECT_WES_FILTERS + or not dataset_type.expect_filters(sample_type) + or 'part_one_outputs' not in callset_path + ): + return None + return re.sub( + 'part_one_outputs/.*$', + 'part_two_outputs/*.filtered.*.vcf.gz', + callset_path, + ) + + def valid_reference_dataset_collection_path( reference_genome: ReferenceGenome, dataset_type: DatasetType, diff --git a/v03_pipeline/lib/paths_test.py b/v03_pipeline/lib/paths_test.py index d6f0b10ba..ff437cf45 100644 --- a/v03_pipeline/lib/paths_test.py +++ b/v03_pipeline/lib/paths_test.py @@ -6,11 +6,13 @@ DatasetType, ReferenceDatasetCollection, ReferenceGenome, + SampleType, ) from v03_pipeline.lib.paths import ( cached_reference_dataset_query_path, family_table_path, imported_callset_path, + imputed_sex_path, lookup_table_path, metadata_for_run_path, new_variants_table_path, @@ -18,6 +20,7 @@ relatedness_check_table_path, remapped_and_subsetted_callset_path, sex_check_table_path, + valid_filters_path, valid_reference_dataset_collection_path, variant_annotations_table_path, ) @@ -54,6 +57,26 @@ def test_family_table_path(self) -> None: 'gs://seqr-datasets/v03/GRCh37/SNV_INDEL/families/franklin.ht', ) + def test_valid_filters_path(self) -> None: + self.assertEqual( + valid_filters_path( + DatasetType.MITO, + SampleType.WES, + 'gs://bucket/RDG_Broad_WES_Internal_Oct2023/part_one_outputs/chr*/*.vcf.gz', + ), + None, + ) + with patch('v03_pipeline.lib.paths.Env') as mock_env: + mock_env.EXPECT_WES_FILTERS = True + self.assertEqual( + valid_filters_path( + DatasetType.SNV_INDEL, + SampleType.WES, + 'gs://bucket/RDG_Broad_WES_Internal_Oct2023/part_one_outputs/chr*/*.vcf.gz', + ), + 'gs://bucket/RDG_Broad_WES_Internal_Oct2023/part_two_outputs/*.filtered.*.vcf.gz', + ) + def test_project_table_path(self) -> None: self.assertEqual( project_table_path( @@ -162,6 +185,16 @@ def test_imported_callset_path(self) -> None: '/seqr-loading-temp/v03/GRCh38/SNV_INDEL/imported_callsets/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt', ) + def test_imputed_sex_path(self) -> None: + self.assertEqual( + imputed_sex_path( + ReferenceGenome.GRCh38, + DatasetType.SNV_INDEL, + 'gs://abc.efg/callset.vcf.gz', + ), + '/seqr-loading-temp/v03/GRCh38/SNV_INDEL/imputed_sex/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.tsv', + ) + def test_new_variants_table_path(self) -> None: self.assertEqual( new_variants_table_path( diff --git a/v03_pipeline/lib/tasks/base/base_loading_run_params.py b/v03_pipeline/lib/tasks/base/base_loading_run_params.py new file mode 100644 index 000000000..f5ce3b3e4 --- /dev/null +++ b/v03_pipeline/lib/tasks/base/base_loading_run_params.py @@ -0,0 +1,36 @@ +import luigi + +from v03_pipeline.lib.model import SampleType + + +class BaseLoadingRunParams(luigi.Task): + # NB: + # These params are "inherited" with the special + # luigi.util.inherits function, copying params + # but nothing else. + sample_type = luigi.EnumParameter(enum=SampleType) + callset_path = luigi.Parameter() + ignore_missing_samples_when_remapping = luigi.BoolParameter( + default=False, + parsing=luigi.BoolParameter.EXPLICIT_PARSING, + ) + force = luigi.BoolParameter( + default=False, + parsing=luigi.BoolParameter.EXPLICIT_PARSING, + ) + skip_check_sex_and_relatedness = luigi.BoolParameter( + default=False, + parsing=luigi.BoolParameter.EXPLICIT_PARSING, + ) + skip_expect_filters = luigi.BoolParameter( + default=False, + parsing=luigi.BoolParameter.EXPLICIT_PARSING, + ) + skip_validation = luigi.BoolParameter( + default=False, + parsing=luigi.BoolParameter.EXPLICIT_PARSING, + ) + is_new_gcnv_joint_call = luigi.BoolParameter( + default=False, + description='Is this a fully joint-called callset.', + ) diff --git a/v03_pipeline/lib/tasks/update_lookup_table.py b/v03_pipeline/lib/tasks/update_lookup_table.py index 0c389b713..eb04d1d48 100644 --- a/v03_pipeline/lib/tasks/update_lookup_table.py +++ b/v03_pipeline/lib/tasks/update_lookup_table.py @@ -1,14 +1,14 @@ import hail as hl import luigi +import luigi.util -from v03_pipeline.lib.misc.callsets import callset_project_pairs from v03_pipeline.lib.misc.lookup import ( compute_callset_lookup_ht, join_lookup_hts, remove_family_guids, ) -from v03_pipeline.lib.model import SampleType from v03_pipeline.lib.model.constants import PROJECTS_EXCLUDED_FROM_LOOKUP +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_update_lookup_table import ( BaseUpdateLookupTableTask, ) @@ -17,25 +17,11 @@ ) +@luigi.util.inherits(BaseLoadingRunParams) class UpdateLookupTableTask(BaseUpdateLookupTableTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_paths = luigi.ListParameter() project_guids = luigi.ListParameter() project_remap_paths = luigi.ListParameter() project_pedigree_paths = luigi.ListParameter() - imputed_sex_paths = luigi.ListParameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) def complete(self) -> bool: return ( @@ -47,23 +33,11 @@ def complete(self) -> bool: [ updates.contains( hl.Struct( - callset=callset_path, + callset=self.callset_path, project_guid=project_guid, ), ) - for ( - callset_path, - project_guid, - _, - _, - _, - ) in callset_project_pairs( - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - ) + for project_guid in self.project_guids ], ), hl.read_table(self.output().path).updates, @@ -73,51 +47,34 @@ def complete(self) -> bool: def requires(self) -> list[luigi.Task]: return [ - WriteRemappedAndSubsettedCallsetTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - callset_path, - project_guid, - project_remap_path, - project_pedigree_path, - imputed_sex_path, - self.ignore_missing_samples_when_remapping, - self.validate, - False, + self.clone( + WriteRemappedAndSubsettedCallsetTask, + project_guid=project_guid, + project_remap_path=project_remap_path, + project_pedigree_path=project_pedigree_path, + force=False, ) for ( - callset_path, project_guid, project_remap_path, project_pedigree_path, - imputed_sex_path, - ) in callset_project_pairs( - self.callset_paths, + ) in zip( self.project_guids, self.project_remap_paths, self.project_pedigree_paths, - self.imputed_sex_paths, + strict=True, ) ] def update_table(self, ht: hl.Table) -> hl.Table: # NB: there's a chance this many hail operations blows the DAG compute stack # in an unfortunate way. Please keep an eye out! - for i, (callset_path, project_guid, _, _, _) in enumerate( - callset_project_pairs( - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - ), - ): + for i, project_guid in enumerate(self.project_guids): if project_guid in PROJECTS_EXCLUDED_FROM_LOOKUP: ht = ht.annotate_globals( updates=ht.updates.add( hl.Struct( - callset=callset_path, + callset=self.callset_path, project_guid=project_guid, ), ), @@ -143,7 +100,7 @@ def update_table(self, ht: hl.Table) -> hl.Table: project_families=ht.project_families, updates=ht.updates.add( hl.Struct( - callset=callset_path, + callset=self.callset_path, project_guid=project_guid, ), ), diff --git a/v03_pipeline/lib/tasks/update_lookup_table_test.py b/v03_pipeline/lib/tasks/update_lookup_table_test.py index a81fe6a35..8551d873e 100644 --- a/v03_pipeline/lib/tasks/update_lookup_table_test.py +++ b/v03_pipeline/lib/tasks/update_lookup_table_test.py @@ -19,13 +19,13 @@ def test_skip_update_lookup_table_task(self) -> None: reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_VCF], + callset_path=TEST_VCF, project_guids=[ 'R0555_seqr_demo', ], # a project excluded from the lookup table project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_3], - validate=False, + skip_validation=True, ) worker.add(uslt_task) worker.run() @@ -52,11 +52,11 @@ def test_update_lookup_table_task(self) -> None: reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_VCF], + callset_path=TEST_VCF, project_guids=['R0113_test_project'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_3], - validate=False, + skip_validation=True, ) worker.add(uslt_task) worker.run() diff --git a/v03_pipeline/lib/tasks/update_project_table.py b/v03_pipeline/lib/tasks/update_project_table.py index 288b6b20c..c7ea539e4 100644 --- a/v03_pipeline/lib/tasks/update_project_table.py +++ b/v03_pipeline/lib/tasks/update_project_table.py @@ -1,5 +1,6 @@ import hail as hl import luigi +import luigi.util from v03_pipeline.lib.annotations.fields import get_fields from v03_pipeline.lib.misc.family_entries import ( @@ -7,7 +8,7 @@ join_family_entries_hts, remove_family_guids, ) -from v03_pipeline.lib.model import SampleType +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_update_project_table import ( BaseUpdateProjectTableTask, ) @@ -16,28 +17,10 @@ ) +@luigi.util.inherits(BaseLoadingRunParams) class UpdateProjectTableTask(BaseUpdateProjectTableTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_path = luigi.Parameter() project_remap_path = luigi.Parameter() project_pedigree_path = luigi.Parameter() - imputed_sex_path = luigi.Parameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - is_new_gcnv_joint_call = luigi.BoolParameter( - default=False, - description='Is this a fully joint-called callset.', - ) def complete(self) -> bool: return ( @@ -51,19 +34,7 @@ def complete(self) -> bool: ) def requires(self) -> luigi.Task: - return WriteRemappedAndSubsettedCallsetTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.callset_path, - self.project_guid, - self.project_remap_path, - self.project_pedigree_path, - self.imputed_sex_path, - self.ignore_missing_samples_when_remapping, - self.validate, - False, - ) + return self.clone(WriteRemappedAndSubsettedCallsetTask, force=False) def update_table(self, ht: hl.Table) -> hl.Table: callset_mt = hl.read_matrix_table(self.input().path) diff --git a/v03_pipeline/lib/tasks/update_project_table_test.py b/v03_pipeline/lib/tasks/update_project_table_test.py index bc92b762f..c0a5a4e57 100644 --- a/v03_pipeline/lib/tasks/update_project_table_test.py +++ b/v03_pipeline/lib/tasks/update_project_table_test.py @@ -21,7 +21,7 @@ def test_update_project_table_task(self) -> None: project_guid='R0113_test_project', project_remap_path=TEST_REMAP, project_pedigree_path=TEST_PEDIGREE_3, - validate=False, + skip_validation=True, ) worker.add(upt_task) worker.run() diff --git a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py index a38bd2bc7..5ba1448c9 100644 --- a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py +++ b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py @@ -1,64 +1,33 @@ import hail as hl import luigi +import luigi.util from v03_pipeline.lib.annotations.fields import get_fields -from v03_pipeline.lib.misc.callsets import callset_project_pairs, get_callset_ht -from v03_pipeline.lib.model import SampleType +from v03_pipeline.lib.misc.callsets import get_callset_ht from v03_pipeline.lib.paths import ( lookup_table_path, new_variants_table_path, ) +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_update_variant_annotations_table import ( BaseUpdateVariantAnnotationsTableTask, ) from v03_pipeline.lib.tasks.write_new_variants_table import WriteNewVariantsTableTask +@luigi.util.inherits(BaseLoadingRunParams) class UpdateVariantAnnotationsTableWithNewSamplesTask( BaseUpdateVariantAnnotationsTableTask, ): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_paths = luigi.ListParameter() project_guids = luigi.ListParameter() project_remap_paths = luigi.ListParameter() project_pedigree_paths = luigi.ListParameter() - imputed_sex_paths = luigi.ListParameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - liftover_ref_path = luigi.OptionalParameter( - default='gs://hail-common/references/grch38_to_grch37.over.chain.gz', - description='Path to GRCh38 to GRCh37 coordinates file', - ) run_id = luigi.Parameter() def requires(self) -> list[luigi.Task]: return [ *super().requires(), - WriteNewVariantsTableTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - self.ignore_missing_samples_when_remapping, - self.validate, - self.force, - self.liftover_ref_path, - self.run_id, - ), + self.clone(WriteNewVariantsTableTask), ] def complete(self) -> bool: @@ -71,23 +40,11 @@ def complete(self) -> bool: [ updates.contains( hl.Struct( - callset=callset_path, + callset=self.callset_path, project_guid=project_guid, ), ) - for ( - callset_path, - project_guid, - _, - _, - _, - ) in callset_project_pairs( - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - ) + for project_guid in self.project_guids ], ), hl.read_table(self.output().path).updates, @@ -110,11 +67,8 @@ def update_table(self, ht: hl.Table) -> hl.Table: callset_ht = get_callset_ht( self.reference_genome, self.dataset_type, - self.callset_paths, + self.callset_path, self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, ) # new_variants_ht consists of variants present in the new callset, fully annotated, # but NOT present in the existing annotations table. @@ -142,20 +96,8 @@ def update_table(self, ht: hl.Table) -> hl.Table: return ht.annotate_globals( updates=ht.updates.union( { - hl.Struct(callset=callset_path, project_guid=project_guid) - for ( - callset_path, - project_guid, - _, - _, - _, - ) in callset_project_pairs( - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - ) + hl.Struct(callset=self.callset_path, project_guid=project_guid) + for project_guid in self.project_guids }, ), ) diff --git a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py index 996959fcf..7d9668544 100644 --- a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py +++ b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py @@ -43,7 +43,6 @@ from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase from v03_pipeline.var.test.vep.mock_vep_data import MOCK_37_VEP_DATA, MOCK_38_VEP_DATA -TEST_LIFTOVER = 'v03_pipeline/var/test/liftover/grch38_to_grch37.over.chain.gz' TEST_MITO_MT = 'v03_pipeline/var/test/callsets/mito_1.mt' TEST_SNV_INDEL_VCF = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf' TEST_SV_VCF = 'v03_pipeline/var/test/callsets/sv_1.vcf' @@ -159,12 +158,11 @@ def test_missing_pedigree( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_SNV_INDEL_VCF], + callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=['bad_pedigree'], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) worker = luigi.worker.Worker() @@ -193,12 +191,11 @@ def test_missing_interval_reference( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_SNV_INDEL_VCF], + callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_3], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) worker = luigi.worker.Worker() @@ -362,12 +359,11 @@ def test_multiple_update_vat( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_SNV_INDEL_VCF], + callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_3], - validate=True, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=False, run_id=TEST_RUN_ID, ) worker.add(uvatwns_task_3) @@ -414,12 +410,11 @@ def test_multiple_update_vat( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_SNV_INDEL_VCF], + callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0114_project4'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_4], - validate=True, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=False, run_id=TEST_RUN_ID, ) worker.add(uvatwns_task_4) @@ -685,12 +680,11 @@ def test_update_vat_grch37( reference_genome=ReferenceGenome.GRCh37, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_SNV_INDEL_VCF], + callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_3], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) worker.add(uvatwns_task) @@ -765,12 +759,11 @@ def test_update_vat_without_accessing_private_datasets( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_SNV_INDEL_VCF], + callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], project_remap_paths=[TEST_REMAP], project_pedigree_paths=[TEST_PEDIGREE_3], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) worker.add(uvatwns_task) @@ -823,12 +816,11 @@ def test_mito_update_vat( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.MITO, sample_type=SampleType.WGS, - callset_paths=[TEST_MITO_MT], + callset_path=TEST_MITO_MT, project_guids=['R0115_test_project2'], project_remap_paths=['not_a_real_file'], project_pedigree_paths=[TEST_PEDIGREE_5], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) ) @@ -1088,12 +1080,11 @@ def test_sv_update_vat( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SV, sample_type=SampleType.WGS, - callset_paths=[TEST_SV_VCF], + callset_path=TEST_SV_VCF, project_guids=['R0115_test_project2'], project_remap_paths=['not_a_real_file'], project_pedigree_paths=[TEST_PEDIGREE_5], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) ) @@ -1650,12 +1641,11 @@ def test_gcnv_update_vat( reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.GCNV, sample_type=SampleType.WES, - callset_paths=[TEST_GCNV_BED_FILE], + callset_path=TEST_GCNV_BED_FILE, project_guids=['R0115_test_project2'], project_remap_paths=['not_a_real_file'], project_pedigree_paths=[TEST_PEDIGREE_5], - validate=False, - liftover_ref_path=TEST_LIFTOVER, + skip_validation=True, run_id=TEST_RUN_ID, ) ) diff --git a/v03_pipeline/lib/tasks/write_family_table.py b/v03_pipeline/lib/tasks/write_family_table.py index 73400983f..ce4c8679c 100644 --- a/v03_pipeline/lib/tasks/write_family_table.py +++ b/v03_pipeline/lib/tasks/write_family_table.py @@ -1,8 +1,9 @@ import hail as hl import luigi +import luigi.util -from v03_pipeline.lib.model import SampleType from v03_pipeline.lib.paths import family_table_path +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import GCSorLocalTarget from v03_pipeline.lib.tasks.update_project_table import ( @@ -10,26 +11,11 @@ ) +@luigi.util.inherits(BaseLoadingRunParams) class WriteFamilyTableTask(BaseWriteTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_path = luigi.Parameter() project_guid = luigi.Parameter() project_remap_path = luigi.Parameter() project_pedigree_path = luigi.Parameter() - imputed_sex_path = luigi.Parameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - is_new_gcnv_joint_call = luigi.BoolParameter( - description='Is this a fully joint-called callset.', - ) family_guid = luigi.Parameter() def output(self) -> luigi.Target: @@ -51,20 +37,7 @@ def complete(self) -> bool: ) def requires(self) -> luigi.Task: - return UpdateProjectTableTask( - self.reference_genome, - self.dataset_type, - self.project_guid, - self.sample_type, - self.callset_path, - self.project_remap_path, - self.project_pedigree_path, - self.imputed_sex_path, - self.ignore_missing_samples_when_remapping, - self.validate, - False, - self.is_new_gcnv_joint_call, - ) + return self.clone(UpdateProjectTableTask, force=False) def create_table(self) -> hl.Table: project_ht = hl.read_table(self.input().path) diff --git a/v03_pipeline/lib/tasks/write_family_table_test.py b/v03_pipeline/lib/tasks/write_family_table_test.py index 1e205aa52..3adc96901 100644 --- a/v03_pipeline/lib/tasks/write_family_table_test.py +++ b/v03_pipeline/lib/tasks/write_family_table_test.py @@ -25,7 +25,7 @@ def test_snv_write_family_table_task(self) -> None: project_remap_path=TEST_REMAP, project_pedigree_path=TEST_PEDIGREE_3, family_guid='abc_1', - validate=False, + skip_validation=True, ) worker.add(wft_task) worker.run() @@ -162,7 +162,7 @@ def test_sv_write_family_table_task(self) -> None: project_remap_path='not_a_real_file', project_pedigree_path=TEST_PEDIGREE_5, family_guid='family_2_1', - validate=False, + skip_validation=True, ) worker.add(write_family_table_task) worker.run() @@ -414,7 +414,7 @@ def test_gcnv_write_family_table_task(self) -> None: project_remap_path='not_a_real_file', project_pedigree_path=TEST_PEDIGREE_5, family_guid='family_2_1', - validate=False, + skip_validation=True, ) worker.add(write_family_table_task) worker.run() diff --git a/v03_pipeline/lib/tasks/write_imported_callset.py b/v03_pipeline/lib/tasks/write_imported_callset.py index 07b0bb37e..be42fb750 100644 --- a/v03_pipeline/lib/tasks/write_imported_callset.py +++ b/v03_pipeline/lib/tasks/write_imported_callset.py @@ -1,8 +1,10 @@ import hail as hl import luigi +import luigi.util from v03_pipeline.lib.misc.io import ( import_callset, + import_vcf, select_relevant_fields, split_multi_hts, ) @@ -15,13 +17,15 @@ validate_sample_type, ) from v03_pipeline.lib.misc.vets import annotate_vets -from v03_pipeline.lib.model import CachedReferenceDatasetQuery, SampleType +from v03_pipeline.lib.model import CachedReferenceDatasetQuery from v03_pipeline.lib.model.environment import Env from v03_pipeline.lib.paths import ( cached_reference_dataset_query_path, imported_callset_path, sex_check_table_path, + valid_filters_path, ) +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import ( @@ -30,27 +34,8 @@ from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask +@luigi.util.inherits(BaseLoadingRunParams) class WriteImportedCallsetTask(BaseWriteTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_path = luigi.Parameter() - imputed_sex_path = luigi.Parameter(default=None) - filters_path = luigi.OptionalParameter( - default=None, - description='Optional path to part two outputs from callset (VCF shards containing filter information)', - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - check_sex_and_relatedness = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - def complete(self) -> luigi.Target: if not self.force and super().complete(): mt = hl.read_matrix_table(self.output().path) @@ -70,18 +55,29 @@ def output(self) -> luigi.Target: def requires(self) -> list[luigi.Task]: requirements = [] - if self.filters_path: + if ( + Env.EXPECT_WES_FILTERS + and not self.skip_expect_filters + and self.dataset_type.expect_filters( + self.sample_type, + ) + ): requirements = [ *requirements, - CallsetTask(self.filters_path), + CallsetTask( + valid_filters_path( + self.dataset_type, + self.sample_type, + self.callset_path, + ), + ), ] - if self.validate and self.dataset_type.can_run_validation: + if not self.skip_validation and self.dataset_type.can_run_validation: requirements = [ *requirements, ( - UpdatedCachedReferenceDatasetQuery( - reference_genome=self.reference_genome, - dataset_type=self.dataset_type, + self.clone( + UpdatedCachedReferenceDatasetQuery, crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, ) if Env.REFERENCE_DATA_AUTO_UPDATE @@ -95,17 +91,13 @@ def requires(self) -> list[luigi.Task]: ), ] if ( - self.check_sex_and_relatedness + Env.CHECK_SEX_AND_RELATEDNESS + and not self.skip_check_sex_and_relatedness and self.dataset_type.check_sex_and_relatedness ): requirements = [ *requirements, - WriteSexCheckTableTask( - self.reference_genome, - self.dataset_type, - self.callset_path, - self.imputed_sex_path, - ), + self.clone(WriteSexCheckTableTask), ] return [ *requirements, @@ -116,7 +108,7 @@ def additional_row_fields(self, mt): return { **( {'info.AF': hl.tarray(hl.tfloat64)} - if self.check_sex_and_relatedness + if not self.skip_check_sex_and_relatedness and self.dataset_type.check_sex_and_relatedness else {} ), @@ -135,8 +127,22 @@ def create_table(self) -> hl.MatrixTable: self.callset_path, self.reference_genome, self.dataset_type, - self.filters_path, ) + filters_path = None + if ( + Env.EXPECT_WES_FILTERS + and not self.skip_expect_filters + and self.dataset_type.expect_filters( + self.sample_type, + ) + ): + filters_path = valid_filters_path( + self.dataset_type, + self.sample_type, + self.callset_path, + ) + filters_ht = import_vcf(filters_path, self.reference_genome).rows() + mt = mt.annotate_rows(filters=filters_ht[mt.row_key].filters) mt = select_relevant_fields( mt, self.dataset_type, @@ -163,7 +169,7 @@ def create_table(self) -> hl.MatrixTable: mt.locus.contig, ), ) - if self.validate and self.dataset_type.can_run_validation: + if not self.skip_validation and self.dataset_type.can_run_validation: validate_allele_type(mt) validate_no_duplicate_variants(mt) validate_expected_contig_frequency(mt, self.reference_genome) @@ -181,7 +187,8 @@ def create_table(self) -> hl.MatrixTable: self.sample_type, ) if ( - self.check_sex_and_relatedness + Env.CHECK_SEX_AND_RELATEDNESS + and not self.skip_check_sex_and_relatedness and self.dataset_type.check_sex_and_relatedness ): sex_check_ht = hl.read_table( @@ -197,6 +204,6 @@ def create_table(self) -> hl.MatrixTable: ) return mt.annotate_globals( callset_path=self.callset_path, - filters_path=self.filters_path or hl.missing(hl.tstr), + filters_path=filters_path or hl.missing(hl.tstr), sample_type=self.sample_type.value, ) diff --git a/v03_pipeline/lib/tasks/write_metadata_for_run.py b/v03_pipeline/lib/tasks/write_metadata_for_run.py index 80b39caca..69e470bb7 100644 --- a/v03_pipeline/lib/tasks/write_metadata_for_run.py +++ b/v03_pipeline/lib/tasks/write_metadata_for_run.py @@ -2,40 +2,22 @@ import hail as hl import luigi +import luigi.util -from v03_pipeline.lib.misc.callsets import callset_project_pairs -from v03_pipeline.lib.model import SampleType from v03_pipeline.lib.paths import metadata_for_run_path from v03_pipeline.lib.tasks.base.base_hail_table import BaseHailTableTask +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.files import GCSorLocalTarget from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import ( WriteRemappedAndSubsettedCallsetTask, ) +@luigi.util.inherits(BaseLoadingRunParams) class WriteMetadataForRunTask(BaseHailTableTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_paths = luigi.ListParameter() project_guids = luigi.ListParameter() project_remap_paths = luigi.ListParameter() project_pedigree_paths = luigi.ListParameter() - imputed_sex_paths = luigi.ListParameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - check_sex_and_relatedness = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) run_id = luigi.Parameter() def output(self) -> luigi.Target: @@ -52,38 +34,23 @@ def complete(self) -> bool: def requires(self) -> list[luigi.Task]: return [ - WriteRemappedAndSubsettedCallsetTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - callset_path, - project_guid, - project_remap_path, - project_pedigree_path, - imputed_sex_path, - self.ignore_missing_samples_when_remapping, - self.validate, - self.force, - self.check_sex_and_relatedness, + self.clone( + WriteRemappedAndSubsettedCallsetTask, + project_guid=project_guid, + project_remap_path=project_remap_path, + project_pedigree_path=project_pedigree_path, ) - for ( - callset_path, - project_guid, - project_remap_path, - project_pedigree_path, - imputed_sex_path, - ) in callset_project_pairs( - self.callset_paths, + for (project_guid, project_remap_path, project_pedigree_path) in zip( self.project_guids, self.project_remap_paths, self.project_pedigree_paths, - self.imputed_sex_paths, + strict=True, ) ] def run(self) -> None: metadata_json = { - 'callsets': self.callset_paths, + 'callsets': [self.callset_path], 'run_id': self.run_id, 'sample_type': self.sample_type.value, 'family_samples': {}, diff --git a/v03_pipeline/lib/tasks/write_metadata_for_run_test.py b/v03_pipeline/lib/tasks/write_metadata_for_run_test.py index 13ced2c85..f5d733a79 100644 --- a/v03_pipeline/lib/tasks/write_metadata_for_run_test.py +++ b/v03_pipeline/lib/tasks/write_metadata_for_run_test.py @@ -19,12 +19,11 @@ def test_write_metadata_for_run_task(self) -> None: reference_genome=ReferenceGenome.GRCh38, dataset_type=DatasetType.SNV_INDEL, sample_type=SampleType.WGS, - callset_paths=[TEST_VCF], + callset_path=TEST_VCF, project_guids=['R0113_test_project', 'R0114_project4'], project_remap_paths=[TEST_REMAP_2, TEST_REMAP_2], project_pedigree_paths=[TEST_PEDIGREE_3, TEST_PEDIGREE_4], - validate=False, - check_sex_and_relatedness=False, + skip_validation=True, run_id='run_123456', ) worker.add(write_metadata_for_run_task) diff --git a/v03_pipeline/lib/tasks/write_new_variants_table.py b/v03_pipeline/lib/tasks/write_new_variants_table.py index f444e4d0e..b70dc2a6f 100644 --- a/v03_pipeline/lib/tasks/write_new_variants_table.py +++ b/v03_pipeline/lib/tasks/write_new_variants_table.py @@ -2,15 +2,16 @@ import hail as hl import luigi +import luigi.util from v03_pipeline.lib.annotations.fields import get_fields from v03_pipeline.lib.annotations.rdc_dependencies import ( get_rdc_annotation_dependencies, ) from v03_pipeline.lib.misc.allele_registry import register_alleles_in_chunks -from v03_pipeline.lib.misc.callsets import callset_project_pairs, get_callset_ht +from v03_pipeline.lib.misc.callsets import get_callset_ht from v03_pipeline.lib.misc.math import constrain -from v03_pipeline.lib.model import Env, ReferenceDatasetCollection, SampleType +from v03_pipeline.lib.model import Env, ReferenceDatasetCollection from v03_pipeline.lib.paths import ( new_variants_table_path, variant_annotations_table_path, @@ -19,6 +20,7 @@ load_gencode_ensembl_to_refseq_id, load_gencode_gene_symbol_to_gene_id, ) +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_update_variant_annotations_table import ( BaseUpdateVariantAnnotationsTableTask, ) @@ -40,29 +42,11 @@ GENCODE_FOR_VEP_RELEASE = 44 +@luigi.util.inherits(BaseLoadingRunParams) class WriteNewVariantsTableTask(BaseWriteTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_paths = luigi.ListParameter() project_guids = luigi.ListParameter() project_remap_paths = luigi.ListParameter() project_pedigree_paths = luigi.ListParameter() - imputed_sex_paths = luigi.ListParameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - liftover_ref_path = luigi.OptionalParameter( - default='gs://hail-common/references/grch38_to_grch37.over.chain.gz', - description='Path to GRCh38 to GRCh37 coordinates file', - ) run_id = luigi.Parameter() @property @@ -78,6 +62,7 @@ def annotation_dependencies(self) -> dict[str, hl.Table]: deps['gencode_gene_symbol_to_gene_id_mapping'] = hl.literal( load_gencode_gene_symbol_to_gene_id(GENCODE_RELEASE, ''), ) + deps['liftover_ref_path'] = Env.LIFTOVER_REF_PATH return deps def output(self) -> luigi.Target: @@ -91,14 +76,14 @@ def output(self) -> luigi.Target: def requires(self) -> list[luigi.Task]: if Env.REFERENCE_DATA_AUTO_UPDATE: - upstream_table_tasks = [ + requirements = [ UpdateVariantAnnotationsTableWithUpdatedReferenceDataset( self.reference_genome, self.dataset_type, ), ] else: - upstream_table_tasks = [ + requirements = [ BaseUpdateVariantAnnotationsTableTask( self.reference_genome, self.dataset_type, @@ -106,55 +91,33 @@ def requires(self) -> list[luigi.Task]: ] if self.dataset_type.has_lookup_table: # NB: the lookup table task has remapped and subsetted callset tasks as dependencies. - upstream_table_tasks.extend( - [ - UpdateLookupTableTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - self.ignore_missing_samples_when_remapping, - self.validate, - self.force, - ), - ], - ) + # Also note that force is passed here, + requirements = [ + *requirements, + self.clone(UpdateLookupTableTask), + ] else: - upstream_table_tasks.extend( + requirements.extend( [ - WriteRemappedAndSubsettedCallsetTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - callset_path, - project_guid, - project_remap_path, - project_pedigree_path, - imputed_sex_path, - self.ignore_missing_samples_when_remapping, - self.validate, - False, + self.clone( + WriteRemappedAndSubsettedCallsetTask, + project_guid=project_guid, + project_remap_path=project_remap_path, + project_pedigree_path=project_pedigree_path, ) for ( - callset_path, project_guid, project_remap_path, project_pedigree_path, - imputed_sex_path, - ) in callset_project_pairs( - self.callset_paths, + ) in zip( self.project_guids, self.project_remap_paths, self.project_pedigree_paths, - self.imputed_sex_paths, + strict=True, ) ], ) - return upstream_table_tasks + return requirements def complete(self) -> bool: return super().complete() and hl.eval( @@ -163,23 +126,11 @@ def complete(self) -> bool: [ updates.contains( hl.Struct( - callset=callset_path, + callset=self.callset_path, project_guid=project_guid, ), ) - for ( - callset_path, - project_guid, - _, - _, - _, - ) in callset_project_pairs( - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - ) + for project_guid in self.project_guids ], ), hl.read_table(self.output().path).updates, @@ -190,11 +141,8 @@ def create_table(self) -> hl.Table: callset_ht = get_callset_ht( self.reference_genome, self.dataset_type, - self.callset_paths, + self.callset_path, self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, ) # 1) Identify new variants. @@ -270,19 +218,7 @@ def create_table(self) -> hl.Table: return new_variants_ht.select_globals( updates={ - hl.Struct(callset=callset_path, project_guid=project_guid) - for ( - callset_path, - project_guid, - _, - _, - _, - ) in callset_project_pairs( - self.callset_paths, - self.project_guids, - self.project_remap_paths, - self.project_pedigree_paths, - self.imputed_sex_paths, - ) + hl.Struct(callset=self.callset_path, project_guid=project_guid) + for project_guid in self.project_guids }, ) diff --git a/v03_pipeline/lib/tasks/write_project_family_tables.py b/v03_pipeline/lib/tasks/write_project_family_tables.py index b7d83cd49..26253a1da 100644 --- a/v03_pipeline/lib/tasks/write_project_family_tables.py +++ b/v03_pipeline/lib/tasks/write_project_family_tables.py @@ -1,35 +1,18 @@ import hail as hl import luigi +import luigi.util -from v03_pipeline.lib.model import SampleType from v03_pipeline.lib.tasks.base.base_hail_table import BaseHailTableTask +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask from v03_pipeline.lib.tasks.write_family_table import WriteFamilyTableTask +@luigi.util.inherits(BaseLoadingRunParams) class WriteProjectFamilyTablesTask(BaseHailTableTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_path = luigi.Parameter() project_guid = luigi.Parameter() project_remap_path = luigi.Parameter() project_pedigree_path = luigi.Parameter() - imputed_sex_path = luigi.Parameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - is_new_gcnv_joint_call = luigi.BoolParameter( - default=False, - description='Is this a fully joint-called callset.', - ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -47,27 +30,14 @@ def complete(self) -> bool: def run(self): # https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies - update_project_table_task: luigi.Target = yield UpdateProjectTableTask( - self.reference_genome, - self.dataset_type, - self.project_guid, - self.sample_type, - self.callset_path, - self.project_remap_path, - self.project_pedigree_path, - self.imputed_sex_path, - self.ignore_missing_samples_when_remapping, - self.validate, - False, - self.is_new_gcnv_joint_call, + update_project_table_task: luigi.Target = yield self.clone( + UpdateProjectTableTask, + force=False, ) project_ht = hl.read_table(update_project_table_task.path) family_guids = hl.eval(project_ht.globals.family_guids) for family_guid in family_guids: self.dynamic_write_family_table_tasks.add( - WriteFamilyTableTask( - **self.param_kwargs, - family_guid=family_guid, - ), + self.clone(WriteFamilyTableTask, family_guid=family_guid), ) yield self.dynamic_write_family_table_tasks diff --git a/v03_pipeline/lib/tasks/write_project_family_tables_test.py b/v03_pipeline/lib/tasks/write_project_family_tables_test.py index d3b713224..37bb9a556 100644 --- a/v03_pipeline/lib/tasks/write_project_family_tables_test.py +++ b/v03_pipeline/lib/tasks/write_project_family_tables_test.py @@ -23,7 +23,8 @@ def test_snv_write_project_family_tables_task(self) -> None: project_guid='R0113_test_project', project_remap_path=TEST_REMAP, project_pedigree_path=TEST_PEDIGREE_4, - validate=False, + skip_validation=True, + skip_check_sex_and_relatedness=True, ) worker.add(write_project_family_tables) worker.run() diff --git a/v03_pipeline/lib/tasks/write_relatedness_check_table.py b/v03_pipeline/lib/tasks/write_relatedness_check_table.py index dc8bf17d6..6b943c643 100644 --- a/v03_pipeline/lib/tasks/write_relatedness_check_table.py +++ b/v03_pipeline/lib/tasks/write_relatedness_check_table.py @@ -1,12 +1,14 @@ import hail as hl import luigi +import luigi.util from v03_pipeline.lib.methods.relatedness import call_relatedness -from v03_pipeline.lib.model import CachedReferenceDatasetQuery, Env, SampleType +from v03_pipeline.lib.model import CachedReferenceDatasetQuery, Env from v03_pipeline.lib.paths import ( cached_reference_dataset_query_path, relatedness_check_table_path, ) +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import GCSorLocalTarget, HailTableTask from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import ( @@ -15,10 +17,8 @@ from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask +@luigi.util.inherits(BaseLoadingRunParams) class WriteRelatednessCheckTableTask(BaseWriteTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_path = luigi.Parameter() - def output(self) -> luigi.Target: return GCSorLocalTarget( relatedness_check_table_path( @@ -30,20 +30,14 @@ def output(self) -> luigi.Target: def requires(self) -> luigi.Task: requirements = [ - WriteImportedCallsetTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.callset_path, - ), + self.clone(WriteImportedCallsetTask), ] if Env.ACCESS_PRIVATE_REFERENCE_DATASETS: requirements = [ *requirements, ( - UpdatedCachedReferenceDatasetQuery( - reference_genome=self.reference_genome, - dataset_type=self.dataset_type, + self.clone( + UpdatedCachedReferenceDatasetQuery, crdq=CachedReferenceDatasetQuery.GNOMAD_QC, ) if Env.REFERENCE_DATA_AUTO_UPDATE diff --git a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py index 079996d35..f5e9eb48e 100644 --- a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py +++ b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py @@ -1,5 +1,6 @@ import hail as hl import luigi +import luigi.util from v03_pipeline.lib.logger import get_logger from v03_pipeline.lib.misc.family_loading_failures import ( @@ -14,8 +15,9 @@ ) from v03_pipeline.lib.misc.pedigree import parse_pedigree_ht_to_families from v03_pipeline.lib.misc.sample_ids import remap_sample_ids, subset_samples -from v03_pipeline.lib.model import SampleType +from v03_pipeline.lib.model.environment import Env from v03_pipeline.lib.paths import remapped_and_subsetted_callset_path +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import GCSorLocalTarget, RawFileTask from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask @@ -27,29 +29,11 @@ logger = get_logger(__name__) +@luigi.util.inherits(BaseLoadingRunParams) class WriteRemappedAndSubsettedCallsetTask(BaseWriteTask): - sample_type = luigi.EnumParameter(enum=SampleType) - callset_path = luigi.Parameter() project_guid = luigi.Parameter() project_remap_path = luigi.Parameter() project_pedigree_path = luigi.Parameter() - imputed_sex_path = luigi.Parameter(default=None) - ignore_missing_samples_when_remapping = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - validate = luigi.BoolParameter( - default=True, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) - check_sex_and_relatedness = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) def complete(self) -> luigi.Target: return not self.force and super().complete() @@ -66,40 +50,18 @@ def output(self) -> luigi.Target: def requires(self) -> list[luigi.Task]: requirements = [ - WriteImportedCallsetTask( - reference_genome=self.reference_genome, - dataset_type=self.dataset_type, - sample_type=self.sample_type, - callset_path=self.callset_path, - imputed_sex_path=self.imputed_sex_path, - # NB: filters_path is explicitly passed as None here - # to avoid carrying it throughout the rest of the pipeline. - # Only the primary import task itself should be aware of it. - filters_path=None, - validate=self.validate, - force=False, - check_sex_and_relatedness=self.check_sex_and_relatedness, - ), + self.clone(WriteImportedCallsetTask, force=False), RawFileTask(self.project_pedigree_path), ] if ( - self.check_sex_and_relatedness + Env.CHECK_SEX_AND_RELATEDNESS + and not self.skip_check_sex_and_relatedness and self.dataset_type.check_sex_and_relatedness ): requirements = [ *requirements, - WriteRelatednessCheckTableTask( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.callset_path, - ), - WriteSexCheckTableTask( - self.reference_genome, - self.dataset_type, - self.callset_path, - self.imputed_sex_path, - ), + self.clone(WriteRelatednessCheckTableTask), + self.clone(WriteSexCheckTableTask), ] return requirements @@ -128,7 +90,8 @@ def create_table(self) -> hl.MatrixTable: families_failed_relatedness_check = {} families_failed_sex_check = {} if ( - self.check_sex_and_relatedness + Env.CHECK_SEX_AND_RELATEDNESS + and not self.skip_check_sex_and_relatedness and self.dataset_type.check_sex_and_relatedness ): relatedness_check_ht = hl.read_table(self.input()[2].path) diff --git a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py index 6cfd95098..48f2b481a 100644 --- a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py +++ b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py @@ -1,4 +1,5 @@ import shutil +from unittest.mock import Mock, patch import hail as hl import luigi.worker @@ -82,8 +83,7 @@ def test_write_remapped_and_subsetted_callset_task( project_guid='R0113_test_project', project_remap_path=TEST_REMAP, project_pedigree_path=TEST_PEDIGREE_3, - validate=False, - check_sex_and_relatedness=True, + skip_validation=True, ) worker.add(wrsc_task) worker.run() @@ -104,9 +104,12 @@ def test_write_remapped_and_subsetted_callset_task( ], ) + @patch('v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset.Env') def test_write_remapped_and_subsetted_callset_task_failed_sex_check_family( self, + mock_env: Mock, ) -> None: + mock_env.CHECK_SEX_AND_RELATEDNESS = True worker = luigi.worker.Worker() wrsc_task = WriteRemappedAndSubsettedCallsetTask( reference_genome=ReferenceGenome.GRCh38, @@ -116,8 +119,7 @@ def test_write_remapped_and_subsetted_callset_task_failed_sex_check_family( project_guid='R0114_project4', project_remap_path=TEST_REMAP, project_pedigree_path=TEST_PEDIGREE_4, - validate=False, - check_sex_and_relatedness=True, + skip_validation=True, ) worker.add(wrsc_task) worker.run() diff --git a/v03_pipeline/lib/tasks/write_sex_check_table.py b/v03_pipeline/lib/tasks/write_sex_check_table.py index 55cbc1387..b8b4bb9e7 100644 --- a/v03_pipeline/lib/tasks/write_sex_check_table.py +++ b/v03_pipeline/lib/tasks/write_sex_check_table.py @@ -2,14 +2,13 @@ import luigi from v03_pipeline.lib.misc.io import import_imputed_sex -from v03_pipeline.lib.paths import sex_check_table_path +from v03_pipeline.lib.paths import imputed_sex_path, sex_check_table_path from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import GCSorLocalTarget, RawFileTask class WriteSexCheckTableTask(BaseWriteTask): callset_path = luigi.Parameter() - imputed_sex_path = luigi.Parameter() def output(self) -> luigi.Target: return GCSorLocalTarget( @@ -21,9 +20,13 @@ def output(self) -> luigi.Target: ) def requires(self) -> luigi.Task: - return [ - RawFileTask(self.imputed_sex_path), - ] + return RawFileTask( + imputed_sex_path( + self.reference_genome, + self.dataset_type, + self.callset_path, + ), + ) def create_table(self) -> hl.Table: - return import_imputed_sex(self.input()[0].path) + return import_imputed_sex(self.input().path)