diff --git a/django_auth_adfs/__init__.py b/django_auth_adfs/__init__.py index 05785d5..67f1dd5 100644 --- a/django_auth_adfs/__init__.py +++ b/django_auth_adfs/__init__.py @@ -4,4 +4,4 @@ Adding imports here will break setup.py """ -__version__ = '1.15.0' +__version__ = "1.16.0" diff --git a/django_auth_adfs/backend.py b/django_auth_adfs/backend.py index c3165cf..6125826 100644 --- a/django_auth_adfs/backend.py +++ b/django_auth_adfs/backend.py @@ -181,7 +181,7 @@ def validate_access_token(self, access_token): logger.info(str(error)) raise PermissionDenied - def process_access_token(self, access_token, adfs_response=None): + def process_access_token(self, access_token, adfs_response=None, request=None): if not access_token: raise PermissionDenied @@ -197,6 +197,10 @@ def process_access_token(self, access_token, adfs_response=None): if not claims: raise PermissionDenied + # Store tokens in session if middleware is enabled + if request and adfs_response and hasattr(request, "token_storage"): + request.token_storage.store_tokens(request, access_token, adfs_response) + groups = self.process_user_groups(claims, access_token) user = self.create_user(claims) self.update_user_attributes(user, claims) @@ -420,7 +424,7 @@ def authenticate(self, request=None, authorization_code=None, **kwargs): adfs_response = self.exchange_auth_code(authorization_code, request) access_token = adfs_response["access_token"] - user = self.process_access_token(access_token, adfs_response) + user = self.process_access_token(access_token, adfs_response, request) return user @@ -440,7 +444,7 @@ def authenticate(self, request=None, access_token=None, **kwargs): return access_token = access_token.decode() - user = self.process_access_token(access_token) + user = self.process_access_token(access_token, request=request) return user diff --git a/django_auth_adfs/config.py b/django_auth_adfs/config.py index 317781f..e4a4a57 100644 --- a/django_auth_adfs/config.py +++ b/django_auth_adfs/config.py @@ -77,6 +77,12 @@ def __init__(self): ) self.PROXIES = None + # Token Lifecycle Middleware settings + self.TOKEN_REFRESH_THRESHOLD = 300 # 5 minutes + self.STORE_OBO_TOKEN = True + self.TOKEN_ENCRYPTION_SALT = b"django_auth_adfs_token_encryption" + self.LOGOUT_ON_TOKEN_REFRESH_FAILURE = False + self.VERSION = 'v1.0' self.SCOPES = [] diff --git a/django_auth_adfs/middleware.py b/django_auth_adfs/middleware.py index 649a239..159dcb9 100644 --- a/django_auth_adfs/middleware.py +++ b/django_auth_adfs/middleware.py @@ -1,6 +1,8 @@ """ Based on https://djangosnippets.org/snippets/1179/ """ + +import logging from re import compile from django.conf import settings as django_settings @@ -9,6 +11,7 @@ from django_auth_adfs.exceptions import MFARequired from django_auth_adfs.config import settings +from django_auth_adfs.token_manager import token_manager LOGIN_EXEMPT_URLS = [ compile(django_settings.LOGIN_URL.lstrip('/')), @@ -19,6 +22,8 @@ if hasattr(settings, 'LOGIN_EXEMPT_URLS'): LOGIN_EXEMPT_URLS += [compile(expr) for expr in settings.LOGIN_EXEMPT_URLS] +logger = logging.getLogger("django_auth_adfs") + class LoginRequiredMiddleware: """ @@ -30,6 +35,7 @@ class LoginRequiredMiddleware: Requires authentication middleware and template context processors to be loaded. You'll get an error if they aren't. """ + def __init__(self, get_response): self.get_response = get_response @@ -49,3 +55,47 @@ def __call__(self, request): return redirect_to_login('django_auth_adfs:login-force-mfa') return self.get_response(request) + + +class TokenLifecycleMiddleware: + """ + Middleware that handles the lifecycle of ADFS access and refresh tokens. + + This middleware will: + 1. Check if the access token is about to expire + 2. Use the refresh token to get a new access token if needed + 3. Update the tokens in the session + 4. Handle OBO (On-Behalf-Of) tokens for Microsoft Graph API + + Token storage during authentication is handled by the backend when this middleware is enabled. + + To enable this middleware, add it to your MIDDLEWARE setting: + 'django_auth_adfs.middleware.TokenLifecycleMiddleware' + + You can configure the token refresh behavior with these settings: + + TOKEN_REFRESH_THRESHOLD: Number of seconds before expiration to refresh (default: 300) + STORE_OBO_TOKEN: Boolean to enable/disable OBO token storage (default: True) + LOGOUT_ON_TOKEN_REFRESH_FAILURE: Whether to log out the user if token refresh fails (default: False) + """ + + def __init__(self, get_response): + self.get_response = get_response + self.token_manager = token_manager + # Log warning if using signed cookies + if token_manager.using_signed_cookies: + logger.warning( + "TokenLifecycleMiddleware is enabled but you are using the signed_cookies session backend. " + "Storing tokens in signed cookies is not recommended for security reasons and cookie size limitations. " + "The middleware will not store tokens in the session. " + "Consider using database or cache-based sessions instead." + ) + + def __call__(self, request): + if hasattr(request, "session") and not self.token_manager.using_signed_cookies: + request.token_storage = self.token_manager + + if hasattr(request, "user") and request.user.is_authenticated: + self.token_manager.check_token_expiration(request) + + return self.get_response(request) diff --git a/django_auth_adfs/token_manager.py b/django_auth_adfs/token_manager.py new file mode 100644 index 0000000..4876d29 --- /dev/null +++ b/django_auth_adfs/token_manager.py @@ -0,0 +1,489 @@ +""" +Token management for django-auth-adfs. + +This module provides a centralized way to manage tokens for django-auth-adfs. +""" + +import logging +import base64 +import datetime + +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from django.conf import settings as django_settings +from django.contrib.auth import logout +from django_auth_adfs.config import settings, provider_config + +logger = logging.getLogger("django_auth_adfs") + + +class TokenManager: + """ + Centralized manager for token lifecycle operations. + + This class handles: + - Token storage during authentication + - Token encryption/decryption + - Token refresh + - Token retrieval + - OBO token management + + It's designed to be lightweight when not actively performing operations, + and to handle all token operations in a safe, transparent, and error-free manner. + """ + + # Session key constants + ACCESS_TOKEN_KEY = "ADFS_ACCESS_TOKEN" + REFRESH_TOKEN_KEY = "ADFS_REFRESH_TOKEN" + TOKEN_EXPIRES_AT_KEY = "ADFS_TOKEN_EXPIRES_AT" + OBO_ACCESS_TOKEN_KEY = "ADFS_OBO_ACCESS_TOKEN" + OBO_TOKEN_EXPIRES_AT_KEY = "ADFS_OBO_TOKEN_EXPIRES_AT" + + def __init__(self): + """Initialize the TokenManager with settings.""" + self.refresh_threshold = getattr(settings, "TOKEN_REFRESH_THRESHOLD", 300) + self.store_obo_token = getattr(settings, "STORE_OBO_TOKEN", True) + self.logout_on_refresh_failure = getattr( + settings, "LOGOUT_ON_TOKEN_REFRESH_FAILURE", False + ) + + self.using_signed_cookies = ( + django_settings.SESSION_ENGINE + == "django.contrib.sessions.backends.signed_cookies" + ) + + if self.using_signed_cookies: + logger.warning( + "TokenManager: Storing tokens in signed cookies is not recommended for security " + "reasons and cookie size limitations. Token storage will be disabled." + ) + + def store_tokens(self, request, access_token, adfs_response=None): + if not hasattr(request, "session"): + return False + + try: + session_modified = False + + encrypted_token = self.encrypt_token(access_token) + if encrypted_token: + request.session[self.ACCESS_TOKEN_KEY] = encrypted_token + session_modified = True + logger.debug("Stored access token") + + if adfs_response and "refresh_token" in adfs_response: + refresh_token = adfs_response["refresh_token"] + if refresh_token: + encrypted_token = self.encrypt_token(refresh_token) + if encrypted_token: + request.session[self.REFRESH_TOKEN_KEY] = encrypted_token + session_modified = True + logger.debug("Stored refresh token") + else: + logger.warning("Failed to encrypt refresh token") + else: + logger.warning("Empty refresh token received from ADFS") + else: + logger.debug("No refresh token in ADFS response") + + if adfs_response and "expires_in" in adfs_response: + expires_at = datetime.datetime.now() + datetime.timedelta( + seconds=int(adfs_response["expires_in"]) + ) + request.session[self.TOKEN_EXPIRES_AT_KEY] = expires_at.isoformat() + session_modified = True + logger.debug("Stored token expiration") + + if self.store_obo_token: + try: + from django_auth_adfs.backend import AdfsBaseBackend + + backend = AdfsBaseBackend() + obo_token = backend.get_obo_access_token(access_token) + if obo_token: + encrypted_token = self.encrypt_token(obo_token) + if encrypted_token: + request.session[self.OBO_ACCESS_TOKEN_KEY] = encrypted_token + import jwt + + decoded_token = jwt.decode( + obo_token, options={"verify_signature": False} + ) + if "exp" in decoded_token: + obo_expires_at = datetime.datetime.fromtimestamp( + decoded_token["exp"] + ) + request.session[self.OBO_TOKEN_EXPIRES_AT_KEY] = ( + obo_expires_at.isoformat() + ) + session_modified = True + logger.debug( + "Stored OBO token with expiration from token claims" + ) + except Exception as e: + logger.warning(f"Error getting OBO token: {e}") + + if session_modified: + request.session.modified = True + logger.debug("All tokens stored successfully") + return True + + logger.warning("No tokens were stored") + return False + + except Exception as e: + logger.warning(f"Error storing tokens in session: {e}") + return False + + def _get_encryption_key(self): + """ + Derive a Fernet encryption key from Django's SECRET_KEY. + + Returns: + bytes: A 32-byte key suitable for Fernet encryption + """ + # Use Django's SECRET_KEY to derive a suitable encryption key + default_salt = b"django_auth_adfs_token_encryption" + salt = getattr(settings, "TOKEN_ENCRYPTION_SALT", default_salt) + + if isinstance(salt, str): + salt = salt.encode() + + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000, + ) + key = base64.urlsafe_b64encode(kdf.derive(django_settings.SECRET_KEY.encode())) + return key + + def encrypt_token(self, token): + """ + Encrypt a token using Django's SECRET_KEY. + + Args: + token (str): The token to encrypt + + Returns: + str: The encrypted token as a string or None if encryption fails + """ + if not token: + return None + + try: + key = self._get_encryption_key() + f = Fernet(key) + encrypted_token = f.encrypt(token.encode()) + return encrypted_token.decode() + except Exception as e: + logger.error(f"Error encrypting token: {e}") + return None + + def decrypt_token(self, encrypted_token): + """ + Decrypt a token that was encrypted using Django's SECRET_KEY. + + Args: + encrypted_token (str): The encrypted token + + Returns: + str: The decrypted token or None if decryption fails + """ + if not encrypted_token: + return None + + try: + key = self._get_encryption_key() + f = Fernet(key) + decrypted_token = f.decrypt(encrypted_token.encode()) + return decrypted_token.decode() + except Exception as e: + logger.error(f"Error decrypting token: {e}") + return None + + def get_access_token(self, request): + """ + Get the current access token from the session. + + The token is automatically decrypted before being returned. + + Args: + request: The current request object + + Returns: + str: The access token or None if not available + """ + if not hasattr(request, "session"): + return None + + if self.using_signed_cookies: + logger.debug("Token retrieval from signed_cookies session is disabled") + return None + + encrypted_token = request.session.get(self.ACCESS_TOKEN_KEY) + return self.decrypt_token(encrypted_token) + + def get_obo_access_token(self, request): + """ + Get the current OBO access token from the session. + + The token is automatically decrypted before being returned. + + Args: + request: The current request object + + Returns: + str: The OBO access token or None if not available + """ + if not hasattr(request, "session"): + return None + + if self.using_signed_cookies: + logger.debug("Token retrieval from signed_cookies session is disabled") + return None + + if not self.store_obo_token: + logger.debug("OBO token storage is disabled") + return None + + encrypted_token = request.session.get(self.OBO_ACCESS_TOKEN_KEY) + return self.decrypt_token(encrypted_token) + + def check_token_expiration(self, request): + """ + Check if tokens need to be refreshed and refresh them if needed. + + Args: + request: The current request object + + Returns: + bool: True if tokens were checked, False otherwise + """ + if not hasattr(request, "user") or not request.user.is_authenticated: + return False + + if self.using_signed_cookies: + return False + + try: + if self.TOKEN_EXPIRES_AT_KEY not in request.session: + return False + + # Check if token is about to expire + expires_at = datetime.datetime.fromisoformat( + request.session[self.TOKEN_EXPIRES_AT_KEY] + ) + remaining = expires_at - datetime.datetime.now() + + if remaining.total_seconds() < self.refresh_threshold: + logger.debug("Token is about to expire. Refreshing...") + self.refresh_tokens(request) + + # Check if OBO token is about to expire + if ( + self.store_obo_token + and self.OBO_TOKEN_EXPIRES_AT_KEY in request.session + ): + obo_expires_at = datetime.datetime.fromisoformat( + request.session[self.OBO_TOKEN_EXPIRES_AT_KEY] + ) + obo_remaining = obo_expires_at - datetime.datetime.now() + + if obo_remaining.total_seconds() < self.refresh_threshold: + logger.debug("OBO token is about to expire. Refreshing...") + self.refresh_obo_token(request) + + return True + + except Exception as e: + logger.warning(f"Error checking token expiration: {e}") + return False + + def refresh_tokens(self, request): + """ + Refresh the access token using the refresh token. + + + Args: + request: The current request object + + Returns: + bool: True if tokens were refreshed, False otherwise + """ + if self.using_signed_cookies: + return False + + if self.REFRESH_TOKEN_KEY not in request.session: + return False + + try: + refresh_token = self.decrypt_token(request.session[self.REFRESH_TOKEN_KEY]) + if not refresh_token: + logger.warning("Failed to decrypt refresh token") + return False + + provider_config.load_config() + + data = { + "grant_type": "refresh_token", + "client_id": settings.CLIENT_ID, + "refresh_token": refresh_token, + } + + if settings.CLIENT_SECRET: + data["client_secret"] = settings.CLIENT_SECRET + + token_endpoint = provider_config.token_endpoint + if token_endpoint is None: + logger.error("Token endpoint is None, cannot refresh tokens") + return False + + response = provider_config.session.post( + token_endpoint, data=data, timeout=settings.TIMEOUT + ) + + if response.status_code == 200: + token_data = response.json() + + # Store new tokens - if another refresh happened, these will just overwrite + # with fresher tokens, which is fine + request.session[self.ACCESS_TOKEN_KEY] = self.encrypt_token( + token_data["access_token"] + ) + request.session[self.REFRESH_TOKEN_KEY] = self.encrypt_token( + token_data["refresh_token"] + ) + expires_at = datetime.datetime.now() + datetime.timedelta( + seconds=int(token_data["expires_in"]) + ) + request.session[self.TOKEN_EXPIRES_AT_KEY] = expires_at.isoformat() + request.session.modified = True + logger.debug("Refreshed tokens successfully") + + # Also refresh the OBO token if needed + if self.store_obo_token: + self.refresh_obo_token(request) + + return True + else: + logger.warning( + f"Failed to refresh token: {response.status_code} {response.text}" + ) + if self.logout_on_refresh_failure: + logger.info("Logging out user due to token refresh failure") + logout(request) + return False + + except Exception as e: + logger.exception(f"Error refreshing tokens: {e}") + if self.logout_on_refresh_failure: + logger.info("Logging out user due to token refresh error") + logout(request) + return False + + def refresh_obo_token(self, request): + """ + Refresh the OBO token for Microsoft Graph API. + + Args: + request: The current request object + + Returns: + bool: True if OBO token was refreshed, False otherwise + """ + if not self.store_obo_token: + return False + + if self.using_signed_cookies: + return False + + if self.ACCESS_TOKEN_KEY not in request.session: + return False + + try: + provider_config.load_config() + + access_token = self.decrypt_token(request.session[self.ACCESS_TOKEN_KEY]) + if not access_token: + logger.warning("Failed to decrypt access token") + return False + + # Import here to avoid circular imports + from django_auth_adfs.backend import AdfsBaseBackend + + backend = AdfsBaseBackend() + obo_token = backend.get_obo_access_token(access_token) + + if obo_token: + request.session[self.OBO_ACCESS_TOKEN_KEY] = self.encrypt_token( + obo_token + ) + # Decode the OBO token to get its actual expiration time + import jwt + + decoded_token = jwt.decode( + obo_token, options={"verify_signature": False} + ) + if "exp" in decoded_token: + obo_expires_at = datetime.datetime.fromtimestamp( + decoded_token["exp"] + ) + request.session[self.OBO_TOKEN_EXPIRES_AT_KEY] = ( + obo_expires_at.isoformat() + ) + request.session.modified = True + logger.debug( + "Refreshed OBO token with expiration from token claims" + ) + return True + + return False + + except Exception as e: + logger.warning(f"Error refreshing OBO token: {e}") + return False + + def clear_tokens(self, request): + """ + Clear all tokens from the session. + + Args: + request: The current request object + + Returns: + bool: True if tokens were cleared, False otherwise + """ + if not hasattr(request, "session"): + return False + + try: + session_modified = False + + for key in [ + self.ACCESS_TOKEN_KEY, + self.REFRESH_TOKEN_KEY, + self.TOKEN_EXPIRES_AT_KEY, + self.OBO_ACCESS_TOKEN_KEY, + self.OBO_TOKEN_EXPIRES_AT_KEY, + ]: + if key in request.session: + del request.session[key] + session_modified = True + + if session_modified: + request.session.modified = True + logger.debug("Cleared tokens from session") + return True + + return False + + except Exception as e: + logger.warning(f"Error clearing tokens from session: {e}") + return False + + +# Create a singleton instance +token_manager = TokenManager() diff --git a/docs/index.rst b/docs/index.rst index ff325ec..006b008 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -38,6 +38,7 @@ Contents settings_ref config_guides middleware + token_lifecycle signals rest_framework demo diff --git a/docs/settings_ref.rst b/docs/settings_ref.rst index 9d7ad94..7648e19 100644 --- a/docs/settings_ref.rst +++ b/docs/settings_ref.rst @@ -497,3 +497,110 @@ PROXIES An optional proxy for all communication with the server. Example: ``{'http': '10.0.0.1', 'https': '10.0.0.2'}`` See the `requests documentation `__ for more information. + +TOKEN_REFRESH_THRESHOLD +--------------------------- +* **Default**: ``300`` (5 minutes) +* **Type**: ``integer`` +* **Unit**: seconds + +Used by the ``TokenLifecycleMiddleware`` to determine how long before token expiration to attempt a refresh. +This setting controls how proactively the middleware will refresh tokens before they expire. + +For example, with the default value of 300 seconds (5 minutes), if a token is set to expire in 4 minutes, +the middleware will attempt to refresh it during the next request. This helps ensure that users don't +experience disruptions due to token expiration during active sessions. + +.. code-block:: python + + # In your Django settings.py + # Refresh tokens 10 minutes before they expire + AUTH_ADFS = { + # other settings + "TOKEN_REFRESH_THRESHOLD": 600 + } + +STORE_OBO_TOKEN +------------------ +* **Default**: ``True`` +* **Type**: ``boolean`` + +Used by the ``TokenLifecycleMiddleware`` to enable or disable the storage of On-Behalf-Of (OBO) tokens +for Microsoft Graph API. Set to ``False`` if you don't need to access Microsoft Graph API. + +.. note:: + When using the ``TokenLifecycleMiddleware`` with Django's ``signed_cookies`` session backend, token storage + is always disabled for security reasons. This behavior cannot be overridden. If you need token storage, + you must use a different session backend like database or cache-based sessions. + +.. code-block:: python + + # In your Django settings.py + AUTH_ADFS = { + # other settings + "STORE_OBO_TOKEN": False + } + +TOKEN_ENCRYPTION_SALT +-------------------------- +* **Default**: ``b"django_auth_adfs_token_encryption"`` +* **Type**: ``string`` + +Used by the ``TokenLifecycleMiddleware`` to derive an encryption key for token encryption. +The salt is combined with Django's ``SECRET_KEY`` to create a unique encryption key. + +You can customize this value to use a different salt for token encryption: + +.. code-block:: python + + # In your Django settings.py + AUTH_ADFS = { + # other settings + "TOKEN_ENCRYPTION_SALT": "your-custom-salt-string" + } + +While the default value is defined as a bytes literal (with the ``b`` prefix) in the code, +you should simply provide a regular string in your settings. The middleware automatically +handles the conversion to bytes as needed. + +.. warning:: + If you change this setting after tokens have been stored in sessions, those tokens will no longer be decryptable. + This effectively invalidates all existing tokens, requiring users to re-authenticate. + Consider this when deploying changes to the salt in production environments. + +LOGOUT_ON_TOKEN_REFRESH_FAILURE +------------------------------- +* **Default**: ``False`` +* **Type**: ``boolean`` + +Used by the ``TokenLifecycleMiddleware`` to control whether users should be automatically logged out when token refresh fails. + +When set to ``True``, if a token refresh attempt fails (either due to an error response from the server or an exception), +the middleware will automatically log the user out of the Django application. + +When set to ``False`` (the default), the middleware will log the error but allow the user to continue using the application +until their session expires naturally, even though their tokens are no longer valid. + +This setting is particularly important for security considerations, as it determines how your application responds when a user's account +has been disabled in Azure AD/ADFS. When enabled, it can help ensure that users who have had their accounts disabled in the +identity provider are promptly logged out of your Django application, closing a potential security gap. + +This feature is disabled by default to prioritize user experience, but can be enabled for applications where security requirements +outweigh the potential disruption of unexpected logouts. + +.. code-block:: python + + # In your Django settings.py + AUTH_ADFS = { + # other settings + "LOGOUT_ON_TOKEN_REFRESH_FAILURE": True + } + +.. note:: + This setting only affects what happens when token refresh fails. It does not affect the initial authentication process + or what happens when tokens expire without a refresh attempt. + +.. important:: + Even for applications that don't make additional API calls after authentication, enabling this setting provides + an optional security mechanism that can help ensure that access revocation in Azure AD/ADFS is reflected in your + Django application. diff --git a/docs/token_lifecycle.rst b/docs/token_lifecycle.rst new file mode 100644 index 0000000..71f4592 --- /dev/null +++ b/docs/token_lifecycle.rst @@ -0,0 +1,430 @@ +Token Lifecycle Middleware +========================== + +Traditionally, django-auth-adfs is used **exclusively** as an authentication solution - it handles user authentication +via ADFS/Azure AD and maps claims to Django users. It doesn't really care about the access tokens from Azure/ADFS after you've been authenticated. + +The Token Lifecycle system extends django-auth-adfs beyond pure authentication to also handle the complete lifecycle of access tokens +after the authentication process. This creates a more integrated approach where: + +* The same application registration handles both authentication and resource access +* Tokens obtained during authentication are stored and refreshed automatically in the session +* The application can make delegated API calls on behalf of the user +* The system can optionally log out users when token refresh fails + +How it works +------------ + +The token lifecycle system performs the following: + +1. **Token Storage**: The django-auth-adfs backend automatically stores and encrypts tokens during authentication when the ``TokenLifecycleMiddleware`` is enabled +2. **Token Monitoring**: The middleware checks token expiration on each request +3. **Token Refresh**: When a token is about to expire, it is automatically refreshed +4. **OBO Token Management**: When enabled (by default), OBO tokens are automatically acquired and refreshed +5. **Security Controls**: Optional automatic logout on token refresh failures + +Read more about the OBO flow: https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-on-behalf-of-flow#protocol-diagram + + +.. warning:: + The Token Lifecycle system is a new feature in django-auth-adfs and is considered experimental. + Please be aware: + + **Currently no community support is guaranteed to be available for this feature** + + We recommend thoroughly testing this feature in your specific environment before deploying to production. + + Consider enabling the ``LOGOUT_ON_TOKEN_REFRESH_FAILURE`` setting, + which allows you to log out users when token refresh fails. + + +Configuration +------------- + +To enable the token lifecycle system, add the middleware to your ``MIDDLEWARE`` setting in your Django settings file: + +.. code-block:: python + + MIDDLEWARE = [ + # ... other middleware + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django_auth_adfs.middleware.TokenLifecycleMiddleware', # Add this line + # ... other middleware + ] + +.. important:: + The middleware must be placed after the ``SessionMiddleware`` and ``AuthenticationMiddleware``. + + +You can configure the token lifecycle behavior with these settings in your Django settings file: + +.. code-block:: python + + AUTH_ADFS = { + # other settings + + # Number of seconds before expiration to refresh (default: 300, i.e., 5 minutes) + "TOKEN_REFRESH_THRESHOLD": 300, + + # Enable or disable OBO token functionality (default: True) + "STORE_OBO_TOKEN": True, + + # Custom salt for token encryption (optional) + # If not specified, a default salt is used + "TOKEN_ENCRYPTION_SALT": "your-custom-salt-string", + + # Automatically log out users when token refresh fails (default: False) + "LOGOUT_ON_TOKEN_REFRESH_FAILURE": False, + } + +.. warning:: + If you change the ``TOKEN_ENCRYPTION_SALT`` after tokens have been stored in sessions, those tokens will no longer be decryptable. + This effectively invalidates all existing tokens, requiring users to re-authenticate. + + Consider this when deploying changes to the salt in production environments. + +Considerations +-------------- + +- Token storage and encryption are handled automatically by the middleware during authentication +- Token refresh only works for authenticated users with valid sessions +- If the refresh token is invalid or expired, the system will not be able to refresh the access token +- By default, the system will not log the user out if token refresh fails, but this behavior can be changed with the ``LOGOUT_ON_TOKEN_REFRESH_FAILURE`` setting +- The system will not store tokens in the session when using the ``signed_cookies`` session backend +- OBO token storage is enabled by default but can be disabled with the ``STORE_OBO_TOKEN`` setting +- Using the OBO token versus the regular access token is dependent on the resources you are accessing and the permissions granted to your ADFS/Azure AD application. See `the token types section <#understanding-access-tokens-vs-obo-tokens>`_ for more details. + +**Token Refresh Failures** + +By default, when token refresh fails, the system logs the error but allows the user to continue using the application until their session expires naturally. This behavior can be changed with the ``LOGOUT_ON_TOKEN_REFRESH_FAILURE`` setting: + +- When set to ``False`` (default), users remain logged in even if their tokens can't be refreshed +- When set to ``True``, users are automatically logged out when token refresh fails + +**Existing Sessions** + +When deploying the Token Lifecycle system to an existing application with active user sessions, be aware of the following: + +The system only captures tokens during the authentication process. Existing authenticated sessions won't have tokens stored in them, which means: + +- Users with existing sessions won't have access to token-dependent features until they re-authenticate +- Utility functions like ``get_access_token()`` and ``get_obo_access_token()`` will return ``None`` for these sessions +- API calls that depend on these tokens will fail for existing sessions + +The best approach is to ensure that all users re-authenticate after the system is deployed. + +Azure AD Application Configuration +---------------------------------- + +When using the Token Lifecycle system, your Azure AD application registration needs additional permissions +beyond those required for simple authentication. This extends the standard authentication-only setup described in the :doc:`azure_ad_config_guide` with additional +API permissions needed for delegated access. + +.. important:: + Your Django application's session cookie age must be set to a value that is less than that of your ADFS/Azure AD application's refresh token lifetime. + + If a user's refresh token has expired, the user will be required to re-authenticate to continue making delegated requests. + +Security Overview +----------------------- + +**Token Encryption** + +Tokens are automatically encrypted before being stored in the session and decrypted when they are retrieved. +The encryption is handled transparently by the TokenManager and utility functions. + +**Signed Cookies Session Backend Restriction** + +If you're using the ``signed_cookies`` session backend and need token storage, you won't be able to use the token lifecycle system. + +.. note:: + This restriction only applies to the ``signed_cookies`` session backend. For other session backends (database, cache, file), + tokens are stored securely on the server and only a session ID is stored in the cookie. + +**Automatic OBO Token Acquisition** + +By default, the system automatically requests OBO tokens when storing tokens. If your application doesn't need OBO tokens, you can disable this behavior to reduce unnecessary token requests (see `the OBO token configuration section <#disabling-obo-token-functionality>`_ for more details). + +Disabling OBO Token Functionality +--------------------------------- + +By default, the Token Lifecycle system automatically requests and stores OBO (On-Behalf-Of) tokens. + +If you don't need this functionality, you can disable it completely: + +.. code-block:: python + + # In your Django settings.py + AUTH_ADFS = { + "STORE_OBO_TOKEN": False, + } + +Note that disabling OBO tokens doesn't affect the regular access token functionality. Your application will still be able to use the access token obtained during authentication for its own resources and APIs that directly trust your application. + +See `the token types section <#understanding-access-tokens-vs-obo-tokens>`_ for more details. + +Accessing Tokens in Your Views +------------------------------ + +Since tokens are encrypted in the session, the Token Lifecycle system provides a centralized TokenManager to help you access tokens safely: + +.. code-block:: python + + from django_auth_adfs.token_manager import token_manager + + # For your own APIs or APIs that trust your application directly + access_token = token_manager.get_access_token(request) + + # For Microsoft Graph API or other APIs requiring delegated access + obo_token = token_manager.get_obo_access_token(request) + +The TokenManager automatically handles encryption/decryption of tokens, so you don't need to worry about the encryption details. + +.. warning:: + You should always use the TokenManager to access tokens rather than accessing them directly from the session. + Direct access to ``request.session["ADFS_ACCESS_TOKEN"]`` will give you the encrypted token, not the actual token value. + +Examples +---------------------- + +Here are practical examples of using the TokenManager in your views: + +Using with Microsoft Graph API +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This example demonstrates using the OBO token to access Microsoft Graph API + +.. code-block:: python + + from django.contrib.auth.decorators import login_required + from django.http import JsonResponse + from django_auth_adfs.token_manager import token_manager + import requests + + @login_required + def me_view(request): + """Get the user's profile from Microsoft Graph API""" + obo_token = token_manager.get_obo_access_token(request) + + if not obo_token: + return JsonResponse({"error": "No OBO token available"}, status=401) + + headers = { + "Authorization": f"Bearer {obo_token}", + "Content-Type": "application/json", + } + + try: + response = requests.get("https://graph.microsoft.com/v1.0/me", headers=headers) + response.raise_for_status() + return JsonResponse(response.json()) + except requests.exceptions.RequestException as e: + return JsonResponse( + {"error": "Failed to fetch user profile", "details": str(e)}, + status=500 + ) + +Using with Custom ADFS-Protected API +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This example shows how to use the OBO token to access a custom API protected by ADFS that supports the OBO flow. + +.. code-block:: python + + from django.contrib.auth.decorators import login_required + from django.http import JsonResponse + from django_auth_adfs.token_manager import token_manager + import requests + + @login_required + def custom_api_view(request): + """Access a custom API using OBO token""" + obo_token = token_manager.get_obo_access_token(request) + + if not obo_token: + return JsonResponse({"error": "No OBO token available"}, status=401) + + headers = { + "Authorization": f"Bearer {obo_token}", + "Content-Type": "application/json", + } + + try: + response = requests.get( + "https://your-custom-api.example.com/data", + headers=headers + ) + response.raise_for_status() + return JsonResponse(response.json()) + except requests.exceptions.RequestException as e: + return JsonResponse( + {"error": "Failed to fetch data", "details": str(e)}, + status=500 + ) + +Using with Direct Resource Access +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For APIs that directly trust your application (no OBO flow needed), use the regular access token: + +.. code-block:: python + + from rest_framework.views import APIView + from rest_framework.response import Response + from django_auth_adfs.token_manager import token_manager + import requests + + class ExternalApiView(APIView): + def get(self, request): + """Call an API that accepts your application's token""" + token = token_manager.get_access_token(request) + + if not token: + return Response({"error": "No access token available"}, status=401) + + headers = {"Authorization": f"Bearer {token}"} + response = requests.get("https://api.example.com/data", headers=headers) + + return Response(response.json()) + +Debug view +---------- + +The following example code demonstrates a debug view to check the values of the tokens stored in the session: + +.. code-block:: python + + import requests + from django.contrib.auth.decorators import login_required + from django.http import JsonResponse + from django_auth_adfs.token_manager import token_manager + from datetime import datetime + + @login_required + def debug_view(request): + """ + Debug view that provides detailed information about the authentication state, + tokens, and session data. + """ + if not request.user.is_authenticated: + return JsonResponse({"authenticated": False}) + + # Basic session token info + session_info = { + "has_access_token": token_manager.ACCESS_TOKEN_KEY in request.session, + "has_refresh_token": token_manager.REFRESH_TOKEN_KEY in request.session, + "has_expires_at": token_manager.TOKEN_EXPIRES_AT_KEY in request.session, + } + + # Add token expiration details if available + if token_manager.TOKEN_EXPIRES_AT_KEY in request.session: + try: + expires_at = datetime.fromisoformat( + request.session[token_manager.TOKEN_EXPIRES_AT_KEY] + ) + now = datetime.now() + session_info["token_expires_at"] = expires_at.isoformat() + session_info["expires_in_seconds"] = max( + 0, int((expires_at - now).total_seconds()) + ) + session_info["is_expired"] = expires_at <= now + except (ValueError, TypeError) as e: + session_info["expiration_parse_error"] = str(e) + + # Show raw encrypted tokens for debugging + if token_manager.ACCESS_TOKEN_KEY in request.session: + raw_token = request.session[token_manager.ACCESS_TOKEN_KEY] + session_info["raw_token_preview"] = f"{raw_token[:10]}...{raw_token[-10:]}" + session_info["raw_token_length"] = len(raw_token) + + # Try to decode as JWT without decryption (should fail if properly encrypted) + try: + import jwt + jwt.decode(raw_token, options={"verify_signature": False}) + session_info["is_encrypted"] = False + except: + session_info["is_encrypted"] = True + + # Get properly decrypted access token + try: + access_token = token_manager.get_access_token(request) + session_info["decrypted_access_token_available"] = access_token is not None + + if access_token: + if len(access_token) > 20: + session_info["decrypted_access_token_preview"] = ( + f"{access_token[:10]}...{access_token[-10:]}" + ) + session_info["decrypted_access_token_length"] = len(access_token) + + # Try to decode as JWT (should succeed if properly decrypted) + try: + import jwt + decoded = jwt.decode(access_token, options={"verify_signature": False}) + session_info["jwt_decode_success"] = True + # Add some basic JWT info without exposing sensitive data + if "exp" in decoded: + exp_time = datetime.fromtimestamp(decoded["exp"]) + session_info["jwt_expiry"] = exp_time.isoformat() + except Exception as e: + session_info["jwt_decode_error"] = str(e) + except Exception as e: + session_info["access_token_error"] = f"Error getting access token: {str(e)}" + + # Check if OBO token is available + try: + obo_token = token_manager.get_obo_access_token(request) + obo_info = { + "has_obo_token": obo_token is not None, + } + + # Show raw encrypted OBO token if available + if token_manager.OBO_ACCESS_TOKEN_KEY in request.session: + raw_obo = request.session[token_manager.OBO_ACCESS_TOKEN_KEY] + obo_info["raw_obo_preview"] = f"{raw_obo[:10]}...{raw_obo[-10:]}" + obo_info["raw_obo_length"] = len(raw_obo) + + if obo_token: + if len(obo_token) > 20: + obo_info["obo_token_preview"] = f"{obo_token[:10]}...{obo_token[-10:]}" + obo_info["obo_token_length"] = len(obo_token) + + # Try to decode as JWT (should succeed if properly decrypted) + try: + import jwt + decoded = jwt.decode(obo_token, options={"verify_signature": False}) + obo_info["jwt_decode_success"] = True + # Add some basic JWT info without exposing sensitive data + if "exp" in decoded: + exp_time = datetime.fromtimestamp(decoded["exp"]) + obo_info["jwt_expiry"] = exp_time.isoformat() + except Exception as e: + obo_info["jwt_decode_error"] = str(e) + except Exception as e: + obo_info = {"error": f"Error getting OBO token: {str(e)}"} + + # Return all the collected information + return JsonResponse( + { + "authenticated": True, + "user": { + "id": request.user.id, + "username": request.user.username, + "email": request.user.email, + "is_staff": request.user.is_staff, + "is_superuser": request.user.is_superuser, + }, + "session_tokens": session_info, + "obo_token": obo_info, + }, + json_dumps_params={"indent": 2}, + ) + +Understanding Access Tokens vs. OBO Tokens +------------------------------------------ + +For more information on the different types of permissions and flows, see: + +* `OAuth 2.0 On-Behalf-Of flow `_ +* `Permission types `_ diff --git a/pyproject.toml b/pyproject.toml index f0a11f5..a9db688 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = 'django-auth-adfs' -version = "1.15.0" # Remember to also change __init__.py version +version = "1.16.0" # Remember to also change __init__.py version description = 'A Django authentication backend for Microsoft ADFS and AzureAD' authors = ['Joris Beckers '] maintainers = ['Jonas Krüger Svensson ', 'Sondre Lillebø Gundersen '] diff --git a/tests/test_middleware.py b/tests/test_middleware.py new file mode 100644 index 0000000..11fa6d6 --- /dev/null +++ b/tests/test_middleware.py @@ -0,0 +1,324 @@ +""" +Tests for the TokenLifecycleMiddleware and TokenManager. +""" + +import datetime +import json +import base64 +from unittest.mock import Mock, patch +import time + +from django.contrib.auth import get_user_model +from django.contrib.auth.models import AnonymousUser +from django.test import TestCase, RequestFactory, override_settings +from django.contrib.sessions.backends.db import SessionStore + +from django_auth_adfs.middleware import TokenLifecycleMiddleware +from django_auth_adfs.config import settings as adfs_settings +from django_auth_adfs.token_manager import token_manager, TokenManager +from tests.settings import MIDDLEWARE + +User = get_user_model() + +MIDDLEWARE_WITH_TOKEN_LIFECYCLE = MIDDLEWARE + ( + "django_auth_adfs.middleware.TokenLifecycleMiddleware", +) + + +def create_test_token(claims=None, exp_delta=3600): + """Create a test JWT token with the given claims and expiration delta.""" + if claims is None: + claims = {} + + # Create a basic JWT token with ADFS-like structure + header = {"typ": "JWT", "alg": "RS256", "x5t": "example-thumbprint"} + + # Add standard ADFS claims if not present + now = int(time.time()) + if "iat" not in claims: + claims["iat"] = now + if "exp" not in claims: + claims["exp"] = now + exp_delta + if "aud" not in claims: + claims["aud"] = "microsoft:identityserver:your-RelyingPartyTrust-identifier" + if "iss" not in claims: + claims["iss"] = "https://sts.windows.net/01234567-89ab-cdef-0123-456789abcdef/" + if "sub" not in claims: + claims["sub"] = "john.doe@example.com" + + # Encode each part + header_part = ( + base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b"=").decode() + ) + claims_part = ( + base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=").decode() + ) + signature_part = base64.urlsafe_b64encode(b"test_signature").rstrip(b"=").decode() + + # Combine parts + return f"{header_part}.{claims_part}.{signature_part}" + + +@override_settings(MIDDLEWARE=MIDDLEWARE_WITH_TOKEN_LIFECYCLE) +class TokenLifecycleTests(TestCase): + """ + Tests for the token lifecycle functionality, covering both TokenManager and TokenLifecycleMiddleware. + """ + + def setUp(self): + self.factory = RequestFactory() + self.user = User.objects.create_user(username="testuser") + self.request = self.factory.get("/") + self.request.user = self.user + self.request.session = SessionStore() + self.middleware = TokenLifecycleMiddleware(lambda r: None) + + def test_settings_configuration(self): + """Test settings are properly loaded from Django settings""" + with patch.object(adfs_settings, "TOKEN_REFRESH_THRESHOLD", 600), patch.object( + adfs_settings, "STORE_OBO_TOKEN", False + ), patch.object(adfs_settings, "LOGOUT_ON_TOKEN_REFRESH_FAILURE", True): + + manager = TokenManager() + self.assertEqual(manager.refresh_threshold, 600) + self.assertFalse(manager.store_obo_token) + self.assertTrue(manager.logout_on_refresh_failure) + + def test_token_storage_capability(self): + """Test token storage capability is properly added by middleware""" + # Test with no session + request_without_session = self.factory.get("/") + self.middleware(request_without_session) + self.assertFalse(hasattr(request_without_session, "token_storage")) + + # Test with signed cookies + token_manager.using_signed_cookies = True + try: + self.middleware(self.request) + self.assertFalse(hasattr(self.request, "token_storage")) + finally: + token_manager.using_signed_cookies = False + + # Test with valid session + self.middleware(self.request) + self.assertTrue(hasattr(self.request, "token_storage")) + self.assertIs(self.request.token_storage, token_manager) + + def test_token_storage_and_retrieval(self): + """Test the complete token storage and retrieval flow""" + access_token = create_test_token({"type": "access"}) + refresh_token = create_test_token({"type": "refresh"}) + + # Add token storage capability + self.middleware(self.request) + + # Store tokens + self.request.token_storage.store_tokens( + self.request, + access_token, + { + "access_token": access_token, + "refresh_token": refresh_token, + "expires_in": 3600, + }, + ) + + # Verify storage + self.assertEqual(token_manager.get_access_token(self.request), access_token) + self.assertTrue(token_manager.TOKEN_EXPIRES_AT_KEY in self.request.session) + + # Verify encryption + encrypted = self.request.session[token_manager.ACCESS_TOKEN_KEY] + self.assertNotEqual(encrypted, access_token) + self.assertEqual(token_manager.decrypt_token(encrypted), access_token) + + def test_token_refresh_flow(self): + """Test the complete token refresh flow""" + old_access_token = create_test_token({"type": "access"}, exp_delta=60) + old_refresh_token = create_test_token({"type": "refresh"}) + new_access_token = create_test_token({"type": "access"}) + new_refresh_token = create_test_token({"type": "refresh"}) + + # Add token storage capability and setup expired token + self.middleware(self.request) + self.request.token_storage.store_tokens( + self.request, + old_access_token, + { + "access_token": old_access_token, + "refresh_token": old_refresh_token, + "expires_in": 60, # Will trigger refresh + }, + ) + + # Mock refresh response + with patch("django_auth_adfs.token_manager.provider_config") as mock_config: + mock_response = Mock(status_code=200) + mock_response.json.return_value = { + "access_token": new_access_token, + "refresh_token": new_refresh_token, + "expires_in": 3600, + } + mock_config.session.post.return_value = mock_response + mock_config.token_endpoint = "https://example.com/token" + + # Trigger refresh via middleware + self.middleware(self.request) + + # Verify tokens were updated + self.assertEqual( + token_manager.get_access_token(self.request), new_access_token + ) + + def test_obo_token_management(self): + """Test OBO token functionality when enabled""" + access_token = create_test_token({"type": "access"}) + obo_token = create_test_token({"type": "obo"}) + + # Add token storage capability and store regular token + self.middleware(self.request) + self.request.token_storage.store_tokens( + self.request, + access_token, + {"access_token": access_token, "expires_in": 3600}, + ) + + # Mock OBO flow + with patch("django_auth_adfs.backend.AdfsBaseBackend") as mock_backend: + mock_backend.return_value.get_obo_access_token.return_value = obo_token + + # Verify OBO token storage and retrieval + self.request.session[token_manager.OBO_ACCESS_TOKEN_KEY] = ( + token_manager.encrypt_token(obo_token) + ) + self.request.session[token_manager.OBO_TOKEN_EXPIRES_AT_KEY] = ( + datetime.datetime.now() + datetime.timedelta(hours=1) + ).isoformat() + + self.assertEqual( + token_manager.get_obo_access_token(self.request), obo_token + ) + + def test_error_handling(self): + """Test error handling in various scenarios""" + # Add token storage capability + self.middleware(self.request) + + # Test invalid data handling + self.assertIsNone(token_manager.decrypt_token("invalid_data")) + self.assertIsNone(token_manager.encrypt_token(None)) + + # Test refresh failure + access_token = create_test_token({"type": "access"}, exp_delta=-60) + refresh_token = create_test_token({"type": "refresh"}) + + with patch("django_auth_adfs.token_manager.provider_config") as mock_config: + # Setup expired tokens first + self.request.token_storage.store_tokens( + self.request, + access_token, + { + "access_token": access_token, + "refresh_token": refresh_token, + "expires_in": -60, # Already expired + }, + ) + + mock_config.session.post.return_value = Mock(status_code=400, text="Error") + mock_config.token_endpoint = "https://example.com/token" + + token_manager.logout_on_refresh_failure = True + try: + with patch("django_auth_adfs.token_manager.logout") as mock_logout: + token_manager.refresh_tokens(self.request) + mock_logout.assert_called_once_with(self.request) + finally: + token_manager.logout_on_refresh_failure = False + + def test_signed_cookies_handling(self): + """Test behavior with signed cookies session backend""" + token_manager.using_signed_cookies = True + try: + self.middleware(self.request) + self.assertFalse(hasattr(self.request, "token_storage")) + finally: + token_manager.using_signed_cookies = False + + def test_middleware_integration(self): + """Test TokenLifecycleMiddleware integration""" + # Test with unauthenticated user + self.request.user = AnonymousUser() + response = self.middleware(self.request) + self.assertIsNone(response) # Middleware should pass through + + # Test with authenticated user + self.request.user = self.user + with patch.object(token_manager, "check_token_expiration") as mock_check: + self.middleware(self.request) + mock_check.assert_called_once_with(self.request) + + def test_clear_tokens(self): + """Test clearing tokens from session""" + access_token = create_test_token({"type": "access"}) + refresh_token = create_test_token({"type": "refresh"}) + + # Add token storage capability and store tokens + self.middleware(self.request) + self.request.token_storage.store_tokens( + self.request, + access_token, + { + "access_token": access_token, + "refresh_token": refresh_token, + "expires_in": 3600, + }, + ) + + # Verify tokens were stored + self.assertTrue(token_manager.ACCESS_TOKEN_KEY in self.request.session) + self.assertTrue(token_manager.REFRESH_TOKEN_KEY in self.request.session) + + # Clear tokens + success = token_manager.clear_tokens(self.request) + self.assertTrue(success) + + # Verify tokens were cleared + self.assertFalse(token_manager.ACCESS_TOKEN_KEY in self.request.session) + self.assertFalse(token_manager.REFRESH_TOKEN_KEY in self.request.session) + self.assertFalse(token_manager.TOKEN_EXPIRES_AT_KEY in self.request.session) + self.assertFalse(token_manager.OBO_ACCESS_TOKEN_KEY in self.request.session) + self.assertFalse(token_manager.OBO_TOKEN_EXPIRES_AT_KEY in self.request.session) + + def test_refresh_obo_token_directly(self): + """Test direct OBO token refresh""" + access_token = create_test_token({"type": "access"}) + new_obo_token = create_test_token({"type": "obo"}) + + # Add token storage capability and store access token + self.middleware(self.request) + self.request.token_storage.store_tokens( + self.request, + access_token, + {"access_token": access_token, "expires_in": 3600}, + ) + + # Mock OBO token acquisition and provider config + with patch("django_auth_adfs.backend.AdfsBaseBackend") as mock_backend, patch( + "django_auth_adfs.token_manager.provider_config" + ) as mock_provider: + + mock_backend.return_value.get_obo_access_token.return_value = new_obo_token + mock_provider.load_config.return_value = None + mock_provider.token_endpoint = "https://example.com/token" + mock_provider.session.verify = False # Disable cert validation + + # Refresh OBO token + success = token_manager.refresh_obo_token(self.request) + self.assertTrue(success) + + # Verify new OBO token was stored + obo_token = token_manager.get_obo_access_token(self.request) + self.assertEqual(obo_token, new_obo_token) + self.assertTrue( + token_manager.OBO_TOKEN_EXPIRES_AT_KEY in self.request.session + )