diff --git a/.editorconfig b/.editorconfig index 96ff657df..748bdad85 100644 --- a/.editorconfig +++ b/.editorconfig @@ -6,6 +6,21 @@ trim_trailing_whitespace = true insert_final_newline = true indent_style = space indent_size = 4 +dotnet_style_operator_placement_when_wrapping = beginning_of_line +tab_width = 4 +end_of_line = crlf +dotnet_style_coalesce_expression = true:suggestion +dotnet_style_null_propagation = true:suggestion +dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion +dotnet_style_prefer_auto_properties = true:suggestion +dotnet_style_object_initializer = true:suggestion +dotnet_style_collection_initializer = true:suggestion +dotnet_style_prefer_simplified_boolean_expressions = true:suggestion +dotnet_style_prefer_conditional_expression_over_assignment = true:silent +dotnet_style_prefer_conditional_expression_over_return = true:silent +dotnet_style_explicit_tuple_names = true:suggestion +dotnet_style_prefer_inferred_tuple_names = true:suggestion +dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion [*.cs] # New line preferences @@ -26,7 +41,7 @@ csharp_indent_switch_labels = true csharp_indent_labels = one_less_than_current # Modifier preferences -csharp_preferred_modifier_order = public,private,protected,internal,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,volatile,async:suggestion +csharp_preferred_modifier_order = public,private,protected,internal,file,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,required,volatile,async:error # avoid this. unless absolutely necessary dotnet_style_qualification_for_field = false:suggestion @@ -44,7 +59,7 @@ dotnet_style_predefined_type_for_member_access = true:suggestion # name all constant fields using PascalCase dotnet_naming_rule.constant_fields_should_be_pascal_case.severity = suggestion dotnet_naming_rule.constant_fields_should_be_pascal_case.symbols = constant_fields -dotnet_naming_rule.constant_fields_should_be_pascal_case.style = pascal_case_style +dotnet_naming_rule.constant_fields_should_be_pascal_case.style = pascal_case_style dotnet_naming_symbols.constant_fields.applicable_kinds = field dotnet_naming_symbols.constant_fields.required_modifiers = const dotnet_naming_style.pascal_case_style.capitalization = pascal_case @@ -52,7 +67,7 @@ dotnet_naming_style.pascal_case_style.capitalization = pascal_case # static fields should have s_ prefix dotnet_naming_rule.static_fields_should_have_prefix.severity = suggestion dotnet_naming_rule.static_fields_should_have_prefix.symbols = static_fields -dotnet_naming_rule.static_fields_should_have_prefix.style = static_prefix_style +dotnet_naming_rule.static_fields_should_have_prefix.style = static_prefix_style dotnet_naming_symbols.static_fields.applicable_kinds = field dotnet_naming_symbols.static_fields.required_modifiers = static dotnet_naming_symbols.static_fields.applicable_accessibilities = private, internal, private_protected @@ -62,7 +77,7 @@ dotnet_naming_style.static_prefix_style.capitalization = camel_case # internal and private fields should be _camelCase dotnet_naming_rule.camel_case_for_private_internal_fields.severity = suggestion dotnet_naming_rule.camel_case_for_private_internal_fields.symbols = private_internal_fields -dotnet_naming_rule.camel_case_for_private_internal_fields.style = camel_case_underscore_style +dotnet_naming_rule.camel_case_for_private_internal_fields.style = camel_case_underscore_style dotnet_naming_symbols.private_internal_fields.applicable_kinds = field dotnet_naming_symbols.private_internal_fields.applicable_accessibilities = private, internal dotnet_naming_style.camel_case_underscore_style.required_prefix = _ @@ -71,7 +86,7 @@ dotnet_naming_style.camel_case_underscore_style.capitalization = camel_case # Code style defaults csharp_using_directive_placement = outside_namespace:suggestion dotnet_sort_system_directives_first = true -csharp_prefer_braces = true:refactoring +csharp_prefer_braces = true:silent csharp_preserve_single_line_blocks = true:none csharp_preserve_single_line_statements = false:none csharp_prefer_static_local_function = true:suggestion @@ -97,14 +112,14 @@ dotnet_style_prefer_conditional_expression_over_return = true:refactoring csharp_prefer_simple_default_expression = true:suggestion # Expression-bodied members -csharp_style_expression_bodied_methods = true:refactoring -csharp_style_expression_bodied_constructors = true:refactoring -csharp_style_expression_bodied_operators = true:refactoring -csharp_style_expression_bodied_properties = true:refactoring -csharp_style_expression_bodied_indexers = true:refactoring -csharp_style_expression_bodied_accessors = true:refactoring -csharp_style_expression_bodied_lambdas = true:refactoring -csharp_style_expression_bodied_local_functions = true:refactoring +csharp_style_expression_bodied_methods = true:silent +csharp_style_expression_bodied_constructors = true:silent +csharp_style_expression_bodied_operators = true:silent +csharp_style_expression_bodied_properties = true:silent +csharp_style_expression_bodied_indexers = true:silent +csharp_style_expression_bodied_accessors = true:silent +csharp_style_expression_bodied_lambdas = true:silent +csharp_style_expression_bodied_local_functions = true:silent # Pattern matching csharp_style_pattern_matching_over_is_with_cast_check = true:suggestion @@ -158,6 +173,8 @@ csharp_space_between_parentheses = false csharp_space_between_square_brackets = false # Analyzers +dotnet_diagnostic.IDE0036.severity = error + dotnet_code_quality.api_surface = public dotnet_code_quality.ca1802.api_surface = private, internal @@ -170,6 +187,11 @@ dotnet_diagnostic.RS0041.severity = none dotnet_diagnostic.RS0051.severity = none dotnet_diagnostic.CA2007.severity = error +csharp_style_namespace_declarations = block_scoped:silent +csharp_style_prefer_method_group_conversion = true:silent +csharp_style_prefer_top_level_statements = true:silent +csharp_style_prefer_primary_constructors = true:suggestion +csharp_prefer_system_threading_lock = true:suggestion # C++ Files [*.{cpp,h,in}] diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 4307f8500..deb687a47 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -108,6 +110,10 @@ Global {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -123,6 +129,7 @@ Global {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69} + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/Applications/CreateChannel/Program.cs b/projects/Applications/CreateChannel/Program.cs index 4ed61c6f9..01c33860c 100644 --- a/projects/Applications/CreateChannel/Program.cs +++ b/projects/Applications/CreateChannel/Program.cs @@ -30,11 +30,12 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Diagnostics; -using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; namespace CreateChannel { @@ -44,49 +45,89 @@ public static class Program private const int ChannelsToOpen = 50; private static int channelsOpened; - private static AutoResetEvent doneEvent; public static async Task Main() { - doneEvent = new AutoResetEvent(false); + var doneTcs = new TaskCompletionSource(); var connectionFactory = new ConnectionFactory { }; await using IConnection connection = await connectionFactory.CreateConnectionAsync(); var watch = Stopwatch.StartNew(); - _ = Task.Run(async () => + var workTask = Task.Run(async () => { - var channels = new IChannel[ChannelsToOpen]; - for (int i = 0; i < Repeats; i++) + try { - for (int j = 0; j < channels.Length; j++) + var channelOpenTasks = new List>(); + var channelDisposeTasks = new List(); + var channels = new List(); + for (int i = 0; i < Repeats; i++) { - channels[j] = await connection.CreateChannelAsync(); - channelsOpened++; - } + for (int j = 0; j < ChannelsToOpen; j++) + { + channelOpenTasks.Add(connection.CreateChannelAsync()); + } - for (int j = 0; j < channels.Length; j++) - { - await channels[j].DisposeAsync(); + for (int j = 0; j < channelOpenTasks.Count; j++) + { + IChannel ch = await channelOpenTasks[j]; + if (j % 8 == 0) + { + try + { + await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString()); + } + catch (OperationInterruptedException) + { + await ch.DisposeAsync(); + } + catch (Exception ex) + { + _ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}"); + } + } + else + { + channels.Add(ch); + channelsOpened++; + } + } + channelOpenTasks.Clear(); + + for (int j = 0; j < channels.Count; j++) + { + channelDisposeTasks.Add(channels[j].DisposeAsync()); + } + + for (int j = 0; j < channels.Count; j++) + { + await channelDisposeTasks[j]; + } + channelDisposeTasks.Clear(); } - } - doneEvent.Set(); + doneTcs.SetResult(true); + } + catch (Exception ex) + { + doneTcs.SetException(ex); + } }); Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}"); Console.WriteLine(); Console.WriteLine("Opened"); - while (!doneEvent.WaitOne(500)) + while (false == doneTcs.Task.IsCompleted) { Console.WriteLine($"{channelsOpened,5}"); + await Task.Delay(150); } watch.Stop(); Console.WriteLine($"{channelsOpened,5}"); Console.WriteLine(); - Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms"); + Console.WriteLine($"Took {watch.Elapsed}"); - Console.ReadLine(); + await workTask; } } } diff --git a/projects/Applications/GH-1647/GH-1647.csproj b/projects/Applications/GH-1647/GH-1647.csproj index f2591d159..f08f12982 100644 --- a/projects/Applications/GH-1647/GH-1647.csproj +++ b/projects/Applications/GH-1647/GH-1647.csproj @@ -9,7 +9,7 @@ - + diff --git a/projects/Applications/GH-1749/GH-1749.csproj b/projects/Applications/GH-1749/GH-1749.csproj new file mode 100644 index 000000000..9b238226a --- /dev/null +++ b/projects/Applications/GH-1749/GH-1749.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + GH_1749 + enable + enable + + + + + + + diff --git a/projects/Applications/GH-1749/Program.cs b/projects/Applications/GH-1749/Program.cs new file mode 100644 index 000000000..d543e0263 --- /dev/null +++ b/projects/Applications/GH-1749/Program.cs @@ -0,0 +1,199 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Runtime.ExceptionServices; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; + +namespace GH_1749 +{ + class GH1749Consumer : AsyncDefaultBasicConsumer + { + public GH1749Consumer(IChannel channel) : base(channel) + { + } + + protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) + { + Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", DateTime.Now.ToString("s"), consumerTags[0]); + return base.OnCancelAsync(consumerTags, cancellationToken); + } + } + + static class Program + { + const string DefaultHostName = "localhost"; + const string ConnectionClientProvidedName = "GH_1749"; + static readonly CancellationTokenSource s_cancellationTokenSource = new(); + static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token; + + static async Task Main(string[] args) + { + string hostname = DefaultHostName; + if (args.Length > 0) + { + hostname = args[0]; + } + + AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException; + + ConnectionFactory connectionFactory = new() + { + HostName = hostname, + AutomaticRecoveryEnabled = false, + TopologyRecoveryEnabled = false, + RequestedConnectionTimeout = TimeSpan.FromSeconds(600), + RequestedHeartbeat = TimeSpan.FromSeconds(600), + UserName = "guest", + Password = "guest", + ClientProvidedName = ConnectionClientProvidedName + }; + + var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true); + await using var connection = await connectionFactory.CreateConnectionAsync(); + + connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea); + return Task.CompletedTask; + }; + + connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync; + + connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync; + connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync; + + connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync; + + connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + }; + + connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync; + connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync; + + connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync; + + await using var channel = await connection.CreateChannelAsync(options: channelOptions); + + channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync; + channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; + + try + { + await channel.QueueDeclarePassiveAsync(Guid.NewGuid().ToString()); + } + catch (OperationInterruptedException) + { + await channel.DisposeAsync(); + // rabbitmq-dotnet-client-1749 + // await Task.Delay(2000); + } + } + + private static string Now => DateTime.Now.ToString("s"); + + private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + // rabbitmq-dotnet-client-1749 + // return Task.Delay(1000); + } + + private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e) + { + if (e.Exception is ObjectDisposedException) + { + Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception); + } + } + + private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter); + return Task.CompletedTask; + } + + private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter); + return Task.CompletedTask; + } + + private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag); + return Task.CompletedTask; + } + } +} + diff --git a/projects/Directory.Build.targets b/projects/Directory.Build.targets index 50373e79f..744a9fc90 100644 --- a/projects/Directory.Build.targets +++ b/projects/Directory.Build.targets @@ -3,6 +3,7 @@ 00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5 $([System.IO.Path]::Combine('$(IntermediateOutputPath)','$(TargetFrameworkMoniker).AssemblyAttributes$(DefaultLanguageSourceExtension)')) + true diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 963fced57..761ec8bc9 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -202,6 +202,15 @@ protected override Task InternalShutdownAsync() protected abstract Task ProcessChannelAsync(); + protected enum WorkType : byte + { + Shutdown, + Cancel, + CancelOk, + Deliver, + ConsumeOk + } + protected readonly struct WorkStruct : IDisposable { public readonly IAsyncBasicConsumer Consumer; @@ -276,42 +285,35 @@ public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string cons public void Dispose() => Body.Dispose(); } - protected enum WorkType : byte + public void Dispose() { - Shutdown, - Cancel, - CancelOk, - Deliver, - ConsumeOk + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { - if (!_disposed) + if (_disposed) { - try - { - if (disposing) - { - Quiesce(); - } - } - catch - { - // CHOMP - } - finally + return; + } + + try + { + if (disposing) { - _disposed = true; + Quiesce(); } } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); + catch + { + // CHOMP + } + finally + { + _disposed = true; + } } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index b057af115..96a0477bd 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -48,6 +48,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; + private bool _disposed; private int _isDisposing; @@ -254,7 +255,15 @@ await _connection.DeleteRecordedChannelAsync(this, public override string ToString() => InnerChannel.ToString(); - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 879ed249a..26d92e1f7 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -270,7 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca return autorecoveringChannel; } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 7293f584c..dc51ae5d4 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -47,45 +47,61 @@ namespace RabbitMQ.Client.Impl { internal partial class Channel : IChannel, IRecoverable { - ///Only used to kick-start a connection open - ///sequence. See - internal TaskCompletionSource? m_connectionStartCell; - private Exception? m_connectionStartException; + private Exception? _connectionStartException; // AMQP only allows one RPC operation to be active at a time. - protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private ShutdownEventArgs? _closeReason; - public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); private TaskCompletionSource? _serverOriginatedChannelCloseTcs; - internal readonly IConsumerDispatcher ConsumerDispatcher; - private bool _disposed; private int _isDisposing; + private readonly AsyncEventingWrapper _basicAcksAsyncWrapper; + private readonly AsyncEventingWrapper _basicNacksAsyncWrapper; + private readonly AsyncEventingWrapper _basicReturnAsyncWrapper; + private readonly AsyncEventingWrapper _callbackExceptionAsyncWrapper; + private readonly AsyncEventingWrapper _flowControlAsyncWrapper; + private readonly AsyncEventingWrapper _channelShutdownAsyncWrapper; + private readonly AsyncEventingWrapper _recoveryAsyncWrapper; + + internal readonly IConsumerDispatcher ConsumerDispatcher; + + /// + ///Only used to kick-start a connection open + ///sequence. See + /// + internal TaskCompletionSource? ConnectionStartCell; + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency); - Func onExceptionAsync = (exception, context, cancellationToken) => - OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); - _basicAcksAsyncWrapper = new AsyncEventingWrapper("OnBasicAck", onExceptionAsync); - _basicNacksAsyncWrapper = new AsyncEventingWrapper("OnBasicNack", onExceptionAsync); - _basicReturnAsyncWrapper = new AsyncEventingWrapper("OnBasicReturn", onExceptionAsync); + + _basicAcksAsyncWrapper = + new AsyncEventingWrapper("OnBasicAck", OnExceptionAsync); + _basicNacksAsyncWrapper = + new AsyncEventingWrapper("OnBasicNack", OnExceptionAsync); + _basicReturnAsyncWrapper = + new AsyncEventingWrapper("OnBasicReturn", OnExceptionAsync); _callbackExceptionAsyncWrapper = new AsyncEventingWrapper(string.Empty, (exception, context, cancellationToken) => Task.CompletedTask); - _flowControlAsyncWrapper = new AsyncEventingWrapper("OnFlowControl", onExceptionAsync); - _channelShutdownAsyncWrapper = new AsyncEventingWrapper("OnChannelShutdownAsync", onExceptionAsync); - _recoveryAsyncWrapper = new AsyncEventingWrapper("OnChannelRecovery", onExceptionAsync); + _flowControlAsyncWrapper = + new AsyncEventingWrapper("OnFlowControl", OnExceptionAsync); + _channelShutdownAsyncWrapper = + new AsyncEventingWrapper("OnChannelShutdownAsync", OnExceptionAsync); + _recoveryAsyncWrapper = + new AsyncEventingWrapper("OnChannelRecovery", OnExceptionAsync); + session.CommandReceived = HandleCommandAsync; session.SessionShutdownAsync += OnSessionShutdownAsync; Session = session; } - internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); + public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); public TimeSpan ContinuationTimeout { get; set; } @@ -95,40 +111,30 @@ public event AsyncEventHandler BasicAcksAsync remove => _basicAcksAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _basicAcksAsyncWrapper; - public event AsyncEventHandler BasicNacksAsync { add => _basicNacksAsyncWrapper.AddHandler(value); remove => _basicNacksAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _basicNacksAsyncWrapper; - public event AsyncEventHandler BasicReturnAsync { add => _basicReturnAsyncWrapper.AddHandler(value); remove => _basicReturnAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _basicReturnAsyncWrapper; - public event AsyncEventHandler CallbackExceptionAsync { add => _callbackExceptionAsyncWrapper.AddHandler(value); remove => _callbackExceptionAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _callbackExceptionAsyncWrapper; - public event AsyncEventHandler FlowControlAsync { add => _flowControlAsyncWrapper.AddHandler(value); remove => _flowControlAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _flowControlAsyncWrapper; - public event AsyncEventHandler ChannelShutdownAsync { add @@ -145,21 +151,12 @@ public event AsyncEventHandler ChannelShutdownAsync remove => _channelShutdownAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _channelShutdownAsyncWrapper; - public event AsyncEventHandler RecoveryAsync { add => _recoveryAsyncWrapper.AddHandler(value); remove => _recoveryAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _recoveryAsyncWrapper; - - internal Task RunRecoveryEventHandlers(object sender, CancellationToken cancellationToken) - { - return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.CreateOrDefault(cancellationToken)); - } - public int ChannelNumber => ((Session)Session).ChannelNumber; public IAsyncBasicConsumer? DefaultConsumer @@ -177,27 +174,16 @@ public IAsyncBasicConsumer? DefaultConsumer public ISession Session { get; private set; } - public Exception? ConnectionStartException => m_connectionStartException; + public Exception? ConnectionStartException => _connectionStartException; public void MaybeSetConnectionStartException(Exception ex) { - if (m_connectionStartCell != null) + if (ConnectionStartCell != null) { - m_connectionStartException = ex; + _connectionStartException = ex; } } - protected void TakeOver(Channel other) - { - _basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper); - _basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper); - _basicReturnAsyncWrapper.Takeover(other._basicReturnAsyncWrapper); - _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); - _flowControlAsyncWrapper.Takeover(other._flowControlAsyncWrapper); - _channelShutdownAsyncWrapper.Takeover(other._channelShutdownAsyncWrapper); - _recoveryAsyncWrapper.Takeover(other._recoveryAsyncWrapper); - } - public Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { @@ -265,41 +251,60 @@ await ConsumerDispatcher.WaitForShutdownAsync() } } - internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationToken cancellationToken) + public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, + CancellationToken cancellationToken) { - using var timeoutTokenSource = new CancellationTokenSource(HandshakeContinuationTimeout); - using var lts = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken); - var method = new ConnectionOpen(virtualHost); - // Note: must be awaited or else the timeoutTokenSource instance will be disposed - await ModelSendAsync(in method, lts.Token).ConfigureAwait(false); + var method = new BasicAck(deliveryTag, multiple); + return ModelSendAsync(in method, cancellationToken); } - internal async ValueTask ConnectionSecureOkAsync(byte[] response, + public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, + CancellationToken cancellationToken) + { + var method = new BasicNack(deliveryTag, multiple, requeue); + return ModelSendAsync(in method, cancellationToken); + } + + public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, + CancellationToken cancellationToken) + { + var method = new BasicReject(deliveryTag, requeue); + return ModelSendAsync(in method, cancellationToken); + } + + public async Task BasicCancelAsync(string consumerTag, bool noWait, CancellationToken cancellationToken) { bool enqueued = false; - var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + // NOTE: DON'T dispose these instances because the CancellationTokens must remain + // valid for processing the response. + var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, + ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + var method = new BasicCancel(consumerTag, noWait); - try + if (noWait) { - var method = new ConnectionSecureOk(response); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); + ConsumerDispatcher.GetAndRemoveConsumer(consumerTag); } - catch (AlreadyClosedException) + else { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes + enqueued = Enqueue(k); + + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); } - return await k; + return; } finally { @@ -311,13 +316,14 @@ await ModelSendAsync(in method, k.CancellationToken) } } - internal async ValueTask ConnectionStartOkAsync( - IDictionary clientProperties, - string mechanism, byte[] response, string locale, + public async Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, + IDictionary? arguments, IAsyncBasicConsumer consumer, CancellationToken cancellationToken) { + // NOTE: DON'T dispose this instance because the CancellationToken must remain + // valid for processing the response. bool enqueued = false; - var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new BasicConsumeAsyncRpcContinuation(consumer, ConsumerDispatcher, ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -325,18 +331,9 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) { enqueued = Enqueue(k); - try - { - var method = new ConnectionStartOk(clientProperties, mechanism, response, locale); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } + var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); return await k; } @@ -350,29 +347,61 @@ await ModelSendAsync(in method, k.CancellationToken) } } - protected bool Enqueue(IRpcContinuation k) + public async Task BasicGetAsync(string queue, bool autoAck, + CancellationToken cancellationToken) { - if (IsOpen) + bool enqueued = false; + + var k = new BasicGetAsyncRpcContinuation(AdjustDeliveryTag, ContinuationTimeout, cancellationToken); + + await _rpcSemaphore.WaitAsync(k.CancellationToken) + .ConfigureAwait(false); + try { - _continuationQueue.Enqueue(k); - return true; + enqueued = Enqueue(k); + + var method = new BasicGet(queue, autoAck); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + BasicGetResult? result = await k; + + using Activity? activity = result != null + ? RabbitMQActivitySource.BasicGet(result.RoutingKey, + result.Exchange, + result.DeliveryTag, result.BasicProperties, result.Body.Length) + : RabbitMQActivitySource.BasicGetEmpty(queue); + + activity?.SetStartTime(k.StartTime); + + return result; } - else + finally { - k.HandleChannelShutdown(CloseReason); - return false; + if (false == enqueued) + { + k.Dispose(); + } + _rpcSemaphore.Release(); } } - internal async Task OpenAsync(CreateChannelOptions createChannelOptions, + public async Task UpdateSecretAsync(string newSecret, string reason, CancellationToken cancellationToken) { - ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled, - createChannelOptions.PublisherConfirmationTrackingEnabled, - createChannelOptions.OutstandingPublisherConfirmationsRateLimiter); + if (newSecret is null) + { + throw new ArgumentNullException(nameof(newSecret)); + } + + if (reason is null) + { + throw new ArgumentNullException(nameof(reason)); + } bool enqueued = false; - var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new SimpleAsyncRpcContinuation(ProtocolCommandId.ConnectionUpdateSecretOk, + ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -380,15 +409,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) { enqueued = Enqueue(k); - var method = new ChannelOpen(); + byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); + var method = new ConnectionUpdateSecret(newSecretBytes, reason); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; Debug.Assert(result); - - await MaybeConfirmSelect(cancellationToken) - .ConfigureAwait(false); + return; } finally { @@ -398,511 +426,102 @@ await MaybeConfirmSelect(cancellationToken) } _rpcSemaphore.Release(); } - - return this; } - internal async Task FinishCloseAsync(CancellationToken cancellationToken) + public async Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, + CancellationToken cancellationToken) { - ShutdownEventArgs? reason = CloseReason; - if (reason != null) + bool enqueued = false; + var k = new BasicQosAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + + await _rpcSemaphore.WaitAsync(k.CancellationToken) + .ConfigureAwait(false); + try { - await Session.CloseAsync(reason) + enqueued = Enqueue(k); + + var method = new BasicQos(prefetchSize, prefetchCount, global); + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - } - m_connectionStartCell?.TrySetResult(null); + bool result = await k; + Debug.Assert(result); + return; + } + finally + { + if (false == enqueued) + { + k.Dispose(); + } + _rpcSemaphore.Release(); + } } - private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) + public async Task ExchangeBindAsync(string destination, string source, string routingKey, + IDictionary? arguments, bool noWait, + CancellationToken cancellationToken) { - /* - * If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has - * already been handled. - * - * Else, the incoming command is the return of an RPC call, and must be handled. - */ + bool enqueued = false; + var k = new ExchangeBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + + await _rpcSemaphore.WaitAsync(k.CancellationToken) + .ConfigureAwait(false); try { - if (false == await DispatchCommandAsync(cmd, cancellationToken) - .ConfigureAwait(false)) + var method = new ExchangeBind(destination, source, routingKey, noWait, arguments); + + if (noWait) { - using (IRpcContinuation c = _continuationQueue.Next()) - { - await c.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + } + else + { + enqueued = Enqueue(k); + + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); } + + return; } finally { - cmd.ReturnBuffers(); + if (false == enqueued) + { + k.Dispose(); + } + _rpcSemaphore.Release(); } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod + public Task ExchangeDeclarePassiveAsync(string exchange, CancellationToken cancellationToken) { - return Session.TransmitAsync(in method, cancellationToken); + return ExchangeDeclareAsync(exchange: exchange, type: string.Empty, passive: true, + durable: false, autoDelete: false, arguments: null, noWait: false, + cancellationToken: cancellationToken); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) - where TMethod : struct, IOutgoingAmqpMethod - where THeader : IAmqpHeader - { - return Session.TransmitAsync(in method, in header, body, cancellationToken); - } - - internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) - { - return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); - } - - ///Broadcasts notification of the final shutdown of the channel. - /// - /// - ///Do not call anywhere other than at the end of OnSessionShutdownAsync. - /// - /// - ///Must not be called when m_closeReason is null, because - ///otherwise there's a window when a new continuation could be - ///being enqueued at the same time as we're broadcasting the - ///shutdown event. See the definition of Enqueue() above. - /// - /// - private async Task OnChannelShutdownAsync(ShutdownEventArgs reason) - { - _continuationQueue.HandleChannelShutdown(reason); - - await _channelShutdownAsyncWrapper.InvokeAsync(this, reason) - .ConfigureAwait(false); - - await MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(reason) - .ConfigureAwait(false); - - _flowControlBlock.Set(); - } - - /* - * Note: - * Attempting to make this method async, with the resulting fallout, - * resulted in many flaky test results, especially around disposing - * Channels/Connections - * - * Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551 - */ - private async Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason) - { - ConsumerDispatcher.Quiesce(); - SetCloseReason(reason); - await OnChannelShutdownAsync(reason) - .ConfigureAwait(false); - await ConsumerDispatcher.ShutdownAsync(reason) - .ConfigureAwait(false); - } - - [MemberNotNull(nameof(_closeReason))] - internal bool SetCloseReason(ShutdownEventArgs reason) - { - if (reason is null) - { - throw new ArgumentNullException(nameof(reason)); - } - - // NB: this ensures that CloseAsync is only called once on a channel - return Interlocked.CompareExchange(ref _closeReason, reason, null) is null; - } - - public override string ToString() - => Session.ToString()!; - - void IDisposable.Dispose() - { - if (_disposed) - { - return; - } - - Dispose(true); - } - - protected virtual void Dispose(bool disposing) - { - if (IsDisposing) - { - return; - } - - if (disposing) - { - try - { - if (IsOpen) - { - this.AbortAsync().GetAwaiter().GetResult(); - } - - _serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5)); - - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - _outstandingPublisherConfirmationsRateLimiter?.Dispose(); - } - finally - { - _disposed = true; - } - } - } - - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore() - .ConfigureAwait(false); - - Dispose(false); - } - - protected virtual async ValueTask DisposeAsyncCore() - { - if (IsDisposing) - { - return; - } - - try - { - if (IsOpen) - { - await this.AbortAsync().ConfigureAwait(false); - } - - if (_serverOriginatedChannelCloseTcs is not null) - { - await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) - .ConfigureAwait(false); - } - - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - - if (_outstandingPublisherConfirmationsRateLimiter is not null) - { - await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() - .ConfigureAwait(false); - } - } - finally - { - _disposed = true; - } - } - - public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) - { - var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); - return ModelSendAsync(in method, cancellationToken).AsTask(); - } - - protected async Task HandleBasicAck(IncomingCommand cmd, - CancellationToken cancellationToken = default) - { - var ack = new BasicAck(cmd.MethodSpan); - if (!_basicAcksAsyncWrapper.IsEmpty) - { - var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken); - await _basicAcksAsyncWrapper.InvokeAsync(this, args) - .ConfigureAwait(false); - } - - HandleAck(ack._deliveryTag, ack._multiple); - - return true; - } - - protected async Task HandleBasicNack(IncomingCommand cmd, - CancellationToken cancellationToken = default) - { - var nack = new BasicNack(cmd.MethodSpan); - if (!_basicNacksAsyncWrapper.IsEmpty) - { - var args = new BasicNackEventArgs( - nack._deliveryTag, nack._multiple, nack._requeue, cancellationToken); - await _basicNacksAsyncWrapper.InvokeAsync(this, args) - .ConfigureAwait(false); - } - - HandleNack(nack._deliveryTag, nack._multiple, false); - - return true; - } - - protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) - { - var basicReturn = new BasicReturn(cmd.MethodSpan); - - var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, - basicReturn._exchange, basicReturn._routingKey, - new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); - - if (!_basicReturnAsyncWrapper.IsEmpty) - { - await _basicReturnAsyncWrapper.InvokeAsync(this, e) - .ConfigureAwait(false); - } - - HandleReturn(e); - - return true; - } - - protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - string consumerTag = new BasicCancel(cmd.MethodSpan)._consumerTag; - await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) - .ConfigureAwait(false); - return true; - } - - protected async Task HandleBasicDeliverAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - var method = new BasicDeliver(cmd.MethodSpan); - var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); - await ConsumerDispatcher.HandleBasicDeliverAsync( - method._consumerTag, - AdjustDeliveryTag(method._deliveryTag), - method._redelivered, - method._exchange, - method._routingKey, - header, - /* - * Takeover Body so it doesn't get returned as it is necessary - * for handling the Basic.Deliver method by client code. - */ - cmd.TakeoverBody(), - cancellationToken).ConfigureAwait(false); - return true; - } - - protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) - { - return deliveryTag; - } - - protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - TaskCompletionSource? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs; - if (serverOriginatedChannelCloseTcs is null) - { - // Attempt to assign the new TCS only if _tcs is still null - _ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs, - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); - } - - try - { - var channelClose = new ChannelClose(cmd.MethodSpan); - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - channelClose._replyCode, - channelClose._replyText, - channelClose._classId, - channelClose._methodId)); - - await Session.CloseAsync(_closeReason, notify: false) - .ConfigureAwait(false); - - var method = new ChannelCloseOk(); - await ModelSendAsync(in method, cancellationToken) - .ConfigureAwait(false); - - await Session.NotifyAsync(cancellationToken) - .ConfigureAwait(false); - - _serverOriginatedChannelCloseTcs?.TrySetResult(true); - return true; - } - catch (Exception ex) - { - _serverOriginatedChannelCloseTcs?.TrySetException(ex); - throw; - } - } - - protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - /* - * Note: - * This call _must_ come before completing the async continuation - */ - await FinishCloseAsync(cancellationToken) - .ConfigureAwait(false); - - if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) - { - _continuationQueue.Next(); - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } - - return true; - } - - protected async Task HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - bool active = new ChannelFlow(cmd.MethodSpan)._active; - if (active) - { - _flowControlBlock.Set(); - } - else - { - _flowControlBlock.Reset(); - } - - var method = new ChannelFlowOk(active); - await ModelSendAsync(in method, cancellationToken). - ConfigureAwait(false); - - if (!_flowControlAsyncWrapper.IsEmpty) - { - await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active, cancellationToken)) - .ConfigureAwait(false); - } - - return true; - } - - protected async Task HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; - await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) - .ConfigureAwait(false); - return true; - } - - protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - var method = new ConnectionClose(cmd.MethodSpan); - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); - try - { - await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) - .ConfigureAwait(false); - - var replyMethod = new ConnectionCloseOk(); - await ModelSendAsync(in replyMethod, cancellationToken) - .ConfigureAwait(false); - - SetCloseReason(Session.Connection.CloseReason!); - } - catch (IOException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - catch (AlreadyClosedException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - - return true; - } - - protected async Task HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - await k.HandleCommandAsync(new IncomingCommand()) - .ConfigureAwait(false); // release the continuation. - return true; - } - - protected async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - if (m_connectionStartCell is null) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); - await Session.Connection.CloseAsync(reason, false, - InternalConstants.DefaultConnectionCloseTimeout, - cancellationToken) - .ConfigureAwait(false); - } - else - { - var method = new ConnectionStart(cmd.MethodSpan); - var details = new ConnectionStartDetails(method._locales, method._mechanisms, - method._serverProperties, method._versionMajor, method._versionMinor); - m_connectionStartCell.SetResult(details); - m_connectionStartCell = null; - } - - return true; - } - - protected async Task HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - // Note: `using` here to ensure instance is disposed - using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - - // Note: releases the continuation and returns the buffers - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); - - return true; - } - - protected async Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) - { - await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) - .ConfigureAwait(false); - return true; - } - - public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, - CancellationToken cancellationToken) - { - var method = new BasicAck(deliveryTag, multiple); - return ModelSendAsync(in method, cancellationToken); - } - - public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, - CancellationToken cancellationToken) - { - var method = new BasicNack(deliveryTag, multiple, requeue); - return ModelSendAsync(in method, cancellationToken); - } - - public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, - CancellationToken cancellationToken) - { - var method = new BasicReject(deliveryTag, requeue); - return ModelSendAsync(in method, cancellationToken); - } - - public async Task BasicCancelAsync(string consumerTag, bool noWait, + public async Task ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, + IDictionary? arguments, bool passive, bool noWait, CancellationToken cancellationToken) { bool enqueued = false; - // NOTE: - // Maybe don't dispose these instances because the CancellationTokens must remain - // valid for processing the response. - var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, - ContinuationTimeout, cancellationToken); + var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - var method = new BasicCancel(consumerTag, noWait); - + var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, false, noWait, arguments); if (noWait) { await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - ConsumerDispatcher.GetAndRemoveConsumer(consumerTag); } else { @@ -927,27 +546,35 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, - IDictionary? arguments, IAsyncBasicConsumer consumer, + public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWait, CancellationToken cancellationToken) { - // NOTE: - // Maybe don't dispose this instance because the CancellationToken must remain - // valid for processing the response. bool enqueued = false; - var k = new BasicConsumeAsyncRpcContinuation(consumer, ConsumerDispatcher, ContinuationTimeout, cancellationToken); + var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + var method = new ExchangeDelete(exchange, ifUnused, Nowait: noWait); - var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + if (noWait) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + } + else + { + enqueued = Enqueue(k); - return await k; + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + } + + return; } finally { @@ -959,34 +586,36 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task BasicGetAsync(string queue, bool autoAck, + public async Task ExchangeUnbindAsync(string destination, string source, string routingKey, + IDictionary? arguments, bool noWait, CancellationToken cancellationToken) { bool enqueued = false; - - var k = new BasicGetAsyncRpcContinuation(AdjustDeliveryTag, ContinuationTimeout, cancellationToken); + var k = new ExchangeUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); - - var method = new BasicGet(queue, autoAck); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + var method = new ExchangeUnbind(destination, source, routingKey, noWait, arguments); - BasicGetResult? result = await k; + if (noWait) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + } + else + { + enqueued = Enqueue(k); - using Activity? activity = result != null - ? RabbitMQActivitySource.BasicGet(result.RoutingKey, - result.Exchange, - result.DeliveryTag, result.BasicProperties, result.Body.Length) - : RabbitMQActivitySource.BasicGetEmpty(queue); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - activity?.SetStartTime(k.StartTime); + bool result = await k; + Debug.Assert(result); + } - return result; + return; } finally { @@ -998,37 +627,68 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task UpdateSecretAsync(string newSecret, string reason, + public Task QueueDeclarePassiveAsync(string queue, CancellationToken cancellationToken) { - if (newSecret is null) - { - throw new ArgumentNullException(nameof(newSecret)); - } + return QueueDeclareAsync(queue: queue, passive: true, + durable: false, exclusive: false, autoDelete: false, + noWait: false, arguments: null, cancellationToken: cancellationToken); + } - if (reason is null) + public async Task QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, + IDictionary? arguments, bool passive, bool noWait, + CancellationToken cancellationToken) + { + if (true == noWait) { - throw new ArgumentNullException(nameof(reason)); + if (queue == string.Empty) + { + throw new InvalidOperationException("noWait must not be used with a server-named queue."); + } + + if (true == passive) + { + throw new InvalidOperationException("It does not make sense to use noWait: true and passive: true"); + } } bool enqueued = false; - var k = new SimpleAsyncRpcContinuation(ProtocolCommandId.ConnectionUpdateSecretOk, - ContinuationTimeout, cancellationToken); + var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, noWait, arguments); - byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); - var method = new ConnectionUpdateSecret(newSecretBytes, reason); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + if (noWait) + { - bool result = await k; - Debug.Assert(result); - return; + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + if (false == passive) + { + CurrentQueue = queue; + } + + return new QueueDeclareOk(queue, 0, 0); + } + else + { + enqueued = Enqueue(k); + + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + QueueDeclareOk result = await k; + if (false == passive) + { + CurrentQueue = result.QueueName; + } + + return result; + } } finally { @@ -1040,24 +700,35 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, + public async Task QueueBindAsync(string queue, string exchange, string routingKey, + IDictionary? arguments, bool noWait, CancellationToken cancellationToken) { bool enqueued = false; - var k = new BasicQosAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new QueueBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - enqueued = Enqueue(k); + var method = new QueueBind(queue, exchange, routingKey, noWait, arguments); - var method = new BasicQos(prefetchSize, prefetchCount, global); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + if (noWait) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + } + else + { + enqueued = Enqueue(k); + + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + } - bool result = await k; - Debug.Assert(result); return; } finally @@ -1070,23 +741,40 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task ExchangeBindAsync(string destination, string source, string routingKey, - IDictionary? arguments, bool noWait, + public async Task MessageCountAsync(string queue, + CancellationToken cancellationToken) + { + QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue, cancellationToken) + .ConfigureAwait(false); + return ok.MessageCount; + } + + public async Task ConsumerCountAsync(string queue, + CancellationToken cancellationToken) + { + QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue, cancellationToken) + .ConfigureAwait(false); + return ok.ConsumerCount; + } + + public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait, CancellationToken cancellationToken) { bool enqueued = false; - var k = new ExchangeBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - var method = new ExchangeBind(destination, source, routingKey, noWait, arguments); + var method = new QueueDelete(queue, ifUnused, ifEmpty, noWait); if (noWait) { await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); + + return 0; } else { @@ -1095,11 +783,36 @@ await ModelSendAsync(in method, k.CancellationToken) await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + return await k; } + } + finally + { + if (false == enqueued) + { + k.Dispose(); + } + _rpcSemaphore.Release(); + } + } - return; + public async Task QueuePurgeAsync(string queue, CancellationToken cancellationToken) + { + bool enqueued = false; + + var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + + await _rpcSemaphore.WaitAsync(k.CancellationToken) + .ConfigureAwait(false); + try + { + enqueued = Enqueue(k); + + var method = new QueuePurge(queue, false); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + return await k; } finally { @@ -1111,41 +824,54 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public Task ExchangeDeclarePassiveAsync(string exchange, CancellationToken cancellationToken) + public async Task QueueUnbindAsync(string queue, string exchange, string routingKey, + IDictionary? arguments, + CancellationToken cancellationToken) { - return ExchangeDeclareAsync(exchange: exchange, type: string.Empty, passive: true, - durable: false, autoDelete: false, arguments: null, noWait: false, - cancellationToken: cancellationToken); + bool enqueued = false; + var k = new QueueUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + + await _rpcSemaphore.WaitAsync(k.CancellationToken) + .ConfigureAwait(false); + try + { + enqueued = Enqueue(k); + + var method = new QueueUnbind(queue, exchange, routingKey, arguments); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + return; + } + finally + { + if (false == enqueued) + { + k.Dispose(); + } + _rpcSemaphore.Release(); + } } - public async Task ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, - IDictionary? arguments, bool passive, bool noWait, - CancellationToken cancellationToken) + public async Task TxCommitAsync(CancellationToken cancellationToken) { bool enqueued = false; - var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new TxCommitAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, false, noWait, arguments); - if (noWait) - { - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - } - else - { - enqueued = Enqueue(k); - - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + enqueued = Enqueue(k); - bool result = await k; - Debug.Assert(result); - } + var method = new TxCommit(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); return; } finally @@ -1158,34 +884,23 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWait, - CancellationToken cancellationToken) + public async Task TxRollbackAsync(CancellationToken cancellationToken) { bool enqueued = false; - var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new TxRollbackAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - var method = new ExchangeDelete(exchange, ifUnused, Nowait: noWait); - - if (noWait) - { - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - } - else - { - enqueued = Enqueue(k); - - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + enqueued = Enqueue(k); - bool result = await k; - Debug.Assert(result); - } + var method = new TxRollback(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); return; } finally @@ -1198,35 +913,23 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task ExchangeUnbindAsync(string destination, string source, string routingKey, - IDictionary? arguments, bool noWait, - CancellationToken cancellationToken) + public async Task TxSelectAsync(CancellationToken cancellationToken) { bool enqueued = false; - var k = new ExchangeUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new TxSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - var method = new ExchangeUnbind(destination, source, routingKey, noWait, arguments); - - if (noWait) - { - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - } - else - { - enqueued = Enqueue(k); - - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + enqueued = Enqueue(k); - bool result = await k; - Debug.Assert(result); - } + var method = new TxSelect(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); return; } finally @@ -1239,164 +942,196 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public Task QueueDeclarePassiveAsync(string queue, - CancellationToken cancellationToken) - { - return QueueDeclareAsync(queue: queue, passive: true, - durable: false, exclusive: false, autoDelete: false, - noWait: false, arguments: null, cancellationToken: cancellationToken); - } + public override string ToString() + => Session.ToString()!; - public async Task QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, - IDictionary? arguments, bool passive, bool noWait, - CancellationToken cancellationToken) + public void Dispose() { - if (true == noWait) + if (_disposed) { - if (queue == string.Empty) - { - throw new InvalidOperationException("noWait must not be used with a server-named queue."); - } - - if (true == passive) - { - throw new InvalidOperationException("It does not make sense to use noWait: true and passive: true"); - } + return; } - bool enqueued = false; - var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + Dispose(true); + } - await _rpcSemaphore.WaitAsync(k.CancellationToken) - .ConfigureAwait(false); - try + public async ValueTask DisposeAsync() + { + if (_disposed) { - var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, noWait, arguments); + return; + } - if (noWait) - { + await DisposeAsyncCore(true) + .ConfigureAwait(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + Dispose(false); + } - if (false == passive) - { - CurrentQueue = queue; - } + protected virtual void Dispose(bool disposing) + { + if (_disposed) + { + return; + } - return new QueueDeclareOk(queue, 0, 0); - } - else - { - enqueued = Enqueue(k); + if (IsDisposing) + { + return; + } - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + try + { + MaybeAbort(); - QueueDeclareOk result = await k; - if (false == passive) - { - CurrentQueue = result.QueueName; - } + MaybeDisposeOutstandingPublisherConfirmationsRateLimiter(); - return result; - } + MaybeWaitForServerOriginatedClose(); } finally { - if (false == enqueued) + try { - k.Dispose(); + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); } - _rpcSemaphore.Release(); + catch + { + } + _disposed = true; } } - public async Task QueueBindAsync(string queue, string exchange, string routingKey, - IDictionary? arguments, bool noWait, - CancellationToken cancellationToken) + protected virtual async ValueTask DisposeAsyncCore(bool disposing) { - bool enqueued = false; - var k = new QueueBindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) - .ConfigureAwait(false); - try + if (_disposed) { - var method = new QueueBind(queue, exchange, routingKey, noWait, arguments); + return; + } - if (noWait) + if (IsDisposing) + { + try { - await ModelSendAsync(in method, k.CancellationToken) + await MaybeAbortAsync() .ConfigureAwait(false); - } - else - { - enqueued = Enqueue(k); - await ModelSendAsync(in method, k.CancellationToken) + await MaybeDisposeOutstandingPublisherConfirmationsRateLimiterAsync() .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + await MaybeWaitForServerOriginatedCloseAsync() + .ConfigureAwait(false); } - - return; - } - finally - { - if (false == enqueued) + finally { - k.Dispose(); + try + { + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + } + catch + { + } + _disposed = true; } - _rpcSemaphore.Release(); } } - public async Task MessageCountAsync(string queue, - CancellationToken cancellationToken) + protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) + { + return deliveryTag; + } + + protected void TakeOver(Channel other) + { + _basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper); + _basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper); + _basicReturnAsyncWrapper.Takeover(other._basicReturnAsyncWrapper); + _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); + _flowControlAsyncWrapper.Takeover(other._flowControlAsyncWrapper); + _channelShutdownAsyncWrapper.Takeover(other._channelShutdownAsyncWrapper); + _recoveryAsyncWrapper.Takeover(other._recoveryAsyncWrapper); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected ValueTask ModelSendAsync(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod + { + return Session.TransmitAsync(in method, cancellationToken); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + return Session.TransmitAsync(in method, in header, body, cancellationToken); + } + + internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session, + CancellationToken cancellationToken) + { + var channel = new Channel(session, createChannelOptions); + return channel.OpenAsync(createChannelOptions, cancellationToken); + } + + internal Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) + { + var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); + return ModelSendAsync(in method, cancellationToken).AsTask(); + } + + internal Task OnExceptionAsync(Exception exception, string context, CancellationToken cancellationToken) => + OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); + + internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) + { + return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); + } + + internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + internal Task RunRecoveryEventHandlers(object sender, CancellationToken cancellationToken) { - QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue, cancellationToken) - .ConfigureAwait(false); - return ok.MessageCount; + return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.CreateOrDefault(cancellationToken)); } - public async Task ConsumerCountAsync(string queue, - CancellationToken cancellationToken) + internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationToken cancellationToken) { - QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue, cancellationToken) - .ConfigureAwait(false); - return ok.ConsumerCount; + using var timeoutTokenSource = new CancellationTokenSource(HandshakeContinuationTimeout); + using var lts = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken); + var method = new ConnectionOpen(virtualHost); + // Note: must be awaited or else the timeoutTokenSource instance will be disposed + await ModelSendAsync(in method, lts.Token).ConfigureAwait(false); } - public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait, + internal async ValueTask ConnectionSecureOkAsync(byte[] response, CancellationToken cancellationToken) { bool enqueued = false; - var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - var method = new QueueDelete(queue, ifUnused, ifEmpty, noWait); + enqueued = Enqueue(k); - if (noWait) + try { + var method = new ConnectionSecureOk(response); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - - return 0; } - else + catch (AlreadyClosedException) { - enqueued = Enqueue(k); - - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - return await k; + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes } + + return await k; } finally { @@ -1408,11 +1143,13 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task QueuePurgeAsync(string queue, CancellationToken cancellationToken) + internal async ValueTask ConnectionStartOkAsync( + IDictionary clientProperties, + string mechanism, byte[] response, string locale, + CancellationToken cancellationToken) { bool enqueued = false; - - var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1420,9 +1157,18 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) { enqueued = Enqueue(k); - var method = new QueuePurge(queue, false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + try + { + var method = new ConnectionStartOk(clientProperties, mechanism, response, locale); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } return await k; } @@ -1436,12 +1182,15 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task QueueUnbindAsync(string queue, string exchange, string routingKey, - IDictionary? arguments, + internal async Task OpenAsync(CreateChannelOptions createChannelOptions, CancellationToken cancellationToken) { + ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled, + createChannelOptions.PublisherConfirmationTrackingEnabled, + createChannelOptions.OutstandingPublisherConfirmationsRateLimiter); + bool enqueued = false; - var k = new QueueUnbindAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1449,13 +1198,15 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) { enqueued = Enqueue(k); - var method = new QueueUnbind(queue, exchange, routingKey, arguments); + var method = new ChannelOpen(); await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; Debug.Assert(result); - return; + + await MaybeConfirmSelect(cancellationToken) + .ConfigureAwait(false); } finally { @@ -1465,100 +1216,154 @@ await ModelSendAsync(in method, k.CancellationToken) } _rpcSemaphore.Release(); } + + return this; } - public async Task TxCommitAsync(CancellationToken cancellationToken) + internal async Task FinishCloseAsync(CancellationToken cancellationToken) { - bool enqueued = false; - var k = new TxCommitAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + ShutdownEventArgs? reason = CloseReason; + if (reason != null) + { + await Session.CloseAsync(reason) + .ConfigureAwait(false); + } - await _rpcSemaphore.WaitAsync(k.CancellationToken) - .ConfigureAwait(false); - try + ConnectionStartCell?.TrySetResult(null); + } + + [MemberNotNull(nameof(_closeReason))] + internal bool SetCloseReason(ShutdownEventArgs reason) + { + if (reason is null) { - enqueued = Enqueue(k); + throw new ArgumentNullException(nameof(reason)); + } - var method = new TxCommit(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + // NB: this ensures that CloseAsync is only called once on a channel + return Interlocked.CompareExchange(ref _closeReason, reason, null) is null; + } - bool result = await k; - Debug.Assert(result); - return; + private bool Enqueue(IRpcContinuation k) + { + if (IsOpen) + { + _continuationQueue.Enqueue(k); + return true; } - finally + else { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); + k.HandleChannelShutdown(CloseReason); + return false; } } - public async Task TxRollbackAsync(CancellationToken cancellationToken) + ///Broadcasts notification of the final shutdown of the channel. + /// + /// + ///Do not call anywhere other than at the end of OnSessionShutdownAsync. + /// + /// + ///Must not be called when m_closeReason is null, because + ///otherwise there's a window when a new continuation could be + ///being enqueued at the same time as we're broadcasting the + ///shutdown event. See the definition of Enqueue() above. + /// + /// + private async Task OnChannelShutdownAsync(ShutdownEventArgs reason) { - bool enqueued = false; - var k = new TxRollbackAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + _continuationQueue.HandleChannelShutdown(reason); - await _rpcSemaphore.WaitAsync(k.CancellationToken) + await _channelShutdownAsyncWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); - try - { - enqueued = Enqueue(k); - var method = new TxRollback(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + await MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(reason) + .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); - return; - } - finally - { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); - } + _flowControlBlock.Set(); } - public async Task TxSelectAsync(CancellationToken cancellationToken) + /* + * Note: + * Attempting to make this method async, with the resulting fallout, + * resulted in many flaky test results, especially around disposing + * Channels/Connections + * + * Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551 + */ + private async Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason) { - bool enqueued = false; - var k = new TxSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) + ConsumerDispatcher.Quiesce(); + SetCloseReason(reason); + await OnChannelShutdownAsync(reason) .ConfigureAwait(false); + await ConsumerDispatcher.ShutdownAsync(reason) + .ConfigureAwait(false); + } + + private async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + TaskCompletionSource? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs; + if (serverOriginatedChannelCloseTcs is null) + { + // Attempt to assign the new TCS only if _tcs is still null + _ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs, + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); + } + try { - enqueued = Enqueue(k); + var channelClose = new ChannelClose(cmd.MethodSpan); + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + channelClose._replyCode, + channelClose._replyText, + channelClose._classId, + channelClose._methodId)); - var method = new TxSelect(); - await ModelSendAsync(in method, k.CancellationToken) + await Session.CloseAsync(_closeReason, notify: false) .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); - return; + var method = new ChannelCloseOk(); + await ModelSendAsync(in method, cancellationToken) + .ConfigureAwait(false); + + await Session.NotifyAsync(cancellationToken) + .ConfigureAwait(false); + + _serverOriginatedChannelCloseTcs?.TrySetResult(true); + return true; } - finally + catch (Exception ex) { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); + _serverOriginatedChannelCloseTcs?.TrySetException(ex); + throw; } } - internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session, - CancellationToken cancellationToken) + private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - var channel = new Channel(session, createChannelOptions); - return channel.OpenAsync(createChannelOptions, cancellationToken); + /* + * If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has + * already been handled. + * + * Else, the incoming command is the return of an RPC call, and must be handled. + */ + try + { + if (false == await DispatchCommandAsync(cmd, cancellationToken) + .ConfigureAwait(false)) + { + using (IRpcContinuation c = _continuationQueue.Next()) + { + await c.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } + } + } + finally + { + cmd.ReturnBuffers(); + } } /// @@ -1661,5 +1466,293 @@ private bool IsDisposing return false; } } + + private async Task HandleBasicAck(IncomingCommand cmd, + CancellationToken cancellationToken = default) + { + var ack = new BasicAck(cmd.MethodSpan); + if (!_basicAcksAsyncWrapper.IsEmpty) + { + var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken); + await _basicAcksAsyncWrapper.InvokeAsync(this, args) + .ConfigureAwait(false); + } + + HandleAck(ack._deliveryTag, ack._multiple); + + return true; + } + + private async Task HandleBasicNack(IncomingCommand cmd, + CancellationToken cancellationToken = default) + { + var nack = new BasicNack(cmd.MethodSpan); + if (!_basicNacksAsyncWrapper.IsEmpty) + { + var args = new BasicNackEventArgs( + nack._deliveryTag, nack._multiple, nack._requeue, cancellationToken); + await _basicNacksAsyncWrapper.InvokeAsync(this, args) + .ConfigureAwait(false); + } + + HandleNack(nack._deliveryTag, nack._multiple, false); + + return true; + } + + private async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) + { + var basicReturn = new BasicReturn(cmd.MethodSpan); + + var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, + basicReturn._exchange, basicReturn._routingKey, + new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); + + if (!_basicReturnAsyncWrapper.IsEmpty) + { + await _basicReturnAsyncWrapper.InvokeAsync(this, e) + .ConfigureAwait(false); + } + + HandleReturn(e); + + return true; + } + + private async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + string consumerTag = new BasicCancel(cmd.MethodSpan)._consumerTag; + await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) + .ConfigureAwait(false); + return true; + } + + private async Task HandleBasicDeliverAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + var method = new BasicDeliver(cmd.MethodSpan); + var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); + await ConsumerDispatcher.HandleBasicDeliverAsync( + method._consumerTag, + AdjustDeliveryTag(method._deliveryTag), + method._redelivered, + method._exchange, + method._routingKey, + header, + /* + * Takeover Body so it doesn't get returned as it is necessary + * for handling the Basic.Deliver method by client code. + */ + cmd.TakeoverBody(), + cancellationToken).ConfigureAwait(false); + return true; + } + + private async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + /* + * Note: + * This call _must_ come before completing the async continuation + */ + await FinishCloseAsync(cancellationToken) + .ConfigureAwait(false); + + if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) + { + _continuationQueue.Next(); + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } + + return true; + } + + private async Task HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + bool active = new ChannelFlow(cmd.MethodSpan)._active; + if (active) + { + _flowControlBlock.Set(); + } + else + { + _flowControlBlock.Reset(); + } + + var method = new ChannelFlowOk(active); + await ModelSendAsync(in method, cancellationToken). + ConfigureAwait(false); + + if (!_flowControlAsyncWrapper.IsEmpty) + { + await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active, cancellationToken)) + .ConfigureAwait(false); + } + + return true; + } + + private async Task HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; + await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) + .ConfigureAwait(false); + return true; + } + + private async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + var method = new ConnectionClose(cmd.MethodSpan); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); + try + { + await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) + .ConfigureAwait(false); + + var replyMethod = new ConnectionCloseOk(); + await ModelSendAsync(in replyMethod, cancellationToken) + .ConfigureAwait(false); + + SetCloseReason(Session.Connection.CloseReason!); + } + catch (IOException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + catch (AlreadyClosedException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + + return true; + } + + private async Task HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); + await k.HandleCommandAsync(new IncomingCommand()) + .ConfigureAwait(false); // release the continuation. + return true; + } + + private async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + if (ConnectionStartCell is null) + { + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + await Session.Connection.CloseAsync(reason, false, + InternalConstants.DefaultConnectionCloseTimeout, + cancellationToken) + .ConfigureAwait(false); + } + else + { + var method = new ConnectionStart(cmd.MethodSpan); + var details = new ConnectionStartDetails(method._locales, method._mechanisms, + method._serverProperties, method._versionMajor, method._versionMinor); + ConnectionStartCell.SetResult(details); + ConnectionStartCell = null; + } + + return true; + } + + private async Task HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + // Note: `using` here to ensure instance is disposed + using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); + + // Note: releases the continuation and returns the buffers + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + + return true; + } + + private async Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) + { + await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) + .ConfigureAwait(false); + return true; + } + + private void MaybeAbort() + { + if (IsOpen) + { + this.AbortAsync().GetAwaiter().GetResult(); + } + } + + private Task MaybeAbortAsync() + { + if (IsOpen) + { + return this.AbortAsync(); + } + else + { + return Task.CompletedTask; + } + } + + private void MaybeDisposeOutstandingPublisherConfirmationsRateLimiter() + { + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + try + { + _outstandingPublisherConfirmationsRateLimiter.Dispose(); + } + catch + { + } + } + } + + private async Task MaybeDisposeOutstandingPublisherConfirmationsRateLimiterAsync() + { + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + try + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } + catch + { + } + } + } + + private void MaybeWaitForServerOriginatedClose() + { + if (_serverOriginatedChannelCloseTcs is not null) + { + try + { + _serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5)); + } + catch + { + } + } + } + + private async Task MaybeWaitForServerOriginatedCloseAsync() + { + if (_serverOriginatedChannelCloseTcs is not null) + { + try + { + await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) + .ConfigureAwait(false); + } + catch + { + } + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.Commands.cs b/projects/RabbitMQ.Client/Impl/Connection.Commands.cs index ed79e6ea0..b44e3ee85 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Commands.cs @@ -93,7 +93,7 @@ private async ValueTask StartAndTuneAsync(CancellationToken cancellationToken) }, state: connectionStartCell, useSynchronizationContext: false); #endif - _channel0.m_connectionStartCell = connectionStartCell; + _channel0.ConnectionStartCell = connectionStartCell; _channel0.HandshakeContinuationTimeout = _config.HandshakeContinuationTimeout; _frameHandler.ReadTimeout = _config.HandshakeContinuationTimeout; diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index fd88b5b5b..850d5e5d8 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -54,11 +54,17 @@ internal sealed partial class Connection : IConnection private readonly Channel _channel0; private readonly MainSession _session0; - private Guid _id = Guid.NewGuid(); + private readonly Guid _id = Guid.NewGuid(); private SessionManager _sessionManager; private ShutdownEventArgs? _closeReason; - public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + private ShutdownReportEntry[] _shutdownReport = Array.Empty(); + + private readonly AsyncEventingWrapper _callbackExceptionAsyncWrapper; + private readonly AsyncEventingWrapper _connectionBlockedAsyncWrapper; + private readonly AsyncEventingWrapper _connectionUnblockedAsyncWrapper; + private readonly AsyncEventingWrapper _consumerAboutToBeRecoveredAsyncWrapper; + private readonly AsyncEventingWrapper _connectionShutdownAsyncWrapper; internal Connection(ConnectionConfig config, IFrameHandler frameHandler) { @@ -94,6 +100,8 @@ Task onExceptionAsync(Exception exception, string context, CancellationToken can OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); } + public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + public Guid Id => _id; public string? ClientProvidedName => _config.ClientProvidedName; @@ -114,48 +122,33 @@ Task onExceptionAsync(Exception exception, string context, CancellationToken can public IDictionary? ServerProperties { get; private set; } public IEnumerable ShutdownReport => _shutdownReport; - private ShutdownReportEntry[] _shutdownReport = Array.Empty(); ///Explicit implementation of IConnection.Protocol. IProtocol IConnection.Protocol => Endpoint.Protocol; - ///Another overload of a Protocol property, useful - ///for exposing a tighter type. - internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol; - - ///Used for testing only. - internal IFrameHandler FrameHandler - { - get { return _frameHandler; } - } - public event AsyncEventHandler CallbackExceptionAsync { add => _callbackExceptionAsyncWrapper.AddHandler(value); remove => _callbackExceptionAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _callbackExceptionAsyncWrapper; public event AsyncEventHandler ConnectionBlockedAsync { add => _connectionBlockedAsyncWrapper.AddHandler(value); remove => _connectionBlockedAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _connectionBlockedAsyncWrapper; public event AsyncEventHandler ConnectionUnblockedAsync { add => _connectionUnblockedAsyncWrapper.AddHandler(value); remove => _connectionUnblockedAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _connectionUnblockedAsyncWrapper; public event AsyncEventHandler RecoveringConsumerAsync { add => _consumerAboutToBeRecoveredAsyncWrapper.AddHandler(value); remove => _consumerAboutToBeRecoveredAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _consumerAboutToBeRecoveredAsyncWrapper; public event AsyncEventHandler ConnectionShutdownAsync { @@ -178,7 +171,6 @@ public event AsyncEventHandler ConnectionShutdownAsync _connectionShutdownAsyncWrapper.RemoveHandler(value); } } - private AsyncEventingWrapper _connectionShutdownAsyncWrapper; /// /// This event is never fired by non-recovering connections but it is a part of the interface. @@ -216,6 +208,96 @@ public event AsyncEventHandler QueueName remove { } } + public Task CreateChannelAsync(CreateChannelOptions? createChannelOptions = default, + CancellationToken cancellationToken = default) + { + EnsureIsOpen(); + + createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config); + ISession session = CreateSession(); + return Channel.CreateAndOpenAsync(createChannelOptions, session, cancellationToken); + } + + ///Asynchronous API-side invocation of connection.close with timeout. + public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, + CancellationToken cancellationToken = default) + { + var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText); + return CloseAsync(reason, abort, timeout, cancellationToken); + } + + public override string ToString() + { + return $"Connection({_id},{Endpoint})"; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + + public async ValueTask DisposeAsync() + { + if (_disposed) + { + return; + } + + if (IsDisposing) + { + return; + } + + try + { + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } + + _session0.Dispose(); + _mainLoopCts.Dispose(); + + await _channel0.DisposeAsync() + .ConfigureAwait(false); + } + catch (OperationInterruptedException) + { + // ignored, see rabbitmq/rabbitmq-dotnet-client#133 + } + finally + { + _disposed = true; + } + } + + internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) + { + return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); + } + + internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationToken) + { + Activity.Current.SetNetworkTags(_frameHandler); + return _frameHandler.WriteAsync(frames, cancellationToken); + } + + ///Another overload of a Protocol property, useful + ///for exposing a tighter type. + internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol; + + ///Used for testing only. + internal IFrameHandler FrameHandler + { + get { return _frameHandler; } + } + internal void TakeOver(Connection other) { _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); @@ -265,16 +347,6 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(CreateChannelOptions? createChannelOptions = default, - CancellationToken cancellationToken = default) - { - EnsureIsOpen(); - - createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config); - ISession session = CreateSession(); - return Channel.CreateAndOpenAsync(createChannelOptions, session, cancellationToken); - } - internal ISession CreateSession() { return _sessionManager.Create(); @@ -295,14 +367,6 @@ internal void EnsureIsOpen() } } - ///Asynchronous API-side invocation of connection.close with timeout. - public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, - CancellationToken cancellationToken = default) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText); - return CloseAsync(reason, abort, timeout, cancellationToken); - } - ///Asychronously try to close connection in a graceful way /// /// @@ -436,13 +500,17 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken) { _mainLoopCts.Cancel(); _closed = true; + MaybeStopHeartbeatTimers(); await _frameHandler.CloseAsync(cancellationToken) .ConfigureAwait(false); + _channel0.SetCloseReason(CloseReason!); + await _channel0.FinishCloseAsync(cancellationToken) .ConfigureAwait(false); + RabbitMqClientEventSource.Log.ConnectionClosed(); } @@ -476,50 +544,6 @@ private void LogCloseError(string error, Exception ex) } } - internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) - { - return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); - } - - internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationToken) - { - Activity.Current.SetNetworkTags(_frameHandler); - return _frameHandler.WriteAsync(frames, cancellationToken); - } - - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); - - public async ValueTask DisposeAsync() - { - if (IsDisposing) - { - return; - } - - try - { - if (IsOpen) - { - await this.AbortAsync() - .ConfigureAwait(false); - } - - _session0.Dispose(); - _mainLoopCts.Dispose(); - - await _channel0.DisposeAsync() - .ConfigureAwait(false); - } - catch (OperationInterruptedException) - { - // ignored, see rabbitmq/rabbitmq-dotnet-client#133 - } - finally - { - _disposed = true; - } - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { @@ -534,11 +558,6 @@ private void ThrowIfDisposed() static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName); } - public override string ToString() - { - return $"Connection({_id},{Endpoint})"; - } - [DoesNotReturn] private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason) { diff --git a/projects/RabbitMQ.Client/Protocols.cs b/projects/RabbitMQ.Client/Protocols.cs index 9c4da91a6..d955683d3 100644 --- a/projects/RabbitMQ.Client/Protocols.cs +++ b/projects/RabbitMQ.Client/Protocols.cs @@ -39,11 +39,11 @@ public static class Protocols /// /// Protocol version 0-9-1 as modified by Pivotal. /// - public readonly static IProtocol AMQP_0_9_1 = new Framing.Protocol(); + public static readonly IProtocol AMQP_0_9_1 = new Framing.Protocol(); /// /// Retrieve the current default protocol variant (currently AMQP_0_9_1). /// - public readonly static IProtocol DefaultProtocol = AMQP_0_9_1; + public static readonly IProtocol DefaultProtocol = AMQP_0_9_1; } } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 4aeb219ec..3f0307bb7 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -41,12 +41,14 @@ using Xunit; using Xunit.Abstractions; +#nullable enable + namespace Test.Integration { public class TestToxiproxy : IntegrationFixture { private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1); - private ToxiproxyManager _toxiproxyManager; + private ToxiproxyManager? _toxiproxyManager; private int _proxyPort; public TestToxiproxy(ITestOutputHelper output) : base(output) @@ -61,14 +63,24 @@ public override Task InitializeAsync() Assert.Null(_conn); Assert.Null(_channel); - _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - _proxyPort = ToxiproxyManager.ProxyPort; - return _toxiproxyManager.InitializeAsync(); + if (AreToxiproxyTestsEnabled) + { + _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + _proxyPort = ToxiproxyManager.ProxyPort; + return _toxiproxyManager.InitializeAsync(); + } + else + { + return Task.CompletedTask; + } } public override async Task DisposeAsync() { - await _toxiproxyManager.DisposeAsync(); + if (_toxiproxyManager is not null) + { + await _toxiproxyManager.DisposeAsync(); + } await base.DisposeAsync(); } @@ -77,6 +89,7 @@ public override async Task DisposeAsync() public async Task TestCloseConnection() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Port = _proxyPort; @@ -199,6 +212,7 @@ async Task PublishLoop() public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Port = _proxyPort; @@ -246,6 +260,7 @@ await Assert.ThrowsAsync(() => public async Task TestTcpReset_GH1464() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); @@ -298,6 +313,7 @@ public async Task TestTcpReset_GH1464() public async Task TestPublisherConfirmationThrottling() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); const int TotalMessageCount = 64; const int MaxOutstandingConfirms = 8; @@ -397,7 +413,7 @@ private bool AreToxiproxyTestsEnabled { get { - string s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS"); + string? s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS"); if (string.IsNullOrEmpty(s)) {