From a6e188aa3def9e936fa63a57cd7888cebb6df61f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 3 Jun 2024 16:46:57 -0700 Subject: [PATCH] Change test to match code provided by @neilgreatorex --- .../SequentialIntegrationFixture.cs | 10 +++- .../TestConnectionBlockedChannelLeak.cs | 60 +++++++++---------- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs index b3284ec62c..d03bb30ba2 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs +++ b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs @@ -42,11 +42,15 @@ public SequentialIntegrationFixture(ITestOutputHelper output) : base(output) { } + public async Task BlockAsync() + { + await _rabbitMQCtl.ExecRabbitMQCtlAsync("set_vm_memory_high_watermark absolute 10"); + await Task.Delay(TimeSpan.FromSeconds(1)); + } + public async Task BlockAsync(IChannel channel) { - await _rabbitMQCtl.ExecRabbitMQCtlAsync("set_vm_memory_high_watermark 0.000000001"); - // give rabbitmqctl some time to do its job - await Task.Delay(TimeSpan.FromSeconds(5)); + await BlockAsync(); await channel.BasicPublishAsync(exchange: "amq.direct", routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message")); } diff --git a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs index e86fc6d993..db12fc45d8 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs @@ -30,7 +30,6 @@ //--------------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; @@ -49,14 +48,6 @@ public TestConnectionBlockedChannelLeak(ITestOutputHelper output) : base(output) public override async Task InitializeAsync() { await UnblockAsync(); - _connFactory = new ConnectionFactory - { - AutomaticRecoveryEnabled = true, - ClientProvidedName = _testDisplayName, - ContinuationTimeout = TimeSpan.FromSeconds(2) - }; - _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(); } public override async Task DisposeAsync() @@ -68,49 +59,55 @@ public override async Task DisposeAsync() [Fact] public async Task TestConnectionBlockedChannelLeak_GH1573() { - string exchangeName = GenerateExchangeName(); + await BlockAsync(); - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var connectionBlockedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var connectionUnblockedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using var cts = new CancellationTokenSource(WaitSpan); using CancellationTokenRegistration ctr = cts.Token.Register(() => { - tcs.TrySetCanceled(); + connectionBlockedTcs.TrySetCanceled(); + connectionUnblockedTcs.TrySetCanceled(); }); + _connFactory = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + ClientProvidedName = _testDisplayName, + ContinuationTimeout = TimeSpan.FromSeconds(2) + }; + _conn = await _connFactory.CreateConnectionAsync(); + _channel = await _conn.CreateChannelAsync(); + + string exchangeName = GenerateExchangeName(); + _conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) => { - UnblockAsync(); + connectionBlockedTcs.SetResult(true); }; _conn.ConnectionUnblocked += (object sender, EventArgs ea) => { - tcs.SetResult(true); + connectionUnblockedTcs.SetResult(true); }; - await BlockAsync(_channel); - - using (IChannel publishChannel = await _conn.CreateChannelAsync()) + async Task ExchangeDeclareAndPublish() { - await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); - await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true); - await publishChannel.CloseAsync(); + using (IChannel publishChannel = await _conn.CreateChannelAsync()) + { + await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); + await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true); + await publishChannel.CloseAsync(); + } } + await Assert.ThrowsAnyAsync(ExchangeDeclareAndPublish); - var channels = new List(); for (int i = 1; i <= 5; i++) { - IChannel c = await _conn.CreateChannelAsync(); - channels.Add(c); + await Assert.ThrowsAnyAsync(() => _conn.CreateChannelAsync()); } - /* - * Note: - * This wait probably isn't necessary, if the above CreateChannelAsync - * calls were to timeout, we'd get exceptions on the await - */ - await Task.Delay(TimeSpan.FromSeconds(5)); - // Note: debugging // var rmq = new RabbitMQCtl(_output); // string output = await rmq.ExecRabbitMQCtlAsync("list_channels"); @@ -121,7 +118,8 @@ public async Task TestConnectionBlockedChannelLeak_GH1573() // output = await rmq.ExecRabbitMQCtlAsync("list_channels"); // _output.WriteLine("CHANNELS 1: {0}", output); - Assert.True(await tcs.Task, "Unblock notification not received."); + Assert.True(await connectionBlockedTcs.Task, "Blocked notification not received."); + Assert.True(await connectionUnblockedTcs.Task, "Unblocked notification not received."); } } }