File tree Expand file tree Collapse file tree 3 files changed +21
-11
lines changed Expand file tree Collapse file tree 3 files changed +21
-11
lines changed Original file line number Diff line number Diff line change @@ -6,7 +6,7 @@ use crate::pool::PoolConnection;
6
6
use crate :: rt:: JoinHandle ;
7
7
use crate :: Error ;
8
8
use ease_off:: EaseOff ;
9
- use event_listener:: Event ;
9
+ use event_listener:: { listener , Event } ;
10
10
use std:: fmt:: { Display , Formatter } ;
11
11
use std:: future:: Future ;
12
12
use std:: ptr;
@@ -318,7 +318,8 @@ impl ConnectionCounter {
318
318
319
319
pub async fn drain ( & self ) {
320
320
while self . count . load ( Ordering :: Acquire ) > 0 {
321
- self . connect_available . listen ( ) . await ;
321
+ listener ! ( self . connect_available => permit_released) ;
322
+ permit_released. await ;
322
323
}
323
324
}
324
325
@@ -386,13 +387,14 @@ impl ConnectionCounter {
386
387
return acquired;
387
388
}
388
389
389
- self . connect_available . listen ( ) . await ;
390
-
391
390
if attempt == 2 {
392
391
tracing:: warn!(
393
392
"unable to acquire a connect permit after sleeping; this may indicate a bug"
394
393
) ;
395
394
}
395
+
396
+ listener ! ( self . connect_available => connect_available) ;
397
+ connect_available. await ;
396
398
}
397
399
398
400
panic ! ( "BUG: was never able to acquire a connection despite waking many times" )
Original file line number Diff line number Diff line change @@ -8,6 +8,8 @@ use futures_util::FutureExt;
8
8
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
9
9
use std:: sync:: Arc ;
10
10
11
+ use event_listener:: listener;
12
+
11
13
pub struct IdleQueue < DB : Database > {
12
14
queue : ArrayQueue < Idle < DB > > ,
13
15
// Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers
@@ -36,7 +38,8 @@ impl<DB: Database> IdleQueue<DB> {
36
38
37
39
for attempt in 1usize .. {
38
40
if should_wait {
39
- self . release_event . listen ( ) . await ;
41
+ listener ! ( self . release_event => release_event) ;
42
+ release_event. await ;
40
43
}
41
44
42
45
if let Some ( conn) = self . try_acquire ( pool) {
Original file line number Diff line number Diff line change @@ -17,7 +17,7 @@ use crate::rt::JoinHandle;
17
17
use crate :: { private_tracing_dynamic_event, rt} ;
18
18
use either:: Either ;
19
19
use futures_util:: future:: { self , OptionFuture } ;
20
- use futures_util:: { select , FutureExt } ;
20
+ use futures_util:: { FutureExt } ;
21
21
use std:: time:: { Duration , Instant } ;
22
22
use tracing:: Level ;
23
23
@@ -77,14 +77,19 @@ impl<DB: Database> PoolInner<DB> {
77
77
78
78
// Keep clearing the idle queue as connections are released until the count reaches zero.
79
79
async move {
80
- let mut drained = pin ! ( self . counter. drain( ) ) . fuse ( ) ;
80
+ let mut drained = pin ! ( self . counter. drain( ) ) ;
81
81
82
82
loop {
83
- select ! {
84
- idle = self . idle. acquire( self ) => {
83
+ let mut acquire_idle = pin ! ( self . idle. acquire( self ) ) ;
84
+
85
+ // Not using `futures::select!{}` here because it requires a proc-macro dep,
86
+ // and frankly it's a little broken.
87
+ match future:: select ( drained. as_mut ( ) , acquire_idle. as_mut ( ) ) . await {
88
+ // *not* `either::Either`; they rolled their own
89
+ future:: Either :: Left ( _) => break ,
90
+ future:: Either :: Right ( ( idle, _) ) => {
85
91
idle. close ( ) . await ;
86
- } ,
87
- ( ) = drained. as_mut( ) => break ,
92
+ }
88
93
}
89
94
}
90
95
}
You can’t perform that action at this time.
0 commit comments