From 56684903a3756bd4185b55516520cdb7942ac152 Mon Sep 17 00:00:00 2001 From: cureprotocols Date: Wed, 2 Apr 2025 20:01:20 -0600 Subject: [PATCH] fix(types): add full type annotations to oauth utils and exceptions (#1508) --- google/auth/exceptions.py | 12 +++- google/oauth2/utils.py | 144 ++++++++++++++------------------------ 2 files changed, 62 insertions(+), 94 deletions(-) diff --git a/google/auth/exceptions.py b/google/auth/exceptions.py index feb9f7411..45c202605 100644 --- a/google/auth/exceptions.py +++ b/google/auth/exceptions.py @@ -59,8 +59,16 @@ def retryable(self): class OAuthError(GoogleAuthError): - """Used to indicate an error occurred during an OAuth related HTTP - request.""" + """Used to indicate an error occurred during an OAuth-related HTTP request.""" + + def __init__( + self, + message: Optional[str] = None, + response_body: Optional[str] = None, + **kwargs: Any, + ) -> None: + super().__init__(message or "", **kwargs) + self.response_body = response_body class ReauthFailError(RefreshError): diff --git a/google/oauth2/utils.py b/google/oauth2/utils.py index d72ff1916..86675da84 100644 --- a/google/oauth2/utils.py +++ b/google/oauth2/utils.py @@ -14,154 +14,114 @@ """OAuth 2.0 Utilities. -This module provides implementations for various OAuth 2.0 utilities. -This includes `OAuth error handling`_ and -`Client authentication for OAuth flows`_. - -OAuth error handling --------------------- -This will define interfaces for handling OAuth related error responses as -stated in `RFC 6749 section 5.2`_. -This will include a common function to convert these HTTP error responses to a -:class:`google.auth.exceptions.OAuthError` exception. - - -Client authentication for OAuth flows -------------------------------------- -We introduce an interface for defining client authentication credentials based -on `RFC 6749 section 2.3.1`_. This will expose the following -capabilities: - - * Ability to support basic authentication via request header. - * Ability to support bearer token authentication via request header. - * Ability to support client ID / secret authentication via request body. - -.. _RFC 6749 section 2.3.1: https://tools.ietf.org/html/rfc6749#section-2.3.1 -.. _RFC 6749 section 5.2: https://tools.ietf.org/html/rfc6749#section-5.2 +Provides OAuth error handling and client authentication utilities. """ import abc import base64 import enum import json +from typing import Mapping, Optional, MutableMapping, Any from google.auth import exceptions -# OAuth client authentication based on -# https://tools.ietf.org/html/rfc6749#section-2.3. class ClientAuthType(enum.Enum): basic = 1 request_body = 2 -class ClientAuthentication(object): - """Defines the client authentication credentials for basic and request-body - types based on https://tools.ietf.org/html/rfc6749#section-2.3.1. - """ +class ClientAuthentication: + """OAuth client authentication credentials. - def __init__(self, client_auth_type, client_id, client_secret=None): - """Instantiates a client authentication object containing the client ID - and secret credentials for basic and response-body auth. + Args: + client_auth_type: The client authentication type. + client_id: The client ID. + client_secret: The client secret (optional). + """ - Args: - client_auth_type (google.oauth2.oauth_utils.ClientAuthType): The - client authentication type. - client_id (str): The client ID. - client_secret (Optional[str]): The client secret. - """ + def __init__( + self, + client_auth_type: ClientAuthType, + client_id: str, + client_secret: Optional[str] = None, + ) -> None: self.client_auth_type = client_auth_type self.client_id = client_id self.client_secret = client_secret class OAuthClientAuthHandler(metaclass=abc.ABCMeta): - """Abstract class for handling client authentication in OAuth-based - operations. - """ + """Handles client authentication in OAuth flows.""" - def __init__(self, client_authentication=None): - """Instantiates an OAuth client authentication handler. - - Args: - client_authentication (Optional[google.oauth2.utils.ClientAuthentication]): - The OAuth client authentication credentials if available. - """ - super(OAuthClientAuthHandler, self).__init__() + def __init__(self, client_authentication: Optional[ClientAuthentication] = None) -> None: self._client_authentication = client_authentication def apply_client_authentication_options( - self, headers, request_body=None, bearer_token=None - ): - """Applies client authentication on the OAuth request's headers or POST - body. + self, + headers: MutableMapping[str, str], + request_body: Optional[MutableMapping[str, str]] = None, + bearer_token: Optional[str] = None, + ) -> None: + """Applies authentication via headers or POST body. Args: - headers (Mapping[str, str]): The HTTP request header. - request_body (Optional[Mapping[str, str]]): The HTTP request body - dictionary. For requests that do not support request body, this - is None and will be ignored. - bearer_token (Optional[str]): The optional bearer token. + headers: HTTP headers. + request_body: POST body dictionary (optional). + bearer_token: Bearer token (optional). """ - # Inject authenticated header. self._inject_authenticated_headers(headers, bearer_token) - # Inject authenticated request body. if bearer_token is None: self._inject_authenticated_request_body(request_body) - def _inject_authenticated_headers(self, headers, bearer_token=None): + def _inject_authenticated_headers( + self, + headers: MutableMapping[str, str], + bearer_token: Optional[str] = None, + ) -> None: if bearer_token is not None: - headers["Authorization"] = "Bearer %s" % bearer_token + headers["Authorization"] = f"Bearer {bearer_token}" elif ( self._client_authentication is not None - and self._client_authentication.client_auth_type is ClientAuthType.basic + and self._client_authentication.client_auth_type == ClientAuthType.basic ): username = self._client_authentication.client_id password = self._client_authentication.client_secret or "" + credentials = base64.b64encode(f"{username}:{password}".encode()).decode() + headers["Authorization"] = f"Basic {credentials}" - credentials = base64.b64encode( - ("%s:%s" % (username, password)).encode() - ).decode() - headers["Authorization"] = "Basic %s" % credentials - - def _inject_authenticated_request_body(self, request_body): + def _inject_authenticated_request_body( + self, + request_body: Optional[MutableMapping[str, str]], + ) -> None: if ( self._client_authentication is not None - and self._client_authentication.client_auth_type - is ClientAuthType.request_body + and self._client_authentication.client_auth_type == ClientAuthType.request_body ): if request_body is None: - raise exceptions.OAuthError( - "HTTP request does not support request-body" - ) - else: - request_body["client_id"] = self._client_authentication.client_id - request_body["client_secret"] = ( - self._client_authentication.client_secret or "" - ) + raise exceptions.OAuthError("HTTP request does not support request-body") + + request_body["client_id"] = self._client_authentication.client_id + request_body["client_secret"] = self._client_authentication.client_secret or "" -def handle_error_response(response_body): - """Translates an error response from an OAuth operation into an - OAuthError exception. +def handle_error_response(response_body: str) -> None: + """Converts OAuth error JSON response to an exception. Args: - response_body (str): The decoded response data. + response_body: The decoded response data as string. Raises: - google.auth.exceptions.OAuthError + OAuthError: A typed exception with error details. """ try: - error_components = [] - error_data = json.loads(response_body) - - error_components.append("Error code {}".format(error_data["error"])) + error_data: dict[str, Any] = json.loads(response_body) + error_components = [f"Error code {error_data['error']}"] if "error_description" in error_data: - error_components.append(": {}".format(error_data["error_description"])) + error_components.append(f": {error_data['error_description']}") if "error_uri" in error_data: - error_components.append(" - {}".format(error_data["error_uri"])) + error_components.append(f" - {error_data['error_uri']}") error_details = "".join(error_components) - # If no details could be extracted, use the response data. except (KeyError, ValueError): error_details = response_body