From f705e1675d0514aa63f8a88be1ad5af69b218e49 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Fri, 19 Jul 2024 10:32:30 +0200 Subject: [PATCH 1/3] fix lock on concurrent writes while reconnecting --- pkg/ha/ha_publisher.go | 17 +++++----- pkg/ha/ha_publisher_test.go | 62 +++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index 620bd4e5..7e743d84 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -42,10 +42,9 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { p.setStatus(StatusClosed) } - select { - case p.reconnectionSignal <- struct{}{}: - case <-time.After(2 * time.Second): - } + p.reconnectionSignal.L.Lock() + p.reconnectionSignal.Broadcast() + p.reconnectionSignal.L.Unlock() } }() } @@ -53,7 +52,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { // ReliableProducer is a producer that can reconnect in case of connection problems // the function handlePublishConfirm is mandatory // in case of problems the messages have the message.Confirmed == false -// The function `send` is blocked during the reconnection +// The functions `Send` and `BatchSend` are blocked during the reconnection type ReliableProducer struct { env *stream.Environment producer *stream.Producer @@ -64,7 +63,7 @@ type ReliableProducer struct { mutex *sync.Mutex mutexStatus *sync.Mutex status int - reconnectionSignal chan struct{} + reconnectionSignal *sync.Cond } type ConfirmMessageHandler func(messageConfirm []*stream.ConfirmationStatus) @@ -81,7 +80,7 @@ func NewReliableProducer(env *stream.Environment, streamName string, mutex: &sync.Mutex{}, mutexStatus: &sync.Mutex{}, confirmMessageHandler: confirmMessageHandler, - reconnectionSignal: make(chan struct{}), + reconnectionSignal: sync.NewCond(&sync.Mutex{}), } if confirmMessageHandler == nil { return nil, fmt.Errorf("the confirmation message handler is 
mandatory") @@ -121,7 +120,9 @@ func (p *ReliableProducer) isReadyToSend() error { if p.GetStatus() == StatusReconnecting { logs.LogDebug("[Reliable] %s is reconnecting. The send is blocked", p.getInfo()) - <-p.reconnectionSignal + p.reconnectionSignal.L.Lock() + p.reconnectionSignal.Wait() + p.reconnectionSignal.L.Unlock() logs.LogDebug("[Reliable] %s reconnected. The send is unlocked", p.getInfo()) } diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go index d45e6ce0..009d1c66 100644 --- a/pkg/ha/ha_publisher_test.go +++ b/pkg/ha/ha_publisher_test.go @@ -123,6 +123,68 @@ var _ = Describe("Reliable Producer", func() { Expect(producer.Close()).NotTo(HaveOccurred()) }) + It("unblock all Reliable Producer sends while restarting with concurrent writes", func() { + signal := make(chan struct{}) + var confirmed int32 + clientProvidedName := uuid.New().String() + producer, err := NewReliableProducer(envForRProducer, + streamForRProducer, + NewProducerOptions(). + SetClientProvidedName(clientProvidedName), + func(messageConfirm []*ConfirmationStatus) { + for _, confirm := range messageConfirm { + Expect(confirm.IsConfirmed()).To(BeTrue()) + } + if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 2 { + signal <- struct{}{} + } + }) + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(1 * time.Second) + connectionToDrop := "" + Eventually(func() bool { + connections, err := test_helper.Connections("15672") + if err != nil { + return false + } + for _, connection := range connections { + if connection.ClientProperties.Connection_name == clientProvidedName { + connectionToDrop = connection.Name + return true + } + } + return false + }, time.Second*5). 
+ Should(BeTrue()) + + Expect(connectionToDrop).NotTo(BeEmpty()) + + // concurrent writes while reconnecting + sendMsg := func() { + msg := amqp.NewMessage([]byte("ha")) + batch := []message.StreamMessage{msg} + err := producer.BatchSend(batch) + Expect(err).NotTo(HaveOccurred()) + } + + // kill the connection + errDrop := test_helper.DropConnection(connectionToDrop, "15672") + Expect(errDrop).NotTo(HaveOccurred()) + + // wait for the producer to be in reconnecting state + Eventually(func() bool { + return producer.GetStatus() == StatusReconnecting + }, time.Second*5, time.Millisecond). + Should(BeTrue()) + + go sendMsg() + go sendMsg() + + <-signal + Expect(producer.Close()).NotTo(HaveOccurred()) + }) + It("Delete the stream should close the producer", func() { producer, err := NewReliableProducer(envForRProducer, streamForRProducer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) { From 9c2702313dfb6efe02e719c3605e3c794137b70a Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Fri, 19 Jul 2024 11:19:26 +0200 Subject: [PATCH 2/3] refactor test --- pkg/ha/ha_publisher_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go index 009d1c66..131aeb2e 100644 --- a/pkg/ha/ha_publisher_test.go +++ b/pkg/ha/ha_publisher_test.go @@ -124,6 +124,7 @@ var _ = Describe("Reliable Producer", func() { }) It("unblock all Reliable Producer sends while restarting with concurrent writes", func() { + const expectedMessages = 2 signal := make(chan struct{}) var confirmed int32 clientProvidedName := uuid.New().String() @@ -135,7 +136,7 @@ var _ = Describe("Reliable Producer", func() { for _, confirm := range messageConfirm { Expect(confirm.IsConfirmed()).To(BeTrue()) } - if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 2 { + if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == expectedMessages { signal <- struct{}{} } }) From 4229fe79a9c446a2dd8ac1d1b06ed39fbbe9a87a Mon 
Sep 17 00:00:00 2001 From: Alberto Moretti Date: Mon, 22 Jul 2024 12:28:09 +0200 Subject: [PATCH 3/3] example: update reliable client example --- examples/reliable/reliable_client.go | 57 +++++++++++++++------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/examples/reliable/reliable_client.go b/examples/reliable/reliable_client.go index d4a8db94..230431b4 100644 --- a/examples/reliable/reliable_client.go +++ b/examples/reliable/reliable_client.go @@ -4,22 +4,20 @@ import ( "bufio" "errors" "fmt" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "os" "sync" "sync/atomic" "time" -) -// The ha producer and consumer provide a way to auto-reconnect in case of connection problems - -import ( + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" ) +// The ha producer and consumer provide a way to auto-reconnect in case of connection problems + func CheckErr(err error) { if err != nil { fmt.Printf("%s ", err) @@ -37,6 +35,7 @@ func main() { // Tune the parameters to test the reliability const messagesToSend = 5_000_000 const numberOfProducers = 2 + const concurrentProducers = 2 const numberOfConsumers = 2 const sendDelay = 1 * time.Millisecond const delayEachMessages = 200 @@ -81,8 +80,9 @@ func main() { go func() { for isRunning { totalConfirmed := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail) - fmt.Printf("%s - ToSend: %d - nProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC822), - messagesToSend*numberOfProducers, numberOfProducers, numberOfConsumers) + expectedMessages := messagesToSend * 
numberOfProducers * concurrentProducers + fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC822), + expectedMessages, numberOfProducers, concurrentProducers, numberOfConsumers) fmt.Printf("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n", sent, atomic.LoadInt32(&reSent), atomic.LoadInt32(&confirmed), atomic.LoadInt32(&fail), totalConfirmed) fmt.Printf("Total Consumed: %d - Per consumer: %d \n", atomic.LoadInt32(&consumed), @@ -120,22 +120,27 @@ func main() { CheckErr(err) producers = append(producers, rProducer) go func() { - for i := 0; i < messagesToSend; i++ { - msg := amqp.NewMessage([]byte("ha")) - mutex.Lock() - for _, confirmedMessage := range unConfirmedMessages { - err := rProducer.Send(confirmedMessage) - atomic.AddInt32(&reSent, 1) - CheckErr(err) - } - unConfirmedMessages = []message.StreamMessage{} - mutex.Unlock() - err := rProducer.Send(msg) - if i%delayEachMessages == 0 { - time.Sleep(sendDelay) - } - atomic.AddInt32(&sent, 1) - CheckErr(err) + for i := 0; i < concurrentProducers; i++ { + go func() { + for i := 0; i < messagesToSend; i++ { + msg := amqp.NewMessage([]byte("ha")) + mutex.Lock() + for _, confirmedMessage := range unConfirmedMessages { + err := rProducer.Send(confirmedMessage) + atomic.AddInt32(&reSent, 1) + CheckErr(err) + } + unConfirmedMessages = []message.StreamMessage{} + mutex.Unlock() + err := rProducer.Send(msg) + if i%delayEachMessages == 0 { + time.Sleep(sendDelay) + } + atomic.AddInt32(&sent, 1) + CheckErr(err) + + } + }() } }() }