From 2718d469f37a578053f799d32449ce7c931443ba Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 22 Jan 2025 13:05:41 -0800 Subject: [PATCH] Fix very rare deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #1751 * Drain and log pending work when AsyncConsumerDispatcher loop ends. * Make quiescing thread safe. * Ensure that dequeued RPC continuations are always disposed. Found by @DenisMayorko ⭐ --- .../AsyncConsumerDispatcher.cs | 11 +++++++ .../ConsumerDispatcherChannelBase.cs | 29 ++++++++++++----- projects/RabbitMQ.Client/Impl/Channel.cs | 32 +++++++++++-------- .../Impl/RpcContinuationQueue.cs | 5 ++- 4 files changed, 54 insertions(+), 23 deletions(-) diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index 071cd537e..d9958fae0 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Impl; +using RabbitMQ.Client.Logging; namespace RabbitMQ.Client.ConsumerDispatching { @@ -71,6 +72,16 @@ await _channel.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(e, work throw; } } + finally + { + while (_reader.TryRead(out WorkStruct work)) + { + using (work) + { + ESLog.Warn($"discarding consumer work: {work.WorkType}"); + } + } + } } } } diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index b2ff4f2bc..46623f6cf 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -44,7 +44,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, private readonly System.Threading.Channels.ChannelWriter _writer; private readonly Task _worker; private readonly ushort _concurrency; - private bool _quiesce = false; + private long _isQuiescing; private bool _disposed; internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) @@ -79,7 +79,7 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) } } - public bool IsShutdown => _quiesce; + public bool IsShutdown => IsQuiescing; public ushort Concurrency => _concurrency; @@ -87,7 +87,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s { cancellationToken.ThrowIfCancellationRequested(); - if (false == _disposed && false == _quiesce) + if (false == _disposed && false == IsQuiescing) { try { @@ -110,7 +110,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver { cancellationToken.ThrowIfCancellationRequested(); - if (false == _disposed && false == _quiesce) + if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag); var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); @@ -123,7 +123,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation { cancellationToken.ThrowIfCancellationRequested(); - if (false == _disposed && false == _quiesce) + if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag); @@ -136,7 +136,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo { cancellationToken.ThrowIfCancellationRequested(); - if (false == _disposed && false == _quiesce) + if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag); @@ -147,7 +147,7 @@ await _writer.WriteAsync(work, cancellationToken) public void Quiesce() { - _quiesce = true; + Interlocked.Exchange(ref _isQuiescing, 1); } public async Task WaitForShutdownAsync() @@ -157,7 +157,7 @@ public async Task WaitForShutdownAsync() return; } - if (_quiesce) + if (IsQuiescing) { try { @@ -193,6 +193,19 @@ await _worker } } + protected bool IsQuiescing + { + get + { + if (Interlocked.Read(ref _isQuiescing) == 1) + { + return true; + } + + return false; + } + } + protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) { _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason)); diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 0538107f2..156bb721a 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -744,11 +744,13 @@ protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel await FinishCloseAsync(cancellationToken) .ConfigureAwait(false); - if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) + if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) { - _continuationQueue.Next(); - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); + using (IRpcContinuation c = _continuationQueue.Next()) + { + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } } return true; @@ -818,10 +820,12 @@ await ModelSendAsync(in replyMethod, cancellationToken) protected async Task HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - await k.HandleCommandAsync(new IncomingCommand()) - .ConfigureAwait(false); // release the continuation. - return true; + using (var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next()) + { + await k.HandleCommandAsync(new IncomingCommand()) + .ConfigureAwait(false); // release the continuation. + return true; + } } protected async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) @@ -848,12 +852,12 @@ await Session.Connection.CloseAsync(reason, false, protected async Task HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - // Note: `using` here to ensure instance is disposed - using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - - // Note: releases the continuation and returns the buffers - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); + using (var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next()) + { + // Note: releases the continuation and returns the buffers + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } return true; } diff --git a/projects/RabbitMQ.Client/Impl/RpcContinuationQueue.cs b/projects/RabbitMQ.Client/Impl/RpcContinuationQueue.cs index 2a017328d..5622abe36 100644 --- a/projects/RabbitMQ.Client/Impl/RpcContinuationQueue.cs +++ b/projects/RabbitMQ.Client/Impl/RpcContinuationQueue.cs @@ -97,7 +97,10 @@ public void Enqueue(IRpcContinuation k) /// public void HandleChannelShutdown(ShutdownEventArgs reason) { - Next().HandleChannelShutdown(reason); + using (IRpcContinuation c = Next()) + { + c.HandleChannelShutdown(reason); + } } ///Retrieve the next waiting continuation.