@@ -76,18 +76,15 @@ func printStats() {
76
76
select {
77
77
case _ = <- ticker .C :
78
78
v := time .Now ().Sub (start ).Milliseconds ()
79
-
80
- PMessagesPerSecond := float64 (atomic .LoadInt32 (& publisherMessageCount )) / float64 (v ) * 1000
81
- CMessagesPerSecond := float64 (atomic .LoadInt32 (& consumerMessageCount )) / float64 (v ) * 1000
82
- //latency := float64(totalLatency) / float64(atomic.LoadInt32(&consumerMessageCount))
83
79
averageLatency := int64 (0 )
84
80
if atomic .LoadInt32 (& consumerMessageCount ) > 0 {
81
+ PMessagesPerSecond := float64 (atomic .LoadInt32 (& publisherMessageCount )) / float64 (v ) * 1000
82
+ CMessagesPerSecond := float64 (atomic .LoadInt32 (& consumerMessageCount )) / float64 (v ) * 1000
85
83
averageLatency = totalLatency / int64 (atomic .LoadInt32 (& consumerMessageCount ))
84
+ ConfirmedMessagesPerSecond := float64 (atomic .LoadInt32 (& confirmedMessageCount )) / float64 (v ) * 1000
85
+ logInfo ("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms" ,
86
+ PMessagesPerSecond , ConfirmedMessagesPerSecond , CMessagesPerSecond , decodeRate (), decodeBody (), atomic .LoadInt64 (& messagesSent ), averageLatency )
86
87
}
87
-
88
- ConfirmedMessagesPerSecond := float64 (atomic .LoadInt32 (& confirmedMessageCount )) / float64 (v ) * 1000
89
- logInfo ("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms" ,
90
- PMessagesPerSecond , ConfirmedMessagesPerSecond , CMessagesPerSecond , decodeRate (), decodeBody (), atomic .LoadInt64 (& messagesSent ), averageLatency )
91
88
}
92
89
}
93
90
@@ -100,6 +97,7 @@ func printStats() {
100
97
101
98
atomic .SwapInt32 (& publisherMessageCount , 0 )
102
99
atomic .SwapInt32 (& consumerMessageCount , 0 )
100
+ atomic .SwapInt64 (& totalLatency , 0 )
103
101
atomic .SwapInt32 (& confirmedMessageCount , 0 )
104
102
atomic .SwapInt32 (& notConfirmedMessageCount , 0 )
105
103
start = time .Now ()
@@ -159,8 +157,9 @@ func startSimulation() error {
159
157
err := initStreams ()
160
158
checkErr (err )
161
159
160
+ //
162
161
simulEnvironment , err = stream .NewEnvironment (stream .NewEnvironmentOptions ().
163
- SetUris (rabbitmqBrokerUrl ).
162
+ SetUri (rabbitmqBrokerUrl [ 0 ] ).
164
163
SetMaxProducersPerClient (publishersPerClient ).
165
164
SetMaxConsumersPerClient (consumersPerClient ))
166
165
checkErr (err )
0 commit comments