Skip to content

Commit 0607283

Browse files
authored
Merge pull request #7527 from RinChanNOWWW/distributed_insert
feat(query): Support distributed insert select in new planner.
2 parents f2a8f0d + 1653b3e commit 0607283

File tree

26 files changed

+383
-107
lines changed

26 files changed

+383
-107
lines changed

src/query/catalog/src/table.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ pub trait Table: Sync + Send {
140140
)))
141141
}
142142

143-
fn append2(&self, _: Arc<dyn TableContext>, _: &mut Pipeline) -> Result<()> {
143+
fn append2(
144+
&self,
145+
_: Arc<dyn TableContext>,
146+
_: &mut Pipeline,
147+
_need_output: bool,
148+
) -> Result<()> {
144149
Err(ErrorCode::UnImplement(format!(
145150
"append2 operation for table {} is not implemented, table engine is {}",
146151
self.name(),

src/query/pipeline/sinks/src/processors/sinks/context_sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl ContextSink {
3434
}
3535

3636
impl Sink for ContextSink {
37-
const NAME: &'static str = "ContextSink ";
37+
const NAME: &'static str = "ContextSink";
3838

3939
fn consume(&mut self, block: DataBlock) -> Result<()> {
4040
self.ctx.push_precommit_block(block);

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use common_exception::Result;
2525
use common_meta_types::GrantObject;
2626
use common_meta_types::StageFile;
2727
use common_meta_types::UserStageInfo;
28+
use common_pipeline_core::Pipeline;
2829
use futures::TryStreamExt;
2930
use regex::Regex;
3031
use tracing::debug;
@@ -39,28 +40,41 @@ use crate::sessions::TableContext;
3940
use crate::storages::stage::StageSourceHelper;
4041
use crate::storages::Table;
4142

43+
pub fn fill_missing_columns(
44+
ctx: Arc<QueryContext>,
45+
source_schema: &DataSchemaRef,
46+
target_schema: &DataSchemaRef,
47+
pipeline: &mut Pipeline,
48+
) -> Result<()> {
49+
let need_fill_missing_columns = target_schema != source_schema;
50+
if need_fill_missing_columns {
51+
pipeline.add_transform(|transform_input_port, transform_output_port| {
52+
TransformAddOn::try_create(
53+
transform_input_port,
54+
transform_output_port,
55+
source_schema.clone(),
56+
target_schema.clone(),
57+
ctx.clone(),
58+
)
59+
})?;
60+
}
61+
Ok(())
62+
}
63+
4264
pub fn append2table(
4365
ctx: Arc<QueryContext>,
4466
table: Arc<dyn Table>,
4567
source_schema: DataSchemaRef,
4668
mut build_res: PipelineBuildResult,
4769
) -> Result<()> {
48-
let need_fill_missing_columns = table.schema() != source_schema;
49-
if need_fill_missing_columns {
50-
build_res
51-
.main_pipeline
52-
.add_transform(|transform_input_port, transform_output_port| {
53-
TransformAddOn::try_create(
54-
transform_input_port,
55-
transform_output_port,
56-
source_schema.clone(),
57-
table.schema(),
58-
ctx.clone(),
59-
)
60-
})?;
61-
}
62-
63-
table.append2(ctx.clone(), &mut build_res.main_pipeline)?;
70+
fill_missing_columns(
71+
ctx.clone(),
72+
&source_schema,
73+
&table.schema(),
74+
&mut build_res.main_pipeline,
75+
)?;
76+
77+
table.append2(ctx.clone(), &mut build_res.main_pipeline, false)?;
6478
let query_need_abort = ctx.query_need_abort();
6579
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
6680
build_res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl CopyInterpreterV2 {
180180

181181
let table = ctx.get_table(catalog_name, db_name, tbl_name).await?;
182182

183-
table.append2(ctx.clone(), &mut pipeline)?;
183+
table.append2(ctx.clone(), &mut pipeline, false)?;
184184
pipeline.set_max_threads(settings.get_max_threads()? as usize);
185185

186186
let query_need_abort = ctx.query_need_abort();

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 98 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,33 @@
1515
use std::collections::VecDeque;
1616
use std::sync::Arc;
1717

18-
use common_datavalues::DataType;
18+
use common_catalog::table::Table;
1919
use common_exception::ErrorCode;
2020
use common_exception::Result;
21-
use common_functions::scalars::CastFunction;
2221
use common_streams::DataBlockStream;
2322
use common_streams::SendableDataBlockStream;
23+
use futures_util::StreamExt;
2424
use parking_lot::Mutex;
2525

2626
use super::commit2table;
2727
use super::interpreter_common::append2table;
28+
use super::plan_schedulers::build_schedule_pipepline;
29+
use super::ProcessorExecutorStream;
2830
use crate::interpreters::Interpreter;
2931
use crate::interpreters::InterpreterPtr;
30-
use crate::interpreters::SelectInterpreterV2;
32+
use crate::pipelines::executor::ExecutorSettings;
33+
use crate::pipelines::executor::PipelinePullingExecutor;
3134
use crate::pipelines::processors::port::OutputPort;
3235
use crate::pipelines::processors::BlocksSource;
33-
use crate::pipelines::processors::TransformCastSchema;
3436
use crate::pipelines::Pipeline;
3537
use crate::pipelines::PipelineBuildResult;
3638
use crate::pipelines::SourcePipeBuilder;
3739
use crate::sessions::QueryContext;
3840
use crate::sessions::TableContext;
41+
use crate::sql::executor::DistributedInsertSelect;
42+
use crate::sql::executor::PhysicalPlan;
43+
use crate::sql::executor::PhysicalPlanBuilder;
44+
use crate::sql::executor::PipelineBuilder;
3945
use crate::sql::plans::Insert;
4046
use crate::sql::plans::InsertInputSource;
4147
use crate::sql::plans::Plan;
@@ -101,53 +107,7 @@ impl InsertInterpreterV2 {
101107
);
102108
}
103109
InsertInputSource::SelectPlan(plan) => {
104-
let select_interpreter = match &**plan {
105-
Plan::Query {
106-
s_expr,
107-
metadata,
108-
bind_context,
109-
..
110-
} => SelectInterpreterV2::try_create(
111-
self.ctx.clone(),
112-
*bind_context.clone(),
113-
*s_expr.clone(),
114-
metadata.clone(),
115-
),
116-
_ => unreachable!(),
117-
};
118-
119-
build_res = select_interpreter?.create_new_pipeline().await?;
120-
121-
if self.check_schema_cast(plan)? {
122-
let mut functions = Vec::with_capacity(self.plan.schema().fields().len());
123-
124-
for (target_field, original_field) in self
125-
.plan
126-
.schema()
127-
.fields()
128-
.iter()
129-
.zip(plan.schema().fields().iter())
130-
{
131-
let target_type_name = target_field.data_type().name();
132-
let from_type = original_field.data_type().clone();
133-
let cast_function =
134-
CastFunction::create("cast", &target_type_name, from_type)?;
135-
functions.push(cast_function);
136-
}
137-
138-
let func_ctx = self.ctx.try_get_function_context()?;
139-
build_res.main_pipeline.add_transform(
140-
|transform_input_port, transform_output_port| {
141-
TransformCastSchema::try_create(
142-
transform_input_port,
143-
transform_output_port,
144-
self.plan.schema(),
145-
functions.clone(),
146-
func_ctx.clone(),
147-
)
148-
},
149-
)?;
150-
}
110+
return self.schedule_insert_select(plan, table.clone()).await;
151111
}
152112
};
153113
}
@@ -177,6 +137,93 @@ impl InsertInterpreterV2 {
177137
let cast_needed = select_schema != *output_schema;
178138
Ok(cast_needed)
179139
}
140+
141+
async fn schedule_insert_select(
142+
&self,
143+
plan: &Plan,
144+
table: Arc<dyn Table>,
145+
) -> Result<SendableDataBlockStream> {
146+
// select_plan is already distributed optimized
147+
let (mut select_plan, select_column_bindings) = match plan {
148+
Plan::Query {
149+
s_expr,
150+
metadata,
151+
bind_context,
152+
..
153+
} => {
154+
let builder = PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone());
155+
(builder.build(s_expr).await?, bind_context.columns.clone())
156+
}
157+
_ => unreachable!(),
158+
};
159+
160+
table.get_table_info();
161+
let catalog = self.plan.catalog.clone();
162+
let is_distributed_plan = select_plan.is_distributed_plan();
163+
164+
let insert_select_plan = match select_plan {
165+
PhysicalPlan::Exchange(ref mut exchange) => {
166+
// insert can be dispatched to different nodes
167+
let input = exchange.input.clone();
168+
exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(Box::new(
169+
DistributedInsertSelect {
170+
input,
171+
catalog,
172+
table_info: table.get_table_info().clone(),
173+
select_schema: plan.schema(),
174+
select_column_bindings,
175+
insert_schema: self.plan.schema(),
176+
cast_needed: self.check_schema_cast(plan)?,
177+
},
178+
)));
179+
select_plan
180+
}
181+
other_plan => {
182+
// insert should wait until all nodes finished
183+
PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect {
184+
input: Box::new(other_plan),
185+
catalog,
186+
table_info: table.get_table_info().clone(),
187+
select_schema: plan.schema(),
188+
select_column_bindings,
189+
insert_schema: self.plan.schema(),
190+
cast_needed: self.check_schema_cast(plan)?,
191+
}))
192+
}
193+
};
194+
195+
let mut build_res = if !is_distributed_plan {
196+
let builder = PipelineBuilder::create(self.ctx.clone());
197+
builder.finalize(&insert_select_plan)?
198+
} else {
199+
build_schedule_pipepline(self.ctx.clone(), &insert_select_plan).await?
200+
};
201+
202+
let settings = self.ctx.get_settings();
203+
let query_need_abort = self.ctx.query_need_abort();
204+
let executor_settings = ExecutorSettings::try_create(&settings)?;
205+
build_res.set_max_threads(settings.get_max_threads()? as usize);
206+
207+
let executor = PipelinePullingExecutor::from_pipelines(
208+
query_need_abort,
209+
build_res,
210+
executor_settings,
211+
)?;
212+
213+
let mut stream: SendableDataBlockStream =
214+
Box::pin(ProcessorExecutorStream::create(executor)?);
215+
while let Some(block) = stream.next().await {
216+
block?;
217+
}
218+
219+
commit2table(self.ctx.clone(), table.clone(), self.plan.overwrite).await?;
220+
221+
Ok(Box::pin(DataBlockStream::create(
222+
self.plan.schema(),
223+
None,
224+
vec![],
225+
)))
226+
}
180227
}
181228

182229
#[async_trait::async_trait]

src/query/service/src/interpreters/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
9999
pub use interpreter_clustering_history::InterpreterClusteringHistory;
100100
pub use interpreter_common::append2table;
101101
pub use interpreter_common::commit2table;
102+
pub use interpreter_common::fill_missing_columns;
102103
pub use interpreter_common::list_files_from_dal;
103104
pub use interpreter_database_create::CreateDatabaseInterpreter;
104105
pub use interpreter_database_drop::DropDatabaseInterpreter;

src/query/service/src/interpreters/plan_schedulers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
mod plan_scheduler_query;
1616
mod plan_scheduler_rewriter;
1717

18+
pub use plan_scheduler_query::build_schedule_pipepline;
1819
pub use plan_scheduler_query::schedule_query_new;
1920
pub use plan_scheduler_query::schedule_query_v2;
2021
pub use plan_scheduler_rewriter::apply_plan_rewrite;

src/query/service/src/interpreters/plan_schedulers/plan_scheduler_query.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,22 @@ pub async fn schedule_query_v2(
8181
return Ok(build_res);
8282
}
8383

84+
let mut build_res = build_schedule_pipepline(ctx.clone(), plan).await?;
85+
86+
let input_schema = plan.output_schema()?;
87+
PipelineBuilderV2::render_result_set(
88+
input_schema,
89+
result_columns,
90+
&mut build_res.main_pipeline,
91+
)?;
92+
93+
Ok(build_res)
94+
}
95+
96+
pub async fn build_schedule_pipepline(
97+
ctx: Arc<QueryContext>,
98+
plan: &PhysicalPlan,
99+
) -> Result<PipelineBuildResult> {
84100
let fragmenter = Fragmenter::try_create(ctx.clone())?;
85101
let root_fragment = fragmenter.build_fragment(plan)?;
86102

@@ -95,13 +111,5 @@ pub async fn schedule_query_v2(
95111

96112
let settings = ctx.get_settings();
97113
build_res.set_max_threads(settings.get_max_threads()? as usize);
98-
99-
let input_schema = plan.output_schema()?;
100-
PipelineBuilderV2::render_result_set(
101-
input_schema,
102-
result_columns,
103-
&mut build_res.main_pipeline,
104-
)?;
105-
106114
Ok(build_res)
107115
}

src/query/service/src/sql/executor/format.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ fn to_format_tree(plan: &PhysicalPlan, metadata: &MetadataRef) -> Result<FormatT
5353
PhysicalPlan::HashJoin(plan) => hash_join_to_format_tree(plan, metadata),
5454
PhysicalPlan::Exchange(plan) => exchange_to_format_tree(plan, metadata),
5555
PhysicalPlan::UnionAll(plan) => union_all_to_format_tree(plan, metadata),
56-
PhysicalPlan::ExchangeSource(_) | PhysicalPlan::ExchangeSink(_) => {
56+
PhysicalPlan::ExchangeSource(_)
57+
| PhysicalPlan::ExchangeSink(_)
58+
| PhysicalPlan::DistributedInsertSelect(_) => {
5759
Err(ErrorCode::LogicalError("Invalid physical plan"))
5860
}
5961
}

0 commit comments

Comments
 (0)