
Commit 58b456d

Convert ArrowSchema to DataSchema only when converting ArrowChunk to DataBlock.
1 parent 5d79da6 commit 58b456d
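
In short, the reader now carries the projected Arrow schema through reading and deserialization, and Databend's DataSchema is derived only at the last step, when a deserialized arrow Chunk becomes a DataBlock. A schematic sketch of the new flow (the deserialization helper named here is hypothetical shorthand for the column iterators in deserialize.rs; the final call is the one this commit changes):

    // Inside ParquetReader: pages are decoded against the projected *arrow* schema.
    let chunk = self.deserialize_next_chunk()?; // hypothetical helper
    // Conversion to Databend's DataSchema happens only when the output DataBlock is built.
    let block = DataBlock::from_chunk(&self.output_schema, &chunk)?;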

7 files changed: +129 -40 lines changed


src/common/arrow/src/lib.rs

Lines changed: 1 addition & 0 deletions

@@ -16,6 +16,7 @@
 
 mod parquet_read;
 mod parquet_write;
+pub mod schema_projection;
 
 pub use arrow;
 pub use arrow_format;

src/common/arrow/src/schema_projection.rs

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+use arrow::datatypes::Schema;
+
+/// Project a [`Schema`] by picking the fields at the given indices.
+pub fn project(schema: &Schema, indices: &[usize]) -> Schema {
+    let fields = indices
+        .iter()
+        .map(|idx| schema.fields[*idx].clone())
+        .collect::<Vec<_>>();
+    Schema::with_metadata(fields.into(), schema.metadata.clone())
+}
+
+/// Project a [`Schema`] with inner columns by path.
+pub fn inner_project(schema: &Schema, path_indices: &BTreeMap<usize, Vec<usize>>) -> Schema {
+    let paths: Vec<Vec<usize>> = path_indices.values().cloned().collect();
+    let fields = paths
+        .iter()
+        .map(|path| traverse_paths(&schema.fields, path))
+        .collect::<Vec<_>>();
+    Schema::with_metadata(fields.into(), schema.metadata.clone())
+}
+
+fn traverse_paths(fields: &[Field], path: &[usize]) -> Field {
+    assert!(!path.is_empty());
+    let field = &fields[path[0]];
+    if path.len() == 1 {
+        return field.clone();
+    }
+    if let DataType::Struct(inner_fields) = field.data_type() {
+        return traverse_paths(inner_fields, &path[1..]);
+    }
+    unreachable!("Unable to get field paths. Fields: {:?}", fields);
+}
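
For reference, a minimal usage sketch of the two new helpers (the schema, field names, and paths below are hypothetical; `common_arrow::arrow` is the re-exported arrow crate used above):

    use std::collections::BTreeMap;

    use common_arrow::arrow::datatypes::DataType;
    use common_arrow::arrow::datatypes::Field;
    use common_arrow::arrow::datatypes::Schema;
    use common_arrow::schema_projection as ap;

    fn example() {
        // A schema with one plain column and one nested struct column.
        let inner = vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ];
        let schema = Schema::from(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("s", DataType::Struct(inner), true),
        ]);

        // Projection::Columns-style: keep only the top-level `id` column.
        let projected = ap::project(&schema, &[0]);
        assert_eq!(projected.fields.len(), 1);

        // Projection::InnerColumns-style: keep `s.b`. The path [1, 1] picks
        // field 1 at the top level, then field 1 inside the struct.
        let mut path_indices = BTreeMap::new();
        path_indices.insert(1usize, vec![1usize, 1]);
        let inner_projected = ap::inner_project(&schema, &path_indices);
        assert_eq!(inner_projected.fields[0].name, "b");
    }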

src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 2 additions & 2 deletions

@@ -45,7 +45,7 @@ impl ParquetReader {
         filter: Option<Bitmap>,
     ) -> Result<DataBlock> {
         let mut chunk_map: HashMap<usize, Vec<u8>> = chunks.into_iter().collect();
-        let mut columns_array_iter = Vec::with_capacity(self.projected_schema.num_fields());
+        let mut columns_array_iter = Vec::with_capacity(self.projected_arrow_schema.fields.len());
 
         let column_leaves = &self.projected_column_leaves.column_leaves;
         let mut cnt_map = Self::build_projection_count_map(column_leaves);
@@ -201,7 +201,7 @@ impl ParquetReader {
                 "deserializer from row group: fail to get a chunk",
             )),
             Some(Err(cause)) => Err(ErrorCode::from(cause)),
-            Some(Ok(chunk)) => DataBlock::from_chunk(&self.projected_schema, &chunk),
+            Some(Ok(chunk)) => DataBlock::from_chunk(&self.output_schema, &chunk),
         }
     }
 
src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 3 additions & 3 deletions

@@ -14,9 +14,9 @@
 
 use std::fs::File;
 
+use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::read as pread;
 use common_arrow::parquet::metadata::FileMetaData;
-use common_datavalues::DataSchema;
 use common_exception::ErrorCode;
 use common_exception::Result;
 
@@ -36,11 +36,11 @@ impl ParquetReader {
     }
 
     #[inline]
-    pub fn infer_schema(meta: &FileMetaData) -> Result<DataSchema> {
+    pub fn infer_schema(meta: &FileMetaData) -> Result<ArrowSchema> {
         let mut arrow_schema = pread::infer_schema(meta)?;
         arrow_schema.fields.iter_mut().for_each(|f| {
             f.name = f.name.to_lowercase();
         });
-        Ok(DataSchema::from(arrow_schema))
+        Ok(arrow_schema)
     }
 }

src/query/storages/parquet/src/parquet_reader/mod.rs

Lines changed: 57 additions & 21 deletions

@@ -21,9 +21,12 @@ use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::write::to_parquet_schema;
 use common_arrow::parquet::metadata::ColumnDescriptor;
+use common_arrow::schema_projection as ap;
 use common_catalog::plan::Projection;
+use common_datavalues::DataSchema;
 use common_datavalues::DataSchemaRef;
 use common_exception::Result;
 use common_storage::ColumnLeaves;
@@ -51,7 +54,13 @@ pub struct ParquetReader {
     /// ```
     columns_to_read: HashSet<usize>,
    /// The schema of the [`common_datablocks::DataBlock`] this reader produces.
-    projected_schema: DataSchemaRef,
+    output_schema: DataSchemaRef,
+    /// The actual schema used to read parquet. It is converted to a [`common_datavalues::DataSchema`] only when outputting [`common_datablocks::DataBlock`]s.
+    ///
+    /// The reason for using an [`ArrowSchema`] to read parquet is that
+    /// some types are not supported by Databend, such as Timestamp of nanoseconds.
+    /// Such types will be converted to supported types after deserialization.
+    projected_arrow_schema: ArrowSchema,
     /// [`ColumnLeaves`] corresponding to the `projected_schema`.
     projected_column_leaves: ColumnLeaves,
     /// [`ColumnDescriptor`]s corresponding to the `projected_schema`.
@@ -61,16 +70,53 @@ pub struct ParquetReader {
 impl ParquetReader {
     pub fn create(
         operator: Operator,
-        schema: DataSchemaRef,
+        schema: ArrowSchema,
         projection: Projection,
     ) -> Result<Arc<ParquetReader>> {
-        // Full schema and column leaves.
-        let arrow_schema = schema.to_arrow();
-        let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema);
-        let schema_descriptors = to_parquet_schema(&arrow_schema)?;
+        let (
+            projected_arrow_schema,
+            projected_column_leaves,
+            projected_column_descriptors,
+            columns_to_read,
+        ) = Self::do_projection(&schema, &projection)?;
 
+        Ok(Arc::new(ParquetReader {
+            operator,
+            columns_to_read,
+            output_schema: Arc::new(DataSchema::from(&projected_arrow_schema)),
+            projected_arrow_schema,
+            projected_column_leaves,
+            projected_column_descriptors,
+        }))
+    }
+
+    pub fn output_schema(&self) -> DataSchemaRef {
+        self.output_schema.clone()
+    }
+
+    pub fn columns_to_read(&self) -> &HashSet<usize> {
+        &self.columns_to_read
+    }
+
+    /// Project the schema and get the needed column leaves.
+    #[allow(clippy::type_complexity)]
+    pub fn do_projection(
+        schema: &ArrowSchema,
+        projection: &Projection,
+    ) -> Result<(
+        ArrowSchema,
+        ColumnLeaves,
+        HashMap<usize, ColumnDescriptor>,
+        HashSet<usize>,
+    )> {
+        // Full schema and column leaves.
+        let column_leaves = ColumnLeaves::new_from_schema(schema);
+        let schema_descriptors = to_parquet_schema(schema)?;
         // Project schema
-        let projected_schema = DataSchemaRef::new(projection.project_schema(&schema));
+        let projected_arrow_schema = match projection {
+            Projection::Columns(indices) => ap::project(schema, indices),
+            Projection::InnerColumns(path_indices) => ap::inner_project(schema, path_indices),
+        };
         // Project column leaves
         let projected_column_leaves = ColumnLeaves {
             column_leaves: projection
@@ -91,21 +137,11 @@ impl ParquetReader {
                 .insert(*index, schema_descriptors.columns()[*index].clone());
             }
         }
-
-        Ok(Arc::new(ParquetReader {
-            operator,
-            columns_to_read,
-            projected_schema,
+        Ok((
+            projected_arrow_schema,
             projected_column_leaves,
             projected_column_descriptors,
-        }))
-    }
-
-    pub fn schema(&self) -> DataSchemaRef {
-        self.projected_schema.clone()
-    }
-
-    pub fn columns_to_read(&self) -> &HashSet<usize> {
-        &self.columns_to_read
+            columns_to_read,
+        ))
     }
 }
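
A hypothetical call-site sketch of the reworked constructor (the location, the `operator` handle, and the projection are placeholders; `Projection::Columns` is assumed to hold plain column indices, and error handling is elided):

    // `infer_schema` now returns an ArrowSchema instead of a DataSchema.
    let file_meta = ParquetReader::read_meta(&location)?;
    let arrow_schema = ParquetReader::infer_schema(&file_meta)?;
    // The reader stores the projected arrow schema and derives its output
    // DataSchema from it; downstream code asks for `output_schema()`.
    let reader = ParquetReader::create(operator.clone(), arrow_schema, Projection::Columns(vec![0, 1]))?;
    let output_schema = reader.output_schema();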

src/query/storages/parquet/src/table_function/read.rs

Lines changed: 10 additions & 12 deletions

@@ -37,8 +37,7 @@ use crate::ParquetSource;
 
 impl ParquetTable {
     pub fn create_reader(&self, projection: Projection) -> Result<Arc<ParquetReader>> {
-        let table_schema = self.table_info.schema();
-        ParquetReader::create(self.operator.clone(), table_schema, projection)
+        ParquetReader::create(self.operator.clone(), self.arrow_schema.clone(), projection)
     }
 
     // Build the prewhere reader.
@@ -133,19 +132,15 @@ impl ParquetTable {
         let max_io_requests = self.adjust_io_request(&ctx, columns_to_read.len())?;
         let ctx_ref = ctx.clone();
         // `dummy_reader` is only used for prune columns in row groups.
-        let dummy_reader = ParquetReader::create(
-            self.operator.clone(),
-            plan.source_info.schema(),
-            columns_to_read,
-        )?;
+        let (_, _, _, columns_to_read) =
+            ParquetReader::do_projection(&plan.source_info.schema().to_arrow(), &columns_to_read)?;
         pipeline.set_on_init(move || {
             let mut partitions = Vec::with_capacity(locations.len());
             for location in &locations {
                 let file_meta = ParquetReader::read_meta(location)?;
                 for rg in &file_meta.row_groups {
-                    let mut column_metas =
-                        HashMap::with_capacity(dummy_reader.columns_to_read().len());
-                    for index in dummy_reader.columns_to_read() {
+                    let mut column_metas = HashMap::with_capacity(columns_to_read.len());
+                    for index in &columns_to_read {
                         let c = &rg.columns()[*index];
                         let (offset, length) = c.byte_range();
                         column_metas.insert(*index, ColumnMeta {
@@ -178,8 +173,11 @@ impl ParquetTable {
         let output_schema = Arc::new(output_projection.project_schema(&plan.source_info.schema()));
 
         let prewhere_reader = self.build_prewhere_reader(plan)?;
-        let prewhere_filter =
-            self.build_prewhere_filter_executor(ctx.clone(), plan, prewhere_reader.schema())?;
+        let prewhere_filter = self.build_prewhere_filter_executor(
+            ctx.clone(),
+            plan,
+            prewhere_reader.output_schema(),
+        )?;
         let remain_reader = self.build_remain_reader(plan)?;
 
         // Add source pipe.

src/query/storages/parquet/src/table_function/table.rs

Lines changed: 6 additions & 2 deletions

@@ -18,6 +18,7 @@ use std::sync::Arc;
 use chrono::NaiveDateTime;
 use chrono::TimeZone;
 use chrono::Utc;
+use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_catalog::plan::DataSourcePlan;
 use common_catalog::plan::PartStatistics;
 use common_catalog::plan::Partitions;
@@ -27,6 +28,7 @@ use common_catalog::table::Table;
 use common_catalog::table_args::TableArgs;
 use common_catalog::table_function::TableFunction;
 use common_config::GlobalConfig;
+use common_datavalues::DataSchema;
 use common_datavalues::DataValue;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -45,6 +47,7 @@ pub struct ParquetTable {
 
     file_locations: Vec<String>,
     pub(super) table_info: TableInfo,
+    pub(super) arrow_schema: ArrowSchema,
     pub(super) operator: Operator,
 }
 
@@ -105,14 +108,14 @@ impl ParquetTable {
         // Assume all parquet files have the same schema.
         // If not, throw error during reading.
         let first_meta = ParquetReader::read_meta(&file_locations[0])?;
-        let schema = ParquetReader::infer_schema(&first_meta)?;
+        let arrow_schema = ParquetReader::infer_schema(&first_meta)?;
 
         let table_info = TableInfo {
             ident: TableIdent::new(table_id, 0),
             desc: format!("'{}'.'{}'", database_name, table_func_name),
             name: table_func_name.to_string(),
             meta: TableMeta {
-                schema: Arc::new(schema),
+                schema: Arc::new(DataSchema::from(&arrow_schema)),
                 engine: "SystemReadParquet".to_string(),
                 // Assuming that created_on is unnecessary for function table,
                 // we could make created_on fixed to pass test_shuffle_action_try_into.
@@ -131,6 +134,7 @@ impl ParquetTable {
             table_args,
             file_locations,
             table_info,
+            arrow_schema,
            operator,
         }))
     }
