Skip to content

Commit 61654c7

Browse files
committed
Fix failing unit test, and change acceptable values for the env variable
1 parent 929bd1f commit 61654c7

File tree

7 files changed

+140
-9
lines changed

7 files changed

+140
-9
lines changed

google/auth/_helpers.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import datetime
2020
from email.message import Message
2121
import hashlib
22+
import os
2223
import json
2324
import logging
2425
import sys
@@ -287,6 +288,45 @@ def unpadded_urlsafe_b64encode(value):
287288
return base64.urlsafe_b64encode(value).rstrip(b"=")
288289

289290

291+
def get_bool_from_env(variable_name, default=False):
292+
"""Gets a boolean value from an environment variable.
293+
294+
The environment variable is interpreted as a boolean with the following
295+
(case-insensitive) rules:
296+
- "true", "1" are considered true.
297+
- "false", "0" are considered false.
298+
299+
Args:
300+
variable_name (str): The name of the environment variable.
301+
default (bool): The default value if the environment variable is not
302+
set.
303+
304+
Returns:
305+
bool: The boolean value of the environment variable.
306+
307+
Raises:
308+
google.auth.exceptions.InvalidValue: If the environment variable is
309+
set to a value that can not be interpreted as a boolean.
310+
"""
311+
value = os.environ.get(variable_name)
312+
313+
if value is None:
314+
return default
315+
316+
value = value.lower()
317+
318+
if value in ("true", "1"):
319+
return True
320+
elif value in ("false", "0"):
321+
return False
322+
else:
323+
raise exceptions.InvalidValue(
324+
'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
325+
variable_name
326+
)
327+
)
328+
329+
290330
def is_python_3():
291331
"""Check if the Python interpreter is Python 2 or 3.
292332

google/auth/credentials.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -368,16 +368,15 @@ def _lookup_trust_boundary(self, request):
368368
from google.oauth2 import _client
369369

370370
# Verify the trust boundary feature flag is enabled.
371-
if (
372-
os.getenv(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "").lower()
373-
!= "true"
371+
if not _helpers.get_bool_from_env(
372+
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
374373
):
375374
# Skip the lookup and return early if it's not explicitly enabled.
376-
return
375+
return None
377376

378377
# Skip trust boundary flow for non-gdu universe domain.
379378
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
380-
return
379+
return None
381380

382381
url = self._build_trust_boundary_lookup_url()
383382
if not url:

google/auth/impersonated_credentials.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,6 @@ def _make_copy(self):
439439

440440
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
441441
def with_trust_boundary(self, trust_boundary):
442-
"""Returns a copy of these credentials with a modified trust boundary."""
443442
cred = self._make_copy()
444443
cred._trust_boundary = trust_boundary
445444
return cred

google/oauth2/service_account.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,6 @@ def with_token_uri(self, token_uri):
388388

389389
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
390390
def with_trust_boundary(self, trust_boundary):
391-
"""Returns a copy of these credentials with a modified trust boundary."""
392391
cred = self._make_copy()
393392
cred._trust_boundary = trust_boundary
394393
return cred

tests/oauth2/test_service_account.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,52 @@ def test_refresh_trust_boundary_lookup_fails_no_cache(
763763
assert credentials._trust_boundary is None
764764
mock_lookup_trust_boundary.assert_called_once()
765765

766+
@mock.patch("google.oauth2._client.lookup_trust_boundary")
767+
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
768+
def test_refresh_trust_boundary_lookup_fails_with_cached_data(
769+
self, mock_jwt_grant, mock_lookup_trust_boundary
770+
):
771+
# Initial setup: Credentials with no trust boundary.
772+
credentials = self.make_credentials(trust_boundary=None)
773+
token = "token"
774+
mock_jwt_grant.return_value = (
775+
token,
776+
_helpers.utcnow() + datetime.timedelta(seconds=500),
777+
{},
778+
)
779+
request = mock.create_autospec(transport.Request, instance=True)
780+
781+
# First refresh: Successfully fetch a valid trust boundary.
782+
mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
783+
with mock.patch.dict(
784+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
785+
):
786+
credentials.refresh(request)
787+
788+
assert credentials.valid
789+
assert credentials.token == token
790+
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
791+
mock_lookup_trust_boundary.assert_called_once_with(
792+
request, self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, mock.ANY
793+
)
794+
795+
# Second refresh: Mock lookup to fail, but expect cached data to be preserved.
796+
mock_lookup_trust_boundary.reset_mock()
797+
mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
798+
"Lookup failed"
799+
)
800+
801+
with mock.patch.dict(
802+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
803+
):
804+
credentials.refresh(request) # This should NOT raise an exception
805+
806+
assert credentials.valid # Credentials should still be valid
807+
assert (
808+
credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
809+
) # Cached data should be preserved
810+
mock_lookup_trust_boundary.assert_called_once() # Lookup should have been attempted again
811+
766812

767813
class TestIDTokenCredentials(object):
768814
SERVICE_ACCOUNT_EMAIL = "service-account@example.com"

tests/test_credentials.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
import mock
1818
import pytest # type: ignore
19-
from google.auth import _client, exceptions
19+
from google.auth import exceptions
20+
from google.oauth2 import _client
2021

2122
from google.auth import _helpers
2223
from google.auth import credentials
@@ -430,7 +431,6 @@ def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb):
430431
with mock.patch.dict(
431432
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
432433
):
433-
result = creds._lookup_trust_boundary(request)
434434
with pytest.raises(
435435
exceptions.InvalidValue,
436436
match="Failed to build trust boundary lookup URL.",

tests/test_impersonated_credentials.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,54 @@ def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
436436
credentials.apply(headers_applied)
437437
assert headers_applied["x-allowed-locations"] == ""
438438

439+
@mock.patch("google.oauth2._client.lookup_trust_boundary")
440+
def test_refresh_trust_boundary_lookup_fails_with_cached_data2(
441+
self, mock_lookup_trust_boundary, mock_donor_credentials
442+
):
443+
# Start with no trust boundary
444+
credentials = self.make_credentials(lifetime=None, trust_boundary=None)
445+
token = "token"
446+
447+
expire_time = (
448+
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
449+
).isoformat("T") + "Z"
450+
response_body = {"accessToken": token, "expireTime": expire_time}
451+
452+
request = self.make_request(
453+
data=json.dumps(response_body),
454+
status=http_client.OK,
455+
)
456+
457+
# First refresh: Successfully fetch a valid trust boundary.
458+
mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
459+
with mock.patch.dict(
460+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
461+
), mock.patch(
462+
"google.auth.metrics.token_request_access_token_impersonate",
463+
return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
464+
):
465+
credentials.refresh(request)
466+
467+
assert credentials.valid
468+
# Verify trust boundary was set.
469+
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
470+
mock_lookup_trust_boundary.assert_called_once()
471+
472+
# Second refresh: Mock lookup to fail, but expect cached data to be preserved.
473+
mock_lookup_trust_boundary.reset_mock()
474+
mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
475+
"Lookup failed"
476+
)
477+
478+
with mock.patch.dict(
479+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
480+
):
481+
credentials.refresh(request)
482+
483+
assert credentials.valid
484+
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
485+
mock_lookup_trust_boundary.assert_called_once()
486+
439487
@pytest.mark.parametrize("use_data_bytes", [True, False])
440488
def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials):
441489
credentials = self.make_credentials(subject="test@email.com", lifetime=None)

0 commit comments

Comments
 (0)