
refactor: Integrate the materialized CTE into the plan and pipeline #18226


Draft: wants to merge 42 commits into main
Commits (42)
e880453
add MaterializedCTE plan
SkyFan2002 Jun 19, 2025
466e163
build pipeline
SkyFan2002 Jun 19, 2025
db16044
build pipeline
SkyFan2002 Jun 20, 2025
26bb0ab
add operator
SkyFan2002 Jun 23, 2025
7e986a9
remove m cte temp table
SkyFan2002 Jun 23, 2025
e4ee842
bind
SkyFan2002 Jun 23, 2025
307809c
Merge remote-tracking branch 'upstream/main' into cte_plan
SkyFan2002 Jun 23, 2025
e5d7472
fix
SkyFan2002 Jun 23, 2025
9a53ba2
remove unused field
SkyFan2002 Jun 29, 2025
ff73950
fix bind
SkyFan2002 Jun 30, 2025
5197134
fix schema
SkyFan2002 Jun 30, 2025
ba5be42
fix
SkyFan2002 Jun 30, 2025
7690dbf
make lint
SkyFan2002 Jun 30, 2025
1f22235
Merge branch 'main' into cte_plan
SkyFan2002 Jun 30, 2025
f8f4d7a
fix
SkyFan2002 Jun 30, 2025
4c75ded
fix join
SkyFan2002 Jun 30, 2025
5bb786c
Merge branch 'main' into cte_plan
SkyFan2002 Jun 30, 2025
5a4c0ca
Merge branch 'main' into cte_plan
SkyFan2002 Jun 30, 2025
cc89312
fix
SkyFan2002 Jun 30, 2025
291204a
refine explain
SkyFan2002 Jun 30, 2025
4835fb8
fix
SkyFan2002 Jul 1, 2025
046bd04
fix
SkyFan2002 Jul 2, 2025
4979e0b
fix
SkyFan2002 Jul 2, 2025
af0eeb7
fix
SkyFan2002 Jul 2, 2025
67c2bc3
fix
SkyFan2002 Jul 3, 2025
9a8eb3b
fix
SkyFan2002 Jul 8, 2025
30aa9f3
fix
SkyFan2002 Jul 9, 2025
e39af8e
fix
SkyFan2002 Jul 9, 2025
7b5b406
Merge branch 'main' into cte_plan
SkyFan2002 Jul 9, 2025
a39a569
CleanupUnusedCTE
SkyFan2002 Jul 9, 2025
3686686
fix
SkyFan2002 Jul 10, 2025
fb7bfbd
fix
SkyFan2002 Jul 10, 2025
e600f31
fix
SkyFan2002 Jul 10, 2025
5e0752a
fix
SkyFan2002 Jul 10, 2025
4606b12
refine
SkyFan2002 Jul 10, 2025
70df77d
refine
SkyFan2002 Jul 10, 2025
e38c70f
make lint
SkyFan2002 Jul 10, 2025
6829408
fix
SkyFan2002 Jul 10, 2025
bc4efde
add log
SkyFan2002 Jul 13, 2025
b5ccca9
fix
SkyFan2002 Jul 13, 2025
2f897f4
fix
SkyFan2002 Jul 13, 2025
cf7e7f9
make lint
SkyFan2002 Jul 13, 2025
4 changes: 0 additions & 4 deletions src/query/catalog/src/table_context.rs
@@ -405,10 +405,6 @@ pub trait TableContext: Send + Sync {
fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool;
fn get_shared_settings(&self) -> Arc<Settings>;

fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);

async fn drop_m_cte_temp_table(&self) -> Result<()>;

fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) {
unimplemented!()
}
2 changes: 0 additions & 2 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -33,7 +33,6 @@ use log::info;

use crate::interpreters::common::metrics_inc_compact_hook_compact_time_ms;
use crate::interpreters::common::metrics_inc_compact_hook_main_operation_time_ms;
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::Interpreter;
@@ -188,7 +187,6 @@ async fn compact_table(
let query_ctx = ctx.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_info: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
3 changes: 0 additions & 3 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -38,7 +38,6 @@ use databend_storages_common_table_meta::meta::Location;
use log::info;
use parking_lot::RwLock;

use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::Interpreter;
@@ -127,7 +126,6 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
let query_ctx = ctx_cloned.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
@@ -164,7 +162,6 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
let query_ctx = ctx_cloned.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_info: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
8 changes: 0 additions & 8 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
@@ -105,11 +105,3 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc<QueryContext>) -> Result<()> {

Ok(())
}

pub fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
let _ = GlobalIORuntime::instance().block_on(async move {
query_ctx.drop_m_cte_temp_table().await?;
Ok(())
});
Ok(())
}
2 changes: 0 additions & 2 deletions src/query/service/src/interpreters/interpreter.rs
@@ -49,7 +49,6 @@ use log::info;
use md5::Digest;
use md5::Md5;

use super::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use super::hook::vacuum_hook::hook_disk_temp_dir;
use super::hook::vacuum_hook::hook_vacuum_temp_files;
use super::InterpreterMetrics;
@@ -363,7 +362,6 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
);
}

hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;

@@ -74,7 +74,6 @@ use crate::interpreters::common::table_option_validation::is_valid_data_retentio
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::InsertInterpreter;
@@ -283,7 +282,6 @@ impl CreateTableInterpreter {
pipeline
.main_pipeline
.set_on_finished(always_callback(move |_: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
@@ -68,7 +68,6 @@ use derive_visitor::DriveMut;
use log::error;
use log::warn;

use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::interpreter_insert_multi_table::scalar_expr_to_remote_expr;
@@ -247,7 +246,6 @@ impl ReclusterTableInterpreter {
ctx.evict_table_from_cache(&catalog, &database, &table)?;

ctx.unload_spill_meta();
hook_clear_m_cte_temp_table(&ctx)?;
hook_vacuum_temp_files(&ctx)?;
hook_disk_temp_dir(&ctx)?;
match &info.res {
1 change: 0 additions & 1 deletion src/query/service/src/interpreters/mod.rs
@@ -174,7 +174,6 @@ mod util;

pub use access::ManagementModeAccess;
pub use common::InterpreterQueryLog;
pub use hook::vacuum_hook::hook_clear_m_cte_temp_table;
pub use hook::HookOperator;
pub use interpreter::interpreter_plan_sql;
pub use interpreter::Interpreter;
56 changes: 56 additions & 0 deletions src/query/service/src/pipelines/builders/builder_cte_consumer.rs
@@ -0,0 +1,56 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_sql::executor::physical_plans::CTEConsumer;
use databend_common_storages_fuse::TableContext;

use crate::pipelines::processors::transforms::CTESource;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
pub(crate) fn build_cte_consumer(&mut self, cte: &CTEConsumer) -> Result<()> {
let receiver = self
.cte_receivers
.get(&cte.cte_name)
.ok_or_else(|| {
ErrorCode::Internal(format!("CTE receiver not found for name: {}", cte.cte_name))
})?
.clone();

let mut next_cte_consumer_id = self.next_cte_consumer_id.lock();
let current_consumer_id = *next_cte_consumer_id.get(&cte.cte_name).ok_or_else(|| {
ErrorCode::Internal(format!(
"CTE consumer id not found for name: {}",
cte.cte_name
))
})?;

next_cte_consumer_id.insert(cte.cte_name.clone(), current_consumer_id + 1);

self.main_pipeline.add_source(
|output_port| {
CTESource::create(
self.ctx.clone(),
output_port.clone(),
receiver.clone(),
current_consumer_id,
)
},
self.ctx.get_settings().get_max_threads()? as usize,
)?;
Ok(())
}
}
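
A simplified sketch of the consumer-id bookkeeping above, not the PR's code: the actual builder registers each entry in `build_materialized_cte` and returns an internal error if it is missing, and `parking_lot::Mutex` is assumed here to match the surrounding crate.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::Mutex;

/// Hand out a distinct, monotonically increasing id per CTE name.
/// The map is shared behind Arc<Mutex<..>> so the main builder and the
/// sub-builders it spawns draw from the same sequence, giving every
/// CTESource of the same CTE a unique consumer id.
fn next_consumer_id(ids: &Arc<Mutex<HashMap<String, usize>>>, cte_name: &str) -> usize {
    let mut ids = ids.lock();
    // Simplification: or_insert(0) here; the real builder errors out instead
    // when build_materialized_cte has not registered the name yet.
    let entry = ids.entry(cte_name.to_string()).or_insert(0);
    let current = *entry;
    *entry += 1;
    current
}

fn main() {
    let ids = Arc::new(Mutex::new(HashMap::new()));
    assert_eq!(next_consumer_id(&ids, "cte_a"), 0);
    assert_eq!(next_consumer_id(&ids, "cte_a"), 1);
    assert_eq!(next_consumer_id(&ids, "cte_b"), 0);
}
```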
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -41,6 +41,8 @@ impl PipelineBuilder {
let mut sub_builder =
PipelineBuilder::create(self.func_ctx.clone(), self.settings.clone(), sub_context);
sub_builder.hash_join_states = self.hash_join_states.clone();
sub_builder.cte_receivers = self.cte_receivers.clone();
sub_builder.next_cte_consumer_id = self.next_cte_consumer_id.clone();
sub_builder
}

@@ -0,0 +1,61 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_sql::executor::physical_plans::MaterializedCTE;

use crate::pipelines::processors::transforms::MaterializedCteData;
use crate::pipelines::processors::transforms::MaterializedCteSink;
use crate::pipelines::PipelineBuilder;
use crate::sessions::QueryContext;
impl PipelineBuilder {
pub(crate) fn build_materialized_cte(&mut self, cte: &MaterializedCTE) -> Result<()> {
// init builder for cte pipeline
let sub_context = QueryContext::create_from(self.ctx.as_ref());
let mut sub_builder =
PipelineBuilder::create(self.func_ctx.clone(), self.settings.clone(), sub_context);
sub_builder.cte_receivers = self.cte_receivers.clone();
sub_builder.next_cte_consumer_id = self.next_cte_consumer_id.clone();

// build cte pipeline
let mut build_res = sub_builder.finalize(&cte.left)?;
let input_schema = cte.left.output_schema()?;
Self::build_result_projection(
&self.func_ctx,
input_schema,
&cte.cte_output_columns,
&mut build_res.main_pipeline,
false,
)?;
build_res.main_pipeline.try_resize(1)?;
let (tx, rx) = tokio::sync::watch::channel(Arc::new(MaterializedCteData::default()));
self.cte_receivers.insert(cte.cte_name.clone(), rx);
self.next_cte_consumer_id
.lock()
.insert(cte.cte_name.clone(), 0);
build_res
.main_pipeline
.add_sink(|input| MaterializedCteSink::create(input, tx.clone()))?;

// add cte pipeline to pipelines
self.pipelines.push(build_res.main_pipeline);
self.pipelines.extend(build_res.sources_pipelines);

// build main pipeline
self.build_pipeline(&cte.right)?;
Ok(())
}
}
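
The hand-off between `build_materialized_cte` and `build_cte_consumer` rests on `tokio::sync::watch`: the CTE pipeline's sink publishes the materialized result once, and every consumer pipeline reads it through its own clone of the receiver registered under the CTE name. Below is a minimal, self-contained sketch of that single-producer / multi-consumer pattern; the names are illustrative only, `Vec<i64>` stands in for `MaterializedCteData`, and the tokio `rt-multi-thread` and `macros` features are assumed.

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // The channel starts with an empty placeholder, like
    // Arc::new(MaterializedCteData::default()) in the builder above.
    let (tx, rx) = tokio::sync::watch::channel(Arc::new(Vec::<i64>::new()));

    // Each CTE consumer pipeline holds its own clone of the receiver.
    let mut consumers = Vec::new();
    for id in 0..2usize {
        let mut rx = rx.clone();
        consumers.push(tokio::spawn(async move {
            // Block until the producer publishes the materialized result.
            rx.changed().await.expect("sender dropped before sending");
            let rows = Arc::clone(&rx.borrow());
            println!("consumer {id} read {} rows", rows.len());
        }));
    }

    // The sink at the end of the materialized-CTE pipeline plays the producer:
    // it sends the collected blocks exactly once.
    tx.send(Arc::new(vec![1_i64, 2, 3])).expect("all receivers dropped");

    for c in consumers {
        c.await.unwrap();
    }
}
```

Because a watch channel retains only the latest value, every consumer observes the same Arc-shared result no matter how many CTEConsumer nodes reference the CTE, while the CTE itself is evaluated exactly once.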
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/builders/mod.rs
@@ -22,6 +22,7 @@ mod builder_commit;
mod builder_compact;
mod builder_copy_into_location;
mod builder_copy_into_table;
mod builder_cte_consumer;
mod builder_distributed_insert_select;
mod builder_exchange;
mod builder_fill_missing_columns;
@@ -30,6 +31,7 @@ mod builder_hilbert_partition;
mod builder_insert_multi_table;
mod builder_join;
mod builder_limit;
mod builder_materialized_cte;
mod builder_mutation;
mod builder_mutation_manipulate;
mod builder_mutation_organize;
10 changes: 10 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -27,9 +27,12 @@ use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_settings::Settings;
use databend_common_sql::executor::PhysicalPlan;
use parking_lot::Mutex;
use tokio::sync::watch::Receiver;

use super::PipelineBuilderData;
use crate::interpreters::CreateTableInterpreter;
use crate::pipelines::processors::transforms::MaterializedCteData;
use crate::pipelines::processors::HashJoinBuildState;
use crate::pipelines::processors::HashJoinState;
use crate::pipelines::PipelineBuildResult;
@@ -57,6 +60,8 @@ pub struct PipelineBuilder {
pub(crate) is_exchange_stack: Vec<bool>,

pub contain_sink_processor: bool,
pub cte_receivers: HashMap<String, Receiver<Arc<MaterializedCteData>>>,
pub next_cte_consumer_id: Arc<Mutex<HashMap<String, usize>>>,
}

impl PipelineBuilder {
@@ -78,6 +83,8 @@ impl PipelineBuilder {
r_cte_scan_interpreters: vec![],
contain_sink_processor: false,
is_exchange_stack: vec![],
cte_receivers: HashMap::new(),
next_cte_consumer_id: Arc::new(Mutex::new(HashMap::new())),
}
}

@@ -295,6 +302,9 @@ impl PipelineBuilder {
PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal(
"Invalid physical plan with PhysicalPlan::Exchange",
)),

PhysicalPlan::MaterializedCTE(cte) => self.build_materialized_cte(cte),
PhysicalPlan::CTEConsumer(cte_consumer) => self.build_cte_consumer(cte_consumer),
}?;

self.is_exchange_stack.pop();