
Commit a3b855a

Restruct mod parquet_reader.

1 parent e34ae24

File tree

3 files changed: +170 −124 lines changed
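
The commit splits the single-file module into a directory. Based on the rename and the `mod` declarations visible in the diffs below, the resulting layout is (the two new file names are inferred, not shown on this page):

src/query/storages/parquet/src/parquet_reader/
├── mod.rs          — ParquetReader struct, create, schema, support_blocking_api (inferred name)
├── read.rs         — async/sync column IO: read_columns_data, sync_read_columns_data, read_column, sync_read_column, build_projection_indices
└── deserialize.rs  — renamed from parquet_reader.rs; keeps to_array_iter, try_next_block, build_projection_count_map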

src/query/storages/parquet/src/parquet_reader.rs renamed to src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 1 addition & 124 deletions
@@ -14,72 +14,27 @@
 
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use common_arrow::arrow::datatypes::Field;
 use common_arrow::arrow::io::parquet::read::column_iter_to_arrays;
 use common_arrow::arrow::io::parquet::read::ArrayIter;
 use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
-use common_arrow::arrow::io::parquet::write::to_parquet_schema;
 use common_arrow::parquet::metadata::ColumnDescriptor;
-use common_arrow::parquet::metadata::SchemaDescriptor;
 use common_arrow::parquet::read::BasicDecompressor;
 use common_arrow::parquet::read::PageMetaData;
 use common_arrow::parquet::read::PageReader;
 use common_catalog::plan::PartInfoPtr;
-use common_catalog::plan::Projection;
 use common_datablocks::DataBlock;
-use common_datavalues::DataSchemaRef;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_storage::ColumnLeaf;
-use common_storage::ColumnLeaves;
-use opendal::Object;
-use opendal::Operator;
 
 use crate::ParquetColumnMeta;
 use crate::ParquetPartInfo;
-
-#[derive(Clone)]
-pub struct ParquetReader {
-    operator: Operator,
-    projection: Projection,
-    projected_schema: DataSchemaRef,
-    column_leaves: ColumnLeaves,
-    parquet_schema_descriptor: SchemaDescriptor,
-}
+use crate::ParquetReader;
 
 impl ParquetReader {
-    pub fn create(
-        operator: Operator,
-        schema: DataSchemaRef,
-        projection: Projection,
-    ) -> Result<Arc<ParquetReader>> {
-        let projected_schema = match projection {
-            Projection::Columns(ref indices) => DataSchemaRef::new(schema.project(indices)),
-            Projection::InnerColumns(ref path_indices) => {
-                DataSchemaRef::new(schema.inner_project(path_indices))
-            }
-        };
-
-        let arrow_schema = schema.to_arrow();
-        let parquet_schema_descriptor = to_parquet_schema(&arrow_schema)?;
-        let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema);
-
-        Ok(Arc::new(ParquetReader {
-            operator,
-            projection,
-            projected_schema,
-            parquet_schema_descriptor,
-            column_leaves,
-        }))
-    }
-
-    pub fn schema(&self) -> DataSchemaRef {
-        self.projected_schema.clone()
-    }
-
     fn to_array_iter(
         metas: Vec<&ParquetColumnMeta>,
         chunks: Vec<Vec<u8>>,
@@ -168,73 +123,6 @@ impl ParquetReader {
         self.try_next_block(&mut deserializer)
     }
 
-    pub async fn read_columns_data(&self, part: PartInfoPtr) -> Result<Vec<(usize, Vec<u8>)>> {
-        let part = ParquetPartInfo::from_part(&part)?;
-        let columns = self.projection.project_column_leaves(&self.column_leaves)?;
-        let indices = Self::build_projection_indices(&columns);
-        let mut join_handlers = Vec::with_capacity(indices.len());
-
-        for index in indices {
-            let column_meta = &part.columns_meta[&index];
-            join_handlers.push(Self::read_column(
-                self.operator.object(&part.location),
-                index,
-                column_meta.offset,
-                column_meta.length,
-            ));
-        }
-
-        futures::future::try_join_all(join_handlers).await
-    }
-
-    pub fn support_blocking_api(&self) -> bool {
-        self.operator.metadata().can_blocking()
-    }
-
-    pub fn sync_read_columns_data(&self, part: PartInfoPtr) -> Result<Vec<(usize, Vec<u8>)>> {
-        let part = ParquetPartInfo::from_part(&part)?;
-
-        let columns = self.projection.project_column_leaves(&self.column_leaves)?;
-        let indices = Self::build_projection_indices(&columns);
-        let mut results = Vec::with_capacity(indices.len());
-
-        for index in indices {
-            let column_meta = &part.columns_meta[&index];
-
-            let op = self.operator.clone();
-
-            let location = part.location.clone();
-            let offset = column_meta.offset;
-            let length = column_meta.length;
-
-            let result = Self::sync_read_column(op.object(&location), index, offset, length);
-            results.push(result?);
-        }
-
-        Ok(results)
-    }
-
-    pub async fn read_column(
-        o: Object,
-        index: usize,
-        offset: u64,
-        length: u64,
-    ) -> Result<(usize, Vec<u8>)> {
-        let chunk = o.range_read(offset..offset + length).await?;
-
-        Ok((index, chunk))
-    }
-
-    pub fn sync_read_column(
-        o: Object,
-        index: usize,
-        offset: u64,
-        length: u64,
-    ) -> Result<(usize, Vec<u8>)> {
-        let chunk = o.blocking_range_read(offset..offset + length)?;
-        Ok((index, chunk))
-    }
-
     fn try_next_block(&self, deserializer: &mut RowGroupDeserializer) -> Result<DataBlock> {
         match deserializer.next() {
             None => Err(ErrorCode::Internal(
@@ -245,17 +133,6 @@ impl ParquetReader {
         }
     }
 
-    // Build non duplicate leaf_ids to avoid repeated read column from parquet
-    fn build_projection_indices(columns: &Vec<&ColumnLeaf>) -> HashSet<usize> {
-        let mut indices = HashSet::with_capacity(columns.len());
-        for column in columns {
-            for index in &column.leaf_ids {
-                indices.insert(*index);
-            }
-        }
-        indices
-    }
-
     // Build a map to record the count number of each leaf_id
     fn build_projection_count_map(columns: &Vec<&ColumnLeaf>) -> HashMap<usize, usize> {
        let mut cnt_map = HashMap::with_capacity(columns.len());
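
For orientation, here is a minimal standalone sketch of the leaf-counting idea behind `build_projection_count_map`, whose body is cut off in the diff above. The reduced `ColumnLeaf` stand-in and the `Entry`-based loop body are assumptions for illustration; only the signature and the `HashMap::with_capacity` line are visible in this commit.

use std::collections::HashMap;

// Hypothetical stand-in for common_storage::ColumnLeaf, reduced to the one
// field the count map needs: the parquet leaf ids backing the column.
struct ColumnLeaf {
    leaf_ids: Vec<usize>,
}

// Count how many projected columns reference each leaf id.
fn build_projection_count_map(columns: &Vec<&ColumnLeaf>) -> HashMap<usize, usize> {
    let mut cnt_map = HashMap::with_capacity(columns.len());
    for column in columns {
        for id in &column.leaf_ids {
            // Assumed body: this is the usual hash_map::Entry idiom, which the
            // `use std::collections::hash_map::Entry;` import above suggests.
            *cnt_map.entry(*id).or_insert(0) += 1;
        }
    }
    cnt_map
}

fn main() {
    // Two projected columns sharing leaf 1, e.g. two inner fields of one
    // nested column.
    let a = ColumnLeaf { leaf_ids: vec![0, 1] };
    let b = ColumnLeaf { leaf_ids: vec![1, 2] };
    let cnt = build_projection_count_map(&vec![&a, &b]);
    assert_eq!(cnt[&1], 2); // leaf 1 is referenced twice
}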
src/query/storages/parquet/src/parquet_reader/mod.rs (new file; name inferred from the `mod` declarations it contains)

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_arrow::arrow::io::parquet::write::to_parquet_schema;
+use common_arrow::parquet::metadata::SchemaDescriptor;
+use common_catalog::plan::Projection;
+use common_datavalues::DataSchemaRef;
+use common_exception::Result;
+use common_storage::ColumnLeaves;
+use opendal::Operator;
+
+mod deserialize;
+mod read;
+
+#[derive(Clone)]
+pub struct ParquetReader {
+    operator: Operator,
+    projection: Projection,
+    projected_schema: DataSchemaRef,
+    column_leaves: ColumnLeaves,
+    parquet_schema_descriptor: SchemaDescriptor,
+}
+
+impl ParquetReader {
+    pub fn create(
+        operator: Operator,
+        schema: DataSchemaRef,
+        projection: Projection,
+    ) -> Result<Arc<ParquetReader>> {
+        let projected_schema = match projection {
+            Projection::Columns(ref indices) => DataSchemaRef::new(schema.project(indices)),
+            Projection::InnerColumns(ref path_indices) => {
+                DataSchemaRef::new(schema.inner_project(path_indices))
+            }
+        };
+
+        let arrow_schema = schema.to_arrow();
+        let parquet_schema_descriptor = to_parquet_schema(&arrow_schema)?;
+        let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema);
+
+        Ok(Arc::new(ParquetReader {
+            operator,
+            projection,
+            projected_schema,
+            parquet_schema_descriptor,
+            column_leaves,
+        }))
+    }
+
+    pub fn schema(&self) -> DataSchemaRef {
+        self.projected_schema.clone()
+    }
+
+    pub fn support_blocking_api(&self) -> bool {
+        self.operator.metadata().can_blocking()
+    }
+}
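
A minimal usage sketch (not part of the commit) for the constructor and accessors kept in mod.rs; `open_reader`, and the `operator` and `schema` values it takes, are hypothetical and assumed to come from elsewhere in the crate, and the commented-out calls refer to the methods defined in read.rs below.

use common_catalog::plan::Projection;
use common_datavalues::DataSchemaRef;
use common_exception::Result;
use opendal::Operator;

use crate::ParquetReader;

fn open_reader(operator: Operator, schema: DataSchemaRef) -> Result<()> {
    // Project the first two top-level columns of the file.
    let projection = Projection::Columns(vec![0, 1]);
    let reader = ParquetReader::create(operator, schema, projection)?;

    // schema() returns the projected schema, not the full file schema.
    let _projected = reader.schema();

    // The sync read path is only valid when the opendal backend supports
    // blocking IO; callers branch on support_blocking_api().
    if reader.support_blocking_api() {
        // reader.sync_read_columns_data(part)?
    } else {
        // reader.read_columns_data(part).await? (from an async context)
    }
    Ok(())
}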
src/query/storages/parquet/src/parquet_reader/read.rs (new file; name inferred from the `mod read;` declaration)

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+
+use common_catalog::plan::PartInfoPtr;
+use common_exception::Result;
+use common_storage::ColumnLeaf;
+use opendal::Object;
+
+use crate::ParquetPartInfo;
+use crate::ParquetReader;
+
+impl ParquetReader {
+    pub async fn read_columns_data(&self, part: PartInfoPtr) -> Result<Vec<(usize, Vec<u8>)>> {
+        let part = ParquetPartInfo::from_part(&part)?;
+        let columns = self.projection.project_column_leaves(&self.column_leaves)?;
+        let indices = Self::build_projection_indices(&columns);
+        let mut join_handlers = Vec::with_capacity(indices.len());
+
+        for index in indices {
+            let column_meta = &part.columns_meta[&index];
+            join_handlers.push(Self::read_column(
+                self.operator.object(&part.location),
+                index,
+                column_meta.offset,
+                column_meta.length,
+            ));
+        }
+
+        futures::future::try_join_all(join_handlers).await
+    }
+
+    pub fn sync_read_columns_data(&self, part: PartInfoPtr) -> Result<Vec<(usize, Vec<u8>)>> {
+        let part = ParquetPartInfo::from_part(&part)?;
+
+        let columns = self.projection.project_column_leaves(&self.column_leaves)?;
+        let indices = Self::build_projection_indices(&columns);
+        let mut results = Vec::with_capacity(indices.len());
+
+        for index in indices {
+            let column_meta = &part.columns_meta[&index];
+
+            let op = self.operator.clone();
+
+            let location = part.location.clone();
+            let offset = column_meta.offset;
+            let length = column_meta.length;
+
+            let result = Self::sync_read_column(op.object(&location), index, offset, length);
+            results.push(result?);
+        }
+
+        Ok(results)
+    }
+
+    pub async fn read_column(
+        o: Object,
+        index: usize,
+        offset: u64,
+        length: u64,
+    ) -> Result<(usize, Vec<u8>)> {
+        let chunk = o.range_read(offset..offset + length).await?;
+
+        Ok((index, chunk))
+    }
+
+    pub fn sync_read_column(
+        o: Object,
+        index: usize,
+        offset: u64,
+        length: u64,
+    ) -> Result<(usize, Vec<u8>)> {
+        let chunk = o.blocking_range_read(offset..offset + length)?;
+        Ok((index, chunk))
+    }
+
+    // Build non duplicate leaf_ids to avoid repeated read column from parquet
+    fn build_projection_indices(columns: &Vec<&ColumnLeaf>) -> HashSet<usize> {
+        let mut indices = HashSet::with_capacity(columns.len());
+        for column in columns {
+            for index in &column.leaf_ids {
+                indices.insert(*index);
+            }
+        }
+        indices
+    }
+}
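
read_columns_data fans out one range read per deduplicated leaf index and awaits them all with futures::future::try_join_all, so a single failed read fails the whole part. Below is a self-contained sketch of that fan-out pattern, with the opendal object reads replaced by slices of an in-memory buffer so it runs on its own; tokio and futures are assumed dependencies, and read_range is a hypothetical stand-in for read_column.

// Fake "range read": returns (column index, bytes), like read_column does.
async fn read_range(
    file: &[u8],
    index: usize,
    offset: usize,
    length: usize,
) -> std::io::Result<(usize, Vec<u8>)> {
    Ok((index, file[offset..offset + length].to_vec()))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let file: Vec<u8> = (0..=99).collect();
    // (leaf index, offset, length) triples, standing in for ParquetColumnMeta.
    let metas = [(0usize, 0usize, 10usize), (1, 10, 20), (2, 30, 5)];

    // One future per column chunk, awaited together; the first error aborts.
    let reads = metas.iter().map(|&(i, off, len)| read_range(&file, i, off, len));
    for (index, chunk) in futures::future::try_join_all(reads).await? {
        println!("column {index}: {} bytes", chunk.len());
    }
    Ok(())
}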
