8
8
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
9
9
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
10
10
"os"
11
+ "strconv"
11
12
"sync/atomic"
12
13
"time"
13
14
)
@@ -34,9 +35,17 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
34
35
}
35
36
36
37
func main () {
37
- reader := bufio .NewReader (os .Stdin )
38
38
39
- fmt .Println ("RabbitMQ Sub Entry Batch example" )
39
+ useSyncBatch := os .Args [1 ] == "sync"
40
+ useAsyncSend := os .Args [1 ] == "async"
41
+
42
+ messagesToSend , err := strconv .Atoi (os .Args [2 ])
43
+ CheckErr (err )
44
+ batchSize , err := strconv .Atoi (os .Args [3 ])
45
+ messagesToSend = messagesToSend / batchSize
46
+
47
+ reader := bufio .NewReader (os .Stdin )
48
+ fmt .Println ("RabbitMQ performance example" )
40
49
41
50
// Connect to the broker ( or brokers )
42
51
env , err := stream .NewEnvironment (
@@ -46,7 +55,6 @@ func main() {
46
55
SetUser ("guest" ).
47
56
SetPassword ("guest" ))
48
57
CheckErr (err )
49
- fmt .Printf ("------------------------------------------\n \n " )
50
58
fmt .Println ("Connected to the RabbitMQ server" )
51
59
52
60
streamName := uuid .New ().String ()
@@ -56,38 +64,66 @@ func main() {
56
64
},
57
65
)
58
66
CheckErr (err )
59
- fmt .Printf ("------------------------------------------\n \n " )
60
67
fmt .Printf ("Created Stream: %s \n " , streamName )
61
68
62
- producer , err := env .NewProducer (streamName , stream .NewProducerOptions ().
63
- SetSubEntrySize (500 ).
64
- SetCompression (stream.Compression {}.None ()))
69
+ producer , err := env .NewProducer (streamName ,
70
+ stream .NewProducerOptions ().
71
+ SetBatchSize (batchSize ).
72
+ SetBatchPublishingDelay (100 ))
65
73
CheckErr (err )
66
74
67
- //optional publish confirmation channel
68
75
chPublishConfirm := producer .NotifyPublishConfirmation ()
69
76
handlePublishConfirm (chPublishConfirm )
70
- messagesToSend := 20_000
71
77
fmt .Printf ("------------------------------------------\n \n " )
72
- fmt .Printf ("Start sending %d messages, data size: %d bytes\n " , messagesToSend , len ("hello_world" ))
73
- batchSize := 100
74
- var arr []message.StreamMessage
75
- for i := 0 ; i < batchSize ; i ++ {
76
- arr = append (arr , amqp .NewMessage ([]byte ("hello_world" )))
78
+ fmt .Printf ("Start sending %d messages, data size: %d bytes\n " , messagesToSend * batchSize , len ("hello_world" ))
79
+ var averageLatency time.Duration
80
+ var messagesConsumed int32
81
+ handleMessages := func (consumerContext stream.ConsumerContext , message * amqp.Message ) {
82
+ atomic .AddInt32 (& messagesConsumed , 1 )
83
+ var latency time.Time
84
+ err := latency .UnmarshalBinary (message .Data [0 ])
85
+ CheckErr (err )
86
+ averageLatency += time .Since (latency )
77
87
}
88
+ _ , err = env .NewConsumer (streamName , handleMessages , stream .NewConsumerOptions ().SetOffset (stream.OffsetSpecification {}.First ()))
89
+ CheckErr (err )
78
90
79
91
start := time .Now ()
80
- for i := 0 ; i < messagesToSend ; i ++ {
81
- err := producer .BatchSend (arr )
82
- CheckErr (err )
92
+
93
+ // here the client sends the messages in batch and it is up to the user to aggregate the messages
94
+ if useSyncBatch {
95
+ var arr []message.StreamMessage
96
+ for i := 0 ; i < messagesToSend ; i ++ {
97
+ for i := 0 ; i < batchSize ; i ++ {
98
+ latency , err := time .Now ().MarshalBinary ()
99
+ CheckErr (err )
100
+ arr = append (arr , amqp .NewMessage (latency ))
101
+ }
102
+ err := producer .BatchSend (arr )
103
+ CheckErr (err )
104
+ arr = arr [:0 ]
105
+ }
106
+ }
107
+
108
+ // here the client aggregates the messages based on the batch size and batch publishing delay
109
+ if useAsyncSend {
110
+ for i := 0 ; i < messagesToSend ; i ++ {
111
+ for i := 0 ; i < batchSize ; i ++ {
112
+ latency , err := time .Now ().MarshalBinary ()
113
+ CheckErr (err )
114
+ err = producer .Send (amqp .NewMessage (latency ))
115
+ CheckErr (err )
116
+ }
117
+ }
83
118
}
119
+
84
120
duration := time .Since (start )
121
+ fmt .Println ("Press any key to report and stop " )
122
+ _ , _ = reader .ReadString ('\n' )
85
123
fmt .Printf ("------------------------------------------\n \n " )
86
- fmt .Printf ("Sent %d messages in %s \n " , messagesToSend * 100 , duration )
124
+ fmt .Printf ("Sent %d messages in %s. Confirmed: %d avarage latency: %s \n " , messagesToSend * batchSize , duration , messagesConfirmed , averageLatency / time . Duration ( messagesConsumed ) )
87
125
fmt .Printf ("------------------------------------------\n \n " )
88
126
89
- fmt .Println ("Press any key to stop " )
90
- _ , _ = reader .ReadString ('\n' )
91
127
time .Sleep (200 * time .Millisecond )
92
128
CheckErr (err )
93
129
err = env .DeleteStream (streamName )
0 commit comments