Skip to content

Commit 3178560

Browse files
authored
Benb/split import and validate (#829)
* split import and validate
* lint and share function
* ruff
* change dep
* tweak update
* lint
* wrong method
* correct method
* mocks
* change sample type annotation on test
* remove additional row fields
1 parent c113106 commit 3178560

8 files changed

+194
-126
lines changed

v03_pipeline/lib/misc/callsets.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,26 @@ def get_callset_ht(
2828
callset_hts,
2929
)
3030
return callset_ht.distinct()
31+
32+
33+
def additional_row_fields(
    mt: hl.MatrixTable,
    dataset_type: DatasetType,
    skip_check_sex_and_relatedness: bool,
):
    """Return the extra row fields (name -> hail type) expected on an imported callset.

    - 'info.AF' is required whenever sex/relatedness checks will run for
      this dataset type and have not been explicitly skipped.
    - 'info.CALIBRATION_SENSITIVITY' is never required; it is included only
      when already present on the MatrixTable.
    """
    fields = {}
    if not skip_check_sex_and_relatedness and dataset_type.check_sex_and_relatedness:
        fields['info.AF'] = hl.tarray(hl.tfloat64)
    # This field is never required — the pipeline will run smoothly even in
    # its absence — but it triggers special handling when present.
    if hasattr(mt, 'info') and hasattr(mt.info, 'CALIBRATION_SENSITIVITY'):
        fields['info.CALIBRATION_SENSITIVITY'] = hl.tarray(hl.tstr)
    return fields

v03_pipeline/lib/tasks/base/base_update.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ def run(self) -> None:
1010
if not self.output().exists():
1111
ht = self.initialize_table()
1212
else:
13-
ht = hl.read_table(self.output().path)
13+
read_fn = (
14+
hl.read_matrix_table
15+
if self.output().path.endswith('mt')
16+
else hl.read_table
17+
)
18+
ht = read_fn(self.output().path)
1419
ht = self.update_table(ht)
1520
write(ht, self.output().path)
1621
    # Set force to false after run, allowing "complete()" to succeed

v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,13 @@ def test_missing_interval_reference(
206206
@patch('v03_pipeline.lib.tasks.write_new_variants_table.register_alleles_in_chunks')
207207
@patch('v03_pipeline.lib.tasks.write_new_variants_table.Env')
208208
@patch(
209-
'v03_pipeline.lib.tasks.write_imported_callset.UpdatedCachedReferenceDatasetQuery',
209+
'v03_pipeline.lib.tasks.validate_callset.UpdatedCachedReferenceDatasetQuery',
210210
)
211211
@patch(
212212
'v03_pipeline.lib.tasks.write_new_variants_table.UpdateVariantAnnotationsTableWithUpdatedReferenceDataset',
213213
)
214214
@patch(
215-
'v03_pipeline.lib.tasks.write_imported_callset.validate_expected_contig_frequency',
215+
'v03_pipeline.lib.tasks.validate_callset.validate_expected_contig_frequency',
216216
partial(validate_expected_contig_frequency, min_rows_per_contig=25),
217217
)
218218
@patch.object(ReferenceGenome, 'standard_contigs', new_callable=PropertyMock)
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import hail as hl
2+
import luigi
3+
import luigi.util
4+
5+
from v03_pipeline.lib.misc.callsets import additional_row_fields
6+
from v03_pipeline.lib.misc.validation import (
7+
validate_allele_type,
8+
validate_expected_contig_frequency,
9+
validate_imported_field_types,
10+
validate_imputed_sex_ploidy,
11+
validate_no_duplicate_variants,
12+
validate_sample_type,
13+
)
14+
from v03_pipeline.lib.model import CachedReferenceDatasetQuery
15+
from v03_pipeline.lib.model.environment import Env
16+
from v03_pipeline.lib.paths import (
17+
cached_reference_dataset_query_path,
18+
imported_callset_path,
19+
sex_check_table_path,
20+
)
21+
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
22+
from v03_pipeline.lib.tasks.base.base_update import BaseUpdateTask
23+
from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask
24+
from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import (
25+
UpdatedCachedReferenceDatasetQuery,
26+
)
27+
from v03_pipeline.lib.tasks.write_imported_callset import WriteImportedCallsetTask
28+
from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask
29+
30+
31+
@luigi.util.inherits(BaseLoadingRunParams)
class ValidateCallsetTask(BaseUpdateTask):
    """Validate an imported callset MatrixTable in place.

    Reads the MatrixTable written by WriteImportedCallsetTask, runs the
    validation suite against it, and re-writes it to the same path with a
    `validated_sample_type` global marking success.
    """

    def complete(self) -> luigi.Target:
        # Complete only when the output exists AND already carries a
        # `validated_sample_type` global matching the requested sample type,
        # i.e. validation has previously run for this sample type.
        if not self.force and super().complete():
            mt = hl.read_matrix_table(self.output().path)
            return hasattr(mt, 'validated_sample_type') and hl.eval(
                self.sample_type.value == mt.validated_sample_type,
            )
        return False

    def output(self) -> luigi.Target:
        # NOTE: same path as WriteImportedCallsetTask's output — this task
        # updates the imported callset in place rather than writing a new
        # artifact.
        return GCSorLocalTarget(
            imported_callset_path(
                self.reference_genome,
                self.dataset_type,
                self.callset_path,
            ),
        )

    def requires(self) -> list[luigi.Task]:
        requirements = [
            self.clone(WriteImportedCallsetTask),
        ]
        if not self.skip_validation and self.dataset_type.can_run_validation:
            # Sample-type validation needs the gnomAD coding/noncoding
            # variants query: either build/refresh it (auto-update) or
            # require that the cached table already exists.
            requirements = [
                *requirements,
                (
                    self.clone(
                        UpdatedCachedReferenceDatasetQuery,
                        crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
                    )
                    if Env.REFERENCE_DATA_AUTO_UPDATE
                    else HailTableTask(
                        cached_reference_dataset_query_path(
                            self.reference_genome,
                            self.dataset_type,
                            CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
                        ),
                    )
                ),
            ]
        if (
            Env.CHECK_SEX_AND_RELATEDNESS
            and not self.skip_check_sex_and_relatedness
            and self.dataset_type.check_sex_and_relatedness
        ):
            requirements = [
                *requirements,
                self.clone(WriteSexCheckTableTask),
            ]
        return [
            *requirements,
            CallsetTask(self.callset_path),
        ]

    def update_table(self, mt: hl.MatrixTable) -> hl.MatrixTable:
        # Re-read the imported callset from disk; the passed-in table is
        # immediately replaced by the canonical on-disk MatrixTable.
        mt = hl.read_matrix_table(
            imported_callset_path(
                self.reference_genome,
                self.dataset_type,
                self.callset_path,
            ),
        )
        # This validation isn't override-able. If a field is the wrong
        # type, the pipeline will likely hard-fail downstream.
        validate_imported_field_types(
            mt,
            self.dataset_type,
            additional_row_fields(
                mt,
                self.dataset_type,
                self.skip_check_sex_and_relatedness,
            ),
        )
        if self.dataset_type.can_run_validation:
            # Rather than throwing an error, we silently remove invalid contigs.
            # This happens fairly often for AnVIL requests.
            mt = mt.filter_rows(
                hl.set(self.reference_genome.standard_contigs).contains(
                    mt.locus.contig,
                ),
            )
        if not self.skip_validation and self.dataset_type.can_run_validation:
            validate_allele_type(mt)
            validate_no_duplicate_variants(mt)
            validate_expected_contig_frequency(mt, self.reference_genome)
            coding_and_noncoding_ht = hl.read_table(
                cached_reference_dataset_query_path(
                    self.reference_genome,
                    self.dataset_type,
                    CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
                ),
            )
            validate_sample_type(
                mt,
                coding_and_noncoding_ht,
                self.reference_genome,
                self.sample_type,
            )
        if (
            Env.CHECK_SEX_AND_RELATEDNESS
            and not self.skip_check_sex_and_relatedness
            and self.dataset_type.check_sex_and_relatedness
        ):
            sex_check_ht = hl.read_table(
                sex_check_table_path(
                    self.reference_genome,
                    self.dataset_type,
                    self.callset_path,
                ),
            )
            validate_imputed_sex_ploidy(
                mt,
                sex_check_ht,
            )
        # Stamp the globals; `validated_sample_type` is what complete() checks.
        return mt.select_globals(
            callset_path=self.callset_path,
            validated_sample_type=self.sample_type.value,
        )

v03_pipeline/lib/tasks/write_imported_callset.py

Lines changed: 9 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,28 @@
22
import luigi
33
import luigi.util
44

5+
from v03_pipeline.lib.misc.callsets import additional_row_fields
56
from v03_pipeline.lib.misc.io import (
67
import_callset,
78
import_vcf,
89
select_relevant_fields,
910
split_multi_hts,
1011
)
11-
from v03_pipeline.lib.misc.validation import (
12-
validate_allele_type,
13-
validate_expected_contig_frequency,
14-
validate_imported_field_types,
15-
validate_imputed_sex_ploidy,
16-
validate_no_duplicate_variants,
17-
validate_sample_type,
18-
)
1912
from v03_pipeline.lib.misc.vets import annotate_vets
20-
from v03_pipeline.lib.model import CachedReferenceDatasetQuery
2113
from v03_pipeline.lib.model.environment import Env
2214
from v03_pipeline.lib.paths import (
23-
cached_reference_dataset_query_path,
2415
imported_callset_path,
25-
sex_check_table_path,
2616
valid_filters_path,
2717
)
2818
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
2919
from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask
30-
from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget, HailTableTask
31-
from v03_pipeline.lib.tasks.reference_data.updated_cached_reference_dataset_query import (
32-
UpdatedCachedReferenceDatasetQuery,
33-
)
34-
from v03_pipeline.lib.tasks.write_sex_check_table import WriteSexCheckTableTask
20+
from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget
3521

3622

3723
@luigi.util.inherits(BaseLoadingRunParams)
3824
class WriteImportedCallsetTask(BaseWriteTask):
3925
def complete(self) -> luigi.Target:
40-
if not self.force and super().complete():
41-
mt = hl.read_matrix_table(self.output().path)
42-
return hasattr(mt, 'sample_type') and hl.eval(
43-
self.sample_type.value == mt.sample_type,
44-
)
45-
return False
26+
return not self.force and super().complete()
4627

4728
def output(self) -> luigi.Target:
4829
return GCSorLocalTarget(
@@ -72,56 +53,11 @@ def requires(self) -> list[luigi.Task]:
7253
),
7354
),
7455
]
75-
if not self.skip_validation and self.dataset_type.can_run_validation:
76-
requirements = [
77-
*requirements,
78-
(
79-
self.clone(
80-
UpdatedCachedReferenceDatasetQuery,
81-
crdq=CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
82-
)
83-
if Env.REFERENCE_DATA_AUTO_UPDATE
84-
else HailTableTask(
85-
cached_reference_dataset_query_path(
86-
self.reference_genome,
87-
self.dataset_type,
88-
CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
89-
),
90-
),
91-
),
92-
]
93-
if (
94-
Env.CHECK_SEX_AND_RELATEDNESS
95-
and not self.skip_check_sex_and_relatedness
96-
and self.dataset_type.check_sex_and_relatedness
97-
):
98-
requirements = [
99-
*requirements,
100-
self.clone(WriteSexCheckTableTask),
101-
]
10256
return [
10357
*requirements,
10458
CallsetTask(self.callset_path),
10559
]
10660

107-
def additional_row_fields(self, mt):
108-
return {
109-
**(
110-
{'info.AF': hl.tarray(hl.tfloat64)}
111-
if not self.skip_check_sex_and_relatedness
112-
and self.dataset_type.check_sex_and_relatedness
113-
else {}
114-
),
115-
# this field is never required, the pipeline
116-
# will run smoothly even in its absence, but
117-
# will trigger special handling if it is present.
118-
**(
119-
{'info.CALIBRATION_SENSITIVITY': hl.tarray(hl.tstr)}
120-
if hasattr(mt, 'info') and hasattr(mt.info, 'CALIBRATION_SENSITIVITY')
121-
else {}
122-
),
123-
}
124-
12561
def create_table(self) -> hl.MatrixTable:
12662
mt = import_callset(
12763
self.callset_path,
@@ -146,64 +82,19 @@ def create_table(self) -> hl.MatrixTable:
14682
mt = select_relevant_fields(
14783
mt,
14884
self.dataset_type,
149-
self.additional_row_fields(mt),
150-
)
151-
# This validation isn't override-able. If a field is the wrong
152-
# type, the pipeline will likely hard-fail downstream.
153-
validate_imported_field_types(
154-
mt,
155-
self.dataset_type,
156-
self.additional_row_fields(mt),
85+
additional_row_fields(
86+
mt,
87+
self.dataset_type,
88+
self.skip_check_sex_and_relatedness,
89+
),
15790
)
15891
if self.dataset_type.has_multi_allelic_variants:
15992
mt = split_multi_hts(mt)
16093
# Special handling of variant-level filter annotation for VETs filters.
16194
# The annotations are present on the sample-level FT field but are
16295
# expected upstream on "filters".
16396
mt = annotate_vets(mt)
164-
if self.dataset_type.can_run_validation:
165-
# Rather than throwing an error, we silently remove invalid contigs.
166-
# This happens fairly often for AnVIL requests.
167-
mt = mt.filter_rows(
168-
hl.set(self.reference_genome.standard_contigs).contains(
169-
mt.locus.contig,
170-
),
171-
)
172-
if not self.skip_validation and self.dataset_type.can_run_validation:
173-
validate_allele_type(mt)
174-
validate_no_duplicate_variants(mt)
175-
validate_expected_contig_frequency(mt, self.reference_genome)
176-
coding_and_noncoding_ht = hl.read_table(
177-
cached_reference_dataset_query_path(
178-
self.reference_genome,
179-
self.dataset_type,
180-
CachedReferenceDatasetQuery.GNOMAD_CODING_AND_NONCODING_VARIANTS,
181-
),
182-
)
183-
validate_sample_type(
184-
mt,
185-
coding_and_noncoding_ht,
186-
self.reference_genome,
187-
self.sample_type,
188-
)
189-
if (
190-
Env.CHECK_SEX_AND_RELATEDNESS
191-
and not self.skip_check_sex_and_relatedness
192-
and self.dataset_type.check_sex_and_relatedness
193-
):
194-
sex_check_ht = hl.read_table(
195-
sex_check_table_path(
196-
self.reference_genome,
197-
self.dataset_type,
198-
self.callset_path,
199-
),
200-
)
201-
validate_imputed_sex_ploidy(
202-
mt,
203-
sex_check_ht,
204-
)
205-
return mt.annotate_globals(
97+
return mt.select_globals(
20698
callset_path=self.callset_path,
20799
filters_path=filters_path or hl.missing(hl.tstr),
208-
sample_type=self.sample_type.value,
209100
)

0 commit comments

Comments
 (0)