From 78c03ff5f14e809962670c39299f626e05d477ff Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 27 May 2025 08:46:10 +0200 Subject: [PATCH 1/2] lock update listener Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index eab59c5b..bc0d035e 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -205,6 +206,7 @@ public async Task StoreOffset(ulong offset) // avoiding to block the consumer handler if the user put some // long task private bool IsPromotedAsActive { get; set; } + private SemaphoreSlim Lock { get; } = new(1); public static async Task Create( ClientParameters clientParameters, @@ -389,7 +391,15 @@ await _config.MessageHandler(this, for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++) { var message = MessageFromSequence(ref unCompressedData, ref compressOffset); - await DispatchMessage(message, messageOffset++).ConfigureAwait(false); + await Lock.WaitAsync().ConfigureAwait(false); + try + { + await DispatchMessage(message, messageOffset++).ConfigureAwait(false); + } + finally + { + Lock.Release(); + } } numRecords -= subEntryChunk.NumRecordsInBatch; @@ -590,7 +600,7 @@ private async Task Init() { // in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener // since the user decided to override the default behavior - + await Lock.WaitAsync().ConfigureAwait(false); try { _config.StoredOffsetSpec = await _config.ConsumerUpdateListener( @@ -606,6 +616,10 @@ private async Task Init() // in this case the default behavior is to use the OffsetTypeNext _config.StoredOffsetSpec = new OffsetTypeNext(); } + finally + { + Lock.Release(); + } } // Here we set the promotion status From 3d219a48a6457c42253857d3a5aaca3b14b440c0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 27 May 2025 11:20:11 +0200 Subject: [PATCH 2/2] lock consumer update when the consumer is in single active consumer Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 42 +++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index bc0d035e..f34b367f 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -198,7 +198,8 @@ public async Task StoreOffset(ulong offset) await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false); } - // It is needed to understand if the consumer is active or not + ////// ********************* + // IsPromotedAsActive is needed to understand if the consumer is active or not // by default is active // in case of single active consumer can be not active // it is important to skip the messages in the chunk that @@ -206,8 +207,37 @@ public async Task StoreOffset(ulong offset) // avoiding to block the consumer handler if the user put some // long task private bool IsPromotedAsActive { get; set; } - private SemaphoreSlim Lock { get; } = new(1); + // PromotionLock avoids race conditions when the consumer is promoted as active + // and the messages are dispatched in parallel. + // The consumer can be promoted as active with the function ConsumerUpdateListener + // It is needed when the consumer is single active consumer + private SemaphoreSlim PromotionLock { get; } = new(1); + + /// + /// MaybeLockDispatch locks the dispatch of the messages + /// it is needed only when the consumer is single active consumer + /// MaybeLockDispatch is an optimization to avoid to lock the dispatch + /// when the consumer is not single active consumer + /// + + private async Task MaybeLockDispatch() + { + if (_config.IsSingleActiveConsumer) + await PromotionLock.WaitAsync(Token).ConfigureAwait(false); + } + + /// + /// MaybeReleaseLock releases the lock on the dispatch of the messages + /// Following the MaybeLockDispatch method + /// + private void MaybeReleaseLock() + { + if (_config.IsSingleActiveConsumer) + PromotionLock.Release(); + } + + ////// ********************* public static async Task Create( ClientParameters clientParameters, RawConsumerConfig config, @@ -391,14 +421,14 @@ await _config.MessageHandler(this, for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++) { var message = MessageFromSequence(ref unCompressedData, ref compressOffset); - await Lock.WaitAsync().ConfigureAwait(false); + await MaybeLockDispatch().ConfigureAwait(false); try { await DispatchMessage(message, messageOffset++).ConfigureAwait(false); } finally { - Lock.Release(); + MaybeReleaseLock(); } } @@ -600,7 +630,7 @@ private async Task Init() { // in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener // since the user decided to override the default behavior - await Lock.WaitAsync().ConfigureAwait(false); + await MaybeLockDispatch().ConfigureAwait(false); try { _config.StoredOffsetSpec = await _config.ConsumerUpdateListener( @@ -618,7 +648,7 @@ private async Task Init() } finally { - Lock.Release(); + MaybeReleaseLock(); } }