from typing import Optional
from ydb.tests.olap.lib.ydb_cli import YdbCliHelper, WorkloadType, CheckCanonicalPolicy
from ydb.tests.olap.lib.ydb_cluster import YdbCluster
- from ydb.tests.olap.lib.allure_utils import allure_test_description
+ from ydb.tests.olap.lib.allure_utils import allure_test_description, NodeErrors
from ydb.tests.olap.lib.results_processor import ResultsProcessor
from ydb.tests.olap.lib.utils import get_external_param
from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ScenarioTestHelper
@@ -37,6 +37,7 @@ def __init__(self, iterations: Optional[int] = None, timeout: Optional[float] =
    scale: Optional[int] = None
    query_prefix: str = get_external_param('query-prefix', '')
    verify_data: bool = True
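+     # Snapshot of alive DB nodes, keyed by (host, ic_port); filled by save_nodes_state() and consumed by check_nodes().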
+     __nodes_state: Optional[dict[tuple[str, int], YdbCluster.Node]] = None

    @classmethod
    def suite(cls) -> str:
@@ -69,7 +70,7 @@ def _test_name(cls, query_num: int) -> str:
    @allure.step('check tables size')
    def check_tables_size(cls, folder: Optional[str], tables: dict[str, int]):
        wait_error = YdbCluster.wait_ydb_alive(
-             20 * 60, (
+             int(os.getenv('WAIT_CLUSTER_ALIVE_TIMEOUT', 20 * 60)), (
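+                 # Timeout in seconds: 20 minutes by default, overridable via the WAIT_CLUSTER_ALIVE_TIMEOUT env var.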
                f'{YdbCluster.tables_path}/{folder}'
                if folder is not None
                else [f'{YdbCluster.tables_path}/{t}' for t in tables.keys()]
@@ -92,8 +93,19 @@ def check_tables_size(cls, folder: Optional[str], tables: dict[str, int]):
            msg = '\n'.join(errors)
            pytest.fail(f'Unexpected tables size in `{folder}`:\n{msg}')

+     @staticmethod
+     def __execute_ssh(host: str, cmd: str):
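+         # Start cmd on host over ssh without waiting for completion (wait=False); the SSH_USER and SSH_KEY_FILE env vars optionally set the login and identity file.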
+         ssh_cmd = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
+         ssh_user = os.getenv('SSH_USER')
+         if ssh_user is not None:
+             ssh_cmd += ['-l', ssh_user]
+         ssh_key_file = os.getenv('SSH_KEY_FILE')
+         if ssh_key_file is not None:
+             ssh_cmd += ['-i', ssh_key_file]
+         return yatest.common.execute(ssh_cmd + [host, cmd], wait=False)
+
    @classmethod
-     def _attach_logs(cls, start_time, attach_name):
+     def __attach_logs(cls, start_time, attach_name):
        hosts = [node.host for node in filter(lambda x: x.role == YdbCluster.Node.Role.STORAGE, YdbCluster.get_cluster_nodes())]
        tz = timezone('Europe/Moscow')
        start = datetime.fromtimestamp(start_time, tz).isoformat()
@@ -102,28 +114,20 @@ def _attach_logs(cls, start_time, attach_name):
            '': {},
        }
        exec_start = deepcopy(exec_kikimr)
-         ssh_cmd = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
-         ssh_user = os.getenv('SSH_USER')
-         if ssh_user is not None:
-             ssh_cmd += ['-l', ssh_user]
-         ssh_key_file = os.getenv('SSH_KEY_FILE')
-         if ssh_key_file is not None:
-             ssh_cmd += ['-i', ssh_key_file]
        for host in hosts:
            for c in exec_kikimr.keys():
                try:
-                     exec_kikimr[c][host] = yatest.common.execute(ssh_cmd + [host, cmd.format(
+                     exec_kikimr[c][host] = cls.__execute_ssh(host, cmd.format(
                        storage='kikimr',
-                         container=f' -m k8s_container:{c}' if c else '')],
-                         wait=False)
+                         container=f' -m k8s_container:{c}' if c else ''
+                     ))
                except BaseException as e:
                    logging.error(e)
            for c in exec_start.keys():
                try:
-                     exec_start[c][host] = yatest.common.execute(ssh_cmd + [host, cmd.format(
+                     exec_start[c][host] = cls.__execute_ssh(host, cmd.format(
                        storage='kikimr-start',
-                         container=f' -m k8s_container:{c}' if c else '')],
-                         wait=False)
+                         container=f' -m k8s_container:{c}' if c else ''))
                except BaseException as e:
                    logging.error(e)

@@ -146,6 +150,83 @@ def _attach_logs(cls, start_time, attach_name):
        yatest.common.execute(['tar', '-C', dir, '-czf', archive, '.'])
        allure.attach.file(archive, f'{attach_name}_{c}_logs', extension='tar.gz')

+     @classmethod
+     def save_nodes_state(cls) -> None:
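+         # Record which DB nodes are alive before the workload runs; check_nodes() later compares against this snapshot.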
+         cls.__nodes_state = {(n.host, n.ic_port): n for n in YdbCluster.get_cluster_nodes(db_only=True)}
+
+     @classmethod
+     def __get_core_hashes_by_pod(cls, hosts: set[str], start_time: float, end_time: float) -> dict[str, list[tuple[str, str]]]:
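+         # Run the breakpad analyzer on each failed host, then gather the sended_*.json core reports written during the run window, grouped by pod name.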
+         core_processes = {
+             h: cls.__execute_ssh(h, 'sudo flock /tmp/brk_pad /Berkanavt/breakpad/bin/kikimr_breakpad_analizer.sh')
+             for h in hosts
+         }
+
+         core_hashes = {}
+         for h, exec in core_processes.items():
+             exec.wait(check_exit_code=False)
+             if exec.returncode != 0:
+                 logging.error(f'Error while processing coredumps on host {h}: {exec.stderr.decode("utf-8")}')
+             exec = cls.__execute_ssh(h, ('find /coredumps/ -name "sended_*.json" '
+                                          f'-mmin -{(10 + time() - start_time) / 60} -mmin +{(-10 + time() - end_time) / 60}'
+                                          ' | while read FILE; do cat $FILE; echo -n ","; done'))
+             exec.wait(check_exit_code=False)
+             if exec.returncode == 0:
+                 for core in json.loads(f'[{exec.stdout.decode("utf-8").strip(",")}]'):
+                     pod_name = core.get('pod', '')
+                     core_hashes.setdefault(pod_name, [])
+                     core_hashes[pod_name].append((core.get('core_uuid', ''), core.get('core_hash', '')))
+             else:
+                 logging.error(f'Error while searching for coredumps on host {h}: {exec.stderr.decode("utf-8")}')
+         return core_hashes
+
+     @classmethod
+     def __get_hosts_with_ooms(cls, hosts: set[str], start_time: float, end_time: float) -> set[str]:
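+         # Search the kernel journal on each host for OOM-killer records inside the run window.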
+         tz = timezone('Europe/Moscow')
+         start = datetime.fromtimestamp(start_time, tz).strftime('%Y-%m-%d %H:%M:%S')
+         end = datetime.fromtimestamp(end_time, tz).strftime('%Y-%m-%d %H:%M:%S')
+         oom_cmd = f'sudo journalctl -k -q --no-pager -S "{start}" -U "{end}" --grep "Out of memory: Kill process"'
+         ooms = set()
+         for h in hosts:
+             exec = cls.__execute_ssh(h, oom_cmd)
+             exec.wait(check_exit_code=False)
+             if exec.returncode == 0:
+                 if exec.stdout.decode('utf-8'):
+                     ooms.add(h)
+             else:
+                 logging.error(f'Error while searching for OOMs on host {h}: {exec.stderr.decode("utf-8")}')
+         return ooms
+
+     @classmethod
+     def check_nodes(cls, result: YdbCliHelper.WorkloadRunResult, end_time: float) -> list[NodeErrors]:
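+         # Compare the current node listing against the saved snapshot: a newer start_time means a restart, a node missing from the listing is down.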
+         if cls.__nodes_state is None:
+             return []
+         node_errors = []
+         fail_hosts = set()
+         for node in YdbCluster.get_cluster_nodes(db_only=True):
+             node_id = (node.host, node.ic_port)
+             saved_node = cls.__nodes_state.get(node_id)
+             if saved_node is not None:
+                 if node.start_time > saved_node.start_time:
+                     node_errors.append(NodeErrors(node, 'was restarted'))
+                     fail_hosts.add(node.host)
+                 del cls.__nodes_state[node_id]
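+         # Nodes still left in the snapshot did not show up in the current listing at all, so report them as down.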
+         for _, node in cls.__nodes_state.items():
+             node_errors.append(NodeErrors(node, 'is down'))
+             fail_hosts.add(node.host)
+         cls.__nodes_state = None
+         if len(node_errors) == 0:
+             return []
+
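+         # Enrich each failure with coredump hashes and OOM evidence collected from the affected hosts.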
+         core_hashes = cls.__get_core_hashes_by_pod(fail_hosts, result.start_time, end_time)
+         ooms = cls.__get_hosts_with_ooms(fail_hosts, result.start_time, end_time)
+         for node in node_errors:
+             node.core_hashes = core_hashes.get(f'{node.node.ic_port}@{node.node.host}', [])
+             node.was_oom = node.node.host in ooms
+
+         for err in node_errors:
+             result.add_error(f'Node {err.node.ic_port}@{err.node.host} {err.message}')
+         return node_errors
+
    @classmethod
    def process_query_result(cls, result: YdbCliHelper.WorkloadRunResult, query_num: int, iterations: int, upload: bool):
        def _get_duration(stats, field):
@@ -210,15 +291,20 @@ def _attach_plans(plan: YdbCliHelper.QueryPlan, name: str) -> None:
        for p in ['Mean']:
            if p in stats:
                allure.dynamic.parameter(p, _duration_text(stats[p] / 1000.))
+         end_time = time()
        if os.getenv('NO_KUBER_LOGS') is None and not result.success:
-             cls._attach_logs(start_time=result.start_time, attach_name='kikimr')
+             cls.__attach_logs(start_time=result.start_time, attach_name='kikimr')
        allure.attach(json.dumps(stats, indent=2), 'Stats', attachment_type=allure.attachment_type.JSON)
+         allure_test_description(
+             cls.suite(), test, refference_set=cls.refference,
+             start_time=result.start_time, end_time=end_time, node_errors=cls.check_nodes(result, end_time)
+         )
        if upload:
            ResultsProcessor.upload_results(
                kind='Load',
                suite=cls.suite(),
                test=test,
-                 timestamp=time(),
+                 timestamp=end_time,
                is_successful=result.success,
                min_duration=_get_duration(stats, 'Min'),
                max_duration=_get_duration(stats, 'Max'),
@@ -263,6 +349,7 @@ def run_workload_test(self, path: str, query_num: int) -> None:
            if param.name == 'query_num':
                param.mode = allure.parameter_mode.HIDDEN.value
        qparams = self._get_query_settings(query_num)
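+         # Snapshot node state right before the workload starts so check_nodes() has a baseline for restart detection.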
+         self.save_nodes_state()
        result = YdbCliHelper.workload_run(
            path=path,
            query_num=query_num,
@@ -274,5 +361,4 @@ def run_workload_test(self, path: str, query_num: int) -> None:
            scale=self.scale,
            query_prefix=qparams.query_prefix
        )
-         allure_test_description(self.suite(), self._test_name(query_num), refference_set=self.refference, start_time=result.start_time, end_time=time())
        self.process_query_result(result, query_num, qparams.iterations, True)