diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 68d68387..c0eeef10 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -53,8 +53,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { if len(options.ConnectionParameters) == 0 { options.ConnectionParameters = []*Broker{newBrokerDefault()} } - - for _, parameter := range options.ConnectionParameters { + var connectionError error + for idx, parameter := range options.ConnectionParameters { if parameter.Uri != "" { u, err := url.Parse(parameter.Uri) @@ -78,13 +78,27 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { parameter.mergeWithDefault() client.broker = parameter + + connectionError = client.connect() + if connectionError == nil { + break + } else { + nextIfThereIs := "" + if idx < len(options.ConnectionParameters)-1 { + nextIfThereIs = "Trying the next broker..." + } + logs.LogError("New environment creation. Can't connect to the broker: %s port: %s. %s", + parameter.Host, parameter.Port, nextIfThereIs) + + } } + return &Environment{ options: options, producers: newProducers(options.MaxProducersPerClient), consumers: newConsumerEnvironment(options.MaxConsumersPerClient), closed: false, - }, client.connect() + }, connectionError } func (env *Environment) newReconnectClient() (*Client, error) { broker := env.options.ConnectionParameters[0] diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index d6917aa1..4a31f386 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -336,6 +336,59 @@ var _ = Describe("Environment test", func() { Expect(errWrong).To(HaveOccurred()) }) + It("Multi Uris/Multi with some not reachable end-points ", func() { + // To connect the client is enough to have one valid endpoint + // even the other endpoints are not reachable + // https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/309 + + env, err := NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + // in this case all the endpoints are not reachable + // so it will fail + _, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + })) + Expect(err).To(HaveOccurred()) + + }) + It("Fail TLS connection", func() { _, err := NewEnvironment(NewEnvironmentOptions(). SetTLSConfig(&tls.Config{InsecureSkipVerify: true}).