diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index bbcc43134e..47a4196618 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -5,6 +5,7 @@ use crate::database::Database; use crate::error::Error; use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions}; use crossbeam_queue::ArrayQueue; +use either::Either; use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; @@ -251,7 +252,7 @@ impl PoolInner { } let acquire_started_at = Instant::now(); - let deadline = acquire_started_at + self.options.acquire_timeout; + let connect_deadline = acquire_started_at + self.options.connect_timeout; let acquired = crate::rt::timeout( self.options.acquire_timeout, @@ -260,21 +261,18 @@ impl PoolInner { // Handles the close-event internally let permit = self.acquire_permit().await?; + // First attempt to pop a connection from the idle queue and check if we can + // use it. + let guard = match self.get_live_idle(permit).await { + // All good! + Ok(Either::Left(conn)) => return Ok(conn), - // First attempt to pop a connection from the idle queue. - let guard = match self.pop_idle(permit) { + // if the connection isn't usable for one reason or another, + // we get the `DecrementSizeGuard` back to open a new one + Err(guard) => guard, - // Then, check that we can use it... - Ok(conn) => match check_idle_conn(conn, &self.options).await { - - // All good! - Ok(live) => return Ok(live), - - // if the connection isn't usable for one reason or another, - // we get the `DecrementSizeGuard` back to open a new one - Err(guard) => guard, - }, - Err(permit) => if let Ok(guard) = self.try_increment_size(permit) { + // we can open a new connection + Ok(Either::Right(permit)) => if let Ok(guard) = self.try_increment_size(permit) { // we can open a new connection guard } else { @@ -290,8 +288,19 @@ impl PoolInner { } }; - // Attempt to connect... - return self.connect(deadline, guard).await; + + let pool = self.clone(); + let pool2 = self.clone(); + + // Future that tries to get a live idle connection. + let idle_fut = std::pin::pin!(pool.get_idle_conn(connect_deadline)); + + // Future that tries to open a new connection. + let conn_fut = crate::rt::spawn(async move { + pool2.connect(connect_deadline, guard).await + }); + + return future::select(idle_fut, conn_fut).await.factor_first().0; } } ) @@ -398,6 +407,24 @@ impl PoolInner { } } + // Tries to get a live connection that was idle in a loop. + async fn get_idle_conn( + self: &Arc, + deadline: Instant, + ) -> Result>, Error> { + let mut backoff = Duration::from_millis(10); + let max_backoff = deadline_as_timeout(deadline)? / 5; + + loop { + let new_permit = self.acquire_permit().await?; + if let Ok(Either::Left(live)) = self.get_live_idle(new_permit).await { + return Ok(live); + }; + crate::rt::sleep(backoff).await; + backoff = cmp::min(backoff * 2, max_backoff); + } + } + /// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`). pub async fn try_min_connections(self: &Arc, deadline: Instant) -> Result<(), Error> { while self.size() < self.options.min_connections { @@ -422,6 +449,24 @@ impl PoolInner { Ok(()) } + // Tries to get a live idle connection. + async fn get_live_idle<'a>( + self: &'a Arc, + permit: AsyncSemaphoreReleaser<'a>, + ) -> Result>, AsyncSemaphoreReleaser<'a>>, DecrementSizeGuard> + { + match self.pop_idle(permit) { + Ok(conn) => match check_idle_conn(conn, &self.options).await { + // All good! + Ok(live) => Ok(Either::Left(live)), + // if the connection isn't usable for one reason or another, + // we get the `DecrementSizeGuard` back to open a new one + Err(guard) => Err(guard), + }, + Err(permit) => Ok(Either::Right(permit)), + } + } + /// Attempt to maintain `min_connections`, logging if unable. pub async fn min_connections_maintenance(self: &Arc, deadline: Option) { let deadline = deadline.unwrap_or_else(|| { diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 96dbf8ee3d..106c308740 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -79,6 +79,7 @@ pub struct PoolOptions { pub(crate) acquire_slow_level: LevelFilter, pub(crate) acquire_slow_threshold: Duration, pub(crate) acquire_timeout: Duration, + pub(crate) connect_timeout: Duration, pub(crate) min_connections: u32, pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, @@ -102,6 +103,7 @@ impl Clone for PoolOptions { acquire_slow_threshold: self.acquire_slow_threshold, acquire_slow_level: self.acquire_slow_level, acquire_timeout: self.acquire_timeout, + connect_timeout: self.connect_timeout, min_connections: self.min_connections, max_lifetime: self.max_lifetime, idle_timeout: self.idle_timeout, @@ -158,6 +160,7 @@ impl PoolOptions { // to not flag typical time to add a new connection to a pool. acquire_slow_threshold: Duration::from_secs(2), acquire_timeout: Duration::from_secs(30), + connect_timeout: Duration::from_secs(30), idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true,