diff --git a/fastapi_jwt_auth/auth_jwt.py b/fastapi_jwt_auth/auth_jwt.py index 4110bdb..05aeef6 100644 --- a/fastapi_jwt_auth/auth_jwt.py +++ b/fastapi_jwt_auth/auth_jwt.py @@ -14,16 +14,24 @@ RefreshTokenRequired, FreshTokenRequired ) +from fastapi import Header +from uuid import uuid4 class AuthJWT(AuthConfig): - def __init__(self,req: Request = None, res: Response = None): + def __init__(self,req: Request = None, res: Response = None, token:str= Header(None)): """ Get jwt header from incoming request or get request and response object if jwt in the cookie :param req: all incoming request :param res: response from endpoint + :param token: Bearer token to show in /docs """ + + if token and req != None: + auth = (b'authorization', bytes('Bearer {}'.format(token), "UTF-8")) + req.headers._list.append(auth) + if res and self.jwt_in_cookies: self._response = res @@ -267,6 +275,7 @@ def create_access_token( :return: hash token """ + return self._create_token( subject=subject, type_token="access", @@ -303,6 +312,50 @@ def create_refresh_token( audience=audience, user_claims=user_claims ) + + def create_pair_token( + self, + subject: Union[str,int], + algorithm: Optional[str] = None, + headers: Optional[Dict] = None, + expires_time: Optional[Union[timedelta,int,bool]] = None, + audience: Optional[Union[str,Sequence[str]]] = None, + user_claims: Optional[Dict] = {}, + fresh:Optional[bool]=False + ) -> str: + """ + Create a refresh token with 30 days for expired time (default), + info for param and return check to function create token + + :return: hash token + """ + user_claims["aid"] = str(uuid4()) + + refresh = self._create_token( + subject=subject, + type_token="refresh", + exp_time=self._get_expired_time("refresh",expires_time), + algorithm=algorithm, + headers=headers, + audience=audience, + user_claims=user_claims + ) + access = self._create_token( + subject=subject, + type_token="access", + exp_time=self._get_expired_time("access",expires_time), + fresh=fresh, + algorithm=algorithm, + headers=headers, + audience=audience, + user_claims=user_claims, + issuer=self._encode_issuer + ) + return { + "access_token": access, + "refresh_token" : refresh + } + def _get_csrf_token(self,encoded_token: str) -> str: """