Skip to content

Commit 009335e

Browse files
committed
refactor(interpreter): refactor insert v2
1 parent bf48cd7 commit 009335e

File tree

2 files changed

+92
-26
lines changed

2 files changed

+92
-26
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ pub fn append2table(
4343
ctx: Arc<QueryContext>,
4444
table: Arc<dyn Table>,
4545
source_schema: DataSchemaRef,
46-
mut build_res: PipelineBuildResult,
46+
build_res: &mut PipelineBuildResult,
47+
overwrite: bool,
4748
) -> Result<()> {
48-
let need_fill_missing_columns = table.schema() != source_schema;
49-
if need_fill_missing_columns {
49+
if table.schema() != source_schema {
5050
build_res
5151
.main_pipeline
5252
.add_transform(|transform_input_port, transform_output_port| {
@@ -62,13 +62,36 @@ pub fn append2table(
6262

6363
table.append2(ctx.clone(), &mut build_res.main_pipeline)?;
6464

65-
let query_need_abort = ctx.query_need_abort();
66-
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
67-
build_res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
68-
let mut pipelines = build_res.sources_pipelines;
69-
pipelines.push(build_res.main_pipeline);
70-
let executor = PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
71-
executor.execute()
65+
build_res.main_pipeline.set_on_finished(move |may_error| {
66+
// capture out variable
67+
let overwrite = overwrite;
68+
let ctx = ctx.clone();
69+
let table = table.clone();
70+
71+
if may_error.is_none() {
72+
let append_entries = ctx.consume_precommit_blocks();
73+
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
74+
let catalog_name = ctx.get_current_catalog();
75+
let commit_handle = GlobalIORuntime::instance().spawn(async move {
76+
table
77+
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
78+
.await
79+
});
80+
81+
return match futures::executor::block_on(commit_handle) {
82+
Ok(Ok(_)) => Ok(()),
83+
Ok(Err(error)) => Err(error),
84+
Err(cause) => Err(ErrorCode::PanicError(format!(
85+
"Maybe panic while in commit insert. {}",
86+
cause
87+
)))
88+
};
89+
}
90+
91+
Err(may_error.as_ref().unwrap().clone())
92+
});
93+
94+
Ok(())
7295
}
7396

7497
pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) -> Result<()> {
@@ -81,6 +104,44 @@ pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) ->
81104
executor.execute()
82105
}
83106

107+
pub fn commit_table_pipeline(
108+
ctx: Arc<QueryContext>,
109+
table: Arc<dyn Table>,
110+
overwrite: bool,
111+
build_res: &mut PipelineBuildResult,
112+
) -> Result<()> {
113+
build_res.main_pipeline.set_on_finished(move |may_error| {
114+
// capture out variable
115+
let overwrite = overwrite;
116+
let ctx = ctx.clone();
117+
let table = table.clone();
118+
119+
if may_error.is_none() {
120+
let append_entries = ctx.consume_precommit_blocks();
121+
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
122+
let catalog_name = ctx.get_current_catalog();
123+
let commit_handle = GlobalIORuntime::instance().spawn(async move {
124+
table
125+
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
126+
.await
127+
});
128+
129+
return match futures::executor::block_on(commit_handle) {
130+
Ok(Ok(_)) => Ok(()),
131+
Ok(Err(error)) => Err(error),
132+
Err(cause) => Err(ErrorCode::PanicError(format!(
133+
"Maybe panic while in commit insert. {}",
134+
cause
135+
)))
136+
};
137+
}
138+
139+
Err(may_error.as_ref().unwrap().clone())
140+
});
141+
142+
Ok(())
143+
}
144+
84145
pub async fn commit2table(
85146
ctx: Arc<QueryContext>,
86147
table: Arc<dyn Table>,

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use parking_lot::Mutex;
2626
use super::commit2table;
2727
use super::interpreter_common::append2table;
2828
use crate::interpreters::Interpreter;
29+
use crate::interpreters::interpreter_common::{commit_table_pipeline, execute_pipeline};
2930
use crate::interpreters::InterpreterPtr;
3031
use crate::interpreters::SelectInterpreterV2;
3132
use crate::pipelines::processors::port::OutputPort;
@@ -85,14 +86,25 @@ impl Interpreter for InsertInterpreterV2 {
8586
}
8687

8788
async fn execute(&self) -> Result<SendableDataBlockStream> {
89+
let build_res = self.create_new_pipeline().await?;
90+
execute_pipeline(self.ctx.clone(), build_res)?;
91+
92+
Ok(Box::pin(DataBlockStream::create(
93+
self.plan.schema(),
94+
None,
95+
vec![],
96+
)))
97+
}
98+
99+
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
88100
let plan = &self.plan;
89101
let settings = self.ctx.get_settings();
90102
let table = self
91103
.ctx
92104
.get_table(&plan.catalog, &plan.database, &plan.table)
93105
.await?;
94106

95-
let mut build_res = self.create_new_pipeline().await?;
107+
let mut build_res = PipelineBuildResult::create();
96108
let mut builder = SourcePipeBuilder::create();
97109

98110
if self.async_insert {
@@ -175,22 +187,15 @@ impl Interpreter for InsertInterpreterV2 {
175187
};
176188
}
177189

178-
append2table(self.ctx.clone(), table.clone(), plan.schema(), build_res)?;
179-
commit2table(self.ctx.clone(), table.clone(), self.plan.overwrite).await?;
190+
append2table(
191+
self.ctx.clone(),
192+
table.clone(),
193+
plan.schema(),
194+
&mut build_res,
195+
self.plan.overwrite,
196+
)?;
180197

181-
Ok(Box::pin(DataBlockStream::create(
182-
self.plan.schema(),
183-
None,
184-
vec![],
185-
)))
186-
}
187-
188-
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
189-
let insert_pipeline = Pipeline::create();
190-
Ok(PipelineBuildResult {
191-
main_pipeline: insert_pipeline,
192-
sources_pipelines: vec![],
193-
})
198+
Ok(build_res)
194199
}
195200

196201
fn set_source_pipe_builder(&self, builder: Option<SourcePipeBuilder>) -> Result<()> {

0 commit comments

Comments
 (0)