Skip to content

Commit b5c211a

Browse files
committed
Move all metadata out
Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 4e1075d commit b5c211a

28 files changed

+287
-252
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/planner/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,7 @@ publish = false
77
edition = "2021"
88

99
[dependencies]
10+
common-catalog = { path = "../catalog" }
11+
common-datavalues = { path = "../datavalues" }
12+
13+
parking_lot = "0.12"

src/query/planner/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,10 @@
2222
//! build pipelines, then our processes will produce result data blocks.
2323
2424
mod metadata;
25+
pub use metadata::ColumnEntry;
26+
pub use metadata::ColumnSet;
2527
pub use metadata::IndexType;
28+
pub use metadata::Metadata;
29+
pub use metadata::MetadataRef;
30+
pub use metadata::TableEntry;
2631
pub use metadata::DUMMY_TABLE_INDEX;

src/query/planner/src/metadata.rs

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,252 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
16+
use std::collections::VecDeque;
17+
use std::fmt::Debug;
18+
use std::fmt::Formatter;
19+
use std::sync::Arc;
20+
21+
use common_catalog::table::Table;
22+
use common_datavalues::DataField;
23+
use common_datavalues::DataType;
24+
use common_datavalues::DataTypeImpl;
25+
use common_datavalues::StructType;
26+
use common_datavalues::TypeID;
27+
use parking_lot::RwLock;
28+
1529
/// Planner use [`usize`] as it's index type.
1630
///
1731
/// This type will be used across the whole planner.
1832
pub type IndexType = usize;
1933

2034
/// Use IndexType::MAX to represent dummy table.
2135
pub static DUMMY_TABLE_INDEX: IndexType = IndexType::MAX;
36+
37+
/// ColumnSet represents a set of columns identified by its IndexType.
38+
pub type ColumnSet = HashSet<IndexType>;
39+
40+
/// A Send & Send version of [`Metadata`].
41+
///
42+
/// Callers can clone this ref safely and cheaply.
43+
pub type MetadataRef = Arc<RwLock<Metadata>>;
44+
45+
/// Metadata stores information about columns and tables used in a query.
46+
/// Tables and columns are identified with its unique index.
47+
/// Notice that index value of a column can be same with that of a table.
48+
#[derive(Clone, Debug, Default)]
49+
pub struct Metadata {
50+
tables: Vec<TableEntry>,
51+
columns: Vec<ColumnEntry>,
52+
}
53+
54+
impl Metadata {
55+
pub fn table(&self, index: IndexType) -> &TableEntry {
56+
self.tables.get(index).expect("metadata must contain table")
57+
}
58+
59+
pub fn tables(&self) -> &[TableEntry] {
60+
self.tables.as_slice()
61+
}
62+
63+
pub fn table_index_by_column_indexes(&self, column_indexes: &ColumnSet) -> Option<IndexType> {
64+
self.columns
65+
.iter()
66+
.find(|v| column_indexes.contains(&v.column_index))
67+
.and_then(|v| v.table_index)
68+
}
69+
70+
pub fn column(&self, index: IndexType) -> &ColumnEntry {
71+
self.columns
72+
.get(index)
73+
.expect("metadata must contain column")
74+
}
75+
76+
pub fn columns(&self) -> &[ColumnEntry] {
77+
self.columns.as_slice()
78+
}
79+
80+
pub fn columns_by_table_index(&self, index: IndexType) -> Vec<ColumnEntry> {
81+
self.columns
82+
.iter()
83+
.filter(|v| v.table_index == Some(index))
84+
.cloned()
85+
.collect()
86+
}
87+
88+
pub fn add_column(
89+
&mut self,
90+
name: String,
91+
data_type: DataTypeImpl,
92+
table_index: Option<IndexType>,
93+
path_indices: Option<Vec<IndexType>>,
94+
) -> IndexType {
95+
let column_index = self.columns.len();
96+
let column_entry =
97+
ColumnEntry::new(name, data_type, column_index, table_index, path_indices);
98+
self.columns.push(column_entry);
99+
column_index
100+
}
101+
102+
pub fn add_table(
103+
&mut self,
104+
catalog: String,
105+
database: String,
106+
table_meta: Arc<dyn Table>,
107+
) -> IndexType {
108+
let table_name = table_meta.name().to_string();
109+
let table_index = self.tables.len();
110+
let table_entry = TableEntry {
111+
index: table_index,
112+
name: table_name,
113+
database,
114+
catalog,
115+
table: table_meta.clone(),
116+
};
117+
self.tables.push(table_entry);
118+
let mut struct_fields = VecDeque::new();
119+
for (i, field) in table_meta.schema().fields().iter().enumerate() {
120+
self.add_column(
121+
field.name().clone(),
122+
field.data_type().clone(),
123+
Some(table_index),
124+
None,
125+
);
126+
if field.data_type().data_type_id() == TypeID::Struct {
127+
struct_fields.push_back((vec![i], field.clone()));
128+
}
129+
}
130+
// add inner columns of struct column
131+
while !struct_fields.is_empty() {
132+
let (path_indices, field) = struct_fields.pop_front().unwrap();
133+
let struct_type: StructType = field.data_type().clone().try_into().unwrap();
134+
135+
let inner_types = struct_type.types();
136+
let inner_names = match struct_type.names() {
137+
Some(inner_names) => inner_names
138+
.iter()
139+
.map(|name| format!("{}:{}", field.name(), name))
140+
.collect::<Vec<_>>(),
141+
None => (0..inner_types.len())
142+
.map(|i| format!("{}:{}", field.name(), i))
143+
.collect::<Vec<_>>(),
144+
};
145+
for ((i, inner_name), inner_type) in
146+
inner_names.into_iter().enumerate().zip(inner_types.iter())
147+
{
148+
let mut inner_path_indices = path_indices.clone();
149+
inner_path_indices.push(i);
150+
151+
self.add_column(
152+
inner_name.clone(),
153+
inner_type.clone(),
154+
Some(table_index),
155+
Some(inner_path_indices.clone()),
156+
);
157+
if inner_type.data_type_id() == TypeID::Struct {
158+
let inner_field = DataField::new(&inner_name, inner_type.clone());
159+
struct_fields.push_back((inner_path_indices, inner_field));
160+
}
161+
}
162+
}
163+
table_index
164+
}
165+
166+
pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize {
167+
let entries = indices
168+
.iter()
169+
.map(|i| self.column(*i).clone())
170+
.collect::<Vec<_>>();
171+
find_smallest_column(entries.as_slice())
172+
}
173+
}
174+
175+
#[derive(Clone)]
176+
pub struct TableEntry {
177+
pub index: IndexType,
178+
pub name: String,
179+
pub catalog: String,
180+
pub database: String,
181+
182+
pub table: Arc<dyn Table>,
183+
}
184+
185+
impl Debug for TableEntry {
186+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
187+
f.debug_struct("TableEntry")
188+
.field("index", &self.index)
189+
.field("name", &self.name)
190+
.field("catalog", &self.catalog)
191+
.field("database", &self.database)
192+
.finish_non_exhaustive()
193+
}
194+
}
195+
196+
impl TableEntry {
197+
pub fn new(
198+
index: IndexType,
199+
name: String,
200+
catalog: String,
201+
database: String,
202+
table: Arc<dyn Table>,
203+
) -> Self {
204+
TableEntry {
205+
index,
206+
name,
207+
catalog,
208+
database,
209+
table,
210+
}
211+
}
212+
}
213+
214+
#[derive(Clone, Debug)]
215+
pub struct ColumnEntry {
216+
pub column_index: IndexType,
217+
pub name: String,
218+
pub data_type: DataTypeImpl,
219+
220+
/// Table index of column entry. None if column is derived from a subquery.
221+
pub table_index: Option<IndexType>,
222+
/// Path indices for inner column of struct data type.
223+
pub path_indices: Option<Vec<IndexType>>,
224+
}
225+
226+
impl ColumnEntry {
227+
pub fn new(
228+
name: String,
229+
data_type: DataTypeImpl,
230+
column_index: IndexType,
231+
table_index: Option<IndexType>,
232+
path_indices: Option<Vec<IndexType>>,
233+
) -> Self {
234+
ColumnEntry {
235+
column_index,
236+
name,
237+
data_type,
238+
table_index,
239+
path_indices,
240+
}
241+
}
242+
}
243+
244+
/// TODO(xuanwo): migrate this as a function of metadata.
245+
pub fn find_smallest_column(entries: &[ColumnEntry]) -> usize {
246+
debug_assert!(!entries.is_empty());
247+
let mut column_indexes = entries
248+
.iter()
249+
.map(|entry| entry.column_index)
250+
.collect::<Vec<IndexType>>();
251+
column_indexes.sort();
252+
let mut smallest_index = column_indexes[0];
253+
let mut smallest_size = usize::MAX;
254+
for (idx, column_entry) in entries.iter().enumerate() {
255+
if let Ok(bytes) = column_entry.data_type.data_type_id().numeric_byte_size() {
256+
if smallest_size > bytes {
257+
smallest_size = bytes;
258+
smallest_index = entries[idx].column_index;
259+
}
260+
}
261+
}
262+
smallest_index
263+
}

src/query/service/src/interpreters/interpreter_explain_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common_datablocks::DataBlock;
1818
use common_datavalues::prelude::*;
1919
use common_exception::ErrorCode;
2020
use common_exception::Result;
21+
use common_planner::MetadataRef;
2122

2223
use super::fragments::Fragmenter;
2324
use super::QueryFragmentsActions;
@@ -29,7 +30,6 @@ use crate::sql::executor::PhysicalPlanBuilder;
2930
use crate::sql::executor::PipelineBuilder;
3031
use crate::sql::optimizer::SExpr;
3132
use crate::sql::plans::Plan;
32-
use crate::sql::MetadataRef;
3333

3434
pub struct ExplainInterpreterV2 {
3535
ctx: Arc<QueryContext>,

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ use common_pipeline_sources::processors::sources::AsyncSourcer;
3939
use common_pipeline_sources::processors::sources::SyncSource;
4040
use common_pipeline_sources::processors::sources::SyncSourcer;
4141
use common_pipeline_transforms::processors::transforms::Transform;
42+
use common_planner::Metadata;
43+
use common_planner::MetadataRef;
4244
use parking_lot::Mutex;
4345
use parking_lot::RwLock;
4446

@@ -66,8 +68,6 @@ use crate::sql::plans::InsertInputSource;
6668
use crate::sql::plans::Plan;
6769
use crate::sql::plans::Scalar;
6870
use crate::sql::BindContext;
69-
use crate::sql::Metadata;
70-
use crate::sql::MetadataRef;
7171
use crate::sql::NameResolutionContext;
7272

7373
pub struct InsertInterpreterV2 {
@@ -372,7 +372,7 @@ impl ValueSource {
372372
schema: DataSchemaRef,
373373
) -> Self {
374374
let bind_context = BindContext::new();
375-
let metadata = Arc::new(RwLock::new(Metadata::create()));
375+
let metadata = Arc::new(RwLock::new(Metadata::default()));
376376

377377
Self {
378378
data,

src/query/service/src/interpreters/interpreter_select_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616

1717
use common_datavalues::DataSchemaRef;
1818
use common_exception::Result;
19+
use common_planner::MetadataRef;
1920

2021
use super::plan_schedulers::schedule_query_v2;
2122
use crate::interpreters::Interpreter;
@@ -26,7 +27,6 @@ use crate::sql::executor::PhysicalPlanBuilder;
2627
use crate::sql::executor::PipelineBuilder;
2728
use crate::sql::optimizer::SExpr;
2829
use crate::sql::BindContext;
29-
use crate::sql::MetadataRef;
3030

3131
/// Interpret SQL query with ne&w SQL planner
3232
pub struct SelectInterpreterV2 {

src/query/service/src/sql/executor/expression_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common_exception::ErrorCode;
1818
use common_exception::Result;
1919
use common_legacy_planners::Expression;
2020
use common_planner::IndexType;
21+
use common_planner::MetadataRef;
2122

2223
use crate::sql::executor::util::format_field_name;
2324
use crate::sql::plans::AggregateFunction;
@@ -29,7 +30,6 @@ use crate::sql::plans::ConstantExpr;
2930
use crate::sql::plans::FunctionCall;
3031
use crate::sql::plans::OrExpr;
3132
use crate::sql::plans::Scalar;
32-
use crate::sql::MetadataRef;
3333

3434
pub trait FiledNameFormat {
3535
fn format(display_name: &str, index: IndexType) -> String;

src/query/service/src/sql/executor/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use common_exception::ErrorCode;
1717
use common_exception::Result;
1818
use common_legacy_planners::StageKind;
1919
use common_planner::IndexType;
20+
use common_planner::MetadataRef;
2021
use common_planner::DUMMY_TABLE_INDEX;
2122
use itertools::Itertools;
2223

@@ -32,7 +33,6 @@ use super::Project;
3233
use super::Sort;
3334
use super::TableScan;
3435
use super::UnionAll;
35-
use crate::sql::MetadataRef;
3636

3737
impl PhysicalPlan {
3838
pub fn format(&self, metadata: MetadataRef) -> Result<String> {

src/query/service/src/sql/executor/physical_plan_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use common_legacy_planners::Extras;
2626
use common_legacy_planners::PrewhereInfo;
2727
use common_legacy_planners::Projection;
2828
use common_legacy_planners::StageKind;
29+
use common_planner::Metadata;
30+
use common_planner::MetadataRef;
2931
use common_planner::DUMMY_TABLE_INDEX;
3032
use common_storages_fuse::TableContext;
3133
use itertools::Itertools;
@@ -58,8 +60,6 @@ use crate::sql::plans::Exchange;
5860
use crate::sql::plans::PhysicalScan;
5961
use crate::sql::plans::RelOperator;
6062
use crate::sql::plans::Scalar;
61-
use crate::sql::Metadata;
62-
use crate::sql::MetadataRef;
6363
use crate::sql::ScalarExpr;
6464
use crate::storages::ToReadDataSourcePlan;
6565

0 commit comments

Comments
 (0)