Skip to content

refactor(query): use trait to refactor physical plan #18268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use databend_common_expression::infer_table_schema;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::executor::{IPhysicalPlan, PhysicalPlan, PhysicalPlanMeta};
use databend_storages_common_stage::CopyIntoLocationInfo;
use log::debug;
use log::info;
Expand Down Expand Up @@ -89,14 +89,14 @@ impl CopyIntoLocationInterpreter {
let query_result_schema = query_interpreter.get_result_schema();
let table_schema = infer_table_schema(&query_result_schema)?;

let mut physical_plan = PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
plan_id: 0,
input: Box::new(query_physical_plan),
let mut physical_plan: Box<dyn IPhysicalPlan> = Box::new(CopyIntoLocation {
input: query_physical_plan,
project_columns: query_interpreter.get_result_columns(),
input_data_schema: query_result_schema,
input_table_schema: table_schema,
info: info.clone(),
}));
meta: PhysicalPlanMeta::new("CopyIntoLocation"),
});

let mut next_plan_id = 0;
physical_plan.adjust_plan_id(&mut next_plan_id);
Expand Down
42 changes: 21 additions & 21 deletions src/query/service/src/interpreters/interpreter_copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use databend_common_sql::executor::physical_plans::FragmentKind;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::executor::{IPhysicalPlan, PhysicalPlan, PhysicalPlanMeta};
use databend_common_storage::StageFileInfo;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_stage::StageTable;
Expand Down Expand Up @@ -100,7 +100,7 @@ impl CopyIntoTableInterpreter {
&self,
table_info: TableInfo,
plan: &CopyIntoTablePlan,
) -> Result<(PhysicalPlan, Vec<UpdateStreamMetaReq>)> {
) -> Result<(Box<dyn IPhysicalPlan>, Vec<UpdateStreamMetaReq>)> {
let to_table = self
.ctx
.get_table(
Expand All @@ -125,7 +125,7 @@ impl CopyIntoTableInterpreter {

let (query_interpreter, update_stream_meta) = self.build_query(&query).await?;
update_stream_meta_reqs = update_stream_meta;
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
let query_physical_plan = query_interpreter.build_physical_plan().await?;

let result_columns = query_interpreter.get_result_columns();
(
Expand All @@ -145,21 +145,20 @@ impl CopyIntoTableInterpreter {
}

(
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
plan_id: 0,
CopyIntoTableSource::Stage(Box::new(TableScan {
scan_id: 0,
name_mapping,
stat_info: None,
table_index: None,
internal_column: None,
source: Box::new(data_source_plan),
}))),
meta: PhysicalPlanMeta::new("TableScan"),
})),
None,
)
};

let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
plan_id: 0,
let mut root: Box<dyn IPhysicalPlan> = Box::new(CopyIntoTable {
required_values_schema: plan.required_values_schema.clone(),
values_consts: plan.values_consts.clone(),
required_source_schema: plan.required_source_schema.clone(),
Expand All @@ -171,16 +170,17 @@ impl CopyIntoTableInterpreter {
source,
is_transform: plan.is_transform,
table_meta_timestamps,
}));
meta: PhysicalPlanMeta::new("CopyIntoTable"),
});

if plan.enable_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
root = Box::new(Exchange {
input: root,
kind: FragmentKind::Merge,
keys: Vec::new(),
allow_adjust_parallelism: true,
ignore_exchange: false,
meta: PhysicalPlanMeta::new("Exchange"),
});
}

Expand Down Expand Up @@ -318,14 +318,14 @@ impl CopyIntoTableInterpreter {

if self.plan.stage_table_info.copy_into_table_options.purge
&& !self
.plan
.stage_table_info
.duplicated_files_detected
.is_empty()
.plan
.stage_table_info
.duplicated_files_detected
.is_empty()
&& self
.ctx
.get_settings()
.get_enable_purge_duplicated_files_in_copy()?
.ctx
.get_settings()
.get_enable_purge_duplicated_files_in_copy()?
{
info!(
"purge_duplicated_files_in_copy enabled, number of duplicated files: {}",
Expand All @@ -337,7 +337,7 @@ impl CopyIntoTableInterpreter {
self.plan.stage_table_info.duplicated_files_detected.clone(),
self.plan.stage_table_info.stage_info.clone(),
)
.await?;
.await?;
}
Ok(PipelineBuildResult::create())
}
Expand Down Expand Up @@ -406,7 +406,7 @@ impl Interpreter for CopyIntoTableInterpreter {
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
self.plan.path_prefix.clone(),
)
.await?;
.await?;
}

// Execute hook.
Expand Down
47 changes: 13 additions & 34 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use databend_common_pipeline_core::processors::PlanProfile;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::executor::format_partial_tree;
use databend_common_sql::executor::IPhysicalPlan;
use databend_common_sql::executor::MutationBuildInfo;
use databend_common_sql::plans::Mutation;
use databend_common_sql::BindContext;
Expand Down Expand Up @@ -68,6 +68,7 @@ use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalPlanBuilder;
use crate::sql::optimizer::ir::SExpr;
use crate::sql::plans::Plan;
use databend_common_sql::executor::PhysicalPlanDynExt;

pub struct ExplainInterpreter {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -164,22 +165,11 @@ impl Interpreter for ExplainInterpreter {
_ => self.explain_plan(&self.plan)?,
},

ExplainKind::Join => match &self.plan {
Plan::Query {
s_expr,
metadata,
bind_context,
..
} => {
let ctx = self.ctx.clone();
let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx, true);
let plan = builder.build(s_expr, bind_context.column_set()).await?;
self.explain_join_order(&plan, metadata)?
}
_ => Err(ErrorCode::Unimplemented(
ExplainKind::Join => {
return Err(ErrorCode::Unimplemented(
"Unsupported EXPLAIN JOIN statement",
))?,
},
));
}

ExplainKind::AnalyzePlan | ExplainKind::Graphical => match &self.plan {
Plan::Query {
Expand Down Expand Up @@ -324,7 +314,7 @@ impl ExplainInterpreter {

pub async fn explain_physical_plan(
&self,
plan: &PhysicalPlan,
plan: &Box<dyn IPhysicalPlan>,
metadata: &MetadataRef,
formatted_ast: &Option<String>,
) -> Result<Vec<DataBlock>> {
Expand Down Expand Up @@ -360,25 +350,15 @@ impl ExplainInterpreter {
}
}

let metadata = metadata.read();
let result = plan
.format(metadata.clone(), Default::default())?
.format(&metadata, Default::default())?
.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

pub fn explain_join_order(
&self,
plan: &PhysicalPlan,
metadata: &MetadataRef,
) -> Result<Vec<DataBlock>> {
let result = plan.format_join(metadata)?.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

fn format_pipeline(build_res: &PipelineBuildResult) -> Vec<DataBlock> {
let mut blocks = Vec::with_capacity(1 + build_res.sources_pipelines.len());
// Format root pipeline
Expand Down Expand Up @@ -479,10 +459,9 @@ impl ExplainInterpreter {
if !pruned_partitions_stats.is_empty() {
plan.set_pruning_stats(&mut pruned_partitions_stats);
}
let result = if self.partial {
format_partial_tree(&plan, metadata, &query_profiles)?.format_pretty()?
} else {
plan.format(metadata.clone(), query_profiles.clone())?
let result = {
let metadata = metadata.read();
plan.format(&metadata, query_profiles.clone())?
.format_pretty()?
};
let line_split_result: Vec<&str> = result.lines().collect();
Expand Down Expand Up @@ -586,7 +565,7 @@ impl ExplainInterpreter {

fn inject_pruned_partitions_stats(
&self,
plan: &mut PhysicalPlan,
plan: &mut Box<dyn IPhysicalPlan>,
metadata: &MetadataRef,
) -> Result<()> {
let mut sources = vec![];
Expand Down
61 changes: 32 additions & 29 deletions src/query/service/src/interpreters/interpreter_index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::executor::{DeriveHandle, PhysicalPlanDynExt};
use databend_common_sql::executor::IPhysicalPlan;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::executor::PhysicalPlanReplacer;
use databend_common_sql::plans::Plan;
use databend_common_sql::plans::RefreshIndexPlan;
use databend_common_sql::plans::RelOperator;
Expand All @@ -54,6 +55,7 @@ use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::test_kits::query_count;

pub struct RefreshIndexInterpreter {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -120,32 +122,20 @@ impl RefreshIndexInterpreter {
#[async_backtrace::framed]
async fn get_read_source(
&self,
query_plan: &PhysicalPlan,
query_plan: &Box<dyn IPhysicalPlan>,
fuse_table: Arc<FuseTable>,
segments: Option<Vec<Location>>,
) -> Result<Option<DataSourcePlan>> {
let mut source = vec![];
let mut sources = vec![];
query_plan.get_all_data_source(&mut sources);

let mut collect_read_source = |plan: &PhysicalPlan| {
if let PhysicalPlan::TableScan(scan) = plan {
source.push(*scan.source.clone())
}
};

PhysicalPlan::traverse(
query_plan,
&mut |_| true,
&mut collect_read_source,
&mut |_| {},
);

if source.len() != 1 {
if sources.len() != 1 {
Err(ErrorCode::Internal(
"Invalid source with multiple table scan when do refresh aggregating index"
.to_string(),
))
} else {
let mut source = source.remove(0);
let (_, mut source) = sources.remove(0);
let partitions = match segments {
Some(segment_locs) if !segment_locs.is_empty() => {
let segment_locations = create_segment_location_vector(segment_locs, None);
Expand Down Expand Up @@ -187,7 +177,7 @@ impl RefreshIndexInterpreter {
};

if !source.parts.is_empty() {
Ok(Some(source))
Ok(Some(Box::into_inner(source)))
} else {
Ok(None)
}
Expand Down Expand Up @@ -278,10 +268,8 @@ impl Interpreter for RefreshIndexInterpreter {

let new_index_meta = self.update_index_meta(&new_read_source)?;

let mut replace_read_source = ReadSourceReplacer {
source: new_read_source,
};
query_plan = replace_read_source.replace(&query_plan)?;
let mut handle = ReadSourceDeriveHandle::new(new_read_source);
query_plan = query_plan.derive_with(&mut handle);

let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &query_plan).await?;
Expand Down Expand Up @@ -377,14 +365,29 @@ async fn modify_last_update(ctx: Arc<QueryContext>, req: UpdateIndexReq) -> Resu
Ok(())
}

struct ReadSourceReplacer {
/// Derive handle that rewrites a physical plan so every `TableScan` node
/// reads from a single pre-built `DataSourcePlan` instead of its original
/// source. Used when refreshing an aggregating index, where the read source
/// is recomputed (pruned partitions) before the query plan is re-derived.
struct ReadSourceDeriveHandle {
    // The replacement read source cloned into each matched `TableScan`.
    source: DataSourcePlan,
}

impl PhysicalPlanReplacer for ReadSourceReplacer {
fn replace_table_scan(&mut self, plan: &TableScan) -> Result<PhysicalPlan> {
let mut plan = plan.clone();
plan.source = Box::new(self.source.clone());
Ok(PhysicalPlan::TableScan(plan))
impl ReadSourceDeriveHandle {
    /// Wraps `source` in a boxed `DeriveHandle` that substitutes it into
    /// every `TableScan` encountered while deriving the physical plan.
    ///
    /// Returns `Box<dyn DeriveHandle>` directly (rather than `Self`) because
    /// the only caller immediately passes the handle to `derive_with`, which
    /// takes the trait object.
    pub fn new(source: DataSourcePlan) -> Box<dyn DeriveHandle> {
        Box::new(ReadSourceDeriveHandle { source })
    }
}

impl DeriveHandle for ReadSourceDeriveHandle {
    /// Replaces the read source of any `TableScan` node with the handle's
    /// stored `DataSourcePlan`.
    ///
    /// Returning `Err(children)` hands the (unmodified) children back to the
    /// framework so it rebuilds non-`TableScan` nodes through the default
    /// derivation path.
    fn derive(
        &mut self,
        v: &Box<dyn IPhysicalPlan>,
        children: Vec<Box<dyn IPhysicalPlan>>,
    ) -> std::result::Result<Box<dyn IPhysicalPlan>, Vec<Box<dyn IPhysicalPlan>>> {
        match v.downcast_ref::<TableScan>() {
            // Not a scan: decline and let the framework derive this node.
            None => Err(children),
            Some(scan) => {
                let mut rewritten = scan.clone();
                rewritten.source = Box::new(self.source.clone());
                Ok(Box::new(rewritten))
            }
        }
    }
}
Loading