Skip to content

Commit bc5b9e4

Browse files
authored
Merge pull request #8 from levkk/default-role
add default server role; bug fix
2 parents 595e564 + e826343 commit bc5b9e4

File tree

5 files changed

+65
-12
lines changed

5 files changed

+65
-12
lines changed

pgcat.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,15 @@ servers = [
6969
# [ "127.0.1.1", 5432, "replica" ],
7070
]
7171
database = "shard2"
72+
73+
74+
# Settings for our query routing layer.
75+
[query_router]
76+
77+
# If the client doesn't specify, route traffic to
78+
# this role by default.
79+
#
80+
# any: round-robin between primary and replicas,
81+
# replica: round-robin between replicas only without touching the primary,
82+
# primary: all queries go to the primary unless otherwise specified.
83+
default_role = "any"

src/client.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ pub struct Client {
4848
// Clients are mapped to servers while they use them. This allows a client
4949
// to connect and cancel a query.
5050
client_server_map: ClientServerMap,
51+
52+
// Unless client specifies, route queries to the servers that have this role,
53+
// e.g. primary or replicas or any.
54+
default_server_role: Option<Role>,
5155
}
5256

5357
impl Client {
@@ -58,6 +62,7 @@ impl Client {
5862
mut stream: TcpStream,
5963
client_server_map: ClientServerMap,
6064
transaction_mode: bool,
65+
default_server_role: Option<Role>,
6166
) -> Result<Client, Error> {
6267
loop {
6368
// Could be StartupMessage or SSLRequest
@@ -114,6 +119,7 @@ impl Client {
114119
process_id: process_id,
115120
secret_key: secret_key,
116121
client_server_map: client_server_map,
122+
default_server_role: default_server_role,
117123
});
118124
}
119125

@@ -133,6 +139,7 @@ impl Client {
133139
process_id: process_id,
134140
secret_key: secret_key,
135141
client_server_map: client_server_map,
142+
default_server_role: default_server_role,
136143
});
137144
}
138145

@@ -172,7 +179,7 @@ impl Client {
172179
let mut shard: Option<usize> = None;
173180

174181
// Active database role we want to talk to, e.g. primary or replica.
175-
let mut role: Option<Role> = None;
182+
let mut role: Option<Role> = self.default_server_role;
176183

177184
loop {
178185
// Read a complete message from the client, which normally would be
@@ -275,7 +282,7 @@ impl Client {
275282
// Release server
276283
if !server.in_transaction() && self.transaction_mode {
277284
shard = None;
278-
role = None;
285+
role = self.default_server_role;
279286
break;
280287
}
281288
}
@@ -338,7 +345,7 @@ impl Client {
338345
// Release server
339346
if !server.in_transaction() && self.transaction_mode {
340347
shard = None;
341-
role = None;
348+
role = self.default_server_role;
342349
break;
343350
}
344351
}
@@ -366,7 +373,7 @@ impl Client {
366373
if !server.in_transaction() && self.transaction_mode {
367374
println!("Releasing after copy done");
368375
shard = None;
369-
role = None;
376+
role = self.default_server_role;
370377
break;
371378
}
372379
}
@@ -382,15 +389,15 @@ impl Client {
382389
}
383390

384391
/// Release the server from being mine. I can't cancel its queries anymore.
385-
pub fn release(&mut self) {
392+
pub fn release(&self) {
386393
let mut guard = self.client_server_map.lock().unwrap();
387394
guard.remove(&(self.process_id, self.secret_key));
388395
}
389396

390397
/// Determine if the query is part of our special syntax, extract
391398
/// the shard key, and return the shard to query based on Postgres'
392399
/// PARTITION BY HASH function.
393-
fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option<usize> {
400+
fn select_shard(&self, mut buf: BytesMut, shards: usize) -> Option<usize> {
394401
let code = buf.get_u8() as char;
395402

396403
// Only supporting simpe protocol here, so
@@ -425,7 +432,7 @@ impl Client {
425432
}
426433

427434
// Pick a primary or a replica from the pool.
428-
fn select_role(&mut self, mut buf: BytesMut) -> Option<Role> {
435+
fn select_role(&self, mut buf: BytesMut) -> Option<Role> {
429436
let code = buf.get_u8() as char;
430437

431438
// Same story as select_shard() above.

src/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,17 @@ pub struct Shard {
4343
pub database: String,
4444
}
4545

46+
#[derive(Deserialize, Debug, Clone)]
47+
pub struct QueryRouter {
48+
pub default_role: String,
49+
}
50+
4651
#[derive(Deserialize, Debug, Clone)]
4752
pub struct Config {
4853
pub general: General,
4954
pub user: User,
5055
pub shards: HashMap<String, Shard>,
56+
pub query_router: QueryRouter,
5157
}
5258

5359
/// Parse the config.
@@ -118,6 +124,19 @@ pub async fn parse(path: &str) -> Result<Config, Error> {
118124
}
119125
}
120126

127+
match config.query_router.default_role.as_ref() {
128+
"any" => (),
129+
"primary" => (),
130+
"replica" => (),
131+
other => {
132+
println!(
133+
"> Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
134+
other
135+
);
136+
return Err(Error::BadConfig);
137+
}
138+
};
139+
121140
Ok(config)
122141
}
123142

@@ -132,5 +151,6 @@ mod test {
132151
assert_eq!(config.shards.len(), 3);
133152
assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1");
134153
assert_eq!(config.shards["0"].servers[0].2, "primary");
154+
assert_eq!(config.query_router.default_role, "any");
135155
}
136156
}

src/main.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod sharding;
4040

4141
// Support for query cancellation: this maps our process_ids and
4242
// secret keys to the backend's.
43+
use config::Role;
4344
use pool::{ClientServerMap, ConnectionPool};
4445

4546
/// Main!
@@ -87,6 +88,15 @@ async fn main() {
8788

8889
let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await;
8990
let transaction_mode = config.general.pool_mode == "transaction";
91+
let default_server_role = match config.query_router.default_role.as_ref() {
92+
"any" => None,
93+
"primary" => Some(Role::Primary),
94+
"replica" => Some(Role::Replica),
95+
_ => {
96+
println!("> Config error, got unexpected query_router.default_role.");
97+
return;
98+
}
99+
};
90100

91101
println!("> Waiting for clients...");
92102

@@ -109,7 +119,14 @@ async fn main() {
109119
addr, transaction_mode
110120
);
111121

112-
match client::Client::startup(socket, client_server_map, transaction_mode).await {
122+
match client::Client::startup(
123+
socket,
124+
client_server_map,
125+
transaction_mode,
126+
default_server_role,
127+
)
128+
.await
129+
{
113130
Ok(mut client) => {
114131
println!(">> Client {:?} authenticated successfully!", addr);
115132

src/pool.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,7 @@ impl ConnectionPool {
124124
// Make sure if a specific role is requested, it's available in the pool.
125125
match role {
126126
Some(role) => {
127-
let role_count = addresses
128-
.iter()
129-
.filter(|&db| db.role == Role::Primary)
130-
.count();
127+
let role_count = addresses.iter().filter(|&db| db.role == role).count();
131128

132129
if role_count == 0 {
133130
println!(

0 commit comments

Comments
 (0)