8
8
from copy import deepcopy
9
9
from time import sleep , time
10
10
from typing import List , Optional
11
+ from enum import Enum
11
12
12
13
LOGGER = logging .getLogger ()
13
14
@@ -20,6 +21,33 @@ def __init__(self, url: str, caption: str = 'link') -> None:
20
21
self .url = f'https://{ self .url } '
21
22
self .caption = caption
22
23
24
class Node:
    """Typed view over one node description taken from a parsed JSON dict.

    Attributes:
        host: value of ``SystemState.Host`` ('' when absent).
        disconnected: value of the top-level ``Disconnected`` flag.
        cluster_name: value of ``SystemState.ClusterName`` ('' when absent).
        version: value of ``SystemState.Version`` ('' when absent).
        start_time: ``SystemState.StartTime`` converted from milliseconds
            to seconds; the current time is used when it is absent.
        role: ``Role`` derived from ``SystemState.Roles``.
        tablets: one ``Tablet`` per entry of the top-level ``Tablets`` list.
    """

    class Role(Enum):
        """Node kind derived from the ``Roles`` list of ``SystemState``."""

        UNKNOWN = 0
        STORAGE = 1
        COMPUTE = 2

    class Tablet:
        """One entry of a node's ``Tablets`` list (state, type and count)."""

        def __init__(self, desc: dict):
            # A record without a state is reported as 'Red', so it is never
            # mistaken for a 'Green' one by callers comparing the state.
            self.state: str = desc.get('State', 'Red')
            self.type: str = desc.get('Type', 'Unknown')
            # Coerce so the attribute really is an int as annotated; a
            # missing or null count is treated as zero.
            self.count: int = int(desc.get('Count') or 0)

    def __init__(self, desc: dict):
        # ``or`` fallbacks make an explicit JSON null behave like a missing key.
        ss = desc.get('SystemState') or {}
        roles = ss.get('Roles') or []

        self.host: str = ss.get('Host', '')
        self.disconnected: bool = desc.get('Disconnected', False)
        self.cluster_name: str = ss.get('ClusterName', '')
        self.version: str = ss.get('Version', '')
        # StartTime is handled as milliseconds; a node without it is
        # considered to have started right now.
        self.start_time: float = 0.001 * int(ss.get('StartTime', time() * 1000))

        # The nested classes are reached through ``self`` so this code does
        # not depend on the name of whatever class encloses ``Node``.
        # 'Storage' takes precedence when both roles are listed.
        if 'Storage' in roles:
            self.role = self.Role.STORAGE
        elif 'Tenants' in roles:
            self.role = self.Role.COMPUTE
        else:
            self.role = self.Role.UNKNOWN

        self.tablets = [self.Tablet(t) for t in desc.get('Tablets') or []]
50
+
23
51
_ydb_driver = None
24
52
_results_driver = None
25
53
_cluster_info = None
@@ -53,7 +81,7 @@ def _get_service_url(cls):
53
81
return f'http://{ host } :{ port } '
54
82
55
83
@classmethod
56
- def get_cluster_nodes (cls , path : Optional [str ] = None , db_only : bool = False ) -> list [dict [ str : any ] ]:
84
+ def get_cluster_nodes (cls , path : Optional [str ] = None , db_only : bool = False ) -> list [YdbCluster . Node ]:
57
85
try :
58
86
url = f'{ cls ._get_service_url ()} /viewer/json/nodes?'
59
87
if db_only or path is not None :
@@ -64,27 +92,32 @@ def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) ->
64
92
# token = os.getenv('OLAP_YDB_OAUTH', None)
65
93
# if token is not None:
66
94
# headers['Authorization'] = token
67
- data = requests .get (url , headers = headers ).json ()
68
- nodes = data .get ('Nodes' , [])
69
- return nodes
95
+ response = requests .get (url , headers = headers )
96
+ response .raise_for_status ()
97
+ data = response .json ()
98
+ if not isinstance (data , dict ):
99
+ raise Exception (f'Incorrect response type: { data } ' )
100
+ return [YdbCluster .Node (n ) for n in data .get ('Nodes' , [])]
101
+ except requests .HTTPError as e :
102
+ LOGGER .error (f'{ e .strerror } : { e .response .content } ' )
70
103
except Exception as e :
71
104
LOGGER .error (e )
72
- return [], 0
105
+ return []
73
106
74
107
@classmethod
75
108
def get_cluster_info (cls ):
76
109
if cls ._cluster_info is None :
77
110
version = ''
78
111
cluster_name = ''
79
112
nodes_wilcard = ''
80
- nodes = cls .get_cluster_nodes ()
113
+ nodes = cls .get_cluster_nodes (db_only = True )
81
114
for node in nodes :
82
- n = node . get ( 'SystemState' , {})
83
- cluster_name = n . get ( 'ClusterName' , cluster_name )
84
- version = n . get ( 'Version' , version )
85
- for tenant in n . get ( 'Tenants' , []):
86
- if tenant . endswith ( cls . ydb_database ) :
87
- nodes_wilcard = n . get ( 'Host' , nodes_wilcard ) .split ('.' )[0 ].rstrip ('0123456789' )
115
+ if not cluster_name :
116
+ cluster_name = node . cluster_name
117
+ if not version :
118
+ version = node . version
119
+ if not nodes_wilcard and node . role == YdbCluster . Node . Role . COMPUTE :
120
+ nodes_wilcard = node . host .split ('.' )[0 ].rstrip ('0123456789' )
88
121
cls ._cluster_info = {
89
122
'database' : cls .ydb_database ,
90
123
'version' : version ,
@@ -162,6 +195,14 @@ def _get_tables(cls, path):
162
195
result .append (full_path )
163
196
return result
164
197
198
@staticmethod
def _join_errors(log_level: int, errors: list[str]) -> Optional[str]:
    """Collapse a list of messages into one string and log it.

    Args:
        log_level: logging level passed to ``LOGGER.log``.
        errors: messages to combine.

    Returns:
        The messages joined with ', ', or None when the list is empty
        (in which case nothing is logged).
    """
    message = ', '.join(errors) if len(errors) > 0 else None
    if message is not None:
        LOGGER.log(log_level, message)
    return message
205
+
165
206
@classmethod
166
207
@allure .step ('Execute scan query' )
167
208
def execute_single_result_query (cls , query , timeout = 10 ):
@@ -180,21 +221,14 @@ def execute_single_result_query(cls, query, timeout=10):
180
221
@classmethod
181
222
@allure .step ('Check if YDB alive' )
182
223
def check_if_ydb_alive (cls , timeout = 10 , balanced_paths = None ) -> tuple [str , str ]:
183
- def _check_node (n ):
184
- name = 'UnknownNode'
185
- error = None
186
- try :
187
- ss = n .get ('SystemState' , {})
188
- name = ss .get ("Host" )
189
- start_time = int (ss .get ('StartTime' , int (time ()) * 1000 )) / 1000
190
- uptime = int (time ()) - start_time
191
- if uptime < 15 :
192
- error = f'Node { name } too yong: { uptime } '
193
- except BaseException as ex :
194
- error = f"Error while process node { name } : { ex } "
195
- if error :
196
- LOGGER .error (error )
197
- return error
224
def _check_node(n: YdbCluster.Node):
    """Collect health problems of a single node.

    A node is reported when it is flagged as disconnected and/or when
    less than 15 seconds have passed since its start time. The messages
    are handed to ``cls._join_errors`` at ERROR level and its result is
    returned.
    """
    uptime = time() - n.start_time
    checks = (
        (n.disconnected, f'Node {n.host} disconnected'),
        (uptime < 15, f'Node {n.host} too yong: {uptime}'),
    )
    problems = [message for failed, message in checks if failed]
    return cls._join_errors(logging.ERROR, problems)
198
232
199
233
errors = []
200
234
warnings = []
@@ -216,7 +250,7 @@ def _check_node(n):
216
250
else :
217
251
ok_node_count += 1
218
252
if ok_node_count < nodes_count :
219
- errors .append (f'Only { ok_node_count } from { ok_node_count } dynnodes are ok: { "," .join (node_errors )} ' )
253
+ errors .append (f'Only { ok_node_count } from { nodes_count } dynnodes are ok: { "," .join (node_errors )} ' )
220
254
paths_to_balance = []
221
255
if isinstance (balanced_paths , str ):
222
256
paths_to_balance += cls ._get_tables (balanced_paths )
@@ -232,11 +266,11 @@ def _check_node(n):
232
266
min = 0
233
267
for tn in table_nodes :
234
268
tablet_count = 0
235
- for tablet in tn .get ( "Tablets" , []) :
236
- if tablet .get ( "State" ) != "Green" :
237
- warnings .append (f'Node { tn .get ( "SystemState" , {}). get ( "Host" ) } : { tablet .get ( "Count" ) } tablets of type { tablet .get ( "Type" ) } in { tablet .get ( "State" ) } state' )
238
- if tablet .get ( "Type" ) in {"ColumnShard" , "DataShard" }:
239
- tablet_count += tablet .get ( "Count" )
269
+ for tablet in tn .tablets :
270
+ if tablet .count > 0 and tablet . state != "Green" :
271
+ warnings .append (f'Node { tn .host } : { tablet .count } tablets of type { tablet .type } in { tablet .state } state' )
272
+ if tablet .type in {"ColumnShard" , "DataShard" }:
273
+ tablet_count += tablet .count
240
274
if tablet_count > 0 :
241
275
if min is None or tablet_count < min :
242
276
min = tablet_count
@@ -251,10 +285,8 @@ def _check_node(n):
251
285
cls .execute_single_result_query ("select 1" , timeout )
252
286
except BaseException as ex :
253
287
errors .append (f"Cannot connect to YDB: { ex } " )
254
- error = ', ' .join (errors ) if len (errors ) > 0 else None
255
- warning = ', ' .join (warnings ) if len (warnings ) > 0 else None
256
- LOGGER .error (f'Errors: { error } , warnings: { warning } ' )
257
- return error , warning
288
+
289
+ return cls ._join_errors (logging .ERROR , errors ), cls ._join_errors (logging .WARNING , warnings )
258
290
259
291
@classmethod
260
292
@allure .step ('Wait YDB alive' )
0 commit comments