7
7
import ydb
8
8
from copy import deepcopy
9
9
from time import sleep , time
10
- from typing import List
10
+ from typing import List , Optional
11
11
12
12
LOGGER = logging .getLogger ()
13
13
@@ -53,9 +53,11 @@ def _get_service_url(cls):
53
53
return f'http://{ host } :{ port } '
54
54
55
55
@classmethod
56
- def get_cluster_nodes (cls , path = None ) :
56
+ def get_cluster_nodes (cls , path : Optional [ str ] = None , db_only : bool = False ) -> list [ dict [ str : any ]] :
57
57
try :
58
- url = f'{ cls ._get_service_url ()} /viewer/json/nodes?database=/{ cls .ydb_database } '
58
+ url = f'{ cls ._get_service_url ()} /viewer/json/nodes?'
59
+ if db_only or path is not None :
60
+ url += f'database=/{ cls .ydb_database } '
59
61
if path is not None :
60
62
url += f'&path={ path } &tablets=true'
61
63
headers = {}
@@ -64,8 +66,7 @@ def get_cluster_nodes(cls, path=None):
64
66
# headers['Authorization'] = token
65
67
data = requests .get (url , headers = headers ).json ()
66
68
nodes = data .get ('Nodes' , [])
67
- nodes_count = int (data .get ('TotalNodes' , len (nodes )))
68
- return nodes , nodes_count
69
+ return nodes
69
70
except Exception as e :
70
71
LOGGER .error (e )
71
72
return [], 0
@@ -76,7 +77,7 @@ def get_cluster_info(cls):
76
77
version = ''
77
78
cluster_name = ''
78
79
nodes_wilcard = ''
79
- nodes , node_count = cls .get_cluster_nodes ()
80
+ nodes = cls .get_cluster_nodes ()
80
81
for node in nodes :
81
82
n = node .get ('SystemState' , {})
82
83
cluster_name = n .get ('ClusterName' , cluster_name )
@@ -86,7 +87,6 @@ def get_cluster_info(cls):
86
87
nodes_wilcard = n .get ('Host' , nodes_wilcard ).split ('.' )[0 ].rstrip ('0123456789' )
87
88
cls ._cluster_info = {
88
89
'database' : cls .ydb_database ,
89
- 'node_count' : node_count ,
90
90
'version' : version ,
91
91
'name' : cluster_name ,
92
92
'nodes_wilcard' : nodes_wilcard ,
@@ -183,63 +183,53 @@ def check_if_ydb_alive(cls, timeout=10, balanced_paths=None):
183
183
def _check_node (n ):
184
184
name = 'UnknownNode'
185
185
error = None
186
- role = 'Unknown'
187
186
try :
188
187
ss = n .get ('SystemState' , {})
189
188
name = ss .get ("Host" )
190
189
start_time = int (ss .get ('StartTime' , int (time ()) * 1000 )) / 1000
191
190
uptime = int (time ()) - start_time
192
- r = ss .get ('Roles' , [])
193
- for role_candidate in ['Storage' , 'Tenant' ]:
194
- if role_candidate in r :
195
- role = role_candidate
196
- break
197
191
if uptime < 15 :
198
192
error = f'Node { name } too yong: { uptime } '
199
193
except BaseException as ex :
200
194
error = f"Error while process node { name } : { ex } "
201
195
if error :
202
196
LOGGER .error (error )
203
- return error , role
197
+ return error
204
198
205
199
errors = []
206
200
try :
207
- nodes , node_count = cls .get_cluster_nodes ()
208
- if node_count == 0 :
209
- errors .append ('nodes_count == 0' )
210
- if len (nodes ) < node_count :
211
- errors .append (f"{ node_count - len (nodes )} nodes from { node_count } don't live" )
212
- ok_by_role = {'Tenant' : 0 , 'Storage' : 0 , 'Unknown' : 0 }
213
- nodes_by_role = deepcopy (ok_by_role )
214
- node_errors = {'Tenant' : [], 'Storage' : [], 'Unknown' : []}
201
+ nodes = cls .get_cluster_nodes (db_only = True )
202
+ expected_nodes_count = os .getenv ('EXPECTED_DYN_NODES_COUNT' )
203
+ nodes_count = len (nodes )
204
+ if expected_nodes_count :
205
+ LOGGER .debug (f'Expected nodes count: { expected_nodes_count } ' )
206
+ expected_nodes_count = int (expected_nodes_count )
207
+ if nodes_count < expected_nodes_count :
208
+ errors .append (f"{ expected_nodes_count - nodes_count } nodes from { expected_nodes_count } don't alive" )
209
+ ok_node_count = 0
210
+ node_errors = []
215
211
for n in nodes :
216
- error , role = _check_node (n )
212
+ error = _check_node (n )
217
213
if error :
218
- node_errors [ role ] .append (error )
214
+ node_errors .append (error )
219
215
else :
220
- ok_by_role [role ] += 1
221
- nodes_by_role [role ] += 1
222
- dynnodes_count = nodes_by_role ['Tenant' ]
223
- ok_dynnodes_count = ok_by_role ['Tenant' ]
224
- if ok_dynnodes_count < dynnodes_count :
225
- dynnodes_errors = ',' .join (node_errors ['Tenant' ])
226
- errors .append (f'Only { ok_dynnodes_count } from { dynnodes_count } dynnodes are ok: { dynnodes_errors } ' )
227
- storage_nodes_count = nodes_by_role ['Storage' ]
228
- ok_storage_nodes_count = ok_by_role ['Storage' ]
229
- if ok_storage_nodes_count < storage_nodes_count :
230
- storage_nodes_errors = ',' .join (node_errors ['Tenant' ])
231
- errors .append (f'Only { ok_storage_nodes_count } from { storage_nodes_count } storage nodes are ok. { storage_nodes_errors } ' )
232
- paths_to_balance = []
233
- if isinstance (balanced_paths , str ):
234
- paths_to_balance += cls ._get_tables (balanced_paths )
235
- elif isinstance (balanced_paths , list ):
236
- for path in balanced_paths :
237
- paths_to_balance += cls ._get_tables (path )
216
+ ok_node_count += 1
217
+ if ok_node_count < nodes_count :
218
+ errors .append (f'Only { ok_node_count } from { ok_node_count } dynnodes are ok: { "," .join (node_errors )} ' )
238
219
if os .getenv ('TEST_CHECK_BALANCING' , 'no' ) == 'yes' :
220
+ paths_to_balance = []
221
+ if isinstance (balanced_paths , str ):
222
+ paths_to_balance += cls ._get_tables (balanced_paths )
223
+ elif isinstance (balanced_paths , list ):
224
+ for path in balanced_paths :
225
+ paths_to_balance += cls ._get_tables (path )
239
226
for p in paths_to_balance :
240
- table_nodes , _ = cls .get_cluster_nodes (p )
227
+ table_nodes = cls .get_cluster_nodes (p )
241
228
min = None
242
229
max = None
230
+ if expected_nodes_count :
231
+ if len (table_nodes ) < expected_nodes_count :
232
+ min = 0
243
233
for tn in table_nodes :
244
234
tablet_count = 0
245
235
for tablet in tn .get ("Tablets" , []):
@@ -256,7 +246,7 @@ def _check_node(n):
256
246
errors .append (f'Table { p } has no tablets' )
257
247
elif max - min > 1 :
258
248
errors .append (f'Table { p } is not balanced: { min } -{ max } shards.' )
259
- LOGGER .info (f'Table { p } is balanced : { min } -{ max } shards.' )
249
+ LOGGER .info (f'Table { p } balance : { min } -{ max } shards.' )
260
250
261
251
cls .execute_single_result_query ("select 1" , timeout )
262
252
except BaseException as ex :
0 commit comments