Skip to content

Commit f1ad0cd

Browse files
committed
add merge_into_source_build_bloom_info
1 parent 29641be commit f1ad0cd

File tree

15 files changed

+372
-32
lines changed

15 files changed

+372
-32
lines changed

src/query/catalog/src/merge_into_join.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use databend_common_meta_app::schema::CatalogInfo;
18+
use databend_common_meta_app::schema::TableInfo;
19+
use databend_storages_common_table_meta::meta::Location;
20+
1521
#[derive(Clone)]
1622
pub enum MergeIntoJoinType {
1723
Left,
@@ -23,14 +29,38 @@ pub enum MergeIntoJoinType {
2329
NormalJoin,
2430
}
2531

26-
// for now, we just support MergeIntoJoinType::Left to use MergeIntoBlockInfoHashTable in two situations:
27-
// 1. distributed broadcast join and target table as build side.
28-
// 2. in standalone mode and target table as build side.
29-
// we will support Inner next, so the merge_into_join_type is only Left for current implementation in fact.
32+
pub type MergeIntoSourceBuildSegments = Arc<Vec<(usize, Location)>>;
33+
34+
// MergeIntoJoin is used in two cases:
35+
// I. target build optimization:
36+
// we should support MergeIntoJoinType::Left(need to support LeftInner,LeftAnti) to use MergeIntoBlockInfoHashTable in two situations:
37+
// 1. distributed broadcast join and target table as build side (don't support this now).
38+
// 2. in standalone mode and target table as build side (supported).
39+
// 3. native(not-supported) and parquet(supported) format
40+
// for the `target build optimization`, merge_into_join_type is only Left for current implementation in fact.And we
41+
// don't support distributed mode and parquet. So only if it is a LeftJoin and non-dirtributed-merge-into(even if in distributed environment
42+
// but the merge_into_optimizer can also give a non-dirtributed-merge-into) and uses parquet, the `target build optimization` can be enabled.
43+
// II. source build runtime bloom filter.
44+
// only if it's a RightJoin((need to support RightInner,RightAnti)),target_tbl_idx is not invalid. We can make sure it enables
45+
// `source build runtime bloom filter`. And in this case the table_info and catalog_info must be some. We give an `assert` for this judge.
46+
//
47+
// So let's do a summary:
48+
// In most cases, the MergeIntoJoin's `merge_into_join_type` is NormalJoin (even this is a real merge into join),because
49+
// MergeIntoJoin is used to judge whether we enable target_build_optimizations or source build runtime bloom filter.
50+
// Both of these optimizations all will check `merge_into_join_type` to judge if the correlated optimization is enabled.
51+
// So we only `set_merge_into_join` when we use these two optimziations (of course there is an exception, the `merge_into_optimzier` can't distinct
52+
// parquet and native, so it will think it's using parquet format in default, so if it's a native format in fact, the `interpreter_merge_into` will rollback it.
53+
// But the modification is a little tricky. Because the `MergeIntoJoinType` doesn't contain any info about parquet or native,we set the target_table_index as
54+
// DUMMY_TABLE_INDEX, but in fact it has a target_table_index. However it's ok to do this. Firstly we can do this to make sure the `target build optimization` is
55+
// closed. And it won't cause a conflict with `source build runtime bloom filter`. Because for the `target build optimization`, it's target build, and for
56+
// `source build runtime bloom filter`, it's source build).
3057
pub struct MergeIntoJoin {
3158
pub merge_into_join_type: MergeIntoJoinType,
3259
pub is_distributed: bool,
3360
pub target_tbl_idx: usize,
61+
pub table_info: Option<TableInfo>,
62+
pub catalog_info: Option<CatalogInfo>,
63+
pub database_name: String,
3464
}
3565

3666
impl Default for MergeIntoJoin {
@@ -40,6 +70,9 @@ impl Default for MergeIntoJoin {
4070
is_distributed: false,
4171
// Invalid Index
4272
target_tbl_idx: usize::MAX,
73+
table_info: None,
74+
catalog_info: None,
75+
database_name: Default::default(),
4376
}
4477
}
4578
}

src/query/catalog/src/plan/datasource/datasource_plan.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16-
use std::collections::HashMap;
1716

1817
use databend_common_expression::FieldIndex;
1918
use databend_common_expression::RemoteExpr;
2019
use databend_common_expression::Scalar;
2120
use databend_common_expression::TableSchemaRef;
2221
use databend_common_meta_app::schema::CatalogInfo;
23-
use databend_storages_common_table_meta::meta::BlockMeta;
2422

2523
use crate::plan::datasource::datasource_info::DataSourceInfo;
2624
use crate::plan::PartStatistics;
@@ -51,7 +49,6 @@ pub struct DataSourcePlan {
5149
pub data_mask_policy: Option<BTreeMap<FieldIndex, RemoteExpr>>,
5250

5351
pub table_index: usize,
54-
// pub merge_into_target_table_block_meta: Option<HashMap<BlockMetaIndex, BlockMeta>>,
5552
}
5653

5754
impl DataSourcePlan {

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ impl RuntimeFilterInfo {
3434
self.bloom.push(bloom);
3535
}
3636

37+
pub fn get_merge_into_source_build_siphashkeys(
38+
&mut self,
39+
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))> {
40+
self.siphashes.clone()
41+
}
42+
3743
pub fn add_merge_into_source_build_siphashkeys(
3844
&mut self,
3945
digests: (String, (Buffer<u64>, Option<Bitmap>)),

src/query/catalog/src/table_context.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::sync::Arc;
2222
use std::time::SystemTime;
2323

2424
use dashmap::DashMap;
25+
use databend_common_arrow::arrow::bitmap::Bitmap;
26+
use databend_common_arrow::arrow::buffer::Buffer;
2527
use databend_common_base::base::Progress;
2628
use databend_common_base::base::ProgressValues;
2729
use databend_common_base::runtime::profile::Profile;
@@ -55,6 +57,7 @@ use xorf::BinaryFuse16;
5557
use crate::catalog::Catalog;
5658
use crate::cluster_info::Cluster;
5759
use crate::merge_into_join::MergeIntoJoin;
60+
use crate::merge_into_join::MergeIntoSourceBuildSegments;
5861
use crate::plan::DataSourcePlan;
5962
use crate::plan::PartInfoPtr;
6063
use crate::plan::Partitions;
@@ -254,18 +257,28 @@ pub trait TableContext: Send + Sync {
254257

255258
fn get_query_profiles(&self) -> Vec<PlanProfile>;
256259

260+
// <ProbeSide Index, RuntimeFilterInfo>
257261
fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
258262

259263
fn set_merge_into_join(&self, join: MergeIntoJoin);
260264

261265
fn get_merge_into_join(&self) -> MergeIntoJoin;
266+
// set the target table's segments
267+
fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments);
268+
// get the target table's segments
269+
fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments;
262270

263271
fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;
264272

265273
fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
266274

267275
fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
268276

277+
fn get_merge_into_source_build_siphashkeys_with_id(
278+
&self,
279+
id: usize,
280+
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))>;
281+
269282
fn has_bloom_runtime_filters(&self, id: usize) -> bool;
270283
fn txn_mgr(&self) -> TxnManagerRef;
271284
}

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use std::u64::MAX;
1818

1919
use databend_common_catalog::merge_into_join::MergeIntoJoin;
20+
use databend_common_catalog::merge_into_join::MergeIntoJoinType;
2021
use databend_common_catalog::table::TableExt;
2122
use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
@@ -181,7 +182,7 @@ impl MergeIntoInterpreter {
181182

182183
// for `target_build_optimization` we don't need to read rowId column. for now, there are two cases we don't read rowid:
183184
// I. InsertOnly, the MergeIntoType is InsertOnly
184-
// II. target build optimization for this pr. the MergeIntoType is MergeIntoType
185+
// II. target build optimization for this pr. the MergeIntoType is FullOperation
185186
let mut target_build_optimization =
186187
matches!(self.plan.merge_type, MergeIntoType::FullOperation)
187188
&& !self.plan.columns_set.contains(&self.plan.row_id_index);
@@ -199,6 +200,9 @@ impl MergeIntoInterpreter {
199200
target_tbl_idx: DUMMY_TABLE_INDEX,
200201
is_distributed: merge_into_join.is_distributed,
201202
merge_into_join_type: merge_into_join.merge_into_join_type,
203+
table_info: merge_into_join.table_info.clone(),
204+
catalog_info: merge_into_join.catalog_info.clone(),
205+
database_name: merge_into_join.database_name.clone(),
202206
});
203207
}
204208
}
@@ -287,7 +291,22 @@ impl MergeIntoInterpreter {
287291

288292
let table_info = fuse_table.get_table_info().clone();
289293
let catalog_ = self.ctx.get_catalog(catalog).await?;
290-
294+
// try add catalog_info and table_info for `source_build_bloom_filter`
295+
let merge_into_join = self.ctx.get_merge_into_join();
296+
let source_build_bloom_filter = matches!(
297+
merge_into_join.merge_into_join_type,
298+
MergeIntoJoinType::Right
299+
) && merge_into_join.target_tbl_idx != DUMMY_TABLE_INDEX;
300+
if source_build_bloom_filter {
301+
self.ctx.set_merge_into_join(MergeIntoJoin {
302+
target_tbl_idx: merge_into_join.target_tbl_idx,
303+
is_distributed: merge_into_join.is_distributed,
304+
merge_into_join_type: merge_into_join.merge_into_join_type,
305+
table_info: Some(table_info.clone()),
306+
catalog_info: Some(catalog_.info()),
307+
database_name: merge_into_join.database_name.clone(),
308+
})
309+
}
291310
// merge_into_source is used to recv join's datablocks and split them into macthed and not matched
292311
// datablocks.
293312
let merge_into_source = if !*distributed && extract_exchange {

src/query/service/src/sessions/query_ctx.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@ use std::time::UNIX_EPOCH;
3232
use chrono_tz::Tz;
3333
use dashmap::mapref::multiple::RefMulti;
3434
use dashmap::DashMap;
35+
use databend_common_arrow::arrow::bitmap::Bitmap;
36+
use databend_common_arrow::arrow::buffer::Buffer;
3537
use databend_common_base::base::tokio::task::JoinHandle;
3638
use databend_common_base::base::Progress;
3739
use databend_common_base::base::ProgressValues;
3840
use databend_common_base::runtime::profile::Profile;
3941
use databend_common_base::runtime::profile::ProfileStatisticsName;
4042
use databend_common_base::runtime::TrySpawn;
4143
use databend_common_catalog::merge_into_join::MergeIntoJoin;
44+
use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments;
4245
use databend_common_catalog::plan::DataSourceInfo;
4346
use databend_common_catalog::plan::DataSourcePlan;
4447
use databend_common_catalog::plan::PartInfoPtr;
@@ -1008,6 +1011,9 @@ impl TableContext for QueryContext {
10081011
merge_into_join_type: merge_into_join.merge_into_join_type.clone(),
10091012
is_distributed: merge_into_join.is_distributed,
10101013
target_tbl_idx: merge_into_join.target_tbl_idx,
1014+
catalog_info: merge_into_join.catalog_info.clone(),
1015+
table_info: merge_into_join.table_info.clone(),
1016+
database_name: merge_into_join.database_name.clone(),
10111017
}
10121018
}
10131019

@@ -1019,6 +1025,17 @@ impl TableContext for QueryContext {
10191025
}
10201026
}
10211027

1028+
fn get_merge_into_source_build_siphashkeys_with_id(
1029+
&self,
1030+
id: IndexType,
1031+
) -> Vec<(String, (Buffer<u64>, Option<Bitmap>))> {
1032+
let runtime_filters = self.shared.runtime_filters.read();
1033+
match runtime_filters.get(&id) {
1034+
Some(v) => v.get_merge_into_source_build_siphashkeys(),
1035+
None => vec![],
1036+
}
1037+
}
1038+
10221039
fn get_inlist_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr<String>> {
10231040
let runtime_filters = self.shared.runtime_filters.read();
10241041
match runtime_filters.get(&id) {
@@ -1045,6 +1062,16 @@ impl TableContext for QueryContext {
10451062
fn txn_mgr(&self) -> TxnManagerRef {
10461063
self.shared.session.session_ctx.txn_mgr()
10471064
}
1065+
1066+
fn set_merge_into_source_build_segments(&self, segments: MergeIntoSourceBuildSegments) {
1067+
let mut merge_into_source_build_segments =
1068+
self.shared.merge_into_source_build_segments.write();
1069+
*merge_into_source_build_segments = segments;
1070+
}
1071+
1072+
fn get_merge_into_source_build_segments(&self) -> MergeIntoSourceBuildSegments {
1073+
self.shared.merge_into_source_build_segments.read().clone()
1074+
}
10481075
}
10491076

10501077
impl TrySpawn for QueryContext {

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_base::runtime::drop_guard;
2626
use databend_common_base::runtime::Runtime;
2727
use databend_common_catalog::catalog::CatalogManager;
2828
use databend_common_catalog::merge_into_join::MergeIntoJoin;
29+
use databend_common_catalog::merge_into_join::MergeIntoSourceBuildSegments;
2930
use databend_common_catalog::query_kind::QueryKind;
3031
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
3132
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
@@ -121,6 +122,9 @@ pub struct QueryContextShared {
121122

122123
pub(in crate::sessions) merge_into_join: Arc<RwLock<MergeIntoJoin>>,
123124

125+
pub(in crate::sessions) merge_into_source_build_segments:
126+
Arc<RwLock<MergeIntoSourceBuildSegments>>,
127+
124128
// Records query level data cache metrics
125129
pub(in crate::sessions) query_cache_metrics: DataCacheMetrics,
126130
}
@@ -170,6 +174,7 @@ impl QueryContextShared {
170174
query_profiles: Arc::new(RwLock::new(HashMap::new())),
171175
runtime_filters: Default::default(),
172176
merge_into_join: Default::default(),
177+
merge_into_source_build_segments: Default::default(),
173178
}))
174179
}
175180

0 commit comments

Comments
 (0)