diff --git a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs index 6c57555cf..37c45c106 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs @@ -51,7 +51,7 @@ internal abstract class AsyncRpcContinuation : IRpcContinuation private bool _disposedValue; - public AsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) + protected AsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) { /* * Note: we can't use an ObjectPool for these because the netstandard2.0 diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index dff4504d1..9e9b5f77d 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -131,38 +131,27 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken) if (_publisherConfirmationsEnabled) { // NOTE: _rpcSemaphore is held - bool enqueued = false; - var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - try + if (_nextPublishSeqNo == 0UL) { - if (_nextPublishSeqNo == 0UL) + if (_publisherConfirmationTrackingEnabled) { - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.Clear(); - } - _nextPublishSeqNo = 1; + _confirmsTaskCompletionSources.Clear(); } + _nextPublishSeqNo = 1; + } - enqueued = Enqueue(k); + Enqueue(k); - var method = new ConfirmSelect(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); - return; - } - finally - { - if (false == enqueued) - { - k.Dispose(); - } - } + return; } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 64be4152e..ad342f3a4 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -203,15 +203,14 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort, public async Task CloseAsync(ShutdownEventArgs args, bool abort, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { ChannelShutdownAsync += k.OnConnectionShutdownAsync; - enqueued = Enqueue(k); + Enqueue(k); ConsumerDispatcher.Quiesce(); if (SetCloseReason(args)) @@ -251,10 +250,6 @@ await ConsumerDispatcher.WaitForShutdownAsync() } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); ChannelShutdownAsync -= k.OnConnectionShutdownAsync; } @@ -272,14 +267,13 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok internal async ValueTask ConnectionSecureOkAsync(byte[] response, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); try { @@ -298,10 +292,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -311,14 +301,13 @@ internal async ValueTask ConnectionStartOkAsync( string mechanism, byte[] response, string locale, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); try { @@ -337,25 +326,19 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } - protected bool Enqueue(IRpcContinuation k) + protected void Enqueue(IRpcContinuation k) { if (IsOpen) { _continuationQueue.Enqueue(k); - return true; } else { k.HandleChannelShutdown(CloseReason); - return false; } } @@ -366,14 +349,13 @@ internal async Task OpenAsync(CreateChannelOptions createChannelOption createChannelOptions.PublisherConfirmationTrackingEnabled, createChannelOptions.OutstandingPublisherConfirmationsRateLimiter); - bool enqueued = false; - var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new ChannelOpen(); await ModelSendAsync(in method, k.CancellationToken) @@ -387,10 +369,6 @@ await MaybeConfirmSelect(cancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } @@ -422,11 +400,9 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can if (false == await DispatchCommandAsync(cmd, cancellationToken) .ConfigureAwait(false)) { - using (IRpcContinuation c = _continuationQueue.Next()) - { - await c.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } + using IRpcContinuation c = _continuationQueue.Next(); + await c.HandleCommandAsync(cmd) + .ConfigureAwait(false); } } finally @@ -824,11 +800,7 @@ public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, public async Task BasicCancelAsync(string consumerTag, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - // NOTE: - // Maybe don't dispose these instances because the CancellationTokens must remain - // valid for processing the response. - var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, + using var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) @@ -845,7 +817,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -858,10 +830,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -870,17 +838,13 @@ public async Task BasicConsumeAsync(string queue, bool autoAck, string c IDictionary? arguments, IAsyncBasicConsumer consumer, CancellationToken cancellationToken) { - // NOTE: - // Maybe don't dispose this instance because the CancellationToken must remain - // valid for processing the response. - bool enqueued = false; - var k = new BasicConsumeAsyncRpcContinuation(consumer, ConsumerDispatcher, ContinuationTimeout, cancellationToken); + using var k = new BasicConsumeAsyncRpcContinuation(consumer, ConsumerDispatcher, ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); await ModelSendAsync(in method, k.CancellationToken) @@ -890,10 +854,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -901,15 +861,13 @@ await ModelSendAsync(in method, k.CancellationToken) public async Task BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken) { - bool enqueued = false; - - var k = new BasicGetAsyncRpcContinuation(AdjustDeliveryTag, ContinuationTimeout, cancellationToken); + using var k = new BasicGetAsyncRpcContinuation(AdjustDeliveryTag, ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new BasicGet(queue, autoAck); await ModelSendAsync(in method, k.CancellationToken) @@ -929,10 +887,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -950,15 +904,14 @@ public async Task UpdateSecretAsync(string newSecret, string reason, throw new ArgumentNullException(nameof(reason)); } - bool enqueued = false; - var k = new SimpleAsyncRpcContinuation(ProtocolCommandId.ConnectionUpdateSecretOk, + using var k = new SimpleAsyncRpcContinuation(ProtocolCommandId.ConnectionUpdateSecretOk, ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); var method = new ConnectionUpdateSecret(newSecretBytes, reason); @@ -971,10 +924,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -982,14 +931,13 @@ await ModelSendAsync(in method, k.CancellationToken) public async Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new BasicQosAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new BasicQosAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new BasicQos(prefetchSize, prefetchCount, global); await ModelSendAsync(in method, k.CancellationToken) @@ -1001,10 +949,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1013,8 +957,7 @@ public async Task ExchangeBindAsync(string destination, string source, string ro IDictionary? arguments, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ExchangeBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ExchangeBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1029,7 +972,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1042,10 +985,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1061,8 +1000,7 @@ public async Task ExchangeDeclareAsync(string exchange, string type, bool durabl IDictionary? arguments, bool passive, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1076,7 +1014,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1089,10 +1027,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1100,8 +1034,7 @@ await ModelSendAsync(in method, k.CancellationToken) public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1116,7 +1049,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1129,10 +1062,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1141,8 +1070,7 @@ public async Task ExchangeUnbindAsync(string destination, string source, string IDictionary? arguments, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ExchangeUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new ExchangeUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1157,7 +1085,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1170,10 +1098,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1203,8 +1127,7 @@ public async Task QueueDeclareAsync(string queue, bool durable, } } - bool enqueued = false; - var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1227,7 +1150,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1243,10 +1166,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1255,8 +1174,7 @@ public async Task QueueBindAsync(string queue, string exchange, string routingKe IDictionary? arguments, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new QueueBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new QueueBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1271,7 +1189,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1284,10 +1202,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1311,8 +1225,7 @@ public async Task ConsumerCountAsync(string queue, public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1329,7 +1242,7 @@ await ModelSendAsync(in method, k.CancellationToken) } else { - enqueued = Enqueue(k); + Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); @@ -1339,25 +1252,19 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } public async Task QueuePurgeAsync(string queue, CancellationToken cancellationToken) { - bool enqueued = false; - - var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new QueuePurge(queue, false); await ModelSendAsync(in method, k.CancellationToken) @@ -1367,10 +1274,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } @@ -1379,14 +1282,13 @@ public async Task QueueUnbindAsync(string queue, string exchange, string routing IDictionary? arguments, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new QueueUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new QueueUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new QueueUnbind(queue, exchange, routingKey, arguments); await ModelSendAsync(in method, k.CancellationToken) @@ -1398,24 +1300,19 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } public async Task TxCommitAsync(CancellationToken cancellationToken) { - bool enqueued = false; - var k = new TxCommitAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new TxCommitAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new TxCommit(); await ModelSendAsync(in method, k.CancellationToken) @@ -1427,24 +1324,19 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } public async Task TxRollbackAsync(CancellationToken cancellationToken) { - bool enqueued = false; - var k = new TxRollbackAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new TxRollbackAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new TxRollback(); await ModelSendAsync(in method, k.CancellationToken) @@ -1456,24 +1348,19 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } } public async Task TxSelectAsync(CancellationToken cancellationToken) { - bool enqueued = false; - var k = new TxSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var k = new TxSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + Enqueue(k); var method = new TxSelect(); await ModelSendAsync(in method, k.CancellationToken) @@ -1485,10 +1372,6 @@ await ModelSendAsync(in method, k.CancellationToken) } finally { - if (false == enqueued) - { - k.Dispose(); - } _rpcSemaphore.Release(); } }