From 0ef8e94b0ecb40d25c6220f43614a53724eb7b40 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 10:23:04 +0200 Subject: [PATCH 1/3] Fix auto-reconnection in query metadata Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConsumerFactory.cs | 65 ++++++++++++++----- .../Reliable/ProducerFactory.cs | 35 +++++++--- .../Reliable/ReliableBase.cs | 18 ++--- 3 files changed, 85 insertions(+), 33 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index e0f07332..dde70adf 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -2,8 +2,10 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System; using System.Collections.Concurrent; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace RabbitMQ.Stream.Client.Reliable; @@ -62,16 +64,32 @@ private async Task StandardConsumer(bool boot) if (IsClosedNormally(closeReason)) return; - await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, - FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); + try + { + await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, + FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); + } + catch (Exception e) + { + BaseLogger?.LogError(e, + $"Stream consumer.MetadataHandler error. 
Auto recovery failed for: {ToString()}"); + } }, MetadataHandler = async _ => { if (IsClosedNormally()) return; - await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, - ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false); + try + { + await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, + ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false); + } + catch (Exception e) + { + BaseLogger?.LogError(e, + $"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}"); + } }, MessageHandler = async (consumer, ctx, message) => { @@ -129,21 +147,36 @@ private async Task SuperConsumer(bool boot) await RandomWait().ConfigureAwait(false); if (IsClosedNormally(closeReason)) return; - - var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; - await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r, - FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); + try + { + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; + await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r, + FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); + } + catch (Exception e) + { + BaseLogger?.LogError(e, + $"Super stream consumer. MetadataHandler error. 
Auto recovery failed for: {ToString()}"); + } }, MetadataHandler = async update => { - await RandomWait().ConfigureAwait(false); - if (IsClosedNormally()) - return; - - var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; - await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, - ChangeStatusReason.MetaDataUpdate) - .ConfigureAwait(false); + try + { + await RandomWait().ConfigureAwait(false); + if (IsClosedNormally()) + return; + + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; + await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, + ChangeStatusReason.MetaDataUpdate) + .ConfigureAwait(false); + } + catch (Exception e) + { + BaseLogger?.LogError(e, + $"Super stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}"); + } }, MessageHandler = async (partitionStream, consumer, ctx, message) => { diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index dd3fd136..ca203797 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -2,7 +2,9 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. 
+using System; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace RabbitMQ.Stream.Client.Reliable; @@ -49,21 +51,36 @@ private async Task SuperStreamProducer(bool boot) await RandomWait().ConfigureAwait(false); if (IsClosedNormally(closeReason)) return; - var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; - await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r, - ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason)) - .ConfigureAwait(false); + try + { + var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; + await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r, + FromConnectionClosedReasonToStatusReason(closeReason)) + .ConfigureAwait(false); + } + catch (Exception e) + { + BaseLogger?.LogError(e, + $"Super stream producer. ConnectionClosedHandler error. Auto recovery failed for: {ToString()}"); + } }, MetadataHandler = async update => { await RandomWait().ConfigureAwait(false); if (IsClosedNormally()) return; - - var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; - await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r, - ChangeStatusReason.MetaDataUpdate) - .ConfigureAwait(false); + try + { + var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; + await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r, + ChangeStatusReason.MetaDataUpdate) + .ConfigureAwait(false); + } + catch (Exception e) + { + BaseLogger?.LogError(e, + $"Super stream producer. MetadataHandler error. 
Auto recovery failed for: {ToString()}"); + } }, ConfirmHandler = confirmationHandler => { diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index 3f782025..c4cf46e5 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -271,10 +271,11 @@ private async Task Init(bool boot) /// stream name /// stream system /// - private async Task CheckIfStreamIsAvailable(string stream, StreamSystem system) + private async Task<(bool, StreamInfo)> CheckIfStreamIsAvailable(string stream, StreamSystem system) { await Task.Delay(Consts.RandomMid()).ConfigureAwait(false); var exists = false; + StreamInfo streamInfo = default; var tryAgain = true; while (tryAgain) { @@ -286,7 +287,7 @@ private async Task CheckIfStreamIsAvailable(string stream, StreamSystem sy { // It is not enough to check if the stream exists // we need to check if the stream has the leader - var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false); + streamInfo = await system.StreamInfo(stream).ConfigureAwait(false); ClientExceptions.CheckLeader(streamInfo); available += " and has a valid leader"; } @@ -304,7 +305,7 @@ await _reliableConfig.ResourceAvailableReconnectStrategy } if (exists) - return true; + return (true, streamInfo); // In this case the stream doesn't exist anymore or it failed to check if the stream exists // too many tentatives for the reconnection strategy // the Entity is just closed. 
@@ -316,7 +317,7 @@ await _reliableConfig.ResourceAvailableReconnectStrategy ToString() ); - return false; + return (false, default); } // @@ -409,16 +410,17 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectPartitionFunc, ChangeStatusReason reason) { var streamExists = false; + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]); try { - streamExists = await CheckIfStreamIsAvailable(stream, system) + var (localStreamExists, streamInfo) = await CheckIfStreamIsAvailable(stream, system) .ConfigureAwait(false); - if (streamExists) + streamExists = localStreamExists; + if (streamExists && !streamInfo.Equals(default)) { - var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false); await MaybeReconnectPartition(streamInfo, ToString(), reconnectPartitionFunc).ConfigureAwait(false); } } @@ -446,7 +448,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeSta UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]); try { - streamExists = await CheckIfStreamIsAvailable(stream, system) + (streamExists, _) = await CheckIfStreamIsAvailable(stream, system) .ConfigureAwait(false); if (streamExists) { From bcba85ca0b4b61e0fe4e4266dcb753a1517222b2 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 10:27:16 +0200 Subject: [PATCH 2/3] Fix auto-reconnection in query metadata Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index dde70adf..e059a3fe 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -156,7 +156,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r, catch (Exception e) { BaseLogger?.LogError(e, - 
$"Super stream consumer. MetadataHandler error. Auto recovery failed for: {ToString()}"); + $"Super stream consumer. ConnectionClosedHandler error. Auto recovery failed for: {ToString()}"); } }, MetadataHandler = async update => From e08a97113f15a973c938912648d1db32c0d13ad1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 11:39:05 +0200 Subject: [PATCH 3/3] Fix auto-reconnection in query metadata Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs | 4 ++-- RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index e059a3fe..21b60560 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -156,7 +156,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r, catch (Exception e) { BaseLogger?.LogError(e, - $"Super stream consumer. ConnectionClosedHandler error. Auto recovery failed for: {ToString()}"); + $"Super stream consumer. ConnectionClosedHandler error. Auto recovery failed for stream: {_consumerConfig.Stream}"); } }, MetadataHandler = async update => @@ -175,7 +175,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, catch (Exception e) { BaseLogger?.LogError(e, - $"Super stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}"); + $"Super stream consumer.MetadataHandler error. 
Auto recovery failed stream: {_consumerConfig.Stream}"); } }, MessageHandler = async (partitionStream, consumer, ctx, message) => diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index ca203797..2d173326 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -61,7 +61,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r, catch (Exception e) { BaseLogger?.LogError(e, - $"Super stream producer. ConnectionClosedHandler error. Auto recovery failed for: {ToString()}"); + $"Super stream producer. ConnectionClosedHandler error. Auto recovery failed for stream: {_producerConfig.Stream}"); } }, MetadataHandler = async update => @@ -79,7 +79,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r, catch (Exception e) { BaseLogger?.LogError(e, - $"Super stream producer. MetadataHandler error. Auto recovery failed for: {ToString()}"); + $"Super stream producer. MetadataHandler error. Auto recovery failed stream: {_producerConfig.Stream}"); } }, ConfirmHandler = confirmationHandler =>