Skip to content

Commit 8c7ccc2

Browse files
committed
refactor(interpreter): move execute to execute2
1 parent eeb62b3 commit 8c7ccc2

File tree

66 files changed

+345
-408
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

66 files changed

+345
-408
lines changed

src/query/pipeline/sources/src/processors/sources/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod stream_source;
2222
pub mod stream_source_v2;
2323
pub mod sync_source;
2424
pub mod sync_source_receiver;
25+
mod one_block_source;
2526

2627
pub use async_source::AsyncSource;
2728
pub use async_source::AsyncSourcer;
@@ -40,6 +41,7 @@ pub use sync_source::SyncSource;
4041
pub use sync_source::SyncSourcer;
4142
pub use sync_source::*;
4243
pub use sync_source_receiver::SyncReceiverSource;
44+
pub use one_block_source::OneBlockSource;
4345

4446
#[allow(dead_code)]
4547
mod source_example {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
use std::collections::VecDeque;
17+
use std::sync::Arc;
18+
19+
use common_catalog::table_context::TableContext;
20+
use common_datablocks::DataBlock;
21+
use common_exception::Result;
22+
use common_pipeline_core::processors::port::OutputPort;
23+
use common_pipeline_core::processors::processor::{Event, ProcessorPtr};
24+
use parking_lot::Mutex;
25+
use common_pipeline_core::processors::Processor;
26+
27+
use crate::processors::sources::SyncSource;
28+
use crate::processors::sources::SyncSourcer;
29+
30+
pub struct OneBlockSource {
31+
output: Arc<OutputPort>,
32+
data_block: Option<DataBlock>,
33+
}
34+
35+
impl OneBlockSource {
36+
pub fn create(output: Arc<OutputPort>, data_block: DataBlock) -> Result<ProcessorPtr> {
37+
Ok(ProcessorPtr::create(Box::new(OneBlockSource {
38+
output,
39+
data_block: Some(data_block),
40+
})))
41+
}
42+
}
43+
44+
#[async_trait::async_trait]
45+
impl Processor for OneBlockSource {
46+
fn name(&self) -> &'static str {
47+
"BlockSource"
48+
}
49+
50+
fn as_any(&mut self) -> &mut dyn Any {
51+
self
52+
}
53+
54+
fn event(&mut self) -> Result<Event> {
55+
if self.output.is_finished() {
56+
return Ok(Event::Finished);
57+
}
58+
59+
if !self.output.can_push() {
60+
return Ok(Event::NeedConsume);
61+
}
62+
63+
if let Some(data_block) = self.data_block.take() {
64+
self.output.push_data(Ok(data_block));
65+
return Ok(Event::NeedConsume);
66+
}
67+
68+
self.output.finish();
69+
Ok(Event::Finished)
70+
}
71+
}

src/query/service/src/interpreters/async_insert_queue.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ impl AsyncInsertManager {
393393
input: Arc::new((**plan).clone()),
394394
})?;
395395

396-
let mut build_res = select_interpreter.create_new_pipeline().await?;
396+
let mut build_res = select_interpreter.execute2().await?;
397397

398398
let mut sink_pipeline_builder = SinkPipeBuilder::create();
399399
for _ in 0..build_res.main_pipeline.output_len() {

src/query/service/src/interpreters/interpreter.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,13 @@ pub trait Interpreter: Sync + Send {
3434
}
3535

3636
/// The core of the databend processor which will execute the logical plan and get the DataBlock
37-
async fn execute(&self) -> Result<SendableDataBlockStream>;
38-
39-
/// Create the new pipeline for databend's new execution model
40-
/// Currently databend is developing a new execution model with a hybrid pull-based & push-based strategy
41-
/// The method now only is implemented by SelectInterpreter
42-
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
43-
Err(ErrorCode::UnImplement(format!(
44-
"UnImplement create_new_pipeline method for {:?}",
45-
self.name()
46-
)))
37+
async fn execute(&self) -> Result<SendableDataBlockStream> {
38+
unimplemented!()
4739
}
4840

41+
/// The core of the databend processor which will execute the logical plan and build the pipeline
42+
async fn execute2(&self) -> Result<PipelineBuildResult>;
43+
4944
/// Do some start work for the interpreter
5045
/// Such as query counts, query start time and etc
5146
async fn start(&self) -> Result<()> {

src/query/service/src/interpreters/interpreter_call.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use super::Interpreter;
2525
use crate::interpreters::ProcessorExecutorStream;
2626
use crate::pipelines::executor::ExecutorSettings;
2727
use crate::pipelines::executor::PipelinePullingExecutor;
28+
use crate::pipelines::PipelineBuildResult;
2829
use crate::procedures::ProcedureFactory;
2930
use crate::sessions::QueryContext;
3031
use crate::sessions::TableContext;
@@ -61,7 +62,7 @@ impl Interpreter for CallInterpreter {
6162
}
6263

6364
#[tracing::instrument(level = "debug", name = "call_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
64-
async fn execute(&self) -> Result<SendableDataBlockStream> {
65+
async fn execute2(&self) -> Result<PipelineBuildResult> {
6566
let plan = &self.plan;
6667

6768
let name = plan.name.clone();
@@ -72,18 +73,10 @@ impl Interpreter for CallInterpreter {
7273
*schema = Some(last_schema);
7374
}
7475

75-
let mut pipeline = Pipeline::create();
76-
func.eval(self.ctx.clone(), plan.args.clone(), &mut pipeline)
76+
let mut build_res = PipelineBuildResult::create();
77+
func.eval(self.ctx.clone(), plan.args.clone(), &mut build_res.main_pipeline)
7778
.await?;
7879

79-
let ctx = &self.ctx;
80-
let settings = ctx.get_settings();
81-
let query_need_abort = ctx.query_need_abort();
82-
pipeline.set_max_threads(settings.get_max_threads()? as usize);
83-
let executor_settings = ExecutorSettings::try_create(&settings)?;
84-
let executor =
85-
PipelinePullingExecutor::try_create(query_need_abort, pipeline, executor_settings)?;
86-
87-
Ok(Box::pin(ProcessorExecutorStream::create(executor)?))
80+
Ok(build_res)
8881
}
8982
}

src/query/service/src/interpreters/interpreter_cluster_key_alter.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use common_meta_types::UserPrivilegeType;
2020
use common_planners::AlterTableClusterKeyPlan;
2121
use common_streams::DataBlockStream;
2222
use common_streams::SendableDataBlockStream;
23+
use crate::pipelines::PipelineBuildResult;
2324

2425
use super::Interpreter;
2526
use crate::sessions::QueryContext;
@@ -42,7 +43,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
4243
"AlterTableClusterKeyInterpreter"
4344
}
4445

45-
async fn execute(&self) -> Result<SendableDataBlockStream> {
46+
async fn execute2(&self) -> Result<PipelineBuildResult> {
4647
let plan = &self.plan;
4748
self.ctx
4849
.get_current_session()
@@ -68,10 +69,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
6869
table
6970
.alter_table_cluster_keys(self.ctx.clone(), &self.plan.catalog, cluster_key_str)
7071
.await?;
71-
Ok(Box::pin(DataBlockStream::create(
72-
self.plan.schema(),
73-
None,
74-
vec![],
75-
)))
72+
73+
Ok(PipelineBuildResult::create())
7674
}
7775
}

src/query/service/src/interpreters/interpreter_cluster_key_drop.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use common_meta_types::UserPrivilegeType;
2020
use common_planners::DropTableClusterKeyPlan;
2121
use common_streams::DataBlockStream;
2222
use common_streams::SendableDataBlockStream;
23+
use crate::pipelines::PipelineBuildResult;
2324

2425
use super::Interpreter;
2526
use crate::sessions::QueryContext;
@@ -42,7 +43,7 @@ impl Interpreter for DropTableClusterKeyInterpreter {
4243
"DropTableClusterKeyInterpreter"
4344
}
4445

45-
async fn execute(&self) -> Result<SendableDataBlockStream> {
46+
async fn execute2(&self) -> Result<PipelineBuildResult> {
4647
let plan = &self.plan;
4748
self.ctx
4849
.get_current_session()
@@ -66,10 +67,7 @@ impl Interpreter for DropTableClusterKeyInterpreter {
6667
table
6768
.drop_table_cluster_keys(self.ctx.clone(), &self.plan.catalog)
6869
.await?;
69-
Ok(Box::pin(DataBlockStream::create(
70-
self.plan.schema(),
71-
None,
72-
vec![],
73-
)))
70+
71+
Ok(PipelineBuildResult::create())
7472
}
7573
}

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ impl CopyInterpreterV2 {
256256
files: vec![],
257257
};
258258

259-
let mut build_res = select_interpreter.create_new_pipeline().await?;
259+
let mut build_res = select_interpreter.execute2().await?;
260260
let table = StageTable::try_create(stage_table_info)?;
261261

262262
append2table(self.ctx.clone(), table.clone(), data_schema.clone(), &mut build_res, false, true)?;
@@ -272,7 +272,7 @@ impl Interpreter for CopyInterpreterV2 {
272272

273273
#[tracing::instrument(level = "debug", name = "copy_interpreter_execute_v2", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
274274
async fn execute(&self) -> Result<SendableDataBlockStream> {
275-
let build_res = self.create_new_pipeline().await?;
275+
let build_res = self.execute2().await?;
276276

277277
let settings = self.ctx.get_settings();
278278
let query_need_abort = self.ctx.query_need_abort();
@@ -295,7 +295,7 @@ impl Interpreter for CopyInterpreterV2 {
295295
)))
296296
}
297297

298-
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
298+
async fn execute2(&self) -> Result<PipelineBuildResult> {
299299
match &self.plan {
300300
// TODO(xuanwo): extract them as a separate function.
301301
CopyPlanV2::IntoTable {

src/query/service/src/interpreters/interpreter_database_create.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use common_streams::DataBlockStream;
2323
use common_streams::SendableDataBlockStream;
2424

2525
use crate::interpreters::Interpreter;
26+
use crate::pipelines::PipelineBuildResult;
2627
use crate::sessions::QueryContext;
2728
use crate::sessions::TableContext;
2829

@@ -45,7 +46,7 @@ impl Interpreter for CreateDatabaseInterpreter {
4546
}
4647

4748
#[tracing::instrument(level = "debug", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
48-
async fn execute(&self) -> Result<SendableDataBlockStream> {
49+
async fn execute2(&self) -> Result<PipelineBuildResult> {
4950
self.ctx
5051
.get_current_session()
5152
.validate_privilege(&GrantObject::Global, UserPrivilegeType::Create)
@@ -67,10 +68,6 @@ impl Interpreter for CreateDatabaseInterpreter {
6768
};
6869
catalog.create_database(self.plan.clone().into()).await?;
6970

70-
Ok(Box::pin(DataBlockStream::create(
71-
self.plan.schema(),
72-
None,
73-
vec![],
74-
)))
71+
Ok(PipelineBuildResult::create())
7572
}
7673
}

src/query/service/src/interpreters/interpreter_database_drop.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use common_streams::DataBlockStream;
2222
use common_streams::SendableDataBlockStream;
2323

2424
use crate::interpreters::Interpreter;
25+
use crate::pipelines::PipelineBuildResult;
2526
use crate::sessions::QueryContext;
2627
use crate::sessions::TableContext;
2728

@@ -42,7 +43,7 @@ impl Interpreter for DropDatabaseInterpreter {
4243
"DropDatabaseInterpreter"
4344
}
4445

45-
async fn execute(&self) -> Result<SendableDataBlockStream> {
46+
async fn execute2(&self) -> Result<PipelineBuildResult> {
4647
self.ctx
4748
.get_current_session()
4849
.validate_privilege(&GrantObject::Global, UserPrivilegeType::Drop)
@@ -51,10 +52,6 @@ impl Interpreter for DropDatabaseInterpreter {
5152
let catalog = self.ctx.get_catalog(&self.plan.catalog)?;
5253
catalog.drop_database(self.plan.clone().into()).await?;
5354

54-
Ok(Box::pin(DataBlockStream::create(
55-
self.plan.schema(),
56-
None,
57-
vec![],
58-
)))
55+
Ok(PipelineBuildResult::create())
5956
}
6057
}

0 commit comments

Comments
 (0)