25
25
from google .auth import exceptions
26
26
from google .auth import metrics
27
27
from google .auth ._credentials_base import _BaseCredentials
28
+ from google .auth ._default import _LOGGER
28
29
from google .auth ._refresh_worker import RefreshThreadManager
29
30
30
31
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
31
32
NO_OP_TRUST_BOUNDARY_LOCATIONS : "typing.Tuple[str]" = ()
32
33
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"
33
- TRUST_BOUNDARY_ENV_VAR = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED"
34
34
35
35
36
36
class Credentials (_BaseCredentials ):
@@ -310,7 +310,16 @@ def apply(self, headers, token=None):
310
310
"""Apply the token to the authentication header."""
311
311
super ().apply (headers , token )
312
312
if self ._trust_boundary is not None :
313
- headers ["x-allowed-locations" ] = self ._trust_boundary ["encodedLocations" ]
313
+ if (
314
+ self ._trust_boundary ["encodedLocations" ]
315
+ == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS
316
+ ):
317
+ # STS expects an empty string if the trust boundary value is no-op.
318
+ headers ["x-allowed-locations" ] = ""
319
+ else :
320
+ headers ["x-allowed-locations" ] = self ._trust_boundary [
321
+ "encodedLocations"
322
+ ]
314
323
315
324
def _refresh_trust_boundary (self , request ):
316
325
"""Triggers a refresh of the trust boundary and updates the cache if necessary.
@@ -333,7 +342,11 @@ def _refresh_trust_boundary(self, request):
333
342
# If the call to the lookup API failed, check if there is a trust boundary
334
343
# already cached. If there is, do nothing. If not, then throw the error.
335
344
if self ._trust_boundary is None :
336
- raise (error )
345
+ raise error
346
+ if _helpers .is_logging_enabled (_LOGGER ):
347
+ _LOGGER .debug (
348
+ "Using cached trust boundary due to refresh error: %s" , error
349
+ )
337
350
return
338
351
else :
339
352
self ._trust_boundary = new_trust_boundary
@@ -353,9 +366,12 @@ def _lookup_trust_boundary(self, request):
353
366
retrieved.
354
367
"""
355
368
from google .oauth2 import _client
356
-
357
- # Verify the trust boundary feature flag is enabled.
358
- if os .getenv (TRUST_BOUNDARY_ENV_VAR , "" ).lower () != "true" :
369
+
370
+ # Verify the trust boundary feature flag is enabled.
371
+ if (
372
+ os .getenv (environment_vars .GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED , "" ).lower ()
373
+ != "true"
374
+ ):
359
375
# Skip the lookup and return early if it's not explicitly enabled.
360
376
return
361
377
@@ -364,6 +380,8 @@ def _lookup_trust_boundary(self, request):
364
380
return
365
381
366
382
url = self ._build_trust_boundary_lookup_url ()
383
+ if not url :
384
+ raise exceptions .InvalidValue ("Failed to build trust boundary lookup URL." )
367
385
return _client .lookup_trust_boundary (request , url , self .token )
368
386
369
387
@abc .abstractmethod
@@ -378,22 +396,9 @@ def _build_trust_boundary_lookup_url(self):
378
396
str: The URL for the trust boundary lookup endpoint, or None
379
397
if lookup should be skipped (e.g., for non-applicable universe domains).
380
398
"""
381
- raise NotImplementedError ("_build_trust_boundary_lookup_url must be implemented" )
382
-
383
- @staticmethod
384
- def _parse_trust_boundary (trust_boundary_string : str ):
385
- try :
386
- trust_boundary = json .loads (trust_boundary_string )
387
- if (
388
- "locations" not in trust_boundary
389
- or "encodedLocations" not in trust_boundary
390
- ):
391
- raise exceptions .MalformedError
392
- return trust_boundary
393
- except Exception :
394
- raise exceptions .MalformedError (
395
- "Cannot parse trust boundary {}" .format (trust_boundary_string )
396
- )
399
+ raise NotImplementedError (
400
+ "_build_trust_boundary_lookup_url must be implemented"
401
+ )
397
402
398
403
def _has_no_op_trust_boundary (self ):
399
404
# A no-op trust boundary is indicated by encodedLocations being "0x0".
@@ -490,8 +495,7 @@ def default_scopes(self):
490
495
491
496
@abc .abstractproperty
492
497
def requires_scopes (self ):
493
- """True if these credentials require scopes to obtain an access token.
494
- """
498
+ """True if these credentials require scopes to obtain an access token."""
495
499
return False
496
500
497
501
def has_scopes (self , scopes ):
0 commit comments