-
Notifications
You must be signed in to change notification settings - Fork 325
feat: Add trust boundary support for service accounts and impersonation. #1778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
dfd26f6
eac0a50
baefc0f
fb08254
929bd1f
61654c7
5edd899
f064e90
52b1fc5
1d26a76
36015c6
cd213c0
c281fee
bd342ea
5af9ef9
d7d6b75
a052c9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -292,6 +292,20 @@ def with_universe_domain(self, universe_domain): | |
class CredentialsWithTrustBoundary(Credentials): | ||
"""Abstract base for credentials supporting ``with_trust_boundary`` factory""" | ||
|
||
@abc.abstractmethod | ||
def _refresh_token(self, request): | ||
"""Refreshes the access token. | ||
|
||
Args: | ||
request (google.auth.transport.Request): The object used to make | ||
HTTP requests. | ||
|
||
Raises: | ||
google.auth.exceptions.RefreshError: If the credentials could | ||
not be refreshed. | ||
""" | ||
raise NotImplementedError("_refresh_token must be implemented") | ||
|
||
def with_trust_boundary(self, trust_boundary): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this method necessary? When would it be used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The with_* method is part of a design pattern used for credential objects throughout this library. We treat credential objects as mostly immutable. You'll see similar factory methods like with_scopes(), with_quota_project(), and with_universe_domain(). |
||
"""Returns a copy of these credentials with a modified trust boundary. | ||
|
||
|
@@ -306,6 +320,29 @@ def with_trust_boundary(self, trust_boundary): | |
""" | ||
raise NotImplementedError("This credential does not support trust boundaries.") | ||
|
||
def _is_trust_boundary_lookup_required(self): | ||
"""Checks if a trust boundary lookup is required. | ||
|
||
A lookup is required if the feature is enabled via an environment | ||
variable, the universe domain is supported, and a no-op boundary | ||
is not already cached. | ||
|
||
Returns: | ||
bool: True if a trust boundary lookup is required, False otherwise. | ||
""" | ||
# 1. Check if the feature is enabled via environment variable. | ||
if not _helpers.get_bool_from_env( | ||
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False | ||
): | ||
return False | ||
|
||
# 2. Skip trust boundary flow for non-default universe domains. | ||
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: | ||
return False | ||
|
||
# 3. Do not trigger refresh if credential has a cached no-op trust boundary. | ||
return not self._has_no_op_trust_boundary() | ||
|
||
def _get_trust_boundary_header(self): | ||
if self._trust_boundary is not None: | ||
if self._has_no_op_trust_boundary(): | ||
|
@@ -320,6 +357,15 @@ def apply(self, headers, token=None): | |
super().apply(headers, token) | ||
headers.update(self._get_trust_boundary_header()) | ||
|
||
def refresh(self, request): | ||
"""Refreshes the access token and the trust boundary. | ||
|
||
This method calls the subclass's token refresh logic and then | ||
refreshes the trust boundary if applicable. | ||
""" | ||
self._refresh_token(request) | ||
self._refresh_trust_boundary(request) | ||
|
||
def _refresh_trust_boundary(self, request): | ||
"""Triggers a refresh of the trust boundary and updates the cache if necessary. | ||
|
||
|
@@ -331,12 +377,10 @@ def _refresh_trust_boundary(self, request): | |
google.auth.exceptions.RefreshError: If the trust boundary could | ||
not be refreshed and no cached value is available. | ||
""" | ||
# Do not trigger refresh if credential has a cached no-op trust boundary. | ||
if self._has_no_op_trust_boundary(): | ||
if not self._is_trust_boundary_lookup_required(): | ||
return | ||
new_trust_boundary = {} | ||
try: | ||
new_trust_boundary = self._lookup_trust_boundary(request) | ||
self._trust_boundary = self._lookup_trust_boundary(request) | ||
except exceptions.RefreshError as error: | ||
# If the call to the lookup API failed, check if there is a trust boundary | ||
# already cached. If there is, do nothing. If not, then throw the error. | ||
|
@@ -347,8 +391,6 @@ def _refresh_trust_boundary(self, request): | |
"Using cached trust boundary due to refresh error: %s", error | ||
) | ||
return | ||
nbayati marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
self._trust_boundary = new_trust_boundary | ||
|
||
def _lookup_trust_boundary(self, request): | ||
"""Calls the trust boundary lookup API to refresh the trust boundary cache. | ||
|
@@ -366,24 +408,13 @@ def _lookup_trust_boundary(self, request): | |
""" | ||
from google.oauth2 import _client | ||
|
||
# Verify the trust boundary feature flag is enabled. | ||
if not _helpers.get_bool_from_env( | ||
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False | ||
): | ||
# Skip the lookup and return early if it's not explicitly enabled. | ||
return None | ||
|
||
# Skip trust boundary flow for non-gdu universe domain. | ||
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: | ||
return None | ||
|
||
url = self._build_trust_boundary_lookup_url() | ||
if not url: | ||
raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.") | ||
|
||
headers = {} | ||
self.apply(headers) | ||
return _client.lookup_trust_boundary(request, url, headers=headers) | ||
return _client._lookup_trust_boundary(request, url, headers=headers) | ||
|
||
@abc.abstractmethod | ||
def _build_trust_boundary_lookup_url(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets revert these changes and keep them for a future PR. We should only add this with the changes for supporting the feature |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this work for self signed JWTs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've updated service_account.Credentials to handle self signed JWTs. When a credential that uses a self-signed JWT also needs to perform a trust boundary lookup, the refresh() method now follows a two-step process:
This approach ensures that the trust boundary lookup is authenticated correctly using a token the IAM service expects, while the final token used by the application is properly configured for its intended service. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -435,8 +435,8 @@ def _metric_header_for_usage(self): | |
return metrics.CRED_TYPE_SA_JWT | ||
return metrics.CRED_TYPE_SA_ASSERTION | ||
|
||
@_helpers.copy_docstring(credentials.Credentials) | ||
def refresh(self, request): | ||
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) | ||
def _refresh_token(self, request): | ||
if self._always_use_jwt_access and not self._jwt_credentials: | ||
# If self signed jwt should be used but jwt credential is not | ||
# created, try to create one with scopes | ||
|
@@ -461,7 +461,57 @@ def refresh(self, request): | |
) | ||
self.token = access_token | ||
self.expiry = expiry | ||
self._refresh_trust_boundary(request) | ||
|
||
def refresh(self, request): | ||
"""Refreshes the credential's access token. | ||
|
||
This method is overridden to provide special handling for credentials that | ||
use a self-signed JWT and have a trust boundary configured. In this | ||
scenario, it first generates a temporary, IAM-specific self-signed JWT | ||
to perform the trust boundary lookup, and then generates the final | ||
self-signed JWT for the target API. | ||
|
||
For all other cases, it falls back to the standard refresh behavior | ||
from the parent class. | ||
|
||
Args: | ||
request (google.auth.transport.Request): The object used to make | ||
HTTP requests. | ||
|
||
Raises: | ||
google.auth.exceptions.RefreshError: If the credentials could | ||
not be refreshed. | ||
""" | ||
# This is a special path for self-signed JWTs that need to look up a trust boundary. | ||
# The `_subject` check is to ensure we are not in a domain-wide | ||
# delegation flow, which uses a different authentication mechanism. | ||
if ( | ||
self._always_use_jwt_access | ||
and self._subject is None | ||
and self._is_trust_boundary_lookup_required() | ||
): | ||
# Special case: self-signed JWT with trust boundary. | ||
# 1. Create a temporary self-signed JWT for the IAM API. | ||
iam_audience = "https://iamcredentials.{}/".format(self._universe_domain) | ||
iam_jwt_creds = jwt.Credentials.from_signing_credentials(self, iam_audience) | ||
iam_jwt_creds.refresh(request) | ||
|
||
# 2. Use this JWT to perform the trust boundary lookup. | ||
# We temporarily set self.token for the base lookup method. | ||
# The base lookup method will call self.apply() which adds the | ||
# authorization header. | ||
original_token = self.token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be welcoming race conditions. What happens if one thread is updating trust boundary and set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted this change since we're still waiting to hear back from the backend team on whether or not the workaround is necessary. We'll address self signed jwt flow if necessary in a future PR. |
||
self.token = iam_jwt_creds.token.decode() | ||
try: | ||
self._refresh_trust_boundary(request) | ||
finally: | ||
self.token = original_token | ||
|
||
# 3. Now, refresh the original self-signed JWT for the target API. | ||
self._refresh_token(request) | ||
else: | ||
# For all other cases, use the standard refresh mechanism. | ||
super(Credentials, self).refresh(request) | ||
|
||
def _create_self_signed_jwt(self, audience): | ||
"""Create a self-signed JWT from the credentials if requirements are met. | ||
|
@@ -798,6 +848,12 @@ def with_trust_boundary(self, trust_boundary): | |
cred._trust_boundary = trust_boundary | ||
return cred | ||
|
||
def _refresh_token(self, request): | ||
"""Not used by this class, which overrides refresh() directly.""" | ||
# This is required to satisfy the abstract base class, but this | ||
# class's refresh() method is called directly and does not use this. | ||
pass | ||
|
||
def _build_trust_boundary_lookup_url(self): | ||
"""Builds and returns the URL for the trust boundary lookup API. | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.