Skip to content

Commit 67e2885

Browse files
committed
add rt bloom filter profile metric
1 parent b924e8a commit 67e2885

File tree

10 files changed

+98
-30
lines changed

10 files changed

+98
-30
lines changed

src/common/base/src/runtime/profile/profiles.rs

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,8 @@ pub enum ProfileStatisticsName {
4343
SpillReadCount,
4444
SpillReadBytes,
4545
SpillReadTime,
46-
RuntimeFilterPruneParts,
46+
RuntimeRangeFilterPrunedParts,
47+
RuntimeBloomFilterPrunedParts,
4748
MemoryUsage,
4849
}
4950

@@ -229,10 +230,17 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
229230
unit: StatisticsUnit::MillisSeconds,
230231
plain_statistics: true,
231232
}),
232-
(ProfileStatisticsName::RuntimeFilterPruneParts, ProfileDesc {
233-
display_name: "parts pruned by runtime filter",
234-
desc: "The partitions pruned by runtime filter",
235-
index: ProfileStatisticsName::RuntimeFilterPruneParts as usize,
233+
(ProfileStatisticsName::RuntimeRangeFilterPrunedParts, ProfileDesc {
234+
display_name: "parts pruned by runtime range filter",
235+
desc: "The partitions pruned by runtime range filter",
236+
index: ProfileStatisticsName::RuntimeRangeFilterPrunedParts as usize,
237+
unit: StatisticsUnit::Count,
238+
plain_statistics: true,
239+
}),
240+
(ProfileStatisticsName::RuntimeBloomFilterPrunedParts, ProfileDesc {
241+
display_name: "parts pruned by runtime bloom filter",
242+
desc: "The partitions pruned by runtime bloom filter",
243+
index: ProfileStatisticsName::RuntimeBloomFilterPrunedParts as usize,
236244
unit: StatisticsUnit::Count,
237245
plain_statistics: true,
238246
}),

src/query/service/tests/it/storages/fuse/operations/read_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -112,7 +112,7 @@ fn test_to_partitions() -> Result<()> {
112112
let column_nodes = ColumnNodes { column_nodes };
113113

114114
// CASE I: no projection
115-
let (s, parts) = FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, None);
115+
let (s, parts) = FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, None, None);
116116
assert_eq!(parts.len(), num_of_block as usize);
117117
let expected_block_size: u64 = cols_metas
118118
.values()
@@ -141,7 +141,7 @@ fn test_to_partitions() -> Result<()> {
141141
});
142142

143143
let (stats, parts) =
144-
FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, push_down);
144+
FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, push_down, None);
145145
assert_eq!(parts.len(), num_of_block as usize);
146146
assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
147147

src/query/storages/fuse/src/io/read/block/block_reader.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -155,7 +155,8 @@ impl BlockReader {
155155
}
156156

157157
pub fn support_blocking_api(&self) -> bool {
158-
self.operator.info().native_capability().blocking
158+
// self.operator.info().native_capability().blocking
159+
false
159160
}
160161

161162
// Build non duplicate leaf_indices to avoid repeated read column from parquet

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -253,8 +253,14 @@ impl ReclusterMutator {
253253
total_bytes: usize,
254254
level: i32,
255255
) {
256-
let (stats, parts) =
257-
FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None);
256+
let (stats, parts) = FuseTable::to_partitions(
257+
Some(&self.schema),
258+
block_metas,
259+
column_nodes,
260+
None,
261+
None,
262+
None,
263+
);
258264
let task = ReclusterTask {
259265
parts,
260266
stats,

src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -229,6 +229,7 @@ impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
229229
&column_nodes,
230230
None,
231231
&self.projection,
232+
None, // TODO bloom filter desc
232233
);
233234
self.part_map.insert(prefix, (part_info, page_size));
234235
}

src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -244,6 +244,7 @@ impl Processor for ReadParquetDataSource<false> {
244244
);
245245
let mut fuse_part_infos = Vec::with_capacity(parts.len());
246246
for part in parts.into_iter() {
247+
eprintln!("applying range filter");
247248
if runtime_filter_pruner(
248249
self.table_schema.clone(),
249250
&part,
@@ -253,6 +254,7 @@ impl Processor for ReadParquetDataSource<false> {
253254
continue;
254255
}
255256

257+
eprintln!("applying bloom filter");
256258
if runtime_bloom_filter_pruner(
257259
self.table_schema.clone(),
258260
&part,

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -209,6 +209,7 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
209209
&column_nodes,
210210
None,
211211
&self.projection,
212+
None,
212213
);
213214

214215
self.part_map.insert(prefix, part_info);

src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -92,10 +92,10 @@ pub fn runtime_filter_pruner(
9292

9393
if pruned {
9494
info!(
95-
"Pruned partition with {:?} rows by runtime filter",
95+
"Pruned partition with {:?} rows by runtime range filter",
9696
part.nums_rows
9797
);
98-
Profile::record_usize_profile(ProfileStatisticsName::RuntimeFilterPruneParts, 1);
98+
Profile::record_usize_profile(ProfileStatisticsName::RuntimeRangeFilterPrunedParts, 1);
9999
}
100100

101101
Ok(pruned)
@@ -110,6 +110,8 @@ pub async fn runtime_bloom_filter_pruner(
110110
) -> Result<bool> {
111111
let part = FuseBlockPartInfo::from_part(part)?;
112112

113+
let mut pruned = false;
114+
113115
if let Some(bloom_desc) = &part.bloom_index_descriptor {
114116
let index_location = bloom_desc.bloom_index_location.clone();
115117
let index_size = bloom_desc.bloom_index_size;
@@ -124,17 +126,31 @@ pub async fn runtime_bloom_filter_pruner(
124126
Some(filter_expr),
125127
bloom_index_cols.clone(),
126128
)? {
129+
eprintln!("got bloom pruner");
127130
let should_keep = bloom_pruner
128131
.should_keep(&index_location, index_size, column_ids.clone())
129132
.await;
130133
if !should_keep {
131-
return Ok(true);
134+
pruned = true;
135+
break;
132136
}
137+
} else {
138+
eprintln!("not suitable bloom pruner found")
133139
}
134140
}
141+
} else {
142+
eprintln!("not bloom_index_descriptor found");
143+
}
144+
145+
if pruned {
146+
info!(
147+
"Pruned partition with {:?} rows by runtime bloom filter",
148+
part.nums_rows
149+
);
150+
Profile::record_usize_profile(ProfileStatisticsName::RuntimeBloomFilterPrunedParts, 1);
135151
}
136152

137-
Ok(false)
153+
Ok(pruned)
138154
}
139155

140156
pub(crate) fn update_bitmap_with_bloom_filter(

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 46 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ use databend_common_exception::Result;
3030
use databend_common_expression::Scalar;
3131
use databend_common_expression::TableSchemaRef;
3232
use databend_common_sql::field_default_value;
33+
use databend_common_sql::BloomIndexColumns;
3334
use databend_common_storage::ColumnNodes;
3435
use databend_storages_common_cache::CacheAccessor;
3536
use databend_storages_common_cache_manager::CachedObject;
@@ -43,6 +44,7 @@ use opendal::Operator;
4344
use sha2::Digest;
4445
use sha2::Sha256;
4546

47+
use crate::fuse_part::BloomIndexDescriptor;
4648
use crate::fuse_part::FuseBlockPartInfo;
4749
use crate::pruning::create_segment_location_vector;
4850
use crate::pruning::FusePruner;
@@ -243,8 +245,14 @@ impl FuseTable {
243245
.map(|topk| field_default_value(ctx.clone(), &topk.field).map(|d| (topk, d)))
244246
.transpose()?;
245247

246-
let (mut statistics, parts) =
247-
Self::to_partitions(Some(&schema), block_metas, &column_nodes, top_k, push_downs);
248+
let (mut statistics, parts) = Self::to_partitions(
249+
Some(&schema),
250+
block_metas,
251+
&column_nodes,
252+
top_k,
253+
push_downs,
254+
Some(self.bloom_index_cols.clone()),
255+
);
248256

249257
// Update planner statistics.
250258
statistics.partitions_total = partitions_total;
@@ -266,6 +274,7 @@ impl FuseTable {
266274
column_nodes: &ColumnNodes,
267275
top_k: Option<(TopK, Scalar)>,
268276
push_downs: Option<PushDownInfo>,
277+
bloom_index_cols: Option<BloomIndexColumns>,
269278
) -> (PartStatistics, Partitions) {
270279
let limit = push_downs
271280
.as_ref()
@@ -311,15 +320,28 @@ impl FuseTable {
311320
}
312321

313322
let (mut statistics, mut partitions) = match &push_downs {
314-
None => Self::all_columns_partitions(schema, &block_metas, top_k.clone(), limit),
323+
None => Self::all_columns_partitions(
324+
schema,
325+
&block_metas,
326+
top_k.clone(),
327+
limit,
328+
bloom_index_cols,
329+
),
315330
Some(extras) => match &extras.projection {
316-
None => Self::all_columns_partitions(schema, &block_metas, top_k.clone(), limit),
331+
None => Self::all_columns_partitions(
332+
schema,
333+
&block_metas,
334+
top_k.clone(),
335+
limit,
336+
bloom_index_cols,
337+
),
317338
Some(projection) => Self::projection_partitions(
318339
&block_metas,
319340
column_nodes,
320341
projection,
321342
top_k.clone(),
322343
limit,
344+
bloom_index_cols,
323345
),
324346
},
325347
};
@@ -343,6 +365,7 @@ impl FuseTable {
343365
block_metas: &[(Option<BlockMetaIndex>, Arc<BlockMeta>)],
344366
top_k: Option<(TopK, Scalar)>,
345367
limit: usize,
368+
bloom_index_cols: Option<BloomIndexColumns>,
346369
) -> (PartStatistics, Partitions) {
347370
let mut statistics = PartStatistics::default_exact();
348371
let mut partitions = Partitions::create(PartitionsShuffleKind::Mod, vec![]);
@@ -359,6 +382,7 @@ impl FuseTable {
359382
block_meta_index,
360383
&top_k,
361384
block_meta,
385+
bloom_index_cols.clone(),
362386
));
363387
statistics.read_rows += rows;
364388
statistics.read_bytes += block_meta.block_size as usize;
@@ -383,6 +407,7 @@ impl FuseTable {
383407
projection: &Projection,
384408
top_k: Option<(TopK, Scalar)>,
385409
limit: usize,
410+
bloom_index_cols: Option<BloomIndexColumns>,
386411
) -> (PartStatistics, Partitions) {
387412
let mut statistics = PartStatistics::default_exact();
388413
let mut partitions = Partitions::default();
@@ -401,6 +426,7 @@ impl FuseTable {
401426
column_nodes,
402427
top_k.clone(),
403428
projection,
429+
bloom_index_cols.clone(),
404430
));
405431

406432
let rows = block_meta.row_count as usize;
@@ -434,6 +460,7 @@ impl FuseTable {
434460
block_meta_index: &Option<BlockMetaIndex>,
435461
top_k: &Option<(TopK, Scalar)>,
436462
meta: &BlockMeta,
463+
bloom_index_cols: Option<BloomIndexColumns>,
437464
) -> PartInfoPtr {
438465
let mut columns_meta = HashMap::with_capacity(meta.col_metas.len());
439466
let mut columns_stats = HashMap::with_capacity(meta.col_stats.len());
@@ -467,11 +494,14 @@ impl FuseTable {
467494
.unwrap_or((default.clone(), default.clone()))
468495
});
469496

470-
// let bloom_desc = BloomIndexDescriptor {
471-
// bloom_index_location: meta.bloom_filter_index_location.clone(),
472-
// bloom_index_size: meta.block_size,
473-
// bloom_index_cols: bloom_index_cols.clone(),
474-
//};
497+
let bloom_desc = {
498+
// TODO: produce no descriptor when bloom_index_cols is BloomIndexColumns::None
499+
bloom_index_cols.map(|v| BloomIndexDescriptor {
500+
bloom_index_location: meta.bloom_filter_index_location.clone(),
501+
bloom_index_size: meta.block_size,
502+
bloom_index_cols: v,
503+
})
504+
};
475505

476506
FuseBlockPartInfo::create(
477507
location,
@@ -482,7 +512,7 @@ impl FuseTable {
482512
sort_min_max,
483513
block_meta_index.to_owned(),
484514
create_on,
485-
None, // TODO
515+
bloom_desc,
486516
)
487517
}
488518

@@ -492,6 +522,7 @@ impl FuseTable {
492522
column_nodes: &ColumnNodes,
493523
top_k: Option<(TopK, Scalar)>,
494524
projection: &Projection,
525+
bloom_index_cols: Option<BloomIndexColumns>,
495526
) -> PartInfoPtr {
496527
let mut columns_meta = HashMap::with_capacity(projection.len());
497528
let mut columns_stat = HashMap::with_capacity(projection.len());
@@ -523,12 +554,11 @@ impl FuseTable {
523554
// row_count should be a hint value of LIMIT,
524555
// not the count the rows in this partition
525556

526-
// let bloom_desc = BloomIndexDescriptor {
527-
// bloom_index_location: meta.bloom_filter_index_location.clone(),
528-
// bloom_index_size: meta.block_size,
529-
// bloom_index_cols: bloom_index_cols.clone(),
530-
//};
531-
let bloom_desc = None;
557+
let bloom_desc = bloom_index_cols.map(|v| BloomIndexDescriptor {
558+
bloom_index_location: meta.bloom_filter_index_location.clone(),
559+
bloom_index_size: meta.block_size,
560+
bloom_index_cols: v,
561+
});
532562
FuseBlockPartInfo::create(
533563
location,
534564
rows_count,

src/query/storages/fuse/src/pruning/bloom_pruner.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,9 @@ impl BloomPrunerCreator {
7777
let bloom_column_fields = bloom_columns_map.values().cloned().collect::<Vec<_>>();
7878
let point_query_cols = BloomIndex::find_eq_columns(expr, bloom_column_fields)?;
7979

80+
eprintln!("is point_query empty? {}", point_query_cols.is_empty());
81+
eprintln!("expr ? {}", expr);
82+
8083
if !point_query_cols.is_empty() {
8184
// convert to filter column names
8285
let mut filter_fields = Vec::with_capacity(point_query_cols.len());

0 commit comments

Comments
 (0)