Skip to content

Dev #1001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 13, 2024
Merged

Dev #1001

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ gnomad==0.6.4
aiofiles==24.1.0
pydantic==2.8.2
google-cloud-dataproc==5.14.0
google-cloud-bigquery==3.27.0
22 changes: 18 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,30 @@ frozenlist==1.5.0
gnomad==0.6.4
# via -r requirements.in
google-api-core[grpc]==2.22.0
# via google-cloud-dataproc
# via
# google-cloud-bigquery
# google-cloud-core
# google-cloud-dataproc
google-auth==2.35.0
# via
# google-api-core
# google-auth-oauthlib
# google-cloud-bigquery
# google-cloud-core
# google-cloud-dataproc
# hail
google-auth-oauthlib==0.8.0
# via hail
google-cloud-bigquery==3.27.0
# via -r requirements.in
google-cloud-core==2.4.1
# via google-cloud-bigquery
google-cloud-dataproc==5.14.0
# via -r requirements.in
google-crc32c==1.6.0
# via google-resumable-media
google-resumable-media==2.7.2
# via google-cloud-bigquery
googleapis-common-protos[grpc]==1.65.0
# via
# google-api-core
Expand Down Expand Up @@ -197,6 +210,7 @@ orjson==3.10.10
packaging==24.1
# via
# bokeh
# google-cloud-bigquery
# plotly
pandas==2.2.3
# via
Expand Down Expand Up @@ -256,16 +270,15 @@ pygments==2.18.0
# ipython
# rich
pyjwt[crypto]==2.9.0
# via
# msal
# pyjwt
# via msal
pyspark==3.5.3
# via hail
python-daemon==3.1.0
# via luigi
python-dateutil==2.9.0.post0
# via
# botocore
# google-cloud-bigquery
# luigi
# pandas
python-json-logger==2.0.7
Expand All @@ -282,6 +295,7 @@ requests==2.32.3
# via
# azure-core
# google-api-core
# google-cloud-bigquery
# hail
# msal
# msrest
Expand Down
10 changes: 2 additions & 8 deletions v03_pipeline/lib/misc/allele_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import hailtop.fs as hfs
import requests
from requests import HTTPError
from requests.adapters import HTTPAdapter, Retry

from v03_pipeline.lib.logger import get_logger
from v03_pipeline.lib.misc.requests import requests_retry_session
from v03_pipeline.lib.model import Env, ReferenceGenome

MAX_VARIANTS_PER_REQUEST = 1000000
Expand Down Expand Up @@ -96,13 +96,7 @@ def register_alleles(
logger.info('Calling the ClinGen Allele Registry')
with hfs.open(formatted_vcf_file_name, 'r') as vcf_in:
data = vcf_in.read()
s = requests.Session()
retries = Retry(
total=5,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
)
s.mount('https://', HTTPAdapter(max_retries=retries))
s = requests_retry_session()
res = s.put(
url=build_url(base_url, reference_genome),
data=data,
Expand Down
36 changes: 36 additions & 0 deletions v03_pipeline/lib/misc/gcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import datetime

import google.auth
import google.auth.transport.requests
import google.oauth2.credentials
import pytz

# Process-wide cache of application-default credentials; populated lazily by
# get_service_account_credentials().
SERVICE_ACCOUNT_CREDENTIALS = None
# OAuth2 scopes requested for the service account token.
SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE = [
    'https://www.googleapis.com/auth/userinfo.profile',
    'https://www.googleapis.com/auth/userinfo.email',
    'openid',
]
# Minimum remaining token lifetime (seconds) before a refresh is forced.
ONE_MINUTE_S = 60


def get_service_account_credentials() -> google.oauth2.credentials.Credentials:
    """Return cached application-default credentials with a usable access token.

    Credentials are fetched once per process and cached in the module global.
    The token is refreshed when it is missing, has no known expiry, or expires
    within ONE_MINUTE_S.

    NOTE(review): not thread-safe — two concurrent first calls may both invoke
    google.auth.default(); confirm callers tolerate that.
    """
    global SERVICE_ACCOUNT_CREDENTIALS
    if not SERVICE_ACCOUNT_CREDENTIALS:
        SERVICE_ACCOUNT_CREDENTIALS, _ = google.auth.default(
            scopes=SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE,
        )
    tz = pytz.UTC
    # credentials.expiry is a naive UTC datetime; localize before comparing.
    # Guard against expiry being None (possible for some credential types with
    # a token already set) — tz.localize(None) would raise.
    if (
        SERVICE_ACCOUNT_CREDENTIALS.token
        and SERVICE_ACCOUNT_CREDENTIALS.expiry is not None
        and (
            tz.localize(SERVICE_ACCOUNT_CREDENTIALS.expiry)
            - datetime.datetime.now(tz=tz)
        ).total_seconds()
        > ONE_MINUTE_S
    ):
        return SERVICE_ACCOUNT_CREDENTIALS
    SERVICE_ACCOUNT_CREDENTIALS.refresh(
        request=google.auth.transport.requests.Request(),
    )
    return SERVICE_ACCOUNT_CREDENTIALS
6 changes: 5 additions & 1 deletion v03_pipeline/lib/misc/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,11 @@ def select_relevant_fields(
def import_imputed_sex(imputed_sex_path: str) -> hl.Table:
ht = hl.import_table(imputed_sex_path)
imputed_sex_lookup = hl.dict(
{s.imputed_sex_value: s.value for s in Sex},
{
imputed_sex_value: s.value
for s in Sex
for imputed_sex_value in s.imputed_sex_values
},
)
ht = ht.select(
s=ht.collaborator_sample_id,
Expand Down
1 change: 1 addition & 0 deletions v03_pipeline/lib/misc/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def test_import_imputed_sex(self) -> None:
hl.Struct(s='abc_2', predicted_sex='F'),
hl.Struct(s='abc_3', predicted_sex='U'),
hl.Struct(s='abc_4', predicted_sex='XYY'),
hl.Struct(s='abc_5', predicted_sex='U'),
],
)

Expand Down
14 changes: 14 additions & 0 deletions v03_pipeline/lib/misc/requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import requests
from requests.adapters import HTTPAdapter, Retry


def requests_retry_session(
    *,
    total: int = 5,
    backoff_factor: float = 1,
    status_forcelist: tuple[int, ...] = (500, 502, 503, 504),
) -> requests.Session:
    """Return a requests.Session that retries transient failures.

    Retries are mounted for both http:// and https://.

    Args:
        total: maximum number of retries per request.
        backoff_factor: exponential backoff multiplier between retries.
        status_forcelist: HTTP status codes that trigger a retry.
    """
    s = requests.Session()
    retries = Retry(
        total=total,
        backoff_factor=backoff_factor,
        status_forcelist=list(status_forcelist),
    )
    s.mount('http://', HTTPAdapter(max_retries=retries))
    s.mount('https://', HTTPAdapter(max_retries=retries))
    return s
68 changes: 68 additions & 0 deletions v03_pipeline/lib/misc/terra_data_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
import re
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin

import google.cloud.bigquery
from google.cloud import bigquery

from v03_pipeline.lib.misc.gcp import get_service_account_credentials
from v03_pipeline.lib.misc.requests import requests_retry_session

# Columns selected from the TDR-exported `sample` table.
BIGQUERY_METRICS = [
    'collaborator_sample_id',
    'predicted_sex',
]
BIGQUERY_RESOURCE = 'bigquery'
# '.' is escaped so only a literal dot may separate the project and dataset
# names; an unescaped '.' would match any character.
TABLE_NAME_VALIDATION_REGEX = r'datarepo-\w+\.datarepo_\w+'
TDR_ROOT_URL = 'https://data.terra.bio/api/repository/v1/'


def _tdr_request(resource: str) -> dict:
    """Issue an authenticated GET against the Terra Data Repository API.

    Args:
        resource: path relative to TDR_ROOT_URL, e.g. 'datasets'.

    Returns:
        The parsed JSON response body.

    Raises:
        requests.HTTPError: on a non-2xx response (after retries).
    """
    service_account_token = get_service_account_credentials().token
    s = requests_retry_session()
    # urljoin builds the URL portably; os.path.join would join with the OS
    # path separator and is not meant for URLs.
    res = s.get(
        url=urljoin(TDR_ROOT_URL, resource),
        headers={'Authorization': f'Bearer {service_account_token}'},
        timeout=10,
    )
    res.raise_for_status()
    return res.json()


def _get_dataset_ids() -> list[str]:
    """Return the ids of all TDR datasets, requiring each to be BigQuery-backed."""
    datasets = _tdr_request('datasets')['items']
    for dataset in datasets:
        backed_by_bigquery = any(
            storage['cloudResource'] == BIGQUERY_RESOURCE
            for storage in dataset['storage']
        )
        if not backed_by_bigquery:
            # Hard failure on purpose to prompt manual investigation.
            msg = 'Datasets without bigquery sources are unsupported'
            raise ValueError(msg)
    return [dataset['id'] for dataset in datasets]


def gen_bq_table_names() -> Generator[str, None, None]:
    """Yield fully qualified BigQuery dataset names ('<projectId>.<datasetName>').

    One name is yielded per TDR dataset, fetched concurrently; names arrive in
    completion order, not dataset order.
    """
    # NOTE: Generator[str] single-argument subscription only works at runtime
    # on Python 3.13+; the explicit three-argument form is portable.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(
                _tdr_request,
                f'datasets/{dataset_id}?include=ACCESS_INFORMATION',
            )
            for dataset_id in _get_dataset_ids()
        ]
        for future in as_completed(futures):
            result = future.result()
            yield f"{result['accessInformation']['bigQuery']['projectId']}.{result['accessInformation']['bigQuery']['datasetName']}"


def bq_metrics_query(bq_table_name: str) -> google.cloud.bigquery.table.RowIterator:
    """Query the sample-level metrics columns from a TDR BigQuery dataset.

    Args:
        bq_table_name: fully qualified dataset name, '<project>.<dataset>'.

    Raises:
        ValueError: if the name does not look like a datarepo dataset.
    """
    # fullmatch (rather than match) rejects trailing garbage; the name is
    # interpolated into the SQL below, so validation must cover the whole string.
    if not re.fullmatch(TABLE_NAME_VALIDATION_REGEX, bq_table_name):
        msg = f'{bq_table_name} does not match expected pattern'
        raise ValueError(msg)
    client = bigquery.Client()
    return client.query_and_wait(
        f"""
        SELECT {','.join(BIGQUERY_METRICS)}
        FROM `{bq_table_name}.sample`
        """,  # noqa: S608
    )
Loading
Loading