Skip to content

Commit d600ec0

Browse files
authored
feat(logging): Log acquires from connection pool (#3073)
Signed-off-by: Joshua Potts <8704475+iamjpotts@users.noreply.github.com>
1 parent e0a72cf commit d600ec0

File tree

4 files changed

+92
-6
lines changed

4 files changed

+92
-6
lines changed

sqlx-core/src/logger.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ pub fn private_level_filter_to_levels(
5454
tracing_level.zip(filter.to_level())
5555
}
5656

57+
pub(crate) fn private_level_filter_to_trace_level(
58+
filter: log::LevelFilter,
59+
) -> Option<tracing::Level> {
60+
private_level_filter_to_levels(filter).map(|(level, _)| level)
61+
}
62+
5763
pub use sqlformat;
5864

5965
pub struct QueryLogger<'q> {

sqlx-core/src/pool/inner.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
1414
use std::sync::{Arc, RwLock};
1515
use std::task::Poll;
1616

17+
use crate::logger::private_level_filter_to_trace_level;
1718
use crate::pool::options::PoolConnectionMetadata;
19+
use crate::private_tracing_dynamic_event;
1820
use futures_util::future::{self};
1921
use futures_util::FutureExt;
2022
use std::time::{Duration, Instant};
23+
use tracing::Level;
2124

2225
pub(crate) struct PoolInner<DB: Database> {
2326
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
@@ -28,6 +31,8 @@ pub(crate) struct PoolInner<DB: Database> {
2831
is_closed: AtomicBool,
2932
pub(super) on_closed: event_listener::Event,
3033
pub(super) options: PoolOptions<DB>,
34+
pub(crate) acquire_time_level: Option<Level>,
35+
pub(crate) acquire_slow_level: Option<Level>,
3136
}
3237

3338
impl<DB: Database> PoolInner<DB> {
@@ -54,6 +59,8 @@ impl<DB: Database> PoolInner<DB> {
5459
num_idle: AtomicUsize::new(0),
5560
is_closed: AtomicBool::new(false),
5661
on_closed: event_listener::Event::new(),
62+
acquire_time_level: private_level_filter_to_trace_level(options.acquire_time_level),
63+
acquire_slow_level: private_level_filter_to_trace_level(options.acquire_slow_level),
5764
options,
5865
};
5966

@@ -241,9 +248,10 @@ impl<DB: Database> PoolInner<DB> {
241248
return Err(Error::PoolClosed);
242249
}
243250

244-
let deadline = Instant::now() + self.options.acquire_timeout;
251+
let acquire_started_at = Instant::now();
252+
let deadline = acquire_started_at + self.options.acquire_timeout;
245253

246-
crate::rt::timeout(
254+
let acquired = crate::rt::timeout(
247255
self.options.acquire_timeout,
248256
async {
249257
loop {
@@ -272,7 +280,7 @@ impl<DB: Database> PoolInner<DB> {
272280
// or if the pool was closed between `acquire_permit()` and
273281
// `try_increment_size()`.
274282
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
275-
// If so, we're likely in the current-thread runtime if it's Tokio
283+
// If so, we're likely in the current-thread runtime if it's Tokio,
276284
// and so we should yield to let any spawned return_to_pool() tasks
277285
// execute.
278286
crate::rt::yield_now().await;
@@ -286,7 +294,32 @@ impl<DB: Database> PoolInner<DB> {
286294
}
287295
)
288296
.await
289-
.map_err(|_| Error::PoolTimedOut)?
297+
.map_err(|_| Error::PoolTimedOut)??;
298+
299+
let acquired_after = acquire_started_at.elapsed();
300+
301+
let acquire_slow_level = self
302+
.acquire_slow_level
303+
.filter(|_| acquired_after > self.options.acquire_slow_threshold);
304+
305+
if let Some(level) = acquire_slow_level {
306+
private_tracing_dynamic_event!(
307+
target: "sqlx::pool::acquire",
308+
level,
309+
aquired_after_secs = acquired_after.as_secs_f64(),
310+
slow_acquire_threshold_secs = self.options.acquire_slow_threshold.as_secs_f64(),
311+
"acquired connection, but time to acquire exceeded slow threshold"
312+
);
313+
} else if let Some(level) = self.acquire_time_level {
314+
private_tracing_dynamic_event!(
315+
target: "sqlx::pool::acquire",
316+
level,
317+
aquired_after_secs = acquired_after.as_secs_f64(),
318+
"acquired connection"
319+
);
320+
}
321+
322+
Ok(acquired)
290323
}
291324

292325
pub(super) async fn connect(

sqlx-core/src/pool/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ pub use self::maybe::MaybePoolConnection;
230230
/// scheme. However, in a web context, telling a client "go away, maybe try again later" results in
231231
/// a sub-optimal user experience.
232232
///
233-
/// Instead with a connection pool, clients are made to wait in a fair queue for a connection to
233+
/// Instead, with a connection pool, clients are made to wait in a fair queue for a connection to
234234
/// become available; by using a single connection pool for your whole application, you can ensure
235235
/// that you don't exceed the connection limit of your database server while allowing response
236236
/// time to degrade gracefully at high load.

sqlx-core/src/pool/options.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::error::Error;
44
use crate::pool::inner::PoolInner;
55
use crate::pool::Pool;
66
use futures_core::future::BoxFuture;
7+
use log::LevelFilter;
78
use std::fmt::{self, Debug, Formatter};
89
use std::sync::Arc;
910
use std::time::{Duration, Instant};
@@ -74,6 +75,9 @@ pub struct PoolOptions<DB: Database> {
7475
>,
7576
>,
7677
pub(crate) max_connections: u32,
78+
pub(crate) acquire_time_level: LevelFilter,
79+
pub(crate) acquire_slow_level: LevelFilter,
80+
pub(crate) acquire_slow_threshold: Duration,
7781
pub(crate) acquire_timeout: Duration,
7882
pub(crate) min_connections: u32,
7983
pub(crate) max_lifetime: Option<Duration>,
@@ -94,6 +98,9 @@ impl<DB: Database> Clone for PoolOptions<DB> {
9498
before_acquire: self.before_acquire.clone(),
9599
after_release: self.after_release.clone(),
96100
max_connections: self.max_connections,
101+
acquire_time_level: self.acquire_time_level,
102+
acquire_slow_threshold: self.acquire_slow_threshold,
103+
acquire_slow_level: self.acquire_slow_level,
97104
acquire_timeout: self.acquire_timeout,
98105
min_connections: self.min_connections,
99106
max_lifetime: self.max_lifetime,
@@ -143,6 +150,13 @@ impl<DB: Database> PoolOptions<DB> {
143150
// A production application will want to set a higher limit than this.
144151
max_connections: 10,
145152
min_connections: 0,
153+
// Logging all acquires is opt-in
154+
acquire_time_level: LevelFilter::Off,
155+
// Default to warning, because an acquire timeout will be an error
156+
acquire_slow_level: LevelFilter::Warn,
157+
// Fast enough to catch problems (e.g. a full pool); slow enough
158+
// to not flag typical time to add a new connection to a pool.
159+
acquire_slow_threshold: Duration::from_secs(2),
146160
acquire_timeout: Duration::from_secs(30),
147161
idle_timeout: Some(Duration::from_secs(10 * 60)),
148162
max_lifetime: Some(Duration::from_secs(30 * 60)),
@@ -198,6 +212,39 @@ impl<DB: Database> PoolOptions<DB> {
198212
self.min_connections
199213
}
200214

215+
/// Enable logging of time taken to acquire a connection from the connection pool via
216+
/// [`Pool::acquire()`].
217+
///
218+
/// If slow acquire logging is also enabled, this level is used for acquires that are not
219+
/// considered slow.
220+
pub fn acquire_time_level(mut self, level: LevelFilter) -> Self {
221+
self.acquire_time_level = level;
222+
self
223+
}
224+
225+
/// Log excessive time taken to acquire a connection at a different log level than time taken
226+
/// for faster connection acquires via [`Pool::acquire()`].
227+
pub fn acquire_slow_level(mut self, level: LevelFilter) -> Self {
228+
self.acquire_slow_level = level;
229+
self
230+
}
231+
232+
/// Set a threshold for reporting excessive time taken to acquire a connection from
233+
/// the connection pool via [`Pool::acquire()`]. When the threshold is exceeded, a warning is logged.
234+
///
235+
/// Defaults to a value that should not typically be exceeded by the pool enlarging
236+
/// itself with an additional new connection.
237+
pub fn acquire_slow_threshold(mut self, threshold: Duration) -> Self {
238+
self.acquire_slow_threshold = threshold;
239+
self
240+
}
241+
242+
/// Get the threshold for reporting excessive time taken to acquire a connection via
243+
/// [`Pool::acquire()`].
244+
pub fn get_acquire_slow_threshold(&self) -> Duration {
245+
self.acquire_slow_threshold
246+
}
247+
201248
/// Set the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
202249
///
203250
/// Caps the total amount of time `Pool::acquire()` can spend waiting across multiple phases:
@@ -269,7 +316,7 @@ impl<DB: Database> PoolOptions<DB> {
269316
self
270317
}
271318

272-
/// Get's whether `test_before_acquire` is currently set.
319+
/// Get whether `test_before_acquire` is currently set.
273320
pub fn get_test_before_acquire(&self) -> bool {
274321
self.test_before_acquire
275322
}

0 commit comments

Comments
 (0)