Skip to content

Commit 15c8faf

Browse files
committed
refactor(interpreter): move execute into trait
1 parent c0acdc6 commit 15c8faf

16 files changed

+125
-92
lines changed

src/query/service/src/interpreters/async_insert_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,12 +506,12 @@ impl AsyncInsertManager {
506506
let blocks = Arc::new(Mutex::new(VecDeque::from_iter(
507507
data.entries.iter().map(|x| x.block.clone()),
508508
)));
509-
let source = BlocksSource::create(ctx, output_port.clone(), blocks)?;
509+
let source = BlocksSource::create(ctx.clone(), output_port.clone(), blocks)?;
510510
let mut builder = SourcePipeBuilder::create();
511511
builder.add_source(output_port.clone(), source);
512512

513513
interpreter.set_source_pipe_builder(Some(builder))?;
514-
interpreter.execute().await?;
514+
interpreter.execute(ctx).await?;
515515
Ok(())
516516
}
517517

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::sync::Arc;
2+
use common_catalog::table_context::TableContext;
3+
use common_pipeline_core::{Pipe, SourcePipeBuilder};
4+
use crate::interpreters::InterpreterFactory;
5+
use crate::optimizers::Optimizers;
6+
use crate::sessions::QueryContext;
7+
use crate::sql::PlanParser;
8+
use common_exception::{ErrorCode, Result};
9+
use crate::pipelines::executor::{ExecutorSettings, PipelineCompleteExecutor};
10+
use crate::pipelines::PipelineBuildResult;
11+
12+
fn execute_complete(ctx: Arc<QueryContext>, build_res: PipelineBuildResult) -> Result<()> {
13+
if !build_res.main_pipeline.is_complete_pipeline()? {
14+
return Err(ErrorCode::LogicalError(""));
15+
}
16+
17+
let mut pipelines = build_res.sources_pipelines;
18+
pipelines.push(build_res.main_pipeline);
19+
20+
let settings = ctx.get_settings();
21+
let query_need_abort = ctx.query_need_abort();
22+
let executor_settings = ExecutorSettings::try_create(&settings)?;
23+
PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?
24+
.execute()
25+
}
26+
27+
pub async fn execute_in(ctx: Arc<QueryContext>, query: &str, pipe: Pipe) -> Result<()> {
28+
match pipe {
29+
Pipe::ResizePipe { .. } => Err(ErrorCode::LogicalError("")),
30+
Pipe::SimplePipe { processors, inputs_port, outputs_port } => {
31+
let plan = PlanParser::parse(ctx.clone(), query).await?;
32+
let optimized = Optimizers::create(ctx.clone()).optimize(&plan)?;
33+
let interpreter = InterpreterFactory::get(ctx.clone(), optimized)?;
34+
35+
let mut source_pipe_builder = SourcePipeBuilder::create();
36+
for (index, processor) in processors.into_iter().enumerate() {
37+
source_pipe_builder.add_source(outputs_port[index].clone(), processor);
38+
}
39+
40+
interpreter.set_source_pipe_builder(Some(source_pipe_builder))?;
41+
execute_complete(ctx, interpreter.execute2().await?)
42+
}
43+
}
44+
}
45+
46+
pub async fn execute(ctx: Arc<QueryContext>, query: &str) -> Result<()> {
47+
let plan = PlanParser::parse(ctx.clone(), query).await?;
48+
let optimized = Optimizers::create(ctx.clone()).optimize(&plan)?;
49+
let interpreter = InterpreterFactory::get(ctx.clone(), optimized)?;
50+
51+
match interpreter.execute2().await?.main_pipeline.pipes.is_empty() {
52+
true => Ok(()),
53+
false => Err(ErrorCode::LogicalError(""))
54+
}
55+
}

src/query/service/src/interpreters/interpreter.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,19 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use common_datavalues::DataSchemaRef;
15+
use std::sync::Arc;
16+
use common_catalog::table_context::TableContext;
17+
use common_datavalues::{DataSchema, DataSchemaRef};
1618
use common_datavalues::DataSchemaRefExt;
1719
use common_exception::ErrorCode;
1820
use common_exception::Result;
19-
use common_streams::SendableDataBlockStream;
21+
use common_streams::{DataBlockStream, SendableDataBlockStream};
22+
use crate::interpreters::ProcessorExecutorStream;
23+
use crate::pipelines::executor::{ExecutorSettings, PipelineCompleteExecutor, PipelinePullingExecutor};
2024

2125
use crate::pipelines::PipelineBuildResult;
2226
use crate::pipelines::SourcePipeBuilder;
27+
use crate::sessions::QueryContext;
2328

2429
#[async_trait::async_trait]
2530
/// Interpreter is a trait for different PlanNode
@@ -34,8 +39,38 @@ pub trait Interpreter: Sync + Send {
3439
}
3540

3641
/// The core of the databend processor which will execute the logical plan and get the DataBlock
37-
async fn execute(&self) -> Result<SendableDataBlockStream> {
38-
unimplemented!()
42+
async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
43+
let build_res = self.execute2().await?;
44+
45+
let settings = ctx.get_settings();
46+
let query_need_abort = ctx.query_need_abort();
47+
let executor_settings = ExecutorSettings::try_create(&settings)?;
48+
49+
if build_res.main_pipeline.is_complete_pipeline()? {
50+
let mut pipelines = build_res.sources_pipelines;
51+
pipelines.push(build_res.main_pipeline);
52+
53+
let complete_executor = PipelineCompleteExecutor::from_pipelines(
54+
query_need_abort,
55+
pipelines,
56+
executor_settings,
57+
)?;
58+
59+
complete_executor.execute()?;
60+
return Ok(Box::pin(DataBlockStream::create(
61+
Arc::new(DataSchema::new(vec![])),
62+
None,
63+
vec![],
64+
)));
65+
}
66+
67+
Ok(Box::pin(Box::pin(ProcessorExecutorStream::create(
68+
PipelinePullingExecutor::from_pipelines(
69+
ctx.query_need_abort(),
70+
build_res,
71+
executor_settings,
72+
)?,
73+
)?)))
3974
}
4075

4176
/// The core of the databend processor which will execute the logical plan and build the pipeline

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -275,30 +275,6 @@ impl Interpreter for CopyInterpreterV2 {
275275
}
276276

277277
#[tracing::instrument(level = "debug", name = "copy_interpreter_execute_v2", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
278-
async fn execute(&self) -> Result<SendableDataBlockStream> {
279-
let build_res = self.execute2().await?;
280-
281-
let settings = self.ctx.get_settings();
282-
let query_need_abort = self.ctx.query_need_abort();
283-
let executor_settings = ExecutorSettings::try_create(&settings)?;
284-
285-
let mut pipelines = build_res.sources_pipelines;
286-
pipelines.push(build_res.main_pipeline);
287-
288-
let executor = PipelineCompleteExecutor::from_pipelines(
289-
query_need_abort,
290-
pipelines,
291-
executor_settings,
292-
)?;
293-
294-
executor.execute()?;
295-
Ok(Box::pin(DataBlockStream::create(
296-
Arc::new(DataSchema::new(vec![])),
297-
None,
298-
vec![],
299-
)))
300-
}
301-
302278
async fn execute2(&self) -> Result<PipelineBuildResult> {
303279
match &self.plan {
304280
// TODO(xuanwo): extract them as a separate function.

src/query/service/src/interpreters/interpreter_factory_interceptor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl Interpreter for InterceptorInterpreter {
7373
self.inner.schema()
7474
}
7575

76-
async fn execute(&self) -> Result<SendableDataBlockStream> {
76+
async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
7777
// Management mode access check.
7878
match &self.new_plan {
7979
Some(p) => self.management_mode_access.check_new(p)?,
@@ -83,7 +83,7 @@ impl Interpreter for InterceptorInterpreter {
8383
let _ = self
8484
.inner
8585
.set_source_pipe_builder((*self.source_pipe_builder.lock()).clone());
86-
let result_stream = match self.inner.execute().await {
86+
let result_stream = match self.inner.execute(ctx).await {
8787
Ok(s) => s,
8888
Err(e) => {
8989
self.ctx.set_error(e.clone());

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ impl Interpreter for InsertInterpreter {
8585
"InsertIntoInterpreter"
8686
}
8787

88-
async fn execute(&self) -> Result<SendableDataBlockStream> {
89-
let build_res = self.execute2().await?;
90-
91-
execute_pipeline(self.ctx.clone(), build_res)?;
92-
Ok(Box::pin(DataBlockStream::create(self.plan.schema(), None, vec![])))
93-
}
94-
9588
async fn execute2(&self) -> Result<PipelineBuildResult> {
9689
let plan = &self.plan;
9790
let settings = self.ctx.get_settings();

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,6 @@ impl Interpreter for InsertInterpreterV2 {
8383
"InsertIntoInterpreter"
8484
}
8585

86-
async fn execute(&self) -> Result<SendableDataBlockStream> {
87-
let build_res = self.execute2().await?;
88-
execute_pipeline(self.ctx.clone(), build_res)?;
89-
90-
// Ok(PipelineBuildResult::create())
91-
unimplemented!()
92-
}
93-
9486
async fn execute2(&self) -> Result<PipelineBuildResult> {
9587
let plan = &self.plan;
9688
let settings = self.ctx.get_settings();

src/query/service/src/interpreters/interpreter_select.rs

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -51,23 +51,6 @@ impl SelectInterpreter {
5151
&self.select.input,
5252
)
5353
}
54-
55-
async fn build_pipeline(&self) -> Result<PipelineBuildResult> {
56-
match self.ctx.get_cluster().is_empty() {
57-
true => {
58-
let settings = self.ctx.get_settings();
59-
let builder = QueryPipelineBuilder::create(self.ctx.clone());
60-
let mut query_pipeline = builder.finalize(&self.rewrite_plan()?)?;
61-
query_pipeline.set_max_threads(settings.get_max_threads()? as usize);
62-
Ok(query_pipeline)
63-
}
64-
false => {
65-
let ctx = self.ctx.clone();
66-
let optimized_plan = self.rewrite_plan()?;
67-
plan_schedulers::schedule_query_new(ctx, &optimized_plan).await
68-
}
69-
}
70-
}
7154
}
7255

7356
#[async_trait::async_trait]
@@ -86,22 +69,20 @@ impl Interpreter for SelectInterpreter {
8669
/// Currently, the method has two sets of logic, if `get_enable_new_processor_framework` is turned on in the settings,
8770
/// the execution will use the new processor, otherwise the old processing logic will be executed.
8871
/// Note: there is an issue to track the progress of the new processor: https://github.com/datafuselabs/databend/issues/3379
89-
async fn execute(&self) -> Result<SendableDataBlockStream> {
90-
let build_res = self.build_pipeline().await?;
91-
let query_need_abort = self.ctx.query_need_abort();
92-
let executor_settings = ExecutorSettings::try_create(&self.ctx.get_settings())?;
93-
Ok(Box::pin(ProcessorExecutorStream::create(
94-
PipelinePullingExecutor::from_pipelines(
95-
query_need_abort,
96-
build_res,
97-
executor_settings,
98-
)?,
99-
)?))
100-
}
101-
102-
/// This method will create a new pipeline
103-
/// The QueryPipelineBuilder will use the optimized plan to generate a Pipeline
10472
async fn execute2(&self) -> Result<PipelineBuildResult> {
105-
self.build_pipeline().await
73+
match self.ctx.get_cluster().is_empty() {
74+
true => {
75+
let settings = self.ctx.get_settings();
76+
let builder = QueryPipelineBuilder::create(self.ctx.clone());
77+
let mut query_pipeline = builder.finalize(&self.rewrite_plan()?)?;
78+
query_pipeline.set_max_threads(settings.get_max_threads()? as usize);
79+
Ok(query_pipeline)
80+
}
81+
false => {
82+
let ctx = self.ctx.clone();
83+
let optimized_plan = self.rewrite_plan()?;
84+
plan_schedulers::schedule_query_new(ctx, &optimized_plan).await
85+
}
86+
}
10687
}
10788
}

src/query/service/src/interpreters/interpreter_select_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl Interpreter for SelectInterpreterV2 {
9191
}
9292

9393
#[tracing::instrument(level = "debug", name = "select_interpreter_v2_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
94-
async fn execute(&self) -> Result<SendableDataBlockStream> {
94+
async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
9595
let build_res = self.build_pipeline().await?;
9696

9797
// WTF: We need to implement different logic for the HTTP handler

src/query/service/src/interpreters/interpreter_table_create_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl CreateTableInterpreterV2 {
160160
};
161161
let insert_interpreter_v2 =
162162
InsertInterpreterV2::try_create(self.ctx.clone(), insert_plan, false)?;
163-
insert_interpreter_v2.execute().await?;
163+
insert_interpreter_v2.execute(self.ctx.clone()).await?;
164164

165165
Ok(PipelineBuildResult::create())
166166
}

0 commit comments

Comments
 (0)