Skip to content

Commit 7a41e4e

Browse files
committed
refactot build bloom info
1 parent d03501c commit 7a41e4e

File tree

12 files changed

+94
-43
lines changed

12 files changed

+94
-43
lines changed

src/query/catalog/src/merge_into_join.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_expression::TableSchemaRef;
1718
use databend_common_meta_app::schema::CatalogInfo;
1819
use databend_common_meta_app::schema::TableInfo;
1920
use databend_storages_common_table_meta::meta::Location;
@@ -61,6 +62,7 @@ pub struct MergeIntoJoin {
6162
pub table_info: Option<TableInfo>,
6263
pub catalog_info: Option<CatalogInfo>,
6364
pub database_name: String,
65+
pub table_schema: Option<TableSchemaRef>,
6466
}
6567

6668
impl Default for MergeIntoJoin {
@@ -73,6 +75,7 @@ impl Default for MergeIntoJoin {
7375
table_info: None,
7476
catalog_info: None,
7577
database_name: Default::default(),
78+
table_schema: None,
7679
}
7780
}
7881
}

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
use std::sync::Arc;
1616

17-
use databend_common_arrow::arrow::bitmap::Bitmap;
18-
use databend_common_arrow::arrow::buffer::Buffer;
1917
use databend_common_expression::Expr;
2018
use xorf::BinaryFuse16;
2119

src/query/catalog/src/table_context.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,9 @@ pub trait TableContext: Send + Sync {
278278
fn get_merge_into_source_build_siphashkeys_with_id(
279279
&self,
280280
id: usize,
281-
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))>;
281+
) -> Vec<(String, Arc<Vec<u64>>)>;
282+
283+
fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String>;
282284

283285
fn has_bloom_runtime_filters(&self, id: usize) -> bool;
284286
fn txn_mgr(&self) -> TxnManagerRef;

src/query/service/src/sessions/query_ctx.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,7 @@ impl TableContext for QueryContext {
10171017
catalog_info: merge_into_join.catalog_info.clone(),
10181018
table_info: merge_into_join.table_info.clone(),
10191019
database_name: merge_into_join.database_name.clone(),
1020+
table_schema: merge_into_join.table_schema.clone(),
10201021
}
10211022
}
10221023

@@ -1031,14 +1032,26 @@ impl TableContext for QueryContext {
10311032
fn get_merge_into_source_build_siphashkeys_with_id(
10321033
&self,
10331034
id: IndexType,
1034-
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))> {
1035+
) -> Vec<(String, Arc<Vec<u64>>)> {
10351036
let runtime_filters = self.shared.runtime_filters.read();
10361037
match runtime_filters.get(&id) {
10371038
Some(v) => v.get_merge_into_source_build_siphashkeys(),
10381039
None => vec![],
10391040
}
10401041
}
10411042

1043+
fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String> {
1044+
let runtime_filters = self.shared.runtime_filters.read();
1045+
match runtime_filters.get(&id) {
1046+
Some(v) => v
1047+
.get_merge_into_source_build_siphashkeys()
1048+
.iter()
1049+
.map(|key| key.0)
1050+
.collect(),
1051+
None => vec![],
1052+
}
1053+
}
1054+
10421055
fn get_inlist_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr<String>> {
10431056
let runtime_filters = self.shared.runtime_filters.read();
10441057
match runtime_filters.get(&id) {

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,11 @@ impl TableContext for CtxDelegation {
833833
fn get_merge_into_source_build_siphashkeys_with_id(
834834
&self,
835835
id: usize,
836-
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))> {
836+
) -> Vec<(String, Arc<Vec<u64>>)> {
837+
todo!()
838+
}
839+
840+
fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String> {
837841
todo!()
838842
}
839843

src/query/sql/src/planner/binder/merge_into.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ impl Binder {
488488
row_id_index: column_binding.index,
489489
split_idx,
490490
can_try_update_column_only: self.can_try_update_column_only(&matched_clauses),
491+
table_schema,
491492
})
492493
}
493494

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Resul
427427
is_distributed: false, // we will set it after later optimization.
428428
merge_into_join_type: MergeIntoJoinType::Right,
429429
database_name: plan.database.clone(),
430+
table_schema: Some(plan.table_schema.clone()),
430431
})
431432
}
432433

@@ -517,6 +518,7 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Resul
517518
catalog_info: merge_into_join.catalog_info.clone(),
518519
table_info: merge_into_join.table_info.clone(),
519520
database_name: merge_into_join.database_name.clone(),
521+
table_schema: merge_into_join.table_schema.clone(),
520522
})
521523
}
522524

@@ -547,6 +549,7 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Resul
547549
catalog_info: merge_into_join.catalog_info.clone(),
548550
table_info: merge_into_join.table_info.clone(),
549551
database_name: merge_into_join.database_name,
552+
table_schema: merge_into_join.table_schema.clone(),
550553
})
551554
}
552555
Ok(Plan::MergeInto(Box::new(MergeInto {

src/query/sql/src/planner/plans/merge_into.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::HashMap;
1616
use std::collections::HashSet;
17+
use std::sync::Arc;
1718

1819
use databend_common_ast::ast::TableAlias;
1920
use databend_common_exception::ErrorCode;
@@ -24,6 +25,7 @@ use databend_common_expression::DataField;
2425
use databend_common_expression::DataSchemaRef;
2526
use databend_common_expression::DataSchemaRefExt;
2627
use databend_common_expression::FieldIndex;
28+
use databend_common_expression::TableSchema;
2729
use databend_common_meta_types::MetaId;
2830

2931
use crate::binder::MergeIntoType;
@@ -77,6 +79,8 @@ pub struct MergeInto {
7779
// `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column,
7880
// we don't support complex expressions.
7981
pub can_try_update_column_only: bool,
82+
83+
pub table_schema: Arc<TableSchema>,
8084
}
8185

8286
impl std::fmt::Debug for MergeInto {

src/query/storages/fuse/src/operations/read/native_data_source_reader.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use databend_common_pipeline_sources::SyncSourcer;
3333
use databend_common_sql::IndexType;
3434
use log::debug;
3535

36-
use super::can_merge_into_target_build_bloom_filter;
3736
use super::native_data_source::NativeDataSource;
37+
use super::util::build_merge_into_source_build_bloom_info;
3838
use super::util::MergeIntoSourceBuildBloomInfo;
3939
use crate::io::AggIndexReader;
4040
use crate::io::BlockReader;
@@ -91,16 +91,11 @@ impl ReadNativeDataSource<true> {
9191
virtual_reader,
9292
table_schema,
9393
table_index,
94-
merge_into_source_build_bloom_info: MergeIntoSourceBuildBloomInfo {
95-
can_do_merge_into_rumtime_filter_bloom: can_merge_into_target_build_bloom_filter(
96-
ctx.clone(),
97-
table_index,
98-
)?,
99-
segment_infos: Default::default(),
100-
catalog_info: merge_into_join.catalog_info.clone(),
101-
table_info: merge_into_join.table_info.clone(),
102-
database_name: merge_into_join.database_name.clone(),
103-
},
94+
merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info(
95+
ctx,
96+
table_index,
97+
merge_into_join,
98+
)?,
10499
})
105100
}
106101
}
@@ -135,16 +130,11 @@ impl ReadNativeDataSource<false> {
135130
virtual_reader,
136131
table_schema,
137132
table_index,
138-
merge_into_source_build_bloom_info: MergeIntoSourceBuildBloomInfo {
139-
can_do_merge_into_rumtime_filter_bloom: can_merge_into_target_build_bloom_filter(
140-
ctx.clone(),
141-
table_index,
142-
)?,
143-
segment_infos: Default::default(),
144-
catalog_info: merge_into_join.catalog_info.clone(),
145-
table_info: merge_into_join.table_info.clone(),
146-
database_name: merge_into_join.database_name.clone(),
147-
},
133+
merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info(
134+
ctx,
135+
table_index,
136+
merge_into_join,
137+
)?,
148138
})))
149139
}
150140
}

src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use databend_common_pipeline_sources::SyncSourcer;
3232
use databend_common_sql::IndexType;
3333
use log::debug;
3434

35-
use super::can_merge_into_target_build_bloom_filter;
3635
use super::parquet_data_source::ParquetDataSource;
36+
use super::util::build_merge_into_source_build_bloom_info;
3737
use super::util::MergeIntoSourceBuildBloomInfo;
3838
use crate::fuse_part::FusePartInfo;
3939
use crate::io::AggIndexReader;
@@ -95,14 +95,11 @@ impl<const BLOCKING_IO: bool> ReadParquetDataSource<BLOCKING_IO> {
9595
index_reader,
9696
virtual_reader,
9797
table_schema,
98-
merge_into_source_build_bloom_info: MergeIntoSourceBuildBloomInfo {
99-
can_do_merge_into_rumtime_filter_bloom:
100-
can_merge_into_target_build_bloom_filter(ctx.clone(), table_index)?,
101-
segment_infos: Default::default(),
102-
catalog_info: merge_into_join.catalog_info.clone(),
103-
table_info: merge_into_join.table_info.clone(),
104-
database_name: merge_into_join.database_name.clone(),
105-
},
98+
merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info(
99+
ctx,
100+
table_index,
101+
merge_into_join,
102+
)?,
106103
})
107104
} else {
108105
let merge_into_join = ctx.get_merge_into_join();
@@ -121,14 +118,11 @@ impl<const BLOCKING_IO: bool> ReadParquetDataSource<BLOCKING_IO> {
121118
index_reader,
122119
virtual_reader,
123120
table_schema,
124-
merge_into_source_build_bloom_info: MergeIntoSourceBuildBloomInfo {
125-
can_do_merge_into_rumtime_filter_bloom:
126-
can_merge_into_target_build_bloom_filter(ctx.clone(), table_index)?,
127-
segment_infos: Default::default(),
128-
catalog_info: merge_into_join.catalog_info.clone(),
129-
table_info: merge_into_join.table_info.clone(),
130-
database_name: merge_into_join.database_name.clone(),
131-
},
121+
merge_into_source_build_bloom_info: build_merge_into_source_build_bloom_info(
122+
ctx,
123+
table_index,
124+
merge_into_join,
125+
)?,
132126
})))
133127
}
134128
}

0 commit comments

Comments
 (0)