diff --git a/v03_pipeline/lib/misc/io.py b/v03_pipeline/lib/misc/io.py index f2077cb5d..c420c84e9 100644 --- a/v03_pipeline/lib/misc/io.py +++ b/v03_pipeline/lib/misc/io.py @@ -214,10 +214,12 @@ def checkpoint(t: hl.Table | hl.MatrixTable) -> tuple[hl.Table | hl.MatrixTable, def write( t: hl.Table | hl.MatrixTable, destination_path: str, + repartition: bool = True, ) -> hl.Table | hl.MatrixTable: t, path = checkpoint(t) - t = t.repartition( - compute_hail_n_partitions(file_size_bytes(path)), - shuffle=False, - ) + if repartition: + t = t.repartition( + compute_hail_n_partitions(file_size_bytes(path)), + shuffle=False, + ) return t.write(destination_path, overwrite=True) diff --git a/v03_pipeline/lib/paths.py b/v03_pipeline/lib/paths.py index 3ab830e5f..628470212 100644 --- a/v03_pipeline/lib/paths.py +++ b/v03_pipeline/lib/paths.py @@ -282,3 +282,10 @@ def new_variants_table_path( run_id, 'new_variants.ht', ) + + +def clinvar_dataset_path(reference_genome: ReferenceGenome, etag: str) -> str: + return os.path.join( + Env.HAIL_TMPDIR, + f'clinvar-{reference_genome.value}-{etag}.ht', + ) diff --git a/v03_pipeline/lib/reference_data/clinvar.py b/v03_pipeline/lib/reference_data/clinvar.py index 8478efa72..e010b2eec 100644 --- a/v03_pipeline/lib/reference_data/clinvar.py +++ b/v03_pipeline/lib/reference_data/clinvar.py @@ -6,11 +6,15 @@ import urllib import hail as hl +import hailtop.fs as hfs +import requests from v03_pipeline.lib.annotations.enums import CLINVAR_PATHOGENICITIES_LOOKUP from v03_pipeline.lib.logger import get_logger +from v03_pipeline.lib.misc.io import write from v03_pipeline.lib.model import Env from v03_pipeline.lib.model.definitions import ReferenceGenome +from v03_pipeline.lib.paths import clinvar_dataset_path CLINVAR_ASSERTIONS = [ 'Affects', @@ -110,12 +114,26 @@ def parsed_and_mapped_clnsigconf(ht: hl.Table): ) +def get_clinvar_ht( + clinvar_url: str, + reference_genome: ReferenceGenome, +): + etag = requests.head(clinvar_url, timeout=10).headers.get('ETag').strip('"') + clinvar_ht_path = clinvar_dataset_path(reference_genome, etag) + if hfs.exists(clinvar_ht_path): + logger.info(f'Try using cached clinvar ht with etag {etag}') + ht = hl.read_table(clinvar_ht_path) + else: + logger.info('Cached clinvar ht not found, downloading latest clinvar vcf') + ht = download_and_import_latest_clinvar_vcf(clinvar_url, reference_genome) + write(ht, clinvar_ht_path, repartition=False) + return ht + + def download_and_import_latest_clinvar_vcf( clinvar_url: str, reference_genome: ReferenceGenome, ) -> hl.Table: - """Downloads the latest clinvar VCF from the NCBI FTP server, imports it to a MT and returns that.""" - with tempfile.NamedTemporaryFile(suffix='.vcf.gz', delete=False) as tmp_file: urllib.request.urlretrieve(clinvar_url, tmp_file.name) # noqa: S310 gcs_tmp_file_name = os.path.join( @@ -158,14 +176,8 @@ def _parse_clinvar_release_date(local_vcf_path: str) -> str: def join_to_submission_summary_ht(vcf_ht: hl.Table) -> hl.Table: # https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/README - submission_summary.txt - logger.info('Getting clinvar submission summary') + logger.info('Getting clinvar submission summary from NCBI FTP server') ht = download_and_import_clinvar_submission_summary() - ht = ht.rename({'#VariationID': 'VariationID'}) - ht = ht.select('VariationID', 'Submitter', 'ReportedPhenotypeInfo') - ht = ht.group_by('VariationID').aggregate( - Submitters=hl.agg.collect(ht.Submitter), - Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo), - ) return vcf_ht.annotate( submitters=ht[vcf_ht.rsid].Submitters, conditions=ht[vcf_ht.rsid].Conditions, @@ -193,15 +205,25 @@ def download_and_import_clinvar_submission_summary() -> hl.Table: os.path.basename(unzipped_tmp_file.name), ) safely_move_to_gcs(unzipped_tmp_file.name, gcs_tmp_file_name) - return hl.import_table( - gcs_tmp_file_name, - force=True, - filter='^(#[^:]*:|^##).*$', # removes all comments except for the header line - types={ - '#VariationID': hl.tstr, - 'Submitter': hl.tstr, - 'ReportedPhenotypeInfo': hl.tstr, - }, - missing='-', - min_partitions=MIN_HT_PARTITIONS, - ) + return import_submission_table(gcs_tmp_file_name) + + +def import_submission_table(file_name: str) -> hl.Table: + ht = hl.import_table( + file_name, + force=True, + filter='^(#[^:]*:|^##).*$', # removes all comments except for the header line + types={ + '#VariationID': hl.tstr, + 'Submitter': hl.tstr, + 'ReportedPhenotypeInfo': hl.tstr, + }, + missing='-', + min_partitions=MIN_HT_PARTITIONS, + ) + ht = ht.rename({'#VariationID': 'VariationID'}) + ht = ht.select('VariationID', 'Submitter', 'ReportedPhenotypeInfo') + return ht.group_by('VariationID').aggregate( + Submitters=hl.agg.collect(ht.Submitter), + Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo), + ) diff --git a/v03_pipeline/lib/reference_data/clinvar_test.py b/v03_pipeline/lib/reference_data/clinvar_test.py index c3a8bb14a..8e1b509ff 100644 --- a/v03_pipeline/lib/reference_data/clinvar_test.py +++ b/v03_pipeline/lib/reference_data/clinvar_test.py @@ -4,6 +4,7 @@ import hail as hl from v03_pipeline.lib.reference_data.clinvar import ( + import_submission_table, join_to_submission_summary_ht, parsed_and_mapped_clnsigconf, parsed_clnsig, @@ -87,56 +88,10 @@ def test_parsed_and_mapped_clnsigconf(self): ) @mock.patch( - 'v03_pipeline.lib.reference_data.clinvar.download_and_import_clinvar_submission_summary', + 'v03_pipeline.lib.reference_data.clinvar.hl.import_table', ) - def test_join_to_submission_summary_ht(self, mock_download): - clinvar_enums_struct = hl.Struct( - CLNSIG=[ - 'Pathogenic/Likely_pathogenic/Pathogenic', - '_low_penetrance', - ], - CLNSIGCONF=[ - 'Pathogenic(8)|Likely_pathogenic(2)|Pathogenic', - '_low_penetrance(1)|Uncertain_significance(1)', - ], - CLNREVSTAT=['no_classifications_from_unflagged_records'], - ) - vcf_ht = hl.Table.parallelize( - [ - { - 'locus': hl.Locus( - contig='chr1', - position=871269, - reference_genome='GRCh38', - ), - 'alleles': ['A', 'C'], - 'rsid': '5', - 'info': hl.Struct(ALLELEID=1, **clinvar_enums_struct), - }, - { - 'locus': hl.Locus( - contig='chr1', - position=871269, - reference_genome='GRCh38', - ), - 'alleles': ['A', 'AC'], - 'rsid': '7', - 'info': hl.Struct(ALLELEID=1, **clinvar_enums_struct), - }, - ], - hl.tstruct( - locus=hl.tlocus('GRCh38'), - alleles=hl.tarray(hl.tstr), - rsid=hl.tstr, - info=hl.tstruct( - ALLELEID=hl.tint32, - CLNSIG=hl.tarray(hl.tstr), - CLNSIGCONF=hl.tarray(hl.tstr), - CLNREVSTAT=hl.tarray(hl.tstr), - ), - ), - ) - mock_download.return_value = hl.Table.parallelize( + def test_import_submission_table(self, mock_import_table): + mock_import_table.return_value = hl.Table.parallelize( [ { '#VariationID': '5', @@ -164,34 +119,20 @@ def test_join_to_submission_summary_ht(self, mock_download): 'ReportedPhenotypeInfo': 'na:B', }, ], - hl.tstruct( - **{ - '#VariationID': hl.tstr, - 'Submitter': hl.tstr, - 'ReportedPhenotypeInfo': hl.tstr, - }, - ), ) - ht = join_to_submission_summary_ht(vcf_ht) + ht = import_submission_table('mock_file_name') self.assertEqual( ht.collect(), [ hl.Struct( - locus=hl.Locus( - contig='chr1', - position=871269, - reference_genome='GRCh38', - ), - alleles=['A', 'C'], - rsid='5', - info=hl.Struct(ALLELEID=1, **clinvar_enums_struct), - submitters=[ + VariationID='5', + Submitters=[ 'OMIM', 'Broad Institute Rare Disease Group, Broad Institute', 'PreventionGenetics, part of Exact Sciences', 'Invitae', ], - conditions=[ + Conditions=[ 'C3661900:not provided', 'C0023264:Leigh syndrome', 'na:FOXRED1-related condition', @@ -199,16 +140,116 @@ def test_join_to_submission_summary_ht(self, mock_download): ], ), hl.Struct( - locus=hl.Locus( + VariationID='6', + Submitters=['A'], + Conditions=['na:B'], + ), + ], + ) + + @mock.patch( + 'v03_pipeline.lib.reference_data.clinvar.download_and_import_clinvar_submission_summary', + ) + def test_join_to_submission_summary_ht( + self, + mock_download, + ): + vcf_ht = hl.Table.parallelize( + [ + { + 'locus': hl.Locus( contig='chr1', position=871269, reference_genome='GRCh38', ), - alleles=['A', 'AC'], - rsid='7', - info=hl.Struct(ALLELEID=1, **clinvar_enums_struct), - submitters=None, - conditions=None, - ), + 'alleles': ['A', 'C'], + 'rsid': '5', + 'info': hl.Struct(ALLELEID=1), + }, + { + 'locus': hl.Locus( + contig='chr1', + position=871269, + reference_genome='GRCh38', + ), + 'alleles': ['A', 'AC'], + 'rsid': '7', + 'info': hl.Struct(ALLELEID=1), + }, + ], + hl.tstruct( + locus=hl.tlocus('GRCh38'), + alleles=hl.tarray(hl.tstr), + rsid=hl.tstr, + info=hl.tstruct(ALLELEID=hl.tint32), + ), + ) + submitters_ht = hl.Table.parallelize( + [ + { + 'VariationID': '5', + 'Submitters': [ + 'OMIM', + 'Broad Institute Rare Disease Group, Broad Institute', + 'PreventionGenetics, part of Exact Sciences', + 'Invitae', + ], + 'Conditions': [ + 'C3661900:not provided', + 'C0023264:Leigh syndrome', + 'na:FOXRED1-related condition', + 'C4748791:Mitochondrial complex 1 deficiency, nuclear type 19', + ], + }, + {'VariationID': '6', 'Submitters': ['A'], 'Conditions': ['na:B']}, ], + hl.tstruct( + VariationID=hl.tstr, + Submitters=hl.tarray(hl.tstr), + Conditions=hl.tarray(hl.tstr), + ), + key='VariationID', + ) + expected_clinvar_ht_rows = [ + hl.Struct( + locus=hl.Locus( + contig='chr1', + position=871269, + reference_genome='GRCh38', + ), + alleles=['A', 'C'], + rsid='5', + info=hl.Struct(ALLELEID=1), + submitters=[ + 'OMIM', + 'Broad Institute Rare Disease Group, Broad Institute', + 'PreventionGenetics, part of Exact Sciences', + 'Invitae', + ], + conditions=[ + 'C3661900:not provided', + 'C0023264:Leigh syndrome', + 'na:FOXRED1-related condition', + 'C4748791:Mitochondrial complex 1 deficiency, nuclear type 19', + ], + ), + hl.Struct( + locus=hl.Locus( + contig='chr1', + position=871269, + reference_genome='GRCh38', + ), + alleles=['A', 'AC'], + rsid='7', + info=hl.Struct(ALLELEID=1), + submitters=None, + conditions=None, + ), + ] + + mock_download.return_value = submitters_ht + ht = join_to_submission_summary_ht(vcf_ht) + self.assertEqual( + ht.collect(), + expected_clinvar_ht_rows, ) diff --git a/v03_pipeline/lib/reference_data/config.py b/v03_pipeline/lib/reference_data/config.py index 54a9f4603..f38d2773f 100644 --- a/v03_pipeline/lib/reference_data/config.py +++ b/v03_pipeline/lib/reference_data/config.py @@ -11,7 +11,7 @@ from v03_pipeline.lib.reference_data.clinvar import ( CLINVAR_ASSERTIONS, CLINVAR_GOLD_STARS_LOOKUP, - download_and_import_latest_clinvar_vcf, + get_clinvar_ht, parsed_and_mapped_clnsigconf, parsed_clnsig, ) @@ -190,8 +190,8 @@ def custom_mpc_select(ht): }, 'clinvar': { '37': { - 'custom_import': download_and_import_latest_clinvar_vcf, - 'source_path': 'ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh37/clinvar.vcf.gz', + 'custom_import': get_clinvar_ht, + 'source_path': 'https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh37/clinvar.vcf.gz', 'select': {'alleleId': 'info.ALLELEID'}, 'custom_select': clinvar_custom_select, 'enum_select': { @@ -201,8 +201,8 @@ def custom_mpc_select(ht): 'filter': lambda ht: ht.locus.contig != 'MT', }, '38': { - 'custom_import': download_and_import_latest_clinvar_vcf, - 'source_path': 'ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz', + 'custom_import': get_clinvar_ht, + 'source_path': 'https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz', 'select': {'alleleId': 'info.ALLELEID'}, 'custom_select': clinvar_custom_select, 'enum_select': { @@ -442,8 +442,8 @@ def custom_mpc_select(ht): }, 'clinvar_mito': { '37': { - 'custom_import': download_and_import_latest_clinvar_vcf, - 'source_path': 'ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh37/clinvar.vcf.gz', + 'custom_import': get_clinvar_ht, + 'source_path': 'https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh37/clinvar.vcf.gz', 'select': {'alleleId': 'info.ALLELEID'}, 'custom_select': clinvar_custom_select, 'enum_select': { @@ -453,8 +453,8 @@ def custom_mpc_select(ht): 'filter': lambda ht: ht.locus.contig == 'MT', }, '38': { - 'custom_import': download_and_import_latest_clinvar_vcf, - 'source_path': 'ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz', + 'custom_import': get_clinvar_ht, + 'source_path': 'https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz', 'select': {'alleleId': 'info.ALLELEID'}, 'custom_select': clinvar_custom_select, 'enum_select': { diff --git a/v03_pipeline/lib/tasks/reference_data/updated_reference_dataset_collection_test.py b/v03_pipeline/lib/tasks/reference_data/updated_reference_dataset_collection_test.py index 9995225c0..b3fdde4bb 100644 --- a/v03_pipeline/lib/tasks/reference_data/updated_reference_dataset_collection_test.py +++ b/v03_pipeline/lib/tasks/reference_data/updated_reference_dataset_collection_test.py @@ -221,7 +221,7 @@ def test_update_task_with_empty_reference_data_table( paths=hl.Struct( primate_ai='gs://seqr-reference-data/GRCh38/primate_ai/PrimateAI_scores_v0.2.liftover_grch38.ht', cadd='gs://seqr-reference-data/GRCh38/CADD/CADD_snvs_and_indels.v1.6.ht', - clinvar='ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz', + clinvar='https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz', ), versions=hl.Struct( primate_ai='v0.3', diff --git a/v03_pipeline/lib/tasks/write_project_family_tables.py b/v03_pipeline/lib/tasks/write_project_family_tables.py index 26253a1da..fbac1c206 100644 --- a/v03_pipeline/lib/tasks/write_project_family_tables.py +++ b/v03_pipeline/lib/tasks/write_project_family_tables.py @@ -2,8 +2,11 @@ import luigi import luigi.util +from v03_pipeline.lib.misc.io import import_pedigree +from v03_pipeline.lib.misc.pedigree import parse_pedigree_ht_to_families from v03_pipeline.lib.tasks.base.base_hail_table import BaseHailTableTask from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams +from v03_pipeline.lib.tasks.files import RawFileTask from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask from v03_pipeline.lib.tasks.write_family_table import WriteFamilyTableTask @@ -30,13 +33,26 @@ def complete(self) -> bool: def run(self): # https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies + # Fetch family guids from project table update_project_table_task: luigi.Target = yield self.clone( UpdateProjectTableTask, force=False, ) project_ht = hl.read_table(update_project_table_task.path) - family_guids = hl.eval(project_ht.globals.family_guids) - for family_guid in family_guids: + family_guids_in_project_table = set(hl.eval(project_ht.globals.family_guids)) + + # Fetch family guids from pedigree + pedigree_ht_task: luigi.Target = yield RawFileTask(self.project_pedigree_path) + pedigree_ht = import_pedigree(pedigree_ht_task.path) + families_guids_in_pedigree = { + f.family_guid for f in parse_pedigree_ht_to_families(pedigree_ht) + } + + # Intersect them + family_guids_to_load = ( + family_guids_in_project_table & families_guids_in_pedigree + ) + for family_guid in family_guids_to_load: self.dynamic_write_family_table_tasks.add( self.clone(WriteFamilyTableTask, family_guid=family_guid), ) diff --git a/v03_pipeline/lib/tasks/write_project_family_tables_test.py b/v03_pipeline/lib/tasks/write_project_family_tables_test.py index 37bb9a556..074d0a294 100644 --- a/v03_pipeline/lib/tasks/write_project_family_tables_test.py +++ b/v03_pipeline/lib/tasks/write_project_family_tables_test.py @@ -2,6 +2,7 @@ import luigi.worker from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType +from v03_pipeline.lib.paths import project_table_path from v03_pipeline.lib.tasks.write_project_family_tables import ( WriteProjectFamilyTablesTask, ) @@ -10,9 +11,12 @@ TEST_SNV_INDEL_VCF = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf' TEST_REMAP = 'v03_pipeline/var/test/remaps/test_remap_1.tsv' TEST_PEDIGREE_4 = 'v03_pipeline/var/test/pedigrees/test_pedigree_4.tsv' +TEST_PEDIGREE_4_SUBSET = 'v03_pipeline/var/test/pedigrees/test_pedigree_4_subset.tsv' class WriteProjectFamilyTablesTest(MockedDatarootTestCase): + maxDiff = None + def test_snv_write_project_family_tables_task(self) -> None: worker = luigi.worker.Worker() write_project_family_tables = WriteProjectFamilyTablesTask( @@ -51,3 +55,54 @@ def test_snv_write_project_family_tables_task(self) -> None: [['NA20888_1']], ], ) + + write_project_family_tables_subset = WriteProjectFamilyTablesTask( + reference_genome=ReferenceGenome.GRCh38, + dataset_type=DatasetType.SNV_INDEL, + sample_type=SampleType.WGS, + callset_path=TEST_SNV_INDEL_VCF, + project_guid='R0113_test_project', + project_remap_path=TEST_REMAP, + project_pedigree_path=TEST_PEDIGREE_4_SUBSET, + skip_validation=True, + skip_check_sex_and_relatedness=True, + ) + worker.add(write_project_family_tables_subset) + worker.run() + self.assertTrue(write_project_family_tables_subset.complete()) + hts = [ + hl.read_table(write_family_table_task.output().path) + for write_family_table_task in write_project_family_tables_subset.dynamic_write_family_table_tasks + ] + # Only one family table written + self.assertEqual( + len(hts), + 1, + ) + # Project table still contains all family guids + self.assertCountEqual( + hl.read_table( + project_table_path( + ReferenceGenome.GRCh38, + DatasetType.SNV_INDEL, + 'R0113_test_project', + ), + ).family_guids.collect(), + [ + [ + '123_1', + '234_1', + '345_1', + '456_1', + '567_1', + '678_1', + '789_1', + '890_1', + '901_1', + 'bcd_1', + 'cde_1', + 'def_1', + 'efg_1', + ], + ], + ) diff --git a/v03_pipeline/var/test/pedigrees/test_pedigree_4_subset.tsv b/v03_pipeline/var/test/pedigrees/test_pedigree_4_subset.tsv new file mode 100644 index 000000000..63e2addd8 --- /dev/null +++ b/v03_pipeline/var/test/pedigrees/test_pedigree_4_subset.tsv @@ -0,0 +1,2 @@ +Project_GUID Family_GUID Family_ID Individual_ID Paternal_ID Maternal_ID Sex +R0114_project4 123_1 123 NA19675_1 F