Skip to content

Commit 014e122

Browse files
committed
sqlx-postgres: Remove PgStream from PgConnection
1 parent fb6b53b commit 014e122

File tree

5 files changed

+40
-174
lines changed

5 files changed

+40
-174
lines changed

sqlx-postgres/src/connection/establish.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,13 @@ use super::worker::Worker;
1313
impl PgConnection {
1414
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
1515
// Upgrade to TLS if we were asked to and the server supports it
16-
let pg_stream = PgStream::connect(options).await?;
17-
1816
let stream = PgStream::connect(options).await?;
1917

2018
let (notif_tx, notif_rx) = unbounded();
2119

22-
let (x, shared) = Worker::spawn(stream.into_inner(), notif_tx);
20+
let (channel, shared) = Worker::spawn(stream.into_inner(), notif_tx);
2321

24-
let mut conn = PgConnection::new(pg_stream, options, x, notif_rx, shared);
22+
let mut conn = PgConnection::new(options, channel, notif_rx, shared);
2523

2624
// To begin a session, a frontend opens a connection to the server
2725
// and sends a startup message.

sqlx-postgres/src/connection/mod.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ use crate::{PgConnectOptions, PgTypeInfo, Postgres};
2323

2424
pub(crate) use sqlx_core::connection::*;
2525

26-
pub use self::stream::PgStream;
27-
2826
pub(crate) mod describe;
2927
mod establish;
3028
mod executor;
@@ -43,11 +41,6 @@ pub struct PgConnection {
4341
}
4442

4543
pub struct PgConnectionInner {
46-
// underlying TCP or UDS stream,
47-
// wrapped in a potentially TLS stream,
48-
// wrapped in a buffered stream
49-
pub(crate) stream: PgStream,
50-
5144
chan: UnboundedSender<IoRequest>,
5245

5346
pub(crate) notifications: UnboundedReceiver<Notification>,
@@ -112,7 +105,6 @@ impl PgConnection {
112105
}
113106

114107
fn new(
115-
stream: PgStream,
116108
options: &PgConnectOptions,
117109
chan: UnboundedSender<IoRequest>,
118110
notifications: UnboundedReceiver<Notification>,
@@ -132,7 +124,6 @@ impl PgConnection {
132124
cache_elem_type_to_array: HashMap::new(),
133125
cache_table_to_column_names: HashMap::new(),
134126
transaction_depth: 0,
135-
stream,
136127
server_version_num: None,
137128
shared,
138129
}),
@@ -289,7 +280,7 @@ impl Connection for PgConnection {
289280
}
290281

291282
fn shrink_buffers(&mut self) {
292-
self.inner.stream.shrink_buffers();
283+
// No-op
293284
}
294285

295286
#[doc(hidden)]
@@ -299,7 +290,7 @@ impl Connection for PgConnection {
299290

300291
#[doc(hidden)]
301292
fn should_flush(&self) -> bool {
302-
!self.inner.stream.write_buffer().is_empty()
293+
false
303294
}
304295
}
305296

sqlx-postgres/src/connection/stream.rs

Lines changed: 1 addition & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
1-
use std::ops::{ControlFlow, Deref, DerefMut};
21
use std::str::FromStr;
32

4-
use log::Level;
5-
use sqlx_core::bytes::Buf;
6-
73
use crate::connection::tls::MaybeUpgradeTls;
84
use crate::error::Error;
9-
use crate::message::{
10-
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, ReceivedMessage,
11-
};
125
use crate::net::{self, BufferedSocket, Socket};
13-
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
6+
use crate::PgConnectOptions;
147

158
// the stream is a separate type from the connection to uphold the invariant where an instantiated
169
// [PgConnection] is a **valid** connection to postgres
@@ -44,154 +37,6 @@ impl PgStream {
4437
inner: BufferedSocket::new(socket),
4538
})
4639
}
47-
48-
#[inline(always)]
49-
pub(crate) fn write_msg(&mut self, message: impl FrontendMessage) -> Result<(), Error> {
50-
self.write(EncodeMessage(message))
51-
}
52-
53-
pub(crate) async fn send<T>(&mut self, message: T) -> Result<(), Error>
54-
where
55-
T: FrontendMessage,
56-
{
57-
self.write_msg(message)?;
58-
self.flush().await?;
59-
Ok(())
60-
}
61-
62-
// Expect a specific type and format
63-
pub(crate) async fn recv_expect<B: BackendMessage>(&mut self) -> Result<B, Error> {
64-
self.recv().await?.decode()
65-
}
66-
67-
pub(crate) async fn recv_unchecked(&mut self) -> Result<ReceivedMessage, Error> {
68-
// NOTE: to not break everything, this should be cancel-safe;
69-
// DO NOT modify `buf` unless a full message has been read
70-
self.inner
71-
.try_read(|buf| {
72-
// all packets in postgres start with a 5-byte header
73-
// this header contains the message type and the total length of the message
74-
let Some(mut header) = buf.get(..5) else {
75-
return Ok(ControlFlow::Continue(5));
76-
};
77-
78-
let format = BackendMessageFormat::try_from_u8(header.get_u8())?;
79-
80-
let message_len = header.get_u32() as usize;
81-
82-
let expected_len = message_len
83-
.checked_add(1)
84-
// this shouldn't really happen but is mostly a sanity check
85-
.ok_or_else(|| {
86-
err_protocol!("message_len + 1 overflows usize: {message_len}")
87-
})?;
88-
89-
if buf.len() < expected_len {
90-
return Ok(ControlFlow::Continue(expected_len));
91-
}
92-
93-
// `buf` SHOULD NOT be modified ABOVE this line
94-
95-
// pop off the format code since it's not counted in `message_len`
96-
buf.advance(1);
97-
98-
// consume the message, including the length prefix
99-
let mut contents = buf.split_to(message_len).freeze();
100-
101-
// cut off the length prefix
102-
contents.advance(4);
103-
104-
Ok(ControlFlow::Break(ReceivedMessage { format, contents }))
105-
})
106-
.await
107-
}
108-
109-
// Get the next message from the server
110-
// May wait for more data from the server
111-
pub(crate) async fn recv(&mut self) -> Result<ReceivedMessage, Error> {
112-
loop {
113-
let message = self.recv_unchecked().await?;
114-
115-
match message.format {
116-
BackendMessageFormat::ErrorResponse => {
117-
// An error returned from the database server.
118-
return Err(message.decode::<PgDatabaseError>()?.into());
119-
}
120-
121-
// BackendMessageFormat::ParameterStatus => {
122-
// // informs the frontend about the current (initial)
123-
// // setting of backend parameters
124-
125-
// let ParameterStatus { name, value } = message.decode()?;
126-
// // TODO: handle `client_encoding`, `DateStyle` change
127-
128-
// match name.as_str() {
129-
// "server_version" => {
130-
// self.server_version_num = parse_server_version(&value);
131-
// }
132-
// _ => {
133-
// self.parameter_statuses.insert(name, value);
134-
// }
135-
// }
136-
137-
// continue;
138-
// }
139-
BackendMessageFormat::NoticeResponse => {
140-
// do we need this to be more configurable?
141-
// if you are reading this comment and think so, open an issue
142-
143-
let notice: Notice = message.decode()?;
144-
145-
let (log_level, tracing_level) = match notice.severity() {
146-
PgSeverity::Fatal | PgSeverity::Panic | PgSeverity::Error => {
147-
(Level::Error, tracing::Level::ERROR)
148-
}
149-
PgSeverity::Warning => (Level::Warn, tracing::Level::WARN),
150-
PgSeverity::Notice => (Level::Info, tracing::Level::INFO),
151-
PgSeverity::Debug => (Level::Debug, tracing::Level::DEBUG),
152-
PgSeverity::Info | PgSeverity::Log => (Level::Trace, tracing::Level::TRACE),
153-
};
154-
155-
let log_is_enabled = log::log_enabled!(
156-
target: "sqlx::postgres::notice",
157-
log_level
158-
) || sqlx_core::private_tracing_dynamic_enabled!(
159-
target: "sqlx::postgres::notice",
160-
tracing_level
161-
);
162-
if log_is_enabled {
163-
sqlx_core::private_tracing_dynamic_event!(
164-
target: "sqlx::postgres::notice",
165-
tracing_level,
166-
message = notice.message()
167-
);
168-
}
169-
170-
continue;
171-
}
172-
173-
_ => {}
174-
}
175-
176-
return Ok(message);
177-
}
178-
}
179-
}
180-
181-
impl Deref for PgStream {
182-
type Target = BufferedSocket<Box<dyn Socket>>;
183-
184-
#[inline]
185-
fn deref(&self) -> &Self::Target {
186-
&self.inner
187-
}
188-
}
189-
190-
impl DerefMut for PgStream {
191-
#[inline]
192-
fn deref_mut(&mut self) -> &mut Self::Target {
193-
&mut self.inner
194-
}
19540
}
19641

19742
// reference:

sqlx-postgres/src/connection/worker.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
};
99

1010
use crate::message::{
11-
BackendMessageFormat, FrontendMessage, Notification, ParameterStatus, ReadyForQuery,
11+
BackendMessageFormat, FrontendMessage, Notice, Notification, ParameterStatus, ReadyForQuery,
1212
ReceivedMessage, Terminate, TransactionStatus,
1313
};
1414
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
@@ -153,12 +153,16 @@ impl Worker {
153153
}
154154
BackendMessageFormat::ParameterStatus => {
155155
// Asynchronous response
156-
//
157156
let ParameterStatus { name, value } = response.decode()?;
158157
self.shared.insert_parameter_status(name, value);
159158
}
160159
BackendMessageFormat::NoticeResponse => {
161-
// Asynchronous response - todo
160+
// do we need this to be more configurable?
161+
// if you are reading this comment and think so, open an issue
162+
163+
let notice: Notice = response.decode()?;
164+
165+
notice.emit_notice();
162166
}
163167
_ => self.send_back(response)?,
164168
}

sqlx-postgres/src/message/response.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::ops::Range;
22
use std::str::from_utf8;
33

4+
use log::Level;
45
use memchr::memchr;
56

67
use sqlx_core::bytes::Bytes;
@@ -90,6 +91,33 @@ impl Notice {
9091
.map(|(_, range)| &self.storage[range])
9192
.next()
9293
}
94+
95+
pub(crate) fn emit_notice(&self) {
96+
let (log_level, tracing_level) = match self.severity() {
97+
PgSeverity::Fatal | PgSeverity::Panic | PgSeverity::Error => {
98+
(Level::Error, tracing::Level::ERROR)
99+
}
100+
PgSeverity::Warning => (Level::Warn, tracing::Level::WARN),
101+
PgSeverity::Notice => (Level::Info, tracing::Level::INFO),
102+
PgSeverity::Debug => (Level::Debug, tracing::Level::DEBUG),
103+
PgSeverity::Info | PgSeverity::Log => (Level::Trace, tracing::Level::TRACE),
104+
};
105+
106+
let log_is_enabled = log::log_enabled!(
107+
target: "sqlx::postgres::notice",
108+
log_level
109+
) || sqlx_core::private_tracing_dynamic_enabled!(
110+
target: "sqlx::postgres::notice",
111+
tracing_level
112+
);
113+
if log_is_enabled {
114+
sqlx_core::private_tracing_dynamic_event!(
115+
target: "sqlx::postgres::notice",
116+
tracing_level,
117+
message = self.message()
118+
);
119+
}
120+
}
93121
}
94122

95123
impl Notice {

0 commit comments

Comments
 (0)