Skip to content

Commit 316bafb

Browse files
dqhl76zhang2014
andauthored
refactor(executor): control queries executor via configuration (#18283)
* refactor: make queries executor lazy init * test * fix: queries executor hang * fix: executor of r-cte run in others threads * fix: use config(enable) + settings(fallback) to control * clean * save * fixut * Revert "save" This reverts commit 8fe0c86. * follow review suggestion * follow review suggestion --------- Co-authored-by: Winter Zhang <coswde@gmail.com>
1 parent e2b5c7e commit 316bafb

File tree

14 files changed

+90
-46
lines changed

14 files changed

+90
-46
lines changed

src/query/config/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1946,6 +1946,9 @@ pub struct QueryConfig {
19461946

19471947
#[clap(skip)]
19481948
pub resources_management: Option<ResourcesManagementConfig>,
1949+
1950+
#[clap(long, value_name = "VALUE", default_value = "false")]
1951+
pub enable_queries_executor: bool,
19491952
}
19501953

19511954
impl Default for QueryConfig {
@@ -2047,6 +2050,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
20472050
.map(|(k, v)| (k, v.into()))
20482051
.collect(),
20492052
resources_management: self.resources_management,
2053+
enable_queries_executor: self.enable_queries_executor,
20502054
})
20512055
}
20522056
}
@@ -2155,6 +2159,7 @@ impl From<InnerQueryConfig> for QueryConfig {
21552159
network_policy_whitelist: inner.network_policy_whitelist,
21562160
settings: HashMap::new(),
21572161
resources_management: None,
2162+
enable_queries_executor: inner.enable_queries_executor,
21582163
}
21592164
}
21602165
}

src/query/config/src/inner.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ pub struct QueryConfig {
256256

257257
pub settings: HashMap<String, UserSettingValue>,
258258
pub resources_management: Option<ResourcesManagementConfig>,
259+
260+
pub enable_queries_executor: bool,
259261
}
260262

261263
impl Default for QueryConfig {
@@ -343,6 +345,7 @@ impl Default for QueryConfig {
343345
network_policy_whitelist: Vec::new(),
344346
settings: HashMap::new(),
345347
resources_management: None,
348+
enable_queries_executor: false,
346349
}
347350
}
348351
}

src/query/service/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ io-uring = [
1919
"databend-common-meta-store/io-uring",
2020
]
2121

22-
enable_queries_executor = []
23-
2422
[dependencies]
2523
anyhow = { workspace = true }
2624
arrow-array = { workspace = true }

src/query/service/src/global_services.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ use crate::catalogs::IcebergCreator;
5050
use crate::clusters::ClusterDiscovery;
5151
use crate::history_tables::GlobalHistoryLog;
5252
use crate::locks::LockManager;
53-
#[cfg(feature = "enable_queries_executor")]
5453
use crate::pipelines::executor::GlobalQueriesExecutor;
5554
use crate::servers::flight::v1::exchange::DataExchangeManager;
5655
use crate::servers::http::v1::ClientSessionManager;
@@ -163,15 +162,14 @@ impl GlobalServices {
163162
CloudControlApiProvider::init(addr, config.query.cloud_control_grpc_timeout).await?;
164163
}
165164

166-
#[cfg(feature = "enable_queries_executor")]
167-
{
168-
GlobalQueriesExecutor::init()?;
169-
}
170-
171165
if !ee_mode {
172166
DummyResourcesManagement::init()?;
173167
}
174168

169+
if config.query.enable_queries_executor {
170+
GlobalQueriesExecutor::init()?;
171+
}
172+
175173
Self::init_workload_mgr(config).await?;
176174

177175
if config.log.history.on {

src/query/service/src/interpreters/interpreter_set.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,25 @@ impl SetInterpreter {
113113
.await?;
114114
true
115115
}
116+
"use_legacy_query_executor" => {
117+
// This is a fallback setting, allowing user to fallback from **queries** executor
118+
// to the **query** executor. So, if queries executor not enable in the config
119+
// we will return an error.
120+
// TODO: we will remove this setting when queries executor is stable.
121+
let config = GlobalConfig::instance();
122+
if !config.query.enable_queries_executor {
123+
return Err(
124+
ErrorCode::InvalidArgument("This setting is not allowed when queries executor is not enabled in the configuration"));
125+
}
126+
if scalar.as_str() == "0" {
127+
return Err(ErrorCode::InvalidArgument(
128+
"This setting is not allowed set to 0, if already enable in the configuration, please use unset to revert this",
129+
));
130+
}
131+
self.set_settings(var.to_string(), scalar.clone(), is_global)
132+
.await?;
133+
true
134+
}
116135
_ => {
117136
self.set_settings(var.to_string(), scalar.clone(), is_global)
118137
.await?;

src/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ impl ExecutingGraph {
445445
let max_points = self.max_points.load(Ordering::SeqCst);
446446
let mut expected_value = 0;
447447
let mut desired_value = 0;
448+
448449
loop {
449450
match self.points.compare_exchange_weak(
450451
expected_value,
@@ -460,6 +461,7 @@ impl ExecutingGraph {
460461
let epoch = new_expected & EPOCH_MASK;
461462

462463
expected_value = new_expected;
464+
463465
if epoch > global_epoch as u64 {
464466
desired_value = new_expected;
465467
} else if epoch < global_epoch as u64 {

src/query/service/src/pipelines/executor/executor_settings.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616
use std::time::Duration;
1717

1818
use databend_common_catalog::table_context::TableContext;
19+
use databend_common_config::GlobalConfig;
1920
use databend_common_exception::Result;
2021

2122
#[derive(Clone)]
@@ -34,8 +35,18 @@ impl ExecutorSettings {
3435
let max_threads = settings.get_max_threads()?;
3536
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;
3637

38+
let config_enable_queries_executor = GlobalConfig::instance().query.enable_queries_executor;
39+
let setting_use_legacy_query_executor = settings.get_use_legacy_query_executor()?;
40+
// If `use_legacy_query_executor` is set to 1, we disable the queries executor
41+
// Otherwise, we all follow configuration
42+
let enable_queries_executor = if setting_use_legacy_query_executor {
43+
false
44+
} else {
45+
config_enable_queries_executor
46+
};
47+
3748
Ok(ExecutorSettings {
38-
enable_queries_executor: settings.get_enable_experimental_queries_executor()?,
49+
enable_queries_executor,
3950
query_id: Arc::new(query_id),
4051
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
4152
max_threads,

src/query/service/src/pipelines/executor/global_queries_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl GlobalQueriesExecutor {
2727
pub fn init() -> Result<()> {
2828
let num_cpus = num_cpus::get();
2929
GlobalInstance::set(QueriesPipelineExecutor::create(num_cpus)?);
30-
Thread::spawn(|| {
30+
Thread::named_spawn(Some("GlobalQueriesExecutor".to_string()), || {
3131
if let Err(e) = Self::instance().execute() {
3232
info!("Executor finished with error: {:?}", e);
3333
}

src/query/service/src/pipelines/executor/queries_executor_tasks.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ impl WorkersTasks {
4949
pub fn swap_tasks(&mut self) {
5050
mem::swap(&mut self.current_tasks, &mut self.next_tasks);
5151
}
52+
53+
pub fn wakeup_all_waiting_workers(&mut self, condvar: &WorkersCondvar) {
54+
for worker_id in 0..self.workers_waiting_status.total_size() {
55+
if self.workers_waiting_status.is_waiting(worker_id) {
56+
self.workers_waiting_status.wakeup_worker(worker_id);
57+
condvar.wakeup(worker_id);
58+
}
59+
}
60+
}
5261
}
5362

5463
pub struct QueriesExecutorTasksQueue {
@@ -163,19 +172,18 @@ impl QueriesExecutorTasksQueue {
163172
workers_tasks
164173
.next_tasks
165174
.push_task(worker_id, ExecutorTask::Sync(proc));
166-
167175
worker_id += 1;
176+
168177
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
169178
worker_id = 0;
170179
}
171-
172-
if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
173-
workers_tasks
174-
.workers_waiting_status
175-
.wakeup_worker(worker_id);
176-
condvar.wakeup(worker_id);
177-
}
178180
}
181+
182+
// Wake up all workers that is waiting
183+
// Only check one worker and wake it up is not enough,
184+
// in queries executor, this worker may be blocked and wait for a result
185+
// e.g. TransformRecursiveCteSource
186+
workers_tasks.wakeup_all_waiting_workers(&condvar);
179187
}
180188

181189
pub fn init_async_tasks(
@@ -190,19 +198,13 @@ impl QueriesExecutorTasksQueue {
190198
workers_tasks
191199
.next_tasks
192200
.push_task(worker_id, ExecutorTask::Async(proc));
193-
194201
worker_id += 1;
195202
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
196203
worker_id = 0;
197204
}
198-
199-
if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
200-
workers_tasks
201-
.workers_waiting_status
202-
.wakeup_worker(worker_id);
203-
condvar.wakeup(worker_id);
204-
}
205205
}
206+
207+
workers_tasks.wakeup_all_waiting_workers(&condvar);
206208
}
207209

208210
pub fn completed_async_task(&self, condvar: Arc<WorkersCondvar>, task: CompletedAsyncTask) {

src/query/service/src/pipelines/executor/queries_pipeline_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl QueriesPipelineExecutor {
109109
for thread_num in 0..threads {
110110
let this = self.clone();
111111
#[allow(unused_mut)]
112-
let mut name = format!("PipelineExecutor-{}", thread_num);
112+
let mut name = format!("QueriesPipelineExecutor-{}", thread_num);
113113

114114
#[cfg(debug_assertions)]
115115
{

0 commit comments

Comments
 (0)