Skip to content

Commit 7d35a62

Browse files
authored
Merge pull request #7582 from zhang2014/refactor/interpreter_2
refactor(interpreter): remove sendable stream in interpreter
2 parents 990433f + 4650f7b commit 7d35a62

File tree

130 files changed

+870
-1025
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

130 files changed

+870
-1025
lines changed

src/query/pipeline/sources/src/processors/sources/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod deserializer;
1818
pub mod empty_source;
1919
pub mod file_splitter;
2020
pub mod multi_file_splitter;
21+
mod one_block_source;
2122
pub mod stream_source;
2223
pub mod stream_source_v2;
2324
pub mod sync_source;
@@ -33,6 +34,7 @@ pub use file_splitter::FileSplitter;
3334
pub use file_splitter::FileSplitterState;
3435
pub use multi_file_splitter::MultiFileSplitter;
3536
pub use multi_file_splitter::OperatorInfo;
37+
pub use one_block_source::OneBlockSource;
3638
pub use stream_source::StreamSource;
3739
pub use stream_source::StreamSourceNoSkipEmpty;
3840
pub use stream_source_v2::StreamSourceV2;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
use std::sync::Arc;
17+
18+
use common_datablocks::DataBlock;
19+
use common_exception::Result;
20+
use common_pipeline_core::processors::port::OutputPort;
21+
use common_pipeline_core::processors::processor::Event;
22+
use common_pipeline_core::processors::processor::ProcessorPtr;
23+
use common_pipeline_core::processors::Processor;
24+
25+
pub struct OneBlockSource {
26+
output: Arc<OutputPort>,
27+
data_block: Option<DataBlock>,
28+
}
29+
30+
impl OneBlockSource {
31+
pub fn create(output: Arc<OutputPort>, data_block: DataBlock) -> Result<ProcessorPtr> {
32+
Ok(ProcessorPtr::create(Box::new(OneBlockSource {
33+
output,
34+
data_block: Some(data_block),
35+
})))
36+
}
37+
}
38+
39+
#[async_trait::async_trait]
40+
impl Processor for OneBlockSource {
41+
fn name(&self) -> &'static str {
42+
"BlockSource"
43+
}
44+
45+
fn as_any(&mut self) -> &mut dyn Any {
46+
self
47+
}
48+
49+
fn event(&mut self) -> Result<Event> {
50+
if self.output.is_finished() {
51+
return Ok(Event::Finished);
52+
}
53+
54+
if !self.output.can_push() {
55+
return Ok(Event::NeedConsume);
56+
}
57+
58+
if let Some(data_block) = self.data_block.take() {
59+
self.output.push_data(Ok(data_block));
60+
return Ok(Event::NeedConsume);
61+
}
62+
63+
self.output.finish();
64+
Ok(Event::Finished)
65+
}
66+
}

src/query/service/benches/suites/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ pub async fn select_executor(sql: &str) -> Result<()> {
3737
let ctx = executor_session.create_query_context().await?;
3838

3939
if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? {
40-
let executor = SelectInterpreter::try_create(ctx, plan)?;
41-
let mut stream = executor.execute().await?;
40+
let executor = SelectInterpreter::try_create(ctx.clone(), plan)?;
41+
let mut stream = executor.execute(ctx.clone()).await?;
4242
while let Some(_block) = stream.next().await {}
4343
} else {
4444
unreachable!()

src/query/service/src/interpreters/async_insert_queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ impl AsyncInsertManager {
393393
input: Arc::new((**plan).clone()),
394394
})?;
395395

396-
let mut build_res = select_interpreter.create_new_pipeline().await?;
396+
let mut build_res = select_interpreter.execute2().await?;
397397

398398
let mut sink_pipeline_builder = SinkPipeBuilder::create();
399399
for _ in 0..build_res.main_pipeline.output_len() {
@@ -506,12 +506,12 @@ impl AsyncInsertManager {
506506
let blocks = Arc::new(Mutex::new(VecDeque::from_iter(
507507
data.entries.iter().map(|x| x.block.clone()),
508508
)));
509-
let source = BlocksSource::create(ctx, output_port.clone(), blocks)?;
509+
let source = BlocksSource::create(ctx.clone(), output_port.clone(), blocks)?;
510510
let mut builder = SourcePipeBuilder::create();
511511
builder.add_source(output_port.clone(), source);
512512

513513
interpreter.set_source_pipe_builder(Some(builder))?;
514-
interpreter.execute().await?;
514+
interpreter.execute(ctx).await?;
515515
Ok(())
516516
}
517517

src/query/service/src/interpreters/interpreter.rs

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,24 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use common_catalog::table_context::TableContext;
18+
use common_datavalues::DataSchema;
1519
use common_datavalues::DataSchemaRef;
1620
use common_datavalues::DataSchemaRefExt;
1721
use common_exception::ErrorCode;
1822
use common_exception::Result;
23+
use common_streams::DataBlockStream;
1924
use common_streams::SendableDataBlockStream;
2025

26+
use crate::interpreters::ProcessorExecutorStream;
27+
use crate::pipelines::executor::ExecutorSettings;
28+
use crate::pipelines::executor::PipelineCompleteExecutor;
29+
use crate::pipelines::executor::PipelinePullingExecutor;
2130
use crate::pipelines::PipelineBuildResult;
2231
use crate::pipelines::SourcePipeBuilder;
32+
use crate::sessions::QueryContext;
2333

2434
#[async_trait::async_trait]
2535
/// Interpreter is a trait for different PlanNode
@@ -34,18 +44,52 @@ pub trait Interpreter: Sync + Send {
3444
}
3545

3646
/// The core of the databend processor which will execute the logical plan and get the DataBlock
37-
async fn execute(&self) -> Result<SendableDataBlockStream>;
47+
async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
48+
let mut build_res = self.execute2().await?;
3849

39-
/// Create the new pipeline for databend's new execution model
40-
/// Currently databend is developing a new execution model with a hybrid pull-based & push-based strategy
41-
/// The method now only is implemented by SelectInterpreter
42-
async fn create_new_pipeline(&self) -> Result<PipelineBuildResult> {
43-
Err(ErrorCode::UnImplement(format!(
44-
"UnImplement create_new_pipeline method for {:?}",
45-
self.name()
46-
)))
50+
if build_res.main_pipeline.pipes.is_empty() {
51+
return Ok(Box::pin(DataBlockStream::create(
52+
self.schema(),
53+
None,
54+
vec![],
55+
)));
56+
}
57+
58+
let settings = ctx.get_settings();
59+
let query_need_abort = ctx.query_need_abort();
60+
let executor_settings = ExecutorSettings::try_create(&settings)?;
61+
build_res.set_max_threads(settings.get_max_threads()? as usize);
62+
63+
if build_res.main_pipeline.is_complete_pipeline()? {
64+
let mut pipelines = build_res.sources_pipelines;
65+
pipelines.push(build_res.main_pipeline);
66+
67+
let complete_executor = PipelineCompleteExecutor::from_pipelines(
68+
query_need_abort,
69+
pipelines,
70+
executor_settings,
71+
)?;
72+
73+
complete_executor.execute()?;
74+
return Ok(Box::pin(DataBlockStream::create(
75+
Arc::new(DataSchema::new(vec![])),
76+
None,
77+
vec![],
78+
)));
79+
}
80+
81+
Ok(Box::pin(Box::pin(ProcessorExecutorStream::create(
82+
PipelinePullingExecutor::from_pipelines(
83+
ctx.query_need_abort(),
84+
build_res,
85+
executor_settings,
86+
)?,
87+
)?)))
4788
}
4889

90+
/// The core of the databend processor which will execute the logical plan and build the pipeline
91+
async fn execute2(&self) -> Result<PipelineBuildResult>;
92+
4993
/// Do some start work for the interpreter
5094
/// Such as query counts, query start time and etc
5195
async fn start(&self) -> Result<()> {

src/query/service/src/interpreters/interpreter_call.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,9 @@ use std::sync::RwLock;
1818
use common_datavalues::DataSchemaRef;
1919
use common_exception::Result;
2020
use common_legacy_planners::CallPlan;
21-
use common_pipeline_core::Pipeline;
22-
use common_streams::SendableDataBlockStream;
2321

2422
use super::Interpreter;
25-
use crate::interpreters::ProcessorExecutorStream;
26-
use crate::pipelines::executor::ExecutorSettings;
27-
use crate::pipelines::executor::PipelinePullingExecutor;
23+
use crate::pipelines::PipelineBuildResult;
2824
use crate::procedures::ProcedureFactory;
2925
use crate::sessions::QueryContext;
3026
use crate::sessions::TableContext;
@@ -61,7 +57,7 @@ impl Interpreter for CallInterpreter {
6157
}
6258

6359
#[tracing::instrument(level = "debug", name = "call_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
64-
async fn execute(&self) -> Result<SendableDataBlockStream> {
60+
async fn execute2(&self) -> Result<PipelineBuildResult> {
6561
let plan = &self.plan;
6662

6763
let name = plan.name.clone();
@@ -72,18 +68,14 @@ impl Interpreter for CallInterpreter {
7268
*schema = Some(last_schema);
7369
}
7470

75-
let mut pipeline = Pipeline::create();
76-
func.eval(self.ctx.clone(), plan.args.clone(), &mut pipeline)
77-
.await?;
71+
let mut build_res = PipelineBuildResult::create();
72+
func.eval(
73+
self.ctx.clone(),
74+
plan.args.clone(),
75+
&mut build_res.main_pipeline,
76+
)
77+
.await?;
7878

79-
let ctx = &self.ctx;
80-
let settings = ctx.get_settings();
81-
let query_need_abort = ctx.query_need_abort();
82-
pipeline.set_max_threads(settings.get_max_threads()? as usize);
83-
let executor_settings = ExecutorSettings::try_create(&settings)?;
84-
let executor =
85-
PipelinePullingExecutor::try_create(query_need_abort, pipeline, executor_settings)?;
86-
87-
Ok(Box::pin(ProcessorExecutorStream::create(executor)?))
79+
Ok(build_res)
8880
}
8981
}

src/query/service/src/interpreters/interpreter_cluster_key_alter.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@ use common_exception::Result;
1818
use common_legacy_planners::AlterTableClusterKeyPlan;
1919
use common_meta_types::GrantObject;
2020
use common_meta_types::UserPrivilegeType;
21-
use common_streams::DataBlockStream;
22-
use common_streams::SendableDataBlockStream;
2321

2422
use super::Interpreter;
23+
use crate::pipelines::PipelineBuildResult;
2524
use crate::sessions::QueryContext;
2625
use crate::sessions::TableContext;
2726

@@ -42,7 +41,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
4241
"AlterTableClusterKeyInterpreter"
4342
}
4443

45-
async fn execute(&self) -> Result<SendableDataBlockStream> {
44+
async fn execute2(&self) -> Result<PipelineBuildResult> {
4645
let plan = &self.plan;
4746
self.ctx
4847
.get_current_session()
@@ -68,10 +67,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
6867
table
6968
.alter_table_cluster_keys(self.ctx.clone(), &self.plan.catalog, cluster_key_str)
7069
.await?;
71-
Ok(Box::pin(DataBlockStream::create(
72-
self.plan.schema(),
73-
None,
74-
vec![],
75-
)))
70+
71+
Ok(PipelineBuildResult::create())
7672
}
7773
}

src/query/service/src/interpreters/interpreter_cluster_key_drop.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@ use common_exception::Result;
1818
use common_legacy_planners::DropTableClusterKeyPlan;
1919
use common_meta_types::GrantObject;
2020
use common_meta_types::UserPrivilegeType;
21-
use common_streams::DataBlockStream;
22-
use common_streams::SendableDataBlockStream;
2321

2422
use super::Interpreter;
23+
use crate::pipelines::PipelineBuildResult;
2524
use crate::sessions::QueryContext;
2625
use crate::sessions::TableContext;
2726

@@ -42,7 +41,7 @@ impl Interpreter for DropTableClusterKeyInterpreter {
4241
"DropTableClusterKeyInterpreter"
4342
}
4443

45-
async fn execute(&self) -> Result<SendableDataBlockStream> {
44+
async fn execute2(&self) -> Result<PipelineBuildResult> {
4645
let plan = &self.plan;
4746
self.ctx
4847
.get_current_session()
@@ -66,10 +65,7 @@ impl Interpreter for DropTableClusterKeyInterpreter {
6665
table
6766
.drop_table_cluster_keys(self.ctx.clone(), &self.plan.catalog)
6867
.await?;
69-
Ok(Box::pin(DataBlockStream::create(
70-
self.plan.schema(),
71-
None,
72-
vec![],
73-
)))
68+
69+
Ok(PipelineBuildResult::create())
7470
}
7571
}

0 commit comments

Comments
 (0)