Skip to content

Commit 2d6cb0e

Browse files
committed
sqlx-postgres: Move executor to bg worker
1 parent c39501a commit 2d6cb0e

File tree

6 files changed

+107
-190
lines changed

6 files changed

+107
-190
lines changed

sqlx-core/src/net/socket/buffered.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ impl<S: Socket> BufferedSocket<S> {
185185
fn poll_handle_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
186186
// Because of how `BytesMut` works, we should only be shifting capacity back and forth
187187
// between `read` and `available` unless we have to read an oversize message.
188+
188189
while self.read_buf.len() < self.wants_bytes {
189190
self.read_buf
190191
.reserve(self.wants_bytes - self.read_buf.len());

sqlx-postgres/src/connection/executor.rs

Lines changed: 75 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::io::{PortalId, StatementId};
55
use crate::logger::QueryLogger;
66
use crate::message::{
77
self, BackendMessageFormat, Bind, Close, CommandComplete, DataRow, ParameterDescription, Parse,
8-
ParseComplete, Query, RowDescription,
8+
ParseComplete, RowDescription,
99
};
1010
use crate::statement::PgStatementMetadata;
1111
use crate::{
@@ -20,6 +20,8 @@ use sqlx_core::arguments::Arguments;
2020
use sqlx_core::Either;
2121
use std::{borrow::Cow, pin::pin, sync::Arc};
2222

23+
use super::pipe::Pipe;
24+
2325
async fn prepare(
2426
conn: &mut PgConnection,
2527
sql: &str,
@@ -46,54 +48,47 @@ async fn prepare(
4648
param_types.push(conn.resolve_type_id(&ty.0).await?);
4749
}
4850

49-
// flush and wait until we are re-ready
50-
conn.wait_until_ready().await?;
51+
let mut pipe = conn.pipe(|buf| {
52+
// next we send the PARSE command to the server
53+
buf.write_msg(Parse {
54+
param_types: &param_types,
55+
query: sql,
56+
statement: id,
57+
})?;
58+
59+
if metadata.is_none() {
60+
// get the statement columns and parameters
61+
buf.write_msg(message::Describe::Statement(id))?;
62+
}
5163

52-
// next we send the PARSE command to the server
53-
conn.inner.stream.write_msg(Parse {
54-
param_types: &param_types,
55-
query: sql,
56-
statement: id,
64+
// we ask for the server to immediately send us the result of the PARSE command
65+
buf.write_sync();
66+
Ok(())
5767
})?;
5868

59-
if metadata.is_none() {
60-
// get the statement columns and parameters
61-
conn.inner
62-
.stream
63-
.write_msg(message::Describe::Statement(id))?;
64-
}
65-
66-
// we ask for the server to immediately send us the result of the PARSE command
67-
conn.write_sync();
68-
conn.inner.stream.flush().await?;
69-
7069
// indicates that the SQL query string is now successfully parsed and has semantic validity
71-
conn.inner.stream.recv_expect::<ParseComplete>().await?;
70+
pipe.recv_expect::<ParseComplete>().await?;
7271

7372
let metadata = if let Some(metadata) = metadata {
7473
// each SYNC produces one READY FOR QUERY
75-
conn.recv_ready_for_query().await?;
74+
pipe.recv_ready_for_query().await?;
7675

7776
// we already have metadata
7877
metadata
7978
} else {
80-
let parameters = recv_desc_params(conn).await?;
79+
let parameters = recv_desc_params(&mut pipe).await?;
8180

82-
let rows = recv_desc_rows(conn).await?;
81+
let rows = recv_desc_rows(&mut pipe).await?;
8382

8483
// each SYNC produces one READY FOR QUERY
85-
conn.recv_ready_for_query().await?;
84+
pipe.recv_ready_for_query().await?;
8685

8786
let parameters = conn.handle_parameter_description(parameters).await?;
8887

8988
let (columns, column_names) = conn
9089
.handle_row_description(rows, true, fetch_column_origin)
9190
.await?;
9291

93-
// ensure that if we did fetch custom data, we wait until we are fully ready before
94-
// continuing
95-
conn.wait_until_ready().await?;
96-
9792
Arc::new(PgStatementMetadata {
9893
parameters,
9994
columns,
@@ -104,12 +99,12 @@ async fn prepare(
10499
Ok((id, metadata))
105100
}
106101

107-
async fn recv_desc_params(conn: &mut PgConnection) -> Result<ParameterDescription, Error> {
108-
conn.inner.stream.recv_expect().await
102+
async fn recv_desc_params(pipe: &mut Pipe) -> Result<ParameterDescription, Error> {
103+
pipe.recv_expect().await
109104
}
110105

111-
async fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription>, Error> {
112-
let rows: Option<RowDescription> = match conn.inner.stream.recv().await? {
106+
async fn recv_desc_rows(pipe: &mut Pipe) -> Result<Option<RowDescription>, Error> {
107+
let rows: Option<RowDescription> = match pipe.recv().await? {
113108
// describes the rows that will be returned when the statement is eventually executed
114109
message if message.format == BackendMessageFormat::RowDescription => {
115110
Some(message.decode()?)
@@ -130,44 +125,6 @@ async fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription
130125
}
131126

132127
impl PgConnection {
133-
// wait for CloseComplete to indicate a statement was closed
134-
pub(super) async fn wait_for_close_complete(&mut self, mut count: usize) -> Result<(), Error> {
135-
// we need to wait for the [CloseComplete] to be returned from the server
136-
while count > 0 {
137-
match self.inner.stream.recv().await? {
138-
message if message.format == BackendMessageFormat::PortalSuspended => {
139-
// there was an open portal
140-
// this can happen if the last time a statement was used it was not fully executed
141-
}
142-
143-
message if message.format == BackendMessageFormat::CloseComplete => {
144-
// successfully closed the statement (and freed up the server resources)
145-
count -= 1;
146-
}
147-
148-
message => {
149-
return Err(err_protocol!(
150-
"expecting PortalSuspended or CloseComplete but received {:?}",
151-
message.format
152-
));
153-
}
154-
}
155-
}
156-
157-
Ok(())
158-
}
159-
160-
#[inline(always)]
161-
pub(crate) fn write_sync(&mut self) {
162-
self.inner
163-
.stream
164-
.write_msg(message::Sync)
165-
.expect("BUG: Sync should not be too big for protocol");
166-
167-
// all SYNC messages will return a ReadyForQuery
168-
self.inner.pending_ready_for_query_count += 1;
169-
}
170-
171128
async fn get_or_prepare(
172129
&mut self,
173130
sql: &str,
@@ -194,13 +151,14 @@ impl PgConnection {
194151

195152
if persistent && self.inner.cache_statement.is_enabled() {
196153
if let Some((id, _)) = self.inner.cache_statement.insert(sql, statement.clone()) {
197-
self.inner.stream.write_msg(Close::Statement(id))?;
198-
self.write_sync();
199-
200-
self.inner.stream.flush().await?;
201-
202-
self.wait_for_close_complete(1).await?;
203-
self.recv_ready_for_query().await?;
154+
let mut pipe = self.pipe(|buf| {
155+
buf.write_msg(Close::Statement(id))?;
156+
buf.write_sync();
157+
Ok(())
158+
})?;
159+
160+
pipe.wait_for_close_complete(1).await?;
161+
pipe.recv_ready_for_query().await?;
204162
}
205163
}
206164

@@ -216,10 +174,8 @@ impl PgConnection {
216174
) -> Result<impl Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + 'e, Error> {
217175
let mut logger = QueryLogger::new(query, self.inner.log_settings.clone());
218176

219-
// before we continue, wait until we are "ready" to accept more queries
220-
self.wait_until_ready().await?;
221-
222177
let mut metadata: Arc<PgStatementMetadata>;
178+
let mut pipe: Pipe;
223179

224180
let format = if let Some(mut arguments) = arguments {
225181
// Check this before we write anything to the stream.
@@ -246,53 +202,50 @@ impl PgConnection {
246202
// patch holes created during encoding
247203
arguments.apply_patches(self, &metadata.parameters).await?;
248204

249-
// consume messages till `ReadyForQuery` before bind and execute
250-
self.wait_until_ready().await?;
251-
252-
// bind to attach the arguments to the statement and create a portal
253-
self.inner.stream.write_msg(Bind {
254-
portal: PortalId::UNNAMED,
255-
statement,
256-
formats: &[PgValueFormat::Binary],
257-
num_params,
258-
params: &arguments.buffer,
259-
result_formats: &[PgValueFormat::Binary],
260-
})?;
261-
262-
// executes the portal up to the passed limit
263-
// the protocol-level limit acts nearly identically to the `LIMIT` in SQL
264-
self.inner.stream.write_msg(message::Execute {
265-
portal: PortalId::UNNAMED,
266-
// Non-zero limits cause query plan pessimization by disabling parallel workers:
267-
// https://github.com/launchbadge/sqlx/issues/3673
268-
limit: 0,
205+
pipe = self.pipe(|buf| {
206+
// bind to attach the arguments to the statement and create a portal
207+
buf.write_msg(Bind {
208+
portal: PortalId::UNNAMED,
209+
statement,
210+
formats: &[PgValueFormat::Binary],
211+
num_params,
212+
params: &arguments.buffer,
213+
result_formats: &[PgValueFormat::Binary],
214+
})?;
215+
216+
// executes the portal up to the passed limit
217+
// the protocol-level limit acts nearly identically to the `LIMIT` in SQL
218+
buf.write_msg(message::Execute {
219+
portal: PortalId::UNNAMED,
220+
// Non-zero limits cause query plan pessimization by disabling parallel workers:
221+
// https://github.com/launchbadge/sqlx/issues/3673
222+
limit: 0,
223+
})?;
224+
// From https://www.postgresql.org/docs/current/protocol-flow.html:
225+
//
226+
// "An unnamed portal is destroyed at the end of the transaction, or as
227+
// soon as the next Bind statement specifying the unnamed portal as
228+
// destination is issued. (Note that a simple Query message also
229+
// destroys the unnamed portal."
230+
231+
// we ask the database server to close the unnamed portal and free the associated resources
232+
// earlier - after the execution of the current query.
233+
buf.write_msg(Close::Portal(PortalId::UNNAMED))?;
234+
235+
// finally, [Sync] asks postgres to process the messages that we sent and respond with
236+
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
237+
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
238+
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
239+
// termed batching might suit this.
240+
buf.write_sync();
241+
Ok(())
269242
})?;
270-
// From https://www.postgresql.org/docs/current/protocol-flow.html:
271-
//
272-
// "An unnamed portal is destroyed at the end of the transaction, or as
273-
// soon as the next Bind statement specifying the unnamed portal as
274-
// destination is issued. (Note that a simple Query message also
275-
// destroys the unnamed portal."
276-
277-
// we ask the database server to close the unnamed portal and free the associated resources
278-
// earlier - after the execution of the current query.
279-
self.inner
280-
.stream
281-
.write_msg(Close::Portal(PortalId::UNNAMED))?;
282-
283-
// finally, [Sync] asks postgres to process the messages that we sent and respond with
284-
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
285-
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
286-
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
287-
// termed batching might suit this.
288-
self.write_sync();
289243

290244
// prepared statements are binary
291245
PgValueFormat::Binary
292246
} else {
293247
// Query will trigger a ReadyForQuery
294-
self.inner.stream.write_msg(Query(query))?;
295-
self.inner.pending_ready_for_query_count += 1;
248+
pipe = self.queue_simple_query(query)?;
296249

297250
// metadata starts out as "nothing"
298251
metadata = Arc::new(PgStatementMetadata::default());
@@ -301,11 +254,9 @@ impl PgConnection {
301254
PgValueFormat::Text
302255
};
303256

304-
self.inner.stream.flush().await?;
305-
306257
Ok(try_stream! {
307258
loop {
308-
let message = self.inner.stream.recv().await?;
259+
let message = pipe.recv().await?;
309260

310261
match message.format {
311262
BackendMessageFormat::BindComplete
@@ -463,8 +414,6 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
463414
'c: 'e,
464415
{
465416
Box::pin(async move {
466-
self.wait_until_ready().await?;
467-
468417
let (_, metadata) = self
469418
.get_or_prepare(sql, parameters, true, None, true)
470419
.await?;
@@ -484,8 +433,6 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
484433
'c: 'e,
485434
{
486435
Box::pin(async move {
487-
self.wait_until_ready().await?;
488-
489436
let (stmt_id, metadata) = self.get_or_prepare(sql, &[], true, None, true).await?;
490437

491438
let nullable = self.get_nullable_for_columns(stmt_id, &metadata).await?;

0 commit comments

Comments
 (0)