1
1
use std:: fmt:: { self , Debug } ;
2
- use std:: io;
3
2
use std:: str:: from_utf8;
4
3
5
- use futures_channel:: mpsc;
6
4
use futures_core:: future:: BoxFuture ;
7
5
use futures_core:: stream:: { BoxStream , Stream } ;
8
6
use futures_util:: { FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
9
7
use sqlx_core:: acquire:: Acquire ;
10
8
use sqlx_core:: transaction:: Transaction ;
11
9
use sqlx_core:: Either ;
12
- use tracing:: Instrument ;
13
10
14
11
use crate :: describe:: Describe ;
15
12
use crate :: error:: Error ;
16
13
use crate :: executor:: { Execute , Executor } ;
17
- use crate :: message:: { BackendMessageFormat , Notification } ;
14
+ use crate :: message:: Notification ;
18
15
use crate :: pool:: PoolOptions ;
19
16
use crate :: pool:: { Pool , PoolConnection } ;
20
17
use crate :: { PgConnection , PgQueryResult , PgRow , PgStatement , PgTypeInfo , Postgres } ;
@@ -28,8 +25,6 @@ use crate::{PgConnection, PgQueryResult, PgRow, PgStatement, PgTypeInfo, Postgre
28
25
pub struct PgListener {
29
26
pool : Pool < Postgres > ,
30
27
connection : Option < PoolConnection < Postgres > > ,
31
- buffer_rx : mpsc:: UnboundedReceiver < Notification > ,
32
- buffer_tx : Option < mpsc:: UnboundedSender < Notification > > ,
33
28
channels : Vec < String > ,
34
29
ignore_close_event : bool ,
35
30
eager_reconnect : bool ,
@@ -58,17 +53,11 @@ impl PgListener {
58
53
59
54
pub async fn connect_with ( pool : & Pool < Postgres > ) -> Result < Self , Error > {
60
55
// Pull out an initial connection
61
- let mut connection = pool. acquire ( ) . await ?;
62
-
63
- // Setup a notification buffer
64
- let ( sender, receiver) = mpsc:: unbounded ( ) ;
65
- connection. inner . stream . notifications = Some ( sender) ;
56
+ let connection = pool. acquire ( ) . await ?;
66
57
67
58
Ok ( Self {
68
59
pool : pool. clone ( ) ,
69
60
connection : Some ( connection) ,
70
- buffer_rx : receiver,
71
- buffer_tx : None ,
72
61
channels : Vec :: new ( ) ,
73
62
ignore_close_event : false ,
74
63
eager_reconnect : true ,
@@ -173,7 +162,6 @@ impl PgListener {
173
162
async fn connect_if_needed ( & mut self ) -> Result < ( ) , Error > {
174
163
if self . connection . is_none ( ) {
175
164
let mut connection = self . pool . acquire ( ) . await ?;
176
- connection. inner . stream . notifications = self . buffer_tx . take ( ) ;
177
165
178
166
connection
179
167
. execute ( & * build_listen_all_query ( & self . channels ) )
@@ -263,67 +251,37 @@ impl PgListener {
263
251
// Fetch our `CloseEvent` listener, if applicable.
264
252
let mut close_event = ( !self . ignore_close_event ) . then ( || self . pool . close_event ( ) ) ;
265
253
266
- loop {
267
- let next_message = self . connection ( ) . await ?. inner . stream . recv_unchecked ( ) ;
268
-
269
- let res = if let Some ( ref mut close_event) = close_event {
270
- // cancels the wait and returns `Err(PoolClosed)` if the pool is closed
271
- // before `next_message` returns, or if the pool was already closed
272
- close_event. do_until ( next_message) . await ?
273
- } else {
274
- next_message. await
275
- } ;
276
-
277
- let message = match res {
278
- Ok ( message) => message,
279
-
280
- // The connection is dead, ensure that it is dropped,
281
- // update self state, and loop to try again.
282
- Err ( Error :: Io ( err) )
283
- if matches ! (
284
- err. kind( ) ,
285
- io:: ErrorKind :: ConnectionAborted |
286
- io:: ErrorKind :: UnexpectedEof |
287
- // see ERRORS section in tcp(7) man page (https://man7.org/linux/man-pages/man7/tcp.7.html)
288
- io:: ErrorKind :: TimedOut |
289
- io:: ErrorKind :: BrokenPipe
290
- ) =>
291
- {
292
- if let Some ( mut conn) = self . connection . take ( ) {
293
- self . buffer_tx = conn. inner . stream . notifications . take ( ) ;
294
- // Close the connection in a background task, so we can continue.
295
- conn. close_on_drop ( ) ;
296
- }
297
-
298
- if self . eager_reconnect {
299
- self . connect_if_needed ( ) . await ?;
300
- }
301
-
302
- // lost connection
303
- return Ok ( None ) ;
304
- }
305
-
306
- // Forward other errors
307
- Err ( error) => {
308
- return Err ( error) ;
309
- }
310
- } ;
254
+ let next_message = self . connection ( ) . await ?. inner . notifications . next ( ) ;
311
255
312
- match message. format {
313
- // We've received an async notification, return it.
314
- BackendMessageFormat :: NotificationResponse => {
315
- return Ok ( Some ( PgNotification ( message. decode ( ) ?) ) ) ;
256
+ let res = if let Some ( ref mut close_event) = close_event {
257
+ // cancels the wait and returns `Err(PoolClosed)` if the pool is closed
258
+ // before `next_message` returns, or if the pool was already closed
259
+ close_event. do_until ( next_message) . await ?
260
+ } else {
261
+ next_message. await
262
+ } ;
263
+
264
+ let message = match res {
265
+ Some ( message) => message,
266
+
267
+ // The connection is dead, ensure that it is dropped,
268
+ // update self state, and loop to try again.
269
+ None => {
270
+ if let Some ( mut conn) = self . connection . take ( ) {
271
+ // Close the connection in a background task, so we can continue.
272
+ conn. close_on_drop ( ) ;
316
273
}
317
274
318
- // Mark the connection as ready for another query
319
- BackendMessageFormat :: ReadyForQuery => {
320
- self . connection ( ) . await ?. inner . pending_ready_for_query_count -= 1 ;
275
+ if self . eager_reconnect {
276
+ self . connect_if_needed ( ) . await ?;
321
277
}
322
278
323
- // Ignore unexpected messages
324
- _ => { }
279
+ // lost connection
280
+ return Ok ( None ) ;
325
281
}
326
- }
282
+ } ;
283
+
284
+ Ok ( Some ( PgNotification ( message) ) )
327
285
}
328
286
329
287
/// Receives the next notification that already exists in the connection buffer, if any.
@@ -332,7 +290,7 @@ impl PgListener {
332
290
///
333
291
/// This is helpful if you want to retrieve all buffered notifications and process them in batches.
334
292
pub fn next_buffered ( & mut self ) -> Option < PgNotification > {
335
- if let Ok ( Some ( notification) ) = self . buffer_rx . try_next ( ) {
293
+ if let Ok ( Some ( notification) ) = self . connection . as_mut ( ) ? . inner . notifications . try_next ( ) {
336
294
Some ( PgNotification ( notification) )
337
295
} else {
338
296
None
@@ -356,18 +314,8 @@ impl PgListener {
356
314
357
315
impl Drop for PgListener {
358
316
fn drop ( & mut self ) {
359
- if let Some ( mut conn) = self . connection . take ( ) {
360
- let fut = async move {
361
- let _ = conn. execute ( "UNLISTEN *" ) . await ;
362
-
363
- // inline the drop handler from `PoolConnection` so it doesn't try to spawn another task
364
- // otherwise, it may trigger a panic if this task is dropped because the runtime is going away:
365
- // https://github.com/launchbadge/sqlx/issues/1389
366
- conn. return_to_pool ( ) . await ;
367
- } ;
368
-
369
- // Unregister any listeners before returning the connection to the pool.
370
- crate :: rt:: spawn ( fut. in_current_span ( ) ) ;
317
+ if let Some ( conn) = self . connection . take ( ) {
318
+ let _ = conn. queue_simple_query ( "UNLISTEN *" ) ;
371
319
}
372
320
}
373
321
}
0 commit comments