Skip to content

Commit f5a204f

Browse files
committed
sqlx-postgres: Remove PgStream
1 parent c0a5459 commit f5a204f

File tree

4 files changed

+98
-121
lines changed

4 files changed

+98
-121
lines changed

sqlx-postgres/src/connection/establish.rs

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
1-
use crate::connection::{sasl, stream::PgStream};
1+
use crate::connection::sasl;
22
use crate::error::Error;
33
use crate::message::{Authentication, BackendKeyData, BackendMessageFormat, Password, Startup};
44
use crate::{PgConnectOptions, PgConnection};
55
use futures_channel::mpsc::unbounded;
6+
use std::str::FromStr;
67

7-
use super::stream::parse_server_version;
8-
use super::worker::Worker;
8+
use super::worker::{Shared, Worker};
99

1010
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
1111
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
1212

1313
impl PgConnection {
1414
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
15-
// Upgrade to TLS if we were asked to and the server supports it
16-
let stream = PgStream::connect(options).await?;
17-
1815
let (notif_tx, notif_rx) = unbounded();
16+
let shared = Shared::new();
1917

20-
let (channel, shared) = Worker::spawn(stream.into_inner(), notif_tx);
18+
// Upgrade to TLS if we were asked to and the server supports it
19+
let channel = Worker::connect(options, notif_tx, shared.clone()).await?;
2120

2221
let mut conn = PgConnection::new(options, channel, notif_rx, shared);
2322

@@ -143,3 +142,69 @@ impl PgConnection {
143142
Ok(conn)
144143
}
145144
}
145+
146+
// reference:
147+
// https://github.com/postgres/postgres/blob/6feebcb6b44631c3dc435e971bd80c2dd218a5ab/src/interfaces/libpq/fe-exec.c#L1030-L1065
148+
fn parse_server_version(s: impl Into<String>) -> Option<u32> {
149+
let s = s.into();
150+
let mut parts = Vec::<u32>::with_capacity(3);
151+
152+
let mut from = 0;
153+
let mut chs = s.char_indices().peekable();
154+
while let Some((i, ch)) = chs.next() {
155+
match ch {
156+
'.' => {
157+
if let Ok(num) = u32::from_str(&s[from..i]) {
158+
parts.push(num);
159+
from = i + 1;
160+
} else {
161+
break;
162+
}
163+
}
164+
_ if ch.is_ascii_digit() => {
165+
if chs.peek().is_none() {
166+
if let Ok(num) = u32::from_str(&s[from..]) {
167+
parts.push(num);
168+
}
169+
break;
170+
}
171+
}
172+
_ => {
173+
if let Ok(num) = u32::from_str(&s[from..i]) {
174+
parts.push(num);
175+
}
176+
break;
177+
}
178+
};
179+
}
180+
181+
let version_num = match parts.as_slice() {
182+
[major, minor, rev] => (100 * major + minor) * 100 + rev,
183+
[major, minor] if *major >= 10 => 100 * 100 * major + minor,
184+
[major, minor] => (100 * major + minor) * 100,
185+
[major] => 100 * 100 * major,
186+
_ => return None,
187+
};
188+
189+
Some(version_num)
190+
}
191+
192+
#[cfg(test)]
193+
mod tests {
194+
use super::parse_server_version;
195+
196+
#[test]
197+
fn test_parse_server_version_num() {
198+
// old style
199+
assert_eq!(parse_server_version("9.6.1"), Some(90601));
200+
// new style
201+
assert_eq!(parse_server_version("10.1"), Some(100001));
202+
// old style without minor version
203+
assert_eq!(parse_server_version("9.6devel"), Some(90600));
204+
// new style without minor version, e.g. */
205+
assert_eq!(parse_server_version("10devel"), Some(100000));
206+
assert_eq!(parse_server_version("13devel87"), Some(130000));
207+
// unknown
208+
assert_eq!(parse_server_version("unknown"), None);
209+
}
210+
}

sqlx-postgres/src/connection/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ mod executor;
2929
mod pipe;
3030
mod request;
3131
mod sasl;
32-
mod stream;
3332
mod tls;
3433
mod worker;
3534

sqlx-postgres/src/connection/stream.rs

Lines changed: 0 additions & 105 deletions
This file was deleted.

sqlx-postgres/src/connection/worker.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ use std::{
77
task::{ready, Context, Poll},
88
};
99

10-
use crate::message::{
11-
BackendMessageFormat, FrontendMessage, Notice, Notification, ParameterStatus, ReadyForQuery,
12-
ReceivedMessage, Terminate, TransactionStatus,
10+
use crate::{
11+
message::{
12+
BackendMessageFormat, FrontendMessage, Notice, Notification, ParameterStatus,
13+
ReadyForQuery, ReceivedMessage, Terminate, TransactionStatus,
14+
},
15+
PgConnectOptions,
1316
};
1417
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
1518
use futures_util::{SinkExt, StreamExt};
1619
use sqlx_core::{
1720
bytes::Buf,
18-
net::{BufferedSocket, Socket},
21+
net::{self, BufferedSocket, Socket},
1922
rt::spawn,
2023
Result,
2124
};
2225

23-
use super::request::IoRequest;
26+
use super::{request::IoRequest, tls::MaybeUpgradeTls};
2427

2528
#[derive(PartialEq, Debug)]
2629
enum WorkerState {
@@ -44,12 +47,27 @@ pub struct Worker {
4447
}
4548

4649
impl Worker {
50+
pub(super) async fn connect(
51+
options: &PgConnectOptions,
52+
notif_chan: UnboundedSender<Notification>,
53+
shared: Shared,
54+
) -> crate::Result<UnboundedSender<IoRequest>> {
55+
let socket_result = match options.fetch_socket() {
56+
Some(ref path) => net::connect_uds(path, MaybeUpgradeTls(options)).await?,
57+
None => net::connect_tcp(&options.host, options.port, MaybeUpgradeTls(options)).await?,
58+
};
59+
60+
let socket = BufferedSocket::new(socket_result?);
61+
62+
Ok(Worker::spawn(socket, notif_chan, shared))
63+
}
64+
4765
pub fn spawn(
4866
socket: BufferedSocket<Box<dyn Socket>>,
4967
notif_chan: UnboundedSender<Notification>,
50-
) -> (UnboundedSender<IoRequest>, Shared) {
68+
shared: Shared,
69+
) -> UnboundedSender<IoRequest> {
5170
let (tx, rx) = unbounded();
52-
let shared = Shared::new();
5371

5472
let worker = Worker {
5573
state: WorkerState::Open,
@@ -62,7 +80,7 @@ impl Worker {
6280
};
6381

6482
spawn(worker);
65-
(tx, shared)
83+
tx
6684
}
6785

6886
// Tries to receive the next message from the channel. Also handles termination if needed.

0 commit comments

Comments
 (0)