From fc2276ffdebae4cd182874112471cf0d696561c2 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Thu, 18 Jul 2024 23:28:52 +0200 Subject: [PATCH] add BatchSend to HA Producer --- pkg/ha/ha_publisher.go | 43 ++++++++++++++++++++++++++++--------- pkg/ha/ha_publisher_test.go | 19 ++++++++++++---- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index 6f20e7e0..620bd4e5 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -3,13 +3,14 @@ package ha import ( "errors" "fmt" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "strings" "sync" "sync/atomic" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" ) func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishConfirm) { @@ -109,12 +110,13 @@ func (p *ReliableProducer) newProducer() error { return err } -func (p *ReliableProducer) Send(message message.StreamMessage) error { +func (p *ReliableProducer) isReadyToSend() error { if p.GetStatus() == StatusStreamDoesNotExist { return stream.StreamDoesNotExist } + if p.GetStatus() == StatusClosed { - return errors.New(fmt.Sprintf("%s is closed", p.getInfo())) + return fmt.Errorf("%s is closed", p.getInfo()) } if p.GetStatus() == StatusReconnecting { @@ -123,11 +125,10 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { logs.LogDebug("[Reliable] %s reconnected. The send is unlocked", p.getInfo()) } - p.mutex.Lock() - defer p.mutex.Unlock() - - errW := p.producer.Send(message) + return nil +} +func (p *ReliableProducer) checkWriteError(errW error) error { if errW != nil { switch { case errors.Is(errW, stream.FrameTooLarge): @@ -138,10 +139,32 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { time.Sleep(500 * time.Millisecond) logs.LogError("[Reliable] %s - error during send %s", p.getInfo(), errW.Error()) } + } + return nil +} +func (p *ReliableProducer) Send(message message.StreamMessage) error { + if err := p.isReadyToSend(); err != nil { + return err } - return nil + p.mutex.Lock() + errW := p.producer.Send(message) + p.mutex.Unlock() + + return p.checkWriteError(errW) +} + +func (p *ReliableProducer) BatchSend(batchMessages []message.StreamMessage) error { + if err := p.isReadyToSend(); err != nil { + return err + } + + p.mutex.Lock() + errW := p.producer.BatchSend(batchMessages) + p.mutex.Unlock() + + return p.checkWriteError(errW) } func (p *ReliableProducer) IsOpen() bool { diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go index 012043c4..d45e6ce0 100644 --- a/pkg/ha/ha_publisher_test.go +++ b/pkg/ha/ha_publisher_test.go @@ -1,14 +1,16 @@ package ha import ( + "sync/atomic" + "time" + "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" . "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper" - "sync/atomic" - "time" + test_helper "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper" ) var _ = Describe("Reliable Producer", func() { @@ -44,6 +46,7 @@ var _ = Describe("Reliable Producer", func() { }) It("Create/Confirm and close a Reliable Producer", func() { + const expectedMessages = 20 signal := make(chan struct{}) var confirmed int32 producer, err := NewReliableProducer(envForRProducer, @@ -51,7 +54,7 @@ var _ = Describe("Reliable Producer", func() { for _, confirm := range messageConfirm { Expect(confirm.IsConfirmed()).To(BeTrue()) } - if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 { + if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == expectedMessages { signal <- struct{}{} } }) @@ -61,6 +64,14 @@ var _ = Describe("Reliable Producer", func() { err := producer.Send(msg) Expect(err).NotTo(HaveOccurred()) } + + for i := 0; i < 5; i++ { + msg1 := amqp.NewMessage([]byte("ha_batch_1")) + msg2 := amqp.NewMessage([]byte("ha_batch_2")) + batch := []message.StreamMessage{msg1, msg2} + err := producer.BatchSend(batch) + Expect(err).NotTo(HaveOccurred()) + } <-signal Expect(producer.Close()).NotTo(HaveOccurred()) })