Skip to content

Commit 5edd899

Browse files
committed
Add unit tests for gce trust boundary.
1 parent 61654c7 commit 5edd899

File tree

4 files changed

+421
-3
lines changed

4 files changed

+421
-3
lines changed

tests/compute_engine/test_credentials.py

Lines changed: 327 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313
# limitations under the License.
1414
import base64
1515
import datetime
16+
import os
1617

1718
import mock
1819
import pytest # type: ignore
1920
import responses # type: ignore
21+
from google.oauth2 import _client
2022

2123
from google.auth import _helpers
2224
from google.auth import exceptions
2325
from google.auth import jwt
2426
from google.auth import transport
2527
from google.auth.compute_engine import credentials
28+
from google.auth.compute_engine import _metadata
2629
from google.auth.transport import requests
2730

2831
SAMPLE_ID_TOKEN_EXP = 1584393400
@@ -49,6 +52,7 @@
4952
ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
5053
"gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds"
5154
)
55+
from google.auth import environment_vars
5256

5357
FAKE_SERVICE_ACCOUNT_EMAIL = "foo@bar.com"
5458
FAKE_QUOTA_PROJECT_ID = "fake-quota-project"
@@ -60,6 +64,10 @@
6064
class TestCredentials(object):
6165
credentials = None
6266
credentials_with_all_fields = None
67+
VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"}
68+
NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""}
69+
EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations"
70+
ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds" # Adjust version if needed
6371

6472
@pytest.fixture(autouse=True)
6573
def credentials_fixture(self):
@@ -247,6 +255,323 @@ def test_user_provided_universe_domain(self, get_universe_domain):
247255
# domain endpoint.
248256
get_universe_domain.assert_not_called()
249257

258+
@mock.patch("google.oauth2._client.lookup_trust_boundary", autospec=True)
259+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
260+
def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true(
261+
self,
262+
mock_metadata_get,
263+
mock_lookup_tb,
264+
):
265+
creds = self.credentials
266+
request = mock.Mock()
267+
268+
mock_metadata_get.return_value = {
269+
"access_token": "mock_token",
270+
"expires_in": 3600,
271+
}
272+
273+
with mock.patch.dict(
274+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}
275+
):
276+
creds.refresh(request)
277+
278+
mock_lookup_tb.assert_not_called()
279+
assert creds._trust_boundary is None
280+
281+
@mock.patch("google.oauth2._client.lookup_trust_boundary", autospec=True)
282+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
283+
def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing(
284+
self, mock_metadata_get, mock_lookup_tb
285+
):
286+
creds = self.credentials
287+
request = mock.Mock()
288+
289+
mock_metadata_get.return_value = {
290+
"access_token": "mock_token",
291+
"expires_in": 3600,
292+
}
293+
294+
with mock.patch.dict(os.environ, clear=True):
295+
creds.refresh(request)
296+
297+
mock_lookup_tb.assert_not_called()
298+
assert creds._trust_boundary is None
299+
300+
@mock.patch.object(_client, "lookup_trust_boundary", autospec=True)
301+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
302+
def test_refresh_trust_boundary_lookup_success(
303+
self, mock_metadata_get, mock_lookup_tb
304+
):
305+
mock_lookup_tb.return_value = {
306+
"locations": ["us-central1"],
307+
"encodedLocations": "0xABC",
308+
}
309+
creds = self.credentials
310+
request = mock.Mock()
311+
312+
# The first call to _metadata.get is for the token, the second for the
313+
# universe domain, and the third is to get service account info to
314+
# build the trust boundary URL.
315+
mock_metadata_get.side_effect = [
316+
{"access_token": "mock_token", "expires_in": 3600},
317+
"", # for universe_domain
318+
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
319+
]
320+
321+
with mock.patch.dict(
322+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
323+
):
324+
creds.refresh(request)
325+
326+
# Verify _metadata.get was called three times.
327+
assert mock_metadata_get.call_count == 3
328+
# Verify lookup_trust_boundary was called with correct URL and token
329+
mock_lookup_tb.assert_called_once_with(
330+
request,
331+
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations",
332+
"mock_token",
333+
)
334+
# Verify trust boundary was set
335+
assert creds._trust_boundary == {
336+
"locations": ["us-central1"],
337+
"encodedLocations": "0xABC",
338+
}
339+
340+
# Verify x-allowed-locations header is set by apply()
341+
headers_applied = {}
342+
creds.apply(headers_applied)
343+
assert headers_applied["x-allowed-locations"] == "0xABC"
344+
345+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
346+
@mock.patch.object(_client, "lookup_trust_boundary", autospec=True)
347+
def test_refresh_trust_boundary_lookup_fails_no_cache(
348+
self, mock_lookup_tb, mock_metadata_get
349+
):
350+
mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
351+
creds = self.credentials
352+
request = mock.Mock()
353+
354+
# Mock metadata calls for token, universe domain, and service account info
355+
mock_metadata_get.side_effect = [
356+
{"access_token": "mock_token", "expires_in": 3600},
357+
"", # for universe_domain
358+
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
359+
]
360+
361+
with mock.patch.dict(
362+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
363+
):
364+
with pytest.raises(exceptions.RefreshError, match="Lookup failed"):
365+
creds.refresh(request)
366+
367+
assert creds._trust_boundary is None
368+
assert mock_metadata_get.call_count == 3
369+
mock_lookup_tb.assert_called_once()
370+
371+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
372+
@mock.patch.object(_client, "lookup_trust_boundary", autospec=True)
373+
def test_refresh_trust_boundary_lookup_fails_with_cached_data(
374+
self, mock_lookup_tb, mock_metadata_get
375+
):
376+
# First refresh: Successfully fetch a valid trust boundary.
377+
mock_lookup_tb.return_value = {
378+
"locations": ["us-central1"],
379+
"encodedLocations": "0xABC",
380+
}
381+
mock_metadata_get.side_effect = [
382+
{"access_token": "mock_token_1", "expires_in": 3600},
383+
"", # for universe_domain
384+
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
385+
]
386+
creds = self.credentials
387+
request = mock.Mock()
388+
389+
with mock.patch.dict(
390+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
391+
):
392+
creds.refresh(request)
393+
394+
assert creds._trust_boundary == {
395+
"locations": ["us-central1"],
396+
"encodedLocations": "0xABC",
397+
}
398+
mock_lookup_tb.assert_called_once()
399+
400+
# Second refresh: Mock lookup to fail, but expect cached data to be preserved.
401+
mock_lookup_tb.reset_mock()
402+
mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
403+
404+
with mock.patch.dict(
405+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
406+
):
407+
# This refresh should not raise an error because a cached value exists.
408+
mock_metadata_get.reset_mock()
409+
mock_metadata_get.side_effect = [
410+
{"access_token": "mock_token_2", "expires_in": 3600},
411+
"", # for universe_domain
412+
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
413+
]
414+
creds.refresh(request)
415+
416+
assert creds._trust_boundary == {
417+
"locations": ["us-central1"],
418+
"encodedLocations": "0xABC",
419+
}
420+
mock_lookup_tb.assert_called_once()
421+
422+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
423+
@mock.patch.object(_client, "lookup_trust_boundary", autospec=True)
424+
def test_refresh_fetches_no_op_trust_boundary(
425+
self, mock_lookup_tb, mock_metadata_get
426+
):
427+
mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"}
428+
creds = self.credentials
429+
request = mock.Mock()
430+
431+
mock_metadata_get.side_effect = [
432+
{"access_token": "mock_token", "expires_in": 3600},
433+
"", # for universe_domain
434+
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
435+
]
436+
437+
with mock.patch.dict(
438+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
439+
):
440+
creds.refresh(request)
441+
442+
assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
443+
assert mock_metadata_get.call_count == 3
444+
mock_lookup_tb.assert_called_once_with(
445+
request,
446+
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations",
447+
"mock_token",
448+
)
449+
# Verify that an empty header was added.
450+
headers_applied = {}
451+
creds.apply(headers_applied)
452+
assert headers_applied["x-allowed-locations"] == ""
453+
454+
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
455+
@mock.patch.object(_client, "lookup_trust_boundary", autospec=True)
456+
def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
457+
self, mock_lookup_tb, mock_metadata_get
458+
):
459+
creds = self.credentials
460+
creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"}
461+
request = mock.Mock()
462+
463+
mock_metadata_get.return_value = {
464+
"access_token": "mock_token",
465+
"expires_in": 3600,
466+
}
467+
468+
with mock.patch.dict(
469+
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
470+
):
471+
creds.refresh(request)
472+
473+
# Verify trust boundary remained NO_OP
474+
assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
475+
# Lookup should be skipped
476+
mock_lookup_tb.assert_not_called()
477+
# Only the token refresh metadata call should have happened.
478+
mock_metadata_get.assert_called_once()
479+
480+
# Verify that an empty header was added.
481+
headers_applied = {}
482+
creds.apply(headers_applied)
483+
assert headers_applied["x-allowed-locations"] == ""
484+
485+
@mock.patch(
486+
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
487+
)
488+
@mock.patch(
489+
"google.auth.compute_engine._metadata.get_universe_domain", autospec=True
490+
)
491+
def test_build_trust_boundary_lookup_url_default_email(
492+
self, mock_get_universe_domain, mock_get_service_account_info
493+
):
494+
# Test with default service account email, which needs resolution
495+
creds = self.credentials
496+
creds._service_account_email = "default"
497+
mock_get_service_account_info.return_value = {
498+
"email": "resolved-email@example.com"
499+
}
500+
mock_get_universe_domain.return_value = "googleapis.com"
501+
502+
url = creds._build_trust_boundary_lookup_url()
503+
504+
mock_get_service_account_info.assert_called_once_with(mock.ANY, "default")
505+
mock_get_universe_domain.assert_called_once_with(mock.ANY)
506+
assert (
507+
url
508+
== "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations"
509+
)
510+
511+
@mock.patch(
512+
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
513+
)
514+
@mock.patch(
515+
"google.auth.compute_engine._metadata.get_universe_domain", autospec=True
516+
)
517+
def test_build_trust_boundary_lookup_url_explicit_email(
518+
self, mock_get_universe_domain, mock_get_service_account_info
519+
):
520+
# Test with an explicit service account email, no resolution needed
521+
creds = self.credentials
522+
creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL
523+
mock_get_universe_domain.return_value = "googleapis.com"
524+
525+
url = creds._build_trust_boundary_lookup_url()
526+
527+
mock_get_service_account_info.assert_not_called()
528+
mock_get_universe_domain.assert_called_once_with(mock.ANY)
529+
assert (
530+
url
531+
== "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations"
532+
)
533+
534+
@mock.patch(
535+
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
536+
)
537+
@mock.patch(
538+
"google.auth.compute_engine._metadata.get_universe_domain", autospec=True
539+
)
540+
def test_build_trust_boundary_lookup_url_non_default_universe(
541+
self, mock_get_universe_domain, mock_get_service_account_info
542+
):
543+
# Test with a non-default universe domain
544+
creds = self.credentials_with_all_fields
545+
546+
url = creds._build_trust_boundary_lookup_url()
547+
548+
# Universe domain is cached and email is explicit, so no metadata calls needed.
549+
mock_get_service_account_info.assert_not_called()
550+
mock_get_universe_domain.assert_not_called()
551+
assert (
552+
url
553+
== "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations"
554+
)
555+
556+
@mock.patch(
557+
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
558+
)
559+
def test_build_trust_boundary_lookup_url_get_service_account_info_fails(
560+
self, mock_get_service_account_info
561+
):
562+
# Test scenario where get_service_account_info fails
563+
mock_get_service_account_info.side_effect = exceptions.TransportError(
564+
"Failed to get info"
565+
)
566+
creds = self.credentials
567+
creds._service_account_email = "default"
568+
569+
with pytest.raises(
570+
exceptions.RefreshError,
571+
match="Failed to get service account email for trust boundary lookup: Failed to get info",
572+
):
573+
creds._build_trust_boundary_lookup_url()
574+
250575

251576
class TestIDTokenCredentials(object):
252577
credentials = None
@@ -433,7 +758,7 @@ def test_with_target_audience(self, sign, get, utcnow):
433758

434759
@responses.activate
435760
def test_with_target_audience_integration(self):
436-
""" Test that it is possible to refresh credentials
761+
"""Test that it is possible to refresh credentials
437762
generated from `with_target_audience`.
438763
439764
Instead of mocking the methods, the HTTP responses
@@ -587,7 +912,7 @@ def test_with_token_uri_exception(self, sign, get, utcnow):
587912

588913
@responses.activate
589914
def test_with_quota_project_integration(self):
590-
""" Test that it is possible to refresh credentials
915+
"""Test that it is possible to refresh credentials
591916
generated from `with_quota_project`.
592917
593918
Instead of mocking the methods, the HTTP responses

0 commit comments

Comments
 (0)