
Commit 85c39f7

refactor: simplify the code for insert-select.
1 parent 6deb026 commit 85c39f7

2 files changed: 15 additions, 73 deletions

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 13 additions & 72 deletions
@@ -16,10 +16,8 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 
 use common_catalog::table::Table;
-use common_datavalues::DataType;
 use common_exception::ErrorCode;
 use common_exception::Result;
-use common_functions::scalars::CastFunction;
 use common_streams::DataBlockStream;
 use common_streams::SendableDataBlockStream;
 use futures_util::StreamExt;
@@ -29,15 +27,12 @@ use super::commit2table;
 use super::interpreter_common::append2table;
 use super::plan_schedulers::build_schedule_pipepline;
 use super::ProcessorExecutorStream;
-use crate::clusters::ClusterHelper;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
-use crate::interpreters::SelectInterpreterV2;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelinePullingExecutor;
 use crate::pipelines::processors::port::OutputPort;
 use crate::pipelines::processors::BlocksSource;
-use crate::pipelines::processors::TransformCastSchema;
 use crate::pipelines::Pipeline;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::SourcePipeBuilder;
@@ -46,6 +41,7 @@ use crate::sessions::TableContext;
 use crate::sql::executor::DistributedInsertSelect;
 use crate::sql::executor::PhysicalPlan;
 use crate::sql::executor::PhysicalPlanBuilder;
+use crate::sql::executor::PipelineBuilder;
 use crate::sql::plans::Insert;
 use crate::sql::plans::InsertInputSource;
 use crate::sql::plans::Plan;
@@ -111,64 +107,7 @@ impl InsertInterpreterV2 {
                 );
             }
             InsertInputSource::SelectPlan(plan) => {
-                if !self.ctx.get_cluster().is_empty() {
-                    // distributed insert select
-                    if let Some(stream) = self
-                        .schedule_insert_select(plan, self.plan.catalog.clone(), table.clone())
-                        .await?
-                    {
-                        return Ok(stream);
-                    }
-                    // else the plan cannot be executed in cluster mode, fallback to standalone mode
-                }
-
-                let select_interpreter = match &**plan {
-                    Plan::Query {
-                        s_expr,
-                        metadata,
-                        bind_context,
-                        ..
-                    } => SelectInterpreterV2::try_create(
-                        self.ctx.clone(),
-                        *bind_context.clone(),
-                        *s_expr.clone(),
-                        metadata.clone(),
-                    ),
-                    _ => unreachable!(),
-                };
-
-                build_res = select_interpreter?.create_new_pipeline().await?;
-
-                if self.check_schema_cast(plan)? {
-                    let mut functions = Vec::with_capacity(self.plan.schema().fields().len());
-
-                    for (target_field, original_field) in self
-                        .plan
-                        .schema()
-                        .fields()
-                        .iter()
-                        .zip(plan.schema().fields().iter())
-                    {
-                        let target_type_name = target_field.data_type().name();
-                        let from_type = original_field.data_type().clone();
-                        let cast_function =
-                            CastFunction::create("cast", &target_type_name, from_type)?;
-                        functions.push(cast_function);
-                    }
-
-                    let func_ctx = self.ctx.try_get_function_context()?;
-                    build_res.main_pipeline.add_transform(
-                        |transform_input_port, transform_output_port| {
-                            TransformCastSchema::try_create(
-                                transform_input_port,
-                                transform_output_port,
-                                self.plan.schema(),
-                                functions.clone(),
-                                func_ctx.clone(),
-                            )
-                        },
-                    )?;
-                }
+                return self.schedule_insert_select(plan, table.clone()).await;
             }
         };
     }
@@ -202,9 +141,8 @@ impl InsertInterpreterV2 {
     async fn schedule_insert_select(
         &self,
         plan: &Plan,
-        catalog: String,
         table: Arc<dyn Table>,
-    ) -> Result<Option<SendableDataBlockStream>> {
+    ) -> Result<SendableDataBlockStream> {
         // select_plan is already distributed optimized
         let (mut select_plan, select_column_bindings) = match plan {
             Plan::Query {
@@ -219,11 +157,9 @@ impl InsertInterpreterV2 {
             _ => unreachable!(),
         };
 
-        if !select_plan.is_distributed_plan() {
-            return Ok(None);
-        }
-
         table.get_table_info();
+        let catalog = self.plan.catalog.clone();
+        let is_distributed_plan = select_plan.is_distributed_plan();
 
         let insert_select_plan = match select_plan {
             PhysicalPlan::Exchange(ref mut exchange) => {
@@ -256,7 +192,12 @@ impl InsertInterpreterV2 {
             }
         };
 
-        let mut build_res = build_schedule_pipepline(self.ctx.clone(), &insert_select_plan).await?;
+        let mut build_res = if !is_distributed_plan {
+            let builder = PipelineBuilder::create(self.ctx.clone());
+            builder.finalize(&insert_select_plan)?
+        } else {
+            build_schedule_pipepline(self.ctx.clone(), &insert_select_plan).await?
+        };
 
         let settings = self.ctx.get_settings();
         let query_need_abort = self.ctx.query_need_abort();
@@ -277,11 +218,11 @@ impl InsertInterpreterV2 {
 
         commit2table(self.ctx.clone(), table.clone(), self.plan.overwrite).await?;
 
-        Ok(Some(Box::pin(DataBlockStream::create(
+        Ok(Box::pin(DataBlockStream::create(
             self.plan.schema(),
             None,
             vec![],
-        ))))
+        )))
     }
 }

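With this change, the SelectPlan arm in interpreter_insert_v2.rs no longer branches on cluster mode itself; it delegates everything to schedule_insert_select, which now returns a stream unconditionally and picks a local PipelineBuilder or the distributed scheduler based on is_distributed_plan(). Below is a minimal, self-contained sketch of that control flow; Plan, PipelineBuildResult, and the two build functions are hypothetical stand-ins, not the real Databend APIs.

// Hypothetical stand-ins for the refactored control flow; not the actual types.
#[derive(Debug)]
enum Plan {
    Local,
    // Stands in for PhysicalPlan::Exchange, i.e. a distributed plan.
    Exchange,
}

impl Plan {
    fn is_distributed_plan(&self) -> bool {
        matches!(self, Plan::Exchange)
    }
}

#[derive(Debug)]
struct PipelineBuildResult {
    distributed: bool,
}

// Stand-in for PipelineBuilder::create(ctx).finalize(&plan) in standalone mode.
fn build_local_pipeline(_plan: &Plan) -> Result<PipelineBuildResult, String> {
    Ok(PipelineBuildResult { distributed: false })
}

// Stand-in for build_schedule_pipepline(ctx, &plan) in cluster mode.
fn build_distributed_pipeline(_plan: &Plan) -> Result<PipelineBuildResult, String> {
    Ok(PipelineBuildResult { distributed: true })
}

// Mirrors the new schedule_insert_select: one entry point that always yields a
// pipeline, branching only on whether the optimizer emitted a distributed plan,
// instead of returning Option and making the caller fall back to standalone mode.
fn schedule_insert_select(plan: &Plan) -> Result<PipelineBuildResult, String> {
    if plan.is_distributed_plan() {
        build_distributed_pipeline(plan)
    } else {
        build_local_pipeline(plan)
    }
}

fn main() -> Result<(), String> {
    println!("{:?}", schedule_insert_select(&Plan::Local)?);
    println!("{:?}", schedule_insert_select(&Plan::Exchange)?);
    Ok(())
}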
src/query/service/src/sql/planner/binder/insert.rs

Lines changed: 2 additions & 1 deletion
@@ -39,6 +39,7 @@ use common_io::prelude::*;
 use common_pipeline_transforms::processors::transforms::Transform;
 use tracing::debug;
 
+use crate::clusters::ClusterHelper;
 use crate::evaluator::EvalNode;
 use crate::evaluator::Evaluator;
 use crate::pipelines::processors::transforms::ExpressionTransformV2;
@@ -126,7 +127,7 @@ impl<'a> Binder {
         let statement = Statement::Query(query);
         let select_plan = self.bind_statement(bind_context, &statement).await?;
         let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
-            enable_distributed_optimization: true,
+            enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
         }));
         let optimized_plan = optimize(self.ctx.clone(), opt_ctx, select_plan)?;
         Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))

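The binder change mirrors the interpreter side: distributed optimization of the inner SELECT is now requested only when the session actually runs in a cluster, so a standalone deployment never produces an Exchange plan for an insert-select. A rough sketch of that gating, using placeholder types (Cluster and OptimizerConfig below are illustrative, not the real definitions):

// Illustrative placeholders; not the actual Databend Cluster/OptimizerConfig types.
struct Cluster {
    nodes: Vec<String>,
}

impl Cluster {
    // In this sketch a standalone deployment is modelled as an empty node list.
    fn is_empty(&self) -> bool {
        self.nodes.is_empty()
    }
}

struct OptimizerConfig {
    enable_distributed_optimization: bool,
}

// Only ask the optimizer for a distributed plan when a cluster is present;
// otherwise the plan stays local and is built by the local pipeline builder.
fn optimizer_config_for(cluster: &Cluster) -> OptimizerConfig {
    OptimizerConfig {
        enable_distributed_optimization: !cluster.is_empty(),
    }
}

fn main() {
    let standalone = Cluster { nodes: vec![] };
    let clustered = Cluster {
        nodes: vec!["node-1".into(), "node-2".into()],
    };
    assert!(!optimizer_config_for(&standalone).enable_distributed_optimization);
    assert!(optimizer_config_for(&clustered).enable_distributed_optimization);
}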