
Commit 780f63a

Merge pull request #9154 from RinChanNOWWW/improve-read-parquet
feat: `read_parquet` reads metadata before reading data.
2 parents: 7e66eca + d12105a

File tree

14 files changed: +513 additions, −695 deletions

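At a glance: the planner now emits a `ParquetLocationPart` per file, then reads each file's Parquet footer and turns every row group into a `ParquetRowGroupPart` carrying per-column offsets, lengths, and compression, so the read stage fetches only the byte ranges it needs. Below is a minimal sketch of that meta-before-data step; it assumes `common_arrow::parquet` re-exports `parquet2`'s synchronous reader, and the helper name is illustrative, not the crate's actual API:

```rust
use std::error::Error;
use std::fs::File;

// Assumption: common_arrow::parquet re-exports parquet2, whose sync
// `read_metadata` reads only the file footer.
use common_arrow::parquet::read::read_metadata;

/// Illustrative helper (not the crate's actual API): read the footer
/// first and emit one (location, num_rows) pair per row group. The real
/// planner also records each column chunk's offset/length/compression.
fn plan_row_groups(location: &str) -> Result<Vec<(String, usize)>, Box<dyn Error>> {
    let mut file = File::open(location)?;
    let metadata = read_metadata(&mut file)?; // metadata only, no data pages touched
    Ok(metadata
        .row_groups
        .iter()
        .map(|rg| (location.to_string(), rg.num_rows()))
        .collect())
}
```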

Cargo.lock

Lines changed: 0 additions & 1 deletion
Generated file; diff not rendered by default.

src/query/storages/parquet/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ common-storage = { path = "../../../common/storage" }
 
 async-trait = { version = "0.1.57", package = "async-trait-fn" }
 chrono = { workspace = true }
-futures = "0.3.24"
 glob = "0.3.0"
 opendal = "0.22"
 serde = { workspace = true }

src/query/storages/parquet/src/lib.rs

Lines changed: 2 additions & 3 deletions
@@ -20,8 +20,7 @@ mod parquet_reader;
 mod parquet_source;
 mod table_function;
 
-pub use parquet_column::ParquetColumnMeta;
-pub use parquet_part::ParquetPartInfo;
+pub use parquet_part::ParquetLocationPart;
 pub use parquet_reader::ParquetReader;
-pub use parquet_source::ParquetTableSource;
+pub use parquet_source::ParquetSource;
 pub use table_function::ParquetTable;

src/query/storages/parquet/src/parquet_column.rs

Lines changed: 0 additions & 25 deletions
@@ -55,28 +55,3 @@ impl From<ParquetCompression> for Compression {
         }
     }
 }
-
-#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
-pub struct ParquetColumnMeta {
-    pub offset: u64,
-    pub length: u64,
-    pub num_values: u64,
-
-    pub compression: Compression,
-}
-
-impl ParquetColumnMeta {
-    pub fn create(
-        offset: u64,
-        length: u64,
-        num_values: u64,
-        compression: Compression,
-    ) -> ParquetColumnMeta {
-        ParquetColumnMeta {
-            offset,
-            length,
-            num_values,
-            compression,
-        }
-    }
-}

src/query/storages/parquet/src/parquet_part.rs

Lines changed: 104 additions & 21 deletions
@@ -19,31 +19,116 @@ use std::hash::Hash;
 use std::hash::Hasher;
 use std::sync::Arc;
 
+use common_arrow::parquet::compression::Compression as ParquetCompression;
 use common_catalog::plan::PartInfo;
 use common_catalog::plan::PartInfoPtr;
 use common_exception::ErrorCode;
 use common_exception::Result;
 
-use crate::ParquetColumnMeta;
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub struct ParquetLocationPart {
+    pub location: String,
+}
+
+#[typetag::serde(name = "parquet_location")]
+impl PartInfo for ParquetLocationPart {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
+        match info.as_any().downcast_ref::<ParquetLocationPart>() {
+            None => false,
+            Some(other) => self == other,
+        }
+    }
+
+    fn hash(&self) -> u64 {
+        let mut s = DefaultHasher::new();
+        self.location.hash(&mut s);
+        s.finish()
+    }
+}
+
+impl ParquetLocationPart {
+    pub fn create(location: String) -> Arc<Box<dyn PartInfo>> {
+        Arc::new(Box::new(ParquetLocationPart { location }))
+    }
+
+    pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetLocationPart> {
+        match info.as_any().downcast_ref::<ParquetLocationPart>() {
+            Some(part_ref) => Ok(part_ref),
+            None => Err(ErrorCode::Internal(
+                "Cannot downcast from PartInfo to ParquetLocationPart.",
+            )),
+        }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)]
+pub enum Compression {
+    Uncompressed,
+    Snappy,
+    Gzip,
+    Lzo,
+    Brotli,
+    Lz4,
+    Zstd,
+    Lz4Raw,
+}
+
+impl From<Compression> for ParquetCompression {
+    fn from(value: Compression) -> Self {
+        match value {
+            Compression::Uncompressed => ParquetCompression::Uncompressed,
+            Compression::Snappy => ParquetCompression::Snappy,
+            Compression::Gzip => ParquetCompression::Gzip,
+            Compression::Lzo => ParquetCompression::Lzo,
+            Compression::Brotli => ParquetCompression::Brotli,
+            Compression::Lz4 => ParquetCompression::Lz4,
+            Compression::Zstd => ParquetCompression::Zstd,
+            Compression::Lz4Raw => ParquetCompression::Lz4Raw,
+        }
+    }
+}
+
+impl From<ParquetCompression> for Compression {
+    fn from(value: ParquetCompression) -> Self {
+        match value {
+            ParquetCompression::Uncompressed => Compression::Uncompressed,
+            ParquetCompression::Snappy => Compression::Snappy,
+            ParquetCompression::Gzip => Compression::Gzip,
+            ParquetCompression::Lzo => Compression::Lzo,
+            ParquetCompression::Brotli => Compression::Brotli,
+            ParquetCompression::Lz4 => Compression::Lz4,
+            ParquetCompression::Zstd => Compression::Zstd,
+            ParquetCompression::Lz4Raw => Compression::Lz4Raw,
+        }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub struct ColumnMeta {
+    pub offset: u64,
+    pub length: u64,
+    pub compression: Compression,
+}
 
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
-pub struct ParquetPartInfo {
+pub struct ParquetRowGroupPart {
     pub location: String,
-    /// FusePartInfo itself is not versioned
-    /// the `format_version` is the version of the block which the `location` points to
-    pub format_version: u64,
-    pub nums_rows: usize,
-    pub columns_meta: HashMap<usize, ParquetColumnMeta>,
+    pub num_rows: usize,
+    pub column_metas: HashMap<usize, ColumnMeta>,
 }
 
-#[typetag::serde(name = "parquet")]
-impl PartInfo for ParquetPartInfo {
+#[typetag::serde(name = "parquet_row_group")]
+impl PartInfo for ParquetRowGroupPart {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
     fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
-        match info.as_any().downcast_ref::<ParquetPartInfo>() {
+        match info.as_any().downcast_ref::<ParquetRowGroupPart>() {
             None => false,
             Some(other) => self == other,
         }
@@ -56,26 +141,24 @@ impl PartInfo for ParquetPartInfo {
     }
 }
 
-impl ParquetPartInfo {
+impl ParquetRowGroupPart {
     pub fn create(
         location: String,
-        format_version: u64,
-        rows_count: u64,
-        columns_meta: HashMap<usize, ParquetColumnMeta>,
+        num_rows: usize,
+        column_metas: HashMap<usize, ColumnMeta>,
     ) -> Arc<Box<dyn PartInfo>> {
-        Arc::new(Box::new(ParquetPartInfo {
+        Arc::new(Box::new(ParquetRowGroupPart {
             location,
-            format_version,
-            columns_meta,
-            nums_rows: rows_count as usize,
+            num_rows,
+            column_metas,
         }))
     }
 
-    pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetPartInfo> {
-        match info.as_any().downcast_ref::<ParquetPartInfo>() {
+    pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetRowGroupPart> {
+        match info.as_any().downcast_ref::<ParquetRowGroupPart>() {
            Some(part_ref) => Ok(part_ref),
            None => Err(ErrorCode::Internal(
-                "Cannot downcast from PartInfo to FusePartInfo.",
+                "Cannot downcast from PartInfo to ParquetRowGroupPart.",
            )),
        }
    }
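For orientation, here is a minimal sketch of how the two part types above round-trip through the type-erased `PartInfo` machinery. It assumes `PartInfoPtr` is an alias for the `Arc<Box<dyn PartInfo>>` that `create` returns, and the calling function is hypothetical:

```rust
use common_catalog::plan::PartInfoPtr;
use common_exception::Result;

use crate::ParquetLocationPart; // re-exported from lib.rs above

fn round_trip() -> Result<()> {
    // Planning side: wrap a file location into a type-erased part.
    let part: PartInfoPtr = ParquetLocationPart::create("data/t.parquet".to_string());

    // Reading side: downcast it back before any I/O happens. A mismatched
    // concrete type surfaces as ErrorCode::Internal rather than a panic.
    let loc = ParquetLocationPart::from_part(&part)?;
    assert_eq!(loc.location, "data/t.parquet");
    Ok(())
}
```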

src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 61 additions & 69 deletions
@@ -24,105 +24,97 @@ use common_arrow::parquet::metadata::ColumnDescriptor;
 use common_arrow::parquet::read::BasicDecompressor;
 use common_arrow::parquet::read::PageMetaData;
 use common_arrow::parquet::read::PageReader;
-use common_catalog::plan::PartInfoPtr;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_storage::ColumnLeaf;
 
-use crate::ParquetColumnMeta;
-use crate::ParquetPartInfo;
+use crate::parquet_part::ColumnMeta;
+use crate::parquet_part::ParquetRowGroupPart;
 use crate::ParquetReader;
 
 impl ParquetReader {
-    fn to_array_iter(
-        metas: Vec<&ParquetColumnMeta>,
-        chunks: Vec<Vec<u8>>,
-        rows: usize,
-        column_descriptors: Vec<&ColumnDescriptor>,
-        field: Field,
-    ) -> Result<ArrayIter<'static>> {
-        let columns = metas
-            .iter()
-            .zip(chunks.into_iter().zip(column_descriptors.iter()))
-            .map(|(meta, (chunk, column_descriptor))| {
-                let page_meta_data = PageMetaData {
-                    column_start: meta.offset,
-                    num_values: meta.num_values as i64,
-                    compression: meta.compression.into(),
-                    descriptor: column_descriptor.descriptor.clone(),
-                };
-                let pages = PageReader::new_with_page_meta(
-                    std::io::Cursor::new(chunk),
-                    page_meta_data,
-                    Arc::new(|_, _| true),
-                    vec![],
-                    usize::MAX,
-                );
-                Ok(BasicDecompressor::new(pages, vec![]))
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let types = column_descriptors
-            .iter()
-            .map(|column_descriptor| &column_descriptor.descriptor.primitive_type)
-            .collect::<Vec<_>>();
-
-        Ok(column_iter_to_arrays(
-            columns,
-            types,
-            field,
-            Some(rows),
-            rows,
-        )?)
-    }
-
     pub fn deserialize(
         &self,
-        part: PartInfoPtr,
+        part: &ParquetRowGroupPart,
         chunks: Vec<(usize, Vec<u8>)>,
     ) -> Result<DataBlock> {
-        let part = ParquetPartInfo::from_part(&part)?;
         let mut chunk_map: HashMap<usize, Vec<u8>> = chunks.into_iter().collect();
-        let mut columns_array_iter = Vec::with_capacity(self.projection.len());
+        let mut columns_array_iter = Vec::with_capacity(self.projected_schema.num_fields());
+
+        let column_leaves = &self.projected_column_leaves.column_leaves;
+        let mut cnt_map = Self::build_projection_count_map(column_leaves);
 
-        let num_rows = part.nums_rows;
-        let columns = self.projection.project_column_leaves(&self.column_leaves)?;
-        let mut cnt_map = Self::build_projection_count_map(&columns);
-        for column in &columns {
-            let field = column.field.clone();
-            let indices = &column.leaf_ids;
-            let mut column_metas = Vec::with_capacity(indices.len());
-            let mut column_chunks = Vec::with_capacity(indices.len());
-            let mut column_descriptors = Vec::with_capacity(indices.len());
+        for column_leaf in column_leaves {
+            let indices = &column_leaf.leaf_ids;
+            let mut metas = Vec::with_capacity(indices.len());
+            let mut chunks = Vec::with_capacity(indices.len());
             for index in indices {
-                let column_meta = &part.columns_meta[index];
+                let column_meta = &part.column_metas[index];
                 let cnt = cnt_map.get_mut(index).unwrap();
                 *cnt -= 1;
                 let column_chunk = if cnt > &mut 0 {
                     chunk_map.get(index).unwrap().clone()
                 } else {
                     chunk_map.remove(index).unwrap()
                 };
-                let column_descriptor = &self.parquet_schema_descriptor.columns()[*index];
-                column_metas.push(column_meta);
-                column_chunks.push(column_chunk);
-                column_descriptors.push(column_descriptor);
+                let descriptor = &self.projected_column_descriptors[index];
+                metas.push((column_meta, descriptor));
+                chunks.push(column_chunk);
             }
             columns_array_iter.push(Self::to_array_iter(
-                column_metas,
-                column_chunks,
-                num_rows,
-                column_descriptors,
-                field,
+                metas,
+                chunks,
+                part.num_rows,
+                column_leaf.field.clone(),
            )?);
        }
 
-        let mut deserializer = RowGroupDeserializer::new(columns_array_iter, num_rows, None);
+        let mut deserializer = RowGroupDeserializer::new(columns_array_iter, part.num_rows, None);
 
         self.try_next_block(&mut deserializer)
     }
 
+    /// The number of columns can be greater than 1 because the it may be a nested type.
+    /// Combine multiple columns into one arrow array.
+    fn to_array_iter(
+        metas: Vec<(&ColumnMeta, &ColumnDescriptor)>,
+        chunks: Vec<Vec<u8>>,
+        rows: usize,
+        field: Field,
+    ) -> Result<ArrayIter<'static>> {
+        let (columns, types) = metas
+            .iter()
+            .zip(chunks.into_iter())
+            .map(|(&(meta, descriptor), chunk)| {
+                let pages = PageReader::new_with_page_meta(
+                    std::io::Cursor::new(chunk),
+                    PageMetaData {
+                        column_start: meta.offset,
+                        num_values: meta.length as i64,
+                        compression: meta.compression.into(),
+                        descriptor: descriptor.descriptor.clone(),
+                    },
+                    Arc::new(|_, _| true),
+                    vec![],
+                    usize::MAX,
+                );
+                (
+                    BasicDecompressor::new(pages, vec![]),
+                    &descriptor.descriptor.primitive_type,
+                )
+            })
+            .unzip();
+
+        Ok(column_iter_to_arrays(
+            columns,
+            types,
+            field,
+            Some(rows),
+            rows,
+        )?)
+    }
+
     fn try_next_block(&self, deserializer: &mut RowGroupDeserializer) -> Result<DataBlock> {
         match deserializer.next() {
             None => Err(ErrorCode::Internal(
@@ -134,7 +126,7 @@ impl ParquetReader {
         }
     }
 
     // Build a map to record the count number of each leaf_id
-    fn build_projection_count_map(columns: &Vec<&ColumnLeaf>) -> HashMap<usize, usize> {
+    fn build_projection_count_map(columns: &[ColumnLeaf]) -> HashMap<usize, usize> {
         let mut cnt_map = HashMap::with_capacity(columns.len());
         for column in columns {
             for index in &column.leaf_ids {
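A subtlety in `deserialize` above: `build_projection_count_map` counts how many projected column leaves reference each leaf id, so a raw chunk shared by several leaves (e.g. within one nested field) is cloned for every consumer except the last, which takes ownership via `remove`. A standalone sketch of that pattern with simplified types (names are illustrative, not the crate's API):

```rust
use std::collections::HashMap;

// Count references to each leaf id, mirroring build_projection_count_map.
fn count_uses(leaf_ids_per_column: &[Vec<usize>]) -> HashMap<usize, usize> {
    let mut cnt_map = HashMap::new();
    for leaf_ids in leaf_ids_per_column {
        for id in leaf_ids {
            *cnt_map.entry(*id).or_insert(0usize) += 1;
        }
    }
    cnt_map
}

// Clone while other consumers remain; move the chunk out on its last use.
fn take_chunk(
    chunk_map: &mut HashMap<usize, Vec<u8>>,
    cnt_map: &mut HashMap<usize, usize>,
    id: usize,
) -> Vec<u8> {
    let cnt = cnt_map.get_mut(&id).unwrap();
    *cnt -= 1;
    if *cnt > 0 {
        chunk_map.get(&id).unwrap().clone()
    } else {
        chunk_map.remove(&id).unwrap()
    }
}
```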
