Skip to content

Reduce futures-util #238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ description = "An async extension for Diesel the safe, extensible ORM and Query
rust-version = "1.82.0"

[dependencies]
futures-core = "0.3.17"
futures-channel = { version = "0.3.17", default-features = false, features = [
"std",
"sink",
], optional = true }
futures-util = { version = "0.3.17", default-features = false, features = [
"std",
"alloc",
"sink",
] }
tokio-postgres = { version = "0.7.10", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/async_connection_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
//! as replacement for the existing connection
//! implementations provided by diesel

use futures_util::Future;
use futures_util::Stream;
use futures_core::Stream;
use futures_util::StreamExt;
use std::future::Future;
use std::pin::Pin;

/// This is a helper trait that allows to customize the
Expand Down
10 changes: 6 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ use diesel::connection::{CacheSize, Instrumentation};
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
use diesel::row::Row;
use diesel::{ConnectionResult, QueryResult};
use futures_util::future::BoxFuture;
use futures_util::{Future, FutureExt, Stream};
use futures_core::future::BoxFuture;
use futures_core::Stream;
use futures_util::FutureExt;
use std::fmt::Debug;
use std::future::Future;

pub use scoped_futures;
use scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
Expand Down Expand Up @@ -322,15 +324,15 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
.map_err(|_| diesel::result::Error::RollbackTransaction)
.and_then(move |r| {
let _ = user_result_tx.send(r);
futures_util::future::ready(Err(diesel::result::Error::RollbackTransaction))
std::future::ready(Err(diesel::result::Error::RollbackTransaction))
})
.scope_boxed()
})
.then(move |_r| {
let r = user_result_rx
.try_recv()
.expect("Transaction did not succeed");
futures_util::future::ready(r)
std::future::ready(r)
})
}

Expand Down
8 changes: 5 additions & 3 deletions src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use diesel::query_builder::QueryBuilder;
use diesel::query_builder::{bind_collector::RawBytesBindCollector, QueryFragment, QueryId};
use diesel::result::{ConnectionError, ConnectionResult};
use diesel::QueryResult;
use futures_util::future::BoxFuture;
use futures_util::stream::{self, BoxStream};
use futures_util::{Future, FutureExt, StreamExt, TryStreamExt};
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::stream;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use mysql_async::prelude::Queryable;
use mysql_async::{Opts, OptsBuilder, Statement};
use std::future::Future;

mod error_helper;
mod row;
Expand Down
13 changes: 8 additions & 5 deletions src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use diesel::query_builder::bind_collector::RawBytesBindCollector;
use diesel::query_builder::{AsQuery, QueryBuilder, QueryFragment, QueryId};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::{ConnectionError, ConnectionResult, QueryResult};
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::future::Either;
use futures_util::stream::{BoxStream, TryStreamExt};
use futures_util::stream::TryStreamExt;
use futures_util::TryFutureExt;
use futures_util::{Future, FutureExt, StreamExt};
use futures_util::{FutureExt, StreamExt};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -443,10 +445,11 @@ impl AsyncPgConnection {
async fn set_config_options(&mut self) -> QueryResult<()> {
use crate::run_query_dsl::RunQueryDsl;

futures_util::try_join!(
futures_util::future::try_join(
diesel::sql_query("SET TIME ZONE 'UTC'").execute(self),
diesel::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self),
)?;
)
.await?;
Ok(())
}

Expand Down
9 changes: 4 additions & 5 deletions src/pooled_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{TransactionManager, UpdateAndFetchResults};
use diesel::associations::HasTable;
use diesel::connection::{CacheSize, Instrumentation};
use diesel::QueryResult;
use futures_util::future::BoxFuture;
use futures_util::{future, FutureExt};
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use std::borrow::Cow;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -47,11 +47,10 @@ impl std::error::Error for PoolError {}

/// Type of the custom setup closure passed to [`ManagerConfig::custom_setup`]
pub type SetupCallback<C> =
Box<dyn Fn(&str) -> future::BoxFuture<diesel::ConnectionResult<C>> + Send + Sync>;
Box<dyn Fn(&str) -> BoxFuture<diesel::ConnectionResult<C>> + Send + Sync>;

/// Type of the recycle check callback for the [`RecyclingMethod::CustomFunction`] variant
pub type RecycleCheckCallback<C> =
dyn Fn(&mut C) -> future::BoxFuture<QueryResult<()>> + Send + Sync;
pub type RecycleCheckCallback<C> = dyn Fn(&mut C) -> BoxFuture<QueryResult<()>> + Send + Sync;

/// Possible methods of how a connection is recycled.
#[derive(Default)]
Expand Down
11 changes: 6 additions & 5 deletions src/run_query_dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use diesel::associations::HasTable;
use diesel::query_builder::IntoUpdateTarget;
use diesel::result::QueryResult;
use diesel::AsChangeset;
use futures_util::future::BoxFuture;
use futures_util::{future, stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures_core::future::BoxFuture;
use futures_core::Stream;
use futures_util::{future, stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use std::future::Future;
use std::pin::Pin;

Expand Down Expand Up @@ -398,7 +399,7 @@ pub trait RunQueryDsl<Conn>: Sized {
/// .await?
/// .try_fold(Vec::new(), |mut acc, item| {
/// acc.push(item);
/// futures_util::future::ready(Ok(acc))
/// std::future::ready(Ok(acc))
/// })
/// .await?;
/// assert_eq!(vec!["Sean", "Tess"], data);
Expand Down Expand Up @@ -427,7 +428,7 @@ pub trait RunQueryDsl<Conn>: Sized {
/// .await?
/// .try_fold(Vec::new(), |mut acc, item| {
/// acc.push(item);
/// futures_util::future::ready(Ok(acc))
/// std::future::ready(Ok(acc))
/// })
/// .await?;
/// let expected_data = vec![
Expand Down Expand Up @@ -467,7 +468,7 @@ pub trait RunQueryDsl<Conn>: Sized {
/// .await?
/// .try_fold(Vec::new(), |mut acc, item| {
/// acc.push(item);
/// futures_util::future::ready(Ok(acc))
/// std::future::ready(Ok(acc))
/// })
/// .await?;
/// let expected_data = vec![
Expand Down
18 changes: 10 additions & 8 deletions src/stmt_cache.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use diesel::connection::statement_cache::{MaybeCached, StatementCallbackReturnType};
use diesel::QueryResult;
use futures_util::{future, FutureExt, TryFutureExt};
use std::future::Future;
use futures_core::future::BoxFuture;
use futures_util::future::Either;
use futures_util::{FutureExt, TryFutureExt};
use std::future::{self, Future};

pub(crate) struct CallbackHelper<F>(pub(crate) F);

type PrepareFuture<'a, C, S> = future::Either<
type PrepareFuture<'a, C, S> = Either<
future::Ready<QueryResult<(MaybeCached<'a, S>, C)>>,
future::BoxFuture<'a, QueryResult<(MaybeCached<'a, S>, C)>>,
BoxFuture<'a, QueryResult<(MaybeCached<'a, S>, C)>>,
>;

impl<S, F, C> StatementCallbackReturnType<S, C> for CallbackHelper<F>
Expand All @@ -18,22 +20,22 @@ where
type Return<'a> = PrepareFuture<'a, C, S>;

fn from_error<'a>(e: diesel::result::Error) -> Self::Return<'a> {
future::Either::Left(future::ready(Err(e)))
Either::Left(future::ready(Err(e)))
}

fn map_to_no_cache<'a>(self) -> Self::Return<'a>
where
Self: 'a,
{
future::Either::Right(
Either::Right(
self.0
.map_ok(|(stmt, conn)| (MaybeCached::CannotCache(stmt), conn))
.boxed(),
)
}

fn map_to_cache(stmt: &mut S, conn: C) -> Self::Return<'_> {
future::Either::Left(future::ready(Ok((MaybeCached::Cached(stmt), conn))))
Either::Left(future::ready(Ok((MaybeCached::Cached(stmt), conn))))
}

fn register_cache<'a>(
Expand All @@ -43,7 +45,7 @@ where
where
Self: 'a,
{
future::Either::Right(
Either::Right(
self.0
.map_ok(|(stmt, conn)| (MaybeCached::Cached(callback(stmt)), conn))
.boxed(),
Expand Down
4 changes: 2 additions & 2 deletions src/sync_connection_wrapper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! * using a sync Connection implementation in async context
//! * using the same code base for async crates needing multiple backends
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use std::error::Error;

#[cfg(feature = "sqlite")]
Expand Down Expand Up @@ -100,7 +100,7 @@ mod implementation {
};
use diesel::row::IntoOwnedRow;
use diesel::{ConnectionResult, QueryResult};
use futures_util::stream::BoxStream;
use futures_core::stream::BoxStream;
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
Expand Down
Loading