Skip to content

Commit 4e6515b

Browse files
authored
feat(planner): support timeout termination for query optimization (#13955)
add stats for cascades optimizer
1 parent e9934ab commit 4e6515b

File tree

7 files changed

+144
-38
lines changed

7 files changed

+144
-38
lines changed

โ€Žsrc/query/sql/src/planner/optimizer/cascades/cascade.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_catalog::table_context::TableContext;
1919
use common_exception::ErrorCode;
2020
use common_exception::Result;
2121
use log::debug;
22+
use log::info;
2223

2324
use super::explore_rules::get_explore_rule_set;
2425
use crate::optimizer::cascades::scheduler::Scheduler;
@@ -87,10 +88,29 @@ impl CascadesOptimizer {
8788
.group_index;
8889

8990
let root_task = OptimizeGroupTask::new(self.ctx.clone(), root_index);
90-
let mut scheduler = Scheduler::new();
91+
92+
let start_time = std::time::Instant::now();
93+
let mut num_task_apply_rule = 0;
94+
let mut scheduler = Scheduler::new().with_callback(|task| {
95+
if let Task::ApplyRule(_) = task {
96+
num_task_apply_rule += 1;
97+
}
98+
});
9199
scheduler.add_task(Task::OptimizeGroup(root_task));
92100
scheduler.run(self)?;
93101

102+
let scheduled_task_count = scheduler.scheduled_task_count();
103+
drop(scheduler);
104+
let elapsed = start_time.elapsed();
105+
106+
info!(
107+
"optimizer stats - total task number: {:#?}, total execution time: {:.3}ms, average execution time: {:.3}ms, apply rule task number: {:#?}",
108+
scheduled_task_count,
109+
elapsed.as_millis() as f64,
110+
elapsed.as_millis() as f64 / scheduled_task_count as f64,
111+
num_task_apply_rule,
112+
);
113+
94114
debug!("Memo:\n{}", display_memo(&self.memo, &self.best_cost_map)?);
95115

96116
self.find_optimal_plan(root_index)

โ€Žsrc/query/sql/src/planner/optimizer/cascades/scheduler.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,69 @@ use log::debug;
2020
use super::tasks::Task;
2121
use super::CascadesOptimizer;
2222

23-
pub struct Scheduler {
23+
#[allow(clippy::type_complexity)]
24+
pub struct Scheduler<'a> {
2425
task_queue: VecDeque<Task>,
2526

2627
/// A counter to track the number of tasks
2728
/// that have been scheduled.
2829
scheduled_task_count: u64,
30+
31+
/// The maximum number of tasks that can be scheduled.
32+
/// If the number of scheduled tasks exceeds this limit,
33+
/// the scheduler will stop scheduling new tasks.
34+
task_limit: u64,
35+
36+
/// Task callback functions invoked before a task is executed.
37+
callback: Option<Box<dyn FnMut(&Task) + 'a>>,
2938
}
3039

31-
impl Scheduler {
40+
impl<'a> Scheduler<'a> {
3241
pub fn new() -> Self {
3342
Self {
3443
task_queue: Default::default(),
3544
scheduled_task_count: 0,
45+
task_limit: u64::MAX,
46+
callback: None,
3647
}
3748
}
3849

50+
/// Set the maximum number of tasks that can be scheduled.
51+
#[allow(dead_code)]
52+
pub fn with_task_limit(mut self, task_limit: u64) -> Self {
53+
self.task_limit = task_limit;
54+
self
55+
}
56+
57+
/// Add a callback function that will be invoked before a task is executed.
58+
pub fn with_callback(mut self, callback: impl FnMut(&Task) + 'a) -> Self {
59+
self.callback = Some(Box::new(callback));
60+
self
61+
}
62+
3963
pub fn run(&mut self, optimizer: &mut CascadesOptimizer) -> Result<()> {
40-
while let Some(task) = self.task_queue.pop_front() {
64+
while let Some(mut task) = self.task_queue.pop_front() {
65+
if self.scheduled_task_count > self.task_limit {
66+
// Skip explore tasks if the task limit is reached.
67+
match task {
68+
Task::ExploreGroup(t) => {
69+
task = Task::ExploreGroup(t.with_termination());
70+
}
71+
Task::ExploreExpr(t) => {
72+
task = Task::ExploreExpr(t.with_termination());
73+
}
74+
_ => {}
75+
}
76+
}
77+
4178
if task.ref_count() > 0 {
4279
// The task is still referenced by other tasks, requeue it.
4380
self.task_queue.push_back(task);
4481
continue;
4582
}
83+
if let Some(callback) = &mut self.callback {
84+
callback(&task);
85+
}
4686
task.execute(optimizer, self)?;
4787

4888
// Update the counter
@@ -60,4 +100,8 @@ impl Scheduler {
60100
pub fn add_task(&mut self, task: Task) {
61101
self.task_queue.push_back(task);
62102
}
103+
104+
pub fn scheduled_task_count(&self) -> u64 {
105+
self.scheduled_task_count
106+
}
63107
}

โ€Žsrc/query/sql/src/planner/optimizer/cascades/tasks/explore_expr.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub struct ExploreExprTask {
5656

5757
pub ref_count: Rc<SharedCounter>,
5858
pub parent: Option<Rc<SharedCounter>>,
59+
60+
should_terminate: bool,
5961
}
6062

6163
impl ExploreExprTask {
@@ -71,29 +73,32 @@ impl ExploreExprTask {
7173
m_expr_index,
7274
ref_count: Rc::new(SharedCounter::new()),
7375
parent: None,
76+
should_terminate: false,
7477
}
7578
}
7679

77-
pub fn with_parent(
78-
ctx: Arc<dyn TableContext>,
79-
group_index: IndexType,
80-
m_expr_index: IndexType,
81-
parent: &Rc<SharedCounter>,
82-
) -> Self {
83-
let mut task = Self::new(ctx, group_index, m_expr_index);
80+
pub fn with_parent(mut self, parent: &Rc<SharedCounter>) -> Self {
8481
parent.inc();
85-
task.parent = Some(parent.clone());
86-
task
82+
self.parent = Some(parent.clone());
83+
self
84+
}
85+
86+
pub fn with_termination(mut self) -> Self {
87+
self.should_terminate = true;
88+
self
8789
}
8890

8991
pub fn execute(
9092
mut self,
9193
optimizer: &mut CascadesOptimizer,
92-
scheduler: &mut Scheduler,
94+
scheduler: &mut Scheduler<'_>,
9395
) -> Result<()> {
9496
if matches!(self.state, ExploreExprState::ExploredSelf) {
9597
return Ok(());
9698
}
99+
if self.should_terminate {
100+
return self.terminate();
101+
}
97102
self.transition(optimizer, scheduler)?;
98103
scheduler.add_task(Task::ExploreExpr(self));
99104
Ok(())
@@ -102,7 +107,7 @@ impl ExploreExprTask {
102107
fn transition(
103108
&mut self,
104109
optimizer: &mut CascadesOptimizer,
105-
scheduler: &mut Scheduler,
110+
scheduler: &mut Scheduler<'_>,
106111
) -> Result<()> {
107112
let event = match self.state {
108113
ExploreExprState::Init => self.explore_children(optimizer, scheduler)?,
@@ -129,7 +134,7 @@ impl ExploreExprTask {
129134
fn explore_children(
130135
&mut self,
131136
optimizer: &mut CascadesOptimizer,
132-
scheduler: &mut Scheduler,
137+
scheduler: &mut Scheduler<'_>,
133138
) -> Result<ExploreExprEvent> {
134139
let m_expr = optimizer
135140
.memo
@@ -142,7 +147,7 @@ impl ExploreExprTask {
142147
// If the child group isn't explored, then schedule a `ExploreGroupTask` for it.
143148
all_children_explored = false;
144149
let explore_group_task =
145-
ExploreGroupTask::with_parent(self.ctx.clone(), *child, &self.ref_count);
150+
ExploreGroupTask::new(self.ctx.clone(), *child).with_parent(&self.ref_count);
146151
scheduler.add_task(Task::ExploreGroup(explore_group_task));
147152
}
148153
}
@@ -157,7 +162,7 @@ impl ExploreExprTask {
157162
fn explore_self(
158163
&mut self,
159164
optimizer: &mut CascadesOptimizer,
160-
scheduler: &mut Scheduler,
165+
scheduler: &mut Scheduler<'_>,
161166
) -> Result<ExploreExprEvent> {
162167
let m_expr = optimizer
163168
.memo
@@ -181,4 +186,14 @@ impl ExploreExprTask {
181186
}
182187
Ok(ExploreExprEvent::ExploredSelf)
183188
}
189+
190+
fn terminate(&mut self) -> Result<()> {
191+
if let Some(parent) = &self.parent {
192+
parent.dec();
193+
}
194+
195+
self.state = ExploreExprState::ExploredSelf;
196+
197+
Ok(())
198+
}
184199
}

โ€Žsrc/query/sql/src/planner/optimizer/cascades/tasks/explore_group.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ pub struct ExploreGroupTask {
5252

5353
pub ref_count: Rc<SharedCounter>,
5454
pub parent: Option<Rc<SharedCounter>>,
55+
56+
pub should_terminate: bool,
5557
}
5658

5759
impl ExploreGroupTask {
@@ -63,28 +65,33 @@ impl ExploreGroupTask {
6365
last_explored_m_expr: None,
6466
ref_count: Rc::new(SharedCounter::new()),
6567
parent: None,
68+
should_terminate: false,
6669
}
6770
}
6871

69-
pub fn with_parent(
70-
ctx: Arc<dyn TableContext>,
71-
group_index: IndexType,
72-
parent: &Rc<SharedCounter>,
73-
) -> Self {
74-
let mut task = Self::new(ctx, group_index);
72+
pub fn with_parent(mut self, parent: &Rc<SharedCounter>) -> Self {
7573
parent.inc();
76-
task.parent = Some(parent.clone());
77-
task
74+
self.parent = Some(parent.clone());
75+
self
76+
}
77+
78+
/// Mark this task as a termination task.
79+
pub fn with_termination(mut self) -> Self {
80+
self.should_terminate = true;
81+
self
7882
}
7983

8084
pub fn execute(
8185
mut self,
8286
optimizer: &mut CascadesOptimizer,
83-
scheduler: &mut Scheduler,
87+
scheduler: &mut Scheduler<'_>,
8488
) -> Result<()> {
8589
if matches!(self.state, ExploreGroupState::Explored) {
8690
return Ok(());
8791
}
92+
if self.should_terminate {
93+
return self.terminate(optimizer);
94+
}
8895
self.transition(optimizer, scheduler)?;
8996
scheduler.add_task(Task::ExploreGroup(self));
9097
Ok(())
@@ -93,7 +100,7 @@ impl ExploreGroupTask {
93100
pub fn transition(
94101
&mut self,
95102
optimizer: &mut CascadesOptimizer,
96-
scheduler: &mut Scheduler,
103+
scheduler: &mut Scheduler<'_>,
97104
) -> Result<()> {
98105
let event = match self.state {
99106
ExploreGroupState::Init => self.explore_group(optimizer, scheduler)?,
@@ -115,7 +122,7 @@ impl ExploreGroupTask {
115122
fn explore_group(
116123
&mut self,
117124
optimizer: &mut CascadesOptimizer,
118-
scheduler: &mut Scheduler,
125+
scheduler: &mut Scheduler<'_>,
119126
) -> Result<ExploreGroupEvent> {
120127
let group = optimizer.memo.group_mut(self.group_index)?;
121128

@@ -130,17 +137,24 @@ impl ExploreGroupTask {
130137
}
131138

132139
for m_expr in group.m_exprs.iter().skip(start_index) {
133-
let task = ExploreExprTask::with_parent(
134-
self.ctx.clone(),
135-
m_expr.group_index,
136-
m_expr.index,
137-
&self.ref_count,
138-
);
140+
let task = ExploreExprTask::new(self.ctx.clone(), m_expr.group_index, m_expr.index)
141+
.with_parent(&self.ref_count);
139142
scheduler.add_task(Task::ExploreExpr(task));
140143
}
141144

142145
self.last_explored_m_expr = Some(group.num_exprs());
143146

144147
Ok(ExploreGroupEvent::Exploring)
145148
}
149+
150+
pub fn terminate(&mut self, cascades_optimizer: &mut CascadesOptimizer) -> Result<()> {
151+
if let Some(parent) = &self.parent {
152+
parent.dec();
153+
}
154+
155+
let group = cascades_optimizer.memo.group_mut(self.group_index)?;
156+
group.set_state(GroupState::Explored);
157+
158+
Ok(())
159+
}
146160
}

โ€Žsrc/query/sql/src/planner/optimizer/cascades/tasks/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl Task {
7070
pub fn execute(
7171
self,
7272
optimizer: &mut CascadesOptimizer,
73-
scheduler: &mut Scheduler,
73+
scheduler: &mut Scheduler<'_>,
7474
) -> Result<()> {
7575
match self {
7676
Task::ApplyRule(task) => task.execute(optimizer),

โ€Žsrc/query/sql/src/planner/optimizer/cascades/tasks/optimize_group.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ impl OptimizeGroupTask {
126126
) -> Result<OptimizeGroupEvent> {
127127
let group = optimizer.memo.group(self.group_index)?;
128128
if !group.state.explored() {
129-
let task =
130-
ExploreGroupTask::with_parent(self.ctx.clone(), group.group_index, &self.ref_count);
129+
let task = ExploreGroupTask::new(self.ctx.clone(), group.group_index)
130+
.with_parent(&self.ref_count);
131131
scheduler.add_task(Task::ExploreGroup(task));
132132
Ok(OptimizeGroupEvent::Exploring)
133133
} else {

โ€Žsrc/query/sql/src/planner/optimizer/memo.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,17 @@ impl Memo {
149149
self.groups.push(group);
150150
group_index
151151
}
152+
153+
/// Get an estimate of the memory size of the memo.
154+
pub fn mem_size(&self) -> usize {
155+
// Since all the `RelOperator` are interned,
156+
// we only need to count the size of `m_expr_lookup_table`.
157+
// We assume the `RelOperator`s are the major part of the memo.
158+
self.m_expr_lookup_table.len() * std::mem::size_of::<RelOperator>()
159+
}
160+
161+
/// Get the number of groups in the memo.
162+
pub fn num_groups(&self) -> usize {
163+
self.groups.len()
164+
}
152165
}

0 commit comments

Comments
ย (0)