Skip to content

Commit 0ebda4f

Browse files
authored
Test stability: workload_log fixes (#14384)
1 parent 544a6a7 commit 0ebda4f

File tree

4 files changed

+148
-49
lines changed

4 files changed

+148
-49
lines changed

ydb/tests/library/wardens/factories.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
from ydb.tests.library.wardens.schemeshard import SchemeShardHasNoInFlightTransactions
1212

1313

14-
def safety_warden_factory(cluster, ssh_username, lines_after=5, cut=True):
14+
def safety_warden_factory(cluster, ssh_username, lines_after=5, cut=True, modification_days=1):
1515
list_of_host_names = [node.host for node in cluster.nodes.values()]
1616
wardens = [AllPDisksAreInValidStateSafetyWarden(cluster)]
1717
wardens.extend(kikimr_grep_dmesg_safety_warden_factory(list_of_host_names, ssh_username))
@@ -24,7 +24,7 @@ def safety_warden_factory(cluster, ssh_username, lines_after=5, cut=True):
2424
for directory, list_of_host_names in by_directory.items():
2525
wardens.extend(
2626
kikimr_start_logs_safety_warden_factory(
27-
list_of_host_names, ssh_username, directory, lines_after, cut
27+
list_of_host_names, ssh_username, directory, lines_after, cut, modification_days
2828
)
2929
)
3030

ydb/tests/library/wardens/logs.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -10,9 +10,9 @@
1010

1111

1212
def kikimr_start_logs_safety_warden_factory(
13-
list_of_host_names, ssh_username, deploy_path, lines_after=5, cut=True
13+
list_of_host_names, ssh_username, deploy_path, lines_after=5, cut=True, modification_days=1
1414
):
15-
start_markers = ['VERIFY', 'FAIL', 'signal 11', 'signal 6', 'signal 15', 'uncaught exception']
15+
start_markers = ['VERIFY', 'FAIL ', 'signal 11', 'signal 6', 'signal 15', 'uncaught exception', 'ERROR: AddressSanitizer', 'SIG']
1616
username = ssh_username
1717
return [
1818
GrepLogFileForMarkers(
@@ -27,6 +27,7 @@ def kikimr_start_logs_safety_warden_factory(
2727
list_of_host_names,
2828
log_file_pattern=os.path.join(deploy_path, 'kikimr.start.*gz'),
2929
list_of_markers=start_markers,
30+
modification_days=modification_days,
3031
username=username,
3132
lines_after=lines_after,
3233
cut=cut

ydb/tests/stability/tool/__main__.py

Lines changed: 140 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,7 @@ class bcolors:
105105

106106

107107
class StabilityCluster:
108-
def __init__(self, ssh_username, cluster_path, ydbd_path, ydbd_next_path=None):
108+
def __init__(self, ssh_username, cluster_path, ydbd_path=None, ydbd_next_path=None):
109109
self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_stability")
110110
os.makedirs(self.working_dir, exist_ok=True)
111111
self.ssh_username = ssh_username
@@ -145,7 +145,7 @@ def clean_trace(self, traces):
145145
for line in traces.split('\n'):
146146
line = re.sub(r' @ 0x[a-fA-F0-9]+', '', line)
147147
# Strip everything before the error text or the pointer to the source code line
148-
match_verify = re.search(r'VERIFY|FAIL|signal 11|signal 6|signal 15|uncaught exception', line)
148+
match_verify = re.search(r'VERIFY|FAIL|signal 11|signal 6|signal 15|uncaught exception|ERROR: AddressSanitizer|SIG', line)
149149
match_code_file_line = re.search(r'\s+(\S+\.cpp:\d+).*', line)
150150

151151
if match_verify:
@@ -198,12 +198,11 @@ def process_lines(self, text):
198198
trace = trace + line + '\n'
199199
return traces
200200

201-
def get_all_errors(self):
202-
logging.getLogger().setLevel(logging.WARNING)
201+
def get_all_errors(self, mode='all'):
203202
all_results = []
204-
for node in self.kikimr_cluster.nodes.values():
205-
result = node.ssh_command("""
206-
ls -ltr /Berkanavt/kikimr*/logs/kikimr* |
203+
if mode == 'all' or mode == 'raw' or mode == 'aggr':
204+
command = """
205+
ls -ltr /Berkanavt/kikim*/logs/kikimr* |
207206
awk '{print $NF}' |
208207
while read file; do
209208
case "$file" in
@@ -212,27 +211,53 @@ def get_all_errors(self):
212211
*) cat "$file" ;;
213212
esac
214213
done |
215-
grep -E 'VERIFY|FAIL|signal 11|signal 6|signal 15|uncaught exception' -A 20
216-
""", raise_on_error=False)
214+
grep -E 'VERIFY|FAIL |signal 11|signal 6|signal 15|uncaught exception|ERROR: AddressSanitizer|SIG' -A 40 -B 20
215+
"""
216+
elif mode == 'last':
217+
command = """
218+
ls -ltr /Berkanavt/kikim*/logs/kikimr |
219+
awk '{print $NF}' |
220+
while read file; do
221+
cat "$file" | grep -E 'VERIFY|FAIL |signal 11|signal 6|signal 15|uncaught exception|ERROR: AddressSanitizer|SIG' -A 40 -B 20 | tail -120
222+
echo "--"
223+
done
224+
"""
225+
for node in self.kikimr_cluster.nodes.values():
226+
result = node.ssh_command(command, raise_on_error=False)
217227
if result:
218228
all_results.append(result.decode('utf-8'))
219229
all_results = self.process_lines(all_results)
220230
return all_results
221231

222-
def get_errors(self):
223-
errors = self.get_all_errors()
224-
unique_traces = self.find_unique_traces_with_counts(errors)
225-
for trace in unique_traces:
226-
print(f"Trace (Occurrences: {len(unique_traces[trace])}):\n{trace}\n{'-'*60}")
232+
def get_errors(self, mode='raw'):
233+
errors = self.get_all_errors(mode=mode)
234+
if mode == 'raw' or mode == 'last':
235+
print('Traces:')
236+
for trace in errors:
237+
print(f"{trace}\n{'-'*60}")
238+
else:
239+
unique_traces = self.find_unique_traces_with_counts(errors)
240+
for trace in unique_traces:
241+
print(f"Trace (Occurrences: {len(unique_traces[trace])}):\n{trace}\n{'-'*60}")
227242

228243
def perform_checks(self):
229244

230-
safety_violations = safety_warden_factory(self.kikimr_cluster, self.ssh_username, lines_after=20, cut=False).list_of_safety_violations()
245+
safety_violations = safety_warden_factory(self.kikimr_cluster, self.ssh_username, lines_after=20, cut=False, modification_days=3).list_of_safety_violations()
231246
liveness_violations = liveness_warden_factory(self.kikimr_cluster, self.ssh_username).list_of_liveness_violations
232247
coredumps_search_results = {}
233248
for node in self.kikimr_cluster.nodes.values():
234249
result = node.ssh_command('find /coredumps/ -type f | wc -l', raise_on_error=False)
235250
coredumps_search_results[node.host.split(':')[0]] = int(result.decode('utf-8'))
251+
minidumps_search_results = {}
252+
for node in self.kikimr_cluster.nodes.values():
253+
result = node.ssh_command('''
254+
if [ -d "/Berkanavt/minidumps/" ]; then
255+
find /Berkanavt/minidumps/ -type f | wc -l
256+
else
257+
echo 0
258+
fi
259+
''', raise_on_error=False)
260+
minidumps_search_results[node.host.split(':')[0]] = int(result.decode('utf-8'))
236261

237262
print("SAFETY WARDEN:")
238263
for i, violation in enumerate(safety_violations):
@@ -249,6 +274,9 @@ def perform_checks(self):
249274
print("COREDUMPS:")
250275
for node in coredumps_search_results:
251276
print(f' {node}: {coredumps_search_results[node]}')
277+
print("MINIDUMPS:")
278+
for node in coredumps_search_results:
279+
print(f' {node}: {minidumps_search_results[node]}')
252280

253281
def start_nemesis(self):
254282
for node in self.kikimr_cluster.nodes.values():
@@ -258,7 +286,7 @@ def stop_workloads(self):
258286
for node in self.kikimr_cluster.nodes.values():
259287
node.ssh_command(
260288
'sudo pkill screen',
261-
raise_on_error=True
289+
raise_on_error=False
262290
)
263291

264292
def stop_nemesis(self):
@@ -284,17 +312,14 @@ def get_state(self):
284312
print(f'\t{state_object}:\t{status}')
285313

286314
def cleanup(self, mode='all'):
287-
if mode in ['all', 'logs']:
288-
self.kikimr_cluster.cleanup_logs()
289315
for node in self.kikimr_cluster.nodes.values():
290316
if mode in ['all', 'dumps']:
291317
node.ssh_command('sudo rm -rf /coredumps/*', raise_on_error=False)
292318
if mode in ['all', 'logs']:
319+
node.ssh_command('sudo find /Berkanavt/kikimr*/logs/kikimr* -type f -exec rm -f {} +', raise_on_error=False)
293320
node.ssh_command('sudo rm -rf /Berkanavt/nemesis/log/*', raise_on_error=False)
294-
if mode == 'all':
295-
self.stop_nemesis()
296-
node.ssh_command('sudo pkill screen', raise_on_error=False)
297-
node.ssh_command('sudo rm -rf /Berkanavt/kikimr/bin/*', raise_on_error=False)
321+
if mode in ['all', 'logs']:
322+
self.kikimr_cluster.cleanup_logs()
298323

299324
def deploy_ydb(self):
300325
self.cleanup()
@@ -309,6 +334,7 @@ def deploy_ydb(self):
309334
node.ssh_command("/Berkanavt/kikimr/bin/kikimr admin console validator disable bootstrap", raise_on_error=True)
310335

311336
self.deploy_tools()
337+
self.get_state()
312338

313339
def deploy_tools(self):
314340
for node in self.kikimr_cluster.nodes.values():
@@ -348,7 +374,7 @@ def parse_args():
348374
)
349375
parser.add_argument(
350376
"--ydbd_path",
351-
required=True,
377+
required=False,
352378
type=path_type,
353379
help="Path to ydbd",
354380
)
@@ -371,32 +397,56 @@ def parse_args():
371397
nargs="+",
372398
choices=[
373399
"get_errors",
400+
"get_errors_aggr",
401+
"get_errors_last",
374402
"get_state",
403+
"clean_workload",
375404
"cleanup",
376405
"cleanup_logs",
377406
"cleanup_dumps",
378407
"deploy_ydb",
379408
"deploy_tools",
380409
"start_nemesis",
381410
"stop_nemesis",
382-
"start_all_workloads",
411+
"start_default_workloads",
383412
"start_workload_simple_queue_row",
384413
"start_workload_simple_queue_column",
385414
"start_workload_olap_workload",
386415
"start_workload_log",
387416
"start_workload_log_column",
388417
"start_workload_log_row",
389418
"stop_workloads",
419+
"stop_workload",
390420
"perform_checks",
391421
],
392422
help="actions to execute",
393423
)
424+
args, unknown = parser.parse_known_args()
425+
if "stop_workload" in args.actions:
426+
parser.add_argument(
427+
"--name",
428+
type=str,
429+
required=True,
430+
help="Name of the workload to stop",
431+
choices=list(DICT_OF_PROCESSES.keys())
432+
)
433+
434+
if "clean_workload" in args.actions:
435+
parser.add_argument(
436+
"--name",
437+
type=str,
438+
required=True,
439+
help="Name of the workload to stop",
440+
choices=list(DICT_OF_PROCESSES.keys())
441+
)
442+
394443
return parser.parse_args()
395444

396445

397446
def main():
398447
args = parse_args()
399448
ssh_username = args.ssh_user
449+
print('Initing cluster info')
400450
stability_cluster = StabilityCluster(
401451
ssh_username=ssh_username,
402452
cluster_path=args.cluster_path,
@@ -405,8 +455,13 @@ def main():
405455
)
406456

407457
for action in args.actions:
458+
print(f'Start action {action}')
408459
if action == "get_errors":
409-
stability_cluster.get_errors()
460+
stability_cluster.get_errors(mode='raw')
461+
if action == "get_errors_aggr":
462+
stability_cluster.get_errors(mode='aggr')
463+
if action == "get_errors_last":
464+
stability_cluster.get_errors(mode='last')
410465
if action == "get_state":
411466
stability_cluster.get_state()
412467
if action == "deploy_ydb":
@@ -419,7 +474,7 @@ def main():
419474
stability_cluster.cleanup('dumps')
420475
if action == "deploy_tools":
421476
stability_cluster.deploy_tools()
422-
if action == "start_all_workloads":
477+
if action == "start_default_workloads":
423478
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
424479
node.ssh_command(
425480
'screen -s simple_queue_row -d -m bash -c "while true; do /Berkanavt/nemesis/bin/simple_queue --database /Root/db1 --mode row; done"',
@@ -434,6 +489,43 @@ def main():
434489
raise_on_error=True
435490
)
436491
stability_cluster.get_state()
492+
if action == "stop_workload":
493+
workload_name = args.name
494+
if DICT_OF_PROCESSES.get(workload_name):
495+
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
496+
node.ssh_command(
497+
f"ps aux | grep {workload_name} | grep -v grep | awk '{{print $2}}' | xargs kill -9",
498+
raise_on_error=True)
499+
else:
500+
print(f"Unknown workload {workload_name}")
501+
stability_cluster.get_state()
502+
if "clean_workload" in action:
503+
workload_name = args.name
504+
if DICT_OF_PROCESSES.get(workload_name):
505+
store_type_list = []
506+
if 'column' in workload_name:
507+
store_type_list.append('column')
508+
elif 'row' in workload_name:
509+
store_type_list.append('row')
510+
else:
511+
store_type_list = ['column', 'row']
512+
if 'log_' in workload_name:
513+
first_node = stability_cluster.kikimr_cluster.nodes[1]
514+
for store_type in store_type_list:
515+
first_node.ssh_command([
516+
'/Berkanavt/nemesis/bin/ydb_cli',
517+
'--endpoint', f'grpc://localhost:{first_node.grpc_port}',
518+
'--database', '/Root/db1',
519+
'workload', 'log', 'clean',
520+
'--path', f'log_workload_{store_type}',
521+
],
522+
raise_on_error=True
523+
)
524+
else:
525+
print(f"Not supported workload clean command for {workload_name}")
526+
else:
527+
print(f"Unknown workload {workload_name}")
528+
stability_cluster.get_state()
437529
if "start_workload_log" in action:
438530
store_type_list = []
439531
if action == 'start_workload_log_column':
@@ -444,31 +536,22 @@ def main():
444536
store_type_list = ['column', 'row']
445537
first_node = stability_cluster.kikimr_cluster.nodes[1]
446538
for store_type in store_type_list:
447-
first_node.ssh_command([
448-
'/Berkanavt/nemesis/bin/ydb_cli',
449-
'--endpoint', f'grpc://localhost:{first_node.grpc_port}',
450-
'--database', '/Root/db1',
451-
'workload', 'log', 'clean',
452-
'--path', f'log_workload_{store_type}',
453-
],
454-
raise_on_error=True
455-
)
456539
first_node.ssh_command([
457540
'/Berkanavt/nemesis/bin/ydb_cli',
458541
'--endpoint', f'grpc://localhost:{first_node.grpc_port}',
459542
'--database', '/Root/db1',
460543
'workload', 'log', 'init',
461544
'--len', '1000',
462-
'--int-cols', '20',
463-
'--key-cols', '20',
545+
'--int-cols', '18',
546+
'--key-cols', '18',
464547
'--min-partitions', '100',
465548
'--partition-size', '10',
466549
'--auto-partition', '0',
467550
'--store', store_type,
468551
'--path', f'log_workload_{store_type}',
469-
'--ttl', '3600'
552+
'--ttl', '20160'
470553
],
471-
raise_on_error=True
554+
raise_on_error=False
472555
)
473556
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
474557
node.ssh_command([
@@ -478,16 +561,31 @@ def main():
478561
'--database', '/Root/db1',
479562
'workload', 'log', 'run', 'bulk_upsert',
480563
'--len', '1000',
481-
'--int-cols', '20',
482-
'--key-cols', '20',
483-
'--threads', '20',
564+
'--int-cols', '18',
565+
'--key-cols', '18',
566+
'--threads', '1',
484567
'--timestamp_deviation', '180',
485568
'--seconds', '86400',
486569
'--path', f'log_workload_{store_type}',
487570
'; done"'
488571
],
489572
raise_on_error=True
490573
)
574+
node.ssh_command([
575+
f'screen -s workload_log_{store_type}_select -d -m bash -c "while true; do',
576+
'/Berkanavt/nemesis/bin/ydb_cli',
577+
'--verbose',
578+
'--endpoint', f'grpc://localhost:{node.grpc_port}',
579+
'--database', '/Root/db1',
580+
'workload', 'log', 'run', 'select',
581+
'--client-timeout', '1800000',
582+
'--threads', '1',
583+
'--seconds', '86400',
584+
'--path', f'log_workload_{store_type}',
585+
'; done"'
586+
],
587+
raise_on_error=True
588+
)
491589
stability_cluster.get_state()
492590
if action == "start_workload_simple_queue_row":
493591
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):

0 commit comments

Comments
 (0)