From f8a18cd43955e8f40d04178430adb83061525b2f Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Sun, 20 Oct 2024 18:05:07 +0200 Subject: [PATCH] Leverage `System.Threading.RateLimiting` * Cosmetic formatting * Use `RateLimitLease` instead of `IDisposable` in internal classes. * Move `ThrottlingRateLimiter` to `RabbitMQ.Client` namespace. * Add the outline of a test for throttling publishes based on outstanding confirms. * Do not error exit on Windows when `inet_error` is found in logs * Modify `TestPublisherConfirmationThrottling` to use toxiproxy. --- .ci/windows/gha-log-check.ps1 | 2 +- projects/Directory.Packages.props | 1 + .../RabbitMQ.Client/CreateChannelOptions.cs | 45 +++++-- .../Impl/AutorecoveringChannel.cs | 9 +- .../Impl/AutorecoveringConnection.cs | 9 +- .../Impl/Channel.PublisherConfirms.cs | 72 ++++++----- projects/RabbitMQ.Client/Impl/Channel.cs | 13 +- projects/RabbitMQ.Client/Impl/Connection.cs | 2 +- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 14 ++- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 1 + .../RabbitMQ.Client/ThrottlingRateLimiter.cs | 117 ++++++++++++++++++ .../PublisherConfirms/PublisherConfirms.cs | 3 +- projects/Test/Integration/TestToxiproxy.cs | 110 +++++++++++++++- 13 files changed, 332 insertions(+), 66 deletions(-) create mode 100644 projects/RabbitMQ.Client/ThrottlingRateLimiter.cs diff --git a/.ci/windows/gha-log-check.ps1 b/.ci/windows/gha-log-check.ps1 index d618f43b1..c79cf87db 100644 --- a/.ci/windows/gha-log-check.ps1 +++ b/.ci/windows/gha-log-check.ps1 @@ -8,6 +8,6 @@ Write-Host "[INFO] looking for errors in '$rabbitmq_log_dir'" If (Get-ChildItem $rabbitmq_log_dir\*.log | Select-String -Quiet -SimpleMatch -Pattern inet_error) { - Write-Error "[WARNING] found inet_error in '$rabbitmq_log_dir'" + Write-Host -ForegroundColor Yellow "[WARNING] found inet_error in '$rabbitmq_log_dir'" exit 0 } diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props index 1d76dcea9..2a5f3a6ce 100644 --- a/projects/Directory.Packages.props +++ b/projects/Directory.Packages.props @@ -16,6 +16,7 @@ --> + diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs index 647177972..4fb04afd6 100644 --- a/projects/RabbitMQ.Client/CreateChannelOptions.cs +++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs @@ -1,3 +1,37 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System.Threading.RateLimiting; +using RabbitMQ.Client.Impl; + namespace RabbitMQ.Client { /// @@ -16,15 +50,10 @@ public sealed class CreateChannelOptions public bool PublisherConfirmationTrackingEnabled { get; set; } = false; /// - /// If publisher confirmation tracking is enabled, this represents the number of allowed - /// outstanding publisher confirmations before publishing is blocked. - /// - /// Defaults to 128 - /// - /// Set to null, to allow an unlimited number of outstanding confirmations. - /// + /// If the publisher confirmation tracking is enabled, this represents the rate limiter used to + /// throttle additional attempts to publish once the threshold is reached. /// - public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128; + public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128); /// /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 2b8a4d62f..36c3c7dee 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -33,6 +33,7 @@ using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.ConsumerDispatching; using RabbitMQ.Client.Events; @@ -51,7 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private ushort _prefetchCountGlobal; private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; - private ushort? _maxOutstandingPublisherConfirmations = null; + private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null; private bool _usesTransactions; private ushort _consumerDispatchConcurrency; @@ -78,14 +79,14 @@ public AutorecoveringChannel(AutorecoveringConnection conn, ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled, - ushort? maxOutstandingPublisherConfirmations) + RateLimiter? outstandingPublisherConfirmationsRateLimiter) { _connection = conn; _innerChannel = innerChannel; _consumerDispatchConcurrency = consumerDispatchConcurrency; _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; - _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; + _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; } public event AsyncEventHandler BasicAcksAsync @@ -173,7 +174,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync( _publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled, - _maxOutstandingPublisherConfirmations, + _outstandingPublisherConfirmationsRateLimiter, _consumerDispatchConcurrency, cancellationToken) .ConfigureAwait(false); diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 3e544bb84..1c7adb0aa 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -33,6 +33,7 @@ using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -188,7 +189,7 @@ public event AsyncEventHandler RecoveringConsumerAs public async ValueTask CreateNonRecoveringChannelAsync( bool publisherConfirmationsEnabled = false, bool publisherConfirmationTrackingEnabled = false, - ushort? maxOutstandingPublisherConfirmations = null, + RateLimiter? outstandingPublisherConfirmationsRateLimiter = null, ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { @@ -197,7 +198,7 @@ public async ValueTask CreateNonRecoveringChannelAsync( return (RecoveryAwareChannel)await result.OpenAsync( publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, - maxOutstandingPublisherConfirmations, + outstandingPublisherConfirmationsRateLimiter, cancellationToken) .ConfigureAwait(false); } @@ -273,7 +274,7 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync( options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, - options.MaxOutstandingPublisherConfirmations, + options.OutstandingPublisherConfirmationsRateLimiter, cdc, cancellationToken) .ConfigureAwait(false); @@ -284,7 +285,7 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d cdc, options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, - options.MaxOutstandingPublisherConfirmations); + options.OutstandingPublisherConfirmationsRateLimiter); await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return autorecoveringChannel; diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index f00311b25..680ac314f 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -35,6 +35,7 @@ using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -47,32 +48,26 @@ internal partial class Channel : IChannel, IRecoverable { private bool _publisherConfirmationsEnabled = false; private bool _publisherConfirmationTrackingEnabled = false; - private ushort? _maxOutstandingPublisherConfirmations = null; - private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore; private ulong _nextPublishSeqNo = 0; private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); + private RateLimiter? _outstandingPublisherConfirmationsRateLimiter; - private class PublisherConfirmationInfo + private sealed class PublisherConfirmationInfo : IDisposable { - private ulong _publishSequenceNumber; private TaskCompletionSource? _publisherConfirmationTcs; + private readonly RateLimitLease? _lease; - internal PublisherConfirmationInfo() + internal PublisherConfirmationInfo(ulong publishSequenceNumber, + TaskCompletionSource? publisherConfirmationTcs, + RateLimitLease? lease) { - _publishSequenceNumber = 0; - _publisherConfirmationTcs = null; - } - - internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource? publisherConfirmationTcs) - { - _publishSequenceNumber = publishSequenceNumber; + PublishSequenceNumber = publishSequenceNumber; _publisherConfirmationTcs = publisherConfirmationTcs; + _lease = lease; } - internal ulong PublishSequenceNumber => _publishSequenceNumber; - - internal TaskCompletionSource? PublisherConfirmationTcs => _publisherConfirmationTcs; + internal ulong PublishSequenceNumber { get; } internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken) { @@ -95,6 +90,11 @@ internal bool MaybeHandleException(Exception ex) return exceptionWasHandled; } + + public void Dispose() + { + _lease?.Dispose(); + } } public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) @@ -119,18 +119,11 @@ public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToke private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled, - ushort? maxOutstandingPublisherConfirmations) + RateLimiter? outstandingPublisherConfirmationsRateLimiter) { _publisherConfirmationsEnabled = publisherConfirmationsEnabled; _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; - _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations; - - if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null) - { - _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim( - (int)_maxOutstandingPublisherConfirmations, - (int)_maxOutstandingPublisherConfirmations); - } + _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; } private async Task MaybeConfirmSelect(CancellationToken cancellationToken) @@ -282,11 +275,15 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) { if (_publisherConfirmationsEnabled) { - if (_publisherConfirmationTrackingEnabled && - _maxOutstandingConfirmationsSemaphore is not null) + RateLimitLease? lease = null; + if (_publisherConfirmationTrackingEnabled) { - await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync( + cancellationToken: cancellationToken) + .ConfigureAwait(false); + } } await _confirmSemaphore.WaitAsync(cancellationToken) @@ -303,7 +300,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken) _nextPublishSeqNo++; - return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs); + return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease); } else { @@ -339,18 +336,19 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn { if (_publisherConfirmationsEnabled) { - if (_publisherConfirmationTrackingEnabled && - _maxOutstandingConfirmationsSemaphore is not null) - { - _maxOutstandingConfirmationsSemaphore.Release(); - } - _confirmSemaphore.Release(); if (publisherConfirmationInfo is not null) { - await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) - .ConfigureAwait(false); + try + { + await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) + .ConfigureAwait(false); + } + finally + { + publisherConfirmationInfo.Dispose(); + } } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 871a84d60..c5c2decad 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -37,6 +37,7 @@ using System.Runtime.CompilerServices; using System.Text; using System.Threading; +using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.ConsumerDispatching; using RabbitMQ.Client.Events; @@ -362,12 +363,12 @@ protected bool Enqueue(IRpcContinuation k) internal async Task OpenAsync(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled, - ushort? maxOutstandingPublisherConfirmations, + RateLimiter? outstandingPublisherConfirmationsRateLimiter, CancellationToken cancellationToken) { ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, - maxOutstandingPublisherConfirmations); + outstandingPublisherConfirmationsRateLimiter); bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -532,7 +533,7 @@ protected virtual void Dispose(bool disposing) ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); - _maxOutstandingConfirmationsSemaphore?.Dispose(); + _outstandingPublisherConfirmationsRateLimiter?.Dispose(); } } @@ -554,7 +555,11 @@ protected virtual async ValueTask DisposeAsyncCore() ConsumerDispatcher.Dispose(); _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); - _maxOutstandingConfirmationsSemaphore?.Dispose(); + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } } public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 870ca5fc1..f8071d0e5 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -276,7 +276,7 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d IChannel ch = await channel.OpenAsync( options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, - options.MaxOutstandingPublisherConfirmations, + options.OutstandingPublisherConfirmationsRateLimiter, cancellationToken) .ConfigureAwait(false); return ch; diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index d811ca426..d32cc4182 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -1,10 +1,16 @@ const RabbitMQ.Client.Constants.PublishSequenceNumberHeader = "x-dotnet-pub-seq-no" -> string! +const RabbitMQ.Client.ThrottlingRateLimiter.DefaultThrottlingPercentage = 50 -> int +override RabbitMQ.Client.ThrottlingRateLimiter.AcquireAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask +override RabbitMQ.Client.ThrottlingRateLimiter.AttemptAcquireCore(int permitCount) -> System.Threading.RateLimiting.RateLimitLease! +override RabbitMQ.Client.ThrottlingRateLimiter.Dispose(bool disposing) -> void +override RabbitMQ.Client.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics? +override RabbitMQ.Client.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan? RabbitMQ.Client.CreateChannelOptions RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort? RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void -RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort? -RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void +RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.get -> System.Threading.RateLimiting.RateLimiter? +RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.set -> void RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool @@ -20,9 +26,11 @@ RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.ThrottlingRateLimiter +RabbitMQ.Client.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions! static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask \ No newline at end of file +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 1e7d59b92..1ddf4cad9 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -65,6 +65,7 @@ See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299 --> + diff --git a/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs b/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs new file mode 100644 index 000000000..b55ab9908 --- /dev/null +++ b/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs @@ -0,0 +1,117 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.RateLimiting; +using System.Threading.Tasks; + +namespace RabbitMQ.Client +{ + public class ThrottlingRateLimiter : RateLimiter + { + public const int DefaultThrottlingPercentage = 50; + + private readonly ConcurrencyLimiter _concurrencyLimiter; + private readonly int _maxConcurrency; + private readonly int _throttlingThreshold; + + public ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = DefaultThrottlingPercentage) + { + _maxConcurrency = maxConcurrentCalls; + _throttlingThreshold = _maxConcurrency * throttlingPercentage.GetValueOrDefault(DefaultThrottlingPercentage) / 100; + + ConcurrencyLimiterOptions limiterOptions = new() + { + QueueLimit = _maxConcurrency, + PermitLimit = _maxConcurrency + }; + + _concurrencyLimiter = new ConcurrencyLimiter(limiterOptions); + } + + public override RateLimiterStatistics? GetStatistics() => _concurrencyLimiter.GetStatistics(); + + protected override RateLimitLease AttemptAcquireCore(int permitCount) + { + RateLimitLease lease = _concurrencyLimiter.AttemptAcquire(permitCount); + + ThrottleIfNeeded(); + + return lease; + } + + protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) + { + RateLimitLease lease = await _concurrencyLimiter.AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false); + + await ThrottleIfNeededAsync(cancellationToken).ConfigureAwait(false); + + return lease; + } + + private void ThrottleIfNeeded() + { + long? availablePermits = _concurrencyLimiter.GetStatistics()?.CurrentAvailablePermits; + if (!(availablePermits < _throttlingThreshold)) + { + return; + } + + int delay = (int)((1.0 - availablePermits / (double)_maxConcurrency) * 1000); + Thread.Sleep(delay); + } + + private Task ThrottleIfNeededAsync(CancellationToken cancellationToken = default) + { + long? availablePermits = _concurrencyLimiter.GetStatistics()?.CurrentAvailablePermits; + if (!(availablePermits < _throttlingThreshold)) + { + return Task.CompletedTask; + } + + int delay = (int)((1.0 - availablePermits / (double)_maxConcurrency) * 1000); + return Task.Delay(delay, cancellationToken); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _concurrencyLimiter.Dispose(); + } + + base.Dispose(disposing); + } + + public override TimeSpan? IdleDuration => null; + } +} diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 10e153b8e..2d78f0d24 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -37,6 +37,7 @@ using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; const ushort MAX_OUTSTANDING_CONFIRMS = 256; @@ -47,7 +48,7 @@ { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true, - MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS + OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS) }; #pragma warning disable CS8321 // Local function is declared but never used diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 9c588f889..d6567ee1b 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -30,7 +30,9 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; using Integration; using RabbitMQ.Client; @@ -105,9 +107,9 @@ public async Task TestCloseConnection() } /* - * Note: using TrySetResult because this callback will be called when the - * test exits, and connectionShutdownTcs will have already been set - */ + * Note: using TrySetResult because this callback will be called when the + * test exits, and connectionShutdownTcs will have already been set + */ connectionShutdownTcs.TrySetResult(true); return Task.CompletedTask; }; @@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464() await recoveryTask; } + [SkippableFact] + [Trait("Category", "Toxiproxy")] + public async Task TestPublisherConfirmationThrottling() + { + Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + + const int TotalMessageCount = 64; + const int MaxOutstandingConfirms = 8; + const int BatchSize = MaxOutstandingConfirms * 2; + + using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + await pm.InitializeAsync(); + + ConnectionFactory cf = CreateConnectionFactory(); + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort); + cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); + cf.AutomaticRecoveryEnabled = true; + + var channelOpts = new CreateChannelOptions + { + PublisherConfirmationsEnabled = true, + PublisherConfirmationTrackingEnabled = true, + OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms) + }; + + var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + long publishCount = 0; + Task publishTask = Task.Run(async () => + { + await using (IConnection conn = await cf.CreateConnectionAsync()) + { + await using (IChannel ch = await conn.CreateChannelAsync(channelOpts)) + { + QueueDeclareOk q = await ch.QueueDeclareAsync(); + + channelCreatedTcs.SetResult(true); + + try + { + var publishBatch = new List(); + while (publishCount < TotalMessageCount) + { + for (int i = 0; i < BatchSize; i++) + { + publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody())); + } + + foreach (ValueTask pt in publishBatch) + { + await pt; + Interlocked.Increment(ref publishCount); + } + + publishBatch.Clear(); + } + + messagesPublishedTcs.SetResult(true); + } + catch (Exception ex) + { + messagesPublishedTcs.SetException(ex); + } + } + } + }); + + await channelCreatedTcs.Task; + + const string toxicName = "rmq-localhost-bandwidth"; + var bandwidthToxic = new BandwidthToxic(); + bandwidthToxic.Name = toxicName; + bandwidthToxic.Attributes.Rate = 0; + bandwidthToxic.Toxicity = 1.0; + bandwidthToxic.Stream = ToxicDirection.DownStream; + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Task addToxicTask = pm.AddToxicAsync(bandwidthToxic); + + while (true) + { + long publishCount0 = Interlocked.Read(ref publishCount); + await Task.Delay(TimeSpan.FromSeconds(5)); + long publishCount1 = Interlocked.Read(ref publishCount); + + if (publishCount0 == publishCount1) + { + // Publishing has "settled" due to being blocked + break; + } + } + + await addToxicTask.WaitAsync(WaitSpan); + await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan); + + await messagesPublishedTcs.Task.WaitAsync(WaitSpan); + await publishTask.WaitAsync(WaitSpan); + + Assert.Equal(TotalMessageCount, publishCount); + } + private bool AreToxiproxyTestsEnabled { get