Skip to content

Commit 7737e9f

Browse files
committed
ready for review
1 parent b718cf1 commit 7737e9f

File tree

5 files changed

+16
-20
lines changed

5 files changed

+16
-20
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -693,8 +693,7 @@ impl HashJoinState for JoinHashTable {
693693
// Find the unmatched rows in build side
694694
let mut unmatched_build_indexes = vec![];
695695
{
696-
let chunks = self.row_space.chunks.read().unwrap();
697-
for (chunk_index, chunk) in chunks.iter().enumerate() {
696+
for (chunk_index, chunk) in self.row_space.chunks.read().unwrap().iter().enumerate() {
698697
for row_index in 0..chunk.num_rows() {
699698
let row_ptr = RowPtr {
700699
chunk_index: chunk_index as u32,
@@ -708,6 +707,8 @@ impl HashJoinState for JoinHashTable {
708707
.read()
709708
.contains(&row_ptr)
710709
{
710+
let mut row_state = self.hash_join_desc.right_join_desc.row_state.write();
711+
row_state.entry(row_ptr.clone()).or_insert(0_usize);
711712
unmatched_build_indexes.push(row_ptr);
712713
}
713714
}
@@ -754,7 +755,6 @@ impl HashJoinState for JoinHashTable {
754755
// must be one of above
755756
_ => unreachable!(),
756757
};
757-
dbg!(merged_block.clone());
758758
let probe_column_len = self.probe_schema.fields().len();
759759
let probe_columns = merged_block.columns()[0..probe_column_len]
760760
.iter()
@@ -765,19 +765,17 @@ impl HashJoinState for JoinHashTable {
765765
self.row_space.data_schema.clone(),
766766
merged_block.columns()[probe_column_len..].to_vec(),
767767
);
768-
merged_block = self.merge_eq_block(&probe_block, &build_block)?;
768+
merged_block = self.merge_eq_block(&build_block, &probe_block)?;
769769

770-
// If there are only non-equi conditions, build_indexes size will greater build table size
771-
// Because the case will cause cross join.
772-
// We need filter the redundant rows for build side.
773-
let build_indexes = self.hash_join_desc.right_join_desc.build_indexes.read();
770+
// If the size of build_indexes is greater than the build table size, we need to filter out the redundant rows for the build side.
771+
let mut build_indexes = self.hash_join_desc.right_join_desc.build_indexes.write();
774772
let mut row_state = self.hash_join_desc.right_join_desc.row_state.write();
773+
build_indexes.extend_from_slice(&unmatched_build_indexes);
775774
if build_indexes.len() > self.row_space.rows_number() {
776775
let mut bm = validity.into_mut().right().unwrap();
777776
Self::filter_rows_for_right_join(&mut bm, &build_indexes, &mut row_state);
778777
let predicate = BooleanColumn::from_arrow_data(bm.into()).arc();
779778
let filtered_block = DataBlock::filter_block(merged_block, &predicate)?;
780-
dbg!(filtered_block.clone());
781779
return Ok(vec![filtered_block]);
782780
}
783781

src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ impl JoinHashTable {
659659
row_state: &mut std::collections::HashMap<RowPtr, usize>,
660660
) {
661661
for (index, row) in build_indexes.iter().enumerate() {
662-
if row_state[row] == 1 {
662+
if row_state[row] == 1 || row_state[row] == 0 {
663663
if !bm.get(index) {
664664
bm.set(index, true)
665665
}

src/query/service/src/pipelines/processors/transforms/hash_join/row.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl Chunk {
3636
}
3737
}
3838

39-
#[derive(Clone, Copy, Debug, Hash)]
39+
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
4040
pub struct RowPtr {
4141
pub chunk_index: u32,
4242
pub row_index: u32,
@@ -110,11 +110,3 @@ impl RowSpace {
110110
}
111111
}
112112
}
113-
114-
impl PartialEq for RowPtr {
115-
fn eq(&self, other: &Self) -> bool {
116-
self.chunk_index == other.chunk_index && self.row_index == other.row_index
117-
}
118-
}
119-
120-
impl Eq for RowPtr {}

src/query/service/src/sql/executor/physical_plan.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,18 @@ impl HashJoin {
214214

215215
JoinType::Right => {
216216
fields.clear();
217-
fields = self.build.output_schema()?.fields().clone();
218217
for field in self.probe.output_schema()?.fields() {
219218
fields.push(DataField::new(
220219
field.name().as_str(),
221220
wrap_nullable(field.data_type()),
222221
));
223222
}
223+
for field in self.build.output_schema()?.fields() {
224+
fields.push(DataField::new(
225+
field.name().as_str(),
226+
field.data_type().clone(),
227+
));
228+
}
224229
}
225230

226231
JoinType::Semi | JoinType::Anti => {

src/query/service/src/sql/executor/pipeline_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ impl PipelineBuilder {
471471
}
472472

473473
if join.join_type == JoinType::Right {
474+
self.main_pipeline.resize(1)?;
474475
self.main_pipeline.add_transform(|input, output| {
475476
TransformRightJoin::try_create(
476477
input,

0 commit comments

Comments
 (0)