Skip to content

Commit 0812e75

Browse files
committed
Merge branch 'dev' of github.com:broadinstitute/seqr-loading-pipelines into benb/add_metadata_sex_and_relatedness_failures
2 parents 6dde833 + 127a074 commit 0812e75

12 files changed

+226
-20
lines changed

requirements.in

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
elasticsearch==7.9.1
22
google-api-python-client>=1.8.0
3-
hail==0.2.122
3+
hail==0.2.128
44
luigi>=3.4.0
55
gnomad==0.6.4
6+
google-cloud-storage>=2.14.0

requirements.txt

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ asttokens==2.4.1
1616
# via stack-data
1717
async-timeout==4.0.3
1818
# via aiohttp
19-
asyncinit==0.2.4
20-
# via hail
2119
attrs==23.1.0
2220
# via aiohttp
2321
avro==1.11.3
@@ -119,19 +117,19 @@ google-auth-httplib2==0.1.1
119117
# via google-api-python-client
120118
google-auth-oauthlib==0.8.0
121119
# via hail
122-
google-cloud-core==2.3.3
120+
google-cloud-core==2.4.1
123121
# via google-cloud-storage
124-
google-cloud-storage==2.13.0
125-
# via hail
122+
google-cloud-storage==2.14.0
123+
# via -r requirements.in
126124
google-crc32c==1.5.0
127125
# via
128126
# google-cloud-storage
129127
# google-resumable-media
130-
google-resumable-media==2.6.0
128+
google-resumable-media==2.7.0
131129
# via google-cloud-storage
132130
googleapis-common-protos==1.61.0
133131
# via google-api-core
134-
hail==0.2.122
132+
hail==0.2.128
135133
# via -r requirements.in
136134
hdbscan==0.8.33
137135
# via gnomad
@@ -254,9 +252,7 @@ pygments==2.17.2
254252
# ipython
255253
# rich
256254
pyjwt[crypto]==2.8.0
257-
# via
258-
# msal
259-
# pyjwt
255+
# via msal
260256
pyparsing==3.1.1
261257
# via httplib2
262258
pyspark==3.3.3

v03_pipeline/lib/annotations/fields_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ def setUp(self) -> None:
3131
),
3232
)
3333

34+
@patch('v03_pipeline.lib.vep.validate_vep_config_reference_genome')
3435
@patch('v03_pipeline.lib.vep.hl.vep')
35-
def test_get_formatting_fields(self, mock_vep: Mock) -> None:
36+
def test_get_formatting_fields(self, mock_vep: Mock, mock_validate: Mock) -> None:
3637
ht = hl.read_table(TEST_COMBINED_1)
3738
mock_vep.return_value = ht.annotate(vep=MOCK_VEP_DATA)
39+
mock_validate.return_value = None
3840
ht = run_vep(
3941
ht,
4042
DatasetType.SNV_INDEL,
43+
ReferenceGenome.GRCh38,
4144
None,
4245
)
4346
ht = ht.annotate(rsid='abcd')

v03_pipeline/lib/annotations/shared_test.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@
1010

1111

1212
class SharedAnnotationsTest(unittest.TestCase):
13+
@patch('v03_pipeline.lib.vep.validate_vep_config_reference_genome')
1314
@patch('v03_pipeline.lib.vep.hl.vep')
14-
def test_sorted_transcript_consequences(self, mock_vep: Mock) -> None:
15+
def test_sorted_transcript_consequences(
16+
self,
17+
mock_vep: Mock,
18+
mock_validate: Mock,
19+
) -> None:
1520
ht = hl.Table.parallelize(
1621
[
1722
{
@@ -30,9 +35,11 @@ def test_sorted_transcript_consequences(self, mock_vep: Mock) -> None:
3035
key=['locus', 'alleles'],
3136
)
3237
mock_vep.return_value = ht.annotate(vep=MOCK_VEP_DATA)
38+
mock_validate.return_value = None
3339
ht = run_vep(
3440
ht,
3541
DatasetType.SNV_INDEL,
42+
ReferenceGenome.GRCh38,
3643
None,
3744
)
3845
ht = ht.select(

v03_pipeline/lib/paths.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,10 @@ def metadata_for_run_path(
7979
run_id: str,
8080
) -> str:
8181
return os.path.join(
82-
_v03_pipeline_prefix(
83-
Env.HAIL_SEARCH_DATA,
82+
runs_path(
8483
reference_genome,
8584
dataset_type,
8685
),
87-
'runs',
8886
run_id,
8987
'metadata.json',
9088
)
@@ -140,6 +138,20 @@ def remapped_and_subsetted_callset_path(
140138
)
141139

142140

141+
def runs_path(
    reference_genome: ReferenceGenome,
    dataset_type: DatasetType,
) -> str:
    """Return the directory under which all pipeline run outputs live
    for the given reference genome and dataset type.
    """
    prefix = _v03_pipeline_prefix(
        Env.HAIL_SEARCH_DATA,
        reference_genome,
        dataset_type,
    )
    return os.path.join(prefix, 'runs')
153+
154+
143155
def sample_lookup_table_path(
144156
reference_genome: ReferenceGenome,
145157
dataset_type: DatasetType,

v03_pipeline/lib/tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from v03_pipeline.lib.tasks.delete_old_runs import DeleteOldRunsTask
12
from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask
23
from v03_pipeline.lib.tasks.update_sample_lookup_table import (
34
UpdateSampleLookupTableTask,
@@ -11,6 +12,7 @@
1112
)
1213

1314
__all__ = [
15+
'DeleteOldRunsTask',
1416
'UpdateProjectTableTask',
1517
'UpdateSampleLookupTableTask',
1618
'UpdateVariantAnnotationsTableWithNewSamplesTask',
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import os
2+
3+
import hail as hl
4+
5+
from v03_pipeline.lib.paths import runs_path
6+
from v03_pipeline.lib.tasks.base.base_hail_table_task import BaseHailTableTask
7+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
8+
9+
MIN_SUCCESSFUL_RUNS = 10
10+
11+
12+
class DeleteOldRunsTask(BaseHailTableTask):
    """Prune old pipeline run directories.

    Keeps every run directory from the MIN_SUCCESSFUL_RUNS-th most recent
    successful run onward (successful or not) and deletes everything older.
    If there are fewer than MIN_SUCCESSFUL_RUNS successful runs, nothing is
    deleted.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Luigi polls complete() to decide whether to run; flip this flag
        # once run() has finished so the task is not re-scheduled.
        self._complete = False

    def complete(self) -> bool:
        return self._complete

    def run(self) -> None:
        runs_dir = runs_path(self.reference_genome, self.dataset_type)
        # Run dir names appear to embed timestamps (e.g. manual__<date>),
        # so a lexicographic sort yields oldest-first order.
        all_run_dirs = sorted(
            entry['path']
            for entry in hl.hadoop_ls(runs_dir)
            if entry['is_dir']
        )
        # A run is "successful" iff it contains a _SUCCESS sentinel file.
        successful_run_dirs = [
            path
            for path in all_run_dirs
            if hl.hadoop_exists(os.path.join(path, '_SUCCESS'))
        ]
        if len(successful_run_dirs) < MIN_SUCCESSFUL_RUNS:
            self._complete = True
            return

        # Delete run dirs until we encounter the first of the N successful
        # runs to keep.
        keep_from = all_run_dirs.index(
            successful_run_dirs[-MIN_SUCCESSFUL_RUNS],
        )
        for stale_dir in all_run_dirs[:keep_from]:
            # Strip any 'file:' scheme so local paths resolve on disk.
            GCSorLocalTarget(stale_dir.replace('file:', '')).remove()
        self._complete = True
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import os
2+
from pathlib import Path
3+
4+
import hail as hl
5+
import luigi.worker
6+
7+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
8+
from v03_pipeline.lib.paths import runs_path
9+
from v03_pipeline.lib.tasks.delete_old_runs import DeleteOldRunsTask
10+
from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
11+
12+
13+
class DeleteOldRunsTaskTest(MockedDatarootTestCase):
    """Tests for DeleteOldRunsTask's pruning of old pipeline run directories.

    Fixes over the original: the task local was misnamed
    `write_metadata_for_run_task` (copy-pasted from another test), and the
    run-dir fixture creation plus luigi worker boilerplate was duplicated
    verbatim in both test methods; both are factored into private helpers.
    """

    @staticmethod
    def _runs_dir() -> str:
        # All fixtures live under the GRCh38 / SNV_INDEL runs directory.
        return runs_path(ReferenceGenome.GRCh38, DatasetType.SNV_INDEL)

    def _write_run_dirs(self, run_dirs, incomplete=frozenset()) -> None:
        """Create each run dir, touching a _SUCCESS sentinel for every run
        except those named in `incomplete`.
        """
        for run_dir in run_dirs:
            run_dir_path = Path(os.path.join(self._runs_dir(), run_dir))
            run_dir_path.mkdir(parents=True, exist_ok=True)
            if run_dir not in incomplete:
                (run_dir_path / '_SUCCESS').touch()

    def _run_task(self) -> DeleteOldRunsTask:
        """Schedule and run a DeleteOldRunsTask, returning it for asserts."""
        delete_old_runs_task = DeleteOldRunsTask(
            reference_genome=ReferenceGenome.GRCh38,
            dataset_type=DatasetType.SNV_INDEL,
            sample_type=SampleType.WGS,
        )
        worker = luigi.worker.Worker()
        worker.add(delete_old_runs_task)
        worker.run()
        return delete_old_runs_task

    def test_too_few_successful_runs(self) -> None:
        # 7 successful runs < MIN_SUCCESSFUL_RUNS (10): nothing is deleted.
        self._write_run_dirs(
            [
                'manual__2024-01-05',
                'manual__2024-01-06',
                'manual__2024-01-07',
                'manual__2024-01-08',
                'manual__2024-01-09',
                'manual__2024-01-10',
                'manual__2024-01-11',
            ],
        )
        task = self._run_task()
        self.assertTrue(task.complete())
        self.assertEqual(
            len(hl.hadoop_ls(self._runs_dir())),
            7,
        )

    def test_leave_incomplete_runs(self) -> None:
        # 13 runs, 11 successful: only the single run older than the 10th
        # most recent successful run is deleted; incomplete runs newer than
        # that boundary are left in place.
        self._write_run_dirs(
            [
                'manual__2024-01-05',
                'manual__2024-01-06',
                'manual__2024-01-07',
                'manual__2024-01-08',
                'manual__2024-01-09',
                'manual__2024-01-10',
                'manual__2024-01-11',
                'manual__2024-01-12',
                'manual__2024-01-13',
                'manual__2024-01-14',
                'manual__2024-01-15',
                'manual__2024-01-16',
                'manual__2024-01-17',
            ],
            # Force a couple of incomplete runs
            incomplete={'manual__2024-01-13', 'manual__2024-01-16'},
        )
        task = self._run_task()
        self.assertTrue(task.complete())
        self.assertEqual(
            len(hl.hadoop_ls(self._runs_dir())),
            12,
        )
        self.assertFalse(
            hl.hadoop_exists(
                os.path.join(self._runs_dir(), 'manual__2024-01-05'),
            ),
        )

v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def update_table(self, ht: hl.Table) -> hl.Table:
215215
new_variants_ht = run_vep(
216216
new_variants_ht,
217217
self.dataset_type,
218+
self.reference_genome,
218219
self.vep_config_json_path,
219220
)
220221

v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,17 @@ def test_missing_interval_reference(self, mock_update_rdc_task) -> None:
186186
)
187187
@patch.object(ReferenceGenome, 'standard_contigs', new_callable=PropertyMock)
188188
@patch('v03_pipeline.lib.vep.hl.vep')
189+
@patch('v03_pipeline.lib.vep.validate_vep_config_reference_genome')
189190
def test_multiple_update_vat(
190191
self,
192+
mock_vep_validate: Mock,
191193
mock_vep: Mock,
192194
mock_standard_contigs: Mock,
193195
mock_update_rdc_task: Mock,
194196
) -> None:
195197
mock_update_rdc_task.return_value = MockCompleteTask()
196198
mock_vep.side_effect = lambda ht, **_: ht.annotate(vep=MOCK_VEP_DATA)
199+
mock_vep_validate.return_value = None
197200
mock_standard_contigs.return_value = {'chr1'}
198201
# This creates a mock validation table with 1 coding and 1 non-coding variant
199202
# explicitly chosen from the VCF.
@@ -504,13 +507,16 @@ def test_multiple_update_vat(
504507
)
505508

506509
@patch('v03_pipeline.lib.vep.hl.vep')
510+
@patch('v03_pipeline.lib.vep.validate_vep_config_reference_genome')
507511
def test_update_vat_grch37(
508512
self,
513+
mock_vep_validate: Mock,
509514
mock_vep: Mock,
510515
mock_update_rdc_task: Mock,
511516
) -> None:
512517
mock_update_rdc_task.return_value = MockCompleteTask()
513518
mock_vep.side_effect = lambda ht, **_: ht.annotate(vep=MOCK_VEP_DATA)
519+
mock_vep_validate.return_value = None
514520
worker = luigi.worker.Worker()
515521
uvatwns_task = UpdateVariantAnnotationsTableWithNewSamplesTask(
516522
reference_genome=ReferenceGenome.GRCh37,
@@ -551,8 +557,10 @@ def test_update_vat_grch37(
551557

552558
@patch('v03_pipeline.lib.model.reference_dataset_collection.Env')
553559
@patch('v03_pipeline.lib.vep.hl.vep')
560+
@patch('v03_pipeline.lib.vep.validate_vep_config_reference_genome')
554561
def test_update_vat_without_accessing_private_datasets(
555562
self,
563+
mock_vep_validate: Mock,
556564
mock_vep: Mock,
557565
mock_rdc_env: Mock,
558566
mock_update_rdc_task: Mock,
@@ -567,6 +575,7 @@ def test_update_vat_without_accessing_private_datasets(
567575
)
568576
mock_rdc_env.ACCESS_PRIVATE_REFERENCE_DATASETS = False
569577
mock_vep.side_effect = lambda ht, **_: ht.annotate(vep=MOCK_VEP_DATA)
578+
mock_vep_validate.return_value = None
570579
worker = luigi.worker.Worker()
571580
uvatwns_task = UpdateVariantAnnotationsTableWithNewSamplesTask(
572581
reference_genome=ReferenceGenome.GRCh38,

0 commit comments

Comments
 (0)