Skip to content

feat: experimental runtime bloom pruning #15382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/common/base/src/runtime/profile/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ pub enum ProfileStatisticsName {
SpillReadCount,
SpillReadBytes,
SpillReadTime,
RuntimeFilterPruneParts,
RuntimeRangeFilterPrunedParts,
RuntimeBloomFilterPrunedParts,
MemoryUsage,
}

Expand Down Expand Up @@ -229,10 +230,17 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
unit: StatisticsUnit::MillisSeconds,
plain_statistics: true,
}),
(ProfileStatisticsName::RuntimeFilterPruneParts, ProfileDesc {
display_name: "parts pruned by runtime filter",
desc: "The partitions pruned by runtime filter",
index: ProfileStatisticsName::RuntimeFilterPruneParts as usize,
(ProfileStatisticsName::RuntimeRangeFilterPrunedParts, ProfileDesc {
display_name: "parts pruned by runtime range filter",
desc: "The partitions pruned by runtime range filter",
index: ProfileStatisticsName::RuntimeRangeFilterPrunedParts as usize,
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
(ProfileStatisticsName::RuntimeBloomFilterPrunedParts, ProfileDesc {
display_name: "parts pruned by runtime bloom filter",
desc: "The partitions pruned by runtime bloom filter",
index: ProfileStatisticsName::RuntimeBloomFilterPrunedParts as usize,
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,68 +962,69 @@ impl HashJoinBuildState {
}
// Generate min max filter using build column
let min_max = build_key_column.remove_nullable().domain();

let min_max_filter = match min_max {
Domain::Number(domain) => match domain {
NumberDomain::UInt8(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::UInt16(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::UInt32(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::UInt64(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::Int8(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::Int16(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::Int32(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::Int64(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::Float32(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
NumberDomain::Float64(simple_domain) => {
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
},
Domain::String(domain) => {
let min = Scalar::String(domain.min);
let max = Scalar::String(domain.max.unwrap());
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
Domain::Date(date_domain) => {
let min = Scalar::Date(date_domain.min);
let max = Scalar::Date(date_domain.max);
min_max_filter(min, max, probe_key)?
min_max_filter(&self.func_ctx, min, max, probe_key)?
}
_ => unreachable!(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use databend_common_expression::type_check;
use databend_common_expression::types::AnyType;
use databend_common_expression::types::DataType;
use databend_common_expression::Column;
use databend_common_expression::ConstantFolder;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
Expand Down Expand Up @@ -233,10 +234,17 @@ where

// Generate min max runtime filter
pub(crate) fn min_max_filter(
func_ctx: &FunctionContext,
min: Scalar,
max: Scalar,
probe_key: &Expr<String>,
) -> Result<Option<Expr<String>>> {
if min == max {
// if min equals max, return a `eq` expression
// which can be used by both range filter and bloom filter
return eq_filter(func_ctx, min, probe_key);
}

if let Expr::ColumnRef {
span,
id,
Expand Down Expand Up @@ -283,3 +291,42 @@ pub(crate) fn min_max_filter(
}
Ok(None)
}

/// Build an `eq(probe_key, constant)` runtime-filter expression.
///
/// Called from `min_max_filter` when the build side's min equals its max,
/// so a single equality predicate replaces the range predicate; the result
/// is usable by both the range filter and the bloom filter.
///
/// Returns `Ok(None)` when `probe_key` is not a plain column reference,
/// since only a column ref can be rewritten into a pushdown-able filter.
fn eq_filter(
    func_ctx: &FunctionContext,
    scalar: Scalar,
    probe_key: &Expr<String>,
) -> Result<Option<Expr<String>>> {
    if let Expr::ColumnRef {
        span,
        id,
        data_type,
        display_name,
    } = probe_key
    {
        let raw_probe_key = RawExpr::ColumnRef {
            span: *span,
            id: id.to_string(),
            data_type: data_type.clone(),
            display_name: display_name.clone(),
        };

        // The caller guarantees min == max, so `scalar` is the single
        // value the probe key must equal.
        let constant = RawExpr::Constant { span: None, scalar };
        let eq_func = RawExpr::FunctionCall {
            span: None,
            name: "eq".to_string(),
            params: vec![],
            // `raw_probe_key` is moved here; no clone is needed since it
            // is not used again afterwards.
            args: vec![raw_probe_key, constant],
        };
        let expr = type_check::check(&eq_func, &BUILTIN_FUNCTIONS)?;

        // Fold
        // `Cast { expr: Constant { scalar: .., data_type: T }, dest_type: Nullable(T) }`
        // to
        // `Constant { scalar: .., data_type: Nullable(T) }`
        // so that the expression can be utilized by bloom filter
        let (expr, _) = ConstantFolder::fold(&expr, func_ctx, &BUILTIN_FUNCTIONS);
        return Ok(Some(expr));
    }
    Ok(None)
}
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<'a> BlockWriter<'a> {

let bloom_index_cols = BloomIndexColumns::All;
let bloom_columns_map =
bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
bloom_index_cols.bloom_index_fields(schema.as_ref(), BloomIndex::supported_type)?;
let maybe_bloom_index = BloomIndex::try_create(
FunctionContext::default(),
location.1,
Expand Down
21 changes: 18 additions & 3 deletions src/query/service/tests/it/storages/fuse/operations/read_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,15 @@ fn test_to_partitions() -> Result<()> {
let column_nodes = ColumnNodes { column_nodes };

// CASE I: no projection
let (s, parts) = FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, None);
let bloom_index_cols = None;
let (s, parts) = FuseTable::to_partitions(
None,
&blocks_metas,
&column_nodes,
None,
None,
bloom_index_cols,
);
assert_eq!(parts.len(), num_of_block as usize);
let expected_block_size: u64 = cols_metas
.values()
Expand Down Expand Up @@ -140,8 +148,15 @@ fn test_to_partitions() -> Result<()> {
..Default::default()
});

let (stats, parts) =
FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, push_down);
let bloom_index_cols = None;
let (stats, parts) = FuseTable::to_partitions(
None,
&blocks_metas,
&column_nodes,
None,
push_down,
bloom_index_cols,
);
assert_eq!(parts.len(), num_of_block as usize);
assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);

Expand Down
5 changes: 3 additions & 2 deletions src/query/sql/src/planner/bloom_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use databend_common_expression::ComputedExpr;
use databend_common_expression::FieldIndex;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::tenant::Tenant;
use databend_common_settings::Settings;

use crate::normalize_identifier;
use crate::planner::semantic::NameResolutionContext;

#[derive(Clone)]
#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
pub enum BloomIndexColumns {
/// Default, all columns that support bloom index.
All,
Expand Down Expand Up @@ -111,7 +112,7 @@ impl BloomIndexColumns {
/// Get table field based on the BloomIndexColumns and schema.
pub fn bloom_index_fields<F>(
&self,
schema: TableSchemaRef,
schema: &TableSchema,
verify_type: F,
) -> Result<BTreeMap<FieldIndex, TableField>>
where
Expand Down
11 changes: 11 additions & 0 deletions src/query/storages/fuse/src/fuse_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,19 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
use databend_common_sql::BloomIndexColumns;
use databend_storages_common_pruner::BlockMetaIndex;
use databend_storages_common_table_meta::meta::ColumnMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::Compression;
use databend_storages_common_table_meta::meta::Location;

/// Per-partition bloom-index metadata carried inside `FuseBlockPartInfo`,
/// enabling runtime bloom-filter pruning at read time.
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
pub struct BloomIndexDescriptor {
    // Storage location of the block's bloom index; `None` when the block
    // has no bloom index written.
    pub bloom_index_location: Option<Location>,
    // Size of the bloom index — presumably in bytes; TODO confirm against writer.
    pub bloom_index_size: u64,
    // Which table columns the bloom index covers (e.g. `All` or an explicit set).
    pub bloom_index_cols: BloomIndexColumns,
}
/// Fuse table partition information.
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
pub struct FuseBlockPartInfo {
Expand All @@ -48,6 +55,8 @@ pub struct FuseBlockPartInfo {

pub sort_min_max: Option<(Scalar, Scalar)>,
pub block_meta_index: Option<BlockMetaIndex>,

pub bloom_index_descriptor: Option<BloomIndexDescriptor>,
}

#[typetag::serde(name = "fuse")]
Expand Down Expand Up @@ -84,6 +93,7 @@ impl FuseBlockPartInfo {
sort_min_max: Option<(Scalar, Scalar)>,
block_meta_index: Option<BlockMetaIndex>,
create_on: Option<DateTime<Utc>>,
bloom_index_descriptor: Option<BloomIndexDescriptor>,
) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(FuseBlockPartInfo {
location,
Expand All @@ -94,6 +104,7 @@ impl FuseBlockPartInfo {
sort_min_max,
block_meta_index,
columns_stat,
bloom_index_descriptor,
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl AggIndexReader {
.enumerate()
.map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
.collect();
let bloom_index_cols = None;
let part = FuseBlockPartInfo::create(
loc.to_string(),
num_rows,
Expand All @@ -55,6 +56,7 @@ impl AggIndexReader {
None,
None,
None,
bloom_index_cols,
);
let res = self
.reader
Expand Down Expand Up @@ -98,6 +100,7 @@ impl AggIndexReader {
.enumerate()
.map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
.collect();
let bloom_index_cols = None;
let part = FuseBlockPartInfo::create(
loc.to_string(),
num_rows,
Expand All @@ -107,6 +110,7 @@ impl AggIndexReader {
None,
None,
None,
bloom_index_cols,
);
let res = self
.reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl AggIndexReader {
debug_assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
let columns_meta = build_columns_meta(row_group);
let bloom_index_cols = None;
let part = FuseBlockPartInfo::create(
loc.to_string(),
row_group.num_rows() as u64,
Expand All @@ -49,6 +50,7 @@ impl AggIndexReader {
None,
None,
None,
bloom_index_cols,
);
let res = self
.reader
Expand Down Expand Up @@ -90,6 +92,7 @@ impl AggIndexReader {
.await
.inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}"))
.ok()?;
let bloom_index_cols = None;
let part = FuseBlockPartInfo::create(
loc.to_string(),
row_group.num_rows() as u64,
Expand All @@ -99,6 +102,7 @@ impl AggIndexReader {
None,
None,
None,
bloom_index_cols,
);
Some((part, res))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl VirtualColumnReader {
let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);

if !ranges.is_empty() {
let bloom_index_cols = None;
let part = FuseBlockPartInfo::create(
loc.to_string(),
row_group.num_rows() as u64,
Expand All @@ -91,6 +92,7 @@ impl VirtualColumnReader {
None,
None,
None,
bloom_index_cols,
);

let merge_io_result =
Expand Down Expand Up @@ -124,6 +126,7 @@ impl VirtualColumnReader {
let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);

if !ranges.is_empty() {
let bloom_index_cols = None;
let part = FuseBlockPartInfo::create(
loc.to_string(),
row_group.num_rows() as u64,
Expand All @@ -133,6 +136,7 @@ impl VirtualColumnReader {
None,
None,
None,
bloom_index_cols,
);

let merge_io_result = BlockReader::merge_io_read(
Expand Down
Loading