Skip to content

Commit 598aeaf

Browse files
committed
sqlx-postgres: Update PgListener
1 parent eeb78b4 commit 598aeaf

File tree

4 files changed

+34
-110
lines changed

4 files changed

+34
-110
lines changed

sqlx-postgres/src/connection/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub struct PgConnectionInner {
5050

5151
chan: UnboundedSender<IoRequest>,
5252

53-
notifications: UnboundedReceiver<Notification>,
53+
pub(crate) notifications: UnboundedReceiver<Notification>,
5454

5555
// process id of this backend
5656
// used to send cancel requests
@@ -74,9 +74,6 @@ pub struct PgConnectionInner {
7474
cache_type_oid: HashMap<UStr, Oid>,
7575
cache_elem_type_to_array: HashMap<Oid, Oid>,
7676

77-
// number of ReadyForQuery messages that we are currently expecting
78-
pub(crate) pending_ready_for_query_count: usize,
79-
8077
// current transaction status
8178
transaction_status: TransactionStatus,
8279
pub(crate) transaction_depth: usize,
@@ -101,7 +98,7 @@ impl PgConnection {
10198
///
10299
/// Used for rolling back transactions and releasing advisory locks.
103100
#[inline(always)]
104-
pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<Pipe, Error> {
101+
pub(crate) fn queue_simple_query(&self, query: &str) -> Result<Pipe, Error> {
105102
self.pipe(|buf| buf.write_msg(Query(query)))
106103
}
107104

@@ -132,7 +129,6 @@ impl PgConnection {
132129
cache_elem_type_to_array: HashMap::new(),
133130
transaction_depth: 0,
134131
stream,
135-
pending_ready_for_query_count: 0,
136132
transaction_status: TransactionStatus::Idle,
137133
}),
138134
}

sqlx-postgres/src/connection/stream.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@ use std::collections::BTreeMap;
22
use std::ops::{ControlFlow, Deref, DerefMut};
33
use std::str::FromStr;
44

5-
use futures_channel::mpsc::UnboundedSender;
6-
use futures_util::SinkExt;
75
use log::Level;
86
use sqlx_core::bytes::Buf;
97

108
use crate::connection::tls::MaybeUpgradeTls;
119
use crate::error::Error;
1210
use crate::message::{
13-
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, Notification,
14-
ParameterStatus, ReceivedMessage,
11+
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, ParameterStatus,
12+
ReceivedMessage,
1513
};
1614
use crate::net::{self, BufferedSocket, Socket};
1715
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
@@ -30,11 +28,6 @@ pub struct PgStream {
3028
// function call as well as the syscall.
3129
inner: BufferedSocket<Box<dyn Socket>>,
3230

33-
// buffer of unreceived notification messages from `PUBLISH`
34-
// this is set when creating a PgListener and only written to if that listener is
35-
// re-used for query execution in-between receiving messages
36-
pub(crate) notifications: Option<UnboundedSender<Notification>>,
37-
3831
pub(crate) parameter_statuses: BTreeMap<String, String>,
3932

4033
pub(crate) server_version_num: Option<u32>,
@@ -55,7 +48,6 @@ impl PgStream {
5548

5649
Ok(Self {
5750
inner: BufferedSocket::new(socket),
58-
notifications: None,
5951
parameter_statuses: BTreeMap::default(),
6052
server_version_num: None,
6153
})
@@ -134,15 +126,6 @@ impl PgStream {
134126
return Err(message.decode::<PgDatabaseError>()?.into());
135127
}
136128

137-
BackendMessageFormat::NotificationResponse => {
138-
if let Some(buffer) = &mut self.notifications {
139-
let notification: Notification = message.decode()?;
140-
let _ = buffer.send(notification).await;
141-
142-
continue;
143-
}
144-
}
145-
146129
BackendMessageFormat::ParameterStatus => {
147130
// informs the frontend about the current (initial)
148131
// setting of backend parameters

sqlx-postgres/src/listener.rs

Lines changed: 30 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
use std::fmt::{self, Debug};
2-
use std::io;
32
use std::str::from_utf8;
43

5-
use futures_channel::mpsc;
64
use futures_core::future::BoxFuture;
75
use futures_core::stream::{BoxStream, Stream};
86
use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
97
use sqlx_core::acquire::Acquire;
108
use sqlx_core::transaction::Transaction;
119
use sqlx_core::Either;
12-
use tracing::Instrument;
1310

1411
use crate::describe::Describe;
1512
use crate::error::Error;
1613
use crate::executor::{Execute, Executor};
17-
use crate::message::{BackendMessageFormat, Notification};
14+
use crate::message::Notification;
1815
use crate::pool::PoolOptions;
1916
use crate::pool::{Pool, PoolConnection};
2017
use crate::{PgConnection, PgQueryResult, PgRow, PgStatement, PgTypeInfo, Postgres};
@@ -28,8 +25,6 @@ use crate::{PgConnection, PgQueryResult, PgRow, PgStatement, PgTypeInfo, Postgre
2825
pub struct PgListener {
2926
pool: Pool<Postgres>,
3027
connection: Option<PoolConnection<Postgres>>,
31-
buffer_rx: mpsc::UnboundedReceiver<Notification>,
32-
buffer_tx: Option<mpsc::UnboundedSender<Notification>>,
3328
channels: Vec<String>,
3429
ignore_close_event: bool,
3530
eager_reconnect: bool,
@@ -58,17 +53,11 @@ impl PgListener {
5853

5954
pub async fn connect_with(pool: &Pool<Postgres>) -> Result<Self, Error> {
6055
// Pull out an initial connection
61-
let mut connection = pool.acquire().await?;
62-
63-
// Setup a notification buffer
64-
let (sender, receiver) = mpsc::unbounded();
65-
connection.inner.stream.notifications = Some(sender);
56+
let connection = pool.acquire().await?;
6657

6758
Ok(Self {
6859
pool: pool.clone(),
6960
connection: Some(connection),
70-
buffer_rx: receiver,
71-
buffer_tx: None,
7261
channels: Vec::new(),
7362
ignore_close_event: false,
7463
eager_reconnect: true,
@@ -173,7 +162,6 @@ impl PgListener {
173162
async fn connect_if_needed(&mut self) -> Result<(), Error> {
174163
if self.connection.is_none() {
175164
let mut connection = self.pool.acquire().await?;
176-
connection.inner.stream.notifications = self.buffer_tx.take();
177165

178166
connection
179167
.execute(&*build_listen_all_query(&self.channels))
@@ -263,67 +251,37 @@ impl PgListener {
263251
// Fetch our `CloseEvent` listener, if applicable.
264252
let mut close_event = (!self.ignore_close_event).then(|| self.pool.close_event());
265253

266-
loop {
267-
let next_message = self.connection().await?.inner.stream.recv_unchecked();
268-
269-
let res = if let Some(ref mut close_event) = close_event {
270-
// cancels the wait and returns `Err(PoolClosed)` if the pool is closed
271-
// before `next_message` returns, or if the pool was already closed
272-
close_event.do_until(next_message).await?
273-
} else {
274-
next_message.await
275-
};
276-
277-
let message = match res {
278-
Ok(message) => message,
279-
280-
// The connection is dead, ensure that it is dropped,
281-
// update self state, and loop to try again.
282-
Err(Error::Io(err))
283-
if matches!(
284-
err.kind(),
285-
io::ErrorKind::ConnectionAborted |
286-
io::ErrorKind::UnexpectedEof |
287-
// see ERRORS section in tcp(7) man page (https://man7.org/linux/man-pages/man7/tcp.7.html)
288-
io::ErrorKind::TimedOut |
289-
io::ErrorKind::BrokenPipe
290-
) =>
291-
{
292-
if let Some(mut conn) = self.connection.take() {
293-
self.buffer_tx = conn.inner.stream.notifications.take();
294-
// Close the connection in a background task, so we can continue.
295-
conn.close_on_drop();
296-
}
297-
298-
if self.eager_reconnect {
299-
self.connect_if_needed().await?;
300-
}
301-
302-
// lost connection
303-
return Ok(None);
304-
}
305-
306-
// Forward other errors
307-
Err(error) => {
308-
return Err(error);
309-
}
310-
};
254+
let next_message = self.connection().await?.inner.notifications.next();
311255

312-
match message.format {
313-
// We've received an async notification, return it.
314-
BackendMessageFormat::NotificationResponse => {
315-
return Ok(Some(PgNotification(message.decode()?)));
256+
let res = if let Some(ref mut close_event) = close_event {
257+
// cancels the wait and returns `Err(PoolClosed)` if the pool is closed
258+
// before `next_message` returns, or if the pool was already closed
259+
close_event.do_until(next_message).await?
260+
} else {
261+
next_message.await
262+
};
263+
264+
let message = match res {
265+
Some(message) => message,
266+
267+
// The connection is dead, ensure that it is dropped,
268+
// update self state, and loop to try again.
269+
None => {
270+
if let Some(mut conn) = self.connection.take() {
271+
// Close the connection in a background task, so we can continue.
272+
conn.close_on_drop();
316273
}
317274

318-
// Mark the connection as ready for another query
319-
BackendMessageFormat::ReadyForQuery => {
320-
self.connection().await?.inner.pending_ready_for_query_count -= 1;
275+
if self.eager_reconnect {
276+
self.connect_if_needed().await?;
321277
}
322278

323-
// Ignore unexpected messages
324-
_ => {}
279+
// lost connection
280+
return Ok(None);
325281
}
326-
}
282+
};
283+
284+
Ok(Some(PgNotification(message)))
327285
}
328286

329287
/// Receives the next notification that already exists in the connection buffer, if any.
@@ -332,7 +290,7 @@ impl PgListener {
332290
///
333291
/// This is helpful if you want to retrieve all buffered notifications and process them in batches.
334292
pub fn next_buffered(&mut self) -> Option<PgNotification> {
335-
if let Ok(Some(notification)) = self.buffer_rx.try_next() {
293+
if let Ok(Some(notification)) = self.connection.as_mut()?.inner.notifications.try_next() {
336294
Some(PgNotification(notification))
337295
} else {
338296
None
@@ -356,18 +314,8 @@ impl PgListener {
356314

357315
impl Drop for PgListener {
358316
fn drop(&mut self) {
359-
if let Some(mut conn) = self.connection.take() {
360-
let fut = async move {
361-
let _ = conn.execute("UNLISTEN *").await;
362-
363-
// inline the drop handler from `PoolConnection` so it doesn't try to spawn another task
364-
// otherwise, it may trigger a panic if this task is dropped because the runtime is going away:
365-
// https://github.com/launchbadge/sqlx/issues/1389
366-
conn.return_to_pool().await;
367-
};
368-
369-
// Unregister any listeners before returning the connection to the pool.
370-
crate::rt::spawn(fut.in_current_span());
317+
if let Some(conn) = self.connection.take() {
318+
let _ = conn.queue_simple_query("UNLISTEN *");
371319
}
372320
}
373321
}

tests/postgres/postgres.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,9 +1126,6 @@ async fn test_listener_try_recv_buffered() -> anyhow::Result<()> {
11261126
txn.commit().await?;
11271127
}
11281128

1129-
// Still no notifications buffered, since we haven't awaited the listener yet.
1130-
assert!(listener.next_buffered().is_none());
1131-
11321129
// Activate connection.
11331130
sqlx::query!("SELECT 1 AS one")
11341131
.fetch_all(&mut listener)

0 commit comments

Comments
 (0)