From 49ac9e93ffa256cb06011798b2ba8c53e3c574a4 Mon Sep 17 00:00:00 2001 From: yurahaid Date: Mon, 26 May 2025 18:16:57 +0300 Subject: [PATCH 1/2] Create test to catch a data race if the producer stopped quickly after start --- pkg/stream/super_stream_producer_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index d6052923..a3cd4699 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -532,5 +532,27 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) Expect(env.Close()).NotTo(HaveOccurred()) }) + It("should detect potential data races when sending concurrently", func() { + env, err := NewEnvironment(nil) + Expect(err).NotTo(HaveOccurred()) + var superStream = fmt.Sprintf("race-super-stream-%d", time.Now().Unix()) + Expect(env.DeclareSuperStream(superStream, NewPartitionsOptions(10))).NotTo(HaveOccurred()) + + superProducer, err := env.NewSuperStreamProducer(superStream, NewSuperStreamProducerOptions( + NewHashRoutingStrategy(func(message message.StreamMessage) string { + return message.GetApplicationProperties()["routingKey"].(string) + }), + )) + Expect(err).NotTo(HaveOccurred()) + + // example error handling from producer on the client side + go func() { + for _ = range superProducer.NotifyPartitionClose(1) { + // handler errors on client side + } + }() + Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) From 191b7c7f56489fcb3f3a13a2d4408837f2f931c8 Mon Sep 17 00:00:00 2001 From: yurahaid Date: Mon, 26 May 2025 19:23:58 +0300 Subject: [PATCH 2/2] Fix: prevent data race when publisher connection is closed shortly after start --- pkg/stream/super_stream_producer.go | 8 +++++--- pkg/stream/super_stream_producer_test.go | 4 +--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/stream/super_stream_producer.go b/pkg/stream/super_stream_producer.go index abbc33a8..7cff8739 100644 --- a/pkg/stream/super_stream_producer.go +++ b/pkg/stream/super_stream_producer.go @@ -279,21 +279,20 @@ func (s *SuperStreamProducer) ConnectPartition(partition string) error { event := <-_closedEvent s.mutex.Lock() + defer s.mutex.Unlock() for i := range s.activeProducers { if s.activeProducers[i].GetStreamName() == gpartion { s.activeProducers = append(s.activeProducers[:i], s.activeProducers[i+1:]...) break } } - s.mutex.Unlock() + if s.chSuperStreamPartitionClose != nil { - s.mutex.Lock() s.chSuperStreamPartitionClose <- PPartitionClose{ Partition: gpartion, Event: event, Context: s, } - s.mutex.Unlock() } logs.LogDebug("[SuperStreamProducer] chSuperStreamPartitionClose for partition: %s", gpartion) }(partition, closedEvent) @@ -327,6 +326,9 @@ func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan Partition // Event will give the reason of the close // size is the size of the channel func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose { + s.mutex.Lock() + defer s.mutex.Unlock() + ch := make(chan PPartitionClose, size) s.chSuperStreamPartitionClose = ch return ch diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index a3cd4699..0102a8cd 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -547,9 +547,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() // example error handling from producer on the client side go func() { - for _ = range superProducer.NotifyPartitionClose(1) { - // handler errors on client side - } + superProducer.NotifyPartitionClose(1) }() Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred())