@@ -19,7 +19,12 @@
 
 from tests.common.assertions import assert_no_errors_in_logs
 from tests.common.osu_common import run_individual_osu_benchmark
-from tests.common.utils import fetch_instance_slots, get_installed_parallelcluster_version, run_system_analyzer
+from tests.common.utils import (
+    fetch_instance_slots,
+    get_installed_parallelcluster_version,
+    run_system_analyzer,
+    write_file,
+)
 
 # We collected OSU benchmarks results for c5n.18xlarge only.
 OSU_BENCHMARKS_INSTANCES = ["c5n.18xlarge"]
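Note: `write_file` is newly imported from `tests.common.utils`, but its implementation is not part of this diff. A minimal sketch of what such a helper could look like, assuming it creates the destination directory and overwrites any existing file (the actual helper in tests/common/utils.py may differ):

```python
# Hypothetical sketch of the write_file helper imported above; the real
# implementation lives in tests/common/utils.py and is not shown in this diff.
import os


def write_file(dirname, filename, content):
    """Write content to dirname/filename, creating dirname if needed."""
    os.makedirs(dirname, exist_ok=True)  # assumption: the helper creates the directory
    with open(os.path.join(dirname, filename), "w", encoding="utf-8") as f:
        f.write(content)
```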
@@ -61,6 +66,8 @@ def test_osu(
 
     benchmark_failures = []
 
+    output_dir = request.config.getoption("output_dir")
+
     # Run OSU benchmarks in efa-enabled queue.
     for mpi_version in mpi_variants:
         benchmark_failures.extend(
@@ -69,6 +76,7 @@ def test_osu(
                 remote_command_executor,
                 scheduler_commands,
                 test_datadir,
+                output_dir,
                 os,
                 instance,
                 slots_per_instance,
@@ -81,6 +89,7 @@ def test_osu(
                 remote_command_executor,
                 scheduler_commands,
                 test_datadir,
+                output_dir,
                 os,
                 instance,
                 num_instances=32,
@@ -108,6 +117,7 @@ def _test_osu_benchmarks_pt2pt(
     remote_command_executor,
     scheduler_commands,
     test_datadir,
+    output_dir,
     os,
     instance,
     slots_per_instance,
@@ -120,10 +130,11 @@ def _test_osu_benchmarks_pt2pt(
     accepted_number_of_failures = 4
 
     failed_benchmarks = []
+    benchmark_group = "pt2pt"
     for benchmark_name in ["osu_latency", "osu_bibw"]:
         _, output = run_individual_osu_benchmark(
             mpi_version,
-            "pt2pt",
+            benchmark_group,
             benchmark_name,
             partition,
             remote_command_executor,
@@ -132,7 +143,9 @@ def _test_osu_benchmarks_pt2pt(
             slots_per_instance,
             test_datadir,
         )
-        failures = _check_osu_benchmarks_results(test_datadir, os, instance, mpi_version, benchmark_name, output)
+        failures = _check_osu_benchmarks_results(
+            test_datadir, output_dir, os, instance, mpi_version, benchmark_name, output
+        )
         if failures > accepted_number_of_failures:
             failed_benchmarks.append(f"{mpi_version}-{benchmark_name}")
 
@@ -144,6 +157,7 @@ def _test_osu_benchmarks_collective(
     remote_command_executor,
     scheduler_commands,
     test_datadir,
+    output_dir,
     os,
     instance,
     num_instances,
@@ -154,10 +168,11 @@ def _test_osu_benchmarks_collective(
     accepted_number_of_failures = 3
 
     failed_benchmarks = []
+    benchmark_group = "collective"
     for benchmark_name in ["osu_allgather", "osu_bcast", "osu_allreduce", "osu_alltoall"]:
         _, output = run_individual_osu_benchmark(
             mpi_version,
-            "collective",
+            benchmark_group,
             benchmark_name,
             partition,
             remote_command_executor,
@@ -167,7 +182,9 @@ def _test_osu_benchmarks_collective(
             test_datadir,
             timeout=24,
         )
-        failures = _check_osu_benchmarks_results(test_datadir, os, instance, mpi_version, benchmark_name, output)
+        failures = _check_osu_benchmarks_results(
+            test_datadir, output_dir, os, instance, mpi_version, benchmark_name, output
+        )
         if failures > accepted_number_of_failures:
             failed_benchmarks.append(f"{mpi_version}-{benchmark_name}")
 
@@ -213,12 +230,13 @@ def _test_osu_benchmarks_multiple_bandwidth(
     assert_that(float(max_bandwidth)).is_greater_than(expected_bandwidth)
 
 
-def _check_osu_benchmarks_results(test_datadir, os, instance, mpi_version, benchmark_name, output):
+def _check_osu_benchmarks_results(test_datadir, output_dir, os, instance, mpi_version, benchmark_name, output):
     logging.info(output)
     # Check avg latency for all packet sizes
     failures = 0
     metric_data = []
     metric_namespace = "ParallelCluster/test_efa"
+    evaluation_output = ""
     for packet_size, value in re.findall(r"(\d+)\s+(\d+)\.", output):
         with open(
             str(test_datadir / "osu_benchmarks" / "results" / os / instance / mpi_version / benchmark_name),
@@ -236,11 +254,17 @@ def _check_osu_benchmarks_results(test_datadir, os, instance, mpi_version, bench
 
             is_failure = int(value) > tolerated_value
 
+            percentage_diff = (float(value) - float(tolerated_value)) / float(tolerated_value) * 100
+
+            outcome = "DEGRADATION" if percentage_diff > 0 else "IMPROVEMENT"
+
             message = (
-                f"{mpi_version} - {benchmark_name} - packet size {packet_size}: "
-                f"tolerated: {tolerated_value}, current: {value}"
+                f"{outcome}: {mpi_version} - {benchmark_name} - packet size {packet_size}: "
+                f"tolerated: {tolerated_value}, current: {value}, percentage_diff: {percentage_diff}%"
             )
 
+            evaluation_output += f"\n{message}"
+
             dimensions = {
                 "PclusterVersion": get_installed_parallelcluster_version(),
                 "MpiVariant": mpi_version,
@@ -263,6 +287,11 @@ def _check_osu_benchmarks_results(test_datadir, os, instance, mpi_version, bench
                 logging.error(message)
             else:
                 logging.info(message)
+    write_file(
+        dirname=f"{output_dir}/osu-results",
+        filename=f"{os}-{instance}-{mpi_version}-{benchmark_name}-evaluation.out",
+        content=evaluation_output,
+    )
     boto3.client("cloudwatch").put_metric_data(Namespace=metric_namespace, MetricData=metric_data)
 
     return failures
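To make the new reporting concrete: with a hypothetical measured value of 120 against a tolerated baseline of 100, the added logic computes a +20% deviation and labels it a DEGRADATION; each such message is accumulated in `evaluation_output` and finally written to `<output_dir>/osu-results/<os>-<instance>-<mpi_version>-<benchmark_name>-evaluation.out`:

```python
# Worked example of the percentage_diff / outcome logic added in this diff,
# using made-up numbers (value=120, tolerated_value=100).
value, tolerated_value = "120", "100"

percentage_diff = (float(value) - float(tolerated_value)) / float(tolerated_value) * 100
outcome = "DEGRADATION" if percentage_diff > 0 else "IMPROVEMENT"

print(outcome, percentage_diff)  # -> DEGRADATION 20.0
```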