diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index e16836965..4ad9475df 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -344,7 +344,24 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn { if (_publisherConfirmationsEnabled) { - _confirmSemaphore.Release(); + try + { + _confirmSemaphore.Release(); + } + catch (SemaphoreFullException ex) + { + /* + * rabbitmq/rabbitmq-dotnet-client-1793 + * If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring + * _confirmSemaphore, the above Release() call will throw SemaphoreFullException. + * In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw + * a "bug found" exception here. + */ + if (publisherConfirmationInfo is not null) + { + throw new InvalidOperationException(InternalConstants.BugFound, ex); + } + } if (publisherConfirmationInfo is not null) { diff --git a/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs b/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs index b6bd99802..eb26ef1dc 100644 --- a/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs +++ b/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs @@ -134,13 +134,24 @@ protected override void Dispose(bool disposing) private int CalculateDelay() { - long? availablePermits = _concurrencyLimiter.GetStatistics()?.CurrentAvailablePermits; - if (!(availablePermits < _throttlingThreshold)) + RateLimiterStatistics? rateLimiterStatistics = _concurrencyLimiter.GetStatistics(); + if (rateLimiterStatistics is null) { return 0; } - return (int)((1.0 - availablePermits / (double)_maxConcurrency) * 1000); + long availablePermits = rateLimiterStatistics.CurrentAvailablePermits; + if (availablePermits >= _throttlingThreshold) + { + /* + * Note: do NOT add a delay because available permits exceeeds the threshold + * below which throttling begins + */ + return 0; + } + + double percentageUsed = 1.0 - (availablePermits / (double)_maxConcurrency); + return (int)(percentageUsed * 1000); } } } diff --git a/projects/Test/Integration/GH/NeverAcquiredRateLimiter.cs b/projects/Test/Integration/GH/NeverAcquiredRateLimiter.cs new file mode 100644 index 000000000..ccafd392b --- /dev/null +++ b/projects/Test/Integration/GH/NeverAcquiredRateLimiter.cs @@ -0,0 +1,68 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.RateLimiting; +using System.Threading.Tasks; + +namespace Test.Integration.GH +{ + public class NeverAcquiredRateLimiter : RateLimiter + { + public override TimeSpan? IdleDuration => throw new NotImplementedException(); + public override RateLimiterStatistics GetStatistics() => throw new NotImplementedException(); + + protected override ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) + { + return new ValueTask(new NotAcquiredRateLimitLease()); + } + + protected override RateLimitLease AttemptAcquireCore(int permitCount) + { + return new NotAcquiredRateLimitLease(); + } + } + + public class NotAcquiredRateLimitLease : RateLimitLease + { + public override bool IsAcquired => false; + + public override IEnumerable MetadataNames => []; + + public override bool TryGetMetadata(string metadataName, out object metadata) + { + metadata = string.Empty; + return true; + } + } +} diff --git a/projects/Test/Integration/GH/TestGitHubIssues.cs b/projects/Test/Integration/GH/TestGitHubIssues.cs index dfee6e720..844202d89 100644 --- a/projects/Test/Integration/GH/TestGitHubIssues.cs +++ b/projects/Test/Integration/GH/TestGitHubIssues.cs @@ -329,5 +329,58 @@ await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, _output.WriteLine("saw {0} publishExceptions", publishExceptions.Count); } } + + [Fact] + public async Task MaybeSomethingUpWithRateLimiter_GH1793() + { + const int messageCount = 16; + + _connFactory = new ConnectionFactory + { + AutomaticRecoveryEnabled = true + }; + + _conn = await _connFactory.CreateConnectionAsync(); + + var channelOpts = new CreateChannelOptions( + publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: new NeverAcquiredRateLimiter() + ); + + _channel = await _conn.CreateChannelAsync(channelOpts); + + var properties = new BasicProperties + { + DeliveryMode = DeliveryModes.Persistent + }; + + for (int i = 0; i < messageCount; i++) + { + int retryCount = 0; + const int maxRetries = 3; + while (retryCount <= maxRetries) + { + try + { + byte[] bytes = Encoding.UTF8.GetBytes("message"); + await Assert.ThrowsAnyAsync(async () => + { + await _channel.BasicPublishAsync(string.Empty, string.Empty, true, properties, bytes); + }); + break; + } + catch (SemaphoreFullException ex0) + { + _output.WriteLine("{0} ex: {1}", _testDisplayName, ex0); + retryCount++; + } + catch (PublishException) + { + retryCount++; + } + } + } + } } }