Skip to content

Commit e7b89b1

Browse files
committed
Add runtime bloom filter pruning to the async native data source
1 parent 5c803bc commit e7b89b1

File tree

4 files changed

+22
-9
lines changed

4 files changed

+22
-9
lines changed

src/query/storages/fuse/src/operations/read/native_data_source_reader.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::io::BlockReader;
3838
use crate::io::TableMetaLocationGenerator;
3939
use crate::io::VirtualColumnReader;
4040
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
41-
use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
41+
use crate::operations::read::runtime_filter_prunner::{runtime_bloom_filter_pruner, runtime_range_filter_pruner};
4242
use crate::FuseBlockPartInfo;
4343

4444
pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
@@ -139,7 +139,7 @@ impl SyncSource for ReadNativeDataSource<true> {
139139
.ctx
140140
.get_min_max_runtime_filter_with_id(self.table_index),
141141
);
142-
if runtime_filter_pruner(
142+
if runtime_range_filter_pruner(
143143
self.table_schema.clone(),
144144
&part,
145145
&filters,
@@ -246,7 +246,7 @@ impl Processor for ReadNativeDataSource<false> {
246246
);
247247
let mut native_part_infos = Vec::with_capacity(parts.len());
248248
for part in parts.into_iter() {
249-
if runtime_filter_pruner(
249+
if runtime_range_filter_pruner(
250250
self.table_schema.clone(),
251251
&part,
252252
&filters,
@@ -255,6 +255,18 @@ impl Processor for ReadNativeDataSource<false> {
255255
continue;
256256
}
257257

258+
if runtime_bloom_filter_pruner(
259+
self.table_schema.clone(),
260+
&part,
261+
&filters,
262+
&self.func_ctx,
263+
&self.block_reader.operator,
264+
)
265+
.await?
266+
{
267+
continue;
268+
}
269+
258270
native_part_infos.push(part.clone());
259271
let block_reader = self.block_reader.clone();
260272
let index_reader = self.index_reader.clone();

src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::io::TableMetaLocationGenerator;
4141
use crate::io::VirtualColumnReader;
4242
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
4343
use crate::operations::read::runtime_filter_prunner::runtime_bloom_filter_pruner;
44-
use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
44+
use crate::operations::read::runtime_filter_prunner::runtime_range_filter_pruner;
4545

4646
pub struct ReadParquetDataSource<const BLOCKING_IO: bool> {
4747
func_ctx: FunctionContext,
@@ -128,7 +128,7 @@ impl SyncSource for ReadParquetDataSource<true> {
128128
.ctx
129129
.get_min_max_runtime_filter_with_id(self.table_index),
130130
);
131-
if runtime_filter_pruner(
131+
if runtime_range_filter_pruner(
132132
self.table_schema.clone(),
133133
&part,
134134
&filters,
@@ -244,7 +244,7 @@ impl Processor for ReadParquetDataSource<false> {
244244
);
245245
let mut fuse_part_infos = Vec::with_capacity(parts.len());
246246
for part in parts.into_iter() {
247-
if runtime_filter_pruner(
247+
if runtime_range_filter_pruner(
248248
self.table_schema.clone(),
249249
&part,
250250
&filters,

src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use xorf::Filter;
4444
use crate::pruning::BloomPrunerCreator;
4545
use crate::FuseBlockPartInfo;
4646

47-
pub fn runtime_filter_pruner(
47+
pub fn runtime_range_filter_pruner(
4848
table_schema: Arc<TableSchema>,
4949
part: &PartInfoPtr,
5050
filters: &[Expr<String>],
@@ -92,7 +92,7 @@ pub fn runtime_filter_pruner(
9292

9393
if pruned {
9494
info!(
95-
"Pruned partition with {:?} rows by runtime range filter",
95+
"Pruned partition with {:?} rows by runtime range filter",
9696
part.nums_rows
9797
);
9898
Profile::record_usize_profile(ProfileStatisticsName::RuntimeRangeFilterPrunedParts, 1);
@@ -139,7 +139,7 @@ pub async fn runtime_bloom_filter_pruner(
139139

140140
if pruned {
141141
info!(
142-
"Pruned partition with {:?} rows by runtime bloom filter",
142+
"Pruned partition with {:?} rows by runtime bloom filter",
143143
part.nums_rows
144144
);
145145
Profile::record_usize_profile(ProfileStatisticsName::RuntimeBloomFilterPrunedParts, 1);

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ impl FuseTable {
551551
// row_count should be a hint value of LIMIT,
552552
// not the count the rows in this partition
553553

554+
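// TODO: duplicated code — presumably the bloom index descriptor construction below; confirm where the other copy lives and factor it into a shared helper.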
554555
let bloom_desc = bloom_index_cols.map(|v| BloomIndexDescriptor {
555556
bloom_index_location: meta.bloom_filter_index_location.clone(),
556557
bloom_index_size: meta.bloom_filter_index_size,

0 commit comments

Comments
 (0)