diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index deaf2601..90820e09 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -518,7 +518,11 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() go func() { client, ok := env.producers.getCoordinators()["localhost:5552"].clientsPerContext.Load(1) Expect(ok).To(BeTrue()) + // https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/406 + // we use internal mutex, we can safely call maybeCleanProducers + env.producers.mutex.Lock() client.(*Client).maybeCleanProducers(partitionToClose) + env.producers.mutex.Unlock() }() // Wait for the partition close event