Skip to content

Commit 6673bd5

Browse files
authored
Merge pull request #1615 from rabbitmq/rabbitmq-dotnet-client-1611-followup
Add `DispatchConsumersAsyncEnabled` property on `IConnection` (#1611)
2 parents 56b3ebf + 5ccfd7f commit 6673bd5

File tree

7 files changed

+63
-2
lines changed

7 files changed

+63
-2
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void
1+
RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void
2+
RabbitMQ.Client.IConnection.DispatchConsumersAsyncEnabled.get -> bool

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ public interface IConnection : INetworkConnection, IDisposable
126126
/// </summary>
127127
IEnumerable<ShutdownReportEntry> ShutdownReport { get; }
128128

129+
/// <summary>
130+
/// Returns <c>true</c> if the connection is set to use asynchronous consumer dispatchers.
131+
/// </summary>
132+
public bool DispatchConsumersAsyncEnabled { get; }
133+
129134
/// <summary>
130135
/// Application-specific connection name, will be displayed in the management UI
131136
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
@@ -236,5 +241,6 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
236241
/// </summary>
237242
/// <param name="cancellationToken">Cancellation token</param>
238243
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
244+
239245
}
240246
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
176176

177177
public IProtocol Protocol => Endpoint.Protocol;
178178

179+
public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync;
180+
179181
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
180182
{
181183
ISession session = InnerConnection.CreateSession();

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,12 +975,20 @@ public async Task<string> BasicConsumeAsync(string queue, bool autoAck, string c
975975
{
976976
if (ConsumerDispatcher is AsyncConsumerDispatcher)
977977
{
978-
if (!(consumer is IAsyncBasicConsumer))
978+
if (false == (consumer is IAsyncBasicConsumer))
979979
{
980980
throw new InvalidOperationException("When using an AsyncConsumerDispatcher, the consumer must implement IAsyncBasicConsumer");
981981
}
982982
}
983983

984+
if (ConsumerDispatcher is ConsumerDispatcher)
985+
{
986+
if (consumer is IAsyncBasicConsumer)
987+
{
988+
throw new InvalidOperationException("When using an ConsumerDispatcher, the consumer must not implement IAsyncBasicConsumer");
989+
}
990+
}
991+
984992
// NOTE:
985993
// Maybe don't dispose this instance because the CancellationToken must remain
986994
// valid for processing the response.

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
101101
public int LocalPort => _frameHandler.LocalPort;
102102
public int RemotePort => _frameHandler.RemotePort;
103103

104+
public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync;
105+
104106
public IDictionary<string, object?>? ServerProperties { get; private set; }
105107

106108
public IEnumerable<ShutdownReportEntry> ShutdownReport => _shutdownReport;

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public TestAsyncConsumer(ITestOutputHelper output)
5252
[Fact]
5353
public async Task TestBasicRoundtripConcurrent()
5454
{
55+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
56+
5557
AddCallbackExceptionHandlers();
5658
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
5759

@@ -145,6 +147,8 @@ public async Task TestBasicRoundtripConcurrent()
145147
[Fact]
146148
public async Task TestBasicRoundtripConcurrentManyMessages()
147149
{
150+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
151+
148152
AddCallbackExceptionHandlers();
149153
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
150154

@@ -320,6 +324,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
320324
[Fact]
321325
public async Task TestBasicRejectAsync()
322326
{
327+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
328+
323329
string queueName = GenerateQueueName();
324330

325331
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -485,6 +491,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
485491
[Fact]
486492
public async Task TestBasicNackAsync()
487493
{
494+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
495+
488496
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
489497

490498
_conn.ConnectionShutdown += (o, ea) =>
@@ -558,6 +566,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
558566
[Fact]
559567
public async Task NonAsyncConsumerShouldThrowInvalidOperationException()
560568
{
569+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
570+
561571
bool sawException = false;
562572
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false);
563573
await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024));
@@ -576,6 +586,8 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException()
576586
[Fact]
577587
public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
578588
{
589+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
590+
579591
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
580592
var tasks = new List<Task>();
581593
for (int i = 0; i < 256; i++)
@@ -596,6 +608,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
596608
[Fact]
597609
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
598610
{
611+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
612+
599613
string exchangeName = GenerateExchangeName();
600614
string queue1Name = GenerateQueueName();
601615
string queue2Name = GenerateQueueName();
@@ -663,6 +677,8 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
663677
[Fact]
664678
public async Task TestCloseWithinEventHandler_GH1567()
665679
{
680+
Assert.True(_conn.DispatchConsumersAsyncEnabled);
681+
666682
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
667683

668684
QueueDeclareOk q = await _channel.QueueDeclareAsync();

projects/Test/Integration/TestConsumer.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,31 @@ public TestConsumer(ITestOutputHelper output) : base(output)
4949
{
5050
}
5151

52+
[Fact]
53+
public async Task AsyncConsumerShouldThrowInvalidOperationException()
54+
{
55+
Assert.False(_conn.DispatchConsumersAsyncEnabled);
56+
57+
bool sawException = false;
58+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false);
59+
await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024));
60+
var consumer = new AsyncEventingBasicConsumer(_channel);
61+
try
62+
{
63+
string consumerTag = await _channel.BasicConsumeAsync(q.QueueName, false, string.Empty, false, false, null, consumer);
64+
}
65+
catch (InvalidOperationException)
66+
{
67+
sawException = true;
68+
}
69+
Assert.True(sawException, "did not see expected InvalidOperationException");
70+
}
71+
5272
[Fact]
5373
public async Task TestBasicRoundtrip()
5474
{
75+
Assert.False(_conn.DispatchConsumersAsyncEnabled);
76+
5577
TimeSpan waitSpan = TimeSpan.FromSeconds(2);
5678
QueueDeclareOk q = await _channel.QueueDeclareAsync();
5779
await _channel.BasicPublishAsync("", q.QueueName, _body);
@@ -77,6 +99,8 @@ public async Task TestBasicRoundtrip()
7799
[Fact]
78100
public async Task TestBasicRoundtripNoWait()
79101
{
102+
Assert.False(_conn.DispatchConsumersAsyncEnabled);
103+
80104
QueueDeclareOk q = await _channel.QueueDeclareAsync();
81105
await _channel.BasicPublishAsync("", q.QueueName, _body);
82106
var consumer = new EventingBasicConsumer(_channel);
@@ -101,6 +125,8 @@ public async Task TestBasicRoundtripNoWait()
101125
[Fact]
102126
public async Task ConcurrentEventingTestForReceived()
103127
{
128+
Assert.False(_conn.DispatchConsumersAsyncEnabled);
129+
104130
const int NumberOfThreads = 4;
105131
const int NumberOfRegistrations = 5000;
106132

0 commit comments

Comments
 (0)