Skip to content

Commit 8cf2470

Browse files
authored
feat(query): add statement_queued_timeout_in_seconds setting for queries queue (#14945)
* feat(query): add timeout for queries queue
* feat(query): add timeout for queries queue
1 parent e6d4b8e commit 8cf2470

File tree

8 files changed

+40
-9
lines changed

8 files changed

+40
-9
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ build_exceptions! {
149149
UnknownCatalog(1119),
150150
UnknownCatalogType(1120),
151151
UnmatchMaskPolicyReturnType(1121),
152+
Timeout(1122),
152153

153154
// Data Related Errors
154155

src/query/config/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1399,7 +1399,7 @@ pub struct QueryConfig {
13991399
#[clap(long, value_name = "VALUE", default_value = "256")]
14001400
pub max_active_sessions: u64,
14011401

1402-
#[clap(long, value_name = "VALUE", default_value = "0")]
1402+
#[clap(long, value_name = "VALUE", default_value = "8")]
14031403
pub max_running_queries: u64,
14041404

14051405
/// The max total memory in bytes that can be used by this process.

src/query/config/src/inner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ impl Default for QueryConfig {
247247
mysql_tls_server_cert: "".to_string(),
248248
mysql_tls_server_key: "".to_string(),
249249
max_active_sessions: 256,
250-
max_running_queries: 0,
250+
max_running_queries: 8,
251251
max_server_memory_usage: 0,
252252
max_memory_limit_enabled: false,
253253
clickhouse_http_handler_host: "127.0.0.1".to_string(),

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Arc;
2222
use std::task::Context;
2323
use std::task::Poll;
2424
use std::task::Waker;
25+
use std::time::Duration;
2526
use std::time::SystemTime;
2627

2728
use databend_common_base::base::GlobalInstance;
@@ -35,6 +36,7 @@ use pin_project_lite::pin_project;
3536
use tokio::sync::AcquireError;
3637
use tokio::sync::OwnedSemaphorePermit;
3738
use tokio::sync::Semaphore;
39+
use tokio::time::error::Elapsed;
3840

3941
use crate::sessions::QueryContext;
4042

@@ -44,6 +46,8 @@ pub trait QueueData: Send + Sync + 'static {
4446
fn get_key(&self) -> Self::Key;
4547

4648
fn remove_error_message(key: Option<Self::Key>) -> ErrorCode;
49+
50+
fn timeout(&self) -> Duration;
4751
}
4852

4953
pub(crate) struct Inner<Data: QueueData> {
@@ -96,9 +100,10 @@ impl<Data: QueueData> QueueManager<Data> {
96100
}
97101

98102
pub async fn acquire(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
103+
let timeout = data.timeout();
99104
let future = AcquireQueueFuture::create(
100105
Arc::new(data),
101-
self.semaphore.clone().acquire_owned(),
106+
tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()),
102107
self.clone(),
103108
);
104109

@@ -131,7 +136,7 @@ impl AcquireQueueGuard {
131136

132137
pin_project! {
133138
pub struct AcquireQueueFuture<Data: QueueData, T>
134-
where T: Future<Output = Result<OwnedSemaphorePermit, AcquireError>>
139+
where T: Future<Output = Result<Result<OwnedSemaphorePermit, AcquireError>, Elapsed>>
135140
{
136141
#[pin]
137142
inner: T,
@@ -146,7 +151,7 @@ where T: Future<Output = Result<OwnedSemaphorePermit, AcquireError>>
146151
}
147152

148153
impl<Data: QueueData, T> AcquireQueueFuture<Data, T>
149-
where T: Future<Output = Result<OwnedSemaphorePermit, AcquireError>>
154+
where T: Future<Output = Result<Result<OwnedSemaphorePermit, AcquireError>, Elapsed>>
150155
{
151156
pub fn create(data: Arc<Data>, inner: T, mgr: Arc<QueueManager<Data>>) -> Self {
152157
AcquireQueueFuture {
@@ -161,7 +166,7 @@ where T: Future<Output = Result<OwnedSemaphorePermit, AcquireError>>
161166
}
162167

163168
impl<Data: QueueData, T> Future for AcquireQueueFuture<Data, T>
164-
where T: Future<Output = Result<OwnedSemaphorePermit, AcquireError>>
169+
where T: Future<Output = Result<Result<OwnedSemaphorePermit, AcquireError>, Elapsed>>
165170
{
166171
type Output = Result<AcquireQueueGuard>;
167172

@@ -181,8 +186,9 @@ where T: Future<Output = Result<OwnedSemaphorePermit, AcquireError>>
181186
}
182187

183188
Poll::Ready(match res {
184-
Ok(v) => Ok(AcquireQueueGuard::create(v)),
185-
Err(_) => Err(ErrorCode::TokioError("acquire queue failure.")),
189+
Ok(Ok(v)) => Ok(AcquireQueueGuard::create(v)),
190+
Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")),
191+
Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")),
186192
})
187193
}
188194
Poll::Pending => {
@@ -209,14 +215,20 @@ pub struct QueryEntry {
209215
pub query_id: String,
210216
pub create_time: SystemTime,
211217
pub user_info: UserInfo,
218+
pub timeout: Duration,
212219
}
213220

214221
impl QueryEntry {
215222
pub fn create(ctx: &Arc<QueryContext>) -> Result<QueryEntry> {
223+
let settings = ctx.get_settings();
216224
Ok(QueryEntry {
217225
query_id: ctx.get_id(),
218226
create_time: ctx.get_created_time(),
219227
user_info: ctx.get_current_user()?,
228+
timeout: match settings.get_statement_queued_timeout()? {
229+
0 => Duration::from_secs(60 * 60 * 24 * 365 * 35),
230+
timeout => Duration::from_secs(timeout),
231+
},
220232
})
221233
}
222234
}
@@ -237,6 +249,10 @@ impl QueueData for QueryEntry {
237249
)),
238250
}
239251
}
252+
253+
fn timeout(&self) -> Duration {
254+
self.timeout
255+
}
240256
}
241257

242258
pub type QueriesQueueManager = QueueManager<QueryEntry>;

src/query/service/tests/it/sessions/queue_mgr.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ impl QueueData for TestData {
3636
fn remove_error_message(key: Option<Self::Key>) -> ErrorCode {
3737
ErrorCode::Internal(format!("{:?}", key))
3838
}
39+
40+
fn timeout(&self) -> Duration {
41+
Duration::from_secs(1000)
42+
}
3943
}
4044

4145
#[tokio::test(flavor = "multi_thread")]

src/query/service/tests/it/storages/testdata/configs_table_basic.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
9999
| 'query' | 'max_active_sessions' | '256' | '' |
100100
| 'query' | 'max_memory_limit_enabled' | 'false' | '' |
101101
| 'query' | 'max_query_log_size' | '10000' | '' |
102-
| 'query' | 'max_running_queries' | '0' | '' |
102+
| 'query' | 'max_running_queries' | '8' | '' |
103103
| 'query' | 'max_server_memory_usage' | '0' | '' |
104104
| 'query' | 'max_storage_io_requests' | 'null' | '' |
105105
| 'query' | 'metric_api_address' | '127.0.0.1:7070' | '' |

src/query/settings/src/settings_default.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,12 @@ impl DefaultSettings {
709709
mode: SettingMode::Both,
710710
range: None,
711711
}),
712+
("statement_queued_timeout_in_seconds", DefaultSettingValue {
713+
value: UserSettingValue::UInt64(0),
714+
desc: "The maximum waiting seconds in the queue. The default value is 0(no limit).",
715+
mode: SettingMode::Both,
716+
range: Some(SettingRange::Numeric(0..=u64::MAX)),
717+
})
712718
]);
713719

714720
Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,4 +626,8 @@ impl Settings {
626626
pub fn get_enable_experimental_queries_executor(&self) -> Result<bool> {
627627
Ok(self.try_get_u64("enable_experimental_queries_executor")? == 1)
628628
}
629+
630+
pub fn get_statement_queued_timeout(&self) -> Result<u64> {
631+
self.try_get_u64("statement_queued_timeout_in_seconds")
632+
}
629633
}

0 commit comments

Comments
 (0)