4
4
"bufio"
5
5
"crypto/tls"
6
6
"fmt"
7
+ "github.com/google/uuid"
7
8
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8
9
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
9
10
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
@@ -47,18 +48,13 @@ func main() {
47
48
fmt .Println ("Getting started with Streaming TLS client for RabbitMQ" )
48
49
fmt .Println ("Connecting to RabbitMQ streaming ..." )
49
50
50
- addressResolver := stream.AddressResolver {
51
- Host : "35.234.132.231" ,
52
- Port : 5551 ,
53
- }
54
51
// Connect to the broker ( or brokers )
55
52
env , err := stream .NewEnvironment (
56
53
stream .NewEnvironmentOptions ().
57
- SetAddressResolver (addressResolver ).
58
- SetPort (addressResolver .Port ). // standard TLS port
59
- SetHost (addressResolver .Host ).
60
- SetUser ("remote" ).
61
- SetPassword ("remote" ).
54
+ SetHost ("localhost" ).
55
+ SetPort (5551 ). // standard TLS port
56
+ SetUser ("guest" ).
57
+ SetPassword ("guest" ).
62
58
IsTLS (true ).
63
59
// use tls.Config to customize the TLS configuration
64
60
// for tests you may need InsecureSkipVerify: true
@@ -77,12 +73,12 @@ func main() {
77
73
// err = env.DeclareStream(streamName, nil)
78
74
// it is the best practise to define a size, 1GB for example:
79
75
80
- streamName := "perf-test-go"
81
- // err = env.DeclareStream(streamName,
82
- // &stream.StreamOptions{
83
- // MaxLengthBytes: stream.ByteCapacity{}.GB(2),
84
- // },
85
- // )
76
+ streamName := uuid . New (). String ()
77
+ err = env .DeclareStream (streamName ,
78
+ & stream.StreamOptions {
79
+ MaxLengthBytes : stream.ByteCapacity {}.GB (2 ),
80
+ },
81
+ )
86
82
87
83
CheckErr (err )
88
84
@@ -96,7 +92,7 @@ func main() {
96
92
97
93
// the send method automatically aggregates the messages
98
94
// based on batch size
99
- for i := 0 ; i < 10 ; i ++ {
95
+ for i := 0 ; i < 1000 ; i ++ {
100
96
err := producer .Send (amqp .NewMessage ([]byte ("hello_world_" + strconv .Itoa (i ))))
101
97
CheckErr (err )
102
98
}
@@ -111,24 +107,15 @@ func main() {
111
107
//
112
108
//}, nil)
113
109
// if you need to track the offset you need a consumer name like:
114
- consumed := 0
115
110
handleMessages := func (consumerContext stream.ConsumerContext , message * amqp.Message ) {
116
-
117
- consumed ++
118
- if consumed % 1000 == 0 {
119
-
120
- fmt .Printf ("name: %s, offset %d, chunk entities count: %d, total: %d \n " ,
121
- consumerContext .Consumer .GetName (), consumerContext .Consumer .GetOffset (), consumerContext .GetEntriesCount (), consumed )
122
-
123
- }
124
-
111
+ fmt .Printf ("consumer name: %s, text: %s \n " , consumerContext .Consumer .GetName (), message .Data )
125
112
}
126
113
127
114
consumer , err := env .NewConsumer (
128
115
streamName ,
129
116
handleMessages ,
130
117
stream .NewConsumerOptions ().
131
- SetConsumerName ("my_consumer" ). // set a consumer name
118
+ SetConsumerName ("my_consumer" ). // set a consumer name
132
119
SetOffset (stream.OffsetSpecification {}.First ())) // start consuming from the beginning
133
120
CheckErr (err )
134
121
channelClose := consumer .NotifyClose ()
@@ -141,7 +128,7 @@ func main() {
141
128
err = consumer .Close ()
142
129
time .Sleep (200 * time .Millisecond )
143
130
CheckErr (err )
144
- // err = env.DeleteStream(streamName)
131
+ err = env .DeleteStream (streamName )
145
132
CheckErr (err )
146
133
err = env .Close ()
147
134
CheckErr (err )
0 commit comments