1
1
import asyncio
2
2
import hashlib
3
- import json
4
3
import logging
5
4
import threading
6
- from typing import Optional , Any , Callable
5
+ from typing import Optional , Any , Callable , Union
7
6
8
7
import httpx
9
8
10
9
from .endpoint import Endpoint
11
10
from ..cache import BaseCache , MemoryCache , memory_cache
12
11
from ..exception import HTTPResponseError
12
+ from ..serializer import Serializer , AutoSerializer
13
13
from ..typings import SyncAsync
14
14
15
15
logger = logging .getLogger (__name__ )
@@ -29,15 +29,25 @@ def _parse_config_key(key: str):
29
29
return key .split ('#' )
30
30
31
31
32
+ def _serialize_config (
33
+ config : Any ,
34
+ serializer : Optional [Union ["Serializer" , bool ]] = None
35
+ ):
36
+ """ Serialize config with serializer """
37
+ if isinstance (serializer , bool ) and serializer is True :
38
+ serializer = AutoSerializer ()
39
+ if isinstance (serializer , Serializer ):
40
+ return serializer (config )
41
+ return config
42
+
43
+
32
44
class _BaseConfigEndpoint (Endpoint ):
33
45
34
46
def _get (
35
47
self ,
36
48
data_id : str ,
37
49
group : str ,
38
- tenant : Optional [str ] = '' ,
39
- * ,
40
- serialized : Optional [bool ] = False
50
+ tenant : Optional [str ] = ''
41
51
) -> SyncAsync [Any ]:
42
52
return self .client .request (
43
53
"/nacos/v1/cs/configs" ,
@@ -46,7 +56,7 @@ def _get(
46
56
"group" : group ,
47
57
"tenant" : tenant ,
48
58
},
49
- serialized = serialized
59
+ serialized = False
50
60
)
51
61
52
62
def publish (
@@ -121,12 +131,10 @@ def subscriber(
121
131
class ConfigOperationMixin :
122
132
123
133
@staticmethod
124
- def _config_callback (callback , config , serialized ):
134
+ def _config_callback (callback , config , serializer ):
125
135
if not callable (callback ):
126
136
return
127
-
128
- if serialized :
129
- config = json .loads (config )
137
+ config = _serialize_config (config , serializer )
130
138
callback (config )
131
139
132
140
def get (
@@ -135,20 +143,20 @@ def get(
135
143
group : str ,
136
144
tenant : Optional [str ] = '' ,
137
145
* ,
138
- serialized : Optional [bool ] = False ,
146
+ serializer : Optional [Union [ "Serializer" , bool ]] = None ,
139
147
cache : Optional [BaseCache ] = None ,
140
148
default : Optional [str ] = None
141
149
) -> SyncAsync [Any ]:
142
150
cache = cache or memory_cache
143
151
config_key = _get_config_key (data_id , group , tenant )
144
152
try :
145
- config = self ._get (data_id , group , tenant , serialized = serialized )
153
+ config = self ._get (data_id , group , tenant )
146
154
# todo: this function need to be optimized
147
155
cache .set (config_key , config )
148
- return config
156
+ return _serialize_config ( config , serializer )
149
157
except (httpx .ConnectError , httpx .TimeoutException ) as exc :
150
158
logger .error ("Failed to get config from server, try to get from cache. %s" , exc )
151
- return cache .get (config_key )
159
+ return _serialize_config ( cache .get (config_key ), serializer )
152
160
except HTTPResponseError as exc :
153
161
logger .debug ("Failed to get config from server. %s" , exc )
154
162
if exc .status == 404 and default is not None :
@@ -161,7 +169,7 @@ def subscribe(
161
169
group : str ,
162
170
tenant : Optional [str ] = '' ,
163
171
timeout : Optional [int ] = 30_000 ,
164
- serialized : Optional [bool ] = False ,
172
+ serializer : Optional [Union [ "Serializer" , bool ]] = None ,
165
173
cache : Optional [BaseCache ] = None ,
166
174
callback : Optional [Callable ] = None
167
175
) -> SyncAsync [Any ]:
@@ -179,10 +187,10 @@ def _subscriber():
179
187
if not response :
180
188
continue
181
189
logging .info ("Configuration update detected." )
182
- last_config = self .get (data_id , group , tenant , serialized = False )
190
+ last_config = self ._get (data_id , group , tenant )
183
191
last_md5 = _get_md5 (last_config )
184
192
cache .set (config_key , last_config )
185
- self ._config_callback (callback , last_config , serialized )
193
+ self ._config_callback (callback , last_config , serializer )
186
194
except Exception as exc :
187
195
logging .error (exc )
188
196
stop_event .wait (1 )
@@ -195,12 +203,11 @@ def _subscriber():
195
203
class ConfigAsyncOperationMixin :
196
204
197
205
@staticmethod
198
- async def _config_callback (callback , config , serialized ):
206
+ async def _config_callback (callback , config , serializer ):
199
207
if not callable (callback ):
200
208
return
201
209
202
- if serialized :
203
- config = json .loads (config )
210
+ config = _serialize_config (config , serializer )
204
211
if asyncio .iscoroutinefunction (callback ):
205
212
await callback (config )
206
213
else :
@@ -212,19 +219,19 @@ async def get(
212
219
group : str ,
213
220
tenant : Optional [str ] = '' ,
214
221
* ,
215
- serialized : Optional [bool ] = False ,
222
+ serializer : Optional [Union [ "Serializer" , bool ]] = None ,
216
223
cache : Optional [BaseCache ] = None ,
217
224
default : Optional [str ] = None
218
225
) -> SyncAsync [Any ]:
219
226
cache = cache or memory_cache
220
227
config_key = _get_config_key (data_id , group , tenant )
221
228
try :
222
- config = await self ._get (data_id , group , tenant , serialized = serialized )
229
+ config = await self ._get (data_id , group , tenant )
223
230
cache .set (config_key , config )
224
- return config
231
+ return _serialize_config ( config , serializer )
225
232
except (httpx .ConnectError , httpx .TimeoutException ) as exc :
226
233
logger .error ("Failed to get config from server, try to get from cache. %s" , exc )
227
- return cache .get (config_key )
234
+ return _serialize_config ( cache .get (config_key ), serializer )
228
235
except HTTPResponseError as exc :
229
236
logger .debug ("Failed to get config from server. %s" , exc )
230
237
if exc .status == 404 and default is not None :
@@ -237,7 +244,7 @@ async def subscribe(
237
244
group : str ,
238
245
tenant : Optional [str ] = '' ,
239
246
timeout : Optional [int ] = 30_000 ,
240
- serialized : Optional [bool ] = False ,
247
+ serializer : Optional [Union [ "Serializer" , bool ]] = None ,
241
248
cache : Optional [BaseCache ] = None ,
242
249
callback : Optional [Callable ] = None ,
243
250
) -> SyncAsync [Any ]:
@@ -257,10 +264,10 @@ async def _async_subscriber():
257
264
if not response :
258
265
continue
259
266
logging .info ("Configuration update detected." )
260
- last_config = await self .get (data_id , group , tenant , serialized = False )
267
+ last_config = await self ._get (data_id , group , tenant )
261
268
last_md5 = _get_md5 (last_config )
262
269
cache .set (config_key , last_config )
263
- await self ._config_callback (callback , last_config , serialized )
270
+ await self ._config_callback (callback , last_config , serializer )
264
271
except asyncio .CancelledError :
265
272
break
266
273
except Exception as exc :
0 commit comments