Skip to content

Implement super stream ha producer #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions pkg/ha/ha_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,8 @@ type ReliableConsumer struct {
}

func (c *ReliableConsumer) GetStatusAsString() string {
switch c.GetStatus() {
case StatusOpen:
return "Open"
case StatusClosed:
return "Closed"
case StatusStreamDoesNotExist:
return "StreamDoesNotExist"
case StatusReconnecting:
return "Reconnecting"
default:
return "Unknown"
}
return getStatusAsString(c)

}

func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
Expand All @@ -50,7 +40,7 @@ func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
c.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason)
c.bootstrap = false
err, reconnected := retry(1, c)
err, reconnected := retry(1, c, c.getStreamName())
if err != nil {
logs.LogInfo(""+
"[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err)
Expand Down
15 changes: 2 additions & 13 deletions pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
waitTime := randomWaitWithBackoff(1)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting in %d milliseconds waiting pending messages", p.getInfo(), waitTime)
time.Sleep(time.Duration(waitTime) * time.Millisecond)
err, reconnected := retry(1, p)
err, reconnected := retry(1, p, p.getStreamName())
if err != nil {
logs.LogInfo(
"[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err)
Expand Down Expand Up @@ -186,18 +186,7 @@ func (p *ReliableProducer) GetStatus() int {
}

func (p *ReliableProducer) GetStatusAsString() string {
switch p.GetStatus() {
case StatusOpen:
return "Open"
case StatusClosed:
return "Closed"
case StatusStreamDoesNotExist:
return "StreamDoesNotExist"
case StatusReconnecting:
return "Reconnecting"
default:
return "Unknown"
}
return getStatusAsString(p)
}

// IReliable interface
Expand Down
55 changes: 55 additions & 0 deletions pkg/ha/ha_super_stream_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ha

import (
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"sync"
"time"
)

type ReliableSuperStreamConsumer struct {
env *stream.Environment
consumer *stream.SuperStreamConsumer
superStreamName string
consumerOptions *stream.SuperStreamConsumerOptions
mutexStatus *sync.Mutex
status int
}

func (r ReliableSuperStreamConsumer) setStatus(value int) {
r.mutexStatus.Lock()
defer r.mutexStatus.Unlock()
r.status = value
}

func (r ReliableSuperStreamConsumer) getInfo() string {
return fmt.Sprintf("consumer %s for super stream %s",
r.consumerOptions.ClientProvidedName, r.superStreamName)
}

func (r ReliableSuperStreamConsumer) getEnv() *stream.Environment {
return r.env
}

func (r ReliableSuperStreamConsumer) getNewInstance() newEntityInstance {
return nil
}

func (r ReliableSuperStreamConsumer) getTimeOut() time.Duration {
//TODO implement me
panic("implement me")
}

func (r ReliableSuperStreamConsumer) getStreamName() string {
return r.superStreamName
}

func (r ReliableSuperStreamConsumer) GetStatus() int {
r.mutexStatus.Lock()
defer r.mutexStatus.Unlock()
return r.status
}

func (r ReliableSuperStreamConsumer) GetStatusAsString() string {
return getStatusAsString(r)
}
47 changes: 31 additions & 16 deletions pkg/ha/reliable_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ type newEntityInstance func() error

type IReliable interface {
setStatus(value int)
GetStatus() int
getInfo() string
getEnv() *stream.Environment
getNewInstance() newEntityInstance
getTimeOut() time.Duration
getStreamName() string

GetStatus() int
GetStatusAsString() string
}

Expand All @@ -41,41 +42,41 @@ type IReliable interface {
// `LeaderNotReady` is a client side error: Stream exists it is Ready but the leader is not elected yet. It is mandatory for the Producer
// In both cases it retries the reconnection

func retry(backoff int, reliable IReliable) (error, bool) {
func retry(backoff int, reliable IReliable, streamName string) (error, bool) {
waitTime := randomWaitWithBackoff(backoff)
logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), waitTime)
logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), streamName, waitTime)
time.Sleep(time.Duration(waitTime) * time.Millisecond)
streamMetaData, errS := reliable.getEnv().StreamMetaData(reliable.getStreamName())
streamMetaData, errS := reliable.getEnv().StreamMetaData(streamName)
if errors.Is(errS, stream.StreamDoesNotExist) {
logs.LogInfo("[Reliable] - The stream %s does not exist for %s. Stopping it", reliable.getStreamName(), reliable.getInfo())
logs.LogInfo("[Reliable] - The stream %s does not exist for %s. Stopping it", streamName, reliable.getInfo())
return errS, false
}
if errors.Is(errS, stream.StreamNotAvailable) {
logs.LogInfo("[Reliable] - The stream %s is not available for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo())
return retry(backoff+1, reliable)
logs.LogInfo("[Reliable] - The stream %s is not available for %s. Trying to reconnect", streamName, reliable.getInfo())
return retry(backoff+1, reliable, streamName)
}
if errors.Is(errS, stream.LeaderNotReady) {
logs.LogInfo("[Reliable] - The leader for the stream %s is not ready for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo())
return retry(backoff+1, reliable)
logs.LogInfo("[Reliable] - The leader for the stream %s is not ready for %s. Trying to reconnect", streamName, reliable.getInfo())
return retry(backoff+1, reliable, streamName)
}

if errors.Is(errS, stream.StreamMetadataFailure) {
logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo())
return retry(backoff+1, reliable)
logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", streamName, reliable.getInfo())
return retry(backoff+1, reliable, streamName)
}

var result error
if streamMetaData != nil {
logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", reliable.getStreamName(), reliable.getInfo())
logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", streamName, reliable.getInfo())
result = reliable.getNewInstance()()
if result == nil {
logs.LogInfo("[Reliable] - The stream %s exists. %s reconnected.", reliable.getInfo(), reliable.getStreamName())
logs.LogInfo("[Reliable] - The stream %s exists. %s reconnected.", reliable.getInfo(), streamName)
} else {
logs.LogInfo("[Reliable] - error %s creating %s for the stream %s. Trying to reconnect", result, reliable.getInfo(), reliable.getStreamName())
return retry(backoff+1, reliable)
logs.LogInfo("[Reliable] - error %s creating %s for the stream %s. Trying to reconnect", result, reliable.getInfo(), streamName)
return retry(backoff+1, reliable, streamName)
}
} else {
logs.LogError("[Reliable] - The stream %s does not exist for %s. Closing..", reliable.getStreamName(), reliable.getInfo())
logs.LogError("[Reliable] - The stream %s does not exist for %s. Closing..", streamName, reliable.getInfo())
return stream.StreamDoesNotExist, false
}

Expand All @@ -96,5 +97,19 @@ func randomWaitWithBackoff(attempt int) int {
}

return waitTime
}

func getStatusAsString(c IReliable) string {
switch c.GetStatus() {
case StatusOpen:
return "Open"
case StatusClosed:
return "Closed"
case StatusStreamDoesNotExist:
return "StreamDoesNotExist"
case StatusReconnecting:
return "Reconnecting"
default:
return "Unknown"
}
}
Loading