Skip to content

Commit e66061f

Browse files
committed
wip
1 parent 10dce5f commit e66061f

File tree

7 files changed

+56
-32
lines changed

7 files changed

+56
-32
lines changed

src/query/sql/src/planner/bloom_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use databend_common_settings::Settings;
3232
use crate::normalize_identifier;
3333
use crate::planner::semantic::NameResolutionContext;
3434

35-
#[derive(Clone)]
35+
#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
3636
pub enum BloomIndexColumns {
3737
/// Default, all columns that support bloom index.
3838
All,

src/query/storages/fuse/src/fuse_part.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,19 @@ use databend_common_exception::ErrorCode;
2929
use databend_common_exception::Result;
3030
use databend_common_expression::ColumnId;
3131
use databend_common_expression::Scalar;
32+
use databend_common_sql::BloomIndexColumns;
3233
use databend_storages_common_pruner::BlockMetaIndex;
3334
use databend_storages_common_table_meta::meta::ColumnMeta;
3435
use databend_storages_common_table_meta::meta::ColumnStatistics;
3536
use databend_storages_common_table_meta::meta::Compression;
3637
use databend_storages_common_table_meta::meta::Location;
3738

39+
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
40+
pub struct BloomIndexDescriptor {
41+
pub bloom_index_location: Option<Location>,
42+
pub bloom_index_size: u64,
43+
pub bloom_index_cols: BloomIndexColumns,
44+
}
3845
/// Fuse table partition information.
3946
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
4047
pub struct FuseBlockPartInfo {
@@ -49,9 +56,7 @@ pub struct FuseBlockPartInfo {
4956
pub sort_min_max: Option<(Scalar, Scalar)>,
5057
pub block_meta_index: Option<BlockMetaIndex>,
5158

52-
pub bloom_index_location: Option<Location>,
53-
pub bloom_index_size: u64,
54-
pub bloom_index_column_ids: Vec<ColumnId>,
59+
pub bloom_index_descriptor: Option<BloomIndexDescriptor>,
5560
}
5661

5762
#[typetag::serde(name = "fuse")]
@@ -88,9 +93,7 @@ impl FuseBlockPartInfo {
8893
sort_min_max: Option<(Scalar, Scalar)>,
8994
block_meta_index: Option<BlockMetaIndex>,
9095
create_on: Option<DateTime<Utc>>,
91-
bloom_index_location: Option<Location>,
92-
bloom_index_size: u64,
93-
bloom_index_column_ids: Vec<ColumnId>,
96+
bloom_index_descriptor: Option<BloomIndexDescriptor>,
9497
) -> Arc<Box<dyn PartInfo>> {
9598
Arc::new(Box::new(FuseBlockPartInfo {
9699
location,
@@ -101,9 +104,7 @@ impl FuseBlockPartInfo {
101104
sort_min_max,
102105
block_meta_index,
103106
columns_stat,
104-
bloom_index_location,
105-
bloom_index_size,
106-
bloom_index_column_ids,
107+
bloom_index_descriptor,
107108
}))
108109
}
109110

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl AggIndexReader {
5555
None,
5656
None,
5757
None,
58+
None,
5859
);
5960
let res = self
6061
.reader
@@ -107,6 +108,7 @@ impl AggIndexReader {
107108
None,
108109
None,
109110
None,
111+
None,
110112
);
111113
let res = self
112114
.reader

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ impl AggIndexReader {
4949
None,
5050
None,
5151
None,
52+
None,
5253
);
5354
let res = self
5455
.reader
@@ -99,6 +100,7 @@ impl AggIndexReader {
99100
None,
100101
None,
101102
None,
103+
None,
102104
);
103105
Some((part, res))
104106
}

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ impl VirtualColumnReader {
9191
None,
9292
None,
9393
None,
94+
None,
9495
);
9596

9697
let merge_io_result =
@@ -133,6 +134,7 @@ impl VirtualColumnReader {
133134
None,
134135
None,
135136
None,
137+
None,
136138
);
137139

138140
let merge_io_result = BlockReader::merge_io_read(

src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,25 @@ pub async fn runtime_bloom_filter_pruner(
119119
let part = FuseBlockPartInfo::from_part(part)?;
120120

121121
let block_meta: BlockMeta;
122-
let index_location = part.bloom_filter_index_location.clone();
123-
let index_size = part.bloom_filter_index_size;
124-
let column_ids = part.columns_meta.keys().cloned().collect::<Vec<_>>();
122+
if let Some(bloom_desc) = &part.bloom_index_descriptor {
123+
let index_location = bloom_desc.bloom_index_location.clone();
124+
let index_size = bloom_desc.bloom_index_size;
125+
let column_ids = part.columns_meta.keys().cloned().collect::<Vec<_>>();
125126

126-
for filter_expr in filters {
127-
if let Some(bloom_pruner) = BloomPrunerCreator::create(
128-
func_ctx.clone(),
129-
&table_schema,
130-
dal.clone(),
131-
Some(filter_expr),
132-
bloom_index_cols.clone(),
133-
)? {
134-
let should_keep = bloom_pruner
135-
.should_keep(&index_location, index_size, column_ids.clone())
136-
.await;
137-
if !should_keep {
138-
return Ok(true);
127+
for filter_expr in filters {
128+
if let Some(bloom_pruner) = BloomPrunerCreator::create(
129+
func_ctx.clone(),
130+
&table_schema,
131+
dal.clone(),
132+
Some(filter_expr),
133+
bloom_index_cols.clone(),
134+
)? {
135+
let should_keep = bloom_pruner
136+
.should_keep(&index_location, index_size, column_ids.clone())
137+
.await;
138+
if !should_keep {
139+
return Ok(true);
140+
}
139141
}
140142
}
141143
}

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,26 @@ use databend_common_catalog::plan::TopK;
2727
use databend_common_catalog::table::Table;
2828
use databend_common_catalog::table_context::TableContext;
2929
use databend_common_exception::Result;
30-
use databend_common_expression::{ColumnId, Scalar};
30+
use databend_common_expression::ColumnId;
31+
use databend_common_expression::Scalar;
3132
use databend_common_expression::TableSchemaRef;
3233
use databend_common_sql::field_default_value;
34+
use databend_common_sql::BloomIndexColumns;
3335
use databend_common_storage::ColumnNodes;
3436
use databend_storages_common_cache::CacheAccessor;
3537
use databend_storages_common_cache_manager::CachedObject;
3638
use databend_storages_common_pruner::BlockMetaIndex;
37-
use databend_storages_common_table_meta::meta::{BlockMeta, Location};
39+
use databend_storages_common_table_meta::meta::BlockMeta;
3840
use databend_storages_common_table_meta::meta::ColumnStatistics;
41+
use databend_storages_common_table_meta::meta::Location;
3942
use databend_storages_common_table_meta::table::ChangeType;
4043
use log::debug;
4144
use log::info;
4245
use opendal::Operator;
4346
use sha2::Digest;
4447
use sha2::Sha256;
4548

49+
use crate::fuse_part::BloomIndexDescriptor;
4650
use crate::fuse_part::FuseBlockPartInfo;
4751
use crate::pruning::create_segment_location_vector;
4852
use crate::pruning::FusePruner;
@@ -430,7 +434,6 @@ impl FuseTable {
430434
}
431435

432436
fn all_columns_part(
433-
&self,
434437
schema: Option<&TableSchemaRef>,
435438
block_meta_index: &Option<BlockMetaIndex>,
436439
top_k: &Option<(TopK, Scalar)>,
@@ -468,6 +471,12 @@ impl FuseTable {
468471
.unwrap_or((default.clone(), default.clone()))
469472
});
470473

474+
// let bloom_desc = BloomIndexDescriptor {
475+
// bloom_index_location: meta.bloom_filter_index_location.clone(),
476+
// bloom_index_size: meta.bloom_filter_index_size,
477+
// bloom_index_cols: bloom_index_cols.clone(),
478+
//};
479+
471480
FuseBlockPartInfo::create(
472481
location,
473482
rows_count,
@@ -477,9 +486,7 @@ impl FuseTable {
477486
sort_min_max,
478487
block_meta_index.to_owned(),
479488
create_on,
480-
meta.bloom_filter_index_location.clone(),
481-
meta.bloom_filter_index_size,
482-
self.bloom_index_cols.clone(),
489+
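None, // TODO: pass Some(BloomIndexDescriptor { .. }) built from `meta` (see the commented-out sketch above) once bloom index columns are available here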
483490
)
484491
}
485492

@@ -519,6 +526,13 @@ impl FuseTable {
519526
// TODO
520527
// row_count should be a hint value of LIMIT,
521528
// not the count the rows in this partition
529+
530+
// let bloom_desc = BloomIndexDescriptor {
531+
// bloom_index_location: meta.bloom_filter_index_location.clone(),
532+
// bloom_index_size: meta.bloom_filter_index_size,
533+
// bloom_index_cols: bloom_index_cols.clone(),
534+
//};
535+
let bloom_desc = None;
522536
FuseBlockPartInfo::create(
523537
location,
524538
rows_count,
@@ -528,6 +542,7 @@ impl FuseTable {
528542
sort_min_max,
529543
block_meta_index.to_owned(),
530544
create_on,
545+
bloom_desc,
531546
)
532547
}
533548
}

0 commit comments

Comments
 (0)