From 64254c691acb3ff3d2e67443192844c1c8f1f334 Mon Sep 17 00:00:00 2001 From: Julia Klugherz Date: Mon, 24 Jun 2024 17:02:36 -0400 Subject: [PATCH 1/3] use_new_shuffle=None --- v03_pipeline/lib/reference_data/clinvar.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/v03_pipeline/lib/reference_data/clinvar.py b/v03_pipeline/lib/reference_data/clinvar.py index 46d06708f..6c169e3cc 100644 --- a/v03_pipeline/lib/reference_data/clinvar.py +++ b/v03_pipeline/lib/reference_data/clinvar.py @@ -160,37 +160,33 @@ def join_to_submission_summary_ht(vcf_ht: hl.Table) -> hl.Table: # https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/README - submission_summary.txt logger.info('Getting clinvar submission summary') ht = download_and_import_clinvar_submission_summary() + # Set 'use_new_shuffle' to None here maintain expected partitions + hl._set_flags(use_new_shuffle=None) ht = ht.rename({'#VariationID': 'VariationID'}) ht = ht.select('VariationID', 'Submitter', 'ReportedPhenotypeInfo') ht = ht.group_by('VariationID').aggregate( Submitters=hl.agg.collect(ht.Submitter), Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo), ) - ht = ht.key_by('VariationID') - return vcf_ht.annotate( + ht = vcf_ht.annotate( submitters=ht[vcf_ht.rsid].Submitters, conditions=ht[vcf_ht.rsid].Conditions, ) + hl._set_flags(use_new_shuffle='1') + return ht def download_and_import_clinvar_submission_summary() -> hl.Table: with tempfile.NamedTemporaryFile( suffix='.txt.gz', delete=False, - ) as tmp_file, tempfile.NamedTemporaryFile( - suffix='.txt', - delete=False, - ) as unzipped_tmp_file: + ) as tmp_file: urllib.request.urlretrieve(CLINVAR_SUBMISSION_SUMMARY_URL, tmp_file.name) # noqa: S310 - # Unzip the gzipped file first to fix gzip files being read by hail with single partition - with gzip.open(tmp_file.name, 'rb') as f_in, open(unzipped_tmp_file.name, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - gcs_tmp_file_name = os.path.join( Env.HAIL_TMPDIR, - os.path.basename(unzipped_tmp_file.name), + os.path.basename(tmp_file.name), ) - safely_move_to_gcs(unzipped_tmp_file.name, gcs_tmp_file_name) + safely_move_to_gcs(tmp_file.name, gcs_tmp_file_name) return hl.import_table( gcs_tmp_file_name, force=True, From 8e48a0a625ac8cbe5228c23f0adbf8bc4b85e742 Mon Sep 17 00:00:00 2001 From: Julia Klugherz Date: Mon, 24 Jun 2024 17:11:40 -0400 Subject: [PATCH 2/3] try --- v03_pipeline/lib/reference_data/clinvar.py | 6 +----- v03_pipeline/lib/reference_data/dataset_table_operations.py | 4 ++++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/v03_pipeline/lib/reference_data/clinvar.py b/v03_pipeline/lib/reference_data/clinvar.py index 6c169e3cc..e43d18a7e 100644 --- a/v03_pipeline/lib/reference_data/clinvar.py +++ b/v03_pipeline/lib/reference_data/clinvar.py @@ -160,20 +160,16 @@ def join_to_submission_summary_ht(vcf_ht: hl.Table) -> hl.Table: # https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/README - submission_summary.txt logger.info('Getting clinvar submission summary') ht = download_and_import_clinvar_submission_summary() - # Set 'use_new_shuffle' to None here maintain expected partitions - hl._set_flags(use_new_shuffle=None) ht = ht.rename({'#VariationID': 'VariationID'}) ht = ht.select('VariationID', 'Submitter', 'ReportedPhenotypeInfo') ht = ht.group_by('VariationID').aggregate( Submitters=hl.agg.collect(ht.Submitter), Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo), ) - ht = vcf_ht.annotate( + return vcf_ht.annotate( submitters=ht[vcf_ht.rsid].Submitters, conditions=ht[vcf_ht.rsid].Conditions, ) - hl._set_flags(use_new_shuffle='1') - return ht def download_and_import_clinvar_submission_summary() -> hl.Table: diff --git a/v03_pipeline/lib/reference_data/dataset_table_operations.py b/v03_pipeline/lib/reference_data/dataset_table_operations.py index 0ed8158aa..aced5cd16 100644 --- a/v03_pipeline/lib/reference_data/dataset_table_operations.py +++ b/v03_pipeline/lib/reference_data/dataset_table_operations.py @@ -36,8 +36,10 @@ def update_or_create_joined_ht( continue # Join the new one! + hl._set_flags(use_new_shuffle=None, no_whole_stage_codegen='1') # noqa: SLF001 dataset_ht = get_dataset_ht(dataset, reference_genome) dataset_ht, _ = checkpoint(dataset_ht) + hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001 joined_ht = joined_ht.join(dataset_ht, 'outer') joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht) @@ -214,8 +216,10 @@ def join_hts( ), ) for dataset in reference_dataset_collection.datasets(dataset_type): + hl._set_flags(use_new_shuffle=None, no_whole_stage_codegen='1') # noqa: SLF001 dataset_ht = get_dataset_ht(dataset, reference_genome) dataset_ht, _ = checkpoint(dataset_ht) + hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001 joined_ht = joined_ht.join(dataset_ht, 'outer') joined_ht = annotate_dataset_globals(joined_ht, dataset, dataset_ht) return joined_ht From 696d4ea0c61e310bfbe79c6b2df558812925922d Mon Sep 17 00:00:00 2001 From: Julia Klugherz Date: Mon, 24 Jun 2024 17:13:26 -0400 Subject: [PATCH 3/3] lint --- v03_pipeline/lib/reference_data/clinvar.py | 1 - 1 file changed, 1 deletion(-) diff --git a/v03_pipeline/lib/reference_data/clinvar.py b/v03_pipeline/lib/reference_data/clinvar.py index e43d18a7e..6d24b3467 100644 --- a/v03_pipeline/lib/reference_data/clinvar.py +++ b/v03_pipeline/lib/reference_data/clinvar.py @@ -1,6 +1,5 @@ import gzip import os -import shutil import subprocess import tempfile import urllib