diff --git a/v03_pipeline/lib/reference_data/clinvar.py b/v03_pipeline/lib/reference_data/clinvar.py index 46d06708f..6d24b3467 100644 --- a/v03_pipeline/lib/reference_data/clinvar.py +++ b/v03_pipeline/lib/reference_data/clinvar.py @@ -1,6 +1,5 @@ import gzip import os -import shutil import subprocess import tempfile import urllib @@ -166,7 +165,6 @@ def join_to_submission_summary_ht(vcf_ht: hl.Table) -> hl.Table: Submitters=hl.agg.collect(ht.Submitter), Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo), ) - ht = ht.key_by('VariationID') return vcf_ht.annotate( submitters=ht[vcf_ht.rsid].Submitters, conditions=ht[vcf_ht.rsid].Conditions, @@ -177,20 +175,13 @@ def download_and_import_clinvar_submission_summary() -> hl.Table: with tempfile.NamedTemporaryFile( suffix='.txt.gz', delete=False, - ) as tmp_file, tempfile.NamedTemporaryFile( - suffix='.txt', - delete=False, - ) as unzipped_tmp_file: + ) as tmp_file: urllib.request.urlretrieve(CLINVAR_SUBMISSION_SUMMARY_URL, tmp_file.name) # noqa: S310 - # Unzip the gzipped file first to fix gzip files being read by hail with single partition - with gzip.open(tmp_file.name, 'rb') as f_in, open(unzipped_tmp_file.name, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - gcs_tmp_file_name = os.path.join( Env.HAIL_TMPDIR, - os.path.basename(unzipped_tmp_file.name), + os.path.basename(tmp_file.name), ) - safely_move_to_gcs(unzipped_tmp_file.name, gcs_tmp_file_name) + safely_move_to_gcs(tmp_file.name, gcs_tmp_file_name) return hl.import_table( gcs_tmp_file_name, force=True, diff --git a/v03_pipeline/lib/reference_data/dataset_table_operations.py b/v03_pipeline/lib/reference_data/dataset_table_operations.py index 0ed8158aa..aced5cd16 100644 --- a/v03_pipeline/lib/reference_data/dataset_table_operations.py +++ b/v03_pipeline/lib/reference_data/dataset_table_operations.py @@ -36,8 +36,10 @@ def update_or_create_joined_ht( continue # Join the new one! + hl._set_flags(use_new_shuffle=None, no_whole_stage_codegen='1') # noqa: SLF001 dataset_ht = get_dataset_ht(dataset, reference_genome) dataset_ht, _ = checkpoint(dataset_ht) + hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001 joined_ht = joined_ht.join(dataset_ht, 'outer') joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht) @@ -214,8 +216,10 @@ def join_hts( ), ) for dataset in reference_dataset_collection.datasets(dataset_type): + hl._set_flags(use_new_shuffle=None, no_whole_stage_codegen='1') # noqa: SLF001 dataset_ht = get_dataset_ht(dataset, reference_genome) dataset_ht, _ = checkpoint(dataset_ht) + hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001 joined_ht = joined_ht.join(dataset_ht, 'outer') joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht) return joined_ht