diff --git a/build.ps1 b/build.ps1 index 5e1fe39ba3..e79b28e386 100644 --- a/build.ps1 +++ b/build.ps1 @@ -1,36 +1,45 @@ [CmdletBinding(PositionalBinding=$false)] param( - [switch]$RunTests + [switch]$RunTests, + [switch]$RunTestsUntilFailure ) +$ErrorActionPreference = 'Stop' +Set-StrictMode -Version Latest +$PSNativeCommandUseErrorActionPreference = $true + Write-Host "Run Parameters:" -ForegroundColor Cyan Write-Host "`tPSScriptRoot: $PSScriptRoot" Write-Host "`tRunTests: $RunTests" +Write-Host "`tRunTestsUntilFailure: $RunTestsUntilFailure" Write-Host "`tdotnet --version: $(dotnet --version)" Write-Host "[INFO] building all projects (Build.csproj traversal)..." -ForegroundColor "Magenta" dotnet build "$PSScriptRoot\Build.csproj" Write-Host "[INFO] done building." -ForegroundColor "Green" -if ($RunTests) +if ($RunTests -or $RunTestsUntilFailure) { $tests_dir = Join-Path -Path $PSScriptRoot -ChildPath 'projects' | Join-Path -ChildPath 'Test' $unit_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Unit' | Join-Path -ChildPath 'Unit.csproj') $integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Integration' | Join-Path -ChildPath 'Integration.csproj') $sequential_integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'SequentialIntegration' | Join-Path -ChildPath 'SequentialIntegration.csproj') - foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file) + Do { - Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta" - dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed" - if ($LASTEXITCODE -ne 0) - { - Write-Host "[ERROR] tests errored, exiting" -Foreground "Red" - Exit 1 - } - else + foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file) { - Write-Host "[INFO] tests passed" -ForegroundColor "Green" + Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta" + & dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed" + if ($LASTEXITCODE -ne 0) + { + Write-Host "[ERROR] tests errored, exiting" -Foreground "Red" + Exit 1 + } + else + { + Write-Host "[INFO] tests passed" -ForegroundColor "Green" + } } - } + } While ($RunTestsUntilFailure) } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 5d3b3cc610..bef097f1a3 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -64,7 +64,8 @@ public ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consu if (false == _disposed && false == _quiesce) { AddConsumer(consumer, consumerTag); - return _writer.WriteAsync(new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag), cancellationToken); + WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag); + return _writer.WriteAsync(work, cancellationToken); } else { @@ -78,7 +79,8 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, { if (false == _disposed && false == _quiesce) { - var work = new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); + IBasicConsumer consumer = GetConsumerOrDefault(consumerTag); + var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); return _writer.WriteAsync(work, cancellationToken); } else @@ -91,7 +93,9 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken { if (false == _disposed && false == _quiesce) { - return _writer.WriteAsync(new WorkStruct(WorkType.CancelOk, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken); + IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); + WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag); + return _writer.WriteAsync(work, cancellationToken); } else { @@ -103,7 +107,9 @@ public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken ca { if (false == _disposed && false == _quiesce) { - return _writer.WriteAsync(new WorkStruct(WorkType.Cancel, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken); + IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); + WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag); + return _writer.WriteAsync(work, cancellationToken); } else { @@ -226,7 +232,7 @@ await _worker protected sealed override void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason) { - _writer.TryWrite(new WorkStruct(consumer, reason)); + _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason)); } protected override void InternalShutdown() @@ -258,7 +264,7 @@ protected override Task InternalShutdownAsync() public readonly ShutdownEventArgs? Reason; public readonly WorkType WorkType; - public WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag) + private WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag) : this() { WorkType = type; @@ -266,7 +272,7 @@ public WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag) ConsumerTag = consumerTag; } - public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) + private WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) : this() { WorkType = WorkType.Shutdown; @@ -274,7 +280,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) Reason = reason; } - public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, + private WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body) { WorkType = WorkType.Deliver; @@ -289,6 +295,33 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag Reason = default; } + public static WorkStruct CreateCancel(IBasicConsumer consumer, string consumerTag) + { + return new WorkStruct(WorkType.Cancel, consumer, consumerTag); + } + + public static WorkStruct CreateCancelOk(IBasicConsumer consumer, string consumerTag) + { + return new WorkStruct(WorkType.CancelOk, consumer, consumerTag); + } + + public static WorkStruct CreateConsumeOk(IBasicConsumer consumer, string consumerTag) + { + return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag); + } + + public static WorkStruct CreateShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) + { + return new WorkStruct(consumer, reason); + } + + public static WorkStruct CreateDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body) + { + return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered, + exchange, routingKey, basicProperties, body); + } + public void Dispose() => Body.Dispose(); } diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs index e6ad807aaf..ceaea16847 100644 --- a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs @@ -82,7 +82,7 @@ public void Enqueue(IRpcContinuation k) IRpcContinuation result = Interlocked.CompareExchange(ref _outstandingRpc, k, s_tmp); if (!(result is EmptyRpcContinuation)) { - throw new NotSupportedException("Pipelining of requests forbidden"); + throw new NotSupportedException($"Pipelining of requests forbidden (attempted: {k.GetType()}, enqueued: {result.GetType()})"); } } diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 62846d397e..57a17fde6f 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -538,7 +538,7 @@ protected ConnectionFactory CreateConnectionFactory() protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args) { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { IConnection conn = (IConnection)sender; _output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}"); @@ -547,7 +547,7 @@ protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args) protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action a) { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { _output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}"); } @@ -556,7 +556,7 @@ protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args protected void HandleChannelShutdown(object sender, ShutdownEventArgs args) { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { IChannel ch = (IChannel)sender; _output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}"); @@ -565,7 +565,7 @@ protected void HandleChannelShutdown(object sender, ShutdownEventArgs args) protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action a) { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { _output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}"); } diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs new file mode 100644 index 0000000000..9c3dd76ffb --- /dev/null +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs @@ -0,0 +1,60 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel +{ + public class TestRecoveryEventHandlers : TestConnectionRecoveryBase + { + public TestRecoveryEventHandlers(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestRecoveryEventHandlers_Called() + { + int counter = 0; + ((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter); + + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + Assert.True(counter >= 3); + } + } +} diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs new file mode 100644 index 0000000000..41faeb3cb0 --- /dev/null +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs @@ -0,0 +1,61 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel +{ + public class TestShutdownEventHandlers : TestConnectionRecoveryBase + { + public TestShutdownEventHandlers(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestShutdownEventHandlersOnChannel_Called() + { + int counter = 0; + _channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter); + + Assert.True(_channel.IsOpen); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + + Assert.True(counter >= 3); + } + } +} diff --git a/projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs similarity index 56% rename from projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs rename to projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs index 0055750b9d..a3fc5ee383 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs @@ -35,52 +35,23 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Framing.Impl; -using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; -namespace Test.Integration.ConnectionRecovery +namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Connection { - public class TestEventHandlerRecovery : TestConnectionRecoveryBase + public class TestRecoveringConsumerEventHandlers : TestConnectionRecoveryBase { - public TestEventHandlerRecovery(ITestOutputHelper output) : base(output) + public TestRecoveringConsumerEventHandlers(ITestOutputHelper output) : base(output) { } - [Fact] - public async Task TestRecoveryEventHandlersOnConnection() - { - int counter = 0; - ((AutorecoveringConnection)_conn).RecoverySucceeded += (source, ea) => Interlocked.Increment(ref counter); - - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - Assert.True(counter >= 3); - } - - [Fact] - public async Task TestRecoveryEventHandlersOnChannel() - { - int counter = 0; - ((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter); - - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - Assert.True(counter >= 3); - } - [Theory] [InlineData(1)] [InlineData(3)] - public async Task TestRecoveringConsumerHandlerOnConnection(int iterations) + public async Task TestRecoveringConsumerEventHandlers_Called(int iterations) { - string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; + RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); var cons = new EventingBasicConsumer(_channel); await _channel.BasicConsumeAsync(q, true, cons); @@ -96,10 +67,10 @@ public async Task TestRecoveringConsumerHandlerOnConnection(int iterations) } [Fact] - public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown() + public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown() { var myArgs = new Dictionary { { "first-argument", "some-value" } }; - string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; + RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); var cons = new EventingBasicConsumer(_channel); string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs); @@ -121,37 +92,5 @@ public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePas string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary); Assert.Equal("event-handler-set-this-value", actualVal); } - - [Fact] - public async Task TestShutdownEventHandlersRecoveryOnConnection() - { - int counter = 0; - _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); - - Assert.True(_conn.IsOpen); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - - Assert.True(counter >= 3); - } - - [Fact] - public async Task TestShutdownEventHandlersRecoveryOnChannel() - { - int counter = 0; - _channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter); - - Assert.True(_channel.IsOpen); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - - Assert.True(counter >= 3); - } } } diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs new file mode 100644 index 0000000000..eaed0913e9 --- /dev/null +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs @@ -0,0 +1,60 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Framing.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Connection +{ + public class TestRecoverySucceededEventHandlers : TestConnectionRecoveryBase + { + public TestRecoverySucceededEventHandlers(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestRecoverySucceededEventHandlers_Called() + { + int counter = 0; + ((AutorecoveringConnection)_conn).RecoverySucceeded += (source, ea) => Interlocked.Increment(ref counter); + + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_conn.IsOpen); + Assert.True(counter >= 3); + } + } +} diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs new file mode 100644 index 0000000000..9f08cae5d6 --- /dev/null +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs @@ -0,0 +1,61 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Connection +{ + public class TestShutdownEventHandlers : TestConnectionRecoveryBase + { + public TestShutdownEventHandlers(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestShutdownEventHandlers_Called() + { + int counter = 0; + _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + + Assert.True(_conn.IsOpen); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_conn.IsOpen); + + Assert.True(counter >= 3); + } + } +} diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 0d61367db0..ce6fd70caa 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -80,10 +80,7 @@ public async Task TestBasicRoundtripConcurrent() { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); }; @@ -91,10 +88,7 @@ public async Task TestBasicRoundtripConcurrent() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); }; @@ -164,10 +158,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); }; @@ -175,10 +166,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); }; @@ -193,10 +181,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { HandleConnectionShutdown(publishConn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); }; using (IChannel publishChannel = await publishConn.CreateChannelAsync()) @@ -205,10 +190,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { HandleChannelShutdown(publishChannel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); }; await publishChannel.ConfirmSelectAsync(); @@ -235,10 +217,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { HandleConnectionShutdown(consumeConn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); }); }; using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) @@ -247,10 +226,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { HandleChannelShutdown(consumeChannel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - } + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); }); }; @@ -330,10 +306,7 @@ public async Task TestBasicRejectAsync() { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publishSyncSource); - } + MaybeSetException(args, publishSyncSource); }); }; @@ -341,7 +314,7 @@ public async Task TestBasicRejectAsync() { HandleChannelShutdown(_channel, ea, (args) => { - MaybeSetException(ea, publishSyncSource); + MaybeSetException(args, publishSyncSource); }); }; @@ -427,10 +400,7 @@ public async Task TestBasicAckAsync() { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publishSyncSource); - } + MaybeSetException(args, publishSyncSource); }); }; @@ -438,10 +408,7 @@ public async Task TestBasicAckAsync() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publishSyncSource); - } + MaybeSetException(args, publishSyncSource); }); }; @@ -495,10 +462,7 @@ public async Task TestBasicNackAsync() { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publishSyncSource); - } + MaybeSetException(ea, publishSyncSource); }); }; @@ -506,10 +470,7 @@ public async Task TestBasicNackAsync() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - MaybeSetException(ea, publishSyncSource); - } + MaybeSetException(ea, publishSyncSource); }); }; @@ -611,19 +572,22 @@ private static void SetException(Exception ex, params TaskCompletionSource } } - private static void MaybeSetException(ShutdownEventArgs ea, params TaskCompletionSource[] tcsAry) + private static void MaybeSetException(ShutdownEventArgs args, params TaskCompletionSource[] tcsAry) { - foreach (TaskCompletionSource tcs in tcsAry) + if (args.Initiator != ShutdownInitiator.Application) { - MaybeSetException(ea, tcs); + foreach (TaskCompletionSource tcs in tcsAry) + { + MaybeSetException(args, tcs); + } } } - private static void MaybeSetException(ShutdownEventArgs ea, TaskCompletionSource tcs) + private static void MaybeSetException(ShutdownEventArgs args, TaskCompletionSource tcs) { - if (ea.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { - Exception ex = ea.Exception ?? new Exception(ea.ReplyText); + Exception ex = args.Exception ?? new Exception(args.ReplyText); tcs.TrySetException(ex); } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs index de11787dbf..30afc4acda 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -104,9 +104,9 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in { HandleChannelShutdown(ch, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { - tcs.TrySetResult(false); + tcs.TrySetException(args.Exception); } }); }; diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 11c0bec638..27cd24e02a 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -70,7 +70,7 @@ public async Task TestUnthrottledFloodPublishing() { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { sawUnexpectedShutdown = true; } @@ -81,7 +81,7 @@ public async Task TestUnthrottledFloodPublishing() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { sawUnexpectedShutdown = true; } @@ -130,16 +130,16 @@ public async Task TestMultithreadFloodPublishing() int publishCount = 4096; int receivedCount = 0; - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionShutdown += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { receivedCount = -1; - tcs.SetResult(false); + allMessagesSeenTcs.TrySetException(args.Exception); } }); }; @@ -148,86 +148,120 @@ public async Task TestMultithreadFloodPublishing() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { receivedCount = -1; - tcs.SetResult(false); + allMessagesSeenTcs.TrySetException(args.Exception); } }); }; QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, - passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); + passive: false, durable: false, exclusive: false, autoDelete: true, arguments: null); string queueName = q.QueueName; Task pub = Task.Run(async () => { bool stop = false; - using (IChannel pubCh = await _conn.CreateChannelAsync()) + using (IConnection publishConnection = await _connFactory.CreateConnectionAsync()) { - await pubCh.ConfirmSelectAsync(); - - pubCh.ChannelShutdown += (o, ea) => + publishConnection.ConnectionShutdown += (o, ea) => { - HandleChannelShutdown(pubCh, ea, (args) => + HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { - stop = true; - tcs.TrySetResult(false); + receivedCount = -1; + allMessagesSeenTcs.TrySetException(args.Exception); } }); }; - for (int i = 0; i < publishCount && false == stop; i++) + using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) { - await pubCh.BasicPublishAsync(string.Empty, queueName, sendBody, true); + await publishChannel.ConfirmSelectAsync(); + + publishChannel.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(publishChannel, ea, (args) => + { + if (args.Initiator != ShutdownInitiator.Application) + { + stop = true; + allMessagesSeenTcs.TrySetException(args.Exception); + } + }); + }; + + for (int i = 0; i < publishCount && false == stop; i++) + { + await publishChannel.BasicPublishAsync(string.Empty, queueName, sendBody, true); + } + + await publishChannel.WaitForConfirmsOrDieAsync(); + await publishChannel.CloseAsync(); } - await pubCh.WaitForConfirmsOrDieAsync(); - await pubCh.CloseAsync(); + await publishConnection.CloseAsync(); } }); var cts = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = cts.Token.Register(() => { - tcs.TrySetResult(false); + allMessagesSeenTcs.TrySetCanceled(); }); try { - using (IChannel consumeCh = await _conn.CreateChannelAsync()) + using (IConnection consumeConnection = await _connFactory.CreateConnectionAsync()) { - consumeCh.ChannelShutdown += (o, ea) => + consumeConnection.ConnectionShutdown += (o, ea) => { - HandleChannelShutdown(consumeCh, ea, (args) => + HandleConnectionShutdown(_conn, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { - tcs.TrySetResult(false); + receivedCount = -1; + allMessagesSeenTcs.TrySetException(args.Exception); } }); }; - var consumer = new AsyncEventingBasicConsumer(consumeCh); - consumer.Received += async (o, a) => + using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync()) { - string receivedMessage = _encoding.GetString(a.Body.ToArray()); - Assert.Equal(message, receivedMessage); - if (Interlocked.Increment(ref receivedCount) == publishCount) + consumeChannel.ChannelShutdown += (o, ea) => { - tcs.SetResult(true); - } - await Task.Yield(); - }; + HandleChannelShutdown(consumeChannel, ea, (args) => + { + if (args.Initiator != ShutdownInitiator.Application) + { + allMessagesSeenTcs.TrySetException(args.Exception); + } + }); + }; + + var consumer = new AsyncEventingBasicConsumer(consumeChannel); + consumer.Received += async (o, a) => + { + string receivedMessage = _encoding.GetString(a.Body.ToArray()); + Assert.Equal(message, receivedMessage); + if (Interlocked.Increment(ref receivedCount) == publishCount) + { + allMessagesSeenTcs.SetResult(true); + } + await Task.Yield(); + }; - await consumeCh.BasicConsumeAsync(queue: queueName, autoAck: true, - consumerTag: string.Empty, noLocal: false, exclusive: false, - arguments: null, consumer: consumer); + await consumeChannel.BasicConsumeAsync(queue: queueName, autoAck: true, + consumerTag: string.Empty, noLocal: false, exclusive: false, + arguments: null, consumer: consumer); + + Assert.True(await allMessagesSeenTcs.Task); + await consumeChannel.CloseAsync(); + } - Assert.True(await tcs.Task); - await consumeCh.CloseAsync(); + await consumeConnection.CloseAsync(); } await pub; diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index f3a6b3e25d..9927bffdbf 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -66,7 +66,7 @@ public async void TestConcurrentQueueDeclareAndBindAsync() { HandleConnectionShutdown(_conn, ea, (args) => { - if (ea.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { sawShutdown = true; } @@ -77,7 +77,7 @@ public async void TestConcurrentQueueDeclareAndBindAsync() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) + if (args.Initiator != ShutdownInitiator.Application) { sawShutdown = true; }