Skip to content

Commit 38dd74d

Browse files
authored
chore(query): refactor RelOperator[part I] (#18336)
* fix * fix * chore(query): refactor reloperator * chore(query): refactor reloperator * chore(query): refactor reloperator * chore(query): refactor reloperator * chore(query): refactor reloperator
1 parent 1cd9c53 commit 38dd74d

File tree

21 files changed

+382
-1143
lines changed

21 files changed

+382
-1143
lines changed

src/query/sql/src/planner/binder/binder.rs

Lines changed: 1 addition & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ use crate::plans::DropRolePlan;
6464
use crate::plans::DropStagePlan;
6565
use crate::plans::DropUserPlan;
6666
use crate::plans::Plan;
67-
use crate::plans::RelOperator;
6867
use crate::plans::RewriteKind;
6968
use crate::plans::ShowConnectionsPlan;
7069
use crate::plans::ShowFileFormatsPlan;
@@ -106,7 +105,7 @@ pub struct Binder {
106105
pub subquery_executor: Option<Arc<dyn QueryExecutor>>,
107106
}
108107

109-
impl<'a> Binder {
108+
impl Binder {
110109
pub fn new(
111110
ctx: Arc<dyn TableContext>,
112111
catalogs: Arc<CatalogManager>,
@@ -952,110 +951,6 @@ impl<'a> Binder {
952951
Ok(finder.scalars().is_empty())
953952
}
954953

955-
#[allow(dead_code)]
956-
pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr) -> Result<bool> {
957-
let f = |scalar: &ScalarExpr| matches!(scalar, ScalarExpr::UDFCall(_));
958-
let mut finder = Finder::new(&f);
959-
Self::check_sexpr(s_expr, &mut finder)
960-
}
961-
962-
#[recursive::recursive]
963-
pub(crate) fn check_sexpr<F>(s_expr: &'a SExpr, f: &'a mut Finder<'a, F>) -> Result<bool>
964-
where F: Fn(&ScalarExpr) -> bool {
965-
let result = match s_expr.plan.as_ref() {
966-
RelOperator::Scan(scan) => {
967-
f.reset_finder();
968-
if let Some(agg_info) = &scan.agg_index {
969-
for predicate in &agg_info.predicates {
970-
f.visit(predicate)?;
971-
}
972-
for selection in &agg_info.selection {
973-
f.visit(&selection.scalar)?;
974-
}
975-
}
976-
if let Some(predicates) = &scan.push_down_predicates {
977-
for predicate in predicates {
978-
f.visit(predicate)?;
979-
}
980-
}
981-
f.scalars().is_empty()
982-
}
983-
RelOperator::Join(join) => {
984-
f.reset_finder();
985-
for condition in join.equi_conditions.iter() {
986-
f.visit(&condition.left)?;
987-
f.visit(&condition.right)?;
988-
}
989-
for condition in &join.non_equi_conditions {
990-
f.visit(condition)?;
991-
}
992-
f.scalars().is_empty()
993-
}
994-
RelOperator::EvalScalar(eval) => {
995-
f.reset_finder();
996-
for item in &eval.items {
997-
f.visit(&item.scalar)?;
998-
}
999-
f.scalars().is_empty()
1000-
}
1001-
RelOperator::Filter(filter) => {
1002-
f.reset_finder();
1003-
for predicate in &filter.predicates {
1004-
f.visit(predicate)?;
1005-
}
1006-
f.scalars().is_empty()
1007-
}
1008-
RelOperator::Aggregate(aggregate) => {
1009-
f.reset_finder();
1010-
for item in &aggregate.group_items {
1011-
f.visit(&item.scalar)?;
1012-
}
1013-
for item in &aggregate.aggregate_functions {
1014-
f.visit(&item.scalar)?;
1015-
}
1016-
f.scalars().is_empty()
1017-
}
1018-
RelOperator::Exchange(exchange) => {
1019-
f.reset_finder();
1020-
if let crate::plans::Exchange::Hash(hash) = exchange {
1021-
for scalar in hash {
1022-
f.visit(scalar)?;
1023-
}
1024-
}
1025-
f.scalars().is_empty()
1026-
}
1027-
RelOperator::Window(window) => {
1028-
f.reset_finder();
1029-
for scalar_item in &window.arguments {
1030-
f.visit(&scalar_item.scalar)?;
1031-
}
1032-
for scalar_item in &window.partition_by {
1033-
f.visit(&scalar_item.scalar)?;
1034-
}
1035-
for info in &window.order_by {
1036-
f.visit(&info.order_by_item.scalar)?;
1037-
}
1038-
f.scalars().is_empty()
1039-
}
1040-
RelOperator::Udf(_) => false,
1041-
_ => true,
1042-
};
1043-
1044-
match result {
1045-
true => {
1046-
for child in &s_expr.children {
1047-
let mut finder = Finder::new(f.find_fn());
1048-
let flag = Self::check_sexpr(child.as_ref(), &mut finder)?;
1049-
if !flag {
1050-
return Ok(false);
1051-
}
1052-
}
1053-
Ok(true)
1054-
}
1055-
false => Ok(false),
1056-
}
1057-
}
1058-
1059954
pub(crate) fn check_allowed_scalar_expr_with_subquery_for_copy_table(
1060955
&self,
1061956
scalar: &ScalarExpr,

src/query/sql/src/planner/binder/util.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,31 +48,12 @@ impl Binder {
4848
cte_types: &mut Vec<DataType>,
4949
) -> Result<()> {
5050
match expr.plan() {
51-
RelOperator::Join(_) | RelOperator::UnionAll(_) => {
52-
self.count_r_cte_scan(expr.child(0)?, cte_scan_names, cte_types)?;
53-
self.count_r_cte_scan(expr.child(1)?, cte_scan_names, cte_types)?;
54-
}
55-
56-
RelOperator::ProjectSet(_)
57-
| RelOperator::AsyncFunction(_)
58-
| RelOperator::Udf(_)
59-
| RelOperator::EvalScalar(_)
60-
| RelOperator::Filter(_) => {
61-
self.count_r_cte_scan(expr.child(0)?, cte_scan_names, cte_types)?;
62-
}
6351
RelOperator::RecursiveCteScan(plan) => {
6452
cte_scan_names.push(plan.table_name.clone());
6553
if cte_types.is_empty() {
6654
cte_types.extend(plan.fields.iter().map(|f| f.data_type().clone()));
6755
}
6856
}
69-
70-
RelOperator::Exchange(_)
71-
| RelOperator::Scan(_)
72-
| RelOperator::DummyTableScan(_)
73-
| RelOperator::ConstantTableScan(_)
74-
| RelOperator::ExpressionScan(_)
75-
| RelOperator::CacheScan(_) => {}
7657
// Each recursive step in a recursive query generates new rows, and these rows are used for the next recursion.
7758
// Each step depends on the results of the previous step, so it's essential to ensure that the result set is built incrementally.
7859
// These operators need to operate on the entire result set,
@@ -89,6 +70,12 @@ impl Binder {
8970
expr.plan().rel_op()
9071
)));
9172
}
73+
_ => {
74+
let arity = expr.plan().arity();
75+
for i in 0..arity {
76+
self.count_r_cte_scan(expr.child(i)?, cte_scan_names, cte_types)?;
77+
}
78+
}
9279
}
9380
Ok(())
9481
}

src/query/sql/src/planner/format/display.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ where
157157
tree.children.extend(stats);
158158
}
159159

160-
let subquerys = op.get_subquery(vec![]);
160+
let subquerys = op.collect_subquery();
161161
if !subquerys.is_empty() {
162162
let subquerys = subquerys
163163
.into_iter()

0 commit comments

Comments
 (0)