Skip to content

Commit a892f0a

Browse files
authored
refactor(query): add copy into location physical plan (#15010)
* reactor(query): add copy into location physical plan * reactor(query): add copy into location physical plan * reactor(query): add copy into location physical plan
1 parent e44d212 commit a892f0a

13 files changed

+189
-54
lines changed

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 38 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,20 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_catalog::plan::StageTableInfo;
18-
use databend_common_catalog::table::AppendMode;
1918
use databend_common_exception::Result;
2019
use databend_common_expression::infer_table_schema;
21-
use databend_common_expression::DataField;
22-
use databend_common_expression::DataSchemaRef;
23-
use databend_common_expression::DataSchemaRefExt;
2420
use databend_common_meta_app::principal::StageInfo;
21+
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
22+
use databend_common_sql::executor::physical_plans::Project;
23+
use databend_common_sql::executor::PhysicalPlan;
2524
use databend_common_storage::StageFilesInfo;
26-
use databend_common_storages_stage::StageTable;
2725
use log::debug;
2826

2927
use crate::interpreters::common::check_deduplicate_label;
3028
use crate::interpreters::Interpreter;
3129
use crate::interpreters::SelectInterpreter;
3230
use crate::pipelines::PipelineBuildResult;
33-
use crate::pipelines::PipelineBuilder;
31+
use crate::schedulers::build_query_pipeline_without_render_result_set;
3432
use crate::sessions::QueryContext;
3533
use crate::sessions::TableContext;
3634
use crate::sql::plans::CopyIntoLocationPlan;
@@ -48,7 +46,7 @@ impl CopyIntoLocationInterpreter {
4846
}
4947

5048
#[async_backtrace::framed]
51-
async fn build_query(&self, query: &Plan) -> Result<(SelectInterpreter, DataSchemaRef)> {
49+
async fn build_query(&self, query: &Plan) -> Result<SelectInterpreter> {
5250
let (s_expr, metadata, bind_context, formatted_ast) = match query {
5351
Plan::Query {
5452
s_expr,
@@ -69,21 +67,7 @@ impl CopyIntoLocationInterpreter {
6967
false,
7068
)?;
7169

72-
// Building data schema from bind_context columns
73-
// TODO(leiyskey): Extract the following logic as new API of BindContext.
74-
let fields = bind_context
75-
.columns
76-
.iter()
77-
.map(|column_binding| {
78-
DataField::new(
79-
&column_binding.column_name,
80-
*column_binding.data_type.clone(),
81-
)
82-
})
83-
.collect();
84-
let data_schema = DataSchemaRefExt::create(fields);
85-
86-
Ok((select_interpreter, data_schema))
70+
Ok(select_interpreter)
8771
}
8872

8973
/// Build a pipeline for local copy into stage.
@@ -94,36 +78,39 @@ impl CopyIntoLocationInterpreter {
9478
path: &str,
9579
query: &Plan,
9680
) -> Result<PipelineBuildResult> {
97-
let (select_interpreter, data_schema) = self.build_query(query).await?;
98-
let plan = select_interpreter.build_physical_plan().await?;
99-
let mut build_res = select_interpreter.build_pipeline(plan).await?;
100-
let table_schema = infer_table_schema(&data_schema)?;
101-
let stage_table_info = StageTableInfo {
102-
schema: table_schema,
103-
stage_info: stage.clone(),
104-
files_info: StageFilesInfo {
105-
path: path.to_string(),
106-
files: None,
107-
pattern: None,
108-
},
109-
files_to_copy: None,
110-
duplicated_files_detected: vec![],
111-
is_select: false,
112-
default_values: None,
113-
};
114-
let to_table = StageTable::try_create(stage_table_info)?;
115-
PipelineBuilder::build_append2table_with_commit_pipeline(
116-
self.ctx.clone(),
117-
&mut build_res.main_pipeline,
118-
to_table,
119-
data_schema,
120-
None,
121-
vec![],
81+
let query_interpreter = self.build_query(query).await?;
82+
let query_physical_plan = query_interpreter.build_physical_plan().await?;
83+
let query_result_schema = query_interpreter.get_result_schema();
84+
let table_schema = infer_table_schema(&query_result_schema)?;
85+
let projected_query_physical_plan = PhysicalPlan::Project(Project::from_columns_binding(
86+
0,
87+
Box::new(query_physical_plan),
88+
query_interpreter.get_result_columns(),
12289
false,
123-
AppendMode::Normal,
124-
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
125-
)?;
126-
Ok(build_res)
90+
)?);
91+
92+
let mut physical_plan = PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
93+
plan_id: 0,
94+
input: Box::new(projected_query_physical_plan),
95+
input_schema: query_result_schema,
96+
to_stage_info: StageTableInfo {
97+
schema: table_schema,
98+
stage_info: stage.clone(),
99+
files_info: StageFilesInfo {
100+
path: path.to_string(),
101+
files: None,
102+
pattern: None,
103+
},
104+
files_to_copy: None,
105+
duplicated_files_detected: vec![],
106+
is_select: false,
107+
default_values: None,
108+
},
109+
}));
110+
111+
let mut next_plan_id = 0;
112+
physical_plan.adjust_plan_id(&mut next_plan_id);
113+
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await
127114
}
128115
}
129116

src/query/service/src/interpreters/interpreter_select.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use databend_common_catalog::table::Table;
1818
use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
2020
use databend_common_expression::infer_table_schema;
21+
use databend_common_expression::DataField;
22+
use databend_common_expression::DataSchemaRef;
23+
use databend_common_expression::DataSchemaRefExt;
2124
use databend_common_expression::TableSchemaRef;
2225
use databend_common_meta_store::MetaStore;
2326
use databend_common_pipeline_core::processors::InputPort;
@@ -84,6 +87,23 @@ impl SelectInterpreter {
8487
self.bind_context.columns.clone()
8588
}
8689

90+
pub fn get_result_schema(&self) -> DataSchemaRef {
91+
// Building data schema from bind_context columns
92+
// TODO(leiyskey): Extract the following logic as new API of BindContext.
93+
let fields = self
94+
.bind_context
95+
.columns
96+
.iter()
97+
.map(|column_binding| {
98+
DataField::new(
99+
&column_binding.column_name,
100+
*column_binding.data_type.clone(),
101+
)
102+
})
103+
.collect();
104+
DataSchemaRefExt::create(fields)
105+
}
106+
87107
#[inline]
88108
#[async_backtrace::framed]
89109
pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_catalog::table::AppendMode;
16+
use databend_common_catalog::table_context::TableContext;
17+
use databend_common_exception::Result;
18+
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
19+
use databend_common_storages_stage::StageTable;
20+
21+
use crate::pipelines::PipelineBuilder;
22+
23+
impl PipelineBuilder {
24+
pub(crate) fn build_copy_into_location(&mut self, copy: &CopyIntoLocation) -> Result<()> {
25+
self.build_pipeline(&copy.input)?;
26+
27+
let to_table = StageTable::try_create(copy.to_stage_info.clone())?;
28+
PipelineBuilder::build_append2table_with_commit_pipeline(
29+
self.ctx.clone(),
30+
&mut self.main_pipeline,
31+
to_table,
32+
copy.input_schema.clone(),
33+
None,
34+
vec![],
35+
false,
36+
AppendMode::Normal,
37+
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
38+
)
39+
}
40+
}

src/query/service/src/pipelines/builders/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ mod builder_aggregate;
1616
mod builder_append_table;
1717
mod builder_commit;
1818
mod builder_compact;
19-
mod builder_copy_into;
19+
mod builder_copy_into_location;
20+
mod builder_copy_into_table;
2021
mod builder_delete;
2122
mod builder_distributed_insert_select;
2223
mod builder_exchange;

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ impl PipelineBuilder {
152152

153153
// Copy into.
154154
PhysicalPlan::CopyIntoTable(copy) => self.build_copy_into_table(copy),
155+
PhysicalPlan::CopyIntoLocation(copy) => self.build_copy_into_location(copy),
155156

156157
// Delete.
157158
PhysicalPlan::DeleteSource(delete) => self.build_delete_source(delete),

src/query/sql/src/executor/format.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::executor::physical_plans::AggregateFunctionDesc;
3030
use crate::executor::physical_plans::AggregatePartial;
3131
use crate::executor::physical_plans::CommitSink;
3232
use crate::executor::physical_plans::ConstantTableScan;
33+
use crate::executor::physical_plans::CopyIntoLocation;
3334
use crate::executor::physical_plans::CopyIntoTable;
3435
use crate::executor::physical_plans::CteScan;
3536
use crate::executor::physical_plans::DistributedInsertSelect;
@@ -215,6 +216,7 @@ fn to_format_tree(
215216
PhysicalPlan::Udf(plan) => udf_to_format_tree(plan, metadata, profs),
216217
PhysicalPlan::RangeJoin(plan) => range_join_to_format_tree(plan, metadata, profs),
217218
PhysicalPlan::CopyIntoTable(plan) => copy_into_table(plan),
219+
PhysicalPlan::CopyIntoLocation(plan) => copy_into_location(plan),
218220
PhysicalPlan::ReplaceAsyncSourcer(_) => {
219221
Ok(FormatTreeNode::new("ReplaceAsyncSourcer".to_string()))
220222
}
@@ -264,6 +266,10 @@ fn copy_into_table(plan: &CopyIntoTable) -> Result<FormatTreeNode<String>> {
264266
)))
265267
}
266268

269+
fn copy_into_location(_: &CopyIntoLocation) -> Result<FormatTreeNode<String>> {
270+
Ok(FormatTreeNode::new("CopyIntoLocation".to_string()))
271+
}
272+
267273
fn table_scan_to_format_tree(
268274
plan: &TableScan,
269275
metadata: &Metadata,

src/query/sql/src/executor/physical_plan.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::executor::physical_plans::AggregatePartial;
2727
use crate::executor::physical_plans::CommitSink;
2828
use crate::executor::physical_plans::CompactSource;
2929
use crate::executor::physical_plans::ConstantTableScan;
30+
use crate::executor::physical_plans::CopyIntoLocation;
3031
use crate::executor::physical_plans::CopyIntoTable;
3132
use crate::executor::physical_plans::CteScan;
3233
use crate::executor::physical_plans::DeleteSource;
@@ -95,6 +96,7 @@ pub enum PhysicalPlan {
9596

9697
/// Copy into table
9798
CopyIntoTable(Box<CopyIntoTable>),
99+
CopyIntoLocation(Box<CopyIntoLocation>),
98100

99101
/// Replace
100102
ReplaceAsyncSourcer(ReplaceAsyncSourcer),
@@ -243,6 +245,10 @@ impl PhysicalPlan {
243245
plan.plan_id = *next_id;
244246
*next_id += 1;
245247
}
248+
PhysicalPlan::CopyIntoLocation(plan) => {
249+
plan.plan_id = *next_id;
250+
*next_id += 1;
251+
}
246252
PhysicalPlan::DeleteSource(plan) => {
247253
plan.plan_id = *next_id;
248254
*next_id += 1;
@@ -339,6 +345,7 @@ impl PhysicalPlan {
339345
PhysicalPlan::MergeIntoAppendNotMatched(v) => v.plan_id,
340346
PhysicalPlan::CommitSink(v) => v.plan_id,
341347
PhysicalPlan::CopyIntoTable(v) => v.plan_id,
348+
PhysicalPlan::CopyIntoLocation(v) => v.plan_id,
342349
PhysicalPlan::ReplaceAsyncSourcer(v) => v.plan_id,
343350
PhysicalPlan::ReplaceDeduplicate(v) => v.plan_id,
344351
PhysicalPlan::ReplaceInto(v) => v.plan_id,
@@ -370,6 +377,7 @@ impl PhysicalPlan {
370377
PhysicalPlan::ProjectSet(plan) => plan.output_schema(),
371378
PhysicalPlan::RangeJoin(plan) => plan.output_schema(),
372379
PhysicalPlan::CopyIntoTable(plan) => plan.output_schema(),
380+
PhysicalPlan::CopyIntoLocation(plan) => plan.output_schema(),
373381
PhysicalPlan::CteScan(plan) => plan.output_schema(),
374382
PhysicalPlan::MaterializedCte(plan) => plan.output_schema(),
375383
PhysicalPlan::ConstantTableScan(plan) => plan.output_schema(),
@@ -416,6 +424,7 @@ impl PhysicalPlan {
416424
PhysicalPlan::CommitSink(_) => "CommitSink".to_string(),
417425
PhysicalPlan::RangeJoin(_) => "RangeJoin".to_string(),
418426
PhysicalPlan::CopyIntoTable(_) => "CopyIntoTable".to_string(),
427+
PhysicalPlan::CopyIntoLocation(_) => "CopyIntoLocation".to_string(),
419428
PhysicalPlan::ReplaceAsyncSourcer(_) => "ReplaceAsyncSourcer".to_string(),
420429
PhysicalPlan::ReplaceDeduplicate(_) => "ReplaceDeduplicate".to_string(),
421430
PhysicalPlan::ReplaceInto(_) => "Replace".to_string(),
@@ -488,6 +497,7 @@ impl PhysicalPlan {
488497
),
489498
PhysicalPlan::ReclusterSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
490499
PhysicalPlan::Udf(plan) => Box::new(std::iter::once(plan.input.as_ref())),
500+
PhysicalPlan::CopyIntoLocation(plan) => Box::new(std::iter::once(plan.input.as_ref())),
491501
}
492502
}
493503

@@ -507,6 +517,7 @@ impl PhysicalPlan {
507517
PhysicalPlan::ProjectSet(plan) => plan.input.try_find_single_data_source(),
508518
PhysicalPlan::RowFetch(plan) => plan.input.try_find_single_data_source(),
509519
PhysicalPlan::Udf(plan) => plan.input.try_find_single_data_source(),
520+
PhysicalPlan::CopyIntoLocation(plan) => plan.input.try_find_single_data_source(),
510521
PhysicalPlan::UnionAll(_)
511522
| PhysicalPlan::ExchangeSource(_)
512523
| PhysicalPlan::HashJoin(_)

src/query/sql/src/executor/physical_plan_display.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::executor::physical_plans::AggregatePartial;
2525
use crate::executor::physical_plans::CommitSink;
2626
use crate::executor::physical_plans::CompactSource;
2727
use crate::executor::physical_plans::ConstantTableScan;
28+
use crate::executor::physical_plans::CopyIntoLocation;
2829
use crate::executor::physical_plans::CopyIntoTable;
2930
use crate::executor::physical_plans::CteScan;
3031
use crate::executor::physical_plans::DeleteSource;
@@ -97,6 +98,9 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
9798
PhysicalPlan::ProjectSet(unnest) => write!(f, "{}", unnest)?,
9899
PhysicalPlan::RangeJoin(plan) => write!(f, "{}", plan)?,
99100
PhysicalPlan::CopyIntoTable(copy_into_table) => write!(f, "{}", copy_into_table)?,
101+
PhysicalPlan::CopyIntoLocation(copy_into_location) => {
102+
write!(f, "{}", copy_into_location)?
103+
}
100104
PhysicalPlan::ReplaceAsyncSourcer(async_sourcer) => write!(f, "{}", async_sourcer)?,
101105
PhysicalPlan::ReplaceDeduplicate(deduplicate) => write!(f, "{}", deduplicate)?,
102106
PhysicalPlan::ReplaceInto(replace) => write!(f, "{}", replace)?,
@@ -434,6 +438,12 @@ impl Display for CopyIntoTable {
434438
}
435439
}
436440

441+
impl Display for CopyIntoLocation {
442+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
443+
write!(f, "CopyIntoLocation")
444+
}
445+
}
446+
437447
impl Display for ProjectSet {
438448
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439449
let scalars = self

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::executor::physical_plans::AggregatePartial;
2121
use crate::executor::physical_plans::CommitSink;
2222
use crate::executor::physical_plans::CompactSource;
2323
use crate::executor::physical_plans::ConstantTableScan;
24+
use crate::executor::physical_plans::CopyIntoLocation;
2425
use crate::executor::physical_plans::CopyIntoTable;
2526
use crate::executor::physical_plans::CopyIntoTableSource;
2627
use crate::executor::physical_plans::CteScan;
@@ -81,6 +82,7 @@ pub trait PhysicalPlanReplacer {
8182
PhysicalPlan::CommitSink(plan) => self.replace_commit_sink(plan),
8283
PhysicalPlan::RangeJoin(plan) => self.replace_range_join(plan),
8384
PhysicalPlan::CopyIntoTable(plan) => self.replace_copy_into_table(plan),
85+
PhysicalPlan::CopyIntoLocation(plan) => self.replace_copy_into_location(plan),
8486
PhysicalPlan::ReplaceAsyncSourcer(plan) => self.replace_async_sourcer(plan),
8587
PhysicalPlan::ReplaceDeduplicate(plan) => self.replace_deduplicate(plan),
8688
PhysicalPlan::ReplaceInto(plan) => self.replace_replace_into(plan),
@@ -377,6 +379,17 @@ pub trait PhysicalPlanReplacer {
377379
}
378380
}
379381

382+
fn replace_copy_into_location(&mut self, plan: &CopyIntoLocation) -> Result<PhysicalPlan> {
383+
let input = self.replace(&plan.input)?;
384+
385+
Ok(PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
386+
plan_id: plan.plan_id,
387+
input: Box::new(input),
388+
input_schema: plan.input_schema.clone(),
389+
to_stage_info: plan.to_stage_info.clone(),
390+
})))
391+
}
392+
380393
fn replace_insert_select(&mut self, plan: &DistributedInsertSelect) -> Result<PhysicalPlan> {
381394
let input = self.replace(&plan.input)?;
382395

@@ -573,6 +586,9 @@ impl PhysicalPlan {
573586
}
574587
CopyIntoTableSource::Stage(_) => {}
575588
},
589+
PhysicalPlan::CopyIntoLocation(plan) => {
590+
Self::traverse(&plan.input, pre_visit, visit, post_visit)
591+
}
576592
PhysicalPlan::RangeJoin(plan) => {
577593
Self::traverse(&plan.left, pre_visit, visit, post_visit);
578594
Self::traverse(&plan.right, pre_visit, visit, post_visit);

0 commit comments

Comments
 (0)