Skip to content

Commit eeb62b3

Browse files
committed
refactor(interpreter): refactor copy v2
1 parent 89a18ba commit eeb62b3

File tree

6 files changed

+126
-185
lines changed

6 files changed

+126
-185
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 30 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub fn append2table(
4545
source_schema: DataSchemaRef,
4646
build_res: &mut PipelineBuildResult,
4747
overwrite: bool,
48+
need_commit: bool,
4849
) -> Result<()> {
4950
if table.schema() != source_schema {
5051
build_res
@@ -62,34 +63,36 @@ pub fn append2table(
6263

6364
table.append2(ctx.clone(), &mut build_res.main_pipeline)?;
6465

65-
build_res.main_pipeline.set_on_finished(move |may_error| {
66-
// capture out variable
67-
let overwrite = overwrite;
68-
let ctx = ctx.clone();
69-
let table = table.clone();
70-
71-
if may_error.is_none() {
72-
let append_entries = ctx.consume_precommit_blocks();
73-
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
74-
let catalog_name = ctx.get_current_catalog();
75-
let commit_handle = GlobalIORuntime::instance().spawn(async move {
76-
table
77-
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
78-
.await
79-
});
80-
81-
return match futures::executor::block_on(commit_handle) {
82-
Ok(Ok(_)) => Ok(()),
83-
Ok(Err(error)) => Err(error),
84-
Err(cause) => Err(ErrorCode::PanicError(format!(
85-
"Maybe panic while in commit insert. {}",
86-
cause
87-
)))
88-
};
89-
}
66+
if need_commit {
67+
build_res.main_pipeline.set_on_finished(move |may_error| {
68+
// capture out variable
69+
let overwrite = overwrite;
70+
let ctx = ctx.clone();
71+
let table = table.clone();
72+
73+
if may_error.is_none() {
74+
let append_entries = ctx.consume_precommit_blocks();
75+
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
76+
let catalog_name = ctx.get_current_catalog();
77+
let commit_handle = GlobalIORuntime::instance().spawn(async move {
78+
table
79+
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
80+
.await
81+
});
82+
83+
return match futures::executor::block_on(commit_handle) {
84+
Ok(Ok(_)) => Ok(()),
85+
Ok(Err(error)) => Err(error),
86+
Err(cause) => Err(ErrorCode::PanicError(format!(
87+
"Maybe panic while in commit insert. {}",
88+
cause
89+
)))
90+
};
91+
}
9092

91-
Err(may_error.as_ref().unwrap().clone())
92-
});
93+
Err(may_error.as_ref().unwrap().clone())
94+
});
95+
}
9396

9497
Ok(())
9598
}
@@ -104,68 +107,6 @@ pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) ->
104107
executor.execute()
105108
}
106109

107-
pub fn commit_table_pipeline(
108-
ctx: Arc<QueryContext>,
109-
table: Arc<dyn Table>,
110-
overwrite: bool,
111-
build_res: &mut PipelineBuildResult,
112-
) -> Result<()> {
113-
build_res.main_pipeline.set_on_finished(move |may_error| {
114-
// capture out variable
115-
let overwrite = overwrite;
116-
let ctx = ctx.clone();
117-
let table = table.clone();
118-
119-
if may_error.is_none() {
120-
let append_entries = ctx.consume_precommit_blocks();
121-
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
122-
let catalog_name = ctx.get_current_catalog();
123-
let commit_handle = GlobalIORuntime::instance().spawn(async move {
124-
table
125-
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
126-
.await
127-
});
128-
129-
return match futures::executor::block_on(commit_handle) {
130-
Ok(Ok(_)) => Ok(()),
131-
Ok(Err(error)) => Err(error),
132-
Err(cause) => Err(ErrorCode::PanicError(format!(
133-
"Maybe panic while in commit insert. {}",
134-
cause
135-
)))
136-
};
137-
}
138-
139-
Err(may_error.as_ref().unwrap().clone())
140-
});
141-
142-
Ok(())
143-
}
144-
145-
pub async fn commit2table(
146-
ctx: Arc<QueryContext>,
147-
table: Arc<dyn Table>,
148-
overwrite: bool,
149-
) -> Result<()> {
150-
let append_entries = ctx.consume_precommit_blocks();
151-
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
152-
let catalog_name = ctx.get_current_catalog();
153-
let handler = GlobalIORuntime::instance().spawn(async move {
154-
table
155-
.commit_insertion(ctx, &catalog_name, append_entries, overwrite)
156-
.await
157-
});
158-
159-
match handler.await {
160-
Ok(Ok(_)) => Ok(()),
161-
Ok(Err(cause)) => Err(cause),
162-
Err(cause) => Err(ErrorCode::PanicError(format!(
163-
"Maybe panic while in commit insert. {}",
164-
cause
165-
))),
166-
}
167-
}
168-
169110
pub async fn validate_grant_object_exists(
170111
ctx: &Arc<QueryContext>,
171112
object: &GrantObject,

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 81 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ use common_streams::DataBlockStream;
2626
use common_streams::SendableDataBlockStream;
2727
use futures::TryStreamExt;
2828
use regex::Regex;
29+
use common_base::base::{GlobalIORuntime, TrySpawn};
2930

3031
use super::append2table;
31-
use super::commit2table;
3232
use crate::interpreters::Interpreter;
33+
use crate::interpreters::interpreter_common::{execute_pipeline};
3334
use crate::interpreters::SelectInterpreterV2;
3435
use crate::pipelines::executor::ExecutorSettings;
3536
use crate::pipelines::executor::PipelineCompleteExecutor;
36-
use crate::pipelines::Pipeline;
37+
use crate::pipelines::{Pipeline, PipelineBuildResult};
3738
use crate::sessions::QueryContext;
3839
use crate::sessions::TableContext;
3940
use crate::sql::plans::CopyPlanV2;
@@ -112,11 +113,11 @@ impl CopyInterpreterV2 {
112113
}
113114
}
114115

115-
async fn purge_files(&self, from: &ReadDataSourcePlan, files: &Vec<String>) -> Result<()> {
116+
async fn purge_files(ctx: Arc<QueryContext>, from: &ReadDataSourcePlan, files: &Vec<String>) -> Result<()> {
116117
match &from.source_info {
117118
SourceInfo::StageSource(table_info) => {
118119
if table_info.stage_info.copy_options.purge {
119-
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
120+
let rename_me: Arc<dyn TableContext> = ctx.clone();
120121
let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;
121122
for f in files {
122123
if let Err(e) = op.object(f).delete().await {
@@ -137,10 +138,10 @@ impl CopyInterpreterV2 {
137138
/// Rewrite the ReadDataSourcePlan.S3StageSource.file_name to new file name.
138139
fn rewrite_read_plan_file_name(
139140
mut plan: ReadDataSourcePlan,
140-
files: Vec<String>,
141+
files: &[String],
141142
) -> ReadDataSourcePlan {
142143
if let SourceInfo::StageSource(ref mut stage) = plan.source_info {
143-
stage.files = files
144+
stage.files = files.to_vec()
144145
}
145146
plan
146147
}
@@ -160,40 +161,64 @@ impl CopyInterpreterV2 {
160161
tbl_name: &String,
161162
from: &ReadDataSourcePlan,
162163
files: Vec<String>,
163-
) -> Result<Vec<DataBlock>> {
164-
let ctx = self.ctx.clone();
165-
let settings = self.ctx.get_settings();
164+
) -> Result<PipelineBuildResult> {
165+
let mut build_res = PipelineBuildResult::create();
166166

167-
let mut pipeline = Pipeline::create();
168-
let read_source_plan = from.clone();
169-
let read_source_plan = Self::rewrite_read_plan_file_name(read_source_plan, files);
167+
let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
170168
tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);
171-
let table = ctx.build_table_from_source_plan(&read_source_plan)?;
172-
let res = table.read2(ctx.clone(), &read_source_plan, &mut pipeline);
173-
if let Err(e) = res {
174-
return Err(e);
175-
}
176169

177-
let table = ctx.get_table(catalog_name, db_name, tbl_name).await?;
170+
let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
171+
from_table.read2(self.ctx.clone(), &read_source_plan, &mut build_res.main_pipeline)?;
178172

179-
table.append2(ctx.clone(), &mut pipeline)?;
180-
pipeline.set_max_threads(settings.get_max_threads()? as usize);
173+
let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
181174

182-
let query_need_abort = ctx.query_need_abort();
183-
let executor_settings = ExecutorSettings::try_create(&settings)?;
184-
let executor =
185-
PipelineCompleteExecutor::try_create(query_need_abort, pipeline, executor_settings)?;
186-
executor.execute()?;
175+
to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline)?;
187176

188-
Ok(ctx.consume_precommit_blocks())
177+
let ctx = self.ctx.clone();
178+
let catalog_name = catalog_name.clone();
179+
let files = files.clone();
180+
let from = from.clone();
181+
182+
build_res.main_pipeline.set_on_finished(move |may_error| {
183+
if may_error.is_none() {
184+
// capture out variable
185+
let ctx = ctx.clone();
186+
let files = files.clone();
187+
let from = from.clone();
188+
let catalog_name = catalog_name.clone();
189+
let to_table = to_table.clone();
190+
191+
let task = GlobalIORuntime::instance().spawn(async move {
192+
// Commit
193+
let operations = ctx.consume_precommit_blocks();
194+
to_table.commit_insertion(ctx.clone(), &catalog_name, operations, false).await?;
195+
196+
// Purge
197+
CopyInterpreterV2::purge_files(ctx, &from, &files).await
198+
});
199+
200+
return match futures::executor::block_on(task) {
201+
Ok(Ok(_)) => Ok(()),
202+
Ok(Err(error)) => Err(error),
203+
Err(cause) => Err(ErrorCode::PanicError(format!(
204+
"Maybe panic while in commit insert. {}",
205+
cause
206+
)))
207+
};
208+
}
209+
210+
Err(may_error.as_ref().unwrap().clone())
211+
});
212+
213+
Ok(build_res)
189214
}
190215

191216
async fn execute_copy_into_stage(
192217
&self,
193218
stage: &UserStageInfo,
194219
path: &str,
195220
query: &Plan,
196-
) -> Result<SendableDataBlockStream> {
221+
) -> Result<PipelineBuildResult> {
197222
let (s_expr, metadata, bind_context) = match query {
198223
Plan::Query {
199224
s_expr,
@@ -231,22 +256,11 @@ impl CopyInterpreterV2 {
231256
files: vec![],
232257
};
233258

234-
let build_res = select_interpreter.create_new_pipeline().await?;
259+
let mut build_res = select_interpreter.create_new_pipeline().await?;
235260
let table = StageTable::try_create(stage_table_info)?;
236261

237-
append2table(
238-
self.ctx.clone(),
239-
table.clone(),
240-
data_schema.clone(),
241-
build_res,
242-
)?;
243-
commit2table(self.ctx.clone(), table.clone(), false).await?;
244-
245-
Ok(Box::pin(DataBlockStream::create(
246-
Arc::new(DataSchema::empty()),
247-
None,
248-
vec![],
249-
)))
262+
append2table(self.ctx.clone(), table.clone(), data_schema.clone(), &mut build_res, false, true)?;
263+
Ok(build_res)
250264
}
251265
}
252266

@@ -258,6 +272,30 @@ impl Interpreter for CopyInterpreterV2 {
258272

259273
#[tracing::instrument(level = "debug", name = "copy_interpreter_execute_v2", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
260274
async fn execute(&self) -> Result<SendableDataBlockStream> {
275+
let build_res = self.create_new_pipeline().await?;
276+
277+
let settings = self.ctx.get_settings();
278+
let query_need_abort = self.ctx.query_need_abort();
279+
let executor_settings = ExecutorSettings::try_create(&settings)?;
280+
281+
let mut pipelines = build_res.sources_pipelines;
282+
pipelines.push(build_res.main_pipeline);
283+
284+
let executor = PipelineCompleteExecutor::from_pipelines(
285+
query_need_abort,
286+
pipelines,
287+
executor_settings,
288+
)?;
289+
290+
executor.execute()?;
291+
Ok(Box::pin(DataBlockStream::create(
292+
Arc::new(DataSchema::new(vec![])),
293+
None,
294+
vec![],
295+
)))
296+
}
297+
298+
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
261299
match &self.plan {
262300
// TODO(xuanwo): extract them as a separate function.
263301
CopyPlanV2::IntoTable {
@@ -290,40 +328,9 @@ impl Interpreter for CopyInterpreterV2 {
290328
}
291329

292330
tracing::info!("matched files: {:?}, pattern: {}", &files, pattern);
293-
294-
let write_results = self
295-
.copy_files_to_table(
296-
catalog_name,
297-
database_name,
298-
table_name,
299-
from,
300-
files.clone(),
301-
)
302-
.await?;
303-
304-
let table = self
305-
.ctx
306-
.get_table(catalog_name, database_name, table_name)
307-
.await?;
308-
309-
// Commit.
310-
table
311-
.commit_insertion(self.ctx.clone(), catalog_name, write_results, false)
312-
.await?;
313-
314-
// Purge
315-
self.purge_files(from, &files).await?;
316-
317-
Ok(Box::pin(DataBlockStream::create(
318-
// TODO(xuanwo): Is this correct?
319-
Arc::new(DataSchema::new(vec![])),
320-
None,
321-
vec![],
322-
)))
331+
self.copy_files_to_table(catalog_name, database_name, table_name, from, files.clone()).await
323332
}
324-
CopyPlanV2::IntoStage {
325-
stage, from, path, ..
326-
} => self.execute_copy_into_stage(stage, path, from).await,
333+
CopyPlanV2::IntoStage { stage, from, path, .. } => self.execute_copy_into_stage(stage, path, from).await,
327334
}
328335
}
329336
}

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@ use common_streams::SendableDataBlockStream;
2828
use parking_lot::Mutex;
2929
use common_base::base::{GlobalIORuntime, TrySpawn};
3030
use common_catalog::table::Table;
31-
32-
use super::commit2table;
3331
use super::interpreter_common::append2table;
3432
use crate::interpreters::Interpreter;
35-
use crate::interpreters::interpreter_common::{commit_table_pipeline, execute_pipeline};
33+
use crate::interpreters::interpreter_common::{execute_pipeline};
3634
use crate::interpreters::SelectInterpreter;
3735
use crate::pipelines::processors::port::OutputPort;
3836
use crate::pipelines::processors::{BlocksSource, TransformAddOn};
@@ -177,6 +175,7 @@ impl Interpreter for InsertInterpreter {
177175
plan.schema(),
178176
&mut build_res,
179177
self.plan.overwrite,
178+
true,
180179
)?;
181180

182181
Ok(build_res)

0 commit comments

Comments
 (0)