Skip to content

Commit 9343e8d

Browse files
committed
* Actually enforce outstanding pub conf limit.
1 parent 9be2342 commit 9343e8d

File tree

2 files changed

+61
-3
lines changed

2 files changed

+61
-3
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
5656
await MaybeStartPublisherConfirmationTracking(cancellationToken)
5757
.ConfigureAwait(false);
5858

59-
await EnforceFlowControlAsync(cancellationToken)
59+
await MaybeEnforceFlowControlAsync(cancellationToken)
60+
.ConfigureAwait(false);
61+
62+
await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken)
6063
.ConfigureAwait(false);
6164

6265
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
@@ -111,7 +114,10 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
111114
await MaybeStartPublisherConfirmationTracking(cancellationToken)
112115
.ConfigureAwait(false);
113116

114-
await EnforceFlowControlAsync(cancellationToken)
117+
await MaybeEnforceFlowControlAsync(cancellationToken)
118+
.ConfigureAwait(false);
119+
120+
await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken)
115121
.ConfigureAwait(false);
116122

117123
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
@@ -225,7 +231,7 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
225231
}
226232

227233
[MethodImpl(MethodImplOptions.AggressiveInlining)]
228-
private ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken)
234+
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
229235
{
230236
if (_flowControlBlock.IsSet)
231237
{

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
141141
if (_publisherConfirmationTrackingEnabled)
142142
{
143143
_confirmsTaskCompletionSources.Clear();
144+
MaybeUnblockPublishers();
144145
}
145146
_nextPublishSeqNo = 1;
146147
}
@@ -186,13 +187,15 @@ private void HandleAck(ulong deliveryTag, bool multiple)
186187
{
187188
pair.Value.SetResult(true);
188189
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
190+
MaybeUnblockPublishers();
189191
}
190192
}
191193
}
192194
else
193195
{
194196
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
195197
{
198+
MaybeUnblockPublishers();
196199
tcs.SetResult(true);
197200
}
198201
}
@@ -212,13 +215,15 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
212215
{
213216
pair.Value.SetException(new PublishException(pair.Key, isReturn));
214217
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
218+
MaybeUnblockPublishers();
215219
}
216220
}
217221
}
218222
else
219223
{
220224
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
221225
{
226+
MaybeUnblockPublishers();
222227
tcs.SetException(new PublishException(deliveryTag, isReturn));
223228
}
224229
}
@@ -261,6 +266,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
261266
}
262267

263268
_confirmsTaskCompletionSources.Clear();
269+
MaybeUnblockPublishers();
264270
}
265271
}
266272
finally
@@ -285,6 +291,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
285291
{
286292
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
287293
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
294+
MaybeBlockPublishers();
288295
}
289296

290297
_nextPublishSeqNo++;
@@ -311,6 +318,7 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
311318
if (_publisherConfirmationTrackingEnabled)
312319
{
313320
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
321+
MaybeUnblockPublishers();
314322
}
315323

316324
exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex);
@@ -334,5 +342,49 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
334342
}
335343
}
336344
}
345+
346+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
347+
private ValueTask MaybeEnforceOutstandingPublisherConfirmationsAsync(CancellationToken cancellationToken)
348+
{
349+
if (_publisherConfirmationTrackingEnabled)
350+
{
351+
if (_maxOutstandingPublisherConfirmsReached.IsSet)
352+
{
353+
return default;
354+
}
355+
else
356+
{
357+
return _maxOutstandingPublisherConfirmsReached.WaitAsync(cancellationToken);
358+
}
359+
}
360+
361+
return default;
362+
}
363+
364+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
365+
private void MaybeBlockPublishers()
366+
{
367+
if (_publisherConfirmationTrackingEnabled)
368+
{
369+
if (_maxOutstandingPublisherConfirmations is not null
370+
&& _confirmsTaskCompletionSources.Count >= _maxOutstandingPublisherConfirmations)
371+
{
372+
_maxOutstandingPublisherConfirmsReached.Reset();
373+
}
374+
}
375+
}
376+
377+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
378+
private void MaybeUnblockPublishers()
379+
{
380+
if (_publisherConfirmationTrackingEnabled)
381+
{
382+
if (_maxOutstandingPublisherConfirmations is not null
383+
&& _confirmsTaskCompletionSources.Count < _maxOutstandingPublisherConfirmations)
384+
{
385+
_maxOutstandingPublisherConfirmsReached.Set();
386+
}
387+
}
388+
}
337389
}
338390
}

0 commit comments

Comments
 (0)