Skip to content

Commit 28ed854

Browse files
committed
Extract Connection::open into a new trait Connect::connect
1 parent 3183413 commit 28ed854

File tree

9 files changed

+150
-105
lines changed

9 files changed

+150
-105
lines changed

sqlx-core/src/connection.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,6 @@ use std::convert::TryInto;
99
/// Prefer running queries from [Pool] unless there is a specific need for a single, continuous
1010
/// connection.
1111
pub trait Connection: Executor + Send + 'static {
12-
/// Establish a new database connection.
13-
fn open<T>(url: T) -> BoxFuture<'static, crate::Result<Self>>
14-
where
15-
T: TryInto<Url, Error = crate::Error>,
16-
Self: Sized;
17-
1812
/// Close this database connection.
1913
fn close(self) -> BoxFuture<'static, crate::Result<()>>;
2014

@@ -23,3 +17,14 @@ pub trait Connection: Executor + Send + 'static {
2317
Box::pin(self.execute("SELECT 1", Default::default()).map_ok(|_| ()))
2418
}
2519
}
20+
21+
/// Represents a type that can directly establish a new connection.
22+
pub trait Connect {
23+
type Connection: Connection;
24+
25+
/// Establish a new database connection.
26+
fn connect<T>(url: T) -> BoxFuture<'static, crate::Result<Self::Connection>>
27+
where
28+
T: TryInto<Url, Error = crate::Error>,
29+
Self: Sized;
30+
}

sqlx-core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use database::Database;
4343
#[doc(inline)]
4444
pub use error::{Error, Result};
4545

46-
pub use connection::Connection;
46+
pub use connection::{Connect, Connection};
4747
pub use executor::Executor;
4848
pub use query::{query, Query};
4949
pub use query_as::{query_as, QueryAs};

sqlx-core/src/mysql/connection.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,15 +591,30 @@ impl MySqlConnection {
591591
}
592592
}
593593

594-
impl Connection for MySqlConnection {
595-
fn open<T>(url: T) -> BoxFuture<'static, crate::Result<Self>>
594+
impl MySqlConnection {
595+
#[deprecated(note = "please use 'connect' instead")]
596+
pub fn open<T>(url: T) -> BoxFuture<'static, crate::Result<Self>>
596597
where
597598
T: TryInto<Url, Error = crate::Error>,
598599
Self: Sized,
599600
{
600601
Box::pin(MySqlConnection::open(url.try_into()))
601602
}
603+
}
604+
605+
impl Connect for MySqlConnection {
606+
type Connection = MySqlConnection;
607+
608+
fn connect<T>(url: T) -> BoxFuture<'static, Result<MySqlConnection>>
609+
where
610+
T: TryInto<Url, Error = crate::Error>,
611+
Self: Sized,
612+
{
613+
Box::pin(PgConnection::open(url.try_into()))
614+
}
615+
}
602616

617+
impl Connection for MySqlConnection {
603618
fn close(self) -> BoxFuture<'static, crate::Result<()>> {
604619
Box::pin(self.close())
605620
}

sqlx-core/src/pool/executor.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,40 @@
11
use futures_core::{future::BoxFuture, stream::BoxStream};
22
use futures_util::StreamExt;
33

4-
use crate::{describe::Describe, executor::Executor, pool::Pool, Database};
4+
use crate::{
5+
connection::{Connect, Connection},
6+
describe::Describe,
7+
executor::Executor,
8+
pool::Pool,
9+
Database,
10+
};
511

6-
impl<DB> Executor for Pool<DB>
12+
impl<C> Executor for Pool<C>
713
where
8-
DB: Database,
14+
C: Connection + Connect<Connection = C>,
915
{
10-
type Database = DB;
16+
type Database = <C as Executor>::Database;
1117

1218
fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
13-
Box::pin(async move { <&Pool<DB> as Executor>::send(&mut &*self, commands).await })
19+
Box::pin(async move { <&Pool<C> as Executor>::send(&mut &*self, commands).await })
1420
}
1521

1622
fn execute<'e, 'q: 'e>(
1723
&'e mut self,
1824
query: &'q str,
19-
args: DB::Arguments,
25+
args: <<C as Executor>::Database as Database>::Arguments,
2026
) -> BoxFuture<'e, crate::Result<u64>> {
21-
Box::pin(async move { <&Pool<DB> as Executor>::execute(&mut &*self, query, args).await })
27+
Box::pin(async move { <&Pool<C> as Executor>::execute(&mut &*self, query, args).await })
2228
}
2329

2430
fn fetch<'e, 'q: 'e>(
2531
&'e mut self,
2632
query: &'q str,
27-
args: DB::Arguments,
28-
) -> BoxStream<'e, crate::Result<DB::Row>> {
33+
args: <<C as Executor>::Database as Database>::Arguments,
34+
) -> BoxStream<'e, crate::Result<<<C as Executor>::Database as Database>::Row>> {
2935
Box::pin(async_stream::try_stream! {
3036
let mut self_ = &*self;
31-
let mut s = <&Pool<DB> as Executor>::fetch(&mut self_, query, args);
37+
let mut s = <&Pool<C> as Executor>::fetch(&mut self_, query, args);
3238

3339
while let Some(row) = s.next().await.transpose()? {
3440
yield row;
@@ -39,26 +45,26 @@ where
3945
fn fetch_optional<'e, 'q: 'e>(
4046
&'e mut self,
4147
query: &'q str,
42-
args: DB::Arguments,
43-
) -> BoxFuture<'e, crate::Result<Option<DB::Row>>> {
48+
args: <<C as Executor>::Database as Database>::Arguments,
49+
) -> BoxFuture<'e, crate::Result<Option<<<C as Executor>::Database as Database>::Row>>> {
4450
Box::pin(
45-
async move { <&Pool<DB> as Executor>::fetch_optional(&mut &*self, query, args).await },
51+
async move { <&Pool<C> as Executor>::fetch_optional(&mut &*self, query, args).await },
4652
)
4753
}
4854

4955
fn describe<'e, 'q: 'e>(
5056
&'e mut self,
5157
query: &'q str,
5258
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
53-
Box::pin(async move { <&Pool<DB> as Executor>::describe(&mut &*self, query).await })
59+
Box::pin(async move { <&Pool<C> as Executor>::describe(&mut &*self, query).await })
5460
}
5561
}
5662

57-
impl<DB> Executor for &'_ Pool<DB>
63+
impl<C> Executor for &'_ Pool<C>
5864
where
59-
DB: Database,
65+
C: Connection + Connect<Connection = C>,
6066
{
61-
type Database = DB;
67+
type Database = <C as Executor>::Database;
6268

6369
fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
6470
Box::pin(async move { self.acquire().await?.send(commands).await })
@@ -67,16 +73,16 @@ where
6773
fn execute<'e, 'q: 'e>(
6874
&'e mut self,
6975
query: &'q str,
70-
args: DB::Arguments,
76+
args: <<C as Executor>::Database as Database>::Arguments,
7177
) -> BoxFuture<'e, crate::Result<u64>> {
7278
Box::pin(async move { self.acquire().await?.execute(query, args).await })
7379
}
7480

7581
fn fetch<'e, 'q: 'e>(
7682
&'e mut self,
7783
query: &'q str,
78-
args: DB::Arguments,
79-
) -> BoxStream<'e, crate::Result<DB::Row>> {
84+
args: <<C as Executor>::Database as Database>::Arguments,
85+
) -> BoxStream<'e, crate::Result<<<C as Executor>::Database as Database>::Row>> {
8086
Box::pin(async_stream::try_stream! {
8187
let mut live = self.acquire().await?;
8288
let mut s = live.fetch(query, args);
@@ -90,8 +96,8 @@ where
9096
fn fetch_optional<'e, 'q: 'e>(
9197
&'e mut self,
9298
query: &'q str,
93-
args: DB::Arguments,
94-
) -> BoxFuture<'e, crate::Result<Option<DB::Row>>> {
99+
args: <<C as Executor>::Database as Database>::Arguments,
100+
) -> BoxFuture<'e, crate::Result<Option<<<C as Executor>::Database as Database>::Row>>> {
95101
Box::pin(async move { self.acquire().await?.fetch_optional(query, args).await })
96102
}
97103

sqlx-core/src/pool/inner.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,23 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
88
use futures_channel::oneshot::{channel, Sender};
99

1010
use super::{Idle, Live, Options};
11-
use crate::{error::Error, Connection, Database};
11+
use crate::{
12+
connection::{Connect, Connection},
13+
error::Error,
14+
};
1215

13-
pub(super) struct SharedPool<DB>
14-
where
15-
DB: Database,
16-
{
16+
pub(super) struct SharedPool<C> {
1717
url: String,
18-
idle: ArrayQueue<Idle<DB>>,
19-
waiters: SegQueue<Sender<Live<DB>>>,
18+
idle: ArrayQueue<Idle<C>>,
19+
waiters: SegQueue<Sender<Live<C>>>,
2020
size: AtomicU32,
2121
is_closed: AtomicBool,
2222
options: Options,
2323
}
2424

25-
impl<DB> SharedPool<DB>
25+
impl<C> SharedPool<C>
2626
where
27-
DB: Database,
28-
DB::Connection: Connection<Database = DB>,
27+
C: Connection + Connect<Connection = C>,
2928
{
3029
pub(super) async fn new_arc(url: &str, options: Options) -> crate::Result<Arc<Self>> {
3130
let pool = Arc::new(Self {
@@ -96,15 +95,15 @@ where
9695
}
9796

9897
#[inline]
99-
pub(super) fn try_acquire(&self) -> Option<Live<DB>> {
98+
pub(super) fn try_acquire(&self) -> Option<Live<C>> {
10099
if self.is_closed.load(Ordering::Acquire) {
101100
return None;
102101
}
103102

104103
Some(self.idle.pop().ok()?.live)
105104
}
106105

107-
pub(super) fn release(&self, mut live: Live<DB>) {
106+
pub(super) fn release(&self, mut live: Live<C>) {
108107
// Try waiters in (FIFO) order until one is still waiting ..
109108
while let Ok(waiter) = self.waiters.pop() {
110109
live = match waiter.send(live) {
@@ -123,7 +122,7 @@ where
123122
});
124123
}
125124

126-
pub(super) async fn acquire(&self) -> crate::Result<Live<DB>> {
125+
pub(super) async fn acquire(&self) -> crate::Result<Live<C>> {
127126
let start = Instant::now();
128127
let deadline = start + self.options.connect_timeout;
129128

@@ -198,7 +197,7 @@ where
198197
Err(Error::PoolClosed)
199198
}
200199

201-
async fn eventually_connect(&self, deadline: Instant) -> crate::Result<Live<DB>> {
200+
async fn eventually_connect(&self, deadline: Instant) -> crate::Result<Live<C>> {
202201
loop {
203202
// [connect] will raise an error when past deadline
204203
// [connect] returns None if its okay to retry
@@ -208,7 +207,7 @@ where
208207
}
209208
}
210209

211-
async fn connect(&self, deadline: Instant) -> crate::Result<Option<Live<DB>>> {
210+
async fn connect(&self, deadline: Instant) -> crate::Result<Option<Live<C>>> {
212211
// FIXME: Code between `-` is duplicate with [acquire]
213212
// ---------------------------------
214213

@@ -227,8 +226,8 @@ where
227226

228227
// ---------------------------------
229228

230-
// result here is `Result<Result<DB, Error>, TimeoutError>`
231-
match timeout(until, DB::Connection::open(&self.url)).await {
229+
// result here is `Result<Result<C, Error>, TimeoutError>`
230+
match timeout(until, C::connect(&self.url)).await {
232231
// successfully established connection
233232
Ok(Ok(raw)) => {
234233
Ok(Some(Live {
@@ -260,18 +259,18 @@ where
260259
}
261260
}
262261

263-
impl<DB: Database> Idle<DB>
262+
impl<C> Idle<C>
264263
where
265-
DB::Connection: Connection<Database = DB>,
264+
C: Connection,
266265
{
267266
async fn close(self) {
268267
self.live.close().await;
269268
}
270269
}
271270

272-
impl<DB: Database> Live<DB>
271+
impl<C> Live<C>
273272
where
274-
DB::Connection: Connection<Database = DB>,
273+
C: Connection,
275274
{
276275
async fn close(self) {
277276
let _ = self.raw.close().await;
@@ -280,21 +279,24 @@ where
280279

281280
// NOTE: Function names here are bizzare. Helpful help would be appreciated.
282281

283-
fn is_beyond_lifetime<DB: Database>(live: &Live<DB>, options: &Options) -> bool {
282+
fn is_beyond_lifetime<C>(live: &Live<C>, options: &Options) -> bool {
284283
// check if connection was within max lifetime (or not set)
285284
options
286285
.max_lifetime
287286
.map_or(false, |max| live.created.elapsed() > max)
288287
}
289288

290-
fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &Options) -> bool {
289+
fn is_beyond_idle<C>(idle: &Idle<C>, options: &Options) -> bool {
291290
// if connection wasn't idle too long (or not set)
292291
options
293292
.idle_timeout
294293
.map_or(false, |timeout| idle.since.elapsed() > timeout)
295294
}
296295

297-
async fn check_live<DB: Database>(mut live: Live<DB>, options: &Options) -> Option<Live<DB>> {
296+
async fn check_live<C>(mut live: Live<C>, options: &Options) -> Option<Live<C>>
297+
where
298+
C: Connection,
299+
{
298300
// If the connection we pulled has expired, close the connection and
299301
// immediately create a new connection
300302
if is_beyond_lifetime(&live, options) {
@@ -325,9 +327,9 @@ async fn check_live<DB: Database>(mut live: Live<DB>, options: &Options) -> Opti
325327
}
326328

327329
/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections
328-
fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>)
330+
fn spawn_reaper<C>(pool: &Arc<SharedPool<C>>)
329331
where
330-
DB::Connection: Connection<Database = DB>,
332+
C: Connection,
331333
{
332334
let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
333335
(Some(it), None) | (None, Some(it)) => it,

0 commit comments

Comments
 (0)