Skip to content

Commit bf48cd7

Browse files
committed
refactor(interpreter): move commit into pipeline
1 parent 6b620fb commit bf48cd7

File tree

4 files changed

+104
-46
lines changed

4 files changed

+104
-46
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,23 @@ pub fn append2table(
6161
}
6262

6363
table.append2(ctx.clone(), &mut build_res.main_pipeline)?;
64+
6465
let query_need_abort = ctx.query_need_abort();
6566
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
6667
build_res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
6768
let mut pipelines = build_res.sources_pipelines;
6869
pipelines.push(build_res.main_pipeline);
69-
let executor =
70-
PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
70+
let executor = PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
71+
executor.execute()
72+
}
73+
74+
pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) -> Result<()> {
75+
let query_need_abort = ctx.query_need_abort();
76+
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
77+
res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
78+
let mut pipelines = res.sources_pipelines;
79+
pipelines.push(res.main_pipeline);
80+
let executor = PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
7181
executor.execute()
7282
}
7383

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ use common_planners::SelectPlan;
2626
use common_streams::DataBlockStream;
2727
use common_streams::SendableDataBlockStream;
2828
use parking_lot::Mutex;
29+
use common_base::base::{GlobalIORuntime, TrySpawn};
2930

3031
use super::commit2table;
3132
use super::interpreter_common::append2table;
3233
use crate::interpreters::Interpreter;
34+
use crate::interpreters::interpreter_common::execute_pipeline;
3335
use crate::interpreters::SelectInterpreter;
3436
use crate::pipelines::processors::port::OutputPort;
35-
use crate::pipelines::processors::BlocksSource;
37+
use crate::pipelines::processors::{BlocksSource, TransformAddOn};
3638
use crate::pipelines::processors::TransformCastSchema;
3739
use crate::pipelines::Pipeline;
3840
use crate::pipelines::PipelineBuildResult;
@@ -85,13 +87,21 @@ impl Interpreter for InsertInterpreter {
8587
}
8688

8789
async fn execute(&self) -> Result<SendableDataBlockStream> {
90+
let build_res = self.create_new_pipeline().await?;
91+
92+
execute_pipeline(self.ctx.clone(), build_res)?;
93+
Ok(Box::pin(DataBlockStream::create(self.plan.schema(), None, vec![])))
94+
}
95+
96+
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
8897
let plan = &self.plan;
8998
let settings = self.ctx.get_settings();
9099
let table = self
91100
.ctx
92101
.get_table(&plan.catalog, &plan.database, &plan.table)
93102
.await?;
94-
let mut build_res = self.create_new_pipeline().await?;
103+
104+
let mut build_res = PipelineBuildResult::create();
95105
let mut builder = SourcePipeBuilder::create();
96106
if self.async_insert {
97107
build_res.main_pipeline.add_pipe(
@@ -160,21 +170,54 @@ impl Interpreter for InsertInterpreter {
160170
};
161171
}
162172

163-
append2table(self.ctx.clone(), table.clone(), plan.schema(), build_res)?;
164-
commit2table(self.ctx.clone(), table.clone(), plan.overwrite).await?;
165-
Ok(Box::pin(DataBlockStream::create(
166-
self.plan.schema(),
167-
None,
168-
vec![],
169-
)))
170-
}
173+
if table.schema() != plan.schema() {
174+
build_res.main_pipeline
175+
.add_transform(|transform_input_port, transform_output_port| {
176+
TransformAddOn::try_create(
177+
transform_input_port,
178+
transform_output_port,
179+
plan.schema().clone(),
180+
table.schema(),
181+
self.ctx.clone(),
182+
)
183+
})?;
184+
}
171185

172-
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
173-
let insert_pipeline = Pipeline::create();
174-
Ok(PipelineBuildResult {
175-
main_pipeline: insert_pipeline,
176-
sources_pipelines: vec![],
177-
})
186+
table.append2(self.ctx.clone(), &mut build_res.main_pipeline)?;
187+
188+
let ctx = self.ctx.clone();
189+
let overwrite = self.plan.overwrite;
190+
191+
build_res.main_pipeline.set_on_finished(move |may_error| {
192+
// capture out variable
193+
let overwrite = overwrite;
194+
let ctx = ctx.clone();
195+
let table = table.clone();
196+
197+
if may_error.is_none() {
198+
let append_entries = ctx.consume_precommit_blocks();
199+
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
200+
let catalog_name = ctx.get_current_catalog();
201+
let commit_handle = GlobalIORuntime::instance().spawn(async move {
202+
table
203+
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
204+
.await
205+
});
206+
207+
return match futures::executor::block_on(commit_handle) {
208+
Ok(Ok(_)) => Ok(()),
209+
Ok(Err(error)) => Err(error),
210+
Err(cause) => Err(ErrorCode::PanicError(format!(
211+
"Maybe panic while in commit insert. {}",
212+
cause
213+
)))
214+
};
215+
}
216+
217+
Err(may_error.as_ref().unwrap().clone())
218+
});
219+
220+
Ok(build_res)
178221
}
179222

180223
fn set_source_pipe_builder(&self, builder: Option<SourcePipeBuilder>) -> Result<()> {

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,30 @@ impl InsertInterpreterV2 {
6161
}))
6262
}
6363

64-
async fn execute_new(&self) -> Result<SendableDataBlockStream> {
64+
fn check_schema_cast(&self, plan: &Plan) -> common_exception::Result<bool> {
65+
let output_schema = &self.plan.schema;
66+
let select_schema = plan.schema();
67+
68+
// validate schema
69+
if select_schema.fields().len() < output_schema.fields().len() {
70+
return Err(ErrorCode::BadArguments(
71+
"Fields in select statement is less than expected",
72+
));
73+
}
74+
75+
// check if cast needed
76+
let cast_needed = select_schema != *output_schema;
77+
Ok(cast_needed)
78+
}
79+
}
80+
81+
#[async_trait::async_trait]
82+
impl Interpreter for InsertInterpreterV2 {
83+
fn name(&self) -> &str {
84+
"InsertIntoInterpreter"
85+
}
86+
87+
async fn execute(&self) -> Result<SendableDataBlockStream> {
6588
let plan = &self.plan;
6689
let settings = self.ctx.get_settings();
6790
let table = self
@@ -162,33 +185,6 @@ impl InsertInterpreterV2 {
162185
)))
163186
}
164187

165-
fn check_schema_cast(&self, plan: &Plan) -> common_exception::Result<bool> {
166-
let output_schema = &self.plan.schema;
167-
let select_schema = plan.schema();
168-
169-
// validate schema
170-
if select_schema.fields().len() < output_schema.fields().len() {
171-
return Err(ErrorCode::BadArguments(
172-
"Fields in select statement is less than expected",
173-
));
174-
}
175-
176-
// check if cast needed
177-
let cast_needed = select_schema != *output_schema;
178-
Ok(cast_needed)
179-
}
180-
}
181-
182-
#[async_trait::async_trait]
183-
impl Interpreter for InsertInterpreterV2 {
184-
fn name(&self) -> &str {
185-
"InsertIntoInterpreter"
186-
}
187-
188-
async fn execute(&self) -> Result<SendableDataBlockStream> {
189-
self.execute_new().await
190-
}
191-
192188
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
193189
let insert_pipeline = Pipeline::create();
194190
Ok(PipelineBuildResult {

src/query/service/src/pipelines/pipeline_build_res.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
use common_catalog::table::Table;
1517
use common_pipeline_core::Pipeline;
1618

1719
pub struct PipelineBuildResult {
@@ -21,6 +23,13 @@ pub struct PipelineBuildResult {
2123
}
2224

2325
impl PipelineBuildResult {
26+
pub fn create() -> PipelineBuildResult {
27+
PipelineBuildResult {
28+
main_pipeline: Pipeline::create(),
29+
sources_pipelines: vec![],
30+
}
31+
}
32+
2433
pub fn set_max_threads(&mut self, max_threads: usize) {
2534
self.main_pipeline.set_max_threads(max_threads);
2635

0 commit comments

Comments
 (0)