Benb/split import and validate (#829) #835

Merged: 1 commit, Jul 10, 2024

23 changes: 23 additions & 0 deletions v03_pipeline/lib/misc/callsets.py
@@ -28,3 +28,26 @@ def get_callset_ht(
callset_hts,
)
return callset_ht.distinct()


def additional_row_fields(
mt: hl.MatrixTable,
dataset_type: DatasetType,
skip_check_sex_and_relatedness: bool,
):
return {
**(
{'info.AF': hl.tarray(hl.tfloat64)}
if not skip_check_sex_and_relatedness
and dataset_type.check_sex_and_relatedness
else {}
),
        # This field is never required; the pipeline runs smoothly
        # in its absence, but its presence triggers special handling.
**(
{'info.CALIBRATION_SENSITIVITY': hl.tarray(hl.tstr)}
if hasattr(mt, 'info') and hasattr(mt.info, 'CALIBRATION_SENSITIVITY')
else {}
),
}
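
A quick usage sketch (hypothetical `mt` and `dataset_type` values; assumes both conditions above hold) of the flat field-name-to-type mapping the helper composes:

# Hypothetical call: sex/relatedness checks are enabled and the callset
# carries info.CALIBRATION_SENSITIVITY, so both optional fields appear.
fields = additional_row_fields(mt, dataset_type, skip_check_sex_and_relatedness=False)
# fields == {
#     'info.AF': hl.tarray(hl.tfloat64),
#     'info.CALIBRATION_SENSITIVITY': hl.tarray(hl.tstr),
# }
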
7 changes: 6 additions & 1 deletion v03_pipeline/lib/tasks/base/base_update.py
@@ -10,7 +10,12 @@ def run(self) -> None:
if not self.output().exists():
ht = self.initialize_table()
else:
ht = hl.read_table(self.output().path)
read_fn = (
hl.read_matrix_table
if self.output().path.endswith('mt')
else hl.read_table
)
ht = read_fn(self.output().path)
ht = self.update_table(ht)
write(ht, self.output().path)
        # Set force to false after run, allowing "complete()" to succeed.
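
For context, a minimal sketch (hypothetical paths; not part of the PR) of the suffix dispatch this change introduces, letting the same update flow handle both Tables and MatrixTables:

import hail as hl

def read_output(path: str):
    # Outputs whose paths end in 'mt' (e.g. 'gs://bucket/validated.mt')
    # hold MatrixTables; anything else (e.g. '.../annotations.ht') holds Tables.
    return hl.read_matrix_table(path) if path.endswith('mt') else hl.read_table(path)
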
@@ -206,13 +206,13 @@ def test_missing_interval_reference(
@patch('v03_pipeline.lib.tasks.write_new_variants_table.register_alleles_in_chunks')
@patch('v03_pipeline.lib.tasks.write_new_variants_table.Env')
@patch(
'v03_pipeline.lib.tasks.write_imported_callset.UpdatedCachedReferenceDatasetQuery',
'v03_pipeline.lib.tasks.validate_callset.UpdatedCachedReferenceDatasetQuery',
)
@patch(
'v03_pipeline.lib.tasks.write_new_variants_table.UpdateVariantAnnotationsTableWithUpdatedReferenceDataset',
)
@patch(
'v03_pipeline.lib.tasks.write_imported_callset.validate_expected_contig_frequency',
'v03_pipeline.lib.tasks.validate_callset.validate_expected_contig_frequency',
partial(validate_expected_contig_frequency, min_rows_per_contig=25),
)
@patch.object(ReferenceGenome, 'standard_contigs', new_callable=PropertyMock)
149 changes: 149 additions & 0 deletions v03_pipeline/lib/tasks/validate_callset.py
@@ -0,0 +1,149 @@
import hail as hl
import luigi
import luigi.util

from v03_pipeline.lib.misc.callsets import additional_row_fields
from v03_pipeline.lib.misc.validation import (
validate_allele_type,
validate_expected_contig_frequency,
validate_imported_field_types,
validate_imputed_sex_ploidy,
validate_no_duplicate_variants,
validate_sample_type,
)
from v03_pipeline.lib.model import CachedReferenceDatasetQuery
from v03_pipeline.lib.model.environment import Env
from v03_pipeline.lib.paths import (
cached_reference_dataset_query_path,
imported_callset_path,
sex_check_table_path,
)
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
from v03_pipeline.lib.tasks.base.base_update import BaseUpdateTask
from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask
from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import (
UpdatedCachedReferenceDatasetQuery,
)
from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask
from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask


@luigi.util.inherits(BaseLoadingRunParams)
class ValidateCallsetTask(BaseUpdateTask):
    def complete(self) -> bool:
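        # Complete only if not forced and the written callset was already
        # validated against the sample type requested for this run.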
if not self.force and super().complete():
mt = hl.read_matrix_table(self.output().path)
return hasattr(mt, 'validated_sample_type') and hl.eval(
self.sample_type.value == mt.validated_sample_type,
)
return False

def output(self) -> luigi.Target:
return GCSorLocalTarget(
imported_callset_path(
self.reference_genome,
self.dataset_type,
self.callset_path,
),
)

def requires(self) -> list[luigi.Task]:
requirements = [
self.clone(WriteImportedCallsetTask),
]
if not self.skip_validation and self.dataset_type.can_run_validation:
requirements = [
*requirements,
(
self.clone(
UpdatedCachedReferenceDatasetQuery,
crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
)
if Env.REFERENCE_DATA_AUTO_UPDATE
else HailTableTask(
cached_reference_dataset_query_path(
self.reference_genome,
self.dataset_type,
CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
),
),
),
]
if (
Env.CHECK_SEX_AND_RELATEDNESS
and not self.skip_check_sex_and_relatedness
and self.dataset_type.check_sex_and_relatedness
):
requirements = [
*requirements,
self.clone(WriteSexCheckTableTask),
]
return [
*requirements,
CallsetTask(self.callset_path),
]

def update_table(self, mt: hl.MatrixTable) -> hl.MatrixTable:
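        # Note: the passed-in table is ignored; validation always starts
        # from the callset written by WriteImportedCallsetTask.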
mt = hl.read_matrix_table(
imported_callset_path(
self.reference_genome,
self.dataset_type,
self.callset_path,
),
)
# This validation isn't override-able. If a field is the wrong
# type, the pipeline will likely hard-fail downstream.
validate_imported_field_types(
mt,
self.dataset_type,
additional_row_fields(
mt,
self.dataset_type,
self.skip_check_sex_and_relatedness,
),
)
if self.dataset_type.can_run_validation:
# Rather than throwing an error, we silently remove invalid contigs.
# This happens fairly often for AnVIL requests.
mt = mt.filter_rows(
hl.set(self.reference_genome.standard_contigs).contains(
mt.locus.contig,
),
)
if not self.skip_validation and self.dataset_type.can_run_validation:
validate_allele_type(mt)
validate_no_duplicate_variants(mt)
validate_expected_contig_frequency(mt, self.reference_genome)
coding_and_noncoding_ht = hl.read_table(
cached_reference_dataset_query_path(
self.reference_genome,
self.dataset_type,
CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
),
)
validate_sample_type(
mt,
coding_and_noncoding_ht,
self.reference_genome,
self.sample_type,
)
if (
Env.CHECK_SEX_AND_RELATEDNESS
and not self.skip_check_sex_and_relatedness
and self.dataset_type.check_sex_and_relatedness
):
sex_check_ht = hl.read_table(
sex_check_table_path(
self.reference_genome,
self.dataset_type,
self.callset_path,
),
)
validate_imputed_sex_ploidy(
mt,
sex_check_ht,
)
return mt.select_globals(
callset_path=self.callset_path,
validated_sample_type=self.sample_type.value,
)
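
For orientation, a hedged sketch of driving the new task locally. The enum members, model import path, and callset path are assumptions for illustration, not taken from the PR:

import luigi

from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType  # assumed exports
from v03_pipeline.lib.tasks.validate_callset import ValidateCallsetTask

luigi.build(
    [
        ValidateCallsetTask(
            reference_genome=ReferenceGenome.GRCh38,    # assumed enum member
            dataset_type=DatasetType.SNV_INDEL,         # assumed enum member
            sample_type=SampleType.WGS,                 # assumed enum member
            callset_path='gs://bucket/callset.vcf.gz',  # hypothetical callset
        ),
    ],
    local_scheduler=True,
)
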
127 changes: 9 additions & 118 deletions v03_pipeline/lib/tasks/write_imported_callset.py
@@ -2,47 +2,28 @@
import luigi
import luigi.util

from v03_pipeline.lib.misc.callsets import additional_row_fields
from v03_pipeline.lib.misc.io import (
import_callset,
import_vcf,
select_relevant_fields,
split_multi_hts,
)
from v03_pipeline.lib.misc.validation import (
validate_allele_type,
validate_expected_contig_frequency,
validate_imported_field_types,
validate_imputed_sex_ploidy,
validate_no_duplicate_variants,
validate_sample_type,
)
from v03_pipeline.lib.misc.vets import annotate_vets
from v03_pipeline.lib.model import CachedReferenceDatasetQuery
from v03_pipeline.lib.model.environment import Env
from v03_pipeline.lib.paths import (
cached_reference_dataset_query_path,
imported_callset_path,
sex_check_table_path,
valid_filters_path,
)
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask
from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask
from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import (
UpdatedCachedReferenceDatasetQuery,
)
from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask
from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget


@luigi.util.inherits(BaseLoadingRunParams)
class WriteImportedCallsetTask(BaseWriteTask):
    def complete(self) -> bool:
if not self.force and super().complete():
mt = hl.read_matrix_table(self.output().path)
return hasattr(mt, 'sample_type') and hl.eval(
self.sample_type.value == mt.sample_type,
)
return False
return not self.force and super().complete()

def output(self) -> luigi.Target:
return GCSorLocalTarget(
@@ -72,56 +53,11 @@ def requires(self) -> list[luigi.Task]:
),
),
]
if not self.skip_validation and self.dataset_type.can_run_validation:
requirements = [
*requirements,
(
self.clone(
UpdatedCachedReferenceDatasetQuery,
crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
)
if Env.REFERENCE_DATA_AUTO_UPDATE
else HailTableTask(
cached_reference_dataset_query_path(
self.reference_genome,
self.dataset_type,
CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
),
),
),
]
if (
Env.CHECK_SEX_AND_RELATEDNESS
and not self.skip_check_sex_and_relatedness
and self.dataset_type.check_sex_and_relatedness
):
requirements = [
*requirements,
self.clone(WriteSexCheckTableTask),
]
return [
*requirements,
CallsetTask(self.callset_path),
]

def additional_row_fields(self, mt):
return {
**(
{'info.AF': hl.tarray(hl.tfloat64)}
if not self.skip_check_sex_and_relatedness
and self.dataset_type.check_sex_and_relatedness
else {}
),
            # This field is never required; the pipeline runs smoothly
            # in its absence, but its presence triggers special handling.
**(
{'info.CALIBRATION_SENSITIVITY': hl.tarray(hl.tstr)}
if hasattr(mt, 'info') and hasattr(mt.info, 'CALIBRATION_SENSITIVITY')
else {}
),
}

def create_table(self) -> hl.MatrixTable:
mt = import_callset(
self.callset_path,
@@ -146,64 +82,19 @@ def create_table(self) -> hl.MatrixTable:
mt = select_relevant_fields(
mt,
self.dataset_type,
self.additional_row_fields(mt),
)
# This validation isn't override-able. If a field is the wrong
# type, the pipeline will likely hard-fail downstream.
validate_imported_field_types(
mt,
self.dataset_type,
self.additional_row_fields(mt),
additional_row_fields(
mt,
self.dataset_type,
self.skip_check_sex_and_relatedness,
),
)
if self.dataset_type.has_multi_allelic_variants:
mt = split_multi_hts(mt)
# Special handling of variant-level filter annotation for VETs filters.
# The annotations are present on the sample-level FT field but are
# expected upstream on "filters".
mt = annotate_vets(mt)
if self.dataset_type.can_run_validation:
# Rather than throwing an error, we silently remove invalid contigs.
# This happens fairly often for AnVIL requests.
mt = mt.filter_rows(
hl.set(self.reference_genome.standard_contigs).contains(
mt.locus.contig,
),
)
if not self.skip_validation and self.dataset_type.can_run_validation:
validate_allele_type(mt)
validate_no_duplicate_variants(mt)
validate_expected_contig_frequency(mt, self.reference_genome)
coding_and_noncoding_ht = hl.read_table(
cached_reference_dataset_query_path(
self.reference_genome,
self.dataset_type,
CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
),
)
validate_sample_type(
mt,
coding_and_noncoding_ht,
self.reference_genome,
self.sample_type,
)
if (
Env.CHECK_SEX_AND_RELATEDNESS
and not self.skip_check_sex_and_relatedness
and self.dataset_type.check_sex_and_relatedness
):
sex_check_ht = hl.read_table(
sex_check_table_path(
self.reference_genome,
self.dataset_type,
self.callset_path,
),
)
validate_imputed_sex_ploidy(
mt,
sex_check_ht,
)
return mt.annotate_globals(
return mt.select_globals(
callset_path=self.callset_path,
filters_path=filters_path or hl.missing(hl.tstr),
sample_type=self.sample_type.value,
)
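
One note on the swap from annotate_globals to select_globals in the return above: select_globals replaces the table's globals outright rather than merging into them, so the written callset carries exactly the globals listed and nothing stale. A toy sketch of the difference (hypothetical field names):

import hail as hl

ht = hl.utils.range_table(1).annotate_globals(stale=1)
print(hl.eval(ht.annotate_globals(fresh=2).globals))  # Struct(stale=1, fresh=2)
print(hl.eval(ht.select_globals(fresh=2).globals))    # Struct(fresh=2)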