Skip to content

Commit 3b79546

Browse files
authored
Fix client states reporting (#27)
* Fix client states reporting * clean
1 parent d4c1fc8 commit 3b79546

File tree

3 files changed

+70
-50
lines changed

3 files changed

+70
-50
lines changed

src/client.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ impl Client {
192192
// We expect the client to either start a transaction with regular queries
193193
// or issue commands for our sharding and server selection protocols.
194194
loop {
195+
// Client idle, waiting for messages.
196+
self.stats.client_idle(self.process_id);
197+
195198
// Read a complete message from the client, which normally would be
196199
// either a `Q` (query) or `P` (prepare, extended protocol).
197200
// We can parse it here before grabbing a server from the pool,
@@ -243,6 +246,9 @@ impl Client {
243246
continue;
244247
}
245248

249+
// Waiting for server connection.
250+
self.stats.client_waiting(self.process_id);
251+
246252
// Grab a server from the pool: the client issued a regular query.
247253
let connection = match pool.get(query_router.shard(), query_router.role()).await {
248254
Ok(conn) => conn,
@@ -261,6 +267,9 @@ impl Client {
261267
// Claim this server as mine for query cancellation.
262268
server.claim(self.process_id, self.secret_key);
263269

270+
// Client active
271+
self.stats.client_active(self.process_id);
272+
264273
// Transaction loop. Multiple queries can be issued by the client here.
265274
// The connection belongs to the client until the transaction is over,
266275
// or until the client disconnects if we are in session mode.
@@ -330,7 +339,6 @@ impl Client {
330339
// If we are in session mode, we keep the server until the client disconnects.
331340
if self.transaction_mode {
332341
// Report this client as idle.
333-
self.stats.client_idle();
334342
break;
335343
}
336344
}
@@ -412,7 +420,6 @@ impl Client {
412420
self.stats.transaction();
413421

414422
if self.transaction_mode {
415-
self.stats.client_idle();
416423
break;
417424
}
418425
}
@@ -446,7 +453,6 @@ impl Client {
446453
self.stats.transaction();
447454

448455
if self.transaction_mode {
449-
self.stats.client_idle();
450456
break;
451457
}
452458
}
@@ -471,3 +477,9 @@ impl Client {
471477
guard.remove(&(self.process_id, self.secret_key));
472478
}
473479
}
480+
481+
impl Drop for Client {
482+
fn drop(&mut self) {
483+
self.stats.client_disconnecting(self.process_id);
484+
}
485+
}

src/pool.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,6 @@ impl ConnectionPool {
147147
role: Option<Role>,
148148
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
149149
let now = Instant::now();
150-
151-
// We are waiting for a server now.
152-
self.stats.client_waiting();
153-
154150
let addresses = &self.addresses[shard];
155151

156152
let mut allowed_attempts = match role {
@@ -222,7 +218,6 @@ impl ConnectionPool {
222218
Ok(res) => match res {
223219
Ok(_) => {
224220
self.stats.checkout_time(now.elapsed().as_micros());
225-
self.stats.client_active();
226221
return Ok((conn, address.clone()));
227222
}
228223
Err(_) => {

src/stats.rs

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::Instant;
77

88
use crate::config::get_config;
99

10-
#[derive(Debug)]
10+
#[derive(Debug, Clone, Copy)]
1111
pub enum StatisticName {
1212
CheckoutTime,
1313
//QueryRuntime,
@@ -16,15 +16,17 @@ pub enum StatisticName {
1616
Transactions,
1717
DataSent,
1818
DataReceived,
19-
ClientsWaiting,
20-
ClientsActive,
21-
ClientsIdle,
19+
ClientWaiting,
20+
ClientActive,
21+
ClientIdle,
22+
ClientDisconnecting,
2223
}
2324

2425
#[derive(Debug)]
2526
pub struct Statistic {
2627
pub name: StatisticName,
2728
pub value: i64,
29+
pub process_id: Option<i32>,
2830
}
2931

3032
#[derive(Clone, Debug)]
@@ -41,6 +43,7 @@ impl Reporter {
4143
let statistic = Statistic {
4244
name: StatisticName::Queries,
4345
value: 1,
46+
process_id: None,
4447
};
4548

4649
let _ = self.tx.try_send(statistic);
@@ -50,6 +53,7 @@ impl Reporter {
5053
let statistic = Statistic {
5154
name: StatisticName::Transactions,
5255
value: 1,
56+
process_id: None,
5357
};
5458

5559
let _ = self.tx.try_send(statistic);
@@ -59,6 +63,7 @@ impl Reporter {
5963
let statistic = Statistic {
6064
name: StatisticName::DataSent,
6165
value: amount as i64,
66+
process_id: None,
6267
};
6368

6469
let _ = self.tx.try_send(statistic);
@@ -68,6 +73,7 @@ impl Reporter {
6873
let statistic = Statistic {
6974
name: StatisticName::DataReceived,
7075
value: amount as i64,
76+
process_id: None,
7177
};
7278

7379
let _ = self.tx.try_send(statistic);
@@ -77,54 +83,48 @@ impl Reporter {
7783
let statistic = Statistic {
7884
name: StatisticName::CheckoutTime,
7985
value: ms as i64,
86+
process_id: None,
8087
};
8188

8289
let _ = self.tx.try_send(statistic);
8390
}
8491

85-
pub fn client_waiting(&mut self) {
92+
pub fn client_waiting(&mut self, process_id: i32) {
8693
let statistic = Statistic {
87-
name: StatisticName::ClientsWaiting,
94+
name: StatisticName::ClientWaiting,
8895
value: 1,
89-
};
90-
91-
let _ = self.tx.try_send(statistic);
92-
93-
let statistic = Statistic {
94-
name: StatisticName::ClientsIdle,
95-
value: -1,
96+
process_id: Some(process_id),
9697
};
9798

9899
let _ = self.tx.try_send(statistic);
99100
}
100101

101-
pub fn client_active(&mut self) {
102-
let statistic = Statistic {
103-
name: StatisticName::ClientsWaiting,
104-
value: -1,
105-
};
106-
107-
let _ = self.tx.try_send(statistic);
102+
pub fn client_active(&mut self, process_id: i32) {
108103

109104
let statistic = Statistic {
110-
name: StatisticName::ClientsActive,
105+
name: StatisticName::ClientActive,
111106
value: 1,
107+
process_id: Some(process_id),
112108
};
113109

114110
let _ = self.tx.try_send(statistic);
115111
}
116112

117-
pub fn client_idle(&mut self) {
113+
pub fn client_idle(&mut self, process_id: i32) {
118114
let statistic = Statistic {
119-
name: StatisticName::ClientsActive,
120-
value: -1,
115+
name: StatisticName::ClientIdle,
116+
value: 1,
117+
process_id: Some(process_id),
121118
};
122119

123120
let _ = self.tx.try_send(statistic);
121+
}
124122

123+
pub fn client_disconnecting(&mut self, process_id: i32) {
125124
let statistic = Statistic {
126-
name: StatisticName::ClientsIdle,
125+
name: StatisticName::ClientDisconnecting,
127126
value: 1,
127+
process_id: Some(process_id),
128128
};
129129

130130
let _ = self.tx.try_send(statistic);
@@ -158,6 +158,8 @@ impl Collector {
158158
("cl_idle", 0),
159159
]);
160160

161+
let mut client_states: HashMap<i32, StatisticName> = HashMap::new();
162+
161163
let mut now = Instant::now();
162164

163165
loop {
@@ -210,32 +212,43 @@ impl Collector {
210212
}
211213
}
212214

213-
StatisticName::ClientsActive => {
214-
let counter = stats.entry("cl_active").or_insert(0);
215-
216-
*counter += stat.value;
217-
*counter = std::cmp::max(*counter, 0);
218-
}
219-
220-
StatisticName::ClientsWaiting => {
221-
let counter = stats.entry("cl_waiting").or_insert(0);
222-
*counter += stat.value;
223-
*counter = std::cmp::max(*counter, 0);
215+
StatisticName::ClientActive | StatisticName::ClientWaiting | StatisticName::ClientIdle => {
216+
client_states.insert(stat.process_id.unwrap(), stat.name);
224217
}
225218

226-
StatisticName::ClientsIdle => {
227-
let counter = stats.entry("cl_idle").or_insert(0);
228-
*counter += stat.value;
229-
*counter = std::cmp::max(*counter, 0);
219+
StatisticName::ClientDisconnecting => {
220+
client_states.remove(&stat.process_id.unwrap());
230221
}
231222
};
232223

224+
233225
// It's been 15 seconds. If there is no traffic, it won't publish anything,
234226
// but it also doesn't matter then.
235227
if now.elapsed().as_secs() > 15 {
236-
let mut pipeline = self.client.pipeline();
228+
for (_, state) in &client_states {
229+
match state {
230+
StatisticName::ClientActive => {
231+
let counter = stats.entry("cl_active").or_insert(0);
232+
*counter += 1;
233+
}
234+
235+
StatisticName::ClientWaiting => {
236+
let counter = stats.entry("cl_waiting").or_insert(0);
237+
*counter += 1;
238+
}
239+
240+
StatisticName::ClientIdle => {
241+
let counter = stats.entry("cl_idle").or_insert(0);
242+
*counter += 1;
243+
}
244+
245+
_ => unreachable!(),
246+
};
247+
}
248+
249+
println!(">> Reporting to StatsD: {:?}", stats);
237250

238-
println!(">> Publishing statistics to StatsD: {:?}", stats);
251+
let mut pipeline = self.client.pipeline();
239252

240253
for (key, value) in stats.iter_mut() {
241254
pipeline.gauge(key, *value as f64);

0 commit comments

Comments
 (0)