From af89891d8d18cff5ea507f93168fd5236606a841 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Fri, 20 Sep 2024 21:15:29 +0200 Subject: [PATCH 1/5] Spawn a task when calling acquire on sqlx::Pool --- sqlx-core/src/pool/inner.rs | 10 +++++++++- sqlx-core/src/pool/options.rs | 3 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index bbcc43134e..8ed0592be3 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -290,8 +290,16 @@ impl PoolInner { } }; + // Attempt to connect... - return self.connect(deadline, guard).await; + let pool = self.clone(); + return crate::rt::spawn(async move { + crate::rt::timeout(pool.options.connect_timeout, async move { + pool.connect(deadline, guard).await + }) + }).await + .await + .map_err(|_| Error::PoolTimedOut)?; } } ) diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 96dbf8ee3d..106c308740 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -79,6 +79,7 @@ pub struct PoolOptions { pub(crate) acquire_slow_level: LevelFilter, pub(crate) acquire_slow_threshold: Duration, pub(crate) acquire_timeout: Duration, + pub(crate) connect_timeout: Duration, pub(crate) min_connections: u32, pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, @@ -102,6 +103,7 @@ impl Clone for PoolOptions { acquire_slow_threshold: self.acquire_slow_threshold, acquire_slow_level: self.acquire_slow_level, acquire_timeout: self.acquire_timeout, + connect_timeout: self.connect_timeout, min_connections: self.min_connections, max_lifetime: self.max_lifetime, idle_timeout: self.idle_timeout, @@ -158,6 +160,7 @@ impl PoolOptions { // to not flag typical time to add a new connection to a pool. acquire_slow_threshold: Duration::from_secs(2), acquire_timeout: Duration::from_secs(30), + connect_timeout: Duration::from_secs(30), idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true, From 6e724694ba10597eaaf2e4e3e23f1fe3947b0b8e Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Fri, 20 Sep 2024 22:31:34 +0200 Subject: [PATCH 2/5] remove space --- sqlx-core/src/pool/inner.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 8ed0592be3..cc3f04c9b8 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -290,7 +290,6 @@ impl PoolInner { } }; - // Attempt to connect... let pool = self.clone(); return crate::rt::spawn(async move { From 9e03327529812c44c4a199d981c632903fc036c5 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Fri, 20 Sep 2024 22:48:00 +0200 Subject: [PATCH 3/5] fix clippy warning --- sqlx-core/src/pool/inner.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index cc3f04c9b8..2efc8e8b56 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -295,9 +295,8 @@ impl PoolInner { return crate::rt::spawn(async move { crate::rt::timeout(pool.options.connect_timeout, async move { pool.connect(deadline, guard).await - }) + }).await }).await - .await .map_err(|_| Error::PoolTimedOut)?; } } From e32c121cd5fb184d15b4afb5113bead79102d300 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Sat, 21 Sep 2024 18:13:02 +0200 Subject: [PATCH 4/5] Pool: check idle connections while opening a new one --- sqlx-core/src/pool/inner.rs | 81 +++++++++++++++++++++++++++---------- 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 2efc8e8b56..49cf6f59b2 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -5,6 +5,7 @@ use crate::database::Database; use crate::error::Error; use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions}; use crossbeam_queue::ArrayQueue; +use either::Either; use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; @@ -251,7 +252,7 @@ impl PoolInner { } let acquire_started_at = Instant::now(); - let deadline = acquire_started_at + self.options.acquire_timeout; + let connect_deadline = acquire_started_at + self.options.connect_timeout; let acquired = crate::rt::timeout( self.options.acquire_timeout, @@ -260,21 +261,18 @@ impl PoolInner { // Handles the close-event internally let permit = self.acquire_permit().await?; + // First attempt to pop a connection from the idle queue and check if we can + // use it. + let guard = match self.get_live_idle(permit).await { + // All good! + Ok(Either::Left(conn)) => return Ok(conn), - // First attempt to pop a connection from the idle queue. - let guard = match self.pop_idle(permit) { + // if the connection isn't usable for one reason or another, + // we get the `DecrementSizeGuard` back to open a new one + Err(guard) => guard, - // Then, check that we can use it... - Ok(conn) => match check_idle_conn(conn, &self.options).await { - - // All good! - Ok(live) => return Ok(live), - - // if the connection isn't usable for one reason or another, - // we get the `DecrementSizeGuard` back to open a new one - Err(guard) => guard, - }, - Err(permit) => if let Ok(guard) = self.try_increment_size(permit) { + // we can open a new connection + Ok(Either::Right(permit)) => if let Ok(guard) = self.try_increment_size(permit) { // we can open a new connection guard } else { @@ -290,14 +288,19 @@ impl PoolInner { } }; - // Attempt to connect... + let pool = self.clone(); - return crate::rt::spawn(async move { - crate::rt::timeout(pool.options.connect_timeout, async move { - pool.connect(deadline, guard).await - }).await - }).await - .map_err(|_| Error::PoolTimedOut)?; + let pool2 = self.clone(); + + // Future that tries to get a live idle connection. + let idle_fut = std::pin::pin!(pool.get_idle_conn(connect_deadline.clone())); + + // Future that tries to open a new connection. + let conn_fut = crate::rt::spawn(async move { + pool2.connect(connect_deadline, guard).await + }); + + return future::select(idle_fut, conn_fut).await.factor_first().0; } } ) @@ -404,6 +407,28 @@ impl PoolInner { } } + // Tries to get a live connection that was idle in a loop. + pub async fn get_idle_conn( + self: &Arc, + deadline: Instant, + ) -> Result>, Error> { + + let mut backoff = Duration::from_millis(10); + let max_backoff = deadline_as_timeout(deadline)? / 5; + + loop { + let new_permit = self.acquire_permit().await?; + match self.get_live_idle(new_permit).await { + Ok(Either::Left(live)) => { + return Ok(live) + }, + _ => (), + }; + crate::rt::sleep(backoff).await; + backoff = cmp::min(backoff * 2, max_backoff); + } + } + /// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`). pub async fn try_min_connections(self: &Arc, deadline: Instant) -> Result<(), Error> { while self.size() < self.options.min_connections { @@ -428,6 +453,20 @@ impl PoolInner { Ok(()) } + // Tries to get a live idle connection. + pub async fn get_live_idle<'a>(self: &'a Arc, permit: AsyncSemaphoreReleaser<'a>) -> Result>, AsyncSemaphoreReleaser<'a>>, DecrementSizeGuard> { + match self.pop_idle(permit) { + Ok(conn) => match check_idle_conn(conn, &self.options).await { + // All good! + Ok(live) => Ok(Either::Left(live)), + // if the connection isn't usable for one reason or another, + // we get the `DecrementSizeGuard` back to open a new one + Err(guard) => Err(guard) + }, + Err(permit) => Ok(Either::Right(permit)) + } + } + /// Attempt to maintain `min_connections`, logging if unable. pub async fn min_connections_maintenance(self: &Arc, deadline: Option) { let deadline = deadline.unwrap_or_else(|| { From a4e16d202fdde90eb5018571d58e5a5c27617550 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Sat, 21 Sep 2024 18:35:09 +0200 Subject: [PATCH 5/5] fix clippy warnings --- sqlx-core/src/pool/inner.rs | 38 ++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 49cf6f59b2..47a4196618 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -293,7 +293,7 @@ impl PoolInner { let pool2 = self.clone(); // Future that tries to get a live idle connection. - let idle_fut = std::pin::pin!(pool.get_idle_conn(connect_deadline.clone())); + let idle_fut = std::pin::pin!(pool.get_idle_conn(connect_deadline)); // Future that tries to open a new connection. let conn_fut = crate::rt::spawn(async move { @@ -408,21 +408,17 @@ impl PoolInner { } // Tries to get a live connection that was idle in a loop. - pub async fn get_idle_conn( + async fn get_idle_conn( self: &Arc, deadline: Instant, ) -> Result>, Error> { - let mut backoff = Duration::from_millis(10); let max_backoff = deadline_as_timeout(deadline)? / 5; loop { let new_permit = self.acquire_permit().await?; - match self.get_live_idle(new_permit).await { - Ok(Either::Left(live)) => { - return Ok(live) - }, - _ => (), + if let Ok(Either::Left(live)) = self.get_live_idle(new_permit).await { + return Ok(live); }; crate::rt::sleep(backoff).await; backoff = cmp::min(backoff * 2, max_backoff); @@ -454,17 +450,21 @@ impl PoolInner { } // Tries to get a live idle connection. - pub async fn get_live_idle<'a>(self: &'a Arc, permit: AsyncSemaphoreReleaser<'a>) -> Result>, AsyncSemaphoreReleaser<'a>>, DecrementSizeGuard> { - match self.pop_idle(permit) { - Ok(conn) => match check_idle_conn(conn, &self.options).await { - // All good! - Ok(live) => Ok(Either::Left(live)), - // if the connection isn't usable for one reason or another, - // we get the `DecrementSizeGuard` back to open a new one - Err(guard) => Err(guard) - }, - Err(permit) => Ok(Either::Right(permit)) - } + async fn get_live_idle<'a>( + self: &'a Arc, + permit: AsyncSemaphoreReleaser<'a>, + ) -> Result>, AsyncSemaphoreReleaser<'a>>, DecrementSizeGuard> + { + match self.pop_idle(permit) { + Ok(conn) => match check_idle_conn(conn, &self.options).await { + // All good! + Ok(live) => Ok(Either::Left(live)), + // if the connection isn't usable for one reason or another, + // we get the `DecrementSizeGuard` back to open a new one + Err(guard) => Err(guard), + }, + Err(permit) => Ok(Either::Right(permit)), + } } /// Attempt to maintain `min_connections`, logging if unable.