Skip to content

Commit 07fc475

Browse files
committed
Finish the distributed insert select pipeline.
1 parent e94fb53 commit 07fc475

File tree

17 files changed

+109
-42
lines changed

17 files changed

+109
-42
lines changed

src/query/catalog/src/table.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ pub trait Table: Sync + Send {
140140
)))
141141
}
142142

143-
fn append2(&self, _: Arc<dyn TableContext>, _: &mut Pipeline) -> Result<()> {
143+
fn append2(
144+
&self,
145+
_: Arc<dyn TableContext>,
146+
_: &mut Pipeline,
147+
_need_output: bool,
148+
) -> Result<()> {
144149
Err(ErrorCode::UnImplement(format!(
145150
"append2 operation for table {} is not implemented, table engine is {}",
146151
self.name(),

src/query/pipeline/sinks/src/processors/sinks/context_sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl ContextSink {
3434
}
3535

3636
impl Sink for ContextSink {
37-
const NAME: &'static str = "ContextSink ";
37+
const NAME: &'static str = "ContextSink";
3838

3939
fn consume(&mut self, block: DataBlock) -> Result<()> {
4040
self.ctx.push_precommit_block(block);

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use chrono::TimeZone;
1919
use chrono::Utc;
2020
use common_base::base::GlobalIORuntime;
2121
use common_base::base::TrySpawn;
22+
use common_datablocks::DataBlock;
2223
use common_datavalues::DataSchemaRef;
2324
use common_exception::ErrorCode;
2425
use common_exception::Result;
@@ -74,7 +75,7 @@ pub fn append2table(
7475
&mut build_res.main_pipeline,
7576
)?;
7677

77-
table.append2(ctx.clone(), &mut build_res.main_pipeline)?;
78+
table.append2(ctx.clone(), &mut build_res.main_pipeline, false)?;
7879
let query_need_abort = ctx.query_need_abort();
7980
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
8081
build_res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
@@ -85,12 +86,12 @@ pub fn append2table(
8586
executor.execute()
8687
}
8788

88-
pub async fn commit2table(
89+
pub async fn commit2table_with_append_entries(
8990
ctx: Arc<QueryContext>,
9091
table: Arc<dyn Table>,
9192
overwrite: bool,
93+
append_entries: Vec<DataBlock>,
9294
) -> Result<()> {
93-
let append_entries = ctx.consume_precommit_blocks();
9495
// We must run the commit operation on the global runtime; this avoids the "dispatch dropped without returning error" error in tower
9596
let catalog_name = ctx.get_current_catalog();
9697
let handler = GlobalIORuntime::instance().spawn(async move {
@@ -109,6 +110,15 @@ pub async fn commit2table(
109110
}
110111
}
111112

113+
pub async fn commit2table(
114+
ctx: Arc<QueryContext>,
115+
table: Arc<dyn Table>,
116+
overwrite: bool,
117+
) -> Result<()> {
118+
let append_entries = ctx.consume_precommit_blocks();
119+
commit2table_with_append_entries(ctx, table, overwrite, append_entries).await
120+
}
121+
112122
pub async fn validate_grant_object_exists(
113123
ctx: &Arc<QueryContext>,
114124
object: &GrantObject,

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl CopyInterpreterV2 {
176176

177177
let table = ctx.get_table(catalog_name, db_name, tbl_name).await?;
178178

179-
table.append2(ctx.clone(), &mut pipeline)?;
179+
table.append2(ctx.clone(), &mut pipeline, false)?;
180180
pipeline.set_max_threads(settings.get_max_threads()? as usize);
181181

182182
let query_need_abort = ctx.query_need_abort();

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ use common_streams::SendableDataBlockStream;
2626
use parking_lot::Mutex;
2727

2828
use super::commit2table;
29+
use super::commit2table_with_append_entries;
2930
use super::interpreter_common::append2table;
3031
use super::plan_schedulers::build_schedule_pipepline;
3132
use crate::clusters::ClusterHelper;
3233
use crate::interpreters::Interpreter;
3334
use crate::interpreters::InterpreterPtr;
3435
use crate::interpreters::SelectInterpreterV2;
3536
use crate::pipelines::executor::ExecutorSettings;
36-
use crate::pipelines::executor::PipelineCompleteExecutor;
37+
use crate::pipelines::executor::PipelinePullingExecutor;
3738
use crate::pipelines::processors::port::OutputPort;
3839
use crate::pipelines::processors::BlocksSource;
3940
use crate::pipelines::processors::TransformCastSchema;
@@ -218,7 +219,7 @@ impl InsertInterpreterV2 {
218219
catalog,
219220
table_info: table.get_table_info().clone(),
220221
select_schema: plan.schema(),
221-
insert_schema: self.schema(),
222+
insert_schema: self.plan.schema(),
222223
cast_needed: self.check_schema_cast(plan)?,
223224
});
224225

@@ -234,15 +235,15 @@ impl InsertInterpreterV2 {
234235
let query_need_abort = self.ctx.query_need_abort();
235236
let executor_settings = ExecutorSettings::try_create(&settings)?;
236237
build_res.set_max_threads(settings.get_max_threads()? as usize);
237-
let mut pipelines = build_res.sources_pipelines;
238-
pipelines.push(build_res.main_pipeline);
239-
let executor = PipelineCompleteExecutor::from_pipelines(
238+
239+
let executor = PipelinePullingExecutor::from_pipelines(
240240
query_need_abort,
241-
pipelines,
241+
build_res,
242242
executor_settings,
243243
)?;
244-
executor.execute()?;
245-
commit2table(self.ctx.clone(), table.clone(), self.plan.overwrite).await?;
244+
245+
commit2table_with_append_entries(self.ctx.clone(), table, self.plan.overwrite, vec![])
246+
.await?;
246247

247248
Ok(Box::pin(DataBlockStream::create(
248249
self.plan.schema(),

src/query/service/src/interpreters/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
9999
pub use interpreter_clustering_history::InterpreterClusteringHistory;
100100
pub use interpreter_common::append2table;
101101
pub use interpreter_common::commit2table;
102+
pub use interpreter_common::commit2table_with_append_entries;
102103
pub use interpreter_common::fill_missing_columns;
103104
pub use interpreter_common::list_files_from_dal;
104105
pub use interpreter_database_create::CreateDatabaseInterpreter;

src/query/service/src/sql/executor/physical_plan.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ use common_datavalues::DataField;
2121
use common_datavalues::DataSchemaRef;
2222
use common_datavalues::DataSchemaRefExt;
2323
use common_datavalues::NullableType;
24-
use common_datavalues::StringType;
2524
use common_datavalues::ToDataType;
2625
use common_datavalues::Vu8;
2726
use common_exception::Result;
2827
use common_meta_app::schema::TableInfo;
2928
use common_planners::ReadDataSourcePlan;
3029
use common_planners::StageKind;
30+
use common_planners::SINK_SCHEMA;
3131

3232
use super::physical_scalar::PhysicalScalar;
3333
use super::AggregateFunctionDesc;
@@ -317,10 +317,7 @@ pub struct DistributedInsertSelect {
317317

318318
impl DistributedInsertSelect {
319319
pub fn output_schema(&self) -> Result<DataSchemaRef> {
320-
Ok(DataSchemaRefExt::create(vec![
321-
DataField::new("seg_loc", StringType::new_impl()),
322-
DataField::new("seg_info", StringType::new_impl()),
323-
]))
320+
Ok(SINK_SCHEMA.clone())
324321
}
325322
}
326323

src/query/service/src/sql/executor/pipeline_builder.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl PipelineBuilder {
126126
PhysicalPlan::ExchangeSource(source) => self.build_exchange_source(source),
127127
PhysicalPlan::UnionAll(union_all) => self.build_union_all(union_all),
128128
PhysicalPlan::DistributedInsertSelect(insert_select) => {
129-
self.build_insert_select(insert_select)
129+
self.build_distributed_insert_select(insert_select)
130130
}
131131
PhysicalPlan::Exchange(_) => Err(ErrorCode::LogicalError(
132132
"Invalid physical plan with PhysicalPlan::Exchange",
@@ -532,7 +532,10 @@ impl PipelineBuilder {
532532
Ok(())
533533
}
534534

535-
pub fn build_insert_select(&mut self, insert_select: &DistributedInsertSelect) -> Result<()> {
535+
pub fn build_distributed_insert_select(
536+
&mut self,
537+
insert_select: &DistributedInsertSelect,
538+
) -> Result<()> {
536539
let select_schema = &insert_select.select_schema;
537540
let insert_schema = &insert_select.insert_schema;
538541

@@ -576,7 +579,7 @@ impl PipelineBuilder {
576579
&mut self.main_pipeline,
577580
)?;
578581

579-
table.append2(self.ctx.clone(), &mut self.main_pipeline)?;
582+
table.append2(self.ctx.clone(), &mut self.main_pipeline, true)?;
580583

581584
Ok(())
582585
}

src/query/service/src/storages/stage/stage_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl Table for StageTable {
123123
Ok(())
124124
}
125125

126-
fn append2(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
126+
fn append2(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline, _: bool) -> Result<()> {
127127
let mut sink_pipeline_builder = SinkPipeBuilder::create();
128128
for _ in 0..pipeline.output_len() {
129129
let input_port = InputPort::create();

src/query/service/tests/it/storages/fuse/operations/navigate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ async fn test_fuse_historical_table_is_read_only() -> Result<()> {
158158
.await?;
159159

160160
// check append2
161-
let res = tbl.append2(ctx.clone(), &mut Pipeline::create());
161+
let res = tbl.append2(ctx.clone(), &mut Pipeline::create(), false);
162162
assert_not_writable(res, "append2");
163163

164164
// check append_data

0 commit comments

Comments
 (0)