Skip to content

Commit d03501c

Browse files
committed
refactor hashed logic
1 parent 5190213 commit d03501c

File tree

5 files changed

+62
-17
lines changed

5 files changed

+62
-17
lines changed

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use databend_common_arrow::arrow::bitmap::Bitmap;
1618
use databend_common_arrow::arrow::buffer::Buffer;
1719
use databend_common_expression::Expr;
@@ -22,7 +24,7 @@ pub struct RuntimeFilterInfo {
2224
inlist: Vec<Expr<String>>,
2325
min_max: Vec<Expr<String>>,
2426
bloom: Vec<(String, BinaryFuse16)>,
25-
siphashes: Vec<(String, (Buffer<u64>, Option<Bitmap>))>,
27+
siphashes: Vec<(String, Arc<Vec<u64>>)>,
2628
}
2729

2830
impl RuntimeFilterInfo {
@@ -34,16 +36,11 @@ impl RuntimeFilterInfo {
3436
self.bloom.push(bloom);
3537
}
3638

37-
pub fn get_merge_into_source_build_siphashkeys(
38-
&mut self,
39-
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))> {
39+
pub fn get_merge_into_source_build_siphashkeys(&self) -> Vec<(String, Arc<Vec<u64>>)> {
4040
self.siphashes.clone()
4141
}
4242

43-
pub fn add_merge_into_source_build_siphashkeys(
44-
&mut self,
45-
digests: (String, (Buffer<u64>, Option<Bitmap>)),
46-
) {
43+
pub fn add_merge_into_source_build_siphashkeys(&mut self, digests: (String, Arc<Vec<u64>>)) {
4744
self.siphashes.push(digests);
4845
}
4946

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use databend_common_hashtable::StringRawEntry;
5757
use databend_common_hashtable::STRING_EARLY_SIZE;
5858
use databend_common_sql::plans::JoinType;
5959
use databend_common_sql::ColumnSet;
60+
use databend_common_storages_fuse::operations::can_merge_into_target_build_bloom_filter;
6061
use databend_storages_common_index::BloomIndex;
6162
use ethnum::U256;
6263
use itertools::Itertools;
@@ -851,12 +852,14 @@ impl HashJoinBuildState {
851852
}
852853

853854
// add BloomIndex hash keys for merge into source build.
854-
self.build_merge_into_runtime_filter_siphashes(
855-
build_chunks,
856-
&mut runtime_filter,
857-
build_key,
858-
probe_key,
859-
)?;
855+
if can_merge_into_target_build_bloom_filter(self.ctx.clone(), *table_index)? {
856+
self.build_merge_into_runtime_filter_siphashes(
857+
build_chunks,
858+
&mut runtime_filter,
859+
build_key,
860+
probe_key,
861+
)?;
862+
}
860863

861864
if self.enable_bloom_runtime_filter {
862865
self.bloom_runtime_filter(build_chunks, &mut runtime_filter, build_key, probe_key)?;
@@ -961,12 +964,39 @@ impl HashJoinBuildState {
961964
return Ok(());
962965
}
963966
let build_key_column = Column::concat_columns(columns.into_iter())?;
964-
let digests = BloomIndex::calculate_nullable_column_digest(
967+
// mabye there will be null values here, so we use nullable column, the null value will be treat as default
968+
// value for the sepcified type, like String -> "", int -> 0. so we need to remove the null hash values here.
969+
let (hashes, bitmap_op) = BloomIndex::calculate_nullable_column_digest(
965970
&self.func_ctx,
966971
&build_key_column,
967972
&build_key_column.data_type(),
968973
)?;
969-
runtime_filter.add_merge_into_source_build_siphashkeys((id.to_string(), digests));
974+
if let Some(bitmap) = bitmap_op {
975+
// no null values
976+
let digests = if bitmap.unset_bits() == 0 {
977+
hashes.to_vec()
978+
} else {
979+
let new_hashes = Vec::with_capacity(bitmap.len());
980+
assert_eq!(hashes.len(), bitmap.len());
981+
for row_idx in 0..bitmap.len() {
982+
if bitmap.get_bit(row_idx) {
983+
new_hashes.push(hashes[row_idx])
984+
}
985+
}
986+
new_hashes.to_vec()
987+
};
988+
// id is probe key name
989+
runtime_filter.add_merge_into_source_build_siphashkeys((
990+
id.to_string(),
991+
Arc::new(digests),
992+
));
993+
} else {
994+
// id is probe key name
995+
runtime_filter.add_merge_into_source_build_siphashkeys((
996+
id.to_string(),
997+
Arc::new(hashes),
998+
));
999+
}
9701000
}
9711001
}
9721002
Ok(())

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use std::sync::atomic::AtomicUsize;
1818
use std::sync::Arc;
1919

2020
use dashmap::DashMap;
21+
use databend_common_arrow::arrow::bitmap::Bitmap;
22+
use databend_common_arrow::arrow::buffer::Buffer;
2123
use databend_common_base::base::tokio;
2224
use databend_common_base::base::Progress;
2325
use databend_common_base::base::ProgressValues;
@@ -26,6 +28,7 @@ use databend_common_catalog::catalog::Catalog;
2628
use databend_common_catalog::cluster_info::Cluster;
2729
use databend_common_catalog::database::Database;
2830
use databend_common_catalog::merge_into_join::MergeIntoJoin;
31+
use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments;
2932
use databend_common_catalog::plan::DataSourcePlan;
3033
use databend_common_catalog::plan::PartInfoPtr;
3134
use databend_common_catalog::plan::Partitions;
@@ -826,6 +829,21 @@ impl TableContext for CtxDelegation {
826829
fn set_read_block_thresholds(&self, _thresholds: BlockThresholds) {
827830
todo!()
828831
}
832+
833+
fn get_merge_into_source_build_siphashkeys_with_id(
834+
&self,
835+
id: usize,
836+
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))> {
837+
todo!()
838+
}
839+
840+
fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments) {
841+
todo!()
842+
}
843+
844+
fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments {
845+
todo!()
846+
}
829847
}
830848

831849
#[tokio::test(flavor = "multi_thread")]

src/query/storages/fuse/src/operations/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub use delete::MutationBlockPruningContext;
4242
pub use merge_into::*;
4343
pub use mutation::*;
4444
pub use read::build_row_fetcher_pipeline;
45+
pub use read::can_merge_into_target_build_bloom_filter;
4546
pub use read::need_reserve_block_info;
4647
pub use replace_into::*;
4748
pub use util::acquire_task_permit;

src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ pub fn runtime_filter_pruner(
181181
"merge into source build runtime bloom filter: segment_idx:{},blk_idx:{}",
182182
segment_idx, block_idx
183183
);
184-
// the row_id is generated by block_id, not block_idx,reference to fill_internal_column_meta()
185184
let block_meta = segment_info.blocks[block_idx].clone();
186185
}
187186

0 commit comments

Comments
 (0)