Skip to content

Commit 916e70e

Browse files
authored
refactor: remove project operator (#15479)

* chore: remove project operator
* fix
* remove project
1 parent d2b3fd9 commit 916e70e

19 files changed

+76
-239
lines changed

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use databend_common_exception::Result;
1919
use databend_common_expression::infer_table_schema;
2020
use databend_common_meta_app::principal::StageInfo;
2121
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
22-
use databend_common_sql::executor::physical_plans::Project;
2322
use databend_common_sql::executor::PhysicalPlan;
2423
use databend_common_storage::StageFilesInfo;
2524
use log::debug;
@@ -82,16 +81,11 @@ impl CopyIntoLocationInterpreter {
8281
let query_physical_plan = query_interpreter.build_physical_plan().await?;
8382
let query_result_schema = query_interpreter.get_result_schema();
8483
let table_schema = infer_table_schema(&query_result_schema)?;
85-
let projected_query_physical_plan = PhysicalPlan::Project(Project::from_columns_binding(
86-
0,
87-
Box::new(query_physical_plan),
88-
query_interpreter.get_result_columns(),
89-
false,
90-
)?);
9184

9285
let mut physical_plan = PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
9386
plan_id: 0,
94-
input: Box::new(projected_query_physical_plan),
87+
input: Box::new(query_physical_plan),
88+
project_columns: query_interpreter.get_result_columns(),
9589
input_schema: query_result_schema,
9690
to_stage_info: StageTableInfo {
9791
schema: table_schema,

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
2828
use databend_common_sql::executor::physical_plans::Exchange;
2929
use databend_common_sql::executor::physical_plans::FragmentKind;
3030
use databend_common_sql::executor::physical_plans::MutationKind;
31-
use databend_common_sql::executor::physical_plans::Project;
3231
use databend_common_sql::executor::physical_plans::TableScan;
3332
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
3433
use databend_common_sql::executor::PhysicalPlan;
@@ -106,20 +105,16 @@ impl CopyIntoTableInterpreter {
106105
)
107106
.await?;
108107
let mut update_stream_meta_reqs = vec![];
109-
let source = if let Some(ref query) = plan.query {
108+
let (source, project_columns) = if let Some(ref query) = plan.query {
110109
let (query_interpreter, update_stream_meta) = self.build_query(query).await?;
111110
update_stream_meta_reqs = update_stream_meta;
112111
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
113112

114113
let result_columns = query_interpreter.get_result_columns();
115-
CopyIntoTableSource::Query(Box::new(PhysicalPlan::Project(
116-
Project::from_columns_binding(
117-
0,
118-
query_physical_plan,
119-
result_columns,
120-
query_interpreter.get_ignore_result(),
121-
)?,
122-
)))
114+
(
115+
CopyIntoTableSource::Query(query_physical_plan),
116+
Some(result_columns),
117+
)
123118
} else {
124119
let stage_table = StageTable::try_create(plan.stage_table_info.clone())?;
125120

@@ -139,14 +134,17 @@ impl CopyIntoTableInterpreter {
139134
name_mapping.insert(field.name.clone(), idx);
140135
}
141136

142-
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
143-
plan_id: 0,
144-
name_mapping,
145-
stat_info: None,
146-
table_index: None,
147-
internal_column: None,
148-
source: Box::new(data_source_plan),
149-
})))
137+
(
138+
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
139+
plan_id: 0,
140+
name_mapping,
141+
stat_info: None,
142+
table_index: None,
143+
internal_column: None,
144+
source: Box::new(data_source_plan),
145+
}))),
146+
None,
147+
)
150148
};
151149

152150
let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
@@ -160,7 +158,7 @@ impl CopyIntoTableInterpreter {
160158
force: plan.force,
161159
write_mode: plan.write_mode,
162160
validation_mode: plan.validation_mode.clone(),
163-
161+
project_columns,
164162
source,
165163
}));
166164

src/query/service/src/interpreters/interpreter_insert_multi_table.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use databend_common_sql::executor::physical_plans::ChunkFillAndReorder;
3434
use databend_common_sql::executor::physical_plans::ChunkMerge;
3535
use databend_common_sql::executor::physical_plans::FillAndReorder;
3636
use databend_common_sql::executor::physical_plans::MultiInsertEvalScalar;
37-
use databend_common_sql::executor::physical_plans::Project;
3837
use databend_common_sql::executor::physical_plans::SerializableTable;
3938
use databend_common_sql::executor::physical_plans::ShuffleStrategy;
4039
use databend_common_sql::executor::PhysicalPlan;
@@ -44,6 +43,7 @@ use databend_common_sql::plans::FunctionCall;
4443
use databend_common_sql::plans::InsertMultiTable;
4544
use databend_common_sql::plans::Into;
4645
use databend_common_sql::plans::Plan;
46+
use databend_common_sql::BindContext;
4747
use databend_common_sql::MetadataRef;
4848
use databend_common_sql::ScalarExpr;
4949

@@ -104,7 +104,7 @@ impl Interpreter for InsertMultiTableInterpreter {
104104

105105
impl InsertMultiTableInterpreter {
106106
async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
107-
let (mut root, metadata) = self.build_source_physical_plan().await?;
107+
let (mut root, metadata, bind_ctx) = self.build_source_physical_plan().await?;
108108
let update_stream_meta = build_update_stream_meta_seq(self.ctx.clone(), &metadata).await?;
109109
let source_schema = root.output_schema()?;
110110
let branches = self.build_insert_into_branches().await?;
@@ -150,6 +150,7 @@ impl InsertMultiTableInterpreter {
150150
root = PhysicalPlan::Duplicate(Box::new(Duplicate {
151151
plan_id: 0,
152152
input: Box::new(root),
153+
project_columns: bind_ctx.columns.clone(),
153154
n: branches.len(),
154155
}));
155156

@@ -207,7 +208,9 @@ impl InsertMultiTableInterpreter {
207208
Ok(root)
208209
}
209210

210-
async fn build_source_physical_plan(&self) -> Result<(PhysicalPlan, MetadataRef)> {
211+
async fn build_source_physical_plan(
212+
&self,
213+
) -> Result<(PhysicalPlan, MetadataRef, Box<BindContext>)> {
211214
match &self.plan.input_source {
212215
Plan::Query {
213216
s_expr,
@@ -218,21 +221,7 @@ impl InsertMultiTableInterpreter {
218221
let mut builder1 =
219222
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
220223
let input_source = builder1.build(s_expr, bind_context.column_set()).await?;
221-
let input_schema = input_source.output_schema()?;
222-
let mut projections = Vec::with_capacity(input_schema.num_fields());
223-
for col in &bind_context.columns {
224-
let index = col.index;
225-
projections.push(input_schema.index_of(index.to_string().as_str())?);
226-
}
227-
let rendered_input_source = PhysicalPlan::Project(Project {
228-
plan_id: 0,
229-
input: Box::new(input_source),
230-
projections,
231-
ignore_result: false,
232-
columns: Default::default(),
233-
stat_info: None,
234-
});
235-
Ok((rendered_input_source, metadata.clone()))
224+
Ok((input_source, metadata.clone(), bind_context.clone()))
236225
}
237226
_ => unreachable!(),
238227
}

src/query/service/src/pipelines/builders/builder_copy_into_location.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ impl PipelineBuilder {
2424
pub(crate) fn build_copy_into_location(&mut self, copy: &CopyIntoLocation) -> Result<()> {
2525
self.build_pipeline(&copy.input)?;
2626

27+
// Reorder the result for select clause
28+
PipelineBuilder::build_result_projection(
29+
&self.func_ctx,
30+
copy.input.output_schema()?,
31+
&copy.project_columns,
32+
&mut self.main_pipeline,
33+
false,
34+
)?;
35+
2736
let to_table = StageTable::try_create(copy.to_stage_info.clone())?;
2837
PipelineBuilder::build_append2table_with_commit_pipeline(
2938
self.ctx.clone(),

src/query/service/src/pipelines/builders/builder_copy_into_table.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ use databend_common_catalog::table::AppendMode;
2020
use databend_common_catalog::table::Table;
2121
use databend_common_catalog::table_context::TableContext;
2222
use databend_common_exception::Result;
23+
use databend_common_expression::DataField;
2324
use databend_common_expression::DataSchema;
2425
use databend_common_expression::DataSchemaRef;
26+
use databend_common_expression::DataSchemaRefExt;
2527
use databend_common_expression::Scalar;
2628
use databend_common_meta_app::principal::StageInfo;
2729
use databend_common_meta_app::schema::TableCopiedFileInfo;
@@ -48,7 +50,27 @@ impl PipelineBuilder {
4850
let source_schema = match &copy.source {
4951
CopyIntoTableSource::Query(input) => {
5052
self.build_pipeline(input)?;
51-
input.output_schema()?
53+
// Reorder the result for select clause
54+
PipelineBuilder::build_result_projection(
55+
&self.func_ctx,
56+
input.output_schema()?,
57+
copy.project_columns.as_ref().unwrap(),
58+
&mut self.main_pipeline,
59+
false,
60+
)?;
61+
let fields = copy
62+
.project_columns
63+
.as_ref()
64+
.unwrap()
65+
.iter()
66+
.map(|column_binding| {
67+
DataField::new(
68+
&column_binding.column_name,
69+
*column_binding.data_type.clone(),
70+
)
71+
})
72+
.collect();
73+
DataSchemaRefExt::create(fields)
5274
}
5375
CopyIntoTableSource::Stage(input) => {
5476
self.ctx

src/query/service/src/pipelines/builders/builder_insert_multi_table.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ use crate::sql::evaluator::CompoundBlockOperator;
4242
impl PipelineBuilder {
4343
pub(crate) fn build_duplicate(&mut self, plan: &Duplicate) -> Result<()> {
4444
self.build_pipeline(&plan.input)?;
45+
46+
// Reorder the result for select clause
47+
PipelineBuilder::build_result_projection(
48+
&self.func_ctx,
49+
plan.input.output_schema()?,
50+
&plan.project_columns,
51+
&mut self.main_pipeline,
52+
false,
53+
)?;
54+
4555
self.main_pipeline.duplicate(true, plan.n)?;
4656
Ok(())
4757
}

src/query/service/src/pipelines/builders/builder_merge_into.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ impl PipelineBuilder {
118118
// 2.2 matched only: rowids and MutationLogs
119119
// 2.3 insert only: MutationLogs
120120
self.build_pipeline(input)?;
121-
self.main_pipeline.try_resize(1)?;
122121

123122
// deserialize MixRowIdKindAndLog
124123
if *change_join_order {

src/query/service/src/pipelines/builders/builder_project.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,13 @@ use databend_common_pipeline_core::Pipeline;
2121
use databend_common_pipeline_sinks::EmptySink;
2222
use databend_common_sql::evaluator::BlockOperator;
2323
use databend_common_sql::evaluator::CompoundBlockOperator;
24-
use databend_common_sql::executor::physical_plans::Project;
2524
use databend_common_sql::executor::physical_plans::ProjectSet;
2625
use databend_common_sql::ColumnBinding;
2726

2827
use crate::pipelines::processors::transforms::TransformSRF;
2928
use crate::pipelines::PipelineBuilder;
3029

3130
impl PipelineBuilder {
32-
pub(crate) fn build_project(&mut self, project: &Project) -> Result<()> {
33-
self.build_pipeline(&project.input)?;
34-
35-
if project.ignore_result {
36-
return self
37-
.main_pipeline
38-
.add_sink(|input| Ok(ProcessorPtr::create(EmptySink::create(input))));
39-
}
40-
41-
let num_input_columns = project.input.output_schema()?.num_fields();
42-
self.main_pipeline.add_transform(|input, output| {
43-
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
44-
input,
45-
output,
46-
num_input_columns,
47-
self.func_ctx.clone(),
48-
vec![BlockOperator::Project {
49-
projection: project.projections.clone(),
50-
}],
51-
)))
52-
})
53-
}
54-
5531
pub fn build_result_projection(
5632
func_ctx: &FunctionContext,
5733
input_schema: DataSchemaRef,

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ impl PipelineBuilder {
126126
PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
127127
PhysicalPlan::ConstantTableScan(scan) => self.build_constant_table_scan(scan),
128128
PhysicalPlan::Filter(filter) => self.build_filter(filter),
129-
PhysicalPlan::Project(project) => self.build_project(project),
130129
PhysicalPlan::EvalScalar(eval_scalar) => self.build_eval_scalar(eval_scalar),
131130
PhysicalPlan::AggregateExpand(aggregate) => self.build_aggregate_expand(aggregate),
132131
PhysicalPlan::AggregatePartial(aggregate) => self.build_aggregate_partial(aggregate),

src/query/service/tests/it/pipelines/builders/runtime_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ fn find_join(plan: &PhysicalPlan) -> Result<HashJoin> {
6464
match plan {
6565
PhysicalPlan::HashJoin(join) => Ok(join.clone()),
6666
PhysicalPlan::Filter(plan) => find_join(plan.input.as_ref()),
67-
PhysicalPlan::Project(plan) => find_join(plan.input.as_ref()),
6867
PhysicalPlan::EvalScalar(plan) => find_join(plan.input.as_ref()),
6968
PhysicalPlan::ProjectSet(plan) => find_join(plan.input.as_ref()),
7069
PhysicalPlan::AggregateExpand(plan) => find_join(plan.input.as_ref()),

0 commit comments

Comments (0)