Skip to content

Commit 0001c82

Browse files
authored
feat: support adjust query's priority (#15352)
* feat: support adjust query's priority Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * chore: refine some comments Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * enable queries executor Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fix: max points should load from atomic Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * chore: make clippy happy Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * update and refine comments Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fix unit test Signed-off-by: Liuqing Yue <dqhl76@gmail.com> fix: license header Signed-off-by: Liuqing Yue <dqhl76@gmail.com> fix: license header Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fix: fix scheduling Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fix: fix scheduling Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fixup stateful test Signed-off-by: Liuqing Yue <dqhl76@gmail.com> fixup stateful test Signed-off-by: Liuqing Yue <dqhl76@gmail.com> fix: ci test suits permission dennie Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * feat: add cluster mode support for set priority Signed-off-by: Liuqing Yue <dqhl76@gmail.com> save Signed-off-by: Liuqing Yue <dqhl76@gmail.com> fixup Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * fixup Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * disable queries executor, ensure not affect Signed-off-by: Liuqing Yue <dqhl76@gmail.com> * disable test Signed-off-by: Liuqing Yue <dqhl76@gmail.com> --------- Signed-off-by: Liuqing Yue <dqhl76@gmail.com>
1 parent 3d1d40d commit 0001c82

34 files changed

+513
-22
lines changed

src/query/ast/src/ast/statements/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ mod notification;
3434
mod password_policy;
3535
mod pipe;
3636
mod presign;
37+
mod priority;
3738
mod procedure;
3839
mod replace;
3940
mod script;
@@ -74,6 +75,7 @@ pub use notification::*;
7475
pub use password_policy::*;
7576
pub use pipe::*;
7677
pub use presign::*;
78+
pub use priority::*;
7779
pub use procedure::*;
7880
pub use replace::*;
7981
pub use script::*;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::Display;
16+
use std::fmt::Formatter;
17+
18+
use derive_visitor::Drive;
19+
use derive_visitor::DriveMut;
20+
#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
21+
pub enum Priority {
22+
HIGH,
23+
MEDIUM,
24+
LOW,
25+
}
26+
27+
impl Display for Priority {
28+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29+
match self {
30+
Priority::HIGH => write!(f, "HIGH"),
31+
Priority::MEDIUM => write!(f, "MEDIUM"),
32+
Priority::LOW => write!(f, "LOW"),
33+
}
34+
}
35+
}

src/query/ast/src/ast/statements/statement.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,13 @@ pub enum Statement {
334334
// sequence
335335
CreateSequence(CreateSequenceStmt),
336336
DropSequence(DropSequenceStmt),
337+
338+
// Set priority for query
339+
SetPriority {
340+
priority: Priority,
341+
#[drive(skip)]
342+
object_id: String,
343+
},
337344
}
338345

339346
#[derive(Debug, Clone, PartialEq)]
@@ -729,6 +736,14 @@ impl Display for Statement {
729736
Statement::CreateSequence(stmt) => write!(f, "{stmt}")?,
730737
Statement::DropSequence(stmt) => write!(f, "{stmt}")?,
731738
Statement::CreateDynamicTable(stmt) => write!(f, "{stmt}")?,
739+
Statement::SetPriority {
740+
priority,
741+
object_id,
742+
} => {
743+
write!(f, "SET PRIORITY")?;
744+
write!(f, " {priority}")?;
745+
write!(f, " '{object_id}'")?;
746+
}
732747
}
733748
Ok(())
734749
}

src/query/ast/src/ast/visitors/visitor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,4 +834,5 @@ pub trait Visitor<'ast>: Sized {
834834

835835
fn visit_create_sequence(&mut self, _stmt: &'ast CreateSequenceStmt) {}
836836
fn visit_drop_sequence(&mut self, _stmt: &'ast DropSequenceStmt) {}
837+
fn visit_set_priority(&mut self, _priority: &'ast Priority, _object_id: &'ast str) {}
837838
}

src/query/ast/src/ast/visitors/visitor_mut.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,4 +845,5 @@ pub trait VisitorMut: Sized {
845845

846846
fn visit_create_sequence(&mut self, _stmt: &mut CreateSequenceStmt) {}
847847
fn visit_drop_sequence(&mut self, _stmt: &mut DropSequenceStmt) {}
848+
fn visit_set_priority(&mut self, _priority: &mut Priority, _object_id: &mut String) {}
848849
}

src/query/ast/src/ast/visitors/walk.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,5 +589,9 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
589589
Statement::CreateSequence(stmt) => visitor.visit_create_sequence(stmt),
590590
Statement::DropSequence(stmt) => visitor.visit_drop_sequence(stmt),
591591
Statement::CreateDynamicTable(stmt) => visitor.visit_create_dynamic_table(stmt),
592+
Statement::SetPriority {
593+
priority,
594+
object_id,
595+
} => visitor.visit_set_priority(priority, object_id),
592596
}
593597
}

src/query/ast/src/ast/visitors/walk_mut.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,5 +588,9 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
588588
Statement::ExecuteImmediate(_) => {}
589589
Statement::CreateSequence(stmt) => visitor.visit_create_sequence(stmt),
590590
Statement::DropSequence(stmt) => visitor.visit_drop_sequence(stmt),
591+
Statement::SetPriority {
592+
priority,
593+
object_id,
594+
} => visitor.visit_set_priority(priority, object_id),
591595
}
592596
}

src/query/ast/src/parser/statement.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,16 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
350350
},
351351
);
352352

353+
let set_priority = map(
354+
rule! {
355+
SET ~ PRIORITY ~ #priority ~ #parameter_to_string
356+
},
357+
|(_, _, priority, object_id)| Statement::SetPriority {
358+
object_id,
359+
priority,
360+
},
361+
);
362+
353363
let set_variable = map(
354364
rule! {
355365
SET ~ GLOBAL? ~ #ident ~ "=" ~ #subexpr(0)
@@ -2052,6 +2062,7 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
20522062
| #show_locks : "`SHOW LOCKS [IN ACCOUNT] [WHERE ...]`"
20532063
| #kill_stmt : "`KILL (QUERY | CONNECTION) <object_id>`"
20542064
| #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]"
2065+
| #set_priority: "SET PRIORITY (HIGH | MEDIUM | LOW) <object_id>"
20552066
),
20562067
// database
20572068
rule!(
@@ -3636,6 +3647,14 @@ pub fn kill_target(i: Input) -> IResult<KillTarget> {
36363647
))(i)
36373648
}
36383649

3650+
pub fn priority(i: Input) -> IResult<Priority> {
3651+
alt((
3652+
value(Priority::LOW, rule! { LOW }),
3653+
value(Priority::MEDIUM, rule! { MEDIUM }),
3654+
value(Priority::HIGH, rule! { HIGH }),
3655+
))(i)
3656+
}
3657+
36393658
pub fn limit_where(i: Input) -> IResult<ShowLimit> {
36403659
map(
36413660
rule! {

src/query/ast/src/parser/token.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,8 @@ pub enum TokenKind {
637637
GZIP,
638638
#[token("HAVING", ignore(ascii_case))]
639639
HAVING,
640+
#[token("HIGH", ignore(ascii_case))]
641+
HIGH,
640642
#[token("HISTORY", ignore(ascii_case))]
641643
HISTORY,
642644
#[token("HIVE", ignore(ascii_case))]
@@ -736,6 +738,8 @@ pub enum TokenKind {
736738
LIMIT,
737739
#[token("LIST", ignore(ascii_case))]
738740
LIST,
741+
#[token("LOW", ignore(ascii_case))]
742+
LOW,
739743
#[token("LZO", ignore(ascii_case))]
740744
LZO,
741745
#[token("MASKING", ignore(ascii_case))]
@@ -746,6 +750,8 @@ pub enum TokenKind {
746750
MAX_FILE_SIZE,
747751
#[token("MASTER_KEY", ignore(ascii_case))]
748752
MASTER_KEY,
753+
#[token("MEDIUM", ignore(ascii_case))]
754+
MEDIUM,
749755
#[token("MEMO", ignore(ascii_case))]
750756
MEMO,
751757
#[token("MEMORY", ignore(ascii_case))]
@@ -862,6 +868,8 @@ pub enum TokenKind {
862868
POSITION,
863869
#[token("PROCESSLIST", ignore(ascii_case))]
864870
PROCESSLIST,
871+
#[token("PRIORITY", ignore(ascii_case))]
872+
PRIORITY,
865873
#[token("PURGE", ignore(ascii_case))]
866874
PURGE,
867875
#[token("QUARTER", ignore(ascii_case))]

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ impl AccessChecker for PrivilegeAccess {
988988
self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant)
989989
.await?;
990990
}
991-
Plan::SetVariable(_) | Plan::UnSetVariable(_) | Plan::Kill(_) => {
991+
Plan::SetVariable(_) | Plan::UnSetVariable(_) | Plan::Kill(_) | Plan::SetPriority(_) => {
992992
self.validate_access(&GrantObject::Global, UserPrivilegeType::Super)
993993
.await?;
994994
}

0 commit comments

Comments
 (0)