Skip to content

Commit 964a5e1

Browse files
authored
Don't drop connections if DB hasn't changed (#175)
* Don't drop connections if DB hasn't changed * Incoporate connect_timeout into the pool config * use the field
1 parent d126c74 commit 964a5e1

File tree

2 files changed

+54
-9
lines changed

2 files changed

+54
-9
lines changed

src/config.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use log::{error, info};
44
use once_cell::sync::Lazy;
55
use serde_derive::{Deserialize, Serialize};
66
use std::collections::{HashMap, HashSet};
7-
use std::hash::Hash;
7+
use std::hash::{Hash, Hasher};
88
use std::path::Path;
99
use std::sync::Arc;
1010
use tokio::fs::File;
@@ -122,7 +122,7 @@ impl Address {
122122
}
123123

124124
/// PostgreSQL user.
125-
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Serialize, Deserialize, Debug)]
125+
#[derive(Clone, PartialEq, Hash, Eq, Serialize, Deserialize, Debug)]
126126
pub struct User {
127127
pub username: String,
128128
pub password: String,
@@ -232,7 +232,7 @@ impl Default for General {
232232
/// Pool mode:
233233
/// - transaction: server serves one transaction,
234234
/// - session: server is attached to the client.
235-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Copy)]
235+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
236236
pub enum PoolMode {
237237
#[serde(alias = "transaction", alias = "Transaction")]
238238
Transaction,
@@ -250,7 +250,7 @@ impl ToString for PoolMode {
250250
}
251251
}
252252

253-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
253+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
254254
pub struct Pool {
255255
#[serde(default = "Pool::default_pool_mode")]
256256
pub pool_mode: PoolMode,
@@ -263,11 +263,35 @@ pub struct Pool {
263263
#[serde(default)] // False
264264
pub primary_reads_enabled: bool,
265265

266+
#[serde(default = "General::default_connect_timeout")]
267+
pub connect_timeout: u64,
268+
266269
pub sharding_function: String,
267270
pub shards: HashMap<String, Shard>,
268271
pub users: HashMap<String, User>,
269272
}
270273

274+
impl Hash for Pool {
275+
fn hash<H: Hasher>(&self, state: &mut H) {
276+
self.pool_mode.hash(state);
277+
self.default_role.hash(state);
278+
self.query_parser_enabled.hash(state);
279+
self.primary_reads_enabled.hash(state);
280+
self.sharding_function.hash(state);
281+
self.connect_timeout.hash(state);
282+
283+
for (key, value) in &self.shards {
284+
key.hash(state);
285+
value.hash(state);
286+
}
287+
288+
for (key, value) in &self.users {
289+
key.hash(state);
290+
value.hash(state);
291+
}
292+
}
293+
}
294+
271295
impl Pool {
272296
fn default_pool_mode() -> PoolMode {
273297
PoolMode::Transaction
@@ -284,6 +308,7 @@ impl Default for Pool {
284308
query_parser_enabled: false,
285309
primary_reads_enabled: false,
286310
sharding_function: "pg_bigint_hash".to_string(),
311+
connect_timeout: General::default_connect_timeout(),
287312
}
288313
}
289314
}
@@ -296,7 +321,7 @@ pub struct ServerConfig {
296321
}
297322

298323
/// Shard configuration.
299-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
324+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)]
300325
pub struct Shard {
301326
pub database: String,
302327
pub servers: Vec<ServerConfig>,
@@ -575,7 +600,10 @@ pub async fn parse(path: &str) -> Result<(), Error> {
575600
None => (),
576601
};
577602

578-
for (pool_name, pool) in &config.pools {
603+
for (pool_name, mut pool) in &mut config.pools {
604+
// Copy the connect timeout over for hashing.
605+
pool.connect_timeout = config.general.connect_timeout;
606+
579607
match pool.sharding_function.as_ref() {
580608
"pg_bigint_hash" => (),
581609
"sha1" => (),
@@ -666,7 +694,7 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
666694
let new_config = get_config();
667695

668696
if old_config.pools != new_config.pools {
669-
info!("Pool configuration changed, re-creating server pools");
697+
info!("Pool configuration changed");
670698
ConnectionPool::from_config(client_server_map).await?;
671699
Ok(true)
672700
} else if old_config != new_config {

src/pool.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use once_cell::sync::Lazy;
88
use parking_lot::{Mutex, RwLock};
99
use rand::seq::SliceRandom;
1010
use rand::thread_rng;
11-
use std::collections::HashMap;
11+
use std::collections::{HashMap, HashSet};
1212
use std::sync::Arc;
1313
use std::time::Instant;
1414

@@ -26,6 +26,8 @@ pub type PoolMap = HashMap<(String, String), ConnectionPool>;
2626
/// This is atomic and safe and read-optimized.
2727
/// The pool is recreated dynamically when the config is reloaded.
2828
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
29+
static POOLS_HASH: Lazy<ArcSwap<HashSet<crate::config::Pool>>> =
30+
Lazy::new(|| ArcSwap::from_pointee(HashSet::default()));
2931

3032
/// Pool settings.
3133
#[derive(Clone, Debug)]
@@ -101,9 +103,23 @@ impl ConnectionPool {
101103
let mut new_pools = HashMap::new();
102104
let mut address_id = 0;
103105

106+
let mut pools_hash = (*(*POOLS_HASH.load())).clone();
107+
104108
for (pool_name, pool_config) in &config.pools {
109+
let changed = pools_hash.insert(pool_config.clone());
110+
111+
if !changed {
112+
info!("[db: {}] has not changed", pool_name);
113+
continue;
114+
}
115+
105116
// There is one pool per database/user pair.
106117
for (_, user) in &pool_config.users {
118+
info!(
119+
"[pool: {}][user: {}] creating new pool",
120+
pool_name, user.username
121+
);
122+
107123
let mut shards = Vec::new();
108124
let mut addresses = Vec::new();
109125
let mut banlist = Vec::new();
@@ -156,7 +172,7 @@ impl ConnectionPool {
156172
let pool = Pool::builder()
157173
.max_size(user.pool_size)
158174
.connection_timeout(std::time::Duration::from_millis(
159-
config.general.connect_timeout,
175+
pool_config.connect_timeout,
160176
))
161177
.test_on_check_out(false)
162178
.build(manager)
@@ -217,6 +233,7 @@ impl ConnectionPool {
217233
}
218234

219235
POOLS.store(Arc::new(new_pools.clone()));
236+
POOLS_HASH.store(Arc::new(pools_hash.clone()));
220237

221238
Ok(())
222239
}

0 commit comments

Comments
 (0)