diff --git a/google/auth/_helpers.py b/google/auth/_helpers.py index 78fe22f72..f3f0a5c79 100644 --- a/google/auth/_helpers.py +++ b/google/auth/_helpers.py @@ -19,6 +19,7 @@ import datetime from email.message import Message import hashlib +import os import json import logging import sys @@ -287,6 +288,46 @@ def unpadded_urlsafe_b64encode(value): return base64.urlsafe_b64encode(value).rstrip(b"=") +def get_bool_from_env(variable_name, default=False): + """Gets a boolean value from an environment variable. + + The environment variable is interpreted as a boolean with the following + (case-insensitive) rules: + - "true", "1" are considered true. + - "false", "0" are considered false. + Any other values will raise an exception. + + Args: + variable_name (str): The name of the environment variable. + default (bool): The default value if the environment variable is not + set. + + Returns: + bool: The boolean value of the environment variable. + + Raises: + google.auth.exceptions.InvalidValue: If the environment variable is + set to a value that can not be interpreted as a boolean. + """ + value = os.environ.get(variable_name) + + if value is None: + return default + + value = value.lower() + + if value in ("true", "1"): + return True + elif value in ("false", "0"): + return False + else: + raise exceptions.InvalidValue( + 'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format( + variable_name + ) + ) + + def is_python_3(): """Check if the Python interpreter is Python 2 or 3. diff --git a/google/auth/compute_engine/credentials.py b/google/auth/compute_engine/credentials.py index 74f12e7cc..45337fb06 100644 --- a/google/auth/compute_engine/credentials.py +++ b/google/auth/compute_engine/credentials.py @@ -30,11 +30,16 @@ from google.auth.compute_engine import _metadata from google.oauth2 import _client +_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( + "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" +) + class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithUniverseDomain, + credentials.CredentialsWithTrustBoundary, ): """Compute Engine Credentials. @@ -61,6 +66,7 @@ def __init__( scopes=None, default_scopes=None, universe_domain=None, + trust_boundary=None, ): """ Args: @@ -76,6 +82,7 @@ def __init__( provided or None, credential will attempt to fetch the value from metadata server. If metadata server doesn't have universe domain endpoint, then the default googleapis.com will be used. + trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() self._service_account_email = service_account_email @@ -86,11 +93,12 @@ def __init__( if universe_domain: self._universe_domain = universe_domain self._universe_domain_cached = True + self._trust_boundary = trust_boundary def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_MDS - def refresh(self, request): + def _refresh_token(self, request): """Refresh the access token and scopes. Args: @@ -111,6 +119,31 @@ def refresh(self, request): new_exc = exceptions.RefreshError(caught_exc) raise new_exc from caught_exc + def _build_trust_boundary_lookup_url(self): + """Builds and returns the URL for the trust boundary lookup API for GCE.""" + # If the service account email is 'default', we need to get the + # actual email address from the metadata server. + if self._service_account_email == "default": + from google.auth.transport import requests as google_auth_requests + + request = google_auth_requests.Request() + try: + info = _metadata.get_service_account_info(request, "default") + # Cache the fetched email so we don't have to do this again. + self._service_account_email = info["email"] + + except exceptions.TransportError as e: + # If fetching the service account email fails due to a transport error, + # it means we cannot build the trust boundary lookup URL. + # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. + raise exceptions.RefreshError( + f"Failed to get service account email for trust boundary lookup: {e}" + ) from e + + return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + self.universe_domain, self.service_account_email + ) + @property def service_account_email(self): """The service account email. @@ -152,8 +185,9 @@ def with_quota_project(self, quota_project_id): quota_project_id=quota_project_id, scopes=self._scopes, default_scopes=self._default_scopes, + universe_domain=self._universe_domain, + trust_boundary=self._trust_boundary, ) - creds._universe_domain = self._universe_domain creds._universe_domain_cached = self._universe_domain_cached return creds @@ -167,8 +201,9 @@ def with_scopes(self, scopes, default_scopes=None): default_scopes=default_scopes, service_account_email=self._service_account_email, quota_project_id=self._quota_project_id, + universe_domain=self._universe_domain, + trust_boundary=self._trust_boundary, ) - creds._universe_domain = self._universe_domain creds._universe_domain_cached = self._universe_domain_cached return creds @@ -179,9 +214,23 @@ def with_universe_domain(self, universe_domain): default_scopes=self._default_scopes, service_account_email=self._service_account_email, quota_project_id=self._quota_project_id, + trust_boundary=self._trust_boundary, universe_domain=universe_domain, ) + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def with_trust_boundary(self, trust_boundary): + creds = self.__class__( + service_account_email=self._service_account_email, + quota_project_id=self._quota_project_id, + scopes=self._scopes, + default_scopes=self._default_scopes, + universe_domain=self._universe_domain, + trust_boundary=trust_boundary, + ) + creds._universe_domain_cached = self._universe_domain_cached + return creds + _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" diff --git a/google/auth/credentials.py b/google/auth/credentials.py index 2c67e0443..734bdeb29 100644 --- a/google/auth/credentials.py +++ b/google/auth/credentials.py @@ -17,15 +17,20 @@ import abc from enum import Enum +import json import os +import typing from google.auth import _helpers, environment_vars from google.auth import exceptions from google.auth import metrics from google.auth._credentials_base import _BaseCredentials +from google.auth._default import _LOGGER from google.auth._refresh_worker import RefreshThreadManager DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" +NO_OP_TRUST_BOUNDARY_LOCATIONS: "typing.Tuple[str]" = () +NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" class Credentials(_BaseCredentials): @@ -178,22 +183,7 @@ def apply(self, headers, token=None): token (Optional[str]): If specified, overrides the current access token. """ - self._apply(headers, token=token) - """Trust boundary value will be a cached value from global lookup. - - The response of trust boundary will be a list of regions and a hex - encoded representation. - - An example of global lookup response: - { - "locations": [ - "us-central1", "us-east1", "europe-west1", "asia-east1" - ] - "encoded_locations": "0xA30" - } - """ - if self._trust_boundary is not None: - headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"] + self._apply(headers, token) if self.quota_project_id: headers["x-goog-user-project"] = self.quota_project_id @@ -299,6 +289,161 @@ def with_universe_domain(self, universe_domain): ) +class CredentialsWithTrustBoundary(Credentials): + """Abstract base for credentials supporting ``with_trust_boundary`` factory""" + + @abc.abstractmethod + def _refresh_token(self, request): + """Refreshes the access token. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + + Raises: + google.auth.exceptions.RefreshError: If the credentials could + not be refreshed. + """ + raise NotImplementedError("_refresh_token must be implemented") + + def with_trust_boundary(self, trust_boundary): + """Returns a copy of these credentials with a modified trust boundary. + + Args: + trust_boundary Mapping[str, str]: The trust boundary to use for the + credential. This should be a map with a "locations" key that maps to + a list of GCP regions, and a "encodedLocations" key that maps to a + hex string. + + Returns: + google.auth.credentials.Credentials: A new credentials instance. + """ + raise NotImplementedError("This credential does not support trust boundaries.") + + def _is_trust_boundary_lookup_required(self): + """Checks if a trust boundary lookup is required. + + A lookup is required if the feature is enabled via an environment + variable, the universe domain is supported, and a no-op boundary + is not already cached. + + Returns: + bool: True if a trust boundary lookup is required, False otherwise. + """ + # 1. Check if the feature is enabled via environment variable. + if not _helpers.get_bool_from_env( + environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False + ): + return False + + # 2. Skip trust boundary flow for non-default universe domains. + if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: + return False + + # 3. Do not trigger refresh if credential has a cached no-op trust boundary. + return not self._has_no_op_trust_boundary() + + def _get_trust_boundary_header(self): + if self._trust_boundary is not None: + if self._has_no_op_trust_boundary(): + # STS expects an empty string if the trust boundary value is no-op. + return {"x-allowed-locations": ""} + else: + return {"x-allowed-locations": self._trust_boundary["encodedLocations"]} + return {} + + def apply(self, headers, token=None): + """Apply the token to the authentication header.""" + super().apply(headers, token) + headers.update(self._get_trust_boundary_header()) + + def refresh(self, request): + """Refreshes the access token and the trust boundary. + + This method calls the subclass's token refresh logic and then + refreshes the trust boundary if applicable. + """ + self._refresh_token(request) + self._refresh_trust_boundary(request) + + def _refresh_trust_boundary(self, request): + """Triggers a refresh of the trust boundary and updates the cache if necessary. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + + Raises: + google.auth.exceptions.RefreshError: If the trust boundary could + not be refreshed and no cached value is available. + """ + if not self._is_trust_boundary_lookup_required(): + return + try: + self._trust_boundary = self._lookup_trust_boundary(request) + except exceptions.RefreshError as error: + # If the call to the lookup API failed, check if there is a trust boundary + # already cached. If there is, do nothing. If not, then throw the error. + if self._trust_boundary is None: + raise error + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.debug( + "Using cached trust boundary due to refresh error: %s", error + ) + return + + def _lookup_trust_boundary(self, request): + """Calls the trust boundary lookup API to refresh the trust boundary cache. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + + Returns: + trust_boundary (dict): The trust boundary object returned by the lookup API. + + Raises: + google.auth.exceptions.RefreshError: If the trust boundary could not be + retrieved. + """ + from google.oauth2 import _client + + url = self._build_trust_boundary_lookup_url() + if not url: + raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.") + + headers = {} + self.apply(headers) + return _client._lookup_trust_boundary(request, url, headers=headers) + + @abc.abstractmethod + def _build_trust_boundary_lookup_url(self): + """ + Builds and returns the URL for the trust boundary lookup API. + + This method should be implemented by subclasses to provide the + specific URL based on the credential type and its properties. + + Returns: + str: The URL for the trust boundary lookup endpoint, or None + if lookup should be skipped (e.g., for non-applicable universe domains). + """ + raise NotImplementedError( + "_build_trust_boundary_lookup_url must be implemented" + ) + + def _has_no_op_trust_boundary(self): + # A no-op trust boundary is indicated by encodedLocations being "0x0". + # The "locations" list may or may not be present as an empty list. + if ( + self._trust_boundary is not None + and self._trust_boundary["encodedLocations"] + == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS + ): + return True + return False + + class AnonymousCredentials(Credentials): """Credentials that do not provide any authentication information. @@ -382,8 +527,7 @@ def default_scopes(self): @abc.abstractproperty def requires_scopes(self): - """True if these credentials require scopes to obtain an access token. - """ + """True if these credentials require scopes to obtain an access token.""" return False def has_scopes(self, scopes): diff --git a/google/auth/environment_vars.py b/google/auth/environment_vars.py index 81f31571e..e5f3598e8 100644 --- a/google/auth/environment_vars.py +++ b/google/auth/environment_vars.py @@ -82,3 +82,7 @@ AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN" AWS_REGION = "AWS_REGION" AWS_DEFAULT_REGION = "AWS_DEFAULT_REGION" + +GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED" +"""Environment variable controlling whether to enable trust boundary feature. +The default value is false. Users have to explicitly set this value to true.""" diff --git a/google/auth/external_account.py b/google/auth/external_account.py index 161e6c50c..b72f4c20f 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -59,18 +59,18 @@ @dataclass class SupplierContext: """A context class that contains information about the requested third party credential that is passed - to AWS security credential and subject token suppliers. + to AWS security credential and subject token suppliers. - Attributes: - subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec. - Expected values include:: + Attributes: + subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec. + Expected values include:: - “urn:ietf:params:oauth:token-type:jwt” - “urn:ietf:params:oauth:token-type:id-token” - “urn:ietf:params:oauth:token-type:saml2” - “urn:ietf:params:aws:token-type:aws4_request” + “urn:ietf:params:oauth:token-type:jwt” + “urn:ietf:params:oauth:token-type:id-token” + “urn:ietf:params:oauth:token-type:saml2” + “urn:ietf:params:aws:token-type:aws4_request” - audience (str): The requested audience for the subject token. + audience (str): The requested audience for the subject token. """ subject_token_type: str diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index d49998cfb..4d8b72941 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -46,6 +46,9 @@ _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" +_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( + "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" +) _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user" _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account" @@ -117,7 +120,10 @@ def _make_iam_token_request( class Credentials( - credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing + credentials.Scoped, + credentials.CredentialsWithQuotaProject, + credentials.Signing, + credentials.CredentialsWithTrustBoundary, ): """This module defines impersonated credentials which are essentially impersonated identities. @@ -190,6 +196,7 @@ def __init__( lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, quota_project_id=None, iam_endpoint_override=None, + trust_boundary=None, ): """ Args: @@ -220,6 +227,7 @@ def __init__( subject (Optional[str]): sub field of a JWT. This field should only be set if you wish to impersonate as a user. This feature is useful when using domain wide delegation. + trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() @@ -251,15 +259,12 @@ def __init__( self._quota_project_id = quota_project_id self._iam_endpoint_override = iam_endpoint_override self._cred_file_path = None + self._trust_boundary = trust_boundary def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_IMPERSONATE - @_helpers.copy_docstring(credentials.Credentials) - def refresh(self, request): - self._update_token(request) - - def _update_token(self, request): + def _refresh_token(self, request): """Updates credentials with a new access_token representing the impersonated account. @@ -331,6 +336,28 @@ def _update_token(self, request): iam_endpoint_override=self._iam_endpoint_override, ) + def _build_trust_boundary_lookup_url(self): + """Builds and returns the URL for the trust boundary lookup API. + + This method constructs the specific URL for the IAM Credentials API's + `allowedLocations` endpoint, using the credential's universe domain + and service account email. + + Raises: + ValueError: If `self.service_account_email` is None or an empty + string, as it's required to form the URL. + + Returns: + str: The URL for the trust boundary lookup endpoint. + """ + if not self.service_account_email: + raise ValueError( + "Service account email is required to build the trust boundary lookup URL." + ) + return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + self.universe_domain, self.service_account_email + ) + def sign_bytes(self, message): from google.auth.transport.requests import AuthorizedSession @@ -400,10 +427,17 @@ def _make_copy(self): lifetime=self._lifetime, quota_project_id=self._quota_project_id, iam_endpoint_override=self._iam_endpoint_override, + trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path return cred + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def with_trust_boundary(self, trust_boundary): + cred = self._make_copy() + cred._trust_boundary = trust_boundary + return cred + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) def with_quota_project(self, quota_project_id): cred = self._make_copy() @@ -487,9 +521,7 @@ def from_impersonated_service_account_info(cls, info, scopes=None): class IDTokenCredentials(credentials.CredentialsWithQuotaProject): - """Open ID Connect ID Token-based service account credentials. - - """ + """Open ID Connect ID Token-based service account credentials.""" def __init__( self, diff --git a/google/oauth2/_client.py b/google/oauth2/_client.py index 5a9fc3503..44b41c093 100644 --- a/google/oauth2/_client.py +++ b/google/oauth2/_client.py @@ -256,15 +256,17 @@ def _token_endpoint_request( an error. """ - response_status_ok, response_data, retryable_error = _token_endpoint_request_no_throw( - request, - token_uri, - body, - access_token=access_token, - use_json=use_json, - can_retry=can_retry, - headers=headers, - **kwargs + response_status_ok, response_data, retryable_error = ( + _token_endpoint_request_no_throw( + request, + token_uri, + body, + access_token=access_token, + use_json=use_json, + can_retry=can_retry, + headers=headers, + **kwargs + ) ) if not response_status_ok: _handle_error_response(response_data, retryable_error) @@ -337,6 +339,8 @@ def call_iam_generate_id_token_endpoint( generateIdToken endpoint. audience (str): The audience for the ID token. access_token (str): The access token used to call the IAM endpoint. + universe_domain (str): The universe domain for the request. The + default is ``googleapis.com``. Returns: Tuple[str, datetime]: The ID token and expiration. @@ -506,3 +510,118 @@ def refresh_grant( request, token_uri, body, can_retry=can_retry ) return _handle_refresh_grant_response(response_data, refresh_token) + + +def _lookup_trust_boundary(request, url, headers=None): + """Implements the global lookup of a credential trust boundary. + For the lookup, we send a request to the global lookup endpoint and then + parse the response. Service account credentials, workload identity + pools and workforce pools implementation may have trust boundaries configured. + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + url (str): The trust boundary lookup url. + headers (Optional[Mapping[str, str]]): The headers for the request. + Returns: + Mapping[str,list|str]: A dictionary containing + "locations" as a list of allowed locations as strings and + "encodedLocations" as a hex string. + e.g: + { + "locations": [ + "us-central1", "us-east1", "europe-west1", "asia-east1" + ], + "encodedLocations": "0xA30" + } + If the credential is not set up with explicit trust boundaries, a trust boundary + of "all" will be returned as a default response. + { + "locations": [], + "encodedLocations": "0x0" + } + Raises: + exceptions.RefreshError: If the response status code is not 200. + exceptions.MalformedError: If the response is not in a valid format. + """ + + response_data = _lookup_trust_boundary_request(request, url, headers=headers) + # In case of no-op response, the "locations" list may or may not be present as an empty list. + if "encodedLocations" not in response_data: + raise exceptions.MalformedError( + "Invalid trust boundary info: {}".format(response_data) + ) + return response_data + + +def _lookup_trust_boundary_request(request, url, can_retry=True, headers=None): + """Makes a request to the trust boundary lookup endpoint. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + url (str): The trust boundary lookup url. + can_retry (bool): Enable or disable request retry behavior. Defaults to true. + headers (Optional[Mapping[str, str]]): The headers for the request. + + Returns: + Mapping[str, str]: The JSON-decoded response data. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + response_status_ok, response_data, retryable_error = ( + _lookup_trust_boundary_request_no_throw(request, url, can_retry, headers) + ) + if not response_status_ok: + _handle_error_response(response_data, retryable_error) + return response_data + + +def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, headers=None): + """Makes a request to the trust boundary lookup endpoint. This + function doesn't throw on response errors. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + url (str): The trust boundary lookup url. + can_retry (bool): Enable or disable request retry behavior. Defaults to true. + headers (Optional[Mapping[str, str]]): The headers for the request. + + Returns: + Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating + if the request is successful, a mapping for the JSON-decoded response + data and in the case of an error a boolean indicating if the error + is retryable. + """ + + response_data = {} + retryable_error = False + + retries = _exponential_backoff.ExponentialBackoff() + for _ in retries: + response = request(method="GET", url=url, headers=headers) + response_body = ( + response.data.decode("utf-8") + if hasattr(response.data, "decode") + else response.data + ) + + try: + # response_body should be a JSON + response_data = json.loads(response_body) + except ValueError: + response_data = response_body + + if response.status == http_client.OK: + return True, response_data, None + + retryable_error = _can_retry( + status_code=response.status, response_data=response_data + ) + + if not can_retry or not retryable_error: + return False, response_data, retryable_error + + return False, response_data, retryable_error diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py index 3e84194ac..30d1997fb 100644 --- a/google/oauth2/service_account.py +++ b/google/oauth2/service_account.py @@ -84,6 +84,9 @@ _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" +_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( + "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" +) class Credentials( @@ -91,6 +94,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, + credentials.CredentialsWithTrustBoundary, ): """Service account credentials @@ -164,7 +168,7 @@ def __init__( universe_domain (str): The universe domain. The default universe domain is googleapis.com. For default value self signed jwt is used for token refresh. - trust_boundary (str): String representation of trust boundary meta. + trust_boundary (Mapping[str,str]): A credential trust boundary. .. note:: Typically one of the helper constructors :meth:`from_service_account_file` or @@ -194,7 +198,7 @@ def __init__( self._additional_claims = additional_claims else: self._additional_claims = {} - self._trust_boundary = {"locations": [], "encoded_locations": "0x0"} + self._trust_boundary = trust_boundary @classmethod def _from_signer_and_info(cls, signer, info, **kwargs): @@ -294,6 +298,7 @@ def _make_copy(self): additional_claims=self._additional_claims.copy(), always_use_jwt_access=self._always_use_jwt_access, universe_domain=self._universe_domain, + trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path return cred @@ -381,6 +386,12 @@ def with_token_uri(self, token_uri): cred._token_uri = token_uri return cred + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def with_trust_boundary(self, trust_boundary): + cred = self._make_copy() + cred._trust_boundary = trust_boundary + return cred + def _make_authorization_grant_assertion(self): """Create the OAuth 2.0 assertion. @@ -424,8 +435,8 @@ def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_JWT return metrics.CRED_TYPE_SA_ASSERTION - @_helpers.copy_docstring(credentials.Credentials) - def refresh(self, request): + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def _refresh_token(self, request): if self._always_use_jwt_access and not self._jwt_credentials: # If self signed jwt should be used but jwt credential is not # created, try to create one with scopes @@ -491,6 +502,28 @@ def _create_self_signed_jwt(self, audience): self, audience ) + def _build_trust_boundary_lookup_url(self): + """Builds and returns the URL for the trust boundary lookup API. + + This method constructs the specific URL for the IAM Credentials API's + `allowedLocations` endpoint, using the credential's universe domain + and service account email. + + Raises: + ValueError: If `self.service_account_email` is None or an empty + string, as it's required to form the URL. + + Returns: + str: The URL for the trust boundary lookup endpoint. + """ + if not self.service_account_email: + raise ValueError( + "Service account email is required to build the trust boundary lookup URL." + ) + return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + self._universe_domain, self._service_account_email + ) + @_helpers.copy_docstring(credentials.Signing) def sign_bytes(self, message): return self._signer.sign(message) @@ -520,6 +553,7 @@ class IDTokenCredentials( credentials.Signing, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, + credentials.CredentialsWithTrustBoundary, ): """Open ID Connect ID Token-based service account credentials. @@ -574,6 +608,7 @@ def __init__( additional_claims=None, quota_project_id=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ): """ Args: @@ -591,6 +626,8 @@ def __init__( token endponint is used for token refresh. Note that iam.serviceAccountTokenCreator role is required to use the IAM endpoint. + trust_boundary (Mapping[str,str]): A credential trust boundary. + .. note:: Typically one of the helper constructors :meth:`from_service_account_file` or :meth:`from_service_account_info` are used instead of calling the @@ -603,6 +640,7 @@ def __init__( self._target_audience = target_audience self._quota_project_id = quota_project_id self._use_iam_endpoint = False + self._trust_boundary = trust_boundary if not universe_domain: self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN @@ -640,6 +678,8 @@ def _from_signer_and_info(cls, signer, info, **kwargs): kwargs.setdefault("token_uri", info["token_uri"]) if "universe_domain" in info: kwargs["universe_domain"] = info["universe_domain"] + if "trust_boundary" in info: + kwargs["trust_boundary"] = info["trust_boundary"] return cls(signer, **kwargs) @classmethod @@ -689,6 +729,7 @@ def _make_copy(self): additional_claims=self._additional_claims.copy(), quota_project_id=self.quota_project_id, universe_domain=self._universe_domain, + trust_boundary=self._trust_boundary, ) # _use_iam_endpoint is not exposed in the constructor cred._use_iam_endpoint = self._use_iam_endpoint @@ -750,6 +791,28 @@ def with_token_uri(self, token_uri): cred._token_uri = token_uri return cred + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def with_trust_boundary(self, trust_boundary): + cred = self._make_copy() + cred._trust_boundary = trust_boundary + return cred + + def _refresh_token(self, request): + """Not used by this class, which overrides refresh() directly.""" + # This is required to satisfy the abstract base class, but this + # class's refresh() method is called directly and does not use this. + pass + + def _build_trust_boundary_lookup_url(self): + """Builds and returns the URL for the trust boundary lookup API. + + This is not used by IDTokenCredentials as it does not perform trust + boundary lookups. It is defined here to satisfy the abstract base class. + """ + raise NotImplementedError( + "_build_trust_boundary_lookup_url is not implemented for IDTokenCredentials." + ) + def _make_authorization_grant_assertion(self): """Create the OAuth 2.0 assertion. @@ -806,6 +869,7 @@ def _refresh_with_iam_endpoint(self, request): additional_claims={"scope": "https://www.googleapis.com/auth/iam"}, ) jwt_credentials.refresh(request) + self.token, self.expiry = _client.call_iam_generate_id_token_endpoint( request, self._iam_id_token_endpoint, diff --git a/tests/compute_engine/test_credentials.py b/tests/compute_engine/test_credentials.py index 8485ece4b..704677298 100644 --- a/tests/compute_engine/test_credentials.py +++ b/tests/compute_engine/test_credentials.py @@ -13,16 +13,19 @@ # limitations under the License. import base64 import datetime +import os import mock import pytest # type: ignore import responses # type: ignore +from google.oauth2 import _client from google.auth import _helpers from google.auth import exceptions from google.auth import jwt from google.auth import transport from google.auth.compute_engine import credentials +from google.auth.compute_engine import _metadata from google.auth.transport import requests SAMPLE_ID_TOKEN_EXP = 1584393400 @@ -49,6 +52,7 @@ ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds" ) +from google.auth import environment_vars FAKE_SERVICE_ACCOUNT_EMAIL = "foo@bar.com" FAKE_QUOTA_PROJECT_ID = "fake-quota-project" @@ -60,6 +64,10 @@ class TestCredentials(object): credentials = None credentials_with_all_fields = None + VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"} + NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""} + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations" + ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds" # Adjust version if needed @pytest.fixture(autouse=True) def credentials_fixture(self): @@ -208,6 +216,18 @@ def test_with_universe_domain(self): assert creds.universe_domain == "universe_domain" assert creds._universe_domain_cached + def test_with_trust_boundary(self): + creds = self.credentials_with_all_fields + new_boundary = {"encodedLocations": "new_boundary"} + new_creds = creds.with_trust_boundary(new_boundary) + + assert new_creds is not creds + assert new_creds._trust_boundary == new_boundary + assert new_creds._service_account_email == creds._service_account_email + assert new_creds._quota_project_id == creds._quota_project_id + assert new_creds._scopes == creds._scopes + assert new_creds._default_scopes == creds._default_scopes + def test_token_usage_metrics(self): self.credentials.token = "token" self.credentials.expiry = None @@ -247,6 +267,325 @@ def test_user_provided_universe_domain(self, get_universe_domain): # domain endpoint. get_universe_domain.assert_not_called() + @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true( + self, + mock_metadata_get, + mock_lookup_tb, + ): + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.return_value = { + "access_token": "mock_token", + "expires_in": 3600, + } + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + ): + creds.refresh(request) + + mock_lookup_tb.assert_not_called() + assert creds._trust_boundary is None + + @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing( + self, mock_metadata_get, mock_lookup_tb + ): + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.return_value = { + "access_token": "mock_token", + "expires_in": 3600, + } + + with mock.patch.dict(os.environ, clear=True): + creds.refresh(request) + + mock_lookup_tb.assert_not_called() + assert creds._trust_boundary is None + + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_success( + self, mock_metadata_get, mock_lookup_tb + ): + mock_lookup_tb.return_value = { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + creds = self.credentials + request = mock.Mock() + + # The first call to _metadata.get is for the token, the second for the + # universe domain, and the third is to get service account info to + # build the trust boundary URL. + mock_metadata_get.side_effect = [ + {"access_token": "mock_token", "expires_in": 3600}, + "", # for universe_domain + {"email": "resolved-email@example.com", "scopes": ["scope1"]}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + # Verify _metadata.get was called three times. + assert mock_metadata_get.call_count == 3 + # Verify lookup_trust_boundary was called with correct URL and token + mock_lookup_tb.assert_called_once_with( + request, + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations", + headers={"authorization": "Bearer mock_token"}, + ) + # Verify trust boundary was set + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + + # Verify x-allowed-locations header is set by apply() + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "0xABC" + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_lookup_tb, mock_metadata_get + ): + mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") + creds = self.credentials + request = mock.Mock() + + # Mock metadata calls for token, universe domain, and service account info + mock_metadata_get.side_effect = [ + {"access_token": "mock_token", "expires_in": 3600}, + "", # for universe_domain + {"email": "resolved-email@example.com", "scopes": ["scope1"]}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + with pytest.raises(exceptions.RefreshError, match="Lookup failed"): + creds.refresh(request) + + assert creds._trust_boundary is None + assert mock_metadata_get.call_count == 3 + mock_lookup_tb.assert_called_once() + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_trust_boundary_lookup_fails_with_cached_data( + self, mock_lookup_tb, mock_metadata_get + ): + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_tb.return_value = { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_metadata_get.side_effect = [ + {"access_token": "mock_token_1", "expires_in": 3600}, + "", # for universe_domain + {"email": "resolved-email@example.com", "scopes": ["scope1"]}, + ] + creds = self.credentials + request = mock.Mock() + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_lookup_tb.assert_called_once() + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_tb.reset_mock() + mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + # This refresh should not raise an error because a cached value exists. + mock_metadata_get.reset_mock() + mock_metadata_get.side_effect = [ + {"access_token": "mock_token_2", "expires_in": 3600}, + "", # for universe_domain + {"email": "resolved-email@example.com", "scopes": ["scope1"]}, + ] + creds.refresh(request) + + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_lookup_tb.assert_called_once() + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_fetches_no_op_trust_boundary( + self, mock_lookup_tb, mock_metadata_get + ): + mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"} + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + {"access_token": "mock_token", "expires_in": 3600}, + "", # for universe_domain + {"email": "resolved-email@example.com", "scopes": ["scope1"]}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} + assert mock_metadata_get.call_count == 3 + mock_lookup_tb.assert_called_once_with( + request, + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations", + headers={"authorization": "Bearer mock_token"}, + ) + # Verify that an empty header was added. + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_lookup_tb, mock_metadata_get + ): + creds = self.credentials + # Use pre-cache universe domain to avoid an extra metadata call. + creds._universe_domain_cached = True + creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"} + request = mock.Mock() + + mock_metadata_get.return_value = { + "access_token": "mock_token", + "expires_in": 3600, + } + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + # Verify trust boundary remained NO_OP + assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} + # Lookup should be skipped + mock_lookup_tb.assert_not_called() + # Only the token refresh metadata call should have happened. + mock_metadata_get.assert_called_once() + + # Verify that an empty header was added. + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_default_email( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with default service account email, which needs resolution + creds = self.credentials + creds._service_account_email = "default" + mock_get_service_account_info.return_value = { + "email": "resolved-email@example.com" + } + mock_get_universe_domain.return_value = "googleapis.com" + + url = creds._build_trust_boundary_lookup_url() + + mock_get_service_account_info.assert_called_once_with(mock.ANY, "default") + mock_get_universe_domain.assert_called_once_with(mock.ANY) + assert ( + url + == "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_explicit_email( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with an explicit service account email, no resolution needed + creds = self.credentials + creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL + mock_get_universe_domain.return_value = "googleapis.com" + + url = creds._build_trust_boundary_lookup_url() + + mock_get_service_account_info.assert_not_called() + mock_get_universe_domain.assert_called_once_with(mock.ANY) + assert ( + url + == "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_non_default_universe( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with a non-default universe domain + creds = self.credentials_with_all_fields + + url = creds._build_trust_boundary_lookup_url() + + # Universe domain is cached and email is explicit, so no metadata calls needed. + mock_get_service_account_info.assert_not_called() + mock_get_universe_domain.assert_not_called() + assert ( + url + == "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_build_trust_boundary_lookup_url_get_service_account_info_fails( + self, mock_get_service_account_info + ): + # Test scenario where get_service_account_info fails + mock_get_service_account_info.side_effect = exceptions.TransportError( + "Failed to get info" + ) + creds = self.credentials + creds._service_account_email = "default" + + with pytest.raises( + exceptions.RefreshError, + match="Failed to get service account email for trust boundary lookup: Failed to get info", + ): + creds._build_trust_boundary_lookup_url() + class TestIDTokenCredentials(object): credentials = None @@ -433,7 +772,7 @@ def test_with_target_audience(self, sign, get, utcnow): @responses.activate def test_with_target_audience_integration(self): - """ Test that it is possible to refresh credentials + """Test that it is possible to refresh credentials generated from `with_target_audience`. Instead of mocking the methods, the HTTP responses @@ -587,7 +926,7 @@ def test_with_token_uri_exception(self, sign, get, utcnow): @responses.activate def test_with_quota_project_integration(self): - """ Test that it is possible to refresh credentials + """Test that it is possible to refresh credentials generated from `with_quota_project`. Instead of mocking the methods, the HTTP responses diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py index 6a085729f..2f2255dd4 100644 --- a/tests/oauth2/test__client.py +++ b/tests/oauth2/test__client.py @@ -324,7 +324,7 @@ def test_call_iam_generate_id_token_endpoint(): "fake_email", "fake_audience", "fake_access_token", - "googleapis.com", + universe_domain="googleapis.com", ) assert ( @@ -630,3 +630,181 @@ def test__token_endpoint_request_no_throw_with_retry(can_retry): assert mock_request.call_count == 3 else: assert mock_request.call_count == 1 + + +def test_lookup_trust_boundary(): + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0x80080000000000", + } + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + + assert response["encodedLocations"] == "0x80080000000000" + assert response["locations"] == ["us-central1", "us-east1"] + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_no_op_response_without_locations(): + response_data = {"encodedLocations": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + # for the response to be valid, we only need encodedLocations to be present. + response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + assert response["encodedLocations"] == "0x0" + assert "locations" not in response + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_no_op_response(): + response_data = {"locations": [], "encodedLocations": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + + assert response["encodedLocations"] == "0x0" + assert response["locations"] == [] + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_error(): + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.INTERNAL_SERVER_ERROR + mock_response.data = "this is an error message" + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + with pytest.raises(exceptions.RefreshError) as excinfo: + _client._lookup_trust_boundary(mock_request, url, headers=headers) + assert excinfo.match("this is an error message") + + mock_request.assert_called_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_missing_encoded_locations(): + response_data = {"locations": [], "bad_field": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + with pytest.raises(exceptions.MalformedError) as excinfo: + _client._lookup_trust_boundary(mock_request, url, headers=headers) + assert excinfo.match("Invalid trust boundary info") + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): + retryable_error = mock.create_autospec(transport.Response, instance=True) + retryable_error.status = http_client.BAD_REQUEST + retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( + "utf-8" + ) + + unretryable_error = mock.create_autospec(transport.Response, instance=True) + unretryable_error.status = http_client.BAD_REQUEST + unretryable_error.data = json.dumps({"error_description": "invalid_scope"}).encode( + "utf-8" + ) + + request = mock.create_autospec(transport.Request) + + request.side_effect = [retryable_error, retryable_error, unretryable_error] + headers = {"Authorization": "Bearer access_token"} + + with pytest.raises(exceptions.RefreshError): + _client._lookup_trust_boundary_request( + request, "http://example.com", headers=headers + ) + # request should be called three times. Two retryable errors and one + # unretryable error to break the retry loop. + assert request.call_count == 3 + for call in request.call_args_list: + assert call[1]["headers"] == headers + + +def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): + retryable_error = mock.create_autospec(transport.Response, instance=True) + retryable_error.status = http_client.BAD_REQUEST + retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( + "utf-8" + ) + + response_data = {"locations": [], "encodedLocations": "0x0"} + response = mock.create_autospec(transport.Response, instance=True) + response.status = http_client.OK + response.data = json.dumps(response_data).encode("utf-8") + + request = mock.create_autospec(transport.Request) + + headers = {"Authorization": "Bearer access_token"} + request.side_effect = [retryable_error, response] + + _ = _client._lookup_trust_boundary_request( + request, "http://example.com", headers=headers + ) + + assert request.call_count == 2 + for call in request.call_args_list: + assert call[1]["headers"] == headers + + +def test_lookup_trust_boundary_with_headers(): + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0x80080000000000", + } + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + headers = { + "Authorization": "Bearer access_token", + "x-test-header": "test-value", + } + + _client._lookup_trust_boundary(mock_request, "http://example.com", headers=headers) + + mock_request.assert_called_once_with( + method="GET", url="http://example.com", headers=headers + ) diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py index 91a7d93e0..22389353c 100644 --- a/tests/oauth2/test_service_account.py +++ b/tests/oauth2/test_service_account.py @@ -24,7 +24,9 @@ from google.auth import exceptions from google.auth import iam from google.auth import jwt +from google.auth import environment_vars from google.auth import transport +from google.auth import credentials from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN from google.oauth2 import service_account @@ -58,14 +60,31 @@ class TestCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" TOKEN_URI = "https://example.com/oauth2/token" + NO_OP_TRUST_BOUNDARY = { + "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } + VALID_TRUST_BOUNDARY = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEXSA", + } + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( + "https://iamcredentials.googleapis.com/v1/projects/-" + "/serviceAccounts/service-account@example.com/allowedLocations" + ) @classmethod - def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN): + def make_credentials( + cls, + universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, # Align with Credentials class default + ): return service_account.Credentials( SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, universe_domain=universe_domain, + trust_boundary=trust_boundary, ) def test_get_cred_info(self): @@ -251,6 +270,18 @@ def test__with_always_use_jwt_access_non_default_universe_domain(self): "always_use_jwt_access should be True for non-default universe domain" ) + def test_with_trust_boundary(self): + credentials = self.make_credentials() + new_boundary = {"encodedLocations": "new_boundary"} + new_credentials = credentials.with_trust_boundary(new_boundary) + + assert new_credentials is not credentials + assert new_credentials._trust_boundary == new_boundary + assert new_credentials._signer == credentials._signer + assert ( + new_credentials.service_account_email == credentials.service_account_email + ) + def test__make_authorization_grant_assertion(self): credentials = self.make_credentials() token = credentials._make_authorization_grant_assertion() @@ -495,10 +526,42 @@ def test_refresh_success(self, jwt_grant): # Check that the credentials have the token. assert credentials.token == token - # Check that the credentials are valid (have a token and are not - # expired) + # Check that the credentials are valid (have a token and are not expired). assert credentials.valid + # Trust boundary should be None since env var is not set and no initial + # boundary was provided. + assert credentials._trust_boundary is None + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_skips_trust_boundary_lookup_non_default_universe( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Create credentials with a non-default universe domain + credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + # Ensure jwt_grant was called (token refresh happened) + mock_jwt_grant.assert_called_once() + # Ensure trust boundary lookup was not called + mock_lookup_trust_boundary.assert_not_called() + # Verify that x-allowed-locations header is not set by apply() + headers_applied = {} + credentials.apply(headers_applied) + assert "x-allowed-locations" not in headers_applied + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) def test_before_request_refreshes(self, jwt_grant): credentials = self.make_credentials() @@ -607,6 +670,208 @@ def test_refresh_non_gdu_domain_wide_delegation_not_supported(self): credentials.refresh(None) assert excinfo.match("domain wide delegation is not supported") + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_success_with_valid_trust_boundary( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no boundary. + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Mock the trust boundary lookup to return a valid boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + # Verify the mock was called with the correct URL. + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Verify x-allowed-locations header is set correctly by apply(). + headers_applied = {} + credentials.apply(headers_applied) + assert ( + headers_applied["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_fetches_no_op_trust_boundary( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no trust boundary + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + credentials = self.make_credentials( + trust_boundary=self.NO_OP_TRUST_BOUNDARY + ) # Start with NO_OP + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + # Verify trust boundary remained NO_OP + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + + # Lookup should be skipped + mock_lookup_trust_boundary.assert_not_called() + + # Verify that an empty header was added. + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no trust boundary + credentials = self.make_credentials(trust_boundary=None) + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + mock_jwt_grant.return_value = ( + "mock_access_token", + _helpers.utcnow() + datetime.timedelta(seconds=3600), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Mock the trust boundary lookup to raise an error + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), pytest.raises(exceptions.RefreshError, match="Lookup failed"): + credentials.refresh(request) + + assert credentials._trust_boundary is None + mock_lookup_trust_boundary.assert_called_once() + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_trust_boundary_lookup_fails_with_cached_data( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Initial setup: Credentials with no trust boundary. + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_trust_boundary.reset_mock() + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) # This should NOT raise an exception + + assert credentials.valid # Credentials should still be valid + assert ( + credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + ) # Cached data should be preserved + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={ + "authorization": "Bearer token", + "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"], + }, + ) # Lookup should have been attempted again + + def test_build_trust_boundary_lookup_url_no_email(self): + credentials = self.make_credentials() + credentials._service_account_email = None + + with pytest.raises(ValueError) as excinfo: + credentials._build_trust_boundary_lookup_url() + + assert "Service account email is required" in str(excinfo.value) + class TestIDTokenCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" diff --git a/tests/test__helpers.py b/tests/test__helpers.py index a4337c016..ce3ec11e2 100644 --- a/tests/test__helpers.py +++ b/tests/test__helpers.py @@ -20,7 +20,7 @@ import pytest # type: ignore -from google.auth import _helpers +from google.auth import _helpers, exceptions # _MOCK_BASE_LOGGER_NAME is the base logger namespace used for testing. _MOCK_BASE_LOGGER_NAME = "foogle" @@ -234,6 +234,33 @@ def test_unpadded_urlsafe_b64encode(): assert _helpers.unpadded_urlsafe_b64encode(case) == expected +def test_get_bool_from_env(monkeypatch): + # Test default value when environment variable is not set. + assert _helpers.get_bool_from_env("TEST_VAR") is False + assert _helpers.get_bool_from_env("TEST_VAR", default=True) is True + + # Test true values (case-insensitive) + for true_value in ("true", "True", "TRUE", "1"): + monkeypatch.setenv("TEST_VAR", true_value) + assert _helpers.get_bool_from_env("TEST_VAR") is True + + # Test false values (case-insensitive) + for false_value in ("false", "False", "FALSE", "0"): + monkeypatch.setenv("TEST_VAR", false_value) + assert _helpers.get_bool_from_env("TEST_VAR") is False + + # Test invalid value + monkeypatch.setenv("TEST_VAR", "invalid_value") + with pytest.raises(exceptions.InvalidValue) as excinfo: + _helpers.get_bool_from_env("TEST_VAR") + assert 'must be one of "true", "false", "1", or "0"' in str(excinfo.value) + + # Test empty string value + monkeypatch.setenv("TEST_VAR", "") + with pytest.raises(exceptions.InvalidValue): + _helpers.get_bool_from_env("TEST_VAR") + + def test_hash_sensitive_info_basic(): test_data = { "expires_in": 3599, diff --git a/tests/test_aws.py b/tests/test_aws.py index df1f02e7d..1e5fa5e4b 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -42,8 +42,10 @@ SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = ( "https://us-east1-iamcredentials.googleapis.com" ) -SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( - SERVICE_ACCOUNT_EMAIL +SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = ( + "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + SERVICE_ACCOUNT_EMAIL + ) ) SERVICE_ACCOUNT_IMPERSONATION_URL = ( SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE @@ -920,7 +922,7 @@ def assert_token_request_kwargs( assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) assert len(body_tuples) == len(request_data.keys()) - for (k, v) in body_tuples: + for k, v in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] @classmethod @@ -1344,9 +1346,9 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2( imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, ) credential_source_token_url = self.CREDENTIAL_SOURCE.copy() - credential_source_token_url[ - "imdsv2_session_token_url" - ] = IMDSV2_SESSION_TOKEN_URL + credential_source_token_url["imdsv2_session_token_url"] = ( + IMDSV2_SESSION_TOKEN_URL + ) credentials = self.make_credentials( credential_source=credential_source_token_url ) @@ -1452,9 +1454,9 @@ def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_secr imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, ) credential_source_token_url = self.CREDENTIAL_SOURCE.copy() - credential_source_token_url[ - "imdsv2_session_token_url" - ] = IMDSV2_SESSION_TOKEN_URL + credential_source_token_url["imdsv2_session_token_url"] = ( + IMDSV2_SESSION_TOKEN_URL + ) credentials = self.make_credentials( credential_source=credential_source_token_url ) @@ -1509,9 +1511,9 @@ def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_acce imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, ) credential_source_token_url = self.CREDENTIAL_SOURCE.copy() - credential_source_token_url[ - "imdsv2_session_token_url" - ] = IMDSV2_SESSION_TOKEN_URL + credential_source_token_url["imdsv2_session_token_url"] = ( + IMDSV2_SESSION_TOKEN_URL + ) credentials = self.make_credentials( credential_source=credential_source_token_url ) @@ -1560,9 +1562,9 @@ def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_cred imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, ) credential_source_token_url = self.CREDENTIAL_SOURCE.copy() - credential_source_token_url[ - "imdsv2_session_token_url" - ] = IMDSV2_SESSION_TOKEN_URL + credential_source_token_url["imdsv2_session_token_url"] = ( + IMDSV2_SESSION_TOKEN_URL + ) credentials = self.make_credentials( credential_source=credential_source_token_url ) @@ -1611,9 +1613,9 @@ def test_retrieve_subject_token_success_temp_creds_idmsv2(self, utcnow): role_status=http_client.OK, role_name=self.AWS_ROLE ) credential_source_token_url = self.CREDENTIAL_SOURCE.copy() - credential_source_token_url[ - "imdsv2_session_token_url" - ] = IMDSV2_SESSION_TOKEN_URL + credential_source_token_url["imdsv2_session_token_url"] = ( + IMDSV2_SESSION_TOKEN_URL + ) credentials = self.make_credentials( credential_source=credential_source_token_url ) @@ -1692,9 +1694,9 @@ def test_retrieve_subject_token_session_error_idmsv2(self, utcnow): imdsv2_session_token_data="unauthorized", ) credential_source_token_url = self.CREDENTIAL_SOURCE.copy() - credential_source_token_url[ - "imdsv2_session_token_url" - ] = IMDSV2_SESSION_TOKEN_URL + credential_source_token_url["imdsv2_session_token_url"] = ( + IMDSV2_SESSION_TOKEN_URL + ) credentials = self.make_credentials( credential_source=credential_source_token_url ) @@ -2057,7 +2059,9 @@ def test_refresh_success_with_impersonation_ignore_default_scopes( "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # TODO(negarb): Uncomment and update when trust boundary is supported + # for external account credentials. + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2150,7 +2154,7 @@ def test_refresh_success_with_impersonation_use_default_scopes( "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2345,7 +2349,7 @@ def test_refresh_success_with_supplier_with_impersonation( "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/tests/test_credentials.py b/tests/test_credentials.py index e11bcb4e5..d4ab9ac1b 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -16,13 +16,17 @@ import mock import pytest # type: ignore +from google.auth import exceptions +from google.oauth2 import _client from google.auth import _helpers from google.auth import credentials +from google.auth import environment_vars +import os -class CredentialsImpl(credentials.Credentials): - def refresh(self, request): +class CredentialsImpl(credentials.CredentialsWithTrustBoundary): + def _refresh_token(self, request): self.token = request self.expiry = ( datetime.datetime.utcnow() @@ -33,6 +37,10 @@ def refresh(self, request): def with_quota_project(self, quota_project_id): raise NotImplementedError() + def _build_trust_boundary_lookup_url(self): + # Using self.token here to make the URL dynamic for testing purposes + return "http://mock.url/lookup_for_{}".format(self.token) + class CredentialsImplWithMetrics(credentials.Credentials): def refresh(self, request): @@ -113,7 +121,7 @@ def test_before_request(): def test_before_request_with_trust_boundary(): DUMMY_BOUNDARY = "0xA30" credentials = CredentialsImpl() - credentials._trust_boundary = {"locations": [], "encoded_locations": DUMMY_BOUNDARY} + credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY} request = "token" headers = {} @@ -343,3 +351,94 @@ def test_token_state_no_expiry(): assert c.token_state == credentials.TokenState.FRESH c.before_request(request, "http://example.com", "GET", {}) + + +class TestCredentialsWithTrustBoundary(object): + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_env_var_not_true(self, mock_lookup_tb): + creds = CredentialsImpl() + request = mock.Mock() + + # Ensure env var is not "true" + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + ): + result = creds._refresh_trust_boundary(request) + + assert result is None + mock_lookup_tb.assert_not_called() + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_env_var_missing(self, mock_lookup_tb): + creds = CredentialsImpl() + request = mock.Mock() + + # Ensure env var is missing + with mock.patch.dict(os.environ, clear=True): + # Remove the var if it was set by other tests + if environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED in os.environ: + del os.environ[environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED] + result = creds._refresh_trust_boundary(request) + + assert result is None + mock_lookup_tb.assert_not_called() + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_non_default_universe(self, mock_lookup_tb): + creds = CredentialsImpl() + creds._universe_domain = "my.universe.com" # Non-GDU + request = mock.Mock() + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + result = creds._refresh_trust_boundary(request) + + assert result is None + mock_lookup_tb.assert_not_called() + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_calls_client_and_build_url(self, mock_lookup_tb): + creds = CredentialsImpl() + creds.token = "test_token" # For _build_trust_boundary_lookup_url + request = mock.Mock() + expected_url = "http://mock.url/lookup_for_test_token" + expected_boundary_info = {"encodedLocations": "0xABC"} + mock_lookup_tb.return_value = expected_boundary_info + + # Mock _build_trust_boundary_lookup_url to ensure it's called. + mock_build_url = mock.Mock(return_value=expected_url) + creds._build_trust_boundary_lookup_url = mock_build_url + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + result = creds._lookup_trust_boundary(request) + + assert result == expected_boundary_info + mock_build_url.assert_called_once() + expected_headers = {"authorization": "Bearer test_token"} + mock_lookup_tb.assert_called_once_with( + request, expected_url, headers=expected_headers + ) + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb): + creds = CredentialsImpl() + request = mock.Mock() + + # Mock _build_trust_boundary_lookup_url to return None + mock_build_url = mock.Mock(return_value=None) + creds._build_trust_boundary_lookup_url = mock_build_url + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + with pytest.raises( + exceptions.InvalidValue, + match="Failed to build trust boundary lookup URL.", + ): + creds._lookup_trust_boundary(request) + + mock_build_url.assert_called_once() # Ensure _build_trust_boundary_lookup_url was called + mock_lookup_tb.assert_not_called() # Ensure _client.lookup_trust_boundary was not called diff --git a/tests/test_external_account.py b/tests/test_external_account.py index bddcb4afa..d86a19bef 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -247,7 +247,7 @@ def assert_token_request_kwargs( assert "cert" not in request_kwargs assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) - for (k, v) in body_tuples: + for k, v in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] assert len(body_tuples) == len(request_data.keys()) @@ -920,7 +920,9 @@ def test_refresh_impersonation_without_client_auth_success( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # TODO(negarb): Uncomment and update when trust boundary is supported + # for external account credentials. + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1010,7 +1012,7 @@ def test_refresh_impersonation_with_mtls_success( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1097,7 +1099,7 @@ def test_refresh_workforce_impersonation_without_client_auth_success( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1331,7 +1333,7 @@ def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1415,7 +1417,7 @@ def test_refresh_impersonation_with_client_auth_success_use_default_scopes( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1471,7 +1473,7 @@ def test_apply_without_quota_project_id(self): assert headers == { "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_workforce_without_quota_project_id(self): @@ -1488,7 +1490,7 @@ def test_apply_workforce_without_quota_project_id(self): assert headers == { "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_impersonation_without_quota_project_id(self): @@ -1520,7 +1522,7 @@ def test_apply_impersonation_without_quota_project_id(self): assert headers == { "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_with_quota_project_id(self): @@ -1537,7 +1539,7 @@ def test_apply_with_quota_project_id(self): "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": self.QUOTA_PROJECT_ID, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_impersonation_with_quota_project_id(self): @@ -1572,7 +1574,7 @@ def test_apply_impersonation_with_quota_project_id(self): "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), "x-goog-user-project": self.QUOTA_PROJECT_ID, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_before_request(self): @@ -1588,7 +1590,7 @@ def test_before_request(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1597,7 +1599,7 @@ def test_before_request(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_before_request_workforce(self): @@ -1615,7 +1617,7 @@ def test_before_request_workforce(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1624,7 +1626,7 @@ def test_before_request_workforce(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_before_request_impersonation(self): @@ -1655,7 +1657,7 @@ def test_before_request_impersonation(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1664,7 +1666,7 @@ def test_before_request_impersonation(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } @mock.patch("google.auth._helpers.utcnow") @@ -1693,7 +1695,7 @@ def test_before_request_expired(self, utcnow): # Cached token should be used. assert headers == { "authorization": "Bearer token", - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Next call should simulate 1 second passed. @@ -1709,7 +1711,7 @@ def test_before_request_expired(self, utcnow): # New token should be retrieved. assert headers == { "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } @mock.patch("google.auth._helpers.utcnow") @@ -1754,7 +1756,7 @@ def test_before_request_impersonation_expired(self, utcnow): # Cached token should be used. assert headers == { "authorization": "Bearer token", - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Next call should simulate 1 second passed. This will trigger the expiration @@ -1773,7 +1775,7 @@ def test_before_request_impersonation_expired(self, utcnow): # New token should be retrieved. assert headers == { "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } @pytest.mark.parametrize( @@ -1872,7 +1874,7 @@ def test_get_project_id_cloud_resource_manager_success( "x-goog-user-project": self.QUOTA_PROJECT_ID, "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1926,7 +1928,7 @@ def test_get_project_id_cloud_resource_manager_success( "authorization": "Bearer {}".format( impersonation_response["accessToken"] ), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", }, ) @@ -1998,7 +2000,7 @@ def test_workforce_pool_get_project_id_cloud_resource_manager_success( "authorization": "Bearer {}".format( self.SUCCESS_RESPONSE["access_token"] ), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", }, ) @@ -2048,7 +2050,7 @@ def test_refresh_impersonation_with_lifetime( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index 41fd18892..ac4e90b07 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -38,8 +38,10 @@ SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = ( "https://us-east1-iamcredentials.googleapis.com" ) -SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( - SERVICE_ACCOUNT_EMAIL +SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = ( + "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + SERVICE_ACCOUNT_EMAIL + ) ) SERVICE_ACCOUNT_IMPERSONATION_URL = ( SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE @@ -284,7 +286,7 @@ def assert_token_request_kwargs( assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) assert len(body_tuples) == len(request_data.keys()) - for (k, v) in body_tuples: + for k, v in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] @classmethod @@ -383,7 +385,9 @@ def assert_underlying_credentials_refresh( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": metrics_header_value, - "x-allowed-locations": "0x0", + # TODO(negarb): Uncomment and update when trust boundary is supported + # for external account credentials. + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 4aa357e3e..c81694e50 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -21,11 +21,13 @@ import mock import pytest # type: ignore -from google.auth import _helpers +from google.auth import _helpers, iam from google.auth import crypt from google.auth import exceptions from google.auth import impersonated_credentials from google.auth import transport +from google.auth import credentials as auth_credentials +from google.auth import environment_vars from google.auth.impersonated_credentials import Credentials from google.oauth2 import credentials from google.oauth2 import service_account @@ -127,8 +129,21 @@ class TestImpersonatedCredentials(object): # Because Python 2.7: DELEGATES = [] # type: ignore LIFETIME = 3600 + NO_OP_TRUST_BOUNDARY = { + "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } + VALID_TRUST_BOUNDARY = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEX", + } + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( + "https://iamcredentials.googleapis.com/v1/projects/-" + "/serviceAccounts/impersonated@project.iam.gserviceaccount.com/allowedLocations" + ) + FAKE_UNIVERSE_DOMAIN = "universe.foo" SOURCE_CREDENTIALS = service_account.Credentials( - SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY ) USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") IAM_ENDPOINT_OVERRIDE = ( @@ -143,6 +158,7 @@ def make_credentials( target_principal=TARGET_PRINCIPAL, subject=None, iam_endpoint_override=None, + trust_boundary=None, # Align with Credentials class default ): return Credentials( @@ -153,16 +169,19 @@ def make_credentials( lifetime=lifetime, subject=subject, iam_endpoint_override=iam_endpoint_override, + trust_boundary=trust_boundary, ) def test_from_impersonated_service_account_info(self): - credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info( - IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO + credentials = ( + impersonated_credentials.Credentials.from_impersonated_service_account_info( + IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO + ) ) assert isinstance(credentials, impersonated_credentials.Credentials) def test_from_impersonated_service_account_info_with_invalid_source_credentials_type( - self + self, ): info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) assert "source_credentials" in info @@ -177,7 +196,7 @@ def test_from_impersonated_service_account_info_with_invalid_source_credentials_ ) def test_from_impersonated_service_account_info_with_invalid_impersonation_url( - self + self, ): info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) info["service_account_impersonation_url"] = "invalid_url" @@ -262,8 +281,12 @@ def test_token_usage_metrics(self): assert headers["x-goog-api-client"] == "cred-type/imp" @pytest.mark.parametrize("use_data_bytes", [True, False]) - def test_refresh_success(self, use_data_bytes, mock_donor_credentials): - credentials = self.make_credentials(lifetime=None) + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_success( + self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials + ): + # Start with no boundary. + credentials = self.make_credentials(lifetime=None, trust_boundary=None) token = "token" expire_time = ( @@ -277,7 +300,12 @@ def test_refresh_success(self, use_data_bytes, mock_donor_credentials): use_data_bytes=use_data_bytes, ) - with mock.patch( + # Mock the trust boundary lookup to return a valid value. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( "google.auth.metrics.token_request_access_token_impersonate", return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, ): @@ -290,6 +318,244 @@ def test_refresh_success(self, use_data_bytes, mock_donor_credentials): == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE ) + # Verify that the x-allowed-locations header from the source credential + # was applied. The source credential has a NO_OP boundary, so the + # header should be an empty string. + request_kwargs = request.call_args[1] + assert "headers" in request_kwargs + assert "x-allowed-locations" in request_kwargs["headers"] + assert request_kwargs["headers"]["x-allowed-locations"] == "" + + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + # Verify the mock was called with the correct URL. + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Verify x-allowed-locations header is set correctly by apply(). + headers_applied = {} + credentials.apply(headers_applied) + assert ( + headers_applied["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + + def test_refresh_source_creds_no_trust_boundary(self): + # Use a source credential that does not support trust boundaries. + source_credentials = credentials.Credentials(token="source_token") + creds = self.make_credentials(source_credentials=source_credentials) + token = "impersonated_token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + ) + + creds.refresh(request) + + # Verify that the x-allowed-locations header was NOT applied because + # the source credential does not support trust boundaries. + request_kwargs = request.call_args[1] + assert "x-allowed-locations" not in request_kwargs["headers"] + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + # Start with no trust boundary + credentials = self.make_credentials(lifetime=None, trust_boundary=None) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + ) + + # Mock the trust boundary lookup to raise an error + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(request) + + assert "Lookup failed" in str(excinfo.value) + assert credentials._trust_boundary is None # Still no trust boundary + mock_lookup_trust_boundary.assert_called_once() + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_fetches_no_op_trust_boundary( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + # Start with no trust boundary + credentials = self.make_credentials(lifetime=None, trust_boundary=None) + token = "token" + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + ) + + mock_lookup_trust_boundary.return_value = ( + self.NO_OP_TRUST_BOUNDARY + ) # Mock returns NO_OP + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_skips_trust_boundary_lookup_non_default_universe( + self, mock_lookup_trust_boundary + ): + # Create source credentials with a non-default universe domain + source_credentials = service_account.Credentials( + SIGNER, + "some@email.com", + TOKEN_URI, + universe_domain=self.FAKE_UNIVERSE_DOMAIN, + ) + # Create impersonated credentials using the non-default source credentials + credentials = self.make_credentials(source_credentials=source_credentials) + + # Mock the IAM credentials API call for generateAccessToken + token = "token" + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + # Ensure trust boundary lookup was not called + mock_lookup_trust_boundary.assert_not_called() + # Verify that x-allowed-locations header is not set by apply() + headers_applied = {} + credentials.apply(headers_applied) + assert "x-allowed-locations" not in headers_applied + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + credentials = self.make_credentials( + lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY + ) # Start with NO_OP + token = "token" + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + ) + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + # Verify trust boundary remained NO_OP + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + + # Lookup should be skipped + mock_lookup_trust_boundary.assert_not_called() + + # Verify that an empty header was added. + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_trust_boundary_lookup_fails_with_cached_data2( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + # Start with no trust boundary + credentials = self.make_credentials(lifetime=None, trust_boundary=None) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + ) + + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + assert credentials.valid + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once() + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_trust_boundary.reset_mock() + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once() + @pytest.mark.parametrize("use_data_bytes", [True, False]) def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials): credentials = self.make_credentials(subject="test@email.com", lifetime=None) @@ -672,6 +938,29 @@ def test_with_scopes(self): assert credentials.requires_scopes is False assert credentials._target_scopes == ["fake_scope1", "fake_scope2"] + def test_with_trust_boundary(self): + credentials = self.make_credentials() + new_boundary = {"encodedLocations": "new_boundary"} + new_credentials = credentials.with_trust_boundary(new_boundary) + + assert new_credentials is not credentials + assert new_credentials._trust_boundary == new_boundary + # The source credentials should be a copy, not the same object. + # But they should be functionally equivalent. + assert ( + new_credentials._source_credentials is not credentials._source_credentials + ) + + assert ( + new_credentials._source_credentials.service_account_email + == credentials._source_credentials.service_account_email + ) + assert ( + new_credentials._source_credentials._signer + == credentials._source_credentials._signer + ) + assert new_credentials._target_principal == credentials._target_principal + def test_with_scopes_provide_default_scopes(self): credentials = self.make_credentials() credentials._target_scopes = []