diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 0148d0ba..fbc1dcfb 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -29,18 +29,8 @@ type ReliableConsumer struct { } func (c *ReliableConsumer) GetStatusAsString() string { - switch c.GetStatus() { - case StatusOpen: - return "Open" - case StatusClosed: - return "Closed" - case StatusStreamDoesNotExist: - return "StreamDoesNotExist" - case StatusReconnecting: - return "Reconnecting" - default: - return "Unknown" - } + return getStatusAsString(c) + } func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { @@ -50,7 +40,7 @@ func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { c.setStatus(StatusReconnecting) logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason) c.bootstrap = false - err, reconnected := retry(1, c) + err, reconnected := retry(1, c, c.getStreamName()) if err != nil { logs.LogInfo(""+ "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index 3aa8bd10..3c9c58ec 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -30,7 +30,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { waitTime := randomWaitWithBackoff(1) logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting in %d milliseconds waiting pending messages", p.getInfo(), waitTime) time.Sleep(time.Duration(waitTime) * time.Millisecond) - err, reconnected := retry(1, p) + err, reconnected := retry(1, p, p.getStreamName()) if err != nil { logs.LogInfo( "[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err) @@ -186,18 +186,7 @@ func (p *ReliableProducer) GetStatus() int { } func (p *ReliableProducer) GetStatusAsString() string { - switch p.GetStatus() { - case StatusOpen: - return "Open" - case StatusClosed: - return "Closed" - case StatusStreamDoesNotExist: - return "StreamDoesNotExist" - case StatusReconnecting: - return "Reconnecting" - default: - return "Unknown" - } + return getStatusAsString(p) } // IReliable interface diff --git a/pkg/ha/ha_super_stream_consumer.go b/pkg/ha/ha_super_stream_consumer.go new file mode 100644 index 00000000..1ecb438f --- /dev/null +++ b/pkg/ha/ha_super_stream_consumer.go @@ -0,0 +1,55 @@ +package ha + +import ( + "fmt" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "sync" + "time" +) + +type ReliableSuperStreamConsumer struct { + env *stream.Environment + consumer *stream.SuperStreamConsumer + superStreamName string + consumerOptions *stream.SuperStreamConsumerOptions + mutexStatus *sync.Mutex + status int +} + +func (r ReliableSuperStreamConsumer) setStatus(value int) { + r.mutexStatus.Lock() + defer r.mutexStatus.Unlock() + r.status = value +} + +func (r ReliableSuperStreamConsumer) getInfo() string { + return fmt.Sprintf("consumer %s for super stream %s", + r.consumerOptions.ClientProvidedName, r.superStreamName) +} + +func (r ReliableSuperStreamConsumer) getEnv() *stream.Environment { + return r.env +} + +func (r ReliableSuperStreamConsumer) getNewInstance() newEntityInstance { + return nil +} + +func (r ReliableSuperStreamConsumer) getTimeOut() time.Duration { + //TODO implement me + panic("implement me") +} + +func (r ReliableSuperStreamConsumer) getStreamName() string { + return r.superStreamName +} + +func (r ReliableSuperStreamConsumer) GetStatus() int { + r.mutexStatus.Lock() + defer r.mutexStatus.Unlock() + return r.status +} + +func (r ReliableSuperStreamConsumer) GetStatusAsString() string { + return getStatusAsString(r) +} diff --git a/pkg/ha/reliable_common.go b/pkg/ha/reliable_common.go index 217038e2..008ba47c 100644 --- a/pkg/ha/reliable_common.go +++ b/pkg/ha/reliable_common.go @@ -19,12 +19,13 @@ type newEntityInstance func() error type IReliable interface { setStatus(value int) - GetStatus() int getInfo() string getEnv() *stream.Environment getNewInstance() newEntityInstance getTimeOut() time.Duration getStreamName() string + + GetStatus() int GetStatusAsString() string } @@ -41,41 +42,41 @@ type IReliable interface { // `LeaderNotReady` is a client side error: Stream exists it is Ready but the leader is not elected yet. It is mandatory for the Producer // In both cases it retries the reconnection -func retry(backoff int, reliable IReliable) (error, bool) { +func retry(backoff int, reliable IReliable, streamName string) (error, bool) { waitTime := randomWaitWithBackoff(backoff) - logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), waitTime) + logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), streamName, waitTime) time.Sleep(time.Duration(waitTime) * time.Millisecond) - streamMetaData, errS := reliable.getEnv().StreamMetaData(reliable.getStreamName()) + streamMetaData, errS := reliable.getEnv().StreamMetaData(streamName) if errors.Is(errS, stream.StreamDoesNotExist) { - logs.LogInfo("[Reliable] - The stream %s does not exist for %s. Stopping it", reliable.getStreamName(), reliable.getInfo()) + logs.LogInfo("[Reliable] - The stream %s does not exist for %s. Stopping it", streamName, reliable.getInfo()) return errS, false } if errors.Is(errS, stream.StreamNotAvailable) { - logs.LogInfo("[Reliable] - The stream %s is not available for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo()) - return retry(backoff+1, reliable) + logs.LogInfo("[Reliable] - The stream %s is not available for %s. Trying to reconnect", streamName, reliable.getInfo()) + return retry(backoff+1, reliable, streamName) } if errors.Is(errS, stream.LeaderNotReady) { - logs.LogInfo("[Reliable] - The leader for the stream %s is not ready for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo()) - return retry(backoff+1, reliable) + logs.LogInfo("[Reliable] - The leader for the stream %s is not ready for %s. Trying to reconnect", streamName, reliable.getInfo()) + return retry(backoff+1, reliable, streamName) } if errors.Is(errS, stream.StreamMetadataFailure) { - logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo()) - return retry(backoff+1, reliable) + logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", streamName, reliable.getInfo()) + return retry(backoff+1, reliable, streamName) } var result error if streamMetaData != nil { - logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", reliable.getStreamName(), reliable.getInfo()) + logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", streamName, reliable.getInfo()) result = reliable.getNewInstance()() if result == nil { - logs.LogInfo("[Reliable] - The stream %s exists. %s reconnected.", reliable.getInfo(), reliable.getStreamName()) + logs.LogInfo("[Reliable] - The stream %s exists. %s reconnected.", reliable.getInfo(), streamName) } else { - logs.LogInfo("[Reliable] - error %s creating %s for the stream %s. Trying to reconnect", result, reliable.getInfo(), reliable.getStreamName()) - return retry(backoff+1, reliable) + logs.LogInfo("[Reliable] - error %s creating %s for the stream %s. Trying to reconnect", result, reliable.getInfo(), streamName) + return retry(backoff+1, reliable, streamName) } } else { - logs.LogError("[Reliable] - The stream %s does not exist for %s. Closing..", reliable.getStreamName(), reliable.getInfo()) + logs.LogError("[Reliable] - The stream %s does not exist for %s. Closing..", streamName, reliable.getInfo()) return stream.StreamDoesNotExist, false } @@ -96,5 +97,19 @@ func randomWaitWithBackoff(attempt int) int { } return waitTime +} +func getStatusAsString(c IReliable) string { + switch c.GetStatus() { + case StatusOpen: + return "Open" + case StatusClosed: + return "Closed" + case StatusStreamDoesNotExist: + return "StreamDoesNotExist" + case StatusReconnecting: + return "Reconnecting" + default: + return "Unknown" + } }