Skip to content

Commit e2bca75

Browse files
committed
support non-equi condition
1 parent fd7c133 commit e2bca75

File tree

4 files changed

+82
-10
lines changed

4 files changed

+82
-10
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub enum HashTable {
108108
KeyU512HashTable(KeyU512HashTable),
109109
}
110110

111-
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
111+
#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)]
112112
pub enum MarkerKind {
113113
True,
114114
False,

src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,17 +546,24 @@ impl JoinHashTable {
546546
let probe_indexes = &mut probe_state.probe_indexs;
547547
let valids = &probe_state.valids;
548548
let mut validity = MutableBitmap::new();
549+
let mut row_state = std::collections::HashMap::new();
549550
for (i, key) in keys_iter.enumerate() {
550551
let probe_result_ptr = Self::probe_key(hash_table, key, valids, i);
551552
if let Some(v) = probe_result_ptr {
552553
let probe_result_ptrs = v.get_value();
553554
build_indexes.extend_from_slice(probe_result_ptrs);
555+
for row_ptr in probe_result_ptrs.iter() {
556+
row_state
557+
.entry(row_ptr.clone())
558+
.and_modify(|e| *e += 1)
559+
.or_insert(1 as usize);
560+
}
554561
probe_indexes.extend(std::iter::repeat(i as u32).take(probe_result_ptrs.len()));
555562
validity.extend_constant(probe_result_ptrs.len(), true);
556563
}
557564
}
558565

559-
// For right join, build side will always appear in the joined table
566+
// For right join, build side will appear at least once in the joined table
560567
// Find the unmatched rows in build side
561568
let mut unmatched_build_indexes = vec![];
562569
for kv in hash_table.iter() {
@@ -591,7 +598,45 @@ impl JoinHashTable {
591598
);
592599

593600
nullable_probe_block = DataBlock::concat_blocks(&[nullable_probe_block, null_probe_block])?;
594-
let merged_block = self.merge_eq_block(&nullable_probe_block, &build_block)?;
601+
let mut merged_block = self.merge_eq_block(&nullable_probe_block, &build_block)?;
602+
if !WITH_OTHER_CONJUNCT {
603+
return Ok(merged_block);
604+
}
605+
606+
let (bm, all_true, all_false) = self.get_other_filters(
607+
&merged_block,
608+
self.hash_join_desc.other_predicate.as_ref().unwrap(),
609+
)?;
610+
611+
if all_true {
612+
return Ok(merged_block);
613+
}
614+
615+
let validity = match (bm, all_false) {
616+
(Some(b), _) => b,
617+
(None, true) => Bitmap::new_zeroed(merged_block.num_rows()),
618+
// must be one of above
619+
_ => unreachable!(),
620+
};
621+
622+
let nullable_columns = nullable_probe_block
623+
.columns()
624+
.iter()
625+
.map(|c| Self::set_validity(c, &validity))
626+
.collect::<Result<Vec<_>>>()?;
627+
nullable_probe_block = DataBlock::create(self.probe_schema.clone(), nullable_columns);
628+
merged_block = self.merge_eq_block(&nullable_probe_block, &build_block)?;
629+
630+
// If there are only non-equi conditions, the size of build_indexes will be greater than the build table size
631+
// Because this case results in a cross join.
632+
// We need to filter out the redundant rows on the build side.
633+
if build_indexes.len() > self.row_space.rows_number() {
634+
let mut bm = validity.into_mut().right().unwrap();
635+
Self::filter_rows_for_right_join(&mut bm, build_indexes, &mut row_state);
636+
let predicate = BooleanColumn::from_arrow_data(bm.into()).arc();
637+
return DataBlock::filter_block(merged_block, &predicate);
638+
}
639+
595640
Ok(merged_block)
596641
}
597642

@@ -644,9 +689,9 @@ impl JoinHashTable {
644689
}
645690

646691
// keep at least one index of the positive state and the null state
647-
// bitmap: [1, 0, 1] with row_state [2, 0], probe_index: [0, 0, 1]
692+
// bitmap: [1, 0, 1] with row_state [2, 1], probe_index: [0, 0, 1]
648693
// bitmap will be [1, 0, 1] -> [1, 0, 1] -> [1, 0, 1] -> [1, 0, 1]
649-
// row_state will be [2, 0] -> [2, 0] -> [1, 0] -> [1, 0]
694+
// row_state will be [2, 1] -> [2, 1] -> [1, 1] -> [1, 1]
650695
fn fill_null_for_left_join(
651696
bm: &mut MutableBitmap,
652697
probe_indexs: &[u32],
@@ -672,6 +717,25 @@ impl JoinHashTable {
672717
}
673718
}
674719

720+
/// Adjusts the filter bitmap `bm` so that every build row referenced by
/// `build_indexes` keeps at least one position set to `true`.
///
/// `bm` is read and written at the same positions as `build_indexes`
/// (position `i` of the bitmap corresponds to `build_indexes[i]`).
/// `row_state` maps a build row to a counter; the caller is expected to have
/// filled it with the number of times each row occurs in `build_indexes`
/// (NOTE(review): confirm against the caller that every row in
/// `build_indexes` has an entry).
///
/// For each position, in order:
/// - if the row's counter is exactly 1, the bit is forced to `true` and the
///   counter is left untouched, so the last remaining occurrence of a row is
///   always kept;
/// - otherwise, if the bit is `false`, the counter is decremented, recording
///   that one occurrence of the row is going to be filtered out. A `true`
///   bit leaves the counter unchanged.
///
/// # Panics
///
/// Indexing `row_state[row]` panics if `row` has no entry in `row_state`.
fn filter_rows_for_right_join(
721+
bm: &mut MutableBitmap,
722+
build_indexes: &[RowPtr],
723+
row_state: &mut std::collections::HashMap<RowPtr, usize>,
724+
) {
725+
for (index, row) in build_indexes.iter().enumerate() {
726+
if row_state[row] == 1 {
727+
if !bm.get(index) {
728+
bm.set(index, true)
729+
}
730+
continue;
731+
}
732+
733+
if !bm.get(index) {
734+
row_state.entry(*row).and_modify(|e| *e -= 1);
735+
}
736+
}
737+
}
738+
675739
// return an (option bitmap, all_true, all_false)
676740
fn get_other_filters(
677741
&self,

src/query/service/src/pipelines/processors/transforms/hash_join/row.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl Chunk {
3636
}
3737
}
3838

39-
#[derive(Clone, Copy, Debug)]
39+
#[derive(Clone, Copy, Debug, Hash)]
4040
pub struct RowPtr {
4141
pub chunk_index: u32,
4242
pub row_index: u32,
@@ -81,6 +81,11 @@ impl RowSpace {
8181
chunks.iter().map(|c| c.data_block.clone()).collect()
8282
}
8383

84+
/// Returns the total number of rows stored in this row space, computed by
/// summing `num_rows()` over every chunk.
///
/// # Panics
///
/// Panics if `self.chunks.read()` returns an error, because the result is
/// unwrapped (presumably a poisoned lock; confirm the type of `chunks`).
pub fn rows_number(&self) -> usize {
85+
let chunks = self.chunks.read().unwrap();
86+
chunks.iter().map(|c| c.num_rows()).sum()
87+
}
88+
8489
pub fn gather(&self, row_ptrs: &[RowPtr]) -> Result<DataBlock> {
8590
let data_blocks = self.datablocks();
8691
let num_rows = data_blocks
@@ -111,3 +116,5 @@ impl PartialEq for RowPtr {
111116
self.chunk_index == other.chunk_index && self.row_index == other.row_index
112117
}
113118
}
119+
120+
// `Eq` marks `RowPtr` equality as a full equivalence relation; together with
// `Hash` it is required for `RowPtr` to be used as a `HashMap` key.
impl Eq for RowPtr {}

src/query/service/src/sql/planner/binder/join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,11 @@ impl<'a> Binder {
198198
marker_index: None,
199199
from_correlated_subquery: false,
200200
};
201-
let expr = SExpr::create_binary(inner_join.into(), left_child, right_child);
202-
203-
Ok(expr)
201+
Ok(SExpr::create_binary(
202+
inner_join.into(),
203+
left_child,
204+
right_child,
205+
))
204206
}
205207
}
206208

@@ -361,7 +363,6 @@ impl<'a> JoinConditionResolver<'a> {
361363
// For example, `t1.a + t1.b = t2.a` is a valid one while `t1.a + t2.a = t2.b` isn't.
362364
//
363365
// Only equi-predicate can be exploited by common join algorithms(e.g. sort-merge join, hash join).
364-
// For the predicates that aren't equi-predicate, we will lift them as a `Filter` operator.
365366
if let Some((left, right)) = split_equivalent_predicate(predicate) {
366367
self.add_conditions(left, right, left_join_conditions, right_join_conditions)?;
367368
} else {

0 commit comments

Comments
 (0)