diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs
index 17a9d6105..647177972 100644
--- a/projects/RabbitMQ.Client/CreateChannelOptions.cs
+++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs
@@ -15,6 +15,17 @@ public sealed class CreateChannelOptions
///
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
+ ///
+ /// If publisher confirmation tracking is enabled, this represents the number of allowed
+ /// outstanding publisher confirmations before publishing is blocked.
+ ///
+ /// Defaults to 128
+ ///
+ /// Set to null, to allow an unlimited number of outstanding confirmations.
+ ///
+ ///
+ public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;
+
///
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
index 2a2efec63..2b8a4d62f 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
@@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
+ private ushort? _maxOutstandingPublisherConfirmations = null;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;
@@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout
set => InnerChannel.ContinuationTimeout = value;
}
- public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
- ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
+ // TODO just pass create channel options
+ public AutorecoveringChannel(AutorecoveringConnection conn,
+ RecoveryAwareChannel innerChannel,
+ ushort consumerDispatchConcurrency,
+ bool publisherConfirmationsEnabled,
+ bool publisherConfirmationTrackingEnabled,
+ ushort? maxOutstandingPublisherConfirmations)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
+ _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
}
public event AsyncEventHandler BasicAcksAsync
@@ -164,8 +171,11 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con
_connection = conn;
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
- _publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled,
- _consumerDispatchConcurrency, cancellationToken)
+ _publisherConfirmationsEnabled,
+ _publisherConfirmationTrackingEnabled,
+ _maxOutstandingPublisherConfirmations,
+ _consumerDispatchConcurrency,
+ cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
index 5cee094c3..3e544bb84 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
@@ -184,16 +184,21 @@ public event AsyncEventHandler RecoveringConsumerAs
public IProtocol Protocol => Endpoint.Protocol;
+ // TODO pass channel creation options?
public async ValueTask CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
+ ushort? maxOutstandingPublisherConfirmations = null,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
return (RecoveryAwareChannel)await result.OpenAsync(
- publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
+ publisherConfirmationsEnabled,
+ publisherConfirmationTrackingEnabled,
+ maxOutstandingPublisherConfirmations,
+ cancellationToken)
.ConfigureAwait(false);
}
@@ -266,11 +271,20 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
- options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
+ options.PublisherConfirmationsEnabled,
+ options.PublisherConfirmationTrackingEnabled,
+ options.MaxOutstandingPublisherConfirmations,
+ cdc,
+ cancellationToken)
.ConfigureAwait(false);
- var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
- options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
+ // TODO just pass create channel options
+ var autorecoveringChannel = new AutorecoveringChannel(this,
+ recoveryAwareChannel,
+ cdc,
+ options.PublisherConfirmationsEnabled,
+ options.PublisherConfirmationTrackingEnabled,
+ options.MaxOutstandingPublisherConfirmations);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return autorecoveringChannel;
diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
index a0a084d02..04744cb0e 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
@@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
@@ -41,6 +42,8 @@ namespace RabbitMQ.Client.Impl
{
internal partial class Channel : IChannel, IRecoverable
{
+ private readonly AsyncManualResetEvent _flowControlBlock = new(true);
+
public async ValueTask BasicPublishAsync(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory body,
CancellationToken cancellationToken = default)
@@ -53,7 +56,7 @@ public async ValueTask BasicPublishAsync(string exchange, string ro
await MaybeStartPublisherConfirmationTracking(cancellationToken)
.ConfigureAwait(false);
- await EnforceFlowControlAsync(cancellationToken)
+ await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
@@ -108,7 +111,7 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac
await MaybeStartPublisherConfirmationTracking(cancellationToken)
.ConfigureAwait(false);
- await EnforceFlowControlAsync(cancellationToken)
+ await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
@@ -220,5 +223,16 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers
}
}
}
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
+ {
+ if (_flowControlBlock.IsSet)
+ {
+ return default;
+ }
+
+ return _flowControlBlock.WaitAsync(cancellationToken);
+ }
}
}
diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
index f5617611e..f00311b25 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
@@ -47,6 +47,8 @@ internal partial class Channel : IChannel, IRecoverable
{
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
+ private ushort? _maxOutstandingPublisherConfirmations = null;
+ private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new();
@@ -115,10 +117,20 @@ public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToke
}
}
- private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
+ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
+ bool publisherConfirmationTrackingEnabled,
+ ushort? maxOutstandingPublisherConfirmations)
{
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
+ _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
+
+ if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
+ {
+ _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
+ (int)_maxOutstandingPublisherConfirmations,
+ (int)_maxOutstandingPublisherConfirmations);
+ }
}
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
@@ -270,6 +282,13 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
{
if (_publisherConfirmationsEnabled)
{
+ if (_publisherConfirmationTrackingEnabled &&
+ _maxOutstandingConfirmationsSemaphore is not null)
+ {
+ await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
+ .ConfigureAwait(false);
+ }
+
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
@@ -320,6 +339,12 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
{
if (_publisherConfirmationsEnabled)
{
+ if (_publisherConfirmationTrackingEnabled &&
+ _maxOutstandingConfirmationsSemaphore is not null)
+ {
+ _maxOutstandingConfirmationsSemaphore.Release();
+ }
+
_confirmSemaphore.Release();
if (publisherConfirmationInfo is not null)
diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs
index 6ca0c9a93..871a84d60 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.cs
@@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable
// AMQP only allows one RPC operation to be active at a time.
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
- private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);
private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
@@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k)
}
}
- internal async Task OpenAsync(bool publisherConfirmationsEnabled = false,
- bool publisherConfirmationTrackingEnabled = false,
- CancellationToken cancellationToken = default)
+ internal async Task OpenAsync(bool publisherConfirmationsEnabled,
+ bool publisherConfirmationTrackingEnabled,
+ ushort? maxOutstandingPublisherConfirmations,
+ CancellationToken cancellationToken)
{
- ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
+ ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
+ publisherConfirmationTrackingEnabled,
+ maxOutstandingPublisherConfirmations);
bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -450,17 +452,6 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead
return Session.TransmitAsync(in method, in header, body, cancellationToken);
}
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken)
- {
- if (_flowControlBlock.IsSet)
- {
- return default;
- }
-
- return _flowControlBlock.WaitAsync(cancellationToken);
- }
-
internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
{
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);
@@ -540,7 +531,8 @@ protected virtual void Dispose(bool disposing)
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
- _confirmSemaphore?.Dispose();
+ _confirmSemaphore.Dispose();
+ _maxOutstandingConfirmationsSemaphore?.Dispose();
}
}
@@ -561,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore()
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
- _confirmSemaphore?.Dispose();
+ _confirmSemaphore.Dispose();
+ _maxOutstandingConfirmationsSemaphore?.Dispose();
}
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs
index 333f24a4b..870ca5fc1 100644
--- a/projects/RabbitMQ.Client/Impl/Connection.cs
+++ b/projects/RabbitMQ.Client/Impl/Connection.cs
@@ -273,7 +273,11 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d
// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
- IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
+ IChannel ch = await channel.OpenAsync(
+ options.PublisherConfirmationsEnabled,
+ options.PublisherConfirmationTrackingEnabled,
+ options.MaxOutstandingPublisherConfirmations,
+ cancellationToken)
.ConfigureAwait(false);
return ch;
}
diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
index e9f625369..d811ca426 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
@@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
+RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort?
+RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
index d2439f256..10e153b8e 100644
--- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
+++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
@@ -38,9 +38,18 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
+const ushort MAX_OUTSTANDING_CONFIRMS = 256;
+
const int MESSAGE_COUNT = 50_000;
bool debug = false;
+var channelOpts = new CreateChannelOptions
+{
+ PublisherConfirmationsEnabled = true,
+ PublisherConfirmationTrackingEnabled = true,
+ MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS
+};
+
#pragma warning disable CS8321 // Local function is declared but never used
await PublishMessagesIndividuallyAsync();
@@ -53,12 +62,12 @@ static Task CreateConnectionAsync()
return factory.CreateConnectionAsync();
}
-static async Task PublishMessagesIndividuallyAsync()
+async Task PublishMessagesIndividuallyAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
await using IConnection connection = await CreateConnectionAsync();
- await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
@@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync()
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
}
-static async Task PublishMessagesInBatchAsync()
+async Task PublishMessagesInBatchAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
await using IConnection connection = await CreateConnectionAsync();
- await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
- int batchSize = 1000;
+ int batchSize = MAX_OUTSTANDING_CONFIRMS;
int outstandingMessageCount = 0;
var sw = new Stopwatch();
@@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously()
await using IConnection connection = await CreateConnectionAsync();
- var channelOptions = new CreateChannelOptions
- {
- PublisherConfirmationsEnabled = true,
- PublisherConfirmationTrackingEnabled = false
- };
- await using IChannel channel = await connection.CreateChannelAsync(channelOptions);
+ channelOpts.PublisherConfirmationTrackingEnabled = false;
+ await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();