Skip to content

Commit 2a17451

Browse files
committed
feat(planner): support independent right join
1 parent 9ea070e commit 2a17451

File tree

6 files changed

+123
-111
lines changed

6 files changed

+123
-111
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ pub struct JoinHashTable {
130130
pub(crate) row_space: RowSpace,
131131
pub(crate) hash_join_desc: HashJoinDesc,
132132
pub(crate) row_ptrs: RwLock<Vec<RowPtr>>,
133+
pub(crate) probe_schema: DataSchemaRef,
133134
finished_notify: Arc<Notify>,
134135
}
135136

@@ -138,6 +139,7 @@ impl JoinHashTable {
138139
ctx: Arc<QueryContext>,
139140
build_keys: &[PhysicalScalar],
140141
build_schema: DataSchemaRef,
142+
probe_schema: DataSchemaRef,
141143
hash_join_desc: HashJoinDesc,
142144
) -> Result<Arc<JoinHashTable>> {
143145
let hash_key_types: Vec<DataTypeImpl> =
@@ -151,6 +153,7 @@ impl JoinHashTable {
151153
hash_method: HashMethodSerializer::default(),
152154
}),
153155
build_schema,
156+
probe_schema,
154157
hash_join_desc,
155158
)?),
156159
HashMethodKind::KeysU8(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -160,6 +163,7 @@ impl JoinHashTable {
160163
hash_method,
161164
}),
162165
build_schema,
166+
probe_schema,
163167
hash_join_desc,
164168
)?),
165169
HashMethodKind::KeysU16(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -169,6 +173,7 @@ impl JoinHashTable {
169173
hash_method,
170174
}),
171175
build_schema,
176+
probe_schema,
172177
hash_join_desc,
173178
)?),
174179
HashMethodKind::KeysU32(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -178,6 +183,7 @@ impl JoinHashTable {
178183
hash_method,
179184
}),
180185
build_schema,
186+
probe_schema,
181187
hash_join_desc,
182188
)?),
183189
HashMethodKind::KeysU64(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -187,6 +193,7 @@ impl JoinHashTable {
187193
hash_method,
188194
}),
189195
build_schema,
196+
probe_schema,
190197
hash_join_desc,
191198
)?),
192199
HashMethodKind::KeysU128(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -196,6 +203,7 @@ impl JoinHashTable {
196203
hash_method,
197204
}),
198205
build_schema,
206+
probe_schema,
199207
hash_join_desc,
200208
)?),
201209
HashMethodKind::KeysU256(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -205,6 +213,7 @@ impl JoinHashTable {
205213
hash_method,
206214
}),
207215
build_schema,
216+
probe_schema,
208217
hash_join_desc,
209218
)?),
210219
HashMethodKind::KeysU512(hash_method) => Arc::new(JoinHashTable::try_create(
@@ -214,6 +223,7 @@ impl JoinHashTable {
214223
hash_method,
215224
}),
216225
build_schema,
226+
probe_schema,
217227
hash_join_desc,
218228
)?),
219229
})
@@ -223,6 +233,7 @@ impl JoinHashTable {
223233
ctx: Arc<QueryContext>,
224234
hash_table: HashTable,
225235
mut build_data_schema: DataSchemaRef,
236+
mut probe_data_schema: DataSchemaRef,
226237
hash_join_desc: HashJoinDesc,
227238
) -> Result<Self> {
228239
if hash_join_desc.join_type == JoinType::Left
@@ -237,6 +248,16 @@ impl JoinHashTable {
237248
}
238249
build_data_schema = DataSchemaRefExt::create(nullable_field);
239250
};
251+
if hash_join_desc.join_type == JoinType::Right {
252+
let mut nullable_field = Vec::with_capacity(probe_data_schema.fields().len());
253+
for field in probe_data_schema.fields().iter() {
254+
nullable_field.push(DataField::new_nullable(
255+
field.name(),
256+
field.data_type().clone(),
257+
));
258+
}
259+
probe_data_schema = DataSchemaRefExt::create(nullable_field);
260+
}
240261
Ok(Self {
241262
row_space: RowSpace::new(build_data_schema),
242263
ref_count: Mutex::new(0),
@@ -245,6 +266,7 @@ impl JoinHashTable {
245266
ctx,
246267
hash_table: RwLock::new(hash_table),
247268
row_ptrs: RwLock::new(vec![]),
269+
probe_schema: probe_data_schema,
248270
finished_notify: Arc::new(Notify::new()),
249271
})
250272
}
@@ -429,7 +451,8 @@ impl HashJoinState for JoinHashTable {
429451
| JoinType::Anti
430452
| JoinType::Left
431453
| Mark
432-
| JoinType::Single => self.probe_join(input, probe_state),
454+
| JoinType::Single
455+
| JoinType::Right => self.probe_join(input, probe_state),
433456
JoinType::Cross => self.probe_cross_join(input, probe_state),
434457
_ => unimplemented!("{} is unimplemented", self.hash_join_desc.join_type),
435458
}

src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,17 @@ impl JoinHashTable {
158158
return Ok(vec![result]);
159159
}
160160
}
161+
JoinType::Right => {
162+
if self.hash_join_desc.other_predicate.is_none() {
163+
let result =
164+
self.right_join::<false, _, _>(hash_table, probe_state, keys_iter, input)?;
165+
return Ok(vec![result]);
166+
} else {
167+
let result =
168+
self.right_join::<true, _, _>(hash_table, probe_state, keys_iter, input)?;
169+
return Ok(vec![result]);
170+
}
171+
}
161172
Mark => {
162173
results.push(DataBlock::empty());
163174
// Three cases will produce Mark join:
@@ -520,6 +531,71 @@ impl JoinHashTable {
520531
DataBlock::filter_block(merged_block, &predicate)
521532
}
522533

534+
fn right_join<const WITH_OTHER_CONJUNCT: bool, Key, IT>(
535+
&self,
536+
hash_table: &HashMap<Key, Vec<RowPtr>>,
537+
probe_state: &mut ProbeState,
538+
keys_iter: IT,
539+
input: &DataBlock,
540+
) -> Result<DataBlock>
541+
where
542+
Key: HashTableKeyable + Clone + 'static,
543+
IT: Iterator<Item = Key> + TrustedLen,
544+
{
545+
let build_indexes = &mut probe_state.build_indexs;
546+
let probe_indexes = &mut probe_state.probe_indexs;
547+
let valids = &probe_state.valids;
548+
let mut validity = MutableBitmap::new();
549+
for (i, key) in keys_iter.enumerate() {
550+
let probe_result_ptr = Self::probe_key(hash_table, key, valids, i);
551+
if let Some(v) = probe_result_ptr {
552+
let probe_result_ptrs = v.get_value();
553+
build_indexes.extend_from_slice(probe_result_ptrs);
554+
probe_indexes.extend(std::iter::repeat(i as u32).take(probe_result_ptrs.len()));
555+
validity.extend_constant(probe_result_ptrs.len(), true);
556+
}
557+
}
558+
559+
// For right join, build side will always appear in the joined table
560+
// Find the unmatched rows in build side
561+
let mut unmatched_build_indexes = vec![];
562+
for kv in hash_table.iter() {
563+
for v in kv.get_value() {
564+
if !build_indexes.contains(v) {
565+
unmatched_build_indexes.push(v.clone());
566+
}
567+
}
568+
}
569+
build_indexes.extend_from_slice(&unmatched_build_indexes);
570+
let build_block = self.row_space.gather(build_indexes)?;
571+
let probe_block = DataBlock::block_take_by_indices(input, probe_indexes)?;
572+
let validity: Bitmap = validity.into();
573+
let nullable_columns = probe_block
574+
.columns()
575+
.iter()
576+
.map(|c| Self::set_validity(c, &validity))
577+
.collect::<Result<Vec<_>>>()?;
578+
let mut nullable_probe_block =
579+
DataBlock::create(self.probe_schema.clone(), nullable_columns);
580+
// Create null block for unmatched rows in probe side
581+
let null_probe_block = DataBlock::create(
582+
self.probe_schema.clone(),
583+
nullable_probe_block
584+
.columns()
585+
.iter()
586+
.map(|c| {
587+
c.data_type()
588+
.create_constant_column(&DataValue::Null, unmatched_build_indexes.len())
589+
})
590+
.collect::<Result<Vec<_>>>()?,
591+
);
592+
593+
nullable_probe_block =
594+
DataBlock::concat_blocks(&vec![nullable_probe_block, null_probe_block])?;
595+
let merged_block = self.merge_eq_block(&nullable_probe_block, &build_block)?;
596+
Ok(merged_block)
597+
}
598+
523599
// modify the bm by the value row_state
524600
// keep the index of the first positive state
525601
// bitmap: [1, 1, 1] with row_state [0, 0], probe_index: [0, 0, 0] (repeat the first element 3 times)

src/query/service/src/sql/executor/physical_plan.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,17 @@ impl HashJoin {
212212
}
213213
}
214214

215+
JoinType::Right => {
216+
fields.clear();
217+
fields = self.build.output_schema()?.fields().clone();
218+
for field in self.probe.output_schema()?.fields() {
219+
fields.push(DataField::new(
220+
field.name().as_str(),
221+
wrap_nullable(field.data_type()),
222+
));
223+
}
224+
}
225+
215226
JoinType::Semi | JoinType::Anti => {
216227
// Do nothing
217228
}

src/query/service/src/sql/executor/pipeline_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ impl PipelineBuilder {
145145
self.ctx.clone(),
146146
&join.build_keys,
147147
join.build.output_schema()?,
148+
join.probe.output_schema()?,
148149
HashJoinDesc::create(join)?,
149150
)
150151
}

src/query/service/src/sql/planner/binder/join.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ impl<'a> Binder {
7171
}
7272
}
7373
JoinOperator::RightOuter => {
74-
for column in right_context.all_column_bindings().iter() {
75-
bind_context.add_column_binding(column.clone());
76-
}
77-
7874
for column in left_context.all_column_bindings() {
7975
let mut nullable_column = column.clone();
8076
nullable_column.data_type = Box::new(wrap_nullable(&column.data_type));
8177
bind_context.add_column_binding(nullable_column);
8278
}
79+
80+
for column in right_context.all_column_bindings().iter() {
81+
bind_context.add_column_binding(column.clone());
82+
}
8383
}
8484
_ => {
8585
for column in left_context.all_column_bindings() {
@@ -145,12 +145,12 @@ impl<'a> Binder {
145145
right_child,
146146
),
147147
JoinOperator::RightOuter => self.bind_join_with_type(
148-
JoinType::Left,
149-
right_join_conditions,
148+
JoinType::Right,
150149
left_join_conditions,
150+
right_join_conditions,
151151
other_conditions,
152-
right_child,
153152
left_child,
153+
right_child,
154154
),
155155
JoinOperator::FullOuter => self.bind_join_with_type(
156156
JoinType::Full,

tests/logictest/suites/base/15_query/join.test

Lines changed: 4 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,12 @@ insert into t2 values(1, 4), (2, 3), (6, 8);
129129

130130

131131
statement query IIII
132-
select * from t1 right join t2 on t1.a = t2.c;
132+
select * from t1 right join t2 on t1.a = t2.c order by t2.c;
133133

134134
----
135-
1 4 1 2
136-
2 3 NULL NULL
137-
6 8 NULL NULL
135+
1 2 1 4
136+
NULL NULL 2 3
137+
NULL NULL 6 8
138138

139139

140140
statement query IIII
@@ -322,105 +322,6 @@ select t.number from numbers(10000) as t inner join numbers(1000) as t1 on t.num
322322
statement ok
323323
drop table t;
324324

325-
statement ok
326-
drop table t1;
327-
328-
329-
statement ok
330-
drop table t2;
331-
332-
statement ok
333-
create table t1(a int, b int);
334-
335-
336-
statement ok
337-
create table t2(c int, d int);
338-
339-
340-
statement ok
341-
insert into t1 values(1, 2), (3 ,4), (7, 8);
342-
343-
344-
statement ok
345-
insert into t2 values(1, 4), (2, 3), (6, 8);
346-
347-
348-
statement query IIII
349-
select * from t1 right join t2 on t1.a = t2.c;
350-
351-
----
352-
1 4 1 2
353-
2 3 NULL NULL
354-
6 8 NULL NULL
355-
356-
357-
statement query IIII
358-
select * from t1 left join t2 on t1.a = t2.c;
359-
360-
----
361-
1 2 1 4
362-
3 4 NULL NULL
363-
7 8 NULL NULL
364-
365-
366-
statement query IIII
367-
select * from t1 left outer join t2 on t1.a = t2.c and t1.a > 3 order by a,b,c,d;
368-
369-
----
370-
1 2 NULL NULL
371-
3 4 NULL NULL
372-
7 8 NULL NULL
373-
374-
375-
statement query IIII
376-
select * from t1 left outer join t2 on t1.a = t2.c and t2.c > 4 order by a,b,c,d;
377-
378-
----
379-
1 2 NULL NULL
380-
3 4 NULL NULL
381-
7 8 NULL NULL
382-
383-
384-
statement query IIII
385-
select * from t1 left outer join t2 on t2.c > 4 and t1.a > 3 order by a,b,c,d;
386-
387-
----
388-
1 2 NULL NULL
389-
3 4 NULL NULL
390-
7 8 6 8
391-
392-
393-
statement query IIII
394-
select * from t1 left outer join t2 on t1.a > 3 order by a,b,c,d;
395-
396-
----
397-
1 2 NULL NULL
398-
3 4 NULL NULL
399-
7 8 1 4
400-
7 8 2 3
401-
7 8 6 8
402-
403-
404-
statement query IIII
405-
select * from t1 left outer join t2 on t2.c > 4 order by a,b,c,d;
406-
407-
----
408-
1 2 6 8
409-
3 4 6 8
410-
7 8 6 8
411-
412-
413-
statement query IIII
414-
select * from t1 left outer join t2 on t1.a > t2.c order by a,b,c,d;
415-
416-
----
417-
1 2 NULL NULL
418-
3 4 1 4
419-
3 4 2 3
420-
7 8 1 4
421-
7 8 2 3
422-
7 8 6 8
423-
424325
statement ok
425326
CREATE TABLE t3(c0 BIGINT NULL, c1 DOUBLE NULL);
426327

0 commit comments

Comments
 (0)