Skip to content

feat: Add trust boundary support for service accounts and impersonation. #1778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 124 additions & 16 deletions google/auth/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import abc
from enum import Enum
import json
import os
import typing

from google.auth import _helpers, environment_vars
from google.auth import exceptions
Expand All @@ -26,6 +28,9 @@
from google.auth._refresh_worker import RefreshThreadManager

DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
NO_OP_TRUST_BOUNDARY_LOCATIONS: "typing.Tuple[str]" = ()
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"
TRUST_BOUNDARY_ENV_VAR = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED"


class Credentials(_BaseCredentials):
Expand Down Expand Up @@ -178,22 +183,7 @@ def apply(self, headers, token=None):
token (Optional[str]): If specified, overrides the current access
token.
"""
self._apply(headers, token=token)
"""Trust boundary value will be a cached value from global lookup.

The response of trust boundary will be a list of regions and a hex
encoded representation.

An example of global lookup response:
{
"locations": [
"us-central1", "us-east1", "europe-west1", "asia-east1"
]
"encoded_locations": "0xA30"
}
"""
if self._trust_boundary is not None:
headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"]
self._apply(headers, token)
if self.quota_project_id:
headers["x-goog-user-project"] = self.quota_project_id

Expand Down Expand Up @@ -299,6 +289,124 @@ def with_universe_domain(self, universe_domain):
)


class CredentialsWithTrustBoundary(Credentials):
"""Abstract base for credentials supporting ``with_trust_boundary`` factory"""

def with_trust_boundary(self, trust_boundary):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method necessary? When would it be used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The with_* method is part of a design pattern used for credential objects throughout this library.

We treat credential objects as mostly immutable. You'll see similar factory methods like with_scopes(), with_quota_project(), and with_universe_domain().

"""Returns a copy of these credentials with a modified trust boundary.

Args:
trust_boundary Mapping[str, str]: The trust boundary to use for the
credential. This should be a map with a "locations" key that maps to
a list of GCP regions, and a "encodedLocations" key that maps to a
hex string.

Returns:
google.auth.credentials.Credentials: A new credentials instance.
"""
raise NotImplementedError("This credential does not support trust boundaries.")

def apply(self, headers, token=None):
"""Apply the token to the authentication header."""
super().apply(headers, token)
if self._trust_boundary is not None:
headers["x-allowed-locations"] = self._trust_boundary["encodedLocations"]

def _refresh_trust_boundary(self, request):
"""Triggers a refresh of the trust boundary and updates the cache if necessary.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Raises:
google.auth.exceptions.RefreshError: If the trust boundary could
not be refreshed and no cached value is available.
"""
# Do not trigger refresh if credential has a cached no-op trust boundary.
if self._has_no_op_trust_boundary():
return
new_trust_boundary = {}
try:
new_trust_boundary = self._lookup_trust_boundary(request)
except exceptions.RefreshError as error:
# If the call to the lookup API failed, check if there is a trust boundary
# already cached. If there is, do nothing. If not, then throw the error.
if self._trust_boundary is None:
raise (error)
return
else:
self._trust_boundary = new_trust_boundary

def _lookup_trust_boundary(self, request):
"""Calls the trust boundary lookup API to refresh the trust boundary cache.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Returns:
trust_boundary (dict): The trust boundary object returned by the lookup API.

Raises:
google.auth.exceptions.RefreshError: If the trust boundary could not be
retrieved.
"""
from google.oauth2 import _client

# Verify the trust boundary feature flag is enabled.
if os.getenv(TRUST_BOUNDARY_ENV_VAR, "").lower() != "true":
# Skip the lookup and return early if it's not explicitly enabled.
return

# Skip trust boundary flow for non-gdu universe domain.
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
return

url = self._build_trust_boundary_lookup_url()
return _client.lookup_trust_boundary(request, url, self.token)

@abc.abstractmethod
def _build_trust_boundary_lookup_url(self):
"""
Builds and returns the URL for the trust boundary lookup API.

This method should be implemented by subclasses to provide the
specific URL based on the credential type and its properties.

Returns:
str: The URL for the trust boundary lookup endpoint, or None
if lookup should be skipped (e.g., for non-applicable universe domains).
"""
raise NotImplementedError("_build_trust_boundary_lookup_url must be implemented")

@staticmethod
def _parse_trust_boundary(trust_boundary_string: str):
try:
trust_boundary = json.loads(trust_boundary_string)
if (
"locations" not in trust_boundary
or "encodedLocations" not in trust_boundary
):
raise exceptions.MalformedError
return trust_boundary
except Exception:
raise exceptions.MalformedError(
"Cannot parse trust boundary {}".format(trust_boundary_string)
)

def _has_no_op_trust_boundary(self):
# A no-op trust boundary is indicated by encodedLocations being "0x0".
# The "locations" list may or may not be present as an empty list.
if (
self._trust_boundary is not None
and self._trust_boundary["encodedLocations"]
== NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS
):
return True
return False


class AnonymousCredentials(Credentials):
"""Credentials that do not provide any authentication information.

Expand Down
17 changes: 12 additions & 5 deletions google/auth/external_account.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets revert these changes and keep them for a future PR. We should only add this with the changes for supporting the feature

Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Credentials(
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.CredentialsWithTokenUri,
credentials.CredentialsWithTrustBoundary,
metaclass=abc.ABCMeta,
):
"""Base class for all external account credentials.
Expand Down Expand Up @@ -133,14 +134,14 @@ def __init__(
authorization grant.
default_scopes (Optional[Sequence[str]]): Default scopes passed by a
Google client library. Use 'scopes' for user-defined scopes.
workforce_pool_user_project (Optona[str]): The optional workforce pool user
workforce_pool_user_project (Optonal[str]): The optional workforce pool user
project number when the credential corresponds to a workforce pool and not
a workload identity pool. The underlying principal must still have
serviceusage.services.use IAM permission to use the project for
billing/quota.
universe_domain (str): The universe domain. The default universe
domain is googleapis.com.
trust_boundary (str): String representation of trust boundary meta.
trust_boundary (str): String representation of trust boundary metadata.
Raises:
google.auth.exceptions.RefreshError: If the generateAccessToken
endpoint returned an error.
Expand All @@ -167,9 +168,9 @@ def __init__(
self._default_scopes = default_scopes
self._workforce_pool_user_project = workforce_pool_user_project
self._trust_boundary = {
"locations": [],
"encoded_locations": "0x0",
} # expose a placeholder trust boundary value.
"locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
"encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
} # Sets a no-op trust boundary value.

if self._client_id:
self._client_auth = utils.ClientAuthentication(
Expand Down Expand Up @@ -456,6 +457,12 @@ def refresh(self, request):

self.expiry = now + lifetime

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API.
Will be implemented in a follow up PR.
"""
return

def _make_copy(self):
kwargs = self._constructor_args()
new_cred = self.__class__(**kwargs)
Expand Down
34 changes: 33 additions & 1 deletion google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds

_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)

_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user"
_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account"
Expand Down Expand Up @@ -117,7 +120,10 @@ def _make_iam_token_request(


class Credentials(
credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.Signing,
credentials.CredentialsWithTrustBoundary,
):
"""This module defines impersonated credentials which are essentially
impersonated identities.
Expand Down Expand Up @@ -190,6 +196,7 @@ def __init__(
lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
quota_project_id=None,
iam_endpoint_override=None,
trust_boundary=None,
):
"""
Args:
Expand Down Expand Up @@ -220,6 +227,7 @@ def __init__(
subject (Optional[str]): sub field of a JWT. This field should only be set
if you wish to impersonate as a user. This feature is useful when
using domain wide delegation.
trust_boundary (Mapping[str,str]): A credential trust boundary.
"""

super(Credentials, self).__init__()
Expand Down Expand Up @@ -251,13 +259,15 @@ def __init__(
self._quota_project_id = quota_project_id
self._iam_endpoint_override = iam_endpoint_override
self._cred_file_path = None
self._trust_boundary = trust_boundary

def _metric_header_for_usage(self):
return metrics.CRED_TYPE_SA_IMPERSONATE

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
self._update_token(request)
self._refresh_trust_boundary(request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One suggestion for a potential refactoring to improve the design:

Currently, each class that inherits from CredentialsWithTrustBoundary (e.g., compute_engine.Credentials, impersonated_credentials.Credentials, service_account.Credentials) is required to explicitly call self._refresh_trust_boundary(request) within its own refresh method.

To enhance encapsulation and reduce the burden on subclasses, it would be preferable if this call was handled automatically by the base class. Could this be refactored?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great suggestion!
I've refactored the CredentialsWithTrustBoundary base class to handle this. The base refresh() method now handles the entire refresh process. It calls a new abstract method, _refresh_token(), which subclasses must implement, and then it calls _refresh_trust_boundary() itself.


def _update_token(self, request):
"""Updates credentials with a new access_token representing
Expand Down Expand Up @@ -331,6 +341,28 @@ def _update_token(self, request):
iam_endpoint_override=self._iam_endpoint_override,
)

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API.

This method constructs the specific URL for the IAM Credentials API's
`allowedLocations` endpoint, using the credential's universe domain
and service account email.

Raises:
ValueError: If `self.service_account_email` is None or an empty
string, as it's required to form the URL.

Returns:
str: The URL for the trust boundary lookup endpoint.
"""
if not self.service_account_email:
raise ValueError(
"Service account email is required to build the trust boundary lookup URL."
)
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self.universe_domain, self.service_account_email
)

def sign_bytes(self, message):
from google.auth.transport.requests import AuthorizedSession

Expand Down
Loading