Skip to content

Commit 533d1d4

Browse files
authored
Add service account credentialing (#997)
* Add service account credentialing * ruff
1 parent 0b87970 commit 533d1d4

File tree

3 files changed

+58
-1
lines changed

3 files changed

+58
-1
lines changed

v03_pipeline/lib/misc/gcp.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import datetime
2+
3+
import google.auth
4+
import google.auth.transport.requests
5+
import google.oauth2.credentials
6+
import pytz
7+
8+
SERVICE_ACCOUNT_CREDENTIALS = None
9+
SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE = [
10+
'https://www.googleapis.com/auth/userinfo.profile',
11+
'https://www.googleapis.com/auth/userinfo.email',
12+
'openid',
13+
]
14+
ONE_MINUTE_S = 60
15+
16+
17+
def get_service_account_credentials() -> google.oauth2.credentials.Credentials:
18+
global SERVICE_ACCOUNT_CREDENTIALS
19+
if not SERVICE_ACCOUNT_CREDENTIALS:
20+
SERVICE_ACCOUNT_CREDENTIALS, _ = google.auth.default(
21+
scopes=SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE,
22+
)
23+
tz = pytz.UTC
24+
if (
25+
SERVICE_ACCOUNT_CREDENTIALS.token
26+
and (
27+
tz.localize(SERVICE_ACCOUNT_CREDENTIALS.expiry)
28+
- datetime.datetime.now(tz=tz)
29+
).total_seconds()
30+
> ONE_MINUTE_S
31+
):
32+
return SERVICE_ACCOUNT_CREDENTIALS
33+
SERVICE_ACCOUNT_CREDENTIALS.refresh(
34+
request=google.auth.transport.requests.Request(),
35+
)
36+
return SERVICE_ACCOUNT_CREDENTIALS

v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pip._internal.operations import freeze as pip_freeze
77

88
from v03_pipeline.lib.logger import get_logger
9+
from v03_pipeline.lib.misc.gcp import get_service_account_credentials
910
from v03_pipeline.lib.model import Env, ReferenceGenome
1011
from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
1112
BaseLoadingPipelineParams,
@@ -22,9 +23,11 @@
2223

2324

2425
def get_cluster_config(reference_genome: ReferenceGenome, run_id: str):
26+
service_account_credentials = get_service_account_credentials()
2527
return {
2628
'project_id': Env.GCLOUD_PROJECT,
2729
'cluster_name': f'{CLUSTER_NAME_PREFIX}-{reference_genome.value.lower()}-{run_id}',
30+
# Schema found at https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
2831
'config': {
2932
'gce_cluster_config': {
3033
'zone_uri': Env.GCLOUD_ZONE,
@@ -35,6 +38,8 @@ def get_cluster_config(reference_genome: ReferenceGenome, run_id: str):
3538
'REFERENCE_GENOME': reference_genome.value,
3639
'PIPELINE_RUNNER_APP_VERSION': Env.PIPELINE_RUNNER_APP_VERSION,
3740
},
41+
'service_account': service_account_credentials.service_account_email,
42+
'service_account_scopes': service_account_credentials.scopes,
3843
},
3944
'master_config': {
4045
'num_instances': 1,

v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,29 @@
55
import google.api_core.exceptions
66
import luigi
77

8+
from v03_pipeline.lib.misc.gcp import SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE
89
from v03_pipeline.lib.model import DatasetType, ReferenceGenome
910
from v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster import (
1011
CreateDataprocClusterTask,
1112
)
1213

1314

15+
@patch(
16+
'v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.get_service_account_credentials',
17+
return_value=SimpleNamespace(
18+
service_account_email='test@serviceaccount.com',
19+
scopes=SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE,
20+
),
21+
)
1422
@patch(
1523
'v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.dataproc.ClusterControllerClient',
1624
)
1725
class CreateDataprocClusterTaskTest(unittest.TestCase):
18-
def test_dataset_type_unsupported(self, mock_cluster_controller: Mock) -> None:
26+
def test_dataset_type_unsupported(
27+
self,
28+
mock_cluster_controller: Mock,
29+
_: Mock,
30+
) -> None:
1931
worker = luigi.worker.Worker()
2032
task = CreateDataprocClusterTask(
2133
reference_genome=ReferenceGenome.GRCh38,
@@ -29,6 +41,7 @@ def test_dataset_type_unsupported(self, mock_cluster_controller: Mock) -> None:
2941
def test_spinup_cluster_already_exists_failed(
3042
self,
3143
mock_cluster_controller: Mock,
44+
_: Mock,
3245
) -> None:
3346
mock_client = mock_cluster_controller.return_value
3447
mock_client.get_cluster.return_value = SimpleNamespace(
@@ -50,6 +63,7 @@ def test_spinup_cluster_already_exists_failed(
5063
def test_spinup_cluster_already_exists_success(
5164
self,
5265
mock_cluster_controller: Mock,
66+
_: Mock,
5367
) -> None:
5468
mock_client = mock_cluster_controller.return_value
5569
mock_client.get_cluster.return_value = SimpleNamespace(
@@ -73,6 +87,7 @@ def test_spinup_cluster_doesnt_exist_failed(
7387
self,
7488
mock_logger: Mock,
7589
mock_cluster_controller: Mock,
90+
_: Mock,
7691
) -> None:
7792
mock_client = mock_cluster_controller.return_value
7893
mock_client.get_cluster.side_effect = google.api_core.exceptions.NotFound(
@@ -98,6 +113,7 @@ def test_spinup_cluster_doesnt_exist_success(
98113
self,
99114
mock_logger: Mock,
100115
mock_cluster_controller: Mock,
116+
_: Mock,
101117
) -> None:
102118
mock_client = mock_cluster_controller.return_value
103119
mock_client.get_cluster.side_effect = google.api_core.exceptions.NotFound(

0 commit comments

Comments
 (0)