Skip to content

Commit 10dce5f

Browse files
committed
wip
1 parent 8e67459 commit 10dce5f

File tree

3 files changed

+65
-7
lines changed

3 files changed

+65
-7
lines changed

src/query/storages/fuse/src/fuse_part.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,10 @@ pub struct FuseBlockPartInfo {
4848

4949
pub sort_min_max: Option<(Scalar, Scalar)>,
5050
pub block_meta_index: Option<BlockMetaIndex>,
51+
52+
pub bloom_index_location: Option<Location>,
53+
pub bloom_index_size: u64,
54+
pub bloom_index_column_ids: Vec<ColumnId>,
5155
}
5256

5357
#[typetag::serde(name = "fuse")]
@@ -84,6 +88,9 @@ impl FuseBlockPartInfo {
8488
sort_min_max: Option<(Scalar, Scalar)>,
8589
block_meta_index: Option<BlockMetaIndex>,
8690
create_on: Option<DateTime<Utc>>,
91+
bloom_index_location: Option<Location>,
92+
bloom_index_size: u64,
93+
bloom_index_column_ids: Vec<ColumnId>,
8794
) -> Arc<Box<dyn PartInfo>> {
8895
Arc::new(Box::new(FuseBlockPartInfo {
8996
location,
@@ -94,6 +101,9 @@ impl FuseBlockPartInfo {
94101
sort_min_max,
95102
block_meta_index,
96103
columns_stat,
104+
bloom_index_location,
105+
bloom_index_size,
106+
bloom_index_column_ids,
97107
}))
98108
}
99109

src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 49 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -35,11 +35,15 @@ use databend_common_expression::Scalar;
3535
use databend_common_expression::TableSchema;
3636
use databend_common_functions::BUILTIN_FUNCTIONS;
3737
use databend_common_hashtable::FastHash;
38+
use databend_common_sql::BloomIndexColumns;
3839
use databend_storages_common_index::statistics_to_domain;
40+
use databend_storages_common_table_meta::meta::BlockMeta;
3941
use log::info;
42+
use opendal::Operator;
4043
use xorf::BinaryFuse16;
4144
use xorf::Filter;
4245

46+
use crate::pruning::BloomPrunerCreator;
4347
use crate::FuseBlockPartInfo;
4448

4549
pub fn runtime_filter_pruner(
@@ -52,6 +56,7 @@ pub fn runtime_filter_pruner(
5256
return Ok(false);
5357
}
5458
let part = FuseBlockPartInfo::from_part(part)?;
59+
eprintln!("filters {:#?}", filters);
5560
let pruned = filters.iter().any(|filter| {
5661
let column_refs = filter.column_refs();
5762
// Currently only support filter with one column(probe key).
@@ -90,15 +95,54 @@ pub fn runtime_filter_pruner(
9095
false
9196
});
9297

93-
info!(
94-
"Pruned partition with {:?} rows by runtime filter",
95-
part.nums_rows
96-
);
97-
Profile::record_usize_profile(ProfileStatisticsName::RuntimeFilterPruneParts, 1);
98+
if pruned {
99+
info!(
100+
"Pruned partition with {:?} rows by runtime filter",
101+
part.nums_rows
102+
);
103+
Profile::record_usize_profile(ProfileStatisticsName::RuntimeFilterPruneParts, 1);
104+
} else {
105+
Profile::record_usize_profile(ProfileStatisticsName::RuntimeFilterPruneParts, 0);
106+
}
98107

99108
Ok(pruned)
100109
}
101110

111+
pub async fn runtime_bloom_filter_pruner(
112+
table_schema: Arc<TableSchema>,
113+
part: &PartInfoPtr,
114+
filters: &[Expr<String>],
115+
func_ctx: &FunctionContext,
116+
dal: Operator,
117+
bloom_index_cols: BloomIndexColumns,
118+
) -> Result<bool> {
119+
let part = FuseBlockPartInfo::from_part(part)?;
120+
121+
let block_meta: BlockMeta;
122+
let index_location = part.bloom_filter_index_location.clone();
123+
let index_size = part.bloom_filter_index_size;
124+
let column_ids = part.columns_meta.keys().cloned().collect::<Vec<_>>();
125+
126+
for filter_expr in filters {
127+
if let Some(bloom_pruner) = BloomPrunerCreator::create(
128+
func_ctx.clone(),
129+
&table_schema,
130+
dal.clone(),
131+
Some(filter_expr),
132+
bloom_index_cols.clone(),
133+
)? {
134+
let should_keep = bloom_pruner
135+
.should_keep(&index_location, index_size, column_ids.clone())
136+
.await;
137+
if !should_keep {
138+
return Ok(true);
139+
}
140+
}
141+
}
142+
143+
Ok(false)
144+
}
145+
102146
pub(crate) fn update_bitmap_with_bloom_filter(
103147
column: Column,
104148
filter: &BinaryFuse16,

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,14 +27,14 @@ use databend_common_catalog::plan::TopK;
2727
use databend_common_catalog::table::Table;
2828
use databend_common_catalog::table_context::TableContext;
2929
use databend_common_exception::Result;
30-
use databend_common_expression::Scalar;
30+
use databend_common_expression::{ColumnId, Scalar};
3131
use databend_common_expression::TableSchemaRef;
3232
use databend_common_sql::field_default_value;
3333
use databend_common_storage::ColumnNodes;
3434
use databend_storages_common_cache::CacheAccessor;
3535
use databend_storages_common_cache_manager::CachedObject;
3636
use databend_storages_common_pruner::BlockMetaIndex;
37-
use databend_storages_common_table_meta::meta::BlockMeta;
37+
use databend_storages_common_table_meta::meta::{BlockMeta, Location};
3838
use databend_storages_common_table_meta::meta::ColumnStatistics;
3939
use databend_storages_common_table_meta::table::ChangeType;
4040
use log::debug;
@@ -430,6 +430,7 @@ impl FuseTable {
430430
}
431431

432432
fn all_columns_part(
433+
&self,
433434
schema: Option<&TableSchemaRef>,
434435
block_meta_index: &Option<BlockMetaIndex>,
435436
top_k: &Option<(TopK, Scalar)>,
@@ -476,6 +477,9 @@ impl FuseTable {
476477
sort_min_max,
477478
block_meta_index.to_owned(),
478479
create_on,
480+
meta.bloom_filter_index_location.clone(),
481+
meta.bloom_filter_index_size,
482+
self.bloom_index_cols.clone(),
479483
)
480484
}
481485

0 commit comments

Comments
 (0)