Skip to content

Commit dd63305

Browse files
authored
refactor(client): simplify pool idle task with async/await (#216)
1 parent afd758b commit dd63305

File tree

1 file changed

+30
-44
lines changed

1 file changed

+30
-44
lines changed

src/client/legacy/pool.rs

Lines changed: 30 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use futures_channel::oneshot;
1717
use futures_core::ready;
1818
use tracing::{debug, trace};
1919

20-
use hyper::rt::Sleep;
2120
use hyper::rt::Timer as _;
2221

2322
use crate::common::{exec, exec::Exec, timer::Timer};
@@ -440,13 +439,11 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
440439
let interval = IdleTask {
441440
timer: timer.clone(),
442441
duration: dur,
443-
deadline: Instant::now(),
444-
fut: timer.sleep_until(Instant::now()), // ready at first tick
445442
pool: WeakOpt::downgrade(pool_ref),
446443
pool_drop_notifier: rx,
447444
};
448445

449-
self.exec.execute(interval);
446+
self.exec.execute(interval.run());
450447
}
451448
}
452449

@@ -766,55 +763,44 @@ impl Expiration {
766763
}
767764
}
768765

769-
pin_project_lite::pin_project! {
770-
struct IdleTask<T, K: Key> {
771-
timer: Timer,
772-
duration: Duration,
773-
deadline: Instant,
774-
fut: Pin<Box<dyn Sleep>>,
775-
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
776-
// This allows the IdleTask to be notified as soon as the entire
777-
// Pool is fully dropped, and shutdown. This channel is never sent on,
778-
// but Err(Canceled) will be received when the Pool is dropped.
779-
#[pin]
780-
pool_drop_notifier: oneshot::Receiver<Infallible>,
781-
}
766+
struct IdleTask<T, K: Key> {
767+
timer: Timer,
768+
duration: Duration,
769+
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
770+
// This allows the IdleTask to be notified as soon as the entire
771+
// Pool is fully dropped, and shutdown. This channel is never sent on,
772+
// but Err(Canceled) will be received when the Pool is dropped.
773+
pool_drop_notifier: oneshot::Receiver<Infallible>,
782774
}
783775

784-
impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
785-
type Output = ();
776+
impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
777+
async fn run(self) {
778+
use futures_util::future;
786779

787-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
788-
let mut this = self.project();
780+
let mut sleep = self.timer.sleep_until(Instant::now() + self.duration);
781+
let mut on_pool_drop = self.pool_drop_notifier;
789782
loop {
790-
match this.pool_drop_notifier.as_mut().poll(cx) {
791-
Poll::Ready(Ok(n)) => match n {},
792-
Poll::Pending => (),
793-
Poll::Ready(Err(_canceled)) => {
794-
trace!("pool closed, canceling idle interval");
795-
return Poll::Ready(());
783+
match future::select(&mut on_pool_drop, &mut sleep).await {
784+
future::Either::Left(_) => {
785+
// pool dropped, bah-bye
786+
break;
796787
}
797-
}
798-
799-
ready!(Pin::new(&mut this.fut).poll(cx));
800-
// Set this task to run after the next deadline
801-
// If the poll missed the deadline by a lot, set the deadline
802-
// from the current time instead
803-
*this.deadline += *this.duration;
804-
if *this.deadline < Instant::now() - Duration::from_millis(5) {
805-
*this.deadline = Instant::now() + *this.duration;
806-
}
807-
*this.fut = this.timer.sleep_until(*this.deadline);
788+
future::Either::Right(((), _)) => {
789+
if let Some(inner) = self.pool.upgrade() {
790+
if let Ok(mut inner) = inner.lock() {
791+
trace!("idle interval checking for expired");
792+
inner.clear_expired();
793+
}
794+
}
808795

809-
if let Some(inner) = this.pool.upgrade() {
810-
if let Ok(mut inner) = inner.lock() {
811-
trace!("idle interval checking for expired");
812-
inner.clear_expired();
813-
continue;
796+
let deadline = Instant::now() + self.duration;
797+
self.timer.reset(&mut sleep, deadline);
814798
}
815799
}
816-
return Poll::Ready(());
817800
}
801+
802+
trace!("pool closed, canceling idle interval");
803+
return;
818804
}
819805
}
820806

0 commit comments

Comments
 (0)