Skip to content

Commit b1b0a7e

Browse files
committed
refactor(interpreter): try fix test failure
1 parent 198d485 commit b1b0a7e

File tree

6 files changed

+53
-24
lines changed

6 files changed

+53
-24
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ pub fn append2table(
6868
build_res: &mut PipelineBuildResult,
6969
overwrite: bool,
7070
need_commit: bool,
71-
need_output: bool,
7271
) -> Result<()> {
7372
fill_missing_columns(
7473
ctx.clone(),
@@ -77,7 +76,7 @@ pub fn append2table(
7776
&mut build_res.main_pipeline,
7877
)?;
7978

80-
table.append2(ctx.clone(), &mut build_res.main_pipeline, need_output)?;
79+
table.append2(ctx.clone(), &mut build_res.main_pipeline, false)?;
8180

8281
if need_commit {
8382
build_res.main_pipeline.set_on_finished(move |may_error| {

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ impl CopyInterpreterV2 {
275275
&mut build_res,
276276
false,
277277
true,
278-
false,
279278
)?;
280279
Ok(build_res)
281280
}

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ impl Interpreter for InsertInterpreter {
164164
&mut build_res,
165165
self.plan.overwrite,
166166
true,
167-
false,
168167
)?;
169168

170169
Ok(build_res)

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::collections::VecDeque;
1616
use std::sync::Arc;
1717

18+
use common_base::base::GlobalIORuntime;
19+
use common_base::base::TrySpawn;
1820
use common_exception::ErrorCode;
1921
use common_exception::Result;
2022
use parking_lot::Mutex;
@@ -173,27 +175,47 @@ impl Interpreter for InsertInterpreterV2 {
173175
}
174176
};
175177

176-
build_res = match is_distributed_plan {
178+
let mut build_res = match is_distributed_plan {
177179
true => {
178-
build_schedule_pipepline(self.ctx.clone(), &insert_select_plan).await?
180+
build_schedule_pipepline(self.ctx.clone(), &insert_select_plan).await
181+
}
182+
false => {
183+
PipelineBuilder::create(self.ctx.clone()).finalize(&insert_select_plan)
184+
}
185+
}?;
186+
187+
let ctx = self.ctx.clone();
188+
let overwrite = self.plan.overwrite;
189+
build_res.main_pipeline.set_on_finished(move |may_error| {
190+
// capture out variable
191+
let overwrite = overwrite;
192+
let ctx = ctx.clone();
193+
let table = table.clone();
194+
195+
if may_error.is_none() {
196+
let append_entries = ctx.consume_precommit_blocks();
197+
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
198+
let catalog_name = ctx.get_current_catalog();
199+
let commit_handle = GlobalIORuntime::instance().spawn(async move {
200+
table
201+
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
202+
.await
203+
});
204+
205+
return match futures::executor::block_on(commit_handle) {
206+
Ok(Ok(_)) => Ok(()),
207+
Ok(Err(error)) => Err(error),
208+
Err(cause) => Err(ErrorCode::PanicError(format!(
209+
"Maybe panic while in commit insert. {}",
210+
cause
211+
))),
212+
};
179213
}
180-
false => PipelineBuilder::create(self.ctx.clone())
181-
.finalize(&insert_select_plan)?,
182-
};
183214

184-
if is_distributed_plan {
185-
append2table(
186-
self.ctx.clone(),
187-
table.clone(),
188-
plan.schema(),
189-
&mut build_res,
190-
self.plan.overwrite,
191-
true,
192-
true,
193-
)?;
194-
195-
return Ok(build_res);
196-
}
215+
Err(may_error.as_ref().unwrap().clone())
216+
});
217+
218+
return Ok(build_res);
197219
}
198220
};
199221
}
@@ -205,7 +227,6 @@ impl Interpreter for InsertInterpreterV2 {
205227
&mut build_res,
206228
self.plan.overwrite,
207229
true,
208-
false,
209230
)?;
210231

211232
Ok(build_res)

src/query/service/src/servers/mysql/writers/query_result_writer.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,18 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> {
106106
// XXX: num_columns == 0 may is error?
107107

108108
if !query_result.has_result_set {
109+
// For statements without result sets, we still need to pull the stream because errors may occur in the stream.
110+
let blocks = &mut query_result.blocks;
111+
while let Some(block) = blocks.next().await {
112+
if let Err(e) = block {
113+
dataset_writer
114+
.error(ErrorKind::ER_UNKNOWN_ERROR, &e.to_string().as_bytes())
115+
.await?;
116+
117+
return Ok(());
118+
}
119+
}
120+
109121
dataset_writer.completed(OkResponse::default()).await?;
110122
return Ok(());
111123
}

src/query/service/tests/it/storages/fuse/table_test_fixture.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,6 @@ impl TestFixture {
296296
&mut build_res,
297297
overwrite,
298298
commit,
299-
false,
300299
)?;
301300

302301
execute_pipeline(self.ctx.clone(), build_res)

0 commit comments

Comments
 (0)