
feat: Add runtime bloom filter for merge into #14970


Open
wants to merge 45 commits into main from add_runtime_bloom_filter_for_merge_into
Commits (45)
7e8717d
enable stage table as non-local-table
JackTan25 Mar 5, 2024
e89fee3
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 15, 2024
e9010e3
add bloom index for merge into runtime filter
JackTan25 Mar 15, 2024
38cd7a7
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 15, 2024
2a51166
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 15, 2024
29641be
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 19, 2024
f1ad0cd
add merge_into_source_build_bloom_info
JackTan25 Mar 19, 2024
7bc84b8
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 19, 2024
5190213
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 20, 2024
d03501c
refactor hashed logic
JackTan25 Mar 20, 2024
7a41e4e
refactot build bloom info
JackTan25 Mar 20, 2024
3778c67
fix typos
JackTan25 Mar 20, 2024
69a5675
add runtime filter,need tests
JackTan25 Mar 20, 2024
1b659a0
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 20, 2024
60e963e
fix lint
JackTan25 Mar 20, 2024
ddd3776
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Mar 21, 2024
a86e64f
add profile and tests
JackTan25 Mar 21, 2024
4291c82
fix bugs
JackTan25 Mar 21, 2024
dfdaa64
fix explain
JackTan25 Mar 21, 2024
60bf6b8
fix runtime
JackTan25 Mar 21, 2024
6dcf50a
ad tests and fix tokio runtime io
JackTan25 Mar 21, 2024
ab45aad
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 21, 2024
6a208df
fix lint
JackTan25 Mar 21, 2024
7f91710
Merge branch 'add_runtime_bloom_filter_for_merge_into' of https://git…
JackTan25 Mar 21, 2024
946e994
remove a.sql
JackTan25 Mar 21, 2024
885be2b
fix test
JackTan25 Mar 21, 2024
7636c95
remove useless codes
JackTan25 Mar 21, 2024
58e3a37
remove useless codes
JackTan25 Mar 21, 2024
113156f
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 21, 2024
a8a2971
add order test
JackTan25 Mar 21, 2024
a99ab13
Merge branch 'add_runtime_bloom_filter_for_merge_into' of https://git…
JackTan25 Mar 21, 2024
9a59269
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 22, 2024
6fa0190
fix empty hashkeys bugs
JackTan25 Mar 22, 2024
f845da8
Merge branch 'add_runtime_bloom_filter_for_merge_into' of https://git…
JackTan25 Mar 22, 2024
90433a2
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 22, 2024
8b44a29
fix lint
JackTan25 Mar 22, 2024
58d28dd
Merge branch 'add_runtime_bloom_filter_for_merge_into' of https://git…
JackTan25 Mar 22, 2024
7654129
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 23, 2024
a1b1b0a
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 23, 2024
788fca5
Merge remote-tracking branch 'origin/main' into merge-bloom & Resolves
dantengsky Mar 25, 2024
3deb681
lint
dantengsky Mar 25, 2024
63d9e00
Merge branch 'main' into add_runtime_bloom_filter_for_merge_into
JackTan25 Mar 30, 2024
5ba2c66
fix bloom, thanks @dantengsky
JackTan25 Mar 30, 2024
a12008e
Merge branch 'add_runtime_bloom_filter_for_merge_into' of https://git…
JackTan25 Mar 30, 2024
8aa5d5b
fix lint
JackTan25 Mar 30, 2024
8 changes: 8 additions & 0 deletions src/common/base/src/runtime/profile/profiles.rs
@@ -44,6 +44,7 @@ pub enum ProfileStatisticsName {
SpillReadBytes,
SpillReadTime,
RuntimeFilterPruneParts,
RuntimeFilterMergeIntoSourceBuildBloomPruneParts,
MemoryUsage,
}

@@ -236,6 +237,13 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>>
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
(ProfileStatisticsName::RuntimeFilterMergeIntoSourceBuildBloomPruneParts, ProfileDesc {
display_name: "parts pruned by merge into target table bloom filter",
desc: "The partitions pruned by merge into target table bloom filter",
index: ProfileStatisticsName::RuntimeFilterMergeIntoSourceBuildBloomPruneParts as usize,
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
(ProfileStatisticsName::MemoryUsage, ProfileDesc {
display_name: "memory usage",
desc: "The real time memory usage",
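A minimal sketch, not part of this diff, of how an operator might bump the new counter. It assumes Databend's `Profile::record_usize_profile` helper and the crate path of the file above; `record_bloom_pruned_parts` is a hypothetical wrapper.

// Sketch only: the import path and helper name are assumptions.
use databend_common_base::runtime::profile::{Profile, ProfileStatisticsName};

fn record_bloom_pruned_parts(pruned: usize) {
    // one count per partition skipped by the merge-into source-build bloom filter
    Profile::record_usize_profile(
        ProfileStatisticsName::RuntimeFilterMergeIntoSourceBuildBloomPruneParts,
        pruned,
    );
}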
37 changes: 33 additions & 4 deletions src/query/catalog/src/merge_into_join.rs
@@ -12,6 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_storages_common_table_meta::meta::Location;

use crate::table::Table;

#[derive(Clone)]
pub enum MergeIntoJoinType {
Left,
@@ -23,14 +29,36 @@ pub enum MergeIntoJoinType {
NormalJoin,
}

// for now, we just support MergeIntoJoinType::Left to use MergeIntoBlockInfoHashTable in two situations:
// 1. distributed broadcast join and target table as build side.
// 2. in standalone mode and target table as build side.
// we will support Inner next, so the merge_into_join_type is only Left for current implementation in fact.
pub type MergeIntoSourceBuildSegments = Arc<Vec<Location>>;

// MergeIntoJoin is used in two cases:
// I. target build optimization:
//    We support MergeIntoJoinType::Left (LeftInner and LeftAnti still need to be supported)
//    to use MergeIntoBlockInfoHashTable in these situations:
//    1. distributed broadcast join with the target table as build side (not supported yet).
//    2. standalone mode with the target table as build side (supported).
//    3. native (not supported) and parquet (supported) storage formats.
//    For the `target build optimization`, merge_into_join_type is in fact always Left in the
//    current implementation, and neither distributed mode nor the native format is supported.
//    So the optimization is enabled only for a LeftJoin in a non-distributed merge into (even
//    in a distributed environment the merge_into_optimizer may still produce a non-distributed
//    merge into) over a parquet table.
// II. source build runtime bloom filter:
//    Only for a RightJoin (RightInner and RightAnti still need to be supported) with a valid
//    target_tbl_idx do we enable the `source build runtime bloom filter`; in that case
//    table_info and catalog_info must be Some, which we enforce with an assert.
//
// To summarize:
// In most cases MergeIntoJoin's `merge_into_join_type` is NormalJoin (even for a real merge
// into join), because MergeIntoJoin only serves to decide whether the `target build
// optimization` or the `source build runtime bloom filter` is enabled, and both optimizations
// check `merge_into_join_type` for exactly that. So we call `set_merge_into_join` only when
// one of these two optimizations is used. There is one exception: the merge_into_optimizer
// cannot distinguish parquet from native, so it assumes parquet by default; if the format is
// actually native, interpreter_merge_into rolls the decision back. The rollback is a little
// tricky: since MergeIntoJoinType carries no format information, we set target_table_index to
// DUMMY_TABLE_INDEX even though a real target table index exists. This is safe: it only makes
// sure the `target build optimization` stays disabled, and it cannot conflict with the
// `source build runtime bloom filter`, because one is target build and the other source build.
pub struct MergeIntoJoin {
pub merge_into_join_type: MergeIntoJoinType,
pub is_distributed: bool,
pub target_tbl_idx: usize,
pub table: Option<Arc<dyn Table>>,
}

impl Default for MergeIntoJoin {
@@ -40,6 +68,7 @@ impl Default for MergeIntoJoin {
is_distributed: false,
// Invalid Index
target_tbl_idx: usize::MAX,
table: None,
}
}
}
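A minimal sketch, not part of this diff, of what case II (source build runtime bloom filter) looks like when enabled, assuming the types above; `source_build_join`, `target_table`, and `target_tbl_idx` are hypothetical stand-ins for values the optimizer would supply.

use std::sync::Arc;

// Sketch only, assuming the MergeIntoJoin/MergeIntoJoinType/Table definitions above.
fn source_build_join(target_table: Arc<dyn Table>, target_tbl_idx: usize) -> MergeIntoJoin {
    MergeIntoJoin {
        // a RightJoin marks the source side as the hash-join build side (case II)
        merge_into_join_type: MergeIntoJoinType::Right,
        is_distributed: false,
        // must be a real index; DUMMY_TABLE_INDEX would disable the optimization
        target_tbl_idx,
        // the target table is needed later to read its snapshot segments
        table: Some(target_table),
    }
}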
21 changes: 20 additions & 1 deletion src/query/catalog/src/runtime_filter_info.rs
@@ -12,14 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_expression::Expr;
use parking_lot::RwLock;
use xorf::BinaryFuse16;

pub type MergeIntoSourceBuildSiphashkeys = (Vec<String>, Arc<RwLock<Vec<Vec<u64>>>>);

#[derive(Clone, Debug, Default)]
pub struct RuntimeFilterInfo {
inlist: Vec<Expr<String>>,
min_max: Vec<Expr<String>>,
bloom: Vec<(String, BinaryFuse16)>,
siphashes: MergeIntoSourceBuildSiphashkeys,
}

impl RuntimeFilterInfo {
@@ -31,6 +37,16 @@ impl RuntimeFilterInfo {
self.bloom.push(bloom);
}

pub fn get_merge_into_source_build_siphashkeys(&self) -> MergeIntoSourceBuildSiphashkeys {
self.siphashes.clone()
}

pub fn add_merge_into_source_build_siphashkeys(&mut self, digests: (String, Vec<u64>)) {
self.siphashes.0.push(digests.0);
let mut borrow_hash_keys = self.siphashes.1.write();
borrow_hash_keys.push(digests.1)
}

pub fn add_min_max(&mut self, expr: Expr<String>) {
self.min_max.push(expr);
}
@@ -60,6 +76,9 @@
}

pub fn is_empty(&self) -> bool {
self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty()
self.inlist.is_empty()
&& self.bloom.is_empty()
&& self.min_max.is_empty()
&& self.siphashes.0.len() == 0
}
}
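A minimal sketch, not part of this diff, of registering and reading back siphash keys with the API above; the digest values are placeholders, since real ones come from hashing the source-build join keys.

// Sketch only, assuming the RuntimeFilterInfo definition above.
fn siphash_roundtrip() {
    let mut info = RuntimeFilterInfo::default();
    info.add_merge_into_source_build_siphashkeys(("id".to_string(), vec![0x1234, 0x5678]));
    assert!(!info.is_empty());
    // the getter clones the Arc, so build and probe sides share the same digest vectors
    let (columns, digests) = info.get_merge_into_source_build_siphashkeys();
    assert_eq!(columns, vec!["id".to_string()]);
    assert_eq!(digests.read().len(), 1);
}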
14 changes: 14 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -56,10 +56,12 @@ use xorf::BinaryFuse16;
use crate::catalog::Catalog;
use crate::cluster_info::Cluster;
use crate::merge_into_join::MergeIntoJoin;
use crate::merge_into_join::MergeIntoSourceBuildSegments;
use crate::plan::DataSourcePlan;
use crate::plan::PartInfoPtr;
use crate::plan::Partitions;
use crate::query_kind::QueryKind;
use crate::runtime_filter_info::MergeIntoSourceBuildSiphashkeys;
use crate::runtime_filter_info::RuntimeFilterInfo;
use crate::statistics::data_cache_statistics::DataCacheMetrics;
use crate::table::Table;
@@ -255,18 +257,30 @@ pub trait TableContext: Send + Sync {

fn get_query_profiles(&self) -> Vec<PlanProfile>;

// <ProbeSide Index, RuntimeFilterInfo>
fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));

fn set_merge_into_join(&self, join: MergeIntoJoin);

fn get_merge_into_join(&self) -> MergeIntoJoin;
// set the target table's segments
fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments);
// get the target table's segments
fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments;

fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;

fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;

fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;

fn get_merge_into_source_build_siphashkeys_with_id(
&self,
id: usize,
) -> Option<MergeIntoSourceBuildSiphashkeys>;

fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String>;

fn has_bloom_runtime_filters(&self, id: usize) -> bool;
fn txn_mgr(&self) -> TxnManagerRef;

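A minimal sketch, not part of this diff, of how a probe-side scan might read back the published state through the trait methods above; `read_source_build_state` is hypothetical, and `id` is the probe-side index used when the filter was registered.

use std::sync::Arc;

// Sketch only, assuming the TableContext trait above.
fn read_source_build_state(ctx: &Arc<dyn TableContext>, id: usize) {
    // segments of the merge-into target table, published by the interpreter
    let segments = ctx.get_merge_into_source_build_segments();
    // per-column siphash digests from the join build side, if any were set
    if let Some((columns, digests)) = ctx.get_merge_into_source_build_siphashkeys_with_id(id) {
        println!(
            "{} probe columns, {} digest vectors, {} target segments",
            columns.len(),
            digests.read().len(),
            segments.len()
        );
    }
}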
34 changes: 34 additions & 0 deletions src/query/service/src/interpreters/interpreter_explain.rs
@@ -25,14 +25,18 @@ use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_pipeline_core::processors::PlanProfile;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::binder::MergeIntoType;
use databend_common_sql::optimizer::ColumnSet;
use databend_common_sql::plans::UpdatePlan;
use databend_common_sql::BindContext;
use databend_common_sql::InsertInputSource;
use databend_common_sql::MetadataRef;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_result_cache::gen_result_cache_key;
use databend_common_storages_result_cache::ResultCacheReader;
use databend_common_users::UserApiProvider;
use databend_storages_common_table_meta::meta::TableSnapshot;

use super::InterpreterFactory;
use super::UpdateInterpreter;
@@ -159,6 +163,36 @@ impl Interpreter for ExplainInterpreter {
.await?
}
Plan::MergeInto(plan) => {
// if we enable merge_into_source_build_bloom, we should set the segments here
if !plan.change_join_order
&& matches!(plan.merge_type, MergeIntoType::FullOperation)
&& self
.ctx
.get_settings()
.get_enable_merge_into_source_build_bloom()?
{
let merge_into_join = self.ctx.get_merge_into_join();
let table = merge_into_join.table.as_ref().unwrap();
let fuse_table =
table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::Unimplemented(format!(
"table {}, engine type {}, does not support MERGE INTO",
table.name(),
table.get_table_info().engine(),
))
})?;
let base_snapshot =
fuse_table.read_table_snapshot().await?.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
fuse_table.schema().as_ref().clone(),
None,
))
});
self.ctx.set_merge_into_source_build_segments(Arc::new(
base_snapshot.segments.clone(),
));
}

self.explain_analyze(
&plan.input,
&plan.meta_data,
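A condensed sketch of the gate above: segments are published only for a full MERGE (both matched and not-matched branches), with the original join order kept, and the setting switched on. `MergeInto`, `Settings`, and `Result` here stand in for the corresponding plan, settings, and exception types and are assumptions, not exact signatures.

// Sketch only: parameter types are assumed.
fn should_publish_segments(plan: &MergeInto, settings: &Settings) -> Result<bool> {
    Ok(!plan.change_join_order
        && matches!(plan.merge_type, MergeIntoType::FullOperation)
        && settings.get_enable_merge_into_source_build_bloom()?)
}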
16 changes: 14 additions & 2 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
use std::u64::MAX;

use databend_common_catalog::merge_into_join::MergeIntoJoin;
use databend_common_catalog::merge_into_join::MergeIntoJoinType;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -181,7 +182,7 @@ impl MergeIntoInterpreter {

// for `target_build_optimization` we don't need to read rowId column. for now, there are two cases we don't read rowid:
// I. InsertOnly, the MergeIntoType is InsertOnly
// II. target build optimization for this pr. the MergeIntoType is MergeIntoType
// II. target build optimization for this pr. the MergeIntoType is FullOperation
let mut target_build_optimization =
matches!(self.plan.merge_type, MergeIntoType::FullOperation)
&& !self.plan.columns_set.contains(&self.plan.row_id_index);
@@ -199,6 +200,7 @@
target_tbl_idx: DUMMY_TABLE_INDEX,
is_distributed: merge_into_join.is_distributed,
merge_into_join_type: merge_into_join.merge_into_join_type,
table: merge_into_join.table.clone(),
});
}
}
@@ -287,7 +289,6 @@

let table_info = fuse_table.get_table_info().clone();
let catalog_ = self.ctx.get_catalog(catalog).await?;

// merge_into_source is used to recv join's datablocks and split them into matched and not matched
// datablocks.
let merge_into_source = if !*distributed && extract_exchange {
@@ -435,6 +436,17 @@
.enumerate()
.collect();

// try to add catalog_info and table_info for `source_build_bloom_filter`
let merge_into_join = self.ctx.get_merge_into_join();
let source_build_bloom_filter = matches!(
merge_into_join.merge_into_join_type,
MergeIntoJoinType::Right
) && merge_into_join.target_tbl_idx != DUMMY_TABLE_INDEX;
if source_build_bloom_filter {
self.ctx
.set_merge_into_source_build_segments(Arc::new(base_snapshot.segments.clone()));
}

let commit_input = if !distributed {
// recv datablocks from matched upstream and unmatched upstream
// transform and append data
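To close the loop, a minimal sketch of the pruning idea itself, assuming only the xorf crate already imported above: build a binary fuse filter over the source side's siphash digests, then skip a target block when none of its key digests can possibly match. `block_can_be_pruned` and both digest slices are hypothetical stand-ins.

use xorf::{BinaryFuse16, Filter};

// Sketch only: returns true when the target block cannot contain any join key
// from the source build side and can therefore be pruned.
fn block_can_be_pruned(source_digests: &[u64], block_digests: &[u64]) -> bool {
    // construction can fail (e.g. on duplicate keys), hence try_from
    let filter = BinaryFuse16::try_from(source_digests).expect("bloom build failed");
    !block_digests.iter().any(|d| filter.contains(d))
}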