Skip to content

Commit 9be2342

Browse files
committed
* Add MaxOutstandingPublisherConfirmations to CreateChannelOptions
1 parent 6cab6ee commit 9be2342

File tree

8 files changed

+64
-15
lines changed

8 files changed

+64
-15
lines changed

projects/RabbitMQ.Client/CreateChannelOptions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ public sealed class CreateChannelOptions
1515
/// </summary>
1616
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
1717

18+
/// <summary>
19+
/// The number of allowed outstanding publisher confirmations before publishing is blocked.
20+
///
21+
/// Defaults to <c>128</c>
22+
///
23+
/// Set to <c>null</c>, to allow an unlimited number of outstanding confirmations.
24+
///
25+
/// </summary>
26+
public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;
27+
1828
/// <summary>
1929
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
2030
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
5151
private ushort _prefetchCountGlobal;
5252
private bool _publisherConfirmationsEnabled = false;
5353
private bool _publisherConfirmationTrackingEnabled = false;
54+
private ushort? _maxOutstandingPublisherConfirmations = null;
5455
private bool _usesTransactions;
5556
private ushort _consumerDispatchConcurrency;
5657

@@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout
7172
set => InnerChannel.ContinuationTimeout = value;
7273
}
7374

74-
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
75-
ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
75+
// TODO just pass create channel options
76+
public AutorecoveringChannel(AutorecoveringConnection conn,
77+
RecoveryAwareChannel innerChannel,
78+
ushort consumerDispatchConcurrency,
79+
bool publisherConfirmationsEnabled,
80+
bool publisherConfirmationTrackingEnabled,
81+
ushort? maxOutstandingPublisherConfirmations)
7682
{
7783
_connection = conn;
7884
_innerChannel = innerChannel;
7985
_consumerDispatchConcurrency = consumerDispatchConcurrency;
8086
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
8187
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
88+
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
8289
}
8390

8491
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -164,8 +171,11 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
164171
_connection = conn;
165172

166173
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
167-
_publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled,
168-
_consumerDispatchConcurrency, cancellationToken)
174+
_publisherConfirmationsEnabled,
175+
_publisherConfirmationTrackingEnabled,
176+
_maxOutstandingPublisherConfirmations,
177+
_consumerDispatchConcurrency,
178+
cancellationToken)
169179
.ConfigureAwait(false);
170180
newChannel.TakeOver(_innerChannel);
171181

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,16 +184,21 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
184184

185185
public IProtocol Protocol => Endpoint.Protocol;
186186

187+
// TODO pass channel creation options?
187188
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
188189
bool publisherConfirmationsEnabled = false,
189190
bool publisherConfirmationTrackingEnabled = false,
191+
ushort? maxOutstandingPublisherConfirmations = null,
190192
ushort? consumerDispatchConcurrency = null,
191193
CancellationToken cancellationToken = default)
192194
{
193195
ISession session = InnerConnection.CreateSession();
194196
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
195197
return (RecoveryAwareChannel)await result.OpenAsync(
196-
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
198+
publisherConfirmationsEnabled,
199+
publisherConfirmationTrackingEnabled,
200+
maxOutstandingPublisherConfirmations,
201+
cancellationToken)
197202
.ConfigureAwait(false);
198203
}
199204

@@ -266,11 +271,20 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
266271
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
267272

268273
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
269-
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
274+
options.PublisherConfirmationsEnabled,
275+
options.PublisherConfirmationTrackingEnabled,
276+
options.MaxOutstandingPublisherConfirmations,
277+
cdc,
278+
cancellationToken)
270279
.ConfigureAwait(false);
271280

272-
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
273-
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
281+
// TODO just pass create channel options
282+
var autorecoveringChannel = new AutorecoveringChannel(this,
283+
recoveryAwareChannel,
284+
cdc,
285+
options.PublisherConfirmationsEnabled,
286+
options.PublisherConfirmationTrackingEnabled,
287+
options.MaxOutstandingPublisherConfirmations);
274288
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
275289
.ConfigureAwait(false);
276290
return autorecoveringChannel;

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ namespace RabbitMQ.Client.Impl
4242
{
4343
internal partial class Channel : IChannel, IRecoverable
4444
{
45+
private readonly AsyncManualResetEvent _flowControlBlock = new(true);
46+
4547
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
4648
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
4749
CancellationToken cancellationToken = default)

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@ internal partial class Channel : IChannel, IRecoverable
4747
{
4848
private bool _publisherConfirmationsEnabled = false;
4949
private bool _publisherConfirmationTrackingEnabled = false;
50+
private ushort? _maxOutstandingPublisherConfirmations = null;
5051
private ulong _nextPublishSeqNo = 0;
5152
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
5253
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
54+
private readonly AsyncManualResetEvent _maxOutstandingPublisherConfirmsReached = new(true);
5355

5456
private class PublisherConfirmationInfo
5557
{
@@ -115,10 +117,13 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
115117
}
116118
}
117119

118-
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
120+
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
121+
bool publisherConfirmationTrackingEnabled,
122+
ushort? maxOutstandingPublisherConfirmations)
119123
{
120124
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
121125
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
126+
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
122127
}
123128

124129
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable
5555
// AMQP only allows one RPC operation to be active at a time.
5656
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
5757
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
58-
private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);
5958

6059
private ShutdownEventArgs? _closeReason;
6160
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
@@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k)
361360
}
362361
}
363362

364-
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled = false,
365-
bool publisherConfirmationTrackingEnabled = false,
366-
CancellationToken cancellationToken = default)
363+
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
364+
bool publisherConfirmationTrackingEnabled,
365+
ushort? maxOutstandingPublisherConfirmations,
366+
CancellationToken cancellationToken)
367367
{
368-
ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
368+
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
369+
publisherConfirmationTrackingEnabled,
370+
maxOutstandingPublisherConfirmations);
369371

370372
bool enqueued = false;
371373
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,11 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
273273

274274
// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
275275
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
276-
IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
276+
IChannel ch = await channel.OpenAsync(
277+
options.PublisherConfirmationsEnabled,
278+
options.PublisherConfirmationTrackingEnabled,
279+
options.MaxOutstandingPublisherConfirmations,
280+
cancellationToken)
277281
.ConfigureAwait(false);
278282
return ch;
279283
}

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions
33
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
44
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
55
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
6+
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort?
7+
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void
68
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
79
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
810
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool

0 commit comments

Comments
 (0)