Skip to content

1749 review #1761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -48,8 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();
private int _disposeSignaled;

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
Expand Down Expand Up @@ -254,32 +254,15 @@ await _connection.DeleteRecordedChannelAsync(this,
public override string ToString()
=> InnerChannel.ToString();

public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
if (Interlocked.Exchange(ref _disposeSignaled, 1) != 0)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -293,7 +276,6 @@ await this.AbortAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand Down Expand Up @@ -508,6 +490,9 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName);
}
}
Expand Down
29 changes: 7 additions & 22 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -50,8 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection

private Connection _innerConnection;
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();
private int _disposeSignaled;

private Connection InnerConnection
{
Expand Down Expand Up @@ -270,32 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca
return autorecoveringChannel;
}

public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
if (Interlocked.Exchange(ref _disposeSignaled, 1) != 0)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
await _innerConnection.DisposeAsync()
Expand All @@ -313,7 +296,6 @@ await _innerConnection.DisposeAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand All @@ -328,6 +310,9 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName);
}
}
Expand Down
40 changes: 9 additions & 31 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ internal partial class Channel : IChannel, IRecoverable
internal readonly IConsumerDispatcher ConsumerDispatcher;

private bool _disposed;
private bool _isDisposing;

private readonly object _locker = new();
private int _disposeSignaled;

public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
Expand Down Expand Up @@ -531,20 +529,11 @@ void IDisposable.Dispose()

protected virtual void Dispose(bool disposing)
{
if (_disposed)
if (Interlocked.Exchange(ref _disposeSignaled, 1) != 0)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

if (disposing)
{
try
Expand All @@ -554,10 +543,7 @@ protected virtual void Dispose(bool disposing)
this.AbortAsync().GetAwaiter().GetResult();
}

if (_serverOriginatedChannelCloseTcs is not null)
{
_serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5));
}
_serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5));

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
Expand All @@ -567,7 +553,6 @@ protected virtual void Dispose(bool disposing)
finally
{
_disposed = true;
_isDisposing = false;
}
}
}
Expand All @@ -582,20 +567,11 @@ await DisposeAsyncCore()

protected virtual async ValueTask DisposeAsyncCore()
{
if (_disposed)
if (Interlocked.Exchange(ref _disposeSignaled, 1) != 0)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -612,6 +588,7 @@ await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();

if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
Expand All @@ -621,7 +598,6 @@ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand Down Expand Up @@ -718,9 +694,11 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)

protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
lock (_locker)
var serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs;
if (serverOriginatedChannelCloseTcs is null)
{
_serverOriginatedChannelCloseTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
// Attempt to assign the new TCS only if _tcs is still null
_ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs, new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), null);
}

try
Expand Down
37 changes: 10 additions & 27 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
Expand All @@ -46,8 +47,7 @@ namespace RabbitMQ.Client.Framing
internal sealed partial class Connection : IConnection
{
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();
private int _disposeSignaled;
private volatile bool _closed;

private readonly ConnectionConfig _config;
Expand Down Expand Up @@ -487,32 +487,15 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
return _frameHandler.WriteAsync(frames, cancellationToken);
}

public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
if (Interlocked.Exchange(ref _disposeSignaled, 1) != 0)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -534,7 +517,6 @@ await _channel0.DisposeAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand All @@ -543,20 +525,21 @@ private void ThrowIfDisposed()
{
if (_disposed)
{
ThrowObjectDisposedException();
ThrowDisposed();
}

static void ThrowObjectDisposedException()
{
throw new ObjectDisposedException(typeof(Connection).FullName);
}
return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName);
}

public override string ToString()
{
return $"Connection({_id},{Endpoint})";
}

[DoesNotReturn]
private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason)
{
throw new AlreadyClosedException(closeReason);
Expand Down
Loading