Skip to content

refactor(client): simplify pool idle task with async/await #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 30 additions & 44 deletions src/client/legacy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use futures_channel::oneshot;
use futures_core::ready;
use tracing::{debug, trace};

use hyper::rt::Sleep;
use hyper::rt::Timer as _;

use crate::common::{exec, exec::Exec, timer::Timer};
Expand Down Expand Up @@ -440,13 +439,11 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
let interval = IdleTask {
timer: timer.clone(),
duration: dur,
deadline: Instant::now(),
fut: timer.sleep_until(Instant::now()), // ready at first tick
pool: WeakOpt::downgrade(pool_ref),
pool_drop_notifier: rx,
};

self.exec.execute(interval);
self.exec.execute(interval.run());
}
}

Expand Down Expand Up @@ -766,55 +763,44 @@ impl Expiration {
}
}

pin_project_lite::pin_project! {
struct IdleTask<T, K: Key> {
timer: Timer,
duration: Duration,
deadline: Instant,
fut: Pin<Box<dyn Sleep>>,
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
#[pin]
pool_drop_notifier: oneshot::Receiver<Infallible>,
}
struct IdleTask<T, K: Key> {
timer: Timer,
duration: Duration,
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
pool_drop_notifier: oneshot::Receiver<Infallible>,
}

impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
type Output = ();
impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
async fn run(self) {
use futures_util::future;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut sleep = self.timer.sleep_until(Instant::now() + self.duration);
let mut on_pool_drop = self.pool_drop_notifier;
loop {
match this.pool_drop_notifier.as_mut().poll(cx) {
Poll::Ready(Ok(n)) => match n {},
Poll::Pending => (),
Poll::Ready(Err(_canceled)) => {
trace!("pool closed, canceling idle interval");
return Poll::Ready(());
match future::select(&mut on_pool_drop, &mut sleep).await {
future::Either::Left(_) => {
// pool dropped, bah-bye
break;
}
}

ready!(Pin::new(&mut this.fut).poll(cx));
// Set this task to run after the next deadline
// If the poll missed the deadline by a lot, set the deadline
// from the current time instead
*this.deadline += *this.duration;
if *this.deadline < Instant::now() - Duration::from_millis(5) {
*this.deadline = Instant::now() + *this.duration;
}
*this.fut = this.timer.sleep_until(*this.deadline);
future::Either::Right(((), _)) => {
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
trace!("idle interval checking for expired");
inner.clear_expired();
}
}

if let Some(inner) = this.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
trace!("idle interval checking for expired");
inner.clear_expired();
continue;
let deadline = Instant::now() + self.duration;
self.timer.reset(&mut sleep, deadline);
}
}
return Poll::Ready(());
}

trace!("pool closed, canceling idle interval");
return;
}
}

Expand Down
Loading