
Commit 5d79da6

feat: push down filter.

1 parent 4533c87 · commit 5d79da6

File tree

Cargo.lock
Cargo.toml
src/query/storages/parquet/src/parquet_reader/deserialize.rs
src/query/storages/parquet/src/parquet_reader/filter.rs
src/query/storages/parquet/src/parquet_reader/mod.rs
src/query/storages/parquet/src/parquet_source.rs

6 files changed: +193 −13 lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -142,3 +142,4 @@ rpath = false
 # If there are dependencies that need patching, they can be listed below.
 # For example:
 # arrow-format = { git = "https://github.com/datafuse-extras/arrow-format", rev = "78dacc1" }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "fb08b72" }

src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 86 additions & 6 deletions

@@ -15,12 +15,15 @@
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::Mutex;

+use common_arrow::arrow::bitmap::Bitmap;
 use common_arrow::arrow::datatypes::Field;
 use common_arrow::arrow::io::parquet::read::column_iter_to_arrays;
 use common_arrow::arrow::io::parquet::read::ArrayIter;
 use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
 use common_arrow::parquet::metadata::ColumnDescriptor;
+use common_arrow::parquet::page::CompressedPage;
 use common_arrow::parquet::read::BasicDecompressor;
 use common_arrow::parquet::read::PageMetaData;
 use common_arrow::parquet::read::PageReader;
@@ -29,6 +32,7 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_storage::ColumnLeaf;

+use super::filter::FilterState;
 use crate::parquet_part::ColumnMeta;
 use crate::parquet_part::ParquetRowGroupPart;
 use crate::ParquetReader;
@@ -38,6 +42,7 @@ impl ParquetReader {
         &self,
         part: &ParquetRowGroupPart,
         chunks: Vec<(usize, Vec<u8>)>,
+        filter: Option<Bitmap>,
     ) -> Result<DataBlock> {
         let mut chunk_map: HashMap<usize, Vec<u8>> = chunks.into_iter().collect();
         let mut columns_array_iter = Vec::with_capacity(self.projected_schema.num_fields());
@@ -62,12 +67,18 @@ impl ParquetReader {
                 metas.push((column_meta, descriptor));
                 chunks.push(column_chunk);
             }
-            columns_array_iter.push(Self::to_array_iter(
-                metas,
-                chunks,
-                part.num_rows,
-                column_leaf.field.clone(),
-            )?);
+            let array_iter = if let Some(ref bitmap) = filter {
+                Self::to_array_iter_with_filter(
+                    metas,
+                    chunks,
+                    part.num_rows,
+                    column_leaf.field.clone(),
+                    bitmap.clone(),
+                )?
+            } else {
+                Self::to_array_iter(metas, chunks, part.num_rows, column_leaf.field.clone())?
+            };
+            columns_array_iter.push(array_iter);
         }

         let mut deserializer = RowGroupDeserializer::new(columns_array_iter, part.num_rows, None);
@@ -115,6 +126,75 @@ impl ParquetReader {
         )?)
     }

+    /// Almost the same as `to_array_iter`, but with a filter.
+    fn to_array_iter_with_filter(
+        metas: Vec<(&ColumnMeta, &ColumnDescriptor)>,
+        chunks: Vec<Vec<u8>>,
+        rows: usize,
+        field: Field,
+        filter: Bitmap,
+    ) -> Result<ArrayIter<'static>> {
+        let (columns, types) = metas
+            .iter()
+            .zip(chunks.into_iter())
+            .map(|(&(meta, descriptor), chunk)| {
+                let filter_state = Arc::new(Mutex::new(FilterState::new(filter.clone())));
+                let iter_filter_state = filter_state.clone();
+
+                let pages = PageReader::new_with_page_meta(
+                    std::io::Cursor::new(chunk),
+                    PageMetaData {
+                        column_start: meta.offset,
+                        num_values: meta.length as i64,
+                        compression: meta.compression.into(),
+                        descriptor: descriptor.descriptor.clone(),
+                    },
+                    Arc::new(move |_, header| {
+                        // If the bitmap for the current page is all unset, skip it.
+                        let mut state = filter_state.lock().unwrap();
+                        let num_rows = header.num_values();
+                        let all_unset = state.range_all_unset(num_rows);
+                        if all_unset {
+                            // Skip this page.
+                            state.advance(num_rows);
+                        }
+                        !all_unset
+                    }),
+                    vec![],
+                    usize::MAX,
+                )
+                .map(move |page| {
+                    page.map(|page| match page {
+                        CompressedPage::Data(mut page) => {
+                            let num_rows = page.num_values();
+                            let mut state = iter_filter_state.lock().unwrap();
+                            if state.range_all_unset(num_rows) {
+                                page.select_rows(vec![]);
+                            } else if !state.range_all_set(num_rows) {
+                                page.select_rows(state.convert_to_intervals(num_rows));
+                            };
+                            state.advance(num_rows);
+                            CompressedPage::Data(page)
+                        }
+                        CompressedPage::Dict(_) => page, // do nothing
+                    })
+                });
+                (
+                    BasicDecompressor::new(pages, vec![]),
+                    &descriptor.descriptor.primitive_type,
+                )
+            })
+            .unzip();

+        Ok(column_iter_to_arrays(
+            columns,
+            types,
+            field,
+            Some(rows - filter.unset_bits()),
+            rows,
+        )?)
+    }
+
     fn try_next_block(&self, deserializer: &mut RowGroupDeserializer) -> Result<DataBlock> {
         match deserializer.next() {
             None => Err(ErrorCode::Internal(
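Note: the skip callback handed to `PageReader::new_with_page_meta` and the later `map` over pages are two separate `'static` closures, so they share a single cursor over the filter bitmap through `Arc<Mutex<FilterState>>`, and whichever side consumes a page must `advance` the cursor. A minimal runnable toy of that shared-cursor shape (plain `std`; the page sizes and the skipped page are hypothetical, this is not Databend code):

```rust
use std::sync::{Arc, Mutex};

fn main() {
    // One shared cursor over a conceptual 12-row filter, as in
    // `to_array_iter_with_filter` above.
    let pos = Arc::new(Mutex::new(0usize));
    let skip_pos = pos.clone();
    let map_pos = pos.clone();

    // Pretend the second 4-row page is entirely filtered out.
    let page_all_unset = [false, true, false];

    let decoded_starts: Vec<usize> = [4usize, 4, 4]
        .into_iter()
        .enumerate()
        // Stand-in for the skip callback: a skipped page must still
        // advance the shared cursor, or later pages would read the
        // wrong slice of the bitmap.
        .filter(|&(idx, num_rows)| {
            let skip = page_all_unset[idx];
            if skip {
                *skip_pos.lock().unwrap() += num_rows;
            }
            !skip
        })
        // Stand-in for the page `map`: note where the surviving page
        // starts, then advance past it.
        .map(|(_, num_rows)| {
            let mut p = map_pos.lock().unwrap();
            let start = *p;
            *p += num_rows;
            start
        })
        .collect();

    // Page 0 starts at row 0; page 1 was skipped; page 2 starts at row 8.
    assert_eq!(decoded_starts, vec![0, 8]);
}
```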
src/query/storages/parquet/src/parquet_reader/filter.rs

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_arrow::arrow::bitmap::Bitmap;
+use common_arrow::parquet::indexes::Interval;
+
+/// A wrapper of [`Bitmap`] with a position mark. It is used to filter rows when reading a parquet file.
+///
+/// If a whole page is filtered out, there is no need to decompress the page.
+/// If a row is filtered out, there is no need to decode the row.
+pub struct FilterState {
+    bitmap: Bitmap,
+    pos: usize,
+}
+
+impl FilterState {
+    pub fn new(bitmap: Bitmap) -> Self {
+        Self { bitmap, pos: 0 }
+    }
+
+    #[inline]
+    pub fn advance(&mut self, num: usize) {
+        self.pos += num;
+    }
+
+    /// Return true if all bits in [`self.pos`, `self.pos + num`) are set.
+    #[inline]
+    pub fn range_all_set(&self, num: usize) -> bool {
+        self.bitmap.null_count_range(self.pos, num) == 0
+    }
+
+    /// Return true if all bits in [`self.pos`, `self.pos + num`) are unset.
+    #[inline]
+    pub fn range_all_unset(&self, num: usize) -> bool {
+        self.bitmap.null_count_range(self.pos, num) == num
+    }
+
+    /// Convert the validity of [`self.pos`, `self.pos + num`) to [`Interval`]s.
+    pub fn convert_to_intervals(&self, num_rows: usize) -> Vec<Interval> {
+        let mut res = vec![];
+        let mut started = false;
+        let mut start = 0;
+        for (i, v) in self.bitmap.iter().skip(self.pos).take(num_rows).enumerate() {
+            if v {
+                if !started {
+                    start = i;
+                    started = true;
+                }
+            } else if started {
+                res.push(Interval::new(start, i - start));
+                started = false;
+            }
+        }
+
+        if started {
+            res.push(Interval::new(start, num_rows - start));
+        }
+
+        res
+    }
+}
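To make the paging behavior concrete, here is a hedged sketch (assuming `FilterState` above is in scope and `common_arrow` is on the path, as it is inside this crate) walking a 12-row filter in three 4-row pages — the same three-way decision the `PageReader` callback and the page `map` in `to_array_iter_with_filter` make:

```rust
use common_arrow::arrow::bitmap::Bitmap;

fn main() {
    // Only rows 5 and 6 survive the filter.
    let filter: Bitmap = (0..12).map(|i| i == 5 || i == 6).collect();
    let mut state = FilterState::new(filter);

    for (idx, num_rows) in [4usize, 4, 4].into_iter().enumerate() {
        if state.range_all_unset(num_rows) {
            // Mirrors the skip callback: the page is never decompressed.
            println!("page {idx}: skipped");
        } else if state.range_all_set(num_rows) {
            println!("page {idx}: fully decoded");
        } else {
            // Mirrors `page.select_rows(..)`: intervals are page-relative.
            println!("page {idx}: decode {:?}", state.convert_to_intervals(num_rows));
        }
        state.advance(num_rows);
    }
    // Expected output (with Interval's derived Debug impl):
    //   page 0: skipped
    //   page 1: decode [Interval { start: 1, length: 2 }]
    //   page 2: skipped
}
```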

src/query/storages/parquet/src/parquet_reader/mod.rs

Lines changed: 5 additions & 1 deletion

@@ -13,8 +13,10 @@
 // limitations under the License.

 mod deserialize;
+mod filter;
 mod meta;
 mod read;
+
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
@@ -38,12 +40,14 @@ pub struct ParquetReader {
     /// The indices of the columns to be read by this reader.
     ///
     /// Use [`HashSet`] to avoid duplicate indices.
-    /// Duplicate indices will exist when there are nested types.
+    /// Duplicate indices will exist when there are nested types or
+    /// the same field is selected multiple times.
     ///
     /// For example:
     ///
     /// ```sql
     /// select a, a.b, a.c from t;
+    /// select a, b, a from t;
     /// ```
     columns_to_read: HashSet<usize>,
     /// The schema of the [`common_datablocks::DataBlock`] this reader produces.
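A tiny illustration of why the set matters (the leaf indices here are hypothetical, not the reader's real projection logic): both example queries map several select items to the same leaf column, and the `HashSet` collapses them so each column chunk is fetched once:

```rust
use std::collections::HashSet;

fn main() {
    // Hypothetical leaf indices for `select a, b, a from t`:
    // `a` -> 0, `b` -> 1, and `a` -> 0 again.
    let projected = vec![0usize, 1, 0];
    let columns_to_read: HashSet<usize> = projected.into_iter().collect();
    assert_eq!(columns_to_read.len(), 2); // each chunk is read exactly once
}
```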

src/query/storages/parquet/src/parquet_source.rs

Lines changed: 28 additions & 4 deletions

@@ -20,8 +20,10 @@ use common_base::base::ProgressValues;
 use common_catalog::plan::PartInfoPtr;
 use common_catalog::table_context::TableContext;
 use common_datablocks::DataBlock;
+use common_datavalues::BooleanColumn;
 use common_datavalues::ColumnRef;
 use common_datavalues::DataSchemaRef;
+use common_datavalues::Series;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_functions::scalars::FunctionContext;
@@ -93,7 +95,7 @@ impl ParquetSource {
     fn do_prewhere_filter(&mut self, part: PartInfoPtr, chunks: Vec<IndexedChunk>) -> Result<()> {
         let rg_part = ParquetRowGroupPart::from_part(&part)?;
         // Deserialize the prewhere data block first.
-        let data_block = self.prewhere_reader.deserialize(rg_part, chunks)?;
+        let data_block = self.prewhere_reader.deserialize(rg_part, chunks, None)?;
         if let Some(filter) = self.prewhere_filter.as_ref() {
             // do filter
             let res = filter
@@ -128,6 +130,7 @@ impl ParquetSource {
                 self.state =
                     Generated(self.ctx.try_get_part(), block.resort(self.output_schema())?);
             } else {
+                let data_block = DataBlock::filter_block(data_block, &filter)?;
                 self.state = State::ReadDataRemain(part, PrewhereData { data_block, filter });
             }
             Ok(())
@@ -153,7 +156,28 @@ impl ParquetSource {
         let block = if chunks.is_empty() {
             prewhere_blocks
         } else if let Some(remain_reader) = self.remain_reader.as_ref() {
-            let remain_block = remain_reader.deserialize(rg_part, chunks)?;
+            // The filter has already been converted to a non-null boolean column.
+            let remain_block = if filter.is_const() && filter.get_bool(0)? {
+                // No need to filter.
+                remain_reader.deserialize(rg_part, chunks, None)?
+            } else {
+                let boolean_col = Series::check_get::<BooleanColumn>(&filter)?;
+                let bitmap = boolean_col.values();
+                if bitmap.unset_bits() == 0 {
+                    // No need to filter.
+                    remain_reader.deserialize(rg_part, chunks, None)?
+                } else {
+                    remain_reader.deserialize(rg_part, chunks, Some(bitmap.clone()))?
+                }
+            };
+            assert!(
+                prewhere_blocks.num_rows() == remain_block.num_rows(),
+                "prewhere and remain blocks should have the same number of rows. (prewhere: {}, remain: {})",
+                prewhere_blocks.num_rows(),
+                remain_block.num_rows()
+            );
+
+            // Combine the two blocks.
             for (col, field) in remain_block
                 .columns()
                 .iter()
@@ -171,11 +195,11 @@
                 bytes: block.memory_size(),
             };
             self.scan_progress.incr(&progress_values);
-            DataBlock::filter_block(block, &filter)?
+            block
         } else {
             // There is only the prewhere reader.
             assert!(self.remain_reader.is_none());
-            let block = self.prewhere_reader.deserialize(rg_part, chunks)?;
+            let block = self.prewhere_reader.deserialize(rg_part, chunks, None)?;
             let progress_values = ProgressValues {
                 rows: block.num_rows(),
                 bytes: block.memory_size(),
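The new branch has three outcomes: a constant-`true` or all-set filter reads the remaining columns plainly, while a partial filter pushes the bitmap down into the reader. A hedged sketch of that decision reduced to the bitmap alone (`RemainRead` and `plan_remain_read` are hypothetical names; the `is_const()`/`check_get` steps from `common_datavalues` are elided):

```rust
use common_arrow::arrow::bitmap::Bitmap;

/// Hypothetical summary of how `do_deserialize` treats the prewhere filter.
enum RemainRead {
    /// Every row survived the prewhere filter: plain read, no pushdown.
    Plain,
    /// Some rows were filtered out: push the bitmap down so whole pages
    /// and row ranges can be skipped while decoding.
    PushDown(Bitmap),
}

fn plan_remain_read(bitmap: &Bitmap) -> RemainRead {
    if bitmap.unset_bits() == 0 {
        RemainRead::Plain
    } else {
        RemainRead::PushDown(bitmap.clone())
    }
}

fn main() {
    let all_set: Bitmap = (0..8).map(|_| true).collect();
    assert!(matches!(plan_remain_read(&all_set), RemainRead::Plain));

    let partial: Bitmap = (0..8).map(|i| i < 3).collect();
    assert!(matches!(plan_remain_read(&partial), RemainRead::PushDown(_)));
}
```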
