Skip to content

Commit b3c8ca4

Browse files
committed
Another example of a sharding function (#41)
* Another example of a sharding function * tests
1 parent dce72ba commit b3c8ca4

File tree

4 files changed

+99
-12
lines changed

4 files changed

+99
-12
lines changed

pgcat.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,13 @@ query_parser_enabled = false
9696
# load balancing of read queries. Otherwise, the primary will only be used for write
9797
# queries. The primary can always be explicitly selected with our custom protocol.
9898
primary_reads_enabled = true
99+
100+
# So what if you wanted to implement a different hashing function,
101+
# or you've already built one and you want this pooler to use it?
102+
#
103+
# Current options:
104+
#
105+
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
106+
# sha1: A hashing function based on SHA1
107+
#
108+
sharding_function = "pg_bigint_hash"

src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ pub struct QueryRouter {
118118
pub default_role: String,
119119
pub query_parser_enabled: bool,
120120
pub primary_reads_enabled: bool,
121+
pub sharding_function: String,
121122
}
122123

123124
impl Default for QueryRouter {
@@ -126,6 +127,7 @@ impl Default for QueryRouter {
126127
default_role: String::from("any"),
127128
query_parser_enabled: false,
128129
primary_reads_enabled: true,
130+
sharding_function: "pg_bigint_hash".to_string(),
129131
}
130132
}
131133
}
@@ -159,6 +161,8 @@ impl Config {
159161
self.general.healthcheck_timeout
160162
);
161163
info!("Connection timeout: {}ms", self.general.connect_timeout);
164+
info!("Sharding function: {}", self.query_router.sharding_function);
165+
info!("Number of shards: {}", self.shards.len());
162166
}
163167
}
164168

@@ -193,6 +197,18 @@ pub async fn parse(path: &str) -> Result<(), Error> {
193197
}
194198
};
195199

200+
match config.query_router.sharding_function.as_ref() {
201+
"pg_bigint_hash" => (),
202+
"sha1" => (),
203+
_ => {
204+
error!(
205+
"Supported sharding functions are: 'pg_bigint_hash', 'sha1', got: '{}'",
206+
config.query_router.sharding_function
207+
);
208+
return Err(Error::BadConfig);
209+
}
210+
};
211+
196212
// Quick config sanity check.
197213
for shard in &config.shards {
198214
// We use addresses as unique identifiers,

src/query_router.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config::{get_config, Role};
2-
use crate::sharding::Sharder;
2+
use crate::sharding::{Sharder, ShardingFunction};
33
/// Route queries automatically based on explicitly requested
44
/// or implied query characteristics.
55
use bytes::{Buf, BytesMut};
@@ -48,6 +48,9 @@ pub struct QueryRouter {
4848

4949
// Should we try to parse queries?
5050
query_parser_enabled: bool,
51+
52+
// Which sharding function are we using?
53+
sharding_function: ShardingFunction,
5154
}
5255

5356
impl QueryRouter {
@@ -76,6 +79,12 @@ impl QueryRouter {
7679
_ => unreachable!(),
7780
};
7881

82+
let sharding_function = match config.query_router.sharding_function.as_ref() {
83+
"pg_bigint_hash" => ShardingFunction::PgBigintHash,
84+
"sha1" => ShardingFunction::Sha1,
85+
_ => unreachable!(),
86+
};
87+
7988
QueryRouter {
8089
default_server_role: default_server_role,
8190
shards: config.shards.len(),
@@ -84,6 +93,7 @@ impl QueryRouter {
8493
active_shard: None,
8594
primary_reads_enabled: config.query_router.primary_reads_enabled,
8695
query_parser_enabled: config.query_router.query_parser_enabled,
96+
sharding_function,
8797
}
8898
}
8999

@@ -139,8 +149,8 @@ impl QueryRouter {
139149

140150
match command {
141151
Command::SetShardingKey => {
142-
let sharder = Sharder::new(self.shards);
143-
let shard = sharder.pg_bigint_hash(value.parse::<i64>().unwrap());
152+
let sharder = Sharder::new(self.shards, self.sharding_function);
153+
let shard = sharder.shard(value.parse::<i64>().unwrap());
144154
self.active_shard = Some(shard);
145155
value = shard.to_string();
146156
}

src/sharding.rs

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,62 @@
1+
use sha1::{Digest, Sha1};
2+
13
// https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20
24
const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
35

6+
#[derive(Debug, PartialEq, Copy, Clone)]
7+
pub enum ShardingFunction {
8+
PgBigintHash,
9+
Sha1,
10+
}
11+
412
pub struct Sharder {
513
shards: usize,
14+
sharding_function: ShardingFunction,
615
}
716

817
impl Sharder {
9-
pub fn new(shards: usize) -> Sharder {
10-
Sharder { shards: shards }
18+
pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder {
19+
Sharder {
20+
shards,
21+
sharding_function,
22+
}
23+
}
24+
25+
pub fn shard(&self, key: i64) -> usize {
26+
match self.sharding_function {
27+
ShardingFunction::PgBigintHash => self.pg_bigint_hash(key),
28+
ShardingFunction::Sha1 => self.sha1(key),
29+
}
1130
}
1231

1332
/// Hash function used by Postgres to determine which partition
1433
/// to put the row in when using HASH(column) partitioning.
1534
/// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631
1635
/// Supports only 1 bigint at the moment, but we can add more later.
17-
pub fn pg_bigint_hash(&self, key: i64) -> usize {
36+
fn pg_bigint_hash(&self, key: i64) -> usize {
1837
let mut lohalf = key as u32;
1938
let hihalf = (key >> 32) as u32;
2039
lohalf ^= if key >= 0 { hihalf } else { !hihalf };
2140
Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards
2241
}
2342

43+
/// Example of a hashing function based on SHA1.
44+
fn sha1(&self, key: i64) -> usize {
45+
let mut hasher = Sha1::new();
46+
47+
hasher.update(&key.to_string().as_bytes());
48+
49+
let result = hasher.finalize();
50+
51+
// Convert the SHA1 hash into hex so we can parse it as a large integer.
52+
let hex = format!("{:x}", result);
53+
54+
// Parse the last 8 hex digits of the digest (4 bytes = 32 bits) as an integer.
55+
let key = i64::from_str_radix(&hex[hex.len() - 8..], 16).unwrap() as usize;
56+
57+
key % self.shards
58+
}
59+
2460
#[inline]
2561
fn rot(x: u32, k: u32) -> u32 {
2662
(x << k) | (x >> (32 - k))
@@ -109,36 +145,51 @@ mod test {
109145
// confirming that we implemented Postgres BIGINT hashing correctly.
110146
#[test]
111147
fn test_pg_bigint_hash() {
112-
let sharder = Sharder::new(5);
148+
let sharder = Sharder::new(5, ShardingFunction::PgBigintHash);
113149

114150
let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53];
115151

116152
for v in shard_0 {
117-
assert_eq!(sharder.pg_bigint_hash(v), 0);
153+
assert_eq!(sharder.shard(v), 0);
118154
}
119155

120156
let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54];
121157

122158
for v in shard_1 {
123-
assert_eq!(sharder.pg_bigint_hash(v), 1);
159+
assert_eq!(sharder.shard(v), 1);
124160
}
125161

126162
let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35];
127163

128164
for v in shard_2 {
129-
assert_eq!(sharder.pg_bigint_hash(v), 2);
165+
assert_eq!(sharder.shard(v), 2);
130166
}
131167

132168
let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43];
133169

134170
for v in shard_3 {
135-
assert_eq!(sharder.pg_bigint_hash(v), 3);
171+
assert_eq!(sharder.shard(v), 3);
136172
}
137173

138174
let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45];
139175

140176
for v in shard_4 {
141-
assert_eq!(sharder.pg_bigint_hash(v), 4);
177+
assert_eq!(sharder.shard(v), 4);
178+
}
179+
}
180+
181+
#[test]
182+
fn test_sha1_hash() {
183+
let sharder = Sharder::new(12, ShardingFunction::Sha1);
184+
let ids = vec![
185+
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
186+
];
187+
let shards = vec![
188+
4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3,
189+
];
190+
191+
for (i, id) in ids.iter().enumerate() {
192+
assert_eq!(sharder.shard(*id), shards[i]);
142193
}
143194
}
144195
}

0 commit comments

Comments
 (0)