Skip to content

Commit fd3782c

Browse files
committed
sqlx-postgres: Add Pipe utility
1 parent 5e76eb2 commit fd3782c

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

sqlx-postgres/src/connection/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub use self::stream::PgStream;
2727
pub(crate) mod describe;
2828
mod establish;
2929
mod executor;
30+
mod pipe;
3031
mod request;
3132
mod sasl;
3233
mod stream;

sqlx-postgres/src/connection/pipe.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use futures_channel::mpsc::UnboundedReceiver;
2+
use futures_util::StreamExt;
3+
use sqlx_core::Error;
4+
5+
use crate::{
6+
message::{BackendMessage, BackendMessageFormat, ReadyForQuery, ReceivedMessage},
7+
PgDatabaseError,
8+
};
9+
10+
pub struct Pipe {
11+
receiver: UnboundedReceiver<ReceivedMessage>,
12+
}
13+
14+
impl Pipe {
15+
pub fn new(receiver: UnboundedReceiver<ReceivedMessage>) -> Pipe {
16+
Self { receiver }
17+
}
18+
19+
pub(crate) async fn recv_expect<B: BackendMessage>(&mut self) -> Result<B, Error> {
20+
self.recv().await?.decode()
21+
}
22+
23+
pub async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
24+
let _: ReadyForQuery = self.recv_expect().await?;
25+
Ok(())
26+
}
27+
28+
pub(crate) async fn wait_ready_for_query(&mut self) -> Result<(), Error> {
29+
loop {
30+
let message = self.recv().await?;
31+
32+
if let BackendMessageFormat::ReadyForQuery = message.format {
33+
let _: ReadyForQuery = message.decode()?;
34+
break;
35+
}
36+
}
37+
38+
Ok(())
39+
}
40+
41+
// wait for CloseComplete to indicate a statement was closed
42+
pub async fn wait_for_close_complete(&mut self, mut count: usize) -> Result<(), Error> {
43+
// we need to wait for the [CloseComplete] to be returned from the server
44+
while count > 0 {
45+
match self.recv().await? {
46+
message if message.format == BackendMessageFormat::PortalSuspended => {
47+
// there was an open portal
48+
// this can happen if the last time a statement was used it was not fully executed
49+
}
50+
51+
message if message.format == BackendMessageFormat::CloseComplete => {
52+
// successfully closed the statement (and freed up the server resources)
53+
count -= 1;
54+
}
55+
56+
message => {
57+
return Err(err_protocol!(
58+
"expecting PortalSuspended or CloseComplete but received {:?}",
59+
message.format
60+
));
61+
}
62+
}
63+
}
64+
65+
Ok(())
66+
}
67+
68+
pub(crate) async fn recv(&mut self) -> Result<ReceivedMessage, Error> {
69+
let message = self
70+
.receiver
71+
.next()
72+
.await
73+
.ok_or_else(|| sqlx_core::Error::WorkerCrashed)?;
74+
75+
if message.format == BackendMessageFormat::ErrorResponse {
76+
Err(message.decode::<PgDatabaseError>()?.into())
77+
} else {
78+
Ok(message)
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)