Skip to content

Commit 104b211

Browse files
authored
Fix multi uris connection (#310)
* Fix multi uris connection * Fixes: #309 --------- Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent e5b3ba9 commit 104b211

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

pkg/stream/environment.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
5353
if len(options.ConnectionParameters) == 0 {
5454
options.ConnectionParameters = []*Broker{newBrokerDefault()}
5555
}
56-
57-
for _, parameter := range options.ConnectionParameters {
56+
var connectionError error
57+
for idx, parameter := range options.ConnectionParameters {
5858

5959
if parameter.Uri != "" {
6060
u, err := url.Parse(parameter.Uri)
@@ -78,13 +78,27 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
7878
parameter.mergeWithDefault()
7979

8080
client.broker = parameter
81+
82+
connectionError = client.connect()
83+
if connectionError == nil {
84+
break
85+
} else {
86+
nextIfThereIs := ""
87+
if idx < len(options.ConnectionParameters)-1 {
88+
nextIfThereIs = "Trying the next broker..."
89+
}
90+
logs.LogError("New environment creation. Can't connect to the broker: %s port: %s. %s",
91+
parameter.Host, parameter.Port, nextIfThereIs)
92+
93+
}
8194
}
95+
8296
return &Environment{
8397
options: options,
8498
producers: newProducers(options.MaxProducersPerClient),
8599
consumers: newConsumerEnvironment(options.MaxConsumersPerClient),
86100
closed: false,
87-
}, client.connect()
101+
}, connectionError
88102
}
89103
func (env *Environment) newReconnectClient() (*Client, error) {
90104
broker := env.options.ConnectionParameters[0]

pkg/stream/environment_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,59 @@ var _ = Describe("Environment test", func() {
336336
Expect(errWrong).To(HaveOccurred())
337337
})
338338

339+
It("Multi Uris/Multi with some not reachable end-points ", func() {
340+
// To connect the client is enough to have one valid endpoint
341+
// even the other endpoints are not reachable
342+
// https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/309
343+
344+
env, err := NewEnvironment(NewEnvironmentOptions().
345+
SetUris([]string{
346+
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
347+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
348+
}))
349+
Expect(err).NotTo(HaveOccurred())
350+
Expect(env.Close()).NotTo(HaveOccurred())
351+
352+
env, err = NewEnvironment(NewEnvironmentOptions().
353+
SetUris([]string{
354+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
355+
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
356+
}))
357+
Expect(err).NotTo(HaveOccurred())
358+
Expect(env.Close()).NotTo(HaveOccurred())
359+
360+
env, err = NewEnvironment(NewEnvironmentOptions().
361+
SetUris([]string{
362+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
363+
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
364+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
365+
}))
366+
Expect(err).NotTo(HaveOccurred())
367+
Expect(env.Close()).NotTo(HaveOccurred())
368+
369+
env, err = NewEnvironment(NewEnvironmentOptions().
370+
SetUris([]string{
371+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
372+
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
373+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
374+
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
375+
}))
376+
Expect(err).NotTo(HaveOccurred())
377+
Expect(env.Close()).NotTo(HaveOccurred())
378+
379+
// in this case all the endpoints are not reachable
380+
// so it will fail
381+
_, err = NewEnvironment(NewEnvironmentOptions().
382+
SetUris([]string{
383+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
384+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
385+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
386+
"rabbitmq-stream://guest:guest@wrong:5552/%2f",
387+
}))
388+
Expect(err).To(HaveOccurred())
389+
390+
})
391+
339392
It("Fail TLS connection", func() {
340393
_, err := NewEnvironment(NewEnvironmentOptions().
341394
SetTLSConfig(&tls.Config{InsecureSkipVerify: true}).

0 commit comments

Comments
 (0)