From a0eb1ea73e59c519b0c52b3e6c630b317bc5a2e5 Mon Sep 17 00:00:00 2001 From: Alima Date: Sat, 29 Oct 2022 16:14:00 +0330 Subject: [PATCH] create refresh and access token together now we create access token and refresh token together so in deny list we can block both of them, by blocking that is same in both --- fastapi_jwt_auth/auth_jwt.py | 55 +++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/fastapi_jwt_auth/auth_jwt.py b/fastapi_jwt_auth/auth_jwt.py index 4110bdb..05aeef6 100644 --- a/fastapi_jwt_auth/auth_jwt.py +++ b/fastapi_jwt_auth/auth_jwt.py @@ -14,16 +14,24 @@ RefreshTokenRequired, FreshTokenRequired ) +from fastapi import Header +from uuid import uuid4 class AuthJWT(AuthConfig): - def __init__(self,req: Request = None, res: Response = None): + def __init__(self,req: Request = None, res: Response = None, token:str= Header(None)): """ Get jwt header from incoming request or get request and response object if jwt in the cookie :param req: all incoming request :param res: response from endpoint + :param token: Bearer token to show in /docs """ + + if token and req != None: + auth = (b'authorization', bytes('Bearer {}'.format(token), "UTF-8")) + req.headers._list.append(auth) + if res and self.jwt_in_cookies: self._response = res @@ -267,6 +275,7 @@ def create_access_token( :return: hash token """ + return self._create_token( subject=subject, type_token="access", @@ -303,6 +312,50 @@ def create_refresh_token( audience=audience, user_claims=user_claims ) + + def create_pair_token( + self, + subject: Union[str,int], + algorithm: Optional[str] = None, + headers: Optional[Dict] = None, + expires_time: Optional[Union[timedelta,int,bool]] = None, + audience: Optional[Union[str,Sequence[str]]] = None, + user_claims: Optional[Dict] = {}, + fresh:Optional[bool]=False + ) -> str: + """ + Create a refresh token with 30 days for expired time (default), + info for param and return check to function create token + + :return: hash token + """ + user_claims["aid"] = str(uuid4()) + + refresh = self._create_token( + subject=subject, + type_token="refresh", + exp_time=self._get_expired_time("refresh",expires_time), + algorithm=algorithm, + headers=headers, + audience=audience, + user_claims=user_claims + ) + access = self._create_token( + subject=subject, + type_token="access", + exp_time=self._get_expired_time("access",expires_time), + fresh=fresh, + algorithm=algorithm, + headers=headers, + audience=audience, + user_claims=user_claims, + issuer=self._encode_issuer + ) + return { + "access_token": access, + "refresh_token" : refresh + } + def _get_csrf_token(self,encoded_token: str) -> str: """