
Commit c6ccc6b

Merge pull request #5 from levkk/levkk-replica-primary
#4 Primary/replica selection
2 parents 00f2d39 + 495d6ce commit c6ccc6b

11 files changed (+427, -45 lines)


README.md

Lines changed: 17 additions & 2 deletions

````diff
@@ -34,8 +34,9 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing.
 3. `COPY` protocol support.
 4. Query cancellation.
 5. Round-robin load balancing of replicas.
-6. Banlist & failover
+6. Banlist & failover.
 7. Sharding!
+8. Explicit query routing to primary or replicas.
 
 ### Session mode
 Each client owns its own server for the duration of the session. Commands like `SET` are allowed.
@@ -56,7 +57,8 @@ this might be relevant given that this is a transactional pooler but if you're n
 ### Round-robin load balancing
 This is the novel part. PgBouncer doesn't support it and suggests we use DNS or a TCP proxy instead.
 We prefer to have everything as part of one package; arguably, it's easier to understand and optimize.
-This pooler will round-robin between multiple replicas, keeping load reasonably even.
+This pooler will round-robin between multiple replicas, keeping load reasonably even. If the primary is in
+the pool as well, it'll be treated as a replica for read-only queries.
 
 ### Banlist & failover
 This is where it gets even more interesting. If we fail to connect to one of the replicas or it fails a health check,
@@ -82,6 +84,19 @@ SET SHARDING KEY TO '1234';
 
 This sharding key will be hashed and the pooler will select a shard to use for the next transaction. If the pooler is in session mode, this sharding key has to be set as the first query on startup & cannot be changed until the client re-connects.
 
+### Explicit read/write query routing
+
+If you want to have the primary and replicas in the same pooler, you'd probably want to
+route queries explicitly to the primary or replicas, depending on whether they are reads or writes (e.g. `SELECT`s or `INSERT`/`UPDATE`, etc.). To help with this, we introduce some more custom syntax:
+
+```sql
+SET SERVER ROLE TO 'primary';
+SET SERVER ROLE TO 'replica';
+```
+
+After executing this, the next transaction will be routed to the primary or replica respectively. By default, all queries will be load-balanced between all servers, so if the client wants to write or talk to the primary, they have to explicitly select it using the syntax above.
+
 
 ## Missing
````
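To see the new routing syntax from the client's side, here is a minimal usage sketch. It assumes the pooler is listening on 127.0.0.1:6432 with the `shard0` database and `sharding_user` from the example config, a hypothetical `users` table, and the tokio-postgres crate as the client; none of these are part of this commit.

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Connect through the pooler, not directly to Postgres.
    // Assumption: pgcat is listening on 127.0.0.1:6432.
    let (client, connection) = tokio_postgres::connect(
        "host=127.0.0.1 port=6432 user=sharding_user dbname=shard0",
        NoTls,
    )
    .await?;
    tokio::spawn(connection); // drive the connection in the background

    // Pin the next transaction to the primary before a write...
    client.batch_execute("SET SERVER ROLE TO 'primary';").await?;
    client.batch_execute("INSERT INTO users (id) VALUES (1);").await?;

    // ...then route a read explicitly to a replica. Without a role set,
    // the pooler load-balances across all servers, primary included.
    client.batch_execute("SET SERVER ROLE TO 'replica';").await?;
    let messages = client.simple_query("SELECT id FROM users;").await?;
    println!("got {} protocol messages back", messages.len());

    Ok(())
}
```

Note that `batch_execute` and `simple_query` use the simple query protocol, which matters here: `select_role` in src/client.rs below only intercepts `'Q'` (simple protocol) messages.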

pgcat.toml

Lines changed: 13 additions & 10 deletions

```diff
@@ -43,26 +43,29 @@ password = "sharding_user"
 # Shard 0
 [shards.0]
 
-# [ host, port ]
+# [ host, port, role ]
 servers = [
-    [ "127.0.0.1", 5432 ],
-    [ "localhost", 5432 ],
+    [ "127.0.0.1", 5432, "primary" ],
+    [ "localhost", 5432, "replica" ],
+    # [ "127.0.1.1", 5432, "replica" ],
 ]
 # Database name (e.g. "postgres")
 database = "shard0"
 
 [shards.1]
-# [ host, port ]
+# [ host, port, role ]
 servers = [
-    [ "127.0.0.1", 5432 ],
-    [ "localhost", 5432 ],
+    [ "127.0.0.1", 5432, "primary" ],
+    [ "localhost", 5432, "replica" ],
+    # [ "127.0.1.1", 5432, "replica" ],
 ]
 database = "shard1"
 
 [shards.2]
-# [ host, port ]
+# [ host, port, role ]
 servers = [
-    [ "127.0.0.1", 5432 ],
-    [ "localhost", 5432 ],
+    [ "127.0.0.1", 5432, "primary" ],
+    [ "localhost", 5432, "replica" ],
+    # [ "127.0.1.1", 5432, "replica" ],
 ]
-database = "shard2"
+database = "shard2"
```

src/client.rs

Lines changed: 70 additions & 5 deletions

```diff
@@ -7,13 +7,15 @@ use tokio::io::{AsyncReadExt, BufReader};
 use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
 use tokio::net::TcpStream;
 
+use crate::config::Role;
 use crate::errors::Error;
 use crate::messages::*;
 use crate::pool::{ClientServerMap, ConnectionPool};
 use crate::server::Server;
 use crate::sharding::Sharder;
 
 const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+';";
+const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)';";
 
 /// The client state. One of these is created per client.
 pub struct Client {
@@ -45,6 +47,9 @@ pub struct Client {
 
     // sharding regex
     sharding_regex: Regex,
+
+    // role detection regex
+    role_regex: Regex,
 }
 
 impl Client {
@@ -57,6 +62,7 @@ impl Client {
         transaction_mode: bool,
     ) -> Result<Client, Error> {
         let sharding_regex = Regex::new(SHARDING_REGEX).unwrap();
+        let role_regex = Regex::new(ROLE_REGEX).unwrap();
 
         loop {
             // Could be StartupMessage or SSLRequest
@@ -114,6 +120,7 @@ impl Client {
                     secret_key: secret_key,
                     client_server_map: client_server_map,
                     sharding_regex: sharding_regex,
+                    role_regex: role_regex,
                 });
             }
 
@@ -134,6 +141,7 @@ impl Client {
                     secret_key: secret_key,
                     client_server_map: client_server_map,
                     sharding_regex: sharding_regex,
+                    role_regex: role_regex,
                 });
             }
 
@@ -145,7 +153,7 @@ impl Client {
     }
 
     /// Client loop. We handle all messages between the client and the database here.
-    pub async fn handle(&mut self, pool: ConnectionPool) -> Result<(), Error> {
+    pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
         // Special: cancelling existing running query
         if self.cancel_mode {
             let (process_id, secret_key, address, port) = {
@@ -172,6 +180,9 @@ impl Client {
         // - if in transaction mode, this lives for the duration of one transaction.
         let mut shard: Option<usize> = None;
 
+        // Active database role we want to talk to, e.g. primary or replica.
+        let mut role: Option<Role> = None;
+
         loop {
             // Read a complete message from the client, which normally would be
             // either a `Q` (query) or `P` (prepare, extended protocol).
@@ -182,18 +193,36 @@ impl Client {
 
             // Parse for special select shard command.
             // SET SHARDING KEY TO 'bigint';
-            match self.select_shard(message.clone(), pool.shards()).await {
+            match self.select_shard(message.clone(), pool.shards()) {
                 Some(s) => {
-                    set_sharding_key(&mut self.write).await?;
+                    custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
                     shard = Some(s);
                     continue;
                 }
                 None => (),
             };
 
+            // Parse for special server role selection command.
+            //
+            match self.select_role(message.clone()) {
+                Some(r) => {
+                    custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
+                    role = Some(r);
+                    continue;
+                }
+                None => (),
+            };
+
             // Grab a server from the pool.
             // None = any shard
-            let connection = pool.get(shard).await.unwrap();
+            let connection = match pool.get(shard, role).await {
+                Ok(conn) => conn,
+                Err(err) => {
+                    println!(">> Could not get connection from pool: {:?}", err);
+                    return Err(err);
+                }
+            };
+
             let mut proxy = connection.0;
             let _address = connection.1;
             let server = &mut *proxy;
@@ -232,10 +261,13 @@ impl Client {
 
             match code {
                 'Q' => {
+                    // TODO: implement retries here for read-only transactions.
                     server.send(original).await?;
 
                     loop {
+                        // TODO: implement retries here for read-only transactions.
                        let response = server.recv().await?;
+
                        match write_all_half(&mut self.write, response).await {
                            Ok(_) => (),
                            Err(err) => {
@@ -252,6 +284,7 @@ impl Client {
                         // Release server
                         if !server.in_transaction() && self.transaction_mode {
                             shard = None;
+                            role = None;
                             break;
                         }
                     }
@@ -290,10 +323,13 @@ impl Client {
                 'S' => {
                     // Extended protocol, client requests sync
                     self.buffer.put(&original[..]);
+
+                    // TODO: retries for read-only transactions
                     server.send(self.buffer.clone()).await?;
                     self.buffer.clear();
 
                     loop {
+                        // TODO: retries for read-only transactions
                         let response = server.recv().await?;
                         match write_all_half(&mut self.write, response).await {
                             Ok(_) => (),
@@ -311,6 +347,7 @@ impl Client {
                         // Release server
                         if !server.in_transaction() && self.transaction_mode {
                             shard = None;
+                            role = None;
                             break;
                         }
                     }
@@ -338,6 +375,7 @@ impl Client {
                     if !server.in_transaction() && self.transaction_mode {
                         println!("Releasing after copy done");
                         shard = None;
+                        role = None;
                         break;
                     }
                 }
@@ -361,7 +399,7 @@ impl Client {
     /// Determine if the query is part of our special syntax, extract
     /// the shard key, and return the shard to query based on Postgres'
     /// PARTITION BY HASH function.
-    async fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option<usize> {
+    fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option<usize> {
         let code = buf.get_u8() as char;
 
         // Only supporting simple protocol here, so
@@ -390,4 +428,31 @@ impl Client {
             None
         }
     }
+
+    // Pick a primary or a replica from the pool.
+    fn select_role(&mut self, mut buf: BytesMut) -> Option<Role> {
+        let code = buf.get_u8() as char;
+
+        // Same story as select_shard() above.
+        match code {
+            'Q' => (),
+            _ => return None,
+        };
+
+        let len = buf.get_i32();
+        let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase();
+
+        // Copy / paste from above. If we get one more of these use cases,
+        // it'll be time to abstract :).
+        if self.role_regex.is_match(&query) {
+            let role = query.split("'").collect::<Vec<&str>>()[1];
+            match role {
+                "PRIMARY" => Some(Role::Primary),
+                "REPLICA" => Some(Role::Replica),
+                _ => return None,
+            }
+        } else {
+            None
+        }
+    }
 }
```
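The wire format that `select_role` picks apart is easy to miss in the diff: a simple-protocol query is a `'Q'` tag byte, a self-inclusive i32 length that excludes the tag, the query text, and a trailing NUL. A standalone sketch, not part of the commit, that builds such a message and mirrors the parsing above (`bytes` and `regex` assumed as dependencies):

```rust
use bytes::{Buf, BufMut, BytesMut};
use regex::Regex;

const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)';";

fn main() {
    let role_regex = Regex::new(ROLE_REGEX).unwrap();

    // Build a 'Q' (simple query) message the way a Postgres client would.
    let query = "SET SERVER ROLE TO 'replica';";
    let mut buf = BytesMut::new();
    buf.put_u8(b'Q');
    buf.put_i32(4 + query.len() as i32 + 1); // length field + text + NUL
    buf.put_slice(query.as_bytes());
    buf.put_u8(0);

    // Mirror select_role(): strip the tag, read the length, uppercase the text.
    assert_eq!(buf.get_u8() as char, 'Q');
    let len = buf.get_i32();
    let text = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase();

    assert!(role_regex.is_match(&text));
    // The role name sits between the first pair of single quotes.
    assert_eq!(text.split('\'').collect::<Vec<_>>()[1], "REPLICA");
}
```

The uppercasing is what lets clients send the command in any case while the regex only has to match `PRIMARY` or `REPLICA`.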

src/config.rs

Lines changed: 51 additions & 2 deletions

```diff
@@ -3,14 +3,21 @@ use tokio::fs::File;
 use tokio::io::AsyncReadExt;
 use toml;
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use crate::errors::Error;
 
+#[derive(Clone, PartialEq, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
+pub enum Role {
+    Primary,
+    Replica,
+}
+
 #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
 pub struct Address {
     pub host: String,
     pub port: String,
+    pub role: Role,
 }
 
 #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
@@ -32,7 +39,7 @@ pub struct General {
 
 #[derive(Deserialize, Debug, Clone)]
 pub struct Shard {
-    pub servers: Vec<(String, u16)>,
+    pub servers: Vec<(String, u16, String)>,
     pub database: String,
 }
 
@@ -70,6 +77,47 @@ pub async fn parse(path: &str) -> Result<Config, Error> {
         }
     };
 
+    // Quick config sanity check.
+    for shard in &config.shards {
+        // We use addresses as unique identifiers,
+        // let's make sure they are unique in the config as well.
+        let mut dup_check = HashSet::new();
+        let mut primary_count = 0;
+
+        for server in &shard.1.servers {
+            dup_check.insert(server);
+
+            // Check that we define only zero or one primary.
+            match server.2.as_ref() {
+                "primary" => primary_count += 1,
+                _ => (),
+            };
+
+            // Check role spelling.
+            match server.2.as_ref() {
+                "primary" => (),
+                "replica" => (),
+                _ => {
+                    println!(
+                        "> Shard {} server role must be either 'primary' or 'replica', got: '{}'",
+                        shard.0, server.2
+                    );
+                    return Err(Error::BadConfig);
+                }
+            };
+        }
+
+        if primary_count > 1 {
+            println!("> Shard {} has more than one primary configured.", &shard.0);
+            return Err(Error::BadConfig);
+        }
+
+        if dup_check.len() != shard.1.servers.len() {
+            println!("> Shard {} contains duplicate server configs.", &shard.0);
+            return Err(Error::BadConfig);
+        }
+    }
+
     Ok(config)
 }
 
@@ -83,5 +131,6 @@ mod test {
         assert_eq!(config.general.pool_size, 15);
         assert_eq!(config.shards.len(), 3);
         assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1");
+        assert_eq!(config.shards["0"].servers[0].2, "primary");
     }
 }
```
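The validation above keeps the roles as strings; the conversion into the `Role` enum happens in the pool code, which is part of this commit but not shown in these hunks. A hypothetical sketch of that mapping, reusing the `Address` and `Role` types defined in this file:

```rust
// Hypothetical helper: turn a validated (host, port, role) config tuple
// into the Address struct that now carries a Role.
fn to_address(server: &(String, u16, String)) -> Address {
    let role = match server.2.as_str() {
        "primary" => Role::Primary,
        "replica" => Role::Replica,
        // parse() has already rejected any other spelling.
        _ => unreachable!("role was validated at config load time"),
    };

    Address {
        host: server.0.clone(),
        port: server.1.to_string(), // Address stores the port as a String
        role,
    }
}
```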

src/errors.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -8,4 +8,5 @@ pub enum Error {
     // ServerTimeout,
     // DirtyServer,
     BadConfig,
+    AllServersDown,
 }
```
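A plausible reading of the new variant (the pool changes are in this commit but outside the hunks shown here): once a requested role filters out every healthy server, the pool has nothing left to hand out. A hypothetical sketch of that decision, reusing `Address` and `Role` from src/config.rs:

```rust
use crate::config::{Address, Role};
use crate::errors::Error;

// Hypothetical selection step: keep only servers matching the requested
// role (None means any role will do) and fail if nothing is left.
fn pick(candidates: &[Address], role: Option<Role>) -> Result<&Address, Error> {
    candidates
        .iter()
        .find(|addr| role.is_none() || Some(addr.role) == role)
        .ok_or(Error::AllServersDown)
}
```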

src/main.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -73,6 +73,7 @@ async fn main() {
         "> Healthcheck timeout: {}ms",
         config.general.healthcheck_timeout
     );
+    println!("> Connection timeout: {}ms", config.general.connect_timeout);
 
     let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await;
     let transaction_mode = config.general.pool_mode == "transaction";
```
