Skip to content

Custom for fuster #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions django_keycloak_auth/keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import json
import jwt
import requests
import logging

from base64 import b64decode
from cryptography.hazmat.primitives import serialization
from django.core.cache import cache
from jwt.exceptions import DecodeError, ExpiredSignatureError
from jwt.exceptions import ExpiredSignatureError
from requests import HTTPError

LOGGER = logging.getLogger(__name__)


class KeycloakConnect:
def __init__(self, server_url, realm_name, client_id, local_decode=False, client_secret_key=None, ):
def __init__(self, server_url, realm_name, client_id, local_decode=False, client_secret_key=None, audience=None):
"""Create Keycloak Instance.

Args:
Expand All @@ -58,23 +56,25 @@ def __init__(self, server_url, realm_name, client_id, local_decode=False, client
self.client_id = client_id
self.client_secret_key = client_secret_key
self.local_decode = local_decode
self.audience = audience

# Keycloak useful Urls
realms = "/realms/"
self.well_known_endpoint = (
self.server_url
+ "/realms/"
+ realms
+ self.realm_name
+ "/.well-known/openid-configuration"
)
self.token_introspection_endpoint = (
self.server_url
+ "/realms/"
+ realms
+ self.realm_name
+ "/protocol/openid-connect/token/introspect"
)
self.userinfo_endpoint = (
self.server_url
+ "/realms/"
+ realms
+ self.realm_name
+ "/protocol/openid-connect/userinfo"
)
Expand Down Expand Up @@ -211,29 +211,25 @@ def is_token_active(self, token, raise_exception=True):
try:
self.decode(token, options={"verify_exp": True}, raise_exception=raise_exception)
is_active = True
except ExpiredSignatureError as e:
except ExpiredSignatureError:
is_active = False
else:
introspect_token = self.introspect(token, raise_exception)
is_active = introspect_token.get("active", None)

return True if is_active else False

def roles_from_token(self, token, raise_exception=True):
def roles_from_token(self, token_decoded):
"""
Get roles from token

Args:
token (string): The string value of the token.
token_decoded (dict): Dict with payload from token
raise_exception: Raise exception if the request ended with a status >= 400.

Returns:
list: List of roles.
"""
if self.local_decode:
token_decoded = self.decode(token, raise_exception=raise_exception)
else:
token_decoded = self.introspect(token, raise_exception)

realm_access = token_decoded.get("realm_access", None)
resource_access = token_decoded.get("resource_access", None)
Expand Down Expand Up @@ -299,11 +295,14 @@ def decode(self, token, audience=None, options=None, raise_exception=True):
"""

if audience is None:
audience = self.client_id
if options:
options['verify_aud'] = False
else:
options = {'verify_aud': False}

jwks = self.jwks()
keys = jwks.get('keys', [])

public_keys = {}
for jwk in keys:
kid = jwk.get('kid')
Expand All @@ -314,14 +313,12 @@ def decode(self, token, audience=None, options=None, raise_exception=True):
key = public_keys.get(kid, '')

try:
payload = jwt.decode(token, key=key, algorithms=['RS256'], audience=audience, options=options)
payload = jwt.decode(token, key=key, algorithms=['RS256'],
audience=audience, options=options)
except Exception as ex:
LOGGER.error(
f"Error decoding token {ex}"
)
LOGGER.error(f"Error decoding token {ex}")
if raise_exception:
raise
return {}

return payload

88 changes: 48 additions & 40 deletions django_keycloak_auth/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def __init__(self):
self.server_url = config['KEYCLOAK_SERVER_URL']
self.realm = config['KEYCLOAK_REALM']
self.client_id = config['KEYCLOAK_CLIENT_ID']
self.client_secret_key = config['KEYCLOAK_CLIENT_SECRET_KEY']
except KeyError as e:
self.client_secret_key = config['KEYCLOAK_CLIENT_SECRET_KEY']
except KeyError:
raise ValueError("The mandatory KEYCLOAK configuration variables has not defined.")

if config['KEYCLOAK_SERVER_URL'] is None:
Expand All @@ -60,6 +60,8 @@ def __init__(self):
raise ValueError("The LOCAL_DECODE configuration variable must be True or False.")
else:
self.local_decode = config.get('LOCAL_DECODE')

self.audience = config.get('KEYCLOAK_AUDIENCE')


class KeycloakMiddleware:
Expand All @@ -77,25 +79,19 @@ def __init__(self, get_response):
realm_name=self.keycloak_config.realm,
client_id=self.keycloak_config.client_id,
local_decode=self.keycloak_config.local_decode,
client_secret_key=self.keycloak_config.client_secret_key)
client_secret_key=self.keycloak_config.client_secret_key,
audience=self.keycloak_config.audience)

def __call__(self, request):
return self.get_response(request)

def process_view(self, request, view_func, view_args, view_kwargs):

# for now there is no role assigned yet and no userinfo defined
request.roles = []
request.userinfo = []

# Checks the URIs (paths) that doesn't needs authentication
if hasattr(settings, 'KEYCLOAK_EXEMPT_URIS'):
path = request.path_info.lstrip('/')
if any(re.match(m, path) for m in settings.KEYCLOAK_EXEMPT_URIS):
# Checks to see if a request.method explicitly overwrites exemptions in SETTINGS
if hasattr(view_func.cls, "keycloak_roles") and request.method not in view_func.cls.keycloak_roles:
return None

if not self.check_uris_path(request, view_func):
return None
# There's condictions for these view_func.cls:
# 1) @api_view -> view_func.cls is WrappedAPIView (validates in 'keycloak_roles' in decorators.py) -> True
# 2) When it is a APIView, ViewSet or ModelViewSet with 'keycloak_roles' attribute -> False
Expand All @@ -108,52 +104,64 @@ def process_view(self, request, view_func, view_args, view_kwargs):
# Whether View hasn't this attribute, it means all request method routes will be permitted.
try:
view_roles = view_func.cls.keycloak_roles if not is_api_view else None
except AttributeError as e:
except AttributeError:
return None
# Checks if exists an authentication in the http request header

# Checks if exists an authentication in the http request header
if 'HTTP_AUTHORIZATION' not in request.META:
return JsonResponse({"detail": NotAuthenticated.default_detail}, status=NotAuthenticated.status_code)

# Select actual role from 'keycloak_roles' according http request method (GET, POST, PUT or DELETE)
return JsonResponse(
{"detail": NotAuthenticated.default_detail},
status=NotAuthenticated.status_code
)

# Select actual role from 'keycloak_roles' according http request method
require_role = view_roles.get(request.method, [None]) if not is_api_view else [None]

# Get access token from the http request header
auth_header = request.META.get('HTTP_AUTHORIZATION').split()
token = auth_header[1] if len(auth_header) == 2 else auth_header[0]

# Checks if the token is able to be decoded
try:
if self.keycloak_config.local_decode:
self.keycloak.decode(token, options={'verify_signature': False})
except Exception as ex:
LOGGER.error(f'Error in django_keycloak_auth middleware: {ex}')
return JsonResponse(
{"detail": "Invalid or expired token. Verify your Keycloak configuration."},
status=AuthenticationFailed.status_code
)

# Checks token is active
if not self.keycloak.is_token_active(token):
token_payload = self.get_payload(token, self.keycloak_config.local_decode)
except Exception:
return JsonResponse(
{"detail": "Invalid or expired token. Verify your Keycloak configuration."},
status=AuthenticationFailed.status_code
)

# Get roles from access token
token_roles = self.keycloak.roles_from_token(token)
token_roles = self.keycloak.roles_from_token(token_payload)
if token_roles is None:
return JsonResponse(
{'detail': 'This token has no client_id roles and no realm roles or client_id is not configured correctly.'},
status=AuthenticationFailed.status_code
)

# Check exists any Token Role contains in View Role for only APIView, ViewSet or ModelViewSet
if not is_api_view and (len(set(token_roles) & set(require_role)) == 0):
return JsonResponse({'detail': PermissionDenied.default_detail}, status=PermissionDenied.status_code)

# Add to View request param list of roles from authenticated token
return JsonResponse(
{'detail': PermissionDenied.default_detail},
status=PermissionDenied.status_code
)

request.roles = token_roles
request.userinfo = token_payload

# Add to userinfo to the view
request.userinfo = self.keycloak.userinfo(token)
def check_uris_path(self, request, view_func):
"""Checks the URIs (paths) that doesn't needs authentication"""
if hasattr(settings, 'KEYCLOAK_EXEMPT_URIS'):
path = request.path_info.lstrip('/')
if any(re.match(m, path) for m in settings.KEYCLOAK_EXEMPT_URIS):
request_method = request.method not in view_func.cls.keycloak_roles
if hasattr(view_func.cls, "keycloak_roles") and request_method:
return False
return True

def get_payload(self, token, local_decode):
'''Получение payload из токена'''
if local_decode:
payload_token = self.keycloak.decode(
token,
options={'verify_signature': True,"verify_exp": True}
)
else:
payload_token = self.keycloak.introspect(token)
return payload_token