diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs
index eab59c5b..f34b367f 100644
--- a/RabbitMQ.Stream.Client/RawConsumer.cs
+++ b/RabbitMQ.Stream.Client/RawConsumer.cs
@@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
+using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -197,7 +198,8 @@ public async Task StoreOffset(ulong offset)
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
}
- // It is needed to understand if the consumer is active or not
+ ////// *********************
+ // IsPromotedAsActive is needed to understand if the consumer is active or not
// by default is active
// in case of single active consumer can be not active
// it is important to skip the messages in the chunk that
@@ -206,6 +208,36 @@ public async Task StoreOffset(ulong offset)
// long task
private bool IsPromotedAsActive { get; set; }
+ // PromotionLock avoids race conditions when the consumer is promoted as active
+ // and the messages are dispatched in parallel.
+ // The consumer can be promoted as active with the function ConsumerUpdateListener
+ // It is needed when the consumer is single active consumer
+ private SemaphoreSlim PromotionLock { get; } = new(1);
+
+ ///
+ /// MaybeLockDispatch locks the dispatch of the messages
+ /// it is needed only when the consumer is single active consumer
+ /// MaybeLockDispatch is an optimization to avoid to lock the dispatch
+ /// when the consumer is not single active consumer
+ ///
+
+ private async Task MaybeLockDispatch()
+ {
+ if (_config.IsSingleActiveConsumer)
+ await PromotionLock.WaitAsync(Token).ConfigureAwait(false);
+ }
+
+ ///
+ /// MaybeReleaseLock releases the lock on the dispatch of the messages
+ /// Following the MaybeLockDispatch method
+ ///
+ private void MaybeReleaseLock()
+ {
+ if (_config.IsSingleActiveConsumer)
+ PromotionLock.Release();
+ }
+
+ ////// *********************
public static async Task Create(
ClientParameters clientParameters,
RawConsumerConfig config,
@@ -389,7 +421,15 @@ await _config.MessageHandler(this,
for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++)
{
var message = MessageFromSequence(ref unCompressedData, ref compressOffset);
- await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
+ await MaybeLockDispatch().ConfigureAwait(false);
+ try
+ {
+ await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
+ }
+ finally
+ {
+ MaybeReleaseLock();
+ }
}
numRecords -= subEntryChunk.NumRecordsInBatch;
@@ -590,7 +630,7 @@ private async Task Init()
{
// in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
// since the user decided to override the default behavior
-
+ await MaybeLockDispatch().ConfigureAwait(false);
try
{
_config.StoredOffsetSpec = await _config.ConsumerUpdateListener(
@@ -606,6 +646,10 @@ private async Task Init()
// in this case the default behavior is to use the OffsetTypeNext
_config.StoredOffsetSpec = new OffsetTypeNext();
}
+ finally
+ {
+ MaybeReleaseLock();
+ }
}
// Here we set the promotion status