31
31
logging .getLogger ("library.python.retry" ).setLevel ("ERROR" )
32
32
33
33
34
+ def plain_or_under_sanitizer_wrapper (plain , sanitized ):
35
+ try :
36
+ return plain_or_under_sanitizer (plain , sanitized )
37
+ except Exception :
38
+ return plain
39
+
40
+
34
41
class BaseTenant (abc .ABC ):
35
42
def __init__ (
36
43
self ,
@@ -215,14 +222,14 @@ def get_worker_count(self, node_index):
215
222
{"subsystem" : "worker_manager" , "sensor" : "ActiveWorkers" })
216
223
return result if result is not None else 0
217
224
218
- def wait_worker_count (self , node_index , activity , expected_count , timeout = plain_or_under_sanitizer (30 , 150 )):
225
+ def wait_worker_count (self , node_index , activity , expected_count , timeout = plain_or_under_sanitizer_wrapper (30 , 150 )):
219
226
deadline = time .time () + timeout
220
227
while True :
221
228
count = self .get_actor_count (node_index , activity )
222
229
if count >= expected_count :
223
230
break
224
231
assert time .time () < deadline , "Wait actor count failed"
225
- time .sleep (plain_or_under_sanitizer (0.5 , 2 ))
232
+ time .sleep (plain_or_under_sanitizer_wrapper (0.5 , 2 ))
226
233
pass
227
234
228
235
def get_mkql_limit (self , node_index ):
@@ -267,7 +274,7 @@ def ensure_is_alive(self):
267
274
self .wait_bootstrap (n )
268
275
assert self .get_actor_count (n , "GRPC_PROXY" ) > 0 , "Node {} died" .format (n )
269
276
270
- def wait_bootstrap (self , node_index = None , wait_time = plain_or_under_sanitizer (90 , 400 )):
277
+ def wait_bootstrap (self , node_index = None , wait_time = plain_or_under_sanitizer_wrapper (90 , 400 )):
271
278
if node_index is None :
272
279
for n in self .kikimr_cluster .nodes :
273
280
self .wait_bootstrap (n , wait_time )
@@ -280,13 +287,13 @@ def wait_bootstrap(self, node_index=None, wait_time=plain_or_under_sanitizer(90,
280
287
if self .get_actor_count (node_index , "GRPC_PROXY" ) == 0 :
281
288
continue
282
289
except Exception :
283
- time .sleep (plain_or_under_sanitizer (0.3 , 2 ))
290
+ time .sleep (plain_or_under_sanitizer_wrapper (0.3 , 2 ))
284
291
continue
285
292
break
286
293
self .bootstraped_nodes .add (node_index )
287
294
logging .debug ("Node {} has been bootstrapped" .format (node_index ))
288
295
289
- def wait_discovery (self , node_index = None , wait_time = plain_or_under_sanitizer (30 , 150 )):
296
+ def wait_discovery (self , node_index = None , wait_time = plain_or_under_sanitizer_wrapper (30 , 150 )):
290
297
if node_index is None :
291
298
for n in self .kikimr_cluster .nodes :
292
299
self .wait_discovery (n , wait_time )
@@ -301,12 +308,12 @@ def wait_discovery(self, node_index=None, wait_time=plain_or_under_sanitizer(30,
301
308
if peer_count is None or peer_count < self .node_count :
302
309
continue
303
310
except Exception :
304
- time .sleep (plain_or_under_sanitizer (0.3 , 2 ))
311
+ time .sleep (plain_or_under_sanitizer_wrapper (0.3 , 2 ))
305
312
continue
306
313
break
307
314
logging .debug ("Node {} discovery finished" .format (node_index ))
308
315
309
- def wait_workers (self , worker_count , wait_time = plain_or_under_sanitizer (30 , 150 )):
316
+ def wait_workers (self , worker_count , wait_time = plain_or_under_sanitizer_wrapper (30 , 150 )):
310
317
ca_count = worker_count * 2 # we count 2x CAs
311
318
deadline = time .time () + wait_time
312
319
while True :
@@ -357,17 +364,17 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False):
357
364
expect_counters_exist = expect_counters_exist )
358
365
359
366
def wait_completed_checkpoints (self , query_id , checkpoints_count ,
360
- timeout = plain_or_under_sanitizer (30 , 150 ),
367
+ timeout = plain_or_under_sanitizer_wrapper (30 , 150 ),
361
368
expect_counters_exist = False ):
362
369
deadline = time .time () + timeout
363
370
while True :
364
371
completed = self .get_completed_checkpoints (query_id , expect_counters_exist = expect_counters_exist )
365
372
if completed >= checkpoints_count :
366
373
break
367
374
assert time .time () < deadline , "Wait zero checkpoint failed, actual completed: " + str (completed )
368
- time .sleep (plain_or_under_sanitizer (0.5 , 2 ))
375
+ time .sleep (plain_or_under_sanitizer_wrapper (0.5 , 2 ))
369
376
370
- def wait_zero_checkpoint (self , query_id , timeout = plain_or_under_sanitizer (30 , 150 ),
377
+ def wait_zero_checkpoint (self , query_id , timeout = plain_or_under_sanitizer_wrapper (30 , 150 ),
371
378
expect_counters_exist = False ):
372
379
self .wait_completed_checkpoints (query_id , 1 , timeout , expect_counters_exist )
373
380
0 commit comments