Skip to content

Lock Message dispatch during the promotion #417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -197,7 +198,8 @@ public async Task StoreOffset(ulong offset)
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
}

// It is needed to understand if the consumer is active or not
////// *********************
// IsPromotedAsActive is needed to understand if the consumer is active or not
// by default is active
// in case of single active consumer can be not active
// it is important to skip the messages in the chunk that
Expand All @@ -206,6 +208,36 @@ public async Task StoreOffset(ulong offset)
// long task
private bool IsPromotedAsActive { get; set; }

// PromotionLock avoids race conditions when the consumer is promoted as active
// and the messages are dispatched in parallel.
// The consumer can be promoted as active with the function ConsumerUpdateListener
// It is needed when the consumer is single active consumer
private SemaphoreSlim PromotionLock { get; } = new(1);

/// <summary>
/// MaybeLockDispatch locks the dispatch of the messages
/// it is needed only when the consumer is single active consumer
/// MaybeLockDispatch is an optimization to avoid to lock the dispatch
/// when the consumer is not single active consumer
/// </summary>

private async Task MaybeLockDispatch()
{
if (_config.IsSingleActiveConsumer)
await PromotionLock.WaitAsync(Token).ConfigureAwait(false);
}

/// <summary>
/// MaybeReleaseLock releases the lock on the dispatch of the messages
/// Following the MaybeLockDispatch method
/// </summary>
private void MaybeReleaseLock()
{
if (_config.IsSingleActiveConsumer)
PromotionLock.Release();
}

////// *********************
public static async Task<IConsumer> Create(
ClientParameters clientParameters,
RawConsumerConfig config,
Expand Down Expand Up @@ -389,7 +421,15 @@ await _config.MessageHandler(this,
for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++)
{
var message = MessageFromSequence(ref unCompressedData, ref compressOffset);
await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
await MaybeLockDispatch().ConfigureAwait(false);
try
{
await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
}
finally
{
MaybeReleaseLock();
}
}

numRecords -= subEntryChunk.NumRecordsInBatch;
Expand Down Expand Up @@ -590,7 +630,7 @@ private async Task Init()
{
// in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
// since the user decided to override the default behavior

await MaybeLockDispatch().ConfigureAwait(false);
try
{
_config.StoredOffsetSpec = await _config.ConsumerUpdateListener(
Expand All @@ -606,6 +646,10 @@ private async Task Init()
// in this case the default behavior is to use the OffsetTypeNext
_config.StoredOffsetSpec = new OffsetTypeNext();
}
finally
{
MaybeReleaseLock();
}
}

// Here we set the promotion status
Expand Down