Skip to content

Commit 03b2de2

Browse files
committed
Chain operations
1 parent 4798b92 commit 03b2de2

File tree

3 files changed

+112
-79
lines changed

3 files changed

+112
-79
lines changed

v03_pipeline/lib/tasks/update_lookup_table.py

Lines changed: 91 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
join_lookup_hts,
77
remove_family_guids,
88
)
9+
from v03_pipeline.lib.misc.util import callset_project_pairs
910
from v03_pipeline.lib.model.constants import PROJECTS_EXCLUDED_FROM_LOOKUP
1011
from v03_pipeline.lib.paths import lookup_table_path
1112
from v03_pipeline.lib.tasks.base.base_update_task import BaseUpdateTask
@@ -16,10 +17,10 @@
1617

1718

1819
class UpdateLookupTableTask(BaseUpdateTask):
19-
callset_path = luigi.Parameter()
20-
project_guid = luigi.Parameter()
21-
project_remap_path = luigi.Parameter()
22-
project_pedigree_path = luigi.Parameter()
20+
callset_paths = luigi.Parameter()
21+
project_guids = luigi.ListParameter()
22+
project_remap_paths = luigi.ListParameter()
23+
project_pedigree_paths = luigi.ListParameter()
2324
ignore_missing_samples_when_subsetting = luigi.BoolParameter(
2425
default=False,
2526
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
@@ -43,27 +44,58 @@ def output(self) -> luigi.Target:
4344

4445
def complete(self) -> bool:
4546
return super().complete() and hl.eval(
46-
hl.read_table(self.output().path).updates.contains(
47-
hl.Struct(
48-
callset=self.callset_path,
49-
project_guid=self.project_guid,
47+
hl.bind(
48+
lambda updates: hl.all(
49+
[
50+
updates.contains(
51+
hl.Struct(
52+
callset=callset_path,
53+
project_guid=project_guid,
54+
),
55+
)
56+
for (
57+
callset_path,
58+
project_guid,
59+
_,
60+
_,
61+
) in callset_project_pairs(
62+
self.callset_paths,
63+
self.project_guids,
64+
self.project_remap_paths,
65+
self.project_pedigree_paths,
66+
)
67+
],
5068
),
69+
hl.read_table(self.output().path).updates,
5170
),
5271
)
5372

54-
def requires(self) -> luigi.Task:
55-
return WriteRemappedAndSubsettedCallsetTask(
56-
self.reference_genome,
57-
self.dataset_type,
58-
self.sample_type,
59-
self.callset_path,
60-
self.project_guid,
61-
self.project_remap_path,
62-
self.project_pedigree_path,
63-
self.ignore_missing_samples_when_subsetting,
64-
self.ignore_missing_samples_when_remapping,
65-
self.validate,
66-
)
73+
def requires(self) -> list[luigi.Task]:
74+
return [
75+
WriteRemappedAndSubsettedCallsetTask(
76+
self.reference_genome,
77+
self.dataset_type,
78+
self.sample_type,
79+
callset_path,
80+
project_guid,
81+
project_remap_path,
82+
project_pedigree_path,
83+
self.ignore_missing_samples_when_subsetting,
84+
self.ignore_missing_samples_when_remapping,
85+
self.validate,
86+
)
87+
for (
88+
callset_path,
89+
project_guid,
90+
project_remap_path,
91+
project_pedigree_path,
92+
) in callset_project_pairs(
93+
self.callset_paths,
94+
self.project_guids,
95+
self.project_remap_paths,
96+
self.project_pedigree_paths,
97+
)
98+
]
6799

68100
def initialize_table(self) -> hl.Table:
69101
key_type = self.dataset_type.table_key_type(self.reference_genome)
@@ -91,34 +123,44 @@ def initialize_table(self) -> hl.Table:
91123
)
92124

93125
def update_table(self, ht: hl.Table) -> hl.Table:
94-
if self.project_guid in PROJECTS_EXCLUDED_FROM_LOOKUP:
95-
return ht.annotate_globals(
96-
updates=ht.updates.add(
97-
hl.Struct(
98-
callset=self.callset_path,
99-
project_guid=self.project_guid,
126+
# NB: there's a chance this many hail operations blows the DAG compute stack
127+
# in an unfortunate way. Please keep an eye out!
128+
for i, (callset_path, project_guid, _, _) in enumerate(callset_project_pairs(
129+
self.callset_paths,
130+
self.project_guids,
131+
self.project_remap_paths,
132+
self.project_pedigree_paths,
133+
)):
134+
if project_guid in PROJECTS_EXCLUDED_FROM_LOOKUP:
135+
ht = ht.annotate_globals(
136+
updates=ht.updates.add(
137+
hl.Struct(
138+
callset=callset_path,
139+
project_guid=project_guid,
140+
),
100141
),
142+
)
143+
continue
144+
callset_mt = hl.read_matrix_table(self.input()[i].path)
145+
ht = remove_family_guids(
146+
ht,
147+
project_guid,
148+
callset_mt.index_globals().family_samples.key_set(),
149+
)
150+
callset_ht = compute_callset_lookup_ht(
151+
self.dataset_type,
152+
callset_mt,
153+
project_guid,
154+
)
155+
ht = join_lookup_hts(
156+
ht,
157+
callset_ht,
158+
)
159+
ht = ht.select_globals(
160+
project_guids=ht.project_guids,
161+
project_families=ht.project_families,
162+
updates=ht.updates.add(
163+
hl.Struct(callset=self.callset_path, project_guid=self.project_guid),
101164
),
102165
)
103-
callset_mt = hl.read_matrix_table(self.input().path)
104-
ht = remove_family_guids(
105-
ht,
106-
self.project_guid,
107-
callset_mt.index_globals().family_samples.key_set(),
108-
)
109-
callset_ht = compute_callset_lookup_ht(
110-
self.dataset_type,
111-
callset_mt,
112-
self.project_guid,
113-
)
114-
ht = join_lookup_hts(
115-
ht,
116-
callset_ht,
117-
)
118-
return ht.select_globals(
119-
project_guids=ht.project_guids,
120-
project_families=ht.project_families,
121-
updates=ht.updates.add(
122-
hl.Struct(callset=self.callset_path, project_guid=self.project_guid),
123-
),
124-
)
166+
return ht

v03_pipeline/lib/tasks/update_lookup_table_test.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ def test_skip_update_lookup_table_task(self) -> None:
1919
reference_genome=ReferenceGenome.GRCh38,
2020
dataset_type=DatasetType.SNV_INDEL,
2121
sample_type=SampleType.WGS,
22-
callset_path=TEST_VCF,
23-
project_guid='R0555_seqr_demo', # a project excluded from the lookup table
24-
project_remap_path=TEST_REMAP,
25-
project_pedigree_path=TEST_PEDIGREE_3,
22+
callset_paths=[TEST_VCF],
23+
project_guids=[
24+
'R0555_seqr_demo',
25+
], # a project excluded from the lookup table
26+
project_remap_paths=[TEST_REMAP],
27+
project_pedigree_paths=[TEST_PEDIGREE_3],
2628
validate=False,
2729
)
2830
worker.add(uslt_task)
@@ -50,10 +52,10 @@ def test_update_lookup_table_task(self) -> None:
5052
reference_genome=ReferenceGenome.GRCh38,
5153
dataset_type=DatasetType.SNV_INDEL,
5254
sample_type=SampleType.WGS,
53-
callset_path=TEST_VCF,
54-
project_guid='R0113_test_project',
55-
project_remap_path=TEST_REMAP,
56-
project_pedigree_path=TEST_PEDIGREE_3,
55+
callset_path=[TEST_VCF],
56+
project_guid=['R0113_test_project'],
57+
project_remap_path=[TEST_REMAP],
58+
project_pedigree_path=[TEST_PEDIGREE_3],
5759
validate=False,
5860
)
5961
worker.add(uslt_task)

v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,24 @@ class UpdateVariantAnnotationsTableWithNewSamplesTask(BaseVariantAnnotationsTabl
4949
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
5050
)
5151
liftover_ref_path = luigi.OptionalParameter(
52-
default='gs://hail-common/references/grch38_to_grch37.over.chain.gz',
53-
description='Path to GRCh38 to GRCh37 coordinates file',
52+
default="gs://hail-common/references/grch38_to_grch37.over.chain.gz",
53+
description="Path to GRCh38 to GRCh37 coordinates file",
5454
)
5555

5656
@property
5757
def other_annotation_dependencies(self) -> dict[str, hl.Table]:
5858
annotation_dependencies = {}
5959
if self.dataset_type.has_lookup_table:
60-
annotation_dependencies['lookup_ht'] = hl.read_table(
60+
annotation_dependencies["lookup_ht"] = hl.read_table(
6161
lookup_table_path(
6262
self.reference_genome,
6363
self.dataset_type,
6464
),
6565
)
6666

6767
if self.dataset_type.has_gencode_mapping:
68-
annotation_dependencies['gencode_mapping'] = hl.literal(
69-
load_gencode(GENCODE_RELEASE, ''),
68+
annotation_dependencies["gencode_mapping"] = hl.literal(
69+
load_gencode(GENCODE_RELEASE, ""),
7070
)
7171
return annotation_dependencies
7272

@@ -89,25 +89,14 @@ def requires(self) -> list[luigi.Task]:
8989
self.reference_genome,
9090
self.dataset_type,
9191
self.sample_type,
92-
callset_path,
93-
project_guid,
94-
project_remap_path,
95-
project_pedigree_path,
96-
self.ignore_missing_samples_when_subsetting,
97-
self.ignore_missing_samples_when_remapping,
98-
self.validate,
99-
)
100-
for (
101-
callset_path,
102-
project_guid,
103-
project_remap_path,
104-
project_pedigree_path,
105-
) in callset_project_pairs(
10692
self.callset_paths,
10793
self.project_guids,
10894
self.project_remap_paths,
10995
self.project_pedigree_paths,
110-
)
96+
self.ignore_missing_samples_when_subsetting,
97+
self.ignore_missing_samples_when_remapping,
98+
self.validate,
99+
),
111100
],
112101
)
113102
else:
@@ -233,8 +222,8 @@ def update_table(self, ht: hl.Table) -> hl.Table:
233222
):
234223
if rdc.requires_annotation:
235224
continue
236-
rdc_ht = self.rdc_annotation_dependencies[f'{rdc.value}_ht']
237-
new_variants_ht = new_variants_ht.join(rdc_ht, 'left')
225+
rdc_ht = self.rdc_annotation_dependencies[f"{rdc.value}_ht"]
226+
new_variants_ht = new_variants_ht.join(rdc_ht, "left")
238227

239228
# 4) Union with the existing variant annotations table
240229
# and annotate with the lookup table.

0 commit comments

Comments
 (0)