Skip to content

Commit e41890f

Browse files
authored
Merge pull request #9080 from RinChanNOWWW/read-parquet
feat(query): new table function `read_parquet` to read parquet files as a table.
2 parents ee1e005 + d1bb0bf commit e41890f

File tree

37 files changed

+1880
-279
lines changed

37 files changed

+1880
-279
lines changed

Cargo.lock

Lines changed: 28 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ members = [
5858
"src/query/storages/stage",
5959
"src/query/storages/system",
6060
"src/query/storages/view",
61+
"src/query/storages/parquet",
6162
"src/query/users",
6263
# databend-query
6364
"src/query/service",

src/common/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ edition = { workspace = true }
1010
storage-hdfs = ["opendal/services-hdfs"]
1111

1212
[dependencies]
13+
common-arrow = { path = "../arrow" }
1314
common-auth = { path = "../auth" }
1415
common-base = { path = "../base" }
1516
common-exception = { path = "../exception" }

src/common/storage/src/column_leaf.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! This module provides data structures for building column indexes.
16+
//! It's used by Fuse Engine and Parquet Engine.
17+
18+
use common_arrow::arrow::datatypes::DataType as ArrowType;
19+
use common_arrow::arrow::datatypes::Field as ArrowField;
20+
use common_arrow::arrow::datatypes::Schema as ArrowSchema;
21+
use common_exception::ErrorCode;
22+
use common_exception::Result;
23+
24+
#[derive(Debug, Clone)]
25+
pub struct ColumnLeaves {
26+
pub column_leaves: Vec<ColumnLeaf>,
27+
}
28+
29+
impl ColumnLeaves {
30+
pub fn new_from_schema(schema: &ArrowSchema) -> Self {
31+
let mut leaf_id = 0;
32+
let mut column_leaves = Vec::with_capacity(schema.fields.len());
33+
34+
for field in &schema.fields {
35+
let column_leaf = Self::traverse_fields_dfs(field, &mut leaf_id);
36+
column_leaves.push(column_leaf);
37+
}
38+
39+
Self { column_leaves }
40+
}
41+
42+
fn traverse_fields_dfs(field: &ArrowField, leaf_id: &mut usize) -> ColumnLeaf {
43+
match &field.data_type {
44+
ArrowType::Struct(inner_fields) => {
45+
let mut child_column_leaves = Vec::with_capacity(inner_fields.len());
46+
let mut child_leaf_ids = Vec::with_capacity(inner_fields.len());
47+
for inner_field in inner_fields {
48+
let child_column_leaf = Self::traverse_fields_dfs(inner_field, leaf_id);
49+
child_leaf_ids.extend(child_column_leaf.leaf_ids.clone());
50+
child_column_leaves.push(child_column_leaf);
51+
}
52+
ColumnLeaf::new(field.clone(), child_leaf_ids, Some(child_column_leaves))
53+
}
54+
_ => {
55+
let column_leaf = ColumnLeaf::new(field.clone(), vec![*leaf_id], None);
56+
*leaf_id += 1;
57+
column_leaf
58+
}
59+
}
60+
}
61+
62+
pub fn traverse_path<'a>(
63+
column_leaves: &'a [ColumnLeaf],
64+
path: &'a [usize],
65+
) -> Result<&'a ColumnLeaf> {
66+
let column_leaf = &column_leaves[path[0]];
67+
if path.len() > 1 {
68+
return match &column_leaf.children {
69+
Some(ref children) => Self::traverse_path(children, &path[1..]),
70+
None => Err(ErrorCode::Internal(format!(
71+
"Cannot get column_leaf by path: {:?}",
72+
path
73+
))),
74+
};
75+
}
76+
Ok(column_leaf)
77+
}
78+
}
79+
80+
/// `ColumnLeaf` contains all the leaf column ids of the column.
81+
/// For the nested types, it may contain more than one leaf column.
82+
#[derive(Debug, Clone)]
83+
pub struct ColumnLeaf {
84+
pub field: ArrowField,
85+
// `leaf_ids` is the indices of all the leaf columns in DFS order,
86+
// through which we can find the meta information of the leaf columns.
87+
pub leaf_ids: Vec<usize>,
88+
// Optional children column for nested types.
89+
pub children: Option<Vec<ColumnLeaf>>,
90+
}
91+
92+
impl ColumnLeaf {
93+
pub fn new(field: ArrowField, leaf_ids: Vec<usize>, children: Option<Vec<ColumnLeaf>>) -> Self {
94+
Self {
95+
field,
96+
leaf_ids,
97+
children,
98+
}
99+
}
100+
}

src/common/storage/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,7 @@ mod utils;
6868

6969
mod cache;
7070
pub use cache::FuseCachePolicy;
71+
72+
mod column_leaf;
73+
pub use column_leaf::ColumnLeaf;
74+
pub use column_leaf::ColumnLeaves;

src/query/catalog/src/plan/projection.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ use std::collections::BTreeMap;
1616
use std::fmt::Formatter;
1717

1818
use common_datavalues::DataSchema;
19+
use common_exception::Result;
20+
use common_storage::ColumnLeaf;
21+
use common_storage::ColumnLeaves;
1922

2023
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
2124
pub enum Projection {
@@ -49,6 +52,28 @@ impl Projection {
4952
Projection::InnerColumns(path_indices) => schema.inner_project(path_indices),
5053
}
5154
}
55+
56+
pub fn project_column_leaves<'a>(
57+
&'a self,
58+
column_leaves: &'a ColumnLeaves,
59+
) -> Result<Vec<&ColumnLeaf>> {
60+
let column_leaves = match self {
61+
Projection::Columns(indices) => indices
62+
.iter()
63+
.map(|idx| &column_leaves.column_leaves[*idx])
64+
.collect(),
65+
Projection::InnerColumns(path_indices) => {
66+
let paths: Vec<&Vec<usize>> = path_indices.values().collect();
67+
paths
68+
.iter()
69+
.map(|path| {
70+
ColumnLeaves::traverse_path(&column_leaves.column_leaves, path).unwrap()
71+
})
72+
.collect()
73+
}
74+
};
75+
Ok(column_leaves)
76+
}
5277
}
5378

5479
impl core::fmt::Debug for Projection {

src/query/catalog/src/plan/pushdown.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::fmt::Debug;
1616

17+
use common_datavalues::DataSchemaRef;
1718
use common_meta_types::UserStageInfo;
1819

1920
use crate::plan::Expression;
@@ -61,3 +62,31 @@ pub struct PushDownInfo {
6162
/// Optional stage info, used for COPY into <table> from stage
6263
pub stage: Option<StagePushDownInfo>,
6364
}
65+
66+
impl PushDownInfo {
67+
pub fn prewhere_of_push_downs(push_downs: &Option<PushDownInfo>) -> Option<PrewhereInfo> {
68+
if let Some(PushDownInfo { prewhere, .. }) = push_downs {
69+
prewhere.clone()
70+
} else {
71+
None
72+
}
73+
}
74+
75+
pub fn projection_of_push_downs(
76+
schema: &DataSchemaRef,
77+
push_downs: &Option<PushDownInfo>,
78+
) -> Projection {
79+
if let Some(PushDownInfo {
80+
projection: Some(prj),
81+
..
82+
}) = push_downs
83+
{
84+
prj.clone()
85+
} else {
86+
let indices = (0..schema.fields().len())
87+
.into_iter()
88+
.collect::<Vec<usize>>();
89+
Projection::Columns(indices)
90+
}
91+
}
92+
}

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ common-storages-index = { path = "../storages/index" }
6565
common-storages-information-schema = { path = "../storages/information-schema" }
6666
common-storages-memory = { path = "../storages/memory" }
6767
common-storages-null = { path = "../storages/null" }
68+
common-storages-parquet = { path = "../storages/parquet" }
6869
common-storages-random = { path = "../storages/random" }
6970
common-storages-share = { path = "../storages/share" }
7071
common-storages-stage = { path = "../storages/stage" }

src/query/service/src/table_functions/memory_block_part.rs

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/query/service/src/table_functions/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,14 @@
1313
// limitations under the License.
1414

1515
mod async_crash_me;
16-
mod memory_block_part;
17-
mod numbers_part;
18-
mod numbers_table;
16+
mod numbers;
1917
mod sync_crash_me;
2018
mod table_function;
2119
mod table_function_factory;
2220

23-
pub use memory_block_part::generate_numbers_parts;
24-
pub use numbers_part::NumbersPartInfo;
25-
pub use numbers_table::NumbersTable;
21+
pub use numbers::generate_numbers_parts;
22+
pub use numbers::NumbersPartInfo;
23+
pub use numbers::NumbersTable;
2624
pub use table_function::TableFunction;
2725
pub use table_function_factory::TableArgs;
2826
pub use table_function_factory::TableFunctionFactory;

0 commit comments

Comments
 (0)