Skip to content

Add Connection Pool Close configuration #389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ internal static bool IsAKnownException(Exception exception)
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) ||
return exception is (SocketException or TimeoutException or LeaderNotFoundException
or InvalidOperationException or OperationCanceledException) ||
IsStreamNotAvailable(exception);
}

Expand Down Expand Up @@ -191,4 +192,12 @@ public TooManyConnectionsException(string s)
{
}
}

public class PendingConnectionsException : Exception
{
public PendingConnectionsException(string s)
: base(s)
{
}
}
}
142 changes: 130 additions & 12 deletions RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,41 @@

namespace RabbitMQ.Stream.Client;

public enum ConnectionClosePolicy
{
/// <summary>
/// The connection is closed when the last consumer or producer is removed.
/// </summary>
CloseWhenEmpty,

/// <summary>
/// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
/// </summary>
CloseWhenEmptyAndIdle
}

public class ConnectionCloseConfig
{
/// <summary>
/// Policy to close the connection.
/// </summary>

public ConnectionClosePolicy Policy { get; set; } = ConnectionClosePolicy.CloseWhenEmpty;

/// <summary>
/// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
/// Idle time is valid only if the policy is CloseWhenEmptyAndIdle.
/// </summary>
public TimeSpan IdleTime { get; set; } = TimeSpan.FromMinutes(5);

/// <summary>
/// Interval to check the idle time.
/// Default is high because the check is done in a separate thread.
/// The filed is internal to help the test.
/// </summary>
internal TimeSpan CheckIdleTime { get; set; } = TimeSpan.FromSeconds(60);
}

public class ConnectionPoolConfig
{
/// <summary>
Expand All @@ -30,6 +65,11 @@ public class ConnectionPoolConfig
/// but it is not the best for performance.
/// </summary>
public byte ProducersPerConnection { get; set; } = 1;

/// <summary>
/// Define the connection close policy.
/// </summary>
public ConnectionCloseConfig ConnectionCloseConfig { get; set; } = new ConnectionCloseConfig();
}

public class LastSecret
Expand Down Expand Up @@ -87,9 +127,10 @@ public bool Available
/// subscriptionIds
/// publisherIds
/// </summary>
public class ConnectionsPool
public class ConnectionsPool : IDisposable
{
private static readonly object s_lock = new();
private bool _isRunning = false;

internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
{
Expand Down Expand Up @@ -127,16 +168,56 @@ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
private readonly byte _idsPerConnection;
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
private readonly LastSecret _lastSecret = new();
private readonly Task _checkIdleConnectionTimeTask;

/// <summary>
/// Init the pool with the max connections and the max ids per connection
/// </summary>
/// <param name="maxConnections"> The max connections are allowed for session</param>
/// <param name="idsPerConnection"> The max ids per Connection</param>
public ConnectionsPool(int maxConnections, byte idsPerConnection)
/// <param name="connectionCloseConfig"> Policy to close the connections in the pool</param>
public ConnectionsPool(int maxConnections, byte idsPerConnection, ConnectionCloseConfig connectionCloseConfig)
{
_maxConnections = maxConnections;
_idsPerConnection = idsPerConnection;
ConnectionPoolConfig = connectionCloseConfig;
_isRunning = true;
if (ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmptyAndIdle)
{
_checkIdleConnectionTimeTask = Task.Run(CheckIdleConnectionTime);
}
}

private ConnectionCloseConfig ConnectionPoolConfig { get; }

private async Task CheckIdleConnectionTime()
{
while (_isRunning)
{
await Task.Delay(ConnectionPoolConfig.CheckIdleTime)
.ConfigureAwait(false);

if (!_isRunning)
{
var now = DateTime.UtcNow;
var connectionItems = Connections.Values.ToList();
foreach (var connectionItem in connectionItems.Where(connectionItem =>
connectionItem.EntitiesCount == 0 &&
connectionItem.LastUsed.Add(ConnectionPoolConfig.IdleTime) < now))
{
CloseItemAndConnection("Idle connection", connectionItem);
}
}
else
{
var connectionItems = Connections.Values.ToList();
foreach (var connectionItem in connectionItems.Where(
connectionItem => connectionItem.EntitiesCount == 0))
{
CloseItemAndConnection("Idle connection", connectionItem);
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -208,10 +289,7 @@ public bool TryMergeClientParameters(ClientParameters clientParameters, out Clie
return false;
}

cp = clientParameters with
{
Password = _lastSecret.Secret
};
cp = clientParameters with { Password = _lastSecret.Secret };
return true;
}

Expand Down Expand Up @@ -264,20 +342,31 @@ public void MaybeClose(string clientId, string reason)
return;
}

// close the connection
connectionItem.Client.Close(reason);
connectionItem.LastUsed = DateTime.UtcNow;

// remove the connection from the pool
// it means that the connection is closed
// we don't care if it is called two times for the same connection
Connections.TryRemove(clientId, out _);
if (ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmpty)
{
CloseItemAndConnection(reason, connectionItem);
}
}
finally
{
_semaphoreSlim.Release();
}
}

private void CloseItemAndConnection(string reason, ConnectionItem connectionItem)
{
// close the connection
connectionItem.Client.Close(reason);
// remove the connection from the pool
// it means that the connection is closed
// we don't care if it is called two times for the same connection
Connections.TryRemove(connectionItem.Client.ClientId, out _);
}

internal int PendingConnections => Connections.Values.Count(x => x.EntitiesCount > 0);

/// <summary>
/// Removes the consumer entity from the client.
/// When the metadata update is called we need to remove the consumer entity from the client.
Expand Down Expand Up @@ -328,4 +417,33 @@ public void RemoveProducerEntityFromStream(string clientId, byte id, string stre
}

public int ConnectionsCount => Connections.Count;

public async Task Close()
{
// The pool can't be closed if there are pending connections with the policy: CloseWhenEmptyAndIdle
// else there is no way to close the pending connections.
// The user needs to close the pending connections before to close the pool.
// At the moment when the pool is closed the pending connections are not closed with CloseWhenEmpty
// because the pool is not strictly bound to the stream system.
// The StreamSystem doesn't close the connections when it is closed. That was by design
// We could consider (Version 2.0) to close all the Producers and Consumers and their connection when the StreamSystem is closed.
// Other clients like Java and Golang close the connections when the Environment (alias StreamSystem) is closed.
if (PendingConnections > 0 && ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmptyAndIdle)
{
throw new PendingConnectionsException(
$"There are {PendingConnections} pending connections. With the policy CloseWhenEmptyAndIdle you need to close them");
}

_isRunning = false;
if (_checkIdleConnectionTimeTask is not null)
{
await _checkIdleConnectionTimeTask.ConfigureAwait(false);
}
}

public void Dispose()
{
_semaphoreSlim.Dispose();
GC.SuppressFinalize(this);
}
}
17 changes: 16 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Clie
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void
RabbitMQ.Stream.Client.ConnectionCloseConfig
RabbitMQ.Stream.Client.ConnectionCloseConfig.ConnectionCloseConfig() -> void
RabbitMQ.Stream.Client.ConnectionCloseConfig.IdleTime.get -> System.TimeSpan
RabbitMQ.Stream.Client.ConnectionCloseConfig.IdleTime.set -> void
RabbitMQ.Stream.Client.ConnectionCloseConfig.Policy.get -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionCloseConfig.Policy.set -> void
RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionClosePolicy.CloseWhenEmpty = 0 -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionClosePolicy.CloseWhenEmptyAndIdle = 1 -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionItem
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
RabbitMQ.Stream.Client.ConnectionItem.BrokerInfo.get -> string
Expand All @@ -67,14 +76,18 @@ RabbitMQ.Stream.Client.ConnectionItem.IdsPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionItem.LastUsed.get -> System.DateTime
RabbitMQ.Stream.Client.ConnectionItem.LastUsed.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionCloseConfig.get -> RabbitMQ.Stream.Client.ConnectionCloseConfig
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionCloseConfig.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionPoolConfig() -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.set -> void
RabbitMQ.Stream.Client.ConnectionsPool
RabbitMQ.Stream.Client.ConnectionsPool.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsCount.get -> int
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsPool(int maxConnections, byte idsPerConnection) -> void
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsPool(int maxConnections, byte idsPerConnection, RabbitMQ.Stream.Client.ConnectionCloseConfig connectionCloseConfig) -> void
RabbitMQ.Stream.Client.ConnectionsPool.Dispose() -> void
RabbitMQ.Stream.Client.ConnectionsPool.MaybeClose(string clientId, string reason) -> void
RabbitMQ.Stream.Client.ConnectionsPool.Remove(string clientId) -> void
RabbitMQ.Stream.Client.ConnectionsPool.RemoveConsumerEntityFromStream(string clientId, byte id, string stream) -> void
Expand Down Expand Up @@ -194,6 +207,8 @@ RabbitMQ.Stream.Client.PartitionsSuperStreamSpec
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.Partitions.get -> int
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name) -> void
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name, int partitions) -> void
RabbitMQ.Stream.Client.PendingConnectionsException
RabbitMQ.Stream.Client.PendingConnectionsException.PendingConnectionsException(string s) -> void
RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
Expand Down
23 changes: 20 additions & 3 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ private StreamSystem(ClientParameters clientParameters, Client client,
_clientParameters = clientParameters;
_client = client;
_logger = logger ?? NullLogger<StreamSystem>.Instance;
// we don't expose the the max connections per producer/consumer
// we don't expose the max connections per producer/consumer
// for the moment. We can expose it in the future if needed
PoolConsumers = new ConnectionsPool(0,
connectionPoolConfig.ConsumersPerConnection);
connectionPoolConfig.ConsumersPerConnection, connectionPoolConfig.ConnectionCloseConfig);

PoolProducers = new ConnectionsPool(0,
connectionPoolConfig.ProducersPerConnection);
connectionPoolConfig.ProducersPerConnection, connectionPoolConfig.ConnectionCloseConfig);
}

public bool IsClosed => _client.IsClosed;
Expand Down Expand Up @@ -139,6 +139,23 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
public async Task Close()
{
await _client.Close("system close").ConfigureAwait(false);

try
{
await PoolConsumers.Close()
.ConfigureAwait(false);
await PoolProducers.Close()
.ConfigureAwait(false);
}
catch
{
}
finally
{
PoolConsumers.Dispose();
PoolProducers.Dispose();
}

_logger?.LogDebug("Client Closed");
}

Expand Down
4 changes: 2 additions & 2 deletions Tests/ClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public async void DeclarePublisherShouldReturnErrorCode()

var (publisherId, result) =
await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored,
new ConnectionsPool(0, 1));
new ConnectionsPool(0, 1, new ConnectionCloseConfig()));
Assert.Equal(ResponseCode.StreamDoesNotExist, result.ResponseCode);
await client.Close("done");
}
Expand All @@ -124,7 +124,7 @@ public async void DeclareConsumerShouldReturnErrorCode()
var client = await Client.Create(clientParameters);
var (subId, subscribeResponse) = await client.Subscribe(
"this-stream-does-not-exist", new OffsetTypeLast(), 1,
new Dictionary<string, string>(), null, null, new ConnectionsPool(0, 1));
new Dictionary<string, string>(), null, null, new ConnectionsPool(0, 1, new ConnectionCloseConfig()));
Assert.Equal(ResponseCode.StreamDoesNotExist, subscribeResponse.ResponseCode);
await client.Close("done");
}
Expand Down
Loading