
Commit 4d4d0dc

Make every row group as a part.
1 parent d1bb641 commit 4d4d0dc

9 files changed: 264 additions, 151 deletions
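The effect of this commit is to plan one scan part per Parquet row group (ParquetRowGroupPart) instead of one per file (ParquetLocationPart), so each part already carries the byte offset, length, and compression of every column chunk it needs. The planning side is not among the files shown below; the sketch here is a hypothetical illustration of how such parts could be built from a file's metadata, assuming parquet2-style accessors (FileMetaData::row_groups, ColumnChunkMetaData::byte_range, ColumnChunkMetaData::compression) and a made-up helper name.

// Hypothetical sketch, not part of this commit: build one ParquetRowGroupPart
// per row group, recording offset/length/compression for every column chunk.
use common_arrow::parquet::metadata::FileMetaData;
use common_catalog::plan::PartInfoPtr;

use crate::parquet_part::ColumnMeta;
use crate::parquet_part::ParquetRowGroupPart;

fn row_group_parts(location: &str, file_meta: &FileMetaData) -> Vec<PartInfoPtr> {
    file_meta
        .row_groups
        .iter()
        .map(|rg| {
            let column_metas = rg
                .columns()
                .iter()
                .map(|column| {
                    // (start offset, length in bytes) of this column chunk.
                    let (offset, length) = column.byte_range();
                    ColumnMeta {
                        offset,
                        length,
                        compression: column.compression().into(),
                    }
                })
                .collect();
            ParquetRowGroupPart::create(location.to_string(), rg.num_rows(), column_metas)
        })
        .collect()
}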

src/query/storages/parquet/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ mod parquet_reader;
 mod parquet_source;
 mod table_function;
 
-pub use parquet_part::ParquetPart;
+pub use parquet_part::ParquetLocationPart;
 pub use parquet_reader::ParquetReader;
 pub use parquet_source::ParquetSource;
 pub use table_function::ParquetTable;

src/query/storages/parquet/src/parquet_part.rs

Lines changed: 109 additions & 9 deletions
@@ -18,24 +18,25 @@ use std::hash::Hash;
 use std::hash::Hasher;
 use std::sync::Arc;
 
+use common_arrow::parquet::compression::Compression as ParquetCompression;
 use common_catalog::plan::PartInfo;
 use common_catalog::plan::PartInfoPtr;
 use common_exception::ErrorCode;
 use common_exception::Result;
 
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
-pub struct ParquetPart {
+pub struct ParquetLocationPart {
     pub location: String,
 }
 
-#[typetag::serde(name = "parquet")]
-impl PartInfo for ParquetPart {
+#[typetag::serde(name = "parquet_location")]
+impl PartInfo for ParquetLocationPart {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
     fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
-        match info.as_any().downcast_ref::<ParquetPart>() {
+        match info.as_any().downcast_ref::<ParquetLocationPart>() {
             None => false,
             Some(other) => self == other,
         }
@@ -48,16 +49,115 @@ impl PartInfo for ParquetPart {
     }
 }
 
-impl ParquetPart {
+impl ParquetLocationPart {
     pub fn create(location: String) -> Arc<Box<dyn PartInfo>> {
-        Arc::new(Box::new(ParquetPart { location }))
+        Arc::new(Box::new(ParquetLocationPart { location }))
     }
 
-    pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetPart> {
-        match info.as_any().downcast_ref::<ParquetPart>() {
+    pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetLocationPart> {
+        match info.as_any().downcast_ref::<ParquetLocationPart>() {
             Some(part_ref) => Ok(part_ref),
             None => Err(ErrorCode::Internal(
-                "Cannot downcast from PartInfo to FusePartInfo.",
+                "Cannot downcast from PartInfo to ParquetLocationPart.",
+            )),
+        }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)]
+pub enum Compression {
+    Uncompressed,
+    Snappy,
+    Gzip,
+    Lzo,
+    Brotli,
+    Lz4,
+    Zstd,
+    Lz4Raw,
+}
+
+impl From<Compression> for ParquetCompression {
+    fn from(value: Compression) -> Self {
+        match value {
+            Compression::Uncompressed => ParquetCompression::Uncompressed,
+            Compression::Snappy => ParquetCompression::Snappy,
+            Compression::Gzip => ParquetCompression::Gzip,
+            Compression::Lzo => ParquetCompression::Lzo,
+            Compression::Brotli => ParquetCompression::Brotli,
+            Compression::Lz4 => ParquetCompression::Lz4,
+            Compression::Zstd => ParquetCompression::Zstd,
+            Compression::Lz4Raw => ParquetCompression::Lz4Raw,
+        }
+    }
+}
+
+impl Into<Compression> for ParquetCompression {
+    fn into(self) -> Compression {
+        match self {
+            ParquetCompression::Uncompressed => Compression::Uncompressed,
+            ParquetCompression::Snappy => Compression::Snappy,
+            ParquetCompression::Gzip => Compression::Gzip,
+            ParquetCompression::Lzo => Compression::Lzo,
+            ParquetCompression::Brotli => Compression::Brotli,
+            ParquetCompression::Lz4 => Compression::Lz4,
+            ParquetCompression::Zstd => Compression::Zstd,
+            ParquetCompression::Lz4Raw => Compression::Lz4Raw,
+        }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub struct ColumnMeta {
+    pub offset: u64,
+    pub length: u64,
+    pub compression: Compression,
+}
+
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub struct ParquetRowGroupPart {
+    pub location: String,
+    pub num_rows: usize,
+    pub column_metas: Vec<ColumnMeta>,
+}
+
+#[typetag::serde(name = "parquet_row_group")]
+impl PartInfo for ParquetRowGroupPart {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
+        match info.as_any().downcast_ref::<ParquetRowGroupPart>() {
+            None => false,
+            Some(other) => self == other,
+        }
+    }
+
+    fn hash(&self) -> u64 {
+        let mut s = DefaultHasher::new();
+        self.location.hash(&mut s);
+        s.finish()
+    }
+}
+
+impl ParquetRowGroupPart {
+    pub fn create(
+        location: String,
+        num_rows: usize,
+        column_metas: Vec<ColumnMeta>,
+    ) -> Arc<Box<dyn PartInfo>> {
+        Arc::new(Box::new(ParquetRowGroupPart {
+            location,
+            num_rows,
+            column_metas,
+        }))
+    }
+
+    pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetRowGroupPart> {
+        match info.as_any().downcast_ref::<ParquetRowGroupPart>() {
+            Some(part_ref) => Ok(part_ref),
+            None => Err(ErrorCode::Internal(
+                "Cannot downcast from PartInfo to ParquetRowGroupPart.",
             )),
         }
     }
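As a usage note, downstream code receives parts only as the type-erased PartInfoPtr and recovers the concrete type via from_part. A minimal sketch, using only types from this diff; the surrounding function is illustrative and not part of the commit:

// Illustrative only: downcast a dynamic part back to ParquetRowGroupPart.
use common_catalog::plan::PartInfoPtr;
use common_exception::Result;

use crate::parquet_part::ParquetRowGroupPart;

fn describe_part(part: &PartInfoPtr) -> Result<String> {
    let rg = ParquetRowGroupPart::from_part(part)?;
    Ok(format!(
        "{}: {} rows, {} column chunks",
        rg.location,
        rg.num_rows,
        rg.column_metas.len()
    ))
}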

src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 19 additions & 16 deletions
@@ -20,8 +20,7 @@ use common_arrow::arrow::datatypes::Field;
 use common_arrow::arrow::io::parquet::read::column_iter_to_arrays;
 use common_arrow::arrow::io::parquet::read::ArrayIter;
 use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
-use common_arrow::parquet::metadata::ColumnChunkMetaData;
-use common_arrow::parquet::metadata::RowGroupMetaData;
+use common_arrow::parquet::metadata::ColumnDescriptor;
 use common_arrow::parquet::read::BasicDecompressor;
 use common_arrow::parquet::read::PageMetaData;
 use common_arrow::parquet::read::PageReader;
@@ -30,21 +29,19 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_storage::ColumnLeaf;
 
+use crate::parquet_part::ColumnMeta;
+use crate::parquet_part::ParquetRowGroupPart;
 use crate::ParquetReader;
 
 impl ParquetReader {
     pub fn deserialize(
         &self,
-        rg: &RowGroupMetaData,
+        part: &ParquetRowGroupPart,
         chunks: Vec<(usize, Vec<u8>)>,
     ) -> Result<DataBlock> {
         let mut chunk_map: HashMap<usize, Vec<u8>> = chunks.into_iter().collect();
         let mut columns_array_iter = Vec::with_capacity(self.projected_schema.num_fields());
 
-        let column_metas = self
-            .get_column_metas(rg)
-            .into_iter()
-            .collect::<HashMap<_, _>>();
         let column_leaves = &self.projected_column_leaves.column_leaves;
         let mut cnt_map = Self::build_projection_count_map(column_leaves);
 
@@ -53,52 +50,58 @@ impl ParquetReader {
             let mut metas = Vec::with_capacity(indices.len());
             let mut chunks = Vec::with_capacity(indices.len());
             for index in indices {
-                let column_meta = column_metas[index];
+                let column_meta = &part.column_metas[*index];
                 let cnt = cnt_map.get_mut(index).unwrap();
                 *cnt -= 1;
                 let column_chunk = if cnt > &mut 0 {
                     chunk_map.get(index).unwrap().clone()
                 } else {
                     chunk_map.remove(index).unwrap()
                 };
-                metas.push(column_meta);
+                let descriptor = &self.projected_column_descriptors[index];
+                metas.push((column_meta, descriptor));
                 chunks.push(column_chunk);
             }
             columns_array_iter.push(Self::to_array_iter(
                 metas,
                 chunks,
-                rg.num_rows(),
+                part.num_rows,
                 column_leaf.field.clone(),
             )?);
         }
 
-        let mut deserializer = RowGroupDeserializer::new(columns_array_iter, rg.num_rows(), None);
+        let mut deserializer = RowGroupDeserializer::new(columns_array_iter, part.num_rows, None);
 
         self.try_next_block(&mut deserializer)
     }
 
-    /// Combine multiple columns into one arrow array.
     /// The number of columns can be greater than 1 because the it may be a nested type.
+    /// Combine multiple columns into one arrow array.
     fn to_array_iter(
-        metas: Vec<&ColumnChunkMetaData>,
+        metas: Vec<(&ColumnMeta, &ColumnDescriptor)>,
        chunks: Vec<Vec<u8>>,
         rows: usize,
         field: Field,
     ) -> Result<ArrayIter<'static>> {
         let (columns, types) = metas
             .iter()
             .zip(chunks.into_iter())
-            .map(|(&meta, chunk)| {
+            .map(|(&(meta, descriptor), chunk)| {
                 let pages = PageReader::new_with_page_meta(
                     std::io::Cursor::new(chunk),
-                    PageMetaData::from(meta),
+                    PageMetaData {
+                        column_start: meta.offset,
+                        num_values: meta.length as i64,
+                        compression: meta.compression.into(),
+                        descriptor: descriptor.descriptor.clone(),
+                    },
                     Arc::new(|_, _| true),
                     vec![],
                     usize::MAX,
                 );
                 (
                     BasicDecompressor::new(pages, vec![]),
-                    &meta.descriptor().descriptor.primitive_type,
+                    &descriptor.descriptor.primitive_type,
                 )
             })
             .unzip();
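After this change, deserialize expects the raw bytes of each projected column chunk keyed by leaf index, and the byte ranges come from the part itself rather than from row-group metadata. A sketch of that fetch step under stated assumptions: the actual IO presumably lives in parquet_reader/read.rs, which is not shown in this excerpt, so the read_range closure below stands in for the real storage client.

// Sketch only: fetch the raw bytes for each projected column chunk of a
// row-group part. `read_range(location, offset, length)` abstracts over the
// actual storage client; the function name and signature are assumptions.
use std::collections::HashSet;

use common_exception::Result;

use crate::parquet_part::ParquetRowGroupPart;

fn fetch_column_chunks<F>(
    part: &ParquetRowGroupPart,
    columns_to_read: &HashSet<usize>,
    mut read_range: F,
) -> Result<Vec<(usize, Vec<u8>)>>
where
    F: FnMut(&str, u64, u64) -> Result<Vec<u8>>,
{
    let mut chunks = Vec::with_capacity(columns_to_read.len());
    for index in columns_to_read {
        let meta = &part.column_metas[*index];
        // Read `length` bytes starting at `offset` from the file at `location`.
        let data = read_range(&part.location, meta.offset, meta.length)?;
        chunks.push((*index, data));
    }
    Ok(chunks)
}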

src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 0 additions & 20 deletions
@@ -15,9 +15,7 @@
 use std::fs::File;
 
 use common_arrow::arrow::io::parquet::read as pread;
-use common_arrow::parquet::metadata::ColumnChunkMetaData;
 use common_arrow::parquet::metadata::FileMetaData;
-use common_arrow::parquet::metadata::RowGroupMetaData;
 use common_datavalues::DataField;
 use common_datavalues::DataSchema;
 use common_exception::ErrorCode;
@@ -54,22 +52,4 @@ impl ParquetReader {
 
         Ok(DataSchema::new(field))
     }
-
-    pub fn get_column_metas<'a>(
-        &self,
-        rg: &'a RowGroupMetaData,
-    ) -> Vec<(usize, &'a ColumnChunkMetaData)> {
-        let columns = rg.columns();
-        let column_leaves = &self.projected_column_leaves.column_leaves;
-        let mut column_metas =
-            Vec::with_capacity(column_leaves.iter().map(|col| col.leaf_ids.len()).sum());
-        for column in column_leaves {
-            let indices = &column.leaf_ids;
-            for index in indices {
-                let column_meta = &columns[*index];
-                column_metas.push((*index, column_meta));
-            }
-        }
-        column_metas
-    }
 }

src/query/storages/parquet/src/parquet_reader/mod.rs

Lines changed: 36 additions & 1 deletion
@@ -15,8 +15,12 @@
 mod deserialize;
 mod meta;
 mod read;
+use std::collections::HashMap;
+use std::collections::HashSet;
 use std::sync::Arc;
 
+use common_arrow::arrow::io::parquet::write::to_parquet_schema;
+use common_arrow::parquet::metadata::ColumnDescriptor;
 use common_catalog::plan::Projection;
 use common_datavalues::DataSchemaRef;
 use common_exception::Result;
@@ -27,8 +31,23 @@ pub use read::IndexedChunk;
 #[derive(Clone)]
 pub struct ParquetReader {
     operator: Operator,
+    /// The indices of columns need to read by this reader.
+    ///
+    /// Use [`HashSet`] to avoid duplicate indices.
+    /// Duplicate indices will exist when there are nested types.
+    ///
+    /// For example:
+    ///
+    /// ```sql
+    /// select a, a.b, a.c from t;
+    /// ```
+    columns_to_read: HashSet<usize>,
+    /// The schema of the [`common_datablocks::DataBlock`] this reader produces.
     projected_schema: DataSchemaRef,
+    /// [`ColumnLeaves`] corresponding to the `projected_schema`.
     projected_column_leaves: ColumnLeaves,
+    /// [`ColumnDescriptor`]s corresponding to the `projected_schema`.
+    projected_column_descriptors: HashMap<usize, ColumnDescriptor>,
 }
 
 impl ParquetReader {
@@ -40,26 +59,42 @@ impl ParquetReader {
         // Full schema and column leaves.
         let arrow_schema = schema.to_arrow();
         let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema);
+        let schema_descriptors = to_parquet_schema(&arrow_schema)?;
 
-        // Projected schema and column leaves.
+        // Project schema
        let projected_schema = match projection {
             Projection::Columns(ref indices) => DataSchemaRef::new(schema.project(indices)),
             Projection::InnerColumns(ref path_indices) => {
                 DataSchemaRef::new(schema.inner_project(path_indices))
             }
         };
+        // Project column leaves
         let projected_column_leaves = ColumnLeaves {
             column_leaves: projection
                 .project_column_leaves(&column_leaves)?
                 .iter()
                 .map(|&leaf| leaf.clone())
                 .collect(),
         };
+        let column_leaves = &projected_column_leaves.column_leaves;
+        // Project column descriptors and collect columns to read
+        let mut projected_column_descriptors = HashMap::with_capacity(column_leaves.len());
+        let mut columns_to_read =
+            HashSet::with_capacity(column_leaves.iter().map(|leaf| leaf.leaf_ids.len()).sum());
+        for column_leaf in column_leaves {
+            for index in &column_leaf.leaf_ids {
+                columns_to_read.insert(*index);
+                projected_column_descriptors
+                    .insert(*index, schema_descriptors.columns()[*index].clone());
+            }
+        }
 
         Ok(Arc::new(ParquetReader {
             operator,
+            columns_to_read,
             projected_schema,
             projected_column_leaves,
+            projected_column_descriptors,
         }))
     }
 

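The HashSet rationale in the struct documentation above can be made concrete. A self-contained illustration follows; the leaf ids are assumed for the example and do not come from this diff. For a nested column a { b, c }, the query select a, a.b, a.c from t projects three column leaves whose leaf ids overlap, and collecting them into a set collapses the duplicates so each physical column chunk is read once.

// Illustrative only: why `columns_to_read` is a HashSet of leaf indices.
use std::collections::HashSet;

fn main() {
    // Assumed leaf ids: a -> [0, 1], a.b -> [0], a.c -> [1].
    let projected_leaf_ids: Vec<Vec<usize>> = vec![vec![0, 1], vec![0], vec![1]];

    let columns_to_read: HashSet<usize> =
        projected_leaf_ids.iter().flatten().copied().collect();

    // Only two physical leaf columns need to be read from the row group.
    assert_eq!(columns_to_read.len(), 2);
}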