Skip to content

feat: Add trust boundary support for service accounts and impersonation. #1778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google/auth/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def get_bool_from_env(variable_name, default=False):
(case-insensitive) rules:
- "true", "1" are considered true.
- "false", "0" are considered false.
Any other values will raise an exception.

Args:
variable_name (str): The name of the environment variable.
Expand Down
4 changes: 1 addition & 3 deletions google/auth/compute_engine/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(
def _metric_header_for_usage(self):
return metrics.CRED_TYPE_SA_MDS

def refresh(self, request):
def _refresh_token(self, request):
"""Refresh the access token and scopes.

Args:
Expand All @@ -119,8 +119,6 @@ def refresh(self, request):
new_exc = exceptions.RefreshError(caught_exc)
raise new_exc from caught_exc

self._refresh_trust_boundary(request)

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API for GCE."""
# If the service account email is 'default', we need to get the
Expand Down
67 changes: 49 additions & 18 deletions google/auth/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,20 @@ def with_universe_domain(self, universe_domain):
class CredentialsWithTrustBoundary(Credentials):
"""Abstract base for credentials supporting ``with_trust_boundary`` factory"""

@abc.abstractmethod
def _refresh_token(self, request):
"""Refreshes the access token.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Raises:
google.auth.exceptions.RefreshError: If the credentials could
not be refreshed.
"""
raise NotImplementedError("_refresh_token must be implemented")

def with_trust_boundary(self, trust_boundary):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method necessary? When would it be used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The with_* method is part of a design pattern used for credential objects throughout this library.

We treat credential objects as mostly immutable. You'll see similar factory methods like with_scopes(), with_quota_project(), and with_universe_domain().

"""Returns a copy of these credentials with a modified trust boundary.

Expand All @@ -306,6 +320,29 @@ def with_trust_boundary(self, trust_boundary):
"""
raise NotImplementedError("This credential does not support trust boundaries.")

def _is_trust_boundary_lookup_required(self):
"""Checks if a trust boundary lookup is required.

A lookup is required if the feature is enabled via an environment
variable, the universe domain is supported, and a no-op boundary
is not already cached.

Returns:
bool: True if a trust boundary lookup is required, False otherwise.
"""
# 1. Check if the feature is enabled via environment variable.
if not _helpers.get_bool_from_env(
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
):
return False

# 2. Skip trust boundary flow for non-default universe domains.
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
return False

# 3. Do not trigger refresh if credential has a cached no-op trust boundary.
return not self._has_no_op_trust_boundary()

def _get_trust_boundary_header(self):
if self._trust_boundary is not None:
if self._has_no_op_trust_boundary():
Expand All @@ -320,6 +357,15 @@ def apply(self, headers, token=None):
super().apply(headers, token)
headers.update(self._get_trust_boundary_header())

def refresh(self, request):
"""Refreshes the access token and the trust boundary.

This method calls the subclass's token refresh logic and then
refreshes the trust boundary if applicable.
"""
self._refresh_token(request)
self._refresh_trust_boundary(request)

def _refresh_trust_boundary(self, request):
"""Triggers a refresh of the trust boundary and updates the cache if necessary.

Expand All @@ -331,12 +377,10 @@ def _refresh_trust_boundary(self, request):
google.auth.exceptions.RefreshError: If the trust boundary could
not be refreshed and no cached value is available.
"""
# Do not trigger refresh if credential has a cached no-op trust boundary.
if self._has_no_op_trust_boundary():
if not self._is_trust_boundary_lookup_required():
return
new_trust_boundary = {}
try:
new_trust_boundary = self._lookup_trust_boundary(request)
self._trust_boundary = self._lookup_trust_boundary(request)
except exceptions.RefreshError as error:
# If the call to the lookup API failed, check if there is a trust boundary
# already cached. If there is, do nothing. If not, then throw the error.
Expand All @@ -347,8 +391,6 @@ def _refresh_trust_boundary(self, request):
"Using cached trust boundary due to refresh error: %s", error
)
return
else:
self._trust_boundary = new_trust_boundary

def _lookup_trust_boundary(self, request):
"""Calls the trust boundary lookup API to refresh the trust boundary cache.
Expand All @@ -366,24 +408,13 @@ def _lookup_trust_boundary(self, request):
"""
from google.oauth2 import _client

# Verify the trust boundary feature flag is enabled.
if not _helpers.get_bool_from_env(
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
):
# Skip the lookup and return early if it's not explicitly enabled.
return None

# Skip trust boundary flow for non-gdu universe domain.
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
return None

url = self._build_trust_boundary_lookup_url()
if not url:
raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")

headers = {}
self.apply(headers)
return _client.lookup_trust_boundary(request, url, headers=headers)
return _client._lookup_trust_boundary(request, url, headers=headers)

@abc.abstractmethod
def _build_trust_boundary_lookup_url(self):
Expand Down
35 changes: 14 additions & 21 deletions google/auth/external_account.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets revert these changes and keep them for a future PR. We should only add this with the changes for supporting the feature

Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,18 @@
@dataclass
class SupplierContext:
"""A context class that contains information about the requested third party credential that is passed
to AWS security credential and subject token suppliers.
to AWS security credential and subject token suppliers.

Attributes:
subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec.
Expected values include::
Attributes:
subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec.
Expected values include::

“urn:ietf:params:oauth:token-type:jwt”
“urn:ietf:params:oauth:token-type:id-token”
“urn:ietf:params:oauth:token-type:saml2”
“urn:ietf:params:aws:token-type:aws4_request”
“urn:ietf:params:oauth:token-type:jwt”
“urn:ietf:params:oauth:token-type:id-token”
“urn:ietf:params:oauth:token-type:saml2”
“urn:ietf:params:aws:token-type:aws4_request”

audience (str): The requested audience for the subject token.
audience (str): The requested audience for the subject token.
"""

subject_token_type: str
Expand All @@ -81,7 +81,6 @@ class Credentials(
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.CredentialsWithTokenUri,
credentials.CredentialsWithTrustBoundary,
metaclass=abc.ABCMeta,
):
"""Base class for all external account credentials.
Expand Down Expand Up @@ -134,14 +133,14 @@ def __init__(
authorization grant.
default_scopes (Optional[Sequence[str]]): Default scopes passed by a
Google client library. Use 'scopes' for user-defined scopes.
workforce_pool_user_project (Optonal[str]): The optional workforce pool user
workforce_pool_user_project (Optona[str]): The optional workforce pool user
project number when the credential corresponds to a workforce pool and not
a workload identity pool. The underlying principal must still have
serviceusage.services.use IAM permission to use the project for
billing/quota.
universe_domain (str): The universe domain. The default universe
domain is googleapis.com.
trust_boundary (str): String representation of trust boundary metadata.
trust_boundary (str): String representation of trust boundary meta.
Raises:
google.auth.exceptions.RefreshError: If the generateAccessToken
endpoint returned an error.
Expand All @@ -168,9 +167,9 @@ def __init__(
self._default_scopes = default_scopes
self._workforce_pool_user_project = workforce_pool_user_project
self._trust_boundary = {
"locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
"encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
} # Sets a no-op trust boundary value.
"locations": [],
"encoded_locations": "0x0",
} # expose a placeholder trust boundary value.

if self._client_id:
self._client_auth = utils.ClientAuthentication(
Expand Down Expand Up @@ -457,12 +456,6 @@ def refresh(self, request):

self.expiry = now + lifetime

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API.
Will be implemented in a follow up PR.
"""
return

def _make_copy(self):
kwargs = self._constructor_args()
new_cred = self.__class__(**kwargs)
Expand Down
7 changes: 1 addition & 6 deletions google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,7 @@ def __init__(
def _metric_header_for_usage(self):
return metrics.CRED_TYPE_SA_IMPERSONATE

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
self._update_token(request)
self._refresh_trust_boundary(request)

def _update_token(self, request):
def _refresh_token(self, request):
"""Updates credentials with a new access_token representing
the impersonated account.

Expand Down
30 changes: 5 additions & 25 deletions google/oauth2/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def refresh_grant(
return _handle_refresh_grant_response(response_data, refresh_token)


def lookup_trust_boundary(request, url, headers=None):
def _lookup_trust_boundary(request, url, headers=None):
"""Implements the global lookup of a credential trust boundary.
For the lookup, we send a request to the global lookup endpoint and then
parse the response. Service account credentials, workload identity
Expand Down Expand Up @@ -556,9 +556,7 @@ def lookup_trust_boundary(request, url, headers=None):
return response_data


def _lookup_trust_boundary_request(
request, url, can_retry=True, headers=None, **kwargs
):
def _lookup_trust_boundary_request(request, url, can_retry=True, headers=None):
"""Makes a request to the trust boundary lookup endpoint.

Args:
Expand All @@ -567,13 +565,6 @@ def _lookup_trust_boundary_request(
url (str): The trust boundary lookup url.
can_retry (bool): Enable or disable request retry behavior. Defaults to true.
headers (Optional[Mapping[str, str]]): The headers for the request.
kwargs: Additional arguments passed on to the request method. The
kwargs will be passed to `requests.request` method, see:
https://docs.python-requests.org/en/latest/api/#requests.request.
For example, you can use `cert=("cert_pem_path", "key_pem_path")`
to set up client side SSL certificate, and use
`verify="ca_bundle_path"` to set up the CA certificates for sever
side SSL certificate verification.

Returns:
Mapping[str, str]: The JSON-decoded response data.
Expand All @@ -583,18 +574,14 @@ def _lookup_trust_boundary_request(
an error.
"""
response_status_ok, response_data, retryable_error = (
_lookup_trust_boundary_request_no_throw(
request, url, can_retry, headers, **kwargs
)
_lookup_trust_boundary_request_no_throw(request, url, can_retry, headers)
)
if not response_status_ok:
_handle_error_response(response_data, retryable_error)
return response_data


def _lookup_trust_boundary_request_no_throw(
request, url, can_retry=True, headers=None, **kwargs
):
def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, headers=None):
"""Makes a request to the trust boundary lookup endpoint. This
function doesn't throw on response errors.

Expand All @@ -604,13 +591,6 @@ def _lookup_trust_boundary_request_no_throw(
url (str): The trust boundary lookup url.
can_retry (bool): Enable or disable request retry behavior. Defaults to true.
headers (Optional[Mapping[str, str]]): The headers for the request.
kwargs: Additional arguments passed on to the request method. The
kwargs will be passed to `requests.request` method, see:
https://docs.python-requests.org/en/latest/api/#requests.request.
For example, you can use `cert=("cert_pem_path", "key_pem_path")`
to set up client side SSL certificate, and use
`verify="ca_bundle_path"` to set up the CA certificates for sever
side SSL certificate verification.

Returns:
Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating
Expand All @@ -624,7 +604,7 @@ def _lookup_trust_boundary_request_no_throw(

retries = _exponential_backoff.ExponentialBackoff()
for _ in retries:
response = request(method="GET", url=url, headers=headers, **kwargs)
response = request(method="GET", url=url, headers=headers)
response_body = (
response.data.decode("utf-8")
if hasattr(response.data, "decode")
Expand Down
62 changes: 59 additions & 3 deletions google/oauth2/service_account.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work for self signed JWTs?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated service_account.Credentials to handle self signed JWTs. When a credential that uses a self-signed JWT also needs to perform a trust boundary lookup, the refresh() method now follows a two-step process:

  1. IAM-Specific JWT for Lookup: It first generates a temporary self-signed JWT where the audience is specifically set to the IAM Credentials API. This token is used exclusively to authenticate the request that fetches the trust boundary information.

  2. Final JWT for Target API: After the trust boundary is successfully retrieved, the method then proceeds to generate the final self-signed JWT with the audience required by the application's target API (e.g., Pub/Sub, Storage).

This approach ensures that the trust boundary lookup is authenticated correctly using a token the IAM service expects, while the final token used by the application is properly configured for its intended service.

Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ def _metric_header_for_usage(self):
return metrics.CRED_TYPE_SA_JWT
return metrics.CRED_TYPE_SA_ASSERTION

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def _refresh_token(self, request):
if self._always_use_jwt_access and not self._jwt_credentials:
# If self signed jwt should be used but jwt credential is not
# created, try to create one with scopes
Expand All @@ -461,7 +461,57 @@ def refresh(self, request):
)
self.token = access_token
self.expiry = expiry
self._refresh_trust_boundary(request)

def refresh(self, request):
"""Refreshes the credential's access token.

This method is overridden to provide special handling for credentials that
use a self-signed JWT and have a trust boundary configured. In this
scenario, it first generates a temporary, IAM-specific self-signed JWT
to perform the trust boundary lookup, and then generates the final
self-signed JWT for the target API.

For all other cases, it falls back to the standard refresh behavior
from the parent class.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Raises:
google.auth.exceptions.RefreshError: If the credentials could
not be refreshed.
"""
# This is a special path for self-signed JWTs that need to look up a trust boundary.
# The `_subject` check is to ensure we are not in a domain-wide
# delegation flow, which uses a different authentication mechanism.
if (
self._always_use_jwt_access
and self._subject is None
and self._is_trust_boundary_lookup_required()
):
# Special case: self-signed JWT with trust boundary.
# 1. Create a temporary self-signed JWT for the IAM API.
iam_audience = "https://iamcredentials.{}/".format(self._universe_domain)
iam_jwt_creds = jwt.Credentials.from_signing_credentials(self, iam_audience)
iam_jwt_creds.refresh(request)

# 2. Use this JWT to perform the trust boundary lookup.
# We temporarily set self.token for the base lookup method.
# The base lookup method will call self.apply() which adds the
# authorization header.
original_token = self.token
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be welcoming race conditions. What happens if one thread is updating trust boundary and set self.token to iam audience and another thread tries to read self.token

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted this change since we're still waiting to hear back from the backend team on whether or not the workaround is necessary. We'll address self signed jwt flow if necessary in a future PR.

self.token = iam_jwt_creds.token.decode()
try:
self._refresh_trust_boundary(request)
finally:
self.token = original_token

# 3. Now, refresh the original self-signed JWT for the target API.
self._refresh_token(request)
else:
# For all other cases, use the standard refresh mechanism.
super(Credentials, self).refresh(request)

def _create_self_signed_jwt(self, audience):
"""Create a self-signed JWT from the credentials if requirements are met.
Expand Down Expand Up @@ -798,6 +848,12 @@ def with_trust_boundary(self, trust_boundary):
cred._trust_boundary = trust_boundary
return cred

def _refresh_token(self, request):
"""Not used by this class, which overrides refresh() directly."""
# This is required to satisfy the abstract base class, but this
# class's refresh() method is called directly and does not use this.
pass

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API.

Expand Down
Loading