Skip to content

Dev #839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 18, 2024
Merged

Dev #839

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions v03_pipeline/lib/misc/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,12 @@ def checkpoint(t: hl.Table | hl.MatrixTable) -> tuple[hl.Table | hl.MatrixTable,
def write(
t: hl.Table | hl.MatrixTable,
destination_path: str,
repartition: bool = True,
) -> hl.Table | hl.MatrixTable:
t, path = checkpoint(t)
t = t.repartition(
compute_hail_n_partitions(file_size_bytes(path)),
shuffle=False,
)
if repartition:
t = t.repartition(
compute_hail_n_partitions(file_size_bytes(path)),
shuffle=False,
)
return t.write(destination_path, overwrite=True)
7 changes: 7 additions & 0 deletions v03_pipeline/lib/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,10 @@ def new_variants_table_path(
run_id,
'new_variants.ht',
)


def clinvar_dataset_path(reference_genome: ReferenceGenome, etag: str) -> str:
return os.path.join(
Env.HAIL_TMPDIR,
f'clinvar-{reference_genome.value}-{etag}.ht',
)
64 changes: 43 additions & 21 deletions v03_pipeline/lib/reference_data/clinvar.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
import urllib

import hail as hl
import hailtop.fs as hfs
import requests

from v03_pipeline.lib.annotations.enums import CLINVAR_PATHOGENICITIES_LOOKUP
from v03_pipeline.lib.logger import get_logger
from v03_pipeline.lib.misc.io import write
from v03_pipeline.lib.model import Env
from v03_pipeline.lib.model.definitions import ReferenceGenome
from v03_pipeline.lib.paths import clinvar_dataset_path

CLINVAR_ASSERTIONS = [
'Affects',
Expand Down Expand Up @@ -110,12 +114,26 @@ def parsed_and_mapped_clnsigconf(ht: hl.Table):
)


def get_clinvar_ht(
clinvar_url: str,
reference_genome: ReferenceGenome,
):
etag = requests.head(clinvar_url, timeout=10).headers.get('ETag').strip('"')
clinvar_ht_path = clinvar_dataset_path(reference_genome, etag)
if hfs.exists(clinvar_ht_path):
logger.info(f'Try using cached clinvar ht with etag {etag}')
ht = hl.read_table(clinvar_ht_path)
else:
logger.info('Cached clinvar ht not found, downloading latest clinvar vcf')
ht = download_and_import_latest_clinvar_vcf(clinvar_url, reference_genome)
write(ht, clinvar_ht_path, repartition=False)
return ht


def download_and_import_latest_clinvar_vcf(
clinvar_url: str,
reference_genome: ReferenceGenome,
) -> hl.Table:
"""Downloads the latest clinvar VCF from the NCBI FTP server, imports it to a MT and returns that."""

with tempfile.NamedTemporaryFile(suffix='.vcf.gz', delete=False) as tmp_file:
urllib.request.urlretrieve(clinvar_url, tmp_file.name) # noqa: S310
gcs_tmp_file_name = os.path.join(
Expand Down Expand Up @@ -158,14 +176,8 @@ def _parse_clinvar_release_date(local_vcf_path: str) -> str:

def join_to_submission_summary_ht(vcf_ht: hl.Table) -> hl.Table:
# https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/README - submission_summary.txt
logger.info('Getting clinvar submission summary')
logger.info('Getting clinvar submission summary from NCBI FTP server')
ht = download_and_import_clinvar_submission_summary()
ht = ht.rename({'#VariationID': 'VariationID'})
ht = ht.select('VariationID', 'Submitter', 'ReportedPhenotypeInfo')
ht = ht.group_by('VariationID').aggregate(
Submitters=hl.agg.collect(ht.Submitter),
Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo),
)
return vcf_ht.annotate(
submitters=ht[vcf_ht.rsid].Submitters,
conditions=ht[vcf_ht.rsid].Conditions,
Expand Down Expand Up @@ -193,15 +205,25 @@ def download_and_import_clinvar_submission_summary() -> hl.Table:
os.path.basename(unzipped_tmp_file.name),
)
safely_move_to_gcs(unzipped_tmp_file.name, gcs_tmp_file_name)
return hl.import_table(
gcs_tmp_file_name,
force=True,
filter='^(#[^:]*:|^##).*$', # removes all comments except for the header line
types={
'#VariationID': hl.tstr,
'Submitter': hl.tstr,
'ReportedPhenotypeInfo': hl.tstr,
},
missing='-',
min_partitions=MIN_HT_PARTITIONS,
)
return import_submission_table(gcs_tmp_file_name)


def import_submission_table(file_name: str) -> hl.Table:
ht = hl.import_table(
file_name,
force=True,
filter='^(#[^:]*:|^##).*$', # removes all comments except for the header line
types={
'#VariationID': hl.tstr,
'Submitter': hl.tstr,
'ReportedPhenotypeInfo': hl.tstr,
},
missing='-',
min_partitions=MIN_HT_PARTITIONS,
)
ht = ht.rename({'#VariationID': 'VariationID'})
ht = ht.select('VariationID', 'Submitter', 'ReportedPhenotypeInfo')
return ht.group_by('VariationID').aggregate(
Submitters=hl.agg.collect(ht.Submitter),
Conditions=hl.agg.collect(ht.ReportedPhenotypeInfo),
)
189 changes: 115 additions & 74 deletions v03_pipeline/lib/reference_data/clinvar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import hail as hl

from v03_pipeline.lib.reference_data.clinvar import (
import_submission_table,
join_to_submission_summary_ht,
parsed_and_mapped_clnsigconf,
parsed_clnsig,
Expand Down Expand Up @@ -87,56 +88,10 @@ def test_parsed_and_mapped_clnsigconf(self):
)

@mock.patch(
'v03_pipeline.lib.reference_data.clinvar.download_and_import_clinvar_submission_summary',
'v03_pipeline.lib.reference_data.clinvar.hl.import_table',
)
def test_join_to_submission_summary_ht(self, mock_download):
clinvar_enums_struct = hl.Struct(
CLNSIG=[
'Pathogenic/Likely_pathogenic/Pathogenic',
'_low_penetrance',
],
CLNSIGCONF=[
'Pathogenic(8)|Likely_pathogenic(2)|Pathogenic',
'_low_penetrance(1)|Uncertain_significance(1)',
],
CLNREVSTAT=['no_classifications_from_unflagged_records'],
)
vcf_ht = hl.Table.parallelize(
[
{
'locus': hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
'alleles': ['A', 'C'],
'rsid': '5',
'info': hl.Struct(ALLELEID=1, **clinvar_enums_struct),
},
{
'locus': hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
'alleles': ['A', 'AC'],
'rsid': '7',
'info': hl.Struct(ALLELEID=1, **clinvar_enums_struct),
},
],
hl.tstruct(
locus=hl.tlocus('GRCh38'),
alleles=hl.tarray(hl.tstr),
rsid=hl.tstr,
info=hl.tstruct(
ALLELEID=hl.tint32,
CLNSIG=hl.tarray(hl.tstr),
CLNSIGCONF=hl.tarray(hl.tstr),
CLNREVSTAT=hl.tarray(hl.tstr),
),
),
)
mock_download.return_value = hl.Table.parallelize(
def test_import_submission_table(self, mock_import_table):
mock_import_table.return_value = hl.Table.parallelize(
[
{
'#VariationID': '5',
Expand Down Expand Up @@ -164,51 +119,137 @@ def test_join_to_submission_summary_ht(self, mock_download):
'ReportedPhenotypeInfo': 'na:B',
},
],
hl.tstruct(
**{
'#VariationID': hl.tstr,
'Submitter': hl.tstr,
'ReportedPhenotypeInfo': hl.tstr,
},
),
)
ht = join_to_submission_summary_ht(vcf_ht)
ht = import_submission_table('mock_file_name')
self.assertEqual(
ht.collect(),
[
hl.Struct(
locus=hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
alleles=['A', 'C'],
rsid='5',
info=hl.Struct(ALLELEID=1, **clinvar_enums_struct),
submitters=[
VariationID='5',
Submitters=[
'OMIM',
'Broad Institute Rare Disease Group, Broad Institute',
'PreventionGenetics, part of Exact Sciences',
'Invitae',
],
conditions=[
Conditions=[
'C3661900:not provided',
'C0023264:Leigh syndrome',
'na:FOXRED1-related condition',
'C4748791:Mitochondrial complex 1 deficiency, nuclear type 19',
],
),
hl.Struct(
locus=hl.Locus(
VariationID='6',
Submitters=['A'],
Conditions=['na:B'],
),
],
)

@mock.patch(
'v03_pipeline.lib.reference_data.clinvar.download_and_import_clinvar_submission_summary',
)
def test_join_to_submission_summary_ht(
self,
mock_download,
):
vcf_ht = hl.Table.parallelize(
[
{
'locus': hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
alleles=['A', 'AC'],
rsid='7',
info=hl.Struct(ALLELEID=1, **clinvar_enums_struct),
submitters=None,
conditions=None,
),
'alleles': ['A', 'C'],
'rsid': '5',
'info': hl.Struct(ALLELEID=1),
},
{
'locus': hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
'alleles': ['A', 'AC'],
'rsid': '7',
'info': hl.Struct(ALLELEID=1),
},
],
hl.tstruct(
locus=hl.tlocus('GRCh38'),
alleles=hl.tarray(hl.tstr),
rsid=hl.tstr,
info=hl.tstruct(ALLELEID=hl.tint32),
),
)
submitters_ht = hl.Table.parallelize(
[
{
'VariationID': '5',
'Submitters': [
'OMIM',
'Broad Institute Rare Disease Group, Broad Institute',
'PreventionGenetics, part of Exact Sciences',
'Invitae',
],
'Conditions': [
'C3661900:not provided',
'C0023264:Leigh syndrome',
'na:FOXRED1-related condition',
'C4748791:Mitochondrial complex 1 deficiency, nuclear type 19',
],
},
{'VariationID': '6', 'Submitters': ['A'], 'Conditions': ['na:B']},
],
hl.tstruct(
VariationID=hl.tstr,
Submitters=hl.tarray(hl.tstr),
Conditions=hl.tarray(hl.tstr),
),
key='VariationID',
)
expected_clinvar_ht_rows = [
hl.Struct(
locus=hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
alleles=['A', 'C'],
rsid='5',
info=hl.Struct(ALLELEID=1),
submitters=[
'OMIM',
'Broad Institute Rare Disease Group, Broad Institute',
'PreventionGenetics, part of Exact Sciences',
'Invitae',
],
conditions=[
'C3661900:not provided',
'C0023264:Leigh syndrome',
'na:FOXRED1-related condition',
'C4748791:Mitochondrial complex 1 deficiency, nuclear type 19',
],
),
hl.Struct(
locus=hl.Locus(
contig='chr1',
position=871269,
reference_genome='GRCh38',
),
alleles=['A', 'AC'],
rsid='7',
info=hl.Struct(ALLELEID=1),
submitters=None,
conditions=None,
),
]

mock_download.return_value = submitters_ht
ht = join_to_submission_summary_ht(vcf_ht)
self.assertEqual(
ht.collect(),
expected_clinvar_ht_rows,
)
Loading
Loading