1
+ import asyncio
1
2
import re
3
+ from functools import wraps
4
+ from typing import Dict , Union , Tuple , List , Optional , Any , TypeVar , Callable , cast
2
5
3
6
import websockets
4
7
11
14
KPageSize = 65536
12
15
13
16
14
- def calculate_offset (aIndex ) :
17
+ def calculate_offset (aIndex : int ) -> int :
15
18
offset = 0
16
19
17
20
if (aIndex > vlxDevConstants .RANGE_START_g_cyclone_general_info ) and (
@@ -125,6 +128,28 @@ def to_kelvin(value: float) -> int:
125
128
return int (round (value * 10 ) * 10 + 27315 )
126
129
127
130
131
+ FuncT = TypeVar ("FuncT" , bound = Callable [..., Any ])
132
+
133
+
134
+ def _websocket_exception_handler (request_fn : FuncT ) -> FuncT :
135
+ @wraps (request_fn )
136
+ async def wrapped (* args : Any , ** kwargs : Any ) -> Any :
137
+ try :
138
+ return await request_fn (* args , ** kwargs )
139
+ except websockets .InvalidHandshake as e :
140
+ raise ValloxWebsocketException ("Websocket handshake failed" ) from e
141
+ except websockets .InvalidURI as e :
142
+ raise ValloxWebsocketException ("Websocket invalid URI" ) from e
143
+ except websockets .PayloadTooBig as e :
144
+ raise ValloxWebsocketException ("Websocket payload too big" ) from e
145
+ except websockets .InvalidState as e :
146
+ raise ValloxWebsocketException ("Websocket invalid state" ) from e
147
+ except websockets .WebSocketProtocolError as e :
148
+ raise ValloxWebsocketException ("Websocket protocol error" ) from e
149
+
150
+ return cast (FuncT , wrapped )
151
+
152
+
128
153
class Client :
129
154
SETTABLE_INT_VALS = {
130
155
re .compile ("^A_CYC_STATE$" ),
@@ -135,18 +160,12 @@ class Client:
135
160
re .compile ("^A_CYC_(?:EXTR|SUPP)_FAN_BALANCE_BASE$" ),
136
161
}
137
162
138
- _settable_addresses = None
163
+ _settable_addresses : Dict [ int , type ]
139
164
140
- def get_settable_addresses (self ):
141
- if not self ._settable_addresses :
142
- self ._settable_addresses = dict (
143
- (v , int )
144
- for k , v in vlxDevConstants .__dict__ .items ()
145
- if any (r .match (k ) for r in self .SETTABLE_INT_VALS )
146
- )
165
+ def get_settable_addresses (self ) -> Dict [int , type ]:
147
166
return self ._settable_addresses
148
167
149
- def set_settable_address (self , address , var_type ) :
168
+ def set_settable_address (self , address : Union [ int , str ], var_type : type ) -> None :
150
169
if var_type not in [int , float ]:
151
170
raise AttributeError ("Only float or int type are supported" )
152
171
@@ -165,16 +184,23 @@ def set_settable_address(self, address, var_type):
165
184
"Unable to add address '%s' to settable list" % str (address )
166
185
)
167
186
168
- def __init__ (self , ip_address ):
187
+ def __init__ (self , ip_address : str ):
169
188
self .ip_address = ip_address
170
189
171
- def _decode_pair (self , key , value ):
190
+ self ._settable_addresses = dict (
191
+ (v , int )
192
+ for k , v in vlxDevConstants .__dict__ .items ()
193
+ if any (r .match (k ) for r in self .SETTABLE_INT_VALS )
194
+ )
195
+
196
+ def _decode_pair (self , key : str , value : Union [int ,str ]) -> Tuple [int , Union [int , float ]]:
172
197
try :
173
198
address = int (getattr (vlxDevConstants , key , key ))
174
199
except ValueError :
175
200
raise AttributeError ("%s setting does not exist" % key )
176
201
if "_TEMP_" in key :
177
202
value = to_kelvin (float (value ))
203
+ raw_value : Union [int , float ]
178
204
try :
179
205
raw_value = int (value )
180
206
except ValueError :
@@ -185,42 +211,35 @@ def _decode_pair(self, key, value):
185
211
required_type = addresses [address ]
186
212
except KeyError :
187
213
raise AttributeError ("%s setting is not settable" % key )
214
+
188
215
assert type (raw_value ) == required_type , (
189
216
"%s(%d) key needs to be an %s, but %s passed"
190
217
% (key , address , required_type .__name__ , type (raw_value ).__name__ )
191
218
)
192
219
193
220
return address , raw_value
194
221
195
- async def _websocket_request (self , payload , read_packets = 1 ):
196
- try :
197
- async with websockets .connect ("ws://%s/" % self .ip_address ) as ws :
198
- await ws .send (payload )
199
- results = []
200
- for i in range (0 , read_packets ):
201
- r = await ws .recv ()
202
-
203
- results .append (r )
204
- return results [0 ] if read_packets == 1 else results
205
- except websockets .InvalidHandshake as e :
206
- raise ValloxWebsocketException ("Websocket handshake failed" ) from e
207
- except websockets .InvalidURI as e :
208
- raise ValloxWebsocketException ("Websocket invalid URI" ) from e
209
- except websockets .PayloadTooBig as e :
210
- raise ValloxWebsocketException ("Websocket payload too big" ) from e
211
- except websockets .InvalidState as e :
212
- raise ValloxWebsocketException ("Websocket invalid state" ) from e
213
- except websockets .WebSocketProtocolError as e :
214
- raise ValloxWebsocketException ("Websocket protocol error" ) from e
222
+ @_websocket_exception_handler
223
+ async def _websocket_request (self , payload : bytes ) -> bytes :
224
+ async with websockets .connect ("ws://%s/" % self .ip_address ) as ws :
225
+ await ws .send (payload )
226
+ r : bytes = await ws .recv ()
227
+ return r
228
+
229
+ @_websocket_exception_handler
230
+ async def _websocket_request_multiple (self , payload : bytes , read_packets : int ) -> List [bytes ]:
231
+ async with websockets .connect ("ws://%s/" % self .ip_address ) as ws :
232
+ await ws .send (payload )
233
+ return await asyncio .gather (* [ws .recv () for _ in range (0 , read_packets )])
215
234
216
- async def fetch_metrics (self , metric_keys = None ):
235
+ async def fetch_metrics (self , metric_keys : Optional [ List [ str ]] = None ) -> Dict [ str , Union [ int , float ]] :
217
236
metrics = {}
218
237
payload = ReadTableRequest .build ({})
219
238
result = await self ._websocket_request (payload )
220
239
221
240
data = ReadTableResponse .parse (result )
222
241
223
- if not metric_keys :
242
+ if metric_keys is None :
224
243
metric_keys = vlxDevConstants .__dict__ .keys ()
225
244
226
245
for key in metric_keys :
@@ -233,9 +252,9 @@ async def fetch_metrics(self, metric_keys=None):
233
252
234
253
return metrics
235
254
236
- async def fetch_raw_logs (self ):
255
+ async def fetch_raw_logs (self ) -> List [ List [ Dict [ str , Union [ int , float ]]]] :
237
256
payload = LogReadRequest .build ({})
238
- result = await self ._websocket_request (payload , read_packets = 2 )
257
+ result = await self ._websocket_request_multiple (payload , read_packets = 2 )
239
258
page_count = LogReadResponse1 .parse (result [0 ]).fields .value .pages
240
259
241
260
expected_total_len = KPageSize * page_count
@@ -269,10 +288,10 @@ async def fetch_raw_logs(self):
269
288
270
289
return series
271
290
272
- async def fetch_metric (self , metric_key ) :
291
+ async def fetch_metric (self , metric_key : str ) -> Optional [ Union [ int , float ]] :
273
292
return (await self .fetch_metrics ([metric_key ])).get (metric_key , None )
274
293
275
- async def set_values (self , dict_ ) :
294
+ async def set_values (self , dict_ : Dict [ str , Union [ int , str ]]) -> bool :
276
295
items = []
277
296
for key , value in dict_ .items ():
278
297
address , raw_value = self ._decode_pair (key , value )
0 commit comments