3 files changed: +16, -9 lines
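Summary of the change: the first file extracts a reusable checkpoint() helper out of write() in v03_pipeline.lib.misc.io; the second file moves the ClinVar submission-summary partitioning from an import_table argument to an explicit repartition after import; the third file checkpoints each reference dataset table before it is joined.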
@@ -200,21 +200,24 @@ def import_pedigree(pedigree_path: str) -> hl.Table:
     )


-def write(
-    t: hl.Table | hl.MatrixTable,
-    destination_path: str,
-) -> hl.Table | hl.MatrixTable:
+def checkpoint(t: hl.Table | hl.MatrixTable) -> tuple[hl.Table | hl.MatrixTable, str]:
     suffix = 'mt' if isinstance(t, hl.MatrixTable) else 'ht'
     read_fn = hl.read_matrix_table if isinstance(t, hl.MatrixTable) else hl.read_table
     checkpoint_path = os.path.join(
         Env.HAIL_TMPDIR,
         f'{uuid.uuid4()}.{suffix}',
     )
-    # not using checkpoint to read/write here because the checkpoint codec is different, leading to a different on-disk size.
     t.write(checkpoint_path)
-    t = read_fn(checkpoint_path)
+    return read_fn(checkpoint_path), checkpoint_path
+
+
+def write(
+    t: hl.Table | hl.MatrixTable,
+    destination_path: str,
+) -> hl.Table | hl.MatrixTable:
+    t, path = checkpoint(t)
     t = t.repartition(
-        compute_hail_n_partitions(file_size_bytes(checkpoint_path)),
+        compute_hail_n_partitions(file_size_bytes(path)),
         shuffle=False,
     )
     return t.write(destination_path, overwrite=True)
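For orientation, a minimal usage sketch of the two helpers after this refactor. It is illustrative only: the input table and destination path are made up, and checkpoint/write refer to the functions defined in the hunk above.

    import hail as hl

    ht = hl.utils.range_table(1_000_000)  # stand-in for a real pipeline table

    # Standalone use: materialize the table under HAIL_TMPDIR and get back the
    # re-read table plus its on-disk path.
    checkpointed_ht, tmp_path = checkpoint(ht)

    # write() does the same thing internally, then repartitions based on the
    # checkpointed file size before writing to the final destination.
    write(ht, 'gs://example-bucket/dataset.ht')  # hypothetical destination path

Returning the checkpoint path alongside the re-read table is what lets write() size its partitions from the materialized file, and it lets other callers reuse the helper without caring where the temporary copy lives.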
@@ -183,7 +183,7 @@ def download_and_import_clinvar_submission_summary() -> hl.Table:
         os.path.basename(tmp_file.name),
     )
     safely_move_to_gcs(tmp_file.name, gcs_tmp_file_name)
-    return hl.import_table(
+    ht = hl.import_table(
         gcs_tmp_file_name,
         force=True,
         filter='^(#[^:]*:|^##).*$',  # removes all comments except for the header line
@@ -193,5 +193,6 @@ def download_and_import_clinvar_submission_summary() -> hl.Table:
             'ReportedPhenotypeInfo': hl.tstr,
         },
         missing='-',
-        min_partitions=MIN_HT_PARTITIONS,
     )
+    # NB: min_partitions fails with force=True, but the partition count can be overridden by repartitioning afterwards.
+    return ht.repartition(MIN_HT_PARTITIONS)
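A sketch of the resulting import pattern, assuming the NB comment above is accurate; the path and partition count below are illustrative, not the pipeline's real values.

    import hail as hl

    MIN_HT_PARTITIONS = 512  # illustrative; the real constant is defined elsewhere in the pipeline

    ht = hl.import_table(
        'gs://example-bucket/submission_summary.txt.gz',  # hypothetical path
        force=True,  # load the gzipped file serially instead of requiring block-gzip
    )
    # min_partitions cannot be combined with force=True here (per the NB above),
    # so spread the rows across partitions explicitly after the import.
    ht = ht.repartition(MIN_HT_PARTITIONS)
    print(ht.n_partitions())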
@@ -4,6 +4,7 @@
 import hail as hl
 import pytz

+from v03_pipeline.lib.misc.io import checkpoint
 from v03_pipeline.lib.misc.nested_field import parse_nested_field
 from v03_pipeline.lib.model import (
     DatasetType,
@@ -36,6 +37,7 @@ def update_or_create_joined_ht(

     # Join the new one!
     dataset_ht = get_dataset_ht(dataset, reference_genome)
+    dataset_ht, _ = checkpoint(dataset_ht)
     joined_ht = joined_ht.join(dataset_ht, 'outer')
     joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht)

@@ -213,6 +215,7 @@ def join_hts(
     )
     for dataset in reference_dataset_collection.datasets(dataset_type):
         dataset_ht = get_dataset_ht(dataset, reference_genome)
+        dataset_ht, _ = checkpoint(dataset_ht)
         joined_ht = joined_ht.join(dataset_ht, 'outer')
         joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht)
     return joined_ht
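Both hunks apply the same pattern: materialize a dataset table before feeding it into an outer join, so the join reads a concrete on-disk table rather than recomputing the upstream import. A minimal sketch follows; the tables and fields are made up, and checkpoint is the helper added in the first file of this diff.

    import hail as hl

    joined_ht = hl.utils.range_table(100).annotate(existing_field=1)
    dataset_ht = hl.utils.range_table(50).annotate(new_field=2)

    # Write dataset_ht to temporary storage and read it back before joining.
    dataset_ht, _ = checkpoint(dataset_ht)  # the returned temp path is not needed here
    joined_ht = joined_ht.join(dataset_ht, 'outer')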