From 97dfb2debe246bae51c0e4876331f9767268e777 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 11 Apr 2025 08:40:17 +0200 Subject: [PATCH 1/4] Detect zombie consumer with a load balancer configuration the disconnection can take time due of heartbeat in this pr the client detects possible zombie consumers Signed-off-by: Gabriele Santomaggio --- pkg/ha/ha_consumer.go | 4 ++-- pkg/stream/client.go | 28 +++++++++++++++++++++++++--- pkg/stream/constants.go | 1 + pkg/stream/consumer.go | 11 ++++++++++- pkg/stream/coordinator.go | 8 +++++++- 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 4ceec9ea..0148d0ba 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -46,9 +46,9 @@ func (c *ReliableConsumer) GetStatusAsString() string { func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { event := <-channelClose - if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) || strings.EqualFold(event.Reason, stream.ZombieConsumer) { c.setStatus(StatusReconnecting) - logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) + logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason) c.bootstrap = false err, reconnected := retry(1, c) if err != nil { diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 9220a550..a6750e93 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -487,7 +487,10 @@ func (c *Client) Close() error { } } - for _, cs := range c.coordinator.consumers { + for _, cs := range c.coordinator.GetConsumers() { + if cs == nil { + continue + } err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{ Command: CommandClose, StreamName: cs.(*Consumer).GetStreamName(), @@ -1019,8 +1022,27 @@ func (c *Client) declareSubscriber(streamName string, } } - case <-time.After(consumer.options.autoCommitStrategy.flushInterval): - consumer.cacheStoreOffset() + case <-time.After(1_000 * time.Millisecond): + if consumer.options.autocommit && time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval { + consumer.cacheStoreOffset() + } + + // This is a very edge case where the consumer is not active anymore + // but the consumer is still in the list of consumers + // It can happen during the reconnection with load-balancing + // found this problem with a caos test where random killing the load-balancer and node where + // the client should be connected + if consumer.isZombie() { + logs.LogWarn("Detected zombie consumer for stream %s, closing", streamName) + consumer.close(Event{ + Command: CommandUnsubscribe, + StreamName: consumer.GetStreamName(), + Name: consumer.GetName(), + Reason: ZombieConsumer, + Err: nil, + }) + return + } } } }() diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 82c3ac38..3442fe66 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -114,6 +114,7 @@ const ( defaultConfirmationTimeOut = 10 * time.Second // + ZombieConsumer = "zombie-consumer" SocketClosed = "socket client closed" MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 1c94d857..e6502437 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -54,6 +54,12 @@ func (consumer *Consumer) getStatus() int { return consumer.status } +func (consumer *Consumer) isZombie() bool { + consumer.mutex.Lock() + defer consumer.mutex.Unlock() + return consumer.status == open && !consumer.options.client.socket.isOpen() +} + func (consumer *Consumer) GetStreamName() string { if consumer.options == nil { return "" @@ -341,7 +347,10 @@ func (consumer *Consumer) close(reason Event) error { consumer.closeHandler = nil } - close(consumer.chunkForConsumer) + if consumer.chunkForConsumer != nil { + close(consumer.chunkForConsumer) + consumer.chunkForConsumer = nil + } if consumer.response.data != nil { close(consumer.response.data) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 37c6fcb3..18d952ea 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -89,8 +89,14 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) return consumer.close(reason) } -func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { +func (coordinator *Coordinator) GetConsumers() map[interface{}]interface{} { + coordinator.mutex.Lock() + defer coordinator.mutex.Unlock() + return coordinator.consumers +} + +func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { producer, err := coordinator.ExtractProducerById(id) if err != nil { return err From fb6a5a52c85116e3a4cc12fe5117343613708bff Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 11 Apr 2025 08:43:13 +0200 Subject: [PATCH 2/4] remove noise log Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer_unconfirmed.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index cad6c27d..3b2e0a10 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -34,7 +34,6 @@ func newUnConfirmed(maxSize int) *unConfirmed { func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) { if u.size() > u.maxSize { - logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize) u.blockSignal.L.Lock() u.blockSignal.Wait() u.blockSignal.L.Unlock() From 80bf0c84bcae794d872d2faf73f8d069a6f23ef1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 11 Apr 2025 09:11:01 +0200 Subject: [PATCH 3/4] set handler to nil Signed-off-by: Gabriele Santomaggio --- pkg/stream/client.go | 1 + pkg/stream/consumer.go | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index a6750e93..fbbbac47 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -1045,6 +1045,7 @@ func (c *Client) declareSubscriber(streamName string, } } } + }() return consumer, err.Err } diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index e6502437..f04bdbbe 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -347,12 +347,9 @@ func (consumer *Consumer) close(reason Event) error { consumer.closeHandler = nil } - if consumer.chunkForConsumer != nil { + if consumer.response.data != nil { close(consumer.chunkForConsumer) - consumer.chunkForConsumer = nil - } - if consumer.response.data != nil { close(consumer.response.data) consumer.response.data = nil } From 2d6091c7727fa8f0f2afefd26da0564735653240 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Apr 2025 10:16:37 +0200 Subject: [PATCH 4/4] check if is nil Signed-off-by: Gabriele Santomaggio --- pkg/stream/client.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index fbbbac47..9b25ae54 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -488,18 +488,18 @@ func (c *Client) Close() error { } for _, cs := range c.coordinator.GetConsumers() { - if cs == nil { - continue - } - err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{ - Command: CommandClose, - StreamName: cs.(*Consumer).GetStreamName(), - Name: cs.(*Consumer).GetName(), - Reason: SocketClosed, - Err: nil, - }) - if err != nil { - logs.LogWarn("error removing consumer: %s", err) + if cs != nil { + err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{ + Command: CommandClose, + StreamName: cs.(*Consumer).GetStreamName(), + Name: cs.(*Consumer).GetName(), + Reason: SocketClosed, + Err: nil, + }) + + if err != nil { + logs.LogWarn("error removing consumer: %s", err) + } } } if c.getSocket().isOpen() {