diff --git a/v03_pipeline/lib/misc/callsets.py b/v03_pipeline/lib/misc/callsets.py index 34ac4f8a7..63fc4939e 100644 --- a/v03_pipeline/lib/misc/callsets.py +++ b/v03_pipeline/lib/misc/callsets.py @@ -28,3 +28,26 @@ def get_callset_ht( callset_hts, ) return callset_ht.distinct() + + +def additional_row_fields( + mt: hl.MatrixTable, + dataset_type: DatasetType, + skip_check_sex_and_relatedness: bool, +): + return { + **( + {'info.AF': hl.tarray(hl.tfloat64)} + if not skip_check_sex_and_relatedness + and dataset_type.check_sex_and_relatedness + else {} + ), + # this field is never required, the pipeline + # will run smoothly even in its absence, but + # will trigger special handling if it is present. + **( + {'info.CALIBRATION_SENSITIVITY': hl.tarray(hl.tstr)} + if hasattr(mt, 'info') and hasattr(mt.info, 'CALIBRATION_SENSITIVITY') + else {} + ), + } diff --git a/v03_pipeline/lib/tasks/base/base_update.py b/v03_pipeline/lib/tasks/base/base_update.py index fe27a6c81..675eaa1a2 100644 --- a/v03_pipeline/lib/tasks/base/base_update.py +++ b/v03_pipeline/lib/tasks/base/base_update.py @@ -10,7 +10,12 @@ def run(self) -> None: if not self.output().exists(): ht = self.initialize_table() else: - ht = hl.read_table(self.output().path) + read_fn = ( + hl.read_matrix_table + if self.output().path.endswith('mt') + else hl.read_table + ) + ht = read_fn(self.output().path) ht = self.update_table(ht) write(ht, self.output().path) # Set force to false after run, allowing "complete()" to succeeded diff --git a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py index 7d9668544..d22dae749 100644 --- a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py +++ b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py @@ -206,13 +206,13 @@ def test_missing_interval_reference( @patch('v03_pipeline.lib.tasks.write_new_variants_table.register_alleles_in_chunks') @patch('v03_pipeline.lib.tasks.write_new_variants_table.Env') @patch( - 'v03_pipeline.lib.tasks.write_imported_callset.UpdatedCachedReferenceDatasetQuery', + 'v03_pipeline.lib.tasks.validate_callset.UpdatedCachedReferenceDatasetQuery', ) @patch( 'v03_pipeline.lib.tasks.write_new_variants_table.UpdateVariantAnnotationsTableWithUpdatedReferenceDataset', ) @patch( - 'v03_pipeline.lib.tasks.write_imported_callset.validate_expected_contig_frequency', + 'v03_pipeline.lib.tasks.validate_callset.validate_expected_contig_frequency', partial(validate_expected_contig_frequency, min_rows_per_contig=25), ) @patch.object(ReferenceGenome, 'standard_contigs', new_callable=PropertyMock) diff --git a/v03_pipeline/lib/tasks/validate_callset.py b/v03_pipeline/lib/tasks/validate_callset.py new file mode 100644 index 000000000..d88c27aa5 --- /dev/null +++ b/v03_pipeline/lib/tasks/validate_callset.py @@ -0,0 +1,149 @@ +import hail as hl +import luigi +import luigi.util + +from v03_pipeline.lib.misc.callsets import additional_row_fields +from v03_pipeline.lib.misc.validation import ( + validate_allele_type, + validate_expected_contig_frequency, + validate_imported_field_types, + validate_imputed_sex_ploidy, + validate_no_duplicate_variants, + validate_sample_type, +) +from v03_pipeline.lib.model import CachedReferenceDatasetQuery +from v03_pipeline.lib.model.environment import Env +from v03_pipeline.lib.paths import ( + cached_reference_dataset_query_path, + imported_callset_path, + sex_check_table_path, +) +from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams +from v03_pipeline.lib.tasks.base.base_update import BaseUpdateTask +from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask +from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import ( + UpdatedCachedReferenceDatasetQuery, +) +from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask +from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask + + +@luigi.util.inherits(BaseLoadingRunParams) +class ValidateCallsetTask(BaseUpdateTask): + def complete(self) -> luigi.Target: + if not self.force and super().complete(): + mt = hl.read_matrix_table(self.output().path) + return hasattr(mt, 'validated_sample_type') and hl.eval( + self.sample_type.value == mt.validated_sample_type, + ) + return False + + def output(self) -> luigi.Target: + return GCSorLocalTarget( + imported_callset_path( + self.reference_genome, + self.dataset_type, + self.callset_path, + ), + ) + + def requires(self) -> list[luigi.Task]: + requirements = [ + self.clone(WriteImportedCallsetTask), + ] + if not self.skip_validation and self.dataset_type.can_run_validation: + requirements = [ + *requirements, + ( + self.clone( + UpdatedCachedReferenceDatasetQuery, + crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, + ) + if Env.REFERENCE_DATA_AUTO_UPDATE + else HailTableTask( + cached_reference_dataset_query_path( + self.reference_genome, + self.dataset_type, + CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, + ), + ), + ), + ] + if ( + Env.CHECK_SEX_AND_RELATEDNESS + and not self.skip_check_sex_and_relatedness + and self.dataset_type.check_sex_and_relatedness + ): + requirements = [ + *requirements, + self.clone(WriteSexCheckTableTask), + ] + return [ + *requirements, + CallsetTask(self.callset_path), + ] + + def update_table(self, mt: hl.MatrixTable) -> hl.MatrixTable: + mt = hl.read_matrix_table( + imported_callset_path( + self.reference_genome, + self.dataset_type, + self.callset_path, + ), + ) + # This validation isn't override-able. If a field is the wrong + # type, the pipeline will likely hard-fail downstream. + validate_imported_field_types( + mt, + self.dataset_type, + additional_row_fields( + mt, + self.dataset_type, + self.skip_check_sex_and_relatedness, + ), + ) + if self.dataset_type.can_run_validation: + # Rather than throwing an error, we silently remove invalid contigs. + # This happens fairly often for AnVIL requests. + mt = mt.filter_rows( + hl.set(self.reference_genome.standard_contigs).contains( + mt.locus.contig, + ), + ) + if not self.skip_validation and self.dataset_type.can_run_validation: + validate_allele_type(mt) + validate_no_duplicate_variants(mt) + validate_expected_contig_frequency(mt, self.reference_genome) + coding_and_noncoding_ht = hl.read_table( + cached_reference_dataset_query_path( + self.reference_genome, + self.dataset_type, + CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, + ), + ) + validate_sample_type( + mt, + coding_and_noncoding_ht, + self.reference_genome, + self.sample_type, + ) + if ( + Env.CHECK_SEX_AND_RELATEDNESS + and not self.skip_check_sex_and_relatedness + and self.dataset_type.check_sex_and_relatedness + ): + sex_check_ht = hl.read_table( + sex_check_table_path( + self.reference_genome, + self.dataset_type, + self.callset_path, + ), + ) + validate_imputed_sex_ploidy( + mt, + sex_check_ht, + ) + return mt.select_globals( + callset_path=self.callset_path, + validated_sample_type=self.sample_type.value, + ) diff --git a/v03_pipeline/lib/tasks/write_imported_callset.py b/v03_pipeline/lib/tasks/write_imported_callset.py index be42fb750..774e1b1f3 100644 --- a/v03_pipeline/lib/tasks/write_imported_callset.py +++ b/v03_pipeline/lib/tasks/write_imported_callset.py @@ -2,47 +2,28 @@ import luigi import luigi.util +from v03_pipeline.lib.misc.callsets import additional_row_fields from v03_pipeline.lib.misc.io import ( import_callset, import_vcf, select_relevant_fields, split_multi_hts, ) -from v03_pipeline.lib.misc.validation import ( - validate_allele_type, - validate_expected_contig_frequency, - validate_imported_field_types, - validate_imputed_sex_ploidy, - validate_no_duplicate_variants, - validate_sample_type, -) from v03_pipeline.lib.misc.vets import annotate_vets -from v03_pipeline.lib.model import CachedReferenceDatasetQuery from v03_pipeline.lib.model.environment import Env from v03_pipeline.lib.paths import ( - cached_reference_dataset_query_path, imported_callset_path, - sex_check_table_path, valid_filters_path, ) from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask -from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask -from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import ( - UpdatedCachedReferenceDatasetQuery, -) -from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask +from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget @luigi.util.inherits(BaseLoadingRunParams) class WriteImportedCallsetTask(BaseWriteTask): def complete(self) -> luigi.Target: - if not self.force and super().complete(): - mt = hl.read_matrix_table(self.output().path) - return hasattr(mt, 'sample_type') and hl.eval( - self.sample_type.value == mt.sample_type, - ) - return False + return not self.force and super().complete() def output(self) -> luigi.Target: return GCSorLocalTarget( @@ -72,56 +53,11 @@ def requires(self) -> list[luigi.Task]: ), ), ] - if not self.skip_validation and self.dataset_type.can_run_validation: - requirements = [ - *requirements, - ( - self.clone( - UpdatedCachedReferenceDatasetQuery, - crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, - ) - if Env.REFERENCE_DATA_AUTO_UPDATE - else HailTableTask( - cached_reference_dataset_query_path( - self.reference_genome, - self.dataset_type, - CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, - ), - ), - ), - ] - if ( - Env.CHECK_SEX_AND_RELATEDNESS - and not self.skip_check_sex_and_relatedness - and self.dataset_type.check_sex_and_relatedness - ): - requirements = [ - *requirements, - self.clone(WriteSexCheckTableTask), - ] return [ *requirements, CallsetTask(self.callset_path), ] - def additional_row_fields(self, mt): - return { - **( - {'info.AF': hl.tarray(hl.tfloat64)} - if not self.skip_check_sex_and_relatedness - and self.dataset_type.check_sex_and_relatedness - else {} - ), - # this field is never required, the pipeline - # will run smoothly even in its absence, but - # will trigger special handling if it is present. - **( - {'info.CALIBRATION_SENSITIVITY': hl.tarray(hl.tstr)} - if hasattr(mt, 'info') and hasattr(mt.info, 'CALIBRATION_SENSITIVITY') - else {} - ), - } - def create_table(self) -> hl.MatrixTable: mt = import_callset( self.callset_path, @@ -146,14 +82,11 @@ def create_table(self) -> hl.MatrixTable: mt = select_relevant_fields( mt, self.dataset_type, - self.additional_row_fields(mt), - ) - # This validation isn't override-able. If a field is the wrong - # type, the pipeline will likely hard-fail downstream. - validate_imported_field_types( - mt, - self.dataset_type, - self.additional_row_fields(mt), + additional_row_fields( + mt, + self.dataset_type, + self.skip_check_sex_and_relatedness, + ), ) if self.dataset_type.has_multi_allelic_variants: mt = split_multi_hts(mt) @@ -161,49 +94,7 @@ def create_table(self) -> hl.MatrixTable: # The annotations are present on the sample-level FT field but are # expected upstream on "filters". mt = annotate_vets(mt) - if self.dataset_type.can_run_validation: - # Rather than throwing an error, we silently remove invalid contigs. - # This happens fairly often for AnVIL requests. - mt = mt.filter_rows( - hl.set(self.reference_genome.standard_contigs).contains( - mt.locus.contig, - ), - ) - if not self.skip_validation and self.dataset_type.can_run_validation: - validate_allele_type(mt) - validate_no_duplicate_variants(mt) - validate_expected_contig_frequency(mt, self.reference_genome) - coding_and_noncoding_ht = hl.read_table( - cached_reference_dataset_query_path( - self.reference_genome, - self.dataset_type, - CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS, - ), - ) - validate_sample_type( - mt, - coding_and_noncoding_ht, - self.reference_genome, - self.sample_type, - ) - if ( - Env.CHECK_SEX_AND_RELATEDNESS - and not self.skip_check_sex_and_relatedness - and self.dataset_type.check_sex_and_relatedness - ): - sex_check_ht = hl.read_table( - sex_check_table_path( - self.reference_genome, - self.dataset_type, - self.callset_path, - ), - ) - validate_imputed_sex_ploidy( - mt, - sex_check_ht, - ) - return mt.annotate_globals( + return mt.select_globals( callset_path=self.callset_path, filters_path=filters_path or hl.missing(hl.tstr), - sample_type=self.sample_type.value, ) diff --git a/v03_pipeline/lib/tasks/write_relatedness_check_table.py b/v03_pipeline/lib/tasks/write_relatedness_check_table.py index 6b943c643..979668eaa 100644 --- a/v03_pipeline/lib/tasks/write_relatedness_check_table.py +++ b/v03_pipeline/lib/tasks/write_relatedness_check_table.py @@ -14,7 +14,7 @@ from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import ( UpdatedCachedReferenceDatasetQuery, ) -from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask +from v03_pipeline.lib.tasks.validate_callset import ValidateCallsetTask @luigi.util.inherits(BaseLoadingRunParams) @@ -30,7 +30,7 @@ def output(self) -> luigi.Target: def requires(self) -> luigi.Task: requirements = [ - self.clone(WriteImportedCallsetTask), + self.clone(ValidateCallsetTask), ] if Env.ACCESS_PRIVATE_REFERENCE_DATASETS: requirements = [ diff --git a/v03_pipeline/lib/tasks/write_relatedness_check_table_test.py b/v03_pipeline/lib/tasks/write_relatedness_check_table_test.py index cfaed3ea7..e54be1c6b 100644 --- a/v03_pipeline/lib/tasks/write_relatedness_check_table_test.py +++ b/v03_pipeline/lib/tasks/write_relatedness_check_table_test.py @@ -57,7 +57,7 @@ def setUp(self) -> None: # Force imported callset to be complete ht = import_vcf(TEST_VCF, ReferenceGenome.GRCh38) - ht = ht.annotate_globals(sample_type=SampleType.WGS.value) + ht = ht.annotate_globals(validated_sample_type=SampleType.WGS.value) ht = ht.annotate_rows(**{'info.AF': ht.info.AF}) ht.write( imported_callset_path( diff --git a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py index f5e9eb48e..609c3b014 100644 --- a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py +++ b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py @@ -20,7 +20,7 @@ from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import GCSorLocalTarget, RawFileTask -from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask +from v03_pipeline.lib.tasks.validate_callset import ValidateCallsetTask from v03_pipeline.lib.tasks.write_relatedness_check_table import ( WriteRelatednessCheckTableTask, ) @@ -50,7 +50,7 @@ def output(self) -> luigi.Target: def requires(self) -> list[luigi.Task]: requirements = [ - self.clone(WriteImportedCallsetTask, force=False), + self.clone(ValidateCallsetTask, force=False), RawFileTask(self.project_pedigree_path), ] if (