Skip to content

Commit ac4baa9

Browse files
committed
Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelines into dev
2 parents c126df8 + 0362aef commit ac4baa9

9 files changed

+673
-37
lines changed

v03_pipeline/lib/misc/lookup.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,35 @@ def remove_family_guids(
121121
)
122122

123123

124+
def remove_project(
    ht: hl.Table,
    project_guid: str,
) -> hl.Table:
    """Drop a single project from a lookup table.

    Removes ``project_guid`` from the ``project_guids`` and
    ``project_families`` globals, and removes the element at the same
    position from every row's ``project_stats`` array. The table is
    returned untouched when ``project_guid`` is not among its
    ``project_guids``.
    """
    # Evaluated eagerly so the membership test below runs in Python.
    current_guids = hl.eval(ht.globals.project_guids)
    if project_guid not in current_guids:
        return ht

    # Positions of every project other than the one being removed.
    kept_positions = (
        hl.enumerate(current_guids)
        .filter(lambda pair: pair[1] != project_guid)
        .map(lambda pair: pair[0])
    )
    ht = ht.annotate(
        project_stats=kept_positions.map(lambda pos: ht.project_stats[pos]),
    )
    remaining_guids = kept_positions.map(lambda pos: ht.project_guids[pos])
    remaining_families = hl.dict(
        ht.project_families.items().filter(lambda entry: entry[0] != project_guid),
    )
    return ht.annotate_globals(
        project_guids=remaining_guids,
        project_families=remaining_families,
    )
151+
152+
124153
def join_lookup_hts(
125154
ht: hl.Table,
126155
callset_ht: hl.Table,

v03_pipeline/lib/misc/lookup_test.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
compute_callset_lookup_ht,
77
join_lookup_hts,
88
remove_family_guids,
9+
remove_project,
910
)
1011
from v03_pipeline.lib.model import DatasetType
1112

@@ -224,6 +225,133 @@ def test_remove_new_callset_family_guids(self) -> None:
224225
],
225226
)
226227

228+
def test_remove_project(self) -> None:
    """Removing an absent project, then an existing one, leaves only the other project."""

    def stats(count: int) -> hl.Struct:
        # Every counter within a fixture entry carries the same value.
        return hl.Struct(
            ref_samples=count,
            heteroplasmic_samples=count,
            homoplasmic_samples=count,
        )

    stats_type = hl.tstruct(
        ref_samples=hl.tint32,
        heteroplasmic_samples=hl.tint32,
        homoplasmic_samples=hl.tint32,
    )
    row_ids = (0, 1)
    lookup_ht = hl.Table.parallelize(
        [
            {
                'id': row_id,
                'project_stats': [
                    [stats(0), stats(1), stats(2)],
                    [stats(3)],
                ],
            }
            for row_id in row_ids
        ],
        hl.tstruct(
            id=hl.tint32,
            project_stats=hl.tarray(hl.tarray(stats_type)),
        ),
        key='id',
        globals=hl.Struct(
            project_guids=['project_a', 'project_b'],
            project_families={'project_a': ['1', '2', '3'], 'project_b': ['4']},
        ),
    )
    # 'project_c' is not in the globals; 'project_a' is.
    for guid in ('project_c', 'project_a'):
        lookup_ht = remove_project(lookup_ht, guid)
    self.assertCountEqual(
        lookup_ht.globals.collect(),
        [
            hl.Struct(
                project_guids=['project_b'],
                project_families={'project_b': ['4']},
            ),
        ],
    )
    self.assertCountEqual(
        lookup_ht.collect(),
        [hl.Struct(id=row_id, project_stats=[[stats(3)]]) for row_id in row_ids],
    )
354+
227355
def test_join_lookup_hts_empty_table(self) -> None:
228356
ht = hl.Table.parallelize(
229357
[],

v03_pipeline/lib/tasks/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
from v03_pipeline.lib.tasks.update_lookup_table import (
55
UpdateLookupTableTask,
66
)
7+
from v03_pipeline.lib.tasks.update_lookup_table_with_deleted_project import (
8+
UpdateLookupTableWithDeletedProjectTask,
9+
)
710
from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask
11+
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_deleted_project import (
12+
UpdateVariantAnnotationsTableWithDeletedProjectTask,
13+
)
814
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import (
915
UpdateVariantAnnotationsTableWithNewSamplesTask,
1016
)
@@ -16,7 +22,9 @@
1622
__all__ = [
1723
'UpdateProjectTableTask',
1824
'UpdateLookupTableTask',
25+
'UpdateLookupTableWithDeletedProjectTask',
1926
'UpdateVariantAnnotationsTableWithNewSamplesTask',
27+
'UpdateVariantAnnotationsTableWithDeletedProjectTask',
2028
'WriteCachedReferenceDatasetQuery',
2129
'WriteMetadataForRunTask',
2230
'WriteProjectFamilyTablesTask',

v03_pipeline/lib/tasks/base/base_lookup_table_task.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import hail as hl
2+
import luigi
3+
4+
from v03_pipeline.lib.paths import lookup_table_path
5+
from v03_pipeline.lib.tasks.base.base_update_task import BaseUpdateTask
6+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
7+
8+
9+
class BaseLookupTableTask(BaseUpdateTask):
    """Base class for lookup-table tasks.

    Defines the output target and how an empty lookup table is
    initialized; subclasses supply the update logic.
    """

    def output(self) -> luigi.Target:
        """Return the target for the lookup table of this reference genome and dataset type."""
        table_path = lookup_table_path(self.reference_genome, self.dataset_type)
        return GCSorLocalTarget(table_path)

    def initialize_table(self) -> hl.Table:
        """Build an empty lookup table with the expected row schema and globals."""
        key_type = self.dataset_type.table_key_type(self.reference_genome)
        stat_fields = self.dataset_type.lookup_table_fields_and_genotype_filter_fns
        # project_stats is an array of arrays of per-field int32 counters.
        project_stats_type = hl.tarray(
            hl.tarray(
                hl.tstruct(**{field: hl.tint32 for field in stat_fields}),
            ),
        )
        row_type = hl.tstruct(**key_type, project_stats=project_stats_type)
        empty_globals = hl.Struct(
            project_guids=hl.empty_array(hl.tstr),
            project_families=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
            updates=hl.empty_set(hl.tstruct(callset=hl.tstr, project_guid=hl.tstr)),
        )
        return hl.Table.parallelize(
            [],
            row_type,
            key=key_type.fields,
            globals=empty_globals,
        )

v03_pipeline/lib/tasks/update_lookup_table.py

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,13 @@
88
remove_family_guids,
99
)
1010
from v03_pipeline.lib.model.constants import PROJECTS_EXCLUDED_FROM_LOOKUP
11-
from v03_pipeline.lib.paths import lookup_table_path
12-
from v03_pipeline.lib.tasks.base.base_update_task import BaseUpdateTask
13-
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
11+
from v03_pipeline.lib.tasks.base.base_lookup_table_task import BaseLookupTableTask
1412
from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import (
1513
WriteRemappedAndSubsettedCallsetTask,
1614
)
1715

1816

19-
class UpdateLookupTableTask(BaseUpdateTask):
17+
class UpdateLookupTableTask(BaseLookupTableTask):
2018
callset_paths = luigi.ListParameter()
2119
project_guids = luigi.ListParameter()
2220
project_remap_paths = luigi.ListParameter()
@@ -38,14 +36,6 @@ class UpdateLookupTableTask(BaseUpdateTask):
3836
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
3937
)
4038

41-
def output(self) -> luigi.Target:
42-
return GCSorLocalTarget(
43-
lookup_table_path(
44-
self.reference_genome,
45-
self.dataset_type,
46-
),
47-
)
48-
4939
def complete(self) -> bool:
5040
return (
5141
not self.force
@@ -106,31 +96,6 @@ def requires(self) -> list[luigi.Task]:
10696
)
10797
]
10898

109-
def initialize_table(self) -> hl.Table:
110-
key_type = self.dataset_type.table_key_type(self.reference_genome)
111-
return hl.Table.parallelize(
112-
[],
113-
hl.tstruct(
114-
**key_type,
115-
project_stats=hl.tarray(
116-
hl.tarray(
117-
hl.tstruct(
118-
**{
119-
field: hl.tint32
120-
for field in self.dataset_type.lookup_table_fields_and_genotype_filter_fns
121-
},
122-
),
123-
),
124-
),
125-
),
126-
key=key_type.fields,
127-
globals=hl.Struct(
128-
project_guids=hl.empty_array(hl.tstr),
129-
project_families=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
130-
updates=hl.empty_set(hl.tstruct(callset=hl.tstr, project_guid=hl.tstr)),
131-
),
132-
)
133-
13499
def update_table(self, ht: hl.Table) -> hl.Table:
135100
# NB: there's a chance this many hail operations blows the DAG compute stack
136101
# in an unfortunate way. Please keep an eye out!

v03_pipeline/lib/tasks/update_lookup_table_with_deleted_project.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import hail as hl
2+
import luigi
3+
4+
from v03_pipeline.lib.misc.lookup import (
5+
remove_project,
6+
)
7+
from v03_pipeline.lib.tasks.base.base_lookup_table_task import BaseLookupTableTask
8+
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_deleted_project import (
9+
UpdateVariantAnnotationsTableWithDeletedProjectTask,
10+
)
11+
12+
13+
class UpdateLookupTableWithDeletedProjectTask(BaseLookupTableTask):
    """Remove one project from the lookup table.

    Requires the matching variant annotations deleted-project task, which
    is given the same dataset type, sample type, reference genome and
    project guid.
    """

    project_guid = luigi.Parameter()

    def requires(self) -> luigi.Task:
        """Return the variant annotations deleted-project task for the same parameters."""
        upstream_params = {
            'dataset_type': self.dataset_type,
            'sample_type': self.sample_type,
            'reference_genome': self.reference_genome,
            'project_guid': self.project_guid,
        }
        return UpdateVariantAnnotationsTableWithDeletedProjectTask(**upstream_params)

    def complete(self) -> bool:
        """Report completion once no recorded update references the project.

        The stored table is only read when the parent's completeness check
        passes; otherwise the parent's result is returned as-is.
        """
        parent_complete = super().complete()
        if not parent_complete:
            return parent_complete
        lookup_ht = hl.read_table(self.output().path)
        project_recorded = lookup_ht.updates.project_guid.contains(
            self.project_guid,
        )
        return hl.eval(~project_recorded)

    def update_table(self, ht: hl.Table) -> hl.Table:
        """Remove the project from the table and from its ``updates`` global."""
        pruned_ht = remove_project(ht, self.project_guid)
        remaining_updates = pruned_ht.updates.filter(
            lambda update: update.project_guid != self.project_guid,
        )
        return pruned_ht.annotate_globals(updates=remaining_updates)

0 commit comments

Comments
 (0)