Skip to content

Commit 880e236

Browse files
authored
feat(query): add metrics for session queue manager (#14966)
1 parent e807a73 commit 880e236

File tree

2 files changed

+70
-6
lines changed

2 files changed

+70
-6
lines changed

src/common/metrics/src/metrics/session.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,31 @@
1313
// limitations under the License.
1414

1515
use std::sync::LazyLock;
16+
use std::time::Duration;
1617

1718
use crate::register_counter;
1819
use crate::register_gauge;
20+
use crate::register_histogram_in_milliseconds;
1921
use crate::Counter;
2022
use crate::Gauge;
23+
use crate::Histogram;
2124

2225
/// Counter registered under the metric name `session_connect_numbers`;
/// bumped by `incr_session_connect_numbers`.
pub static SESSION_CONNECT_NUMBERS: LazyLock<Counter> =
2326
LazyLock::new(|| register_counter("session_connect_numbers"));
2427
/// Counter registered under the metric name `session_close_numbers`.
// NOTE(review): presumably bumped by `incr_session_close_numbers` — its body is not shown here.
pub static SESSION_CLOSE_NUMBERS: LazyLock<Counter> =
2528
LazyLock::new(|| register_counter("session_close_numbers"));
2629
/// Gauge registered under the metric name `session_connections`;
/// written by `set_session_active_connections`.
pub static SESSION_ACTIVE_CONNECTIONS: LazyLock<Gauge> =
2730
LazyLock::new(|| register_gauge("session_connections"));
31+
/// Gauge registered under the metric name `session_queued_queries`;
/// written by `set_session_queued_queries` with the current length of the session queue.
// NOTE(review): "QUQUED" is a typo for "QUEUED". The identifier is public, so renaming it
// must be done together with every user of the static.
pub static SESSION_QUQUED_QUERIES: LazyLock<Gauge> =
32+
LazyLock::new(|| register_gauge("session_queued_queries"));
33+
/// Counter registered under the metric name `session_queue_abort_count`;
/// bumped by `incr_session_queue_abort_count`.
pub static SESSION_QUEUE_ABORT_COUNT: LazyLock<Counter> =
34+
LazyLock::new(|| register_counter("session_queue_abort_count"));
35+
/// Counter registered under the metric name `session_queue_acquire_error_count`;
/// bumped by `incr_session_queue_acquire_error_count`.
pub static SESSION_QUEUE_ACQUIRE_ERROR_COUNT: LazyLock<Counter> =
36+
LazyLock::new(|| register_counter("session_queue_acquire_error_count"));
37+
/// Counter registered under the metric name `session_queue_acquire_timeout_count`;
/// bumped by `incr_session_queue_acquire_timeout_count`.
pub static SESSION_QUEUE_ACQUIRE_TIMEOUT_COUNT: LazyLock<Counter> =
38+
LazyLock::new(|| register_counter("session_queue_acquire_timeout_count"));
39+
/// Histogram registered (in milliseconds) under the metric name
/// `session_queue_acquire_duration_ms`; fed by `record_session_queue_acquire_duration_ms`.
pub static SESSION_QUEUE_ACQUIRE_DURATION_MS: LazyLock<Histogram> =
40+
LazyLock::new(|| register_histogram_in_milliseconds("session_queue_acquire_duration_ms"));
2841

2942
pub fn incr_session_connect_numbers() {
3043
SESSION_CONNECT_NUMBERS.inc();
@@ -37,3 +50,23 @@ pub fn incr_session_close_numbers() {
3750
pub fn set_session_active_connections(num: usize) {
3851
SESSION_ACTIVE_CONNECTIONS.set(num as i64);
3952
}
53+
54+
pub fn set_session_queued_queries(num: usize) {
55+
SESSION_QUQUED_QUERIES.set(num as i64);
56+
}
57+
58+
pub fn incr_session_queue_abort_count() {
59+
SESSION_QUEUE_ABORT_COUNT.inc();
60+
}
61+
62+
pub fn incr_session_queue_acquire_error_count() {
63+
SESSION_QUEUE_ACQUIRE_ERROR_COUNT.inc();
64+
}
65+
66+
pub fn incr_session_queue_acquire_timeout_count() {
67+
SESSION_QUEUE_ACQUIRE_TIMEOUT_COUNT.inc();
68+
}
69+
70+
pub fn record_session_queue_acquire_duration_ms(duration: Duration) {
71+
SESSION_QUEUE_ACQUIRE_DURATION_MS.observe(duration.as_millis() as f64);
72+
}

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::fmt::Display;
1617
use std::future::Future;
1718
use std::hash::Hash;
1819
use std::pin::Pin;
@@ -30,6 +31,11 @@ use databend_common_catalog::table_context::TableContext;
3031
use databend_common_exception::ErrorCode;
3132
use databend_common_exception::Result;
3233
use databend_common_meta_app::principal::UserInfo;
34+
use databend_common_metrics::session::incr_session_queue_abort_count;
35+
use databend_common_metrics::session::incr_session_queue_acquire_error_count;
36+
use databend_common_metrics::session::incr_session_queue_acquire_timeout_count;
37+
use databend_common_metrics::session::record_session_queue_acquire_duration_ms;
38+
use databend_common_metrics::session::set_session_queued_queries;
3339
use log::info;
3440
use parking_lot::Mutex;
3541
use pin_project_lite::pin_project;
@@ -41,7 +47,7 @@ use tokio::time::error::Elapsed;
4147
use crate::sessions::QueryContext;
4248

4349
pub trait QueueData: Send + Sync + 'static {
44-
type Key: Send + Sync + Eq + Hash + Clone + 'static;
50+
type Key: Send + Sync + Eq + Hash + Display + Clone + 'static;
4551

4652
fn get_key(&self) -> Self::Key;
4753

@@ -91,12 +97,14 @@ impl<Data: QueueData> QueueManager<Data> {
9197
pub fn remove(&self, key: Data::Key) -> bool {
9298
let mut queue = self.queue.lock();
9399
if let Some(inner) = queue.remove(&key) {
100+
set_session_queued_queries(queue.len());
94101
inner.waker.wake();
95102
inner.is_abort.store(true, Ordering::SeqCst);
96-
return true;
103+
true
104+
} else {
105+
set_session_queued_queries(queue.len());
106+
false
97107
}
98-
99-
false
100108
}
101109

102110
pub async fn acquire(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
@@ -106,20 +114,43 @@ impl<Data: QueueData> QueueManager<Data> {
106114
tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()),
107115
self.clone(),
108116
);
117+
let start_time = SystemTime::now();
109118

110-
future.await
119+
match future.await {
120+
Ok(v) => {
121+
record_session_queue_acquire_duration_ms(start_time.elapsed().unwrap_or_default());
122+
Ok(v)
123+
}
124+
Err(e) => {
125+
match e.code() {
126+
ErrorCode::ABORTED_QUERY => {
127+
incr_session_queue_abort_count();
128+
}
129+
ErrorCode::TIMEOUT => {
130+
incr_session_queue_acquire_timeout_count();
131+
}
132+
_ => {
133+
incr_session_queue_acquire_error_count();
134+
}
135+
}
136+
Err(e)
137+
}
138+
}
111139
}
112140

113141
/// Registers `inner` in the queue under the key reported by its data and
/// returns that key.
///
/// The queued-queries gauge is refreshed with the new queue length while the
/// queue lock is still held.
pub(crate) fn add_entity(&self, inner: Inner<Data>) -> Data::Key {
    let entity_key = inner.data.get_key();

    let mut entries = self.queue.lock();
    entries.insert(entity_key.clone(), inner);
    let queued = entries.len();
    set_session_queued_queries(queued);

    entity_key
}
119148

120149
/// Removes the entry stored under `key`, returning a handle to its data when
/// one was present.
///
/// The queued-queries gauge is refreshed with the resulting queue length
/// whether or not an entry was found.
pub(crate) fn remove_entity(&self, key: &Data::Key) -> Option<Arc<Data>> {
    let mut entries = self.queue.lock();
    let detached = entries.remove(key).map(|entry| Arc::clone(&entry.data));
    let remaining = entries.len();
    set_session_queued_queries(remaining);
    detached
}
124155
}
125156

0 commit comments

Comments
 (0)