@@ -36,8 +36,10 @@ func newSilent() *cobra.Command {
36
36
}
37
37
38
38
var (
39
- publisherMessageCount int32
40
- consumerMessageCount int32
39
+ publisherMessageCount int32
40
+ consumerMessageCount int32
41
+ //consumerMessageCountPerLatency int32
42
+ totalLatency int64
41
43
confirmedMessageCount int32
42
44
notConfirmedMessageCount int32
43
45
consumersCloseCount int32
@@ -77,9 +79,15 @@ func printStats() {
77
79
78
80
PMessagesPerSecond := float64 (atomic .LoadInt32 (& publisherMessageCount )) / float64 (v ) * 1000
79
81
CMessagesPerSecond := float64 (atomic .LoadInt32 (& consumerMessageCount )) / float64 (v ) * 1000
82
+ //latency := float64(totalLatency) / float64(atomic.LoadInt32(&consumerMessageCount))
83
+ averageLatency := int64 (0 )
84
+ if atomic .LoadInt32 (& consumerMessageCount ) > 0 {
85
+ averageLatency = totalLatency / int64 (atomic .LoadInt32 (& consumerMessageCount ))
86
+ }
87
+
80
88
ConfirmedMessagesPerSecond := float64 (atomic .LoadInt32 (& confirmedMessageCount )) / float64 (v ) * 1000
81
- logInfo ("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v |" ,
82
- PMessagesPerSecond , ConfirmedMessagesPerSecond , CMessagesPerSecond , decodeRate (), decodeBody (), atomic .LoadInt64 (& messagesSent ))
89
+ logInfo ("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms " ,
90
+ PMessagesPerSecond , ConfirmedMessagesPerSecond , CMessagesPerSecond , decodeRate (), decodeBody (), atomic .LoadInt64 (& messagesSent ), averageLatency )
83
91
}
84
92
}
85
93
@@ -273,28 +281,9 @@ func startPublisher(streamName string) error {
273
281
return err
274
282
}
275
283
276
- var arr []message.StreamMessage
277
- var body []byte
278
- for z := 0 ; z < batchSize ; z ++ {
279
-
280
- if fixedBody > 0 {
281
- body = make ([]byte , fixedBody )
282
- } else {
283
- if variableBody > 0 {
284
- rand .Seed (time .Now ().UnixNano ())
285
- body = make ([]byte , rand .Intn (variableBody ))
286
- }
287
- }
288
- n := time .Now ().UnixNano ()
289
- var buff = make ([]byte , 8 )
290
- binary .BigEndian .PutUint64 (buff , uint64 (n ))
291
- /// added to calculate the latency
292
- msg := amqp .NewMessage (append (buff , body ... ))
293
- arr = append (arr , msg )
294
- }
295
-
296
- go func (prod * ha.ReliableProducer , messages []message.StreamMessage ) {
284
+ go func (prod * ha.ReliableProducer ) {
297
285
for {
286
+
298
287
if rate > 0 {
299
288
rateWithBatchSize := float64 (rate ) / float64 (batchSize )
300
289
sleepAfterMessage := float64 (time .Second ) / rateWithBatchSize
@@ -313,21 +302,50 @@ func startPublisher(streamName string) error {
313
302
}
314
303
time .Sleep (time .Duration (sleep ) * time .Millisecond )
315
304
}
305
+ messages := buildMessages ()
316
306
317
- atomic .AddInt64 (& messagesSent , int64 (len (arr )))
318
- for _ , streamMessage := range arr {
319
- err = prod .Send ( streamMessage )
307
+ atomic .AddInt64 (& messagesSent , int64 (len (messages )))
308
+ if isBatchSend {
309
+ err = prod .BatchSend ( messages )
320
310
checkErr (err )
311
+ } else {
312
+ for _ , streamMessage := range messages {
313
+ err = prod .Send (streamMessage )
314
+ checkErr (err )
315
+ }
321
316
}
322
- atomic .AddInt32 (& publisherMessageCount , int32 (len (arr )))
317
+
318
+ atomic .AddInt32 (& publisherMessageCount , int32 (len (messages )))
323
319
324
320
}
325
- }(rPublisher , arr )
321
+ }(rPublisher )
326
322
327
323
return nil
328
324
329
325
}
330
326
327
+ func buildMessages () []message.StreamMessage {
328
+ var arr []message.StreamMessage
329
+ for z := 0 ; z < batchSize ; z ++ {
330
+ //var body []byte
331
+ if fixedBody > 0 {
332
+ // body = make([]byte, fixedBody)
333
+ } else {
334
+ if variableBody > 0 {
335
+ rand .Seed (time .Now ().UnixNano ())
336
+ // body = make([]byte, rand.Intn(variableBody))
337
+ }
338
+ }
339
+ var buff = make ([]byte , 8 )
340
+ sentTime := time .Now ().UnixMilli ()
341
+ binary .BigEndian .PutUint64 (buff , uint64 (sentTime ))
342
+ /// added to calculate the latency
343
+ msg := amqp .NewMessage (buff )
344
+ arr = append (arr , msg )
345
+ }
346
+ return arr
347
+ }
348
+
331
349
func startPublishers () error {
332
350
333
351
logInfo ("Starting %d publishers..." , publishers )
@@ -362,8 +380,12 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
362
380
func startConsumer (consumerName string , streamName string ) error {
363
381
364
382
handleMessages := func (consumerContext stream.ConsumerContext , message * amqp.Message ) {
365
- atomic .AddInt32 (& consumerMessageCount , 1 )
366
383
384
+ sentTime := binary .BigEndian .Uint64 (message .GetData ()[:8 ]) // Decode the timestamp
385
+ startTimeFromMessage := time .UnixMilli (int64 (sentTime ))
386
+ latency := time .Now ().Sub (startTimeFromMessage ).Milliseconds ()
387
+ totalLatency += latency
388
+ atomic .AddInt32 (& consumerMessageCount , 1 )
367
389
}
368
390
offsetSpec := stream.OffsetSpecification {}.Last ()
369
391
switch consumerOffset {
0 commit comments