diff --git a/Cargo.lock b/Cargo.lock index a115a6fe90866..e5ed3e92b1fb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1716,6 +1716,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "common-planner" +version = "0.1.0" +dependencies = [ + "common-catalog", + "common-datavalues", + "parking_lot 0.12.1", +] + [[package]] name = "common-proto-conv" version = "0.1.0" @@ -2533,6 +2542,7 @@ dependencies = [ "common-pipeline-sinks", "common-pipeline-sources", "common-pipeline-transforms", + "common-planner", "common-settings", "common-storage", "common-storages-fuse", diff --git a/Cargo.toml b/Cargo.toml index 22e7b088c4f55..f5a75649ab62f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "src/query/functions-v2", "src/query/legacy-parser", "src/query/management", + "src/query/planner", "src/query/pipeline/core", "src/query/pipeline/sinks", "src/query/pipeline/sources", diff --git a/src/query/planner/Cargo.toml b/src/query/planner/Cargo.toml new file mode 100644 index 0000000000000..0ea063ee6c89f --- /dev/null +++ b/src/query/planner/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "common-planner" +version = "0.1.0" +authors = ["Databend Authors "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +common-catalog = { path = "../catalog" } +common-datavalues = { path = "../datavalues" } + +parking_lot = "0.12" diff --git a/src/query/planner/src/lib.rs b/src/query/planner/src/lib.rs new file mode 100644 index 0000000000000..7ca45609fd65c --- /dev/null +++ b/src/query/planner/src/lib.rs @@ -0,0 +1,31 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Databend Planner is the core part of Databend Query, it will: +//! +//! - Use `Parser` (provided by `common-ast`) to parse query into AST. +//! - Use `Binder` to bind query into `LogicalPlan` +//! - Use `Optimizer` to optimize `LogicalPlan` into `PhysicalPlan` +//! +//! After all the planners work, `Interpreter` will use `PhysicalPlan` to +//! build pipelines, then our processes will produce result data blocks. + +mod metadata; +pub use metadata::ColumnEntry; +pub use metadata::ColumnSet; +pub use metadata::IndexType; +pub use metadata::Metadata; +pub use metadata::MetadataRef; +pub use metadata::TableEntry; +pub use metadata::DUMMY_TABLE_INDEX; diff --git a/src/query/planner/src/metadata.rs b/src/query/planner/src/metadata.rs new file mode 100644 index 0000000000000..dce925ad2ee7a --- /dev/null +++ b/src/query/planner/src/metadata.rs @@ -0,0 +1,318 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::collections::VecDeque; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use common_catalog::table::Table; +use common_datavalues::DataField; +use common_datavalues::DataType; +use common_datavalues::DataTypeImpl; +use common_datavalues::StructType; +use common_datavalues::TypeID; +use parking_lot::RwLock; + +/// Planner use [`usize`] as it's index type. +/// +/// This type will be used across the whole planner. +pub type IndexType = usize; + +/// Use IndexType::MAX to represent dummy table. +pub static DUMMY_TABLE_INDEX: IndexType = IndexType::MAX; + +/// ColumnSet represents a set of columns identified by its IndexType. +pub type ColumnSet = HashSet; + +/// A Send & Send version of [`Metadata`]. +/// +/// Callers can clone this ref safely and cheaply. +pub type MetadataRef = Arc>; + +/// Metadata stores information about columns and tables used in a query. +/// Tables and columns are identified with its unique index. +/// Notice that index value of a column can be same with that of a table. +#[derive(Clone, Debug, Default)] +pub struct Metadata { + tables: Vec, + columns: Vec, +} + +impl Metadata { + pub fn table(&self, index: IndexType) -> &TableEntry { + self.tables.get(index).expect("metadata must contain table") + } + + pub fn tables(&self) -> &[TableEntry] { + self.tables.as_slice() + } + + pub fn table_index_by_column_indexes(&self, column_indexes: &ColumnSet) -> Option { + self.columns + .iter() + .find(|v| column_indexes.contains(&v.column_index)) + .and_then(|v| v.table_index) + } + + pub fn column(&self, index: IndexType) -> &ColumnEntry { + self.columns + .get(index) + .expect("metadata must contain column") + } + + pub fn columns(&self) -> &[ColumnEntry] { + self.columns.as_slice() + } + + pub fn columns_by_table_index(&self, index: IndexType) -> Vec { + self.columns + .iter() + .filter(|v| v.table_index == Some(index)) + .cloned() + .collect() + } + + pub fn add_column( + &mut self, + name: String, + data_type: DataTypeImpl, + table_index: Option, + path_indices: Option>, + ) -> IndexType { + let column_index = self.columns.len(); + let column_entry = + ColumnEntry::new(name, data_type, column_index, table_index, path_indices); + self.columns.push(column_entry); + column_index + } + + pub fn add_table( + &mut self, + catalog: String, + database: String, + table_meta: Arc, + ) -> IndexType { + let table_name = table_meta.name().to_string(); + let table_index = self.tables.len(); + let table_entry = TableEntry { + index: table_index, + name: table_name, + database, + catalog, + table: table_meta.clone(), + }; + self.tables.push(table_entry); + let mut struct_fields = VecDeque::new(); + for (i, field) in table_meta.schema().fields().iter().enumerate() { + self.add_column( + field.name().clone(), + field.data_type().clone(), + Some(table_index), + None, + ); + if field.data_type().data_type_id() == TypeID::Struct { + struct_fields.push_back((vec![i], field.clone())); + } + } + // add inner columns of struct column + while !struct_fields.is_empty() { + let (path_indices, field) = struct_fields.pop_front().unwrap(); + let struct_type: StructType = field.data_type().clone().try_into().unwrap(); + + let inner_types = struct_type.types(); + let inner_names = match struct_type.names() { + Some(inner_names) => inner_names + .iter() + .map(|name| format!("{}:{}", field.name(), name)) + .collect::>(), + None => (0..inner_types.len()) + .map(|i| format!("{}:{}", field.name(), i)) + .collect::>(), + }; + for ((i, inner_name), inner_type) in + inner_names.into_iter().enumerate().zip(inner_types.iter()) + { + let mut inner_path_indices = path_indices.clone(); + inner_path_indices.push(i); + + self.add_column( + inner_name.clone(), + inner_type.clone(), + Some(table_index), + Some(inner_path_indices.clone()), + ); + if inner_type.data_type_id() == TypeID::Struct { + let inner_field = DataField::new(&inner_name, inner_type.clone()); + struct_fields.push_back((inner_path_indices, inner_field)); + } + } + } + table_index + } + + pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize { + let entries = indices + .iter() + .map(|i| self.column(*i).clone()) + .collect::>(); + find_smallest_column(entries.as_slice()) + } +} + +#[derive(Clone)] +pub struct TableEntry { + catalog: String, + database: String, + name: String, + index: IndexType, + + table: Arc, +} + +impl Debug for TableEntry { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TableEntry") + .field("catalog", &self.catalog) + .field("database", &self.database) + .field("name", &self.name) + .field("index", &self.index) + .finish_non_exhaustive() + } +} + +impl TableEntry { + pub fn new( + index: IndexType, + name: String, + catalog: String, + database: String, + table: Arc, + ) -> Self { + TableEntry { + index, + name, + catalog, + database, + table, + } + } + + /// Get the catalog name of this table entry. + pub fn catalog(&self) -> &str { + &self.catalog + } + + /// Get the database name of this table entry. + pub fn database(&self) -> &str { + &self.database + } + + /// Get the name of this table entry. + pub fn name(&self) -> &str { + &self.name + } + + /// Get the index this table entry. + pub fn index(&self) -> IndexType { + self.index + } + + /// Get the table of this table entry. + pub fn table(&self) -> Arc { + self.table.clone() + } +} + +#[derive(Clone, Debug)] +pub struct ColumnEntry { + column_index: IndexType, + name: String, + data_type: DataTypeImpl, + + /// Table index of column entry. None if column is derived from a subquery. + table_index: Option, + /// Path indices for inner column of struct data type. + path_indices: Option>, +} + +impl ColumnEntry { + pub fn new( + name: String, + data_type: DataTypeImpl, + column_index: IndexType, + table_index: Option, + path_indices: Option>, + ) -> Self { + ColumnEntry { + column_index, + name, + data_type, + table_index, + path_indices, + } + } + + /// Get the name of this column entry. + pub fn name(&self) -> &str { + &self.name + } + + /// Get the index of this column entry. + pub fn index(&self) -> IndexType { + self.column_index + } + + /// Get the data type of this column entry. + pub fn data_type(&self) -> &DataTypeImpl { + &self.data_type + } + + /// Get the table index of this column entry. + pub fn table_index(&self) -> Option { + self.table_index + } + + /// Get the path indices of this column entry. + pub fn path_indices(&self) -> Option<&[IndexType]> { + self.path_indices.as_deref() + } + + /// Check if this column entry contains path_indices + pub fn has_path_indices(&self) -> bool { + self.path_indices.is_some() + } +} + +/// TODO(xuanwo): migrate this as a function of metadata. +pub fn find_smallest_column(entries: &[ColumnEntry]) -> usize { + debug_assert!(!entries.is_empty()); + let mut column_indexes = entries + .iter() + .map(|entry| entry.column_index) + .collect::>(); + column_indexes.sort(); + let mut smallest_index = column_indexes[0]; + let mut smallest_size = usize::MAX; + for (idx, column_entry) in entries.iter().enumerate() { + if let Ok(bytes) = column_entry.data_type.data_type_id().numeric_byte_size() { + if smallest_size > bytes { + smallest_size = bytes; + smallest_index = entries[idx].column_index; + } + } + } + smallest_index +} diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index e0065cb429f49..2eae5989ce71a 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -61,6 +61,7 @@ common-pipeline-core = { path = "../pipeline/core" } common-pipeline-sinks = { path = "../pipeline/sinks" } common-pipeline-sources = { path = "../pipeline/sources" } common-pipeline-transforms = { path = "../pipeline/transforms" } +common-planner = { path = "../planner" } common-settings = { path = "../settings" } common-storage = { path = "../../common/storage" } common-storages-fuse = { path = "../storages/fuse" } diff --git a/src/query/service/src/interpreters/interpreter_explain_v2.rs b/src/query/service/src/interpreters/interpreter_explain_v2.rs index b3af9dfb4c933..901e0b0da5c71 100644 --- a/src/query/service/src/interpreters/interpreter_explain_v2.rs +++ b/src/query/service/src/interpreters/interpreter_explain_v2.rs @@ -18,6 +18,7 @@ use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::MetadataRef; use super::fragments::Fragmenter; use super::QueryFragmentsActions; @@ -29,7 +30,6 @@ use crate::sql::executor::PhysicalPlanBuilder; use crate::sql::executor::PipelineBuilder; use crate::sql::optimizer::SExpr; use crate::sql::plans::Plan; -use crate::sql::MetadataRef; pub struct ExplainInterpreterV2 { ctx: Arc, diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index 20a518781c6bc..309f9eb7a9daa 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -39,6 +39,8 @@ use common_pipeline_sources::processors::sources::AsyncSourcer; use common_pipeline_sources::processors::sources::SyncSource; use common_pipeline_sources::processors::sources::SyncSourcer; use common_pipeline_transforms::processors::transforms::Transform; +use common_planner::Metadata; +use common_planner::MetadataRef; use parking_lot::Mutex; use parking_lot::RwLock; @@ -66,8 +68,6 @@ use crate::sql::plans::InsertInputSource; use crate::sql::plans::Plan; use crate::sql::plans::Scalar; use crate::sql::BindContext; -use crate::sql::Metadata; -use crate::sql::MetadataRef; use crate::sql::NameResolutionContext; pub struct InsertInterpreterV2 { @@ -372,7 +372,7 @@ impl ValueSource { schema: DataSchemaRef, ) -> Self { let bind_context = BindContext::new(); - let metadata = Arc::new(RwLock::new(Metadata::create())); + let metadata = Arc::new(RwLock::new(Metadata::default())); Self { data, diff --git a/src/query/service/src/interpreters/interpreter_select_v2.rs b/src/query/service/src/interpreters/interpreter_select_v2.rs index 17aa64889ed04..b3bc6270fb084 100644 --- a/src/query/service/src/interpreters/interpreter_select_v2.rs +++ b/src/query/service/src/interpreters/interpreter_select_v2.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use common_datavalues::DataSchemaRef; use common_exception::Result; +use common_planner::MetadataRef; use super::plan_schedulers::schedule_query_v2; use crate::interpreters::Interpreter; @@ -26,7 +27,6 @@ use crate::sql::executor::PhysicalPlanBuilder; use crate::sql::executor::PipelineBuilder; use crate::sql::optimizer::SExpr; use crate::sql::BindContext; -use crate::sql::MetadataRef; /// Interpret SQL query with ne&w SQL planner pub struct SelectInterpreterV2 { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index c1855c3c401b6..0084227d7957e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -44,6 +44,7 @@ use common_datavalues::NullableType; use common_exception::ErrorCode; use common_exception::Result; use common_hashtable::HashMap; +use common_planner::IndexType; use parking_lot::RwLock; use primitive_types::U256; use primitive_types::U512; @@ -59,7 +60,6 @@ use crate::sessions::TableContext; use crate::sql::executor::PhysicalScalar; use crate::sql::planner::plans::JoinType; use crate::sql::plans::JoinType::Mark; -use crate::sql::IndexType; pub struct SerializerHashTable { pub(crate) hash_table: HashMap>, diff --git a/src/query/service/src/sql/executor/expression_builder.rs b/src/query/service/src/sql/executor/expression_builder.rs index bfd9959cd3089..68929f5537a4b 100644 --- a/src/query/service/src/sql/executor/expression_builder.rs +++ b/src/query/service/src/sql/executor/expression_builder.rs @@ -17,9 +17,10 @@ use common_datavalues::DataValue; use common_exception::ErrorCode; use common_exception::Result; use common_legacy_planners::Expression; +use common_planner::IndexType; +use common_planner::MetadataRef; use crate::sql::executor::util::format_field_name; -use crate::sql::planner::IndexType; use crate::sql::plans::AggregateFunction; use crate::sql::plans::AndExpr; use crate::sql::plans::BoundColumnRef; @@ -29,7 +30,6 @@ use crate::sql::plans::ConstantExpr; use crate::sql::plans::FunctionCall; use crate::sql::plans::OrExpr; use crate::sql::plans::Scalar; -use crate::sql::MetadataRef; pub trait FiledNameFormat { fn format(display_name: &str, index: IndexType) -> String; @@ -63,11 +63,9 @@ where ExpressionBuilder: FiledNameFormat pub fn build_and_rename(&self, scalar: &Scalar, index: IndexType) -> Result { let expr = self.build(scalar)?; - let name = self.metadata.read().column(index).name.clone(); - Ok(Expression::Alias( - Self::format(name.as_str(), index), - Box::new(expr), - )) + let metadata = self.metadata.read(); + let name = metadata.column(index).name(); + Ok(Expression::Alias(Self::format(name, index), Box::new(expr))) } pub fn build(&self, scalar: &Scalar) -> Result { @@ -137,8 +135,9 @@ where ExpressionBuilder: FiledNameFormat } pub fn build_column_ref(&self, index: IndexType) -> Result { - let name = self.metadata.read().column(index).name.clone(); - Ok(Expression::Column(Self::format(name.as_str(), index))) + let metadata = self.metadata.read(); + let name = metadata.column(index).name(); + Ok(Expression::Column(Self::format(name, index))) } pub fn build_literal( diff --git a/src/query/service/src/sql/executor/format.rs b/src/query/service/src/sql/executor/format.rs index 73fba5cc14fba..5dd49f8b2d9b5 100644 --- a/src/query/service/src/sql/executor/format.rs +++ b/src/query/service/src/sql/executor/format.rs @@ -16,6 +16,9 @@ use common_ast::ast::FormatTreeNode; use common_exception::ErrorCode; use common_exception::Result; use common_legacy_planners::StageKind; +use common_planner::IndexType; +use common_planner::MetadataRef; +use common_planner::DUMMY_TABLE_INDEX; use itertools::Itertools; use super::AggregateFinal; @@ -30,9 +33,6 @@ use super::Project; use super::Sort; use super::TableScan; use super::UnionAll; -use crate::sql::planner::IndexType; -use crate::sql::MetadataRef; -use crate::sql::DUMMY_TABLE_INDEX; impl PhysicalPlan { pub fn format(&self, metadata: MetadataRef) -> Result { @@ -69,7 +69,7 @@ fn table_scan_to_format_tree( return Ok(FormatTreeNode::new("DummyTableScan".to_string())); } let table = metadata.read().table(plan.table_index).clone(); - let table_name = format!("{}.{}.{}", table.catalog, table.database, table.name); + let table_name = format!("{}.{}.{}", table.catalog(), table.database(), table.name()); let filters = plan .source .push_downs @@ -135,7 +135,7 @@ fn project_to_format_tree( .columns .iter() .sorted() - .map(|column| format!("{} (#{})", metadata.read().column(*column).name, column)) + .map(|column| format!("{} (#{})", metadata.read().column(*column).name(), column)) .collect::>() .join(", "); Ok(FormatTreeNode::with_children("Project".to_string(), vec![ @@ -173,7 +173,7 @@ fn aggregate_partial_to_format_tree( .map(|column| { let index = column.parse::()?; let column = metadata.read().column(index).clone(); - Ok(column.name) + Ok(column.name().to_string()) }) .collect::>>()? .join(", "); @@ -204,7 +204,7 @@ fn aggregate_final_to_format_tree( .map(|column| { let index = column.parse::()?; let column = metadata.read().column(index).clone(); - Ok(column.name) + Ok(column.name().to_string()) }) .collect::>>()? .join(", "); @@ -234,7 +234,7 @@ fn sort_to_format_tree(plan: &Sort, metadata: &MetadataRef) -> Result (column.column_index, path_indices.clone()), + match &column.path_indices() { + Some(path_indices) => (column.index(), path_indices.to_vec()), None => { - let name = metadata.column(*index).name.as_str(); + let name = metadata.column(*index).name(); let idx = schema.index_of(name).unwrap(); - (column.column_index, vec![idx]) + (column.index(), vec![idx]) } } }) .sorted() - .collect::>(); + .collect::>>(); Projection::InnerColumns(col_indices) } } @@ -120,23 +121,23 @@ impl PhysicalPlanBuilder { let metadata = self.metadata.read().clone(); for index in scan.columns.iter() { let column = metadata.column(*index); - if column.path_indices.is_some() { + if column.has_path_indices() { has_inner_column = true; } if let Some(prewhere) = &scan.prewhere { // if there is a prewhere optimization, // we can prune `PhysicalScan`'s ouput schema. if prewhere.output_columns.contains(index) { - name_mapping.insert(column.name.clone(), index.to_string()); + name_mapping.insert(column.name().to_string(), index.to_string()); } } else { - let name = column.name.clone(); + let name = column.name().to_string(); name_mapping.insert(name, index.to_string()); } } let table_entry = metadata.table(scan.table_index); - let table = table_entry.table.clone(); + let table = table_entry.table(); let table_schema = table.schema(); let push_downs = self.push_downs(scan, &table_schema, has_inner_column)?; @@ -144,7 +145,7 @@ impl PhysicalPlanBuilder { let source = table .read_plan_with_catalog( self.ctx.clone(), - table_entry.catalog.clone(), + table_entry.catalog().to_string(), Some(push_downs), ) .await?; diff --git a/src/query/service/src/sql/executor/physical_scalar.rs b/src/query/service/src/sql/executor/physical_scalar.rs index 1889571dc2ad0..7c11c36bb75ab 100644 --- a/src/query/service/src/sql/executor/physical_scalar.rs +++ b/src/query/service/src/sql/executor/physical_scalar.rs @@ -16,10 +16,10 @@ use common_datavalues::format_data_type_sql; use common_datavalues::DataTypeImpl; use common_datavalues::DataValue; use common_exception::Result; +use common_planner::IndexType; +use common_planner::MetadataRef; use super::ColumnID; -use crate::sql::planner::IndexType; -use crate::sql::MetadataRef; /// Serializable and desugared representation of `Scalar`. #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] @@ -70,13 +70,13 @@ impl PhysicalScalar { let index = column_id.parse::()?; let column = metadata.read().column(index).clone(); - let table_name = match column.table_index { + let table_name = match column.table_index() { Some(table_index) => { - format!("{}.", metadata.read().table(table_index).name.clone()) + format!("{}.", metadata.read().table(table_index).name()) } None => "".to_string(), }; - Ok(format!("{}{} (#{})", table_name, column.name, index)) + Ok(format!("{}{} (#{})", table_name, column.name(), index)) } PhysicalScalar::Constant { value, .. } => Ok(value.to_string()), PhysicalScalar::Function { name, args, .. } => { @@ -114,7 +114,7 @@ impl AggregateFunctionDesc { .map(|arg| { let index = arg.parse::()?; let column = metadata.read().column(index).clone(); - Ok(column.name) + Ok(column.name().to_string()) }) .collect::>>()? .join(", ") diff --git a/src/query/service/src/sql/executor/util.rs b/src/query/service/src/sql/executor/util.rs index 102c78d2a0535..6add3a726c8f1 100644 --- a/src/query/service/src/sql/executor/util.rs +++ b/src/query/service/src/sql/executor/util.rs @@ -14,11 +14,11 @@ use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use once_cell::sync::Lazy; use regex::Regex; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::Operator; /// Check if all plans in an expression are physical plans diff --git a/src/query/service/src/sql/planner/binder/aggregate.rs b/src/query/service/src/sql/planner/binder/aggregate.rs index 50b1d1243e584..f7f99345bb300 100644 --- a/src/query/service/src/sql/planner/binder/aggregate.rs +++ b/src/query/service/src/sql/planner/binder/aggregate.rs @@ -22,6 +22,7 @@ use common_ast::DisplayError; use common_datavalues::DataTypeImpl; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::MetadataRef; use crate::sql::binder::scalar::ScalarBinder; use crate::sql::binder::select::SelectList; @@ -29,7 +30,6 @@ use crate::sql::binder::Binder; use crate::sql::binder::ColumnBinding; use crate::sql::binder::Visibility; use crate::sql::optimizer::SExpr; -use crate::sql::planner::metadata::MetadataRef; use crate::sql::plans::Aggregate; use crate::sql::plans::AggregateFunction; use crate::sql::plans::AggregateMode; diff --git a/src/query/service/src/sql/planner/binder/bind_context.rs b/src/query/service/src/sql/planner/binder/bind_context.rs index 7915bb763281a..a81b34a07ae60 100644 --- a/src/query/service/src/sql/planner/binder/bind_context.rs +++ b/src/query/service/src/sql/planner/binder/bind_context.rs @@ -25,12 +25,12 @@ use common_datavalues::DataSchemaRefExt; use common_datavalues::DataTypeImpl; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use parking_lot::RwLock; use super::AggregateInfo; use crate::sql::normalize_identifier; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::Scalar; use crate::sql::NameResolutionContext; diff --git a/src/query/service/src/sql/planner/binder/distinct.rs b/src/query/service/src/sql/planner/binder/distinct.rs index 66d90d2e0023b..d9fba8b353a86 100644 --- a/src/query/service/src/sql/planner/binder/distinct.rs +++ b/src/query/service/src/sql/planner/binder/distinct.rs @@ -15,12 +15,12 @@ use std::collections::HashMap; use common_exception::Result; +use common_planner::IndexType; use crate::sql::binder::Binder; use crate::sql::binder::ColumnBinding; use crate::sql::optimizer::SExpr; use crate::sql::planner::semantic::GroupingChecker; -use crate::sql::planner::IndexType; use crate::sql::plans::Aggregate; use crate::sql::plans::AggregateMode; use crate::sql::plans::BoundColumnRef; diff --git a/src/query/service/src/sql/planner/binder/join.rs b/src/query/service/src/sql/planner/binder/join.rs index 4db48ff44b90a..47354720d6e59 100644 --- a/src/query/service/src/sql/planner/binder/join.rs +++ b/src/query/service/src/sql/planner/binder/join.rs @@ -23,6 +23,7 @@ use common_datavalues::type_coercion::compare_coercion; use common_datavalues::wrap_nullable; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::MetadataRef; use crate::sessions::TableContext; use crate::sql::binder::scalar_common::split_conjunctions; @@ -34,7 +35,6 @@ use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::SExpr; use crate::sql::planner::binder::scalar::ScalarBinder; use crate::sql::planner::binder::Binder; -use crate::sql::planner::metadata::MetadataRef; use crate::sql::planner::semantic::NameResolutionContext; use crate::sql::plans::BoundColumnRef; use crate::sql::plans::JoinType; diff --git a/src/query/service/src/sql/planner/binder/mod.rs b/src/query/service/src/sql/planner/binder/mod.rs index bc858bff97a56..0e1aadbd7a578 100644 --- a/src/query/service/src/sql/planner/binder/mod.rs +++ b/src/query/service/src/sql/planner/binder/mod.rs @@ -35,6 +35,7 @@ use common_legacy_planners::DropUserUDFPlan; use common_legacy_planners::ShowGrantsPlan; use common_legacy_planners::UseDatabasePlan; use common_meta_types::UserDefinedFunction; +use common_planner::MetadataRef; pub use scalar::ScalarBinder; pub use scalar_common::*; @@ -43,7 +44,6 @@ use super::plans::RewriteKind; use super::semantic::NameResolutionContext; use crate::catalogs::CatalogManager; use crate::sessions::TableContext; -use crate::sql::planner::metadata::MetadataRef; mod aggregate; mod bind_context; diff --git a/src/query/service/src/sql/planner/binder/project.rs b/src/query/service/src/sql/planner/binder/project.rs index 6054cec2b29a7..729ed205c99b1 100644 --- a/src/query/service/src/sql/planner/binder/project.rs +++ b/src/query/service/src/sql/planner/binder/project.rs @@ -18,6 +18,7 @@ use common_ast::ast::Indirection; use common_ast::ast::SelectTarget; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use super::bind_context::NameResolutionResult; use crate::sql::binder::select::SelectItem; @@ -31,7 +32,6 @@ use crate::sql::planner::binder::Binder; use crate::sql::planner::binder::ColumnBinding; use crate::sql::planner::semantic::normalize_identifier; use crate::sql::planner::semantic::GroupingChecker; -use crate::sql::planner::IndexType; use crate::sql::plans::BoundColumnRef; use crate::sql::plans::EvalScalar; use crate::sql::plans::Project; diff --git a/src/query/service/src/sql/planner/binder/scalar.rs b/src/query/service/src/sql/planner/binder/scalar.rs index 06d4a58dce0a1..228a9da6a1a1c 100644 --- a/src/query/service/src/sql/planner/binder/scalar.rs +++ b/src/query/service/src/sql/planner/binder/scalar.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use common_ast::ast::Expr; use common_datavalues::DataTypeImpl; use common_exception::Result; +use common_planner::MetadataRef; use crate::sessions::TableContext; use crate::sql::planner::binder::BindContext; -use crate::sql::planner::metadata::MetadataRef; use crate::sql::planner::semantic::NameResolutionContext; use crate::sql::planner::semantic::TypeChecker; use crate::sql::plans::Scalar; diff --git a/src/query/service/src/sql/planner/binder/sort.rs b/src/query/service/src/sql/planner/binder/sort.rs index dbf4571203cf1..04b16088b054b 100644 --- a/src/query/service/src/sql/planner/binder/sort.rs +++ b/src/query/service/src/sql/planner/binder/sort.rs @@ -21,6 +21,7 @@ use common_ast::ast::OrderByExpr; use common_ast::DisplayError; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use super::bind_context::NameResolutionResult; use crate::sql::binder::scalar::ScalarBinder; @@ -30,7 +31,6 @@ use crate::sql::binder::ColumnBinding; use crate::sql::normalize_identifier; use crate::sql::optimizer::SExpr; use crate::sql::planner::semantic::GroupingChecker; -use crate::sql::planner::IndexType; use crate::sql::plans::AggregateFunction; use crate::sql::plans::AndExpr; use crate::sql::plans::BoundColumnRef; diff --git a/src/query/service/src/sql/planner/binder/table.rs b/src/query/service/src/sql/planner/binder/table.rs index 0c74c0b43446f..449d27ea43998 100644 --- a/src/query/service/src/sql/planner/binder/table.rs +++ b/src/query/service/src/sql/planner/binder/table.rs @@ -31,6 +31,7 @@ use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; use common_legacy_planners::Expression; +use common_planner::IndexType; use crate::sql::binder::scalar::ScalarBinder; use crate::sql::binder::Binder; @@ -40,7 +41,6 @@ use crate::sql::binder::Visibility; use crate::sql::optimizer::SExpr; use crate::sql::planner::semantic::normalize_identifier; use crate::sql::planner::semantic::TypeChecker; -use crate::sql::planner::IndexType; use crate::sql::plans::ConstantExpr; use crate::sql::plans::LogicalGet; use crate::sql::plans::Scalar; @@ -294,11 +294,11 @@ impl<'a> Binder { for column in columns.iter() { let column_binding = ColumnBinding { database_name: Some(database_name.to_string()), - table_name: Some(table.name.clone()), - column_name: column.name.clone(), - index: column.column_index, - data_type: Box::new(column.data_type.clone()), - visibility: if column.path_indices.is_some() { + table_name: Some(table.name().to_string()), + column_name: column.name().to_string(), + index: column.index(), + data_type: Box::new(column.data_type().clone()), + visibility: if column.has_path_indices() { Visibility::InVisible } else { Visibility::Visible @@ -306,12 +306,12 @@ impl<'a> Binder { }; bind_context.add_column_binding(column_binding); } - let stat = table.table.statistics(self.ctx.clone()).await?; + let stat = table.table().statistics(self.ctx.clone()).await?; Ok(( SExpr::create_leaf( LogicalGet { table_index, - columns: columns.into_iter().map(|col| col.column_index).collect(), + columns: columns.into_iter().map(|col| col.index()).collect(), push_down_predicates: None, limit: None, order_by: None, diff --git a/src/query/service/src/sql/planner/format/display_rel_operator.rs b/src/query/service/src/sql/planner/format/display_rel_operator.rs index d39dde404a746..530c82a9da476 100644 --- a/src/query/service/src/sql/planner/format/display_rel_operator.rs +++ b/src/query/service/src/sql/planner/format/display_rel_operator.rs @@ -17,6 +17,7 @@ use std::fmt::Display; use common_ast::ast::FormatTreeNode; use common_datavalues::format_data_type_sql; use common_functions::scalars::FunctionFactory; +use common_planner::MetadataRef; use itertools::Itertools; use crate::sql::optimizer::SExpr; @@ -38,7 +39,6 @@ use crate::sql::plans::Project; use crate::sql::plans::RelOperator; use crate::sql::plans::Scalar; use crate::sql::plans::Sort; -use crate::sql::MetadataRef; use crate::sql::ScalarExpr; #[derive(Clone)] @@ -246,7 +246,9 @@ fn physical_scan_to_format_tree( vec![ FormatTreeNode::new(FormatContext::Text(format!( "table: {}.{}.{}", - &table.catalog, &table.database, &table.name, + table.catalog(), + table.database(), + table.name(), ))), FormatTreeNode::new(FormatContext::Text(format!( "filters: [{}]", @@ -268,7 +270,7 @@ fn physical_scan_to_format_tree( .iter() .map(|item| format!( "{} (#{}) {}", - metadata.read().column(item.index).name.clone(), + metadata.read().column(item.index).name(), item.index, if item.asc { "ASC" } else { "DESC" } )) @@ -302,7 +304,9 @@ fn logical_get_to_format_tree( vec![ FormatTreeNode::new(FormatContext::Text(format!( "table: {}.{}.{}", - &table.catalog, &table.database, &table.name, + table.catalog(), + table.database(), + table.name(), ))), FormatTreeNode::new(FormatContext::Text(format!( "filters: [{}]", @@ -324,7 +328,7 @@ fn logical_get_to_format_tree( .iter() .map(|item| format!( "{} (#{}) {}", - metadata.read().column(item.index).name.clone(), + metadata.read().column(item.index).name(), item.index, if item.asc { "ASC" } else { "DESC" } )) @@ -556,7 +560,7 @@ fn project_to_format_tree( .read() .columns() .iter() - .map(|entry| format!("{} (#{})", entry.name.clone(), entry.column_index)) + .map(|entry| format!("{} (#{})", entry.name(), entry.index())) .collect::>(); // Sorted by column index to make display of Project stable let project_columns = op @@ -591,7 +595,8 @@ fn sort_to_format_tree( .items .iter() .map(|item| { - let name = metadata.read().column(item.index).name.clone(); + let metadata = metadata.read(); + let name = metadata.column(item.index).name(); format!( "{} (#{}) {}", name, diff --git a/src/query/service/src/sql/planner/metadata.rs b/src/query/service/src/sql/planner/metadata.rs index daf128014ebea..1074be08fe7da 100644 --- a/src/query/service/src/sql/planner/metadata.rs +++ b/src/query/service/src/sql/planner/metadata.rs @@ -12,233 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::VecDeque; -use std::fmt::Debug; -use std::sync::Arc; - use common_ast::ast::Expr; use common_ast::ast::Literal; use common_datavalues::prelude::*; -use parking_lot::RwLock; - -use crate::sql::optimizer::ColumnSet; -use crate::sql::planner::IndexType; -use crate::storages::Table; - -pub static DUMMY_TABLE_INDEX: IndexType = IndexType::MAX; - -pub type MetadataRef = Arc>; - -#[derive(Clone)] -pub struct TableEntry { - pub index: IndexType, - pub name: String, - pub catalog: String, - pub database: String, - - pub table: Arc, -} - -impl Debug for TableEntry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "TableEntry {{ index: {:?}, name: {:?}, catalog: {:?}, database: {:?} }}", - self.index, self.name, self.catalog, self.database - ) - } -} - -impl TableEntry { - pub fn new( - index: IndexType, - name: String, - catalog: String, - database: String, - table: Arc, - ) -> Self { - TableEntry { - index, - name, - catalog, - database, - table, - } - } -} - -#[derive(Clone, Debug)] -pub struct ColumnEntry { - pub column_index: IndexType, - pub name: String, - pub data_type: DataTypeImpl, - - // Table index of column entry. None if column is derived from a subquery. - pub table_index: Option, - // Path indices for inner column of struct data type. - pub path_indices: Option>, -} - -impl ColumnEntry { - pub fn new( - name: String, - data_type: DataTypeImpl, - column_index: IndexType, - table_index: Option, - path_indices: Option>, - ) -> Self { - ColumnEntry { - column_index, - name, - data_type, - table_index, - path_indices, - } - } -} - -/// Metadata stores information about columns and tables used in a query. -/// Tables and columns are identified with its unique index, notice that index value of a column can -/// be same with that of a table. -#[derive(Clone, Debug, Default)] -pub struct Metadata { - tables: Vec, - columns: Vec, -} - -impl Metadata { - pub fn create() -> Self { - Self { - tables: vec![], - columns: vec![], - } - } - - pub fn table(&self, index: IndexType) -> &TableEntry { - self.tables.get(index).unwrap() - } - - pub fn tables(&self) -> &[TableEntry] { - self.tables.as_slice() - } - - pub fn table_index_by_column_indexes(&self, column_indexes: &ColumnSet) -> Option { - for column in self.columns.iter() { - if column_indexes.contains(&column.column_index) { - return column.table_index; - } - } - None - } - - pub fn column(&self, index: IndexType) -> &ColumnEntry { - self.columns.get(index).unwrap() - } - - pub fn columns(&self) -> &[ColumnEntry] { - self.columns.as_slice() - } - - pub fn columns_by_table_index(&self, index: IndexType) -> Vec { - let mut result = vec![]; - for col in self.columns.iter() { - match col.table_index { - Some(table_index) if table_index == index => { - result.push(col.clone()); - } - _ => {} - } - } - - result - } - - pub fn add_column( - &mut self, - name: String, - data_type: DataTypeImpl, - table_index: Option, - path_indices: Option>, - ) -> IndexType { - let column_index = self.columns.len(); - let column_entry = - ColumnEntry::new(name, data_type, column_index, table_index, path_indices); - self.columns.push(column_entry); - column_index - } - - pub fn add_table( - &mut self, - catalog: String, - database: String, - table_meta: Arc, - ) -> IndexType { - let table_name = table_meta.name().to_string(); - let table_index = self.tables.len(); - let table_entry = TableEntry { - index: table_index, - name: table_name, - database, - catalog, - table: table_meta.clone(), - }; - self.tables.push(table_entry); - let mut struct_fields = VecDeque::new(); - for (i, field) in table_meta.schema().fields().iter().enumerate() { - self.add_column( - field.name().clone(), - field.data_type().clone(), - Some(table_index), - None, - ); - if field.data_type().data_type_id() == TypeID::Struct { - struct_fields.push_back((vec![i], field.clone())); - } - } - // add inner columns of struct column - while !struct_fields.is_empty() { - let (path_indices, field) = struct_fields.pop_front().unwrap(); - let struct_type: StructType = field.data_type().clone().try_into().unwrap(); - - let inner_types = struct_type.types(); - let inner_names = match struct_type.names() { - Some(inner_names) => inner_names - .iter() - .map(|name| format!("{}:{}", field.name(), name)) - .collect::>(), - None => (0..inner_types.len()) - .map(|i| format!("{}:{}", field.name(), i)) - .collect::>(), - }; - for ((i, inner_name), inner_type) in - inner_names.into_iter().enumerate().zip(inner_types.iter()) - { - let mut inner_path_indices = path_indices.clone(); - inner_path_indices.push(i); - - self.add_column( - inner_name.clone(), - inner_type.clone(), - Some(table_index), - Some(inner_path_indices.clone()), - ); - if inner_type.data_type_id() == TypeID::Struct { - let inner_field = DataField::new(&inner_name, inner_type.clone()); - struct_fields.push_back((inner_path_indices, inner_field)); - } - } - } - table_index - } - - pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize { - let entries = indices - .iter() - .map(|i| self.column(*i).clone()) - .collect::>(); - find_smallest_column(entries.as_slice()) - } -} +use common_planner::ColumnEntry; +use common_planner::IndexType; pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) -> bool { name.eq_ignore_ascii_case("count") @@ -252,16 +30,16 @@ pub fn find_smallest_column(entries: &[ColumnEntry]) -> usize { debug_assert!(!entries.is_empty()); let mut column_indexes = entries .iter() - .map(|entry| entry.column_index) + .map(|entry| entry.index()) .collect::>(); column_indexes.sort(); let mut smallest_index = column_indexes[0]; let mut smallest_size = usize::MAX; for (idx, column_entry) in entries.iter().enumerate() { - if let Ok(bytes) = column_entry.data_type.data_type_id().numeric_byte_size() { + if let Ok(bytes) = column_entry.data_type().data_type_id().numeric_byte_size() { if smallest_size > bytes { smallest_size = bytes; - smallest_index = entries[idx].column_index; + smallest_index = entries[idx].index(); } } } diff --git a/src/query/service/src/sql/planner/mod.rs b/src/query/service/src/sql/planner/mod.rs index 29c9113a6bd53..f9cc669e16949 100644 --- a/src/query/service/src/sql/planner/mod.rs +++ b/src/query/service/src/sql/planner/mod.rs @@ -37,12 +37,9 @@ pub use binder::Binder; pub use binder::ColumnBinding; pub use binder::Visibility; use common_catalog::catalog::CatalogManager; +use common_planner::Metadata; +use common_planner::MetadataRef; pub use metadata::find_smallest_column; -pub use metadata::ColumnEntry; -pub use metadata::Metadata; -pub use metadata::MetadataRef; -pub use metadata::TableEntry; -pub use metadata::DUMMY_TABLE_INDEX; pub use semantic::normalize_identifier; pub use semantic::IdentifierNormalizer; pub use semantic::NameResolutionContext; @@ -56,8 +53,6 @@ use crate::sessions::TableContext; const PROBE_INSERT_INITIAL_TOKENS: usize = 128; const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8; -pub type IndexType = usize; - pub struct Planner { ctx: Arc, } @@ -101,7 +96,7 @@ impl Planner { let (stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?; // Step 3: Bind AST with catalog, and generate a pure logical SExpr - let metadata = Arc::new(RwLock::new(Metadata::create())); + let metadata = Arc::new(RwLock::new(Metadata::default())); let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; let binder = Binder::new( self.ctx.clone(), diff --git a/src/query/service/src/sql/planner/optimizer/cascades/mod.rs b/src/query/service/src/sql/planner/optimizer/cascades/mod.rs index 176e817f2a888..3b1af15d3ddde 100644 --- a/src/query/service/src/sql/planner/optimizer/cascades/mod.rs +++ b/src/query/service/src/sql/planner/optimizer/cascades/mod.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use super::cost::Cost; use super::cost::CostContext; @@ -35,7 +36,6 @@ use crate::sql::optimizer::memo::Memo; use crate::sql::optimizer::rule::RuleSet; use crate::sql::optimizer::rule::TransformState; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::Operator; /// A cascades-style search engine to enumerate possible alternations of a relational expression and diff --git a/src/query/service/src/sql/planner/optimizer/cost/mod.rs b/src/query/service/src/sql/planner/optimizer/cost/mod.rs index 4f73737f9921c..31e670e77f1fc 100644 --- a/src/query/service/src/sql/planner/optimizer/cost/mod.rs +++ b/src/query/service/src/sql/planner/optimizer/cost/mod.rs @@ -17,11 +17,11 @@ mod cost_model; use std::ops::Add; use common_exception::Result; +use common_planner::IndexType; pub use cost_model::DefaultCostModel; use super::MExpr; use super::Memo; -use crate::sql::planner::IndexType; #[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] pub struct Cost(pub f64); diff --git a/src/query/service/src/sql/planner/optimizer/group.rs b/src/query/service/src/sql/planner/optimizer/group.rs index ec32632c01050..04be35d9b99a8 100644 --- a/src/query/service/src/sql/planner/optimizer/group.rs +++ b/src/query/service/src/sql/planner/optimizer/group.rs @@ -13,10 +13,10 @@ // limitations under the License. use common_exception::Result; +use common_planner::IndexType; use crate::sql::optimizer::m_expr::MExpr; use crate::sql::optimizer::property::RelationalProperty; -use crate::sql::planner::IndexType; /// `Group` is a set of logically equivalent relational expressions represented with `MExpr`. #[derive(Clone)] diff --git a/src/query/service/src/sql/planner/optimizer/heuristic/decorrelate.rs b/src/query/service/src/sql/planner/optimizer/heuristic/decorrelate.rs index b985c768e02fa..486293021a226 100644 --- a/src/query/service/src/sql/planner/optimizer/heuristic/decorrelate.rs +++ b/src/query/service/src/sql/planner/optimizer/heuristic/decorrelate.rs @@ -20,6 +20,8 @@ use common_datavalues::DataTypeImpl; use common_datavalues::NullableType; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; +use common_planner::MetadataRef; use crate::sql::binder::wrap_cast; use crate::sql::binder::JoinPredicate; @@ -30,7 +32,6 @@ use crate::sql::optimizer::heuristic::subquery_rewriter::UnnestResult; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::Aggregate; use crate::sql::plans::AggregateFunction; use crate::sql::plans::AggregateMode; @@ -55,7 +56,6 @@ use crate::sql::plans::SubqueryExpr; use crate::sql::plans::SubqueryType; use crate::sql::plans::UnionAll; use crate::sql::ColumnBinding; -use crate::sql::MetadataRef; use crate::sql::ScalarExpr; /// Decorrelate subqueries inside `s_expr`. @@ -443,12 +443,12 @@ impl SubqueryRewriter { self.derived_columns.insert( *correlated_column, metadata.add_column( - column_entry.name.clone(), - if let DataTypeImpl::Nullable(_) = column_entry.data_type { - column_entry.data_type.clone() + column_entry.name().to_string(), + if let DataTypeImpl::Nullable(_) = column_entry.data_type() { + column_entry.data_type().clone() } else { DataTypeImpl::Nullable(NullableType::create( - column_entry.data_type.clone(), + column_entry.data_type().clone(), )) }, None, @@ -495,8 +495,8 @@ impl SubqueryRewriter { database_name: None, table_name: None, column_name: "".to_string(), - index: column_entry.column_index, - data_type: Box::from(column_entry.data_type.clone()), + index: column_entry.index(), + data_type: Box::from(column_entry.data_type().clone()), visibility: Visibility::Visible, }, }) @@ -529,7 +529,7 @@ impl SubqueryRewriter { table_name: None, column_name: format!("subquery_{}", derived_column), index: *derived_column, - data_type: Box::from(column_entry.data_type.clone()), + data_type: Box::from(column_entry.data_type().clone()), visibility: Visibility::Visible, }; items.push(ScalarItem { @@ -598,7 +598,7 @@ impl SubqueryRewriter { table_name: None, column_name: format!("subquery_{}", derived_column), index: *derived_column, - data_type: Box::from(column_entry.data_type.clone()), + data_type: Box::from(column_entry.data_type().clone()), visibility: Visibility::Visible, } }; @@ -767,7 +767,7 @@ impl SubqueryRewriter { let data_type = { let metadata = self.metadata.read(); let column_entry = metadata.column(*correlated_column); - column_entry.data_type.clone() + column_entry.data_type().clone() }; let right_column = Scalar::BoundColumnRef(BoundColumnRef { column: ColumnBinding { diff --git a/src/query/service/src/sql/planner/optimizer/heuristic/mod.rs b/src/query/service/src/sql/planner/optimizer/heuristic/mod.rs index ef0a38743c6e9..110172ab7649f 100644 --- a/src/query/service/src/sql/planner/optimizer/heuristic/mod.rs +++ b/src/query/service/src/sql/planner/optimizer/heuristic/mod.rs @@ -22,6 +22,7 @@ mod subquery_rewriter; use std::sync::Arc; use common_exception::Result; +use common_planner::MetadataRef; use once_cell::sync::Lazy; use self::prewhere_optimization::PrewhereOptimizer; @@ -34,7 +35,6 @@ pub use crate::sql::optimizer::heuristic::rule_list::RuleList; use crate::sql::optimizer::rule::TransformState; use crate::sql::optimizer::SExpr; use crate::sql::BindContext; -use crate::sql::MetadataRef; pub static DEFAULT_REWRITE_RULES: Lazy> = Lazy::new(|| { vec![ diff --git a/src/query/service/src/sql/planner/optimizer/heuristic/prewhere_optimization.rs b/src/query/service/src/sql/planner/optimizer/heuristic/prewhere_optimization.rs index 2698ca5fcb8ee..f83deec3dee45 100644 --- a/src/query/service/src/sql/planner/optimizer/heuristic/prewhere_optimization.rs +++ b/src/query/service/src/sql/planner/optimizer/heuristic/prewhere_optimization.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_exception::Result; +use common_planner::MetadataRef; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::SExpr; @@ -22,7 +23,6 @@ use crate::sql::plans::PatternPlan; use crate::sql::plans::Prewhere; use crate::sql::plans::RelOp; use crate::sql::plans::Scalar; -use crate::sql::MetadataRef; pub struct PrewhereOptimizer { metadata: MetadataRef, @@ -95,7 +95,7 @@ impl PrewhereOptimizer { let mut get: LogicalGet = s_expr.child(0)?.plan().clone().try_into()?; let metadata = self.metadata.read().clone(); - let table = metadata.table(get.table_index).table.clone(); + let table = metadata.table(get.table_index).table(); if !table.support_prewhere() { // cannot optimize return Ok(s_expr); diff --git a/src/query/service/src/sql/planner/optimizer/heuristic/prune_columns.rs b/src/query/service/src/sql/planner/optimizer/heuristic/prune_columns.rs index 6c0662e1f5c55..4fff4ac914738 100644 --- a/src/query/service/src/sql/planner/optimizer/heuristic/prune_columns.rs +++ b/src/query/service/src/sql/planner/optimizer/heuristic/prune_columns.rs @@ -14,6 +14,7 @@ use common_exception::ErrorCode; use common_exception::Result; +use common_planner::MetadataRef; use itertools::Itertools; use crate::sql::find_smallest_column; @@ -25,7 +26,6 @@ use crate::sql::plans::EvalScalar; use crate::sql::plans::LogicalGet; use crate::sql::plans::Project; use crate::sql::plans::RelOperator; -use crate::sql::MetadataRef; use crate::sql::ScalarExpr; pub struct ColumnPruner { diff --git a/src/query/service/src/sql/planner/optimizer/heuristic/subquery_rewriter.rs b/src/query/service/src/sql/planner/optimizer/heuristic/subquery_rewriter.rs index e41e0dfdfbba7..716f4e948e1f2 100644 --- a/src/query/service/src/sql/planner/optimizer/heuristic/subquery_rewriter.rs +++ b/src/query/service/src/sql/planner/optimizer/heuristic/subquery_rewriter.rs @@ -22,13 +22,14 @@ use common_datavalues::UInt64Type; use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; +use common_planner::IndexType; +use common_planner::MetadataRef; use crate::sql::binder::ColumnBinding; use crate::sql::binder::Visibility; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::Aggregate; use crate::sql::plans::AggregateFunction; use crate::sql::plans::AggregateMode; @@ -50,7 +51,6 @@ use crate::sql::plans::Scalar; use crate::sql::plans::ScalarItem; use crate::sql::plans::SubqueryExpr; use crate::sql::plans::SubqueryType; -use crate::sql::MetadataRef; use crate::sql::ScalarExpr; pub enum UnnestResult { diff --git a/src/query/service/src/sql/planner/optimizer/m_expr.rs b/src/query/service/src/sql/planner/optimizer/m_expr.rs index fb2368d02a641..b471e59802c38 100644 --- a/src/query/service/src/sql/planner/optimizer/m_expr.rs +++ b/src/query/service/src/sql/planner/optimizer/m_expr.rs @@ -14,6 +14,7 @@ use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use super::group::Group; use crate::sql::optimizer::memo::Memo; @@ -21,7 +22,6 @@ use crate::sql::optimizer::pattern_extractor::PatternExtractor; use crate::sql::optimizer::rule::RulePtr; use crate::sql::optimizer::rule::TransformState; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::Operator; use crate::sql::plans::RelOperator; diff --git a/src/query/service/src/sql/planner/optimizer/memo.rs b/src/query/service/src/sql/planner/optimizer/memo.rs index 6e2637caddcce..ac4590fe6f77e 100644 --- a/src/query/service/src/sql/planner/optimizer/memo.rs +++ b/src/query/service/src/sql/planner/optimizer/memo.rs @@ -16,13 +16,13 @@ use std::collections::HashSet; use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use super::RelExpr; use super::RelationalProperty; use crate::sql::optimizer::group::Group; use crate::sql::optimizer::m_expr::MExpr; use crate::sql::optimizer::s_expr::SExpr; -use crate::sql::planner::IndexType; use crate::sql::plans::RelOperator; /// `Memo` is a search space which memoize possible plans of a query. diff --git a/src/query/service/src/sql/planner/optimizer/mod.rs b/src/query/service/src/sql/planner/optimizer/mod.rs index b806062a70663..616df752c07b7 100644 --- a/src/query/service/src/sql/planner/optimizer/mod.rs +++ b/src/query/service/src/sql/planner/optimizer/mod.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use common_ast::ast::ExplainKind; use common_exception::Result; +use common_planner::MetadataRef; pub use heuristic::HeuristicOptimizer; pub use heuristic::DEFAULT_REWRITE_RULES; pub use m_expr::MExpr; @@ -55,7 +56,6 @@ pub use crate::sql::optimizer::heuristic::RuleList; pub use crate::sql::optimizer::rule::RuleID; use crate::sql::optimizer::rule::RuleSet; use crate::sql::plans::CopyPlanV2; -use crate::sql::MetadataRef; #[derive(Debug, Clone, Default)] pub struct OptimizerConfig { diff --git a/src/query/service/src/sql/planner/optimizer/property/mod.rs b/src/query/service/src/sql/planner/optimizer/property/mod.rs index 50ee01045e186..0402b825dffd4 100644 --- a/src/query/service/src/sql/planner/optimizer/property/mod.rs +++ b/src/query/service/src/sql/planner/optimizer/property/mod.rs @@ -19,9 +19,9 @@ mod stat; use std::collections::HashSet; pub use builder::RelExpr; +use common_planner::IndexType; pub use enforcer::require_property; -use crate::sql::planner::IndexType; use crate::sql::plans::Scalar; pub type ColumnSet = HashSet; diff --git a/src/query/service/src/sql/planner/optimizer/s_expr.rs b/src/query/service/src/sql/planner/optimizer/s_expr.rs index 918570fff057f..72261e9bb062c 100644 --- a/src/query/service/src/sql/planner/optimizer/s_expr.rs +++ b/src/query/service/src/sql/planner/optimizer/s_expr.rs @@ -14,11 +14,11 @@ use common_exception::ErrorCode; use common_exception::Result; +use common_planner::IndexType; use super::RelationalProperty; use crate::sql::optimizer::rule::AppliedRules; use crate::sql::optimizer::rule::RuleID; -use crate::sql::planner::IndexType; use crate::sql::plans::Operator; use crate::sql::plans::PatternPlan; use crate::sql::plans::RelOp; diff --git a/src/query/service/src/sql/planner/optimizer/util.rs b/src/query/service/src/sql/planner/optimizer/util.rs index f96d1d6f8da64..f336f798812f6 100644 --- a/src/query/service/src/sql/planner/optimizer/util.rs +++ b/src/query/service/src/sql/planner/optimizer/util.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_planner::MetadataRef; + use super::SExpr; use crate::sql::plans::JoinType; use crate::sql::plans::RelOperator; -use crate::sql::MetadataRef; /// Check if a query will read data from local tables(e.g. system tables). pub fn contains_local_table_scan(s_expr: &SExpr, metadata: &MetadataRef) -> bool { @@ -24,7 +25,7 @@ pub fn contains_local_table_scan(s_expr: &SExpr, metadata: &MetadataRef) -> bool .iter() .any(|s_expr| contains_local_table_scan(s_expr, metadata)) || if let RelOperator::LogicalGet(get) = s_expr.plan() { - metadata.read().table(get.table_index).table.is_local() + metadata.read().table(get.table_index).table().is_local() } else { false } diff --git a/src/query/service/src/sql/planner/plans/eval_scalar.rs b/src/query/service/src/sql/planner/plans/eval_scalar.rs index b499f2b118013..f2cd00a1cca46 100644 --- a/src/query/service/src/sql/planner/plans/eval_scalar.rs +++ b/src/query/service/src/sql/planner/plans/eval_scalar.rs @@ -13,13 +13,13 @@ // limitations under the License.#[derive(Clone, Debug)] use common_exception::Result; +use common_planner::IndexType; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; use crate::sql::optimizer::RequiredProperty; -use crate::sql::planner::IndexType; use crate::sql::plans::LogicalOperator; use crate::sql::plans::Operator; use crate::sql::plans::PhysicalOperator; diff --git a/src/query/service/src/sql/planner/plans/hash_join.rs b/src/query/service/src/sql/planner/plans/hash_join.rs index e892dca735d75..6d9501892e944 100644 --- a/src/query/service/src/sql/planner/plans/hash_join.rs +++ b/src/query/service/src/sql/planner/plans/hash_join.rs @@ -13,13 +13,13 @@ // limitations under the License. use common_exception::Result; +use common_planner::IndexType; use super::JoinType; use crate::sql::optimizer::Distribution; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RequiredProperty; -use crate::sql::planner::IndexType; use crate::sql::plans::LogicalOperator; use crate::sql::plans::Operator; use crate::sql::plans::PhysicalOperator; diff --git a/src/query/service/src/sql/planner/plans/logical_get.rs b/src/query/service/src/sql/planner/plans/logical_get.rs index 25d732663018a..20accdedbbd10 100644 --- a/src/query/service/src/sql/planner/plans/logical_get.rs +++ b/src/query/service/src/sql/planner/plans/logical_get.rs @@ -14,12 +14,12 @@ use common_catalog::table::TableStatistics; use common_exception::Result; +use common_planner::IndexType; use itertools::Itertools; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; -use crate::sql::planner::IndexType; use crate::sql::plans::LogicalOperator; use crate::sql::plans::Operator; use crate::sql::plans::PhysicalOperator; diff --git a/src/query/service/src/sql/planner/plans/logical_join.rs b/src/query/service/src/sql/planner/plans/logical_join.rs index c67a4113e8d70..2b9b0df877505 100644 --- a/src/query/service/src/sql/planner/plans/logical_join.rs +++ b/src/query/service/src/sql/planner/plans/logical_join.rs @@ -16,11 +16,11 @@ use std::fmt::Display; use std::fmt::Formatter; use common_exception::Result; +use common_planner::IndexType; use super::ScalarExpr; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; -use crate::sql::planner::IndexType; use crate::sql::plans::LogicalOperator; use crate::sql::plans::Operator; use crate::sql::plans::PhysicalOperator; diff --git a/src/query/service/src/sql/planner/plans/mod.rs b/src/query/service/src/sql/planner/plans/mod.rs index bcce440896206..63da7cd7841b4 100644 --- a/src/query/service/src/sql/planner/plans/mod.rs +++ b/src/query/service/src/sql/planner/plans/mod.rs @@ -86,6 +86,7 @@ use common_legacy_planners::TruncateTablePlan; use common_legacy_planners::UndropDatabasePlan; use common_legacy_planners::UndropTablePlan; use common_legacy_planners::UseDatabasePlan; +use common_planner::MetadataRef; pub use copy_v2::CopyPlanV2; pub use copy_v2::ValidationMode; pub use create_table_v2::CreateTablePlanV2; @@ -116,7 +117,6 @@ pub use sort::SortItem; pub use union_all::UnionAll; use super::BindContext; -use super::MetadataRef; use crate::sql::optimizer::SExpr; #[derive(Clone, Debug)] diff --git a/src/query/service/src/sql/planner/plans/physical_scan.rs b/src/query/service/src/sql/planner/plans/physical_scan.rs index 64ac64b4e9f84..95db88c853f75 100644 --- a/src/query/service/src/sql/planner/plans/physical_scan.rs +++ b/src/query/service/src/sql/planner/plans/physical_scan.rs @@ -15,6 +15,7 @@ use std::hash::Hash; use common_exception::Result; +use common_planner::IndexType; use itertools::Itertools; use super::logical_get::Prewhere; @@ -23,7 +24,6 @@ use crate::sql::optimizer::Distribution; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RequiredProperty; -use crate::sql::planner::IndexType; use crate::sql::plans::LogicalOperator; use crate::sql::plans::Operator; use crate::sql::plans::PhysicalOperator; diff --git a/src/query/service/src/sql/planner/plans/scalar.rs b/src/query/service/src/sql/planner/plans/scalar.rs index 973b2e6eb9fb1..1ee74e7263976 100644 --- a/src/query/service/src/sql/planner/plans/scalar.rs +++ b/src/query/service/src/sql/planner/plans/scalar.rs @@ -21,11 +21,11 @@ use common_datavalues::DataValue; use common_exception::ErrorCode; use common_exception::Result; use common_functions::scalars::FunctionFactory; +use common_planner::IndexType; use crate::sql::binder::ColumnBinding; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::SExpr; -use crate::sql::planner::IndexType; pub trait ScalarExpr { /// Get return type and nullability diff --git a/src/query/service/src/sql/planner/plans/sort.rs b/src/query/service/src/sql/planner/plans/sort.rs index b8e690f5ed14e..fba881381e79c 100644 --- a/src/query/service/src/sql/planner/plans/sort.rs +++ b/src/query/service/src/sql/planner/plans/sort.rs @@ -13,13 +13,13 @@ // limitations under the License. use common_exception::Result; +use common_planner::IndexType; use crate::sql::optimizer::Distribution; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; use crate::sql::optimizer::RequiredProperty; -use crate::sql::planner::IndexType; use crate::sql::plans::LogicalOperator; use crate::sql::plans::Operator; use crate::sql::plans::PhysicalOperator; diff --git a/src/query/service/src/sql/planner/semantic/type_check.rs b/src/query/service/src/sql/planner/semantic/type_check.rs index 58f5d5097b64d..710f7f760eb85 100644 --- a/src/query/service/src/sql/planner/semantic/type_check.rs +++ b/src/query/service/src/sql/planner/semantic/type_check.rs @@ -53,6 +53,7 @@ use common_functions::scalars::CastFunction; use common_functions::scalars::FunctionFactory; use common_functions::scalars::TupleFunction; use common_legacy_planners::validate_function_arg; +use common_planner::MetadataRef; use common_users::UserApiProvider; use super::name_resolution::NameResolutionContext; @@ -65,7 +66,6 @@ use crate::sql::binder::Binder; use crate::sql::binder::NameResolutionResult; use crate::sql::optimizer::RelExpr; use crate::sql::planner::metadata::optimize_remove_count_args; -use crate::sql::planner::metadata::MetadataRef; use crate::sql::plans::AggregateFunction; use crate::sql::plans::AndExpr; use crate::sql::plans::BoundColumnRef; diff --git a/src/query/service/tests/it/sql/planner/format/mod.rs b/src/query/service/tests/it/sql/planner/format/mod.rs index f63553006772e..f3d0cc8afe7f6 100644 --- a/src/query/service/tests/it/sql/planner/format/mod.rs +++ b/src/query/service/tests/it/sql/planner/format/mod.rs @@ -20,6 +20,7 @@ use common_datavalues::DataValue; use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; +use common_planner::Metadata; use databend_query::sql::optimizer::SExpr; use databend_query::sql::planner::plans::JoinType; use databend_query::sql::plans::BoundColumnRef; @@ -29,7 +30,6 @@ use databend_query::sql::plans::FunctionCall; use databend_query::sql::plans::PhysicalHashJoin; use databend_query::sql::plans::PhysicalScan; use databend_query::sql::ColumnBinding; -use databend_query::sql::Metadata; use databend_query::sql::Visibility; use databend_query::storages::Table; use parking_lot::RwLock; @@ -66,7 +66,7 @@ impl Table for DummyTable { #[test] fn test_format() { - let mut metadata = Metadata::create(); + let mut metadata = Metadata::default(); let col1 = metadata.add_column("col1".to_string(), BooleanType::new_impl(), None, None); let col2 = metadata.add_column("col2".to_string(), BooleanType::new_impl(), None, None); let tab1 = metadata.add_table(