From ce1d9305961aaee7a2f070841c373789bc720c04 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 7 Jun 2024 13:56:25 +0200 Subject: [PATCH 1/3] Fix multi uris connection Fixes: https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/309 Signed-off-by: Gabriele Santomaggio --- pkg/stream/environment.go | 20 +++++++++++-- pkg/stream/environment_test.go | 52 ++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 91924056..5e4e7184 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -47,8 +47,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { if len(options.ConnectionParameters) == 0 { options.ConnectionParameters = []*Broker{newBrokerDefault()} } - - for _, parameter := range options.ConnectionParameters { + var connectionError error + for idx, parameter := range options.ConnectionParameters { if parameter.Uri != "" { u, err := url.Parse(parameter.Uri) @@ -72,13 +72,27 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { parameter.mergeWithDefault() client.broker = parameter + + connectionError = client.connect() + if connectionError == nil { + break + } else { + nextIfThereIs := "" + if idx < len(options.ConnectionParameters)-1 { + nextIfThereIs = "Trying the next broker..." + } + logs.LogError("New environment creation. Can't connect to the broker: %s port: %s. %s", + parameter.Host, parameter.Port, nextIfThereIs) + + } } + return &Environment{ options: options, producers: newProducers(options.MaxProducersPerClient), consumers: newConsumerEnvironment(options.MaxConsumersPerClient), closed: false, - }, client.connect() + }, connectionError } func (env *Environment) newReconnectClient() (*Client, error) { broker := env.options.ConnectionParameters[0] diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index e4144fc9..2deff64f 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -335,6 +335,58 @@ var _ = Describe("Environment test", func() { Expect(errWrong).To(HaveOccurred()) }) + It("Multi Uris/Multi with some not reachable end-points ", func() { + // To connect the client is enough to have one valid endpoint + // even the other endpoints are not reachable + // https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/309 + + env, err := NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + env, err = NewEnvironment(NewEnvironmentOptions(). + SetUris([]string{ + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + "rabbitmq-stream://guest:guest@wrong:5552/%2f", + })) + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + + }) + It("Fail TLS connection", func() { _, err := NewEnvironment(NewEnvironmentOptions(). SetTLSConfig(&tls.Config{InsecureSkipVerify: true}). From 401e6ffa0bbb23816fc40af358dc53cd6c36d6fa Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 7 Jun 2024 14:00:34 +0200 Subject: [PATCH 2/3] Fix multi uris connection Fixes: https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/309 Signed-off-by: Gabriele Santomaggio --- pkg/stream/environment_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 2deff64f..f9aa25c8 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -375,6 +375,8 @@ var _ = Describe("Environment test", func() { Expect(err).NotTo(HaveOccurred()) Expect(env.Close()).NotTo(HaveOccurred()) + // in this case all the endpoints are not reachable + // so it will fail env, err = NewEnvironment(NewEnvironmentOptions(). SetUris([]string{ "rabbitmq-stream://guest:guest@wrong:5552/%2f", @@ -382,8 +384,7 @@ var _ = Describe("Environment test", func() { "rabbitmq-stream://guest:guest@wrong:5552/%2f", "rabbitmq-stream://guest:guest@wrong:5552/%2f", })) - Expect(err).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + Expect(err).To(HaveOccurred()) }) From f055cf8cdf0fc13a31142b127b00601f41e70b98 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 7 Jun 2024 14:02:45 +0200 Subject: [PATCH 3/3] Fix multi uris connection Fixes: https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/309 Signed-off-by: Gabriele Santomaggio --- pkg/stream/environment_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index f9aa25c8..bc5b8f70 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -377,7 +377,7 @@ var _ = Describe("Environment test", func() { // in this case all the endpoints are not reachable // so it will fail - env, err = NewEnvironment(NewEnvironmentOptions(). + _, err = NewEnvironment(NewEnvironmentOptions(). SetUris([]string{ "rabbitmq-stream://guest:guest@wrong:5552/%2f", "rabbitmq-stream://guest:guest@wrong:5552/%2f",