Skip to content

Commit 5972b6f

Browse files
authored
Switch to parking_lot RwLock & Mutex. Use trace! for protocol instead of debug! (#42)
* RwLock & parking_lot::Mutex * upgrade to trace
1 parent b3c8ca4 commit 5972b6f

File tree

7 files changed

+48
-41
lines changed

7 files changed

+48
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ sqlparser = "0.14"
2525
log = "0.4"
2626
arc-swap = "1"
2727
env_logger = "0.9"
28+
parking_lot = "0.11"

src/client.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
/// We are pretending to the server in this scenario,
33
/// and this module implements that.
44
use bytes::{Buf, BufMut, BytesMut};
5-
use log::{debug, error};
5+
use log::{debug, error, trace};
66
use tokio::io::{AsyncReadExt, BufReader};
77
use tokio::net::{
88
tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -70,7 +70,7 @@ impl Client {
7070
let transaction_mode = config.general.pool_mode.starts_with("t");
7171
drop(config);
7272
loop {
73-
debug!("Waiting for StartupMessage");
73+
trace!("Waiting for StartupMessage");
7474

7575
// Could be StartupMessage or SSLRequest
7676
// which makes this variable length.
@@ -93,7 +93,7 @@ impl Client {
9393
match code {
9494
// Client wants SSL. We don't support it at the moment.
9595
SSL_REQUEST_CODE => {
96-
debug!("Rejecting SSLRequest");
96+
trace!("Rejecting SSLRequest");
9797

9898
let mut no = BytesMut::with_capacity(1);
9999
no.put_u8(b'N');
@@ -103,7 +103,7 @@ impl Client {
103103

104104
// Regular startup message.
105105
PROTOCOL_VERSION_NUMBER => {
106-
debug!("Got StartupMessage");
106+
trace!("Got StartupMessage");
107107

108108
// TODO: perform actual auth.
109109
let parameters = parse_startup(bytes.clone())?;
@@ -116,7 +116,7 @@ impl Client {
116116
write_all(&mut stream, server_info).await?;
117117
backend_key_data(&mut stream, process_id, secret_key).await?;
118118
ready_for_query(&mut stream).await?;
119-
debug!("Startup OK");
119+
trace!("Startup OK");
120120

121121
// Split the read and write streams
122122
// so we can control buffering.
@@ -168,10 +168,10 @@ impl Client {
168168
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
169169
// The client wants to cancel a query it has issued previously.
170170
if self.cancel_mode {
171-
debug!("Sending CancelRequest");
171+
trace!("Sending CancelRequest");
172172

173173
let (process_id, secret_key, address, port) = {
174-
let guard = self.client_server_map.lock().unwrap();
174+
let guard = self.client_server_map.lock();
175175

176176
match guard.get(&(self.process_id, self.secret_key)) {
177177
// Drop the mutex as soon as possible.
@@ -202,7 +202,7 @@ impl Client {
202202
// We expect the client to either start a transaction with regular queries
203203
// or issue commands for our sharding and server selection protocols.
204204
loop {
205-
debug!("Client idle, waiting for message");
205+
trace!("Client idle, waiting for message");
206206

207207
// Client idle, waiting for messages.
208208
self.stats.client_idle(self.process_id);
@@ -216,7 +216,7 @@ impl Client {
216216

217217
// Avoid taking a server if the client just wants to disconnect.
218218
if message[0] as char == 'X' {
219-
debug!("Client disconnecting");
219+
trace!("Client disconnecting");
220220
return Ok(());
221221
}
222222

@@ -333,7 +333,7 @@ impl Client {
333333
let code = message.get_u8() as char;
334334
let _len = message.get_i32() as usize;
335335

336-
debug!("Message: {}", code);
336+
trace!("Message: {}", code);
337337

338338
match code {
339339
// ReadyForQuery
@@ -514,7 +514,7 @@ impl Client {
514514

515515
/// Release the server from being mine. I can't cancel its queries anymore.
516516
pub fn release(&self) {
517-
let mut guard = self.client_server_map.lock().unwrap();
517+
let mut guard = self.client_server_map.lock();
518518
guard.remove(&(self.process_id, self.secret_key));
519519
}
520520
}

src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ extern crate tokio;
3636
extern crate toml;
3737

3838
use log::{error, info};
39+
use parking_lot::Mutex;
3940
use tokio::net::TcpListener;
4041
use tokio::{
4142
signal,
@@ -44,7 +45,7 @@ use tokio::{
4445
};
4546

4647
use std::collections::HashMap;
47-
use std::sync::{Arc, Mutex};
48+
use std::sync::Arc;
4849

4950
mod client;
5051
mod config;

src/messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub async fn backend_key_data(
3838
Ok(write_all(stream, key_data).await?)
3939
}
4040

41-
#[allow(dead_code)]
41+
/// Construct a `Q`: Query message.
4242
pub fn simple_query(query: &str) -> BytesMut {
4343
let mut res = BytesMut::from(&b"Q"[..]);
4444
let query = format!("{}\0", query);

src/pool.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,20 @@ use async_trait::async_trait;
33
use bb8::{ManageConnection, Pool, PooledConnection};
44
use bytes::BytesMut;
55
use chrono::naive::NaiveDateTime;
6-
use log::{error, info, warn};
6+
use log::{debug, error, info, warn};
7+
use parking_lot::{Mutex, RwLock};
78

89
use crate::config::{get_config, Address, Role, User};
910
use crate::errors::Error;
1011
use crate::server::Server;
1112
use crate::stats::Reporter;
1213

1314
use std::collections::HashMap;
14-
use std::sync::{Arc, Mutex};
15+
use std::sync::Arc;
1516
use std::time::Instant;
1617

1718
// Banlist: bad servers go in here.
18-
pub type BanList = Arc<Mutex<Vec<HashMap<Address, NaiveDateTime>>>>;
19+
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
1920
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
2021

2122
#[derive(Clone, Debug)]
@@ -101,7 +102,7 @@ impl ConnectionPool {
101102
databases: shards,
102103
addresses: addresses,
103104
round_robin: rand::random::<usize>() % address_len, // Start at a random replica
104-
banlist: Arc::new(Mutex::new(banlist)),
105+
banlist: Arc::new(RwLock::new(banlist)),
105106
stats: stats,
106107
}
107108
}
@@ -161,6 +162,8 @@ impl ConnectionPool {
161162
_ => addresses.len(),
162163
};
163164

165+
debug!("Allowed attempts for {:?}: {}", role, allowed_attempts);
166+
164167
let exists = match role {
165168
Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0,
166169
None => true,
@@ -251,14 +254,14 @@ impl ConnectionPool {
251254
pub fn ban(&self, address: &Address, shard: usize) {
252255
error!("Banning {:?}", address);
253256
let now = chrono::offset::Utc::now().naive_utc();
254-
let mut guard = self.banlist.lock().unwrap();
257+
let mut guard = self.banlist.write();
255258
guard[shard].insert(address.clone(), now);
256259
}
257260

258261
/// Clear the replica to receive traffic again. Takes effect immediately
259262
/// for all new transactions.
260263
pub fn _unban(&self, address: &Address, shard: usize) {
261-
let mut guard = self.banlist.lock().unwrap();
264+
let mut guard = self.banlist.write();
262265
guard[shard].remove(address);
263266
}
264267

@@ -274,12 +277,14 @@ impl ConnectionPool {
274277
Some(Role::Primary) => return false, // Primary cannot be banned.
275278
};
276279

277-
// If you're not asking for the primary,
278-
// all databases are treated as replicas.
279-
let mut guard = self.banlist.lock().unwrap();
280+
debug!("Available targets for {:?}: {}", role, replicas_available);
281+
282+
let guard = self.banlist.read();
280283

281284
// Everything is banned = nothing is banned.
282285
if guard[shard].len() == replicas_available {
286+
drop(guard);
287+
let mut guard = self.banlist.write();
283288
guard[shard].clear();
284289
drop(guard);
285290
warn!("Unbanning all replicas.");
@@ -291,16 +296,24 @@ impl ConnectionPool {
291296
Some(timestamp) => {
292297
let now = chrono::offset::Utc::now().naive_utc();
293298
let config = get_config();
299+
294300
// Ban expired.
295301
if now.timestamp() - timestamp.timestamp() > config.general.ban_time {
302+
drop(guard);
303+
warn!("Unbanning {:?}", address);
304+
let mut guard = self.banlist.write();
296305
guard[shard].remove(address);
297306
false
298307
} else {
308+
debug!("{:?} is banned", address);
299309
true
300310
}
301311
}
302312

303-
None => false,
313+
None => {
314+
debug!("{:?} is ok", address);
315+
false
316+
}
304317
}
305318
}
306319

src/server.rs

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use bytes::{Buf, BufMut, BytesMut};
22
///! Implementation of the PostgreSQL server (database) protocol.
33
///! Here we are pretending to the a Postgres client.
4-
use log::{debug, error, info};
4+
use log::{debug, error, info, trace};
55
use tokio::io::{AsyncReadExt, BufReader};
66
use tokio::net::{
77
tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -75,7 +75,7 @@ impl Server {
7575
}
7676
};
7777

78-
debug!("Sending StartupMessage");
78+
trace!("Sending StartupMessage");
7979

8080
// Send the startup packet telling the server we're a normal Postgres client.
8181
startup(&mut stream, &user.name, database).await?;
@@ -97,7 +97,7 @@ impl Server {
9797
Err(_) => return Err(Error::SocketError),
9898
};
9999

100-
debug!("Message: {}", code);
100+
trace!("Message: {}", code);
101101

102102
match code {
103103
// Authentication
@@ -108,7 +108,7 @@ impl Server {
108108
Err(_) => return Err(Error::SocketError),
109109
};
110110

111-
debug!("Auth: {}", auth_code);
111+
trace!("Auth: {}", auth_code);
112112

113113
match auth_code {
114114
MD5_ENCRYPTED_PASSWORD => {
@@ -141,7 +141,7 @@ impl Server {
141141
Err(_) => return Err(Error::SocketError),
142142
};
143143

144-
debug!("Error: {}", error_code);
144+
trace!("Error: {}", error_code);
145145

146146
match error_code {
147147
// No error message is present in the message.
@@ -300,7 +300,7 @@ impl Server {
300300
let code = message.get_u8() as char;
301301
let _len = message.get_i32();
302302

303-
debug!("Message: {}", code);
303+
trace!("Message: {}", code);
304304

305305
match code {
306306
// ReadyForQuery
@@ -415,7 +415,7 @@ impl Server {
415415

416416
/// Claim this server as mine for the purposes of query cancellation.
417417
pub fn claim(&mut self, process_id: i32, secret_key: i32) {
418-
let mut guard = self.client_server_map.lock().unwrap();
418+
let mut guard = self.client_server_map.lock();
419419
guard.insert(
420420
(process_id, secret_key),
421421
(
@@ -431,18 +431,9 @@ impl Server {
431431
/// It will use the simple query protocol.
432432
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
433433
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
434-
let mut query = BytesMut::from(&query.as_bytes()[..]);
435-
query.put_u8(0); // C-string terminator (NULL character).
434+
let query = simple_query(query);
436435

437-
let len = query.len() as i32 + 4;
438-
439-
let mut msg = BytesMut::with_capacity(len as usize + 1);
440-
441-
msg.put_u8(b'Q');
442-
msg.put_i32(len);
443-
msg.put_slice(&query[..]);
444-
445-
self.send(msg).await?;
436+
self.send(query).await?;
446437

447438
loop {
448439
let _ = self.recv().await?;

0 commit comments

Comments
 (0)