@@ -2,6 +2,7 @@ use conduit::RequestExt;
2
2
use diesel:: prelude:: * ;
3
3
use diesel:: r2d2:: { self , ConnectionManager , CustomizeConnection } ;
4
4
use parking_lot:: { ReentrantMutex , ReentrantMutexGuard } ;
5
+ use prometheus:: Histogram ;
5
6
use std:: sync:: Arc ;
6
7
use std:: { ops:: Deref , time:: Duration } ;
7
8
use url:: Url ;
@@ -10,14 +11,18 @@ use crate::middleware::app::RequestApp;
10
11
11
12
#[ derive( Clone ) ]
12
13
pub enum DieselPool {
13
- Pool ( r2d2:: Pool < ConnectionManager < PgConnection > > ) ,
14
+ Pool {
15
+ pool : r2d2:: Pool < ConnectionManager < PgConnection > > ,
16
+ time_to_obtain_connection_metric : Histogram ,
17
+ } ,
14
18
Test ( Arc < ReentrantMutex < PgConnection > > ) ,
15
19
}
16
20
17
21
impl DieselPool {
18
22
pub ( crate ) fn new (
19
23
url : & str ,
20
24
config : r2d2:: Builder < ConnectionManager < PgConnection > > ,
25
+ time_to_obtain_connection_metric : Histogram ,
21
26
) -> Result < DieselPool , PoolError > {
22
27
let manager = ConnectionManager :: new ( connection_url ( url) ) ;
23
28
@@ -32,7 +37,10 @@ impl DieselPool {
32
37
// serving errors for the first connections until the pool is initialized) and if we can't
33
38
// establish any connection continue booting up the application. The database pool will
34
39
// automatically be marked as unhealthy and the rest of the application will adapt.
35
- let pool = DieselPool :: Pool ( config. build_unchecked ( manager) ) ;
40
+ let pool = DieselPool :: Pool {
41
+ pool : config. build_unchecked ( manager) ,
42
+ time_to_obtain_connection_metric,
43
+ } ;
36
44
match pool. wait_until_healthy ( Duration :: from_secs ( 5 ) ) {
37
45
Ok ( ( ) ) => { }
38
46
Err ( PoolError :: UnhealthyPool ) => { }
@@ -52,22 +60,25 @@ impl DieselPool {
52
60
53
61
pub fn get ( & self ) -> Result < DieselPooledConn < ' _ > , PoolError > {
54
62
match self {
55
- DieselPool :: Pool ( pool) => {
63
+ DieselPool :: Pool {
64
+ pool,
65
+ time_to_obtain_connection_metric,
66
+ } => time_to_obtain_connection_metric. observe_closure_duration ( || {
56
67
if let Some ( conn) = pool. try_get ( ) {
57
68
Ok ( DieselPooledConn :: Pool ( conn) )
58
69
} else if !self . is_healthy ( ) {
59
70
Err ( PoolError :: UnhealthyPool )
60
71
} else {
61
72
Ok ( DieselPooledConn :: Pool ( pool. get ( ) . map_err ( PoolError :: R2D2 ) ?) )
62
73
}
63
- }
74
+ } ) ,
64
75
DieselPool :: Test ( conn) => Ok ( DieselPooledConn :: Test ( conn. lock ( ) ) ) ,
65
76
}
66
77
}
67
78
68
79
pub fn state ( & self ) -> PoolState {
69
80
match self {
70
- DieselPool :: Pool ( pool) => {
81
+ DieselPool :: Pool { pool, .. } => {
71
82
let state = pool. state ( ) ;
72
83
PoolState {
73
84
connections : state. connections ,
@@ -83,7 +94,7 @@ impl DieselPool {
83
94
84
95
pub fn wait_until_healthy ( & self , timeout : Duration ) -> Result < ( ) , PoolError > {
85
96
match self {
86
- DieselPool :: Pool ( pool) => match pool. get_timeout ( timeout) {
97
+ DieselPool :: Pool { pool, .. } => match pool. get_timeout ( timeout) {
87
98
Ok ( _) => Ok ( ( ) ) ,
88
99
Err ( _) if !self . is_healthy ( ) => Err ( PoolError :: UnhealthyPool ) ,
89
100
Err ( err) => Err ( PoolError :: R2D2 ( err) ) ,
0 commit comments