-
Notifications
You must be signed in to change notification settings - Fork 325
feat: Add trust boundary support for service accounts and impersonation. #1778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
dfd26f6
eac0a50
baefc0f
fb08254
929bd1f
61654c7
5edd899
f064e90
52b1fc5
1d26a76
36015c6
cd213c0
c281fee
bd342ea
5af9ef9
d7d6b75
a052c9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,11 +30,16 @@ | |
from google.auth.compute_engine import _metadata | ||
from google.oauth2 import _client | ||
|
||
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( | ||
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" | ||
) | ||
|
||
|
||
class Credentials( | ||
credentials.Scoped, | ||
credentials.CredentialsWithQuotaProject, | ||
credentials.CredentialsWithUniverseDomain, | ||
credentials.CredentialsWithTrustBoundary, | ||
): | ||
"""Compute Engine Credentials. | ||
|
||
|
@@ -61,6 +66,7 @@ def __init__( | |
scopes=None, | ||
default_scopes=None, | ||
universe_domain=None, | ||
trust_boundary=None, | ||
): | ||
""" | ||
Args: | ||
|
@@ -76,6 +82,7 @@ def __init__( | |
provided or None, credential will attempt to fetch the value | ||
from metadata server. If metadata server doesn't have universe | ||
domain endpoint, then the default googleapis.com will be used. | ||
trust_boundary (Mapping[str,str]): A credential trust boundary. | ||
""" | ||
super(Credentials, self).__init__() | ||
self._service_account_email = service_account_email | ||
|
@@ -86,11 +93,12 @@ def __init__( | |
if universe_domain: | ||
self._universe_domain = universe_domain | ||
self._universe_domain_cached = True | ||
self._trust_boundary = trust_boundary | ||
|
||
def _metric_header_for_usage(self): | ||
return metrics.CRED_TYPE_SA_MDS | ||
|
||
def refresh(self, request): | ||
def _refresh_token(self, request): | ||
"""Refresh the access token and scopes. | ||
|
||
Args: | ||
|
@@ -111,6 +119,31 @@ def refresh(self, request): | |
new_exc = exceptions.RefreshError(caught_exc) | ||
raise new_exc from caught_exc | ||
|
||
def _build_trust_boundary_lookup_url(self): | ||
"""Builds and returns the URL for the trust boundary lookup API for GCE.""" | ||
# If the service account email is 'default', we need to get the | ||
# actual email address from the metadata server. | ||
if self._service_account_email == "default": | ||
from google.auth.transport import requests as google_auth_requests | ||
|
||
request = google_auth_requests.Request() | ||
try: | ||
info = _metadata.get_service_account_info(request, "default") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @harkamaljot you had worked on optimizing this call somewhere else in the code recently. Can you PTAL at this method and see if there are any issues in doing this call or something can be optimized. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I optimized this call when getting token for the service account, however for getting the trust boundary looks like we need service account email. Based on the documentation(http://cloud/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/getAllowedLocations) and also trying it out manually as well, I get 400 error when querying this endpoint using default. |
||
# Cache the fetched email so we don't have to do this again. | ||
self._service_account_email = info["email"] | ||
|
||
except exceptions.TransportError as e: | ||
# If fetching the service account email fails due to a transport error, | ||
# it means we cannot build the trust boundary lookup URL. | ||
# Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. | ||
raise exceptions.RefreshError( | ||
f"Failed to get service account email for trust boundary lookup: {e}" | ||
) from e | ||
|
||
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( | ||
self.universe_domain, self.service_account_email | ||
) | ||
|
||
@property | ||
def service_account_email(self): | ||
"""The service account email. | ||
|
@@ -152,8 +185,9 @@ def with_quota_project(self, quota_project_id): | |
quota_project_id=quota_project_id, | ||
scopes=self._scopes, | ||
default_scopes=self._default_scopes, | ||
universe_domain=self._universe_domain, | ||
trust_boundary=self._trust_boundary, | ||
) | ||
creds._universe_domain = self._universe_domain | ||
creds._universe_domain_cached = self._universe_domain_cached | ||
return creds | ||
|
||
|
@@ -167,8 +201,9 @@ def with_scopes(self, scopes, default_scopes=None): | |
default_scopes=default_scopes, | ||
service_account_email=self._service_account_email, | ||
quota_project_id=self._quota_project_id, | ||
universe_domain=self._universe_domain, | ||
trust_boundary=self._trust_boundary, | ||
) | ||
creds._universe_domain = self._universe_domain | ||
creds._universe_domain_cached = self._universe_domain_cached | ||
return creds | ||
|
||
|
@@ -179,9 +214,23 @@ def with_universe_domain(self, universe_domain): | |
default_scopes=self._default_scopes, | ||
service_account_email=self._service_account_email, | ||
quota_project_id=self._quota_project_id, | ||
trust_boundary=self._trust_boundary, | ||
universe_domain=universe_domain, | ||
) | ||
|
||
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) | ||
def with_trust_boundary(self, trust_boundary): | ||
creds = self.__class__( | ||
service_account_email=self._service_account_email, | ||
quota_project_id=self._quota_project_id, | ||
scopes=self._scopes, | ||
default_scopes=self._default_scopes, | ||
universe_domain=self._universe_domain, | ||
trust_boundary=trust_boundary, | ||
) | ||
creds._universe_domain_cached = self._universe_domain_cached | ||
return creds | ||
|
||
|
||
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds | ||
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,15 +17,20 @@ | |
|
||
import abc | ||
from enum import Enum | ||
import json | ||
import os | ||
import typing | ||
|
||
from google.auth import _helpers, environment_vars | ||
from google.auth import exceptions | ||
from google.auth import metrics | ||
from google.auth._credentials_base import _BaseCredentials | ||
from google.auth._default import _LOGGER | ||
from google.auth._refresh_worker import RefreshThreadManager | ||
|
||
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" | ||
NO_OP_TRUST_BOUNDARY_LOCATIONS: "typing.Tuple[str]" = () | ||
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" | ||
|
||
|
||
class Credentials(_BaseCredentials): | ||
|
@@ -178,22 +183,7 @@ def apply(self, headers, token=None): | |
token (Optional[str]): If specified, overrides the current access | ||
token. | ||
""" | ||
self._apply(headers, token=token) | ||
"""Trust boundary value will be a cached value from global lookup. | ||
|
||
The response of trust boundary will be a list of regions and a hex | ||
encoded representation. | ||
|
||
An example of global lookup response: | ||
{ | ||
"locations": [ | ||
"us-central1", "us-east1", "europe-west1", "asia-east1" | ||
] | ||
"encoded_locations": "0xA30" | ||
} | ||
""" | ||
if self._trust_boundary is not None: | ||
headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"] | ||
self._apply(headers, token) | ||
if self.quota_project_id: | ||
headers["x-goog-user-project"] = self.quota_project_id | ||
|
||
|
@@ -299,6 +289,161 @@ def with_universe_domain(self, universe_domain): | |
) | ||
|
||
|
||
class CredentialsWithTrustBoundary(Credentials): | ||
"""Abstract base for credentials supporting ``with_trust_boundary`` factory""" | ||
|
||
@abc.abstractmethod | ||
def _refresh_token(self, request): | ||
"""Refreshes the access token. | ||
|
||
Args: | ||
request (google.auth.transport.Request): The object used to make | ||
HTTP requests. | ||
|
||
Raises: | ||
google.auth.exceptions.RefreshError: If the credentials could | ||
not be refreshed. | ||
""" | ||
raise NotImplementedError("_refresh_token must be implemented") | ||
|
||
def with_trust_boundary(self, trust_boundary): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this method necessary? When would it be used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The with_* method is part of a design pattern used for credential objects throughout this library. We treat credential objects as mostly immutable. You'll see similar factory methods like with_scopes(), with_quota_project(), and with_universe_domain(). |
||
"""Returns a copy of these credentials with a modified trust boundary. | ||
|
||
Args: | ||
trust_boundary Mapping[str, str]: The trust boundary to use for the | ||
credential. This should be a map with a "locations" key that maps to | ||
a list of GCP regions, and a "encodedLocations" key that maps to a | ||
hex string. | ||
|
||
Returns: | ||
google.auth.credentials.Credentials: A new credentials instance. | ||
""" | ||
raise NotImplementedError("This credential does not support trust boundaries.") | ||
|
||
def _is_trust_boundary_lookup_required(self): | ||
"""Checks if a trust boundary lookup is required. | ||
|
||
A lookup is required if the feature is enabled via an environment | ||
variable, the universe domain is supported, and a no-op boundary | ||
is not already cached. | ||
|
||
Returns: | ||
bool: True if a trust boundary lookup is required, False otherwise. | ||
""" | ||
# 1. Check if the feature is enabled via environment variable. | ||
if not _helpers.get_bool_from_env( | ||
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False | ||
): | ||
return False | ||
|
||
# 2. Skip trust boundary flow for non-default universe domains. | ||
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: | ||
return False | ||
|
||
# 3. Do not trigger refresh if credential has a cached no-op trust boundary. | ||
return not self._has_no_op_trust_boundary() | ||
|
||
def _get_trust_boundary_header(self): | ||
if self._trust_boundary is not None: | ||
if self._has_no_op_trust_boundary(): | ||
# STS expects an empty string if the trust boundary value is no-op. | ||
return {"x-allowed-locations": ""} | ||
else: | ||
return {"x-allowed-locations": self._trust_boundary["encodedLocations"]} | ||
return {} | ||
|
||
def apply(self, headers, token=None): | ||
"""Apply the token to the authentication header.""" | ||
super().apply(headers, token) | ||
headers.update(self._get_trust_boundary_header()) | ||
|
||
def refresh(self, request): | ||
"""Refreshes the access token and the trust boundary. | ||
|
||
This method calls the subclass's token refresh logic and then | ||
refreshes the trust boundary if applicable. | ||
""" | ||
self._refresh_token(request) | ||
self._refresh_trust_boundary(request) | ||
|
||
def _refresh_trust_boundary(self, request): | ||
"""Triggers a refresh of the trust boundary and updates the cache if necessary. | ||
|
||
Args: | ||
request (google.auth.transport.Request): The object used to make | ||
HTTP requests. | ||
|
||
Raises: | ||
google.auth.exceptions.RefreshError: If the trust boundary could | ||
not be refreshed and no cached value is available. | ||
""" | ||
if not self._is_trust_boundary_lookup_required(): | ||
return | ||
try: | ||
self._trust_boundary = self._lookup_trust_boundary(request) | ||
except exceptions.RefreshError as error: | ||
# If the call to the lookup API failed, check if there is a trust boundary | ||
# already cached. If there is, do nothing. If not, then throw the error. | ||
if self._trust_boundary is None: | ||
raise error | ||
if _helpers.is_logging_enabled(_LOGGER): | ||
_LOGGER.debug( | ||
"Using cached trust boundary due to refresh error: %s", error | ||
) | ||
return | ||
nbayati marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def _lookup_trust_boundary(self, request): | ||
"""Calls the trust boundary lookup API to refresh the trust boundary cache. | ||
|
||
Args: | ||
request (google.auth.transport.Request): The object used to make | ||
HTTP requests. | ||
|
||
Returns: | ||
trust_boundary (dict): The trust boundary object returned by the lookup API. | ||
|
||
Raises: | ||
google.auth.exceptions.RefreshError: If the trust boundary could not be | ||
retrieved. | ||
""" | ||
from google.oauth2 import _client | ||
|
||
url = self._build_trust_boundary_lookup_url() | ||
if not url: | ||
raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.") | ||
|
||
headers = {} | ||
self.apply(headers) | ||
return _client._lookup_trust_boundary(request, url, headers=headers) | ||
|
||
@abc.abstractmethod | ||
def _build_trust_boundary_lookup_url(self): | ||
""" | ||
Builds and returns the URL for the trust boundary lookup API. | ||
|
||
This method should be implemented by subclasses to provide the | ||
specific URL based on the credential type and its properties. | ||
|
||
Returns: | ||
str: The URL for the trust boundary lookup endpoint, or None | ||
if lookup should be skipped (e.g., for non-applicable universe domains). | ||
""" | ||
raise NotImplementedError( | ||
"_build_trust_boundary_lookup_url must be implemented" | ||
) | ||
|
||
def _has_no_op_trust_boundary(self): | ||
# A no-op trust boundary is indicated by encodedLocations being "0x0". | ||
# The "locations" list may or may not be present as an empty list. | ||
if ( | ||
self._trust_boundary is not None | ||
and self._trust_boundary["encodedLocations"] | ||
== NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS | ||
): | ||
return True | ||
return False | ||
|
||
|
||
class AnonymousCredentials(Credentials): | ||
"""Credentials that do not provide any authentication information. | ||
|
||
|
@@ -382,8 +527,7 @@ def default_scopes(self): | |
|
||
@abc.abstractproperty | ||
def requires_scopes(self): | ||
"""True if these credentials require scopes to obtain an access token. | ||
""" | ||
"""True if these credentials require scopes to obtain an access token.""" | ||
return False | ||
|
||
def has_scopes(self, scopes): | ||
|
Uh oh!
There was an error while loading. Please reload this page.