diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 734f8319a02ee..cd9302575fb86 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -405,10 +405,6 @@ pub trait TableContext: Send + Sync { fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool; fn get_shared_settings(&self) -> Arc; - fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str); - - async fn drop_m_cte_temp_table(&self) -> Result<()>; - fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) { unimplemented!() } diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index e2112c19dae18..2b719b0fd454a 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -33,7 +33,6 @@ use log::info; use crate::interpreters::common::metrics_inc_compact_hook_compact_time_ms; use crate::interpreters::common::metrics_inc_compact_hook_main_operation_time_ms; -use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table; use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir; use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; use crate::interpreters::Interpreter; @@ -188,7 +187,6 @@ async fn compact_table( let query_ctx = ctx.clone(); build_res.main_pipeline.set_on_finished(always_callback( move |_info: &ExecutionInfo| { - hook_clear_m_cte_temp_table(&query_ctx)?; hook_vacuum_temp_files(&query_ctx)?; hook_disk_temp_dir(&query_ctx)?; Ok(()) diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 67c444f6b43d7..54cc9526edfa9 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -38,7 +38,6 @@ use databend_storages_common_table_meta::meta::Location; use log::info; use parking_lot::RwLock; -use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table; use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir; use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; use crate::interpreters::Interpreter; @@ -127,7 +126,6 @@ async fn do_refresh(ctx: Arc, desc: RefreshDesc) -> Result<()> { let query_ctx = ctx_cloned.clone(); build_res.main_pipeline.set_on_finished(always_callback( move |_: &ExecutionInfo| { - hook_clear_m_cte_temp_table(&query_ctx)?; hook_vacuum_temp_files(&query_ctx)?; hook_disk_temp_dir(&query_ctx)?; Ok(()) @@ -164,7 +162,6 @@ async fn do_refresh(ctx: Arc, desc: RefreshDesc) -> Result<()> { let query_ctx = ctx_cloned.clone(); build_res.main_pipeline.set_on_finished(always_callback( move |_info: &ExecutionInfo| { - hook_clear_m_cte_temp_table(&query_ctx)?; hook_vacuum_temp_files(&query_ctx)?; hook_disk_temp_dir(&query_ctx)?; Ok(()) diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index c3eed71fc475e..a0c1cf396ef79 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -105,11 +105,3 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc) -> Result<()> { Ok(()) } - -pub fn hook_clear_m_cte_temp_table(query_ctx: &Arc) -> Result<()> { - let _ = GlobalIORuntime::instance().block_on(async move { - query_ctx.drop_m_cte_temp_table().await?; - Ok(()) - }); - Ok(()) -} diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 1d747ba85ad4b..c3c0a98c79113 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -49,7 +49,6 @@ use log::info; use md5::Digest; use md5::Md5; -use super::hook::vacuum_hook::hook_clear_m_cte_temp_table; use super::hook::vacuum_hook::hook_disk_temp_dir; use super::hook::vacuum_hook::hook_vacuum_temp_files; use super::InterpreterMetrics; @@ -363,7 +362,6 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) ); } - hook_clear_m_cte_temp_table(&query_ctx)?; hook_vacuum_temp_files(&query_ctx)?; hook_disk_temp_dir(&query_ctx)?; diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs index de7c27cdd1777..bd859b90edce7 100644 --- a/src/query/service/src/interpreters/interpreter_table_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_create.rs @@ -74,7 +74,6 @@ use crate::interpreters::common::table_option_validation::is_valid_data_retentio use crate::interpreters::common::table_option_validation::is_valid_option_of_type; use crate::interpreters::common::table_option_validation::is_valid_random_seed; use crate::interpreters::common::table_option_validation::is_valid_row_per_block; -use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table; use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir; use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; use crate::interpreters::InsertInterpreter; @@ -283,7 +282,6 @@ impl CreateTableInterpreter { pipeline .main_pipeline .set_on_finished(always_callback(move |_: &ExecutionInfo| { - hook_clear_m_cte_temp_table(&query_ctx)?; hook_vacuum_temp_files(&query_ctx)?; hook_disk_temp_dir(&query_ctx)?; Ok(()) diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index f3c53597b06d7..3c27d2ba621f9 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -68,7 +68,6 @@ use derive_visitor::DriveMut; use log::error; use log::warn; -use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table; use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir; use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; use crate::interpreters::interpreter_insert_multi_table::scalar_expr_to_remote_expr; @@ -247,7 +246,6 @@ impl ReclusterTableInterpreter { ctx.evict_table_from_cache(&catalog, &database, &table)?; ctx.unload_spill_meta(); - hook_clear_m_cte_temp_table(&ctx)?; hook_vacuum_temp_files(&ctx)?; hook_disk_temp_dir(&ctx)?; match &info.res { diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index d221ac3d8c6b8..7311fff3cd7f5 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -174,7 +174,6 @@ mod util; pub use access::ManagementModeAccess; pub use common::InterpreterQueryLog; -pub use hook::vacuum_hook::hook_clear_m_cte_temp_table; pub use hook::HookOperator; pub use interpreter::interpreter_plan_sql; pub use interpreter::Interpreter; diff --git a/src/query/service/src/pipelines/builders/builder_cte_consumer.rs b/src/query/service/src/pipelines/builders/builder_cte_consumer.rs new file mode 100644 index 0000000000000..1fa78f91fbc7d --- /dev/null +++ b/src/query/service/src/pipelines/builders/builder_cte_consumer.rs @@ -0,0 +1,56 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_sql::executor::physical_plans::CTEConsumer; +use databend_common_storages_fuse::TableContext; + +use crate::pipelines::processors::transforms::CTESource; +use crate::pipelines::PipelineBuilder; + +impl PipelineBuilder { + pub(crate) fn build_cte_consumer(&mut self, cte: &CTEConsumer) -> Result<()> { + let receiver = self + .cte_receivers + .get(&cte.cte_name) + .ok_or_else(|| { + ErrorCode::Internal(format!("CTE receiver not found for name: {}", cte.cte_name)) + })? + .clone(); + + let mut next_cte_consumer_id = self.next_cte_consumer_id.lock(); + let current_consumer_id = *next_cte_consumer_id.get(&cte.cte_name).ok_or_else(|| { + ErrorCode::Internal(format!( + "CTE consumer id not found for name: {}", + cte.cte_name + )) + })?; + + next_cte_consumer_id.insert(cte.cte_name.clone(), current_consumer_id + 1); + + self.main_pipeline.add_source( + |output_port| { + CTESource::create( + self.ctx.clone(), + output_port.clone(), + receiver.clone(), + current_consumer_id, + ) + }, + self.ctx.get_settings().get_max_threads()? as usize, + )?; + Ok(()) + } +} diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index 7937b11b3344c..09b1b9a7bb3d7 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -41,6 +41,8 @@ impl PipelineBuilder { let mut sub_builder = PipelineBuilder::create(self.func_ctx.clone(), self.settings.clone(), sub_context); sub_builder.hash_join_states = self.hash_join_states.clone(); + sub_builder.cte_receivers = self.cte_receivers.clone(); + sub_builder.next_cte_consumer_id = self.next_cte_consumer_id.clone(); sub_builder } diff --git a/src/query/service/src/pipelines/builders/builder_materialized_cte.rs b/src/query/service/src/pipelines/builders/builder_materialized_cte.rs new file mode 100644 index 0000000000000..e90efe0b06163 --- /dev/null +++ b/src/query/service/src/pipelines/builders/builder_materialized_cte.rs @@ -0,0 +1,61 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_sql::executor::physical_plans::MaterializedCTE; + +use crate::pipelines::processors::transforms::MaterializedCteData; +use crate::pipelines::processors::transforms::MaterializedCteSink; +use crate::pipelines::PipelineBuilder; +use crate::sessions::QueryContext; +impl PipelineBuilder { + pub(crate) fn build_materialized_cte(&mut self, cte: &MaterializedCTE) -> Result<()> { + // init builder for cte pipeline + let sub_context = QueryContext::create_from(self.ctx.as_ref()); + let mut sub_builder = + PipelineBuilder::create(self.func_ctx.clone(), self.settings.clone(), sub_context); + sub_builder.cte_receivers = self.cte_receivers.clone(); + sub_builder.next_cte_consumer_id = self.next_cte_consumer_id.clone(); + + // build cte pipeline + let mut build_res = sub_builder.finalize(&cte.left)?; + let input_schema = cte.left.output_schema()?; + Self::build_result_projection( + &self.func_ctx, + input_schema, + &cte.cte_output_columns, + &mut build_res.main_pipeline, + false, + )?; + build_res.main_pipeline.try_resize(1)?; + let (tx, rx) = tokio::sync::watch::channel(Arc::new(MaterializedCteData::default())); + self.cte_receivers.insert(cte.cte_name.clone(), rx); + self.next_cte_consumer_id + .lock() + .insert(cte.cte_name.clone(), 0); + build_res + .main_pipeline + .add_sink(|input| MaterializedCteSink::create(input, tx.clone()))?; + + // add cte pipeline to pipelines + self.pipelines.push(build_res.main_pipeline); + self.pipelines.extend(build_res.sources_pipelines); + + // build main pipeline + self.build_pipeline(&cte.right)?; + Ok(()) + } +} diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index 552580bc7c467..ba86e21fd6dc0 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -22,6 +22,7 @@ mod builder_commit; mod builder_compact; mod builder_copy_into_location; mod builder_copy_into_table; +mod builder_cte_consumer; mod builder_distributed_insert_select; mod builder_exchange; mod builder_fill_missing_columns; @@ -30,6 +31,7 @@ mod builder_hilbert_partition; mod builder_insert_multi_table; mod builder_join; mod builder_limit; +mod builder_materialized_cte; mod builder_mutation; mod builder_mutation_manipulate; mod builder_mutation_organize; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index e2e2b1c30a001..c3b909d72075c 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -27,9 +27,12 @@ use databend_common_pipeline_core::ExecutionInfo; use databend_common_pipeline_core::Pipeline; use databend_common_settings::Settings; use databend_common_sql::executor::PhysicalPlan; +use parking_lot::Mutex; +use tokio::sync::watch::Receiver; use super::PipelineBuilderData; use crate::interpreters::CreateTableInterpreter; +use crate::pipelines::processors::transforms::MaterializedCteData; use crate::pipelines::processors::HashJoinBuildState; use crate::pipelines::processors::HashJoinState; use crate::pipelines::PipelineBuildResult; @@ -57,6 +60,8 @@ pub struct PipelineBuilder { pub(crate) is_exchange_stack: Vec, pub contain_sink_processor: bool, + pub cte_receivers: HashMap>>, + pub next_cte_consumer_id: Arc>>, } impl PipelineBuilder { @@ -78,6 +83,8 @@ impl PipelineBuilder { r_cte_scan_interpreters: vec![], contain_sink_processor: false, is_exchange_stack: vec![], + cte_receivers: HashMap::new(), + next_cte_consumer_id: Arc::new(Mutex::new(HashMap::new())), } } @@ -295,6 +302,9 @@ impl PipelineBuilder { PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal( "Invalid physical plan with PhysicalPlan::Exchange", )), + + PhysicalPlan::MaterializedCTE(cte) => self.build_materialized_cte(cte), + PhysicalPlan::CTEConsumer(cte_consumer) => self.build_cte_consumer(cte_consumer), }?; self.is_exchange_stack.pop(); diff --git a/src/query/service/src/pipelines/processors/transforms/materialized_cte.rs b/src/query/service/src/pipelines/processors/transforms/materialized_cte.rs new file mode 100644 index 0000000000000..092ae0ac94366 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/materialized_cte.rs @@ -0,0 +1,136 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use databend_common_pipeline_sinks::Sink; +use databend_common_pipeline_sinks::Sinker; +use databend_common_pipeline_sources::AsyncSource; +use databend_common_pipeline_sources::AsyncSourcer; +use tokio::sync::watch::Receiver; +use tokio::sync::watch::Sender; + +pub struct MaterializedCteSink { + sender: Sender>, + blocks: Vec, +} + +#[derive(Default)] +pub struct MaterializedCteData { + blocks: Vec, + // consumer_id -> current_index + consumer_states: Arc>>, +} + +impl MaterializedCteData { + pub fn new(blocks: Vec) -> Self { + Self { + blocks, + consumer_states: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn get_next_block(&self, consumer_id: usize) -> Option { + let mut states = self.consumer_states.lock().unwrap(); + let current_index = states.get(&consumer_id).copied().unwrap_or(0); + + if current_index < self.blocks.len() { + let block = self.blocks[current_index].clone(); + states.insert(consumer_id, current_index + 1); + Some(block) + } else { + None + } + } +} + +impl MaterializedCteSink { + pub fn create( + input: Arc, + sender: Sender>, + ) -> Result { + Ok(ProcessorPtr::create(Sinker::create(input, Self { + blocks: vec![], + sender, + }))) + } +} + +impl Sink for MaterializedCteSink { + const NAME: &'static str = "MaterializedCteSink"; + + fn consume(&mut self, data_block: DataBlock) -> Result<()> { + self.blocks.push(data_block); + Ok(()) + } + + fn on_finish(&mut self) -> Result<()> { + self.sender + .send(Arc::new(MaterializedCteData::new(self.blocks.clone()))) + .map_err(|_| { + ErrorCode::Internal("Failed to send blocks to materialized cte consumer") + })?; + Ok(()) + } +} + +pub struct CTESource { + receiver: Receiver>, + data: Option>, + consumer_id: usize, +} + +impl CTESource { + pub fn create( + ctx: Arc, + output_port: Arc, + receiver: Receiver>, + consumer_id: usize, + ) -> Result { + AsyncSourcer::create(ctx, output_port, Self { + receiver, + data: None, + consumer_id, + }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for CTESource { + const NAME: &'static str = "CTEConsumerSource"; + + #[async_backtrace::framed] + async fn generate(&mut self) -> Result> { + if self.data.is_none() { + self.receiver.changed().await.map_err(|_| { + ErrorCode::Internal("Failed to get data from receiver in CTEConsumerSource") + })?; + self.data = Some(self.receiver.borrow().clone()); + } + + if let Some(data) = &self.data { + if let Some(block) = data.get_next_block(self.consumer_id) { + return Ok(Some(block)); + } + } + Ok(None) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 7776ca90e1780..2a619c974aaf4 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -16,6 +16,7 @@ pub mod aggregator; #[allow(dead_code)] mod broadcast; mod hash_join; +mod materialized_cte; pub(crate) mod range_join; mod runtime_pool; mod transform_add_computed_columns; @@ -45,6 +46,9 @@ mod window; pub use broadcast::BroadcastSinkProcessor; pub use broadcast::BroadcastSourceProcessor; pub use hash_join::*; +pub use materialized_cte::CTESource; +pub use materialized_cte::MaterializedCteData; +pub use materialized_cte::MaterializedCteSink; pub use transform_add_computed_columns::TransformAddComputedColumns; pub use transform_add_const_columns::TransformAddConstColumns; pub use transform_add_internal_columns::TransformAddInternalColumns; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs index c7ae2cd8c73fa..8f61f51e61778 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs @@ -329,6 +329,10 @@ async fn create_memory_table_for_cte_scan( PhysicalPlan::AsyncFunction(plan) => { create_memory_table_for_cte_scan(ctx, plan.input.as_ref()).await?; } + PhysicalPlan::MaterializedCTE(plan) => { + create_memory_table_for_cte_scan(ctx, plan.left.as_ref()).await?; + create_memory_table_for_cte_scan(ctx, plan.right.as_ref()).await?; + } PhysicalPlan::TableScan(_) | PhysicalPlan::ConstantTableScan(_) | PhysicalPlan::ExpressionScan(_) @@ -361,6 +365,7 @@ async fn create_memory_table_for_cte_scan( | PhysicalPlan::ChunkMerge(_) | PhysicalPlan::ChunkCommitInsert(_) | PhysicalPlan::BroadcastSource(_) + | PhysicalPlan::CTEConsumer(_) | PhysicalPlan::BroadcastSink(_) => {} } Ok(()) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 2e7a6e878b819..29b36f051b3ce 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_meta_types::NodeInfo; +use databend_common_sql::executor::physical_plans::CTEConsumer; use databend_common_sql::executor::physical_plans::CompactSource; use databend_common_sql::executor::physical_plans::ConstantTableScan; use databend_common_sql::executor::physical_plans::CopyIntoTable; @@ -26,6 +27,7 @@ use databend_common_sql::executor::physical_plans::ExchangeSink; use databend_common_sql::executor::physical_plans::ExchangeSource; use databend_common_sql::executor::physical_plans::FragmentKind; use databend_common_sql::executor::physical_plans::HashJoin; +use databend_common_sql::executor::physical_plans::MaterializedCTE; use databend_common_sql::executor::physical_plans::MutationSource; use databend_common_sql::executor::physical_plans::Recluster; use databend_common_sql::executor::physical_plans::ReplaceInto; @@ -67,6 +69,7 @@ enum State { Compact, Recluster, Other, + CTEConsumer, } impl Fragmenter { @@ -160,6 +163,11 @@ impl PhysicalPlanReplacer for Fragmenter { Ok(PhysicalPlan::TableScan(plan.clone())) } + fn replace_cte_consumer(&mut self, plan: &CTEConsumer) -> Result { + self.state = State::CTEConsumer; + Ok(PhysicalPlan::CTEConsumer(Box::new(plan.clone()))) + } + fn replace_constant_table_scan(&mut self, plan: &ConstantTableScan) -> Result { self.state = State::SelectLeaf; Ok(PhysicalPlan::ConstantTableScan(plan.clone())) @@ -310,6 +318,7 @@ impl PhysicalPlanReplacer for Fragmenter { State::ReplaceInto => FragmentType::ReplaceInto, State::Compact => FragmentType::Compact, State::Recluster => FragmentType::Recluster, + State::CTEConsumer => FragmentType::Intermediate, }; self.state = State::Other; let exchange = Self::get_exchange(self.ctx.clone(), &plan)?; @@ -341,4 +350,23 @@ impl PhysicalPlanReplacer for Fragmenter { source_fragment_id, })) } + + fn replace_materialized_cte(&mut self, plan: &MaterializedCTE) -> Result { + let mut fragments = vec![]; + let left = self.replace(plan.left.as_ref())?; + + fragments.append(&mut self.fragments); + let right = self.replace(plan.right.as_ref())?; + fragments.append(&mut self.fragments); + + self.fragments = fragments; + Ok(PhysicalPlan::MaterializedCTE(Box::new(MaterializedCTE { + plan_id: plan.plan_id, + left: Box::new(left), + right: Box::new(right), + stat_info: plan.stat_info.clone(), + cte_name: plan.cte_name.clone(), + cte_output_columns: plan.cte_output_columns.clone(), + }))) + } } diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs index ea2dccf3d2379..9c0d5d42f888e 100644 --- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs +++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs @@ -130,6 +130,13 @@ impl QueryFragmentsActions { pub fn get_root_fragment_ids(&self) -> Result> { let mut fragment_ids = Vec::new(); for fragment_actions in &self.fragments_actions { + if fragment_actions.fragment_actions.is_empty() { + return Err(ErrorCode::Internal(format!( + "Fragment actions is empty for fragment_id: {}", + fragment_actions.fragment_id + ))); + } + let plan = &fragment_actions.fragment_actions[0].physical_plan; if !matches!(plan, PhysicalPlan::ExchangeSink(_)) { fragment_ids.push(fragment_actions.fragment_id); diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 4c07eddf12b62..3a1d21488ce95 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -791,9 +791,6 @@ impl HttpQuery { .with_context(|| "failed to start query") .flatten() { - crate::interpreters::hook_clear_m_cte_temp_table(&query_context) - .inspect_err(|e| warn!("clear_m_cte_temp_table fail: {e}")) - .ok(); let state = ExecuteStopped { stats: Progresses::default(), schema: vec![], diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index ea4e9f744fffc..51e9018842023 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -89,7 +89,6 @@ use databend_common_meta_app::principal::UserPrivilegeType; use databend_common_meta_app::principal::COPY_MAX_FILES_COMMIT_MSG; use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT; use databend_common_meta_app::schema::CatalogType; -use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::GetTableCopiedFileReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::storage::StorageParams; @@ -121,14 +120,12 @@ use databend_common_storages_stream::stream_table::StreamTable; use databend_common_users::GrantObjectVisibilityChecker; use databend_common_users::UserApiProvider; use databend_common_version::DATABEND_COMMIT_VERSION; -use databend_storages_common_session::drop_table_by_id; use databend_storages_common_session::SessionState; use databend_storages_common_session::TxnManagerRef; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SnapshotTimestampValidationContext; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::meta::TableSnapshot; -use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX; use jiff::tz::TimeZone; use jiff::Zoned; use log::debug; @@ -168,9 +165,6 @@ pub struct QueryContext { fragment_id: Arc, // Used by synchronized generate aggregating indexes when new data written. written_segment_locs: Arc>>, - // Temp table for materialized CTE, first string is the database_name, second string is the table_name - // All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it. - m_cte_temp_table: Arc>>, } impl QueryContext { @@ -196,7 +190,6 @@ impl QueryContext { fragment_id: Arc::new(AtomicUsize::new(0)), written_segment_locs: Default::default(), block_threshold: Default::default(), - m_cte_temp_table: Default::default(), }) } @@ -1875,44 +1868,6 @@ impl TableContext for QueryContext { .is_temp_table(database_name, table_name) } - fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str) { - self.m_cte_temp_table - .write() - .push((database_name.to_string(), table_name.to_string())); - } - - async fn drop_m_cte_temp_table(&self) -> Result<()> { - let temp_tbl_mgr = self.shared.session.session_ctx.temp_tbl_mgr(); - let m_cte_temp_table = self.m_cte_temp_table.read().clone(); - let tenant = self.get_tenant(); - for (db_name, table_name) in m_cte_temp_table.iter() { - let table = self.get_table(CATALOG_DEFAULT, db_name, table_name).await?; - let db = self - .get_catalog(CATALOG_DEFAULT) - .await? - .get_database(&tenant, db_name) - .await?; - let drop_table_req = DropTableByIdReq { - if_exists: true, - tenant: tenant.clone(), - tb_id: table.get_table_info().ident.table_id, - table_name: table_name.to_string(), - db_id: db.get_db_info().database_id.db_id, - db_name: db.name().to_string(), - engine: table.engine().to_string(), - temp_prefix: table - .options() - .get(OPT_KEY_TEMP_PREFIX) - .cloned() - .unwrap_or_default(), - }; - drop_table_by_id(temp_tbl_mgr.clone(), drop_table_req).await?; - } - let mut m_cte_temp_table = self.m_cte_temp_table.write(); - m_cte_temp_table.clear(); - Ok(()) - } - fn add_streams_ref(&self, catalog: &str, database: &str, stream: &str, consume: bool) { let mut streams = self.shared.streams_refs.write(); let stream_key = ( diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 81c123a701730..0f51b72876704 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -994,14 +994,6 @@ impl TableContext for CtxDelegation { false } - fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) { - todo!() - } - - async fn drop_m_cte_temp_table(&self) -> Result<()> { - todo!() - } - fn set_cluster(&self, _: Arc) { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index f354105c06a89..bc51dcd27fe48 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -872,13 +872,6 @@ impl TableContext for CtxDelegation { fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool { false } - fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) { - todo!() - } - - async fn drop_m_cte_temp_table(&self) -> Result<()> { - todo!() - } fn set_cluster(&self, _: Arc) { todo!() diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index 684607c500e4e..0fa30d2531459 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -527,6 +527,32 @@ fn to_format_tree( PhysicalPlan::BroadcastSink(_plan) => { Ok(FormatTreeNode::new("RuntimeFilterSink".to_string())) } + PhysicalPlan::MaterializedCTE(plan) => { + let mut children = Vec::new(); + append_profile_info(&mut children, profs, plan.plan_id); + children.push(to_format_tree(&plan.left, metadata, profs, context)?); + children.push(to_format_tree(&plan.right, metadata, profs, context)?); + Ok(FormatTreeNode::with_children( + format!("MaterializedCTE: {}", plan.cte_name), + children, + )) + } + PhysicalPlan::CTEConsumer(plan) => { + let mut children = Vec::new(); + children.push(FormatTreeNode::new(format!( + "cte_name: {}", + plan.cte_name.clone() + ))); + children.push(FormatTreeNode::new(format!( + "cte_schema: [{}]", + format_output_columns(plan.cte_schema.clone(), metadata, false) + ))); + append_profile_info(&mut children, profs, plan.plan_id); + Ok(FormatTreeNode::with_children( + "CTEConsumer".to_string(), + children, + )) + } } } diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 33f29fabf5cd1..7899b6aca9633 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -37,6 +37,7 @@ use crate::executor::physical_plans::AggregateExpand; use crate::executor::physical_plans::AggregateFinal; use crate::executor::physical_plans::AggregatePartial; use crate::executor::physical_plans::AsyncFunction; +use crate::executor::physical_plans::CTEConsumer; use crate::executor::physical_plans::CacheScan; use crate::executor::physical_plans::ChunkAppendData; use crate::executor::physical_plans::ChunkCastSchema; @@ -62,6 +63,7 @@ use crate::executor::physical_plans::ExpressionScan; use crate::executor::physical_plans::Filter; use crate::executor::physical_plans::HashJoin; use crate::executor::physical_plans::Limit; +use crate::executor::physical_plans::MaterializedCTE; use crate::executor::physical_plans::Mutation; use crate::executor::physical_plans::ProjectSet; use crate::executor::physical_plans::RangeJoin; @@ -161,6 +163,9 @@ pub enum PhysicalPlan { // broadcast BroadcastSource(BroadcastSource), BroadcastSink(BroadcastSink), + + MaterializedCTE(Box), + CTEConsumer(Box), } impl PhysicalPlan { @@ -422,6 +427,16 @@ impl PhysicalPlan { *next_id += 1; plan.input.adjust_plan_id(next_id); } + PhysicalPlan::MaterializedCTE(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + plan.left.adjust_plan_id(next_id); + plan.right.adjust_plan_id(next_id); + } + PhysicalPlan::CTEConsumer(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + } } } @@ -480,6 +495,8 @@ impl PhysicalPlan { PhysicalPlan::RecursiveCteScan(v) => v.plan_id, PhysicalPlan::BroadcastSource(v) => v.plan_id, PhysicalPlan::BroadcastSink(v) => v.plan_id, + PhysicalPlan::MaterializedCTE(v) => v.plan_id, + PhysicalPlan::CTEConsumer(v) => v.plan_id, } } @@ -537,6 +554,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkAppendData(_) => todo!(), PhysicalPlan::ChunkMerge(_) => todo!(), PhysicalPlan::ChunkCommitInsert(_) => todo!(), + PhysicalPlan::MaterializedCTE(plan) => plan.output_schema(), + PhysicalPlan::CTEConsumer(plan) => plan.output_schema(), } } @@ -600,6 +619,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkCommitInsert(_) => "Commit".to_string(), PhysicalPlan::BroadcastSource(_) => "RuntimeFilterSource".to_string(), PhysicalPlan::BroadcastSink(_) => "RuntimeFilterSink".to_string(), + PhysicalPlan::MaterializedCTE(_) => "MaterializedCTE".to_string(), + PhysicalPlan::CTEConsumer(_) => "CTEConsumer".to_string(), } } @@ -613,6 +634,7 @@ impl PhysicalPlan { | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) | PhysicalPlan::RecursiveCteScan(_) + | PhysicalPlan::CTEConsumer(_) | PhysicalPlan::BroadcastSource(_) => Box::new(std::iter::empty()), PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())), @@ -674,6 +696,9 @@ impl PhysicalPlan { CopyIntoTableSource::Query(v) => Box::new(std::iter::once(v.as_ref())), CopyIntoTableSource::Stage(v) => Box::new(std::iter::once(v.as_ref())), }, + PhysicalPlan::MaterializedCTE(plan) => Box::new( + std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), + ), } } @@ -687,6 +712,7 @@ impl PhysicalPlan { | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) | PhysicalPlan::BroadcastSource(_) + | PhysicalPlan::CTEConsumer(_) | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_mut())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_mut())), @@ -747,6 +773,9 @@ impl PhysicalPlan { CopyIntoTableSource::Query(v) => Box::new(std::iter::once(v.as_mut())), CopyIntoTableSource::Stage(v) => Box::new(std::iter::once(v.as_mut())), }, + PhysicalPlan::MaterializedCTE(plan) => Box::new( + std::iter::once(plan.left.as_mut()).chain(std::iter::once(plan.right.as_mut())), + ), PhysicalPlan::BroadcastSink(plan) => Box::new(std::iter::once(plan.input.as_mut())), } } @@ -805,7 +834,9 @@ impl PhysicalPlan { | PhysicalPlan::ChunkMerge(_) | PhysicalPlan::ChunkCommitInsert(_) | PhysicalPlan::BroadcastSource(_) - | PhysicalPlan::BroadcastSink(_) => None, + | PhysicalPlan::BroadcastSink(_) + | PhysicalPlan::MaterializedCTE(_) + | PhysicalPlan::CTEConsumer(_) => None, } } diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index c50adc9f30db4..aa948994a22b5 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -129,6 +129,13 @@ impl PhysicalPlanBuilder { self.build_mutation_source(mutation_source).await } RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await, + RelOperator::MaterializedCTE(materialized_cte) => { + self.build_materialized_cte(s_expr, materialized_cte, stat_info, required) + .await + } + RelOperator::CTEConsumer(cte_consumer) => { + self.build_cte_consumer(cte_consumer, stat_info).await + } } } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 7267b7665e370..a98cedc17f71a 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -29,6 +29,7 @@ use crate::executor::physical_plans::AggregateExpand; use crate::executor::physical_plans::AggregateFinal; use crate::executor::physical_plans::AggregatePartial; use crate::executor::physical_plans::AsyncFunction; +use crate::executor::physical_plans::CTEConsumer; use crate::executor::physical_plans::ChunkAppendData; use crate::executor::physical_plans::ChunkCastSchema; use crate::executor::physical_plans::ChunkCommitInsert; @@ -52,6 +53,7 @@ use crate::executor::physical_plans::ExchangeSource; use crate::executor::physical_plans::Filter; use crate::executor::physical_plans::HashJoin; use crate::executor::physical_plans::Limit; +use crate::executor::physical_plans::MaterializedCTE; use crate::executor::physical_plans::Mutation; use crate::executor::physical_plans::MutationSource; use crate::executor::physical_plans::ProjectSet; @@ -124,6 +126,8 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::ChunkCommitInsert(plan) => self.replace_chunk_commit_insert(plan), PhysicalPlan::BroadcastSource(plan) => self.replace_runtime_filter_source(plan), PhysicalPlan::BroadcastSink(plan) => self.replace_runtime_filter_sink(plan), + PhysicalPlan::MaterializedCTE(plan) => self.replace_materialized_cte(plan), + PhysicalPlan::CTEConsumer(plan) => self.replace_cte_consumer(plan), } } @@ -643,6 +647,23 @@ pub trait PhysicalPlanReplacer { }, ))) } + + fn replace_materialized_cte(&mut self, plan: &MaterializedCTE) -> Result { + let left = self.replace(&plan.left)?; + let right = self.replace(&plan.right)?; + Ok(PhysicalPlan::MaterializedCTE(Box::new(MaterializedCTE { + plan_id: plan.plan_id, + left: Box::new(left), + right: Box::new(right), + stat_info: plan.stat_info.clone(), + cte_name: plan.cte_name.clone(), + cte_output_columns: plan.cte_output_columns.clone(), + }))) + } + + fn replace_cte_consumer(&mut self, plan: &CTEConsumer) -> Result { + Ok(PhysicalPlan::CTEConsumer(Box::new(plan.clone()))) + } } impl PhysicalPlan { @@ -666,6 +687,7 @@ impl PhysicalPlan { | PhysicalPlan::ExchangeSource(_) | PhysicalPlan::CompactSource(_) | PhysicalPlan::MutationSource(_) + | PhysicalPlan::CTEConsumer(_) | PhysicalPlan::BroadcastSource(_) => {} PhysicalPlan::Filter(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); @@ -795,6 +817,10 @@ impl PhysicalPlan { PhysicalPlan::BroadcastSink(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } + PhysicalPlan::MaterializedCTE(plan) => { + Self::traverse(&plan.left, pre_visit, visit, post_visit); + Self::traverse(&plan.right, pre_visit, visit, post_visit); + } } post_visit(plan); } diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index 2f2afb69d3368..db1051333ba03 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -38,6 +38,7 @@ mod physical_hash_join; mod physical_join; mod physical_join_filter; mod physical_limit; +mod physical_materialized_cte; mod physical_multi_table_insert; mod physical_mutation; mod physical_mutation_into_organize; @@ -60,6 +61,7 @@ mod physical_union_all; mod physical_window; mod physical_window_partition; +mod physical_cte_consumer; pub use common::*; pub use physical_add_stream_column::AddStreamColumn; pub use physical_aggregate_expand::AggregateExpand; @@ -77,6 +79,7 @@ pub use physical_compact_source::CompactSource; pub use physical_constant_table_scan::ConstantTableScan; pub use physical_copy_into_location::CopyIntoLocation; pub use physical_copy_into_table::*; +pub use physical_cte_consumer::*; pub use physical_distributed_insert_select::DistributedInsertSelect; pub use physical_eval_scalar::EvalScalar; pub use physical_exchange::Exchange; @@ -90,6 +93,7 @@ pub use physical_join_filter::JoinRuntimeFilter; pub use physical_join_filter::PhysicalRuntimeFilter; pub use physical_join_filter::PhysicalRuntimeFilters; pub use physical_limit::Limit; +pub use physical_materialized_cte::MaterializedCTE; pub use physical_multi_table_insert::*; pub use physical_mutation::*; pub use physical_mutation_into_organize::MutationOrganize; diff --git a/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs b/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs new file mode 100644 index 0000000000000..cfb418c43fae2 --- /dev/null +++ b/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::DataSchemaRef; + +use crate::executor::explain::PlanStatsInfo; +use crate::executor::PhysicalPlan; +use crate::executor::PhysicalPlanBuilder; + +/// This is a leaf operator that consumes the result of a materialized CTE. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct CTEConsumer { + // A unique id of operator in a `PhysicalPlan` tree, only used for display. + pub plan_id: u32, + // Only used for explain + pub stat_info: Option, + pub cte_name: String, + pub cte_schema: DataSchemaRef, +} + +impl CTEConsumer { + pub fn output_schema(&self) -> Result { + Ok(self.cte_schema.clone()) + } +} + +impl PhysicalPlanBuilder { + pub(crate) async fn build_cte_consumer( + &mut self, + cte_consumer: &crate::plans::CTEConsumer, + stat_info: PlanStatsInfo, + ) -> Result { + Ok(PhysicalPlan::CTEConsumer(Box::new(CTEConsumer { + plan_id: 0, + stat_info: Some(stat_info), + cte_name: cte_consumer.cte_name.clone(), + cte_schema: cte_consumer.cte_schema.clone(), + }))) + } +} diff --git a/src/query/sql/src/executor/physical_plans/physical_materialized_cte.rs b/src/query/sql/src/executor/physical_plans/physical_materialized_cte.rs new file mode 100644 index 0000000000000..fc61ae7ab3bea --- /dev/null +++ b/src/query/sql/src/executor/physical_plans/physical_materialized_cte.rs @@ -0,0 +1,73 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::DataSchemaRef; + +use crate::executor::explain::PlanStatsInfo; +use crate::executor::PhysicalPlan; +use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::ir::SExpr; +use crate::ColumnBinding; +use crate::ColumnSet; + +/// This is a binary operator that executes its children in order (left to right), and returns the results of the right child +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MaterializedCTE { + // A unique id of operator in a `PhysicalPlan` tree, only used for display. + pub plan_id: u32, + // Only used for explain + pub stat_info: Option, + pub left: Box, + pub right: Box, + pub cte_name: String, + pub cte_output_columns: Vec, +} + +impl MaterializedCTE { + pub fn output_schema(&self) -> Result { + self.right.output_schema() + } +} + +impl PhysicalPlanBuilder { + pub(crate) async fn build_materialized_cte( + &mut self, + s_expr: &SExpr, + materialized_cte: &crate::plans::MaterializedCTE, + stat_info: PlanStatsInfo, + required: ColumnSet, + ) -> Result { + let left_side = Box::new( + self.build( + s_expr.child(0)?, + materialized_cte + .cte_output_columns + .iter() + .map(|c| c.index) + .collect(), + ) + .await?, + ); + let right_side = Box::new(self.build(s_expr.child(1)?, required).await?); + Ok(PhysicalPlan::MaterializedCTE(Box::new(MaterializedCTE { + plan_id: 0, + stat_info: Some(stat_info), + left: left_side, + right: right_side, + cte_name: materialized_cte.cte_name.clone(), + cte_output_columns: materialized_cte.cte_output_columns.clone(), + }))) + } +} diff --git a/src/query/sql/src/planner/binder/bind_context.rs b/src/query/sql/src/planner/binder/bind_context.rs index 2cf026e08983e..b0c1c7a7d6608 100644 --- a/src/query/sql/src/planner/binder/bind_context.rs +++ b/src/query/sql/src/planner/binder/bind_context.rs @@ -196,8 +196,6 @@ pub struct CteInfo { pub query: Query, pub materialized: bool, pub recursive: bool, - pub cte_idx: IndexType, - // If cte is materialized, save its columns pub columns: Vec, } diff --git a/src/query/sql/src/planner/binder/bind_query/bind.rs b/src/query/sql/src/planner/binder/bind_query/bind.rs index 46867955cb746..9090af1ae1bd3 100644 --- a/src/query/sql/src/planner/binder/bind_query/bind.rs +++ b/src/query/sql/src/planner/binder/bind_query/bind.rs @@ -15,30 +15,16 @@ use std::collections::HashMap; use std::sync::Arc; -use databend_common_ast::ast::ColumnDefinition; -use databend_common_ast::ast::CreateOption; -use databend_common_ast::ast::CreateTableSource; -use databend_common_ast::ast::CreateTableStmt; -use databend_common_ast::ast::Engine; use databend_common_ast::ast::Expr; -use databend_common_ast::ast::Identifier; use databend_common_ast::ast::Query; use databend_common_ast::ast::SetExpr; use databend_common_ast::ast::TableReference; -use databend_common_ast::ast::TableType; use databend_common_ast::ast::With; -use databend_common_ast::ast::CTE; -use databend_common_ast::Span; -use databend_common_catalog::catalog::CATALOG_DEFAULT; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::types::convert_to_type_name; use derive_visitor::Drive; -use derive_visitor::DriveMut; use derive_visitor::Visitor; -use derive_visitor::VisitorMut; -use crate::binder::CteInfo; use crate::normalize_identifier; use crate::optimizer::ir::SExpr; use crate::planner::binder::scalar::ScalarBinder; @@ -82,7 +68,7 @@ impl Binder { } } } - // Initialize cte map. + self.init_cte(bind_context, &with)?; // Extract limit and offset from query. @@ -98,6 +84,10 @@ impl Binder { // Bind limit. s_expr = self.bind_query_limit(query, s_expr, limit, offset); + if let Some(with) = &with { + s_expr = self.bind_materialized_cte(with, s_expr, bind_context.cte_context.clone())?; + } + Ok((s_expr, bind_context)) } @@ -130,52 +120,6 @@ impl Binder { Ok(()) } - // Initialize cte map. - pub(crate) fn init_cte( - &mut self, - bind_context: &mut BindContext, - with: &Option, - ) -> Result<()> { - let with = if let Some(with) = with { - with - } else { - return Ok(()); - }; - - for (idx, cte) in with.ctes.iter().enumerate() { - let table_name = self.normalize_identifier(&cte.alias.name).name; - if bind_context.cte_context.cte_map.contains_key(&table_name) { - return Err(ErrorCode::SemanticError(format!( - "Duplicate common table expression: {table_name}" - ))); - } - let column_name = cte - .alias - .columns - .iter() - .map(|ident| self.normalize_identifier(ident).name) - .collect(); - let cte_info = CteInfo { - columns_alias: column_name, - query: *cte.query.clone(), - recursive: with.recursive, - cte_idx: idx, - columns: vec![], - materialized: cte.materialized, - }; - // If the CTE is materialized, we'll construct a temp table for it. - if cte.materialized { - self.m_cte_to_temp_table(cte, idx, with.clone())?; - } - bind_context - .cte_context - .cte_map - .insert(table_name, cte_info); - } - - Ok(()) - } - pub(crate) fn bind_query_order_by( &mut self, bind_context: &mut BindContext, @@ -243,157 +187,4 @@ impl Binder { Arc::new(child), )) } - - fn m_cte_to_temp_table(&mut self, cte: &CTE, cte_index: usize, mut with: With) -> Result<()> { - let engine = if self.ctx.get_settings().get_persist_materialized_cte()? { - Engine::Fuse - } else { - Engine::Memory - }; - let query_id = self.ctx.get_id(); - let database = self.ctx.get_current_database(); - let mut table_identifier = cte.alias.name.clone(); - table_identifier.name = format!("{}${}", table_identifier.name, query_id.replace("-", "")); - let table_name = normalize_identifier(&table_identifier, &self.name_resolution_ctx).name; - self.m_cte_table_name.insert( - normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name, - table_name.clone(), - ); - if self - .ctx - .is_temp_table(CATALOG_DEFAULT, &database, &table_name) - { - return Err(ErrorCode::Internal(format!( - "Temporary table {:?} already exists in current session, please change the materialized CTE name", - table_name - ))); - } - - let mut expr_replacer = TableNameReplacer::new( - database.clone(), - self.m_cte_table_name.clone(), - self.name_resolution_ctx.clone(), - ); - let mut as_query = cte.query.clone(); - with.ctes.truncate(cte_index); - with.ctes.retain(|cte| !cte.materialized); - as_query.with = if !with.ctes.is_empty() { - Some(with) - } else { - None - }; - as_query.drive_mut(&mut expr_replacer); - - let source = if cte.alias.columns.is_empty() { - None - } else { - let mut bind_context = BindContext::new(); - let (_, bind_context) = self.bind_query(&mut bind_context, &as_query)?; - let columns = &bind_context.columns; - if columns.len() != cte.alias.columns.len() { - return Err(ErrorCode::Internal("Number of columns does not match")); - } - Some(CreateTableSource::Columns( - columns - .iter() - .zip(cte.alias.columns.iter()) - .map(|(column, ident)| { - let data_type = convert_to_type_name(&column.data_type); - ColumnDefinition { - name: ident.clone(), - data_type, - expr: None, - comment: None, - } - }) - .collect(), - None, - )) - }; - - let catalog = self.ctx.get_current_catalog(); - let create_table_stmt = CreateTableStmt { - create_option: CreateOption::Create, - catalog: Some(Identifier::from_name(Span::None, catalog.clone())), - database: Some(Identifier::from_name(Span::None, database.clone())), - table: table_identifier, - source, - engine: Some(engine), - uri_location: None, - cluster_by: None, - table_options: Default::default(), - iceberg_table_partition: None, - table_properties: Default::default(), - as_query: Some(as_query), - table_type: TableType::Temporary, - }; - - let create_table_sql = create_table_stmt.to_string(); - log::info!("[CTE]create_table_sql: {create_table_sql}"); - if let Some(subquery_executor) = &self.subquery_executor { - let _ = databend_common_base::runtime::block_on(async move { - subquery_executor - .execute_query_with_sql_string(&create_table_sql) - .await - })?; - } else { - return Err(ErrorCode::Internal("Binder's Subquery executor is not set")); - }; - - self.ctx.add_m_cte_temp_table(&database, &table_name); - - self.ctx - .evict_table_from_cache(&catalog, &database, &table_name) - } -} - -#[derive(VisitorMut)] -#[visitor(TableReference(enter), Expr(enter))] -pub struct TableNameReplacer { - database: String, - new_name: HashMap, - name_resolution_ctx: NameResolutionContext, -} - -impl TableNameReplacer { - pub fn new( - database: String, - new_name: HashMap, - name_resolution_ctx: NameResolutionContext, - ) -> Self { - Self { - database, - new_name, - name_resolution_ctx, - } - } - - fn replace_identifier(&mut self, identifier: &mut Identifier) { - let name = normalize_identifier(identifier, &self.name_resolution_ctx).name; - if let Some(new_name) = self.new_name.get(&name) { - identifier.name = new_name.clone(); - } - } - - fn enter_table_reference(&mut self, table_reference: &mut TableReference) { - if let TableReference::Table { - database, table, .. - } = table_reference - { - if database.is_none() || database.as_ref().unwrap().name == self.database { - self.replace_identifier(table); - } - } - } - - fn enter_expr(&mut self, expr: &mut Expr) { - if let Expr::ColumnRef { column, .. } = expr { - if column.database.is_none() || column.database.as_ref().unwrap().name == self.database - { - if let Some(table_identifier) = &mut column.table { - self.replace_identifier(table_identifier); - } - } - } - } } diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs new file mode 100644 index 0000000000000..e4d067cd0d40f --- /dev/null +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs @@ -0,0 +1,191 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_ast::ast::Query; +use databend_common_ast::ast::TableAlias; +use databend_common_ast::ast::With; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataField; +use databend_common_expression::DataSchemaRefExt; +use indexmap::IndexMap; + +use crate::binder::BindContext; +use crate::binder::Binder; +use crate::binder::CteContext; +use crate::binder::CteInfo; +use crate::normalize_identifier; +use crate::optimizer::ir::SExpr; +use crate::plans::CTEConsumer; +use crate::plans::MaterializedCTE; +use crate::plans::RelOperator; + +impl Binder { + pub fn init_cte(&mut self, bind_context: &mut BindContext, with: &Option) -> Result<()> { + let Some(with) = with else { + return Ok(()); + }; + for cte in with.ctes.iter() { + let cte_name = self.normalize_identifier(&cte.alias.name).name; + if bind_context.cte_context.cte_map.contains_key(&cte_name) { + return Err(ErrorCode::SemanticError(format!( + "Duplicate common table expression: {cte_name}" + ))); + } + + let column_name = cte + .alias + .columns + .iter() + .map(|ident| self.normalize_identifier(ident).name) + .collect(); + + let cte_info = CteInfo { + columns_alias: column_name, + query: *cte.query.clone(), + recursive: with.recursive, + materialized: cte.materialized, + columns: vec![], + }; + bind_context.cte_context.cte_map.insert(cte_name, cte_info); + } + + Ok(()) + } + pub fn bind_cte_consumer( + &mut self, + bind_context: &mut BindContext, + table_name: &str, + alias: &Option, + cte_info: &CteInfo, + ) -> Result<(SExpr, BindContext)> { + let (s_expr, cte_bind_context) = self.bind_cte_definition( + table_name, + bind_context.cte_context.cte_map.as_ref(), + &cte_info.query, + )?; + + let (table_alias, column_alias) = match alias { + Some(alias) => { + let table_alias = normalize_identifier(&alias.name, &self.name_resolution_ctx).name; + let column_alias = if alias.columns.is_empty() { + cte_info.columns_alias.clone() + } else { + alias + .columns + .iter() + .map(|column| normalize_identifier(column, &self.name_resolution_ctx).name) + .collect() + }; + (table_alias, column_alias) + } + None => (table_name.to_string(), cte_info.columns_alias.clone()), + }; + + if !column_alias.is_empty() && column_alias.len() != cte_bind_context.columns.len() { + return Err(ErrorCode::SemanticError(format!( + "The CTE '{}' has {} columns ({:?}), but {} aliases ({:?}) were provided. Ensure the number of aliases matches the number of columns in the CTE.", + table_name, + cte_bind_context.columns.len(), + cte_bind_context.columns.iter().map(|c| &c.column_name).collect::>(), + column_alias.len(), + column_alias, + ))); + } + + let mut cte_output_columns = cte_bind_context.columns.clone(); + for column in cte_output_columns.iter_mut() { + column.database_name = None; + column.table_name = Some(table_alias.clone()); + } + for (index, column_name) in column_alias.iter().enumerate() { + cte_output_columns[index].column_name = column_name.clone(); + } + + let fields = cte_output_columns + .iter() + .map(|column_binding| { + DataField::new( + &column_binding.index.to_string(), + *column_binding.data_type.clone(), + ) + }) + .collect(); + let cte_schema = DataSchemaRefExt::create(fields); + + let mut new_bind_context = bind_context.clone(); + for column in cte_output_columns { + new_bind_context.add_column_binding(column); + } + + let s_expr = SExpr::create_leaf(Arc::new(RelOperator::CTEConsumer(CTEConsumer { + cte_name: table_name.to_string(), + cte_schema, + def: s_expr, + }))); + Ok((s_expr, new_bind_context)) + } + + pub fn bind_cte_definition( + &mut self, + cte_name: &str, + cte_map: &IndexMap, + query: &Query, + ) -> Result<(SExpr, BindContext)> { + let mut prev_cte_map = Box::new(IndexMap::new()); + for (name, cte_info) in cte_map.iter() { + if name == cte_name { + break; + } + prev_cte_map.insert(name.clone(), cte_info.clone()); + } + let mut cte_bind_context = BindContext { + cte_context: CteContext { + cte_name: Some(cte_name.to_string()), + cte_map: prev_cte_map, + }, + ..Default::default() + }; + let (s_expr, cte_bind_context) = self.bind_query(&mut cte_bind_context, query)?; + Ok((s_expr, cte_bind_context)) + } + + pub fn bind_materialized_cte( + &mut self, + with: &With, + main_query_expr: SExpr, + cte_context: CteContext, + ) -> Result { + let mut current_expr = main_query_expr; + + for cte in with.ctes.iter().rev() { + if cte.materialized { + let cte_name = self.normalize_identifier(&cte.alias.name).name; + let (s_expr, bind_context) = + self.bind_cte_definition(&cte_name, &cte_context.cte_map, &cte.query)?; + + let materialized_cte = MaterializedCTE::new(cte_name, bind_context.columns); + current_expr = SExpr::create_binary( + Arc::new(materialized_cte.into()), + Arc::new(s_expr), + Arc::new(current_expr), + ); + } + } + + Ok(current_expr) + } +} diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs index 7848170697095..d43bc3f7dfbba 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs @@ -34,7 +34,6 @@ use crate::binder::util::TableIdentifier; use crate::binder::Binder; use crate::optimizer::ir::SExpr; use crate::BindContext; - impl Binder { /// Bind a base table. /// A base table is a table that is not a view or CTE. @@ -59,6 +58,15 @@ impl Binder { table_identifier.table_name_alias(), ); + if let Some(cte_name) = &bind_context.cte_context.cte_name { + if cte_name == &table_name { + return Err(ErrorCode::SemanticError(format!( + "The cte {table_name} is not recursive, but it references itself.", + )) + .set_span(*span)); + } + } + let (consume, max_batch_size, with_opts_str) = if let Some(with_options) = with_options { check_with_opt_valid(with_options)?; let consume = get_with_opt_consume(with_options)?; @@ -69,12 +77,10 @@ impl Binder { (false, None, String::new()) }; - // Check and bind common table expression - let mut cte_suffix_name = None; let cte_map = bind_context.cte_context.cte_map.clone(); if let Some(cte_info) = cte_map.get(&table_name) { if cte_info.materialized { - cte_suffix_name = Some(self.ctx.get_id().replace("-", "")); + return self.bind_cte_consumer(bind_context, &table_name, alias, cte_info); } else { if self .metadata @@ -104,11 +110,6 @@ impl Binder { // Resolve table with catalog let table_meta = { - let table_name = if let Some(cte_suffix_name) = cte_suffix_name.as_ref() { - format!("{}${}", &table_name, cte_suffix_name) - } else { - table_name.clone() - }; match self.resolve_data_source( catalog.as_str(), database.as_str(), @@ -161,7 +162,6 @@ impl Binder { bind_context.view_info.is_some(), bind_context.planning_agg_index, false, - None, false, ); let (s_expr, mut bind_context) = self.bind_base_table( @@ -247,7 +247,6 @@ impl Binder { false, false, false, - None, bind_context.allow_virtual_column, ); let (s_expr, mut new_bind_context) = @@ -280,7 +279,6 @@ impl Binder { bind_context.view_info.is_some(), bind_context.planning_agg_index, false, - cte_suffix_name, bind_context.allow_virtual_column, ); diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs index 3cfe23223548f..31c9b8bcbd0df 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs @@ -151,7 +151,6 @@ impl Binder { false, false, false, - None, false, ); @@ -214,7 +213,6 @@ impl Binder { false, false, false, - None, false, ); diff --git a/src/query/sql/src/planner/binder/bind_table_reference/mod.rs b/src/query/sql/src/planner/binder/bind_table_reference/mod.rs index 542911474aa34..75a42e6f60ddf 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/mod.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod bind; +mod bind_cte; mod bind_join; mod bind_location; mod bind_obfuscate; diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index c659a4b12910a..fcd5ac7b6e82c 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -142,7 +142,6 @@ impl Binder { false, false, true, - None, bind_context.allow_virtual_column, ); @@ -164,15 +163,6 @@ impl Binder { alias: &Option, cte_info: &CteInfo, ) -> Result<(SExpr, BindContext)> { - if let Some(cte_name) = &bind_context.cte_context.cte_name { - if cte_name == table_name { - return Err(ErrorCode::SemanticError(format!( - "The cte {table_name} is not recursive, but it references itself.", - )) - .set_span(span)); - } - } - let mut new_bind_context = BindContext { parent: Some(Box::new(bind_context.clone())), bound_internal_columns: BTreeMap::new(), diff --git a/src/query/sql/src/planner/binder/util.rs b/src/query/sql/src/planner/binder/util.rs index 039a42c2f806d..be6568d310337 100644 --- a/src/query/sql/src/planner/binder/util.rs +++ b/src/query/sql/src/planner/binder/util.rs @@ -53,6 +53,11 @@ impl Binder { self.count_r_cte_scan(expr.child(1)?, cte_scan_names, cte_types)?; } + RelOperator::MaterializedCTE(_) => { + self.count_r_cte_scan(expr.child(0)?, cte_scan_names, cte_types)?; + self.count_r_cte_scan(expr.child(1)?, cte_scan_names, cte_types)?; + } + RelOperator::ProjectSet(_) | RelOperator::AsyncFunction(_) | RelOperator::Udf(_) @@ -72,6 +77,7 @@ impl Binder { | RelOperator::DummyTableScan(_) | RelOperator::ConstantTableScan(_) | RelOperator::ExpressionScan(_) + | RelOperator::CTEConsumer(_) | RelOperator::CacheScan(_) => {} // Each recursive step in a recursive query generates new rows, and these rows are used for the next recursion. // Each step depends on the results of the previous step, so it's essential to ensure that the result set is built incrementally. diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index 277a010eace81..98ac53efc657d 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -101,7 +101,6 @@ impl Dataframe { false, false, false, - None, false, ); diff --git a/src/query/sql/src/planner/expression/expression_parser.rs b/src/query/sql/src/planner/expression/expression_parser.rs index 52ed0cc99a6fb..8937e880ec55f 100644 --- a/src/query/sql/src/planner/expression/expression_parser.rs +++ b/src/query/sql/src/planner/expression/expression_parser.rs @@ -63,7 +63,6 @@ pub fn bind_table(table_meta: Arc) -> Result<(BindContext, MetadataRe false, false, false, - None, false, ); diff --git a/src/query/sql/src/planner/metadata/metadata.rs b/src/query/sql/src/planner/metadata/metadata.rs index 11e16b1541f7f..f7889a42c8447 100644 --- a/src/query/sql/src/planner/metadata/metadata.rs +++ b/src/query/sql/src/planner/metadata/metadata.rs @@ -341,11 +341,9 @@ impl Metadata { source_of_view: bool, source_of_index: bool, source_of_stage: bool, - cte_suffix_name: Option, allow_virtual_column: bool, ) -> IndexType { let table_name = table_meta.name().to_string(); - let table_name = Self::remove_cte_suffix(table_name, cte_suffix_name); let table_index = self.tables.len(); // If exists table alias name, use it instead of origin name @@ -539,15 +537,6 @@ impl Metadata { self.base_column_scan_id.get(&column_index).cloned() } - fn remove_cte_suffix(mut table_name: String, cte_suffix_name: Option) -> String { - if let Some(suffix) = cte_suffix_name { - if table_name.ends_with(&suffix) { - table_name.truncate(table_name.len() - suffix.len() - 1); - } - } - table_name - } - pub fn replace_all_tables(&mut self, table: Arc) { for entry in self.tables.iter_mut() { entry.table = table.clone(); diff --git a/src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs b/src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs index 4900fe5167077..a63586634106e 100644 --- a/src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs +++ b/src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs @@ -384,6 +384,8 @@ impl SExpr { | RelOperator::CacheScan(_) | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) + | RelOperator::MaterializedCTE(_) + | RelOperator::CTEConsumer(_) | RelOperator::CompactBlock(_) => {} }; for child in &self.children { @@ -485,10 +487,13 @@ impl SExpr { | crate::plans::RelOp::ConstantTableScan | crate::plans::RelOp::ExpressionScan | crate::plans::RelOp::CacheScan + | crate::plans::RelOp::CTEConsumer | crate::plans::RelOp::RecursiveCteScan => Ok(None), crate::plans::RelOp::Join => self.probe_side_child().get_data_distribution(), + crate::plans::RelOp::MaterializedCTE => self.child(1)?.get_data_distribution(), + crate::plans::RelOp::Exchange => { Ok(Some(self.plan.as_ref().clone().try_into().unwrap())) } diff --git a/src/query/sql/src/planner/optimizer/ir/format.rs b/src/query/sql/src/planner/optimizer/ir/format.rs index f9613af6b35ef..5d6909f2cf374 100644 --- a/src/query/sql/src/planner/optimizer/ir/format.rs +++ b/src/query/sql/src/planner/optimizer/ir/format.rs @@ -80,6 +80,8 @@ fn display_rel_op(rel_op: &RelOperator) -> String { RelOperator::Mutation(_) => "MergeInto".to_string(), RelOperator::MutationSource(_) => "MutationSource".to_string(), RelOperator::CompactBlock(_) => "CompactBlock".to_string(), + RelOperator::MaterializedCTE(_) => "MaterializedCTE".to_string(), + RelOperator::CTEConsumer(_) => "CTEConsumer".to_string(), } } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 73213ecd31832..2141a9e9bbe0c 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -26,6 +26,7 @@ use crate::binder::MutationType; use crate::optimizer::ir::Memo; use crate::optimizer::ir::SExpr; use crate::optimizer::optimizers::distributed::BroadcastToShuffleOptimizer; +use crate::optimizer::optimizers::operator::CleanupUnusedCTEOptimizer; use crate::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer; use crate::optimizer::optimizers::operator::PullUpFilterOptimizer; use crate::optimizer::optimizers::operator::RuleNormalizeAggregateOptimizer; @@ -273,7 +274,9 @@ pub async fn optimize_query(opt_ctx: Arc, s_expr: SExpr) -> Re .add_if( !opt_ctx.get_planning_agg_index(), RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice()), - ); + ) + // 15. Clean up unused CTEs + .add(CleanupUnusedCTEOptimizer); // 15. Execute the pipeline let s_expr = pipeline.execute().await?; diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs index 868b55f05e935..bb17f389104a0 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs @@ -270,6 +270,80 @@ impl DPhpyOptimizer { Ok((new_s_expr, left_res.1 && right_res.1)) } + async fn process_materialized_cte_node( + &mut self, + s_expr: &SExpr, + ) -> Result<(Arc, bool)> { + let mut left_dphyp = DPhpyOptimizer::new(self.opt_ctx.clone()); + let left_expr = left_dphyp.optimize_async(s_expr.child(0)?).await?; + + let mut right_dphyp = DPhpyOptimizer::new(self.opt_ctx.clone()); + let right_expr = right_dphyp.optimize_async(s_expr.child(1)?).await?; + + // Merge table_index_map from right child into current table_index_map + let relation_idx = self.join_relations.len() as IndexType; + for table_index in right_dphyp.table_index_map.keys() { + self.table_index_map.insert(*table_index, relation_idx); + } + + let new_s_expr = s_expr.replace_children([Arc::new(left_expr), Arc::new(right_expr)]); + self.join_relations.push(JoinRelation::new( + &new_s_expr, + self.sample_executor().clone(), + )); + Ok((Arc::new(new_s_expr), true)) + } + + async fn process_cte_consumer_node( + &mut self, + s_expr: &SExpr, + join_relation: Option<&SExpr>, + ) -> Result<(Arc, bool)> { + let cte_consumer = match s_expr.plan() { + RelOperator::CTEConsumer(consumer) => consumer, + _ => unreachable!(), + }; + + let join_relation = if let Some(relation) = join_relation { + // Check if relation contains filter, if exists, check if the filter in `filters` + // If exists, remove it from `filters` + self.check_filter(relation); + JoinRelation::new(relation, self.sample_executor().clone()) + } else { + JoinRelation::new(s_expr, self.sample_executor().clone()) + }; + + // Map table indexes before adding to join_relations + let relation_idx = self.join_relations.len() as IndexType; + let table_indexes = Self::get_table_indexes(&cte_consumer.def); + for table_index in table_indexes { + self.table_index_map.insert(table_index, relation_idx); + } + + self.join_relations.push(join_relation); + Ok((Arc::new(s_expr.clone()), true)) + } + + fn get_table_indexes(s_expr: &SExpr) -> Vec { + let mut table_indexes = Vec::new(); + Self::collect_table_indexes_recursive(s_expr, &mut table_indexes); + table_indexes + } + + fn collect_table_indexes_recursive(s_expr: &SExpr, table_indexes: &mut Vec) { + if let RelOperator::Scan(scan) = s_expr.plan() { + table_indexes.push(scan.table_index); + } + + if let RelOperator::CTEConsumer(cte_consumer) = s_expr.plan() { + Self::collect_table_indexes_recursive(&cte_consumer.def, table_indexes); + } + + for child in s_expr.children() { + Self::collect_table_indexes_recursive(child, table_indexes); + } + } + /// Process a unary operator node async fn process_unary_node( &mut self, @@ -332,6 +406,11 @@ impl DPhpyOptimizer { RelOperator::Join(_) => self.process_join_node(s_expr, join_conditions).await, + RelOperator::MaterializedCTE(_) => self.process_materialized_cte_node(s_expr).await, + RelOperator::CTEConsumer(_) => { + self.process_cte_consumer_node(s_expr, join_relation).await + } + RelOperator::ProjectSet(_) | RelOperator::Aggregate(_) | RelOperator::Sort(_) diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs index eb645151c4230..cdc9071d13c84 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs @@ -92,6 +92,8 @@ pub async fn dynamic_sample( | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::CompactBlock(_) + | RelOperator::MaterializedCTE(_) + | RelOperator::CTEConsumer(_) | RelOperator::MutationSource(_) => { s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)) } diff --git a/src/query/sql/src/planner/optimizer/optimizers/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/mod.rs index c4f5e6998b631..aa38f80db04fc 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/mod.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/mod.rs @@ -21,3 +21,4 @@ pub mod rule; pub use cascades::CascadesOptimizer; pub use hyper_dp::DPhpyOptimizer; +pub use operator::CleanupUnusedCTEOptimizer; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs new file mode 100644 index 0000000000000..b29d06afe460d --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs @@ -0,0 +1,93 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use databend_common_exception::Result; + +use crate::optimizer::ir::SExpr; +use crate::optimizer::Optimizer; +use crate::plans::RelOperator; + +/// Optimizer that removes unused CTEs from the query plan. +/// This optimizer should be applied at the end of the optimization pipeline +/// to clean up any CTEs that are not referenced by any CTEConsumer. +pub struct CleanupUnusedCTEOptimizer; + +impl CleanupUnusedCTEOptimizer { + /// Collect all CTE names that are referenced by CTEConsumer nodes + fn collect_referenced_ctes(s_expr: &SExpr) -> Result> { + let mut referenced_ctes = HashSet::new(); + Self::collect_referenced_ctes_recursive(s_expr, &mut referenced_ctes)?; + Ok(referenced_ctes) + } + + /// Recursively traverse the expression tree to find CTEConsumer nodes + fn collect_referenced_ctes_recursive( + s_expr: &SExpr, + referenced_ctes: &mut HashSet, + ) -> Result<()> { + // Check if current node is a CTEConsumer + if let RelOperator::CTEConsumer(consumer) = s_expr.plan() { + referenced_ctes.insert(consumer.cte_name.clone()); + } + + // Recursively process children + for child in s_expr.children() { + Self::collect_referenced_ctes_recursive(child, referenced_ctes)?; + } + + Ok(()) + } + + /// Remove unused CTEs from the expression tree + fn remove_unused_ctes(s_expr: &SExpr, referenced_ctes: &HashSet) -> Result { + if let RelOperator::MaterializedCTE(m_cte) = s_expr.plan() { + // If this CTE is not referenced, remove it by returning the right child + if !referenced_ctes.contains(&m_cte.cte_name) { + // Return the right child (main query) and skip the left child (CTE definition) + let right_child = s_expr.child(1)?; + return Self::remove_unused_ctes(right_child, referenced_ctes); + } + } + + // Process children recursively + let mut optimized_children = Vec::with_capacity(s_expr.arity()); + for child in s_expr.children() { + let optimized_child = Self::remove_unused_ctes(child, referenced_ctes)?; + optimized_children.push(Arc::new(optimized_child)); + } + + // Create new expression with optimized children + let mut new_expr = s_expr.clone(); + new_expr.children = optimized_children; + Ok(new_expr) + } +} + +#[async_trait::async_trait] +impl Optimizer for CleanupUnusedCTEOptimizer { + fn name(&self) -> String { + "CleanupUnusedCTEOptimizer".to_string() + } + + async fn optimize(&mut self, s_expr: &SExpr) -> Result { + // Collect all referenced CTEs + let referenced_ctes = Self::collect_referenced_ctes(s_expr)?; + + // Remove unused CTEs + Self::remove_unused_ctes(s_expr, &referenced_ctes) + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/cte/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/cte/mod.rs new file mode 100644 index 0000000000000..0aeab30d3574b --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/cte/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod cleanup_unused_cte; + +pub use cleanup_unused_cte::CleanupUnusedCTEOptimizer; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs index 079f80afcb9fe..a3f091cda6831 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs @@ -325,6 +325,12 @@ impl SubqueryDecorrelatorOptimizer { )) } + RelOperator::MaterializedCTE(_) => Ok(SExpr::create_binary( + s_expr.plan.clone(), + Arc::new(self.optimize_sync(s_expr.left_child())?), + Arc::new(self.optimize_sync(s_expr.right_child())?), + )), + RelOperator::DummyTableScan(_) | RelOperator::Scan(_) | RelOperator::ConstantTableScan(_) @@ -334,6 +340,7 @@ impl SubqueryDecorrelatorOptimizer { | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) + | RelOperator::CTEConsumer(_) | RelOperator::CompactBlock(_) => Ok(s_expr.clone()), } } diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/mod.rs index 0c0566a886f9e..93c8833bec214 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/mod.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/mod.rs @@ -13,12 +13,14 @@ // limitations under the License. mod aggregate; +mod cte; mod decorrelate; mod filter; mod join; pub use aggregate::RuleNormalizeAggregateOptimizer; pub use aggregate::RuleStatsAggregateOptimizer; +pub use cte::CleanupUnusedCTEOptimizer; pub use decorrelate::FlattenInfo; pub use decorrelate::SubqueryDecorrelatorOptimizer; pub use decorrelate::UnnestResult; diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs index 46ebf4648860d..1cfa72a2ed63c 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs @@ -152,6 +152,8 @@ fn find_group_by_keys( | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) + | RelOperator::MaterializedCTE(_) + | RelOperator::CTEConsumer(_) | RelOperator::CompactBlock(_) => {} } Ok(()) diff --git a/src/query/sql/src/planner/plans/cte_consumer.rs b/src/query/sql/src/planner/plans/cte_consumer.rs new file mode 100644 index 0000000000000..01580bc10133c --- /dev/null +++ b/src/query/sql/src/planner/plans/cte_consumer.rs @@ -0,0 +1,95 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::Hash; +use std::hash::Hasher; +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataSchemaRef; + +use crate::optimizer::ir::PhysicalProperty; +use crate::optimizer::ir::RelExpr; +use crate::optimizer::ir::RelationalProperty; +use crate::optimizer::ir::RequiredProperty; +use crate::optimizer::ir::SExpr; +use crate::optimizer::ir::StatInfo; +use crate::plans::Operator; +use crate::plans::RelOp; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CTEConsumer { + pub cte_name: String, + pub cte_schema: DataSchemaRef, + pub def: SExpr, +} + +impl Hash for CTEConsumer { + fn hash(&self, state: &mut H) { + self.cte_name.hash(state); + } +} + +impl Operator for CTEConsumer { + fn rel_op(&self) -> RelOp { + RelOp::CTEConsumer + } + + /// Get arity of this operator + fn arity(&self) -> usize { + 0 + } + + /// Derive statistics information + fn derive_stats(&self, _rel_expr: &RelExpr) -> Result> { + RelExpr::with_s_expr(&self.def).derive_cardinality() + } + + /// Derive relational property + fn derive_relational_prop(&self, _rel_expr: &RelExpr) -> Result> { + RelExpr::with_s_expr(&self.def).derive_relational_prop() + } + + /// Derive physical property + fn derive_physical_prop(&self, _rel_expr: &RelExpr) -> Result { + RelExpr::with_s_expr(&self.def).derive_physical_prop() + } + + /// Compute required property for child with index `child_index` + fn compute_required_prop_child( + &self, + _ctx: Arc, + _rel_expr: &RelExpr, + _child_index: usize, + _required: &RequiredProperty, + ) -> Result { + Err(ErrorCode::Internal( + "Cannot compute required property for children of cte_consumer".to_string(), + )) + } + + /// Enumerate all possible combinations of required property for children + fn compute_required_prop_children( + &self, + _ctx: Arc, + _rel_expr: &RelExpr, + _required: &RequiredProperty, + ) -> Result>> { + Err(ErrorCode::Internal( + "Cannot compute required property for children of cte_consumer".to_string(), + )) + } +} diff --git a/src/query/sql/src/planner/plans/materialized_cte.rs b/src/query/sql/src/planner/plans/materialized_cte.rs new file mode 100644 index 0000000000000..523f05afb0abc --- /dev/null +++ b/src/query/sql/src/planner/plans/materialized_cte.rs @@ -0,0 +1,66 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::Hash; +use std::sync::Arc; + +use databend_common_exception::Result; + +use crate::optimizer::ir::PhysicalProperty; +use crate::optimizer::ir::RelExpr; +use crate::optimizer::ir::RelationalProperty; +use crate::optimizer::ir::StatInfo; +use crate::plans::Operator; +use crate::plans::RelOp; +use crate::ColumnBinding; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct MaterializedCTE { + pub cte_name: String, + pub cte_output_columns: Vec, +} + +impl MaterializedCTE { + pub fn new(cte_name: String, output_columns: Vec) -> Self { + Self { + cte_name, + cte_output_columns: output_columns, + } + } +} + +impl Operator for MaterializedCTE { + fn rel_op(&self) -> RelOp { + RelOp::MaterializedCTE + } + + /// Get arity of this operator + fn arity(&self) -> usize { + 2 + } + + /// Derive relational property + fn derive_relational_prop(&self, rel_expr: &RelExpr) -> Result> { + rel_expr.derive_relational_prop_child(1) + } + + /// Derive physical property + fn derive_physical_prop(&self, rel_expr: &RelExpr) -> Result { + rel_expr.derive_physical_prop_child(1) + } + + /// Derive statistics information + fn derive_stats(&self, rel_expr: &RelExpr) -> Result> { + rel_expr.derive_cardinality_child(1) + } +} diff --git a/src/query/sql/src/planner/plans/mod.rs b/src/query/sql/src/planner/plans/mod.rs index 2f324de8199d8..bcd237abc68a6 100644 --- a/src/query/sql/src/planner/plans/mod.rs +++ b/src/query/sql/src/planner/plans/mod.rs @@ -19,6 +19,7 @@ mod call; mod constant_table_scan; mod copy_into_location; mod copy_into_table; +mod cte_consumer; mod data_mask; mod ddl; mod dummy_table_scan; @@ -31,6 +32,7 @@ mod insert_multi_table; mod join; mod kill; mod limit; +mod materialized_cte; mod mutation; mod mutation_source; mod operator; @@ -61,6 +63,7 @@ pub use call::CallPlan; pub use constant_table_scan::ConstantTableScan; pub use copy_into_location::*; pub use copy_into_table::*; +pub use cte_consumer::*; pub use data_mask::*; pub use ddl::*; pub use dummy_table_scan::DummyTableScan; @@ -73,6 +76,7 @@ pub use insert_multi_table::*; pub use join::*; pub use kill::KillPlan; pub use limit::*; +pub use materialized_cte::*; pub use mutation::MatchedEvaluator; pub use mutation::Mutation; pub use mutation::UnmatchedEvaluator; diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 3d3a67518fed0..72d23ace98c73 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -29,6 +29,7 @@ use crate::optimizer::ir::StatInfo; use crate::plans::r_cte_scan::RecursiveCteScan; use crate::plans::Aggregate; use crate::plans::AsyncFunction; +use crate::plans::CTEConsumer; use crate::plans::CacheScan; use crate::plans::ConstantTableScan; use crate::plans::DummyTableScan; @@ -38,6 +39,7 @@ use crate::plans::ExpressionScan; use crate::plans::Filter; use crate::plans::Join; use crate::plans::Limit; +use crate::plans::MaterializedCTE; use crate::plans::Mutation; use crate::plans::OptimizeCompactBlock; use crate::plans::ProjectSet; @@ -119,6 +121,8 @@ pub enum RelOp { MergeInto, CompactBlock, MutationSource, + MaterializedCTE, + CTEConsumer, // Pattern Pattern, @@ -155,6 +159,8 @@ pub enum RelOperator { Mutation(Mutation), CompactBlock(OptimizeCompactBlock), MutationSource(MutationSource), + MaterializedCTE(MaterializedCTE), + CTEConsumer(CTEConsumer), } impl RelOperator { @@ -172,6 +178,8 @@ impl RelOperator { | RelOperator::AsyncFunction(_) | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) + | RelOperator::MaterializedCTE(_) + | RelOperator::CTEConsumer(_) | RelOperator::CompactBlock(_) => false, RelOperator::Join(op) => op.has_subquery(), RelOperator::EvalScalar(op) => op.items.iter().any(|expr| expr.scalar.has_subquery()), @@ -219,6 +227,8 @@ impl RelOperator { | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::CompactBlock(_) + | RelOperator::MaterializedCTE(_) + | RelOperator::CTEConsumer(_) | RelOperator::MutationSource(_) => (), RelOperator::Join(op) => { for condition in &op.equi_conditions { @@ -299,6 +309,8 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => rel_op.rel_op(), RelOperator::CompactBlock(rel_op) => rel_op.rel_op(), RelOperator::MutationSource(rel_op) => rel_op.rel_op(), + RelOperator::MaterializedCTE(rel_op) => rel_op.rel_op(), + RelOperator::CTEConsumer(rel_op) => rel_op.rel_op(), } } @@ -325,6 +337,8 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => rel_op.arity(), RelOperator::CompactBlock(rel_op) => rel_op.arity(), RelOperator::MutationSource(rel_op) => rel_op.arity(), + RelOperator::MaterializedCTE(rel_op) => rel_op.arity(), + RelOperator::CTEConsumer(rel_op) => rel_op.arity(), } } @@ -351,6 +365,8 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::CompactBlock(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::MutationSource(rel_op) => rel_op.derive_relational_prop(rel_expr), + RelOperator::MaterializedCTE(rel_op) => rel_op.derive_relational_prop(rel_expr), + RelOperator::CTEConsumer(rel_op) => rel_op.derive_relational_prop(rel_expr), } } @@ -377,6 +393,8 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::CompactBlock(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::MutationSource(rel_op) => rel_op.derive_physical_prop(rel_expr), + RelOperator::MaterializedCTE(rel_op) => rel_op.derive_physical_prop(rel_expr), + RelOperator::CTEConsumer(rel_op) => rel_op.derive_physical_prop(rel_expr), } } @@ -403,6 +421,8 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => rel_op.derive_stats(rel_expr), RelOperator::CompactBlock(rel_op) => rel_op.derive_stats(rel_expr), RelOperator::MutationSource(rel_op) => rel_op.derive_stats(rel_expr), + RelOperator::MaterializedCTE(rel_op) => rel_op.derive_stats(rel_expr), + RelOperator::CTEConsumer(rel_op) => rel_op.derive_stats(rel_expr), } } @@ -477,6 +497,12 @@ impl Operator for RelOperator { RelOperator::MutationSource(rel_op) => { rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) } + RelOperator::MaterializedCTE(rel_op) => { + rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) + } + RelOperator::CTEConsumer(rel_op) => { + rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) + } } } @@ -550,6 +576,12 @@ impl Operator for RelOperator { RelOperator::MutationSource(rel_op) => { rel_op.compute_required_prop_children(ctx, rel_expr, required) } + RelOperator::MaterializedCTE(rel_op) => { + rel_op.compute_required_prop_children(ctx, rel_expr, required) + } + RelOperator::CTEConsumer(rel_op) => { + rel_op.compute_required_prop_children(ctx, rel_expr, required) + } } } } @@ -938,3 +970,23 @@ impl TryFrom for MutationSource { } } } + +impl From for RelOperator { + fn from(v: MaterializedCTE) -> Self { + Self::MaterializedCTE(v) + } +} + +impl TryFrom for MaterializedCTE { + type Error = ErrorCode; + fn try_from(value: RelOperator) -> Result { + if let RelOperator::MaterializedCTE(value) = value { + Ok(value) + } else { + Err(ErrorCode::Internal(format!( + "Cannot downcast {:?} to MaterializedCTE", + value.rel_op() + ))) + } + } +} diff --git a/tests/sqllogictests/suites/query/cleanup_unused_cte.test b/tests/sqllogictests/suites/query/cleanup_unused_cte.test new file mode 100644 index 0000000000000..4c38d8a4e6845 --- /dev/null +++ b/tests/sqllogictests/suites/query/cleanup_unused_cte.test @@ -0,0 +1,112 @@ +# Test for CleanupUnusedCTEOptimizer +# This test verifies that unused CTEs are properly removed from the query plan + +# Test case 1: CTE is used, should not be removed +query I +with t1 as materialized (select number as a from numbers(10)) select t1.a from t1; +---- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +# Test case 2: CTE is not used, should be removed +query I +with t1 as materialized (select number as a from numbers(10)) select number as b from numbers(5); +---- +0 +1 +2 +3 +4 + +# Test case 3: Multiple CTEs, some used and some unused +query I +with t1 as materialized (select number as a from numbers(10)), + t2 as materialized (select number as b from numbers(20)), + t3 as materialized (select number as c from numbers(30)) +select t1.a from t1 join t2 on t1.a = t2.b; +---- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +# Test case 4: Nested CTEs, inner CTE is unused +query I +with t1 as materialized (select number as a from numbers(10)), + t2 as materialized (select a as b from t1), + t3 as materialized (select number as c from numbers(5)) +select t2.b from t2; +---- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +# Test case 5: All CTEs are unused +query I +with t1 as materialized (select number as a from numbers(10)), + t2 as materialized (select number as b from numbers(20)) +select number as c from numbers(3); +---- +0 +1 +2 + +# Test case 6: CTE with complex query, should be removed when unused +query I +with t1 as materialized ( + select number as a, number * 2 as b + from numbers(10) + where number > 5 +) +select number as c from numbers(3); +---- +0 +1 +2 + +# Test case 7: CTE with aggregation, should be removed when unused +query I +with t1 as materialized ( + select number as a, count(*) as cnt + from numbers(10) + group by number +) +select number as b from numbers(3); +---- +0 +1 +2 + +# Test case 8: CTE with join, should be removed when unused +query I +with t1 as materialized ( + select n1.number as a, n2.number as b + from numbers(5) n1 + join numbers(5) n2 on n1.number = n2.number +) +select number as c from numbers(3); +---- +0 +1 +2 \ No newline at end of file