5
5
. "github.com/onsi/ginkgo/v2"
6
6
. "github.com/onsi/gomega"
7
7
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8
+ "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
8
9
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
9
10
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper"
10
11
"math/rand"
@@ -158,6 +159,8 @@ var _ = Describe("Super Stream Producer", Label("super-stream"), func() {
158
159
}
159
160
mutex .Lock ()
160
161
msgReceived [superStreamPublishConfirm .Partition ] = len (superStreamPublishConfirm .ConfirmationStatus )
162
+ logs .LogInfo ("Partition %s confirmed %d messages, total %d" ,
163
+ superStreamPublishConfirm .Partition , len (superStreamPublishConfirm .ConfirmationStatus ), msgReceived [superStreamPublishConfirm .Partition ])
161
164
mutex .Unlock ()
162
165
}
163
166
@@ -168,23 +171,26 @@ var _ = Describe("Super Stream Producer", Label("super-stream"), func() {
168
171
msg .ApplicationProperties = map [string ]interface {}{"routingKey" : fmt .Sprintf ("hello%d" , i )}
169
172
Expect (superProducer .Send (msg )).NotTo (HaveOccurred ())
170
173
}
171
-
174
+ time . Sleep ( 1 * time . Second )
172
175
// these values are the same for .NET,Python,Java stream clients
173
176
// The aim for this test is to validate the correct routing with the
174
177
// MurmurStrategy.
175
178
Eventually (func () int {
176
179
mutex .Lock ()
177
180
defer mutex .Unlock ()
181
+ logs .LogInfo ("Partition 0 confirmed %d messages" , msgReceived [fmt .Sprintf ("%s-%s" , superStream , "0" )])
178
182
return msgReceived [fmt .Sprintf ("%s-%s" , superStream , "0" )]
179
183
}).WithPolling (300 * time .Millisecond ).WithTimeout (2 * time .Second ).Should (Equal (9 ))
180
184
Eventually (func () int {
181
185
mutex .Lock ()
182
186
defer mutex .Unlock ()
187
+ logs .LogInfo ("Partition 1 confirmed %d messages" , msgReceived [fmt .Sprintf ("%s-%s" , superStream , "1" )])
183
188
return msgReceived [fmt .Sprintf ("%s-%s" , superStream , "1" )]
184
189
}).WithPolling (300 * time .Millisecond ).WithTimeout (2 * time .Second ).Should (Equal (7 ))
185
190
Eventually (func () int {
186
191
mutex .Lock ()
187
192
defer mutex .Unlock ()
193
+ logs .LogInfo ("Partition 2 confirmed %d messages" , msgReceived [fmt .Sprintf ("%s-%s" , superStream , "2" )])
188
194
return msgReceived [fmt .Sprintf ("%s-%s" , superStream , "2" )]
189
195
}).WithPolling (300 * time .Millisecond ).WithTimeout (2 * time .Second ).Should (Equal (4 ))
190
196
0 commit comments