Skip to content

Commit 057cb4d

Browse files
authored
Delete "family" tasks (#761)
* Delete project tasks * cleanup * ruff format * well * rename * hacking away * almost there! * ruff * Fix missing updates change * ruff * Remove debug code * remove bad merge * more precision in test * project table * allow for missing project * remove some unnecessary checks * test already deleted family * add comment
1 parent ac4baa9 commit 057cb4d

10 files changed

+751
-35
lines changed

v03_pipeline/lib/misc/lookup_test.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def test_compute_callset_lookup_ht(self) -> None:
8989
],
9090
)
9191

92-
def test_remove_new_callset_family_guids(self) -> None:
92+
def test_remove_family_guids(self) -> None:
9393
lookup_ht = hl.Table.parallelize(
9494
[
9595
{
@@ -179,6 +179,11 @@ def test_remove_new_callset_family_guids(self) -> None:
179179
'project_a',
180180
hl.set(['3', '1']),
181181
)
182+
lookup_ht = remove_family_guids(
183+
lookup_ht,
184+
'project_a',
185+
hl.set(['1']),
186+
)
182187
lookup_ht = remove_family_guids(
183188
lookup_ht,
184189
'project_b',
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import hail as hl
2+
import luigi
3+
4+
from v03_pipeline.lib.paths import project_table_path
5+
from v03_pipeline.lib.tasks.base.base_update_task import BaseUpdateTask
6+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
7+
8+
9+
class BaseProjectTableTask(BaseUpdateTask):
10+
project_guid = luigi.Parameter()
11+
12+
def output(self) -> luigi.Target:
13+
return GCSorLocalTarget(
14+
project_table_path(
15+
self.reference_genome,
16+
self.dataset_type,
17+
self.project_guid,
18+
),
19+
)
20+
21+
def initialize_table(self) -> hl.Table:
22+
key_type = self.dataset_type.table_key_type(self.reference_genome)
23+
return hl.Table.parallelize(
24+
[],
25+
hl.tstruct(
26+
**key_type,
27+
filters=hl.tset(hl.tstr),
28+
# NB: entries is missing here because it is untyped
29+
# until we read the type off of the first callset aggregation.
30+
),
31+
key=key_type.fields,
32+
globals=hl.Struct(
33+
family_guids=hl.empty_array(hl.tstr),
34+
family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
35+
updates=hl.empty_set(hl.tstr),
36+
),
37+
)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import hail as hl
2+
import luigi
3+
4+
from v03_pipeline.lib.misc.lookup import (
5+
remove_family_guids,
6+
)
7+
from v03_pipeline.lib.tasks.base.base_lookup_table_task import BaseLookupTableTask
8+
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_deleted_families import (
9+
UpdateVariantAnnotationsTableWithDeletedFamiliesTask,
10+
)
11+
12+
13+
class UpdateLookupTableWithDeletedFamiliesTask(BaseLookupTableTask):
14+
project_guid = luigi.Parameter()
15+
family_guids = luigi.ListParameter()
16+
17+
def requires(self) -> luigi.Task:
18+
# We require updating the annotations table first so that
19+
# we are able to use the lookup table to determine which rows
20+
# of the annotations table require re-annotation.
21+
return UpdateVariantAnnotationsTableWithDeletedFamiliesTask(
22+
dataset_type=self.dataset_type,
23+
sample_type=self.sample_type,
24+
reference_genome=self.reference_genome,
25+
project_guid=self.project_guid,
26+
family_guids=self.family_guids,
27+
)
28+
29+
def complete(self) -> bool:
30+
return super().complete() and hl.eval(
31+
hl.bind(
32+
lambda family_guids: (
33+
hl.is_missing(family_guids) # The project itself is missing
34+
| hl.all(
35+
hl.array(list(self.family_guids)).map(
36+
lambda family_guid: ~family_guids.contains(family_guid),
37+
),
38+
)
39+
),
40+
hl.set(
41+
hl.read_table(self.output().path).globals.project_families.get(
42+
self.project_guid,
43+
),
44+
),
45+
),
46+
)
47+
48+
def update_table(self, ht: hl.Table) -> hl.Table:
49+
return remove_family_guids(
50+
ht,
51+
self.project_guid,
52+
hl.set(list(self.family_guids)),
53+
)
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
from unittest import mock
2+
3+
import hail as hl
4+
import luigi.worker
5+
6+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
7+
from v03_pipeline.lib.tasks.update_lookup_table_with_deleted_families import (
8+
UpdateLookupTableWithDeletedFamiliesTask,
9+
)
10+
from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask
11+
from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
12+
13+
14+
@mock.patch(
15+
'v03_pipeline.lib.tasks.update_lookup_table_with_deleted_families.UpdateVariantAnnotationsTableWithDeletedFamiliesTask',
16+
)
17+
class UpdateLookupTableWithDeletedProjectTaskTest(MockedDatarootTestCase):
18+
def test_delete_project_empty_table(
19+
self,
20+
mock_update_lookup_table_task: mock.Mock,
21+
) -> None:
22+
mock_update_lookup_table_task.return_value = MockCompleteTask()
23+
worker = luigi.worker.Worker()
24+
task = UpdateLookupTableWithDeletedFamiliesTask(
25+
dataset_type=DatasetType.SNV_INDEL,
26+
sample_type=SampleType.WGS,
27+
reference_genome=ReferenceGenome.GRCh38,
28+
project_guid='R0555_seqr_demo',
29+
family_guids=['abc'],
30+
)
31+
worker.add(task)
32+
worker.run()
33+
self.assertTrue(task.output().exists())
34+
self.assertTrue(task.complete())
35+
ht = hl.read_table(task.output().path)
36+
self.assertEqual(
37+
ht.globals.collect(),
38+
[
39+
hl.Struct(
40+
project_guids=[],
41+
project_families={},
42+
updates=set(),
43+
),
44+
],
45+
)
46+
self.assertEqual(ht.collect(), [])
47+
48+
@mock.patch(
49+
'v03_pipeline.lib.tasks.update_lookup_table_with_deleted_families.UpdateLookupTableWithDeletedFamiliesTask.initialize_table',
50+
)
51+
def test_delete_project(
52+
self,
53+
mock_initialize_table: mock.Mock,
54+
mock_update_lookup_table_task: mock.Mock,
55+
) -> None:
56+
mock_update_lookup_table_task.return_value = MockCompleteTask()
57+
mock_initialize_table.return_value = hl.Table.parallelize(
58+
[
59+
{
60+
'id': 0,
61+
'project_stats': [
62+
[
63+
hl.Struct(
64+
ref_samples=0,
65+
heteroplasmic_samples=0,
66+
homoplasmic_samples=0,
67+
),
68+
hl.Struct(
69+
ref_samples=1,
70+
heteroplasmic_samples=1,
71+
homoplasmic_samples=1,
72+
),
73+
hl.Struct(
74+
ref_samples=2,
75+
heteroplasmic_samples=2,
76+
homoplasmic_samples=2,
77+
),
78+
],
79+
[
80+
hl.Struct(
81+
ref_samples=3,
82+
heteroplasmic_samples=3,
83+
homoplasmic_samples=3,
84+
),
85+
],
86+
],
87+
},
88+
{
89+
'id': 1,
90+
'project_stats': [
91+
[
92+
hl.Struct(
93+
ref_samples=0,
94+
heteroplasmic_samples=0,
95+
homoplasmic_samples=0,
96+
),
97+
hl.Struct(
98+
ref_samples=1,
99+
heteroplasmic_samples=1,
100+
homoplasmic_samples=1,
101+
),
102+
hl.Struct(
103+
ref_samples=2,
104+
heteroplasmic_samples=2,
105+
homoplasmic_samples=2,
106+
),
107+
],
108+
[
109+
hl.Struct(
110+
ref_samples=3,
111+
heteroplasmic_samples=3,
112+
homoplasmic_samples=3,
113+
),
114+
],
115+
],
116+
},
117+
],
118+
hl.tstruct(
119+
id=hl.tint32,
120+
project_stats=hl.tarray(
121+
hl.tarray(
122+
hl.tstruct(
123+
ref_samples=hl.tint32,
124+
heteroplasmic_samples=hl.tint32,
125+
homoplasmic_samples=hl.tint32,
126+
),
127+
),
128+
),
129+
),
130+
key='id',
131+
globals=hl.Struct(
132+
project_guids=['project_a', 'project_b'],
133+
project_families={'project_a': ['1', '2', '3'], 'project_b': ['4']},
134+
updates={
135+
hl.Struct(project_guid='project_a', callset='abc'),
136+
hl.Struct(project_guid='project_b', callset='abc'),
137+
},
138+
),
139+
)
140+
worker = luigi.worker.Worker()
141+
task = UpdateLookupTableWithDeletedFamiliesTask(
142+
dataset_type=DatasetType.SNV_INDEL,
143+
sample_type=SampleType.WGS,
144+
reference_genome=ReferenceGenome.GRCh38,
145+
project_guid='project_a',
146+
family_guids=['1', '3'],
147+
)
148+
worker.add(task)
149+
worker.run()
150+
self.assertTrue(task.output().exists())
151+
self.assertTrue(task.complete())
152+
ht = hl.read_table(task.output().path)
153+
self.assertEqual(
154+
ht.globals.collect(),
155+
[
156+
hl.Struct(
157+
project_guids=['project_a', 'project_b'],
158+
project_families={'project_a': ['2'], 'project_b': ['4']},
159+
updates={
160+
hl.Struct(project_guid='project_a', callset='abc'),
161+
hl.Struct(project_guid='project_b', callset='abc'),
162+
},
163+
),
164+
],
165+
)
166+
self.assertEqual(
167+
ht.collect(),
168+
[
169+
hl.Struct(
170+
id=0,
171+
project_stats=[
172+
[
173+
hl.Struct(
174+
ref_samples=1,
175+
heteroplasmic_samples=1,
176+
homoplasmic_samples=1,
177+
),
178+
],
179+
[
180+
hl.Struct(
181+
ref_samples=3,
182+
heteroplasmic_samples=3,
183+
homoplasmic_samples=3,
184+
),
185+
],
186+
],
187+
),
188+
hl.Struct(
189+
id=1,
190+
project_stats=[
191+
[
192+
hl.Struct(
193+
ref_samples=1,
194+
heteroplasmic_samples=1,
195+
homoplasmic_samples=1,
196+
),
197+
],
198+
[
199+
hl.Struct(
200+
ref_samples=3,
201+
heteroplasmic_samples=3,
202+
homoplasmic_samples=3,
203+
),
204+
],
205+
],
206+
),
207+
],
208+
)

v03_pipeline/lib/tasks/update_project_table.py

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,14 @@
77
join_family_entries_hts,
88
remove_family_guids,
99
)
10-
from v03_pipeline.lib.paths import project_table_path
11-
from v03_pipeline.lib.tasks.base.base_update_task import BaseUpdateTask
12-
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
10+
from v03_pipeline.lib.tasks.base.base_project_table_task import BaseProjectTableTask
1311
from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import (
1412
WriteRemappedAndSubsettedCallsetTask,
1513
)
1614

1715

18-
class UpdateProjectTableTask(BaseUpdateTask):
16+
class UpdateProjectTableTask(BaseProjectTableTask):
1917
callset_path = luigi.Parameter()
20-
project_guid = luigi.Parameter()
2118
project_remap_path = luigi.Parameter()
2219
project_pedigree_path = luigi.Parameter()
2320
ignore_missing_samples_when_subsetting = luigi.BoolParameter(
@@ -41,15 +38,6 @@ class UpdateProjectTableTask(BaseUpdateTask):
4138
description='Is this a fully joint-called callset.',
4239
)
4340

44-
def output(self) -> luigi.Target:
45-
return GCSorLocalTarget(
46-
project_table_path(
47-
self.reference_genome,
48-
self.dataset_type,
49-
self.project_guid,
50-
),
51-
)
52-
5341
def complete(self) -> bool:
5442
return (
5543
not self.force
@@ -76,24 +64,6 @@ def requires(self) -> luigi.Task:
7664
False,
7765
)
7866

79-
def initialize_table(self) -> hl.Table:
80-
key_type = self.dataset_type.table_key_type(self.reference_genome)
81-
return hl.Table.parallelize(
82-
[],
83-
hl.tstruct(
84-
**key_type,
85-
filters=hl.tset(hl.tstr),
86-
# NB: entries is missing here because it is untyped
87-
# until we read the type off of the first callset aggregation.
88-
),
89-
key=key_type.fields,
90-
globals=hl.Struct(
91-
family_guids=hl.empty_array(hl.tstr),
92-
family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
93-
updates=hl.empty_set(hl.tstr),
94-
),
95-
)
96-
9767
def update_table(self, ht: hl.Table) -> hl.Table:
9868
callset_mt = hl.read_matrix_table(self.input().path)
9969
callset_ht = compute_callset_family_entries_ht(

0 commit comments

Comments
 (0)