Skip to content

Commit eeb78b4

Browse files
committed
sqlx-postgres: Move executor to bg worker
1 parent 7da5f63 commit eeb78b4

File tree

6 files changed

+107
-190
lines changed

6 files changed

+107
-190
lines changed

sqlx-core/src/net/socket/buffered.rs

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -185,6 +185,7 @@ impl<S: Socket> BufferedSocket<S> {
185185
fn poll_handle_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
186186
// Because of how `BytesMut` works, we should only be shifting capacity back and forth
187187
// between `read` and `available` unless we have to read an oversize message.
188+
188189
while self.read_buf.len() < self.wants_bytes {
189190
self.read_buf
190191
.reserve(self.wants_bytes - self.read_buf.len());

sqlx-postgres/src/connection/executor.rs

Lines changed: 75 additions & 128 deletions
Original file line number · Diff line number · Diff line change
@@ -5,7 +5,7 @@ use crate::io::{PortalId, StatementId};
55
use crate::logger::QueryLogger;
66
use crate::message::{
77
self, BackendMessageFormat, Bind, Close, CommandComplete, DataRow, ParameterDescription, Parse,
8-
ParseComplete, Query, RowDescription,
8+
ParseComplete, RowDescription,
99
};
1010
use crate::statement::PgStatementMetadata;
1111
use crate::{
@@ -20,6 +20,8 @@ use sqlx_core::arguments::Arguments;
2020
use sqlx_core::Either;
2121
use std::{borrow::Cow, pin::pin, sync::Arc};
2222

23+
use super::pipe::Pipe;
24+
2325
async fn prepare(
2426
conn: &mut PgConnection,
2527
sql: &str,
@@ -45,52 +47,45 @@ async fn prepare(
4547
param_types.push(conn.resolve_type_id(&ty.0).await?);
4648
}
4749

48-
// flush and wait until we are re-ready
49-
conn.wait_until_ready().await?;
50+
let mut pipe = conn.pipe(|buf| {
51+
// next we send the PARSE command to the server
52+
buf.write_msg(Parse {
53+
param_types: &param_types,
54+
query: sql,
55+
statement: id,
56+
})?;
57+
58+
if metadata.is_none() {
59+
// get the statement columns and parameters
60+
buf.write_msg(message::Describe::Statement(id))?;
61+
}
5062

51-
// next we send the PARSE command to the server
52-
conn.inner.stream.write_msg(Parse {
53-
param_types: &param_types,
54-
query: sql,
55-
statement: id,
63+
// we ask for the server to immediately send us the result of the PARSE command
64+
buf.write_sync();
65+
Ok(())
5666
})?;
5767

58-
if metadata.is_none() {
59-
// get the statement columns and parameters
60-
conn.inner
61-
.stream
62-
.write_msg(message::Describe::Statement(id))?;
63-
}
64-
65-
// we ask for the server to immediately send us the result of the PARSE command
66-
conn.write_sync();
67-
conn.inner.stream.flush().await?;
68-
6968
// indicates that the SQL query string is now successfully parsed and has semantic validity
70-
conn.inner.stream.recv_expect::<ParseComplete>().await?;
69+
pipe.recv_expect::<ParseComplete>().await?;
7170

7271
let metadata = if let Some(metadata) = metadata {
7372
// each SYNC produces one READY FOR QUERY
74-
conn.recv_ready_for_query().await?;
73+
pipe.recv_ready_for_query().await?;
7574

7675
// we already have metadata
7776
metadata
7877
} else {
79-
let parameters = recv_desc_params(conn).await?;
78+
let parameters = recv_desc_params(&mut pipe).await?;
8079

81-
let rows = recv_desc_rows(conn).await?;
80+
let rows = recv_desc_rows(&mut pipe).await?;
8281

8382
// each SYNC produces one READY FOR QUERY
84-
conn.recv_ready_for_query().await?;
83+
pipe.recv_ready_for_query().await?;
8584

8685
let parameters = conn.handle_parameter_description(parameters).await?;
8786

8887
let (columns, column_names) = conn.handle_row_description(rows, true).await?;
8988

90-
// ensure that if we did fetch custom data, we wait until we are fully ready before
91-
// continuing
92-
conn.wait_until_ready().await?;
93-
9489
Arc::new(PgStatementMetadata {
9590
parameters,
9691
columns,
@@ -101,12 +96,12 @@ async fn prepare(
10196
Ok((id, metadata))
10297
}
10398

104-
async fn recv_desc_params(conn: &mut PgConnection) -> Result<ParameterDescription, Error> {
105-
conn.inner.stream.recv_expect().await
99+
async fn recv_desc_params(pipe: &mut Pipe) -> Result<ParameterDescription, Error> {
100+
pipe.recv_expect().await
106101
}
107102

108-
async fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription>, Error> {
109-
let rows: Option<RowDescription> = match conn.inner.stream.recv().await? {
103+
async fn recv_desc_rows(pipe: &mut Pipe) -> Result<Option<RowDescription>, Error> {
104+
let rows: Option<RowDescription> = match pipe.recv().await? {
110105
// describes the rows that will be returned when the statement is eventually executed
111106
message if message.format == BackendMessageFormat::RowDescription => {
112107
Some(message.decode()?)
@@ -127,44 +122,6 @@ async fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription
127122
}
128123

129124
impl PgConnection {
130-
// wait for CloseComplete to indicate a statement was closed
131-
pub(super) async fn wait_for_close_complete(&mut self, mut count: usize) -> Result<(), Error> {
132-
// we need to wait for the [CloseComplete] to be returned from the server
133-
while count > 0 {
134-
match self.inner.stream.recv().await? {
135-
message if message.format == BackendMessageFormat::PortalSuspended => {
136-
// there was an open portal
137-
// this can happen if the last time a statement was used it was not fully executed
138-
}
139-
140-
message if message.format == BackendMessageFormat::CloseComplete => {
141-
// successfully closed the statement (and freed up the server resources)
142-
count -= 1;
143-
}
144-
145-
message => {
146-
return Err(err_protocol!(
147-
"expecting PortalSuspended or CloseComplete but received {:?}",
148-
message.format
149-
));
150-
}
151-
}
152-
}
153-
154-
Ok(())
155-
}
156-
157-
#[inline(always)]
158-
pub(crate) fn write_sync(&mut self) {
159-
self.inner
160-
.stream
161-
.write_msg(message::Sync)
162-
.expect("BUG: Sync should not be too big for protocol");
163-
164-
// all SYNC messages will return a ReadyForQuery
165-
self.inner.pending_ready_for_query_count += 1;
166-
}
167-
168125
async fn get_or_prepare(
169126
&mut self,
170127
sql: &str,
@@ -182,13 +139,14 @@ impl PgConnection {
182139

183140
if persistent && self.inner.cache_statement.is_enabled() {
184141
if let Some((id, _)) = self.inner.cache_statement.insert(sql, statement.clone()) {
185-
self.inner.stream.write_msg(Close::Statement(id))?;
186-
self.write_sync();
187-
188-
self.inner.stream.flush().await?;
189-
190-
self.wait_for_close_complete(1).await?;
191-
self.recv_ready_for_query().await?;
142+
let mut pipe = self.pipe(|buf| {
143+
buf.write_msg(Close::Statement(id))?;
144+
buf.write_sync();
145+
Ok(())
146+
})?;
147+
148+
pipe.wait_for_close_complete(1).await?;
149+
pipe.recv_ready_for_query().await?;
192150
}
193151
}
194152

@@ -204,10 +162,8 @@ impl PgConnection {
204162
) -> Result<impl Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + 'e, Error> {
205163
let mut logger = QueryLogger::new(query, self.inner.log_settings.clone());
206164

207-
// before we continue, wait until we are "ready" to accept more queries
208-
self.wait_until_ready().await?;
209-
210165
let mut metadata: Arc<PgStatementMetadata>;
166+
let mut pipe: Pipe;
211167

212168
let format = if let Some(mut arguments) = arguments {
213169
// Check this before we write anything to the stream.
@@ -234,53 +190,50 @@ impl PgConnection {
234190
// patch holes created during encoding
235191
arguments.apply_patches(self, &metadata.parameters).await?;
236192

237-
// consume messages till `ReadyForQuery` before bind and execute
238-
self.wait_until_ready().await?;
239-
240-
// bind to attach the arguments to the statement and create a portal
241-
self.inner.stream.write_msg(Bind {
242-
portal: PortalId::UNNAMED,
243-
statement,
244-
formats: &[PgValueFormat::Binary],
245-
num_params,
246-
params: &arguments.buffer,
247-
result_formats: &[PgValueFormat::Binary],
248-
})?;
249-
250-
// executes the portal up to the passed limit
251-
// the protocol-level limit acts nearly identically to the `LIMIT` in SQL
252-
self.inner.stream.write_msg(message::Execute {
253-
portal: PortalId::UNNAMED,
254-
// Non-zero limits cause query plan pessimization by disabling parallel workers:
255-
// https://github.com/launchbadge/sqlx/issues/3673
256-
limit: 0,
193+
pipe = self.pipe(|buf| {
194+
// bind to attach the arguments to the statement and create a portal
195+
buf.write_msg(Bind {
196+
portal: PortalId::UNNAMED,
197+
statement,
198+
formats: &[PgValueFormat::Binary],
199+
num_params,
200+
params: &arguments.buffer,
201+
result_formats: &[PgValueFormat::Binary],
202+
})?;
203+
204+
// executes the portal up to the passed limit
205+
// the protocol-level limit acts nearly identically to the `LIMIT` in SQL
206+
buf.write_msg(message::Execute {
207+
portal: PortalId::UNNAMED,
208+
// Non-zero limits cause query plan pessimization by disabling parallel workers:
209+
// https://github.com/launchbadge/sqlx/issues/3673
210+
limit: 0,
211+
})?;
212+
// From https://www.postgresql.org/docs/current/protocol-flow.html:
213+
//
214+
// "An unnamed portal is destroyed at the end of the transaction, or as
215+
// soon as the next Bind statement specifying the unnamed portal as
216+
// destination is issued. (Note that a simple Query message also
217+
// destroys the unnamed portal.)"
218+
219+
// we ask the database server to close the unnamed portal and free the associated resources
220+
// earlier - after the execution of the current query.
221+
buf.write_msg(Close::Portal(PortalId::UNNAMED))?;
222+
223+
// finally, [Sync] asks postgres to process the messages that we sent and respond with
224+
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
225+
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
226+
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
227+
// termed batching might suit this.
228+
buf.write_sync();
229+
Ok(())
257230
})?;
258-
// From https://www.postgresql.org/docs/current/protocol-flow.html:
259-
//
260-
// "An unnamed portal is destroyed at the end of the transaction, or as
261-
// soon as the next Bind statement specifying the unnamed portal as
262-
// destination is issued. (Note that a simple Query message also
263-
// destroys the unnamed portal.)"
264-
265-
// we ask the database server to close the unnamed portal and free the associated resources
266-
// earlier - after the execution of the current query.
267-
self.inner
268-
.stream
269-
.write_msg(Close::Portal(PortalId::UNNAMED))?;
270-
271-
// finally, [Sync] asks postgres to process the messages that we sent and respond with
272-
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
273-
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
274-
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
275-
// termed batching might suit this.
276-
self.write_sync();
277231

278232
// prepared statements are binary
279233
PgValueFormat::Binary
280234
} else {
281235
// Query will trigger a ReadyForQuery
282-
self.inner.stream.write_msg(Query(query))?;
283-
self.inner.pending_ready_for_query_count += 1;
236+
pipe = self.queue_simple_query(query)?;
284237

285238
// metadata starts out as "nothing"
286239
metadata = Arc::new(PgStatementMetadata::default());
@@ -289,11 +242,9 @@ impl PgConnection {
289242
PgValueFormat::Text
290243
};
291244

292-
self.inner.stream.flush().await?;
293-
294245
Ok(try_stream! {
295246
loop {
296-
let message = self.inner.stream.recv().await?;
247+
let message = pipe.recv().await?;
297248

298249
match message.format {
299250
BackendMessageFormat::BindComplete
@@ -451,8 +402,6 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
451402
'c: 'e,
452403
{
453404
Box::pin(async move {
454-
self.wait_until_ready().await?;
455-
456405
let (_, metadata) = self.get_or_prepare(sql, parameters, true, None).await?;
457406

458407
Ok(PgStatement {
@@ -470,8 +419,6 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
470419
'c: 'e,
471420
{
472421
Box::pin(async move {
473-
self.wait_until_ready().await?;
474-
475422
let (stmt_id, metadata) = self.get_or_prepare(sql, &[], true, None).await?;
476423

477424
let nullable = self.get_nullable_for_columns(stmt_id, &metadata).await?;

0 commit comments

Comments
 (0)