1
1
import abc
2
2
import logging
3
- from collections .abc import Awaitable , Callable , Iterable , Mapping , MutableMapping
3
+ from collections .abc import Iterable
4
4
from importlib .metadata import version
5
- from typing import Any , Optional , cast
5
+ from typing import Optional
6
6
7
7
import httpx
8
8
from typing_extensions import Self
9
9
10
- from ._httpx import AuthType
11
- from .model .op import OperationModel
12
- from .model .request import RequestFactory
13
- from .operation import get_operation_model
14
- from .request import build_request
15
- from .types_ import MultiAuth , NamedAuth , SecurityRequirements
10
+ from .model .auth import AuthRegistry
11
+ from .types_ import NamedAuth , SecurityRequirements
16
12
17
13
logger = logging .getLogger (__name__ )
18
14
@@ -36,112 +32,25 @@ def __init__(
36
32
headers ['User-Agent' ] = user_agent
37
33
38
34
self ._client = _http_client or httpx .AsyncClient (base_url = base_url , headers = headers , ** httpx_kwargs )
39
- self ._security = security
40
- self ._lapidary_operations : MutableMapping [str , OperationModel ] = {}
41
- self ._auth : MutableMapping [str , httpx .Auth ] = {}
42
- self ._auth_cache : MutableMapping [str , httpx .Auth ] = {}
35
+ self ._auth_registry = AuthRegistry (security )
43
36
44
37
async def __aenter__ (self : Self ) -> Self :
45
38
await self ._client .__aenter__ ()
46
39
return self
47
40
48
41
async def __aexit__ (self , __exc_type = None , __exc_value = None , __traceback = None ) -> None :
49
- await self ._client .__aexit__ (__exc_type , __exc_value , __traceback )
50
-
51
- async def _request (
52
- self ,
53
- method : str ,
54
- path : str ,
55
- fn : Callable [..., Awaitable ],
56
- security : Optional [Iterable [SecurityRequirements ]],
57
- actual_params : Mapping [str , Any ],
58
- ):
59
- if fn .__name__ not in self ._lapidary_operations :
60
- operation = get_operation_model (method , path , fn )
61
- self ._lapidary_operations [fn .__name__ ] = operation
62
- else :
63
- operation = self ._lapidary_operations [fn .__name__ ]
64
-
65
- auth = self ._resolve_auth (fn , security )
66
-
67
- request = build_request (
68
- operation ,
69
- actual_params ,
70
- cast (RequestFactory , self ._client .build_request ),
71
- )
72
-
73
- logger .debug ('%s %s %s' , request .method , request .url , request .headers )
74
-
75
- response = await self ._client .send (request , auth = auth )
76
- await response .aread ()
77
-
78
- return operation .handle_response (response )
79
-
80
- def _resolve_auth (self , fn : Callable , security : Optional [Iterable [SecurityRequirements ]]) -> AuthType :
81
- if security :
82
- sec_name = fn .__name__
83
- sec_source = security
84
- elif self ._security :
85
- sec_name = '*'
86
- sec_source = self ._security
87
- else :
88
- sec_name = None
89
- sec_source = None
90
-
91
- if sec_source :
92
- assert sec_name
93
- if sec_name not in self ._auth_cache :
94
- auth = self ._mk_auth (sec_source )
95
- self ._auth_cache [sec_name ] = auth
96
- else :
97
- auth = self ._auth_cache [sec_name ]
98
- return auth
99
- else :
100
- return None
42
+ return await self ._client .__aexit__ (__exc_type , __exc_value , __traceback )
101
43
102
44
def lapidary_authenticate (self , * auth_args : NamedAuth , ** auth_kwargs : httpx .Auth ) -> None :
103
45
"""Register named Auth instances for future use with methods that require authentication."""
104
46
if auth_args :
105
47
# make python complain about duplicate names
106
48
self .lapidary_authenticate (** dict (auth_args ), ** auth_kwargs )
107
49
108
- self ._auth .update (auth_kwargs )
109
- self ._auth_cache .clear ()
50
+ self ._auth_registry .authenticate (auth_kwargs )
110
51
111
52
def lapidary_deauthenticate (self , * sec_names : str ) -> None :
112
53
"""Remove reference to a given Auth instance.
113
54
Calling with no parameters removes all references"""
114
55
115
- if sec_names :
116
- for sec_name in sec_names :
117
- del self ._auth [sec_name ]
118
- else :
119
- self ._auth .clear ()
120
- self ._auth_cache .clear ()
121
-
122
- def _mk_auth (self , security : Iterable [SecurityRequirements ]) -> httpx .Auth :
123
- security = list (security )
124
- assert security
125
- last_error : Optional [Exception ] = None
126
- for requirements in security :
127
- try :
128
- auth = _build_auth (self ._auth , requirements )
129
- break
130
- except ValueError as ve :
131
- last_error = ve
132
- continue
133
- else :
134
- assert last_error
135
- # due to asserts and break above, we never enter here, unless ValueError was raised
136
- raise last_error # noqa
137
- return auth
138
-
139
-
140
- def _build_auth (schemes : Mapping [str , httpx .Auth ], requirements : SecurityRequirements ) -> httpx .Auth :
141
- auth_flows = []
142
- for scheme , scopes in requirements .items ():
143
- auth_flow = schemes .get (scheme )
144
- if not auth_flow :
145
- raise ValueError ('Not authenticated' , scheme )
146
- auth_flows .append (auth_flow )
147
- return MultiAuth (* auth_flows )
56
+ self ._auth_registry .deauthenticate (sec_names )
0 commit comments