Skip to content

Commit 0c5d8fc

Browse files
committed
merge
2 parents e78a6e4 + e5290ed commit 0c5d8fc

File tree

4 files changed

+29
-27
lines changed

4 files changed

+29
-27
lines changed

v03_pipeline/lib/misc/family_entries.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,22 @@ def remove_family_guids(
9898
family_guids: hl.SetExpression,
9999
) -> hl.Table:
100100
# Remove families from the existing project table structure (both the entries arrays and the globals are mutated)
101-
family_indexes_to_keep = hl.array(
102-
hl.enumerate(ht.globals.family_guids)
103-
.filter(lambda item: ~family_guids.contains(item[1]))
104-
.map(lambda item: item[0]),
101+
family_indexes_to_keep = hl.eval(
102+
hl.array(
103+
hl.enumerate(ht.globals.family_guids)
104+
.filter(lambda item: ~family_guids.contains(item[1]))
105+
.map(lambda item: item[0]),
106+
),
105107
)
106108
ht = ht.annotate(
107-
family_entries=family_indexes_to_keep.map(lambda i: ht.family_entries[i]),
109+
# NB: this "should" work without the extra conditional (and does in the tests);
110+
# however, experiments on Dataproc showed this statement hanging when the
111+
# indexes array was left unevaluated and was empty.
112+
family_entries=hl.array(family_indexes_to_keep).map(
113+
lambda i: ht.family_entries[i],
114+
)
115+
if len(family_indexes_to_keep) > 0
116+
else hl.empty_array(ht.family_entries.dtype.element_type),
108117
)
109118
ht = ht.filter(hl.any(ht.family_entries.map(hl.is_defined)))
110119
return ht.annotate_globals(

v03_pipeline/lib/misc/lookup.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,22 +128,23 @@ def remove_project(
128128
existing_project_guids = hl.eval(ht.globals.project_guids)
129129
if project_guid not in existing_project_guids:
130130
return ht
131-
project_indexes_to_keep = (
131+
project_indexes_to_keep = hl.eval(
132132
hl.enumerate(existing_project_guids)
133133
.filter(lambda item: item[1] != project_guid)
134-
.map(lambda item: item[0])
134+
.map(lambda item: item[0]),
135135
)
136136
ht = ht.annotate(
137137
project_stats=(
138-
project_indexes_to_keep.map(
139-
lambda i: ht.project_stats[i],
140-
)
138+
# See "remove_family_guids" func for why this was necessary
139+
hl.array(project_indexes_to_keep).map(lambda i: ht.project_stats[i])
140+
if len(project_indexes_to_keep) > 0
141+
else hl.empty_array(ht.project_stats.dtype.element_type)
141142
),
142143
)
143144
ht = ht.filter(hl.any(ht.project_stats.map(hl.is_defined)))
144145
return ht.annotate_globals(
145-
project_guids=project_indexes_to_keep.map(
146-
lambda i: ht.project_guids[i],
146+
project_guids=ht.project_guids.filter(
147+
lambda p: p != project_guid,
147148
),
148149
project_families=hl.dict(
149150
ht.project_families.items().filter(lambda item: item[0] != project_guid),

v03_pipeline/lib/reference_data/clinvar.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import gzip
22
import os
3-
import shutil
43
import subprocess
54
import tempfile
65
import urllib
@@ -166,7 +165,6 @@ def join_to_submission_summary_ht(vcf_ht: hl.Table) -> hl.Table:
166165
Submitters=hl.agg.collect(ht.Submitter),
167166
Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo),
168167
)
169-
ht = ht.key_by('VariationID')
170168
return vcf_ht.annotate(
171169
submitters=ht[vcf_ht.rsid].Submitters,
172170
conditions=ht[vcf_ht.rsid].Conditions,
@@ -177,23 +175,13 @@ def download_and_import_clinvar_submission_summary() -> hl.Table:
177175
with tempfile.NamedTemporaryFile(
178176
suffix='.txt.gz',
179177
delete=False,
180-
) as tmp_file, tempfile.NamedTemporaryFile(
181-
suffix='.txt',
182-
delete=False,
183-
) as unzipped_tmp_file:
178+
) as tmp_file:
184179
urllib.request.urlretrieve(CLINVAR_SUBMISSION_SUMMARY_URL, tmp_file.name) # noqa: S310
185-
# Unzip the gzipped file first to fix gzip files being read by hail with single partition
186-
with gzip.open(tmp_file.name, 'rb') as f_in, open(
187-
unzipped_tmp_file.name,
188-
'wb',
189-
) as f_out:
190-
shutil.copyfileobj(f_in, f_out)
191-
192180
gcs_tmp_file_name = os.path.join(
193181
Env.HAIL_TMPDIR,
194-
os.path.basename(unzipped_tmp_file.name),
182+
os.path.basename(tmp_file.name),
195183
)
196-
safely_move_to_gcs(unzipped_tmp_file.name, gcs_tmp_file_name)
184+
safely_move_to_gcs(tmp_file.name, gcs_tmp_file_name)
197185
return hl.import_table(
198186
gcs_tmp_file_name,
199187
force=True,

v03_pipeline/lib/reference_data/dataset_table_operations.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ def update_or_create_joined_ht(
3636
continue
3737

3838
# Join the new one!
39+
hl._set_flags(use_new_shuffle=None, no_whole_stage_codegen='1') # noqa: SLF001
3940
dataset_ht = get_dataset_ht(dataset, reference_genome)
4041
dataset_ht, _ = checkpoint(dataset_ht)
42+
hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001
4143
joined_ht = joined_ht.join(dataset_ht, 'outer')
4244
joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht)
4345

@@ -214,8 +216,10 @@ def join_hts(
214216
),
215217
)
216218
for dataset in reference_dataset_collection.datasets(dataset_type):
219+
hl._set_flags(use_new_shuffle=None, no_whole_stage_codegen='1') # noqa: SLF001
217220
dataset_ht = get_dataset_ht(dataset, reference_genome)
218221
dataset_ht, _ = checkpoint(dataset_ht)
222+
hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001
219223
joined_ht = joined_ht.join(dataset_ht, 'outer')
220224
joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht)
221225
return joined_ht

0 commit comments

Comments
 (0)