@@ -17,7 +17,7 @@ use crate::rt::JoinHandle;
17
17
use crate :: { private_tracing_dynamic_event, rt} ;
18
18
use either:: Either ;
19
19
use futures_util:: future:: { self , OptionFuture } ;
20
- use futures_util:: FutureExt ;
20
+ use futures_util:: { select , FutureExt } ;
21
21
use std:: time:: { Duration , Instant } ;
22
22
use tracing:: Level ;
23
23
@@ -75,12 +75,18 @@ impl<DB: Database> PoolInner<DB> {
75
75
pub ( super ) fn close < ' a > ( self : & ' a Arc < Self > ) -> impl Future < Output = ( ) > + ' a {
76
76
self . mark_closed ( ) ;
77
77
78
+ // Keep clearing the idle queue as connections are released until the count reaches zero.
78
79
async move {
79
- while let Some ( idle) = self . idle . try_acquire ( self ) {
80
- idle. close ( ) . await ;
80
+ let mut drained = pin ! ( self . counter. drain( ) ) ;
81
+
82
+ loop {
83
+ select ! {
84
+ idle = self . idle. acquire( self ) => {
85
+ idle. close( ) . await ;
86
+ } ,
87
+ ( ) = drained. as_mut( ) => break ,
88
+ }
81
89
}
82
-
83
- self . counter . drain ( ) . await ;
84
90
}
85
91
}
86
92
@@ -116,7 +122,7 @@ impl<DB: Database> PoolInner<DB> {
116
122
let acquire_started_at = Instant :: now ( ) ;
117
123
118
124
let mut close_event = pin ! ( self . close_event( ) ) ;
119
- let mut deadline = pin ! ( crate :: rt:: sleep( self . options. acquire_timeout) ) ;
125
+ let mut deadline = pin ! ( rt:: sleep( self . options. acquire_timeout) ) ;
120
126
let mut acquire_idle = pin ! ( self . idle. acquire( self ) . fuse( ) ) ;
121
127
let mut before_acquire = OptionFuture :: from ( None ) ;
122
128
let mut acquire_connect_permit = pin ! ( OptionFuture :: from( Some (
@@ -130,6 +136,9 @@ impl<DB: Database> PoolInner<DB> {
130
136
// * If we acquire a `ConnectPermit`, we begin the connection loop (with backoff)
131
137
// as implemented by `DynConnector`.
132
138
// * If we acquire an idle connection, we then start polling `check_idle_conn()`.
139
+ //
140
+ // This doesn't quite fit into `select!{}` because the set of futures that may be polled
141
+ // at a given time is dynamic, so it's actually simpler to hand-roll it.
133
142
let acquired = future:: poll_fn ( |cx| {
134
143
use std:: task:: Poll :: * ;
135
144
0 commit comments