Skip to content

Commit dcd40f8

Browse files
authored
Dev (#1001)
* Add service account credentialing (#997) * Add service account credentialing * ruff * feat: Handle parsing empty predicted sex into Unknown (#1000) * Add helper functions for querying `Terra Data Repository` (#998) * Add service account credentialing * ruff * First pass * tests passing * add coverage of bigquery test * change function names * use generators everywhere * bq requirement * resolver * Update sample id name * Build Sex Check Table from TDR Metrics (#999)
1 parent b399784 commit dcd40f8

23 files changed

+693
-38
lines changed

requirements.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ gnomad==0.6.4
44
aiofiles==24.1.0
55
pydantic==2.8.2
66
google-cloud-dataproc==5.14.0
7+
google-cloud-bigquery==3.27.0

requirements.txt

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,30 @@ frozenlist==1.5.0
9797
gnomad==0.6.4
9898
# via -r requirements.in
9999
google-api-core[grpc]==2.22.0
100-
# via google-cloud-dataproc
100+
# via
101+
# google-cloud-bigquery
102+
# google-cloud-core
103+
# google-cloud-dataproc
101104
google-auth==2.35.0
102105
# via
103106
# google-api-core
104107
# google-auth-oauthlib
108+
# google-cloud-bigquery
109+
# google-cloud-core
105110
# google-cloud-dataproc
106111
# hail
107112
google-auth-oauthlib==0.8.0
108113
# via hail
114+
google-cloud-bigquery==3.27.0
115+
# via -r requirements.in
116+
google-cloud-core==2.4.1
117+
# via google-cloud-bigquery
109118
google-cloud-dataproc==5.14.0
110119
# via -r requirements.in
120+
google-crc32c==1.6.0
121+
# via google-resumable-media
122+
google-resumable-media==2.7.2
123+
# via google-cloud-bigquery
111124
googleapis-common-protos[grpc]==1.65.0
112125
# via
113126
# google-api-core
@@ -197,6 +210,7 @@ orjson==3.10.10
197210
packaging==24.1
198211
# via
199212
# bokeh
213+
# google-cloud-bigquery
200214
# plotly
201215
pandas==2.2.3
202216
# via
@@ -256,16 +270,15 @@ pygments==2.18.0
256270
# ipython
257271
# rich
258272
pyjwt[crypto]==2.9.0
259-
# via
260-
# msal
261-
# pyjwt
273+
# via msal
262274
pyspark==3.5.3
263275
# via hail
264276
python-daemon==3.1.0
265277
# via luigi
266278
python-dateutil==2.9.0.post0
267279
# via
268280
# botocore
281+
# google-cloud-bigquery
269282
# luigi
270283
# pandas
271284
python-json-logger==2.0.7
@@ -282,6 +295,7 @@ requests==2.32.3
282295
# via
283296
# azure-core
284297
# google-api-core
298+
# google-cloud-bigquery
285299
# hail
286300
# msal
287301
# msrest

v03_pipeline/lib/misc/allele_registry.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
import hailtop.fs as hfs
99
import requests
1010
from requests import HTTPError
11-
from requests.adapters import HTTPAdapter, Retry
1211

1312
from v03_pipeline.lib.logger import get_logger
13+
from v03_pipeline.lib.misc.requests import requests_retry_session
1414
from v03_pipeline.lib.model import Env, ReferenceGenome
1515

1616
MAX_VARIANTS_PER_REQUEST = 1000000
@@ -96,13 +96,7 @@ def register_alleles(
9696
logger.info('Calling the ClinGen Allele Registry')
9797
with hfs.open(formatted_vcf_file_name, 'r') as vcf_in:
9898
data = vcf_in.read()
99-
s = requests.Session()
100-
retries = Retry(
101-
total=5,
102-
backoff_factor=1,
103-
status_forcelist=[500, 502, 503, 504],
104-
)
105-
s.mount('https://', HTTPAdapter(max_retries=retries))
99+
s = requests_retry_session()
106100
res = s.put(
107101
url=build_url(base_url, reference_genome),
108102
data=data,

v03_pipeline/lib/misc/gcp.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import datetime
2+
3+
import google.auth
4+
import google.auth.transport.requests
5+
import google.oauth2.credentials
6+
import pytz
7+
8+
# Module-level cache for application-default credentials; populated lazily
# by get_service_account_credentials() and refreshed there when stale.
SERVICE_ACCOUNT_CREDENTIALS = None
# OAuth2 scopes requested when resolving application-default credentials.
SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE = [
    'https://www.googleapis.com/auth/userinfo.profile',
    'https://www.googleapis.com/auth/userinfo.email',
    'openid',
]
# Minimum remaining token lifetime (seconds) before a refresh is forced.
ONE_MINUTE_S = 60
15+
16+
17+
def get_service_account_credentials() -> google.oauth2.credentials.Credentials:
    """Return cached application-default credentials, refreshing when stale.

    Credentials are resolved once via Application Default Credentials and
    cached at module level.  The cached token is reused only while it has
    more than ONE_MINUTE_S of lifetime remaining; otherwise the credentials
    are refreshed before being returned.
    """
    global SERVICE_ACCOUNT_CREDENTIALS
    if not SERVICE_ACCOUNT_CREDENTIALS:
        SERVICE_ACCOUNT_CREDENTIALS, _ = google.auth.default(
            scopes=SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE,
        )
    tz = pytz.UTC
    # `expiry` is localized with pytz.UTC before comparison (it is stored as
    # a naive datetime on the credentials object).  Guard against a missing
    # expiry as well as a missing token: the previous version called
    # tz.localize(None) and raised when a token carried no expiry.
    if SERVICE_ACCOUNT_CREDENTIALS.token and SERVICE_ACCOUNT_CREDENTIALS.expiry:
        remaining_s = (
            tz.localize(SERVICE_ACCOUNT_CREDENTIALS.expiry)
            - datetime.datetime.now(tz=tz)
        ).total_seconds()
        if remaining_s > ONE_MINUTE_S:
            return SERVICE_ACCOUNT_CREDENTIALS
    SERVICE_ACCOUNT_CREDENTIALS.refresh(
        request=google.auth.transport.requests.Request(),
    )
    return SERVICE_ACCOUNT_CREDENTIALS

v03_pipeline/lib/misc/io.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,11 @@ def select_relevant_fields(
219219
def import_imputed_sex(imputed_sex_path: str) -> hl.Table:
220220
ht = hl.import_table(imputed_sex_path)
221221
imputed_sex_lookup = hl.dict(
222-
{s.imputed_sex_value: s.value for s in Sex},
222+
{
223+
imputed_sex_value: s.value
224+
for s in Sex
225+
for imputed_sex_value in s.imputed_sex_values
226+
},
223227
)
224228
ht = ht.select(
225229
s=ht.collaborator_sample_id,

v03_pipeline/lib/misc/io_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def test_import_imputed_sex(self) -> None:
4747
hl.Struct(s='abc_2', predicted_sex='F'),
4848
hl.Struct(s='abc_3', predicted_sex='U'),
4949
hl.Struct(s='abc_4', predicted_sex='XYY'),
50+
hl.Struct(s='abc_5', predicted_sex='U'),
5051
],
5152
)
5253

v03_pipeline/lib/misc/requests.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import requests
2+
from requests.adapters import HTTPAdapter, Retry
3+
4+
5+
def requests_retry_session():
    """Build a requests.Session that retries transient server failures.

    Both http:// and https:// requests retry up to 5 times with exponential
    backoff (factor 1) on 500/502/503/504 responses.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504],
    )
    session = requests.Session()
    for scheme in ('http://', 'https://'):
        session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
    return session
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import os
2+
import re
3+
from collections.abc import Generator
4+
from concurrent.futures import ThreadPoolExecutor, as_completed
5+
6+
import google.cloud.bigquery
7+
from google.cloud import bigquery
8+
9+
from v03_pipeline.lib.misc.gcp import get_service_account_credentials
10+
from v03_pipeline.lib.misc.requests import requests_retry_session
11+
12+
# Columns selected from each TDR BigQuery `sample` table.
BIGQUERY_METRICS = [
    'collaborator_sample_id',
    'predicted_sex',
]
BIGQUERY_RESOURCE = 'bigquery'
# TDR-exported tables are named `<datarepo-project>.<datarepo_dataset>`.
# The dot separator is escaped: the previous pattern used a bare `.`,
# which matched ANY character between the two components.
TABLE_NAME_VALIDATION_REGEX = r'datarepo-\w+\.datarepo_\w+'
TDR_ROOT_URL = 'https://data.terra.bio/api/repository/v1/'
19+
20+
21+
def _tdr_request(resource: str) -> dict:
    """GET a Terra Data Repository resource and return its JSON body.

    Authenticates with the service account's bearer token and raises
    requests.HTTPError for non-2xx responses (after retries).
    """
    token = get_service_account_credentials().token
    session = requests_retry_session()
    # NOTE(review): os.path.join is used to build the URL — assumes a
    # POSIX path separator; verify if this ever runs on Windows.
    response = session.get(
        url=os.path.join(TDR_ROOT_URL, resource),
        headers={'Authorization': f'Bearer {token}'},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()
31+
32+
33+
def _get_dataset_ids() -> list[str]:
    """Return the ids of all TDR datasets, requiring BigQuery-backed storage."""
    datasets = _tdr_request('datasets')['items']
    for dataset in datasets:
        has_bigquery = any(
            storage['cloudResource'] == BIGQUERY_RESOURCE
            for storage in dataset['storage']
        )
        if not has_bigquery:
            # Hard failure on purpose to prompt manual investigation.
            msg = 'Datasets without bigquery sources are unsupported'
            raise ValueError(msg)
    return [dataset['id'] for dataset in datasets]
42+
43+
44+
def gen_bq_table_names() -> Generator[str]:
    """Yield `<projectId>.<datasetName>` for every TDR dataset.

    Dataset access information is fetched concurrently; names are yielded
    in completion order, not dataset order.
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(
                _tdr_request,
                f'datasets/{dataset_id}?include=ACCESS_INFORMATION',
            )
            for dataset_id in _get_dataset_ids()
        ]
        for completed in as_completed(futures):
            bq_info = completed.result()['accessInformation']['bigQuery']
            yield f"{bq_info['projectId']}.{bq_info['datasetName']}"
56+
57+
58+
def bq_metrics_query(bq_table_name: str) -> google.cloud.bigquery.table.RowIterator:
    """Run the metrics SELECT against `<bq_table_name>.sample` and return rows.

    The table name is interpolated directly into the SQL, so it is strictly
    validated first.  fullmatch (rather than match) is used: match only
    anchors at the start, so trailing characters — e.g. a closing backtick
    followed by extra SQL — would previously pass validation and be injected
    into the query.
    """
    if not re.fullmatch(TABLE_NAME_VALIDATION_REGEX, bq_table_name):
        msg = f'{bq_table_name} does not match expected pattern'
        raise ValueError(msg)
    client = bigquery.Client()
    return client.query_and_wait(
        f"""
        SELECT {','.join(BIGQUERY_METRICS)}
        FROM `{bq_table_name}.sample`
        """,  # noqa: S608
    )

0 commit comments

Comments
 (0)