Skip to content

Commit a9ce9b6

Browse files
committed
sqlx-postgres: Move rfq handling to the bg worker
1 parent 7667f50 commit a9ce9b6

File tree

7 files changed

+109
-63
lines changed

7 files changed

+109
-63
lines changed

sqlx-postgres/src/connection/describe.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,13 @@ WHERE rngtypid = $1
490490

491491
/// Check whether EXPLAIN statements are supported by the current connection
492492
fn is_explain_available(&self) -> bool {
493-
let parameter_statuses = &self.inner.stream.parameter_statuses;
494-
let is_cockroachdb = parameter_statuses.contains_key("crdb_version");
495-
let is_materialize = parameter_statuses.contains_key("mz_version");
496-
let is_questdb = parameter_statuses.contains_key("questdb_version");
497-
!is_cockroachdb && !is_materialize && !is_questdb
493+
self.inner.shared.with_lock(|shared| {
494+
let parameter_statuses = &shared.parameter_statuses;
495+
let is_cockroachdb = parameter_statuses.contains_key("crdb_version");
496+
let is_materialize = parameter_statuses.contains_key("mz_version");
497+
let is_questdb = parameter_statuses.contains_key("questdb_version");
498+
!is_cockroachdb && !is_materialize && !is_questdb
499+
})
498500
}
499501

500502
pub(crate) async fn get_nullable_for_columns(

sqlx-postgres/src/connection/establish.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use crate::connection::{sasl, stream::PgStream};
22
use crate::error::Error;
3-
use crate::message::{
4-
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
5-
};
3+
use crate::message::{Authentication, BackendKeyData, BackendMessageFormat, Password, Startup};
64
use crate::{PgConnectOptions, PgConnection};
75
use futures_channel::mpsc::unbounded;
86

7+
use super::stream::parse_server_version;
98
use super::worker::Worker;
109

1110
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
@@ -20,9 +19,9 @@ impl PgConnection {
2019

2120
let (notif_tx, notif_rx) = unbounded();
2221

23-
let x = Worker::spawn(stream.into_inner(), notif_tx);
22+
let (x, shared) = Worker::spawn(stream.into_inner(), notif_tx);
2423

25-
let mut conn = PgConnection::new(pg_stream, options, x, notif_rx);
24+
let mut conn = PgConnection::new(pg_stream, options, x, notif_rx, shared);
2625

2726
// To begin a session, a frontend opens a connection to the server
2827
// and sends a startup message.
@@ -65,7 +64,6 @@ impl PgConnection {
6564

6665
let mut process_id = 0;
6766
let mut secret_key = 0;
68-
let transaction_status;
6967

7068
loop {
7169
let message = pipe.recv().await?;
@@ -121,9 +119,7 @@ impl PgConnection {
121119
}
122120

123121
BackendMessageFormat::ReadyForQuery => {
124-
// start-up is completed. The frontend can now issue commands
125-
transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
126-
122+
// Transaction status is updated in the bg worker.
127123
break;
128124
}
129125

@@ -136,7 +132,13 @@ impl PgConnection {
136132
}
137133
}
138134

139-
conn.inner.transaction_status = transaction_status;
135+
let server_version = conn
136+
.inner
137+
.shared
138+
.remove_parameter_status("server_version")
139+
.map(parse_server_version);
140+
141+
conn.inner.server_version_num = server_version.flatten();
140142
conn.inner.secret_key = secret_key;
141143
conn.inner.process_id = process_id;
142144

sqlx-postgres/src/connection/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,8 @@ impl PgConnection {
321321
}
322322

323323
BackendMessageFormat::ReadyForQuery => {
324-
// processing of the query string is complete
325-
self.handle_ready_for_query(message)?;
324+
// Processing of the query string is complete, the transaction status is
325+
// updated in the bg worker.
326326
break;
327327
}
328328

sqlx-postgres/src/connection/mod.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ use futures_core::future::BoxFuture;
99
use futures_util::FutureExt;
1010
use pipe::Pipe;
1111
use request::{IoRequest, MessageBuf};
12+
use worker::Shared;
1213

1314
use crate::common::StatementCache;
1415
use crate::error::Error;
1516
use crate::ext::ustr::UStr;
1617
use crate::io::StatementId;
17-
use crate::message::{
18-
Close, FrontendMessage, Notification, Query, ReadyForQuery, ReceivedMessage, TransactionStatus,
19-
};
18+
use crate::message::{Close, FrontendMessage, Notification, Query, TransactionStatus};
2019
use crate::statement::PgStatementMetadata;
2120
use crate::transaction::Transaction;
2221
use crate::types::Oid;
@@ -63,6 +62,8 @@ pub struct PgConnectionInner {
6362
#[allow(dead_code)]
6463
secret_key: u32,
6564

65+
pub(crate) server_version_num: Option<u32>,
66+
6667
// sequence of statement IDs for use in preparing statements
6768
// in PostgreSQL, the statement is prepared to a user-supplied identifier
6869
next_statement_id: StatementId,
@@ -76,11 +77,11 @@ pub struct PgConnectionInner {
7677
cache_elem_type_to_array: HashMap<Oid, Oid>,
7778
cache_table_to_column_names: HashMap<Oid, TableColumns>,
7879

79-
// current transaction status
80-
transaction_status: TransactionStatus,
8180
pub(crate) transaction_depth: usize,
8281

8382
log_settings: LogSettings,
83+
84+
shared: Shared,
8485
}
8586

8687
pub(crate) struct TableColumns {
@@ -92,14 +93,7 @@ pub(crate) struct TableColumns {
9293
impl PgConnection {
9394
/// the version number of the server in `libpq` format
9495
pub fn server_version_num(&self) -> Option<u32> {
95-
self.inner.stream.server_version_num
96-
}
97-
98-
#[inline(always)]
99-
fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
100-
self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
101-
102-
Ok(())
96+
self.inner.server_version_num
10397
}
10498

10599
/// Queue a simple query (not prepared) to execute the next time this connection is used.
@@ -111,7 +105,7 @@ impl PgConnection {
111105
}
112106

113107
pub(crate) fn in_transaction(&self) -> bool {
114-
match self.inner.transaction_status {
108+
match self.inner.shared.get_transaction_status() {
115109
TransactionStatus::Transaction => true,
116110
TransactionStatus::Error | TransactionStatus::Idle => false,
117111
}
@@ -122,6 +116,7 @@ impl PgConnection {
122116
options: &PgConnectOptions,
123117
chan: UnboundedSender<IoRequest>,
124118
notifications: UnboundedReceiver<Notification>,
119+
shared: Shared,
125120
) -> Self {
126121
Self {
127122
inner: Box::new(PgConnectionInner {
@@ -138,7 +133,8 @@ impl PgConnection {
138133
cache_table_to_column_names: HashMap::new(),
139134
transaction_depth: 0,
140135
stream,
141-
transaction_status: TransactionStatus::Idle,
136+
server_version_num: None,
137+
shared,
142138
}),
143139
}
144140
}

sqlx-postgres/src/connection/stream.rs

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::BTreeMap;
21
use std::ops::{ControlFlow, Deref, DerefMut};
32
use std::str::FromStr;
43

@@ -8,8 +7,7 @@ use sqlx_core::bytes::Buf;
87
use crate::connection::tls::MaybeUpgradeTls;
98
use crate::error::Error;
109
use crate::message::{
11-
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, ParameterStatus,
12-
ReceivedMessage,
10+
BackendMessage, BackendMessageFormat, EncodeMessage, FrontendMessage, Notice, ReceivedMessage,
1311
};
1412
use crate::net::{self, BufferedSocket, Socket};
1513
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
@@ -27,10 +25,6 @@ pub struct PgStream {
2725
// A trait object is okay here as the buffering amortizes the overhead of both the dynamic
2826
// function call as well as the syscall.
2927
inner: BufferedSocket<Box<dyn Socket>>,
30-
31-
pub(crate) parameter_statuses: BTreeMap<String, String>,
32-
33-
pub(crate) server_version_num: Option<u32>,
3428
}
3529

3630
impl PgStream {
@@ -48,8 +42,6 @@ impl PgStream {
4842

4943
Ok(Self {
5044
inner: BufferedSocket::new(socket),
51-
parameter_statuses: BTreeMap::default(),
52-
server_version_num: None,
5345
})
5446
}
5547

@@ -126,25 +118,24 @@ impl PgStream {
126118
return Err(message.decode::<PgDatabaseError>()?.into());
127119
}
128120

129-
BackendMessageFormat::ParameterStatus => {
130-
// informs the frontend about the current (initial)
131-
// setting of backend parameters
121+
// BackendMessageFormat::ParameterStatus => {
122+
// // informs the frontend about the current (initial)
123+
// // setting of backend parameters
132124

133-
let ParameterStatus { name, value } = message.decode()?;
134-
// TODO: handle `client_encoding`, `DateStyle` change
135-
136-
match name.as_str() {
137-
"server_version" => {
138-
self.server_version_num = parse_server_version(&value);
139-
}
140-
_ => {
141-
self.parameter_statuses.insert(name, value);
142-
}
143-
}
125+
// let ParameterStatus { name, value } = message.decode()?;
126+
// // TODO: handle `client_encoding`, `DateStyle` change
144127

145-
continue;
146-
}
128+
// match name.as_str() {
129+
// "server_version" => {
130+
// self.server_version_num = parse_server_version(&value);
131+
// }
132+
// _ => {
133+
// self.parameter_statuses.insert(name, value);
134+
// }
135+
// }
147136

137+
// continue;
138+
// }
148139
BackendMessageFormat::NoticeResponse => {
149140
// do we need this to be more configurable?
150141
// if you are reading this comment and think so, open an issue
@@ -205,7 +196,7 @@ impl DerefMut for PgStream {
205196

206197
// reference:
207198
// https://github.com/postgres/postgres/blob/6feebcb6b44631c3dc435e971bd80c2dd218a5ab/src/interfaces/libpq/fe-exec.c#L1030-L1065
208-
fn parse_server_version(s: &str) -> Option<u32> {
199+
pub fn parse_server_version(s: String) -> Option<u32> {
209200
let mut parts = Vec::<u32>::with_capacity(3);
210201

211202
let mut from = 0;

sqlx-postgres/src/connection/worker.rs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use std::{
2-
collections::VecDeque,
2+
collections::{BTreeMap, VecDeque},
33
future::Future,
44
ops::ControlFlow,
55
pin::Pin,
6+
sync::{Arc, Mutex, MutexGuard},
67
task::{ready, Context, Poll},
78
};
89

910
use crate::message::{
10-
BackendMessageFormat, FrontendMessage, Notification, ReceivedMessage, Terminate,
11+
BackendMessageFormat, FrontendMessage, Notification, ParameterStatus, ReadyForQuery,
12+
ReceivedMessage, Terminate, TransactionStatus,
1113
};
1214
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
1315
use futures_util::{SinkExt, StreamExt};
@@ -38,14 +40,16 @@ pub struct Worker {
3840
back_log: VecDeque<UnboundedSender<ReceivedMessage>>,
3941
socket: BufferedSocket<Box<dyn Socket>>,
4042
notif_chan: UnboundedSender<Notification>,
43+
shared: Shared,
4144
}
4245

4346
impl Worker {
4447
pub fn spawn(
4548
socket: BufferedSocket<Box<dyn Socket>>,
4649
notif_chan: UnboundedSender<Notification>,
47-
) -> UnboundedSender<IoRequest> {
50+
) -> (UnboundedSender<IoRequest>, Shared) {
4851
let (tx, rx) = unbounded();
52+
let shared = Shared::new();
4953

5054
let worker = Worker {
5155
state: WorkerState::Open,
@@ -54,10 +58,11 @@ impl Worker {
5458
back_log: VecDeque::new(),
5559
socket,
5660
notif_chan,
61+
shared: shared.clone(),
5762
};
5863

5964
spawn(worker);
60-
tx
65+
(tx, shared)
6166
}
6267

6368
// Tries to receive the next message from the channel. Also handles termination if needed.
@@ -128,6 +133,9 @@ impl Worker {
128133
while let Poll::Ready(response) = self.poll_next_message(cx)? {
129134
match response.format {
130135
BackendMessageFormat::ReadyForQuery => {
136+
let rfq: ReadyForQuery = response.clone().decode()?;
137+
self.shared.set_transaction_status(rfq.transaction_status);
138+
131139
self.send_back(response)?;
132140
// Remove from the backlog so we dont send more responses back.
133141
let _ = self.back_log.pop_front();
@@ -145,6 +153,9 @@ impl Worker {
145153
}
146154
BackendMessageFormat::ParameterStatus => {
147155
// Asynchronous response - todo
156+
//
157+
let ParameterStatus { name, value } = response.decode()?;
158+
self.shared.insert_parameter_status(name, value);
148159
}
149160
BackendMessageFormat::NoticeResponse => {
150161
// Asynchronous response - todo
@@ -226,3 +237,47 @@ impl Future for Worker {
226237
self.poll_shutdown(cx)
227238
}
228239
}
240+
241+
#[derive(Clone)]
242+
pub struct Shared(Arc<Mutex<SharedInner>>);
243+
244+
pub struct SharedInner {
245+
pub parameter_statuses: BTreeMap<String, String>,
246+
pub transaction_status: TransactionStatus,
247+
}
248+
249+
impl Shared {
250+
pub fn new() -> Shared {
251+
Shared(Arc::new(Mutex::new(SharedInner {
252+
parameter_statuses: BTreeMap::new(),
253+
transaction_status: TransactionStatus::Idle,
254+
})))
255+
}
256+
257+
fn lock(&self) -> MutexGuard<'_, SharedInner> {
258+
self.0
259+
.lock()
260+
.expect("BUG: failed to get lock on shared state in worker")
261+
}
262+
263+
pub fn get_transaction_status(&self) -> TransactionStatus {
264+
self.lock().transaction_status
265+
}
266+
267+
fn set_transaction_status(&self, status: TransactionStatus) {
268+
self.lock().transaction_status = status
269+
}
270+
271+
fn insert_parameter_status(&self, name: String, value: String) {
272+
self.lock().parameter_statuses.insert(name, value);
273+
}
274+
275+
pub fn remove_parameter_status(&self, name: &str) -> Option<String> {
276+
self.lock().parameter_statuses.remove(name)
277+
}
278+
279+
pub fn with_lock<T>(&self, f: impl Fn(&mut SharedInner) -> T) -> T {
280+
let mut lock = self.lock();
281+
f(&mut lock)
282+
}
283+
}

sqlx-postgres/src/message/ready_for_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use sqlx_core::bytes::Bytes;
33
use crate::error::Error;
44
use crate::message::{BackendMessage, BackendMessageFormat};
55

6-
#[derive(Debug)]
6+
#[derive(Debug, Clone, Copy)]
77
#[repr(u8)]
88
pub enum TransactionStatus {
99
/// Not in a transaction block.

0 commit comments

Comments
 (0)