diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs index 17a9d6105..647177972 100644 --- a/projects/RabbitMQ.Client/CreateChannelOptions.cs +++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs @@ -15,6 +15,17 @@ public sealed class CreateChannelOptions /// public bool PublisherConfirmationTrackingEnabled { get; set; } = false; + /// + /// If publisher confirmation tracking is enabled, this represents the number of allowed + /// outstanding publisher confirmations before publishing is blocked. + /// + /// Defaults to 128 + /// + /// Set to null, to allow an unlimited number of outstanding confirmations. + /// + /// + public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128; + /// /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 2a2efec63..2b8a4d62f 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private ushort _prefetchCountGlobal; private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; + private ushort? _maxOutstandingPublisherConfirmations = null; private bool _usesTransactions; private ushort _consumerDispatchConcurrency; @@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout set => InnerChannel.ContinuationTimeout = value; } - public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel, - ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled) + // TODO just pass create channel options + public AutorecoveringChannel(AutorecoveringConnection conn, + RecoveryAwareChannel innerChannel, + ushort consumerDispatchConcurrency, + bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + ushort? maxOutstandingPublisherConfirmations) { _connection = conn; _innerChannel = innerChannel; _consumerDispatchConcurrency = consumerDispatchConcurrency; _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; } public event AsyncEventHandler BasicAcksAsync @@ -164,8 +171,11 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con _connection = conn; RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync( - _publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled, - _consumerDispatchConcurrency, cancellationToken) + _publisherConfirmationsEnabled, + _publisherConfirmationTrackingEnabled, + _maxOutstandingPublisherConfirmations, + _consumerDispatchConcurrency, + cancellationToken) .ConfigureAwait(false); newChannel.TakeOver(_innerChannel); diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 5cee094c3..3e544bb84 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -184,16 +184,21 @@ public event AsyncEventHandler RecoveringConsumerAs public IProtocol Protocol => Endpoint.Protocol; + // TODO pass channel creation options? public async ValueTask CreateNonRecoveringChannelAsync( bool publisherConfirmationsEnabled = false, bool publisherConfirmationTrackingEnabled = false, + ushort? maxOutstandingPublisherConfirmations = null, ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { ISession session = InnerConnection.CreateSession(); var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency); return (RecoveryAwareChannel)await result.OpenAsync( - publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken) + publisherConfirmationsEnabled, + publisherConfirmationTrackingEnabled, + maxOutstandingPublisherConfirmations, + cancellationToken) .ConfigureAwait(false); } @@ -266,11 +271,20 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync( - options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken) + options.PublisherConfirmationsEnabled, + options.PublisherConfirmationTrackingEnabled, + options.MaxOutstandingPublisherConfirmations, + cdc, + cancellationToken) .ConfigureAwait(false); - var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc, - options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled); + // TODO just pass create channel options + var autorecoveringChannel = new AutorecoveringChannel(this, + recoveryAwareChannel, + cdc, + options.PublisherConfirmationsEnabled, + options.PublisherConfirmationTrackingEnabled, + options.MaxOutstandingPublisherConfirmations); await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return autorecoveringChannel; diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index a0a084d02..04744cb0e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Framing; @@ -41,6 +42,8 @@ namespace RabbitMQ.Client.Impl { internal partial class Channel : IChannel, IRecoverable { + private readonly AsyncManualResetEvent _flowControlBlock = new(true); + public async ValueTask BasicPublishAsync(string exchange, string routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) @@ -53,7 +56,7 @@ public async ValueTask BasicPublishAsync(string exchange, string ro await MaybeStartPublisherConfirmationTracking(cancellationToken) .ConfigureAwait(false); - await EnforceFlowControlAsync(cancellationToken) + await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); var cmd = new BasicPublish(exchange, routingKey, mandatory, default); @@ -108,7 +111,7 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac await MaybeStartPublisherConfirmationTracking(cancellationToken) .ConfigureAwait(false); - await EnforceFlowControlAsync(cancellationToken) + await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); @@ -220,5 +223,16 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers } } } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken) + { + if (_flowControlBlock.IsSet) + { + return default; + } + + return _flowControlBlock.WaitAsync(cancellationToken); + } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index f5617611e..f00311b25 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -47,6 +47,8 @@ internal partial class Channel : IChannel, IRecoverable { private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; + private ushort? _maxOutstandingPublisherConfirmations = null; + private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore; private ulong _nextPublishSeqNo = 0; private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); @@ -115,10 +117,20 @@ public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToke } } - private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled) + private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + ushort? maxOutstandingPublisherConfirmations) { _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; + + if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null) + { + _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim( + (int)_maxOutstandingPublisherConfirmations, + (int)_maxOutstandingPublisherConfirmations); + } } private async Task MaybeConfirmSelect(CancellationToken cancellationToken) @@ -270,6 +282,13 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) { if (_publisherConfirmationsEnabled) { + if (_publisherConfirmationTrackingEnabled && + _maxOutstandingConfirmationsSemaphore is not null) + { + await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + } + await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); @@ -320,6 +339,12 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn { if (_publisherConfirmationsEnabled) { + if (_publisherConfirmationTrackingEnabled && + _maxOutstandingConfirmationsSemaphore is not null) + { + _maxOutstandingConfirmationsSemaphore.Release(); + } + _confirmSemaphore.Release(); if (publisherConfirmationInfo is not null) diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 6ca0c9a93..871a84d60 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable // AMQP only allows one RPC operation to be active at a time. protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); - private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true); private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); @@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync(bool publisherConfirmationsEnabled = false, - bool publisherConfirmationTrackingEnabled = false, - CancellationToken cancellationToken = default) + internal async Task OpenAsync(bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + ushort? maxOutstandingPublisherConfirmations, + CancellationToken cancellationToken) { - ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled); + ConfigurePublisherConfirmations(publisherConfirmationsEnabled, + publisherConfirmationTrackingEnabled, + maxOutstandingPublisherConfirmations); bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -450,17 +452,6 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead return Session.TransmitAsync(in method, in header, body, cancellationToken); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken) - { - if (_flowControlBlock.IsSet) - { - return default; - } - - return _flowControlBlock.WaitAsync(cancellationToken); - } - internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) { return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); @@ -540,7 +531,8 @@ protected virtual void Dispose(bool disposing) ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); - _confirmSemaphore?.Dispose(); + _confirmSemaphore.Dispose(); + _maxOutstandingConfirmationsSemaphore?.Dispose(); } } @@ -561,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore() ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); - _confirmSemaphore?.Dispose(); + _confirmSemaphore.Dispose(); + _maxOutstandingConfirmationsSemaphore?.Dispose(); } public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 333f24a4b..870ca5fc1 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -273,7 +273,11 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d // TODO channel CreateChannelAsync() to combine ctor and OpenAsync var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency); - IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken) + IChannel ch = await channel.OpenAsync( + options.PublisherConfirmationsEnabled, + options.PublisherConfirmationTrackingEnabled, + options.MaxOutstandingPublisherConfirmations, + cancellationToken) .ConfigureAwait(false); return ch; } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e9f625369..d811ca426 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort? RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void +RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort? +RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index d2439f256..10e153b8e 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -38,9 +38,18 @@ using System.Threading.Tasks; using RabbitMQ.Client; +const ushort MAX_OUTSTANDING_CONFIRMS = 256; + const int MESSAGE_COUNT = 50_000; bool debug = false; +var channelOpts = new CreateChannelOptions +{ + PublisherConfirmationsEnabled = true, + PublisherConfirmationTrackingEnabled = true, + MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS +}; + #pragma warning disable CS8321 // Local function is declared but never used await PublishMessagesIndividuallyAsync(); @@ -53,12 +62,12 @@ static Task CreateConnectionAsync() return factory.CreateConnectionAsync(); } -static async Task PublishMessagesIndividuallyAsync() +async Task PublishMessagesIndividuallyAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); @@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync() Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms"); } -static async Task PublishMessagesInBatchAsync() +async Task PublishMessagesInBatchAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - int batchSize = 1000; + int batchSize = MAX_OUTSTANDING_CONFIRMS; int outstandingMessageCount = 0; var sw = new Stopwatch(); @@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously() await using IConnection connection = await CreateConnectionAsync(); - var channelOptions = new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = false - }; - await using IChannel channel = await connection.CreateChannelAsync(channelOptions); + channelOpts.PublisherConfirmationTrackingEnabled = false; + await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();