Skip to content

Commit b0b8c5d

Browse files
committed
add TransformRightJoin
1 parent cdb3074 commit b0b8c5d

File tree

8 files changed

+200
-86
lines changed

8 files changed

+200
-86
lines changed

src/query/service/src/pipelines/processors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub use transforms::KeyU64HashTable;
5454
pub use transforms::KeyU8HashTable;
5555
pub use transforms::MarkJoinCompactor;
5656
pub use transforms::ProjectionTransform;
57+
pub use transforms::RightJoinCompactor;
5758
pub use transforms::SerializerHashTable;
5859
pub use transforms::SinkBuildHashTable;
5960
pub use transforms::SortMergeCompactor;

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,36 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
1517
use common_exception::Result;
1618
use common_functions::scalars::FunctionFactory;
1719
use parking_lot::RwLock;
1820

1921
use crate::evaluator::EvalNode;
2022
use crate::evaluator::Evaluator;
23+
use crate::pipelines::processors::transforms::hash_join::row::RowPtr;
2124
use crate::pipelines::processors::transforms::hash_join::MarkJoinDesc;
2225
use crate::sql::executor::HashJoin;
2326
use crate::sql::executor::PhysicalScalar;
2427
use crate::sql::plans::JoinType;
2528

29+
pub struct RightJoinDesc {
30+
/// Record rows in build side that are matched with rows in probe side.
31+
pub(crate) build_indexes: RwLock<Vec<RowPtr>>,
32+
/// Record row in build side that is matched how many rows in probe side.
33+
pub(crate) row_state: RwLock<HashMap<RowPtr, usize>>,
34+
}
35+
36+
impl RightJoinDesc {
37+
pub fn create() -> Self {
38+
RightJoinDesc {
39+
build_indexes: RwLock::new(vec![]),
40+
row_state: RwLock::new(HashMap::new()),
41+
}
42+
}
43+
}
44+
2645
pub struct HashJoinDesc {
2746
pub(crate) build_keys: Vec<EvalNode>,
2847
pub(crate) probe_keys: Vec<EvalNode>,
@@ -31,6 +50,7 @@ pub struct HashJoinDesc {
3150
pub(crate) marker_join_desc: MarkJoinDesc,
3251
/// Whether the Join are derived from correlated subquery.
3352
pub(crate) from_correlated_subquery: bool,
53+
pub(crate) right_join_desc: RightJoinDesc,
3454
}
3555

3656
impl HashJoinDesc {
@@ -50,6 +70,7 @@ impl HashJoinDesc {
5070
marker_index: join.marker_index,
5171
},
5272
from_correlated_subquery: join.from_correlated_subquery,
73+
right_join_desc: RightJoinDesc::create(),
5374
})
5475
}
5576

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,7 @@ pub trait HashJoinState: Send + Sync {
4444

4545
/// Get mark join results
4646
fn mark_join_blocks(&self) -> Result<Vec<DataBlock>>;
47+
48+
/// Get right join results
49+
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
4750
}

src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::fmt::Debug;
1717
use std::sync::Arc;
1818
use std::sync::Mutex;
1919

20+
use common_arrow::arrow::bitmap::Bitmap;
2021
use common_arrow::arrow::bitmap::MutableBitmap;
2122
use common_base::base::tokio::sync::Notify;
2223
use common_datablocks::DataBlock;
@@ -35,7 +36,9 @@ use common_datavalues::DataField;
3536
use common_datavalues::DataSchema;
3637
use common_datavalues::DataSchemaRef;
3738
use common_datavalues::DataSchemaRefExt;
39+
use common_datavalues::DataType;
3840
use common_datavalues::DataTypeImpl;
41+
use common_datavalues::DataValue;
3942
use common_datavalues::NullableType;
4043
use common_exception::ErrorCode;
4144
use common_exception::Result;
@@ -684,4 +687,94 @@ impl HashJoinState for JoinHashTable {
684687
let build_block = self.row_space.gather(&row_ptrs)?;
685688
Ok(vec![self.merge_eq_block(&marker_block, &build_block)?])
686689
}
690+
691+
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
692+
// For right join, build side will appear at lease once in the joined table
693+
// Find the unmatched rows in build side
694+
let mut unmatched_build_indexes = vec![];
695+
{
696+
let chunks = self.row_space.chunks.read().unwrap();
697+
for (chunk_index, chunk) in chunks.iter().enumerate() {
698+
for row_index in 0..chunk.num_rows() {
699+
let row_ptr = RowPtr {
700+
chunk_index: chunk_index as u32,
701+
row_index: row_index as u32,
702+
marker: None,
703+
};
704+
if !self
705+
.hash_join_desc
706+
.right_join_desc
707+
.build_indexes
708+
.read()
709+
.contains(&row_ptr)
710+
{
711+
unmatched_build_indexes.push(row_ptr);
712+
}
713+
}
714+
}
715+
}
716+
717+
let unmatched_build_block = self.row_space.gather(&unmatched_build_indexes)?;
718+
// Create null block for unmatched rows in probe side
719+
let null_probe_block = DataBlock::create(
720+
self.probe_schema.clone(),
721+
self.probe_schema
722+
.fields()
723+
.iter()
724+
.map(|df| {
725+
df.data_type()
726+
.clone()
727+
.create_constant_column(&DataValue::Null, unmatched_build_indexes.len())
728+
})
729+
.collect::<Result<Vec<_>>>()?,
730+
);
731+
let mut merged_block = self.merge_eq_block(&null_probe_block, &unmatched_build_block)?;
732+
merged_block = DataBlock::concat_blocks(&[blocks, &[merged_block]].concat())?;
733+
734+
if self.hash_join_desc.other_predicate.is_none() {
735+
return Ok(vec![merged_block]);
736+
}
737+
738+
let (bm, all_true, all_false) = self.get_other_filters(
739+
&merged_block,
740+
self.hash_join_desc.other_predicate.as_ref().unwrap(),
741+
)?;
742+
743+
if all_true {
744+
return Ok(vec![merged_block]);
745+
}
746+
747+
let validity = match (bm, all_false) {
748+
(Some(b), _) => b,
749+
(None, true) => Bitmap::new_zeroed(merged_block.num_rows()),
750+
// must be one of above
751+
_ => unreachable!(),
752+
};
753+
754+
let probe_column_len = self.probe_schema.fields().len();
755+
let probe_columns = merged_block.columns()[0..probe_column_len]
756+
.iter()
757+
.map(|c| Self::set_validity(c, &validity))
758+
.collect::<Result<Vec<_>>>()?;
759+
let probe_block = DataBlock::create(self.probe_schema.clone(), probe_columns);
760+
let build_block = DataBlock::create(
761+
self.row_space.data_schema.clone(),
762+
merged_block.columns()[probe_column_len..].to_vec(),
763+
);
764+
merged_block = self.merge_eq_block(&probe_block, &build_block)?;
765+
766+
// If there are only non-equi conditions, build_indexes size will greater build table size
767+
// Because the case will cause cross join.
768+
// We need filter the redundant rows for build side.
769+
let build_indexes = self.hash_join_desc.right_join_desc.build_indexes.read();
770+
let mut row_state = self.hash_join_desc.right_join_desc.row_state.write();
771+
if build_indexes.len() > self.row_space.rows_number() {
772+
let mut bm = validity.into_mut().right().unwrap();
773+
Self::filter_rows_for_right_join(&mut bm, &build_indexes, &mut row_state);
774+
let predicate = BooleanColumn::from_arrow_data(bm.into()).arc();
775+
return Ok(vec![DataBlock::filter_block(merged_block, &predicate)?]);
776+
}
777+
778+
Ok(vec![merged_block])
779+
}
687780
}

src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs

Lines changed: 22 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,8 @@ impl JoinHashTable {
159159
}
160160
}
161161
JoinType::Right => {
162-
if self.hash_join_desc.other_predicate.is_none() {
163-
let result =
164-
self.right_join::<false, _, _>(hash_table, probe_state, keys_iter, input)?;
165-
return Ok(vec![result]);
166-
} else {
167-
let result =
168-
self.right_join::<true, _, _>(hash_table, probe_state, keys_iter, input)?;
169-
return Ok(vec![result]);
170-
}
162+
let result = self.right_join::<_, _>(hash_table, probe_state, keys_iter, input)?;
163+
return Ok(vec![result]);
171164
}
172165
Mark => {
173166
results.push(DataBlock::empty());
@@ -531,7 +524,7 @@ impl JoinHashTable {
531524
DataBlock::filter_block(merged_block, &predicate)
532525
}
533526

534-
fn right_join<const WITH_OTHER_CONJUNCT: bool, Key, IT>(
527+
fn right_join<Key, IT>(
535528
&self,
536529
hash_table: &HashMap<Key, Vec<RowPtr>>,
537530
probe_state: &mut ProbeState,
@@ -542,102 +535,45 @@ impl JoinHashTable {
542535
Key: HashTableKeyable + Clone + 'static,
543536
IT: Iterator<Item = Key> + TrustedLen,
544537
{
545-
let build_indexes = &mut probe_state.build_indexs;
538+
let local_build_indexes = &mut probe_state.build_indexs;
546539
let probe_indexes = &mut probe_state.probe_indexs;
547540
let valids = &probe_state.valids;
548541
let mut validity = MutableBitmap::new();
549-
let mut row_state = std::collections::HashMap::new();
550542
for (i, key) in keys_iter.enumerate() {
551543
let probe_result_ptr = Self::probe_key(hash_table, key, valids, i);
552544
if let Some(v) = probe_result_ptr {
553545
let probe_result_ptrs = v.get_value();
554-
build_indexes.extend_from_slice(probe_result_ptrs);
546+
{
547+
let mut build_indexes =
548+
self.hash_join_desc.right_join_desc.build_indexes.write();
549+
build_indexes.extend_from_slice(probe_result_ptrs);
550+
local_build_indexes.extend_from_slice(probe_result_ptrs);
551+
}
555552
for row_ptr in probe_result_ptrs.iter() {
556-
row_state
557-
.entry(*row_ptr)
558-
.and_modify(|e| *e += 1)
559-
.or_insert(1_usize);
553+
{
554+
let mut row_state = self.hash_join_desc.right_join_desc.row_state.write();
555+
row_state
556+
.entry(*row_ptr)
557+
.and_modify(|e| *e += 1)
558+
.or_insert(1_usize);
559+
}
560560
}
561561
probe_indexes.extend(std::iter::repeat(i as u32).take(probe_result_ptrs.len()));
562562
validity.extend_constant(probe_result_ptrs.len(), true);
563563
}
564564
}
565565

566-
// For right join, build side will appear at lease once in the joined table
567-
// Find the unmatched rows in build side
568-
let mut unmatched_build_indexes = vec![];
569-
for kv in hash_table.iter() {
570-
for v in kv.get_value() {
571-
if !build_indexes.contains(v) {
572-
unmatched_build_indexes.push(*v);
573-
}
574-
}
575-
}
576-
build_indexes.extend_from_slice(&unmatched_build_indexes);
577-
let build_block = self.row_space.gather(build_indexes)?;
566+
let build_block = self.row_space.gather(local_build_indexes)?;
578567
let probe_block = DataBlock::block_take_by_indices(input, probe_indexes)?;
579568
let validity: Bitmap = validity.into();
580569
let nullable_columns = probe_block
581570
.columns()
582571
.iter()
583572
.map(|c| Self::set_validity(c, &validity))
584573
.collect::<Result<Vec<_>>>()?;
585-
let mut nullable_probe_block =
586-
DataBlock::create(self.probe_schema.clone(), nullable_columns);
587-
// Create null block for unmatched rows in probe side
588-
let null_probe_block = DataBlock::create(
589-
self.probe_schema.clone(),
590-
nullable_probe_block
591-
.columns()
592-
.iter()
593-
.map(|c| {
594-
c.data_type()
595-
.create_constant_column(&DataValue::Null, unmatched_build_indexes.len())
596-
})
597-
.collect::<Result<Vec<_>>>()?,
598-
);
599-
600-
nullable_probe_block = DataBlock::concat_blocks(&[nullable_probe_block, null_probe_block])?;
601-
let mut merged_block = self.merge_eq_block(&nullable_probe_block, &build_block)?;
602-
if !WITH_OTHER_CONJUNCT {
603-
return Ok(merged_block);
604-
}
605-
606-
let (bm, all_true, all_false) = self.get_other_filters(
607-
&merged_block,
608-
self.hash_join_desc.other_predicate.as_ref().unwrap(),
609-
)?;
610-
611-
if all_true {
612-
return Ok(merged_block);
613-
}
614-
615-
let validity = match (bm, all_false) {
616-
(Some(b), _) => b,
617-
(None, true) => Bitmap::new_zeroed(merged_block.num_rows()),
618-
// must be one of above
619-
_ => unreachable!(),
620-
};
621-
622-
let nullable_columns = nullable_probe_block
623-
.columns()
624-
.iter()
625-
.map(|c| Self::set_validity(c, &validity))
626-
.collect::<Result<Vec<_>>>()?;
627-
nullable_probe_block = DataBlock::create(self.probe_schema.clone(), nullable_columns);
628-
merged_block = self.merge_eq_block(&nullable_probe_block, &build_block)?;
629-
630-
// If there are only non-equi conditions, build_indexes size will greater build table size
631-
// Because the case will cause cross join.
632-
// We need filter the redundant rows for build side.
633-
if build_indexes.len() > self.row_space.rows_number() {
634-
let mut bm = validity.into_mut().right().unwrap();
635-
Self::filter_rows_for_right_join(&mut bm, build_indexes, &mut row_state);
636-
let predicate = BooleanColumn::from_arrow_data(bm.into()).arc();
637-
return DataBlock::filter_block(merged_block, &predicate);
638-
}
574+
let nullable_probe_block = DataBlock::create(self.probe_schema.clone(), nullable_columns);
639575

640-
Ok(merged_block)
576+
self.merge_eq_block(&nullable_probe_block, &build_block)
641577
}
642578

643579
// modify the bm by the value row_state
@@ -717,7 +653,7 @@ impl JoinHashTable {
717653
}
718654
}
719655

720-
fn filter_rows_for_right_join(
656+
pub(crate) fn filter_rows_for_right_join(
721657
bm: &mut MutableBitmap,
722658
build_indexes: &[RowPtr],
723659
row_state: &mut std::collections::HashMap<RowPtr, usize>,
@@ -737,7 +673,7 @@ impl JoinHashTable {
737673
}
738674

739675
// return an (option bitmap, all_true, all_false)
740-
fn get_other_filters(
676+
pub(crate) fn get_other_filters(
741677
&self,
742678
merged_block: &DataBlock,
743679
filter: &EvalNode,

src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ mod transform_window_func;
3838

3939
pub mod group_by;
4040
mod transform_merge_block;
41+
mod transform_right_join;
4142

4243
pub use aggregator::AggregatorParams;
4344
pub use aggregator::AggregatorTransformParams;
@@ -79,6 +80,8 @@ pub use transform_mark_join::TransformMarkJoin;
7980
pub use transform_merge_block::TransformMergeBlock;
8081
pub use transform_project::TransformProject;
8182
pub use transform_rename::TransformRename;
83+
pub use transform_right_join::RightJoinCompactor;
84+
pub use transform_right_join::TransformRightJoin;
8285
pub use transform_sort_merge::SortMergeCompactor;
8386
pub use transform_sort_merge::TransformSortMerge;
8487
pub use transform_sort_partial::get_sort_descriptions;

0 commit comments

Comments
 (0)