Skip to content

Commit 556080f

Browse files
authored
refactor(query): remove unsafe implementation of Send and Sync for AcquireQueueGuard (#17818)
The following `Send`/`Sync` implementations do not provide any safety guarantee:

```
unsafe impl Send for AcquireQueueGuard {}
unsafe impl Sync for AcquireQueueGuard {}
```

In this commit, by removing potential `Sync` access to internal data in `HttpQuery`, `AcquireQueueGuard` no longer needs to be `Sync`. Therefore there is no need to add unsafe `Send`/`Sync` impls to it. Because `Arc<T>: Send` requires `T: Sync`, but `Arc<Mutex<T>>: Send` does not require `T: Sync` (`Mutex<T>` is `Sync` whenever `T: Send`), we replace `Arc<T>` with `Arc<Mutex<T>>` (concretely, `Arc<RwLock<Executor>>` becomes `Arc<Mutex<Executor>>`, since `RwLock<T>: Sync` still requires `T: Sync`) to eliminate the `Sync` requirement. Therefore, `AcquireQueueGuard` is only required to be `Send`.
1 parent 66af36f commit 556080f

File tree

4 files changed

+36
-40
lines changed

4 files changed

+36
-40
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ async fn query_final_handler(
273273
Some(query) => {
274274
let mut response = query
275275
.get_response_state_only()
276-
.await
277276
.map_err(HttpErrorCode::server_error)?;
278277
// It is safe to set these 2 fields to None, because the client now checks for null/None first.
279278
response.session = None;
@@ -338,7 +337,6 @@ async fn query_state_handler(
338337
} else {
339338
let response = query
340339
.get_response_state_only()
341-
.await
342340
.map_err(HttpErrorCode::server_error)?;
343341
Ok(QueryResponse::from_internal(query_id, response, false))
344342
}

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::collections::HashMap;
1616
use std::sync::Arc;
1717
use std::time::SystemTime;
1818

19-
use databend_common_base::base::tokio::sync::RwLock;
2019
use databend_common_base::base::ProgressValues;
2120
use databend_common_base::base::SpillProgress;
2221
use databend_common_base::runtime::CatchUnwindFuture;
@@ -34,6 +33,7 @@ use futures::StreamExt;
3433
use log::debug;
3534
use log::error;
3635
use log::info;
36+
use parking_lot::Mutex;
3737
use serde::Deserialize;
3838
use serde::Serialize;
3939
use ExecuteState::*;
@@ -245,25 +245,23 @@ impl Executor {
245245
}
246246
}
247247

248-
#[async_backtrace::framed]
249-
pub async fn start_to_running(this: &Arc<RwLock<Executor>>, state: ExecuteState) {
250-
let mut guard = this.write().await;
248+
pub fn start_to_running(this: &Arc<Mutex<Executor>>, state: ExecuteState) {
249+
let mut guard = this.lock();
251250
if let Starting(_) = &guard.state {
252251
guard.state = state
253252
}
254253
}
255254

256-
#[async_backtrace::framed]
257-
pub async fn start_to_stop(this: &Arc<RwLock<Executor>>, state: ExecuteState) {
258-
let mut guard = this.write().await;
255+
pub fn start_to_stop(this: &Arc<Mutex<Executor>>, state: ExecuteState) {
256+
let mut guard = this.lock();
259257
if let Starting(_) = &guard.state {
260258
guard.state = state
261259
}
262260
}
263-
#[async_backtrace::framed]
264-
pub async fn stop<C>(this: &Arc<RwLock<Executor>>, reason: Result<(), C>) {
261+
262+
pub fn stop<C>(this: &Arc<Mutex<Executor>>, reason: Result<(), C>) {
265263
let reason = reason.with_context(|| "execution stopped");
266-
let mut guard = this.write().await;
264+
let mut guard = this.lock();
267265

268266
let state = match &guard.state {
269267
Starting(s) => {
@@ -337,7 +335,7 @@ impl Executor {
337335
impl ExecuteState {
338336
#[async_backtrace::framed]
339337
pub(crate) async fn try_start_query(
340-
executor: Arc<RwLock<Executor>>,
338+
executor: Arc<Mutex<Executor>>,
341339
sql: String,
342340
session: Arc<Session>,
343341
ctx: Arc<QueryContext>,
@@ -377,7 +375,7 @@ impl ExecuteState {
377375
has_result_set,
378376
};
379377
info!("http query change state to Running");
380-
Executor::start_to_running(&executor, Running(running_state)).await;
378+
Executor::start_to_running(&executor, Running(running_state));
381379

382380
let executor_clone = executor.clone();
383381
let ctx_clone = ctx.clone();
@@ -392,11 +390,11 @@ impl ExecuteState {
392390
);
393391
match CatchUnwindFuture::create(res).await {
394392
Ok(Err(err)) => {
395-
Executor::stop(&executor_clone, Err(err.clone())).await;
393+
Executor::stop(&executor_clone, Err(err.clone()));
396394
block_sender_closer.close();
397395
}
398396
Err(e) => {
399-
Executor::stop(&executor_clone, Err(e)).await;
397+
Executor::stop(&executor_clone, Err(e));
400398
block_sender_closer.close();
401399
}
402400
_ => {}
@@ -411,7 +409,7 @@ async fn execute(
411409
schema: DataSchemaRef,
412410
ctx: Arc<QueryContext>,
413411
block_sender: SizedChannelSender<DataBlock>,
414-
executor: Arc<RwLock<Executor>>,
412+
executor: Arc<Mutex<Executor>>,
415413
) -> Result<(), ExecutionError> {
416414
let make_error = || format!("failed to execute {}", interpreter.name());
417415

@@ -423,11 +421,11 @@ async fn execute(
423421
None => {
424422
let block = DataBlock::empty_with_schema(schema);
425423
block_sender.send(block, 0).await;
426-
Executor::stop::<()>(&executor, Ok(())).await;
424+
Executor::stop::<()>(&executor, Ok(()));
427425
block_sender.close();
428426
}
429427
Some(Err(err)) => {
430-
Executor::stop(&executor, Err(err)).await;
428+
Executor::stop(&executor, Err(err));
431429
block_sender.close();
432430
}
433431
Some(Ok(block)) => {
@@ -444,7 +442,7 @@ async fn execute(
444442
}
445443
};
446444
}
447-
Executor::stop::<()>(&executor, Ok(())).await;
445+
Executor::stop::<()>(&executor, Ok(()));
448446
block_sender.close();
449447
}
450448
}

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::time::Instant;
2323

2424
use databend_common_base::base::short_sql;
2525
use databend_common_base::base::tokio::sync::Mutex as TokioMutex;
26-
use databend_common_base::base::tokio::sync::RwLock;
2726
use databend_common_base::runtime::CatchUnwindFuture;
2827
use databend_common_base::runtime::GlobalQueryRuntime;
2928
use databend_common_base::runtime::TrySpawn;
@@ -355,9 +354,9 @@ pub struct HttpQuery {
355354
pub(crate) session_id: String,
356355
pub(crate) node_id: String,
357356
request: HttpQueryRequest,
358-
state: Arc<RwLock<Executor>>,
357+
state: Arc<Mutex<Executor>>,
359358
page_manager: Arc<TokioMutex<PageManager>>,
360-
expire_state: Arc<parking_lot::Mutex<ExpireState>>,
359+
expire_state: Arc<Mutex<ExpireState>>,
361360
/// The timeout for the query result polling. In the normal case, the client driver
362361
/// should fetch the paginated result in a timely manner, and the interval should not
363362
/// exceed this result_timeout_secs.
@@ -514,7 +513,7 @@ impl HttpQuery {
514513

515514
let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
516515

517-
let state = Arc::new(RwLock::new(Executor {
516+
let state = Arc::new(Mutex::new(Executor {
518517
query_id: query_id.clone(),
519518
state: ExecuteState::Starting(ExecuteStarting { ctx: ctx.clone() }),
520519
}));
@@ -565,8 +564,7 @@ impl HttpQuery {
565564
warnings: ctx_clone.pop_warnings(),
566565
};
567566
info!("http query change state to Stopped, fail to start {:?}", e);
568-
Executor::start_to_stop(&state_clone, ExecuteState::Stopped(Box::new(state)))
569-
.await;
567+
Executor::start_to_stop(&state_clone, ExecuteState::Stopped(Box::new(state)));
570568
block_sender_closer.close();
571569
}
572570
}
@@ -608,7 +606,7 @@ impl HttpQuery {
608606
#[fastrace::trace]
609607
pub async fn get_response_page(&self, page_no: usize) -> Result<HttpQueryResponseInternal> {
610608
let data = Some(self.get_page(page_no).await?);
611-
let state = self.get_state().await;
609+
let state = self.get_state();
612610
let session = self.get_response_session().await?;
613611

614612
Ok(HttpQueryResponseInternal {
@@ -621,9 +619,8 @@ impl HttpQuery {
621619
})
622620
}
623621

624-
#[async_backtrace::framed]
625-
pub async fn get_response_state_only(&self) -> Result<HttpQueryResponseInternal> {
626-
let state = self.get_state().await;
622+
pub fn get_response_state_only(&self) -> Result<HttpQueryResponseInternal> {
623+
let state = self.get_state();
627624

628625
Ok(HttpQueryResponseInternal {
629626
data: None,
@@ -635,9 +632,8 @@ impl HttpQuery {
635632
})
636633
}
637634

638-
#[async_backtrace::framed]
639-
async fn get_state(&self) -> ResponseState {
640-
let state = self.state.read().await;
635+
fn get_state(&self) -> ResponseState {
636+
let state = self.state.lock();
641637
state.get_response_state()
642638
}
643639

@@ -648,8 +644,15 @@ impl HttpQuery {
648644
// - role: updated by SET ROLE;
649645
// - secondary_roles: updated by SET SECONDARY ROLES ALL|NONE;
650646
// - settings: updated by SET XXX = YYY;
651-
let executor = self.state.read().await;
652-
let session_state = executor.get_session_state();
647+
648+
let (session_state, is_stopped) = {
649+
let executor = self.state.lock();
650+
651+
let session_state = executor.get_session_state();
652+
let is_stopped = matches!(executor.state, ExecuteState::Stopped(_));
653+
654+
(session_state, is_stopped)
655+
};
653656

654657
let settings = session_state
655658
.settings
@@ -669,7 +672,7 @@ impl HttpQuery {
669672
None
670673
};
671674

672-
if matches!(executor.state, ExecuteState::Stopped(_)) {
675+
if is_stopped {
673676
if let Some(cid) = &self.client_session_id {
674677
let (has_temp_table_after_run, just_changed) = {
675678
let mut guard = self.has_temp_table_after_run.lock();
@@ -764,7 +767,7 @@ impl HttpQuery {
764767
// the query will be removed from the query manager before the session is dropped.
765768
self.detach().await;
766769

767-
Executor::stop(&self.state, Err(reason)).await;
770+
Executor::stop(&self.state, Err(reason));
768771
}
769772

770773
#[async_backtrace::framed]

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,6 @@ pub struct AcquireQueueGuard {
234234
permit: Option<Permit>,
235235
}
236236

237-
unsafe impl Send for AcquireQueueGuard {}
238-
unsafe impl Sync for AcquireQueueGuard {}
239-
240237
impl Drop for AcquireQueueGuard {
241238
fn drop(&mut self) {
242239
if self.permit.is_some() {

0 commit comments

Comments
 (0)