diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 241dbbdc4..b057af115 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -48,8 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; private bool _disposed; - private bool _isDisposing; - private readonly object _locker = new(); + private int _isDisposing; private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; @@ -254,32 +254,15 @@ await _connection.DeleteRecordedChannelAsync(this, public override string ToString() => InnerChannel.ToString(); - public void Dispose() - { - if (_disposed) - { - return; - } - - DisposeAsync().AsTask().GetAwaiter().GetResult(); - } + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() { - if (_disposed) + if (IsDisposing) { return; } - lock (_locker) - { - if (_isDisposing) - { - return; - } - _isDisposing = true; - } - try { if (IsOpen) @@ -293,7 +276,6 @@ await this.AbortAsync() finally { _disposed = true; - _isDisposing = false; } } @@ -508,7 +490,23 @@ private void ThrowIfDisposed() ThrowDisposed(); } + return; + + [DoesNotReturn] static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName); } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index f266f60d0..879ed249a 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -50,8 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection private Connection _innerConnection; private bool _disposed; - private bool _isDisposing; - private readonly object _locker = new(); + private int _isDisposing; private Connection InnerConnection { @@ -270,32 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca return autorecoveringChannel; } - public void Dispose() - { - if (_disposed) - { - return; - } - - DisposeAsync().AsTask().GetAwaiter().GetResult(); - } + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() { - if (_disposed) + if (IsDisposing) { return; } - lock (_locker) - { - if (_isDisposing) - { - return; - } - _isDisposing = true; - } - try { await _innerConnection.DisposeAsync() @@ -313,7 +296,6 @@ await _innerConnection.DisposeAsync() finally { _disposed = true; - _isDisposing = false; } } @@ -328,7 +310,23 @@ private void ThrowIfDisposed() ThrowDisposed(); } + return; + + [DoesNotReturn] static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName); } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 7718b0fbb..7293f584c 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -64,9 +64,7 @@ internal partial class Channel : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; private bool _disposed; - private bool _isDisposing; - - private readonly object _locker = new(); + private int _isDisposing; public Channel(ISession session, CreateChannelOptions createChannelOptions) { @@ -531,20 +529,11 @@ void IDisposable.Dispose() protected virtual void Dispose(bool disposing) { - if (_disposed) + if (IsDisposing) { return; } - lock (_locker) - { - if (_isDisposing) - { - return; - } - _isDisposing = true; - } - if (disposing) { try @@ -554,10 +543,7 @@ protected virtual void Dispose(bool disposing) this.AbortAsync().GetAwaiter().GetResult(); } - if (_serverOriginatedChannelCloseTcs is not null) - { - _serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5)); - } + _serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5)); ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); @@ -567,7 +553,6 @@ protected virtual void Dispose(bool disposing) finally { _disposed = true; - _isDisposing = false; } } } @@ -582,20 +567,11 @@ await DisposeAsyncCore() protected virtual async ValueTask DisposeAsyncCore() { - if (_disposed) + if (IsDisposing) { return; } - lock (_locker) - { - if (_isDisposing) - { - return; - } - _isDisposing = true; - } - try { if (IsOpen) @@ -612,6 +588,7 @@ await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); + if (_outstandingPublisherConfirmationsRateLimiter is not null) { await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() @@ -621,7 +598,6 @@ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() finally { _disposed = true; - _isDisposing = false; } } @@ -718,9 +694,12 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - lock (_locker) + TaskCompletionSource? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs; + if (serverOriginatedChannelCloseTcs is null) { - _serverOriginatedChannelCloseTcs ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // Attempt to assign the new TCS only if _tcs is still null + _ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs, + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); } try @@ -742,12 +721,12 @@ await ModelSendAsync(in method, cancellationToken) await Session.NotifyAsync(cancellationToken) .ConfigureAwait(false); - _serverOriginatedChannelCloseTcs.TrySetResult(true); + _serverOriginatedChannelCloseTcs?.TrySetResult(true); return true; } catch (Exception ex) { - _serverOriginatedChannelCloseTcs.TrySetException(ex); + _serverOriginatedChannelCloseTcs?.TrySetException(ex); throw; } } @@ -1669,5 +1648,18 @@ private Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken c } } } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 1f8cafc5a..fd88b5b5b 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Runtime.CompilerServices; using System.Threading; @@ -46,8 +47,7 @@ namespace RabbitMQ.Client.Framing internal sealed partial class Connection : IConnection { private bool _disposed; - private bool _isDisposing; - private readonly object _locker = new(); + private int _isDisposing; private volatile bool _closed; private readonly ConnectionConfig _config; @@ -487,32 +487,15 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio return _frameHandler.WriteAsync(frames, cancellationToken); } - public void Dispose() - { - if (_disposed) - { - return; - } - - DisposeAsync().AsTask().GetAwaiter().GetResult(); - } + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() { - if (_disposed) + if (IsDisposing) { return; } - lock (_locker) - { - if (_isDisposing) - { - return; - } - _isDisposing = true; - } - try { if (IsOpen) @@ -534,7 +517,6 @@ await _channel0.DisposeAsync() finally { _disposed = true; - _isDisposing = false; } } @@ -543,13 +525,13 @@ private void ThrowIfDisposed() { if (_disposed) { - ThrowObjectDisposedException(); + ThrowDisposed(); } - static void ThrowObjectDisposedException() - { - throw new ObjectDisposedException(typeof(Connection).FullName); - } + return; + + [DoesNotReturn] + static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName); } public override string ToString() @@ -557,9 +539,23 @@ public override string ToString() return $"Connection({_id},{Endpoint})"; } + [DoesNotReturn] private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason) { throw new AlreadyClosedException(closeReason); } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } }