diff --git a/README.md b/README.md index 2a180a80..7a010f34 100644 --- a/README.md +++ b/README.md @@ -221,7 +221,7 @@ Use `StreamExists` to check if a stream exists. ### Streams Statistics -To get stream statistics you need to use the the `environment.StreamStats` method. +To get stream statistics you need to use the `environment.StreamStats` method. ```golang stats, err := environment.StreamStats(testStreamName) diff --git a/pkg/stream/super_stream_consumer.go b/pkg/stream/super_stream_consumer.go index 7f3e54f3..4d93f16f 100644 --- a/pkg/stream/super_stream_consumer.go +++ b/pkg/stream/super_stream_consumer.go @@ -14,6 +14,7 @@ type SuperStreamConsumerOptions struct { Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer ConsumerName string + AutoCommitStrategy *AutoCommitStrategy } func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions { @@ -47,6 +48,11 @@ func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *Super return s } +func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions { + s.AutoCommitStrategy = autoCommitStrategy + return s +} + // CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed // The user can use the NotifyPartitionClose to get the channel type CPartitionClose struct { @@ -161,6 +167,9 @@ func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSp } options = options.SetFilter(s.SuperStreamConsumerOptions.Filter) + if s.SuperStreamConsumerOptions.AutoCommitStrategy != nil { + options = options.SetAutoCommit(s.SuperStreamConsumerOptions.AutoCommitStrategy) + } if s.SuperStreamConsumerOptions.SingleActiveConsumer != nil { // mandatory to enable the super stream consumer diff --git a/pkg/stream/super_stream_consumer_test.go b/pkg/stream/super_stream_consumer_test.go index 2c04e131..fdb5045d 100644 --- a/pkg/stream/super_stream_consumer_test.go +++ b/pkg/stream/super_stream_consumer_test.go @@ -7,6 +7,7 @@ import ( "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" test_helper "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper" + "strconv" "sync" "sync/atomic" "time" @@ -509,4 +510,72 @@ var _ = Describe("Super Stream Producer", Label("super-stream-consumer"), func() Expect(env.Close()).NotTo(HaveOccurred()) }) + It("Super Stream Consumer AutoCommit", func() { + // test the auto commit + env, err := NewEnvironment(nil) + Expect(err).NotTo(HaveOccurred()) + + superStream := "super-stream-consumer-with-autocommit" + Expect(env.DeclareSuperStream(superStream, + NewPartitionsOptions(2))).NotTo(HaveOccurred()) + + superProducer, err := env.NewSuperStreamProducer(superStream, NewSuperStreamProducerOptions( + NewHashRoutingStrategy(func(message message.StreamMessage) string { + return message.GetMessageProperties().GroupID + }))) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < 20; i++ { + msg := amqp.NewMessage(make([]byte, 0)) + msg.Properties = &amqp.MessageProperties{ + GroupID: strconv.Itoa(i % 2), + } + Expect(superProducer.Send(msg)).NotTo(HaveOccurred()) + } + + var receivedMessages int32 + handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&receivedMessages, 1) + } + + superStreamConsumer, err := env.NewSuperStreamConsumer(superStream, handleMessages, + NewSuperStreamConsumerOptions(). + SetOffset(OffsetSpecification{}.First()). + SetConsumerName("auto-commit-consumer"). + // the setting is to trigger the auto commit based on the message count + // the consumer will commit the offset after 9 messages + SetAutoCommit(&AutoCommitStrategy{ + messageCountBeforeStorage: 9, + // flushInterval is set to 50 seconds. So it will be ignored + // messageCountBeforeStorage will be triggered first + flushInterval: 50 * time.Second, + })) + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(1 * time.Second) + Eventually(func() int32 { + return atomic.LoadInt32(&receivedMessages) + }). + WithPolling(300 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(int32(20))) + + // Given the partition routing strategy, the consumer will receive 10 messages from each partition + // the consumer triggers the auto-commit after 9 messages. + // So the query offset should return 8 for each partition + offset0, err := env.QueryOffset("auto-commit-consumer", fmt.Sprintf("%s-0", superStream)) + Expect(err).NotTo(HaveOccurred()) + offset1, err := env.QueryOffset("auto-commit-consumer", fmt.Sprintf("%s-1", superStream)) + Expect(err).NotTo(HaveOccurred()) + + Expect(offset0).NotTo(BeNil()) + Expect(offset0).To(Equal(int64(8))) + + Expect(offset1).NotTo(BeNil()) + Expect(offset1).To(Equal(int64(8))) + + Expect(superProducer.Close()).NotTo(HaveOccurred()) + Expect(superStreamConsumer.Close()).NotTo(HaveOccurred()) + Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) + })