1
- use log:: { error, info} ;
1
+ use log:: { debug , error, info} ;
2
2
use once_cell:: sync:: OnceCell ;
3
3
use statsd:: Client ;
4
4
/// Events collector and publisher.
@@ -10,6 +10,7 @@ use std::sync::{Arc, Mutex};
10
10
use crate :: config:: get_config;
11
11
12
12
static LATEST_STATS : OnceCell < Arc < Mutex < HashMap < String , i64 > > > > = OnceCell :: new ( ) ;
13
+ static STAT_PERIOD : u64 = 15000 ; //15 seconds
13
14
14
15
#[ derive( Debug , Clone , Copy ) ]
15
16
enum EventName {
@@ -228,7 +229,15 @@ impl Collector {
228
229
( "total_xact_count" , 0 ) ,
229
230
( "total_sent" , 0 ) ,
230
231
( "total_received" , 0 ) ,
232
+ ( "total_xact_time" , 0 ) ,
233
+ ( "total_query_time" , 0 ) ,
231
234
( "total_wait_time" , 0 ) ,
235
+ ( "avg_xact_time" , 0 ) ,
236
+ ( "avg_query_time" , 0 ) ,
237
+ ( "avg_xact_count" , 0 ) ,
238
+ ( "avg_sent" , 0 ) ,
239
+ ( "avg_received" , 0 ) ,
240
+ ( "avg_wait_time" , 0 ) ,
232
241
( "maxwait_us" , 0 ) ,
233
242
( "maxwait" , 0 ) ,
234
243
( "cl_waiting" , 0 ) ,
@@ -240,11 +249,18 @@ impl Collector {
240
249
( "sv_tested" , 0 ) ,
241
250
] ) ;
242
251
252
+ // Stats saved after each iteration of the flush event. Used in calculation
253
+ // of averages in the last flush period.
254
+ let mut old_stats: HashMap < String , i64 > = HashMap :: new ( ) ;
255
+
256
+ // Track which state the client and server are at any given time.
243
257
let mut client_server_states: HashMap < i32 , EventName > = HashMap :: new ( ) ;
244
- let tx = self . tx . clone ( ) ;
245
258
259
+ // Flush stats to StatsD and calculate averages every 15 seconds.
260
+ let tx = self . tx . clone ( ) ;
246
261
tokio:: task:: spawn ( async move {
247
- let mut interval = tokio:: time:: interval ( tokio:: time:: Duration :: from_millis ( 15000 ) ) ;
262
+ let mut interval =
263
+ tokio:: time:: interval ( tokio:: time:: Duration :: from_millis ( STAT_PERIOD ) ) ;
248
264
loop {
249
265
interval. tick ( ) . await ;
250
266
let _ = tx. try_send ( Event {
@@ -255,6 +271,7 @@ impl Collector {
255
271
}
256
272
} ) ;
257
273
274
+ // The collector loop
258
275
loop {
259
276
let stat = match self . rx . recv ( ) . await {
260
277
Some ( stat) => stat,
@@ -291,10 +308,11 @@ impl Collector {
291
308
* counter += stat. value ;
292
309
293
310
let counter = stats. entry ( "maxwait_us" ) . or_insert ( 0 ) ;
311
+ let mic_part = stat. value % 1_000_000 ;
294
312
295
313
// Report max time here
296
- if stat . value > * counter {
297
- * counter = stat . value ;
314
+ if mic_part > * counter {
315
+ * counter = mic_part ;
298
316
}
299
317
300
318
let counter = stats. entry ( "maxwait" ) . or_insert ( 0 ) ;
@@ -320,6 +338,7 @@ impl Collector {
320
338
}
321
339
322
340
EventName :: FlushStatsToStatsD => {
341
+ // Calculate connection states
323
342
for ( _, state) in & client_server_states {
324
343
match state {
325
344
EventName :: ClientActive => {
@@ -361,8 +380,26 @@ impl Collector {
361
380
} ;
362
381
}
363
382
364
- info ! ( "{:?}" , stats) ;
383
+ // Calculate averages
384
+ for stat in & [
385
+ "avg_query_count" ,
386
+ "avgxact_count" ,
387
+ "avg_sent" ,
388
+ "avg_received" ,
389
+ "avg_wait_time" ,
390
+ ] {
391
+ let total_name = stat. replace ( "avg_" , "total_" ) ;
392
+ let old_value = old_stats. entry ( total_name. clone ( ) ) . or_insert ( 0 ) ;
393
+ let new_value = stats. get ( total_name. as_str ( ) ) . unwrap_or ( & 0 ) . to_owned ( ) ;
394
+ let avg = ( new_value - * old_value) / ( STAT_PERIOD as i64 / 1_000 ) ; // Avg / second
395
+
396
+ stats. insert ( stat, avg) ;
397
+ * old_value = new_value;
398
+ }
365
399
400
+ debug ! ( "{:?}" , stats) ;
401
+
402
+ // Update latest stats used in SHOW STATS
366
403
match LATEST_STATS . get ( ) {
367
404
Some ( arc) => {
368
405
let mut guard = arc. lock ( ) . unwrap ( ) ;
@@ -376,9 +413,24 @@ impl Collector {
376
413
377
414
let mut pipeline = self . client . pipeline ( ) ;
378
415
379
- for ( key, value) in stats. iter_mut ( ) {
416
+ for ( key, value) in stats. iter ( ) {
380
417
pipeline. gauge ( key, * value as f64 ) ;
381
- * value = 0 ;
418
+ }
419
+
420
+ // These are re-calculated every iteration of the loop, so we don't want to add values
421
+ // from the last iteration.
422
+ for stat in & [
423
+ "cl_active" ,
424
+ "cl_waiting" ,
425
+ "cl_idle" ,
426
+ "sv_idle" ,
427
+ "sv_active" ,
428
+ "sv_tested" ,
429
+ "sv_login" ,
430
+ "maxwait" ,
431
+ "maxwait_us" ,
432
+ ] {
433
+ stats. insert ( stat, 0 ) ;
382
434
}
383
435
384
436
pipeline. send ( & self . client ) ;
0 commit comments