From a3c4ec557294956a5f5d301ec79ded7ab2ec3a51 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 24 Jan 2025 13:11:54 -0800 Subject: [PATCH 1/2] Create cancellation token from `timeout` Fixes #1759 When passing a timeout of 0, `DisposeAsync` would block forever after closing a connection. This change ensures that the timeout is used in a cancellation token. --- .../RabbitMQ.Client/IConnectionExtensions.cs | 46 +++++++++++-------- projects/RabbitMQ.Client/Impl/Connection.cs | 4 +- projects/RabbitMQ.Client/Impl/MainSession.cs | 28 ++++++++++- .../Test/Integration/GH/TestGitHubIssues.cs | 16 +++++++ 4 files changed, 72 insertions(+), 22 deletions(-) diff --git a/projects/RabbitMQ.Client/IConnectionExtensions.cs b/projects/RabbitMQ.Client/IConnectionExtensions.cs index 33b9bc64c4..c2c9ba84a1 100644 --- a/projects/RabbitMQ.Client/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/IConnectionExtensions.cs @@ -59,10 +59,11 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st /// To wait infinitely for the close operations to complete use . /// /// - public static Task CloseAsync(this IConnection connection, TimeSpan timeout) + public static async Task CloseAsync(this IConnection connection, TimeSpan timeout) { - return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false, - CancellationToken.None); + using var cts = new CancellationTokenSource(timeout); + await connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false, cts.Token) + .ConfigureAwait(false); } /// @@ -82,10 +83,11 @@ public static Task CloseAsync(this IConnection connection, TimeSpan timeout) /// Operation timeout. /// /// - public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) + public static async Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) { - return connection.CloseAsync(reasonCode, reasonText, timeout, false, - CancellationToken.None); + using var cts = new CancellationTokenSource(timeout); + await connection.CloseAsync(reasonCode, reasonText, timeout, false, cts.Token) + .ConfigureAwait(false); } /// @@ -97,10 +99,12 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st /// during closing connection. ///This method waits infinitely for the in-progress close operation to complete. /// - public static Task AbortAsync(this IConnection connection) + public static async Task AbortAsync(this IConnection connection) { - return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, - CancellationToken.None); + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); + await connection.CloseAsync(Constants.ReplySuccess, + "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, cts.Token) + .ConfigureAwait(false); } /// @@ -116,10 +120,12 @@ public static Task AbortAsync(this IConnection connection) /// A message indicating the reason for closing the connection /// /// - public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText) + public static async Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText) { - return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, - CancellationToken.None); + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); + await connection.CloseAsync(reasonCode, + reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, cts.Token) + .ConfigureAwait(false); } /// @@ -135,10 +141,12 @@ public static Task AbortAsync(this IConnection connection, ushort reasonCode, st /// To wait infinitely for the close operations to complete use . /// /// - public static Task AbortAsync(this IConnection connection, TimeSpan timeout) + public static async Task AbortAsync(this IConnection connection, TimeSpan timeout) { - return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true, - CancellationToken.None); + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); + await connection.CloseAsync(Constants.ReplySuccess, + "Connection close forced", timeout, true, cts.Token) + .ConfigureAwait(false); } /// @@ -155,10 +163,12 @@ public static Task AbortAsync(this IConnection connection, TimeSpan timeout) /// A message indicating the reason for closing the connection. /// /// - public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) + public static async Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) { - return connection.CloseAsync(reasonCode, reasonText, timeout, true, - CancellationToken.None); + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); + await connection.CloseAsync(reasonCode, + reasonText, timeout, true, cts.Token) + .ConfigureAwait(false); } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 247543a902..14965c4e75 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -330,8 +330,6 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti } else { - cancellationToken.ThrowIfCancellationRequested(); - await OnShutdownAsync(reason) .ConfigureAwait(false); await _session0.SetSessionClosingAsync(false, cancellationToken) @@ -518,7 +516,6 @@ await this.AbortAsync() } _session0.Dispose(); - _mainLoopCts.Dispose(); await _channel0.DisposeAsync() .ConfigureAwait(false); @@ -529,6 +526,7 @@ await _channel0.DisposeAsync() } finally { + _mainLoopCts.Dispose(); _disposed = true; } } diff --git a/projects/RabbitMQ.Client/Impl/MainSession.cs b/projects/RabbitMQ.Client/Impl/MainSession.cs index 80abd9899b..935d4a65db 100644 --- a/projects/RabbitMQ.Client/Impl/MainSession.cs +++ b/projects/RabbitMQ.Client/Impl/MainSession.cs @@ -47,6 +47,7 @@ internal sealed class MainSession : Session, IDisposable private volatile bool _closeIsServerInitiated; private volatile bool _closing; private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1); + private bool _disposed = false; public MainSession(Connection connection, uint maxBodyLength) : base(connection, 0, maxBodyLength) @@ -83,6 +84,13 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken) { + if (_disposed) + { + _closing = true; + _closeIsServerInitiated = closeIsServerInitiated; + return; + } + if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken) .ConfigureAwait(false)) { @@ -122,6 +130,24 @@ public override ValueTask TransmitAsync(in T cmd, CancellationToken cancellat return base.TransmitAsync(in cmd, cancellationToken); } - public void Dispose() => ((IDisposable)_closingSemaphore).Dispose(); + public void Dispose() + { + if (_disposed) + { + return; + } + + try + { + _closingSemaphore.Dispose(); + } + catch + { + } + finally + { + _disposed = true; + } + } } } diff --git a/projects/Test/Integration/GH/TestGitHubIssues.cs b/projects/Test/Integration/GH/TestGitHubIssues.cs index 7f8810fb65..f288c28224 100644 --- a/projects/Test/Integration/GH/TestGitHubIssues.cs +++ b/projects/Test/Integration/GH/TestGitHubIssues.cs @@ -131,5 +131,21 @@ public async Task TestHeartbeatTimeoutValue_GH1756() Assert.True(_conn.Heartbeat != default); } + + [Fact] + public async Task DisposeWhileCatchingTimeoutDeadlocksRepro_GH1759() + { + _connFactory = new ConnectionFactory(); + _conn = await _connFactory.CreateConnectionAsync(); + try + { + await _conn.CloseAsync(TimeSpan.Zero); + } + catch (Exception) + { + } + + await _conn.DisposeAsync(); + } } } From fa9bc433d940d0d91971a0ce29f7343cd91580f4 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 30 Jan 2025 15:37:07 -0800 Subject: [PATCH 2/2] * Create `CancellationToken` in internal `CloseAsync` method. Thanks @JanEggers --- .../RabbitMQ.Client/IConnectionExtensions.cs | 46 ++++++++----------- projects/RabbitMQ.Client/Impl/Connection.cs | 11 +++-- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/projects/RabbitMQ.Client/IConnectionExtensions.cs b/projects/RabbitMQ.Client/IConnectionExtensions.cs index c2c9ba84a1..33b9bc64c4 100644 --- a/projects/RabbitMQ.Client/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/IConnectionExtensions.cs @@ -59,11 +59,10 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st /// To wait infinitely for the close operations to complete use . /// /// - public static async Task CloseAsync(this IConnection connection, TimeSpan timeout) + public static Task CloseAsync(this IConnection connection, TimeSpan timeout) { - using var cts = new CancellationTokenSource(timeout); - await connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false, cts.Token) - .ConfigureAwait(false); + return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false, + CancellationToken.None); } /// @@ -83,11 +82,10 @@ await connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false, c /// Operation timeout. /// /// - public static async Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) + public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) { - using var cts = new CancellationTokenSource(timeout); - await connection.CloseAsync(reasonCode, reasonText, timeout, false, cts.Token) - .ConfigureAwait(false); + return connection.CloseAsync(reasonCode, reasonText, timeout, false, + CancellationToken.None); } /// @@ -99,12 +97,10 @@ await connection.CloseAsync(reasonCode, reasonText, timeout, false, cts.Token) /// during closing connection. ///This method waits infinitely for the in-progress close operation to complete. /// - public static async Task AbortAsync(this IConnection connection) + public static Task AbortAsync(this IConnection connection) { - using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); - await connection.CloseAsync(Constants.ReplySuccess, - "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, cts.Token) - .ConfigureAwait(false); + return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, + CancellationToken.None); } /// @@ -120,12 +116,10 @@ await connection.CloseAsync(Constants.ReplySuccess, /// A message indicating the reason for closing the connection /// /// - public static async Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText) + public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText) { - using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); - await connection.CloseAsync(reasonCode, - reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, cts.Token) - .ConfigureAwait(false); + return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, + CancellationToken.None); } /// @@ -141,12 +135,10 @@ await connection.CloseAsync(reasonCode, /// To wait infinitely for the close operations to complete use . /// /// - public static async Task AbortAsync(this IConnection connection, TimeSpan timeout) + public static Task AbortAsync(this IConnection connection, TimeSpan timeout) { - using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); - await connection.CloseAsync(Constants.ReplySuccess, - "Connection close forced", timeout, true, cts.Token) - .ConfigureAwait(false); + return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true, + CancellationToken.None); } /// @@ -163,12 +155,10 @@ await connection.CloseAsync(Constants.ReplySuccess, /// A message indicating the reason for closing the connection. /// /// - public static async Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) + public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) { - using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); - await connection.CloseAsync(reasonCode, - reasonText, timeout, true, cts.Token) - .ConfigureAwait(false); + return connection.CloseAsync(reasonCode, reasonText, timeout, true, + CancellationToken.None); } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 14965c4e75..2bf1ccc9a3 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -320,6 +320,9 @@ public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, b /// internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout, CancellationToken cancellationToken) { + using var timeoutCts = new CancellationTokenSource(timeout); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, cancellationToken); + if (false == SetCloseReason(reason)) { // close reason is already set @@ -332,7 +335,7 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti { await OnShutdownAsync(reason) .ConfigureAwait(false); - await _session0.SetSessionClosingAsync(false, cancellationToken) + await _session0.SetSessionClosingAsync(false, cts.Token) .ConfigureAwait(false); try @@ -341,7 +344,7 @@ await _session0.SetSessionClosingAsync(false, cancellationToken) if (false == _closed) { var method = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0); - await _session0.TransmitAsync(method, cancellationToken) + await _session0.TransmitAsync(method, cts.Token) .ConfigureAwait(false); } } @@ -390,14 +393,14 @@ await _session0.TransmitAsync(method, cancellationToken) try { - await _mainLoopTask.WaitAsync(timeout, cancellationToken) + await _mainLoopTask.WaitAsync(timeout, cts.Token) .ConfigureAwait(false); } catch { try { - await _frameHandler.CloseAsync(cancellationToken) + await _frameHandler.CloseAsync(cts.Token) .ConfigureAwait(false); } catch