Skip to content

Commit 89a18ba

Browse files
committed
refactor(interpreter): refactor insert
1 parent 009335e commit 89a18ba

File tree

1 file changed

+9
-47
lines changed

1 file changed

+9
-47
lines changed

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 9 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ use common_streams::DataBlockStream;
2727
use common_streams::SendableDataBlockStream;
2828
use parking_lot::Mutex;
2929
use common_base::base::{GlobalIORuntime, TrySpawn};
30+
use common_catalog::table::Table;
3031

3132
use super::commit2table;
3233
use super::interpreter_common::append2table;
3334
use crate::interpreters::Interpreter;
34-
use crate::interpreters::interpreter_common::execute_pipeline;
35+
use crate::interpreters::interpreter_common::{commit_table_pipeline, execute_pipeline};
3536
use crate::interpreters::SelectInterpreter;
3637
use crate::pipelines::processors::port::OutputPort;
3738
use crate::pipelines::processors::{BlocksSource, TransformAddOn};
@@ -170,52 +171,13 @@ impl Interpreter for InsertInterpreter {
170171
};
171172
}
172173

173-
if table.schema() != plan.schema() {
174-
build_res.main_pipeline
175-
.add_transform(|transform_input_port, transform_output_port| {
176-
TransformAddOn::try_create(
177-
transform_input_port,
178-
transform_output_port,
179-
plan.schema().clone(),
180-
table.schema(),
181-
self.ctx.clone(),
182-
)
183-
})?;
184-
}
185-
186-
table.append2(self.ctx.clone(), &mut build_res.main_pipeline)?;
187-
188-
let ctx = self.ctx.clone();
189-
let overwrite = self.plan.overwrite;
190-
191-
build_res.main_pipeline.set_on_finished(move |may_error| {
192-
// capture out variable
193-
let overwrite = overwrite;
194-
let ctx = ctx.clone();
195-
let table = table.clone();
196-
197-
if may_error.is_none() {
198-
let append_entries = ctx.consume_precommit_blocks();
199-
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
200-
let catalog_name = ctx.get_current_catalog();
201-
let commit_handle = GlobalIORuntime::instance().spawn(async move {
202-
table
203-
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
204-
.await
205-
});
206-
207-
return match futures::executor::block_on(commit_handle) {
208-
Ok(Ok(_)) => Ok(()),
209-
Ok(Err(error)) => Err(error),
210-
Err(cause) => Err(ErrorCode::PanicError(format!(
211-
"Maybe panic while in commit insert. {}",
212-
cause
213-
)))
214-
};
215-
}
216-
217-
Err(may_error.as_ref().unwrap().clone())
218-
});
174+
append2table(
175+
self.ctx.clone(),
176+
table,
177+
plan.schema(),
178+
&mut build_res,
179+
self.plan.overwrite,
180+
)?;
219181

220182
Ok(build_res)
221183
}

0 commit comments

Comments
 (0)