diff --git a/src/common/base/src/runtime/profile/profiles.rs b/src/common/base/src/runtime/profile/profiles.rs index 2aa2a7d2db850..eb2d1bb3a7801 100644 --- a/src/common/base/src/runtime/profile/profiles.rs +++ b/src/common/base/src/runtime/profile/profiles.rs @@ -44,6 +44,7 @@ pub enum ProfileStatisticsName { SpillReadBytes, SpillReadTime, RuntimeFilterPruneParts, + RuntimeFilterMergeIntoSourceBuildBloomPruneParts, MemoryUsage, } @@ -236,6 +237,13 @@ pub fn get_statistics_desc() -> Arc unit: StatisticsUnit::Count, plain_statistics: true, }), + (ProfileStatisticsName::RuntimeFilterMergeIntoSourceBuildBloomPruneParts, ProfileDesc { + display_name: "parts pruned by merge into target table bloom filter", + desc: "The partitions pruned by merge into target table bloom filter", + index: ProfileStatisticsName::RuntimeFilterMergeIntoSourceBuildBloomPruneParts as usize, + unit: StatisticsUnit::Count, + plain_statistics: true, + }), (ProfileStatisticsName::MemoryUsage, ProfileDesc { display_name: "memory usage", desc: "The real time memory usage",
diff --git a/src/query/catalog/src/merge_into_join.rs b/src/query/catalog/src/merge_into_join.rs index db48f84b66a5b..9866de83c6de3 100644 --- a/src/query/catalog/src/merge_into_join.rs +++ b/src/query/catalog/src/merge_into_join.rs @@ -12,6 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use databend_storages_common_table_meta::meta::Location; + +use crate::table::Table; + #[derive(Clone)] pub enum MergeIntoJoinType { Left, @@ -23,14 +29,36 @@ pub enum MergeIntoJoinType { NormalJoin, } -// for now, we just support MergeIntoJoinType::Left to use MergeIntoBlockInfoHashTable in two situations: -// 1. distributed broadcast join and target table as build side. -// 2. in standalone mode and target table as build side. -// we will support Inner next, so the merge_into_join_type is only Left for current implementation in fact. +pub type MergeIntoSourceBuildSegments = Arc<Vec<Location>>; + +// MergeIntoJoin is used in two cases: +// I. target build optimization: +// we use MergeIntoJoinType::Left (LeftInner and LeftAnti still need to be supported) so that MergeIntoBlockInfoHashTable can be used, in two situations: +// 1. distributed broadcast join with the target table as the build side (not supported yet). +// 2. standalone mode with the target table as the build side (supported). +// 3. parquet format (supported); native format (not supported). +// For the `target build optimization`, merge_into_join_type is only Left in the current implementation, and neither distributed mode +// nor the native format is supported. So the `target build optimization` can be enabled only for a LeftJoin in a non-distributed merge into +// (even in a distributed environment the merge_into_optimizer may still produce a non-distributed merge into) that uses the parquet format. +// II. source build runtime bloom filter: +// it is enabled only if the join is a RightJoin (RightInner and RightAnti still need to be supported) and target_tbl_idx is valid. In that case +// the table_info and catalog_info must be Some, which we guard with an `assert`. +// +// To summarize: +// In most cases, the MergeIntoJoin's `merge_into_join_type` is NormalJoin (even if this is a real merge into join), because +// MergeIntoJoin is only used to decide whether we enable the target build optimization or the source build runtime bloom filter.
+// Both of these optimizations check `merge_into_join_type` to decide whether the corresponding optimization is enabled. +// So we only call `set_merge_into_join` when one of these two optimizations is used. (There is one exception: the `merge_into_optimizer` can't distinguish +// parquet from native, so it assumes the parquet format by default; if the table actually uses the native format, the `interpreter_merge_into` will roll it back. +// The rollback is a little tricky: because `MergeIntoJoinType` doesn't carry any info about parquet or native, we set the target_table_index to +// DUMMY_TABLE_INDEX even though a real target_table_index exists. This is fine: it makes sure the `target build optimization` is +// disabled, and it won't conflict with the `source build runtime bloom filter`, because the `target build optimization` is a target build while +// the `source build runtime bloom filter` is a source build.) pub struct MergeIntoJoin { pub merge_into_join_type: MergeIntoJoinType, pub is_distributed: bool, pub target_tbl_idx: usize, + pub table: Option<Arc<dyn Table>>, } impl Default for MergeIntoJoin { @@ -40,6 +68,7 @@ impl Default for MergeIntoJoin { is_distributed: false, // Invalid Index target_tbl_idx: usize::MAX, + table: None, } } }
diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs index 7c29f193879f0..f674df617c0fa 100644 --- a/src/query/catalog/src/runtime_filter_info.rs +++ b/src/query/catalog/src/runtime_filter_info.rs @@ -12,14 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_expression::Expr; +use parking_lot::RwLock; use xorf::BinaryFuse16; +pub type MergeIntoSourceBuildSiphashkeys = (Vec<String>, Arc<RwLock<Vec<Vec<u64>>>>); + #[derive(Clone, Debug, Default)] pub struct RuntimeFilterInfo { inlist: Vec<Expr<String>>, min_max: Vec<Expr<String>>, bloom: Vec<(String, BinaryFuse16)>, + siphashes: MergeIntoSourceBuildSiphashkeys, } impl RuntimeFilterInfo { @@ -31,6 +37,16 @@ impl RuntimeFilterInfo { self.bloom.push(bloom); } + pub fn get_merge_into_source_build_siphashkeys(&self) -> MergeIntoSourceBuildSiphashkeys { + self.siphashes.clone() + } + + pub fn add_merge_into_source_build_siphashkeys(&mut self, digests: (String, Vec<u64>)) { + self.siphashes.0.push(digests.0); + let mut borrow_hash_keys = self.siphashes.1.write(); + borrow_hash_keys.push(digests.1) + } + pub fn add_min_max(&mut self, expr: Expr<String>) { self.min_max.push(expr); } @@ -60,6 +76,9 @@ impl RuntimeFilterInfo { } pub fn is_empty(&self) -> bool { - self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty() + self.inlist.is_empty() + && self.bloom.is_empty() + && self.min_max.is_empty() + && self.siphashes.0.len() == 0 } }
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 9ea0c6af760ab..01a592b2ea40a 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -56,10 +56,12 @@ use xorf::BinaryFuse16; use crate::catalog::Catalog; use crate::cluster_info::Cluster; use crate::merge_into_join::MergeIntoJoin; +use crate::merge_into_join::MergeIntoSourceBuildSegments; use crate::plan::DataSourcePlan; use crate::plan::PartInfoPtr; use crate::plan::Partitions; use crate::query_kind::QueryKind; +use crate::runtime_filter_info::MergeIntoSourceBuildSiphashkeys; use crate::runtime_filter_info::RuntimeFilterInfo; use crate::statistics::data_cache_statistics::DataCacheMetrics; use 
crate::table::Table; @@ -255,11 +257,16 @@ pub trait TableContext: Send + Sync { fn get_query_profiles(&self) -> Vec; + // fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo)); fn set_merge_into_join(&self, join: MergeIntoJoin); fn get_merge_into_join(&self) -> MergeIntoJoin; + // set the target table's segments + fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments); + // get the target table's segments + fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments; fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>; @@ -267,6 +274,13 @@ pub trait TableContext: Send + Sync { fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec>; + fn get_merge_into_source_build_siphashkeys_with_id( + &self, + id: usize, + ) -> Option; + + fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec; + fn has_bloom_runtime_filters(&self, id: usize) -> bool; fn txn_mgr(&self) -> TxnManagerRef; diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index 133455d68ce3b..e590622370600 100644 --- a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -25,14 +25,18 @@ use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::PlanProfile; use databend_common_sql::binder::ExplainConfig; +use databend_common_sql::binder::MergeIntoType; use databend_common_sql::optimizer::ColumnSet; use databend_common_sql::plans::UpdatePlan; use databend_common_sql::BindContext; use databend_common_sql::InsertInputSource; use databend_common_sql::MetadataRef; +use databend_common_storages_factory::Table; +use databend_common_storages_fuse::FuseTable; use databend_common_storages_result_cache::gen_result_cache_key; use databend_common_storages_result_cache::ResultCacheReader; use databend_common_users::UserApiProvider; +use databend_storages_common_table_meta::meta::TableSnapshot; use super::InterpreterFactory; use super::UpdateInterpreter; @@ -159,6 +163,36 @@ impl Interpreter for ExplainInterpreter { .await? } Plan::MergeInto(plan) => { + // if we enable merge_into_source_build_bloom, we should set the segments here + if !plan.change_join_order + && matches!(plan.merge_type, MergeIntoType::FullOperation) + && self + .ctx + .get_settings() + .get_enable_merge_into_source_build_bloom()? 
+ { + let merge_into_join = self.ctx.get_merge_into_join(); + let table = merge_into_join.table.as_ref().unwrap(); + let fuse_table = + table.as_any().downcast_ref::().ok_or_else(|| { + ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support MERGE INTO", + table.name(), + table.get_table_info().engine(), + )) + })?; + let base_snapshot = + fuse_table.read_table_snapshot().await?.unwrap_or_else(|| { + Arc::new(TableSnapshot::new_empty_snapshot( + fuse_table.schema().as_ref().clone(), + None, + )) + }); + self.ctx.set_merge_into_source_build_segments(Arc::new( + base_snapshot.segments.clone(), + )); + } + self.explain_analyze( &plan.input, &plan.meta_data, diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 39e1806e364d5..af57c4b18389b 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::u64::MAX; use databend_common_catalog::merge_into_join::MergeIntoJoin; +use databend_common_catalog::merge_into_join::MergeIntoJoinType; use databend_common_catalog::table::TableExt; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -181,7 +182,7 @@ impl MergeIntoInterpreter { // for `target_build_optimization` we don't need to read rowId column. for now, there are two cases we don't read rowid: // I. InsertOnly, the MergeIntoType is InsertOnly - // II. target build optimization for this pr. the MergeIntoType is MergeIntoType + // II. target build optimization for this pr. the MergeIntoType is FullOperation let mut target_build_optimization = matches!(self.plan.merge_type, MergeIntoType::FullOperation) && !self.plan.columns_set.contains(&self.plan.row_id_index); @@ -199,6 +200,7 @@ impl MergeIntoInterpreter { target_tbl_idx: DUMMY_TABLE_INDEX, is_distributed: merge_into_join.is_distributed, merge_into_join_type: merge_into_join.merge_into_join_type, + table: merge_into_join.table.clone(), }); } } @@ -287,7 +289,6 @@ impl MergeIntoInterpreter { let table_info = fuse_table.get_table_info().clone(); let catalog_ = self.ctx.get_catalog(catalog).await?; - // merge_into_source is used to recv join's datablocks and split them into macthed and not matched // datablocks. 
let merge_into_source = if !*distributed && extract_exchange { @@ -435,6 +436,17 @@ impl MergeIntoInterpreter { .enumerate() .collect(); + // try to set the target table's segments for `source_build_bloom_filter` + let merge_into_join = self.ctx.get_merge_into_join(); + let source_build_bloom_filter = matches!( + merge_into_join.merge_into_join_type, + MergeIntoJoinType::Right + ) && merge_into_join.target_tbl_idx != DUMMY_TABLE_INDEX; + if source_build_bloom_filter { + self.ctx + .set_merge_into_source_build_segments(Arc::new(base_snapshot.segments.clone())); + } + let commit_input = if !distributed { // recv datablocks from matched upstream and unmatched upstream // transform and append data
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index d0c9ee56ce443..c8d7ed1d331cf 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_base::base::tokio::sync::Barrier; +use databend_common_catalog::merge_into_join::MergeIntoJoinType; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -55,6 +56,8 @@ use databend_common_hashtable::StringRawEntry; use databend_common_hashtable::STRING_EARLY_SIZE; use databend_common_sql::plans::JoinType; use databend_common_sql::ColumnSet; +use databend_common_storages_fuse::operations::can_merge_into_target_build_bloom_filter; +use databend_storages_common_index::BloomIndex; use ethnum::U256; use itertools::Itertools; use log::info; @@ -846,9 +849,21 @@ impl HashJoinBuildState { probe_key, )?; } + + // add BloomIndex hash keys for merge into source build. + if can_merge_into_target_build_bloom_filter(self.ctx.clone(), *table_index)? { + self.build_merge_into_runtime_filter_siphashes( + build_chunks, + &mut runtime_filter, + build_key, + probe_key, + )?; + } + if self.enable_bloom_runtime_filter { self.bloom_runtime_filter(build_chunks, &mut runtime_filter, build_key, probe_key)?; } + if self.enable_min_max_runtime_filter { self.min_max_runtime_filter( build_chunks, @@ -913,6 +928,84 @@ impl HashJoinBuildState { Ok(()) } + // for merge into source build cases like `merge into t1 using t2 on xxx when matched xxx when not matched xxx`, + // if the merge_into_optimizer chooses `t2` as the source build side, we can build the siphashes of the source join keys, + // because the target table's bloom index blocks are built with siphash. + // in this way, we avoid the performance cost of the current `runtime_filter()` path; for a large target table, + // the cost of `runtime_filter()` can even be higher than disabling it. + // However, `build_merge_into_runtime_filter_siphashes()` is currently only used for merge into, not for general + // join queries, and only for `source build` cases. In fact, source build is the most common case. 
+ fn build_merge_into_runtime_filter_siphashes( + &self, + data_blocks: &[DataBlock], + runtime_filter: &mut RuntimeFilterInfo, + build_key: &Expr, + probe_key: &Expr, + ) -> Result<()> { + // `calculate_nullable_column_digest`, `apply_bloom_pruning` + let merge_type = self.ctx.get_merge_into_join(); + if matches!(merge_type.merge_into_join_type, MergeIntoJoinType::Right) { + let id = match probe_key { + Expr::ColumnRef { id, .. } => Some(id), + Expr::Cast { + expr: box Expr::ColumnRef { id, .. }, + .. + } => Some(id), + _ => None, + }; + + if let Some(id) = id { + let mut columns = Vec::with_capacity(data_blocks.len()); + for block in data_blocks.iter() { + if block.num_columns() == 0 { + continue; + } + let evaluator = Evaluator::new(block, &self.func_ctx, &BUILTIN_FUNCTIONS); + let column = evaluator + .run(build_key)? + .convert_to_full_column(build_key.data_type(), block.num_rows()); + columns.push(column); + } + if columns.is_empty() { + return Ok(()); + } + let build_key_column = Column::concat_columns(columns.into_iter())?; + // maybe there will be null values here, so we use nullable column, the null value will be treat as default + // value for the specified type, like String -> "", int -> 0. so we need to remove the null hash values here. + let (hashes, bitmap_op) = BloomIndex::calculate_nullable_column_digest( + &self.func_ctx, + &build_key_column, + &build_key_column.data_type(), + )?; + if let Some(bitmap) = bitmap_op { + // no null values + let digests = if bitmap.unset_bits() == 0 { + hashes.to_vec() + } else { + let mut new_hashes = Vec::with_capacity(bitmap.len()); + assert_eq!(hashes.len(), bitmap.len()); + for row_idx in 0..bitmap.len() { + if bitmap.get_bit(row_idx) { + new_hashes.push(hashes[row_idx]) + } + } + new_hashes.to_vec() + }; + // id is probe key name + runtime_filter + .add_merge_into_source_build_siphashkeys((id.to_string(), digests)); + } else { + // id is probe key name + runtime_filter + .add_merge_into_source_build_siphashkeys((id.to_string(), hashes.to_vec())); + } + } + } + Ok(()) + } + fn inlist_runtime_filter( &self, runtime_filter: &mut RuntimeFilterInfo, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 57a3de9dcbc47..26b79c85ec652 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -39,12 +39,14 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::merge_into_join::MergeIntoJoin; +use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments; use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::StageTableInfo; use databend_common_catalog::query_kind::QueryKind; +use databend_common_catalog::runtime_filter_info::MergeIntoSourceBuildSiphashkeys; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_args::TableArgs; @@ -1001,6 +1003,13 @@ impl TableContext for QueryContext { for filter in filters.1.get_min_max() { v.get_mut().add_min_max(filter.clone()); } + let (keys, siphashkeys) = filters.1.get_merge_into_source_build_siphashkeys(); + for (idx, key) in 
keys.into_iter().enumerate() { + v.get_mut().add_merge_into_source_build_siphashkeys(( + key, + siphashkeys.read()[idx].clone(), + )) + } for filter in filters.1.blooms() { v.get_mut().add_bloom(filter); } @@ -1014,6 +1023,7 @@ impl TableContext for QueryContext { merge_into_join_type: merge_into_join.merge_into_join_type.clone(), is_distributed: merge_into_join.is_distributed, target_tbl_idx: merge_into_join.target_tbl_idx, + table: merge_into_join.table.clone(), } } @@ -1025,6 +1035,24 @@ impl TableContext for QueryContext { } } + fn get_merge_into_source_build_siphashkeys_with_id( + &self, + id: usize, + ) -> Option { + let runtime_filters = self.shared.runtime_filters.read(); + runtime_filters + .get(&id) + .map(|v| v.get_merge_into_source_build_siphashkeys()) + } + + fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec { + let runtime_filters = self.shared.runtime_filters.read(); + match runtime_filters.get(&id) { + Some(v) => v.get_merge_into_source_build_siphashkeys().0.clone(), + None => vec![], + } + } + fn get_inlist_runtime_filter_with_id(&self, id: IndexType) -> Vec> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { @@ -1052,6 +1080,16 @@ impl TableContext for QueryContext { self.shared.session.session_ctx.txn_mgr() } + fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments) { + let mut merge_into_source_build_segments = + self.shared.merge_into_source_build_segments.write(); + *merge_into_source_build_segments = segments; + } + + fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments { + self.shared.merge_into_source_build_segments.read().clone() + } + fn get_read_block_thresholds(&self) -> BlockThresholds { *self.block_threshold.read() } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index cf8168c72f7b3..4d36dd91e920c 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -26,6 +26,7 @@ use databend_common_base::runtime::drop_guard; use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::merge_into_join::MergeIntoJoin; +use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments; use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; @@ -121,6 +122,9 @@ pub struct QueryContextShared { pub(in crate::sessions) merge_into_join: Arc>, + pub(in crate::sessions) merge_into_source_build_segments: + Arc>, + // Records query level data cache metrics pub(in crate::sessions) query_cache_metrics: DataCacheMetrics, } @@ -170,6 +174,7 @@ impl QueryContextShared { query_profiles: Arc::new(RwLock::new(HashMap::new())), runtime_filters: Default::default(), merge_into_join: Default::default(), + merge_into_source_build_segments: Default::default(), })) } diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index c55367c403964..8861d98b0667a 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -26,10 +26,12 @@ use databend_common_catalog::catalog::Catalog; use databend_common_catalog::cluster_info::Cluster; use 
databend_common_catalog::database::Database; use databend_common_catalog::merge_into_join::MergeIntoJoin; +use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::query_kind::QueryKind; +use databend_common_catalog::runtime_filter_info::MergeIntoSourceBuildSiphashkeys; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; @@ -818,6 +820,25 @@ impl TableContext for CtxDelegation { fn set_read_block_thresholds(&self, _thresholds: BlockThresholds) { todo!() } + + fn get_merge_into_source_build_siphashkeys_with_id( + &self, + _id: usize, + ) -> Option { + todo!() + } + + fn get_merge_into_source_build_bloom_probe_keys(&self, _id: usize) -> Vec { + todo!() + } + + fn set_merge_into_source_build_segments(&self, _segments: MergeIntoSourceBuildSegments) { + todo!() + } + + fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments { + todo!() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 582eb573aae50..7b705e422044e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -25,6 +25,7 @@ use databend_common_catalog::catalog::Catalog; use databend_common_catalog::cluster_info::Cluster; use databend_common_catalog::database::Database; use databend_common_catalog::merge_into_join::MergeIntoJoin; +use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; @@ -374,6 +375,28 @@ impl TableContext for CtxDelegation { self } + fn get_merge_into_source_build_bloom_probe_keys(&self, _: usize) -> Vec { + todo!() + } + + fn set_merge_into_source_build_segments(&self, _: Arc>) { + todo!() + } + + fn get_merge_into_source_build_siphashkeys_with_id( + &self, + _: usize, + ) -> std::option::Option<( + Vec, + Arc>>>, + )> { + todo!() + } + + fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments { + todo!() + } + fn build_table_from_source_plan(&self, _plan: &DataSourcePlan) -> Result> { todo!() } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 87a5bb65322f8..f84371aef476f 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -504,6 +504,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_merge_into_source_build_bloom", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enable merge into source build bloom.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("enable_distributed_merge_into", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enables distributed execution for 'MERGE INTO'.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 4457cff462cc3..912bd73cd5768 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ 
b/src/query/settings/src/settings_getter_setter.rs @@ -451,6 +451,10 @@ impl Settings { Ok(self.try_get_u64("enable_experimental_merge_into")? != 0) } + pub fn get_enable_merge_into_source_build_bloom(&self) -> Result { + Ok(self.try_get_u64("enable_merge_into_source_build_bloom")? != 0) + } + pub fn get_enable_distributed_merge_into(&self) -> Result { Ok(self.try_get_u64("enable_distributed_merge_into")? != 0) } diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 20de7fa66000c..c72e13bf904b3 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -471,6 +471,8 @@ impl Binder { row_id_index: column_binding.index, split_idx, can_try_update_column_only: self.can_try_update_column_only(&matched_clauses), + table_schema, + can_merge_into_source_build_bloom: false, }) } diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index b98826931e191..8c3895527a0a8 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -313,6 +313,11 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { table_entry.name(), ); + let can_merge_into_source_build_bloom_format = FormatTreeNode::new(format!( + "can_merge_into_source_build_bloom: {}", + merge_into.can_merge_into_source_build_bloom + )); + let target_build_optimization = matches!(merge_into.merge_type, MergeIntoType::FullOperation) && !merge_into.columns_set.contains(&merge_into.row_id_index); let target_build_optimization_format = FormatTreeNode::new(format!( @@ -390,6 +395,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { vec![distributed_format], vec![target_build_optimization_format], vec![can_try_update_column_only_format], + vec![can_merge_into_source_build_bloom_format], matched_children, unmatched_children, vec![input_format_child], diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 7ebba409909a5..d3b7d46f82fd4 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use databend_common_ast::ast::ExplainKind; +use databend_common_base::runtime::block_on; use databend_common_catalog::merge_into_join::MergeIntoJoin; use databend_common_catalog::merge_into_join::MergeIntoJoinType; use databend_common_catalog::table_context::TableContext; @@ -397,35 +398,47 @@ async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box) -> false }; - // we just support left join to use MergeIntoBlockInfoHashTable, we - // don't support spill for now, and we need the macthed clauses' count - // is one, just support `merge into t using source when matched then - // update xx when not matched then insert xx`. - let flag = plan.matched_evaluators.len() == 1 - && plan.matched_evaluators[0].condition.is_none() - && plan.matched_evaluators[0].update.is_some() - && !opt_ctx - .table_ctx - .get_settings() - .get_enable_distributed_merge_into()?; - let mut new_columns_set = plan.columns_set.clone(); - if change_join_order + // try add source_build bloom filter + let mut enable_merge_into_source_build_bloom = false; + if !change_join_order && matches!(plan.merge_type, MergeIntoType::FullOperation) && opt_ctx .table_ctx .get_settings() - .get_join_spilling_memory_ratio()? 
- == 0 - && flag + .get_enable_merge_into_source_build_bloom()? { - new_columns_set.remove(&plan.row_id_index); + let merge_into_join = opt_ctx.table_ctx.get_merge_into_join(); + // this is the first time it is set, so it must be None; we will set it again in `interpreter_merge_into` + assert!(merge_into_join.table.is_none()); + let table = block_on(async { + opt_ctx + .table_ctx + .get_table( + plan.catalog.as_str(), + plan.database.as_str(), + plan.table.as_str(), + ) + .await + })?; + opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin { - merge_into_join_type: MergeIntoJoinType::Left, - is_distributed: false, target_tbl_idx: plan.target_table_idx, - }) + is_distributed: false, // it will be updated after the later distributed optimization. + merge_into_join_type: MergeIntoJoinType::Right, + table: Some(table), + }); + enable_merge_into_source_build_bloom = true; } + // we just support left join to use MergeIntoBlockInfoHashTable, we + // don't support spill for now, and we need the matched clauses' count + // to be one, i.e. we just support `merge into t using source when matched then + // update xx when not matched then insert xx`. + let flag = plan.matched_evaluators.len() == 1 + && plan.matched_evaluators[0].condition.is_none() + && plan.matched_evaluators[0].update.is_some(); + let mut new_columns_set = plan.columns_set.clone(); + // try to optimize distributed join, only if // - distributed optimization is enabled // - no local table scan @@ -485,24 +498,65 @@ // III. (merge_into_join_sexpr.clone(), false) }; + // try to add the target build optimization + if change_join_order + && matches!(plan.merge_type, MergeIntoType::FullOperation) + && opt_ctx + .table_ctx + .get_settings() + .get_join_spilling_memory_ratio()? + == 0 + && flag + { + new_columns_set.remove(&plan.row_id_index); + let merge_into_join = opt_ctx.table_ctx.get_merge_into_join(); + opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin { + merge_into_join_type: MergeIntoJoinType::Left, + is_distributed: distributed, + target_tbl_idx: plan.target_table_idx, + table: merge_into_join.table.clone(), + }) + } Ok(Plan::MergeInto(Box::new(MergeInto { input: Box::new(optimized_distributed_merge_into_join_sexpr), distributed, change_join_order, columns_set: new_columns_set.clone(), + can_merge_into_source_build_bloom: enable_merge_into_source_build_bloom, ..*plan }))) } else { + // try to add the target build optimization + if change_join_order + && matches!(plan.merge_type, MergeIntoType::FullOperation) + && opt_ctx + .table_ctx + .get_settings() + .get_join_spilling_memory_ratio()? + == 0 + && flag + { + new_columns_set.remove(&plan.row_id_index); + let merge_into_join = opt_ctx.table_ctx.get_merge_into_join(); + opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin { + merge_into_join_type: MergeIntoJoinType::Left, + is_distributed: false, + target_tbl_idx: plan.target_table_idx, + table: merge_into_join.table.clone(), + }) + } Ok(Plan::MergeInto(Box::new(MergeInto { input: join_sexpr, change_join_order, columns_set: new_columns_set, + can_merge_into_source_build_bloom: enable_merge_into_source_build_bloom, ..*plan }))) } } +// Todo(JackTan25): This method is unused for now; it will be used for the distributed target build optimization in the future. 
fn try_to_change_as_broadcast_join( merge_into_join_sexpr: SExpr, _change_join_order: bool, @@ -515,6 +569,7 @@ fn try_to_change_as_broadcast_join( let right_exchange = merge_into_join_sexpr.child(0)?.child(1)?; if let RelOperator::Exchange(Exchange::Broadcast) = right_exchange.plan.as_ref() { let join: Join = merge_into_join_sexpr.child(0)?.plan().clone().try_into()?; + let join_s_expr = merge_into_join_sexpr .child(0)? .replace_plan(Arc::new(RelOperator::Join(join))); diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index 8d6dd7d1ef625..42a788630baea 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::sync::Arc; use databend_common_ast::ast::TableAlias; use databend_common_exception::ErrorCode; @@ -24,6 +25,7 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::FieldIndex; +use databend_common_expression::TableSchema; use databend_common_meta_types::MetaId; use crate::binder::MergeIntoType; @@ -76,6 +78,8 @@ pub struct MergeInto { // `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column, // we don't support complex expressions. pub can_try_update_column_only: bool, + pub can_merge_into_source_build_bloom: bool, + pub table_schema: Arc, } impl std::fmt::Debug for MergeInto { diff --git a/src/query/storages/common/index/src/lib.rs b/src/query/storages/common/index/src/lib.rs index 3d01eed8e79d5..db26f2318c395 100644 --- a/src/query/storages/common/index/src/lib.rs +++ b/src/query/storages/common/index/src/lib.rs @@ -25,6 +25,7 @@ mod range_index; pub use bloom_index::BloomIndex; pub use bloom_index::BloomIndexMeta; pub use bloom_index::FilterEvalResult; +pub use filters::Xor8Filter; pub use index::Index; pub use inverted_index::InvertedIndexDirectory; pub use page_index::PageIndex; diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 0ce366d706be3..ff5744e1f68aa 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -43,6 +43,7 @@ pub use delete::MutationBlockPruningContext; pub use merge_into::*; pub use mutation::*; pub use read::build_row_fetcher_pipeline; +pub use read::can_merge_into_target_build_bloom_filter; pub use read::need_reserve_block_info; pub use replace_into::*; pub use util::acquire_task_permit; diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 814e62378c826..4387915954ada 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -150,7 +150,6 @@ pub fn build_fuse_parquet_source_pipeline( adjust_threads_and_request(false, max_threads, max_io_requests, plan); let mut source_builder = SourcePipeBuilder::create(); - match block_reader.support_blocking_api() { true => { let partitions = dispatch_partitions(ctx.clone(), plan, max_threads); diff --git a/src/query/storages/fuse/src/operations/read/mod.rs b/src/query/storages/fuse/src/operations/read/mod.rs index be309849c7fb3..16ccca3cab0c0 100644 --- a/src/query/storages/fuse/src/operations/read/mod.rs +++ b/src/query/storages/fuse/src/operations/read/mod.rs @@ -32,4 +32,5 @@ pub use 
native_data_source_deserializer::NativeDeserializeDataTransform; pub use native_data_source_reader::ReadNativeDataSource; pub use parquet_data_source_deserializer::DeserializeDataTransform; pub use parquet_data_source_reader::ReadParquetDataSource; +pub use util::can_merge_into_target_build_bloom_filter; pub use util::need_reserve_block_info; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs index 372580b97415a..de089b2eb3fbb 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs @@ -33,6 +33,8 @@ use databend_common_sql::IndexType; use log::debug; use super::native_data_source::NativeDataSource; +use super::util::build_merge_into_source_build_bloom_info; +use super::util::MergeIntoSourceBuildBloomInfo; use crate::io::AggIndexReader; use crate::io::BlockReader; use crate::io::TableMetaLocationGenerator; @@ -57,6 +59,7 @@ pub struct ReadNativeDataSource { table_schema: Arc, table_index: IndexType, + merge_into_source_build_bloom_info: MergeIntoSourceBuildBloomInfo, } impl ReadNativeDataSource { @@ -73,6 +76,7 @@ impl ReadNativeDataSource { ) -> Result { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; let func_ctx = ctx.get_function_context()?; + let merge_into_join = ctx.get_merge_into_join(); SyncSourcer::create(ctx.clone(), output.clone(), ReadNativeDataSource:: { func_ctx, id, @@ -86,6 +90,11 @@ impl ReadNativeDataSource { virtual_reader, table_schema, table_index, + merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info( + ctx, + table_index, + merge_into_join, + )?, }) } } @@ -104,6 +113,7 @@ impl ReadNativeDataSource { ) -> Result { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; let func_ctx = ctx.get_function_context()?; + let merge_into_join = ctx.get_merge_into_join(); Ok(ProcessorPtr::create(Box::new(ReadNativeDataSource::< false, > { @@ -119,6 +129,11 @@ impl ReadNativeDataSource { virtual_reader, table_schema, table_index, + merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info( + ctx, + table_index, + merge_into_join, + )?, }))) } } @@ -144,6 +159,11 @@ impl SyncSource for ReadNativeDataSource { &part, &filters, &self.func_ctx, + self.merge_into_source_build_bloom_info + .can_do_merge_into_runtime_filter_bloom, + self.partitions.ctx.clone(), + self.table_index, + &mut self.merge_into_source_build_bloom_info, )? { return Ok(Some(DataBlock::empty())); } @@ -251,6 +271,11 @@ impl Processor for ReadNativeDataSource { &part, &filters, &self.func_ctx, + self.merge_into_source_build_bloom_info + .can_do_merge_into_runtime_filter_bloom, + self.partitions.ctx.clone(), + self.table_index, + &mut self.merge_into_source_build_bloom_info, )? 
{ continue; } diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs index 96f39ece964b0..5bd8122f069ef 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs @@ -33,6 +33,8 @@ use databend_common_sql::IndexType; use log::debug; use super::parquet_data_source::ParquetDataSource; +use super::util::build_merge_into_source_build_bloom_info; +use super::util::MergeIntoSourceBuildBloomInfo; use crate::fuse_part::FuseBlockPartInfo; use crate::io::AggIndexReader; use crate::io::BlockReader; @@ -45,6 +47,7 @@ use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; pub struct ReadParquetDataSource { func_ctx: FunctionContext, id: usize, + // build table index, not probe side table index table_index: IndexType, finished: bool, batch_size: usize, @@ -58,6 +61,8 @@ pub struct ReadParquetDataSource { virtual_reader: Arc>, table_schema: Arc, + + merge_into_source_build_bloom_info: MergeIntoSourceBuildBloomInfo, } impl ReadParquetDataSource { @@ -76,6 +81,7 @@ impl ReadParquetDataSource { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; let func_ctx = ctx.get_function_context()?; if BLOCKING_IO { + let merge_into_join = ctx.get_merge_into_join(); SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource:: { func_ctx, id, @@ -89,8 +95,14 @@ impl ReadParquetDataSource { index_reader, virtual_reader, table_schema, + merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info( + ctx, + table_index, + merge_into_join, + )?, }) } else { + let merge_into_join = ctx.get_merge_into_join(); Ok(ProcessorPtr::create(Box::new(ReadParquetDataSource::< false, > { @@ -106,6 +118,11 @@ impl ReadParquetDataSource { index_reader, virtual_reader, table_schema, + merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info( + ctx, + table_index, + merge_into_join, + )?, }))) } } @@ -132,6 +149,11 @@ impl SyncSource for ReadParquetDataSource { &part, &filters, &self.func_ctx, + self.merge_into_source_build_bloom_info + .can_do_merge_into_runtime_filter_bloom, + self.partitions.ctx.clone(), + self.table_index, + &mut self.merge_into_source_build_bloom_info, )? { return Ok(Some(DataBlock::empty())); } @@ -248,6 +270,11 @@ impl Processor for ReadParquetDataSource { &part, &filters, &self.func_ctx, + self.merge_into_source_build_bloom_info + .can_do_merge_into_runtime_filter_bloom, + self.partitions.ctx.clone(), + self.table_index, + &mut self.merge_into_source_build_bloom_info, )? { continue; } diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs index 178c95da84c8b..5f837e61ed673 100644 --- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs +++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::NumberColumn; use databend_common_expression::Column; @@ -35,20 +39,32 @@ use databend_common_expression::Scalar; use databend_common_expression::TableSchema; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_hashtable::FastHash; +use databend_common_sql::executor::physical_plans::OnConflictField; +use databend_storages_common_cache::LoadParams; use databend_storages_common_index::statistics_to_domain; +use databend_storages_common_table_meta::meta::SegmentInfo; use log::info; use xorf::BinaryFuse16; use xorf::Filter; +use super::util::MergeIntoSourceBuildBloomInfo; +use crate::io::MetaReaders; +use crate::operations::load_bloom_filter; +use crate::operations::try_prune_use_bloom_filter; use crate::FuseBlockPartInfo; +use crate::FuseTable; pub fn runtime_filter_pruner( table_schema: Arc<TableSchema>, part: &PartInfoPtr, filters: &[Expr<String>], func_ctx: &FunctionContext, + can_do_merge_into_target_build_bloom_filter: bool, + ctx: Arc<dyn TableContext>, + id: usize, + merge_into_source_build_bloom_info: &mut MergeIntoSourceBuildBloomInfo, ) -> Result<bool> { - if filters.is_empty() { + if filters.is_empty() && !can_do_merge_into_target_build_bloom_filter { return Ok(false); } let part = FuseBlockPartInfo::from_part(part)?; @@ -99,7 +115,140 @@ pub fn runtime_filter_pruner( return Ok(true); } - Ok(false) + // if we can't prune this block, we can try to get the siphash keys if this is a merge into source build. + // for every probe key expr, if it's a ColumnRef we can get the build column hash keys, but if not, + // we can't. so even if this bloom filter is enabled, we probably can't apply the bloom pruning in fact. 
+ if can_do_merge_into_target_build_bloom_filter + && ctx + .get_merge_into_source_build_siphashkeys_with_id(id) + .is_some_and(|hash_keys| !hash_keys.0.is_empty()) + { + let pruned = try_prune_merge_into_target_table( + ctx.clone(), + part, + merge_into_source_build_bloom_info, + id, + )?; + if pruned { + Profile::record_usize_profile( + ProfileStatisticsName::RuntimeFilterMergeIntoSourceBuildBloomPruneParts, + 1, + ); + } + Ok(pruned) + } else { + Ok(false) + } +} + +pub(crate) fn try_prune_merge_into_target_table( + ctx: Arc, + part: &FuseBlockPartInfo, + merge_into_source_build_bloom_info: &mut MergeIntoSourceBuildBloomInfo, + id: usize, +) -> Result { + assert!(part.block_meta_index().is_some()); + let block_meta_index = part.block_meta_index().unwrap(); + + let segment_idx = block_meta_index.segment_idx; + let block_idx = block_meta_index.block_idx; + let target_table_segments = ctx.get_merge_into_source_build_segments(); + + let table = merge_into_source_build_bloom_info.table.as_ref().unwrap(); + let fuse_table = table.as_any().downcast_ref::().ok_or_else(|| { + ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support MERGE INTO", + table.name(), + table.get_table_info().engine(), + )) + })?; + if let Entry::Vacant(e) = merge_into_source_build_bloom_info + .segment_infos + .entry(segment_idx) + { + let (path, ver) = target_table_segments.get(segment_idx).ok_or_else(|| { + ErrorCode::Internal(format!( + "unexpected, segment (idx {}) not found, during do merge into source build bloom filter", + segment_idx + )) + })?; + + let load_param = LoadParams { + location: path.clone(), + len_hint: None, + ver: *ver, + put_cache: true, + }; + let target_table_schema = table.schema_with_stream(); + let data_accessor = fuse_table.get_operator(); + let segment_reader = + MetaReaders::segment_info_reader(data_accessor.clone(), target_table_schema.clone()); + let compact_segment_info = GlobalIORuntime::instance() + .block_on(async move { segment_reader.read(&load_param).await })?; + let segment_info: SegmentInfo = compact_segment_info.try_into()?; + e.insert(segment_info); + } + // load bloom filter + let segment_info = merge_into_source_build_bloom_info + .segment_infos + .get(&segment_idx) + .unwrap(); + assert!(block_idx < segment_info.blocks.len()); + info!( + "merge into source build runtime bloom filter: segment_idx:{},blk_idx:{}", + segment_idx, block_idx + ); + let block_meta = segment_info.blocks[block_idx].clone(); + let bloom_filter_index_size = block_meta.bloom_filter_index_size; + let index_location = block_meta.bloom_filter_index_location.clone(); + if let Some(index_location) = index_location { + // init bloom info. 
+ if !merge_into_source_build_bloom_info.init_bloom_index_info { + merge_into_source_build_bloom_info.init_bloom_index_info = true; + let merge_into_join = ctx.get_merge_into_join(); + let (bloom_indexes, bloom_fields) = + ctx.get_merge_into_source_build_bloom_probe_keys(merge_into_source_build_bloom_info.target_table_index) + .iter() + .try_fold((Vec::new(),Vec::new()), |mut acc, probe_key_name| { + let table_schema = merge_into_join.table.as_ref().ok_or_else(|| { + ErrorCode::Internal( + "can't get merge into target table schema when build bloom info, it's a bug", + ) + })?.schema(); + let index = table_schema.index_of(probe_key_name)?; + acc.0.push(acc.0.len()); + acc.1.push(OnConflictField { table_field: table_schema.field(index).clone(), field_index: index }); + Ok::<_, ErrorCode>(acc) + })?; + assert_eq!(bloom_fields.len(), bloom_indexes.len()); + merge_into_source_build_bloom_info.bloom_fields = bloom_fields; + merge_into_source_build_bloom_info.bloom_indexes = bloom_indexes; + } + + let bloom_fields = merge_into_source_build_bloom_info.bloom_fields.clone(); + let bloom_indexes = merge_into_source_build_bloom_info.bloom_indexes.clone(); + let operator = fuse_table.get_operator(); + + let filters = GlobalIORuntime::instance().block_on(async move { + load_bloom_filter( + operator, + &bloom_fields, + &index_location, + bloom_filter_index_size, + &bloom_indexes, + ) + .await + }); + Ok(try_prune_use_bloom_filter( + filters, + &ctx.get_merge_into_source_build_siphashkeys_with_id(id) + .unwrap() + .1 + .read(), + )) + } else { + Ok(false) + } } pub(crate) fn update_bitmap_with_bloom_filter( diff --git a/src/query/storages/fuse/src/operations/read/util.rs b/src/query/storages/fuse/src/operations/read/util.rs index e93dfe503c82d..132a4dbeeae27 100644 --- a/src/query/storages/fuse/src/operations/read/util.rs +++ b/src/query/storages/fuse/src/operations/read/util.rs @@ -12,31 +12,68 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; use std::sync::Arc; +use databend_common_catalog::merge_into_join::MergeIntoJoin; use databend_common_catalog::merge_into_join::MergeIntoJoinType; use databend_common_catalog::plan::gen_mutation_stream_meta; use databend_common_catalog::plan::InternalColumnMeta; +use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoPtr; use databend_common_expression::DataBlock; +use databend_common_expression::FieldIndex; use databend_common_expression::Scalar; +use databend_common_sql::executor::physical_plans::OnConflictField; +use databend_storages_common_table_meta::meta::SegmentInfo; use crate::operations::BlockMetaIndex; +use crate::operations::SegmentIndex; use crate::FuseBlockPartInfo; +pub struct MergeIntoSourceBuildBloomInfo { + pub can_do_merge_into_runtime_filter_bloom: bool, + pub segment_infos: HashMap<SegmentIndex, SegmentInfo>, + pub table: Option<Arc<dyn Table>>, + pub bloom_indexes: Vec<FieldIndex>, + pub bloom_fields: Vec<OnConflictField>, + pub init_bloom_index_info: bool, + pub target_table_index: usize, +} + pub fn need_reserve_block_info(ctx: Arc<dyn TableContext>, table_idx: usize) -> (bool, bool) { let merge_into_join = ctx.get_merge_into_join(); ( matches!( merge_into_join.merge_into_join_type, MergeIntoJoinType::Left - ) && merge_into_join.target_tbl_idx == table_idx, + ) && merge_into_join.target_tbl_idx == table_idx + && !merge_into_join.is_distributed, /* we don't support distributed mode for the target build optimization for now */ merge_into_join.is_distributed, ) } +pub fn can_merge_into_target_build_bloom_filter( + ctx: Arc<dyn TableContext>, + table_idx: usize, +) -> Result<bool> { + let merge_into_join = ctx.get_merge_into_join(); + let enabled = matches!( + merge_into_join.merge_into_join_type, + MergeIntoJoinType::Right + ) && merge_into_join.target_tbl_idx == table_idx; + if enabled { + assert!( + ctx.get_settings() + .get_enable_merge_into_source_build_bloom()? + ); + assert!(merge_into_join.table.is_some()); + } + Ok(enabled) +} + pub(crate) fn add_data_block_meta( block: DataBlock, fuse_part: &FuseBlockPartInfo, @@ -82,3 +119,22 @@ pub(crate) fn add_data_block_meta( } block.add_meta(meta) } + +pub fn build_merge_into_source_build_bloom_info( + ctx: Arc<dyn TableContext>, + table_index: usize, + merge_into_join: MergeIntoJoin, +) -> Result<MergeIntoSourceBuildBloomInfo> { + let enabled_bloom_filter = can_merge_into_target_build_bloom_filter(ctx.clone(), table_index)?; + + Ok(MergeIntoSourceBuildBloomInfo { + can_do_merge_into_runtime_filter_bloom: enabled_bloom_filter, + segment_infos: Default::default(), + table: merge_into_join.table.clone(), + // bloom_indexes and bloom_fields are filled in lazily while the pipeline is running. 
+ bloom_indexes: vec![], + bloom_fields: vec![], + init_bloom_index_info: false, + target_table_index: table_index, + }) +} diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs index 8d428c9318bd2..c5d114719ec20 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs @@ -673,7 +673,7 @@ impl AggregationContext { } // return true if the block is pruned, otherwise false - async fn apply_bloom_pruning( + pub async fn apply_bloom_pruning( &self, block_meta: &BlockMeta, input_hashes: &[Vec], @@ -683,107 +683,116 @@ impl AggregationContext { return false; } if let Some(loc) = &block_meta.bloom_filter_index_location { - match self - .load_bloom_filter( - loc, - block_meta.bloom_filter_index_size, - bloom_on_conflict_field_index, - ) - .await - { - Ok(filters) => { - // the caller ensures that the input_hashes is not empty - let row_count = input_hashes[0].len(); - - // let assume that the target block is prunable - let mut block_pruned = true; - for row in 0..row_count { - // for each row, by default, assume that columns of this row do have conflict with the target block. - let mut row_not_prunable = true; - for (col_idx, col_hash) in input_hashes.iter().enumerate() { - // For each column of current row, check if the corresponding bloom - // filter contains the digest of the column. - // - // Any one of the columns NOT contains by the corresponding bloom filter, - // indicates that the row is prunable(thus, we do not stop on the first column that - // the bloom filter contains). - - // - if bloom filter presents, check if the column is contained - // - if bloom filter absents, do nothing(since by default, we assume that the row is not-prunable) - if let Some(col_filter) = &filters[col_idx] { - let hash = col_hash[row]; - if hash == 0 || !col_filter.contains_digest(hash) { - // - hash == 0 indicates that the column value is null, which equals nothing. - // - NOT `contains_digest`, indicates that this column of row does not match - row_not_prunable = false; - // if one column not match, we do not need to check other columns - break; - } - } - } - if row_not_prunable { - // any row not prunable indicates that the target block is not prunable - block_pruned = false; + let filters = load_bloom_filter( + self.data_accessor.clone(), + &self.on_conflict_fields, + loc, + block_meta.bloom_filter_index_size, + bloom_on_conflict_field_index, + ) + .await; + try_prune_use_bloom_filter(filters, input_hashes) + } else { + // no bloom filter, no pruning + false + } + } +} + +pub fn try_prune_use_bloom_filter( + filters: Result>>>, + input_hashes: &[Vec], +) -> bool { + match filters { + Ok(filters) => { + // the caller ensures that the input_hashes is not empty + let row_count = input_hashes[0].len(); + + // let assume that the target block is prunable + let mut block_pruned = true; + for row in 0..row_count { + // for each row, by default, assume that columns of this row do have conflict with the target block. + let mut row_not_prunable = true; + for (col_idx, col_hash) in input_hashes.iter().enumerate() { + // For each column of current row, check if the corresponding bloom + // filter contains the digest of the column. 
+ // + // Any one of the columns NOT contains by the corresponding bloom filter, + // indicates that the row is prunable(thus, we do not stop on the first column that + // the bloom filter contains). + + // - if bloom filter presents, check if the column is contained + // - if bloom filter absents, do nothing(since by default, we assume that the row is not-prunable) + if let Some(col_filter) = &filters[col_idx] { + let hash = col_hash[row]; + if hash == 0 || !col_filter.contains_digest(hash) { + // - hash == 0 indicates that the column value is null, which equals nothing. + // - NOT `contains_digest`, indicates that this column of row does not match + row_not_prunable = false; + // if one column not match, we do not need to check other columns break; } } - block_pruned } - Err(e) => { - // broken index should not stop us: - warn!("failed to build bloom index column name: {}", e); - // failed to load bloom filter, do not prune - false + if row_not_prunable { + // any row not prunable indicates that the target block is not prunable + block_pruned = false; + break; } } - } else { - // no bloom filter, no pruning + block_pruned + } + Err(e) => { + // broken index should not stop us: + warn!("failed to build bloom index column name: {}", e); + // failed to load bloom filter, do not prune false } } +} - async fn load_bloom_filter( - &self, - location: &Location, - index_len: u64, - bloom_on_conflict_field_index: &[FieldIndex], - ) -> Result>>> { - // different block may have different version of bloom filter index - let mut col_names = Vec::with_capacity(bloom_on_conflict_field_index.len()); - - for idx in bloom_on_conflict_field_index { - let bloom_column_name = BloomIndex::build_filter_column_name( - location.1, - &self.on_conflict_fields[*idx].table_field, - )?; - col_names.push(bloom_column_name); - } +pub async fn load_bloom_filter( + data_accessor: Operator, + on_conflict_fields: &[OnConflictField], + location: &Location, + index_len: u64, + bloom_on_conflict_field_index: &[FieldIndex], +) -> Result>>> { + // different block may have different version of bloom filter index + let mut col_names = Vec::with_capacity(bloom_on_conflict_field_index.len()); + + for idx in bloom_on_conflict_field_index { + let bloom_column_name = BloomIndex::build_filter_column_name( + location.1, + &on_conflict_fields[*idx].table_field, + )?; + col_names.push(bloom_column_name); + } - // using load_bloom_filter_by_columns is attractive, - // but it do not care about the version of the bloom filter index - let block_filter = location - .read_block_filter(self.data_accessor.clone(), &col_names, index_len) - .await?; + // using load_bloom_filter_by_columns is attractive, + // but it do not care about the version of the bloom filter index + let block_filter = location + .read_block_filter(data_accessor.clone(), &col_names, index_len) + .await?; - // reorder the filter according to the order of bloom_on_conflict_field - let mut filters = Vec::with_capacity(bloom_on_conflict_field_index.len()); - for filter_col_name in &col_names { - match block_filter.filter_schema.index_of(filter_col_name) { - Ok(idx) => { - filters.push(Some(block_filter.filters[idx].clone())); - } - Err(_) => { - info!( - "bloom filter column {} not found for block {}", - filter_col_name, location.0 - ); - filters.push(None); - } + // reorder the filter according to the order of bloom_on_conflict_field + let mut filters = Vec::with_capacity(bloom_on_conflict_field_index.len()); + for filter_col_name in &col_names { + match 
block_filter.filter_schema.index_of(filter_col_name) { + Ok(idx) => { + filters.push(Some(block_filter.filters[idx].clone())); + } + Err(_) => { + info!( + "bloom filter column {} not found for block {}", + filter_col_name, location.0 + ); + filters.push(None); } } - - Ok(filters) } + + Ok(filters) } #[cfg(test)] diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs index 1763ea1fb74ba..06b99c7d760df 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs @@ -20,5 +20,7 @@ mod mutator_replace_into; pub use column_hash::row_hash_of_columns; pub use deletion_accumulator::BlockDeletionKeys; pub use deletion_accumulator::DeletionAccumulator; +pub use merge_into_mutator::load_bloom_filter; +pub use merge_into_mutator::try_prune_use_bloom_filter; pub use merge_into_mutator::MergeIntoOperationAggregator; pub use mutator_replace_into::ReplaceIntoMutator; diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test index 9a94c8c0ddc23..e1fcede45ac84 100644 --- a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test @@ -61,6 +61,7 @@ target_table: default.default.t1 ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = a (#0)] ├── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] └── Join(Right) @@ -140,6 +141,7 @@ target_table: default.default.t1 ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = a (#0)] ├── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] └── Exchange(Merge) diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_source_build_bloom.test b/tests/sqllogictests/suites/mode/cluster/merge_into_source_build_bloom.test new file mode 100644 index 0000000000000..7b73ce783246e --- /dev/null +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_source_build_bloom.test @@ -0,0 +1,308 @@ +statement ok +set enable_merge_into_source_build_bloom = 1; + +statement ok +set enable_distributed_merge_into = 1; + +statement ok +set join_spilling_memory_ratio = 0; + +statement ok +set enable_experimental_merge_into = 1; + +statement ok +drop table if exists target_bloom_table; + +statement ok +drop table if exists source_bloom_table; + +statement ok +create table target_bloom_table(a string,b int); + +statement ok +create table source_bloom_table(a string,b int); + +statement ok +insert into source_bloom_table values('abc',2),('def',1); + +statement ok +insert into source_bloom_table values('ff',18),('kgt',27); + +query T +select count(*) from fuse_block('default','source_bloom_table'); +---- +2 + +statement ok +insert into target_bloom_table values('ak7',992),('def',213); + +statement ok +insert into target_bloom_table values('abc12',22),('mkgt',73); + +statement ok +insert into target_bloom_table values('ab77c',93),('dqef',107); + +statement ok +insert into target_bloom_table values('falf',189),('krrgt',207); + +query T +select 
count(*) from fuse_block('default','target_bloom_table'); +---- +4 + +query T +explain merge into target_bloom_table as t1 using source_bloom_table as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +MergeInto: +target_table: default.default.target_bloom_table +├── distributed: true +├── target_build_optimization: false +├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: true +├── matched update: [condition: None,update set a = a (#0),b = b (#1)] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS String NULL),CAST(b (#1) AS Int32 NULL))] +└── Exchange(Merge) + └── Join(Right) + ├── build keys: [t2.a (#0)] + ├── probe keys: [t1.a (#2)] + ├── other filters: [] + ├── Scan + │ ├── table: default.target_bloom_table + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── Exchange(Broadcast) + └── AddRowNumber(AddRowNumber) + └── Scan + ├── table: default.source_bloom_table + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query TT +merge into target_bloom_table as t1 using source_bloom_table as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +3 1 + +query TT +select * from target_bloom_table order by a,b; +---- +ab77c 93 +abc 2 +abc12 22 +ak7 992 +def 1 +dqef 107 +falf 189 +ff 18 +kgt 27 +krrgt 207 +mkgt 73 + + +statement ok +drop table if exists target_bloom_reorder; + +statement ok +drop table if exists source_bloom_reorder; + +statement ok +create table target_bloom_reorder(a int,b string); + +statement ok +create table source_bloom_reorder(a int,b string); + +statement ok +insert into target_bloom_reorder values(1,'ab'),(2,'cd'); + +statement ok +insert into target_bloom_reorder values(1,'gh'),(2,'ij'); + +statement ok +insert into target_bloom_reorder values(3,'abc'); + +statement ok +insert into source_bloom_reorder values(4,'abc'),(5,'ffff'); + +query T +select count(*) from fuse_block('default','target_bloom_reorder'); +---- +3 + +query T +select count(*) from fuse_block('default','source_bloom_reorder'); +---- +1 + +query T +explain merge into target_bloom_reorder as t1 using source_bloom_reorder as t2 on t1.b = t2.b when matched then update * when not matched then insert *; +---- +MergeInto: +target_table: default.default.target_bloom_reorder +├── distributed: true +├── target_build_optimization: false +├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: true +├── matched update: [condition: None,update set a = a (#0),b = b (#1)] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] +└── Exchange(Merge) + └── Join(Right) + ├── build keys: [t2.b (#1)] + ├── probe keys: [t1.b (#3)] + ├── other filters: [] + ├── Scan + │ ├── table: default.target_bloom_reorder + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── Exchange(Broadcast) + └── AddRowNumber(AddRowNumber) + └── Scan + ├── table: default.source_bloom_reorder + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query T +merge into target_bloom_reorder as t1 using source_bloom_reorder as t2 on t1.b = t2.b when matched then update * when not matched then insert *; +---- +1 1 + +query TT +select b,a from target_bloom_reorder order by b,a; +---- +ab 1 +abc 4 +cd 2 +ffff 5 +gh 1 +ij 2 + +### test not null column, we can't prune now, but the sql run until to be finished. 
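The cases above (and the not-null case that follows) exercise the block-pruning rule this PR factors out into `try_prune_use_bloom_filter`: a target block can be skipped only when every source-build row has at least one join-key column whose digest is definitely absent from the block's bloom index, and a zero digest stands for NULL, which can never match. A minimal sketch of that rule, using `HashSet<u64>` as a stand-in for the real xor filter (the names `ColumnFilter` and `block_is_prunable` are illustrative, not the actual API):

```rust
use std::collections::HashSet;

/// Stand-in for one column's bloom/xor filter; `None` means the block's
/// index does not cover this column, so it can never rule a row out.
type ColumnFilter = Option<HashSet<u64>>;

/// Returns true if the block can be skipped: every source row must have at
/// least one join-key column whose digest is definitely absent from the
/// block (a digest of 0 stands for NULL, which can never match).
fn block_is_prunable(filters: &[ColumnFilter], input_hashes: &[Vec<u64>]) -> bool {
    let row_count = input_hashes.first().map_or(0, |col| col.len());
    (0..row_count).all(|row| {
        input_hashes.iter().enumerate().any(|(col_idx, col_hashes)| {
            match &filters[col_idx] {
                // no filter for this column: we cannot prove a mismatch
                None => false,
                Some(filter) => {
                    let digest = col_hashes[row];
                    digest == 0 || !filter.contains(&digest)
                }
            }
        })
    })
}

fn main() {
    // one join-key column; the block's filter only "contains" digests 11 and 12
    let filters = vec![Some(HashSet::from([11u64, 12u64]))];
    // two source rows with digests 99 and 12
    let hashes = vec![vec![99u64, 12u64]];
    // row 0 (99) proves a mismatch, but row 1 (12) may match, so keep the block
    assert!(!block_is_prunable(&filters, &hashes));
}
```

With this rule, a single possibly-matching row is enough to keep a block, which is why the merges above still report matched rows after pruning.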
+statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_200_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_400_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_random ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) engine = random; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 10; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 10; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 10; + +statement ok +insert into lineitem_target_origin_200_blocks1 select * from lineitem_random limit 10; + +statement ok +explain analyze MERGE INTO lineitem_target_origin_400_blocks1 as t1 using lineitem_target_origin_200_blocks1 as t2 on + t1.l_orderkey = t2.l_orderkey and + t1.l_partkey = t2.l_partkey and t1.l_suppkey = t2.l_suppkey and + t1.l_linenumber = t2.l_linenumber and + t1.l_quantity = t2.l_quantity and + t1.l_extendedprice = t2.l_extendedprice and + t1.l_discount = t2.l_discount + when matched then update set + t1.l_orderkey = t2.l_orderkey, + t1.l_partkey = t2.l_partkey, + t1.l_suppkey = t2.l_suppkey, + t1.l_linenumber = t2.l_linenumber, + t1.l_quantity = t2.l_quantity, + t1.l_extendedprice = t2.l_extendedprice, + t1.l_discount = t2.l_discount, + t1.l_tax = t2.l_tax, + t1.l_returnflag = t2.l_returnflag, + t1.l_linestatus = t2.l_linestatus, + t1.l_shipdate = t2.l_shipdate, + t1.l_commitdate = t2.l_commitdate, + t1.l_receiptdate = t2.l_receiptdate, + t1.l_shipinstruct = t2.l_shipinstruct, + t1.l_shipmode = t2.l_shipmode, + t1.l_comment = t2.l_comment + when not matched then insert + values( + t2.l_orderkey, + t2.l_partkey, + t2.l_suppkey, + t2.l_linenumber, + t2.l_quantity, + t2.l_extendedprice, + t2.l_discount, + t2.l_tax, + t2.l_returnflag, + t2.l_linestatus, + t2.l_shipdate, + t2.l_commitdate, + t2.l_receiptdate, + t2.l_shipinstruct, + 
t2.l_shipmode, + t2.l_comment); + +statement ok +set enable_merge_into_source_build_bloom = 0; + +statement ok +set enable_experimental_merge_into = 0; + +statement ok +set join_spilling_memory_ratio = 60; + +statement ok +set enable_distributed_merge_into = 0; \ No newline at end of file diff --git a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test index 3780971e67508..c3a8a207d0681 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test @@ -62,6 +62,7 @@ target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── Join(Left) @@ -124,6 +125,7 @@ target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── Join(Left) @@ -203,6 +205,7 @@ target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── Join(Left) @@ -286,6 +289,7 @@ target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── Join(Left) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index e8098921e116d..b4e2da2fbe06e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -26,6 +26,7 @@ target_table: default.default.salaries2 ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: false +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 
NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] @@ -57,6 +58,7 @@ target_table: default.default.salaries2 ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: false +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] @@ -102,6 +104,7 @@ target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: true ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set b = t2.b (#1)] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── Join(Left) @@ -128,6 +131,7 @@ target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: true ├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set a = a (#0),b = b (#1)] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── Join(Left) @@ -157,6 +161,7 @@ target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: false +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set b = 'test'] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── Join(Left) @@ -183,6 +188,7 @@ target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: false +├── can_merge_into_source_build_bloom: false ├── matched update: [condition: None,update set b = concat(t2.b (#1), 'test')] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── Join(Left) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into_source_build_bloom_filter.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into_source_build_bloom_filter.test new file mode 100644 index 0000000000000..0e1b1a6d70e65 --- /dev/null +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into_source_build_bloom_filter.test @@ -0,0 +1,290 @@ +statement ok +set enable_merge_into_source_build_bloom = 1; + +statement ok +set enable_experimental_merge_into = 1; + +statement ok +drop table if exists target_bloom_table; + +statement ok +drop table if exists source_bloom_table; + +statement ok +create table target_bloom_table(a string,b int); + +statement ok +create table source_bloom_table(a string,b int); + +statement ok +insert into source_bloom_table values('abc',2),('def',1); + +statement ok +insert into source_bloom_table values('ff',18),('kgt',27); + +query T +select count(*) from fuse_block('default','source_bloom_table'); +---- +2 + +statement ok +insert into target_bloom_table values('ak7',992),('def',213); + +statement ok +insert into target_bloom_table values('abc12',22),('mkgt',73); + +statement ok +insert into target_bloom_table 
values('ab77c',93),('dqef',107); + +statement ok +insert into target_bloom_table values('falf',189),('krrgt',207); + +query T +select count(*) from fuse_block('default','target_bloom_table'); +---- +4 + +query T +explain merge into target_bloom_table as t1 using source_bloom_table as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +MergeInto: +target_table: default.default.target_bloom_table +├── distributed: false +├── target_build_optimization: false +├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: true +├── matched update: [condition: None,update set a = a (#0),b = b (#1)] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS String NULL),CAST(b (#1) AS Int32 NULL))] +└── Join(Right) + ├── build keys: [t2.a (#0)] + ├── probe keys: [t1.a (#2)] + ├── other filters: [] + ├── Scan + │ ├── table: default.target_bloom_table + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── Scan + ├── table: default.source_bloom_table + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query TT +merge into target_bloom_table as t1 using source_bloom_table as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +3 1 + +query TT +select * from target_bloom_table order by a,b; +---- +ab77c 93 +abc 2 +abc12 22 +ak7 992 +def 1 +dqef 107 +falf 189 +ff 18 +kgt 27 +krrgt 207 +mkgt 73 + +## test col order +statement ok +drop table if exists target_bloom_reorder; + +statement ok +drop table if exists source_bloom_reorder; + +statement ok +create table target_bloom_reorder(a int,b string); + +statement ok +create table source_bloom_reorder(a int,b string); + +statement ok +insert into target_bloom_reorder values(1,'ab'),(2,'cd'); + +statement ok +insert into target_bloom_reorder values(1,'gh'),(2,'ij'); + +statement ok +insert into target_bloom_reorder values(3,'abc'); + +statement ok +insert into source_bloom_reorder values(4,'abc'),(5,'ffff'); + +query T +select count(*) from fuse_block('default','target_bloom_reorder'); +---- +3 + +query T +select count(*) from fuse_block('default','source_bloom_reorder'); +---- +1 + +query T +explain merge into target_bloom_reorder as t1 using source_bloom_reorder as t2 on t1.b = t2.b when matched then update * when not matched then insert *; +---- +MergeInto: +target_table: default.default.target_bloom_reorder +├── distributed: false +├── target_build_optimization: false +├── can_try_update_column_only: true +├── can_merge_into_source_build_bloom: true +├── matched update: [condition: None,update set a = a (#0),b = b (#1)] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] +└── Join(Right) + ├── build keys: [t2.b (#1)] + ├── probe keys: [t1.b (#3)] + ├── other filters: [] + ├── Scan + │ ├── table: default.target_bloom_reorder + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── Scan + ├── table: default.source_bloom_reorder + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query T +merge into target_bloom_reorder as t1 using source_bloom_reorder as t2 on t1.b = t2.b when matched then update * when not matched then insert *; +---- +1 1 + +query TT +select b,a from target_bloom_reorder order by b,a; +---- +ab 1 +abc 4 +cd 2 +ffff 5 +gh 1 +ij 2 + +### test not null column, we can't prune now, but the sql run until to be finished. 
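The `target_bloom_reorder` case above joins on the second column (`b`), which exercises the reorder step in `load_bloom_filter`: one versioned bloom-column name is built per on-conflict field, the filters are read in a single request, and the result is re-ordered to the caller's field order, with `None` filling in columns the block's index does not cover. A small sketch of just that step, with a plain `HashMap` standing in for the loaded `filter_schema`/`filters` pair (the column names here are illustrative; the real ones come from `BloomIndex::build_filter_column_name`):

```rust
use std::collections::HashMap;

/// Reorder loaded filters to match the requested column order, filling the
/// gaps with `None` when a block's bloom index does not cover a column.
/// `loaded` stands in for the (filter_schema, filters) pair returned by the
/// real index reader, keyed by bloom column name.
fn reorder_filters<T: Clone>(
    requested_col_names: &[String],
    loaded: &HashMap<String, T>,
) -> Vec<Option<T>> {
    requested_col_names
        .iter()
        .map(|name| loaded.get(name).cloned())
        .collect()
}

fn main() {
    // filters as they happened to be stored in the block's index
    let loaded = HashMap::from([
        ("bloom_index_1_b".to_string(), "filter_b"),
        ("bloom_index_1_a".to_string(), "filter_a"),
    ]);
    // caller asks for (a, b, c); column c has no bloom filter in this block
    let requested = vec![
        "bloom_index_1_a".to_string(),
        "bloom_index_1_b".to_string(),
        "bloom_index_1_c".to_string(),
    ];
    let filters = reorder_filters(&requested, &loaded);
    assert_eq!(filters, vec![Some("filter_a"), Some("filter_b"), None]);
}
```

Returning `None` instead of failing keeps a block usable even when only some of the join-key columns carry a bloom filter.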
+statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_200_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_400_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_random ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) engine = random; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 10; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 10; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 10; + +statement ok +insert into lineitem_target_origin_200_blocks1 select * from lineitem_random limit 10; + +statement ok +explain analyze MERGE INTO lineitem_target_origin_400_blocks1 as t1 using lineitem_target_origin_200_blocks1 as t2 on + t1.l_orderkey = t2.l_orderkey and + t1.l_partkey = t2.l_partkey and t1.l_suppkey = t2.l_suppkey and + t1.l_linenumber = t2.l_linenumber and + t1.l_quantity = t2.l_quantity and + t1.l_extendedprice = t2.l_extendedprice and + t1.l_discount = t2.l_discount + when matched then update set + t1.l_orderkey = t2.l_orderkey, + t1.l_partkey = t2.l_partkey, + t1.l_suppkey = t2.l_suppkey, + t1.l_linenumber = t2.l_linenumber, + t1.l_quantity = t2.l_quantity, + t1.l_extendedprice = t2.l_extendedprice, + t1.l_discount = t2.l_discount, + t1.l_tax = t2.l_tax, + t1.l_returnflag = t2.l_returnflag, + t1.l_linestatus = t2.l_linestatus, + t1.l_shipdate = t2.l_shipdate, + t1.l_commitdate = t2.l_commitdate, + t1.l_receiptdate = t2.l_receiptdate, + t1.l_shipinstruct = t2.l_shipinstruct, + t1.l_shipmode = t2.l_shipmode, + t1.l_comment = t2.l_comment + when not matched then insert + values( + t2.l_orderkey, + t2.l_partkey, + t2.l_suppkey, + t2.l_linenumber, + t2.l_quantity, + t2.l_extendedprice, + t2.l_discount, + t2.l_tax, + t2.l_returnflag, + t2.l_linestatus, + t2.l_shipdate, + t2.l_commitdate, + t2.l_receiptdate, + t2.l_shipinstruct, + 
t2.l_shipmode, + t2.l_comment); + +statement ok +set enable_merge_into_source_build_bloom = 0; + +statement ok +set enable_experimental_merge_into = 0;
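Across the explain outputs in these two test suites, `can_merge_into_source_build_bloom` is reported as `true` only for `Join(Right)` plans (source table on the build side) produced while `enable_merge_into_source_build_bloom = 1`, and stays `false` for the left-join / target-build plans. A hypothetical sketch of that gate, inferred only from these outputs (the real check lives in the merge-into planner and may involve further conditions, such as table format or cluster mode):

```rust
/// Join types as they appear in the plans above (simplified).
#[derive(PartialEq)]
enum JoinType {
    Left,
    Right,
}

/// Illustrative gate, based only on what the explain outputs in these tests
/// show: the setting must be on and the merge-into join must build on the
/// source side (Join(Right)); target-build optimization and source-build
/// bloom never appear together. The real planner check may add conditions.
fn can_merge_into_source_build_bloom(
    setting_enabled: bool,
    join_type: &JoinType,
    target_build_optimization: bool,
) -> bool {
    setting_enabled && *join_type == JoinType::Right && !target_build_optimization
}

fn main() {
    // mirrors the plans above: right join with the setting on => true
    assert!(can_merge_into_source_build_bloom(true, &JoinType::Right, false));
    // left join (target table on the build side) => false
    assert!(!can_merge_into_source_build_bloom(true, &JoinType::Left, false));
}
```

Keeping the two optimizations mutually exclusive matches the plans above: target-build plans never report source-build bloom, and vice versa.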