diff --git a/design/connection-pool/Core8AzureAccessToken.png b/design/connection-pool/Core8AzureAccessToken.png new file mode 100644 index 0000000000..67e210e05c Binary files /dev/null and b/design/connection-pool/Core8AzureAccessToken.png differ diff --git a/design/connection-pool/Core8AzureDefault.png b/design/connection-pool/Core8AzureDefault.png new file mode 100644 index 0000000000..dab9883261 Binary files /dev/null and b/design/connection-pool/Core8AzureDefault.png differ diff --git a/design/connection-pool/Core8Local.png b/design/connection-pool/Core8Local.png new file mode 100644 index 0000000000..4e68b57c3a Binary files /dev/null and b/design/connection-pool/Core8Local.png differ diff --git a/design/connection-pool/Core8LocalLinux.png b/design/connection-pool/Core8LocalLinux.png new file mode 100644 index 0000000000..b4c4d92c8b Binary files /dev/null and b/design/connection-pool/Core8LocalLinux.png differ diff --git a/design/connection-pool/Framework481AzureDefault.png b/design/connection-pool/Framework481AzureDefault.png new file mode 100644 index 0000000000..c073438249 Binary files /dev/null and b/design/connection-pool/Framework481AzureDefault.png differ diff --git a/design/connection-pool/Framework481Local.png b/design/connection-pool/Framework481Local.png new file mode 100644 index 0000000000..918e854b5b Binary files /dev/null and b/design/connection-pool/Framework481Local.png differ diff --git a/design/connection-pool/MeanTimeAzureCold-1.png b/design/connection-pool/MeanTimeAzureCold-1.png new file mode 100644 index 0000000000..f92251b2da Binary files /dev/null and b/design/connection-pool/MeanTimeAzureCold-1.png differ diff --git a/design/connection-pool/MeanTimeAzureCold.png b/design/connection-pool/MeanTimeAzureCold.png new file mode 100644 index 0000000000..ca70fd6583 Binary files /dev/null and b/design/connection-pool/MeanTimeAzureCold.png differ diff --git a/design/connection-pool/MeanTimeAzureWarm-1.png b/design/connection-pool/MeanTimeAzureWarm-1.png new file mode 100644 index 0000000000..a9cfa7925e Binary files /dev/null and b/design/connection-pool/MeanTimeAzureWarm-1.png differ diff --git a/design/connection-pool/MeanTimeAzureWarm.png b/design/connection-pool/MeanTimeAzureWarm.png new file mode 100644 index 0000000000..0c666ea7ee Binary files /dev/null and b/design/connection-pool/MeanTimeAzureWarm.png differ diff --git a/design/connection-pool/MeanTimeLocal.png b/design/connection-pool/MeanTimeLocal.png new file mode 100644 index 0000000000..0dd2f836e0 Binary files /dev/null and b/design/connection-pool/MeanTimeLocal.png differ diff --git a/design/connection-pool/MeanTimeLocalWarm-1.png b/design/connection-pool/MeanTimeLocalWarm-1.png new file mode 100644 index 0000000000..703b5a9e98 Binary files /dev/null and b/design/connection-pool/MeanTimeLocalWarm-1.png differ diff --git a/design/connection-pool/MeanTimeLocalWarm.png b/design/connection-pool/MeanTimeLocalWarm.png new file mode 100644 index 0000000000..4df6b21de8 Binary files /dev/null and b/design/connection-pool/MeanTimeLocalWarm.png differ diff --git a/design/connection-pool/SqlClient Connection Pool (11).png b/design/connection-pool/SqlClient Connection Pool (11).png new file mode 100644 index 0000000000..88cf62bebc Binary files /dev/null and b/design/connection-pool/SqlClient Connection Pool (11).png differ diff --git a/design/connection-pool/connection-pool-design-doc.md b/design/connection-pool/connection-pool-design-doc.md new file mode 100644 index 0000000000..94bbad7df2 --- /dev/null +++ b/design/connection-pool/connection-pool-design-doc.md @@ -0,0 +1,87 @@ +# Design Document: `ChannelDbConnectionPool` +## Problem Statement +The current connection pool implementation is slow to open new connections and does not follow async best practices. + +Connection opening is serialized, causing delays when multiple new connections are required simultaneously. This is done using a semaphore to rate limit connection creation. When multiple new connections are requested, they queue up waiting for the semaphore. Once acquired, the thread opens the connection and releases the semaphore, allowing the next thread to proceed. This approach was initially designed to prevent overwhelming the server but can lead to significant delays, especially in high-latency environments. + +Async requests are also serialized through an additional queue. When an async request is made, it is added to a queue, and a reader thread processes these requests one by one. This method was chosen due to the lack of async APIs in native SNI, resulting in synchronous handling of async requests on a dedicated thread. + + +## Design Goals +- Enable parallel connection opening. +- Minimize thread contention and synchronization overhead. +- Follow async best practices to reduce managed threadpool pressure and enable other components of the driver to modernize their async code. + +## Overview +The core of the design is the Channel data structure from the [System.Threading.Channels library](https://learn.microsoft.com/en-us/dotnet/core/extensions/channels) (available to .NET Framework as a nuget package) (Also see Stephen Toub's intro [here](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/)). Channels are thread-safe, async-first queues that fit well for the connection pooling use case. + +A single channel holds the idle connections managed by the pool. A channel reader reads idle connections out of the channel to vend them to SqlConnections. A channel writer writes connections back to the channel when they are returned to the pool. + +Pool maintenance operations (warmup, pruning) are handled asynchronously as Tasks. + +Transaction-enlisted connections are stored in a separate dictionary data structure, in the same manner as the WaitHandleDbConnectionPool implementation. + +This design is based on the [PoolingDataSource](https://github.com/npgsql/npgsql/blob/main/src/Npgsql/PoolingDataSource.cs) class from the npgsql driver. The npgsql implemenation is proven to be reliable and performant in real production workloads. + +### Why the Channel Data Structure is a Good Fit + +1. **Thread-Safety**: + Channels are designed to facilitate thread-safe communication between producers (e.g., threads returning connections to the pool) and consumers (e.g., threads requesting connections). This eliminates the need for complex locking mechanisms, reducing the risk of race conditions and deadlocks. + +2. **Built-In Request Queueing**: + Channels provide a succinct API to wait asynchronously if no connections are available at the time of the request. + +3. **Asynchronous Support**: + Channels provide a robust async API surface, simplifying the async paths for the connection pool. + +4. **Performant**: + Channels are fast and avoid extra allocations, making them suitable for high throughput applications: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/#performance + +## Workflows +1. **Warmup**: +New connections are written to the tail of the idle channel by an async task. + +2. **Acquire Connection**: +Idle connections are acquired from the head of the idle channel. + +3. **Release Connection**: +Connections that are released to the pool are added to the tail of the idle channel. + +4. **Pruning**: +Connections are pruned from the head of the idle channel. + +![Diagram showing user interactions and subprocesses interacting with the idle connection channel]() + + +## Performance Benchmarks +Note: All graphed results use managed SNI. See full results below for native SNI. + +The channel based implementation shows significant performance improvements across frameworks and operating systems. In particular, interacting with a warm pool is much faster. + +![Chart showing mean time to open 100 local connections with cold pool](MeanTimeLocal.png) +![Chart showing mean time to open 100 local connections with warm pool](MeanTimeLocalWarm-1.png) +![Chart showing mean time to open 10 azure connections with warm pool](MeanTimeAzureWarm-1.png) + + +When interacting with a cold pool and connecting to an Azure database, performance is equivalent to the legacy implementation *provided enough threads are made available in the managed threadpool*. This requirement highlights a bottleneck present further down the stack when acquiring federated auth tokens. + +![Chart showing mean time to open 10 azure connections with cold pool](MeanTimeAzureCold-1.png) + + +### Windows - .NET 8.0 +![Performance results Windows - .NET 8.0](Core8Local.png) + +### Windows - .NET Framework 4.8.1 +![Performance results Windows - .NET Framework 4.8.1](Framework481Local.png) + +### Linux - net8.0 +![Performance results Linux - net8.0](Core8LocalLinux.png) + +### Windows - net8.0 - AzureSQL - Default Azure Credential +![Performance results Windows - net8.0 - AzureSQL - Default Azure Credential](Core8AzureDefault.png) + +### Windows - .NET Framework 4.8.1 - AzureSQL - Default Azure Credential +![Performance results Windows - .NET Framework 4.8.1 - AzureSQL - Default Azure Credential](Framework481AzureDefault.png) + +### Windows - NET 8.0 - Azure SQL - AccessToken +![Performance results Windows - NET 8.0 - Azure SQL - AccessToken](Core8AzureAccessToken.png) \ No newline at end of file diff --git a/design/connection-pool/connection-pooling-primer.md b/design/connection-pool/connection-pooling-primer.md new file mode 100644 index 0000000000..7832f0c414 --- /dev/null +++ b/design/connection-pool/connection-pooling-primer.md @@ -0,0 +1,18 @@ +# Primer on Connection Pooling + +## Overview +Connection pooling is a technique used to manage database connections efficiently. It involves maintaining a pool of connections that can be reused, reducing the overhead of opening and closing connections frequently. + +## Key Concepts +- **Connection Pool Initialization**: A connection pool is created for each unique connection string. The pool maintains a collection of connections that are already logged in and ready to use. +- **Connection Reuse and Creation**: Connections are reused from the pool if available; otherwise, new connections are created. Connection strings are used to differentiate pools, and any change in the connection string results in a new pool. +- **Connection String Sensitivity**: Connection pooling is not sensitive to whitespace in connection strings. Different authentication methods for the same user result in separate pools. +- **Pool Management**: Pools are managed per process. A pool manager oversees all pools and determines which pool to use based on the connection string. +- **Session Settings**: SQL Server provides a procedure (SP reset connection) to reset session settings when reusing a connection. SP reset connection is triggered every time a connection is reused from the pool. +- **Handling Transactions**: Connections involved in transactions are handled separately and may be reset while preserving the transaction state. +- **Connection Liveness**: Connection liveness is checked when pulling connections from the pool. Dead connections are discarded, and new ones are created if necessary. +- **Connection Pruning**: Idle connections above the minimum threshold are closed periodically to manage resources. Pruning helps reclaim leaked connections and maintain the pool size within the defined limits. +- **Warm-Up Process**: On application startup, the pool warms up to the minimum pool size by creating connections in the background. +- **Handling Broken Connections**: Broken connections are detected and handled by creating new connections if the session cannot be recovered. +- **Concurrency and Async Handling**: Connection creation should happen on separate threads to avoid queuing and improve performance. +- **Security Considerations**: Pools are separated based on user authentication to prevent unauthorized access. \ No newline at end of file diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 62d35ab21a..54146cb175 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -95,6 +95,9 @@ Microsoft\Data\ProviderBase\DbConnectionFactory.cs + + Microsoft\Data\SqlClient\ConnectionPool\ChannelDbConnectionPool.cs + Microsoft\Data\SqlClient\ConnectionPool\DbConnectionPool.cs @@ -137,6 +140,24 @@ Microsoft\Data\SqlClient\ConnectionPool\SqlConnectionPoolProviderInfo.cs + + Microsoft\Data\SqlClient\ConnectionPool\TransactedConnectionPool.cs + + + Microsoft\Data\SqlClient\RateLimiter\AsyncFlagFunc.cs + + + Microsoft\Data\SqlClient\RateLimiter\BlockingPeriodRateLimiter.cs + + + Microsoft\Data\SqlClient\RateLimiter\ConcurrencyRateLimiter.cs + + + Microsoft\Data\SqlClient\RateLimiter\PassthroughRateLimiter.cs + + + Microsoft\Data\SqlClient\RateLimiter\RateLimiterBase.cs + Microsoft\Data\ProviderBase\DbMetaDataFactory.cs diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnection.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnection.cs index 52112bfe74..c2694df9a4 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnection.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnection.cs @@ -1439,8 +1439,8 @@ public override void Open() Open(SqlConnectionOverrides.None); } - private bool TryOpenWithRetry(TaskCompletionSource retry, SqlConnectionOverrides overrides) - => RetryLogicProvider.Execute(this, () => TryOpen(retry, overrides)); + private bool TryOpenWithRetry(TaskCompletionSource taskCompletionSource, SqlConnectionOverrides overrides) + => RetryLogicProvider.Execute(this, () => TryOpen(taskCompletionSource, overrides)); /// public void Open(SqlConnectionOverrides overrides) @@ -1949,7 +1949,7 @@ private void PrepareStatisticsForNewConnection() } } - private bool TryOpen(TaskCompletionSource retry, SqlConnectionOverrides overrides = SqlConnectionOverrides.None) + private bool TryOpen(TaskCompletionSource taskCompletionSource, SqlConnectionOverrides overrides = SqlConnectionOverrides.None) { SqlConnectionString connectionOptions = (SqlConnectionString)ConnectionOptions; @@ -2011,14 +2011,14 @@ private bool TryOpen(TaskCompletionSource retry, SqlConnec if (ForceNewConnection) { - if (!InnerConnection.TryReplaceConnection(this, ConnectionFactory, retry, UserConnectionOptions)) + if (!InnerConnection.TryReplaceConnection(this, ConnectionFactory, taskCompletionSource, UserConnectionOptions)) { return false; } } else { - if (!InnerConnection.TryOpenConnection(this, ConnectionFactory, retry, UserConnectionOptions)) + if (!InnerConnection.TryOpenConnection(this, ConnectionFactory, taskCompletionSource, UserConnectionOptions)) { return false; } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs index ca9bcc2cd9..de627b70db 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs @@ -86,7 +86,7 @@ override protected DbConnectionInternal CreateConnection(DbConnectionOptions opt redirectedUserInstance = true; string instanceName; - if (pool == null || (pool != null && pool.Count <= 0)) + if (pool == null || (pool != null && ((SqlConnectionPoolProviderInfo)pool.ProviderInfo).InstanceName == null)) { // Non-pooled or pooled and no connections in the pool. SqlInternalConnectionTds sseConnection = null; try diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj index 52767110a0..640436330d 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj @@ -279,6 +279,9 @@ Microsoft\Data\ProviderBase\DbConnectionInternal.cs + + Microsoft\Data\SqlClient\ConnectionPool\ChannelDbConnectionPool.cs + Microsoft\Data\SqlClient\ConnectionPool\DbConnectionPool.cs @@ -318,12 +321,30 @@ Microsoft\Data\SqlClient\ConnectionPool\SqlConnectionPoolGroupProviderInfo.cs + + Microsoft\Data\SqlClient\ConnectionPool\TransactedConnectionPool.cs + Microsoft\Data\SqlClient\SqlConnectionPoolKey.cs Microsoft\Data\SqlClient\ConnectionPool\SqlConnectionPoolProviderInfo.cs + + Microsoft\Data\SqlClient\RateLimiter\AsyncFlagFunc.cs + + + Microsoft\Data\SqlClient\RateLimiter\BlockingPeriodRateLimiter.cs + + + Microsoft\Data\SqlClient\RateLimiter\ConcurrencyRateLimiter.cs + + + Microsoft\Data\SqlClient\RateLimiter\PassthroughRateLimiter.cs + + + Microsoft\Data\SqlClient\RateLimiter\RateLimiterBase.cs + Microsoft\Data\SqlClient\Diagnostics\SqlClientMetrics.cs @@ -984,6 +1005,7 @@ + diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs index d3402a48bd..e70f829070 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlConnectionFactory.cs @@ -93,7 +93,7 @@ override protected DbConnectionInternal CreateConnection(DbConnectionOptions opt redirectedUserInstance = true; string instanceName; - if (pool == null || (pool != null && pool.Count <= 0)) + if (pool == null || (pool != null && ((SqlConnectionPoolProviderInfo)pool.ProviderInfo).InstanceName == null)) { // Non-pooled or pooled and no connections in the pool. SqlInternalConnectionTds sseConnection = null; diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/src/Microsoft.Data.SqlClient.csproj index 70f6b3ff2c..f42fe42b2c 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/src/Microsoft.Data.SqlClient.csproj @@ -6,7 +6,7 @@ - + diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/ProviderBase/DbConnectionInternal.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/ProviderBase/DbConnectionInternal.cs index b2cf6c25af..8de85cc3d2 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/ProviderBase/DbConnectionInternal.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/ProviderBase/DbConnectionInternal.cs @@ -95,6 +95,7 @@ internal DbConnectionInternal(ConnectionState state, bool hidePassword, bool all } #region Properties + internal DateTime OpenTimestamp => _createTime; internal bool AllowSetConnectionString { get; } @@ -464,8 +465,8 @@ internal virtual void CloseConnection(DbConnection owningObject, DbConnectionFac // into the pool. if (connectionPool is not null) { - // PutObject calls Deactivate for us... - connectionPool.PutObject(this, owningObject); + // ReturnInternalConnection calls Deactivate for us... + connectionPool.ReturnInternalConnection(this, owningObject); // NOTE: Before we leave the PutObject call, another thread may have // already popped the connection from the pool, so don't expect to be @@ -814,7 +815,7 @@ internal void SetInStasis() internal virtual bool TryOpenConnection( DbConnection outerConnection, DbConnectionFactory connectionFactory, - TaskCompletionSource retry, + TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions) { throw ADP.ConnectionAlreadyOpen(State); @@ -823,7 +824,7 @@ internal virtual bool TryOpenConnection( internal virtual bool TryReplaceConnection( DbConnection outerConnection, DbConnectionFactory connectionFactory, - TaskCompletionSource retry, + TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions) { throw ADP.MethodNotImplemented(); diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs new file mode 100644 index 0000000000..be568bbc3c --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs @@ -0,0 +1,1121 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using System.Transactions; +using Microsoft.Data.Common; +using Microsoft.Data.ProviderBase; +using Microsoft.Data.SqlClient.RateLimiter; + +using static Microsoft.Data.SqlClient.ConnectionPool.DbConnectionPoolState; + +#nullable enable + +namespace Microsoft.Data.SqlClient.ConnectionPool +{ + internal sealed class ChannelDbConnectionPool : DbConnectionPool + { + #region Interface + internal override int Count => _numConnectors; + + internal override DbConnectionFactory ConnectionFactory => _connectionFactory; + + internal override bool ErrorOccurred => false; + + private bool HasTransactionAffinity => PoolGroupOptions.HasTransactionAffinity; + + internal override TimeSpan LoadBalanceTimeout => PoolGroupOptions.LoadBalanceTimeout; + + internal override DbConnectionPoolIdentity Identity => _identity; + + internal override bool IsRunning + { + get { return State is Running; } + } + + private int MaxPoolSize => PoolGroupOptions.MaxPoolSize; + + private int MinPoolSize => PoolGroupOptions.MinPoolSize; + + internal override DbConnectionPoolGroup PoolGroup => _connectionPoolGroup; + + internal override DbConnectionPoolGroupOptions PoolGroupOptions => _connectionPoolGroupOptions; + + internal override DbConnectionPoolProviderInfo ProviderInfo => _connectionPoolProviderInfo; + + /// + /// Return the pooled authentication contexts. + /// + internal override ConcurrentDictionary AuthenticationContexts => _pooledDbAuthenticationContexts; + + internal override bool UseLoadBalancing => PoolGroupOptions.UseLoadBalancing; + + /// + /// This only clears idle connections. Connections that are in use are not affected. + /// Different from previous behavior where all connections in the pool are doomed and eventually cleaned up. + /// + internal override void Clear() + { + Interlocked.Increment(ref _clearCounter); + + if (Interlocked.CompareExchange(ref _isClearing, 1, 0) == 1) + return; + + try + { + var count = _idleCount; + while (count > 0 && _idleConnectorReader.TryRead(out var connector)) + { + if (connector is null) + { + continue; + } + if (CheckIdleConnector(connector)) + { + CloseConnector(connector); + count--; + } + } + } + finally + { + _isClearing = 0; + } + } + + internal override bool TryGetConnection(DbConnection owningObject, TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal? connection) + { + if (taskCompletionSource is not null) + { + Task.Run(async () => + { + //TODO: use same timespan everywhere and tick down for queueuing and actual connection opening work + try + { + var connection = await GetInternalConnection(owningObject, userOptions, TimeSpan.FromSeconds(owningObject.ConnectionTimeout), true, CancellationToken.None).ConfigureAwait(false); + taskCompletionSource.SetResult(connection); + } catch (Exception e) + { + taskCompletionSource.SetException(e); + } + }); + connection = null; + return false; + } + else + { + //TODO: use same timespan everywhere and tick down for queueuing and actual connection opening work + var task = GetInternalConnection(owningObject, userOptions, TimeSpan.FromSeconds(owningObject.ConnectionTimeout), false, CancellationToken.None); + //TODO: move sync over async limit to this spot? + connection = task.GetAwaiter().GetResult(); + return connection is not null; + } + } + + private void PrepareConnection(DbConnection owningObject, DbConnectionInternal obj, Transaction? transaction) + { + lock (obj) + { // Protect against Clear and ReclaimEmancipatedObjects, which call IsEmancipated, which is affected by PrePush and PostPop + obj.PostPop(owningObject); + } + try + { + obj.ActivateConnection(transaction); + } + catch + { + // if Activate throws an exception + // put it back in the pool or have it properly disposed of + this.ReturnInternalConnection(obj, owningObject); + throw; + } + } + + /// + /// Creates a new connection to replace an existing connection + /// + /// Outer connection that currently owns + /// Options used to create the new connection + /// Inner connection that will be replaced + /// A new inner connection that is attached to the + internal override DbConnectionInternal? ReplaceConnection(DbConnection owningObject, DbConnectionOptions userOptions, DbConnectionInternal oldConnection) + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, replacing connection.", ObjectId); + DbConnectionInternal? newConnection = OpenNewInternalConnection(owningObject, userOptions, TimeSpan.FromSeconds(owningObject.ConnectionTimeout), false, default).Result; + + if (newConnection != null) + { + SqlClientEventSource.Metrics.SoftConnectRequest(); + PrepareConnection(owningObject, newConnection, oldConnection.EnlistedTransaction); + oldConnection.PrepareForReplaceConnection(); + oldConnection.DeactivateConnection(); + oldConnection.Dispose(); + } + + return newConnection; + + } + + internal override void ReturnInternalConnection(DbConnectionInternal connector, object? owningObject) + { + // Once a connection is closing (which is the state that we're in at + // this point in time) you cannot delegate a transaction to or enlist + // a transaction in it, so we can correctly presume that if there was + // not a delegated or enlisted transaction to start with, that there + // will not be a delegated or enlisted transaction once we leave the + // lock. + + lock (connector) + { + // Calling PrePush prevents the object from being reclaimed + // once we leave the lock, because it sets _pooledCount such + // that it won't appear to be out of the pool. What that + // means, is that we're now responsible for this connection: + // it won't get reclaimed if it gets lost. + connector.PrePush(owningObject); + + // TODO: Consider using a Cer to ensure that we mark the object for reclaimation in the event something bad happens? + } + + if (!CheckConnector(connector)) + { + return; + } + + DeactivateObject(connector); + } + + private void DeactivateObject(DbConnectionInternal obj) + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Connection {1}, Deactivating.", ObjectId, obj.ObjectID); + obj.DeactivateConnection(); + + bool returnToGeneralPool = false; + bool destroyObject = false; + bool rootTxn = false; + + if (obj.IsConnectionDoomed) + { + // the object is not fit for reuse -- just dispose of it. + destroyObject = true; + } + else + { + lock (obj) + { + // A connection with a delegated transaction cannot currently + // be returned to a different customer until the transaction + // actually completes, so we send it into Stasis -- the SysTx + // transaction object will ensure that it is owned (not lost), + // and it will be certain to put it back into the pool. + + if (State is ShuttingDown) + { + if (obj.IsTransactionRoot) + { + // SQLHotfix# 50003503 - connections that are affiliated with a + // root transaction and that also happen to be in a connection + // pool that is being shutdown need to be put in stasis so that + // the root transaction isn't effectively orphaned with no + // means to promote itself to a full delegated transaction or + // Commit or Rollback + obj.SetInStasis(); + rootTxn = true; + } + else + { + // connection is being closed and the pool has been marked as shutting + // down, so destroy this object. + destroyObject = true; + } + } + else + { + if (obj.IsNonPoolableTransactionRoot) + { + obj.SetInStasis(); + rootTxn = true; + } + else if (obj.CanBePooled) + { + // We must put this connection into the transacted pool + // while inside a lock to prevent a race condition with + // the transaction asynchronously completing on a second + // thread. + + Transaction transaction = obj.EnlistedTransaction; + if (transaction != null) + { + // NOTE: we're not locking on _state, so it's possible that its + // value could change between the conditional check and here. + // Although perhaps not ideal, this is OK because the + // DelegatedTransactionEnded event will clean up the + // connection appropriately regardless of the pool state. + _transactedConnectionPool.PutTransactedObject(transaction, obj); + rootTxn = true; + } + else + { + // return to general pool + returnToGeneralPool = true; + } + } + else + { + if (obj.IsTransactionRoot && !obj.IsConnectionDoomed) + { + // SQLHotfix# 50003503 - if the object cannot be pooled but is a transaction + // root, then we must have hit one of two race conditions: + // 1) PruneConnectionPoolGroups shutdown the pool and marked this connection + // as non-poolable while we were processing within this lock + // 2) The LoadBalancingTimeout expired on this connection and marked this + // connection as DoNotPool. + // + // This connection needs to be put in stasis so that the root transaction isn't + // effectively orphaned with no means to promote itself to a full delegated + // transaction or Commit or Rollback + obj.SetInStasis(); + rootTxn = true; + } + else + { + // object is not fit for reuse -- just dispose of it + destroyObject = true; + } + } + } + } + } + + if (returnToGeneralPool) + { + // Only push the connection into the general pool if we didn't + // already push it onto the transacted pool, put it into stasis, + // or want to destroy it. + Debug.Assert(destroyObject == false); + // Statement order is important since we have synchronous completions on the channel. + Interlocked.Increment(ref _idleCount); + var written = _idleConnectorWriter.TryWrite(obj); + Debug.Assert(written); + } + else if (destroyObject) + { + // Connections that have been marked as no longer + // poolable (e.g. exceeded their connection lifetime) are not, in fact, + // returned to the general pool + CloseConnector(obj); + } + + //------------------------------------------------------------------------------------- + // postcondition + + // ensure that the connection was processed + Debug.Assert(rootTxn == true || returnToGeneralPool == true || destroyObject == true); + } + + internal override void PutObjectFromTransactedPool(DbConnectionInternal? obj) + { + Debug.Assert(obj != null, "null pooledObject?"); + Debug.Assert(obj!.EnlistedTransaction == null, "pooledObject is still enlisted?"); + + obj.DeactivateConnection(); + + // called by the transacted connection pool , once it's removed the + // connection from it's list. We put the connection back in general + // circulation. + + // NOTE: there is no locking required here because if we're in this + // method, we can safely presume that the caller is the only person + // that is using the connection, and that all pre-push logic has been + // done and all transactions are ended. + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Connection {1}, Transaction has ended.", ObjectId, obj.ObjectID); + + if (State is Running && obj.CanBePooled) + { + Interlocked.Increment(ref _idleCount); + var written = _idleConnectorWriter.TryWrite(obj); + Debug.Assert(written); + } + else + { + CloseConnector(obj); + } + } + + internal override void Startup() + { + WarmUp(); + } + + internal override void Shutdown() + { + // NOTE: this occupies a thread for the whole duration of the shutdown process. + var shutdownTask = new Task(async () => await ShutdownAsync().ConfigureAwait(false)); + shutdownTask.RunSynchronously(); + } + + // TransactionEnded merely provides the plumbing for DbConnectionInternal to access the transacted pool + // that is implemented inside DbConnectionPool. This method's counterpart (PutTransactedObject) should + // only be called from DbConnectionPool.DeactivateObject and thus the plumbing to provide access to + // other objects is unnecessary (hence the asymmetry of Ended but no Begin) + internal override void TransactionEnded(Transaction? transaction, DbConnectionInternal? transactedObject) + { + Debug.Assert(transaction != null, "null transaction?"); + Debug.Assert(transactedObject != null, "null transactedObject?"); + + // Note: connection may still be associated with transaction due to Explicit Unbinding requirement. + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Transaction Completed", ObjectId, transaction?.GetHashCode(), transactedObject?.ObjectID); + + // called by the internal connection when it get's told that the + // transaction is completed. We tell the transacted pool to remove + // the connection from it's list, then we put the connection back in + // general circulation. + _transactedConnectionPool.TransactionEnded(transaction, transactedObject); + } + + private DbConnectionInternal? GetFromTransactedPool(out Transaction? transaction) + { + transaction = ADP.GetCurrentTransaction(); + if (transaction == null) + { + return null; + } + + + DbConnectionInternal? obj = _transactedConnectionPool.GetTransactedObject(transaction); + if (obj == null) + { + return null; + } + + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Connection {1}, Popped from transacted pool.", ObjectId, obj.ObjectID); + SqlClientEventSource.Metrics.ExitFreeConnection(); + + if (obj.IsTransactionRoot) + { + try + { + obj.IsConnectionAlive(true); + } + catch + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Connection {1}, found dead and removed.", ObjectId, obj.ObjectID); + CloseConnector(obj); + throw; + } + } + else if (!obj.IsConnectionAlive()) + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Connection {1}, found dead and removed.", ObjectId, obj.ObjectID); + CloseConnector(obj); + obj = null; + } + + return obj; + } + #endregion + + #region Implementation + private readonly DbConnectionPoolIdentity _identity; + + private readonly DbConnectionFactory _connectionFactory; + private readonly DbConnectionPoolGroup _connectionPoolGroup; + private readonly DbConnectionPoolGroupOptions _connectionPoolGroupOptions; + private DbConnectionPoolProviderInfo _connectionPoolProviderInfo; + + /// + /// The private member which carries the set of authenticationcontexts for this pool (based on the user's identity). + /// + private readonly ConcurrentDictionary _pooledDbAuthenticationContexts; + + // Prevents synchronous operations which depend on async operations on managed + // threads from blocking on all available threads, which would stop async tasks + // from being scheduled and cause deadlocks. Use ProcessorCount/2 as a balance + // between sync and async tasks. + private static SemaphoreSlim SyncOverAsyncSemaphore { get; } = new(Math.Max(1, Environment.ProcessorCount / 2)); + + private static int _objectTypeCount; // EventSource counter + private static TimeSpan DefaultPruningPeriod = TimeSpan.FromMinutes(2); + private static TimeSpan MinIdleCountPeriod = TimeSpan.FromSeconds(1); + + #region private readonly + private readonly int _objectID = Interlocked.Increment(ref _objectTypeCount); + private readonly RateLimiterBase _connectionRateLimiter; + //TODO: readonly TimeSpan _connectionLifetime; + + /// + /// Tracks all connectors currently managed by this pool, whether idle or busy. + /// Only updated rarely - when physical connections are opened/closed - but is read in perf-sensitive contexts. + /// + private readonly DbConnectionInternal?[] _connectors; + + /// + /// Tracks all connectors currently managed by this pool that are in a transaction. + /// + private readonly TransactedConnectionPool _transactedConnectionPool; + + /// + /// Reader side for the idle connector channel. Contains nulls in order to release waiting attempts after + /// a connector has been physically closed/broken. + /// + private readonly ChannelReader _idleConnectorReader; + private readonly ChannelWriter _idleConnectorWriter; + + private readonly CancellationTokenSource _shutdownCTS; + private readonly CancellationToken _shutdownCT; + + private Task _warmupTask; + private readonly SemaphoreSlim _warmupLock; + + + private readonly Timer _pruningTimer; + private readonly Timer _minIdleCountTimer; + private readonly SemaphoreSlim _pruningLock; + + internal int _minIdleCount; + internal Timer PruningTimer => _pruningTimer; + internal Timer MinIdleCountTimer => _minIdleCountTimer; + internal Task PruningTask { get; set; } + internal SemaphoreSlim PruningLock => _pruningLock; + #endregion + + // Counts the total number of open connectors tracked by the pool. + private volatile int _numConnectors; + + // Counts the number of connectors currently sitting idle in the pool. + private volatile int _idleCount; + + /// + /// Incremented every time this pool is cleared. Allows us to identify connections which were + /// created before the clear. + /// + private volatile int _clearCounter; + private volatile int _isClearing; + + /// + /// Initializes a new PoolingDataSource. + /// + //TODO: support auth contexts and provider info + internal ChannelDbConnectionPool( + DbConnectionFactory connectionFactory, + DbConnectionPoolGroup connectionPoolGroup, + DbConnectionPoolIdentity identity, + DbConnectionPoolProviderInfo connectionPoolProviderInfo, + RateLimiterBase connectionRateLimiter) + { + State = Initializing; + + _connectionRateLimiter = connectionRateLimiter; + _connectionFactory = connectionFactory; + _connectionPoolGroup = connectionPoolGroup; + _connectionPoolGroupOptions = connectionPoolGroup.PoolGroupOptions; + _connectionPoolProviderInfo = connectionPoolProviderInfo; + _identity = identity; + _pooledDbAuthenticationContexts = new ConcurrentDictionary< + DbConnectionPoolAuthenticationContextKey, + DbConnectionPoolAuthenticationContext>( + concurrencyLevel: 4 * Environment.ProcessorCount, + capacity: 2); + + _connectors = new DbConnectionInternal[MaxPoolSize]; + _transactedConnectionPool = new TransactedConnectionPool(this); + + // We enforce Max Pool Size, so no need to to create a bounded channel (which is less efficient) + // On the consuming side, we have the multiplexing write loop but also non-multiplexing Rents + // On the producing side, we have connections being released back into the pool (both multiplexing and not) + var idleChannel = Channel.CreateUnbounded(); + _idleConnectorReader = idleChannel.Reader; + _idleConnectorWriter = idleChannel.Writer; + + _shutdownCTS = new CancellationTokenSource(); + _shutdownCT = _shutdownCTS.Token; + + _warmupTask = Task.CompletedTask; + _warmupLock = new SemaphoreSlim(1); + + _pruningTimer = new Timer(PruneIdleConnections, this, DefaultPruningPeriod, DefaultPruningPeriod); + + _minIdleCount = int.MaxValue; + + // TODO: make these private readonly if possible + // TODO: base pruning timer on a user provided param? + _minIdleCountTimer = new Timer(UpdateMinIdleCount, this, MinIdleCountPeriod, MinIdleCountPeriod); + _pruningLock = new SemaphoreSlim(1); + PruningTask = Task.CompletedTask; + + State = Running; + } + + #region properties + internal TimeSpan ConnectionLifetime => PoolGroupOptions.LoadBalanceTimeout; + internal int ObjectID => _objectID; + internal bool IsWarmupEnabled { get; set; } = true; + #endregion + + + /// + internal async Task GetInternalConnection(DbConnection owningConnection, DbConnectionOptions userOptions, TimeSpan timeout, bool async, CancellationToken cancellationToken) + { + DbConnectionInternal? connector = null; + Transaction? transaction = null; + + if (HasTransactionAffinity) + { + connector ??= GetFromTransactedPool(out transaction); + } + + connector ??= GetIdleConnector(); + + connector ??= await OpenNewInternalConnection(owningConnection, userOptions, timeout, async, cancellationToken).ConfigureAwait(false); + + if (connector != null) + { + PrepareConnection(owningConnection, connector, transaction); + return connector; + } + + // We're at max capacity. Block on the idle channel with a timeout. + // Note that Channels guarantee fair FIFO behavior to callers of ReadAsync (first-come first- + // served), which is crucial to us. + using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + CancellationToken finalToken = linkedSource.Token; + linkedSource.CancelAfter(timeout); + //TODO: respect remaining time, linkedSource.CancelAfter(timeout.CheckAndGetTimeLeft()); + + try + { + while (true) + { + try + { + if (async) + { + connector = await _idleConnectorReader.ReadAsync(finalToken).ConfigureAwait(false); + } + else + { + SyncOverAsyncSemaphore.Wait(finalToken); + try + { + // If there are no connections in the channel, then this call will block until one is available. + // Because this call uses the managed thread pool, we need to limit the number of + // threads allowed to block here to avoid a deadlock. + ConfiguredValueTaskAwaitable.ConfiguredValueTaskAwaiter awaiter = + _idleConnectorReader.ReadAsync(finalToken).ConfigureAwait(false).GetAwaiter(); + using ManualResetEventSlim mres = new ManualResetEventSlim(false, 0); + + // Cancellation happens through the ReadAsync call, which will complete the task. + awaiter.UnsafeOnCompleted(() => mres.Set()); + mres.Wait(CancellationToken.None); + connector = awaiter.GetResult(); + } + finally + { + SyncOverAsyncSemaphore.Release(); + } + } + + if (connector != null && CheckIdleConnector(connector)) + { + PrepareConnection(owningConnection, connector, transaction); + return connector; + } + } + catch (OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + Debug.Assert(finalToken.IsCancellationRequested); + + throw ADP.PooledOpenTimeout(); + } + catch (ChannelClosedException) + { + //TODO: exceptions from resource file + throw new Exception("The connection pool has been shut down."); + } + + // If we're here, our waiting attempt on the idle connector channel was released with a null + // (or bad connector), or we're in sync mode. Check again if a new idle connector has appeared since we last checked. + connector = GetIdleConnector(); + + // We might have closed a connector in the meantime and no longer be at max capacity + // so try to open a new connector and if that fails, loop again. + connector ??= await OpenNewInternalConnection(owningConnection, userOptions, timeout, async, cancellationToken).ConfigureAwait(false); + + if (connector != null) + { + PrepareConnection(owningConnection, connector, transaction); + return connector; + } + } + } + finally + { + //TODO: log error + } + } + + /// + /// Tries to read a connector from the idle connector channel. + /// + /// Returns true if a valid idles connector is found, otherwise returns false. + /// TODO: profile the inlining to see if it's necessary + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal DbConnectionInternal? GetIdleConnector() + { + + while (_idleConnectorReader.TryRead(out DbConnectionInternal? connector)) + { + if (CheckIdleConnector(connector)) + { + return connector; + } + } + + return null; + } + + /// + /// Checks that the provided connector is live and unexpired and closes it if needed. + /// Decrements the idle count as long as the connector is not null. + /// + /// The connector to be checked. + /// Returns true if the connector is live and unexpired, otherwise returns false. + /// TODO: profile the inlining to see if it's necessary + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool CheckIdleConnector(DbConnectionInternal? connector) + { + if (connector is null) + { + return false; + } + + // Only decrement when the connector has a value. + Interlocked.Decrement(ref _idleCount); + + return CheckConnector(connector); + } + + /// + /// Checks that the provided connector is live and unexpired and closes it if needed. + /// + /// + /// Returns true if the connector is live and unexpired, otherwise returns false. + private bool CheckConnector(DbConnectionInternal connector) + { + // If Clear/ClearAll has been been called since this connector was first opened, + // throw it away. The same if it's broken (in which case CloseConnector is only + // used to update state/perf counter). + //TODO: check clear counter + + // An connector could be broken because of a keepalive that occurred while it was + // idling in the pool + // TODO: Consider removing the pool from the keepalive code. The following branch is simply irrelevant + // if keepalive isn't turned on. + if (!connector.IsConnectionAlive()) + { + CloseConnector(connector); + return false; + } + + if (ConnectionLifetime != TimeSpan.Zero && DateTime.UtcNow > connector.OpenTimestamp + ConnectionLifetime) + { + CloseConnector(connector); + return false; + } + + return true; + } + + /// + /// Closes the provided connector and adjust pool state accordingly. + /// + /// The connector to be closed. + private void CloseConnector(DbConnectionInternal connector) + { + try + { + connector.Dispose(); + } + catch + { + //TODO: log error + } + + // TODO: check clear counter so that we don't clear new connections + + int i; + for (i = 0; i < MaxPoolSize; i++) + { + if (Interlocked.CompareExchange(ref _connectors[i], null, connector) == connector) + { + break; + } + } + + // If CloseConnector is being called from within OpenNewConnector (e.g. an error happened during a connection initializer which + // causes the connector to Break, and therefore return the connector), then we haven't yet added the connector to Connectors. + // In this case, there's no state to revert here (that's all taken care of in OpenNewConnector), skip it. + if (i == MaxPoolSize) + { + return; + } + + var numConnectors = Interlocked.Decrement(ref _numConnectors); + Debug.Assert(numConnectors >= 0); + + // If a connector has been closed for any reason, we write a null to the idle connector channel to wake up + // a waiter, who will open a new physical connection + // Statement order is important since we have synchronous completions on the channel. + _idleConnectorWriter.TryWrite(null); + + // Ensure that we return to min pool size if closing this connector brought us below min pool size. + WarmUp(); + } + + /// + /// A state object used to pass context to the rate limited connector creation operation. + /// + internal readonly struct OpenInternalConnectionState + { + internal OpenInternalConnectionState( + ChannelDbConnectionPool pool, + DbConnection? owningConnection, + DbConnectionOptions userOptions, + TimeSpan timeout) + { + Pool = pool; + OwningConnection = owningConnection; + UserOptions = userOptions; + Timeout = timeout; + } + + internal readonly ChannelDbConnectionPool Pool; + internal readonly DbConnection? OwningConnection; + internal readonly DbConnectionOptions UserOptions; + internal readonly TimeSpan Timeout; + } + + /// + internal Task OpenNewInternalConnection(DbConnection? owningConnection, DbConnectionOptions userOptions, TimeSpan timeout, bool async, CancellationToken cancellationToken) + { + return _connectionRateLimiter.Execute( + RateLimitedOpen, + new OpenInternalConnectionState( + pool: this, + owningConnection: owningConnection, + userOptions: userOptions, + timeout: timeout + ), + async, + cancellationToken + ); + + + static Task RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken) + { + // As long as we're under max capacity, attempt to increase the connector count and open a new connection. + for (var numConnectors = state.Pool._numConnectors; numConnectors < state.Pool.MaxPoolSize; numConnectors = state.Pool._numConnectors) + { + // Note that we purposefully don't use SpinWait for this: https://github.com/dotnet/coreclr/pull/21437 + if (Interlocked.CompareExchange(ref state.Pool._numConnectors, numConnectors + 1, numConnectors) != numConnectors) + { + continue; + } + + try + { + // We've managed to increase the open counter, open a physical connection. + var startTime = Stopwatch.GetTimestamp(); + + // TODO: This blocks the thread for several network calls! + // This will be unexpected to async callers. + // Our options are limited because DbConnectionInternal doesn't support async open. + // It's better to block this thread and keep throughput high than to queue all of our opens onto a single worker thread. + // Add an async path when this support is added to DbConnectionInternal. + DbConnectionInternal? newConnection = state.Pool.ConnectionFactory.CreatePooledConnection( + state.Pool, + state.OwningConnection, + state.Pool._connectionPoolGroup.ConnectionOptions, + state.Pool._connectionPoolGroup.PoolKey, + state.UserOptions); + + if (newConnection == null) + { + throw ADP.InternalError(ADP.InternalErrorCode.CreateObjectReturnedNull); // CreateObject succeeded, but null object + } + if (!newConnection.CanBePooled) + { + throw ADP.InternalError(ADP.InternalErrorCode.NewObjectCannotBePooled); // CreateObject succeeded, but non-poolable object + } + + newConnection.PrePush(null); + + int i; + for (i = 0; i < state.Pool.MaxPoolSize; i++) + { + if (Interlocked.CompareExchange(ref state.Pool._connectors[i], newConnection, null) == null) + { + break; + } + } + + Debug.Assert(i < state.Pool.MaxPoolSize, $"Could not find free slot in {state.Pool._connectors} when opening."); + if (i == state.Pool.MaxPoolSize) + { + //TODO: generic exception? + throw new Exception($"Could not find free slot in {state.Pool._connectors} when opening. Please report a bug."); + } + + return Task.FromResult(newConnection); + } + catch + { + // Physical open failed, decrement the open and busy counter back down. + Interlocked.Decrement(ref state.Pool._numConnectors); + + // In case there's a waiting attempt on the channel, we write a null to the idle connector channel + // to wake it up, so it will try opening (and probably throw immediately) + // Statement order is important since we have synchronous completions on the channel. + state.Pool._idleConnectorWriter.TryWrite(null); + + throw; + } + } + + return Task.FromResult(null); + } + } + + /// + /// Initiates a task to prune idle connections from the pool. + /// + internal void PruneIdleConnections(object? state) + { + // Important not to throw here. We're not in an async function, so there's no task to automatically + // propagate the exception to. If we throw, we'll crash the process. + if (_shutdownCT.IsCancellationRequested) + { + return; + } + + if (State is ShuttingDown || !PruningTask.IsCompleted || !PruningLock.Wait(0)) + { + return; + } + + try + { + if (PruningTask.IsCompleted && State is Running) + { + PruningTask = _PruneIdleConnections(); + } + } + finally + { + PruningLock.Release(); + } + + return; + + async Task _PruneIdleConnections() + { + try + { + int numConnectionsToPrune = _minIdleCount; + + // Reset _minIdleCount for the next pruning period + _minIdleCount = int.MaxValue; + + // If we don't stop on null, we might cycle a bit? + // we might read out all of the nulls we just wrote into the channel + // that might not be bad... + while (numConnectionsToPrune > 0 && + _numConnectors > MinPoolSize && + _idleConnectorReader.TryRead(out var connector)) + { + _shutdownCT.ThrowIfCancellationRequested(); + + if (connector == null) + { + continue; + } + + if (CheckIdleConnector(connector)) + { + CloseConnector(connector); + } + + numConnectionsToPrune--; + } + } + catch + { + // TODO: log exception + } + + // Min pool size check above is best effort and may over prune. + // Ensure warmup runs to bring us back up to min pool size if necessary. + await WarmUp().ConfigureAwait(false); + } + } + + /// + /// Periodically checks the current idle connector count to maintain a minimum idle connector count. + /// Runs indefinitely until the timer is disposed or a cancellation is indicated on the pool shutdown + /// cancellation token. + /// + /// A ValueTask tracking this operation. + internal void UpdateMinIdleCount(object? state) + { + // Important not to throw here. We're not in an async function, so there's no task to automatically + // propagate the exception to. If we throw, we'll crash the process. + if (_shutdownCT.IsCancellationRequested) + { + return; + } + + try + { + if (State is not Running) + { + return; + } + try + { + int currentMinIdle; + int currentIdle; + do + { + currentMinIdle = _minIdleCount; + currentIdle = _idleCount; + if (currentIdle >= currentMinIdle) + { + break; + } + } + while (Interlocked.CompareExchange(ref _minIdleCount, currentIdle, currentMinIdle) != currentMinIdle); + } + catch + { + // TODO: log exception + } + } catch (OperationCanceledException) + { + // TODO: log here? + } + } + + /// + /// Warms up the pool by bringing it up to min pool size. + /// We may await the underlying operation multiple times, so we need to use Task + /// in place of ValueTask so that it cannot be recycled. + /// + /// A ValueTask containing a Task that represents the warmup process. + internal Task WarmUp() + { + if (State is ShuttingDown || !IsWarmupEnabled) + { + return Task.CompletedTask; + } + + // Avoid semaphore wait if task is still running + if (!_warmupTask.IsCompleted || !_warmupLock.Wait(0)) + { + return _warmupTask; + } + + try + { + // The task may have been started by another thread while we were + // waiting on the semaphore + if (_warmupTask.IsCompleted && State is Running) + { + _warmupTask = _WarmUp(_shutdownCT); + } + } + finally + { + _warmupLock.Release(); + } + + return _warmupTask; + + async Task _WarmUp(CancellationToken ct) + { + // Best effort, we may over or under create due to race conditions. + // Open new connections slowly. If many connections are needed immediately + // upon pool creation they can always be created via user-initiated requests as fast + // as a parallel, pool-initiated approach could. + while (_numConnectors < MinPoolSize) + { + ct.ThrowIfCancellationRequested(); + + // Obey the same rate limit as user-initiated opens. + // Ensures that pool-initiated opens are queued properly alongside user requests. + DbConnectionInternal? connector = await OpenNewInternalConnection( + null, + // connections opened by the pool use the pool groups options in place of user provided options + _connectionPoolGroup.ConnectionOptions, + ConnectionLifetime, + true, + ct) + .ConfigureAwait(false); + + // If connector is null, then we hit the max pool size and can stop + // warming up the pool. + if (connector == null) + { + return; + } + + // The connector has never been used, so it's safe to immediately add it to the + // pool without resetting it. + Interlocked.Increment(ref _idleCount); + var written = _idleConnectorWriter.TryWrite(connector); + Debug.Assert(written); + } + } + } + + /// + /// Shutsdown the pool and disposes pool resources. + /// + internal async Task ShutdownAsync() + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}", ObjectID); + + State = ShuttingDown; + + // Cancel background tasks + _shutdownCTS.Cancel(); + await Task.WhenAll( + PruningTask, + _warmupTask).ConfigureAwait(false); + + // Clean pool state + Clear(); + + // Handle disposable resources + _shutdownCTS.Dispose(); + _warmupLock.Dispose(); + PruningTimer.Dispose(); + MinIdleCountTimer.Dispose(); + _connectionRateLimiter?.Dispose(); + } + + // TODO: override clear method + #endregion + } +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPool.cs index 0d00227469..6b308e7c47 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPool.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPool.cs @@ -47,15 +47,11 @@ internal abstract class DbConnectionPool #region Abstract Methods internal abstract void Clear(); - internal abstract void DestroyObject(DbConnectionInternal obj); - - internal abstract bool TryGetConnection(DbConnection owningObject, TaskCompletionSource retry, DbConnectionOptions userOptions, out DbConnectionInternal connection); + internal abstract bool TryGetConnection(DbConnection owningObject, TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal connection); internal abstract DbConnectionInternal ReplaceConnection(DbConnection owningObject, DbConnectionOptions userOptions, DbConnectionInternal oldConnection); - internal abstract void PutNewObject(DbConnectionInternal obj); - - internal abstract void PutObject(DbConnectionInternal obj, object owningObject); + internal abstract void ReturnInternalConnection(DbConnectionInternal obj, object owningObject); internal abstract void PutObjectFromTransactedPool(DbConnectionInternal obj); diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolGroup.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolGroup.cs index af3b2d6bdf..f63880dd7b 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolGroup.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolGroup.cs @@ -3,12 +3,13 @@ // See the LICENSE file in the project root for more information. -using Microsoft.Data.Common; -using Microsoft.Data.ProviderBase; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Threading; +using Microsoft.Data.Common; +using Microsoft.Data.ProviderBase; +using Microsoft.Data.SqlClient.RateLimiter; namespace Microsoft.Data.SqlClient.ConnectionPool { @@ -185,7 +186,15 @@ internal DbConnectionPool GetConnectionPool(DbConnectionFactory connectionFactor if (!_poolCollection.TryGetValue(currentIdentity, out pool)) { DbConnectionPoolProviderInfo connectionPoolProviderInfo = connectionFactory.CreateConnectionPoolProviderInfo(ConnectionOptions); - DbConnectionPool newPool = new WaitHandleDbConnectionPool(connectionFactory, this, currentIdentity, connectionPoolProviderInfo); + DbConnectionPool newPool; + if (LocalAppContextSwitches.UseLegacyConnectionPool) + { + newPool = new WaitHandleDbConnectionPool(connectionFactory, this, currentIdentity, connectionPoolProviderInfo); + } + else + { + newPool = new ChannelDbConnectionPool(connectionFactory, this, currentIdentity, connectionPoolProviderInfo, new PassthroughRateLimiter()); + } if (MarkPoolGroupAsActive()) { diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs new file mode 100644 index 0000000000..cec29f8ab7 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs @@ -0,0 +1,271 @@ +using System.Collections.Generic; +using System.Diagnostics; +using System.Transactions; +using Microsoft.Data.ProviderBase; + +namespace Microsoft.Data.SqlClient.ConnectionPool +{ + internal class TransactedConnectionPool + { + // This class is a way to stash our cloned Tx key for later disposal when it's no longer needed. + // We can't get at the key in the dictionary without enumerating entries, so we stash an extra + // copy as part of the value. + private sealed class TransactedConnectionList : List + { + private Transaction _transaction; + internal TransactedConnectionList(int initialAllocation, Transaction tx) : base(initialAllocation) + { + _transaction = tx; + } + + internal void Dispose() + { + if (_transaction != null) + { + _transaction.Dispose(); + } + } + } + + Dictionary _transactedCxns; + + DbConnectionPool _pool; + + private static int _objectTypeCount; // EventSource Counter + internal readonly int _objectID = System.Threading.Interlocked.Increment(ref _objectTypeCount); + + internal TransactedConnectionPool(DbConnectionPool pool) + { + Debug.Assert(pool != null, "null pool?"); + + _pool = pool; + _transactedCxns = new Dictionary(); + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Constructed for connection pool {1}", ObjectID, _pool.ObjectId); + } + + internal int ObjectID + { + get + { + return _objectID; + } + } + + internal DbConnectionPool Pool + { + get + { + return _pool; + } + } + + internal DbConnectionInternal GetTransactedObject(Transaction transaction) + { + Debug.Assert(transaction != null, "null transaction?"); + + DbConnectionInternal transactedObject = null; + + TransactedConnectionList connections; + bool txnFound = false; + + lock (_transactedCxns) + { + txnFound = _transactedCxns.TryGetValue(transaction, out connections); + } + + // NOTE: GetTransactedObject is only used when AutoEnlist = True and the ambient transaction + // (Sys.Txns.Txn.Current) is still valid/non-null. This, in turn, means that we don't need + // to worry about a pending asynchronous TransactionCompletedEvent to trigger processing in + // TransactionEnded below and potentially wipe out the connections list underneath us. It + // is similarly alright if a pending addition to the connections list in PutTransactedObject + // below is not completed prior to the lock on the connections object here...getting a new + // connection is probably better than unnecessarily locking + if (txnFound) + { + Debug.Assert(connections != null); + + // synchronize multi-threaded access with PutTransactedObject (TransactionEnded should + // not be a concern, see comments above) + lock (connections) + { + int i = connections.Count - 1; + if (0 <= i) + { + transactedObject = connections[i]; + connections.RemoveAt(i); + } + } + } + + if (transactedObject != null) + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Popped.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + } + return transactedObject; + } + + internal void PutTransactedObject(Transaction transaction, DbConnectionInternal transactedObject) + { + Debug.Assert(transaction != null, "null transaction?"); + Debug.Assert(transactedObject != null, "null transactedObject?"); + + TransactedConnectionList connections; + bool txnFound = false; + + // NOTE: because TransactionEnded is an asynchronous notification, there's no guarantee + // around the order in which PutTransactionObject and TransactionEnded are called. + + lock (_transactedCxns) + { + // Check if a transacted pool has been created for this transaction + if (txnFound = _transactedCxns.TryGetValue(transaction, out connections)) + { + Debug.Assert(connections != null); + + // synchronize multi-threaded access with GetTransactedObject + lock (connections) + { + Debug.Assert(0 > connections.IndexOf(transactedObject), "adding to pool a second time?"); + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Pushing.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + connections.Add(transactedObject); + } + } + } + + // CONSIDER: the following code is more complicated than it needs to be to avoid cloning the + // transaction and allocating memory within a lock. Is that complexity really necessary? + if (!txnFound) + { + // create the transacted pool, making sure to clone the associated transaction + // for use as a key in our internal dictionary of transactions and connections + Transaction transactionClone = null; + TransactedConnectionList newConnections = null; + + try + { + transactionClone = transaction.Clone(); + newConnections = new TransactedConnectionList(2, transactionClone); // start with only two connections in the list; most times we won't need that many. + + lock (_transactedCxns) + { + // NOTE: in the interim between the locks on the transacted pool (this) during + // execution of this method, another thread (threadB) may have attempted to + // add a different connection to the transacted pool under the same + // transaction. As a result, threadB may have completed creating the + // transacted pool while threadA was processing the above instructions. + if (txnFound = _transactedCxns.TryGetValue(transaction, out connections)) + { + Debug.Assert(connections != null); + + // synchronize multi-threaded access with GetTransactedObject + lock (connections) + { + Debug.Assert(0 > connections.IndexOf(transactedObject), "adding to pool a second time?"); + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Pushing.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + connections.Add(transactedObject); + } + } + else + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Adding List to transacted pool.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + + // add the connection/transacted object to the list + newConnections.Add(transactedObject); + + _transactedCxns.Add(transactionClone, newConnections); + transactionClone = null; // we've used it -- don't throw it or the TransactedConnectionList that references it away. + } + } + } + finally + { + if (transactionClone != null) + { + if (newConnections != null) + { + // another thread created the transaction pool and thus the new + // TransactedConnectionList was not used, so dispose of it and + // the transaction clone that it incorporates. + newConnections.Dispose(); + } + else + { + // memory allocation for newConnections failed...clean up unused transactionClone + transactionClone.Dispose(); + } + } + } + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Added.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + } + + SqlClientEventSource.Metrics.EnterFreeConnection(); + } + + internal void TransactionEnded(Transaction transaction, DbConnectionInternal transactedObject) + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Transaction Completed", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + TransactedConnectionList connections; + int entry = -1; + + // NOTE: because TransactionEnded is an asynchronous notification, there's no guarantee + // around the order in which PutTransactionObject and TransactionEnded are called. As + // such, it is possible that the transaction does not yet have a pool created. + + // TODO: is this a plausible and/or likely scenario? Do we need to have a mechanism to ensure + // TODO: that the pending creation of a transacted pool for this transaction is aborted when + // TODO: PutTransactedObject finally gets some CPU time? + + lock (_transactedCxns) + { + if (_transactedCxns.TryGetValue(transaction, out connections)) + { + Debug.Assert(connections != null); + + bool shouldDisposeConnections = false; + + // Lock connections to avoid conflict with GetTransactionObject + lock (connections) + { + entry = connections.IndexOf(transactedObject); + + if (entry >= 0) + { + connections.RemoveAt(entry); + } + + // Once we've completed all the ended notifications, we can + // safely remove the list from the transacted pool. + if (0 >= connections.Count) + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Removing List from transacted pool.", ObjectID, transaction.GetHashCode()); + _transactedCxns.Remove(transaction); + + // we really need to dispose our connection list; it may have + // native resources via the tx and GC may not happen soon enough. + shouldDisposeConnections = true; + } + } + + if (shouldDisposeConnections) + { + connections.Dispose(); + } + } + else + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Transacted pool not yet created prior to transaction completing. Connection may be leaked.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); + } + } + + // If (and only if) we found the connection in the list of + // connections, we'll put it back... + if (0 <= entry) + { + + SqlClientEventSource.Metrics.ExitFreeConnection(); + Pool.PutObjectFromTransactedPool(transactedObject); + } + } + + } +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs index 9db1c7a5ec..5c022a1ddd 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs @@ -21,26 +21,6 @@ namespace Microsoft.Data.SqlClient.ConnectionPool { internal sealed class WaitHandleDbConnectionPool : DbConnectionPool { - // This class is a way to stash our cloned Tx key for later disposal when it's no longer needed. - // We can't get at the key in the dictionary without enumerating entries, so we stash an extra - // copy as part of the value. - private sealed class TransactedConnectionList : List - { - private Transaction _transaction; - internal TransactedConnectionList(int initialAllocation, Transaction tx) : base(initialAllocation) - { - _transaction = tx; - } - - internal void Dispose() - { - if (_transaction != null) - { - _transaction.Dispose(); - } - } - } - private sealed class PendingGetConnection { public PendingGetConnection(long dueTime, DbConnection owner, TaskCompletionSource completion, DbConnectionOptions userOptions) @@ -56,250 +36,6 @@ public PendingGetConnection(long dueTime, DbConnection owner, TaskCompletionSour public DbConnectionOptions UserOptions { get; private set; } } - private sealed class TransactedConnectionPool - { - Dictionary _transactedCxns; - - DbConnectionPool _pool; - - private static int _objectTypeCount; // EventSource Counter - internal readonly int _objectID = System.Threading.Interlocked.Increment(ref _objectTypeCount); - - internal TransactedConnectionPool(DbConnectionPool pool) - { - Debug.Assert(pool != null, "null pool?"); - - _pool = pool; - _transactedCxns = new Dictionary(); - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Constructed for connection pool {1}", ObjectID, _pool.ObjectId); - } - - internal int ObjectID - { - get - { - return _objectID; - } - } - - internal DbConnectionPool Pool - { - get - { - return _pool; - } - } - - internal DbConnectionInternal GetTransactedObject(Transaction transaction) - { - Debug.Assert(transaction != null, "null transaction?"); - - DbConnectionInternal transactedObject = null; - - TransactedConnectionList connections; - bool txnFound = false; - - lock (_transactedCxns) - { - txnFound = _transactedCxns.TryGetValue(transaction, out connections); - } - - // NOTE: GetTransactedObject is only used when AutoEnlist = True and the ambient transaction - // (Sys.Txns.Txn.Current) is still valid/non-null. This, in turn, means that we don't need - // to worry about a pending asynchronous TransactionCompletedEvent to trigger processing in - // TransactionEnded below and potentially wipe out the connections list underneath us. It - // is similarly alright if a pending addition to the connections list in PutTransactedObject - // below is not completed prior to the lock on the connections object here...getting a new - // connection is probably better than unnecessarily locking - if (txnFound) - { - Debug.Assert(connections != null); - - // synchronize multi-threaded access with PutTransactedObject (TransactionEnded should - // not be a concern, see comments above) - lock (connections) - { - int i = connections.Count - 1; - if (0 <= i) - { - transactedObject = connections[i]; - connections.RemoveAt(i); - } - } - } - - if (transactedObject != null) - { - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Popped.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - } - return transactedObject; - } - - internal void PutTransactedObject(Transaction transaction, DbConnectionInternal transactedObject) - { - Debug.Assert(transaction != null, "null transaction?"); - Debug.Assert(transactedObject != null, "null transactedObject?"); - - TransactedConnectionList connections; - bool txnFound = false; - - // NOTE: because TransactionEnded is an asynchronous notification, there's no guarantee - // around the order in which PutTransactionObject and TransactionEnded are called. - - lock (_transactedCxns) - { - // Check if a transacted pool has been created for this transaction - if (txnFound = _transactedCxns.TryGetValue(transaction, out connections)) - { - Debug.Assert(connections != null); - - // synchronize multi-threaded access with GetTransactedObject - lock (connections) - { - Debug.Assert(0 > connections.IndexOf(transactedObject), "adding to pool a second time?"); - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Pushing.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - connections.Add(transactedObject); - } - } - } - - // CONSIDER: the following code is more complicated than it needs to be to avoid cloning the - // transaction and allocating memory within a lock. Is that complexity really necessary? - if (!txnFound) - { - // create the transacted pool, making sure to clone the associated transaction - // for use as a key in our internal dictionary of transactions and connections - Transaction transactionClone = null; - TransactedConnectionList newConnections = null; - - try - { - transactionClone = transaction.Clone(); - newConnections = new TransactedConnectionList(2, transactionClone); // start with only two connections in the list; most times we won't need that many. - - lock (_transactedCxns) - { - // NOTE: in the interim between the locks on the transacted pool (this) during - // execution of this method, another thread (threadB) may have attempted to - // add a different connection to the transacted pool under the same - // transaction. As a result, threadB may have completed creating the - // transacted pool while threadA was processing the above instructions. - if (txnFound = _transactedCxns.TryGetValue(transaction, out connections)) - { - Debug.Assert(connections != null); - - // synchronize multi-threaded access with GetTransactedObject - lock (connections) - { - Debug.Assert(0 > connections.IndexOf(transactedObject), "adding to pool a second time?"); - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Pushing.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - connections.Add(transactedObject); - } - } - else - { - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Adding List to transacted pool.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - - // add the connection/transacted object to the list - newConnections.Add(transactedObject); - - _transactedCxns.Add(transactionClone, newConnections); - transactionClone = null; // we've used it -- don't throw it or the TransactedConnectionList that references it away. - } - } - } - finally - { - if (transactionClone != null) - { - if (newConnections != null) - { - // another thread created the transaction pool and thus the new - // TransactedConnectionList was not used, so dispose of it and - // the transaction clone that it incorporates. - newConnections.Dispose(); - } - else - { - // memory allocation for newConnections failed...clean up unused transactionClone - transactionClone.Dispose(); - } - } - } - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Added.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - } - - SqlClientEventSource.Metrics.EnterFreeConnection(); - } - - internal void TransactionEnded(Transaction transaction, DbConnectionInternal transactedObject) - { - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Transaction Completed", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - TransactedConnectionList connections; - int entry = -1; - - // NOTE: because TransactionEnded is an asynchronous notification, there's no guarantee - // around the order in which PutTransactionObject and TransactionEnded are called. As - // such, it is possible that the transaction does not yet have a pool created. - - // TODO: is this a plausible and/or likely scenario? Do we need to have a mechanism to ensure - // TODO: that the pending creation of a transacted pool for this transaction is aborted when - // TODO: PutTransactedObject finally gets some CPU time? - - lock (_transactedCxns) - { - if (_transactedCxns.TryGetValue(transaction, out connections)) - { - Debug.Assert(connections != null); - - bool shouldDisposeConnections = false; - - // Lock connections to avoid conflict with GetTransactionObject - lock (connections) - { - entry = connections.IndexOf(transactedObject); - - if (entry >= 0) - { - connections.RemoveAt(entry); - } - - // Once we've completed all the ended notifications, we can - // safely remove the list from the transacted pool. - if (0 >= connections.Count) - { - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Removing List from transacted pool.", ObjectID, transaction.GetHashCode()); - _transactedCxns.Remove(transaction); - - // we really need to dispose our connection list; it may have - // native resources via the tx and GC may not happen soon enough. - shouldDisposeConnections = true; - } - } - - if (shouldDisposeConnections) - { - connections.Dispose(); - } - } - else - { - SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Connection {2}, Transacted pool not yet created prior to transaction completing. Connection may be leaked.", ObjectID, transaction.GetHashCode(), transactedObject.ObjectID); - } - } - - // If (and only if) we found the connection in the list of - // connections, we'll put it back... - if (0 <= entry) - { - - SqlClientEventSource.Metrics.ExitFreeConnection(); - Pool.PutObjectFromTransactedPool(transactedObject); - } - } - - } - private sealed class PoolWaitHandles { private readonly Semaphore _poolSemaphore; @@ -934,7 +670,7 @@ private void DeactivateObject(DbConnectionInternal obj) Debug.Assert(rootTxn == true || returnToGeneralPool == true || destroyObject == true); } - internal override void DestroyObject(DbConnectionInternal obj) + private void DestroyObject(DbConnectionInternal obj) { // A connection with a delegated transaction cannot be disposed of // until the delegated transaction has actually completed. Instead, @@ -1093,7 +829,7 @@ private void WaitForPendingOpen() if (!next.Completion.TrySetResult(connection)) { // if the completion was cancelled, lets try and get this connection back for the next try - PutObject(connection, next.Owner); + ReturnInternalConnection(connection, next.Owner); } } } @@ -1108,12 +844,12 @@ private void WaitForPendingOpen() } while (_pendingOpens.TryPeek(out next)); } - internal override bool TryGetConnection(DbConnection owningObject, TaskCompletionSource retry, DbConnectionOptions userOptions, out DbConnectionInternal connection) + internal override bool TryGetConnection(DbConnection owningObject, TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal connection) { uint waitForMultipleObjectsTimeout = 0; bool allowCreate = false; - if (retry == null) + if (taskCompletionSource == null) { waitForMultipleObjectsTimeout = (uint)CreationTimeout; @@ -1136,7 +872,7 @@ internal override bool TryGetConnection(DbConnection owningObject, TaskCompletio { return true; } - else if (retry == null) + else if (taskCompletionSource == null) { // timed out on a sync call return true; @@ -1146,7 +882,7 @@ internal override bool TryGetConnection(DbConnection owningObject, TaskCompletio new PendingGetConnection( CreationTimeout == 0 ? Timeout.Infinite : ADP.TimerCurrent() + ADP.TimerFromSeconds(CreationTimeout / 1000), owningObject, - retry, + taskCompletionSource, userOptions); _pendingOpens.Enqueue(pendingGetConnection); @@ -1376,7 +1112,7 @@ private void PrepareConnection(DbConnection owningObject, DbConnectionInternal o { // if Activate throws an exception // put it back in the pool or have it properly disposed of - this.PutObject(obj, owningObject); + this.ReturnInternalConnection(obj, owningObject); throw; } } @@ -1613,7 +1349,7 @@ private void PoolCreateRequest(object state) } } - internal override void PutNewObject(DbConnectionInternal obj) + private void PutNewObject(DbConnectionInternal obj) { Debug.Assert(obj != null, "why are we adding a null object to the pool?"); @@ -1626,7 +1362,7 @@ internal override void PutNewObject(DbConnectionInternal obj) } - internal override void PutObject(DbConnectionInternal obj, object owningObject) + internal override void ReturnInternalConnection(DbConnectionInternal obj, object owningObject) { Debug.Assert(obj != null, "null obj?"); diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs index 23eae838ae..7b566199d8 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs @@ -22,6 +22,7 @@ private enum Tristate : byte internal const string LegacyVarTimeZeroScaleBehaviourString = @"Switch.Microsoft.Data.SqlClient.LegacyVarTimeZeroScaleBehaviour"; internal const string UseCompatibilityProcessSniString = @"Switch.Microsoft.Data.SqlClient.UseCompatibilityProcessSni"; internal const string UseCompatibilityAsyncBehaviourString = @"Switch.Microsoft.Data.SqlClient.UseCompatibilityAsyncBehaviour"; + internal const string UseLegacyConnectionPoolString = @"Switch.Microsoft.Data.SqlClient.UseLegacyConnectionPool"; // this field is accessed through reflection in tests and should not be renamed or have the type changed without refactoring NullRow related tests private static Tristate s_legacyRowVersionNullBehavior; @@ -32,6 +33,7 @@ private enum Tristate : byte private static Tristate s_legacyVarTimeZeroScaleBehaviour; private static Tristate s_useCompatProcessSni; private static Tristate s_useCompatAsyncBehaviour; + private static Tristate s_useLegacyConnectionPool; #if NET static LocalAppContextSwitches() @@ -87,6 +89,7 @@ public static bool DisableTNIRByDefault } } #endif + /// /// In TdsParser the ProcessSni function changed significantly when the packet /// multiplexing code needed for high speed multi-packet column values was added. @@ -148,6 +151,25 @@ public static bool UseCompatibilityAsyncBehaviour } } + public static bool UseLegacyConnectionPool + { + get + { + if (s_useLegacyConnectionPool == Tristate.NotInitialized) + { + if (AppContext.TryGetSwitch(UseLegacyConnectionPoolString, out bool returnedValue) && returnedValue) + { + s_useLegacyConnectionPool = Tristate.True; + } + else + { + s_useLegacyConnectionPool = Tristate.False; + } + } + return s_useLegacyConnectionPool == Tristate.True; + } + } + /// /// When using Encrypt=false in the connection string, a security warning is output to the console if the TLS version is 1.2 or lower. /// This warning can be suppressed by enabling this AppContext switch. diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/AsyncFlagFunc.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/AsyncFlagFunc.cs new file mode 100644 index 0000000000..f50320310c --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/AsyncFlagFunc.cs @@ -0,0 +1,20 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; + +namespace Microsoft.Data.SqlClient.RateLimiter +{ + /// + /// A function that operates asynchronously based on a flag. If isAsync is true, the function operates asynchronously. + /// If isAsync is false, the function operates synchronously. + /// + /// The type accepted by the callback as input. + /// The type returned by the callback. + /// An instance of State to be passed to the callback. + /// Indicates whether the function should operate asynchronously. + /// Allows cancellation of the operation. + /// Returns the result of the callback. + internal delegate TResult AsyncFlagFunc(TState state, bool isAsync, CancellationToken cancellationToken); +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/BlockingPeriodRateLimiter.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/BlockingPeriodRateLimiter.cs new file mode 100644 index 0000000000..df4a444ecb --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/BlockingPeriodRateLimiter.cs @@ -0,0 +1,38 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +#nullable enable + +namespace Microsoft.Data.SqlClient.RateLimiter +{ + /// + /// A rate limiter that enforces a backoff (blocking) period upon error. + /// Each subsequent error increases the blocking duration, up to a maximum, until a success occurs. + /// + internal sealed class BlockingPeriodRateLimiter : RateLimiterBase + { + public BlockingPeriodRateLimiter(RateLimiterBase? next = null) : base(next) + { + } + + /// + internal override Task Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public override void Dispose() + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/ConcurrencyRateLimiter.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/ConcurrencyRateLimiter.cs new file mode 100644 index 0000000000..061fee63a3 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/ConcurrencyRateLimiter.cs @@ -0,0 +1,73 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; + +#nullable enable + +namespace Microsoft.Data.SqlClient.RateLimiter +{ + /// + /// A rate limiter that enforces a concurrency limit. + /// When the limit is reached, new requests must wait until a spot is freed upon completion of an existing request. + /// + internal class ConcurrencyRateLimiter : RateLimiterBase + { + private readonly SemaphoreSlim _concurrencyLimitSemaphore; + + /// + /// Initializes a new ConcurrencyRateLimiter with the specified concurrency limit. + /// + /// The maximum number of concurrent requests. + /// The next rate limiter to apply. + internal ConcurrencyRateLimiter(int concurrencyLimit, RateLimiterBase? next = null) : base(next) + { + _concurrencyLimitSemaphore = new SemaphoreSlim(concurrencyLimit); + } + + /// + internal sealed override async Task Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + //TODO: in the future, we can enforce order + if (isAsync) + { + await _concurrencyLimitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + _concurrencyLimitSemaphore.Wait(cancellationToken); + } + + try + { + cancellationToken.ThrowIfCancellationRequested(); + if (Next != null) + { + return await Next.Execute(callback, state, isAsync, cancellationToken).ConfigureAwait(false); + } + else + { + return await callback(state, isAsync, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _concurrencyLimitSemaphore.Release(); + } + } + + public override void Dispose() + { + _concurrencyLimitSemaphore.Dispose(); + Next?.Dispose(); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/PassthroughRateLimiter.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/PassthroughRateLimiter.cs new file mode 100644 index 0000000000..8cbfc14282 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/PassthroughRateLimiter.cs @@ -0,0 +1,44 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; + +#nullable enable +namespace Microsoft.Data.SqlClient.RateLimiter +{ + /// + /// A no-op rate limiter that simply executes the callback or passes through to the next rate limiter. + /// + internal sealed class PassthroughRateLimiter : RateLimiterBase + { + //TODO: no state, add static instance + + internal PassthroughRateLimiter(RateLimiterBase? next = null) : base(next) + { + } + + /// + internal override Task Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default) + { + if (Next != null) + { + return Next.Execute(callback, state, isAsync, cancellationToken); + } + else + { + return callback(state, isAsync, cancellationToken); + } + } + + public override void Dispose() + { + Next?.Dispose(); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/RateLimiterBase.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/RateLimiterBase.cs new file mode 100644 index 0000000000..e6fc946f7d --- /dev/null +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/RateLimiter/RateLimiterBase.cs @@ -0,0 +1,47 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +#nullable enable +namespace Microsoft.Data.SqlClient.RateLimiter +{ + /// + /// An interface for rate limiters that execute arbitraty code. Intended to be small and self contained and chained together to achieve more complex behavior. + /// + internal abstract class RateLimiterBase : IDisposable + { + + /// + /// The next rate limiter that should be executed within the context of this rate limiter. + /// + private RateLimiterBase? _next; + protected RateLimiterBase? Next => _next; + + internal RateLimiterBase(RateLimiterBase? next = null) + { + _next = next; + } + + /// + /// Execute the provided callback within the context of the rate limit, or pass the responsibility along to the next rate limiter. + /// + /// The type accepted by the callback as input. + /// The type of the result returned by the callback. + /// The callback function to execute. + /// An instance of State to be passed to the callback. + /// Whether this method should run asynchronously. + /// Cancels outstanding requests. + /// Returns the result of the callback or the next rate limiter. + internal abstract Task Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default); + + public abstract void Dispose(); + } +} diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/ConnectionPoolHelper.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/ConnectionPoolHelper.cs index e930f437e9..d5d3c90867 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/ConnectionPoolHelper.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/ConnectionPoolHelper.cs @@ -15,7 +15,7 @@ internal static class ConnectionPoolHelper { private static Assembly s_MicrosoftDotData = Assembly.Load(new AssemblyName(typeof(SqlConnection).GetTypeInfo().Assembly.FullName)); private static Type s_dbConnectionPool = s_MicrosoftDotData.GetType("Microsoft.Data.SqlClient.ConnectionPool.DbConnectionPool"); - private static Type s_waitHandleDbConnectionPool = s_MicrosoftDotData.GetType("Microsoft.Data.SqlClient.ConnectionPool.WaitHandleDbConnectionPool"); + private static Type s_waitHandleDbConnectionPool = s_MicrosoftDotData.GetType("Microsoft.Data.SqlClient.ConnectionPool.ChannelDbConnectionPool"); private static Type s_dbConnectionPoolGroup = s_MicrosoftDotData.GetType("Microsoft.Data.SqlClient.ConnectionPool.DbConnectionPoolGroup"); private static Type s_dbConnectionPoolIdentity = s_MicrosoftDotData.GetType("Microsoft.Data.SqlClient.ConnectionPool.DbConnectionPoolIdentity"); private static Type s_dbConnectionFactory = s_MicrosoftDotData.GetType("Microsoft.Data.ProviderBase.DbConnectionFactory"); diff --git a/tools/props/Versions.props b/tools/props/Versions.props index d4e2619f3b..1818e5a0fb 100644 --- a/tools/props/Versions.props +++ b/tools/props/Versions.props @@ -29,6 +29,7 @@ 8.0.0 8.0.5 4.3.0 + 9.0.4 diff --git a/tools/specs/Microsoft.Data.SqlClient.nuspec b/tools/specs/Microsoft.Data.SqlClient.nuspec index 90f05cf0e2..cc3280c406 100644 --- a/tools/specs/Microsoft.Data.SqlClient.nuspec +++ b/tools/specs/Microsoft.Data.SqlClient.nuspec @@ -40,6 +40,7 @@ When using NuGet 3.x this package requires at least version 3.4. +