
Commit 98577aa

Merge pull request #9199 from RinChanNOWWW/read-parquet-filter-push-down
feat: push down filter to parquet reader.
2 parents 288c3bb + 58b456d commit 98577aa

File tree

11 files changed: 322 additions & 53 deletions


Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -144,3 +144,4 @@ rpath = false
 # If there are dependencies that need patching, they can be listed below.
 # For example:
 # arrow-format = { git = "https://github.com/datafuse-extras/arrow-format", rev = "78dacc1" }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "fb08b72" }

src/common/arrow/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@
 
 mod parquet_read;
 mod parquet_write;
+pub mod schema_projection;
 
 pub use arrow;
 pub use arrow_format;
src/common/arrow/src/schema_projection.rs

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+use arrow::datatypes::Schema;
+
+/// Project a [`Schema`] by picking the fields at the given indices.
+pub fn project(schema: &Schema, indices: &[usize]) -> Schema {
+    let fields = indices
+        .iter()
+        .map(|idx| schema.fields[*idx].clone())
+        .collect::<Vec<_>>();
+    Schema::with_metadata(fields.into(), schema.metadata.clone())
+}
+
+/// Project a [`Schema`] with inner columns by path.
+pub fn inner_project(schema: &Schema, path_indices: &BTreeMap<usize, Vec<usize>>) -> Schema {
+    let paths: Vec<Vec<usize>> = path_indices.values().cloned().collect();
+    let fields = paths
+        .iter()
+        .map(|path| traverse_paths(&schema.fields, path))
+        .collect::<Vec<_>>();
+    Schema::with_metadata(fields.into(), schema.metadata.clone())
+}
+
+fn traverse_paths(fields: &[Field], path: &[usize]) -> Field {
+    assert!(!path.is_empty());
+    let field = &fields[path[0]];
+    if path.len() == 1 {
+        return field.clone();
+    }
+    if let DataType::Struct(inner_fields) = field.data_type() {
+        return traverse_paths(inner_fields, &path[1..]);
+    }
+    unreachable!("Unable to get field paths. Fields: {:?}", fields);
+}
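
A minimal usage sketch for these helpers (not part of this diff; field names and shapes are hypothetical). `project` keeps top-level fields by index, while `inner_project` walks each path into struct columns; here the path [1, 1] selects `point.y`.

use std::collections::BTreeMap;

use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::datatypes::Schema;

fn demo() {
    let point = DataType::Struct(vec![
        Field::new("x", DataType::Float64, true),
        Field::new("y", DataType::Float64, true),
    ]);
    let schema = Schema::from(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("point", point, true),
    ]);

    // Keep only `id` (top-level index 0).
    assert_eq!(project(&schema, &[0]).fields.len(), 1);

    // Keep `point.y`: top-level index 1, then inner index 1.
    let paths = BTreeMap::from([(1, vec![1, 1])]);
    assert_eq!(inner_project(&schema, &paths).fields[0].name, "y");
}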

src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 88 additions & 8 deletions
@@ -15,12 +15,15 @@
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::Mutex;
 
+use common_arrow::arrow::bitmap::Bitmap;
 use common_arrow::arrow::datatypes::Field;
 use common_arrow::arrow::io::parquet::read::column_iter_to_arrays;
 use common_arrow::arrow::io::parquet::read::ArrayIter;
 use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
 use common_arrow::parquet::metadata::ColumnDescriptor;
+use common_arrow::parquet::page::CompressedPage;
 use common_arrow::parquet::read::BasicDecompressor;
 use common_arrow::parquet::read::PageMetaData;
 use common_arrow::parquet::read::PageReader;

@@ -29,6 +32,7 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_storage::ColumnLeaf;
 
+use super::filter::FilterState;
 use crate::parquet_part::ColumnMeta;
 use crate::parquet_part::ParquetRowGroupPart;
 use crate::ParquetReader;

@@ -38,9 +42,10 @@ impl ParquetReader {
         &self,
         part: &ParquetRowGroupPart,
         chunks: Vec<(usize, Vec<u8>)>,
+        filter: Option<Bitmap>,
     ) -> Result<DataBlock> {
         let mut chunk_map: HashMap<usize, Vec<u8>> = chunks.into_iter().collect();
-        let mut columns_array_iter = Vec::with_capacity(self.projected_schema.num_fields());
+        let mut columns_array_iter = Vec::with_capacity(self.projected_arrow_schema.fields.len());
 
         let column_leaves = &self.projected_column_leaves.column_leaves;
         let mut cnt_map = Self::build_projection_count_map(column_leaves);

@@ -62,12 +67,18 @@
                 metas.push((column_meta, descriptor));
                 chunks.push(column_chunk);
             }
-            columns_array_iter.push(Self::to_array_iter(
-                metas,
-                chunks,
-                part.num_rows,
-                column_leaf.field.clone(),
-            )?);
+            let array_iter = if let Some(ref bitmap) = filter {
+                Self::to_array_iter_with_filter(
+                    metas,
+                    chunks,
+                    part.num_rows,
+                    column_leaf.field.clone(),
+                    bitmap.clone(),
+                )?
+            } else {
+                Self::to_array_iter(metas, chunks, part.num_rows, column_leaf.field.clone())?
+            };
+            columns_array_iter.push(array_iter);
         }
 
         let mut deserializer = RowGroupDeserializer::new(columns_array_iter, part.num_rows, None);

@@ -115,13 +126,82 @@
         )?)
     }
 
+    /// Almost the same as `to_array_iter`, but with a filter.
+    fn to_array_iter_with_filter(
+        metas: Vec<(&ColumnMeta, &ColumnDescriptor)>,
+        chunks: Vec<Vec<u8>>,
+        rows: usize,
+        field: Field,
+        filter: Bitmap,
+    ) -> Result<ArrayIter<'static>> {
+        let (columns, types) = metas
+            .iter()
+            .zip(chunks.into_iter())
+            .map(|(&(meta, descriptor), chunk)| {
+                let filter_state = Arc::new(Mutex::new(FilterState::new(filter.clone())));
+                let iter_filter_state = filter_state.clone();
+
+                let pages = PageReader::new_with_page_meta(
+                    std::io::Cursor::new(chunk),
+                    PageMetaData {
+                        column_start: meta.offset,
+                        num_values: meta.length as i64,
+                        compression: meta.compression.into(),
+                        descriptor: descriptor.descriptor.clone(),
+                    },
+                    Arc::new(move |_, header| {
+                        // If the bitmap for the current page is all unset, skip it.
+                        let mut state = filter_state.lock().unwrap();
+                        let num_rows = header.num_values();
+                        let all_unset = state.range_all_unset(num_rows);
+                        if all_unset {
+                            // Skip this page.
+                            state.advance(num_rows);
+                        }
+                        !all_unset
+                    }),
+                    vec![],
+                    usize::MAX,
+                )
+                .map(move |page| {
+                    page.map(|page| match page {
+                        CompressedPage::Data(mut page) => {
+                            let num_rows = page.num_values();
+                            let mut state = iter_filter_state.lock().unwrap();
+                            if state.range_all_unset(num_rows) {
+                                page.select_rows(vec![]);
+                            } else if !state.range_all_set(num_rows) {
+                                page.select_rows(state.convert_to_intervals(num_rows));
+                            };
+                            state.advance(num_rows);
+                            CompressedPage::Data(page)
+                        }
+                        CompressedPage::Dict(_) => page, // do nothing
+                    })
+                });
+                (
+                    BasicDecompressor::new(pages, vec![]),
+                    &descriptor.descriptor.primitive_type,
+                )
+            })
+            .unzip();
+
+        Ok(column_iter_to_arrays(
+            columns,
+            types,
+            field,
+            Some(rows - filter.unset_bits()),
+            rows,
+        )?)
+    }
+
     fn try_next_block(&self, deserializer: &mut RowGroupDeserializer) -> Result<DataBlock> {
         match deserializer.next() {
             None => Err(ErrorCode::Internal(
                 "deserializer from row group: fail to get a chunk",
             )),
             Some(Err(cause)) => Err(ErrorCode::from(cause)),
-            Some(Ok(chunk)) => DataBlock::from_chunk(&self.projected_schema, &chunk),
+            Some(Ok(chunk)) => DataBlock::from_chunk(&self.output_schema, &chunk),
         }
     }
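For context, a hedged sketch of what the new `filter` argument to `deserialize` expects (the helper below is hypothetical, not from this PR): a bitmap with one bit per row of the row group, where a set bit means the row is kept. Pages whose slice of the bitmap is all unset are skipped before decompression; partially selected pages go through `select_rows` with the intervals computed by `FilterState`.

use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::bitmap::MutableBitmap;

/// Build a keep-bitmap from an already-evaluated predicate.
/// `keep` must have exactly `part.num_rows` entries.
fn build_row_filter(keep: &[bool]) -> Bitmap {
    let mut bits = MutableBitmap::with_capacity(keep.len());
    for &k in keep {
        bits.push(k); // set bit => row survives the filter
    }
    bits.into()
}

// Hypothetical call site:
// reader.deserialize(part, chunks, Some(build_row_filter(&keep)))?;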
src/query/storages/parquet/src/parquet_reader/filter.rs

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_arrow::arrow::bitmap::Bitmap;
+use common_arrow::parquet::indexes::Interval;
+
+/// A wrapper of [`Bitmap`] with a position mark. It is used to filter rows when reading a parquet file.
+///
+/// If a whole page is filtered out, there is no need to decompress the page.
+/// If a row is filtered out, there is no need to decode the row.
+pub struct FilterState {
+    bitmap: Bitmap,
+    pos: usize,
+}
+
+impl FilterState {
+    pub fn new(bitmap: Bitmap) -> Self {
+        Self { bitmap, pos: 0 }
+    }
+
+    #[inline]
+    pub fn advance(&mut self, num: usize) {
+        self.pos += num;
+    }
+
+    /// Return true if all bits in [`self.pos`, `self.pos + num`) are set.
+    #[inline]
+    pub fn range_all_set(&self, num: usize) -> bool {
+        self.bitmap.null_count_range(self.pos, num) == 0
+    }
+
+    /// Return true if all bits in [`self.pos`, `self.pos + num`) are unset.
+    #[inline]
+    pub fn range_all_unset(&self, num: usize) -> bool {
+        self.bitmap.null_count_range(self.pos, num) == num
+    }
+
+    /// Convert the validity of [`self.pos`, `self.pos + num_rows`) to [`Interval`]s.
+    pub fn convert_to_intervals(&self, num_rows: usize) -> Vec<Interval> {
+        let mut res = vec![];
+        let mut started = false;
+        let mut start = 0;
+        for (i, v) in self.bitmap.iter().skip(self.pos).take(num_rows).enumerate() {
+            if v {
+                if !started {
+                    start = i;
+                    started = true;
+                }
+            } else if started {
+                res.push(Interval::new(start, i - start));
+                started = false;
+            }
+        }
+
+        if started {
+            res.push(Interval::new(start, num_rows - start));
+        }
+
+        res
+    }
+}
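
To illustrate the interval conversion (a crate-internal sketch, since `FilterState` lives in this module): a bitmap of [1, 1, 0, 0, 1] at position 0 converts to two intervals, (start=0, length=2) and (start=4, length=1), which is the shape `select_rows` consumes.

use common_arrow::arrow::bitmap::MutableBitmap;

fn demo() {
    let mut bits = MutableBitmap::new();
    for v in [true, true, false, false, true] {
        bits.push(v);
    }
    let state = FilterState::new(bits.into());

    let intervals = state.convert_to_intervals(5);
    assert_eq!(intervals.len(), 2);
    assert_eq!((intervals[0].start, intervals[0].length), (0, 2));
    assert_eq!((intervals[1].start, intervals[1].length), (4, 1));
}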

src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 3 additions & 3 deletions
@@ -14,9 +14,9 @@
 
 use std::fs::File;
 
+use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::read as pread;
 use common_arrow::parquet::metadata::FileMetaData;
-use common_datavalues::DataSchema;
 use common_exception::ErrorCode;
 use common_exception::Result;
 

@@ -36,11 +36,11 @@ impl ParquetReader {
     }
 
     #[inline]
-    pub fn infer_schema(meta: &FileMetaData) -> Result<DataSchema> {
+    pub fn infer_schema(meta: &FileMetaData) -> Result<ArrowSchema> {
         let mut arrow_schema = pread::infer_schema(meta)?;
         arrow_schema.fields.iter_mut().for_each(|f| {
             f.name = f.name.to_lowercase();
         });
-        Ok(DataSchema::from(arrow_schema))
+        Ok(arrow_schema)
     }
 }
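
A minimal usage sketch (the file path is hypothetical, and `read_metadata` is arrow2's metadata reader): `infer_schema` now returns the arrow schema directly, with field names lowercased, so callers can run arrow-level projection before any conversion to `DataSchema`.

use std::fs::File;

use common_arrow::arrow::io::parquet::read as pread;

fn demo() {
    let mut file = File::open("data.parquet").expect("open parquet file");
    let meta = pread::read_metadata(&mut file).expect("read metadata");
    let schema = ParquetReader::infer_schema(&meta).expect("infer schema");
    // All field names are lowercased by `infer_schema`.
    assert!(schema.fields.iter().all(|f| f.name == f.name.to_lowercase()));
}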
