From 7d81edc55076e38b32b0d1abf2ee88a657216e7f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 27 Mar 2025 09:27:54 -0700 Subject: [PATCH 1/7] Investigate & fix `SemaphoreFullException` Fixes #1818 Start by modifying test to concurrently publish to the same `IChannel` instance. --- .../Test/Integration/TestFloodPublishing.cs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 5b700ed51..89fabf598 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -92,31 +92,37 @@ public async Task TestUnthrottledFloodPublishing() return Task.CompletedTask; }; - var publishTasks = new List(); var stopwatch = Stopwatch.StartNew(); - int i = 0; int publishCount = 0; try { - for (i = 0; i < 65535 * 64; i++) + var tasks = new List(); + for (int j = 0; j < 64; j++) { - if (i % 65536 == 0) + tasks.Add(Task.Run(async () => { - if (stopwatch.Elapsed > FiveSeconds) + var publishTasks = new List(); + for (int i = 0; i < 65536 * 2; i++) { - break; - } - } + if (stopwatch.Elapsed > FiveSeconds) + { + await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); + publishTasks.Clear(); + break; + } - publishCount++; - publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask()); + Interlocked.Increment(ref publishCount); + publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask()); - if (i % 500 == 0) - { - await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); - publishTasks.Clear(); - } + if (i % 500 == 0) + { + await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); + publishTasks.Clear(); + } + } + })); } + await Task.WhenAll(tasks).WaitAsync(WaitSpan); } finally { From e633973be9d36761de54971bc7f9e04834fa9b22 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 27 Mar 2025 14:32:28 -0700 Subject: [PATCH 2/7] Make copy --- projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index d2ca45247..9e6e14a9e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -180,7 +180,7 @@ private void HandleAck(ulong deliveryTag, bool multiple) { if (multiple) { - foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources.ToArray()) { if (pair.Key <= deliveryTag) { @@ -206,7 +206,7 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (multiple) { - foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources.ToArray()) { if (pair.Key <= deliveryTag) { From 71ccda0ece586aa0e6995d4f73d6ebb45563a165 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 27 Mar 2025 14:56:54 -0700 Subject: [PATCH 3/7] * Add queue arguments to test to simulate nacks. --- .../Test/Integration/TestFloodPublishing.cs | 73 ++++++++++++++++--- 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 89fabf598..17fff7400 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Threading; @@ -43,7 +44,7 @@ namespace Test.Integration { public class TestFloodPublishing : IntegrationFixture { - private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5); + private static readonly TimeSpan ElapsedMax = TimeSpan.FromSeconds(10); private readonly byte[] _body = GetRandomBody(2048); public TestFloodPublishing(ITestOutputHelper output) : base(output) @@ -92,36 +93,82 @@ public async Task TestUnthrottledFloodPublishing() return Task.CompletedTask; }; + var queueArguments = new Dictionary + { + ["x-max-length"] = 131072, + ["x-overflow"] = "reject-publish" + }; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, + passive: false, durable: false, exclusive: true, autoDelete: true, arguments: queueArguments); + string queueName = q.QueueName; + + var exceptions = new ConcurrentBag(); var stopwatch = Stopwatch.StartNew(); int publishCount = 0; try { var tasks = new List(); - for (int j = 0; j < 64; j++) + for (int j = 0; j < 8; j++) { tasks.Add(Task.Run(async () => { - var publishTasks = new List(); - for (int i = 0; i < 65536 * 2; i++) + var publishTasks = new List(); + for (int i = 0; i < 65536; i++) { - if (stopwatch.Elapsed > FiveSeconds) + if (stopwatch.Elapsed > ElapsedMax) { - await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } publishTasks.Clear(); - break; + return; } Interlocked.Increment(ref publishCount); - publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask()); + publishTasks.Add(_channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, mandatory: true, + body: _body)); - if (i % 500 == 0) + if (i % 128 == 0) { - await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } publishTasks.Clear(); } } + + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + publishTasks.Clear(); })); } + await Task.WhenAll(tasks).WaitAsync(WaitSpan); } finally @@ -131,9 +178,11 @@ public async Task TestUnthrottledFloodPublishing() Assert.True(_conn.IsOpen); Assert.False(sawUnexpectedShutdown); - if (IsVerbose) + // if (IsVerbose) + if (true) { - _output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed); + _output.WriteLine("[INFO] published {0} messages in {1}, exceptions: {2}", + publishCount, stopwatch.Elapsed, exceptions.Count); } } From 4c844e1746e2160caf4695637c0c58b97d4b7f4c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 27 Mar 2025 15:29:35 -0700 Subject: [PATCH 4/7] * Consolidate duplicated code. --- .../Test/Integration/TestFloodPublishing.cs | 60 +++++++------------ 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 17fff7400..53723e85a 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -104,6 +104,24 @@ public async Task TestUnthrottledFloodPublishing() string queueName = q.QueueName; var exceptions = new ConcurrentBag(); + + async Task WaitPublishTasksAsync(ICollection publishTasks) + { + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + + publishTasks.Clear(); + } + var stopwatch = Stopwatch.StartNew(); int publishCount = 0; try @@ -118,18 +136,7 @@ public async Task TestUnthrottledFloodPublishing() { if (stopwatch.Elapsed > ElapsedMax) { - foreach (ValueTask pt in publishTasks) - { - try - { - await pt; - } - catch (Exception ex) - { - exceptions.Add(ex); - } - } - publishTasks.Clear(); + await WaitPublishTasksAsync(publishTasks); return; } @@ -139,33 +146,11 @@ public async Task TestUnthrottledFloodPublishing() if (i % 128 == 0) { - foreach (ValueTask pt in publishTasks) - { - try - { - await pt; - } - catch (Exception ex) - { - exceptions.Add(ex); - } - } - publishTasks.Clear(); + await WaitPublishTasksAsync(publishTasks); } } - foreach (ValueTask pt in publishTasks) - { - try - { - await pt; - } - catch (Exception ex) - { - exceptions.Add(ex); - } - } - publishTasks.Clear(); + await WaitPublishTasksAsync(publishTasks); })); } @@ -178,8 +163,7 @@ public async Task TestUnthrottledFloodPublishing() Assert.True(_conn.IsOpen); Assert.False(sawUnexpectedShutdown); - // if (IsVerbose) - if (true) + if (IsVerbose) { _output.WriteLine("[INFO] published {0} messages in {1}, exceptions: {2}", publishCount, stopwatch.Elapsed, exceptions.Count); From ab1f3586578fd3817f75d524003634716e05e9c0 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 28 Mar 2025 13:36:30 -0700 Subject: [PATCH 5/7] * Remove TCS from `_confirmsTaskCompletionSources` if there is an `OperationCanceledException`. --- projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 9e6e14a9e..76541204b 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -380,6 +380,11 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) .ConfigureAwait(false); } + catch (OperationCanceledException) + { + _confirmsTaskCompletionSources.Remove(publisherConfirmationInfo.PublishSequenceNumber, out _); + throw; + } finally { publisherConfirmationInfo.Dispose(); From 6c0488ef1413380dd4004a5856a3b265d4c27ce7 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 28 Mar 2025 13:58:55 -0700 Subject: [PATCH 6/7] * Consistently use `TryRemove` --- .../Impl/Channel.PublisherConfirms.cs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 76541204b..2c074ffec 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -184,8 +184,10 @@ private void HandleAck(ulong deliveryTag, bool multiple) { if (pair.Key <= deliveryTag) { - pair.Value.SetResult(true); - _confirmsTaskCompletionSources.Remove(pair.Key, out _); + if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource? tcs)) + { + tcs.SetResult(true); + } } } } @@ -210,14 +212,16 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (pair.Key <= deliveryTag) { - pair.Value.SetException(new PublishException(pair.Key, isReturn)); - _confirmsTaskCompletionSources.Remove(pair.Key, out _); + if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource? tcs)) + { + tcs.SetException(new PublishException(pair.Key, isReturn)); + } } } } else { - if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) + if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) { tcs.SetException(new PublishException(deliveryTag, isReturn)); } @@ -382,7 +386,7 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) } catch (OperationCanceledException) { - _confirmsTaskCompletionSources.Remove(publisherConfirmationInfo.PublishSequenceNumber, out _); + _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); throw; } finally From 0ae8d392da980f77bae7e0c20972a0a9cb138ae6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 8 Apr 2025 08:41:31 -0700 Subject: [PATCH 7/7] * Remove use of `Debug.Assert` --- .../Impl/AsyncRpcContinuations.cs | 19 ++++-- .../Impl/Channel.PublisherConfirms.cs | 10 ++-- projects/RabbitMQ.Client/Impl/Channel.cs | 58 +++++++------------ 3 files changed, 41 insertions(+), 46 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs index 35dc1cedd..0bbef38e9 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs @@ -30,7 +30,6 @@ //--------------------------------------------------------------------------- using System; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -280,10 +279,20 @@ protected override async Task DoHandleCommandAsync(IncomingCommand cmd) { if (cmd.CommandId == ProtocolCommandId.BasicCancelOk) { - Debug.Assert(_consumerTag == new BasicCancelOk(cmd.MethodSpan)._consumerTag); - await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) - .ConfigureAwait(false); - _tcs.SetResult(true); + var result = new BasicCancelOk(cmd.MethodSpan); + if (_consumerTag == result._consumerTag) + { + await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) + .ConfigureAwait(false); + _tcs.SetResult(true); + } + else + { + string msg = string.Format("Consumer tag '{0}' does not match expected consumer tag for basic.cancel operation {1}", + result._consumerTag, _consumerTag); + var ex = new InvalidOperationException(msg); + _tcs.SetException(ex); + } } else { diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 2c074ffec..b22625510 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -32,7 +32,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Text; using System.Threading; @@ -148,13 +147,14 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken) enqueued = Enqueue(k); var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); - - return; + if (false == await k) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } } finally { diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index b573d30e2..0bfc3e2b9 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -234,8 +234,7 @@ await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); await ConsumerDispatcher.WaitForShutdownAsync() .ConfigureAwait(false); @@ -387,8 +386,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); await MaybeConfirmSelect(cancellationToken) .ConfigureAwait(false); @@ -465,6 +463,14 @@ await c.HandleCommandAsync(cmd) } } + private static void AssertResultIsTrue(bool result) + { + if (false == result) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] protected ValueTask ModelSendAsync(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod { @@ -978,8 +984,7 @@ await ModelSendAsync(in method, k.CancellationToken) await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch { @@ -1108,9 +1113,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1143,9 +1146,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1187,9 +1188,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1240,9 +1239,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1286,8 +1283,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1332,8 +1328,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1456,8 +1451,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1587,9 +1581,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1621,9 +1613,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1655,9 +1645,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1689,9 +1677,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) {