diff --git a/django_keycloak_auth/keycloak.py b/django_keycloak_auth/keycloak.py index 5ad9d26..b46fe5c 100644 --- a/django_keycloak_auth/keycloak.py +++ b/django_keycloak_auth/keycloak.py @@ -17,22 +17,20 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . +import logging import json import jwt import requests -import logging -from base64 import b64decode -from cryptography.hazmat.primitives import serialization from django.core.cache import cache -from jwt.exceptions import DecodeError, ExpiredSignatureError +from jwt.exceptions import ExpiredSignatureError from requests import HTTPError LOGGER = logging.getLogger(__name__) class KeycloakConnect: - def __init__(self, server_url, realm_name, client_id, local_decode=False, client_secret_key=None, ): + def __init__(self, server_url, realm_name, client_id, local_decode=False, client_secret_key=None, audience=None): """Create Keycloak Instance. Args: @@ -58,23 +56,25 @@ def __init__(self, server_url, realm_name, client_id, local_decode=False, client self.client_id = client_id self.client_secret_key = client_secret_key self.local_decode = local_decode + self.audience = audience # Keycloak useful Urls + realms = "/realms/" self.well_known_endpoint = ( self.server_url - + "/realms/" + + realms + self.realm_name + "/.well-known/openid-configuration" ) self.token_introspection_endpoint = ( self.server_url - + "/realms/" + + realms + self.realm_name + "/protocol/openid-connect/token/introspect" ) self.userinfo_endpoint = ( self.server_url - + "/realms/" + + realms + self.realm_name + "/protocol/openid-connect/userinfo" ) @@ -211,7 +211,7 @@ def is_token_active(self, token, raise_exception=True): try: self.decode(token, options={"verify_exp": True}, raise_exception=raise_exception) is_active = True - except ExpiredSignatureError as e: + except ExpiredSignatureError: is_active = False else: introspect_token = self.introspect(token, raise_exception) @@ -219,21 +219,17 @@ def is_token_active(self, token, raise_exception=True): return True if is_active else False - def roles_from_token(self, token, raise_exception=True): + def roles_from_token(self, token_decoded): """ Get roles from token Args: - token (string): The string value of the token. + token_decoded (dict): Dict with payload from token raise_exception: Raise exception if the request ended with a status >= 400. Returns: list: List of roles. """ - if self.local_decode: - token_decoded = self.decode(token, raise_exception=raise_exception) - else: - token_decoded = self.introspect(token, raise_exception) realm_access = token_decoded.get("realm_access", None) resource_access = token_decoded.get("resource_access", None) @@ -299,11 +295,14 @@ def decode(self, token, audience=None, options=None, raise_exception=True): """ if audience is None: - audience = self.client_id + if options: + options['verify_aud'] = False + else: + options = {'verify_aud': False} jwks = self.jwks() keys = jwks.get('keys', []) - + public_keys = {} for jwk in keys: kid = jwk.get('kid') @@ -314,14 +313,12 @@ def decode(self, token, audience=None, options=None, raise_exception=True): key = public_keys.get(kid, '') try: - payload = jwt.decode(token, key=key, algorithms=['RS256'], audience=audience, options=options) + payload = jwt.decode(token, key=key, algorithms=['RS256'], + audience=audience, options=options) except Exception as ex: - LOGGER.error( - f"Error decoding token {ex}" - ) + LOGGER.error(f"Error decoding token {ex}") if raise_exception: raise return {} return payload - diff --git a/django_keycloak_auth/middleware.py b/django_keycloak_auth/middleware.py index f8b5d57..d8c3304 100644 --- a/django_keycloak_auth/middleware.py +++ b/django_keycloak_auth/middleware.py @@ -38,8 +38,8 @@ def __init__(self): self.server_url = config['KEYCLOAK_SERVER_URL'] self.realm = config['KEYCLOAK_REALM'] self.client_id = config['KEYCLOAK_CLIENT_ID'] - self.client_secret_key = config['KEYCLOAK_CLIENT_SECRET_KEY'] - except KeyError as e: + self.client_secret_key = config['KEYCLOAK_CLIENT_SECRET_KEY'] + except KeyError: raise ValueError("The mandatory KEYCLOAK configuration variables has not defined.") if config['KEYCLOAK_SERVER_URL'] is None: @@ -60,6 +60,8 @@ def __init__(self): raise ValueError("The LOCAL_DECODE configuration variable must be True or False.") else: self.local_decode = config.get('LOCAL_DECODE') + + self.audience = config.get('KEYCLOAK_AUDIENCE') class KeycloakMiddleware: @@ -77,25 +79,19 @@ def __init__(self, get_response): realm_name=self.keycloak_config.realm, client_id=self.keycloak_config.client_id, local_decode=self.keycloak_config.local_decode, - client_secret_key=self.keycloak_config.client_secret_key) + client_secret_key=self.keycloak_config.client_secret_key, + audience=self.keycloak_config.audience) def __call__(self, request): return self.get_response(request) def process_view(self, request, view_func, view_args, view_kwargs): - + # for now there is no role assigned yet and no userinfo defined request.roles = [] request.userinfo = [] - - # Checks the URIs (paths) that doesn't needs authentication - if hasattr(settings, 'KEYCLOAK_EXEMPT_URIS'): - path = request.path_info.lstrip('/') - if any(re.match(m, path) for m in settings.KEYCLOAK_EXEMPT_URIS): - # Checks to see if a request.method explicitly overwrites exemptions in SETTINGS - if hasattr(view_func.cls, "keycloak_roles") and request.method not in view_func.cls.keycloak_roles: - return None - + if not self.check_uris_path(request, view_func): + return None # There's condictions for these view_func.cls: # 1) @api_view -> view_func.cls is WrappedAPIView (validates in 'keycloak_roles' in decorators.py) -> True # 2) When it is a APIView, ViewSet or ModelViewSet with 'keycloak_roles' attribute -> False @@ -108,52 +104,64 @@ def process_view(self, request, view_func, view_args, view_kwargs): # Whether View hasn't this attribute, it means all request method routes will be permitted. try: view_roles = view_func.cls.keycloak_roles if not is_api_view else None - except AttributeError as e: + except AttributeError: return None - - # Checks if exists an authentication in the http request header + + # Checks if exists an authentication in the http request header if 'HTTP_AUTHORIZATION' not in request.META: - return JsonResponse({"detail": NotAuthenticated.default_detail}, status=NotAuthenticated.status_code) - - # Select actual role from 'keycloak_roles' according http request method (GET, POST, PUT or DELETE) + return JsonResponse( + {"detail": NotAuthenticated.default_detail}, + status=NotAuthenticated.status_code + ) + + # Select actual role from 'keycloak_roles' according http request method require_role = view_roles.get(request.method, [None]) if not is_api_view else [None] - + # Get access token from the http request header auth_header = request.META.get('HTTP_AUTHORIZATION').split() token = auth_header[1] if len(auth_header) == 2 else auth_header[0] - # Checks if the token is able to be decoded try: - if self.keycloak_config.local_decode: - self.keycloak.decode(token, options={'verify_signature': False}) - except Exception as ex: - LOGGER.error(f'Error in django_keycloak_auth middleware: {ex}') - return JsonResponse( - {"detail": "Invalid or expired token. Verify your Keycloak configuration."}, - status=AuthenticationFailed.status_code - ) - - # Checks token is active - if not self.keycloak.is_token_active(token): + token_payload = self.get_payload(token, self.keycloak_config.local_decode) + except Exception: return JsonResponse( {"detail": "Invalid or expired token. Verify your Keycloak configuration."}, status=AuthenticationFailed.status_code ) - # Get roles from access token - token_roles = self.keycloak.roles_from_token(token) + token_roles = self.keycloak.roles_from_token(token_payload) if token_roles is None: return JsonResponse( {'detail': 'This token has no client_id roles and no realm roles or client_id is not configured correctly.'}, status=AuthenticationFailed.status_code ) - # Check exists any Token Role contains in View Role for only APIView, ViewSet or ModelViewSet if not is_api_view and (len(set(token_roles) & set(require_role)) == 0): - return JsonResponse({'detail': PermissionDenied.default_detail}, status=PermissionDenied.status_code) - - # Add to View request param list of roles from authenticated token + return JsonResponse( + {'detail': PermissionDenied.default_detail}, + status=PermissionDenied.status_code + ) + request.roles = token_roles + request.userinfo = token_payload - # Add to userinfo to the view - request.userinfo = self.keycloak.userinfo(token) + def check_uris_path(self, request, view_func): + """Checks the URIs (paths) that doesn't needs authentication""" + if hasattr(settings, 'KEYCLOAK_EXEMPT_URIS'): + path = request.path_info.lstrip('/') + if any(re.match(m, path) for m in settings.KEYCLOAK_EXEMPT_URIS): + request_method = request.method not in view_func.cls.keycloak_roles + if hasattr(view_func.cls, "keycloak_roles") and request_method: + return False + return True + + def get_payload(self, token, local_decode): + '''Получение payload из токена''' + if local_decode: + payload_token = self.keycloak.decode( + token, + options={'verify_signature': True,"verify_exp": True} + ) + else: + payload_token = self.keycloak.introspect(token) + return payload_token