Skip to content

Commit a823c4e

Browse files
authored
Merge pull request #7731 from Xuanwo/move-out-planner
refactor(planner): Move all metadata out
2 parents f9fc86a + f09b757 commit a823c4e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+494
-341
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ members = [
3030
"src/query/functions-v2",
3131
"src/query/legacy-parser",
3232
"src/query/management",
33+
"src/query/planner",
3334
"src/query/pipeline/core",
3435
"src/query/pipeline/sinks",
3536
"src/query/pipeline/sources",

src/query/planner/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "common-planner"
3+
version = "0.1.0"
4+
authors = ["Databend Authors <opensource@datafuselabs.com>"]
5+
license = "Apache-2.0"
6+
publish = false
7+
edition = "2021"
8+
9+
[dependencies]
10+
common-catalog = { path = "../catalog" }
11+
common-datavalues = { path = "../datavalues" }
12+
13+
parking_lot = "0.12"

src/query/planner/src/lib.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Databend Planner is the core part of Databend Query, it will:
16+
//!
17+
//! - Use `Parser` (provided by `common-ast`) to parse query into AST.
18+
//! - Use `Binder` to bind query into `LogicalPlan`
19+
//! - Use `Optimizer` to optimize `LogicalPlan` into `PhysicalPlan`
20+
//!
21+
//! After all the planners work, `Interpreter` will use `PhysicalPlan` to
22+
//! build pipelines, then our processes will produce result data blocks.
23+
24+
mod metadata;
25+
pub use metadata::ColumnEntry;
26+
pub use metadata::ColumnSet;
27+
pub use metadata::IndexType;
28+
pub use metadata::Metadata;
29+
pub use metadata::MetadataRef;
30+
pub use metadata::TableEntry;
31+
pub use metadata::DUMMY_TABLE_INDEX;

src/query/planner/src/metadata.rs

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashSet;
16+
use std::collections::VecDeque;
17+
use std::fmt::Debug;
18+
use std::fmt::Formatter;
19+
use std::sync::Arc;
20+
21+
use common_catalog::table::Table;
22+
use common_datavalues::DataField;
23+
use common_datavalues::DataType;
24+
use common_datavalues::DataTypeImpl;
25+
use common_datavalues::StructType;
26+
use common_datavalues::TypeID;
27+
use parking_lot::RwLock;
28+
29+
/// Planner use [`usize`] as it's index type.
30+
///
31+
/// This type will be used across the whole planner.
32+
pub type IndexType = usize;
33+
34+
/// Use IndexType::MAX to represent dummy table.
35+
pub static DUMMY_TABLE_INDEX: IndexType = IndexType::MAX;
36+
37+
/// ColumnSet represents a set of columns identified by its IndexType.
38+
pub type ColumnSet = HashSet<IndexType>;
39+
40+
/// A Send & Send version of [`Metadata`].
41+
///
42+
/// Callers can clone this ref safely and cheaply.
43+
pub type MetadataRef = Arc<RwLock<Metadata>>;
44+
45+
/// Metadata stores information about columns and tables used in a query.
46+
/// Tables and columns are identified with its unique index.
47+
/// Notice that index value of a column can be same with that of a table.
48+
#[derive(Clone, Debug, Default)]
49+
pub struct Metadata {
50+
tables: Vec<TableEntry>,
51+
columns: Vec<ColumnEntry>,
52+
}
53+
54+
impl Metadata {
55+
pub fn table(&self, index: IndexType) -> &TableEntry {
56+
self.tables.get(index).expect("metadata must contain table")
57+
}
58+
59+
pub fn tables(&self) -> &[TableEntry] {
60+
self.tables.as_slice()
61+
}
62+
63+
pub fn table_index_by_column_indexes(&self, column_indexes: &ColumnSet) -> Option<IndexType> {
64+
self.columns
65+
.iter()
66+
.find(|v| column_indexes.contains(&v.column_index))
67+
.and_then(|v| v.table_index)
68+
}
69+
70+
pub fn column(&self, index: IndexType) -> &ColumnEntry {
71+
self.columns
72+
.get(index)
73+
.expect("metadata must contain column")
74+
}
75+
76+
pub fn columns(&self) -> &[ColumnEntry] {
77+
self.columns.as_slice()
78+
}
79+
80+
pub fn columns_by_table_index(&self, index: IndexType) -> Vec<ColumnEntry> {
81+
self.columns
82+
.iter()
83+
.filter(|v| v.table_index == Some(index))
84+
.cloned()
85+
.collect()
86+
}
87+
88+
pub fn add_column(
89+
&mut self,
90+
name: String,
91+
data_type: DataTypeImpl,
92+
table_index: Option<IndexType>,
93+
path_indices: Option<Vec<IndexType>>,
94+
) -> IndexType {
95+
let column_index = self.columns.len();
96+
let column_entry =
97+
ColumnEntry::new(name, data_type, column_index, table_index, path_indices);
98+
self.columns.push(column_entry);
99+
column_index
100+
}
101+
102+
pub fn add_table(
103+
&mut self,
104+
catalog: String,
105+
database: String,
106+
table_meta: Arc<dyn Table>,
107+
) -> IndexType {
108+
let table_name = table_meta.name().to_string();
109+
let table_index = self.tables.len();
110+
let table_entry = TableEntry {
111+
index: table_index,
112+
name: table_name,
113+
database,
114+
catalog,
115+
table: table_meta.clone(),
116+
};
117+
self.tables.push(table_entry);
118+
let mut struct_fields = VecDeque::new();
119+
for (i, field) in table_meta.schema().fields().iter().enumerate() {
120+
self.add_column(
121+
field.name().clone(),
122+
field.data_type().clone(),
123+
Some(table_index),
124+
None,
125+
);
126+
if field.data_type().data_type_id() == TypeID::Struct {
127+
struct_fields.push_back((vec![i], field.clone()));
128+
}
129+
}
130+
// add inner columns of struct column
131+
while !struct_fields.is_empty() {
132+
let (path_indices, field) = struct_fields.pop_front().unwrap();
133+
let struct_type: StructType = field.data_type().clone().try_into().unwrap();
134+
135+
let inner_types = struct_type.types();
136+
let inner_names = match struct_type.names() {
137+
Some(inner_names) => inner_names
138+
.iter()
139+
.map(|name| format!("{}:{}", field.name(), name))
140+
.collect::<Vec<_>>(),
141+
None => (0..inner_types.len())
142+
.map(|i| format!("{}:{}", field.name(), i))
143+
.collect::<Vec<_>>(),
144+
};
145+
for ((i, inner_name), inner_type) in
146+
inner_names.into_iter().enumerate().zip(inner_types.iter())
147+
{
148+
let mut inner_path_indices = path_indices.clone();
149+
inner_path_indices.push(i);
150+
151+
self.add_column(
152+
inner_name.clone(),
153+
inner_type.clone(),
154+
Some(table_index),
155+
Some(inner_path_indices.clone()),
156+
);
157+
if inner_type.data_type_id() == TypeID::Struct {
158+
let inner_field = DataField::new(&inner_name, inner_type.clone());
159+
struct_fields.push_back((inner_path_indices, inner_field));
160+
}
161+
}
162+
}
163+
table_index
164+
}
165+
166+
pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize {
167+
let entries = indices
168+
.iter()
169+
.map(|i| self.column(*i).clone())
170+
.collect::<Vec<_>>();
171+
find_smallest_column(entries.as_slice())
172+
}
173+
}
174+
175+
#[derive(Clone)]
176+
pub struct TableEntry {
177+
catalog: String,
178+
database: String,
179+
name: String,
180+
index: IndexType,
181+
182+
table: Arc<dyn Table>,
183+
}
184+
185+
impl Debug for TableEntry {
186+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
187+
f.debug_struct("TableEntry")
188+
.field("catalog", &self.catalog)
189+
.field("database", &self.database)
190+
.field("name", &self.name)
191+
.field("index", &self.index)
192+
.finish_non_exhaustive()
193+
}
194+
}
195+
196+
impl TableEntry {
197+
pub fn new(
198+
index: IndexType,
199+
name: String,
200+
catalog: String,
201+
database: String,
202+
table: Arc<dyn Table>,
203+
) -> Self {
204+
TableEntry {
205+
index,
206+
name,
207+
catalog,
208+
database,
209+
table,
210+
}
211+
}
212+
213+
/// Get the catalog name of this table entry.
214+
pub fn catalog(&self) -> &str {
215+
&self.catalog
216+
}
217+
218+
/// Get the database name of this table entry.
219+
pub fn database(&self) -> &str {
220+
&self.database
221+
}
222+
223+
/// Get the name of this table entry.
224+
pub fn name(&self) -> &str {
225+
&self.name
226+
}
227+
228+
/// Get the index this table entry.
229+
pub fn index(&self) -> IndexType {
230+
self.index
231+
}
232+
233+
/// Get the table of this table entry.
234+
pub fn table(&self) -> Arc<dyn Table> {
235+
self.table.clone()
236+
}
237+
}
238+
239+
#[derive(Clone, Debug)]
240+
pub struct ColumnEntry {
241+
column_index: IndexType,
242+
name: String,
243+
data_type: DataTypeImpl,
244+
245+
/// Table index of column entry. None if column is derived from a subquery.
246+
table_index: Option<IndexType>,
247+
/// Path indices for inner column of struct data type.
248+
path_indices: Option<Vec<IndexType>>,
249+
}
250+
251+
impl ColumnEntry {
252+
pub fn new(
253+
name: String,
254+
data_type: DataTypeImpl,
255+
column_index: IndexType,
256+
table_index: Option<IndexType>,
257+
path_indices: Option<Vec<IndexType>>,
258+
) -> Self {
259+
ColumnEntry {
260+
column_index,
261+
name,
262+
data_type,
263+
table_index,
264+
path_indices,
265+
}
266+
}
267+
268+
/// Get the name of this column entry.
269+
pub fn name(&self) -> &str {
270+
&self.name
271+
}
272+
273+
/// Get the index of this column entry.
274+
pub fn index(&self) -> IndexType {
275+
self.column_index
276+
}
277+
278+
/// Get the data type of this column entry.
279+
pub fn data_type(&self) -> &DataTypeImpl {
280+
&self.data_type
281+
}
282+
283+
/// Get the table index of this column entry.
284+
pub fn table_index(&self) -> Option<IndexType> {
285+
self.table_index
286+
}
287+
288+
/// Get the path indices of this column entry.
289+
pub fn path_indices(&self) -> Option<&[IndexType]> {
290+
self.path_indices.as_deref()
291+
}
292+
293+
/// Check if this column entry contains path_indices
294+
pub fn has_path_indices(&self) -> bool {
295+
self.path_indices.is_some()
296+
}
297+
}
298+
299+
/// TODO(xuanwo): migrate this as a function of metadata.
300+
pub fn find_smallest_column(entries: &[ColumnEntry]) -> usize {
301+
debug_assert!(!entries.is_empty());
302+
let mut column_indexes = entries
303+
.iter()
304+
.map(|entry| entry.column_index)
305+
.collect::<Vec<IndexType>>();
306+
column_indexes.sort();
307+
let mut smallest_index = column_indexes[0];
308+
let mut smallest_size = usize::MAX;
309+
for (idx, column_entry) in entries.iter().enumerate() {
310+
if let Ok(bytes) = column_entry.data_type.data_type_id().numeric_byte_size() {
311+
if smallest_size > bytes {
312+
smallest_size = bytes;
313+
smallest_index = entries[idx].column_index;
314+
}
315+
}
316+
}
317+
smallest_index
318+
}

0 commit comments

Comments
 (0)