diff --git a/.ci/windows/gha-log-check.ps1 b/.ci/windows/gha-log-check.ps1
index d618f43b1..c79cf87db 100644
--- a/.ci/windows/gha-log-check.ps1
+++ b/.ci/windows/gha-log-check.ps1
@@ -8,6 +8,6 @@ Write-Host "[INFO] looking for errors in '$rabbitmq_log_dir'"
If (Get-ChildItem $rabbitmq_log_dir\*.log | Select-String -Quiet -SimpleMatch -Pattern inet_error)
{
- Write-Error "[WARNING] found inet_error in '$rabbitmq_log_dir'"
+ Write-Host -ForegroundColor Yellow "[WARNING] found inet_error in '$rabbitmq_log_dir'"
exit 0
}
diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props
index 1d76dcea9..2a5f3a6ce 100644
--- a/projects/Directory.Packages.props
+++ b/projects/Directory.Packages.props
@@ -16,6 +16,7 @@
-->
+
diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs
index 647177972..4fb04afd6 100644
--- a/projects/RabbitMQ.Client/CreateChannelOptions.cs
+++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs
@@ -1,3 +1,37 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+using System.Threading.RateLimiting;
+using RabbitMQ.Client.Impl;
+
namespace RabbitMQ.Client
{
///
@@ -16,15 +50,10 @@ public sealed class CreateChannelOptions
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
///
- /// If publisher confirmation tracking is enabled, this represents the number of allowed
- /// outstanding publisher confirmations before publishing is blocked.
- ///
- /// Defaults to 128
- ///
- /// Set to null, to allow an unlimited number of outstanding confirmations.
- ///
+ /// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
+ /// throttle additional attempts to publish once the threshold is reached.
///
- public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;
+ public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
///
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
index 2b8a4d62f..36c3c7dee 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
@@ -33,6 +33,7 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
+using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
@@ -51,7 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
- private ushort? _maxOutstandingPublisherConfirmations = null;
+ private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;
@@ -78,14 +79,14 @@ public AutorecoveringChannel(AutorecoveringConnection conn,
ushort consumerDispatchConcurrency,
bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
- ushort? maxOutstandingPublisherConfirmations)
+ RateLimiter? outstandingPublisherConfirmationsRateLimiter)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
- _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
+ _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
}
public event AsyncEventHandler BasicAcksAsync
@@ -173,7 +174,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
_publisherConfirmationsEnabled,
_publisherConfirmationTrackingEnabled,
- _maxOutstandingPublisherConfirmations,
+ _outstandingPublisherConfirmationsRateLimiter,
_consumerDispatchConcurrency,
cancellationToken)
.ConfigureAwait(false);
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
index 3e544bb84..1c7adb0aa 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
@@ -33,6 +33,7 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
+using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
@@ -188,7 +189,7 @@ public event AsyncEventHandler RecoveringConsumerAs
public async ValueTask CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
- ushort? maxOutstandingPublisherConfirmations = null,
+ RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
@@ -197,7 +198,7 @@ public async ValueTask CreateNonRecoveringChannelAsync(
return (RecoveryAwareChannel)await result.OpenAsync(
publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
- maxOutstandingPublisherConfirmations,
+ outstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
}
@@ -273,7 +274,7 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
- options.MaxOutstandingPublisherConfirmations,
+ options.OutstandingPublisherConfirmationsRateLimiter,
cdc,
cancellationToken)
.ConfigureAwait(false);
@@ -284,7 +285,7 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d
cdc,
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
- options.MaxOutstandingPublisherConfirmations);
+ options.OutstandingPublisherConfirmationsRateLimiter);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return autorecoveringChannel;
diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
index f00311b25..680ac314f 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
@@ -35,6 +35,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
+using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
@@ -47,32 +48,26 @@ internal partial class Channel : IChannel, IRecoverable
{
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
- private ushort? _maxOutstandingPublisherConfirmations = null;
- private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new();
+ private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
- private class PublisherConfirmationInfo
+ private sealed class PublisherConfirmationInfo : IDisposable
{
- private ulong _publishSequenceNumber;
private TaskCompletionSource? _publisherConfirmationTcs;
+ private readonly RateLimitLease? _lease;
- internal PublisherConfirmationInfo()
+ internal PublisherConfirmationInfo(ulong publishSequenceNumber,
+ TaskCompletionSource? publisherConfirmationTcs,
+ RateLimitLease? lease)
{
- _publishSequenceNumber = 0;
- _publisherConfirmationTcs = null;
- }
-
- internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource? publisherConfirmationTcs)
- {
- _publishSequenceNumber = publishSequenceNumber;
+ PublishSequenceNumber = publishSequenceNumber;
_publisherConfirmationTcs = publisherConfirmationTcs;
+ _lease = lease;
}
- internal ulong PublishSequenceNumber => _publishSequenceNumber;
-
- internal TaskCompletionSource? PublisherConfirmationTcs => _publisherConfirmationTcs;
+ internal ulong PublishSequenceNumber { get; }
internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
{
@@ -95,6 +90,11 @@ internal bool MaybeHandleException(Exception ex)
return exceptionWasHandled;
}
+
+ public void Dispose()
+ {
+ _lease?.Dispose();
+ }
}
public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
@@ -119,18 +119,11 @@ public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToke
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
- ushort? maxOutstandingPublisherConfirmations)
+ RateLimiter? outstandingPublisherConfirmationsRateLimiter)
{
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
- _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
-
- if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
- {
- _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
- (int)_maxOutstandingPublisherConfirmations,
- (int)_maxOutstandingPublisherConfirmations);
- }
+ _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
}
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
@@ -282,11 +275,15 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
{
if (_publisherConfirmationsEnabled)
{
- if (_publisherConfirmationTrackingEnabled &&
- _maxOutstandingConfirmationsSemaphore is not null)
+ RateLimitLease? lease = null;
+ if (_publisherConfirmationTrackingEnabled)
{
- await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
+ if (_outstandingPublisherConfirmationsRateLimiter is not null)
+ {
+ lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ }
}
await _confirmSemaphore.WaitAsync(cancellationToken)
@@ -303,7 +300,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
_nextPublishSeqNo++;
- return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
+ return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
}
else
{
@@ -339,18 +336,19 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
{
if (_publisherConfirmationsEnabled)
{
- if (_publisherConfirmationTrackingEnabled &&
- _maxOutstandingConfirmationsSemaphore is not null)
- {
- _maxOutstandingConfirmationsSemaphore.Release();
- }
-
_confirmSemaphore.Release();
if (publisherConfirmationInfo is not null)
{
- await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
- .ConfigureAwait(false);
+ try
+ {
+ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
+ .ConfigureAwait(false);
+ }
+ finally
+ {
+ publisherConfirmationInfo.Dispose();
+ }
}
}
}
diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs
index 871a84d60..c5c2decad 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.cs
@@ -37,6 +37,7 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
+using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
@@ -362,12 +363,12 @@ protected bool Enqueue(IRpcContinuation k)
internal async Task OpenAsync(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
- ushort? maxOutstandingPublisherConfirmations,
+ RateLimiter? outstandingPublisherConfirmationsRateLimiter,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
- maxOutstandingPublisherConfirmations);
+ outstandingPublisherConfirmationsRateLimiter);
bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -532,7 +533,7 @@ protected virtual void Dispose(bool disposing)
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
- _maxOutstandingConfirmationsSemaphore?.Dispose();
+ _outstandingPublisherConfirmationsRateLimiter?.Dispose();
}
}
@@ -554,7 +555,11 @@ protected virtual async ValueTask DisposeAsyncCore()
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
- _maxOutstandingConfirmationsSemaphore?.Dispose();
+ if (_outstandingPublisherConfirmationsRateLimiter is not null)
+ {
+ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
+ .ConfigureAwait(false);
+ }
}
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs
index 870ca5fc1..f8071d0e5 100644
--- a/projects/RabbitMQ.Client/Impl/Connection.cs
+++ b/projects/RabbitMQ.Client/Impl/Connection.cs
@@ -276,7 +276,7 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d
IChannel ch = await channel.OpenAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
- options.MaxOutstandingPublisherConfirmations,
+ options.OutstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
return ch;
diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
index d811ca426..d32cc4182 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
@@ -1,10 +1,16 @@
const RabbitMQ.Client.Constants.PublishSequenceNumberHeader = "x-dotnet-pub-seq-no" -> string!
+const RabbitMQ.Client.ThrottlingRateLimiter.DefaultThrottlingPercentage = 50 -> int
+override RabbitMQ.Client.ThrottlingRateLimiter.AcquireAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask
+override RabbitMQ.Client.ThrottlingRateLimiter.AttemptAcquireCore(int permitCount) -> System.Threading.RateLimiting.RateLimitLease!
+override RabbitMQ.Client.ThrottlingRateLimiter.Dispose(bool disposing) -> void
+override RabbitMQ.Client.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics?
+override RabbitMQ.Client.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan?
RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
-RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort?
-RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void
+RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.get -> System.Threading.RateLimiting.RateLimiter?
+RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
@@ -20,9 +26,11 @@ RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() ->
RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
+RabbitMQ.Client.ThrottlingRateLimiter
+RabbitMQ.Client.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void
static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
\ No newline at end of file
+static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj
index 1e7d59b92..1ddf4cad9 100644
--- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj
+++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj
@@ -65,6 +65,7 @@
See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
-->
+
diff --git a/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs b/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs
new file mode 100644
index 000000000..b55ab9908
--- /dev/null
+++ b/projects/RabbitMQ.Client/ThrottlingRateLimiter.cs
@@ -0,0 +1,117 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+using System;
+using System.Threading;
+using System.Threading.RateLimiting;
+using System.Threading.Tasks;
+
+namespace RabbitMQ.Client
+{
+ public class ThrottlingRateLimiter : RateLimiter
+ {
+ public const int DefaultThrottlingPercentage = 50;
+
+ private readonly ConcurrencyLimiter _concurrencyLimiter;
+ private readonly int _maxConcurrency;
+ private readonly int _throttlingThreshold;
+
+ public ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = DefaultThrottlingPercentage)
+ {
+ _maxConcurrency = maxConcurrentCalls;
+ _throttlingThreshold = _maxConcurrency * throttlingPercentage.GetValueOrDefault(DefaultThrottlingPercentage) / 100;
+
+ ConcurrencyLimiterOptions limiterOptions = new()
+ {
+ QueueLimit = _maxConcurrency,
+ PermitLimit = _maxConcurrency
+ };
+
+ _concurrencyLimiter = new ConcurrencyLimiter(limiterOptions);
+ }
+
+ public override RateLimiterStatistics? GetStatistics() => _concurrencyLimiter.GetStatistics();
+
+ protected override RateLimitLease AttemptAcquireCore(int permitCount)
+ {
+ RateLimitLease lease = _concurrencyLimiter.AttemptAcquire(permitCount);
+
+ ThrottleIfNeeded();
+
+ return lease;
+ }
+
+ protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
+ {
+ RateLimitLease lease = await _concurrencyLimiter.AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false);
+
+ await ThrottleIfNeededAsync(cancellationToken).ConfigureAwait(false);
+
+ return lease;
+ }
+
+ private void ThrottleIfNeeded()
+ {
+ long? availablePermits = _concurrencyLimiter.GetStatistics()?.CurrentAvailablePermits;
+ if (!(availablePermits < _throttlingThreshold))
+ {
+ return;
+ }
+
+ int delay = (int)((1.0 - availablePermits / (double)_maxConcurrency) * 1000);
+ Thread.Sleep(delay);
+ }
+
+ private Task ThrottleIfNeededAsync(CancellationToken cancellationToken = default)
+ {
+ long? availablePermits = _concurrencyLimiter.GetStatistics()?.CurrentAvailablePermits;
+ if (!(availablePermits < _throttlingThreshold))
+ {
+ return Task.CompletedTask;
+ }
+
+ int delay = (int)((1.0 - availablePermits / (double)_maxConcurrency) * 1000);
+ return Task.Delay(delay, cancellationToken);
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _concurrencyLimiter.Dispose();
+ }
+
+ base.Dispose(disposing);
+ }
+
+ public override TimeSpan? IdleDuration => null;
+ }
+}
diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
index 10e153b8e..2d78f0d24 100644
--- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
+++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
@@ -37,6 +37,7 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
+using RabbitMQ.Client.Impl;
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
@@ -47,7 +48,7 @@
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
- MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS
+ OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
};
#pragma warning disable CS8321 // Local function is declared but never used
diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs
index 9c588f889..d6567ee1b 100644
--- a/projects/Test/Integration/TestToxiproxy.cs
+++ b/projects/Test/Integration/TestToxiproxy.cs
@@ -30,7 +30,9 @@
//---------------------------------------------------------------------------
using System;
+using System.Collections.Generic;
using System.Net;
+using System.Threading;
using System.Threading.Tasks;
using Integration;
using RabbitMQ.Client;
@@ -105,9 +107,9 @@ public async Task TestCloseConnection()
}
/*
- * Note: using TrySetResult because this callback will be called when the
- * test exits, and connectionShutdownTcs will have already been set
- */
+ * Note: using TrySetResult because this callback will be called when the
+ * test exits, and connectionShutdownTcs will have already been set
+ */
connectionShutdownTcs.TrySetResult(true);
return Task.CompletedTask;
};
@@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464()
await recoveryTask;
}
+ [SkippableFact]
+ [Trait("Category", "Toxiproxy")]
+ public async Task TestPublisherConfirmationThrottling()
+ {
+ Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
+
+ const int TotalMessageCount = 64;
+ const int MaxOutstandingConfirms = 8;
+ const int BatchSize = MaxOutstandingConfirms * 2;
+
+ using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
+ await pm.InitializeAsync();
+
+ ConnectionFactory cf = CreateConnectionFactory();
+ cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
+ cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
+ cf.AutomaticRecoveryEnabled = true;
+
+ var channelOpts = new CreateChannelOptions
+ {
+ PublisherConfirmationsEnabled = true,
+ PublisherConfirmationTrackingEnabled = true,
+ OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
+ };
+
+ var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ long publishCount = 0;
+ Task publishTask = Task.Run(async () =>
+ {
+ await using (IConnection conn = await cf.CreateConnectionAsync())
+ {
+ await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
+ {
+ QueueDeclareOk q = await ch.QueueDeclareAsync();
+
+ channelCreatedTcs.SetResult(true);
+
+ try
+ {
+ var publishBatch = new List();
+ while (publishCount < TotalMessageCount)
+ {
+ for (int i = 0; i < BatchSize; i++)
+ {
+ publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
+ }
+
+ foreach (ValueTask pt in publishBatch)
+ {
+ await pt;
+ Interlocked.Increment(ref publishCount);
+ }
+
+ publishBatch.Clear();
+ }
+
+ messagesPublishedTcs.SetResult(true);
+ }
+ catch (Exception ex)
+ {
+ messagesPublishedTcs.SetException(ex);
+ }
+ }
+ }
+ });
+
+ await channelCreatedTcs.Task;
+
+ const string toxicName = "rmq-localhost-bandwidth";
+ var bandwidthToxic = new BandwidthToxic();
+ bandwidthToxic.Name = toxicName;
+ bandwidthToxic.Attributes.Rate = 0;
+ bandwidthToxic.Toxicity = 1.0;
+ bandwidthToxic.Stream = ToxicDirection.DownStream;
+
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ Task addToxicTask = pm.AddToxicAsync(bandwidthToxic);
+
+ while (true)
+ {
+ long publishCount0 = Interlocked.Read(ref publishCount);
+ await Task.Delay(TimeSpan.FromSeconds(5));
+ long publishCount1 = Interlocked.Read(ref publishCount);
+
+ if (publishCount0 == publishCount1)
+ {
+ // Publishing has "settled" due to being blocked
+ break;
+ }
+ }
+
+ await addToxicTask.WaitAsync(WaitSpan);
+ await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);
+
+ await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
+ await publishTask.WaitAsync(WaitSpan);
+
+ Assert.Equal(TotalMessageCount, publishCount);
+ }
+
private bool AreToxiproxyTestsEnabled
{
get