Skip to content

Commit e9010e3

Browse files
committed
add bloom index for merge into runtime filter
1 parent e89fee3 commit e9010e3

File tree

9 files changed

+114
-2
lines changed

9 files changed

+114
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ test = false
1010

1111
[dependencies]
1212
databend-common-base = { path = "../../common/base" }
13+
databend-common-arrow = { path = "../../common/arrow" }
1314
databend-common-config = { path = "../config" }
1415
databend-common-exception = { path = "../../common/exception" }
1516
databend-common-expression = { path = "../expression" }

src/query/catalog/src/plan/datasource/datasource_plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::collections::HashMap;
1617

1718
use databend_common_expression::FieldIndex;
1819
use databend_common_expression::RemoteExpr;
1920
use databend_common_expression::Scalar;
2021
use databend_common_expression::TableSchemaRef;
2122
use databend_common_meta_app::schema::CatalogInfo;
23+
use databend_storages_common_table_meta::meta::BlockMeta;
2224

2325
use crate::plan::datasource::datasource_info::DataSourceInfo;
2426
use crate::plan::PartStatistics;
@@ -49,6 +51,7 @@ pub struct DataSourcePlan {
4951
pub data_mask_policy: Option<BTreeMap<FieldIndex, RemoteExpr>>,
5052

5153
pub table_index: usize,
54+
// pub merge_into_target_table_block_meta: Option<HashMap<BlockMetaIndex, BlockMeta>>,
5255
}
5356

5457
impl DataSourcePlan {

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_arrow::arrow::bitmap::Bitmap;
16+
use databend_common_arrow::arrow::buffer::Buffer;
1517
use databend_common_expression::Expr;
1618
use xorf::BinaryFuse16;
1719

@@ -20,6 +22,7 @@ pub struct RuntimeFilterInfo {
2022
inlist: Vec<Expr<String>>,
2123
min_max: Vec<Expr<String>>,
2224
bloom: Vec<(String, BinaryFuse16)>,
25+
siphashes: Vec<(String, (Buffer<u64>, Option<Bitmap>))>,
2326
}
2427

2528
impl RuntimeFilterInfo {
@@ -31,6 +34,13 @@ impl RuntimeFilterInfo {
3134
self.bloom.push(bloom);
3235
}
3336

37+
pub fn add_merge_into_source_build_siphashkeys(
38+
&mut self,
39+
digests: (String, (Buffer<u64>, Option<Bitmap>)),
40+
) {
41+
self.siphashes.push(digests);
42+
}
43+
3444
pub fn add_min_max(&mut self, expr: Expr<String>) {
3545
self.min_max.push(expr);
3646
}

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use std::sync::atomic::Ordering;
2121
use std::sync::Arc;
2222

2323
use databend_common_arrow::arrow::bitmap::Bitmap;
24+
use databend_common_arrow::arrow::buffer::Buffer;
2425
use databend_common_base::base::tokio::sync::Barrier;
26+
use databend_common_catalog::merge_into_join::MergeIntoJoinType;
2527
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
2628
use databend_common_catalog::table_context::TableContext;
2729
use databend_common_exception::ErrorCode;
@@ -55,6 +57,7 @@ use databend_common_hashtable::StringRawEntry;
5557
use databend_common_hashtable::STRING_EARLY_SIZE;
5658
use databend_common_sql::plans::JoinType;
5759
use databend_common_sql::ColumnSet;
60+
use databend_storages_common_index::BloomIndex;
5861
use ethnum::U256;
5962
use itertools::Itertools;
6063
use log::info;
@@ -845,9 +848,19 @@ impl HashJoinBuildState {
845848
probe_key,
846849
)?;
847850
}
851+
852+
// add BloomIndex hash keys for merge into source build.
853+
self.build_merge_into_runtime_filter_siphashes(
854+
build_chunks,
855+
&mut runtime_filter,
856+
build_key,
857+
probe_key,
858+
)?;
859+
848860
if self.enable_bloom_runtime_filter {
849861
self.bloom_runtime_filter(build_chunks, &mut runtime_filter, build_key, probe_key)?;
850862
}
863+
851864
if self.enable_min_max_runtime_filter {
852865
self.min_max_runtime_filter(
853866
build_chunks,
@@ -912,6 +925,52 @@ impl HashJoinBuildState {
912925
Ok(())
913926
}
914927

928+
// for merge into source build cases, like below:
929+
// merge into `t1` using `t2` on xxx when matched xx when not matched xxx, if merge_into_optimizer
930+
// gives `t2` as source build side, we can build source join keys `siphashes`, that's because we use
931+
// siphash to build target table's bloom index block.
932+
// in this way, we can avoid current `runtime_filter()` func's performance cost, especially for large
933+
// target table case, the `runtime_filter()`'s cost is even higher than disabling `runtime_filter()`.
934+
// However, `build_merge_into_runtime_filter_siphashes()` is currently used only for merge into,
935+
// we don't support join queries, and it's only for `source build` cases. In fact, source build is the
936+
// main case most of the time.
937+
fn build_merge_into_runtime_filter_siphashes(
938+
&self,
939+
data_blocks: &[DataBlock],
940+
runtime_filter: &mut RuntimeFilterInfo,
941+
build_key: &Expr,
942+
probe_key: &Expr<String>,
943+
) -> Result<()> {
944+
// `calculate_nullable_column_digest`, `apply_bloom_pruning`
945+
let merge_type = self.ctx.get_merge_into_join();
946+
if matches!(merge_type.merge_into_join_type, MergeIntoJoinType::Right) {
947+
if let Expr::ColumnRef { id, .. } = probe_key {
948+
let mut columns = Vec::with_capacity(data_blocks.len());
949+
for block in data_blocks.iter() {
950+
if block.num_columns() == 0 {
951+
continue;
952+
}
953+
let evaluator = Evaluator::new(block, &self.func_ctx, &BUILTIN_FUNCTIONS);
954+
let column = evaluator
955+
.run(build_key)?
956+
.convert_to_full_column(build_key.data_type(), block.num_rows());
957+
columns.push(column);
958+
}
959+
if columns.is_empty() {
960+
return Ok(());
961+
}
962+
let build_key_column = Column::concat_columns(columns.into_iter())?;
963+
let digests = BloomIndex::calculate_nullable_column_digest(
964+
&self.func_ctx,
965+
&build_key_column,
966+
&build_key_column.data_type(),
967+
)?;
968+
runtime_filter.add_merge_into_source_build_siphashkeys((id.to_string(), digests));
969+
}
970+
}
971+
Ok(())
972+
}
973+
915974
fn inlist_runtime_filter(
916975
&self,
917976
runtime_filter: &mut RuntimeFilterInfo,

src/query/settings/src/settings_default.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,12 @@ impl DefaultSettings {
485485
mode: SettingMode::Both,
486486
range: Some(SettingRange::Numeric(0..=1)),
487487
}),
488+
("enable_merge_into_source_build_bloom", DefaultSettingValue {
489+
value: UserSettingValue::UInt64(0),
490+
desc: "Enable merge into source build bloom.",
491+
mode: SettingMode::Both,
492+
range: Some(SettingRange::Numeric(0..=1)),
493+
}),
488494
("enable_distributed_merge_into", DefaultSettingValue {
489495
value: UserSettingValue::UInt64(0),
490496
desc: "Enable distributed merge into.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ impl Settings {
438438
Ok(self.try_get_u64("enable_experimental_merge_into")? != 0)
439439
}
440440

441+
pub fn get_enable_merge_into_source_build_bloom(&self) -> Result<bool> {
442+
Ok(self.try_get_u64("enable_merge_into_source_build_bloom")? != 0)
443+
}
444+
441445
pub fn get_enable_distributed_merge_into(&self) -> Result<bool> {
442446
Ok(self.try_get_u64("enable_distributed_merge_into")? != 0)
443447
}

src/query/sql/src/planner/format/display_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
324324
));
325325
// add matched clauses
326326
let mut matched_children = Vec::with_capacity(merge_into.matched_evaluators.len());
327-
let taregt_schema = table_entry.table().schema();
327+
let taregt_schema = table_entry.table().schema_with_stream();
328328
for evaluator in &merge_into.matched_evaluators {
329329
let condition_format = evaluator.condition.as_ref().map_or_else(
330330
|| "condition: None".to_string(),

src/query/storages/fuse/src/operations/read/fuse_source.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::collections::VecDeque;
1616
use std::sync::Arc;
1717

18+
use databend_common_catalog::merge_into_join::MergeIntoJoinType;
19+
use databend_common_catalog::plan::DataSourceInfo;
1820
use databend_common_catalog::plan::DataSourcePlan;
1921
use databend_common_catalog::plan::PartInfoPtr;
2022
use databend_common_catalog::plan::StealablePartitions;
@@ -148,8 +150,34 @@ pub fn build_fuse_parquet_source_pipeline(
148150
(max_threads, max_io_requests) =
149151
adjust_threads_and_request(false, max_threads, max_io_requests, plan);
150152

151-
let mut source_builder = SourcePipeBuilder::create();
153+
let merge_into_join = ctx.get_merge_into_join();
154+
if ctx
155+
.get_settings()
156+
.get_enable_merge_into_source_build_bloom()?
157+
&& matches!(
158+
merge_into_join.merge_into_join_type,
159+
MergeIntoJoinType::Right
160+
)
161+
&& merge_into_join.target_tbl_idx == plan.table_index
162+
{
163+
// we can add block_metas info for merge into runtime filter, they will
164+
// be used for bloom prune for target table block.
165+
assert!(matches!(plan.source_info, DataSourceInfo::TableSource(_)));
166+
// if let DataSourceInfo::TableSource(table_info) = plan.source_info {
167+
// let table = ctx
168+
// .get_table(table_info.catalog(), table_info.name(), table_name)
169+
// .await?;
170+
// let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
171+
// ErrorCode::Unimplemented(format!(
172+
// "table {}, engine type {}, does not support MERGE INTO",
173+
// table.name(),
174+
// table.get_table_info().engine(),
175+
// ))
176+
// })?;
177+
// }
178+
}
152179

180+
let mut source_builder = SourcePipeBuilder::create();
153181
match block_reader.support_blocking_api() {
154182
true => {
155183
let partitions = dispatch_partitions(ctx.clone(), plan, max_threads);

0 commit comments

Comments
 (0)