diff --git a/pkg/stream/super_stream_producer.go b/pkg/stream/super_stream_producer.go index abbc33a8..7cff8739 100644 --- a/pkg/stream/super_stream_producer.go +++ b/pkg/stream/super_stream_producer.go @@ -279,21 +279,20 @@ func (s *SuperStreamProducer) ConnectPartition(partition string) error { event := <-_closedEvent s.mutex.Lock() + defer s.mutex.Unlock() for i := range s.activeProducers { if s.activeProducers[i].GetStreamName() == gpartion { s.activeProducers = append(s.activeProducers[:i], s.activeProducers[i+1:]...) break } } - s.mutex.Unlock() + if s.chSuperStreamPartitionClose != nil { - s.mutex.Lock() s.chSuperStreamPartitionClose <- PPartitionClose{ Partition: gpartion, Event: event, Context: s, } - s.mutex.Unlock() } logs.LogDebug("[SuperStreamProducer] chSuperStreamPartitionClose for partition: %s", gpartion) }(partition, closedEvent) @@ -327,6 +326,9 @@ func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan Partition // Event will give the reason of the close // size is the size of the channel func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose { + s.mutex.Lock() + defer s.mutex.Unlock() + ch := make(chan PPartitionClose, size) s.chSuperStreamPartitionClose = ch return ch diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index d6052923..0102a8cd 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -532,5 +532,25 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) Expect(env.Close()).NotTo(HaveOccurred()) }) + It("should detect potential data races when sending concurrently", func() { + env, err := NewEnvironment(nil) + Expect(err).NotTo(HaveOccurred()) + var superStream = fmt.Sprintf("race-super-stream-%d", time.Now().Unix()) + Expect(env.DeclareSuperStream(superStream, NewPartitionsOptions(10))).NotTo(HaveOccurred()) + superProducer, err := env.NewSuperStreamProducer(superStream, NewSuperStreamProducerOptions( + NewHashRoutingStrategy(func(message message.StreamMessage) string { + return message.GetApplicationProperties()["routingKey"].(string) + }), + )) + Expect(err).NotTo(HaveOccurred()) + + // example error handling from producer on the client side + go func() { + superProducer.NotifyPartitionClose(1) + }() + + Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) })