Skip to content

Commit 3d1d40d

Browse files
authored
fix(executor): fix queries executor cannot kill problem and reactivate kill test assertion (#15443)
* fix(executor): fix queries executor cluster cannot kill problem and reactivate kill test assert Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fix: add finish notify in graph Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * disable queries executor, ensure not affect Signed-off-by: Liuqing Yue <dqhl76@gmail.com> --------- Signed-off-by: Liuqing Yue <dqhl76@gmail.com>
1 parent f0075de commit 3d1d40d

File tree

3 files changed

+20
-4
lines changed

3 files changed

+20
-4
lines changed

src/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::atomic::AtomicUsize;
2121
use std::sync::atomic::Ordering;
2222
use std::sync::Arc;
2323

24+
use databend_common_base::base::WatchNotify;
2425
use databend_common_base::runtime::error_info::NodeErrorType;
2526
use databend_common_base::runtime::profile::Profile;
2627
use databend_common_base::runtime::profile::ProfileStatisticsName;
@@ -191,6 +192,7 @@ struct ExecutingGraph {
191192
points: AtomicU64,
192193
query_id: Arc<String>,
193194
should_finish: AtomicBool,
195+
finished_notify: Arc<WatchNotify>,
194196
finish_condvar_notify: Option<Arc<(Mutex<bool>, Condvar)>>,
195197
finished_error: Mutex<Option<ErrorCode>>,
196198
}
@@ -212,6 +214,7 @@ impl ExecutingGraph {
212214
points: AtomicU64::new((MAX_POINTS << 32) | init_epoch as u64),
213215
query_id,
214216
should_finish: AtomicBool::new(false),
217+
finished_notify: Arc::new(WatchNotify::new()),
215218
finish_condvar_notify,
216219
finished_error: Mutex::new(None),
217220
})
@@ -235,6 +238,7 @@ impl ExecutingGraph {
235238
points: AtomicU64::new((MAX_POINTS << 32) | init_epoch as u64),
236239
query_id,
237240
should_finish: AtomicBool::new(false),
241+
finished_notify: Arc::new(WatchNotify::new()),
238242
finish_condvar_notify,
239243
finished_error: Mutex::new(None),
240244
})
@@ -793,6 +797,7 @@ impl RunningGraph {
793797
return Ok(());
794798
}
795799
self.0.should_finish.store(true, Ordering::SeqCst);
800+
self.0.finished_notify.notify_waiters();
796801
self.interrupt_running_nodes();
797802
let mut finished_error = self.0.finished_error.lock();
798803
if finished_error.is_none() {
@@ -836,6 +841,10 @@ impl RunningGraph {
836841
self.0.points.load(Ordering::SeqCst)
837842
}
838843

844+
pub fn get_finished_notify(&self) -> Arc<WatchNotify> {
845+
self.0.finished_notify.clone()
846+
}
847+
839848
pub fn format_graph_nodes(&self) -> String {
840849
pub struct NodeDisplay {
841850
id: usize,

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ impl ExecutorTasksQueue {
7878
}
7979
}
8080
}
81+
82+
pub fn is_queries_executor(&self) -> bool {
83+
matches!(self, ExecutorTasksQueue::QueriesExecutorTasksQueue(_))
84+
}
8185
}
8286

8387
pub struct ProcessorAsyncTask {
@@ -101,7 +105,11 @@ impl ProcessorAsyncTask {
101105
graph: Arc<RunningGraph>,
102106
inner: Inner,
103107
) -> ProcessorAsyncTask {
104-
let finished_notify = queue.get_finished_notify();
108+
let finished_notify = if queue.is_queries_executor() {
109+
graph.get_finished_notify()
110+
} else {
111+
queue.get_finished_notify()
112+
};
105113

106114
let inner = async move {
107115
let left = Box::pin(inner);

tests/suites/1_stateful/02_query/02_0000_kill_query.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,11 @@
3737
res = mycursor.fetchone()
3838
kill_query = "kill query " + str(res[0]) + ";"
3939
mycursor.execute(kill_query)
40-
time.sleep(0.1)
40+
time.sleep(0.5)
4141
mycursor.execute(
4242
"SELECT * FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';"
4343
)
4444
res = mycursor.fetchone()
4545

46-
## TODO NEW EXPRESSION
47-
## assert res is None
46+
assert res is None
4847
client1.expect(prompt)

0 commit comments

Comments
 (0)