Skip to content

fix(query): refactor RelOp #18302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions src/query/ee/tests/it/aggregating_index/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_expression::SendableDataBlockStream;
use databend_common_expression::SortColumnDescription;
use databend_common_sql::optimizer::SExpr;
use databend_common_sql::planner::plans::Plan;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::Planner;
use databend_enterprise_query::test_kits::context::EESetup;
use databend_query::interpreters::InterpreterFactory;
Expand Down Expand Up @@ -596,7 +595,7 @@ fn is_index_scan_plan(plan: &Plan) -> bool {
}

fn is_index_scan_sexpr(s_expr: &SExpr) -> bool {
if let RelOperator::Scan(scan) = s_expr.plan() {
if let Some(scan) = s_expr.plan.as_any().downcast_ref::<Scan>() {
scan.agg_index.is_some()
} else {
s_expr.children().any(is_index_scan_sexpr)
Expand Down Expand Up @@ -1041,10 +1040,7 @@ async fn test_fuzz_impl(format: &str, spill: bool) -> Result<()> {
"query_out_of_memory_behavior".to_string(),
"spilling".to_string(),
),
(
"max_query_memory_usage".to_string(),
"1".to_string(),
),
("max_query_memory_usage".to_string(), "1".to_string()),
]))
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ impl AccessChecker for PrivilegeAccess {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await?
}
Plan::OptimizeCompactBlock { s_expr, .. } => {
let plan: OptimizeCompactBlock = s_expr.plan().clone().try_into()?;
let plan = s_expr.plan().as_any().downcast_ref::<OptimizeCompactBlock>().unwrap();
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await?
}
Plan::VacuumTable(plan) => {
Expand Down Expand Up @@ -1095,7 +1095,7 @@ impl AccessChecker for PrivilegeAccess {
self.validate_insert_source(ctx, &plan.source).await?;
}
Plan::DataMutation { s_expr, .. } => {
let plan: Mutation = s_expr.plan().clone().try_into()?;
let plan = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
if enable_experimental_rbac_check {
let s_expr = s_expr.child(0)?;
match s_expr.get_udfs() {
Expand Down
7 changes: 3 additions & 4 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::OptimizeCompactBlock;
use databend_common_sql::plans::ReclusterPlan;
use databend_common_sql::plans::RelOperator;
use databend_storages_common_table_meta::table::ClusterType;
use log::info;

Expand Down Expand Up @@ -169,13 +168,13 @@ async fn compact_table(

{
// do compact.
let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
let compact_block = OptimizeCompactBlock {
catalog: compact_target.catalog.clone(),
database: compact_target.database.clone(),
table: compact_target.table.clone(),
limit: compaction_limits.clone(),
});
let s_expr = SExpr::create_leaf(Arc::new(compact_block));
};
let s_expr = SExpr::create_leaf(compact_block);
let compact_interpreter = OptimizeCompactBlockInterpreter::try_create(
ctx.clone(),
s_expr,
Expand Down
14 changes: 7 additions & 7 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ impl Interpreter for ExplainInterpreter {
schema,
metadata,
} => {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let mutation = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
let interpreter = MutationInterpreter::try_create(
self.ctx.clone(),
*s_expr.clone(),
schema.clone(),
metadata.clone(),
)?;
let mut plan = interpreter.build_physical_plan(&mutation, true).await?;
let mut plan = interpreter.build_physical_plan(mutation, true).await?;
self.inject_pruned_partitions_stats(&mut plan, metadata)?;
self.explain_physical_plan(&plan, metadata, &None).await?
}
Expand Down Expand Up @@ -199,13 +199,13 @@ impl Interpreter for ExplainInterpreter {
.await?
}
Plan::DataMutation { s_expr, .. } => {
let plan: Mutation = s_expr.plan().clone().try_into()?;
let mutation = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
let mutation_build_info =
build_mutation_info(self.ctx.clone(), &plan, true).await?;
build_mutation_info(self.ctx.clone(), mutation, true).await?;
self.explain_analyze(
s_expr.child(0)?,
&plan.metadata,
*plan.required_columns.clone(),
&mutation.metadata,
mutation.required_columns.clone(),
Some(mutation_build_info),
true,
)
Expand Down Expand Up @@ -562,7 +562,7 @@ impl ExplainInterpreter {
s_expr: SExpr,
schema: DataSchemaRef,
) -> Result<Vec<DataBlock>> {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let mutation = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
let interpreter = MutationInterpreter::try_create(
self.ctx.clone(),
s_expr,
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ impl InterpreterFactory {

Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()),
Plan::DataMutation { s_expr, schema, .. } => {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let mutation = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
Ok(Arc::new(MutationInterpreter::try_create(
ctx,
*s_expr.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::executor::PhysicalPlanReplacer;
use databend_common_sql::plans::EvalScalar;
use databend_common_sql::plans::Operator;
use databend_common_sql::plans::Plan;
use databend_common_sql::plans::RefreshIndexPlan;
use databend_common_sql::plans::RelOperator;
use databend_common_storages_fuse::operations::AggIndexSink;
use databend_common_storages_fuse::pruning::create_segment_location_vector;
use databend_common_storages_fuse::FuseBlockPartInfo;
Expand Down Expand Up @@ -223,7 +224,8 @@ impl Interpreter for RefreshIndexInterpreter {
bind_context,
..
} => {
let schema = if let RelOperator::EvalScalar(eval) = s_expr.plan() {
let schema = if let Some(eval) = s_expr.plan().as_any().downcast_ref::<EvalScalar>()
{
let fields = eval
.items
.iter()
Expand Down
10 changes: 7 additions & 3 deletions src/query/service/src/interpreters/interpreter_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,15 @@ impl Interpreter for MutationInterpreter {
return Ok(PipelineBuildResult::create());
}

let mutation: Mutation = self.s_expr.plan().clone().try_into()?;
let mutation = self
.s_expr
.plan()
.as_any()
.downcast_ref::<Mutation>()
.unwrap();

// Build physical plan.
let physical_plan = self.build_physical_plan(&mutation, false).await?;

let physical_plan = self.build_physical_plan(mutation, false).await?;
let query_plan = physical_plan
.format(self.metadata.clone(), Default::default())?
.format_pretty()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ impl Interpreter for OptimizeCompactBlockInterpreter {
database,
table,
limit,
} = self.s_expr.plan().clone().try_into()?;
} = self
.s_expr
.plan()
.as_any()
.downcast_ref::<OptimizeCompactBlock>()
.unwrap();

// try add lock table.
let lock_guard = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Interpreter for AddTableColumnInterpreter {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&query).await?;
if let Plan::DataMutation { s_expr, schema, .. } = plan {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let mutation = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
let interpreter = MutationInterpreter::try_create(
self.ctx.clone(),
*s_expr,
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ pub async fn do_mutation(
s_expr: SExpr,
schema: DataSchemaRef,
) -> Result<()> {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let mutation = s_expr.plan().as_any().downcast_ref::<Mutation>().unwrap();
let interpreter =
MutationInterpreter::try_create(ctx.clone(), s_expr, schema, mutation.metadata.clone())?;
let _ = interpreter.execute(ctx).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_sql::plans::AggregateMode;
use databend_common_sql::plans::JoinType;
use databend_common_sql::plans::Limit;
use databend_common_sql::plans::Operator;
use databend_common_sql::plans::RelOperator;

use crate::sql::planner::optimizer::test_utils::ExprBuilder;

Expand Down Expand Up @@ -227,17 +226,14 @@ fn test_replace_node() {
self.node_types
.push(format!("{:?}", expr.plan.as_ref().rel_op()));
let op = expr.plan();
if let RelOperator::Limit(limit) = op {
if let Some(limit) = op.as_any().downcast_ref::<Limit>() {
if let Some(limit_value) = limit.limit {
let new_limit = Limit {
limit: Some(limit_value * 2),
offset: limit.offset,
before_exchange: limit.before_exchange,
};
let replacement = SExpr::create_unary(
Arc::new(RelOperator::Limit(new_limit)),
Arc::new(expr.child(0).unwrap().clone()),
);
let replacement = SExpr::create_unary(new_limit, expr.child(0).unwrap());
return Ok(VisitAction::Replace(replacement));
}
}
Expand Down Expand Up @@ -267,7 +263,7 @@ fn test_replace_node() {

// Check if the Limit has been doubled
let op = transformed.plan();
if let RelOperator::Limit(limit) = op {
if let Some(limit) = op.as_any().downcast_ref::<Limit>() {
assert_eq!(limit.limit, Some(20));
} else {
panic!("Root should be a Limit node");
Expand Down Expand Up @@ -344,17 +340,14 @@ async fn test_async_replace_node() {
self.node_types
.push(format!("{:?}", expr.plan.as_ref().rel_op()));
let op = expr.plan();
if let RelOperator::Limit(limit) = op {
if let Some(limit) = op.as_any().downcast_ref::<Limit>() {
if let Some(limit_value) = limit.limit {
let new_limit = Limit {
limit: Some(limit_value * 2),
offset: limit.offset,
before_exchange: limit.before_exchange,
};
let replacement = SExpr::create_unary(
Arc::new(RelOperator::Limit(new_limit)),
Arc::new(expr.child(0).unwrap().clone()),
);
let replacement = SExpr::create_unary(new_limit, expr.child(0).unwrap());
return Ok(VisitAction::Replace(replacement));
}
}
Expand Down Expand Up @@ -384,7 +377,7 @@ async fn test_async_replace_node() {

// Check if the Limit has been doubled
let op = transformed.plan();
if let RelOperator::Limit(limit) = op {
if let Some(limit) = op.as_any().downcast_ref::<Limit>() {
assert_eq!(limit.limit, Some(20));
} else {
panic!("Root should be a Limit node");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::optimizer::ir::SExprVisitor;
use databend_common_sql::optimizer::ir::VisitAction;
use databend_common_sql::optimizer::OptimizerContext;
use databend_common_sql::plans::Operator;
use databend_common_sql::plans::Plan;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::plans::Scan;
use databend_common_sql::plans::Statistics;
use databend_common_sql::BaseTableColumn;
use databend_common_sql::ColumnEntry;
Expand Down Expand Up @@ -364,7 +365,7 @@ struct StatsApplier<'a> {

impl<'a> SExprVisitor for StatsApplier<'a> {
fn visit(&mut self, expr: &SExpr) -> Result<VisitAction> {
if let RelOperator::Scan(scan) = expr.plan() {
if let Some(scan) = expr.plan().as_any().downcast_ref::<Scan>() {
let metadata = self.metadata.read();
let table = metadata.table(scan.table_index);

Expand All @@ -387,9 +388,7 @@ impl<'a> SExprVisitor for StatsApplier<'a> {
histograms: HashMap::new(),
});

return Ok(VisitAction::Replace(
expr.replace_plan(Arc::new(RelOperator::Scan(new_scan))),
));
return Ok(VisitAction::Replace(expr.replace_plan(new_scan)));
}
}
Ok(VisitAction::Continue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use databend_common_expression::types::NumberDataType;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer;
use databend_common_sql::planner::plans::JoinType;
use databend_common_sql::planner::plans::RelOperator;
use databend_common_sql::planner::plans::ScalarExpr;
use databend_common_sql::plans::Join;
use databend_common_sql::plans::Scan;

use crate::sql::planner::optimizer::test_utils::*;

Expand All @@ -33,7 +34,7 @@ fn run_optimizer(s_expr: SExpr) -> Result<SExpr> {
/// Converts an SExpr to a readable string representation using a simple indented format
fn sexpr_to_string(s_expr: &SExpr) -> String {
fn format_join_conditions(s_expr: &SExpr) -> String {
if let RelOperator::Join(join) = s_expr.plan() {
if let Some(join) = s_expr.plan().as_any().downcast_ref::<Join>() {
let conditions: Vec<String> = join
.equi_conditions
.iter()
Expand Down Expand Up @@ -85,7 +86,7 @@ fn sexpr_to_string(s_expr: &SExpr) -> String {
let indent = " ".repeat(depth);
let mut result = String::new();

if let RelOperator::Join(_) = s_expr.plan() {
if let Some(join) = s_expr.plan().as_any().downcast_ref::<Join>() {
// Add the join node with conditions
result.push_str(&format!(
"{indent}Join {}\n",
Expand All @@ -97,7 +98,7 @@ fn sexpr_to_string(s_expr: &SExpr) -> String {
for child in children {
result.push_str(&build_tree(child, depth + 1));
}
} else if let RelOperator::Scan(scan) = s_expr.plan() {
} else if let Some(scan) = s_expr.plan().as_any().downcast_ref::<Scan>() {
// Leaf node (table scan)
result.push_str(&format!("{indent}Table t{}\n", scan.table_index));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use databend_common_sql::optimizer::OptimizerContext;
use databend_common_sql::plans::AggIndexInfo;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::plans::Plan;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::plans::Scan;
use databend_common_sql::BindContext;
use databend_common_sql::Binder;
use databend_common_sql::Metadata;
Expand Down Expand Up @@ -463,10 +463,10 @@ async fn plan_sql(
}

fn find_push_down_index_info(s_expr: &SExpr) -> Result<&Option<AggIndexInfo>> {
match s_expr.plan() {
RelOperator::Scan(scan) => Ok(&scan.agg_index),
_ => find_push_down_index_info(s_expr.child(0)?),
if let Some(scan) = s_expr.plan().as_any().downcast_ref::<Scan>() {
return Ok(&scan.agg_index);
}
find_push_down_index_info(s_expr.child(0)?)
}

fn format_selection(info: &AggIndexInfo) -> Vec<String> {
Expand Down
Loading
Loading