30
30
from google .auth .compute_engine import _metadata
31
31
from google .oauth2 import _client
32
32
33
+ _TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
34
+ "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
35
+ )
36
+
33
37
34
38
class Credentials (
35
39
credentials .Scoped ,
36
40
credentials .CredentialsWithQuotaProject ,
37
41
credentials .CredentialsWithUniverseDomain ,
42
+ credentials .CredentialsWithTrustBoundary ,
38
43
):
39
44
"""Compute Engine Credentials.
40
45
@@ -61,6 +66,7 @@ def __init__(
61
66
scopes = None ,
62
67
default_scopes = None ,
63
68
universe_domain = None ,
69
+ trust_boundary = None ,
64
70
):
65
71
"""
66
72
Args:
@@ -76,6 +82,7 @@ def __init__(
76
82
provided or None, credential will attempt to fetch the value
77
83
from metadata server. If metadata server doesn't have universe
78
84
domain endpoint, then the default googleapis.com will be used.
85
+ trust_boundary (Mapping[str,str]): A credential trust boundary.
79
86
"""
80
87
super (Credentials , self ).__init__ ()
81
88
self ._service_account_email = service_account_email
@@ -86,6 +93,7 @@ def __init__(
86
93
if universe_domain :
87
94
self ._universe_domain = universe_domain
88
95
self ._universe_domain_cached = True
96
+ self ._trust_boundary = trust_boundary
89
97
90
98
def _metric_header_for_usage (self ):
91
99
return metrics .CRED_TYPE_SA_MDS
@@ -111,6 +119,33 @@ def refresh(self, request):
111
119
new_exc = exceptions .RefreshError (caught_exc )
112
120
raise new_exc from caught_exc
113
121
122
+ self ._refresh_trust_boundary (request )
123
+
124
+ def _build_trust_boundary_lookup_url (self ):
125
+ """Builds and returns the URL for the trust boundary lookup API for GCE."""
126
+ # If the service account email is 'default', we need to get the
127
+ # actual email address from the metadata server.
128
+ if self ._service_account_email == "default" :
129
+ from google .auth .transport import requests as google_auth_requests
130
+
131
+ request = google_auth_requests .Request ()
132
+ try :
133
+ info = _metadata .get_service_account_info (request , "default" )
134
+ # Cache the fetched email so we don't have to do this again.
135
+ self ._service_account_email = info ["email" ]
136
+
137
+ except exceptions .TransportError as e :
138
+ # If fetching the service account email fails due to a transport error,
139
+ # it means we cannot build the trust boundary lookup URL.
140
+ # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary.
141
+ raise exceptions .RefreshError (
142
+ f"Failed to get service account email for trust boundary lookup: { e } "
143
+ ) from e
144
+
145
+ return _TRUST_BOUNDARY_LOOKUP_ENDPOINT .format (
146
+ self .universe_domain , self .service_account_email
147
+ )
148
+
114
149
@property
115
150
def service_account_email (self ):
116
151
"""The service account email.
@@ -152,6 +187,7 @@ def with_quota_project(self, quota_project_id):
152
187
quota_project_id = quota_project_id ,
153
188
scopes = self ._scopes ,
154
189
default_scopes = self ._default_scopes ,
190
+ trust_boundary = self ._trust_boundary ,
155
191
)
156
192
creds ._universe_domain = self ._universe_domain
157
193
creds ._universe_domain_cached = self ._universe_domain_cached
@@ -167,6 +203,7 @@ def with_scopes(self, scopes, default_scopes=None):
167
203
default_scopes = default_scopes ,
168
204
service_account_email = self ._service_account_email ,
169
205
quota_project_id = self ._quota_project_id ,
206
+ trust_boundary = self ._trust_boundary ,
170
207
)
171
208
creds ._universe_domain = self ._universe_domain
172
209
creds ._universe_domain_cached = self ._universe_domain_cached
@@ -179,9 +216,23 @@ def with_universe_domain(self, universe_domain):
179
216
default_scopes = self ._default_scopes ,
180
217
service_account_email = self ._service_account_email ,
181
218
quota_project_id = self ._quota_project_id ,
219
+ trust_boundary = self ._trust_boundary ,
182
220
universe_domain = universe_domain ,
183
221
)
184
222
223
+ @_helpers .copy_docstring (credentials .CredentialsWithTrustBoundary )
224
+ def with_trust_boundary (self , trust_boundary ):
225
+ creds = self .__class__ (
226
+ service_account_email = self ._service_account_email ,
227
+ quota_project_id = self ._quota_project_id ,
228
+ scopes = self ._scopes ,
229
+ default_scopes = self ._default_scopes ,
230
+ trust_boundary = trust_boundary ,
231
+ )
232
+ creds ._universe_domain = self ._universe_domain
233
+ creds ._universe_domain_cached = self ._universe_domain_cached
234
+ return creds
235
+
185
236
186
237
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
187
238
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
0 commit comments