diff --git a/bb8/Cargo.toml b/bb8/Cargo.toml index fb22788..be18b2c 100644 --- a/bb8/Cargo.toml +++ b/bb8/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "bb8" -version = "0.8.0" -description = "Full-featured async (tokio-based) connection pool (like r2d2)" +version = "0.8.0-rc.1" +description = "[FORK] Full-featured async (tokio-based) connection pool (like r2d2)" license = "MIT" -repository = "https://github.com/djc/bb8" +repository = "https://github.com/gaoqiangz/bb8" edition = "2021" workspace = ".." readme = "../README.md" diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 794d58d..624ca12 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -1,8 +1,10 @@ use std::borrow::Cow; use std::error; use std::fmt; +use std::future::Future; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::time::Duration; use async_trait::async_trait; @@ -253,7 +255,7 @@ impl Builder { self } - fn build_inner(self, manager: M) -> Pool { + fn build_inner(self, manager: M, executor: E) -> Pool { if let Some(min_idle) = self.min_idle { assert!( self.max_size >= min_idle, @@ -262,7 +264,7 @@ impl Builder { } Pool { - inner: PoolInner::new(self, manager), + inner: PoolInner::new(self, manager, executor), } } @@ -271,7 +273,19 @@ impl Builder { /// The `Pool` will not be returned until it has established its configured /// minimum number of connections, or it times out. pub async fn build(self, manager: M) -> Result, M::Error> { - let pool = self.build_inner(manager); + self.build_with_executor(manager, TokioExecutor).await + } + + /// Consumes the builder with the specified executor, returning a new, initialized `Pool`. + /// + /// The `Pool` will not be returned until it has established its configured + /// minimum number of connections, or it times out. + pub async fn build_with_executor( + self, + manager: M, + executor: E, + ) -> Result, M::Error> { + let pool = self.build_inner(manager, executor); pool.inner.start_connections().await.map(|()| pool) } @@ -280,7 +294,15 @@ impl Builder { /// Unlike `build`, this does not wait for any connections to be established /// before returning. pub fn build_unchecked(self, manager: M) -> Pool { - let p = self.build_inner(manager); + self.build_unchecked_with_executor(manager, TokioExecutor) + } + + /// Consumes the builder with the specified executor, returning a new, initialized `Pool`. + /// + /// Unlike `build`, this does not wait for any connections to be established + /// before returning. + pub fn build_unchecked_with_executor(self, manager: M, executor: E) -> Pool { + let p = self.build_inner(manager, executor); p.inner.spawn_start_connections(); p } @@ -456,3 +478,17 @@ impl ErrorSink for NopErrorSink { Box::new(*self) } } + +/// An executor of futures. +pub trait Executor: Sync + Send + 'static { + /// Place the future into the executor to be run. + fn execute(&self, fut: Pin + Send>>); +} + +struct TokioExecutor; + +impl Executor for TokioExecutor { + fn execute(&self, fut: Pin + Send>>) { + tokio::spawn(fut); + } +} diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 71f6ab8..0597ba1 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -3,14 +3,13 @@ use std::fmt; use std::future::Future; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use tokio::time::{interval_at, sleep, timeout, Interval}; use futures_channel::oneshot; use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::TryFutureExt; -use tokio::spawn; -use tokio::time::{interval_at, sleep, timeout, Interval}; -use crate::api::{Builder, ManageConnection, PooledConnection, RunError}; +use crate::api::{Builder, Executor, ManageConnection, PooledConnection, RunError}; use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State}; pub(crate) struct PoolInner @@ -24,15 +23,15 @@ impl PoolInner where M: ManageConnection + Send, { - pub(crate) fn new(builder: Builder, manager: M) -> Self { - let inner = Arc::new(SharedPool::new(builder, manager)); + pub(crate) fn new(builder: Builder, manager: M, executor: E) -> Self { + let inner = Arc::new(SharedPool::new(builder, manager, executor)); if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() { let s = Arc::downgrade(&inner); if let Some(shared) = s.upgrade() { let start = Instant::now() + shared.statics.reaper_rate; let interval = interval_at(start.into(), shared.statics.reaper_rate); - schedule_reaping(interval, s); + schedule_reaping(inner.executor.as_ref(), interval, s); } } @@ -59,7 +58,7 @@ where } let this = self.clone(); - spawn(async move { + self.inner.executor.execute(Box::pin(async move { let mut stream = this.replenish_idle_connections(approvals); while let Some(result) = stream.next().await { match result { @@ -67,7 +66,7 @@ where Err(e) => this.inner.statics.error_sink.sink(e), } } - }); + })); } fn replenish_idle_connections( @@ -254,11 +253,14 @@ where } } -fn schedule_reaping(mut interval: Interval, weak_shared: Weak>) -where +fn schedule_reaping( + executor: &dyn Executor, + mut interval: Interval, + weak_shared: Weak>, +) where M: ManageConnection, { - spawn(async move { + executor.execute(Box::pin(async move { loop { let _ = interval.tick().await; if let Some(inner) = weak_shared.upgrade() { @@ -267,5 +269,5 @@ where break; } } - }); + })); } diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index b9a63db..0d2529f 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -1,12 +1,12 @@ use std::cmp::min; +use std::collections::VecDeque; use std::sync::Arc; use std::time::Instant; use futures_channel::oneshot; use parking_lot::Mutex; -use crate::api::{Builder, ManageConnection}; -use std::collections::VecDeque; +use crate::api::{Builder, Executor, ManageConnection}; /// The guts of a `Pool`. #[allow(missing_debug_implementations)] @@ -15,6 +15,7 @@ where M: ManageConnection + Send, { pub(crate) statics: Builder, + pub(crate) executor: Box, pub(crate) manager: M, pub(crate) internals: Mutex>, } @@ -23,11 +24,12 @@ impl SharedPool where M: ManageConnection + Send, { - pub(crate) fn new(statics: Builder, manager: M) -> Self { + pub(crate) fn new(statics: Builder, manager: M, executor: E) -> Self { Self { statics, manager, internals: Mutex::new(PoolInternals::default()), + executor: Box::new(executor), } } } diff --git a/bb8/src/lib.rs b/bb8/src/lib.rs index b85f910..37526f9 100644 --- a/bb8/src/lib.rs +++ b/bb8/src/lib.rs @@ -35,7 +35,7 @@ mod api; pub use api::{ - Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool, + Builder, CustomizeConnection, ErrorSink, Executor, ManageConnection, NopErrorSink, Pool, PooledConnection, RunError, State, };