Skip to content

Commit d4186b7

Browse files
authored
More admin (#53)
* more admin * more admin * show lists * tests
1 parent aaeef69 commit d4186b7

File tree

4 files changed

+157
-16
lines changed

4 files changed

+157
-16
lines changed

.circleci/run_tests.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
6262
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
6363
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
6464
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
65+
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
66+
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
67+
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
68+
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
6569
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
6670

6771
# Start PgCat in debug to demonstrate failover better

src/admin.rs

Lines changed: 124 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tokio::net::tcp::OwnedWriteHalf;
44

55
use std::collections::HashMap;
66

7-
use crate::config::{get_config, parse, Role};
7+
use crate::config::{get_config, parse};
88
use crate::errors::Error;
99
use crate::messages::*;
1010
use crate::pool::ConnectionPool;
@@ -41,6 +41,15 @@ pub async fn handle_admin(
4141
} else if query.starts_with("SHOW DATABASES") {
4242
trace!("SHOW DATABASES");
4343
show_databases(stream, &pool).await
44+
} else if query.starts_with("SHOW POOLS") {
45+
trace!("SHOW POOLS");
46+
show_pools(stream, &pool).await
47+
} else if query.starts_with("SHOW LISTS") {
48+
trace!("SHOW LISTS");
49+
show_lists(stream, &pool).await
50+
} else if query.starts_with("SHOW VERSION") {
51+
trace!("SHOW VERSION");
52+
show_version(stream).await
4453
} else if query.starts_with("SET ") {
4554
trace!("SET");
4655
ignore_set(stream).await
@@ -49,6 +58,118 @@ pub async fn handle_admin(
4958
}
5059
}
5160

61+
/// SHOW LISTS
62+
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
63+
let stats = get_stats();
64+
65+
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
66+
67+
let mut res = BytesMut::new();
68+
res.put(row_description(&columns));
69+
res.put(data_row(&vec![
70+
"databases".to_string(),
71+
pool.databases().to_string(),
72+
]));
73+
res.put(data_row(&vec!["users".to_string(), "1".to_string()]));
74+
res.put(data_row(&vec![
75+
"pools".to_string(),
76+
pool.databases().to_string(),
77+
]));
78+
res.put(data_row(&vec![
79+
"free_clients".to_string(),
80+
stats["cl_idle"].to_string(),
81+
]));
82+
res.put(data_row(&vec![
83+
"used_clients".to_string(),
84+
stats["cl_active"].to_string(),
85+
]));
86+
res.put(data_row(&vec![
87+
"login_clients".to_string(),
88+
"0".to_string(),
89+
]));
90+
res.put(data_row(&vec![
91+
"free_servers".to_string(),
92+
stats["sv_idle"].to_string(),
93+
]));
94+
res.put(data_row(&vec![
95+
"used_servers".to_string(),
96+
stats["sv_active"].to_string(),
97+
]));
98+
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
99+
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
100+
res.put(data_row(&vec!["dns_queries".to_string(), "0".to_string()]));
101+
res.put(data_row(&vec!["dns_pending".to_string(), "0".to_string()]));
102+
103+
res.put(command_complete("SHOW"));
104+
105+
res.put_u8(b'Z');
106+
res.put_i32(5);
107+
res.put_u8(b'I');
108+
109+
write_all_half(stream, res).await
110+
}
111+
112+
/// SHOW VERSION
113+
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
114+
let mut res = BytesMut::new();
115+
116+
res.put(row_description(&vec![("version", DataType::Text)]));
117+
res.put(data_row(&vec!["PgCat 0.1.0".to_string()]));
118+
res.put(command_complete("SHOW"));
119+
120+
res.put_u8(b'Z');
121+
res.put_i32(5);
122+
res.put_u8(b'I');
123+
124+
write_all_half(stream, res).await
125+
}
126+
127+
/// SHOW POOLS
128+
async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> {
129+
let stats = get_stats();
130+
let config = {
131+
let guard = get_config();
132+
&*guard.clone()
133+
};
134+
135+
let columns = vec![
136+
("database", DataType::Text),
137+
("user", DataType::Text),
138+
("cl_active", DataType::Numeric),
139+
("cl_waiting", DataType::Numeric),
140+
("cl_cancel_req", DataType::Numeric),
141+
("sv_active", DataType::Numeric),
142+
("sv_idle", DataType::Numeric),
143+
("sv_used", DataType::Numeric),
144+
("sv_tested", DataType::Numeric),
145+
("sv_login", DataType::Numeric),
146+
("maxwait", DataType::Numeric),
147+
("maxwait_us", DataType::Numeric),
148+
("pool_mode", DataType::Text),
149+
];
150+
151+
let mut res = BytesMut::new();
152+
res.put(row_description(&columns));
153+
154+
let mut row = vec![String::from("all"), config.user.name.clone()];
155+
156+
for column in &columns[2..columns.len() - 1] {
157+
let value = stats.get(column.0).unwrap_or(&0).to_string();
158+
row.push(value);
159+
}
160+
161+
row.push(config.general.pool_mode.to_string());
162+
163+
res.put(data_row(&row));
164+
res.put(command_complete("SHOW"));
165+
166+
res.put_u8(b'Z');
167+
res.put_i32(5);
168+
res.put_u8(b'I');
169+
170+
write_all_half(stream, res).await
171+
}
172+
52173
/// SHOW DATABASES
53174
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
54175
let guard = get_config();
@@ -79,23 +200,13 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R
79200

80201
for shard in 0..pool.shards() {
81202
let database_name = &config.shards[&shard.to_string()].database;
82-
let mut replica_count = 0;
83203

84204
for server in 0..pool.servers(shard) {
85205
let address = pool.address(shard, server);
86-
let name = match address.role {
87-
Role::Primary => format!("shard_{}_primary", shard),
88-
89-
Role::Replica => {
90-
let name = format!("shard_{}_replica_{}", shard, replica_count);
91-
replica_count += 1;
92-
name
93-
}
94-
};
95206
let pool_state = pool.pool_state(shard, server);
96207

97208
res.put(data_row(&vec![
98-
name, // name
209+
address.name(), // name
99210
address.host.to_string(), // host
100211
address.port.to_string(), // port
101212
database_name.to_string(), // database
@@ -222,7 +333,7 @@ async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
222333
res.put(row_description(&columns));
223334

224335
let mut row = vec![
225-
String::from("all shards"), // TODO: per-database stats,
336+
String::from("all"), // TODO: per-database stats,
226337
];
227338

228339
for column in &columns[1..] {

src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub struct Address {
5252
pub port: String,
5353
pub shard: usize,
5454
pub role: Role,
55+
pub replica_number: usize,
5556
}
5657

5758
impl Default for Address {
@@ -60,11 +61,22 @@ impl Default for Address {
6061
host: String::from("127.0.0.1"),
6162
port: String::from("5432"),
6263
shard: 0,
64+
replica_number: 0,
6365
role: Role::Replica,
6466
}
6567
}
6668
}
6769

70+
impl Address {
71+
pub fn name(&self) -> String {
72+
match self.role {
73+
Role::Primary => format!("shard_{}_primary", self.shard),
74+
75+
Role::Replica => format!("shard_{}_replica_{}", self.shard, self.replica_number),
76+
}
77+
}
78+
}
79+
6880
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
6981
pub struct User {
7082
pub name: String,

src/pool.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ impl ConnectionPool {
4949
for shard_idx in shard_ids {
5050
let shard = &config.shards[&shard_idx];
5151
let mut pools = Vec::new();
52-
let mut replica_addresses = Vec::new();
52+
let mut servers = Vec::new();
53+
let mut replica_number = 0;
5354

5455
for server in shard.servers.iter() {
5556
let role = match server.2.as_ref() {
@@ -65,9 +66,14 @@ impl ConnectionPool {
6566
host: server.0.clone(),
6667
port: server.1.to_string(),
6768
role: role,
69+
replica_number,
6870
shard: shard_idx.parse::<usize>().unwrap(),
6971
};
7072

73+
if role == Role::Replica {
74+
replica_number += 1;
75+
}
76+
7177
let manager = ServerPool::new(
7278
address.clone(),
7379
config.user.clone(),
@@ -87,11 +93,11 @@ impl ConnectionPool {
8793
.unwrap();
8894

8995
pools.push(pool);
90-
replica_addresses.push(address);
96+
servers.push(address);
9197
}
9298

9399
shards.push(pools);
94-
addresses.push(replica_addresses);
100+
addresses.push(servers);
95101
banlist.push(HashMap::new());
96102
}
97103

@@ -337,6 +343,14 @@ impl ConnectionPool {
337343
self.addresses[shard].len()
338344
}
339345

346+
pub fn databases(&self) -> usize {
347+
let mut databases = 0;
348+
for shard in 0..self.shards() {
349+
databases += self.servers(shard);
350+
}
351+
databases
352+
}
353+
340354
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
341355
self.databases[shard][server].state()
342356
}

0 commit comments

Comments
 (0)