Skip to content

Commit 69a5675

Browse files
committed
add runtime filter,need tests
1 parent 3778c67 commit 69a5675

File tree

13 files changed

+265
-201
lines changed

13 files changed

+265
-201
lines changed

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_expression::Expr;
18+
use parking_lot::RwLock;
1819
use xorf::BinaryFuse16;
1920

21+
pub type MergeIntoSourceBuildSiphashkeys = (Vec<String>, Arc<RwLock<Vec<Vec<u64>>>>);
22+
2023
#[derive(Clone, Debug, Default)]
2124
pub struct RuntimeFilterInfo {
2225
inlist: Vec<Expr<String>>,
2326
min_max: Vec<Expr<String>>,
2427
bloom: Vec<(String, BinaryFuse16)>,
25-
siphashes: Vec<(String, Arc<Vec<u64>>)>,
28+
siphashes: MergeIntoSourceBuildSiphashkeys,
2629
}
2730

2831
impl RuntimeFilterInfo {
@@ -34,12 +37,14 @@ impl RuntimeFilterInfo {
3437
self.bloom.push(bloom);
3538
}
3639

37-
pub fn get_merge_into_source_build_siphashkeys(&self) -> Vec<(String, Arc<Vec<u64>>)> {
40+
pub fn get_merge_into_source_build_siphashkeys(&self) -> MergeIntoSourceBuildSiphashkeys {
3841
self.siphashes.clone()
3942
}
4043

41-
pub fn add_merge_into_source_build_siphashkeys(&mut self, digests: (String, Arc<Vec<u64>>)) {
42-
self.siphashes.push(digests);
44+
pub fn add_merge_into_source_build_siphashkeys(&mut self, digests: (String, Vec<u64>)) {
45+
self.siphashes.0.push(digests.0);
46+
let mut borrow_hash_keys = self.siphashes.1.write();
47+
borrow_hash_keys.push(digests.1)
4348
}
4449

4550
pub fn add_min_max(&mut self, expr: Expr<String>) {

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ use std::sync::Arc;
2222
use std::time::SystemTime;
2323

2424
use dashmap::DashMap;
25-
use databend_common_arrow::arrow::bitmap::Bitmap;
26-
use databend_common_arrow::arrow::buffer::Buffer;
2725
use databend_common_base::base::Progress;
2826
use databend_common_base::base::ProgressValues;
2927
use databend_common_base::runtime::profile::Profile;
@@ -63,6 +61,7 @@ use crate::plan::DataSourcePlan;
6361
use crate::plan::PartInfoPtr;
6462
use crate::plan::Partitions;
6563
use crate::query_kind::QueryKind;
64+
use crate::runtime_filter_info::MergeIntoSourceBuildSiphashkeys;
6665
use crate::runtime_filter_info::RuntimeFilterInfo;
6766
use crate::statistics::data_cache_statistics::DataCacheMetrics;
6867
use crate::table::Table;
@@ -278,7 +277,7 @@ pub trait TableContext: Send + Sync {
278277
fn get_merge_into_source_build_siphashkeys_with_id(
279278
&self,
280279
id: usize,
281-
) -> Vec<(String, Arc<Vec<u64>>)>;
280+
) -> Option<MergeIntoSourceBuildSiphashkeys>;
282281

283282
fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String>;
284283

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ impl MergeIntoInterpreter {
203203
table_info: merge_into_join.table_info.clone(),
204204
catalog_info: merge_into_join.catalog_info.clone(),
205205
database_name: merge_into_join.database_name.clone(),
206+
table_schema: merge_into_join.table_schema.clone(),
206207
});
207208
}
208209
}
@@ -305,6 +306,7 @@ impl MergeIntoInterpreter {
305306
table_info: Some(table_info.clone()),
306307
catalog_info: Some(catalog_.info()),
307308
database_name: merge_into_join.database_name.clone(),
309+
table_schema: merge_into_join.table_schema.clone(),
308310
})
309311
}
310312
// merge_into_source is used to receive the join's data blocks and split them into matched and not matched

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::sync::atomic::Ordering;
2121
use std::sync::Arc;
2222

2323
use databend_common_arrow::arrow::bitmap::Bitmap;
24-
use databend_common_arrow::arrow::buffer::Buffer;
2524
use databend_common_base::base::tokio::sync::Barrier;
2625
use databend_common_catalog::merge_into_join::MergeIntoJoinType;
2726
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
@@ -964,7 +963,7 @@ impl HashJoinBuildState {
964963
return Ok(());
965964
}
966965
let build_key_column = Column::concat_columns(columns.into_iter())?;
967-
// mabye there will be null values here, so we use nullable column, the null value will be treat as default
966+
// maybe there will be null values here, so we use a nullable column; a null value will be treated as the default
968967
// value for the specified type, like String -> "", int -> 0. So we need to remove the null hash values here.
969968
let (hashes, bitmap_op) = BloomIndex::calculate_nullable_column_digest(
970969
&self.func_ctx,
@@ -976,7 +975,7 @@ impl HashJoinBuildState {
976975
let digests = if bitmap.unset_bits() == 0 {
977976
hashes.to_vec()
978977
} else {
979-
let new_hashes = Vec::with_capacity(bitmap.len());
978+
let mut new_hashes = Vec::with_capacity(bitmap.len());
980979
assert_eq!(hashes.len(), bitmap.len());
981980
for row_idx in 0..bitmap.len() {
982981
if bitmap.get_bit(row_idx) {
@@ -986,16 +985,12 @@ impl HashJoinBuildState {
986985
new_hashes.to_vec()
987986
};
988987
// id is probe key name
989-
runtime_filter.add_merge_into_source_build_siphashkeys((
990-
id.to_string(),
991-
Arc::new(digests),
992-
));
988+
runtime_filter
989+
.add_merge_into_source_build_siphashkeys((id.to_string(), digests));
993990
} else {
994991
// id is probe key name
995-
runtime_filter.add_merge_into_source_build_siphashkeys((
996-
id.to_string(),
997-
Arc::new(hashes),
998-
));
992+
runtime_filter
993+
.add_merge_into_source_build_siphashkeys((id.to_string(), hashes.to_vec()));
999994
}
1000995
}
1001996
}

src/query/service/src/sessions/query_ctx.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ use std::time::UNIX_EPOCH;
3232
use chrono_tz::Tz;
3333
use dashmap::mapref::multiple::RefMulti;
3434
use dashmap::DashMap;
35-
use databend_common_arrow::arrow::bitmap::Bitmap;
36-
use databend_common_arrow::arrow::buffer::Buffer;
3735
use databend_common_base::base::tokio::task::JoinHandle;
3836
use databend_common_base::base::Progress;
3937
use databend_common_base::base::ProgressValues;
@@ -48,6 +46,7 @@ use databend_common_catalog::plan::PartInfoPtr;
4846
use databend_common_catalog::plan::Partitions;
4947
use databend_common_catalog::plan::StageTableInfo;
5048
use databend_common_catalog::query_kind::QueryKind;
49+
use databend_common_catalog::runtime_filter_info::MergeIntoSourceBuildSiphashkeys;
5150
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
5251
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
5352
use databend_common_catalog::table_args::TableArgs;
@@ -1031,23 +1030,18 @@ impl TableContext for QueryContext {
10311030

10321031
fn get_merge_into_source_build_siphashkeys_with_id(
10331032
&self,
1034-
id: IndexType,
1035-
) -> Vec<(String, Arc<Vec<u64>>)> {
1033+
id: usize,
1034+
) -> Option<MergeIntoSourceBuildSiphashkeys> {
10361035
let runtime_filters = self.shared.runtime_filters.read();
1037-
match runtime_filters.get(&id) {
1038-
Some(v) => v.get_merge_into_source_build_siphashkeys(),
1039-
None => vec![],
1040-
}
1036+
runtime_filters
1037+
.get(&id)
1038+
.map(|v| v.get_merge_into_source_build_siphashkeys())
10411039
}
10421040

10431041
fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String> {
10441042
let runtime_filters = self.shared.runtime_filters.read();
10451043
match runtime_filters.get(&id) {
1046-
Some(v) => v
1047-
.get_merge_into_source_build_siphashkeys()
1048-
.iter()
1049-
.map(|key| key.0)
1050-
.collect(),
1044+
Some(v) => v.get_merge_into_source_build_siphashkeys().0.clone(),
10511045
None => vec![],
10521046
}
10531047
}

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ use std::sync::atomic::AtomicUsize;
1818
use std::sync::Arc;
1919

2020
use dashmap::DashMap;
21-
use databend_common_arrow::arrow::bitmap::Bitmap;
22-
use databend_common_arrow::arrow::buffer::Buffer;
2321
use databend_common_base::base::tokio;
2422
use databend_common_base::base::Progress;
2523
use databend_common_base::base::ProgressValues;
@@ -33,6 +31,7 @@ use databend_common_catalog::plan::DataSourcePlan;
3331
use databend_common_catalog::plan::PartInfoPtr;
3432
use databend_common_catalog::plan::Partitions;
3533
use databend_common_catalog::query_kind::QueryKind;
34+
use databend_common_catalog::runtime_filter_info::MergeIntoSourceBuildSiphashkeys;
3635
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
3736
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
3837
use databend_common_catalog::table::Table;
@@ -832,16 +831,16 @@ impl TableContext for CtxDelegation {
832831

833832
fn get_merge_into_source_build_siphashkeys_with_id(
834833
&self,
835-
id: usize,
836-
) -> Vec<(String, Arc<Vec<u64>>)> {
834+
_id: usize,
835+
) -> Option<MergeIntoSourceBuildSiphashkeys> {
837836
todo!()
838837
}
839838

840-
fn get_merge_into_source_build_bloom_probe_keys(&self, id: usize) -> Vec<String> {
839+
fn get_merge_into_source_build_bloom_probe_keys(&self, _id: usize) -> Vec<String> {
841840
todo!()
842841
}
843842

844-
fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments) {
843+
fn set_merge_into_source_build_segments(&self, _segments: MergeIntoSourceBuildSegments) {
845844
todo!()
846845
}
847846

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,33 @@ impl TableContext for CtxDelegation {
375375
self
376376
}
377377

378+
fn get_merge_into_source_build_bloom_probe_keys(&self, _: usize) -> Vec<std::string::String> {
379+
todo!()
380+
}
381+
382+
fn set_merge_into_source_build_segments(
383+
&self,
384+
_: Arc<Vec<(usize, (std::string::String, u64))>>,
385+
) {
386+
todo!()
387+
}
388+
389+
fn get_merge_into_source_build_siphashkeys_with_id(
390+
&self,
391+
_: usize,
392+
) -> std::option::Option<(
393+
Vec<std::string::String>,
394+
Arc<parking_lot::lock_api::RwLock<parking_lot::RawRwLock, Vec<Vec<u64>>>>,
395+
)> {
396+
todo!()
397+
}
398+
399+
fn get_merge_into_source_build_segments(
400+
&self,
401+
) -> Arc<Vec<(usize, (std::string::String, u64))>> {
402+
todo!()
403+
}
404+
378405
fn build_table_from_source_plan(&self, _plan: &DataSourcePlan) -> Result<Arc<dyn Table>> {
379406
todo!()
380407
}

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,8 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Resul
416416
{
417417
let merge_into_join = opt_ctx.table_ctx.get_merge_into_join();
418418
// this is the first time it is set, so these must be none/empty here; we will set them in `interpreter_merge_into`
419-
assert!(matches!(merge_into_join.catalog_info, None));
420-
assert!(matches!(merge_into_join.table_info, None));
419+
assert!(merge_into_join.catalog_info.is_none());
420+
assert!(merge_into_join.table_info.is_none());
421421
assert!(merge_into_join.database_name.as_str() == "");
422422
opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin {
423423
// we will set catalog_info and table_info in `interpreter_merge_into`
@@ -573,7 +573,7 @@ fn try_to_change_as_broadcast_join(
573573
if let RelOperator::Exchange(Exchange::Merge) = merge_into_join_sexpr.plan.as_ref() {
574574
let right_exchange = merge_into_join_sexpr.child(0)?.child(1)?;
575575
if let RelOperator::Exchange(Exchange::Broadcast) = right_exchange.plan.as_ref() {
576-
let mut join: Join = merge_into_join_sexpr.child(0)?.plan().clone().try_into()?;
576+
let join: Join = merge_into_join_sexpr.child(0)?.plan().clone().try_into()?;
577577

578578
let join_s_expr = merge_into_join_sexpr
579579
.child(0)?

src/query/storages/fuse/src/operations/read/native_data_source_reader.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::any::Any;
1616
use std::sync::Arc;
1717

18-
use databend_common_catalog::merge_into_join;
1918
use databend_common_catalog::plan::PartInfoPtr;
2019
use databend_common_catalog::plan::StealablePartitions;
2120
use databend_common_catalog::table_context::TableContext;

0 commit comments

Comments
 (0)