diff --git a/projects/Applications/GH-1647/Program.cs b/projects/Applications/GH-1647/Program.cs
index 18ac9e57d..6a7c4b523 100644
--- a/projects/Applications/GH-1647/Program.cs
+++ b/projects/Applications/GH-1647/Program.cs
@@ -40,11 +40,7 @@
Password = "guest"
};
-var channelOptions = new CreateChannelOptions
-{
- PublisherConfirmationsEnabled = true,
- PublisherConfirmationTrackingEnabled = true
-};
+var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
diff --git a/projects/Applications/MassPublish/Program.cs b/projects/Applications/MassPublish/Program.cs
index 3e074e3ff..1abab26f9 100644
--- a/projects/Applications/MassPublish/Program.cs
+++ b/projects/Applications/MassPublish/Program.cs
@@ -137,7 +137,9 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
publishTasks.Add(Task.Run(async () =>
{
- using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
+ publisherConfirmationTrackingEnabled: true);
+ using IChannel publishChannel = await publishConnection.CreateChannelAsync(createChannelOptions);
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
for (int i = 0; i < ItemsPerBatch; i++)
diff --git a/projects/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Applications/PublisherConfirms/PublisherConfirms.cs
index 98f2a5cde..87c1bfd84 100644
--- a/projects/Applications/PublisherConfirms/PublisherConfirms.cs
+++ b/projects/Applications/PublisherConfirms/PublisherConfirms.cs
@@ -43,12 +43,11 @@
const int MESSAGE_COUNT = 50_000;
bool debug = false;
-var channelOpts = new CreateChannelOptions
-{
- PublisherConfirmationsEnabled = true,
- PublisherConfirmationTrackingEnabled = true,
- OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
-};
+var channelOpts = new CreateChannelOptions(
+ publisherConfirmationsEnabled: true,
+ publisherConfirmationTrackingEnabled: true,
+ outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
+);
var props = new BasicProperties
{
@@ -177,7 +176,7 @@ async Task HandlePublishConfirmsAsynchronously()
await using IConnection connection = await CreateConnectionAsync();
- channelOpts.PublisherConfirmationTrackingEnabled = false;
+ channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
// declare a server-named queue
diff --git a/projects/RabbitMQ.Client/ConnectionFactory.cs b/projects/RabbitMQ.Client/ConnectionFactory.cs
index f723ae8e4..d8f669d71 100644
--- a/projects/RabbitMQ.Client/ConnectionFactory.cs
+++ b/projects/RabbitMQ.Client/ConnectionFactory.cs
@@ -185,8 +185,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
///
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(5);
- private TimeSpan _handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
- private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
+ private TimeSpan _handshakeContinuationTimeout = Constants.DefaultHandshakeContinuationTimeout;
+ private TimeSpan _continuationTimeout = Constants.DefaultContinuationTimeout;
// just here to hold the value that was set through the setter
private string? _clientProvidedName;
diff --git a/projects/RabbitMQ.Client/Constants.cs b/projects/RabbitMQ.Client/Constants.cs
index c6748c872..b9c77dc75 100644
--- a/projects/RabbitMQ.Client/Constants.cs
+++ b/projects/RabbitMQ.Client/Constants.cs
@@ -29,6 +29,8 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------
+using System;
+
namespace RabbitMQ.Client
{
public static class Constants
@@ -97,5 +99,15 @@ public static class Constants
/// basic.return is sent via the broker.
///
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
+
+ ///
+ /// The default timeout for initial AMQP handshake
+ ///
+ public static readonly TimeSpan DefaultHandshakeContinuationTimeout = TimeSpan.FromSeconds(10);
+
+ ///
+ /// The default timeout for RPC methods
+ ///
+ public static readonly TimeSpan DefaultContinuationTimeout = TimeSpan.FromSeconds(20);
}
}
diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs
index 3ffaa065b..5faabc166 100644
--- a/projects/RabbitMQ.Client/CreateChannelOptions.cs
+++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs
@@ -29,6 +29,7 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------
+using System;
using System.Threading.RateLimiting;
namespace RabbitMQ.Client
@@ -38,6 +39,9 @@ namespace RabbitMQ.Client
///
public sealed class CreateChannelOptions
{
+ private ushort? _connectionConfigConsumerDispatchConcurrency;
+ private TimeSpan _connectionConfigContinuationTimeout;
+
///
/// Enable or disable publisher confirmations on this channel. Defaults to false
///
@@ -49,7 +53,7 @@ public sealed class CreateChannelOptions
/// to allow correlation
/// of the response with the correct message.
///
- public bool PublisherConfirmationsEnabled { get; set; } = false;
+ public readonly bool PublisherConfirmationsEnabled = false;
///
/// Should this library track publisher confirmations for you? Defaults to false
@@ -59,7 +63,7 @@ public sealed class CreateChannelOptions
/// If the broker then sends a basic.return response for the message, this library can
/// then correctly handle the message.
///
- public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
+ public readonly bool PublisherConfirmationTrackingEnabled = false;
///
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
@@ -68,7 +72,7 @@ public sealed class CreateChannelOptions
/// Defaults to a with a limit of 128 and a throttling percentage of 50% with a delay during throttling.
///
/// Setting the rate limiter to null disables the rate limiting entirely.
- public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
+ public readonly RateLimiter? OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(128);
///
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one
@@ -80,11 +84,62 @@ public sealed class CreateChannelOptions
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
///
- public ushort? ConsumerDispatchConcurrency { get; set; } = null;
+ public readonly ushort? ConsumerDispatchConcurrency = null;
- ///
- /// The default channel options.
- ///
- public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
+ public CreateChannelOptions(bool publisherConfirmationsEnabled,
+ bool publisherConfirmationTrackingEnabled,
+ RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
+ ushort? consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
+ {
+ PublisherConfirmationsEnabled = publisherConfirmationsEnabled;
+ PublisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
+ OutstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
+ ConsumerDispatchConcurrency = consumerDispatchConcurrency;
+ }
+
+ internal ushort InternalConsumerDispatchConcurrency
+ {
+ get
+ {
+ if (ConsumerDispatchConcurrency is not null)
+ {
+ return ConsumerDispatchConcurrency.Value;
+ }
+
+ if (_connectionConfigConsumerDispatchConcurrency is not null)
+ {
+ return _connectionConfigConsumerDispatchConcurrency.Value;
+ }
+
+ return Constants.DefaultConsumerDispatchConcurrency;
+ }
+ }
+
+ internal TimeSpan ContinuationTimeout => _connectionConfigContinuationTimeout;
+
+ internal CreateChannelOptions(ConnectionConfig connectionConfig)
+ {
+ _connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
+ _connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
+ }
+
+ private CreateChannelOptions WithConnectionConfig(ConnectionConfig connectionConfig)
+ {
+ _connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
+ _connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
+ return this;
+ }
+
+ internal static CreateChannelOptions CreateOrUpdate(CreateChannelOptions? createChannelOptions, ConnectionConfig config)
+ {
+ if (createChannelOptions is null)
+ {
+ return new CreateChannelOptions(config);
+ }
+ else
+ {
+ return createChannelOptions.WithConnectionConfig(config);
+ }
+ }
}
}
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
index d14fb7e5e..d98734f19 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
@@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
{
- private readonly ChannelOptions _channelOptions;
+ private readonly CreateChannelOptions _createChannelOptions;
private readonly List _recordedConsumerTags = new List();
private AutorecoveringConnection _connection;
@@ -73,11 +73,11 @@ public TimeSpan ContinuationTimeout
public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
- ChannelOptions channelOptions)
+ CreateChannelOptions createChannelOptions)
{
_connection = conn;
_innerChannel = innerChannel;
- _channelOptions = channelOptions;
+ _createChannelOptions = createChannelOptions;
}
public event AsyncEventHandler BasicAcksAsync
@@ -162,7 +162,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con
_connection = conn;
- RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
+ RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_createChannelOptions, cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
index d8c746912..f38de44a1 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
@@ -185,11 +185,11 @@ public event AsyncEventHandler RecoveringConsumerAs
public IProtocol Protocol => Endpoint.Protocol;
public ValueTask CreateNonRecoveringChannelAsync(
- ChannelOptions channelOptions,
+ CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
- return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
+ return RecoveryAwareChannel.CreateAndOpenAsync(session, createChannelOptions, cancellationToken);
}
public override string ToString()
@@ -251,21 +251,16 @@ await CloseInnerConnectionAsync()
}
}
- public async Task CreateChannelAsync(CreateChannelOptions? options = default,
+ public async Task CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
- options ??= CreateChannelOptions.Default;
-
- ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
-
- var channelOptions = ChannelOptions.From(options, _config);
-
- RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
+ createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config);
+ RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(createChannelOptions, cancellationToken)
.ConfigureAwait(false);
- var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);
+ var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, createChannelOptions);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs
index 6d27a0571..64be4152e 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.cs
@@ -61,10 +61,10 @@ internal partial class Channel : IChannel, IRecoverable
internal readonly IConsumerDispatcher ConsumerDispatcher;
- public Channel(ISession session, ChannelOptions channelOptions)
+ public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
- ContinuationTimeout = channelOptions.ContinuationTimeout;
- ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
+ ContinuationTimeout = createChannelOptions.ContinuationTimeout;
+ ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency);
Func onExceptionAsync = (exception, context, cancellationToken) =>
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
_basicAcksAsyncWrapper = new AsyncEventingWrapper("OnBasicAck", onExceptionAsync);
@@ -359,12 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
}
}
- internal async Task OpenAsync(ChannelOptions channelOptions,
+ internal async Task OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
{
- ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
- channelOptions.PublisherConfirmationTrackingEnabled,
- channelOptions.OutstandingPublisherConfirmationsRateLimiter);
+ ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
+ createChannelOptions.PublisherConfirmationTrackingEnabled,
+ createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);
bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -1493,13 +1493,11 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}
- internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
- ConnectionConfig connectionConfig, ISession session,
+ internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session,
CancellationToken cancellationToken)
{
- ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
- var channel = new Channel(session, channelOptions);
- return channel.OpenAsync(channelOptions, cancellationToken);
+ var channel = new Channel(session, createChannelOptions);
+ return channel.OpenAsync(createChannelOptions, cancellationToken);
}
///
diff --git a/projects/RabbitMQ.Client/Impl/ChannelOptions.cs b/projects/RabbitMQ.Client/Impl/ChannelOptions.cs
deleted file mode 100644
index fc6b3fb8f..000000000
--- a/projects/RabbitMQ.Client/Impl/ChannelOptions.cs
+++ /dev/null
@@ -1,90 +0,0 @@
-// This source code is dual-licensed under the Apache License, version
-// 2.0, and the Mozilla Public License, version 2.0.
-//
-// The APL v2.0:
-//
-//---------------------------------------------------------------------------
-// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//---------------------------------------------------------------------------
-//
-// The MPL v2.0:
-//
-//---------------------------------------------------------------------------
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at https://mozilla.org/MPL/2.0/.
-//
-// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
-//---------------------------------------------------------------------------
-
-using System;
-using System.Threading.RateLimiting;
-
-namespace RabbitMQ.Client.Impl
-{
- internal sealed class ChannelOptions
- {
- private readonly bool _publisherConfirmationEnabled;
- private readonly bool _publisherConfirmationTrackingEnabled;
- private readonly ushort _consumerDispatchConcurrency;
- private readonly RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
- private readonly TimeSpan _continuationTimeout;
-
- public ChannelOptions(bool publisherConfirmationEnabled,
- bool publisherConfirmationTrackingEnabled,
- ushort consumerDispatchConcurrency,
- RateLimiter? outstandingPublisherConfirmationsRateLimiter,
- TimeSpan continuationTimeout)
- {
- _publisherConfirmationEnabled = publisherConfirmationEnabled;
- _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
- _consumerDispatchConcurrency = consumerDispatchConcurrency;
- _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
- _continuationTimeout = continuationTimeout;
- }
-
- public bool PublisherConfirmationsEnabled => _publisherConfirmationEnabled;
-
- public bool PublisherConfirmationTrackingEnabled => _publisherConfirmationTrackingEnabled;
-
- public ushort ConsumerDispatchConcurrency => _consumerDispatchConcurrency;
-
- public RateLimiter? OutstandingPublisherConfirmationsRateLimiter => _outstandingPublisherConfirmationsRateLimiter;
-
- public TimeSpan ContinuationTimeout => _continuationTimeout;
-
- public static ChannelOptions From(CreateChannelOptions createChannelOptions,
- ConnectionConfig connectionConfig)
- {
- ushort cdc = createChannelOptions.ConsumerDispatchConcurrency.GetValueOrDefault(
- connectionConfig.ConsumerDispatchConcurrency);
-
- return new ChannelOptions(createChannelOptions.PublisherConfirmationsEnabled,
- createChannelOptions.PublisherConfirmationTrackingEnabled,
- cdc,
- createChannelOptions.OutstandingPublisherConfirmationsRateLimiter,
- connectionConfig.ContinuationTimeout);
- }
-
- public static ChannelOptions From(ConnectionConfig connectionConfig)
- {
- return new ChannelOptions(publisherConfirmationEnabled: false,
- publisherConfirmationTrackingEnabled: false,
- consumerDispatchConcurrency: Constants.DefaultConsumerDispatchConcurrency,
- outstandingPublisherConfirmationsRateLimiter: null,
- continuationTimeout: connectionConfig.ContinuationTimeout);
- }
- }
-}
diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs
index ed2f30c74..af26bb7ac 100644
--- a/projects/RabbitMQ.Client/Impl/Connection.cs
+++ b/projects/RabbitMQ.Client/Impl/Connection.cs
@@ -78,7 +78,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
- _channel0 = new Channel(_session0, ChannelOptions.From(config));
+ _channel0 = new Channel(_session0, new CreateChannelOptions(config));
ClientProperties = new Dictionary(_config.ClientProperties)
{
@@ -268,10 +268,9 @@ public Task CreateChannelAsync(CreateChannelOptions? createChannelOpti
{
EnsureIsOpen();
- createChannelOptions ??= CreateChannelOptions.Default;
+ createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config);
ISession session = CreateSession();
-
- return Channel.CreateAndOpenAsync(createChannelOptions, _config, session, cancellationToken);
+ return Channel.CreateAndOpenAsync(createChannelOptions, session, cancellationToken);
}
internal ISession CreateSession()
diff --git a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
index ba6081c98..33f1b2e73 100644
--- a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
@@ -36,8 +36,8 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class RecoveryAwareChannel : Channel
{
- public RecoveryAwareChannel(ISession session, ChannelOptions channelOptions)
- : base(session, channelOptions)
+ public RecoveryAwareChannel(ISession session, CreateChannelOptions createChannelOptions)
+ : base(session, createChannelOptions)
{
ActiveDeliveryTagOffset = 0;
MaxSeenDeliveryTag = 0;
@@ -104,11 +104,11 @@ public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
}
}
- internal static async ValueTask CreateAndOpenAsync(ISession session, ChannelOptions channelOptions,
+ internal static async ValueTask CreateAndOpenAsync(ISession session, CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
{
- var result = new RecoveryAwareChannel(session, channelOptions);
- return (RecoveryAwareChannel)await result.OpenAsync(channelOptions, cancellationToken)
+ var result = new RecoveryAwareChannel(session, createChannelOptions);
+ return (RecoveryAwareChannel)await result.OpenAsync(createChannelOptions, cancellationToken)
.ConfigureAwait(false);
}
}
diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
index 19ad30cab..b0df4fc67 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
@@ -919,3 +919,10 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+static readonly RabbitMQ.Client.Constants.DefaultContinuationTimeout -> System.TimeSpan
+static readonly RabbitMQ.Client.Constants.DefaultHandshakeContinuationTimeout -> System.TimeSpan
+RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled, System.Threading.RateLimiting.RateLimiter? outstandingPublisherConfirmationsRateLimiter = null, ushort? consumerDispatchConcurrency = 1) -> void
+readonly RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency -> ushort?
+readonly RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter -> System.Threading.RateLimiting.RateLimiter?
+readonly RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled -> bool
+readonly RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled -> bool
diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs
index 70fa16adb..aba2464e0 100644
--- a/projects/Test/Common/IntegrationFixture.cs
+++ b/projects/Test/Common/IntegrationFixture.cs
@@ -64,6 +64,9 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected ConnectionFactory _connFactory;
protected IConnection _conn;
+
+ protected CreateChannelOptions _createChannelOptions = new(publisherConfirmationsEnabled: true,
+ publisherConfirmationTrackingEnabled: true);
protected IChannel _channel;
protected static readonly Encoding _encoding = new UTF8Encoding();
@@ -153,7 +156,9 @@ public virtual async Task InitializeAsync()
if (_openChannel)
{
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
+ publisherConfirmationTrackingEnabled: true, consumerDispatchConcurrency: _consumerDispatchConcurrency);
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
}
if (IsVerbose)
diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs
index bbd65519f..c6b3fbf2c 100644
--- a/projects/Test/Common/TestConnectionRecoveryBase.cs
+++ b/projects/Test/Common/TestConnectionRecoveryBase.cs
@@ -199,7 +199,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
{
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
{
- using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(_createChannelOptions))
{
for (ushort i = 0; i < TotalMessageCount; i++)
{
@@ -340,9 +340,9 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,
}
}
- protected static async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
+ protected async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
{
- using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ using (IChannel ch = await conn.CreateChannelAsync(_createChannelOptions))
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
index 3fcb978b2..4e516bbbf 100644
--- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
+++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
@@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
consumer.ReceivedAsync += MessageReceived;
await _channel.BasicConsumeAsync(queueName, true, consumer);
- await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ await using (IChannel pubCh = await _conn.CreateChannelAsync(_createChannelOptions))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
await pubCh.CloseAsync();
@@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
await CloseAndWaitForRecoveryAsync();
- await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ await using (IChannel pubCh = await _conn.CreateChannelAsync(_createChannelOptions))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
await pubCh.CloseAsync();
diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs
index 773238ff6..7dd4deebb 100644
--- a/projects/Test/Integration/TestAsyncConsumer.cs
+++ b/projects/Test/Integration/TestAsyncConsumer.cs
@@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
- await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ await using (IChannel publishChannel = await publishConn.CreateChannelAsync(_createChannelOptions))
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
@@ -646,7 +646,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.ReceivedAsync += async (sender, args) =>
{
- await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel innerChannel = await _conn.CreateChannelAsync(_createChannelOptions);
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
mandatory: true,
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
@@ -707,9 +707,15 @@ private async Task ValidateConsumerDispatchConcurrency()
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
- await using IChannel ch = await _conn.CreateChannelAsync(
- new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency });
+
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false,
+ publisherConfirmationTrackingEnabled: false,
+ outstandingPublisherConfirmationsRateLimiter: null,
+ consumerDispatchConcurrency: expectedConsumerDispatchConcurrency);
+
+ await using IChannel ch = await _conn.CreateChannelAsync(createChannelOptions);
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
+
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
}
diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
index cf3c59d31..e20c7ceb8 100644
--- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
+++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
@@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
await _channel.BasicConsumeAsync(queueName, false, consumer);
//publisher
- await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel publisherChannel = await _conn.CreateChannelAsync(_createChannelOptions);
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs
index 107460fd8..92b49aca6 100644
--- a/projects/Test/Integration/TestBasicPublish.cs
+++ b/projects/Test/Integration/TestBasicPublish.cs
@@ -60,7 +60,7 @@ public override Task InitializeAsync()
public async Task TestBasicRoundtripArray()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
QueueDeclareOk q = await _channel.QueueDeclareAsync();
var bp = new BasicProperties();
@@ -88,7 +88,7 @@ public async Task TestBasicRoundtripArray()
public async Task TestBasicRoundtripCachedString()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
CachedString exchangeName = new CachedString(string.Empty);
CachedString queueName = new CachedString((await _channel.QueueDeclareAsync()).QueueName);
@@ -116,7 +116,7 @@ public async Task TestBasicRoundtripCachedString()
public async Task TestBasicRoundtripReadOnlyMemory()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
QueueDeclareOk q = await _channel.QueueDeclareAsync();
byte[] sendBody = _encoding.GetBytes("hi");
@@ -143,7 +143,7 @@ public async Task TestBasicRoundtripReadOnlyMemory()
public async Task CanNotModifyPayloadAfterPublish()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
QueueDeclareOk q = await _channel.QueueDeclareAsync();
byte[] sendBody = new byte[1000];
@@ -204,7 +204,7 @@ public async Task TestMaxInboundMessageBodySize()
Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize);
Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize);
- await using (IChannel channel = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ await using (IChannel channel = await conn.CreateChannelAsync(_createChannelOptions))
{
channel.ChannelShutdownAsync += (o, a) =>
{
@@ -291,7 +291,7 @@ public async Task TestMaxInboundMessageBodySize()
public async Task TestPropertiesRoundtrip_Headers()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
var subject = new BasicProperties
{
diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
index b1febe2a2..348477cd5 100644
--- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
+++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
@@ -108,7 +108,8 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
try
{
- await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true });
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
+ await using IChannel ch = await _conn.CreateChannelAsync(createChannelOptions);
ch.ChannelShutdownAsync += (o, ea) =>
{
HandleChannelShutdown(ch, ea, (args) =>
diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
index ed5bc466c..b7de2a9b3 100644
--- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
+++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
@@ -289,7 +289,7 @@ public async Task TestTopologyRecoveryConsumerFilter()
return Task.CompletedTask;
};
- await using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ await using (IChannel ch = await conn.CreateChannelAsync(_createChannelOptions))
{
await ch.ExchangeDeclareAsync(exchange, "direct");
await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false);
diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs
index c05032190..2faafeb6d 100644
--- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs
+++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs
@@ -104,7 +104,8 @@ public async Task TestTopologyRecoveryQueueFilter()
tcs.SetResult(true);
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+
+ IChannel ch = await conn.CreateChannelAsync(_createChannelOptions);
await ch.QueueDeclareAsync(queueToRecover, false, false, false);
await ch.QueueDeclareAsync(queueToIgnore, false, false, false);
@@ -155,7 +156,9 @@ public async Task TestTopologyRecoveryExchangeFilter()
tcs.SetResult(true);
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+
+ IChannel ch = await conn.CreateChannelAsync(_createChannelOptions);
+
try
{
await ch.ExchangeDeclareAsync(exchangeToRecover, "topic", false, true);
@@ -272,7 +275,7 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ IChannel ch = await conn.CreateChannelAsync(_createChannelOptions);
try
{
@@ -367,7 +370,7 @@ await channel.QueueDeclareAsync(rq.Name, false, false, false,
tcs.SetResult(true);
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ IChannel ch = await conn.CreateChannelAsync(_createChannelOptions);
await ch.QueueDeclareAsync(queueToRecoverWithException, false, false, false);
await ch.QueueDeclareAsync(queueToRecoverSuccessfully, false, false, false);
diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs
index 337bd3d49..14e4d8c6e 100644
--- a/projects/Test/Integration/TestFloodPublishing.cs
+++ b/projects/Test/Integration/TestFloodPublishing.cs
@@ -65,7 +65,7 @@ public async Task TestUnthrottledFloodPublishing()
_connFactory.AutomaticRecoveryEnabled = false;
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsNotType(_conn);
- _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ _channel = await _conn.CreateChannelAsync(_createChannelOptions);
_conn.ConnectionShutdownAsync += (_, ea) =>
{
@@ -192,7 +192,7 @@ public async Task TestMultithreadFloodPublishing()
return Task.CompletedTask;
};
- await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
+ await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(_createChannelOptions))
{
publishChannel.ChannelShutdownAsync += (o, ea) =>
diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs
index 4926aae6b..d6a551b30 100644
--- a/projects/Test/Integration/TestPublisherConfirms.cs
+++ b/projects/Test/Integration/TestPublisherConfirms.cs
@@ -52,7 +52,7 @@ public TestPublisherConfirms(ITestOutputHelper output)
public async Task TestWaitForConfirmsWithEventsAsync()
{
string queueName = GenerateQueueName();
- await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel ch = await _conn.CreateChannelAsync(_createChannelOptions);
await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false,
exclusive: true, autoDelete: false, arguments: null);
diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs
index d6567ee1b..6e0ab2ec5 100644
--- a/projects/Test/Integration/TestToxiproxy.cs
+++ b/projects/Test/Integration/TestToxiproxy.cs
@@ -127,7 +127,7 @@ public async Task TestCloseConnection()
async Task PublishLoop()
{
- await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel ch = await conn.CreateChannelAsync(_createChannelOptions);
QueueDeclareOk q = await ch.QueueDeclareAsync();
while (conn.IsOpen)
{
@@ -206,7 +206,7 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
Task pubTask = Task.Run(async () =>
{
await using IConnection conn = await cf.CreateConnectionAsync();
- await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ await using IChannel ch = await conn.CreateChannelAsync(_createChannelOptions);
QueueDeclareOk q = await ch.QueueDeclareAsync();
while (conn.IsOpen)
{
@@ -304,12 +304,11 @@ public async Task TestPublisherConfirmationThrottling()
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
cf.AutomaticRecoveryEnabled = true;
- var channelOpts = new CreateChannelOptions
- {
- PublisherConfirmationsEnabled = true,
- PublisherConfirmationTrackingEnabled = true,
- OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
- };
+ var channelOpts = new CreateChannelOptions(
+ publisherConfirmationsEnabled: true,
+ publisherConfirmationTrackingEnabled: true,
+ outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MaxOutstandingConfirms)
+ );
var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs
index 347ad12ce..b10ae8cca 100644
--- a/projects/Test/OAuth2/TestOAuth2.cs
+++ b/projects/Test/OAuth2/TestOAuth2.cs
@@ -230,7 +230,8 @@ public async Task SecondConnectionCrashes_GH1429()
private async Task DeclarePublishChannelAsync()
{
Assert.NotNull(_connection);
- IChannel publishChannel = await _connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
+ IChannel publishChannel = await _connection.CreateChannelAsync(createChannelOptions);
await publishChannel.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false);
return publishChannel;
}