Skip to content

Commit e94fb53

Browse files
committed
Implement a skeleton of distributed insert select.
1 parent 6b620fb commit e94fb53

File tree

10 files changed

+235
-23
lines changed

10 files changed

+235
-23
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use common_exception::Result;
2525
use common_meta_types::GrantObject;
2626
use common_meta_types::StageFile;
2727
use common_meta_types::UserStageInfo;
28+
use common_pipeline_core::Pipeline;
2829
use futures::TryStreamExt;
2930
use regex::Regex;
3031
use tracing::debug;
@@ -39,26 +40,39 @@ use crate::sessions::TableContext;
3940
use crate::storages::stage::StageSourceHelper;
4041
use crate::storages::Table;
4142

43+
pub fn fill_missing_columns(
44+
ctx: Arc<QueryContext>,
45+
source_schema: &DataSchemaRef,
46+
target_schema: &DataSchemaRef,
47+
pipeline: &mut Pipeline,
48+
) -> Result<()> {
49+
let need_fill_missing_columns = target_schema != source_schema;
50+
if need_fill_missing_columns {
51+
pipeline.add_transform(|transform_input_port, transform_output_port| {
52+
TransformAddOn::try_create(
53+
transform_input_port,
54+
transform_output_port,
55+
source_schema.clone(),
56+
target_schema.clone(),
57+
ctx.clone(),
58+
)
59+
})?;
60+
}
61+
Ok(())
62+
}
63+
4264
pub fn append2table(
4365
ctx: Arc<QueryContext>,
4466
table: Arc<dyn Table>,
4567
source_schema: DataSchemaRef,
4668
mut build_res: PipelineBuildResult,
4769
) -> Result<()> {
48-
let need_fill_missing_columns = table.schema() != source_schema;
49-
if need_fill_missing_columns {
50-
build_res
51-
.main_pipeline
52-
.add_transform(|transform_input_port, transform_output_port| {
53-
TransformAddOn::try_create(
54-
transform_input_port,
55-
transform_output_port,
56-
source_schema.clone(),
57-
table.schema(),
58-
ctx.clone(),
59-
)
60-
})?;
61-
}
70+
fill_missing_columns(
71+
ctx.clone(),
72+
&source_schema,
73+
&table.schema(),
74+
&mut build_res.main_pipeline,
75+
)?;
6276

6377
table.append2(ctx.clone(), &mut build_res.main_pipeline)?;
6478
let query_need_abort = ctx.query_need_abort();

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,25 @@
1515
use std::collections::VecDeque;
1616
use std::sync::Arc;
1717

18+
use common_catalog::table::Table;
1819
use common_datavalues::DataType;
1920
use common_exception::ErrorCode;
2021
use common_exception::Result;
2122
use common_functions::scalars::CastFunction;
23+
use common_planners::StageKind;
2224
use common_streams::DataBlockStream;
2325
use common_streams::SendableDataBlockStream;
2426
use parking_lot::Mutex;
2527

2628
use super::commit2table;
2729
use super::interpreter_common::append2table;
30+
use super::plan_schedulers::build_schedule_pipepline;
31+
use crate::clusters::ClusterHelper;
2832
use crate::interpreters::Interpreter;
2933
use crate::interpreters::InterpreterPtr;
3034
use crate::interpreters::SelectInterpreterV2;
35+
use crate::pipelines::executor::ExecutorSettings;
36+
use crate::pipelines::executor::PipelineCompleteExecutor;
3137
use crate::pipelines::processors::port::OutputPort;
3238
use crate::pipelines::processors::BlocksSource;
3339
use crate::pipelines::processors::TransformCastSchema;
@@ -36,6 +42,10 @@ use crate::pipelines::PipelineBuildResult;
3642
use crate::pipelines::SourcePipeBuilder;
3743
use crate::sessions::QueryContext;
3844
use crate::sessions::TableContext;
45+
use crate::sql::executor::DistributedInsertSelect;
46+
use crate::sql::executor::Exchange;
47+
use crate::sql::executor::PhysicalPlan;
48+
use crate::sql::executor::PhysicalPlanBuilder;
3949
use crate::sql::plans::Insert;
4050
use crate::sql::plans::InsertInputSource;
4151
use crate::sql::plans::Plan;
@@ -101,6 +111,13 @@ impl InsertInterpreterV2 {
101111
);
102112
}
103113
InsertInputSource::SelectPlan(plan) => {
114+
if !self.ctx.get_cluster().is_empty() {
115+
// distributed insert select
116+
return self
117+
.schedule_insert_select(plan, self.plan.catalog.clone(), table.clone())
118+
.await;
119+
}
120+
104121
let select_interpreter = match &**plan {
105122
Plan::Query {
106123
s_expr,
@@ -177,6 +194,62 @@ impl InsertInterpreterV2 {
177194
let cast_needed = select_schema != *output_schema;
178195
Ok(cast_needed)
179196
}
197+
198+
async fn schedule_insert_select(
199+
&self,
200+
plan: &Box<Plan>,
201+
catalog: String,
202+
table: Arc<dyn Table>,
203+
) -> Result<SendableDataBlockStream> {
204+
let inner_plan = match &**plan {
205+
Plan::Query {
206+
s_expr, metadata, ..
207+
} => {
208+
let builder = PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone());
209+
builder.build(s_expr).await?
210+
}
211+
_ => unreachable!(),
212+
};
213+
214+
table.get_table_info();
215+
216+
let insert_select_plan = PhysicalPlan::DistributedInsertSelect(DistributedInsertSelect {
217+
input: Box::new(inner_plan),
218+
catalog,
219+
table_info: table.get_table_info().clone(),
220+
select_schema: plan.schema(),
221+
insert_schema: self.schema(),
222+
cast_needed: self.check_schema_cast(plan)?,
223+
});
224+
225+
let final_plan = PhysicalPlan::Exchange(Exchange {
226+
input: Box::new(insert_select_plan),
227+
kind: StageKind::Merge,
228+
keys: vec![],
229+
});
230+
231+
let mut build_res = build_schedule_pipepline(self.ctx.clone(), &final_plan).await?;
232+
233+
let settings = self.ctx.get_settings();
234+
let query_need_abort = self.ctx.query_need_abort();
235+
let executor_settings = ExecutorSettings::try_create(&settings)?;
236+
build_res.set_max_threads(settings.get_max_threads()? as usize);
237+
let mut pipelines = build_res.sources_pipelines;
238+
pipelines.push(build_res.main_pipeline);
239+
let executor = PipelineCompleteExecutor::from_pipelines(
240+
query_need_abort,
241+
pipelines,
242+
executor_settings,
243+
)?;
244+
executor.execute()?;
245+
commit2table(self.ctx.clone(), table.clone(), self.plan.overwrite).await?;
246+
247+
Ok(Box::pin(DataBlockStream::create(
248+
self.plan.schema(),
249+
None,
250+
vec![],
251+
)))
252+
}
180253
}
181254

182255
#[async_trait::async_trait]

src/query/service/src/interpreters/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
9999
pub use interpreter_clustering_history::InterpreterClusteringHistory;
100100
pub use interpreter_common::append2table;
101101
pub use interpreter_common::commit2table;
102+
pub use interpreter_common::fill_missing_columns;
102103
pub use interpreter_common::list_files_from_dal;
103104
pub use interpreter_database_create::CreateDatabaseInterpreter;
104105
pub use interpreter_database_drop::DropDatabaseInterpreter;

src/query/service/src/interpreters/plan_schedulers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
mod plan_scheduler_query;
1616
mod plan_scheduler_rewriter;
1717

18+
pub use plan_scheduler_query::build_schedule_pipepline;
1819
pub use plan_scheduler_query::schedule_query_new;
1920
pub use plan_scheduler_query::schedule_query_v2;
2021
pub use plan_scheduler_rewriter::apply_plan_rewrite;

src/query/service/src/interpreters/plan_schedulers/plan_scheduler_query.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,22 @@ pub async fn schedule_query_v2(
8181
return Ok(build_res);
8282
}
8383

84+
let mut build_res = build_schedule_pipepline(ctx.clone(), plan).await?;
85+
86+
let input_schema = plan.output_schema()?;
87+
PipelineBuilderV2::render_result_set(
88+
input_schema,
89+
result_columns,
90+
&mut build_res.main_pipeline,
91+
)?;
92+
93+
Ok(build_res)
94+
}
95+
96+
pub async fn build_schedule_pipepline(
97+
ctx: Arc<QueryContext>,
98+
plan: &PhysicalPlan,
99+
) -> Result<PipelineBuildResult> {
84100
let fragmenter = Fragmenter::try_create(ctx.clone())?;
85101
let root_fragment = fragmenter.build_fragment(plan)?;
86102

@@ -95,13 +111,5 @@ pub async fn schedule_query_v2(
95111

96112
let settings = ctx.get_settings();
97113
build_res.set_max_threads(settings.get_max_threads()? as usize);
98-
99-
let input_schema = plan.output_schema()?;
100-
PipelineBuilderV2::render_result_set(
101-
input_schema,
102-
result_columns,
103-
&mut build_res.main_pipeline,
104-
)?;
105-
106114
Ok(build_res)
107115
}

src/query/service/src/sql/executor/format.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ fn to_format_tree(plan: &PhysicalPlan, metadata: &MetadataRef) -> Result<FormatT
5353
PhysicalPlan::HashJoin(plan) => hash_join_to_format_tree(plan, metadata),
5454
PhysicalPlan::Exchange(plan) => exchange_to_format_tree(plan, metadata),
5555
PhysicalPlan::UnionAll(plan) => union_all_to_format_tree(plan, metadata),
56-
PhysicalPlan::ExchangeSource(_) | PhysicalPlan::ExchangeSink(_) => {
56+
PhysicalPlan::ExchangeSource(_)
57+
| PhysicalPlan::ExchangeSink(_)
58+
| PhysicalPlan::DistributedInsertSelect(_) => {
5759
Err(ErrorCode::LogicalError("Invalid physical plan"))
5860
}
5961
}

src/query/service/src/sql/executor/physical_plan.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ use common_datavalues::DataField;
2121
use common_datavalues::DataSchemaRef;
2222
use common_datavalues::DataSchemaRefExt;
2323
use common_datavalues::NullableType;
24+
use common_datavalues::StringType;
2425
use common_datavalues::ToDataType;
2526
use common_datavalues::Vu8;
2627
use common_exception::Result;
28+
use common_meta_app::schema::TableInfo;
2729
use common_planners::ReadDataSourcePlan;
2830
use common_planners::StageKind;
2931

@@ -303,6 +305,25 @@ impl UnionAll {
303305
}
304306
}
305307

308+
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
309+
pub struct DistributedInsertSelect {
310+
pub input: Box<PhysicalPlan>,
311+
pub catalog: String,
312+
pub table_info: TableInfo,
313+
pub insert_schema: DataSchemaRef,
314+
pub select_schema: DataSchemaRef,
315+
pub cast_needed: bool,
316+
}
317+
318+
impl DistributedInsertSelect {
319+
pub fn output_schema(&self) -> Result<DataSchemaRef> {
320+
Ok(DataSchemaRefExt::create(vec![
321+
DataField::new("seg_loc", StringType::new_impl()),
322+
DataField::new("seg_info", StringType::new_impl()),
323+
]))
324+
}
325+
}
326+
306327
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
307328
pub enum PhysicalPlan {
308329
TableScan(TableScan),
@@ -317,6 +338,9 @@ pub enum PhysicalPlan {
317338
Exchange(Exchange),
318339
UnionAll(UnionAll),
319340

341+
/// For insert into ... select ... in cluster
342+
DistributedInsertSelect(DistributedInsertSelect),
343+
320344
/// Synthesized by fragmenter
321345
ExchangeSource(ExchangeSource),
322346
ExchangeSink(ExchangeSink),
@@ -346,6 +370,7 @@ impl PhysicalPlan {
346370
PhysicalPlan::ExchangeSource(plan) => plan.output_schema(),
347371
PhysicalPlan::ExchangeSink(plan) => plan.output_schema(),
348372
PhysicalPlan::UnionAll(plan) => plan.output_schema(),
373+
PhysicalPlan::DistributedInsertSelect(plan) => plan.output_schema(),
349374
}
350375
}
351376

@@ -368,6 +393,9 @@ impl PhysicalPlan {
368393
PhysicalPlan::UnionAll(plan) => Box::new(
369394
std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
370395
),
396+
PhysicalPlan::DistributedInsertSelect(plan) => {
397+
Box::new(std::iter::once(plan.input.as_ref()))
398+
}
371399
}
372400
}
373401
}

src/query/service/src/sql/executor/physical_plan_display.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::fmt::Formatter;
1818
use common_datavalues::format_data_type_sql;
1919
use itertools::Itertools;
2020

21+
use super::DistributedInsertSelect;
2122
use crate::sql::executor::AggregateFinal;
2223
use crate::sql::executor::AggregatePartial;
2324
use crate::sql::executor::EvalScalar;
@@ -64,6 +65,7 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
6465
PhysicalPlan::ExchangeSource(source) => write!(f, "{}", source)?,
6566
PhysicalPlan::ExchangeSink(sink) => write!(f, "{}", sink)?,
6667
PhysicalPlan::UnionAll(union_all) => write!(f, "{}", union_all)?,
68+
PhysicalPlan::DistributedInsertSelect(insert_select) => write!(f, "{}", insert_select)?,
6769
}
6870

6971
for node in self.node.children() {
@@ -313,3 +315,9 @@ impl Display for UnionAll {
313315
write!(f, "UnionAll")
314316
}
315317
}
318+
319+
impl Display for DistributedInsertSelect {
320+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
321+
write!(f, "DistributedInsertSelect")
322+
}
323+
}

src/query/service/src/sql/executor/physical_plan_visitor.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use common_exception::Result;
1616

1717
use super::AggregateFinal;
1818
use super::AggregatePartial;
19+
use super::DistributedInsertSelect;
1920
use super::EvalScalar;
2021
use super::Exchange;
2122
use super::ExchangeSink;
@@ -45,6 +46,7 @@ pub trait PhysicalPlanReplacer {
4546
PhysicalPlan::ExchangeSource(plan) => self.replace_exchange_source(plan),
4647
PhysicalPlan::ExchangeSink(plan) => self.replace_exchange_sink(plan),
4748
PhysicalPlan::UnionAll(plan) => self.replace_union(plan),
49+
PhysicalPlan::DistributedInsertSelect(plan) => self.replace_insert_select(plan),
4850
}
4951
}
5052

@@ -174,6 +176,21 @@ pub trait PhysicalPlanReplacer {
174176
schema: plan.schema.clone(),
175177
}))
176178
}
179+
180+
fn replace_insert_select(&mut self, plan: &DistributedInsertSelect) -> Result<PhysicalPlan> {
181+
let input = self.replace(&plan.input)?;
182+
183+
Ok(PhysicalPlan::DistributedInsertSelect(
184+
DistributedInsertSelect {
185+
input: Box::new(input),
186+
catalog: plan.catalog.clone(),
187+
table_info: plan.table_info.clone(),
188+
select_schema: plan.select_schema.clone(),
189+
insert_schema: plan.insert_schema.clone(),
190+
cast_needed: plan.cast_needed,
191+
},
192+
))
193+
}
177194
}
178195

179196
impl PhysicalPlan {
@@ -223,6 +240,9 @@ impl PhysicalPlan {
223240
Self::traverse(&plan.left, pre_visit, visit, post_visit);
224241
Self::traverse(&plan.right, pre_visit, visit, post_visit);
225242
}
243+
PhysicalPlan::DistributedInsertSelect(plan) => {
244+
Self::traverse(&plan.input, pre_visit, visit, post_visit);
245+
}
226246
}
227247
post_visit(plan);
228248
}

0 commit comments

Comments
 (0)