diff --git a/projects/Applications/GH-1647/Program.cs b/projects/Applications/GH-1647/Program.cs index 18ac9e57d..6a7c4b523 100644 --- a/projects/Applications/GH-1647/Program.cs +++ b/projects/Applications/GH-1647/Program.cs @@ -40,11 +40,7 @@ Password = "guest" }; -var channelOptions = new CreateChannelOptions -{ - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true -}; +var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true); var props = new BasicProperties(); byte[] msg = Encoding.UTF8.GetBytes("test"); diff --git a/projects/Applications/MassPublish/Program.cs b/projects/Applications/MassPublish/Program.cs index 3e074e3ff..1abab26f9 100644 --- a/projects/Applications/MassPublish/Program.cs +++ b/projects/Applications/MassPublish/Program.cs @@ -137,7 +137,9 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer publishTasks.Add(Task.Run(async () => { - using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true); + using IChannel publishChannel = await publishConnection.CreateChannelAsync(createChannelOptions); publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; for (int i = 0; i < ItemsPerBatch; i++) diff --git a/projects/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Applications/PublisherConfirms/PublisherConfirms.cs index 98f2a5cde..87c1bfd84 100644 --- a/projects/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Applications/PublisherConfirms/PublisherConfirms.cs @@ -43,12 +43,11 @@ const int MESSAGE_COUNT = 50_000; bool debug = false; -var channelOpts = new CreateChannelOptions -{ - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true, - OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS) -}; +var channelOpts = new CreateChannelOptions( + publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS) +); var props = new BasicProperties { @@ -177,7 +176,7 @@ async Task HandlePublishConfirmsAsynchronously() await using IConnection connection = await CreateConnectionAsync(); - channelOpts.PublisherConfirmationTrackingEnabled = false; + channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false); await using IChannel channel = await connection.CreateChannelAsync(channelOpts); // declare a server-named queue diff --git a/projects/RabbitMQ.Client/ConnectionFactory.cs b/projects/RabbitMQ.Client/ConnectionFactory.cs index f723ae8e4..d8f669d71 100644 --- a/projects/RabbitMQ.Client/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/ConnectionFactory.cs @@ -185,8 +185,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor /// public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(5); - private TimeSpan _handshakeContinuationTimeout = TimeSpan.FromSeconds(10); - private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20); + private TimeSpan _handshakeContinuationTimeout = Constants.DefaultHandshakeContinuationTimeout; + private TimeSpan _continuationTimeout = Constants.DefaultContinuationTimeout; // just here to hold the value that was set through the setter private string? _clientProvidedName; diff --git a/projects/RabbitMQ.Client/Constants.cs b/projects/RabbitMQ.Client/Constants.cs index c6748c872..b9c77dc75 100644 --- a/projects/RabbitMQ.Client/Constants.cs +++ b/projects/RabbitMQ.Client/Constants.cs @@ -29,6 +29,8 @@ // Copyright (c) 2007-2024 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- +using System; + namespace RabbitMQ.Client { public static class Constants @@ -97,5 +99,15 @@ public static class Constants /// basic.return is sent via the broker. /// public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no"; + + /// + /// The default timeout for initial AMQP handshake + /// + public static readonly TimeSpan DefaultHandshakeContinuationTimeout = TimeSpan.FromSeconds(10); + + /// + /// The default timeout for RPC methods + /// + public static readonly TimeSpan DefaultContinuationTimeout = TimeSpan.FromSeconds(20); } } diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs index 3ffaa065b..5faabc166 100644 --- a/projects/RabbitMQ.Client/CreateChannelOptions.cs +++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs @@ -29,6 +29,7 @@ // Copyright (c) 2007-2024 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- +using System; using System.Threading.RateLimiting; namespace RabbitMQ.Client @@ -38,6 +39,9 @@ namespace RabbitMQ.Client /// public sealed class CreateChannelOptions { + private ushort? _connectionConfigConsumerDispatchConcurrency; + private TimeSpan _connectionConfigContinuationTimeout; + /// /// Enable or disable publisher confirmations on this channel. Defaults to false /// @@ -49,7 +53,7 @@ public sealed class CreateChannelOptions /// to allow correlation /// of the response with the correct message. /// - public bool PublisherConfirmationsEnabled { get; set; } = false; + public readonly bool PublisherConfirmationsEnabled = false; /// /// Should this library track publisher confirmations for you? Defaults to false @@ -59,7 +63,7 @@ public sealed class CreateChannelOptions /// If the broker then sends a basic.return response for the message, this library can /// then correctly handle the message. /// - public bool PublisherConfirmationTrackingEnabled { get; set; } = false; + public readonly bool PublisherConfirmationTrackingEnabled = false; /// /// If the publisher confirmation tracking is enabled, this represents the rate limiter used to @@ -68,7 +72,7 @@ public sealed class CreateChannelOptions /// Defaults to a with a limit of 128 and a throttling percentage of 50% with a delay during throttling. /// /// Setting the rate limiter to null disables the rate limiting entirely. - public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128); + public readonly RateLimiter? OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(128); /// /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one @@ -80,11 +84,62 @@ public sealed class CreateChannelOptions /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. /// - public ushort? ConsumerDispatchConcurrency { get; set; } = null; + public readonly ushort? ConsumerDispatchConcurrency = null; - /// - /// The default channel options. - /// - public static CreateChannelOptions Default { get; } = new CreateChannelOptions(); + public CreateChannelOptions(bool publisherConfirmationsEnabled, + bool publisherConfirmationTrackingEnabled, + RateLimiter? outstandingPublisherConfirmationsRateLimiter = null, + ushort? consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) + { + PublisherConfirmationsEnabled = publisherConfirmationsEnabled; + PublisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + OutstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; + ConsumerDispatchConcurrency = consumerDispatchConcurrency; + } + + internal ushort InternalConsumerDispatchConcurrency + { + get + { + if (ConsumerDispatchConcurrency is not null) + { + return ConsumerDispatchConcurrency.Value; + } + + if (_connectionConfigConsumerDispatchConcurrency is not null) + { + return _connectionConfigConsumerDispatchConcurrency.Value; + } + + return Constants.DefaultConsumerDispatchConcurrency; + } + } + + internal TimeSpan ContinuationTimeout => _connectionConfigContinuationTimeout; + + internal CreateChannelOptions(ConnectionConfig connectionConfig) + { + _connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency; + _connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout; + } + + private CreateChannelOptions WithConnectionConfig(ConnectionConfig connectionConfig) + { + _connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency; + _connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout; + return this; + } + + internal static CreateChannelOptions CreateOrUpdate(CreateChannelOptions? createChannelOptions, ConnectionConfig config) + { + if (createChannelOptions is null) + { + return new CreateChannelOptions(config); + } + else + { + return createChannelOptions.WithConnectionConfig(config); + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index d14fb7e5e..d98734f19 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl { internal sealed class AutorecoveringChannel : IChannel, IRecoverable { - private readonly ChannelOptions _channelOptions; + private readonly CreateChannelOptions _createChannelOptions; private readonly List _recordedConsumerTags = new List(); private AutorecoveringConnection _connection; @@ -73,11 +73,11 @@ public TimeSpan ContinuationTimeout public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel, - ChannelOptions channelOptions) + CreateChannelOptions createChannelOptions) { _connection = conn; _innerChannel = innerChannel; - _channelOptions = channelOptions; + _createChannelOptions = createChannelOptions; } public event AsyncEventHandler BasicAcksAsync @@ -162,7 +162,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con _connection = conn; - RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken) + RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_createChannelOptions, cancellationToken) .ConfigureAwait(false); newChannel.TakeOver(_innerChannel); diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index d8c746912..f38de44a1 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -185,11 +185,11 @@ public event AsyncEventHandler RecoveringConsumerAs public IProtocol Protocol => Endpoint.Protocol; public ValueTask CreateNonRecoveringChannelAsync( - ChannelOptions channelOptions, + CreateChannelOptions createChannelOptions, CancellationToken cancellationToken = default) { ISession session = InnerConnection.CreateSession(); - return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken); + return RecoveryAwareChannel.CreateAndOpenAsync(session, createChannelOptions, cancellationToken); } public override string ToString() @@ -251,21 +251,16 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(CreateChannelOptions? options = default, + public async Task CreateChannelAsync(CreateChannelOptions? createChannelOptions = default, CancellationToken cancellationToken = default) { EnsureIsOpen(); - options ??= CreateChannelOptions.Default; - - ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); - - var channelOptions = ChannelOptions.From(options, _config); - - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken) + createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config); + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(createChannelOptions, cancellationToken) .ConfigureAwait(false); - var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions); + var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, createChannelOptions); await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 6d27a0571..64be4152e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -61,10 +61,10 @@ internal partial class Channel : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - public Channel(ISession session, ChannelOptions channelOptions) + public Channel(ISession session, CreateChannelOptions createChannelOptions) { - ContinuationTimeout = channelOptions.ContinuationTimeout; - ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency); + ContinuationTimeout = createChannelOptions.ContinuationTimeout; + ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency); Func onExceptionAsync = (exception, context, cancellationToken) => OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); _basicAcksAsyncWrapper = new AsyncEventingWrapper("OnBasicAck", onExceptionAsync); @@ -359,12 +359,12 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync(ChannelOptions channelOptions, + internal async Task OpenAsync(CreateChannelOptions createChannelOptions, CancellationToken cancellationToken) { - ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled, - channelOptions.PublisherConfirmationTrackingEnabled, - channelOptions.OutstandingPublisherConfirmationsRateLimiter); + ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled, + createChannelOptions.PublisherConfirmationTrackingEnabled, + createChannelOptions.OutstandingPublisherConfirmationsRateLimiter); bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -1493,13 +1493,11 @@ await ModelSendAsync(in method, k.CancellationToken) } } - internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions, - ConnectionConfig connectionConfig, ISession session, + internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session, CancellationToken cancellationToken) { - ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig); - var channel = new Channel(session, channelOptions); - return channel.OpenAsync(channelOptions, cancellationToken); + var channel = new Channel(session, createChannelOptions); + return channel.OpenAsync(createChannelOptions, cancellationToken); } /// diff --git a/projects/RabbitMQ.Client/Impl/ChannelOptions.cs b/projects/RabbitMQ.Client/Impl/ChannelOptions.cs deleted file mode 100644 index fc6b3fb8f..000000000 --- a/projects/RabbitMQ.Client/Impl/ChannelOptions.cs +++ /dev/null @@ -1,90 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading.RateLimiting; - -namespace RabbitMQ.Client.Impl -{ - internal sealed class ChannelOptions - { - private readonly bool _publisherConfirmationEnabled; - private readonly bool _publisherConfirmationTrackingEnabled; - private readonly ushort _consumerDispatchConcurrency; - private readonly RateLimiter? _outstandingPublisherConfirmationsRateLimiter; - private readonly TimeSpan _continuationTimeout; - - public ChannelOptions(bool publisherConfirmationEnabled, - bool publisherConfirmationTrackingEnabled, - ushort consumerDispatchConcurrency, - RateLimiter? outstandingPublisherConfirmationsRateLimiter, - TimeSpan continuationTimeout) - { - _publisherConfirmationEnabled = publisherConfirmationEnabled; - _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; - _consumerDispatchConcurrency = consumerDispatchConcurrency; - _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; - _continuationTimeout = continuationTimeout; - } - - public bool PublisherConfirmationsEnabled => _publisherConfirmationEnabled; - - public bool PublisherConfirmationTrackingEnabled => _publisherConfirmationTrackingEnabled; - - public ushort ConsumerDispatchConcurrency => _consumerDispatchConcurrency; - - public RateLimiter? OutstandingPublisherConfirmationsRateLimiter => _outstandingPublisherConfirmationsRateLimiter; - - public TimeSpan ContinuationTimeout => _continuationTimeout; - - public static ChannelOptions From(CreateChannelOptions createChannelOptions, - ConnectionConfig connectionConfig) - { - ushort cdc = createChannelOptions.ConsumerDispatchConcurrency.GetValueOrDefault( - connectionConfig.ConsumerDispatchConcurrency); - - return new ChannelOptions(createChannelOptions.PublisherConfirmationsEnabled, - createChannelOptions.PublisherConfirmationTrackingEnabled, - cdc, - createChannelOptions.OutstandingPublisherConfirmationsRateLimiter, - connectionConfig.ContinuationTimeout); - } - - public static ChannelOptions From(ConnectionConfig connectionConfig) - { - return new ChannelOptions(publisherConfirmationEnabled: false, - publisherConfirmationTrackingEnabled: false, - consumerDispatchConcurrency: Constants.DefaultConsumerDispatchConcurrency, - outstandingPublisherConfirmationsRateLimiter: null, - continuationTimeout: connectionConfig.ContinuationTimeout); - } - } -} diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index ed2f30c74..af26bb7ac 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -78,7 +78,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); _session0 = new MainSession(this, config.MaxInboundMessageBodySize); - _channel0 = new Channel(_session0, ChannelOptions.From(config)); + _channel0 = new Channel(_session0, new CreateChannelOptions(config)); ClientProperties = new Dictionary(_config.ClientProperties) { @@ -268,10 +268,9 @@ public Task CreateChannelAsync(CreateChannelOptions? createChannelOpti { EnsureIsOpen(); - createChannelOptions ??= CreateChannelOptions.Default; + createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config); ISession session = CreateSession(); - - return Channel.CreateAndOpenAsync(createChannelOptions, _config, session, cancellationToken); + return Channel.CreateAndOpenAsync(createChannelOptions, session, cancellationToken); } internal ISession CreateSession() diff --git a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs index ba6081c98..33f1b2e73 100644 --- a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs +++ b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs @@ -36,8 +36,8 @@ namespace RabbitMQ.Client.Impl { internal sealed class RecoveryAwareChannel : Channel { - public RecoveryAwareChannel(ISession session, ChannelOptions channelOptions) - : base(session, channelOptions) + public RecoveryAwareChannel(ISession session, CreateChannelOptions createChannelOptions) + : base(session, createChannelOptions) { ActiveDeliveryTagOffset = 0; MaxSeenDeliveryTag = 0; @@ -104,11 +104,11 @@ public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, } } - internal static async ValueTask CreateAndOpenAsync(ISession session, ChannelOptions channelOptions, + internal static async ValueTask CreateAndOpenAsync(ISession session, CreateChannelOptions createChannelOptions, CancellationToken cancellationToken) { - var result = new RecoveryAwareChannel(session, channelOptions); - return (RecoveryAwareChannel)await result.OpenAsync(channelOptions, cancellationToken) + var result = new RecoveryAwareChannel(session, createChannelOptions); + return (RecoveryAwareChannel)await result.OpenAsync(createChannelOptions, cancellationToken) .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 19ad30cab..b0df4fc67 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -919,3 +919,10 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static readonly RabbitMQ.Client.Constants.DefaultContinuationTimeout -> System.TimeSpan +static readonly RabbitMQ.Client.Constants.DefaultHandshakeContinuationTimeout -> System.TimeSpan +RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled, System.Threading.RateLimiting.RateLimiter? outstandingPublisherConfirmationsRateLimiter = null, ushort? consumerDispatchConcurrency = 1) -> void +readonly RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency -> ushort? +readonly RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter -> System.Threading.RateLimiting.RateLimiter? +readonly RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled -> bool +readonly RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled -> bool diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 70fa16adb..aba2464e0 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -64,6 +64,9 @@ public abstract class IntegrationFixture : IAsyncLifetime protected ConnectionFactory _connFactory; protected IConnection _conn; + + protected CreateChannelOptions _createChannelOptions = new(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true); protected IChannel _channel; protected static readonly Encoding _encoding = new UTF8Encoding(); @@ -153,7 +156,9 @@ public virtual async Task InitializeAsync() if (_openChannel) { - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, consumerDispatchConcurrency: _consumerDispatchConcurrency); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); } if (IsVerbose) diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index bbd65519f..c6b3fbf2c 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -199,7 +199,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync()) { - using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(_createChannelOptions)) { for (ushort i = 0; i < TotalMessageCount; i++) { @@ -340,9 +340,9 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag, } } - protected static async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey) + protected async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey) { - using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + using (IChannel ch = await conn.CreateChannelAsync(_createChannelOptions)) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs index 3fcb978b2..4e516bbbf 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, consumer.ReceivedAsync += MessageReceived; await _channel.BasicConsumeAsync(queueName, true, consumer); - await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + await using (IChannel pubCh = await _conn.CreateChannelAsync(_createChannelOptions)) { await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body); await pubCh.CloseAsync(); @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, await CloseAndWaitForRecoveryAsync(); - await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + await using (IChannel pubCh = await _conn.CreateChannelAsync(_createChannelOptions)) { await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body); await pubCh.CloseAsync(); diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 773238ff6..7dd4deebb 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }); return Task.CompletedTask; }; - await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + await using (IChannel publishChannel = await publishConn.CreateChannelAsync(_createChannelOptions)) { AddCallbackExceptionHandlers(publishConn, publishChannel); publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, @@ -646,7 +646,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() var consumer1 = new AsyncEventingBasicConsumer(_channel); consumer1.ReceivedAsync += async (sender, args) => { - await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel innerChannel = await _conn.CreateChannelAsync(_createChannelOptions); await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true, body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650))); @@ -707,9 +707,15 @@ private async Task ValidateConsumerDispatchConcurrency() AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel; Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); - await using IChannel ch = await _conn.CreateChannelAsync( - new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency }); + + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false, + publisherConfirmationTrackingEnabled: false, + outstandingPublisherConfirmationsRateLimiter: null, + consumerDispatchConcurrency: expectedConsumerDispatchConcurrency); + + await using IChannel ch = await _conn.CreateChannelAsync(createChannelOptions); AutorecoveringChannel ach = (AutorecoveringChannel)ch; + Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency); } diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index cf3c59d31..e20c7ceb8 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() await _channel.BasicConsumeAsync(queueName, false, consumer); //publisher - await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel publisherChannel = await _conn.CreateChannelAsync(_createChannelOptions); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); var props = new BasicProperties(); await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 107460fd8..92b49aca6 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -60,7 +60,7 @@ public override Task InitializeAsync() public async Task TestBasicRoundtripArray() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); QueueDeclareOk q = await _channel.QueueDeclareAsync(); var bp = new BasicProperties(); @@ -88,7 +88,7 @@ public async Task TestBasicRoundtripArray() public async Task TestBasicRoundtripCachedString() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); CachedString exchangeName = new CachedString(string.Empty); CachedString queueName = new CachedString((await _channel.QueueDeclareAsync()).QueueName); @@ -116,7 +116,7 @@ public async Task TestBasicRoundtripCachedString() public async Task TestBasicRoundtripReadOnlyMemory() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = _encoding.GetBytes("hi"); @@ -143,7 +143,7 @@ public async Task TestBasicRoundtripReadOnlyMemory() public async Task CanNotModifyPayloadAfterPublish() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = new byte[1000]; @@ -204,7 +204,7 @@ public async Task TestMaxInboundMessageBodySize() Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize); - await using (IChannel channel = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + await using (IChannel channel = await conn.CreateChannelAsync(_createChannelOptions)) { channel.ChannelShutdownAsync += (o, a) => { @@ -291,7 +291,7 @@ public async Task TestMaxInboundMessageBodySize() public async Task TestPropertiesRoundtrip_Headers() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); var subject = new BasicProperties { diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index b1febe2a2..348477cd5 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -108,7 +108,8 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in try { - await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true }); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false); + await using IChannel ch = await _conn.CreateChannelAsync(createChannelOptions); ch.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(ch, ea, (args) => diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index ed5bc466c..b7de2a9b3 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -289,7 +289,7 @@ public async Task TestTopologyRecoveryConsumerFilter() return Task.CompletedTask; }; - await using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + await using (IChannel ch = await conn.CreateChannelAsync(_createChannelOptions)) { await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false); diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index c05032190..2faafeb6d 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -104,7 +104,8 @@ public async Task TestTopologyRecoveryQueueFilter() tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + + IChannel ch = await conn.CreateChannelAsync(_createChannelOptions); await ch.QueueDeclareAsync(queueToRecover, false, false, false); await ch.QueueDeclareAsync(queueToIgnore, false, false, false); @@ -155,7 +156,9 @@ public async Task TestTopologyRecoveryExchangeFilter() tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + + IChannel ch = await conn.CreateChannelAsync(_createChannelOptions); + try { await ch.ExchangeDeclareAsync(exchangeToRecover, "topic", false, true); @@ -272,7 +275,7 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + IChannel ch = await conn.CreateChannelAsync(_createChannelOptions); try { @@ -367,7 +370,7 @@ await channel.QueueDeclareAsync(rq.Name, false, false, false, tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + IChannel ch = await conn.CreateChannelAsync(_createChannelOptions); await ch.QueueDeclareAsync(queueToRecoverWithException, false, false, false); await ch.QueueDeclareAsync(queueToRecoverSuccessfully, false, false, false); diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 337bd3d49..14e4d8c6e 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -65,7 +65,7 @@ public async Task TestUnthrottledFloodPublishing() _connFactory.AutomaticRecoveryEnabled = false; _conn = await _connFactory.CreateConnectionAsync(); Assert.IsNotType(_conn); - _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + _channel = await _conn.CreateChannelAsync(_createChannelOptions); _conn.ConnectionShutdownAsync += (_, ea) => { @@ -192,7 +192,7 @@ public async Task TestMultithreadFloodPublishing() return Task.CompletedTask; }; - await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) + await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(_createChannelOptions)) { publishChannel.ChannelShutdownAsync += (o, ea) => diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 4926aae6b..d6a551b30 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -52,7 +52,7 @@ public TestPublisherConfirms(ITestOutputHelper output) public async Task TestWaitForConfirmsWithEventsAsync() { string queueName = GenerateQueueName(); - await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel ch = await _conn.CreateChannelAsync(_createChannelOptions); await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index d6567ee1b..6e0ab2ec5 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -127,7 +127,7 @@ public async Task TestCloseConnection() async Task PublishLoop() { - await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel ch = await conn.CreateChannelAsync(_createChannelOptions); QueueDeclareOk q = await ch.QueueDeclareAsync(); while (conn.IsOpen) { @@ -206,7 +206,7 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() Task pubTask = Task.Run(async () => { await using IConnection conn = await cf.CreateConnectionAsync(); - await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + await using IChannel ch = await conn.CreateChannelAsync(_createChannelOptions); QueueDeclareOk q = await ch.QueueDeclareAsync(); while (conn.IsOpen) { @@ -304,12 +304,11 @@ public async Task TestPublisherConfirmationThrottling() cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); cf.AutomaticRecoveryEnabled = true; - var channelOpts = new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true, - OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms) - }; + var channelOpts = new CreateChannelOptions( + publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MaxOutstandingConfirms) + ); var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index 347ad12ce..b10ae8cca 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -230,7 +230,8 @@ public async Task SecondConnectionCrashes_GH1429() private async Task DeclarePublishChannelAsync() { Assert.NotNull(_connection); - IChannel publishChannel = await _connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true); + IChannel publishChannel = await _connection.CreateChannelAsync(createChannelOptions); await publishChannel.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false); return publishChannel; }