Skip to content

Commit 510e727

Browse files
authored
Merge pull request #7634 from xudong963/right_joi
feat(planner): support independent right join
2 parents 1da0789 + fe2444f commit 510e727

File tree

13 files changed

+435
-169
lines changed

13 files changed

+435
-169
lines changed

src/query/service/src/pipelines/processors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub use transforms::KeyU64HashTable;
5454
pub use transforms::KeyU8HashTable;
5555
pub use transforms::MarkJoinCompactor;
5656
pub use transforms::ProjectionTransform;
57+
pub use transforms::RightJoinCompactor;
5758
pub use transforms::SerializerHashTable;
5859
pub use transforms::SinkBuildHashTable;
5960
pub use transforms::SortMergeCompactor;

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,36 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
1517
use common_exception::Result;
1618
use common_functions::scalars::FunctionFactory;
1719
use parking_lot::RwLock;
1820

1921
use crate::evaluator::EvalNode;
2022
use crate::evaluator::Evaluator;
23+
use crate::pipelines::processors::transforms::hash_join::row::RowPtr;
2124
use crate::pipelines::processors::transforms::hash_join::MarkJoinDesc;
2225
use crate::sql::executor::HashJoin;
2326
use crate::sql::executor::PhysicalScalar;
2427
use crate::sql::plans::JoinType;
2528

29+
pub struct RightJoinDesc {
30+
/// Record rows in build side that are matched with rows in probe side.
31+
pub(crate) build_indexes: RwLock<Vec<RowPtr>>,
32+
/// Record row in build side that is matched how many rows in probe side.
33+
pub(crate) row_state: RwLock<HashMap<RowPtr, usize>>,
34+
}
35+
36+
impl RightJoinDesc {
37+
pub fn create() -> Self {
38+
RightJoinDesc {
39+
build_indexes: RwLock::new(Vec::new()),
40+
row_state: RwLock::new(HashMap::new()),
41+
}
42+
}
43+
}
44+
2645
pub struct HashJoinDesc {
2746
pub(crate) build_keys: Vec<EvalNode>,
2847
pub(crate) probe_keys: Vec<EvalNode>,
@@ -31,6 +50,7 @@ pub struct HashJoinDesc {
3150
pub(crate) marker_join_desc: MarkJoinDesc,
3251
/// Whether the Join are derived from correlated subquery.
3352
pub(crate) from_correlated_subquery: bool,
53+
pub(crate) right_join_desc: RightJoinDesc,
3454
}
3555

3656
impl HashJoinDesc {
@@ -50,6 +70,7 @@ impl HashJoinDesc {
5070
marker_index: join.marker_index,
5171
},
5272
from_correlated_subquery: join.from_correlated_subquery,
73+
right_join_desc: RightJoinDesc::create(),
5374
})
5475
}
5576

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,7 @@ pub trait HashJoinState: Send + Sync {
4444

4545
/// Get mark join results
4646
fn mark_join_blocks(&self) -> Result<Vec<DataBlock>>;
47+
48+
/// Get right join results
49+
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
4750
}

src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
// limitations under the License.
1414

1515
use std::borrow::BorrowMut;
16+
use std::collections::HashSet;
1617
use std::fmt::Debug;
1718
use std::sync::Arc;
1819
use std::sync::Mutex;
1920

21+
use common_arrow::arrow::bitmap::Bitmap;
2022
use common_arrow::arrow::bitmap::MutableBitmap;
2123
use common_base::base::tokio::sync::Notify;
2224
use common_datablocks::DataBlock;
@@ -35,7 +37,9 @@ use common_datavalues::DataField;
3537
use common_datavalues::DataSchema;
3638
use common_datavalues::DataSchemaRef;
3739
use common_datavalues::DataSchemaRefExt;
40+
use common_datavalues::DataType;
3841
use common_datavalues::DataTypeImpl;
42+
use common_datavalues::DataValue;
3943
use common_datavalues::NullableType;
4044
use common_exception::ErrorCode;
4145
use common_exception::Result;
@@ -108,7 +112,7 @@ pub enum HashTable {
108112
KeyU512HashTable(KeyU512HashTable),
109113
}
110114

111-
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
115+
#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)]
112116
pub enum MarkerKind {
113117
True,
114118
False,
@@ -130,6 +134,7 @@ pub struct JoinHashTable {
130134
pub(crate) row_space: RowSpace,
131135
pub(crate) hash_join_desc: HashJoinDesc,
132136
pub(crate) row_ptrs: RwLock<Vec<RowPtr>>,
137+
pub(crate) probe_schema: DataSchemaRef,
133138
finished_notify: Arc<Notify>,
134139
}
135140

@@ -138,6 +143,7 @@ impl JoinHashTable {
138143
ctx: Arc<QueryContext>,
139144
build_keys: &[PhysicalScalar],
140145
build_schema: DataSchemaRef,
146+
probe_schema: DataSchemaRef,
141147
hash_join_desc: HashJoinDesc,
142148
) -> Result<Arc<JoinHashTable>> {
143149
let hash_key_types: Vec<DataTypeImpl> =
@@ -151,6 +157,7 @@ impl JoinHashTable {
151157
hash_method: HashMethodSerializer::default(),
152158
}),
153159
build_schema,
160+
probe_schema,
154161
hash_join_desc,
155162
)?),
156163
HashMethodKind::KeysU8(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -160,6 +167,7 @@ impl JoinHashTable {
160167
hash_method,
161168
}),
162169
build_schema,
170+
probe_schema,
163171
hash_join_desc,
164172
)?),
165173
HashMethodKind::KeysU16(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -169,6 +177,7 @@ impl JoinHashTable {
169177
hash_method,
170178
}),
171179
build_schema,
180+
probe_schema,
172181
hash_join_desc,
173182
)?),
174183
HashMethodKind::KeysU32(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -178,6 +187,7 @@ impl JoinHashTable {
178187
hash_method,
179188
}),
180189
build_schema,
190+
probe_schema,
181191
hash_join_desc,
182192
)?),
183193
HashMethodKind::KeysU64(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -187,6 +197,7 @@ impl JoinHashTable {
187197
hash_method,
188198
}),
189199
build_schema,
200+
probe_schema,
190201
hash_join_desc,
191202
)?),
192203
HashMethodKind::KeysU128(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -196,6 +207,7 @@ impl JoinHashTable {
196207
hash_method,
197208
}),
198209
build_schema,
210+
probe_schema,
199211
hash_join_desc,
200212
)?),
201213
HashMethodKind::KeysU256(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -205,6 +217,7 @@ impl JoinHashTable {
205217
hash_method,
206218
}),
207219
build_schema,
220+
probe_schema,
208221
hash_join_desc,
209222
)?),
210223
HashMethodKind::KeysU512(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -214,6 +227,7 @@ impl JoinHashTable {
214227
hash_method,
215228
}),
216229
build_schema,
230+
probe_schema,
217231
hash_join_desc,
218232
)?),
219233
})
@@ -223,6 +237,7 @@ impl JoinHashTable {
223237
ctx: Arc<QueryContext>,
224238
hash_table: HashTable,
225239
mut build_data_schema: DataSchemaRef,
240+
mut probe_data_schema: DataSchemaRef,
226241
hash_join_desc: HashJoinDesc,
227242
) -> Result<Self> {
228243
if hash_join_desc.join_type == JoinType::Left
@@ -237,6 +252,16 @@ impl JoinHashTable {
237252
}
238253
build_data_schema = DataSchemaRefExt::create(nullable_field);
239254
};
255+
if hash_join_desc.join_type == JoinType::Right {
256+
let mut nullable_field = Vec::with_capacity(probe_data_schema.fields().len());
257+
for field in probe_data_schema.fields().iter() {
258+
nullable_field.push(DataField::new_nullable(
259+
field.name(),
260+
field.data_type().clone(),
261+
));
262+
}
263+
probe_data_schema = DataSchemaRefExt::create(nullable_field);
264+
}
240265
Ok(Self {
241266
row_space: RowSpace::new(build_data_schema),
242267
ref_count: Mutex::new(0),
@@ -245,6 +270,7 @@ impl JoinHashTable {
245270
ctx,
246271
hash_table: RwLock::new(hash_table),
247272
row_ptrs: RwLock::new(vec![]),
273+
probe_schema: probe_data_schema,
248274
finished_notify: Arc::new(Notify::new()),
249275
})
250276
}
@@ -407,6 +433,31 @@ impl JoinHashTable {
407433
}
408434
}
409435
}
436+
437+
fn find_unmatched_build_indexes(&self) -> Result<Vec<RowPtr>> {
438+
// For right join, build side will appear at least once in the joined table
439+
// Find the unmatched rows in build side
440+
let mut unmatched_build_indexes = vec![];
441+
let build_indexes = self.hash_join_desc.right_join_desc.build_indexes.read();
442+
let build_indexes_set: HashSet<&RowPtr> = build_indexes.iter().collect();
443+
// TODO(xudong): remove the line of code below after https://github.com/rust-lang/rust-clippy/issues/8987
444+
#[allow(clippy::significant_drop_in_scrutinee)]
445+
for (chunk_index, chunk) in self.row_space.chunks.read().unwrap().iter().enumerate() {
446+
for row_index in 0..chunk.num_rows() {
447+
let row_ptr = RowPtr {
448+
chunk_index: chunk_index as u32,
449+
row_index: row_index as u32,
450+
marker: None,
451+
};
452+
if !build_indexes_set.contains(&row_ptr) {
453+
let mut row_state = self.hash_join_desc.right_join_desc.row_state.write();
454+
row_state.entry(row_ptr).or_insert(0_usize);
455+
unmatched_build_indexes.push(row_ptr);
456+
}
457+
}
458+
}
459+
Ok(unmatched_build_indexes)
460+
}
410461
}
411462

412463
#[async_trait::async_trait]
@@ -429,7 +480,8 @@ impl HashJoinState for JoinHashTable {
429480
| JoinType::Anti
430481
| JoinType::Left
431482
| Mark
432-
| JoinType::Single => self.probe_join(input, probe_state),
483+
| JoinType::Single
484+
| JoinType::Right => self.probe_join(input, probe_state),
433485
JoinType::Cross => self.probe_cross_join(input, probe_state),
434486
_ => unimplemented!("{} is unimplemented", self.hash_join_desc.join_type),
435487
}
@@ -661,4 +713,73 @@ impl HashJoinState for JoinHashTable {
661713
let build_block = self.row_space.gather(&row_ptrs)?;
662714
Ok(vec![self.merge_eq_block(&marker_block, &build_block)?])
663715
}
716+
717+
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
718+
let unmatched_build_indexes = self.find_unmatched_build_indexes()?;
719+
if unmatched_build_indexes.is_empty() && self.hash_join_desc.other_predicate.is_none() {
720+
return Ok(blocks.to_vec());
721+
}
722+
723+
let unmatched_build_block = self.row_space.gather(&unmatched_build_indexes)?;
724+
// Create null block for unmatched rows in probe side
725+
let null_probe_block = DataBlock::create(
726+
self.probe_schema.clone(),
727+
self.probe_schema
728+
.fields()
729+
.iter()
730+
.map(|df| {
731+
df.data_type()
732+
.clone()
733+
.create_constant_column(&DataValue::Null, unmatched_build_indexes.len())
734+
})
735+
.collect::<Result<Vec<_>>>()?,
736+
);
737+
let mut merged_block = self.merge_eq_block(&unmatched_build_block, &null_probe_block)?;
738+
merged_block = DataBlock::concat_blocks(&[blocks, &[merged_block]].concat())?;
739+
740+
if self.hash_join_desc.other_predicate.is_none() {
741+
return Ok(vec![merged_block]);
742+
}
743+
744+
let (bm, all_true, all_false) = self.get_other_filters(
745+
&merged_block,
746+
self.hash_join_desc.other_predicate.as_ref().unwrap(),
747+
)?;
748+
749+
if all_true {
750+
return Ok(vec![merged_block]);
751+
}
752+
753+
let validity = match (bm, all_false) {
754+
(Some(b), _) => b,
755+
(None, true) => Bitmap::new_zeroed(merged_block.num_rows()),
756+
// must be one of above
757+
_ => unreachable!(),
758+
};
759+
let probe_column_len = self.probe_schema.fields().len();
760+
let probe_columns = merged_block.columns()[0..probe_column_len]
761+
.iter()
762+
.map(|c| Self::set_validity(c, &validity))
763+
.collect::<Result<Vec<_>>>()?;
764+
let probe_block = DataBlock::create(self.probe_schema.clone(), probe_columns);
765+
let build_block = DataBlock::create(
766+
self.row_space.data_schema.clone(),
767+
merged_block.columns()[probe_column_len..].to_vec(),
768+
);
769+
merged_block = self.merge_eq_block(&build_block, &probe_block)?;
770+
771+
// If build_indexes size will greater build table size, we need filter the redundant rows for build side.
772+
let mut build_indexes = self.hash_join_desc.right_join_desc.build_indexes.write();
773+
let mut row_state = self.hash_join_desc.right_join_desc.row_state.write();
774+
build_indexes.extend(&unmatched_build_indexes);
775+
if build_indexes.len() > self.row_space.rows_number() {
776+
let mut bm = validity.into_mut().right().unwrap();
777+
Self::filter_rows_for_right_join(&mut bm, &build_indexes, &mut row_state);
778+
let predicate = BooleanColumn::from_arrow_data(bm.into()).arc();
779+
let filtered_block = DataBlock::filter_block(merged_block, &predicate)?;
780+
return Ok(vec![filtered_block]);
781+
}
782+
783+
Ok(vec![merged_block])
784+
}
664785
}

0 commit comments

Comments
 (0)