Skip to content

Commit 2923e6f

Browse files
committed
sqlx-postgres: Move rfq handling to the bg worker
1 parent 598aeaf commit 2923e6f

File tree

7 files changed

+109
-63
lines changed

7 files changed

+109
-63
lines changed

sqlx-postgres/src/connection/describe.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,13 @@ WHERE rngtypid = $1
414414

415415
/// Check whether EXPLAIN statements are supported by the current connection
416416
fn is_explain_available(&self) -> bool {
417-
let parameter_statuses = &self.inner.stream.parameter_statuses;
418-
let is_cockroachdb = parameter_statuses.contains_key("crdb_version");
419-
let is_materialize = parameter_statuses.contains_key("mz_version");
420-
let is_questdb = parameter_statuses.contains_key("questdb_version");
421-
!is_cockroachdb && !is_materialize && !is_questdb
417+
self.inner.shared.with_lock(|shared| {
418+
let parameter_statuses = &shared.parameter_statuses;
419+
let is_cockroachdb = parameter_statuses.contains_key("crdb_version");
420+
let is_materialize = parameter_statuses.contains_key("mz_version");
421+
let is_questdb = parameter_statuses.contains_key("questdb_version");
422+
!is_cockroachdb && !is_materialize && !is_questdb
423+
})
422424
}
423425

424426
pub(crate) async fn get_nullable_for_columns(

sqlx-postgres/src/connection/establish.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use crate::connection::{sasl, stream::PgStream};
22
use crate::error::Error;
3-
use crate::message::{
4-
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
5-
};
3+
use crate::message::{Authentication, BackendKeyData, BackendMessageFormat, Password, Startup};
64
use crate::{PgConnectOptions, PgConnection};
75
use futures_channel::mpsc::unbounded;
86

7+
use super::stream::parse_server_version;
98
use super::worker::Worker;
109

1110
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
@@ -20,9 +19,9 @@ impl PgConnection {
2019

2120
let (notif_tx, notif_rx) = unbounded();
2221

23-
let x = Worker::spawn(stream.into_inner(), notif_tx);
22+
let (x, shared) = Worker::spawn(stream.into_inner(), notif_tx);
2423

25-
let mut conn = PgConnection::new(pg_stream, options, x, notif_rx);
24+
let mut conn = PgConnection::new(pg_stream, options, x, notif_rx, shared);
2625

2726
// To begin a session, a frontend opens a connection to the server
2827
// and sends a startup message.
@@ -65,7 +64,6 @@ impl PgConnection {
6564

6665
let mut process_id = 0;
6766
let mut secret_key = 0;
68-
let transaction_status;
6967

7068
loop {
7169
let message = pipe.recv().await?;
@@ -121,9 +119,7 @@ impl PgConnection {
121119
}
122120

123121
BackendMessageFormat::ReadyForQuery => {
124-
// start-up is completed. The frontend can now issue commands
125-
transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
126-
122+
// Transaction status is updated in the bg worker.
127123
break;
128124
}
129125

@@ -136,7 +132,13 @@ impl PgConnection {
136132
}
137133
}
138134

139-
conn.inner.transaction_status = transaction_status;
135+
let server_version = conn
136+
.inner
137+
.shared
138+
.remove_parameter_status("server_version")
139+
.map(parse_server_version);
140+
141+
conn.inner.server_version_num = server_version.flatten();
140142
conn.inner.secret_key = secret_key;
141143
conn.inner.process_id = process_id;
142144

sqlx-postgres/src/connection/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,8 @@ impl PgConnection {
309309
}
310310

311311
BackendMessageFormat::ReadyForQuery => {
312-
// processing of the query string is complete
313-
self.handle_ready_for_query(message)?;
312+
// Processing of the query string is complete, the transaction status is
313+
// updated in the bg worker.
314314
break;
315315
}
316316

sqlx-postgres/src/connection/mod.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@ use futures_core::future::BoxFuture;
88
use futures_util::FutureExt;
99
use pipe::Pipe;
1010
use request::{IoRequest, MessageBuf};
11+
use worker::Shared;
1112

1213
use crate::common::StatementCache;
1314
use crate::error::Error;
1415
use crate::ext::ustr::UStr;
1516
use crate::io::StatementId;
16-
use crate::message::{
17-
Close, FrontendMessage, Notification, Query, ReadyForQuery, ReceivedMessage, TransactionStatus,
18-
};
17+
use crate::message::{Close, FrontendMessage, Notification, Query, TransactionStatus};
1918
use crate::statement::PgStatementMetadata;
2019
use crate::transaction::Transaction;
2120
use crate::types::Oid;
@@ -62,6 +61,8 @@ pub struct PgConnectionInner {
6261
#[allow(dead_code)]
6362
secret_key: u32,
6463

64+
pub(crate) server_version_num: Option<u32>,
65+
6566
// sequence of statement IDs for use in preparing statements
6667
// in PostgreSQL, the statement is prepared to a user-supplied identifier
6768
next_statement_id: StatementId,
@@ -74,24 +75,17 @@ pub struct PgConnectionInner {
7475
cache_type_oid: HashMap<UStr, Oid>,
7576
cache_elem_type_to_array: HashMap<Oid, Oid>,
7677

77-
// current transaction status
78-
transaction_status: TransactionStatus,
7978
pub(crate) transaction_depth: usize,
8079

8180
log_settings: LogSettings,
81+
82+
shared: Shared,
8283
}
8384

8485
impl PgConnection {
8586
/// the version number of the server in `libpq` format
8687
pub fn server_version_num(&self) -> Option<u32> {
87-
self.inner.stream.server_version_num
88-
}
89-
90-
#[inline(always)]
91-
fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
92-
self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
93-
94-
Ok(())
88+
self.inner.server_version_num
9589
}
9690

9791
/// Queue a simple query (not prepared) to execute the next time this connection is used.
@@ -103,7 +97,7 @@ impl PgConnection {
10397
}
10498

10599
pub(crate) fn in_transaction(&self) -> bool {
106-
match self.inner.transaction_status {
100+
match self.inner.shared.get_transaction_status() {
107101
TransactionStatus::Transaction => true,
108102
TransactionStatus::Error | TransactionStatus::Idle => false,
109103
}
@@ -114,6 +108,7 @@ impl PgConnection {
114108
options: &PgConnectOptions,
115109
chan: UnboundedSender<IoRequest>,
116110
notifications: UnboundedReceiver<Notification>,
111+
shared: Shared,
117112
) -> Self {
118113
Self {
119114
inner: Box::new(PgConnectionInner {
@@ -129,7 +124,8 @@ impl PgConnection {
129124
cache_elem_type_to_array: HashMap::new(),
130125
transaction_depth: 0,
131126
stream,
132-
transaction_status: TransactionStatus::Idle,
127+
server_version_num: None,
128+
shared,
133129
}),
134130
}
135131
}

sqlx-postgres/src/connection/stream.rs

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::BTreeMap;
21
use std::ops::{ControlFlow, Deref, DerefMut};
32
use std::str::FromStr;
43

@@ -8,8 +7,7 @@ use sqlx_core::bytes::Buf;
87
use crate::connection::tls::MaybeUpgradeTls;
98
use crate::error::Error;
109
use crate::message::{
11-
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, ParameterStatus,
12-
ReceivedMessage,
10+
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, ReceivedMessage,
1311
};
1412
use crate::net::{self, BufferedSocket, Socket};
1513
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
@@ -27,10 +25,6 @@ pub struct PgStream {
2725
// A trait object is okay here as the buffering amortizes the overhead of both the dynamic
2826
// function call as well as the syscall.
2927
inner: BufferedSocket<Box<dyn Socket>>,
30-
31-
pub(crate) parameter_statuses: BTreeMap<String, String>,
32-
33-
pub(crate) server_version_num: Option<u32>,
3428
}
3529

3630
impl PgStream {
@@ -48,8 +42,6 @@ impl PgStream {
4842

4943
Ok(Self {
5044
inner: BufferedSocket::new(socket),
51-
parameter_statuses: BTreeMap::default(),
52-
server_version_num: None,
5345
})
5446
}
5547

@@ -126,25 +118,24 @@ impl PgStream {
126118
return Err(message.decode::<PgDatabaseError>()?.into());
127119
}
128120

129-
BackendMessageFormat::ParameterStatus => {
130-
// informs the frontend about the current (initial)
131-
// setting of backend parameters
121+
// BackendMessageFormat::ParameterStatus => {
122+
// // informs the frontend about the current (initial)
123+
// // setting of backend parameters
132124

133-
let ParameterStatus { name, value } = message.decode()?;
134-
// TODO: handle `client_encoding`, `DateStyle` change
135-
136-
match name.as_str() {
137-
"server_version" => {
138-
self.server_version_num = parse_server_version(&value);
139-
}
140-
_ => {
141-
self.parameter_statuses.insert(name, value);
142-
}
143-
}
125+
// let ParameterStatus { name, value } = message.decode()?;
126+
// // TODO: handle `client_encoding`, `DateStyle` change
144127

145-
continue;
146-
}
128+
// match name.as_str() {
129+
// "server_version" => {
130+
// self.server_version_num = parse_server_version(&value);
131+
// }
132+
// _ => {
133+
// self.parameter_statuses.insert(name, value);
134+
// }
135+
// }
147136

137+
// continue;
138+
// }
148139
BackendMessageFormat::NoticeResponse => {
149140
// do we need this to be more configurable?
150141
// if you are reading this comment and think so, open an issue
@@ -205,7 +196,7 @@ impl DerefMut for PgStream {
205196

206197
// reference:
207198
// https://github.com/postgres/postgres/blob/6feebcb6b44631c3dc435e971bd80c2dd218a5ab/src/interfaces/libpq/fe-exec.c#L1030-L1065
208-
fn parse_server_version(s: &str) -> Option<u32> {
199+
pub fn parse_server_version(s: String) -> Option<u32> {
209200
let mut parts = Vec::<u32>::with_capacity(3);
210201

211202
let mut from = 0;

sqlx-postgres/src/connection/worker.rs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use std::{
2-
collections::VecDeque,
2+
collections::{BTreeMap, VecDeque},
33
future::Future,
44
ops::ControlFlow,
55
pin::Pin,
6+
sync::{Arc, Mutex, MutexGuard},
67
task::{ready, Context, Poll},
78
};
89

910
use crate::message::{
10-
BackendMessageFormat, FrontendMessage, Notification, ReceivedMessage, Terminate,
11+
BackendMessageFormat, FrontendMessage, Notification, ParameterStatus, ReadyForQuery,
12+
ReceivedMessage, Terminate, TransactionStatus,
1113
};
1214
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
1315
use futures_util::{SinkExt, StreamExt};
@@ -38,14 +40,16 @@ pub struct Worker {
3840
back_log: VecDeque<UnboundedSender<ReceivedMessage>>,
3941
socket: BufferedSocket<Box<dyn Socket>>,
4042
notif_chan: UnboundedSender<Notification>,
43+
shared: Shared,
4144
}
4245

4346
impl Worker {
4447
pub fn spawn(
4548
socket: BufferedSocket<Box<dyn Socket>>,
4649
notif_chan: UnboundedSender<Notification>,
47-
) -> UnboundedSender<IoRequest> {
50+
) -> (UnboundedSender<IoRequest>, Shared) {
4851
let (tx, rx) = unbounded();
52+
let shared = Shared::new();
4953

5054
let worker = Worker {
5155
state: WorkerState::Open,
@@ -54,10 +58,11 @@ impl Worker {
5458
back_log: VecDeque::new(),
5559
socket,
5660
notif_chan,
61+
shared: shared.clone(),
5762
};
5863

5964
spawn(worker);
60-
tx
65+
(tx, shared)
6166
}
6267

6368
// Tries to receive the next message from the channel. Also handles termination if needed.
@@ -128,6 +133,9 @@ impl Worker {
128133
while let Poll::Ready(response) = self.poll_next_message(cx)? {
129134
match response.format {
130135
BackendMessageFormat::ReadyForQuery => {
136+
let rfq: ReadyForQuery = response.clone().decode()?;
137+
self.shared.set_transaction_status(rfq.transaction_status);
138+
131139
self.send_back(response)?;
132140
// Remove from the backlog so we dont send more responses back.
133141
let _ = self.back_log.pop_front();
@@ -145,6 +153,9 @@ impl Worker {
145153
}
146154
BackendMessageFormat::ParameterStatus => {
147155
// Asynchronous response - todo
156+
//
157+
let ParameterStatus { name, value } = response.decode()?;
158+
self.shared.insert_parameter_status(name, value);
148159
}
149160
BackendMessageFormat::NoticeResponse => {
150161
// Asynchronous response - todo
@@ -226,3 +237,47 @@ impl Future for Worker {
226237
self.poll_shutdown(cx)
227238
}
228239
}
240+
241+
#[derive(Clone)]
242+
pub struct Shared(Arc<Mutex<SharedInner>>);
243+
244+
pub struct SharedInner {
245+
pub parameter_statuses: BTreeMap<String, String>,
246+
pub transaction_status: TransactionStatus,
247+
}
248+
249+
impl Shared {
250+
pub fn new() -> Shared {
251+
Shared(Arc::new(Mutex::new(SharedInner {
252+
parameter_statuses: BTreeMap::new(),
253+
transaction_status: TransactionStatus::Idle,
254+
})))
255+
}
256+
257+
fn lock(&self) -> MutexGuard<'_, SharedInner> {
258+
self.0
259+
.lock()
260+
.expect("BUG: failed to get lock on shared state in worker")
261+
}
262+
263+
pub fn get_transaction_status(&self) -> TransactionStatus {
264+
self.lock().transaction_status
265+
}
266+
267+
fn set_transaction_status(&self, status: TransactionStatus) {
268+
self.lock().transaction_status = status
269+
}
270+
271+
fn insert_parameter_status(&self, name: String, value: String) {
272+
self.lock().parameter_statuses.insert(name, value);
273+
}
274+
275+
pub fn remove_parameter_status(&self, name: &str) -> Option<String> {
276+
self.lock().parameter_statuses.remove(name)
277+
}
278+
279+
pub fn with_lock<T>(&self, f: impl Fn(&mut SharedInner) -> T) -> T {
280+
let mut lock = self.lock();
281+
f(&mut lock)
282+
}
283+
}

sqlx-postgres/src/message/ready_for_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use sqlx_core::bytes::Bytes;
33
use crate::error::Error;
44
use crate::message::{BackendMessage, BackendMessageFormat};
55

6-
#[derive(Debug)]
6+
#[derive(Debug, Clone, Copy)]
77
#[repr(u8)]
88
pub enum TransactionStatus {
99
/// Not in a transaction block.

0 commit comments

Comments
 (0)