From 6cab6ee489d17dce3fceaf03c821f85de3a21a63 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 18 Oct 2024 15:10:24 -0700 Subject: [PATCH 1/8] Allow setting max outstanding publisher confirms Part of #1682 --- .../RabbitMQ.Client/Impl/Channel.BasicPublish.cs | 12 ++++++++++++ projects/RabbitMQ.Client/Impl/Channel.cs | 11 ----------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index a0a084d02..25b99a96f 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Framing; @@ -220,5 +221,16 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers } } } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken) + { + if (_flowControlBlock.IsSet) + { + return default; + } + + return _flowControlBlock.WaitAsync(cancellationToken); + } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 6ca0c9a93..ca749531b 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -450,17 +450,6 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead return Session.TransmitAsync(in method, in header, body, cancellationToken); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken) - { - if (_flowControlBlock.IsSet) - { - return default; - } - - return _flowControlBlock.WaitAsync(cancellationToken); - } - internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) { return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); From 9be2342832e151399da442d0ebcf2a084cae2e8a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 18 Oct 2024 15:57:16 -0700 Subject: [PATCH 2/8] * Add `MaxOutstandingPublisherConfirmations` to `CreateChannelOptions` --- .../RabbitMQ.Client/CreateChannelOptions.cs | 10 +++++++++ .../Impl/AutorecoveringChannel.cs | 18 +++++++++++---- .../Impl/AutorecoveringConnection.cs | 22 +++++++++++++++---- .../Impl/Channel.BasicPublish.cs | 2 ++ .../Impl/Channel.PublisherConfirms.cs | 7 +++++- projects/RabbitMQ.Client/Impl/Channel.cs | 12 +++++----- projects/RabbitMQ.Client/Impl/Connection.cs | 6 ++++- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 2 ++ 8 files changed, 64 insertions(+), 15 deletions(-) diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs index 17a9d6105..a64ae1934 100644 --- a/projects/RabbitMQ.Client/CreateChannelOptions.cs +++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs @@ -15,6 +15,16 @@ public sealed class CreateChannelOptions /// public bool PublisherConfirmationTrackingEnabled { get; set; } = false; + /// + /// The number of allowed outstanding publisher confirmations before publishing is blocked. + /// + /// Defaults to 128 + /// + /// Set to null, to allow an unlimited number of outstanding confirmations. + /// + /// + public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128; + /// /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 2a2efec63..2b8a4d62f 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private ushort _prefetchCountGlobal; private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; + private ushort? _maxOutstandingPublisherConfirmations = null; private bool _usesTransactions; private ushort _consumerDispatchConcurrency; @@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout set => InnerChannel.ContinuationTimeout = value; } - public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel, - ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled) + // TODO just pass create channel options + public AutorecoveringChannel(AutorecoveringConnection conn, + RecoveryAwareChannel innerChannel, + ushort consumerDispatchConcurrency, + bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + ushort? maxOutstandingPublisherConfirmations) { _connection = conn; _innerChannel = innerChannel; _consumerDispatchConcurrency = consumerDispatchConcurrency; _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; } public event AsyncEventHandler BasicAcksAsync @@ -164,8 +171,11 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con _connection = conn; RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync( - _publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled, - _consumerDispatchConcurrency, cancellationToken) + _publisherConfirmationsEnabled, + _publisherConfirmationTrackingEnabled, + _maxOutstandingPublisherConfirmations, + _consumerDispatchConcurrency, + cancellationToken) .ConfigureAwait(false); newChannel.TakeOver(_innerChannel); diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 5cee094c3..3e544bb84 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -184,16 +184,21 @@ public event AsyncEventHandler RecoveringConsumerAs public IProtocol Protocol => Endpoint.Protocol; + // TODO pass channel creation options? public async ValueTask CreateNonRecoveringChannelAsync( bool publisherConfirmationsEnabled = false, bool publisherConfirmationTrackingEnabled = false, + ushort? maxOutstandingPublisherConfirmations = null, ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { ISession session = InnerConnection.CreateSession(); var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency); return (RecoveryAwareChannel)await result.OpenAsync( - publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken) + publisherConfirmationsEnabled, + publisherConfirmationTrackingEnabled, + maxOutstandingPublisherConfirmations, + cancellationToken) .ConfigureAwait(false); } @@ -266,11 +271,20 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync( - options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken) + options.PublisherConfirmationsEnabled, + options.PublisherConfirmationTrackingEnabled, + options.MaxOutstandingPublisherConfirmations, + cdc, + cancellationToken) .ConfigureAwait(false); - var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc, - options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled); + // TODO just pass create channel options + var autorecoveringChannel = new AutorecoveringChannel(this, + recoveryAwareChannel, + cdc, + options.PublisherConfirmationsEnabled, + options.PublisherConfirmationTrackingEnabled, + options.MaxOutstandingPublisherConfirmations); await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return autorecoveringChannel; diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 25b99a96f..a5f4eab7e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -42,6 +42,8 @@ namespace RabbitMQ.Client.Impl { internal partial class Channel : IChannel, IRecoverable { + private readonly AsyncManualResetEvent _flowControlBlock = new(true); + public async ValueTask BasicPublishAsync(string exchange, string routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index f5617611e..8e972eec8 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -47,9 +47,11 @@ internal partial class Channel : IChannel, IRecoverable { private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; + private ushort? _maxOutstandingPublisherConfirmations = null; private ulong _nextPublishSeqNo = 0; private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); + private readonly AsyncManualResetEvent _maxOutstandingPublisherConfirmsReached = new(true); private class PublisherConfirmationInfo { @@ -115,10 +117,13 @@ public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToke } } - private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled) + private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + ushort? maxOutstandingPublisherConfirmations) { _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; } private async Task MaybeConfirmSelect(CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index ca749531b..f23521abc 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable // AMQP only allows one RPC operation to be active at a time. protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); - private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true); private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); @@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync(bool publisherConfirmationsEnabled = false, - bool publisherConfirmationTrackingEnabled = false, - CancellationToken cancellationToken = default) + internal async Task OpenAsync(bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + ushort? maxOutstandingPublisherConfirmations, + CancellationToken cancellationToken) { - ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled); + ConfigurePublisherConfirmations(publisherConfirmationsEnabled, + publisherConfirmationTrackingEnabled, + maxOutstandingPublisherConfirmations); bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 333f24a4b..870ca5fc1 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -273,7 +273,11 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d // TODO channel CreateChannelAsync() to combine ctor and OpenAsync var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency); - IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken) + IChannel ch = await channel.OpenAsync( + options.PublisherConfirmationsEnabled, + options.PublisherConfirmationTrackingEnabled, + options.MaxOutstandingPublisherConfirmations, + cancellationToken) .ConfigureAwait(false); return ch; } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e9f625369..d811ca426 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort? RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void +RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort? +RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool From 9343e8dadf0a26dc26e24dd38e43017558b6cd62 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 18 Oct 2024 16:20:54 -0700 Subject: [PATCH 3/8] * Actually enforce outstanding pub conf limit. --- .../Impl/Channel.BasicPublish.cs | 12 +++-- .../Impl/Channel.PublisherConfirms.cs | 52 +++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index a5f4eab7e..f9374a5ee 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -56,7 +56,10 @@ public async ValueTask BasicPublishAsync(string exchange, string ro await MaybeStartPublisherConfirmationTracking(cancellationToken) .ConfigureAwait(false); - await EnforceFlowControlAsync(cancellationToken) + await MaybeEnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + + await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken) .ConfigureAwait(false); var cmd = new BasicPublish(exchange, routingKey, mandatory, default); @@ -111,7 +114,10 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac await MaybeStartPublisherConfirmationTracking(cancellationToken) .ConfigureAwait(false); - await EnforceFlowControlAsync(cancellationToken) + await MaybeEnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + + await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken) .ConfigureAwait(false); var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); @@ -225,7 +231,7 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken) + private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken) { if (_flowControlBlock.IsSet) { diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 8e972eec8..040c72884 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -141,6 +141,7 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken) if (_publisherConfirmationTrackingEnabled) { _confirmsTaskCompletionSources.Clear(); + MaybeUnblockPublishers(); } _nextPublishSeqNo = 1; } @@ -186,6 +187,7 @@ private void HandleAck(ulong deliveryTag, bool multiple) { pair.Value.SetResult(true); _confirmsTaskCompletionSources.Remove(pair.Key, out _); + MaybeUnblockPublishers(); } } } @@ -193,6 +195,7 @@ private void HandleAck(ulong deliveryTag, bool multiple) { if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) { + MaybeUnblockPublishers(); tcs.SetResult(true); } } @@ -212,6 +215,7 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { pair.Value.SetException(new PublishException(pair.Key, isReturn)); _confirmsTaskCompletionSources.Remove(pair.Key, out _); + MaybeUnblockPublishers(); } } } @@ -219,6 +223,7 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) { + MaybeUnblockPublishers(); tcs.SetException(new PublishException(deliveryTag, isReturn)); } } @@ -261,6 +266,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) } _confirmsTaskCompletionSources.Clear(); + MaybeUnblockPublishers(); } } finally @@ -285,6 +291,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken) { publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; + MaybeBlockPublishers(); } _nextPublishSeqNo++; @@ -311,6 +318,7 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf if (_publisherConfirmationTrackingEnabled) { _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); + MaybeUnblockPublishers(); } exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex); @@ -334,5 +342,49 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) } } } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask MaybeEnforceOutstandingPublisherConfirmationsAsync(CancellationToken cancellationToken) + { + if (_publisherConfirmationTrackingEnabled) + { + if (_maxOutstandingPublisherConfirmsReached.IsSet) + { + return default; + } + else + { + return _maxOutstandingPublisherConfirmsReached.WaitAsync(cancellationToken); + } + } + + return default; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void MaybeBlockPublishers() + { + if (_publisherConfirmationTrackingEnabled) + { + if (_maxOutstandingPublisherConfirmations is not null + && _confirmsTaskCompletionSources.Count >= _maxOutstandingPublisherConfirmations) + { + _maxOutstandingPublisherConfirmsReached.Reset(); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void MaybeUnblockPublishers() + { + if (_publisherConfirmationTrackingEnabled) + { + if (_maxOutstandingPublisherConfirmations is not null + && _confirmsTaskCompletionSources.Count < _maxOutstandingPublisherConfirmations) + { + _maxOutstandingPublisherConfirmsReached.Set(); + } + } + } } } From 197a22b1a878fbfc0cd5566350161bc581bb058c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Sat, 19 Oct 2024 15:08:35 -0700 Subject: [PATCH 4/8] * Use `SemaphoreSlim` to block publishers when max outstanding is reached. --- .../Impl/Channel.BasicPublish.cs | 6 -- .../Impl/Channel.PublisherConfirms.cs | 66 +++++-------------- projects/RabbitMQ.Client/Impl/Channel.cs | 6 +- 3 files changed, 20 insertions(+), 58 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index f9374a5ee..04744cb0e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -59,9 +59,6 @@ await MaybeStartPublisherConfirmationTracking(cancellationToken) await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); - await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken) - .ConfigureAwait(false); - var cmd = new BasicPublish(exchange, routingKey, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners @@ -117,9 +114,6 @@ await MaybeStartPublisherConfirmationTracking(cancellationToken) await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); - await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken) - .ConfigureAwait(false); - var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 040c72884..db4df66e8 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -48,10 +48,10 @@ internal partial class Channel : IChannel, IRecoverable private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; private ushort? _maxOutstandingPublisherConfirmations = null; + private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore; private ulong _nextPublishSeqNo = 0; private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); - private readonly AsyncManualResetEvent _maxOutstandingPublisherConfirmsReached = new(true); private class PublisherConfirmationInfo { @@ -124,6 +124,13 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; + + if (_maxOutstandingPublisherConfirmations is not null) + { + _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim( + (int)_maxOutstandingPublisherConfirmations, + (int)_maxOutstandingPublisherConfirmations); + } } private async Task MaybeConfirmSelect(CancellationToken cancellationToken) @@ -141,7 +148,6 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken) if (_publisherConfirmationTrackingEnabled) { _confirmsTaskCompletionSources.Clear(); - MaybeUnblockPublishers(); } _nextPublishSeqNo = 1; } @@ -187,7 +193,6 @@ private void HandleAck(ulong deliveryTag, bool multiple) { pair.Value.SetResult(true); _confirmsTaskCompletionSources.Remove(pair.Key, out _); - MaybeUnblockPublishers(); } } } @@ -195,7 +200,6 @@ private void HandleAck(ulong deliveryTag, bool multiple) { if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) { - MaybeUnblockPublishers(); tcs.SetResult(true); } } @@ -215,7 +219,6 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { pair.Value.SetException(new PublishException(pair.Key, isReturn)); _confirmsTaskCompletionSources.Remove(pair.Key, out _); - MaybeUnblockPublishers(); } } } @@ -223,7 +226,6 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) { - MaybeUnblockPublishers(); tcs.SetException(new PublishException(deliveryTag, isReturn)); } } @@ -266,7 +268,6 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) } _confirmsTaskCompletionSources.Clear(); - MaybeUnblockPublishers(); } } finally @@ -291,7 +292,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken) { publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; - MaybeBlockPublishers(); + } + + if (_maxOutstandingConfirmationsSemaphore is not null) + { + await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); } _nextPublishSeqNo++; @@ -318,7 +324,6 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf if (_publisherConfirmationTrackingEnabled) { _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); - MaybeUnblockPublishers(); } exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex); @@ -340,49 +345,10 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) .ConfigureAwait(false); } - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private ValueTask MaybeEnforceOutstandingPublisherConfirmationsAsync(CancellationToken cancellationToken) - { - if (_publisherConfirmationTrackingEnabled) - { - if (_maxOutstandingPublisherConfirmsReached.IsSet) - { - return default; - } - else - { - return _maxOutstandingPublisherConfirmsReached.WaitAsync(cancellationToken); - } - } - - return default; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void MaybeBlockPublishers() - { - if (_publisherConfirmationTrackingEnabled) - { - if (_maxOutstandingPublisherConfirmations is not null - && _confirmsTaskCompletionSources.Count >= _maxOutstandingPublisherConfirmations) - { - _maxOutstandingPublisherConfirmsReached.Reset(); - } - } - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void MaybeUnblockPublishers() - { - if (_publisherConfirmationTrackingEnabled) - { - if (_maxOutstandingPublisherConfirmations is not null - && _confirmsTaskCompletionSources.Count < _maxOutstandingPublisherConfirmations) + if (_maxOutstandingConfirmationsSemaphore is not null) { - _maxOutstandingPublisherConfirmsReached.Set(); + _maxOutstandingConfirmationsSemaphore.Release(); } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index f23521abc..871a84d60 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -531,7 +531,8 @@ protected virtual void Dispose(bool disposing) ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); - _confirmSemaphore?.Dispose(); + _confirmSemaphore.Dispose(); + _maxOutstandingConfirmationsSemaphore?.Dispose(); } } @@ -552,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore() ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); - _confirmSemaphore?.Dispose(); + _confirmSemaphore.Dispose(); + _maxOutstandingConfirmationsSemaphore?.Dispose(); } public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) From 7e5c2c191a04a37a80db76319c9cae1a63b690ef Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Sat, 19 Oct 2024 15:39:19 -0700 Subject: [PATCH 5/8] * Add delay if outstanding confirms is more than 50% of max. --- .../RabbitMQ.Client/CreateChannelOptions.cs | 3 ++- .../Impl/Channel.PublisherConfirms.cs | 27 ++++++++++++++----- .../PublisherConfirms/PublisherConfirms.cs | 27 +++++++++++-------- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs index a64ae1934..647177972 100644 --- a/projects/RabbitMQ.Client/CreateChannelOptions.cs +++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs @@ -16,7 +16,8 @@ public sealed class CreateChannelOptions public bool PublisherConfirmationTrackingEnabled { get; set; } = false; /// - /// The number of allowed outstanding publisher confirmations before publishing is blocked. + /// If publisher confirmation tracking is enabled, this represents the number of allowed + /// outstanding publisher confirmations before publishing is blocked. /// /// Defaults to 128 /// diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index db4df66e8..706105677 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -125,7 +125,7 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; - if (_maxOutstandingPublisherConfirmations is not null) + if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null) { _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim( (int)_maxOutstandingPublisherConfirmations, @@ -282,6 +282,18 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) { if (_publisherConfirmationsEnabled) { + if (_publisherConfirmationTrackingEnabled) + { + if (_maxOutstandingPublisherConfirmations is not null) + { + int percentOfMax = _confirmsTaskCompletionSources.Count / (int)_maxOutstandingPublisherConfirmations; + if (percentOfMax > 0.5) + { + await Task.Delay(1000 * percentOfMax).ConfigureAwait(false); + } + } + } + await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); @@ -292,12 +304,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken) { publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; - } - if (_maxOutstandingConfirmationsSemaphore is not null) - { - await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); + if (_maxOutstandingConfirmationsSemaphore is not null) + { + await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + } } _nextPublishSeqNo++; @@ -346,7 +358,8 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) .ConfigureAwait(false); } - if (_maxOutstandingConfirmationsSemaphore is not null) + if (_publisherConfirmationTrackingEnabled && + _maxOutstandingConfirmationsSemaphore is not null) { _maxOutstandingConfirmationsSemaphore.Release(); } diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index d2439f256..10e153b8e 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -38,9 +38,18 @@ using System.Threading.Tasks; using RabbitMQ.Client; +const ushort MAX_OUTSTANDING_CONFIRMS = 256; + const int MESSAGE_COUNT = 50_000; bool debug = false; +var channelOpts = new CreateChannelOptions +{ + PublisherConfirmationsEnabled = true, + PublisherConfirmationTrackingEnabled = true, + MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS +}; + #pragma warning disable CS8321 // Local function is declared but never used await PublishMessagesIndividuallyAsync(); @@ -53,12 +62,12 @@ static Task CreateConnectionAsync() return factory.CreateConnectionAsync(); } -static async Task PublishMessagesIndividuallyAsync() +async Task PublishMessagesIndividuallyAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); @@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync() Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms"); } -static async Task PublishMessagesInBatchAsync() +async Task PublishMessagesInBatchAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - int batchSize = 1000; + int batchSize = MAX_OUTSTANDING_CONFIRMS; int outstandingMessageCount = 0; var sw = new Stopwatch(); @@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously() await using IConnection connection = await CreateConnectionAsync(); - var channelOptions = new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = false - }; - await using IChannel channel = await connection.CreateChannelAsync(channelOptions); + channelOpts.PublisherConfirmationTrackingEnabled = false; + await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); From abf31be7ad5f808793db49fc61ca881c8fe1c114 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 21 Oct 2024 07:50:39 -0700 Subject: [PATCH 6/8] * Disable toxiproxy tests to see if this is why there are CI timeouts --- .github/workflows/build-test.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 07d13d90e..eaf5d3182 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -69,14 +69,14 @@ jobs: - name: Integration Tests timeout-minutes: 25 run: | - $tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; ` - Start-Sleep -Seconds 1; ` - Receive-Job -Job $tx; ` - & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` + # $tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; ` + # Start-Sleep -Seconds 1; ` + # Receive-Job -Job $tx; ` + # & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` dotnet test ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` - --environment 'RABBITMQ_TOXIPROXY_TESTS=true' ` + --environment 'RABBITMQ_TOXIPROXY_TESTS=false' ` --environment 'PASSWORD=grapefruit' ` --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` "${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' @@ -185,14 +185,14 @@ jobs: path: projects - name: Start RabbitMQ id: start-rabbitmq - run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy + run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh no-toxiproxy - name: Integration Tests timeout-minutes: 15 run: | dotnet test \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ - --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ + --environment 'RABBITMQ_TOXIPROXY_TESTS=false' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ "${{ github.workspace }}/projects/Test/Integration/Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' From ebba739ca7797c1000a230d6b4242116a74227bd Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 21 Oct 2024 08:24:50 -0700 Subject: [PATCH 7/8] * Re-enable toxiproxy tests * Remove max outstanding confirms check, because that is what is causing issues right now. --- .github/workflows/build-test.yaml | 14 +++++++------- .../Impl/Channel.PublisherConfirms.cs | 19 ++++++------------- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index eaf5d3182..07d13d90e 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -69,14 +69,14 @@ jobs: - name: Integration Tests timeout-minutes: 25 run: | - # $tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; ` - # Start-Sleep -Seconds 1; ` - # Receive-Job -Job $tx; ` - # & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` + $tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; ` + Start-Sleep -Seconds 1; ` + Receive-Job -Job $tx; ` + & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` dotnet test ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` - --environment 'RABBITMQ_TOXIPROXY_TESTS=false' ` + --environment 'RABBITMQ_TOXIPROXY_TESTS=true' ` --environment 'PASSWORD=grapefruit' ` --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` "${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' @@ -185,14 +185,14 @@ jobs: path: projects - name: Start RabbitMQ id: start-rabbitmq - run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh no-toxiproxy + run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy - name: Integration Tests timeout-minutes: 15 run: | dotnet test \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ - --environment 'RABBITMQ_TOXIPROXY_TESTS=false' \ + --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ "${{ github.workspace }}/projects/Test/Integration/Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 706105677..480c56bd1 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -282,17 +282,16 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) { if (_publisherConfirmationsEnabled) { + /* if (_publisherConfirmationTrackingEnabled) { - if (_maxOutstandingPublisherConfirmations is not null) + if (_maxOutstandingConfirmationsSemaphore is not null) { - int percentOfMax = _confirmsTaskCompletionSources.Count / (int)_maxOutstandingPublisherConfirmations; - if (percentOfMax > 0.5) - { - await Task.Delay(1000 * percentOfMax).ConfigureAwait(false); - } + await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); } } + */ await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); @@ -304,12 +303,6 @@ await _confirmSemaphore.WaitAsync(cancellationToken) { publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; - - if (_maxOutstandingConfirmationsSemaphore is not null) - { - await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - } } _nextPublishSeqNo++; @@ -361,7 +354,7 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) if (_publisherConfirmationTrackingEnabled && _maxOutstandingConfirmationsSemaphore is not null) { - _maxOutstandingConfirmationsSemaphore.Release(); + // _maxOutstandingConfirmationsSemaphore.Release(); } } } From 16601090271b73f90b9625d7912a77f047707323 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 21 Oct 2024 08:29:49 -0700 Subject: [PATCH 8/8] * Acquire / release outstanding confirms semaphore *before* confirms semaphore. --- .../Impl/Channel.PublisherConfirms.cs | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 480c56bd1..f00311b25 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -282,16 +282,12 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) { if (_publisherConfirmationsEnabled) { - /* - if (_publisherConfirmationTrackingEnabled) + if (_publisherConfirmationTrackingEnabled && + _maxOutstandingConfirmationsSemaphore is not null) { - if (_maxOutstandingConfirmationsSemaphore is not null) - { - await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - } + await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); } - */ await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); @@ -343,6 +339,12 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn { if (_publisherConfirmationsEnabled) { + if (_publisherConfirmationTrackingEnabled && + _maxOutstandingConfirmationsSemaphore is not null) + { + _maxOutstandingConfirmationsSemaphore.Release(); + } + _confirmSemaphore.Release(); if (publisherConfirmationInfo is not null) @@ -350,12 +352,6 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) .ConfigureAwait(false); } - - if (_publisherConfirmationTrackingEnabled && - _maxOutstandingConfirmationsSemaphore is not null) - { - // _maxOutstandingConfirmationsSemaphore.Release(); - } } } }