diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index eab59c5b..f34b367f 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -197,7 +198,8 @@ public async Task StoreOffset(ulong offset) await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false); } - // It is needed to understand if the consumer is active or not + ////// ********************* + // IsPromotedAsActive is needed to understand if the consumer is active or not // by default is active // in case of single active consumer can be not active // it is important to skip the messages in the chunk that @@ -206,6 +208,36 @@ public async Task StoreOffset(ulong offset) // long task private bool IsPromotedAsActive { get; set; } + // PromotionLock avoids race conditions when the consumer is promoted as active + // and the messages are dispatched in parallel. + // The consumer can be promoted as active with the function ConsumerUpdateListener + // It is needed when the consumer is single active consumer + private SemaphoreSlim PromotionLock { get; } = new(1); + + /// + /// MaybeLockDispatch locks the dispatch of the messages + /// it is needed only when the consumer is single active consumer + /// MaybeLockDispatch is an optimization to avoid to lock the dispatch + /// when the consumer is not single active consumer + /// + + private async Task MaybeLockDispatch() + { + if (_config.IsSingleActiveConsumer) + await PromotionLock.WaitAsync(Token).ConfigureAwait(false); + } + + /// + /// MaybeReleaseLock releases the lock on the dispatch of the messages + /// Following the MaybeLockDispatch method + /// + private void MaybeReleaseLock() + { + if (_config.IsSingleActiveConsumer) + PromotionLock.Release(); + } + + ////// ********************* public static async Task Create( ClientParameters clientParameters, RawConsumerConfig config, @@ -389,7 +421,15 @@ await _config.MessageHandler(this, for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++) { var message = MessageFromSequence(ref unCompressedData, ref compressOffset); - await DispatchMessage(message, messageOffset++).ConfigureAwait(false); + await MaybeLockDispatch().ConfigureAwait(false); + try + { + await DispatchMessage(message, messageOffset++).ConfigureAwait(false); + } + finally + { + MaybeReleaseLock(); + } } numRecords -= subEntryChunk.NumRecordsInBatch; @@ -590,7 +630,7 @@ private async Task Init() { // in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener // since the user decided to override the default behavior - + await MaybeLockDispatch().ConfigureAwait(false); try { _config.StoredOffsetSpec = await _config.ConsumerUpdateListener( @@ -606,6 +646,10 @@ private async Task Init() // in this case the default behavior is to use the OffsetTypeNext _config.StoredOffsetSpec = new OffsetTypeNext(); } + finally + { + MaybeReleaseLock(); + } } // Here we set the promotion status