@@ -222,11 +222,18 @@ def parse_consumer(line):
222
222
223
223
line = line .strip ()
224
224
toks = line .split ("," )
225
- if len (toks ) >= 4 and toks [1 ].strip () == '0' :
226
- toks = line .split ("," )
225
+ if len (toks ) == 4 and "records received" in toks [0 ]:
226
+ r_t = toks [1 ].strip ().split (" " )
227
+ rec_per_s = float (r_t [0 ].strip ())
228
+ thr_per_s = float (r_t [2 ].strip ('(' ))
229
+
230
+ avg_lat = float (toks [2 ].strip ().split (' ' )[0 ].strip ())
231
+ max_lat = float (toks [3 ].strip ().split (' ' )[0 ].strip ())
232
+
233
+ aggr_records = int (toks [0 ].strip ().split (' ' )[0 ])
234
+ elif len (toks ) >= 4 and toks [1 ].strip () == '0' :
227
235
rec_per_s = float (toks [5 ].strip ())
228
236
thr_per_s = float (toks [3 ].strip ())
229
- thr_unit = "MB/s"
230
237
231
238
avg_lat = float (toks [- 1 ].strip ())
232
239
max_lat = float (toks [- 2 ].strip ())
@@ -383,6 +390,10 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg
383
390
out_f = open (args .output_file , "w+" , encoding = "UTF8" )
384
391
out_writer = csv .writer (out_f )
385
392
393
+ hist_aggr_records = {}
394
+ for host in hosts :
395
+ hist_aggr_records [host ] = [0 ] * threads
396
+
386
397
while True :
387
398
completed = 0
388
399
@@ -398,9 +409,11 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg
398
409
399
410
res_count = 0
400
411
for host , outs_per_host in outs .items ():
412
+ i = 0
401
413
for out in outs_per_host :
402
414
try :
403
415
line = next (out )
416
+ # print("{}:{} {}".format(host, res_count, line))
404
417
if args .type == "producer" :
405
418
rec_per_s , thr_per_s , thr_unit , avg_lat , max_lat , aggr_records , throughput_limit = parse_producer (line )
406
419
total_throughput_limit += throughput_limit
@@ -413,18 +426,25 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg
413
426
total_avg_lat = total_avg_lat + avg_lat * aggr_records
414
427
total_max_lat = max (total_max_lat , max_lat )
415
428
total_aggr_records = total_aggr_records + aggr_records
429
+
430
+ hist_aggr_records [host ][i ] = aggr_records
431
+
416
432
# print("{}:{} {}".format(host, res_count, throughput_limit))
417
- # print("{}:{} {}".format(host, res_count, line))
418
433
# print(rec_per_s, thr_per_s, thr_unit, avg_lat, max_lat, aggr_records, total_rec_per_s, total_thr_per_s, total_avg_lat / total_aggr_records, total_max_lat, total_aggr_records)
419
434
except StopIteration :
420
435
completed += 1
436
+ if args .type == "consumer" :
437
+ total_aggr_records = total_aggr_records + hist_aggr_records [host ][i ]
438
+
421
439
if args .wait_for_all and completed >= total_clients :
422
440
print ("All " + str (completed ) + " clients completed" )
423
441
break
424
442
elif not args .wait_for_all :
425
443
print ("{} clients completed. Kill the {} other clients" .format (completed , (total_clients - completed )))
426
444
kill_clients (hosts , typ = args .type )
427
445
break
446
+
447
+ i += 1
428
448
else :
429
449
continue
430
450
break
0 commit comments